aio-win32: add support for sockets

Uses the same select/WSAEventSelect scheme as main-loop.c.
WSAEventSelect() is edge-triggered, so it cannot be used
directly, but it is still used as a way to exit from a
blocking g_poll().

Before g_poll() is called, we poll sockets with a non-blocking
select() to achieve the level-triggered semantics we require:
if a socket is ready, the g_poll() is made non-blocking too.

Based on a patch from Or Goshen.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
This commit is contained in:
Paolo Bonzini 2014-07-09 11:53:10 +02:00 committed by Stefan Hajnoczi
parent 79d9b6566b
commit b493317d34
3 changed files with 145 additions and 9 deletions

View File

@ -22,12 +22,80 @@
struct AioHandler {
EventNotifier *e;
IOHandler *io_read;
IOHandler *io_write;
EventNotifierHandler *io_notify;
GPollFD pfd;
int deleted;
void *opaque;
QLIST_ENTRY(AioHandler) node;
};
void aio_set_fd_handler(AioContext *ctx,
int fd,
IOHandler *io_read,
IOHandler *io_write,
void *opaque)
{
/* fd is a SOCKET in our case */
AioHandler *node;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
if (node->pfd.fd == fd && !node->deleted) {
break;
}
}
/* Are we deleting the fd handler? */
if (!io_read && !io_write) {
if (node) {
/* If the lock is held, just mark the node as deleted */
if (ctx->walking_handlers) {
node->deleted = 1;
node->pfd.revents = 0;
} else {
/* Otherwise, delete it for real. We can't just mark it as
* deleted because deleted nodes are only cleaned up after
* releasing the walking_handlers lock.
*/
QLIST_REMOVE(node, node);
g_free(node);
}
}
} else {
HANDLE event;
if (node == NULL) {
/* Alloc and insert if it's not already there */
node = g_malloc0(sizeof(AioHandler));
node->pfd.fd = fd;
QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
}
node->pfd.events = 0;
if (node->io_read) {
node->pfd.events |= G_IO_IN;
}
if (node->io_write) {
node->pfd.events |= G_IO_OUT;
}
node->e = &ctx->notifier;
/* Update handler with latest information */
node->opaque = opaque;
node->io_read = io_read;
node->io_write = io_write;
event = event_notifier_get_handle(&ctx->notifier);
WSAEventSelect(node->pfd.fd, event,
FD_READ | FD_ACCEPT | FD_CLOSE |
FD_CONNECT | FD_WRITE | FD_OOB);
}
aio_notify(ctx);
}
void aio_set_event_notifier(AioContext *ctx,
EventNotifier *e,
EventNotifierHandler *io_notify)
@ -78,7 +146,39 @@ void aio_set_event_notifier(AioContext *ctx,
bool aio_prepare(AioContext *ctx)
{
return false;
static struct timeval tv0;
AioHandler *node;
bool have_select_revents = false;
fd_set rfds, wfds;
/* fill fd sets */
FD_ZERO(&rfds);
FD_ZERO(&wfds);
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
if (node->io_read) {
FD_SET ((SOCKET)node->pfd.fd, &rfds);
}
if (node->io_write) {
FD_SET ((SOCKET)node->pfd.fd, &wfds);
}
}
if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
node->pfd.revents = 0;
if (FD_ISSET(node->pfd.fd, &rfds)) {
node->pfd.revents |= G_IO_IN;
have_select_revents = true;
}
if (FD_ISSET(node->pfd.fd, &wfds)) {
node->pfd.revents |= G_IO_OUT;
have_select_revents = true;
}
}
}
return have_select_revents;
}
bool aio_pending(AioContext *ctx)
@ -89,6 +189,13 @@ bool aio_pending(AioContext *ctx)
if (node->pfd.revents && node->io_notify) {
return true;
}
if ((node->pfd.revents & G_IO_IN) && node->io_read) {
return true;
}
if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
return true;
}
}
return false;
@ -106,11 +213,12 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
node = QLIST_FIRST(&ctx->aio_handlers);
while (node) {
AioHandler *tmp;
int revents = node->pfd.revents;
ctx->walking_handlers++;
if (!node->deleted &&
(node->pfd.revents || event_notifier_get_handle(node->e) == event) &&
(revents || event_notifier_get_handle(node->e) == event) &&
node->io_notify) {
node->pfd.revents = 0;
node->io_notify(node->e);
@ -121,6 +229,28 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
}
}
if (!node->deleted &&
(node->io_read || node->io_write)) {
node->pfd.revents = 0;
if ((revents & G_IO_IN) && node->io_read) {
node->io_read(node->opaque);
progress = true;
}
if ((revents & G_IO_OUT) && node->io_write) {
node->io_write(node->opaque);
progress = true;
}
/* if the next select() will return an event, we have progressed */
if (event == event_notifier_get_handle(&ctx->notifier)) {
WSANETWORKEVENTS ev;
WSAEnumNetworkEvents(node->pfd.fd, event, &ev);
if (ev.lNetworkEvents) {
progress = true;
}
}
}
tmp = node;
node = QLIST_NEXT(node, node);
@ -149,10 +279,15 @@ bool aio_poll(AioContext *ctx, bool blocking)
{
AioHandler *node;
HANDLE events[MAXIMUM_WAIT_OBJECTS + 1];
bool was_dispatching, progress, first;
bool was_dispatching, progress, have_select_revents, first;
int count;
int timeout;
if (aio_prepare(ctx)) {
blocking = false;
have_select_revents = true;
}
was_dispatching = ctx->dispatching;
progress = false;
@ -183,6 +318,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
/* wait until next event */
while (count > 0) {
HANDLE event;
int ret;
timeout = blocking
@ -196,13 +332,17 @@ bool aio_poll(AioContext *ctx, bool blocking)
first = false;
/* if we have any signaled events, dispatch event */
if ((DWORD) (ret - WAIT_OBJECT_0) >= count) {
event = NULL;
if ((DWORD) (ret - WAIT_OBJECT_0) < count) {
event = events[ret - WAIT_OBJECT_0];
} else if (!have_select_revents) {
break;
}
have_select_revents = false;
blocking = false;
progress |= aio_dispatch_handlers(ctx, events[ret - WAIT_OBJECT_0]);
progress |= aio_dispatch_handlers(ctx, event);
/* Try again, but only call each handler once. */
events[ret - WAIT_OBJECT_0] = events[--count];

View File

@ -10,7 +10,6 @@ block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
block-obj-$(CONFIG_POSIX) += raw-posix.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
ifeq ($(CONFIG_POSIX),y)
block-obj-y += nbd.o nbd-client.o sheepdog.o
block-obj-$(CONFIG_LIBISCSI) += iscsi.o
block-obj-$(CONFIG_LIBNFS) += nfs.o
@ -19,7 +18,6 @@ block-obj-$(CONFIG_RBD) += rbd.o
block-obj-$(CONFIG_GLUSTERFS) += gluster.o
block-obj-$(CONFIG_ARCHIPELAGO) += archipelago.o
block-obj-$(CONFIG_LIBSSH2) += ssh.o
endif
common-obj-y += stream.o
common-obj-y += commit.o

View File

@ -239,7 +239,6 @@ bool aio_dispatch(AioContext *ctx);
*/
bool aio_poll(AioContext *ctx, bool blocking);
#ifdef CONFIG_POSIX
/* Register a file descriptor and associated callbacks. Behaves very similarly
* to qemu_set_fd_handler2. Unlike qemu_set_fd_handler2, these callbacks will
* be invoked when using aio_poll().
@ -252,7 +251,6 @@ void aio_set_fd_handler(AioContext *ctx,
IOHandler *io_read,
IOHandler *io_write,
void *opaque);
#endif
/* Register an event notifier and associated callbacks. Behaves very similarly
* to event_notifier_set_handler. Unlike event_notifier_set_handler, these callbacks