nbd: switch to asynchronous operation

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
This commit is contained in:
Paolo Bonzini 2011-09-08 14:28:59 +02:00
parent 8c5135f90e
commit ae255e523c
2 changed files with 131 additions and 65 deletions

View File

@ -47,13 +47,17 @@
#endif
typedef struct BDRVNBDState {
CoMutex lock;
int sock;
uint32_t nbdflags;
off_t size;
size_t blocksize;
char *export_name; /* An NBD server may export several devices */
CoMutex mutex;
Coroutine *coroutine;
struct nbd_reply reply;
/* If it begins with '/', this is a UNIX domain socket. Otherwise,
* it's a string of the form <hostname|ip4|\[ip6\]>:port
*/
@ -106,6 +110,95 @@ out:
return err;
}
static void nbd_coroutine_start(BDRVNBDState *s, struct nbd_request *request)
{
qemu_co_mutex_lock(&s->mutex);
s->coroutine = qemu_coroutine_self();
request->handle = (uint64_t)(intptr_t)s;
}
static int nbd_have_request(void *opaque)
{
BDRVNBDState *s = opaque;
return !!s->coroutine;
}
static void nbd_reply_ready(void *opaque)
{
BDRVNBDState *s = opaque;
if (s->reply.handle == 0) {
/* No reply already in flight. Fetch a header. */
if (nbd_receive_reply(s->sock, &s->reply) < 0) {
s->reply.handle = 0;
}
}
/* There's no need for a mutex on the receive side, because the
* handler acts as a synchronization point and ensures that only
* one coroutine is called until the reply finishes. */
if (s->coroutine) {
qemu_coroutine_enter(s->coroutine, NULL);
}
}
static void nbd_restart_write(void *opaque)
{
BDRVNBDState *s = opaque;
qemu_coroutine_enter(s->coroutine, NULL);
}
static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request,
struct iovec *iov, int offset)
{
int rc, ret;
qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write,
nbd_have_request, NULL, s);
rc = nbd_send_request(s->sock, request);
if (rc != -1 && iov) {
ret = qemu_co_sendv(s->sock, iov, request->len, offset);
if (ret != request->len) {
errno = -EIO;
rc = -1;
}
}
qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL,
nbd_have_request, NULL, s);
return rc;
}
static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request,
struct nbd_reply *reply,
struct iovec *iov, int offset)
{
int ret;
/* Wait until we're woken up by the read handler. */
qemu_coroutine_yield();
*reply = s->reply;
if (reply->handle != request->handle) {
reply->error = EIO;
} else {
if (iov && reply->error == 0) {
ret = qemu_co_recvv(s->sock, iov, request->len, offset);
if (ret != request->len) {
reply->error = EIO;
}
}
/* Tell the read handler to read another header. */
s->reply.handle = 0;
}
}
static void nbd_coroutine_end(BDRVNBDState *s, struct nbd_request *request)
{
s->coroutine = NULL;
qemu_co_mutex_unlock(&s->mutex);
}
static int nbd_establish_connection(BlockDriverState *bs)
{
BDRVNBDState *s = bs->opaque;
@ -135,8 +228,11 @@ static int nbd_establish_connection(BlockDriverState *bs)
return -errno;
}
/* Now that we're connected, set the socket to be non-blocking */
/* Now that we're connected, set the socket to be non-blocking and
* kick the reply mechanism. */
socket_set_nonblock(sock);
qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL,
nbd_have_request, NULL, s);
s->sock = sock;
s->size = size;
@ -152,11 +248,11 @@ static void nbd_teardown_connection(BlockDriverState *bs)
struct nbd_request request;
request.type = NBD_CMD_DISC;
request.handle = (uint64_t)(intptr_t)bs;
request.from = 0;
request.len = 0;
nbd_send_request(s->sock, &request);
qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL);
closesocket(s->sock);
}
@ -165,6 +261,8 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
BDRVNBDState *s = bs->opaque;
int result;
qemu_co_mutex_init(&s->mutex);
/* Pop the config into our state object. Exit if invalid. */
result = nbd_config(s, filename, flags);
if (result != 0) {
@ -176,90 +274,50 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
*/
result = nbd_establish_connection(bs);
qemu_co_mutex_init(&s->lock);
return result;
}
static int nbd_read(BlockDriverState *bs, int64_t sector_num,
uint8_t *buf, int nb_sectors)
static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num,
int nb_sectors, QEMUIOVector *qiov)
{
BDRVNBDState *s = bs->opaque;
struct nbd_request request;
struct nbd_reply reply;
request.type = NBD_CMD_READ;
request.handle = (uint64_t)(intptr_t)bs;
request.from = sector_num * 512;
request.len = nb_sectors * 512;
if (nbd_send_request(s->sock, &request) == -1)
return -errno;
if (nbd_receive_reply(s->sock, &reply) == -1)
return -errno;
if (reply.error !=0)
nbd_coroutine_start(s, &request);
if (nbd_co_send_request(s, &request, NULL, 0) == -1) {
reply.error = errno;
} else {
nbd_co_receive_reply(s, &request, &reply, qiov->iov, 0);
}
nbd_coroutine_end(s, &request);
return -reply.error;
if (reply.handle != request.handle)
return -EIO;
if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
return -EIO;
return 0;
}
static int nbd_write(BlockDriverState *bs, int64_t sector_num,
const uint8_t *buf, int nb_sectors)
static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num,
int nb_sectors, QEMUIOVector *qiov)
{
BDRVNBDState *s = bs->opaque;
struct nbd_request request;
struct nbd_reply reply;
request.type = NBD_CMD_WRITE;
request.handle = (uint64_t)(intptr_t)bs;
request.from = sector_num * 512;
request.len = nb_sectors * 512;
if (nbd_send_request(s->sock, &request) == -1)
return -errno;
if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
return -EIO;
if (nbd_receive_reply(s->sock, &reply) == -1)
return -errno;
if (reply.error !=0)
nbd_coroutine_start(s, &request);
if (nbd_co_send_request(s, &request, qiov->iov, 0) == -1) {
reply.error = errno;
} else {
nbd_co_receive_reply(s, &request, &reply, NULL, 0);
}
nbd_coroutine_end(s, &request);
return -reply.error;
if (reply.handle != request.handle)
return -EIO;
return 0;
}
static coroutine_fn int nbd_co_read(BlockDriverState *bs, int64_t sector_num,
uint8_t *buf, int nb_sectors)
{
int ret;
BDRVNBDState *s = bs->opaque;
qemu_co_mutex_lock(&s->lock);
ret = nbd_read(bs, sector_num, buf, nb_sectors);
qemu_co_mutex_unlock(&s->lock);
return ret;
}
static coroutine_fn int nbd_co_write(BlockDriverState *bs, int64_t sector_num,
const uint8_t *buf, int nb_sectors)
{
int ret;
BDRVNBDState *s = bs->opaque;
qemu_co_mutex_lock(&s->lock);
ret = nbd_write(bs, sector_num, buf, nb_sectors);
qemu_co_mutex_unlock(&s->lock);
return ret;
}
static void nbd_close(BlockDriverState *bs)
@ -282,8 +340,8 @@ static BlockDriver bdrv_nbd = {
.format_name = "nbd",
.instance_size = sizeof(BDRVNBDState),
.bdrv_file_open = nbd_open,
.bdrv_read = nbd_co_read,
.bdrv_write = nbd_co_write,
.bdrv_co_readv = nbd_co_readv,
.bdrv_co_writev = nbd_co_writev,
.bdrv_close = nbd_close,
.bdrv_getlength = nbd_getlength,
.protocol_name = "nbd",

8
nbd.c
View File

@ -81,6 +81,14 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
{
size_t offset = 0;
if (qemu_in_coroutine()) {
if (do_read) {
return qemu_co_recv(fd, buffer, size);
} else {
return qemu_co_send(fd, buffer, size);
}
}
while (offset < size) {
ssize_t len;