-----BEGIN PGP SIGNATURE-----

iQEcBAABAgAGBQJYfMzsAAoJEJykq7OBq3PIcNcH/inAc66xT3mWxW2zpgvS+oM7
 wHzCNuGpeNstZ6BAsadUgkzJjbvLYSB3e2Ibgr3srBc+QIY1heBSpZO5hhtm07h9
 peNr4wri5fpOTqzfpYxZyPx/bVv7xpnmmBiFwqxgwPBWaP/wwYwELrokdLiZE51M
 PxFwaIR2Eo4EKfMdxOV7uKkE35F90RytnEJsMP/QvroX1SiyTajjocWXfhLLmoqL
 vFWOX3aCZWEugFq3M1KKJOjrVhHgVs/eT6Xs2wB/7UNdg8qLnd+2C8RjxhPEuU3+
 V7AL0L82Cb4x9JslmdTKtyFhQPwsdg/iE8Tsu/pUraNnTq6KSesFk7rxMC+uhw4=
 =rI8q
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/stefanha/tags/block-pull-request' into staging

# gpg: Signature made Mon 16 Jan 2017 13:38:52 GMT
# gpg:                using RSA key 0x9CA4ABB381AB73C8
# gpg: Good signature from "Stefan Hajnoczi <stefanha@redhat.com>"
# gpg:                 aka "Stefan Hajnoczi <stefanha@gmail.com>"
# Primary key fingerprint: 8695 A8BF D3F9 7CDA AC35  775A 9CA4 ABB3 81AB 73C8

* remotes/stefanha/tags/block-pull-request:
  async: optimize aio_bh_poll
  aio: document locking
  aio-win32: remove walking_handlers, protecting AioHandler list with list_lock
  aio-posix: remove walking_handlers, protecting AioHandler list with list_lock
  aio: tweak walking in dispatch phase
  aio-posix: split aio_dispatch_handlers out of aio_dispatch
  qemu-thread: optimize QemuLockCnt with futexes on Linux
  aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh
  qemu-thread: introduce QemuLockCnt
  aio: rename bh_lock to list_lock
  block: get rid of bdrv_io_unplugged_begin/end

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>

commit 02b351d846
Peter Maydell, 2017-01-17 11:20:27 +00:00
16 changed files with 1025 additions and 218 deletions

aio-posix.c

@ -16,7 +16,7 @@
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/block.h"
#include "qemu/queue.h"
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
#include "qemu/cutils.h"
#include "trace.h"
@ -66,7 +66,7 @@ static bool aio_epoll_try_enable(AioContext *ctx)
AioHandler *node;
struct epoll_event event;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
int r;
if (node->deleted || !node->pfd.events) {
continue;
@ -212,24 +212,27 @@ void aio_set_fd_handler(AioContext *ctx,
bool is_new = false;
bool deleted = false;
qemu_lockcnt_lock(&ctx->list_lock);
node = find_aio_handler(ctx, fd);
/* Are we deleting the fd handler? */
if (!io_read && !io_write && !io_poll) {
if (node == NULL) {
qemu_lockcnt_unlock(&ctx->list_lock);
return;
}
g_source_remove_poll(&ctx->source, &node->pfd);
/* If the lock is held, just mark the node as deleted */
if (ctx->walking_handlers) {
if (qemu_lockcnt_count(&ctx->list_lock)) {
node->deleted = 1;
node->pfd.revents = 0;
} else {
/* Otherwise, delete it for real. We can't just mark it as
* deleted because deleted nodes are only cleaned up after
* releasing the walking_handlers lock.
* deleted because deleted nodes are only cleaned up while
* no one is walking the handlers list.
*/
QLIST_REMOVE(node, node);
deleted = true;
@ -243,7 +246,7 @@ void aio_set_fd_handler(AioContext *ctx,
/* Alloc and insert if it's not already there */
node = g_new0(AioHandler, 1);
node->pfd.fd = fd;
QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
g_source_add_poll(&ctx->source, &node->pfd);
is_new = true;
@ -265,6 +268,7 @@ void aio_set_fd_handler(AioContext *ctx,
}
aio_epoll_update(ctx, node, is_new);
qemu_lockcnt_unlock(&ctx->list_lock);
aio_notify(ctx);
if (deleted) {
@ -316,8 +320,8 @@ static void poll_set_started(AioContext *ctx, bool started)
ctx->poll_started = started;
ctx->walking_handlers++;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
qemu_lockcnt_inc(&ctx->list_lock);
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
IOHandler *fn;
if (node->deleted) {
@ -334,7 +338,7 @@ static void poll_set_started(AioContext *ctx, bool started)
fn(node->opaque);
}
}
ctx->walking_handlers--;
qemu_lockcnt_dec(&ctx->list_lock);
}
@ -349,54 +353,47 @@ bool aio_prepare(AioContext *ctx)
bool aio_pending(AioContext *ctx)
{
AioHandler *node;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
int revents;
revents = node->pfd.revents & node->pfd.events;
if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
aio_node_check(ctx, node->is_external)) {
return true;
}
if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
aio_node_check(ctx, node->is_external)) {
return true;
}
}
return false;
}
/*
* Note that dispatch_fds == false has the side-effect of post-poning the
* freeing of deleted handlers.
*/
bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
{
AioHandler *node = NULL;
bool progress = false;
/*
* If there are callbacks left that have been queued, we need to call them.
* Do not call select in this case, because it is possible that the caller
* does not need a complete flush (as is the case for aio_poll loops).
*/
if (aio_bh_poll(ctx)) {
progress = true;
}
bool result = false;
/*
* We have to walk very carefully in case aio_set_fd_handler is
* called while we're walking.
*/
if (dispatch_fds) {
node = QLIST_FIRST(&ctx->aio_handlers);
}
while (node) {
AioHandler *tmp;
qemu_lockcnt_inc(&ctx->list_lock);
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
int revents;
ctx->walking_handlers++;
revents = node->pfd.revents & node->pfd.events;
if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
aio_node_check(ctx, node->is_external)) {
result = true;
break;
}
if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
aio_node_check(ctx, node->is_external)) {
result = true;
break;
}
}
qemu_lockcnt_dec(&ctx->list_lock);
return result;
}
static bool aio_dispatch_handlers(AioContext *ctx)
{
AioHandler *node, *tmp;
bool progress = false;
/*
* We have to walk very carefully in case aio_set_fd_handler is
* called while we're walking.
*/
qemu_lockcnt_inc(&ctx->list_lock);
QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
int revents;
revents = node->pfd.revents & node->pfd.events;
node->pfd.revents = 0;
@ -420,17 +417,38 @@ bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
progress = true;
}
tmp = node;
node = QLIST_NEXT(node, node);
ctx->walking_handlers--;
if (!ctx->walking_handlers && tmp->deleted) {
QLIST_REMOVE(tmp, node);
g_free(tmp);
if (node->deleted) {
if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
QLIST_REMOVE(node, node);
g_free(node);
qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
}
}
}
qemu_lockcnt_dec(&ctx->list_lock);
return progress;
}
/*
* Note that dispatch_fds == false has the side-effect of post-poning the
* freeing of deleted handlers.
*/
bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
{
bool progress;
/*
* If there are callbacks left that have been queued, we need to call them.
* Do not call select in this case, because it is possible that the caller
* does not need a complete flush (as is the case for aio_poll loops).
*/
progress = aio_bh_poll(ctx);
if (dispatch_fds) {
progress |= aio_dispatch_handlers(ctx);
}
/* Run our timers */
progress |= timerlistgroup_run_timers(&ctx->tlg);
@ -488,7 +506,7 @@ static bool run_poll_handlers_once(AioContext *ctx)
bool progress = false;
AioHandler *node;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
if (!node->deleted && node->io_poll &&
node->io_poll(node->opaque)) {
progress = true;
@ -509,7 +527,7 @@ static bool run_poll_handlers_once(AioContext *ctx)
* Note that ctx->notify_me must be non-zero so this function can detect
* aio_notify().
*
* Note that the caller must have incremented ctx->walking_handlers.
* Note that the caller must have incremented ctx->list_lock.
*
* Returns: true if progress was made, false otherwise
*/
@ -519,7 +537,7 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
int64_t end_time;
assert(ctx->notify_me);
assert(ctx->walking_handlers > 0);
assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
assert(ctx->poll_disable_cnt == 0);
trace_run_poll_handlers_begin(ctx, max_ns);
@ -541,7 +559,7 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
*
* ctx->notify_me must be non-zero so this function can detect aio_notify().
*
* Note that the caller must have incremented ctx->walking_handlers.
* Note that the caller must have incremented ctx->list_lock.
*
* Returns: true if progress was made, false otherwise
*/
@ -592,7 +610,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
atomic_add(&ctx->notify_me, 2);
}
ctx->walking_handlers++;
qemu_lockcnt_inc(&ctx->list_lock);
if (ctx->poll_max_ns) {
start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
@ -606,7 +624,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
/* fill pollfds */
if (!aio_epoll_enabled(ctx)) {
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
if (!node->deleted && node->pfd.events
&& aio_node_check(ctx, node->is_external)) {
add_pollfd(node);
@ -691,7 +709,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
}
npfd = 0;
ctx->walking_handlers--;
qemu_lockcnt_dec(&ctx->list_lock);
/* Run dispatch even if there were no readable fds to run timers */
if (aio_dispatch(ctx, ret > 0)) {

aio-win32.c

@ -21,6 +21,7 @@
#include "qemu/queue.h"
#include "qemu/sockets.h"
#include "qapi/error.h"
#include "qemu/rcu_queue.h"
struct AioHandler {
EventNotifier *e;
@ -45,6 +46,7 @@ void aio_set_fd_handler(AioContext *ctx,
/* fd is a SOCKET in our case */
AioHandler *node;
qemu_lockcnt_lock(&ctx->list_lock);
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
if (node->pfd.fd == fd && !node->deleted) {
break;
@ -54,14 +56,14 @@ void aio_set_fd_handler(AioContext *ctx,
/* Are we deleting the fd handler? */
if (!io_read && !io_write) {
if (node) {
/* If the lock is held, just mark the node as deleted */
if (ctx->walking_handlers) {
/* If aio_poll is in progress, just mark the node as deleted */
if (qemu_lockcnt_count(&ctx->list_lock)) {
node->deleted = 1;
node->pfd.revents = 0;
} else {
/* Otherwise, delete it for real. We can't just mark it as
* deleted because deleted nodes are only cleaned up after
* releasing the walking_handlers lock.
* releasing the list_lock.
*/
QLIST_REMOVE(node, node);
g_free(node);
@ -74,7 +76,7 @@ void aio_set_fd_handler(AioContext *ctx,
/* Alloc and insert if it's not already there */
node = g_new0(AioHandler, 1);
node->pfd.fd = fd;
QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
}
node->pfd.events = 0;
@ -99,6 +101,7 @@ void aio_set_fd_handler(AioContext *ctx,
FD_CONNECT | FD_WRITE | FD_OOB);
}
qemu_lockcnt_unlock(&ctx->list_lock);
aio_notify(ctx);
}
@ -117,6 +120,7 @@ void aio_set_event_notifier(AioContext *ctx,
{
AioHandler *node;
qemu_lockcnt_lock(&ctx->list_lock);
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
if (node->e == e && !node->deleted) {
break;
@ -128,14 +132,14 @@ void aio_set_event_notifier(AioContext *ctx,
if (node) {
g_source_remove_poll(&ctx->source, &node->pfd);
/* If the lock is held, just mark the node as deleted */
if (ctx->walking_handlers) {
/* aio_poll is in progress, just mark the node as deleted */
if (qemu_lockcnt_count(&ctx->list_lock)) {
node->deleted = 1;
node->pfd.revents = 0;
} else {
/* Otherwise, delete it for real. We can't just mark it as
* deleted because deleted nodes are only cleaned up after
* releasing the walking_handlers lock.
* releasing the list_lock.
*/
QLIST_REMOVE(node, node);
g_free(node);
@ -149,7 +153,7 @@ void aio_set_event_notifier(AioContext *ctx,
node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
node->pfd.events = G_IO_IN;
node->is_external = is_external;
QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
g_source_add_poll(&ctx->source, &node->pfd);
}
@ -157,6 +161,7 @@ void aio_set_event_notifier(AioContext *ctx,
node->io_notify = io_notify;
}
qemu_lockcnt_unlock(&ctx->list_lock);
aio_notify(ctx);
}
@ -175,10 +180,16 @@ bool aio_prepare(AioContext *ctx)
bool have_select_revents = false;
fd_set rfds, wfds;
/*
* We have to walk very carefully in case aio_set_fd_handler is
* called while we're walking.
*/
qemu_lockcnt_inc(&ctx->list_lock);
/* fill fd sets */
FD_ZERO(&rfds);
FD_ZERO(&wfds);
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
if (node->io_read) {
FD_SET ((SOCKET)node->pfd.fd, &rfds);
}
@ -188,7 +199,7 @@ bool aio_prepare(AioContext *ctx)
}
if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
node->pfd.revents = 0;
if (FD_ISSET(node->pfd.fd, &rfds)) {
node->pfd.revents |= G_IO_IN;
@ -202,45 +213,55 @@ bool aio_prepare(AioContext *ctx)
}
}
qemu_lockcnt_dec(&ctx->list_lock);
return have_select_revents;
}
bool aio_pending(AioContext *ctx)
{
AioHandler *node;
bool result = false;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
/*
* We have to walk very carefully in case aio_set_fd_handler is
* called while we're walking.
*/
qemu_lockcnt_inc(&ctx->list_lock);
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
if (node->pfd.revents && node->io_notify) {
return true;
result = true;
break;
}
if ((node->pfd.revents & G_IO_IN) && node->io_read) {
return true;
result = true;
break;
}
if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
return true;
result = true;
break;
}
}
return false;
qemu_lockcnt_dec(&ctx->list_lock);
return result;
}
static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
{
AioHandler *node;
bool progress = false;
AioHandler *tmp;
qemu_lockcnt_inc(&ctx->list_lock);
/*
* We have to walk very carefully in case aio_set_fd_handler is
* called while we're walking.
*/
node = QLIST_FIRST(&ctx->aio_handlers);
while (node) {
AioHandler *tmp;
QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
int revents = node->pfd.revents;
ctx->walking_handlers++;
if (!node->deleted &&
(revents || event_notifier_get_handle(node->e) == event) &&
node->io_notify) {
@ -275,17 +296,16 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
}
}
tmp = node;
node = QLIST_NEXT(node, node);
ctx->walking_handlers--;
if (!ctx->walking_handlers && tmp->deleted) {
QLIST_REMOVE(tmp, node);
g_free(tmp);
if (node->deleted) {
if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
QLIST_REMOVE(node, node);
g_free(node);
qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
}
}
}
qemu_lockcnt_dec(&ctx->list_lock);
return progress;
}
@ -323,20 +343,19 @@ bool aio_poll(AioContext *ctx, bool blocking)
atomic_add(&ctx->notify_me, 2);
}
qemu_lockcnt_inc(&ctx->list_lock);
have_select_revents = aio_prepare(ctx);
ctx->walking_handlers++;
/* fill fd sets */
count = 0;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
if (!node->deleted && node->io_notify
&& aio_node_check(ctx, node->is_external)) {
events[count++] = event_notifier_get_handle(node->e);
}
}
ctx->walking_handlers--;
qemu_lockcnt_dec(&ctx->list_lock);
first = true;
/* ctx->notifier is always registered. */

async.c

@ -53,14 +53,14 @@ void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
.cb = cb,
.opaque = opaque,
};
qemu_mutex_lock(&ctx->bh_lock);
qemu_lockcnt_lock(&ctx->list_lock);
bh->next = ctx->first_bh;
bh->scheduled = 1;
bh->deleted = 1;
/* Make sure that the members are ready before putting bh into list */
smp_wmb();
ctx->first_bh = bh;
qemu_mutex_unlock(&ctx->bh_lock);
qemu_lockcnt_unlock(&ctx->list_lock);
aio_notify(ctx);
}
@ -73,12 +73,12 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
.cb = cb,
.opaque = opaque,
};
qemu_mutex_lock(&ctx->bh_lock);
qemu_lockcnt_lock(&ctx->list_lock);
bh->next = ctx->first_bh;
/* Make sure that the members are ready before putting bh into list */
smp_wmb();
ctx->first_bh = bh;
qemu_mutex_unlock(&ctx->bh_lock);
qemu_lockcnt_unlock(&ctx->list_lock);
return bh;
}
@ -92,14 +92,13 @@ int aio_bh_poll(AioContext *ctx)
{
QEMUBH *bh, **bhp, *next;
int ret;
bool deleted = false;
ctx->walking_bh++;
qemu_lockcnt_inc(&ctx->list_lock);
ret = 0;
for (bh = ctx->first_bh; bh; bh = next) {
/* Make sure that fetching bh happens before accessing its members */
smp_read_barrier_depends();
next = bh->next;
for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
next = atomic_rcu_read(&bh->next);
/* The atomic_xchg is paired with the one in qemu_bh_schedule. The
* implicit memory barrier ensures that the callback sees all writes
* done by the scheduling thread. It also ensures that the scheduling
@ -114,13 +113,18 @@ int aio_bh_poll(AioContext *ctx)
bh->idle = 0;
aio_bh_call(bh);
}
if (bh->deleted) {
deleted = true;
}
}
ctx->walking_bh--;
/* remove deleted bhs */
if (!ctx->walking_bh) {
qemu_mutex_lock(&ctx->bh_lock);
if (!deleted) {
qemu_lockcnt_dec(&ctx->list_lock);
return ret;
}
if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) {
bhp = &ctx->first_bh;
while (*bhp) {
bh = *bhp;
@ -131,9 +135,8 @@ int aio_bh_poll(AioContext *ctx)
bhp = &bh->next;
}
}
qemu_mutex_unlock(&ctx->bh_lock);
qemu_lockcnt_unlock(&ctx->list_lock);
}
return ret;
}
@ -187,7 +190,8 @@ aio_compute_timeout(AioContext *ctx)
int timeout = -1;
QEMUBH *bh;
for (bh = ctx->first_bh; bh; bh = bh->next) {
for (bh = atomic_rcu_read(&ctx->first_bh); bh;
bh = atomic_rcu_read(&bh->next)) {
if (bh->scheduled) {
if (bh->idle) {
/* idle bottom halves will be polled at least
@ -270,7 +274,8 @@ aio_ctx_finalize(GSource *source)
}
#endif
qemu_mutex_lock(&ctx->bh_lock);
qemu_lockcnt_lock(&ctx->list_lock);
assert(!qemu_lockcnt_count(&ctx->list_lock));
while (ctx->first_bh) {
QEMUBH *next = ctx->first_bh->next;
@ -280,12 +285,12 @@ aio_ctx_finalize(GSource *source)
g_free(ctx->first_bh);
ctx->first_bh = next;
}
qemu_mutex_unlock(&ctx->bh_lock);
qemu_lockcnt_unlock(&ctx->list_lock);
aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
event_notifier_cleanup(&ctx->notifier);
qemu_rec_mutex_destroy(&ctx->lock);
qemu_mutex_destroy(&ctx->bh_lock);
qemu_lockcnt_destroy(&ctx->list_lock);
timerlistgroup_deinit(&ctx->tlg);
}
@ -372,6 +377,7 @@ AioContext *aio_context_new(Error **errp)
goto fail;
}
g_source_set_can_recurse(&ctx->source, true);
qemu_lockcnt_init(&ctx->list_lock);
aio_set_event_notifier(ctx, &ctx->notifier,
false,
(EventNotifierHandler *)
@ -381,7 +387,6 @@ AioContext *aio_context_new(Error **errp)
ctx->linux_aio = NULL;
#endif
ctx->thread_pool = NULL;
qemu_mutex_init(&ctx->bh_lock);
qemu_rec_mutex_init(&ctx->lock);
timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);

block/io.c

@ -228,9 +228,7 @@ void bdrv_drained_begin(BlockDriverState *bs)
bdrv_parent_drained_begin(bs);
}
bdrv_io_unplugged_begin(bs);
bdrv_drain_recurse(bs);
bdrv_io_unplugged_end(bs);
}
void bdrv_drained_end(BlockDriverState *bs)
@ -302,7 +300,6 @@ void bdrv_drain_all_begin(void)
aio_context_acquire(aio_context);
bdrv_parent_drained_begin(bs);
bdrv_io_unplugged_begin(bs);
aio_disable_external(aio_context);
aio_context_release(aio_context);
@ -347,7 +344,6 @@ void bdrv_drain_all_end(void)
aio_context_acquire(aio_context);
aio_enable_external(aio_context);
bdrv_io_unplugged_end(bs);
bdrv_parent_drained_end(bs);
aio_context_release(aio_context);
}
@ -2650,7 +2646,7 @@ void bdrv_io_plug(BlockDriverState *bs)
bdrv_io_plug(child->bs);
}
if (bs->io_plugged++ == 0 && bs->io_plug_disabled == 0) {
if (bs->io_plugged++ == 0) {
BlockDriver *drv = bs->drv;
if (drv && drv->bdrv_io_plug) {
drv->bdrv_io_plug(bs);
@ -2663,7 +2659,7 @@ void bdrv_io_unplug(BlockDriverState *bs)
BdrvChild *child;
assert(bs->io_plugged);
if (--bs->io_plugged == 0 && bs->io_plug_disabled == 0) {
if (--bs->io_plugged == 0) {
BlockDriver *drv = bs->drv;
if (drv && drv->bdrv_io_unplug) {
drv->bdrv_io_unplug(bs);
@ -2674,36 +2670,3 @@ void bdrv_io_unplug(BlockDriverState *bs)
bdrv_io_unplug(child->bs);
}
}
void bdrv_io_unplugged_begin(BlockDriverState *bs)
{
BdrvChild *child;
if (bs->io_plug_disabled++ == 0 && bs->io_plugged > 0) {
BlockDriver *drv = bs->drv;
if (drv && drv->bdrv_io_unplug) {
drv->bdrv_io_unplug(bs);
}
}
QLIST_FOREACH(child, &bs->children, next) {
bdrv_io_unplugged_begin(child->bs);
}
}
void bdrv_io_unplugged_end(BlockDriverState *bs)
{
BdrvChild *child;
assert(bs->io_plug_disabled);
QLIST_FOREACH(child, &bs->children, next) {
bdrv_io_unplugged_end(child->bs);
}
if (--bs->io_plug_disabled == 0 && bs->io_plugged > 0) {
BlockDriver *drv = bs->drv;
if (drv && drv->bdrv_io_plug) {
drv->bdrv_io_plug(bs);
}
}
}

docs/lockcnt.txt (new file)

@ -0,0 +1,277 @@
DOCUMENTATION FOR LOCKED COUNTERS (aka QemuLockCnt)
===================================================
QEMU often uses reference counts to track data structures that are being
accessed and should not be freed. For example, a loop that invokes
callbacks like this is not safe:
QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
if (ioh->revents & G_IO_OUT) {
ioh->fd_write(ioh->opaque);
}
}
QLIST_FOREACH_SAFE protects against deletion of the current node (ioh)
by stashing away its "next" pointer. However, ioh->fd_write could
actually delete the next node from the list. The simplest way to
avoid this is to mark the node as deleted, and remove it from the
list in the above loop:
QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
if (ioh->deleted) {
QLIST_REMOVE(ioh, next);
g_free(ioh);
} else {
if (ioh->revents & G_IO_OUT) {
ioh->fd_write(ioh->opaque);
}
}
}
If however this loop must also be reentrant, i.e. it is possible that
ioh->fd_write invokes the loop again, some kind of counting is needed:
walking_handlers++;
QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
if (ioh->deleted) {
if (walking_handlers == 1) {
QLIST_REMOVE(ioh, next);
g_free(ioh);
}
} else {
if (ioh->revents & G_IO_OUT) {
ioh->fd_write(ioh->opaque);
}
}
}
walking_handlers--;
One may think of using the RCU primitives, rcu_read_lock() and
rcu_read_unlock(); effectively, the RCU nesting count would take
the place of the walking_handlers global variable. Indeed,
reference counting and RCU have similar purposes, but their usage in
general is complementary:
- reference counting is fine-grained and limited to a single data
structure; RCU delays reclamation of *all* RCU-protected data
structures;
- reference counting works even in the presence of code that keeps
a reference for a long time; RCU critical sections in principle
should be kept short;
- reference counting is often applied to code that is not thread-safe
but is reentrant; in fact, usage of reference counting in QEMU predates
the introduction of threads by many years. RCU is generally used to
protect readers from other threads freeing memory after concurrent
modifications to a data structure.
- reclaiming data can be done by a separate thread in the case of RCU;
this can improve performance, but also delay reclamation undesirably.
With reference counting, reclamation is deterministic.
This file documents QemuLockCnt, an abstraction for using reference
counting in code that has to be both thread-safe and reentrant.
QemuLockCnt concepts
--------------------
A QemuLockCnt comprises both a counter and a mutex; it has primitives
to increment and decrement the counter, and to take and release the
mutex. The counter notes how many visits to the data structures are
taking place (the visits could be from different threads, or there could
be multiple reentrant visits from the same thread). The basic rules
governing the counter/mutex pair then are the following:
- Data protected by the QemuLockCnt must not be freed unless the
counter is zero and the mutex is taken.
- A new visit cannot be started while the counter is zero and the
mutex is taken.
Most of the time, the mutex protects all writes to the data structure,
not just frees, though there could be cases where this is not necessary.
Reads, instead, can be done without taking the mutex, as long as the
readers and writers use the same macros that are used for RCU, for
example atomic_rcu_read, atomic_rcu_set, QLIST_FOREACH_RCU, etc. This is
because the reads are done outside a lock and a set or QLIST_INSERT_HEAD
can happen concurrently with the read. The RCU API ensures that the
processor and the compiler see all required memory barriers.
This could be implemented simply by protecting the counter with the
mutex, for example:
// (1)
qemu_mutex_lock(&walking_handlers_mutex);
walking_handlers++;
qemu_mutex_unlock(&walking_handlers_mutex);
...
// (2)
qemu_mutex_lock(&walking_handlers_mutex);
if (--walking_handlers == 0) {
QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
if (ioh->deleted) {
QLIST_REMOVE(ioh, next);
g_free(ioh);
}
}
}
qemu_mutex_unlock(&walking_handlers_mutex);
Here, no frees can happen in the code represented by the ellipsis.
If another thread is executing critical section (2), that part of
the code cannot be entered, because the thread will not be able
to increment the walking_handlers variable. And of course
during the visit any other thread will see a nonzero value for
walking_handlers, as in the single-threaded code.
Note that it is possible for multiple concurrent accesses to delay
the cleanup arbitrarily; in other words, for the walking_handlers
counter to never become zero. For this reason, this technique is
more easily applicable if concurrent access to the structure is rare.
However, critical sections are easy to forget since you have to do
them for each modification of the counter. QemuLockCnt ensures that
all modifications of the counter take the lock appropriately, and it
can also be more efficient in two ways:
- it avoids taking the lock for many operations (for example
incrementing the counter while it is non-zero);
- on some platforms, one can implement QemuLockCnt to hold the lock
and the mutex in a single word, making the fast path no more expensive
than simply managing a counter using atomic operations (see
docs/atomics.txt). This can be very helpful if concurrent access to
the data structure is expected to be rare.
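
For reference, the Linux implementation in util/lockcnt.c packs both
into a single word: bits 0-1 hold the futex-based lock and bits 2-31
hold the counter.

    #define QEMU_LOCKCNT_STATE_MASK    3
    #define QEMU_LOCKCNT_STATE_FREE    0   /* free, uncontended */
    #define QEMU_LOCKCNT_STATE_LOCKED  1   /* locked, uncontended */
    #define QEMU_LOCKCNT_STATE_WAITING 2   /* locked, contended */
    #define QEMU_LOCKCNT_COUNT_STEP    4   /* the counter advances in steps of 4 */
    #define QEMU_LOCKCNT_COUNT_SHIFT   2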
Using the same mutex for frees and writes can still incur some small
inefficiencies; for example, a visit can never start if the counter is
zero and the mutex is taken---even if the mutex is taken by a write,
which in principle need not block a visit of the data structure.
However, these are usually not a problem if any of the following
assumptions are valid:
- concurrent access is possible but rare
- writes are rare
- writes are frequent, but this kind of write (e.g. appending to a
list) has a very small critical section.
For example, QEMU uses QemuLockCnt to manage an AioContext's list of
bottom halves and file descriptor handlers. Modifications to the list
of file descriptor handlers are rare. Creation of a new bottom half is
frequent and can happen on a fast path; however: 1) it is almost never
concurrent with a visit to the list of bottom halves; 2) it only has
three instructions in the critical path, two assignments and a smp_wmb().
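
Concretely, the critical section used when inserting a new bottom half
(see aio_bh_new in async.c in this series) boils down to:

    qemu_lockcnt_lock(&ctx->list_lock);
    bh->next = ctx->first_bh;
    /* Make sure that the members are ready before putting bh into list */
    smp_wmb();
    ctx->first_bh = bh;
    qemu_lockcnt_unlock(&ctx->list_lock);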
QemuLockCnt API
---------------
The QemuLockCnt API is described in include/qemu/thread.h.
QemuLockCnt usage
-----------------
This section explains the typical usage patterns for QemuLockCnt functions.
Setting a variable to a non-NULL value can be done between
qemu_lockcnt_lock and qemu_lockcnt_unlock:
qemu_lockcnt_lock(&xyz_lockcnt);
if (!xyz) {
new_xyz = g_new(XYZ, 1);
...
atomic_rcu_set(&xyz, new_xyz);
}
qemu_lockcnt_unlock(&xyz_lockcnt);
Accessing the value can be done between qemu_lockcnt_inc and
qemu_lockcnt_dec:
qemu_lockcnt_inc(&xyz_lockcnt);
if (xyz) {
XYZ *p = atomic_rcu_read(&xyz);
...
/* Accesses can now be done through "p". */
}
qemu_lockcnt_dec(&xyz_lockcnt);
Freeing the object can similarly use qemu_lockcnt_lock and
qemu_lockcnt_unlock, but you also need to ensure that the count
is zero (i.e. there is no concurrent visit). Because qemu_lockcnt_inc
takes the QemuLockCnt's lock, the count cannot become non-zero while
the object is being freed. Freeing an object looks like this:
qemu_lockcnt_lock(&xyz_lockcnt);
if (!qemu_lockcnt_count(&xyz_lockcnt)) {
g_free(xyz);
xyz = NULL;
}
qemu_lockcnt_unlock(&xyz_lockcnt);
If an object has to be freed right after a visit, you can combine
the decrement, the locking and the check on count as follows:
qemu_lockcnt_inc(&xyz_lockcnt);
if (xyz) {
XYZ *p = atomic_rcu_read(&xyz);
...
/* Accesses can now be done through "p". */
}
if (qemu_lockcnt_dec_and_lock(&xyz_lockcnt)) {
g_free(xyz);
xyz = NULL;
qemu_lockcnt_unlock(&xyz_lockcnt);
}
QemuLockCnt can also be used to access a list as follows:
qemu_lockcnt_inc(&io_handlers_lockcnt);
QLIST_FOREACH_RCU(ioh, &io_handlers, next) {
if (ioh->revents & G_IO_OUT) {
ioh->fd_write(ioh->opaque);
}
}
if (qemu_lockcnt_dec_and_lock(&io_handlers_lockcnt)) {
QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
if (ioh->deleted) {
QLIST_REMOVE(ioh, next);
g_free(ioh);
}
}
qemu_lockcnt_unlock(&io_handlers_lockcnt);
}
Again, the RCU primitives are used because new items can be added to the
list during the walk. QLIST_FOREACH_RCU ensures that the processor and
the compiler see the appropriate memory barriers.
An alternative pattern uses qemu_lockcnt_dec_if_lock:
qemu_lockcnt_inc(&io_handlers_lockcnt);
QLIST_FOREACH_SAFE_RCU(ioh, &io_handlers, next, pioh) {
if (ioh->deleted) {
if (qemu_lockcnt_dec_if_lock(&io_handlers_lockcnt)) {
QLIST_REMOVE(ioh, next);
g_free(ioh);
qemu_lockcnt_inc_and_unlock(&io_handlers_lockcnt);
}
} else {
if (ioh->revents & G_IO_OUT) {
ioh->fd_write(ioh->opaque);
}
}
}
qemu_lockcnt_dec(&io_handlers_lockcnt);
Here you can use qemu_lockcnt_dec instead of qemu_lockcnt_dec_and_lock,
because there is no special task to do if the count goes from 1 to 0.

docs/multiple-iothreads.txt

@ -84,9 +84,8 @@ How to synchronize with an IOThread
AioContext is not thread-safe so some rules must be followed when using file
descriptors, event notifiers, timers, or BHs across threads:
1. AioContext functions can be called safely from file descriptor, event
notifier, timer, or BH callbacks invoked by the AioContext. No locking is
necessary.
1. AioContext functions can always be called safely. They handle their
own locking internally.
2. Other threads wishing to access the AioContext must use
aio_context_acquire()/aio_context_release() for mutual exclusion. Once the
@ -94,16 +93,14 @@ context is acquired no other thread can access it or run event loop iterations
in this AioContext.
aio_context_acquire()/aio_context_release() calls may be nested. This
means you can call them if you're not sure whether #1 applies.
means you can call them if you're not sure whether #2 applies.
There is currently no lock ordering rule if a thread needs to acquire multiple
AioContexts simultaneously. Therefore, it is only safe for code holding the
QEMU global mutex to acquire other AioContexts.
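
As an illustrative sketch of rule #2 (the helper name is hypothetical;
the block-layer calls are the usual ones):

    #include "qemu/osdep.h"
    #include "block/aio.h"
    #include "block/block.h"

    /* Flush a BlockDriverState whose AioContext may belong to an IOThread. */
    static void flush_from_main_loop(BlockDriverState *bs)
    {
        AioContext *ctx = bdrv_get_aio_context(bs);

        aio_context_acquire(ctx);
        bdrv_flush(bs);            /* any AioContext/block call, context held */
        aio_context_release(ctx);
    }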
Side note: the best way to schedule a function call across threads is to create
a BH in the target AioContext beforehand and then call qemu_bh_schedule(). No
acquire/release or locking is needed for the qemu_bh_schedule() call. But be
sure to acquire the AioContext for aio_bh_new() if necessary.
Side note: the best way to schedule a function call across threads is to call
aio_bh_schedule_oneshot(). No acquire/release or locking is needed.
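
A minimal sketch of that side note (the callback and helper names are
hypothetical):

    #include "qemu/osdep.h"
    #include "block/aio.h"

    static void my_work_cb(void *opaque)
    {
        /* Runs once, in the thread that owns the target AioContext. */
    }

    static void kick_iothread(AioContext *target_ctx, void *data)
    {
        /* Thread-safe; no aio_context_acquire/release needed. */
        aio_bh_schedule_oneshot(target_ctx, my_work_cb, data);
    }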
AioContext and the block layer
------------------------------

include/block/aio.h

@ -53,18 +53,12 @@ struct LinuxAioState;
struct AioContext {
GSource source;
/* Protects all fields from multi-threaded access */
/* Used by AioContext users to protect from multi-threaded access. */
QemuRecMutex lock;
/* The list of registered AIO handlers */
/* The list of registered AIO handlers. Protected by ctx->list_lock. */
QLIST_HEAD(, AioHandler) aio_handlers;
/* This is a simple lock used to protect the aio_handlers list.
* Specifically, it's used to ensure that no callbacks are removed while
* we're walking and dispatching callbacks.
*/
int walking_handlers;
/* Used to avoid unnecessary event_notifier_set calls in aio_notify;
* accessed with atomic primitives. If this field is 0, everything
* (file descriptors, bottom halves, timers) will be re-evaluated
@ -90,17 +84,15 @@ struct AioContext {
*/
uint32_t notify_me;
/* lock to protect between bh's adders and deleter */
QemuMutex bh_lock;
/* A lock to protect between QEMUBH and AioHandler adders and deleter,
* and to ensure that no callbacks are removed while we're walking and
* dispatching them.
*/
QemuLockCnt list_lock;
/* Anchor of the list of Bottom Halves belonging to the context */
struct QEMUBH *first_bh;
/* A simple lock used to protect the first_bh list, and ensure that
* no callbacks are removed while we're walking and dispatching callbacks.
*/
int walking_bh;
/* Used by aio_notify.
*
* "notified" is used to avoid expensive event_notifier_test_and_clear
@ -116,7 +108,9 @@ struct AioContext {
bool notified;
EventNotifier notifier;
/* Thread pool for performing work and receiving completion callbacks */
/* Thread pool for performing work and receiving completion callbacks.
* Has its own locking.
*/
struct ThreadPool *thread_pool;
#ifdef CONFIG_LINUX_AIO
@ -126,7 +120,9 @@ struct AioContext {
struct LinuxAioState *linux_aio;
#endif
/* TimerLists for calling timers - one per clock type */
/* TimerLists for calling timers - one per clock type. Has its own
* locking.
*/
QEMUTimerListGroup tlg;
int external_disable_cnt;
@ -180,9 +176,11 @@ void aio_context_unref(AioContext *ctx);
* automatically takes care of calling aio_context_acquire and
* aio_context_release.
*
* Access to timers and BHs from a thread that has not acquired AioContext
* is possible. Access to callbacks for now must be done while the AioContext
* is owned by the thread (FIXME).
* Note that this is separate from bdrv_drained_begin/bdrv_drained_end. A
* thread still has to call those to avoid being interrupted by the guest.
*
* Bottom halves, timers and callbacks can be created or removed without
* acquiring the AioContext.
*/
void aio_context_acquire(AioContext *ctx);
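
A sketch of the last point above, arming a timer in another thread's
AioContext without acquiring it (the names here are illustrative, not
part of the patch):

    #include "qemu/osdep.h"
    #include "qemu/timer.h"
    #include "block/aio.h"

    static QEMUTimer deadline;

    static void deadline_cb(void *opaque)
    {
        /* Runs in the thread that owns the AioContext. */
    }

    static void arm_deadline(AioContext *ctx)
    {
        aio_timer_init(ctx, &deadline, QEMU_CLOCK_REALTIME, SCALE_MS,
                       deadline_cb, NULL);
        timer_mod(&deadline, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) + 100);
    }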

include/block/block.h

@ -526,8 +526,6 @@ int bdrv_probe_geometry(BlockDriverState *bs, HDGeometry *geo);
void bdrv_io_plug(BlockDriverState *bs);
void bdrv_io_unplug(BlockDriverState *bs);
void bdrv_io_unplugged_begin(BlockDriverState *bs);
void bdrv_io_unplugged_end(BlockDriverState *bs);
/**
* bdrv_drained_begin:

include/block/block_int.h

@ -526,9 +526,8 @@ struct BlockDriverState {
uint64_t write_threshold_offset;
NotifierWithReturn write_threshold_notifier;
/* counters for nested bdrv_io_plug and bdrv_io_unplugged_begin */
/* counter for nested bdrv_io_plug */
unsigned io_plugged;
unsigned io_plug_disabled;
int quiesce_counter;
};

include/qemu/futex.h (new file)

@ -0,0 +1,36 @@
/*
* Wrappers around Linux futex syscall
*
* Copyright Red Hat, Inc. 2017
*
* Author:
* Paolo Bonzini <pbonzini@redhat.com>
*
* This work is licensed under the terms of the GNU GPL, version 2 or later.
* See the COPYING file in the top-level directory.
*
*/
#include <sys/syscall.h>
#include <linux/futex.h>
#define qemu_futex(...) syscall(__NR_futex, __VA_ARGS__)
static inline void qemu_futex_wake(void *f, int n)
{
qemu_futex(f, FUTEX_WAKE, n, NULL, NULL, 0);
}
static inline void qemu_futex_wait(void *f, unsigned val)
{
while (qemu_futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
switch (errno) {
case EWOULDBLOCK:
return;
case EINTR:
break; /* get out of switch and retry */
default:
abort();
}
}
}
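
As an illustrative sketch (not part of the patch), these wrappers can
back a simple one-shot flag; the helpers below are hypothetical:

    #include "qemu/osdep.h"
    #include "qemu/atomic.h"
    #include "qemu/futex.h"

    static int flag;

    /* Publish the flag, then wake every thread sleeping in flag_wait(). */
    static void flag_set(void)
    {
        atomic_set(&flag, 1);
        qemu_futex_wake(&flag, INT_MAX);
    }

    /* Sleep until the flag becomes non-zero; FUTEX_WAIT returns right away
     * (EWOULDBLOCK) if the value has already changed from 0. */
    static void flag_wait(void)
    {
        while (atomic_read(&flag) == 0) {
            qemu_futex_wait(&flag, 0);
        }
    }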

include/qemu/thread.h

@ -8,6 +8,7 @@ typedef struct QemuMutex QemuMutex;
typedef struct QemuCond QemuCond;
typedef struct QemuSemaphore QemuSemaphore;
typedef struct QemuEvent QemuEvent;
typedef struct QemuLockCnt QemuLockCnt;
typedef struct QemuThread QemuThread;
#ifdef _WIN32
@ -98,4 +99,115 @@ static inline void qemu_spin_unlock(QemuSpin *spin)
__sync_lock_release(&spin->value);
}
struct QemuLockCnt {
#ifndef CONFIG_LINUX
QemuMutex mutex;
#endif
unsigned count;
};
/**
* qemu_lockcnt_init: initialize a QemuLockcnt
* @lockcnt: the lockcnt to initialize
*
* Initialize lockcnt's counter to zero and prepare its mutex
* for usage.
*/
void qemu_lockcnt_init(QemuLockCnt *lockcnt);
/**
* qemu_lockcnt_destroy: destroy a QemuLockcnt
* @lockcnt: the lockcnt to destruct
*
* Destroy lockcnt's mutex.
*/
void qemu_lockcnt_destroy(QemuLockCnt *lockcnt);
/**
* qemu_lockcnt_inc: increment a QemuLockCnt's counter
* @lockcnt: the lockcnt to operate on
*
* If the lockcnt's count is zero, wait for critical sections
* to finish and increment lockcnt's count to 1. If the count
* is not zero, just increment it.
*
* Because this function can wait on the mutex, it must not be
* called while the lockcnt's mutex is held by the current thread.
* For the same reason, qemu_lockcnt_inc can also contribute to
* AB-BA deadlocks. This is a sample deadlock scenario:
*
* thread 1 thread 2
* -------------------------------------------------------
* qemu_lockcnt_lock(&lc1);
* qemu_lockcnt_lock(&lc2);
* qemu_lockcnt_inc(&lc2);
* qemu_lockcnt_inc(&lc1);
*/
void qemu_lockcnt_inc(QemuLockCnt *lockcnt);
/**
* qemu_lockcnt_dec: decrement a QemuLockCnt's counter
* @lockcnt: the lockcnt to operate on
*/
void qemu_lockcnt_dec(QemuLockCnt *lockcnt);
/**
* qemu_lockcnt_dec_and_lock: decrement a QemuLockCnt's counter and
* possibly lock it.
* @lockcnt: the lockcnt to operate on
*
* Decrement lockcnt's count. If the new count is zero, lock
* the mutex and return true. Otherwise, return false.
*/
bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt);
/**
* qemu_lockcnt_dec_if_lock: possibly decrement a QemuLockCnt's counter and
* lock it.
* @lockcnt: the lockcnt to operate on
*
* If the count is 1, decrement the count to zero, lock
* the mutex and return true. Otherwise, return false.
*/
bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt);
/**
* qemu_lockcnt_lock: lock a QemuLockCnt's mutex.
* @lockcnt: the lockcnt to operate on
*
* Remember that concurrent visits are not blocked unless the count is
* also zero. You can use qemu_lockcnt_count to check for this inside a
* critical section.
*/
void qemu_lockcnt_lock(QemuLockCnt *lockcnt);
/**
* qemu_lockcnt_unlock: release a QemuLockCnt's mutex.
* @lockcnt: the lockcnt to operate on.
*/
void qemu_lockcnt_unlock(QemuLockCnt *lockcnt);
/**
* qemu_lockcnt_inc_and_unlock: combined unlock/increment on a QemuLockCnt.
* @lockcnt: the lockcnt to operate on.
*
* This is the same as
*
* qemu_lockcnt_unlock(lockcnt);
* qemu_lockcnt_inc(lockcnt);
*
* but more efficient.
*/
void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt);
/**
* qemu_lockcnt_count: query a LockCnt's count.
* @lockcnt: the lockcnt to query.
*
* Note that the count can change at any time. Still, while the
* lockcnt is locked, one can usefully check whether the count
* is non-zero.
*/
unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt);
#endif
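
A typical visit/free pattern with this API, mirroring docs/lockcnt.txt
(xyz, xyz_lockcnt and XYZ are illustrative names):

    qemu_lockcnt_inc(&xyz_lockcnt);
    if (xyz) {
        XYZ *p = atomic_rcu_read(&xyz);
        /* read-side accesses go through p */
    }
    if (qemu_lockcnt_dec_and_lock(&xyz_lockcnt)) {
        /* last visitor, lock held: safe to free */
        g_free(xyz);
        xyz = NULL;
        qemu_lockcnt_unlock(&xyz_lockcnt);
    }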

util/Makefile.objs

@ -1,5 +1,6 @@
util-obj-y = osdep.o cutils.o unicode.o qemu-timer-common.o
util-obj-y += bufferiszero.o
util-obj-y += lockcnt.o
util-obj-$(CONFIG_POSIX) += compatfd.o
util-obj-$(CONFIG_POSIX) += event_notifier-posix.o
util-obj-$(CONFIG_POSIX) += mmap-alloc.o

util/lockcnt.c (new file)

@ -0,0 +1,397 @@
/*
* QemuLockCnt implementation
*
* Copyright Red Hat, Inc. 2017
*
* Author:
* Paolo Bonzini <pbonzini@redhat.com>
*/
#include "qemu/osdep.h"
#include "qemu/thread.h"
#include "qemu/atomic.h"
#include "trace.h"
#ifdef CONFIG_LINUX
#include "qemu/futex.h"
/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter.
* For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok,
* this is not the most relaxing citation I could make...). It is similar
* to mutex2 in the paper.
*/
#define QEMU_LOCKCNT_STATE_MASK 3
#define QEMU_LOCKCNT_STATE_FREE 0 /* free, uncontended */
#define QEMU_LOCKCNT_STATE_LOCKED 1 /* locked, uncontended */
#define QEMU_LOCKCNT_STATE_WAITING 2 /* locked, contended */
#define QEMU_LOCKCNT_COUNT_STEP 4
#define QEMU_LOCKCNT_COUNT_SHIFT 2
void qemu_lockcnt_init(QemuLockCnt *lockcnt)
{
lockcnt->count = 0;
}
void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
{
}
/* *val is the current value of lockcnt->count.
*
* If the lock is free, try a cmpxchg from *val to new_if_free; return
* true and set *val to the old value found by the cmpxchg in
* lockcnt->count.
*
* If the lock is taken, wait for it to be released and return false
* *without trying again to take the lock*. Again, set *val to the
* new value of lockcnt->count.
*
* If *waited is true on return, new_if_free's bottom two bits must not
* be QEMU_LOCKCNT_STATE_LOCKED on subsequent calls, because the caller
* does not know if there are other waiters. Furthermore, after *waited
* is set the caller has effectively acquired the lock. If it returns
* with the lock not taken, it must wake another futex waiter.
*/
static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val,
int new_if_free, bool *waited)
{
/* Fast path for when the lock is free. */
if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) {
int expected = *val;
trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free);
*val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free);
if (*val == expected) {
trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free);
*val = new_if_free;
return true;
}
}
/* The slow path moves from locked to waiting if necessary, then
* does a futex wait. Both steps can be repeated ad nauseam,
* only getting out of the loop if we can have another shot at the
* fast path. Once we can, get out to compute the new destination
* value for the fast path.
*/
while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) {
if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) {
int expected = *val;
int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING;
trace_lockcnt_futex_wait_prepare(lockcnt, expected, new);
*val = atomic_cmpxchg(&lockcnt->count, expected, new);
if (*val == expected) {
*val = new;
}
continue;
}
if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) {
*waited = true;
trace_lockcnt_futex_wait(lockcnt, *val);
qemu_futex_wait(&lockcnt->count, *val);
*val = atomic_read(&lockcnt->count);
trace_lockcnt_futex_wait_resume(lockcnt, *val);
continue;
}
abort();
}
return false;
}
static void lockcnt_wake(QemuLockCnt *lockcnt)
{
trace_lockcnt_futex_wake(lockcnt);
qemu_futex_wake(&lockcnt->count, 1);
}
void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
{
int val = atomic_read(&lockcnt->count);
bool waited = false;
for (;;) {
if (val >= QEMU_LOCKCNT_COUNT_STEP) {
int expected = val;
val = atomic_cmpxchg(&lockcnt->count, val, val + QEMU_LOCKCNT_COUNT_STEP);
if (val == expected) {
break;
}
} else {
/* The fast path is (0, unlocked)->(1, unlocked). */
if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, QEMU_LOCKCNT_COUNT_STEP,
&waited)) {
break;
}
}
}
/* If we were woken by another thread, we should also wake one because
* we are effectively releasing the lock that was given to us. This is
* the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING
* in the low bits, and qemu_lockcnt_inc_and_unlock would find it and
* wake someone.
*/
if (waited) {
lockcnt_wake(lockcnt);
}
}
void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
{
atomic_sub(&lockcnt->count, QEMU_LOCKCNT_COUNT_STEP);
}
/* Decrement a counter, and return locked if it is decremented to zero.
* If the function returns true, it is impossible for the counter to
* become nonzero until the next qemu_lockcnt_unlock.
*/
bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
{
int val = atomic_read(&lockcnt->count);
int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
bool waited = false;
for (;;) {
if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) {
int expected = val;
val = atomic_cmpxchg(&lockcnt->count, val, val - QEMU_LOCKCNT_COUNT_STEP);
if (val == expected) {
break;
}
} else {
/* If count is going 1->0, take the lock. The fast path is
* (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
*/
if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
return true;
}
if (waited) {
/* At this point we do not know if there are more waiters. Assume
* there are.
*/
locked_state = QEMU_LOCKCNT_STATE_WAITING;
}
}
}
/* If we were woken by another thread, but we're returning in unlocked
* state, we should also wake a thread because we are effectively
* releasing the lock that was given to us. This is the case where
* qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
* bits, and qemu_lockcnt_unlock would find it and wake someone.
*/
if (waited) {
lockcnt_wake(lockcnt);
}
return false;
}
/* If the counter is one, decrement it and return locked. Otherwise do
* nothing.
*
* If the function returns true, it is impossible for the counter to
* become nonzero until the next qemu_lockcnt_unlock.
*/
bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
{
int val = atomic_read(&lockcnt->count);
int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
bool waited = false;
while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) {
/* If count is going 1->0, take the lock. The fast path is
* (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
*/
if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
return true;
}
if (waited) {
/* At this point we do not know if there are more waiters. Assume
* there are.
*/
locked_state = QEMU_LOCKCNT_STATE_WAITING;
}
}
/* If we were woken by another thread, but we're returning in unlocked
* state, we should also wake a thread because we are effectively
* releasing the lock that was given to us. This is the case where
* qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
* bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone.
*/
if (waited) {
lockcnt_wake(lockcnt);
}
return false;
}
void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
{
int val = atomic_read(&lockcnt->count);
int step = QEMU_LOCKCNT_STATE_LOCKED;
bool waited = false;
/* The third argument is only used if the low bits of val are 0
* (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired
* state.
*/
while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step, &waited)) {
if (waited) {
/* At this point we do not know if there are more waiters. Assume
* there are.
*/
step = QEMU_LOCKCNT_STATE_WAITING;
}
}
}
void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
{
int expected, new, val;
val = atomic_read(&lockcnt->count);
do {
expected = val;
new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK;
trace_lockcnt_unlock_attempt(lockcnt, val, new);
val = atomic_cmpxchg(&lockcnt->count, val, new);
} while (val != expected);
trace_lockcnt_unlock_success(lockcnt, val, new);
if (val & QEMU_LOCKCNT_STATE_WAITING) {
lockcnt_wake(lockcnt);
}
}
void qemu_lockcnt_unlock(QemuLockCnt *lockcnt)
{
int expected, new, val;
val = atomic_read(&lockcnt->count);
do {
expected = val;
new = val & ~QEMU_LOCKCNT_STATE_MASK;
trace_lockcnt_unlock_attempt(lockcnt, val, new);
val = atomic_cmpxchg(&lockcnt->count, val, new);
} while (val != expected);
trace_lockcnt_unlock_success(lockcnt, val, new);
if (val & QEMU_LOCKCNT_STATE_WAITING) {
lockcnt_wake(lockcnt);
}
}
unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
{
return atomic_read(&lockcnt->count) >> QEMU_LOCKCNT_COUNT_SHIFT;
}
#else
void qemu_lockcnt_init(QemuLockCnt *lockcnt)
{
qemu_mutex_init(&lockcnt->mutex);
lockcnt->count = 0;
}
void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
{
qemu_mutex_destroy(&lockcnt->mutex);
}
void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
{
int old;
for (;;) {
old = atomic_read(&lockcnt->count);
if (old == 0) {
qemu_lockcnt_lock(lockcnt);
qemu_lockcnt_inc_and_unlock(lockcnt);
return;
} else {
if (atomic_cmpxchg(&lockcnt->count, old, old + 1) == old) {
return;
}
}
}
}
void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
{
atomic_dec(&lockcnt->count);
}
/* Decrement a counter, and return locked if it is decremented to zero.
* It is impossible for the counter to become nonzero while the mutex
* is taken.
*/
bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
{
int val = atomic_read(&lockcnt->count);
while (val > 1) {
int old = atomic_cmpxchg(&lockcnt->count, val, val - 1);
if (old != val) {
val = old;
continue;
}
return false;
}
qemu_lockcnt_lock(lockcnt);
if (atomic_fetch_dec(&lockcnt->count) == 1) {
return true;
}
qemu_lockcnt_unlock(lockcnt);
return false;
}
/* Decrement a counter and return locked if it is decremented to zero.
* Otherwise do nothing.
*
* It is impossible for the counter to become nonzero while the mutex
* is taken.
*/
bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
{
/* No need for acquire semantics if we return false. */
int val = atomic_read(&lockcnt->count);
if (val > 1) {
return false;
}
qemu_lockcnt_lock(lockcnt);
if (atomic_fetch_dec(&lockcnt->count) == 1) {
return true;
}
qemu_lockcnt_inc_and_unlock(lockcnt);
return false;
}
void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
{
qemu_mutex_lock(&lockcnt->mutex);
}
void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
{
atomic_inc(&lockcnt->count);
qemu_mutex_unlock(&lockcnt->mutex);
}
void qemu_lockcnt_unlock(QemuLockCnt *lockcnt)
{
qemu_mutex_unlock(&lockcnt->mutex);
}
unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
{
return atomic_read(&lockcnt->count);
}
#endif

util/qemu-thread-posix.c

@ -11,10 +11,6 @@
*
*/
#include "qemu/osdep.h"
#ifdef __linux__
#include <sys/syscall.h>
#include <linux/futex.h>
#endif
#include "qemu/thread.h"
#include "qemu/atomic.h"
#include "qemu/notify.h"
@ -294,28 +290,9 @@ void qemu_sem_wait(QemuSemaphore *sem)
}
#ifdef __linux__
#define futex(...) syscall(__NR_futex, __VA_ARGS__)
static inline void futex_wake(QemuEvent *ev, int n)
{
futex(ev, FUTEX_WAKE, n, NULL, NULL, 0);
}
static inline void futex_wait(QemuEvent *ev, unsigned val)
{
while (futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
switch (errno) {
case EWOULDBLOCK:
return;
case EINTR:
break; /* get out of switch and retry */
default:
abort();
}
}
}
#include "qemu/futex.h"
#else
static inline void futex_wake(QemuEvent *ev, int n)
static inline void qemu_futex_wake(QemuEvent *ev, int n)
{
pthread_mutex_lock(&ev->lock);
if (n == 1) {
@ -326,7 +303,7 @@ static inline void futex_wake(QemuEvent *ev, int n)
pthread_mutex_unlock(&ev->lock);
}
static inline void futex_wait(QemuEvent *ev, unsigned val)
static inline void qemu_futex_wait(QemuEvent *ev, unsigned val)
{
pthread_mutex_lock(&ev->lock);
if (ev->value == val) {
@ -338,7 +315,7 @@ static inline void futex_wait(QemuEvent *ev, unsigned val)
/* Valid transitions:
* - free->set, when setting the event
* - busy->set, when setting the event, followed by futex_wake
* - busy->set, when setting the event, followed by qemu_futex_wake
* - set->free, when resetting the event
* - free->busy, when waiting
*
@ -381,7 +358,7 @@ void qemu_event_set(QemuEvent *ev)
if (atomic_read(&ev->value) != EV_SET) {
if (atomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
/* There were waiters, wake them up. */
futex_wake(ev, INT_MAX);
qemu_futex_wake(ev, INT_MAX);
}
}
}
@ -419,7 +396,7 @@ void qemu_event_wait(QemuEvent *ev)
return;
}
}
futex_wait(ev, EV_BUSY);
qemu_futex_wait(ev, EV_BUSY);
}
}

util/qemu-thread-win32.c

@ -269,7 +269,7 @@ void qemu_sem_wait(QemuSemaphore *sem)
*
* Valid transitions:
* - free->set, when setting the event
* - busy->set, when setting the event, followed by futex_wake
* - busy->set, when setting the event, followed by SetEvent
* - set->free, when resetting the event
* - free->busy, when waiting
*

util/trace-events

@ -30,3 +30,13 @@ qemu_anon_ram_free(void *ptr, size_t size) "ptr %p size %zu"
hbitmap_iter_skip_words(const void *hb, void *hbi, uint64_t pos, unsigned long cur) "hb %p hbi %p pos %"PRId64" cur 0x%lx"
hbitmap_reset(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
hbitmap_set(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
# util/lockcnt.c
lockcnt_fast_path_attempt(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d"
lockcnt_fast_path_success(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d succeeded"
lockcnt_unlock_attempt(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d"
lockcnt_unlock_success(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d succeeded"
lockcnt_futex_wait_prepare(const void *lockcnt, int expected, int new) "lockcnt %p preparing slow path %d->%d"
lockcnt_futex_wait(const void *lockcnt, int val) "lockcnt %p waiting on %d"
lockcnt_futex_wait_resume(const void *lockcnt, int new) "lockcnt %p after wait: %d"
lockcnt_futex_wake(const void *lockcnt) "lockcnt %p waking up one waiter"