quorum: Implement .bdrv_co_readv/writev
This converts the quorum block driver from implementing callback-based interfaces for read/write to coroutine-based ones. This is the first step that will allow us further simplification of the code. Signed-off-by: Kevin Wolf <kwolf@redhat.com> Reviewed-by: Eric Blake <eblake@redhat.com> Reviewed-by: Alberto Garcia <berto@igalia.com>
This commit is contained in:
parent
10c8551968
commit
ce15dc08ef
194
block/quorum.c
194
block/quorum.c
|
@ -97,7 +97,7 @@ typedef struct QuorumAIOCB QuorumAIOCB;
|
||||||
* $children_count QuorumChildRequest.
|
* $children_count QuorumChildRequest.
|
||||||
*/
|
*/
|
||||||
typedef struct QuorumChildRequest {
|
typedef struct QuorumChildRequest {
|
||||||
BlockAIOCB *aiocb;
|
BlockDriverState *bs;
|
||||||
QEMUIOVector qiov;
|
QEMUIOVector qiov;
|
||||||
uint8_t *buf;
|
uint8_t *buf;
|
||||||
int ret;
|
int ret;
|
||||||
|
@ -110,7 +110,8 @@ typedef struct QuorumChildRequest {
|
||||||
* used to do operations on each children and track overall progress.
|
* used to do operations on each children and track overall progress.
|
||||||
*/
|
*/
|
||||||
struct QuorumAIOCB {
|
struct QuorumAIOCB {
|
||||||
BlockAIOCB common;
|
BlockDriverState *bs;
|
||||||
|
Coroutine *co;
|
||||||
|
|
||||||
/* Request metadata */
|
/* Request metadata */
|
||||||
uint64_t sector_num;
|
uint64_t sector_num;
|
||||||
|
@ -129,36 +130,23 @@ struct QuorumAIOCB {
|
||||||
QuorumVotes votes;
|
QuorumVotes votes;
|
||||||
|
|
||||||
bool is_read;
|
bool is_read;
|
||||||
|
bool has_completed;
|
||||||
int vote_ret;
|
int vote_ret;
|
||||||
int children_read; /* how many children have been read from */
|
int children_read; /* how many children have been read from */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef struct QuorumCo {
|
||||||
|
QuorumAIOCB *acb;
|
||||||
|
int idx;
|
||||||
|
} QuorumCo;
|
||||||
|
|
||||||
static bool quorum_vote(QuorumAIOCB *acb);
|
static bool quorum_vote(QuorumAIOCB *acb);
|
||||||
|
|
||||||
static void quorum_aio_cancel(BlockAIOCB *blockacb)
|
|
||||||
{
|
|
||||||
QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
|
|
||||||
BDRVQuorumState *s = acb->common.bs->opaque;
|
|
||||||
int i;
|
|
||||||
|
|
||||||
/* cancel all callbacks */
|
|
||||||
for (i = 0; i < s->num_children; i++) {
|
|
||||||
if (acb->qcrs[i].aiocb) {
|
|
||||||
bdrv_aio_cancel_async(acb->qcrs[i].aiocb);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static AIOCBInfo quorum_aiocb_info = {
|
|
||||||
.aiocb_size = sizeof(QuorumAIOCB),
|
|
||||||
.cancel_async = quorum_aio_cancel,
|
|
||||||
};
|
|
||||||
|
|
||||||
static void quorum_aio_finalize(QuorumAIOCB *acb)
|
static void quorum_aio_finalize(QuorumAIOCB *acb)
|
||||||
{
|
{
|
||||||
acb->common.cb(acb->common.opaque, acb->vote_ret);
|
acb->has_completed = true;
|
||||||
g_free(acb->qcrs);
|
g_free(acb->qcrs);
|
||||||
qemu_aio_unref(acb);
|
qemu_coroutine_enter_if_inactive(acb->co);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
|
static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
|
||||||
|
@ -174,14 +162,14 @@ static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
|
||||||
static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
|
static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
|
||||||
QEMUIOVector *qiov,
|
QEMUIOVector *qiov,
|
||||||
uint64_t sector_num,
|
uint64_t sector_num,
|
||||||
int nb_sectors,
|
int nb_sectors)
|
||||||
BlockCompletionFunc *cb,
|
|
||||||
void *opaque)
|
|
||||||
{
|
{
|
||||||
BDRVQuorumState *s = bs->opaque;
|
BDRVQuorumState *s = bs->opaque;
|
||||||
QuorumAIOCB *acb = qemu_aio_get(&quorum_aiocb_info, bs, cb, opaque);
|
QuorumAIOCB *acb = g_new(QuorumAIOCB, 1);
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
|
acb->co = qemu_coroutine_self();
|
||||||
|
acb->bs = bs;
|
||||||
acb->sector_num = sector_num;
|
acb->sector_num = sector_num;
|
||||||
acb->nb_sectors = nb_sectors;
|
acb->nb_sectors = nb_sectors;
|
||||||
acb->qiov = qiov;
|
acb->qiov = qiov;
|
||||||
|
@ -191,6 +179,7 @@ static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
|
||||||
acb->rewrite_count = 0;
|
acb->rewrite_count = 0;
|
||||||
acb->votes.compare = quorum_sha256_compare;
|
acb->votes.compare = quorum_sha256_compare;
|
||||||
QLIST_INIT(&acb->votes.vote_list);
|
QLIST_INIT(&acb->votes.vote_list);
|
||||||
|
acb->has_completed = false;
|
||||||
acb->is_read = false;
|
acb->is_read = false;
|
||||||
acb->vote_ret = 0;
|
acb->vote_ret = 0;
|
||||||
|
|
||||||
|
@ -217,7 +206,7 @@ static void quorum_report_bad(QuorumOpType type, uint64_t sector_num,
|
||||||
|
|
||||||
static void quorum_report_failure(QuorumAIOCB *acb)
|
static void quorum_report_failure(QuorumAIOCB *acb)
|
||||||
{
|
{
|
||||||
const char *reference = bdrv_get_device_or_node_name(acb->common.bs);
|
const char *reference = bdrv_get_device_or_node_name(acb->bs);
|
||||||
qapi_event_send_quorum_failure(reference, acb->sector_num,
|
qapi_event_send_quorum_failure(reference, acb->sector_num,
|
||||||
acb->nb_sectors, &error_abort);
|
acb->nb_sectors, &error_abort);
|
||||||
}
|
}
|
||||||
|
@ -226,7 +215,7 @@ static int quorum_vote_error(QuorumAIOCB *acb);
|
||||||
|
|
||||||
static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
|
static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
|
||||||
{
|
{
|
||||||
BDRVQuorumState *s = acb->common.bs->opaque;
|
BDRVQuorumState *s = acb->bs->opaque;
|
||||||
|
|
||||||
if (acb->success_count < s->threshold) {
|
if (acb->success_count < s->threshold) {
|
||||||
acb->vote_ret = quorum_vote_error(acb);
|
acb->vote_ret = quorum_vote_error(acb);
|
||||||
|
@ -252,7 +241,7 @@ static void quorum_rewrite_aio_cb(void *opaque, int ret)
|
||||||
quorum_aio_finalize(acb);
|
quorum_aio_finalize(acb);
|
||||||
}
|
}
|
||||||
|
|
||||||
static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb);
|
static int read_fifo_child(QuorumAIOCB *acb);
|
||||||
|
|
||||||
static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
|
static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
|
||||||
{
|
{
|
||||||
|
@ -272,14 +261,14 @@ static void quorum_report_bad_acb(QuorumChildRequest *sacb, int ret)
|
||||||
QuorumAIOCB *acb = sacb->parent;
|
QuorumAIOCB *acb = sacb->parent;
|
||||||
QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE;
|
QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE;
|
||||||
quorum_report_bad(type, acb->sector_num, acb->nb_sectors,
|
quorum_report_bad(type, acb->sector_num, acb->nb_sectors,
|
||||||
sacb->aiocb->bs->node_name, ret);
|
sacb->bs->node_name, ret);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void quorum_fifo_aio_cb(void *opaque, int ret)
|
static int quorum_fifo_aio_cb(void *opaque, int ret)
|
||||||
{
|
{
|
||||||
QuorumChildRequest *sacb = opaque;
|
QuorumChildRequest *sacb = opaque;
|
||||||
QuorumAIOCB *acb = sacb->parent;
|
QuorumAIOCB *acb = sacb->parent;
|
||||||
BDRVQuorumState *s = acb->common.bs->opaque;
|
BDRVQuorumState *s = acb->bs->opaque;
|
||||||
|
|
||||||
assert(acb->is_read && s->read_pattern == QUORUM_READ_PATTERN_FIFO);
|
assert(acb->is_read && s->read_pattern == QUORUM_READ_PATTERN_FIFO);
|
||||||
|
|
||||||
|
@ -288,8 +277,7 @@ static void quorum_fifo_aio_cb(void *opaque, int ret)
|
||||||
|
|
||||||
/* We try to read next child in FIFO order if we fail to read */
|
/* We try to read next child in FIFO order if we fail to read */
|
||||||
if (acb->children_read < s->num_children) {
|
if (acb->children_read < s->num_children) {
|
||||||
read_fifo_child(acb);
|
return read_fifo_child(acb);
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -297,13 +285,14 @@ static void quorum_fifo_aio_cb(void *opaque, int ret)
|
||||||
|
|
||||||
/* FIXME: rewrite failed children if acb->children_read > 1? */
|
/* FIXME: rewrite failed children if acb->children_read > 1? */
|
||||||
quorum_aio_finalize(acb);
|
quorum_aio_finalize(acb);
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void quorum_aio_cb(void *opaque, int ret)
|
static void quorum_aio_cb(void *opaque, int ret)
|
||||||
{
|
{
|
||||||
QuorumChildRequest *sacb = opaque;
|
QuorumChildRequest *sacb = opaque;
|
||||||
QuorumAIOCB *acb = sacb->parent;
|
QuorumAIOCB *acb = sacb->parent;
|
||||||
BDRVQuorumState *s = acb->common.bs->opaque;
|
BDRVQuorumState *s = acb->bs->opaque;
|
||||||
bool rewrite = false;
|
bool rewrite = false;
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
|
@ -518,7 +507,7 @@ static bool quorum_compare(QuorumAIOCB *acb,
|
||||||
QEMUIOVector *a,
|
QEMUIOVector *a,
|
||||||
QEMUIOVector *b)
|
QEMUIOVector *b)
|
||||||
{
|
{
|
||||||
BDRVQuorumState *s = acb->common.bs->opaque;
|
BDRVQuorumState *s = acb->bs->opaque;
|
||||||
ssize_t offset;
|
ssize_t offset;
|
||||||
|
|
||||||
/* This driver will replace blkverify in this particular case */
|
/* This driver will replace blkverify in this particular case */
|
||||||
|
@ -538,7 +527,7 @@ static bool quorum_compare(QuorumAIOCB *acb,
|
||||||
/* Do a vote to get the error code */
|
/* Do a vote to get the error code */
|
||||||
static int quorum_vote_error(QuorumAIOCB *acb)
|
static int quorum_vote_error(QuorumAIOCB *acb)
|
||||||
{
|
{
|
||||||
BDRVQuorumState *s = acb->common.bs->opaque;
|
BDRVQuorumState *s = acb->bs->opaque;
|
||||||
QuorumVoteVersion *winner = NULL;
|
QuorumVoteVersion *winner = NULL;
|
||||||
QuorumVotes error_votes;
|
QuorumVotes error_votes;
|
||||||
QuorumVoteValue result_value;
|
QuorumVoteValue result_value;
|
||||||
|
@ -573,7 +562,7 @@ static bool quorum_vote(QuorumAIOCB *acb)
|
||||||
bool rewrite = false;
|
bool rewrite = false;
|
||||||
int i, j, ret;
|
int i, j, ret;
|
||||||
QuorumVoteValue hash;
|
QuorumVoteValue hash;
|
||||||
BDRVQuorumState *s = acb->common.bs->opaque;
|
BDRVQuorumState *s = acb->bs->opaque;
|
||||||
QuorumVoteVersion *winner;
|
QuorumVoteVersion *winner;
|
||||||
|
|
||||||
if (quorum_has_too_much_io_failed(acb)) {
|
if (quorum_has_too_much_io_failed(acb)) {
|
||||||
|
@ -649,10 +638,25 @@ free_exit:
|
||||||
return rewrite;
|
return rewrite;
|
||||||
}
|
}
|
||||||
|
|
||||||
static BlockAIOCB *read_quorum_children(QuorumAIOCB *acb)
|
static void read_quorum_children_entry(void *opaque)
|
||||||
{
|
{
|
||||||
BDRVQuorumState *s = acb->common.bs->opaque;
|
QuorumCo *co = opaque;
|
||||||
int i;
|
QuorumAIOCB *acb = co->acb;
|
||||||
|
BDRVQuorumState *s = acb->bs->opaque;
|
||||||
|
int i = co->idx;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
acb->qcrs[i].bs = s->children[i]->bs;
|
||||||
|
ret = bdrv_co_preadv(s->children[i], acb->sector_num * BDRV_SECTOR_SIZE,
|
||||||
|
acb->nb_sectors * BDRV_SECTOR_SIZE,
|
||||||
|
&acb->qcrs[i].qiov, 0);
|
||||||
|
quorum_aio_cb(&acb->qcrs[i], ret);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int read_quorum_children(QuorumAIOCB *acb)
|
||||||
|
{
|
||||||
|
BDRVQuorumState *s = acb->bs->opaque;
|
||||||
|
int i, ret;
|
||||||
|
|
||||||
acb->children_read = s->num_children;
|
acb->children_read = s->num_children;
|
||||||
for (i = 0; i < s->num_children; i++) {
|
for (i = 0; i < s->num_children; i++) {
|
||||||
|
@ -662,65 +666,99 @@ static BlockAIOCB *read_quorum_children(QuorumAIOCB *acb)
|
||||||
}
|
}
|
||||||
|
|
||||||
for (i = 0; i < s->num_children; i++) {
|
for (i = 0; i < s->num_children; i++) {
|
||||||
acb->qcrs[i].aiocb = bdrv_aio_readv(s->children[i], acb->sector_num,
|
Coroutine *co;
|
||||||
&acb->qcrs[i].qiov, acb->nb_sectors,
|
QuorumCo data = {
|
||||||
quorum_aio_cb, &acb->qcrs[i]);
|
.acb = acb,
|
||||||
|
.idx = i,
|
||||||
|
};
|
||||||
|
|
||||||
|
co = qemu_coroutine_create(read_quorum_children_entry, &data);
|
||||||
|
qemu_coroutine_enter(co);
|
||||||
}
|
}
|
||||||
|
|
||||||
return &acb->common;
|
if (!acb->has_completed) {
|
||||||
|
qemu_coroutine_yield();
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = acb->vote_ret;
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb)
|
static int read_fifo_child(QuorumAIOCB *acb)
|
||||||
{
|
{
|
||||||
BDRVQuorumState *s = acb->common.bs->opaque;
|
BDRVQuorumState *s = acb->bs->opaque;
|
||||||
int n = acb->children_read++;
|
int n = acb->children_read++;
|
||||||
|
int ret;
|
||||||
|
|
||||||
acb->qcrs[n].aiocb = bdrv_aio_readv(s->children[n], acb->sector_num,
|
acb->qcrs[n].bs = s->children[n]->bs;
|
||||||
acb->qiov, acb->nb_sectors,
|
ret = bdrv_co_preadv(s->children[n], acb->sector_num * BDRV_SECTOR_SIZE,
|
||||||
quorum_fifo_aio_cb, &acb->qcrs[n]);
|
acb->nb_sectors * BDRV_SECTOR_SIZE, acb->qiov, 0);
|
||||||
|
ret = quorum_fifo_aio_cb(&acb->qcrs[n], ret);
|
||||||
|
|
||||||
return &acb->common;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static BlockAIOCB *quorum_aio_readv(BlockDriverState *bs,
|
static int quorum_co_readv(BlockDriverState *bs,
|
||||||
int64_t sector_num,
|
int64_t sector_num, int nb_sectors,
|
||||||
QEMUIOVector *qiov,
|
QEMUIOVector *qiov)
|
||||||
int nb_sectors,
|
|
||||||
BlockCompletionFunc *cb,
|
|
||||||
void *opaque)
|
|
||||||
{
|
{
|
||||||
BDRVQuorumState *s = bs->opaque;
|
BDRVQuorumState *s = bs->opaque;
|
||||||
QuorumAIOCB *acb = quorum_aio_get(bs, qiov, sector_num,
|
QuorumAIOCB *acb = quorum_aio_get(bs, qiov, sector_num, nb_sectors);
|
||||||
nb_sectors, cb, opaque);
|
int ret;
|
||||||
|
|
||||||
acb->is_read = true;
|
acb->is_read = true;
|
||||||
acb->children_read = 0;
|
acb->children_read = 0;
|
||||||
|
|
||||||
if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
|
if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
|
||||||
return read_quorum_children(acb);
|
ret = read_quorum_children(acb);
|
||||||
|
} else {
|
||||||
|
ret = read_fifo_child(acb);
|
||||||
}
|
}
|
||||||
|
g_free(acb);
|
||||||
return read_fifo_child(acb);
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static BlockAIOCB *quorum_aio_writev(BlockDriverState *bs,
|
static void write_quorum_entry(void *opaque)
|
||||||
int64_t sector_num,
|
{
|
||||||
QEMUIOVector *qiov,
|
QuorumCo *co = opaque;
|
||||||
int nb_sectors,
|
QuorumAIOCB *acb = co->acb;
|
||||||
BlockCompletionFunc *cb,
|
BDRVQuorumState *s = acb->bs->opaque;
|
||||||
void *opaque)
|
int i = co->idx;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
acb->qcrs[i].bs = s->children[i]->bs;
|
||||||
|
ret = bdrv_co_pwritev(s->children[i], acb->sector_num * BDRV_SECTOR_SIZE,
|
||||||
|
acb->nb_sectors * BDRV_SECTOR_SIZE, acb->qiov, 0);
|
||||||
|
quorum_aio_cb(&acb->qcrs[i], ret);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int quorum_co_writev(BlockDriverState *bs,
|
||||||
|
int64_t sector_num, int nb_sectors,
|
||||||
|
QEMUIOVector *qiov)
|
||||||
{
|
{
|
||||||
BDRVQuorumState *s = bs->opaque;
|
BDRVQuorumState *s = bs->opaque;
|
||||||
QuorumAIOCB *acb = quorum_aio_get(bs, qiov, sector_num, nb_sectors,
|
QuorumAIOCB *acb = quorum_aio_get(bs, qiov, sector_num, nb_sectors);
|
||||||
cb, opaque);
|
int i, ret;
|
||||||
int i;
|
|
||||||
|
|
||||||
for (i = 0; i < s->num_children; i++) {
|
for (i = 0; i < s->num_children; i++) {
|
||||||
acb->qcrs[i].aiocb = bdrv_aio_writev(s->children[i], sector_num,
|
Coroutine *co;
|
||||||
qiov, nb_sectors, &quorum_aio_cb,
|
QuorumCo data = {
|
||||||
&acb->qcrs[i]);
|
.acb = acb,
|
||||||
|
.idx = i,
|
||||||
|
};
|
||||||
|
|
||||||
|
co = qemu_coroutine_create(write_quorum_entry, &data);
|
||||||
|
qemu_coroutine_enter(co);
|
||||||
}
|
}
|
||||||
|
|
||||||
return &acb->common;
|
if (!acb->has_completed) {
|
||||||
|
qemu_coroutine_yield();
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = acb->vote_ret;
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t quorum_getlength(BlockDriverState *bs)
|
static int64_t quorum_getlength(BlockDriverState *bs)
|
||||||
|
@ -1097,8 +1135,8 @@ static BlockDriver bdrv_quorum = {
|
||||||
|
|
||||||
.bdrv_getlength = quorum_getlength,
|
.bdrv_getlength = quorum_getlength,
|
||||||
|
|
||||||
.bdrv_aio_readv = quorum_aio_readv,
|
.bdrv_co_readv = quorum_co_readv,
|
||||||
.bdrv_aio_writev = quorum_aio_writev,
|
.bdrv_co_writev = quorum_co_writev,
|
||||||
|
|
||||||
.bdrv_add_child = quorum_add_child,
|
.bdrv_add_child = quorum_add_child,
|
||||||
.bdrv_del_child = quorum_del_child,
|
.bdrv_del_child = quorum_del_child,
|
||||||
|
|
Loading…
Reference in New Issue