svcrdma: Modify the RPC reply path to use FRMR when available

Use FRMR to map local RPC reply data. This allows RDMA_WRITE to send reply
data using a single WR. The FRMR is invalidated by linking the LOCAL_INV WR
to the RDMA_SEND message used to complete the reply.

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
This commit is contained in:
Tom Tucker 2008-10-03 15:45:03 -05:00
parent 146b6df6a5
commit afd566ea08
2 changed files with 217 additions and 40 deletions

View File

@ -69,9 +69,127 @@
* array is only concerned with the reply we are assured that we have
* on extra page for the RPCRMDA header.
*/
static void xdr_to_sge(struct svcxprt_rdma *xprt,
struct xdr_buf *xdr,
struct svc_rdma_req_map *vec)
int fast_reg_xdr(struct svcxprt_rdma *xprt,
struct xdr_buf *xdr,
struct svc_rdma_req_map *vec)
{
int sge_no;
u32 sge_bytes;
u32 page_bytes;
u32 page_off;
int page_no = 0;
u8 *frva;
struct svc_rdma_fastreg_mr *frmr;
frmr = svc_rdma_get_frmr(xprt);
if (IS_ERR(frmr))
return -ENOMEM;
vec->frmr = frmr;
/* Skip the RPCRDMA header */
sge_no = 1;
/* Map the head. */
frva = (void *)((unsigned long)(xdr->head[0].iov_base) & PAGE_MASK);
vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
vec->count = 2;
sge_no++;
/* Build the FRMR */
frmr->kva = frva;
frmr->direction = DMA_TO_DEVICE;
frmr->access_flags = 0;
frmr->map_len = PAGE_SIZE;
frmr->page_list_len = 1;
frmr->page_list->page_list[page_no] =
ib_dma_map_single(xprt->sc_cm_id->device,
(void *)xdr->head[0].iov_base,
PAGE_SIZE, DMA_TO_DEVICE);
if (ib_dma_mapping_error(xprt->sc_cm_id->device,
frmr->page_list->page_list[page_no]))
goto fatal_err;
atomic_inc(&xprt->sc_dma_used);
page_off = xdr->page_base;
page_bytes = xdr->page_len + page_off;
if (!page_bytes)
goto encode_tail;
/* Map the pages */
vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
vec->sge[sge_no].iov_len = page_bytes;
sge_no++;
while (page_bytes) {
struct page *page;
page = xdr->pages[page_no++];
sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
page_bytes -= sge_bytes;
frmr->page_list->page_list[page_no] =
ib_dma_map_page(xprt->sc_cm_id->device, page, 0,
PAGE_SIZE, DMA_TO_DEVICE);
if (ib_dma_mapping_error(xprt->sc_cm_id->device,
frmr->page_list->page_list[page_no]))
goto fatal_err;
atomic_inc(&xprt->sc_dma_used);
page_off = 0; /* reset for next time through loop */
frmr->map_len += PAGE_SIZE;
frmr->page_list_len++;
}
vec->count++;
encode_tail:
/* Map tail */
if (0 == xdr->tail[0].iov_len)
goto done;
vec->count++;
vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
if (((unsigned long)xdr->tail[0].iov_base & PAGE_MASK) ==
((unsigned long)xdr->head[0].iov_base & PAGE_MASK)) {
/*
* If head and tail use the same page, we don't need
* to map it again.
*/
vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
} else {
void *va;
/* Map another page for the tail */
page_off = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
va = (void *)((unsigned long)xdr->tail[0].iov_base & PAGE_MASK);
vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
frmr->page_list->page_list[page_no] =
ib_dma_map_single(xprt->sc_cm_id->device, va, PAGE_SIZE,
DMA_TO_DEVICE);
if (ib_dma_mapping_error(xprt->sc_cm_id->device,
frmr->page_list->page_list[page_no]))
goto fatal_err;
atomic_inc(&xprt->sc_dma_used);
frmr->map_len += PAGE_SIZE;
frmr->page_list_len++;
}
done:
if (svc_rdma_fastreg(xprt, frmr))
goto fatal_err;
return 0;
fatal_err:
printk("svcrdma: Error fast registering memory for xprt %p\n", xprt);
svc_rdma_put_frmr(xprt, frmr);
return -EIO;
}
static int map_xdr(struct svcxprt_rdma *xprt,
struct xdr_buf *xdr,
struct svc_rdma_req_map *vec)
{
int sge_max = (xdr->len+PAGE_SIZE-1) / PAGE_SIZE + 3;
int sge_no;
@ -83,6 +201,9 @@ static void xdr_to_sge(struct svcxprt_rdma *xprt,
BUG_ON(xdr->len !=
(xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len));
if (xprt->sc_frmr_pg_list_len)
return fast_reg_xdr(xprt, xdr, vec);
/* Skip the first sge, this is for the RPCRDMA header */
sge_no = 1;
@ -116,9 +237,12 @@ static void xdr_to_sge(struct svcxprt_rdma *xprt,
BUG_ON(sge_no > sge_max);
vec->count = sge_no;
return 0;
}
/* Assumptions:
* - We are using FRMR
* - or -
* - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
*/
static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
@ -158,30 +282,35 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
sge_no = 0;
/* Copy the remaining SGE */
while (bc != 0 && xdr_sge_no < vec->count) {
sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
sge_bytes = min((size_t)bc,
(size_t)(vec->sge[xdr_sge_no].iov_len-sge_off));
while (bc != 0) {
sge_bytes = min_t(size_t,
bc, vec->sge[xdr_sge_no].iov_len-sge_off);
sge[sge_no].length = sge_bytes;
atomic_inc(&xprt->sc_dma_used);
sge[sge_no].addr =
ib_dma_map_single(xprt->sc_cm_id->device,
(void *)
vec->sge[xdr_sge_no].iov_base + sge_off,
sge_bytes, DMA_TO_DEVICE);
if (dma_mapping_error(xprt->sc_cm_id->device->dma_device,
sge[sge_no].addr))
goto err;
if (!vec->frmr) {
sge[sge_no].addr =
ib_dma_map_single(xprt->sc_cm_id->device,
(void *)
vec->sge[xdr_sge_no].iov_base + sge_off,
sge_bytes, DMA_TO_DEVICE);
if (ib_dma_mapping_error(xprt->sc_cm_id->device,
sge[sge_no].addr))
goto err;
atomic_inc(&xprt->sc_dma_used);
sge[sge_no].lkey = xprt->sc_dma_lkey;
} else {
sge[sge_no].addr = (unsigned long)
vec->sge[xdr_sge_no].iov_base + sge_off;
sge[sge_no].lkey = vec->frmr->mr->lkey;
}
ctxt->count++;
ctxt->frmr = vec->frmr;
sge_off = 0;
sge_no++;
ctxt->count++;
xdr_sge_no++;
BUG_ON(xdr_sge_no > vec->count);
bc -= sge_bytes;
}
BUG_ON(bc != 0);
BUG_ON(xdr_sge_no > vec->count);
/* Prepare WRITE WR */
memset(&write_wr, 0, sizeof write_wr);
ctxt->wr_op = IB_WR_RDMA_WRITE;
@ -226,7 +355,10 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
res_ary = (struct rpcrdma_write_array *)
&rdma_resp->rm_body.rm_chunks[1];
max_write = xprt->sc_max_sge * PAGE_SIZE;
if (vec->frmr)
max_write = vec->frmr->map_len;
else
max_write = xprt->sc_max_sge * PAGE_SIZE;
/* Write chunks start at the pagelist */
for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
@ -297,7 +429,10 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
res_ary = (struct rpcrdma_write_array *)
&rdma_resp->rm_body.rm_chunks[2];
max_write = xprt->sc_max_sge * PAGE_SIZE;
if (vec->frmr)
max_write = vec->frmr->map_len;
else
max_write = xprt->sc_max_sge * PAGE_SIZE;
/* xdr offset starts at RPC message */
for (xdr_off = 0, chunk_no = 0;
@ -307,7 +442,6 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
ch = &arg_ary->wc_array[chunk_no].wc_target;
write_len = min(xfer_len, ch->rs_length);
/* Prepare the reply chunk given the length actually
* written */
rs_offset = get_unaligned(&(ch->rs_offset));
@ -366,6 +500,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
int byte_count)
{
struct ib_send_wr send_wr;
struct ib_send_wr inv_wr;
int sge_no;
int sge_bytes;
int page_no;
@ -385,27 +520,45 @@ static int send_reply(struct svcxprt_rdma *rdma,
/* Prepare the context */
ctxt->pages[0] = page;
ctxt->count = 1;
ctxt->frmr = vec->frmr;
if (vec->frmr)
set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
else
clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
/* Prepare the SGE for the RPCRDMA Header */
atomic_inc(&rdma->sc_dma_used);
ctxt->sge[0].addr =
ib_dma_map_page(rdma->sc_cm_id->device,
page, 0, PAGE_SIZE, DMA_TO_DEVICE);
if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
goto err;
atomic_inc(&rdma->sc_dma_used);
ctxt->direction = DMA_TO_DEVICE;
ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp);
ctxt->sge[0].lkey = rdma->sc_phys_mr->lkey;
ctxt->sge[0].lkey = rdma->sc_dma_lkey;
/* Determine how many of our SGE are to be transmitted */
for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
byte_count -= sge_bytes;
atomic_inc(&rdma->sc_dma_used);
ctxt->sge[sge_no].addr =
ib_dma_map_single(rdma->sc_cm_id->device,
vec->sge[sge_no].iov_base,
sge_bytes, DMA_TO_DEVICE);
if (!vec->frmr) {
ctxt->sge[sge_no].addr =
ib_dma_map_single(rdma->sc_cm_id->device,
vec->sge[sge_no].iov_base,
sge_bytes, DMA_TO_DEVICE);
if (ib_dma_mapping_error(rdma->sc_cm_id->device,
ctxt->sge[sge_no].addr))
goto err;
atomic_inc(&rdma->sc_dma_used);
ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
} else {
ctxt->sge[sge_no].addr = (unsigned long)
vec->sge[sge_no].iov_base;
ctxt->sge[sge_no].lkey = vec->frmr->mr->lkey;
}
ctxt->sge[sge_no].length = sge_bytes;
ctxt->sge[sge_no].lkey = rdma->sc_phys_mr->lkey;
}
BUG_ON(byte_count != 0);
@ -417,11 +570,16 @@ static int send_reply(struct svcxprt_rdma *rdma,
ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
ctxt->count++;
rqstp->rq_respages[page_no] = NULL;
/* If there are more pages than SGE, terminate SGE list */
/*
* If there are more pages than SGE, terminate SGE
* list so that svc_rdma_unmap_dma doesn't attempt to
* unmap garbage.
*/
if (page_no+1 >= sge_no)
ctxt->sge[page_no+1].length = 0;
}
BUG_ON(sge_no > rdma->sc_max_sge);
BUG_ON(sge_no > ctxt->count);
memset(&send_wr, 0, sizeof send_wr);
ctxt->wr_op = IB_WR_SEND;
send_wr.wr_id = (unsigned long)ctxt;
@ -429,12 +587,26 @@ static int send_reply(struct svcxprt_rdma *rdma,
send_wr.num_sge = sge_no;
send_wr.opcode = IB_WR_SEND;
send_wr.send_flags = IB_SEND_SIGNALED;
if (vec->frmr) {
/* Prepare INVALIDATE WR */
memset(&inv_wr, 0, sizeof inv_wr);
inv_wr.opcode = IB_WR_LOCAL_INV;
inv_wr.send_flags = IB_SEND_SIGNALED;
inv_wr.ex.invalidate_rkey =
vec->frmr->mr->lkey;
send_wr.next = &inv_wr;
}
ret = svc_rdma_send(rdma, &send_wr);
if (ret)
svc_rdma_put_context(ctxt, 1);
goto err;
return ret;
return 0;
err:
svc_rdma_put_frmr(rdma, vec->frmr);
svc_rdma_put_context(ctxt, 1);
return -EIO;
}
void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
@ -477,8 +649,9 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
ctxt = svc_rdma_get_context(rdma);
ctxt->direction = DMA_TO_DEVICE;
vec = svc_rdma_get_req_map();
xdr_to_sge(rdma, &rqstp->rq_res, vec);
ret = map_xdr(rdma, &rqstp->rq_res, vec);
if (ret)
goto err0;
inline_bytes = rqstp->rq_res.len;
/* Create the RDMA response header */
@ -498,7 +671,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
if (ret < 0) {
printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
ret);
goto error;
goto err1;
}
inline_bytes -= ret;
@ -508,7 +681,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
if (ret < 0) {
printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
ret);
goto error;
goto err1;
}
inline_bytes -= ret;
@ -517,9 +690,11 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
svc_rdma_put_req_map(vec);
dprintk("svcrdma: send_reply returns %d\n", ret);
return ret;
error:
err1:
put_page(res_page);
err0:
svc_rdma_put_req_map(vec);
svc_rdma_put_context(ctxt, 0);
put_page(res_page);
return ret;
}

View File

@ -335,6 +335,8 @@ static void process_context(struct svcxprt_rdma *xprt,
switch (ctxt->wr_op) {
case IB_WR_SEND:
if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
svc_rdma_put_frmr(xprt, ctxt->frmr);
svc_rdma_put_context(ctxt, 1);
break;