diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index d7b9605fd51d..c7dfcb8a1fb2 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -76,6 +76,7 @@ enum ceph_msg_data_type { #ifdef CONFIG_BLOCK CEPH_MSG_DATA_BIO, /* data source/destination is a bio list */ #endif /* CONFIG_BLOCK */ + CEPH_MSG_DATA_BVECS, /* data source/destination is a bio_vec array */ }; static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type) @@ -87,6 +88,7 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type) #ifdef CONFIG_BLOCK case CEPH_MSG_DATA_BIO: #endif /* CONFIG_BLOCK */ + case CEPH_MSG_DATA_BVECS: return true; default: return false; @@ -139,6 +141,42 @@ struct ceph_bio_iter { #endif /* CONFIG_BLOCK */ +struct ceph_bvec_iter { + struct bio_vec *bvecs; + struct bvec_iter iter; +}; + +#define __ceph_bvec_iter_advance_step(it, n, STEP) do { \ + BUG_ON((n) > (it)->iter.bi_size); \ + (void)(STEP); \ + bvec_iter_advance((it)->bvecs, &(it)->iter, (n)); \ +} while (0) + +/* + * Advance @it by @n bytes. + */ +#define ceph_bvec_iter_advance(it, n) \ + __ceph_bvec_iter_advance_step(it, n, 0) + +/* + * Advance @it by @n bytes, executing BVEC_STEP for each bio_vec. + */ +#define ceph_bvec_iter_advance_step(it, n, BVEC_STEP) \ + __ceph_bvec_iter_advance_step(it, n, ({ \ + struct bio_vec bv; \ + struct bvec_iter __cur_iter; \ + \ + __cur_iter = (it)->iter; \ + __cur_iter.bi_size = (n); \ + for_each_bvec(bv, (it)->bvecs, __cur_iter, __cur_iter) \ + (void)(BVEC_STEP); \ + })) + +#define ceph_bvec_iter_shorten(it, n) do { \ + BUG_ON((n) > (it)->iter.bi_size); \ + (it)->iter.bi_size = (n); \ +} while (0) + struct ceph_msg_data { struct list_head links; /* ceph_msg->data */ enum ceph_msg_data_type type; @@ -149,6 +187,7 @@ struct ceph_msg_data { u32 bio_length; }; #endif /* CONFIG_BLOCK */ + struct ceph_bvec_iter bvec_pos; struct { struct page **pages; /* NOT OWNER. */ size_t length; /* total # bytes */ @@ -170,6 +209,7 @@ struct ceph_msg_data_cursor { #ifdef CONFIG_BLOCK struct ceph_bio_iter bio_iter; #endif /* CONFIG_BLOCK */ + struct bvec_iter bvec_iter; struct { /* pages */ unsigned int page_offset; /* offset in page */ unsigned short page_index; /* index in array */ @@ -336,6 +376,8 @@ extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg, void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos, u32 length); #endif /* CONFIG_BLOCK */ +void ceph_msg_data_add_bvecs(struct ceph_msg *msg, + struct ceph_bvec_iter *bvec_pos); extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, bool can_fail); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 315691490cb0..528ccc943cee 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -57,6 +57,7 @@ enum ceph_osd_data_type { #ifdef CONFIG_BLOCK CEPH_OSD_DATA_TYPE_BIO, #endif /* CONFIG_BLOCK */ + CEPH_OSD_DATA_TYPE_BVECS, }; struct ceph_osd_data { @@ -76,6 +77,7 @@ struct ceph_osd_data { u32 bio_length; }; #endif /* CONFIG_BLOCK */ + struct ceph_bvec_iter bvec_pos; }; }; @@ -410,6 +412,9 @@ void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req, struct ceph_bio_iter *bio_pos, u32 bio_length); #endif /* CONFIG_BLOCK */ +void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req, + unsigned int which, + struct ceph_bvec_iter *bvec_pos); extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *, unsigned int which, @@ -419,6 +424,9 @@ extern void osd_req_op_cls_request_data_pages(struct ceph_osd_request *, struct page **pages, u64 length, u32 alignment, bool pages_from_pool, bool own_pages); +void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req, + unsigned int which, + struct bio_vec *bvecs, u32 bytes); extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *, unsigned int which, struct page **pages, u64 length, diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index b9fa8b869c08..91a57857cf11 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -894,6 +894,58 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor, } #endif /* CONFIG_BLOCK */ +static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor, + size_t length) +{ + struct ceph_msg_data *data = cursor->data; + struct bio_vec *bvecs = data->bvec_pos.bvecs; + + cursor->resid = min_t(size_t, length, data->bvec_pos.iter.bi_size); + cursor->bvec_iter = data->bvec_pos.iter; + cursor->bvec_iter.bi_size = cursor->resid; + + BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter)); + cursor->last_piece = + cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter); +} + +static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor, + size_t *page_offset, + size_t *length) +{ + struct bio_vec bv = bvec_iter_bvec(cursor->data->bvec_pos.bvecs, + cursor->bvec_iter); + + *page_offset = bv.bv_offset; + *length = bv.bv_len; + return bv.bv_page; +} + +static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor, + size_t bytes) +{ + struct bio_vec *bvecs = cursor->data->bvec_pos.bvecs; + + BUG_ON(bytes > cursor->resid); + BUG_ON(bytes > bvec_iter_len(bvecs, cursor->bvec_iter)); + cursor->resid -= bytes; + bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes); + + if (!cursor->resid) { + BUG_ON(!cursor->last_piece); + return false; /* no more data */ + } + + if (!bytes || cursor->bvec_iter.bi_bvec_done) + return false; /* more bytes to process in this segment */ + + BUG_ON(cursor->last_piece); + BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter)); + cursor->last_piece = + cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter); + return true; +} + /* * For a page array, a piece comes from the first page in the array * that has not already been fully consumed. @@ -1077,6 +1129,9 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor) ceph_msg_data_bio_cursor_init(cursor, length); break; #endif /* CONFIG_BLOCK */ + case CEPH_MSG_DATA_BVECS: + ceph_msg_data_bvecs_cursor_init(cursor, length); + break; case CEPH_MSG_DATA_NONE: default: /* BUG(); */ @@ -1125,6 +1180,9 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, page = ceph_msg_data_bio_next(cursor, page_offset, length); break; #endif /* CONFIG_BLOCK */ + case CEPH_MSG_DATA_BVECS: + page = ceph_msg_data_bvecs_next(cursor, page_offset, length); + break; case CEPH_MSG_DATA_NONE: default: page = NULL; @@ -1163,6 +1221,9 @@ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, new_piece = ceph_msg_data_bio_advance(cursor, bytes); break; #endif /* CONFIG_BLOCK */ + case CEPH_MSG_DATA_BVECS: + new_piece = ceph_msg_data_bvecs_advance(cursor, bytes); + break; case CEPH_MSG_DATA_NONE: default: BUG(); @@ -3247,6 +3308,20 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos, EXPORT_SYMBOL(ceph_msg_data_add_bio); #endif /* CONFIG_BLOCK */ +void ceph_msg_data_add_bvecs(struct ceph_msg *msg, + struct ceph_bvec_iter *bvec_pos) +{ + struct ceph_msg_data *data; + + data = ceph_msg_data_create(CEPH_MSG_DATA_BVECS); + BUG_ON(!data); + data->bvec_pos = *bvec_pos; + + list_add_tail(&data->links, &msg->data); + msg->data_length += bvec_pos->iter.bi_size; +} +EXPORT_SYMBOL(ceph_msg_data_add_bvecs); + /* * construct a new message with given type, size * the new msg has a ref count of 1. diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 339d8773ebe8..407be0533c18 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -155,6 +155,13 @@ static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, } #endif /* CONFIG_BLOCK */ +static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data, + struct ceph_bvec_iter *bvec_pos) +{ + osd_data->type = CEPH_OSD_DATA_TYPE_BVECS; + osd_data->bvec_pos = *bvec_pos; +} + #define osd_req_op_data(oreq, whch, typ, fld) \ ({ \ struct ceph_osd_request *__oreq = (oreq); \ @@ -229,6 +236,17 @@ void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req, EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio); #endif /* CONFIG_BLOCK */ +void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req, + unsigned int which, + struct ceph_bvec_iter *bvec_pos) +{ + struct ceph_osd_data *osd_data; + + osd_data = osd_req_op_data(osd_req, which, extent, osd_data); + ceph_osd_data_bvecs_init(osd_data, bvec_pos); +} +EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos); + static void osd_req_op_cls_request_info_pagelist( struct ceph_osd_request *osd_req, unsigned int which, struct ceph_pagelist *pagelist) @@ -266,6 +284,23 @@ void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req, } EXPORT_SYMBOL(osd_req_op_cls_request_data_pages); +void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req, + unsigned int which, + struct bio_vec *bvecs, u32 bytes) +{ + struct ceph_osd_data *osd_data; + struct ceph_bvec_iter it = { + .bvecs = bvecs, + .iter = { .bi_size = bytes }, + }; + + osd_data = osd_req_op_data(osd_req, which, cls, request_data); + ceph_osd_data_bvecs_init(osd_data, &it); + osd_req->r_ops[which].cls.indata_len += bytes; + osd_req->r_ops[which].indata_len += bytes; +} +EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs); + void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req, unsigned int which, struct page **pages, u64 length, u32 alignment, bool pages_from_pool, bool own_pages) @@ -291,6 +326,8 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data) case CEPH_OSD_DATA_TYPE_BIO: return (u64)osd_data->bio_length; #endif /* CONFIG_BLOCK */ + case CEPH_OSD_DATA_TYPE_BVECS: + return osd_data->bvec_pos.iter.bi_size; default: WARN(true, "unrecognized data type %d\n", (int)osd_data->type); return 0; @@ -831,6 +868,8 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg, } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length); #endif + } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) { + ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos); } else { BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); }