diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
index f6242f9338..0dcf2ef57a 100644
--- a/tools/virtiofsd/fuse_virtio.c
+++ b/tools/virtiofsd/fuse_virtio.c
@@ -22,6 +22,7 @@
 
 #include <assert.h>
 #include <errno.h>
+#include <glib.h>
 #include <stdint.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -37,18 +38,29 @@ struct fv_VuDev;
 
 struct fv_QueueInfo {
     pthread_t thread;
+    /*
+     * This lock protects the VuVirtq preventing races between
+     * fv_queue_thread() and fv_queue_worker().
+     */
+    pthread_mutex_t vq_lock;
+
     struct fv_VuDev *virtio_dev;
 
     /* Our queue index, corresponds to array position */
     int qidx;
     int kick_fd;
    int kill_fd; /* For killing the thread */
-
-    /* The element for the command currently being processed */
-    VuVirtqElement *qe;
-    bool reply_sent;
 };
 
+/* A FUSE request */
+typedef struct {
+    VuVirtqElement elem;
+    struct fuse_chan ch;
+
+    /* Used to complete requests that involve no reply */
+    bool reply_sent;
+} FVRequest;
+
 /*
  * We pass the dev element into libvhost-user
  * and then use it to get back to the outer
@@ -191,8 +203,11 @@ static void copy_iov(struct iovec *src_iov, int src_count,
 int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
                     struct iovec *iov, int count)
 {
-    VuVirtqElement *elem;
-    VuVirtq *q;
+    FVRequest *req = container_of(ch, FVRequest, ch);
+    struct fv_QueueInfo *qi = ch->qi;
+    VuDev *dev = &se->virtio_dev->dev;
+    VuVirtq *q = vu_get_queue(dev, qi->qidx);
+    VuVirtqElement *elem = &req->elem;
     int ret = 0;
 
     assert(count >= 1);
@@ -205,11 +220,7 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
 
     /* unique == 0 is notification, which we don't support */
     assert(out->unique);
-    /* For virtio we always have ch */
-    assert(ch);
-    assert(!ch->qi->reply_sent);
-    elem = ch->qi->qe;
-    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
+    assert(!req->reply_sent);
 
     /* The 'in' part of the elem is to qemu */
     unsigned int in_num = elem->in_num;
@@ -236,9 +247,15 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
     }
 
     copy_iov(iov, count, in_sg, in_num, tosend_len);
-    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
-    vu_queue_notify(&se->virtio_dev->dev, q);
-    ch->qi->reply_sent = true;
+
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+    pthread_mutex_lock(&qi->vq_lock);
+    vu_queue_push(dev, q, elem, tosend_len);
+    vu_queue_notify(dev, q);
+    pthread_mutex_unlock(&qi->vq_lock);
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
+
+    req->reply_sent = true;
 
 err:
     return ret;
@@ -254,9 +271,12 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
                          struct iovec *iov, int count, struct fuse_bufvec *buf,
                          size_t len)
 {
+    FVRequest *req = container_of(ch, FVRequest, ch);
+    struct fv_QueueInfo *qi = ch->qi;
+    VuDev *dev = &se->virtio_dev->dev;
+    VuVirtq *q = vu_get_queue(dev, qi->qidx);
+    VuVirtqElement *elem = &req->elem;
     int ret = 0;
-    VuVirtqElement *elem;
-    VuVirtq *q;
 
     assert(count >= 1);
     assert(iov[0].iov_len >= sizeof(struct fuse_out_header));
@@ -275,11 +295,7 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
 
     /* unique == 0 is notification which we don't support */
     assert(out->unique);
-    /* For virtio we always have ch */
-    assert(ch);
-    assert(!ch->qi->reply_sent);
-    elem = ch->qi->qe;
-    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
+    assert(!req->reply_sent);
 
     /* The 'in' part of the elem is to qemu */
     unsigned int in_num = elem->in_num;
@@ -395,33 +411,175 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
 
     ret = 0;
 
-    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
-    vu_queue_notify(&se->virtio_dev->dev, q);
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+    pthread_mutex_lock(&qi->vq_lock);
+    vu_queue_push(dev, q, elem, tosend_len);
+    vu_queue_notify(dev, q);
+    pthread_mutex_unlock(&qi->vq_lock);
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
 
 err:
     if (ret == 0) {
-        ch->qi->reply_sent = true;
+        req->reply_sent = true;
     }
 
     return ret;
 }
 
+/* Process one FVRequest in a thread pool */
+static void fv_queue_worker(gpointer data, gpointer user_data)
+{
+    struct fv_QueueInfo *qi = user_data;
+    struct fuse_session *se = qi->virtio_dev->se;
+    struct VuDev *dev = &qi->virtio_dev->dev;
+    FVRequest *req = data;
+    VuVirtqElement *elem = &req->elem;
+    struct fuse_buf fbuf = {};
+    bool allocated_bufv = false;
+    struct fuse_bufvec bufv;
+    struct fuse_bufvec *pbufv;
+
+    assert(se->bufsize > sizeof(struct fuse_in_header));
+
+    /*
+     * An element contains one request and the space to send our response.
+     * They're spread over multiple descriptors in a scatter/gather set
+     * and we can't trust the guest to keep them still; so copy in/out.
+     */
+    fbuf.mem = malloc(se->bufsize);
+    assert(fbuf.mem);
+
+    fuse_mutex_init(&req->ch.lock);
+    req->ch.fd = -1;
+    req->ch.qi = qi;
+
+    /* The 'out' part of the elem is from qemu */
+    unsigned int out_num = elem->out_num;
+    struct iovec *out_sg = elem->out_sg;
+    size_t out_len = iov_size(out_sg, out_num);
+    fuse_log(FUSE_LOG_DEBUG,
+             "%s: elem %d: with %d out desc of length %zd\n",
+             __func__, elem->index, out_num, out_len);
+
+    /*
+     * The elem should contain a 'fuse_in_header' (in to fuse)
+     * plus the data based on the len in the header.
+     */
+    if (out_len < sizeof(struct fuse_in_header)) {
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
+                 __func__, elem->index);
+        assert(0); /* TODO */
+    }
+    if (out_len > se->bufsize) {
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
+                 elem->index);
+        assert(0); /* TODO */
+    }
+    /* Copy just the first element and look at it */
+    copy_from_iov(&fbuf, 1, out_sg);
+
+    pbufv = NULL; /* Compiler thinks it's an uninitialised path */
+    if (out_num > 2 &&
+        out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
+        ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
+        out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
+        /*
+         * For a write we don't actually need to copy the
+         * data, we can just do it straight out of guest memory
+         * but we must still copy the headers in case the guest
+         * was nasty and changed them while we were using them.
+         */
+        fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
+
+        /* copy the fuse_write_in header after the fuse_in_header */
+        fbuf.mem += out_sg->iov_len;
+        copy_from_iov(&fbuf, 1, out_sg + 1);
+        fbuf.mem -= out_sg->iov_len;
+        fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
+
+        /* Allocate the bufv, with space for the rest of the iov */
+        pbufv = malloc(sizeof(struct fuse_bufvec) +
+                       sizeof(struct fuse_buf) * (out_num - 2));
+        if (!pbufv) {
+            fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
+                     __func__);
+            goto out;
+        }
+
+        allocated_bufv = true;
+        pbufv->count = 1;
+        pbufv->buf[0] = fbuf;
+
+        size_t iovindex, pbufvindex;
+        iovindex = 2; /* 2 headers, separate iovs */
+        pbufvindex = 1; /* 2 headers, 1 fusebuf */
+
+        for (; iovindex < out_num; iovindex++, pbufvindex++) {
+            pbufv->count++;
+            pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
+            pbufv->buf[pbufvindex].flags = 0;
+            pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
+            pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
+        }
+    } else {
+        /* Normal (non fast write) path */
+
+        /* Copy the rest of the buffer */
+        fbuf.mem += out_sg->iov_len;
+        copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
+        fbuf.mem -= out_sg->iov_len;
+        fbuf.size = out_len;
+
+        /* TODO! Endianness of header */
+
+        /* TODO: Add checks for fuse_session_exited */
+        bufv.buf[0] = fbuf;
+        bufv.count = 1;
+        pbufv = &bufv;
+    }
+    pbufv->idx = 0;
+    pbufv->off = 0;
+    fuse_session_process_buf_int(se, pbufv, &req->ch);
+
+out:
+    if (allocated_bufv) {
+        free(pbufv);
+    }
+
+    /* If the request has no reply, still recycle the virtqueue element */
+    if (!req->reply_sent) {
+        struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
+
+        fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n", __func__,
+                 elem->index);
+
+        pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+        pthread_mutex_lock(&qi->vq_lock);
+        vu_queue_push(dev, q, elem, 0);
+        vu_queue_notify(dev, q);
+        pthread_mutex_unlock(&qi->vq_lock);
+        pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
+    }
+
+    pthread_mutex_destroy(&req->ch.lock);
+    free(fbuf.mem);
+    free(req);
+}
+
 /* Thread function for individual queues, created when a queue is 'started' */
 static void *fv_queue_thread(void *opaque)
 {
     struct fv_QueueInfo *qi = opaque;
     struct VuDev *dev = &qi->virtio_dev->dev;
     struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
-    struct fuse_session *se = qi->virtio_dev->se;
-    struct fuse_chan ch;
-    struct fuse_buf fbuf;
+    GThreadPool *pool;
 
-    fbuf.mem = NULL;
-    fbuf.flags = 0;
-
-    fuse_mutex_init(&ch.lock);
-    ch.fd = (int)0xdaff0d111;
-    ch.qi = qi;
+    pool = g_thread_pool_new(fv_queue_worker, qi, 1 /* TODO max_threads */,
+                             TRUE, NULL);
+    if (!pool) {
+        fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
+        return NULL;
+    }
 
     fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
              qi->qidx, qi->kick_fd);
@@ -478,6 +636,7 @@ static void *fv_queue_thread(void *opaque)
         /* Mutual exclusion with virtio_loop() */
         ret = pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
         assert(ret == 0); /* there is no possible error case */
+        pthread_mutex_lock(&qi->vq_lock);
         /* out is from guest, in is too guest */
         unsigned int in_bytes, out_bytes;
         vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0);
@@ -486,141 +645,22 @@ static void *fv_queue_thread(void *opaque)
                  "%s: Queue %d gave evalue: %zx available: in: %u out: %u\n",
                  __func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes);
 
         while (1) {
-            bool allocated_bufv = false;
-            struct fuse_bufvec bufv;
-            struct fuse_bufvec *pbufv;
-
-            /*
-             * An element contains one request and the space to send our
-             * response They're spread over multiple descriptors in a
-             * scatter/gather set and we can't trust the guest to keep them
-             * still; so copy in/out.
-             */
-            VuVirtqElement *elem = vu_queue_pop(dev, q, sizeof(VuVirtqElement));
-            if (!elem) {
+            FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest));
+            if (!req) {
                 break;
             }
 
-            qi->qe = elem;
-            qi->reply_sent = false;
+            req->reply_sent = false;
 
-            if (!fbuf.mem) {
-                fbuf.mem = malloc(se->bufsize);
-                assert(fbuf.mem);
-                assert(se->bufsize > sizeof(struct fuse_in_header));
-            }
-            /* The 'out' part of the elem is from qemu */
-            unsigned int out_num = elem->out_num;
-            struct iovec *out_sg = elem->out_sg;
-            size_t out_len = iov_size(out_sg, out_num);
-            fuse_log(FUSE_LOG_DEBUG,
-                     "%s: elem %d: with %d out desc of length %zd\n", __func__,
-                     elem->index, out_num, out_len);
-
-            /*
-             * The elem should contain a 'fuse_in_header' (in to fuse)
-             * plus the data based on the len in the header.
-             */
-            if (out_len < sizeof(struct fuse_in_header)) {
-                fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
-                         __func__, elem->index);
-                assert(0); /* TODO */
-            }
-            if (out_len > se->bufsize) {
-                fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n",
-                         __func__, elem->index);
-                assert(0); /* TODO */
-            }
-            /* Copy just the first element and look at it */
-            copy_from_iov(&fbuf, 1, out_sg);
-
-            if (out_num > 2 &&
-                out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
-                ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
-                out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
-                /*
-                 * For a write we don't actually need to copy the
-                 * data, we can just do it straight out of guest memory
-                 * but we must still copy the headers in case the guest
-                 * was nasty and changed them while we were using them.
-                 */
-                fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
-
-                /* copy the fuse_write_in header after the fuse_in_header */
-                fbuf.mem += out_sg->iov_len;
-                copy_from_iov(&fbuf, 1, out_sg + 1);
-                fbuf.mem -= out_sg->iov_len;
-                fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
-
-                /* Allocate the bufv, with space for the rest of the iov */
-                allocated_bufv = true;
-                pbufv = malloc(sizeof(struct fuse_bufvec) +
-                               sizeof(struct fuse_buf) * (out_num - 2));
-                if (!pbufv) {
-                    vu_queue_unpop(dev, q, elem, 0);
-                    free(elem);
-                    fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
-                             __func__);
-                    goto out;
-                }
-
-                pbufv->count = 1;
-                pbufv->buf[0] = fbuf;
-
-                size_t iovindex, pbufvindex;
-                iovindex = 2; /* 2 headers, separate iovs */
-                pbufvindex = 1; /* 2 headers, 1 fusebuf */
-
-                for (; iovindex < out_num; iovindex++, pbufvindex++) {
-                    pbufv->count++;
-                    pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
-                    pbufv->buf[pbufvindex].flags = 0;
-                    pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
-                    pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
-                }
-            } else {
-                /* Normal (non fast write) path */
-
-                /* Copy the rest of the buffer */
-                fbuf.mem += out_sg->iov_len;
-                copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
-                fbuf.mem -= out_sg->iov_len;
-                fbuf.size = out_len;
-
-                /* TODO! Endianness of header */
-
-                /* TODO: Add checks for fuse_session_exited */
-                bufv.buf[0] = fbuf;
-                bufv.count = 1;
-                pbufv = &bufv;
-            }
-            pbufv->idx = 0;
-            pbufv->off = 0;
-            fuse_session_process_buf_int(se, pbufv, &ch);
-
-            if (allocated_bufv) {
-                free(pbufv);
-            }
-
-            if (!qi->reply_sent) {
-                fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n",
-                         __func__, elem->index);
-                /* I think we've still got to recycle the element */
-                vu_queue_push(dev, q, elem, 0);
-                vu_queue_notify(dev, q);
-            }
-            qi->qe = NULL;
-            free(elem);
-            elem = NULL;
+            g_thread_pool_push(pool, req, NULL);
         }
 
+        pthread_mutex_unlock(&qi->vq_lock);
         pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
     }
 
-out:
-    pthread_mutex_destroy(&ch.lock);
-    free(fbuf.mem);
+
+    g_thread_pool_free(pool, FALSE, TRUE);
 
     return NULL;
 }
@@ -643,6 +683,7 @@ static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
         fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
                  __func__, qidx, ret);
     }
+    pthread_mutex_destroy(&ourqi->vq_lock);
     close(ourqi->kill_fd);
     ourqi->kick_fd = -1;
     free(vud->qi[qidx]);
@@ -696,6 +737,8 @@ static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
         ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
         assert(ourqi->kill_fd != -1);
+        pthread_mutex_init(&ourqi->vq_lock, NULL);
+
         if (pthread_create(&ourqi->thread, NULL, fv_queue_thread,
                            ourqi)) {
             fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
                      __func__, qidx);
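
For readers unfamiliar with the GLib API this patch builds on: the structure is a per-queue GThreadPool whose workers complete requests under the per-queue vq_lock, taken inside the device-wide vu_dispatch_rwlock read lock, in the same order fv_queue_thread() and fv_queue_worker() use above. The read side lets completions from many workers proceed concurrently while still excluding virtio_loop()'s write-side reconfiguration; vq_lock then serializes the actual virtqueue update. Below is a minimal standalone sketch of that pattern, not part of the patch: every name in it (mock_queue, mock_request, mock_worker) is invented, and a counter stands in for vu_queue_push()/vu_queue_notify(). It should build with: gcc demo.c $(pkg-config --cflags --libs glib-2.0) -pthread

#include <glib.h>
#include <pthread.h>
#include <stdio.h>

/* Stand-in for struct fv_QueueInfo: one queue plus its completion locks */
struct mock_queue {
    pthread_rwlock_t dispatch_rwlock; /* plays the role of vu_dispatch_rwlock */
    pthread_mutex_t vq_lock;          /* serializes "virtqueue" completions */
    int completed;                    /* stands in for virtqueue push state */
};

/* Stand-in for FVRequest: one unit of work handed to the pool */
struct mock_request {
    int id;
};

/* Worker: process the request, then "push and notify" under both locks */
static void mock_worker(gpointer data, gpointer user_data)
{
    struct mock_request *req = data;
    struct mock_queue *q = user_data;

    /* ...actual request processing would run here, without the locks... */

    /* Same ordering as the patch: rwlock read side first, then vq_lock */
    pthread_rwlock_rdlock(&q->dispatch_rwlock);
    pthread_mutex_lock(&q->vq_lock);
    q->completed++; /* analogue of vu_queue_push() + vu_queue_notify() */
    printf("completed request %d (%d done)\n", req->id, q->completed);
    pthread_mutex_unlock(&q->vq_lock);
    pthread_rwlock_unlock(&q->dispatch_rwlock);

    g_free(req);
}

int main(void)
{
    struct mock_queue q = { .completed = 0 };
    pthread_rwlock_init(&q.dispatch_rwlock, NULL);
    pthread_mutex_init(&q.vq_lock, NULL);

    /* One exclusive worker thread, matching the patch's max_threads of 1 */
    GThreadPool *pool = g_thread_pool_new(mock_worker, &q, 1, TRUE, NULL);
    if (!pool) {
        return 1;
    }

    /* The queue thread's role: pop requests and hand them to the pool */
    for (int i = 0; i < 4; i++) {
        struct mock_request *req = g_new0(struct mock_request, 1);
        req->id = i;
        g_thread_pool_push(pool, req, NULL);
    }

    /* immediate=FALSE keeps queued work; wait=TRUE joins the workers */
    g_thread_pool_free(pool, FALSE, TRUE);

    pthread_mutex_destroy(&q.vq_lock);
    pthread_rwlock_destroy(&q.dispatch_rwlock);
    return 0;
}

The g_thread_pool_free(pool, FALSE, TRUE) call mirrors the patch's shutdown path in fv_queue_thread(): FALSE means already-queued requests are still processed rather than dropped, and TRUE blocks until the workers have drained, which is what makes it safe for fv_queue_cleanup_thread() to destroy vq_lock afterwards.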