diff options
author | Chuck Lever <chuck.lever@oracle.com> | 2018-05-07 22:28:04 +0300 |
---|---|---|
committer | J. Bruce Fields <bfields@redhat.com> | 2018-05-11 22:48:57 +0300 |
commit | 4201c7464753827803366b40e82eb050c04ebdef (patch) | |
tree | e22275b92394c10cf40bed41ceb8900d530d3e3f /net/sunrpc/xprtrdma/svc_rdma_sendto.c | |
parent | 232627905f12a05df75853c62451ce0886803cee (diff) | |
download | linux-4201c7464753827803366b40e82eb050c04ebdef.tar.xz |
svcrdma: Introduce svc_rdma_send_ctxt
svc_rdma_op_ctxt's are pre-allocated and maintained on a per-xprt
free list. This eliminates the overhead of calling kmalloc / kfree,
both of which grab a globally shared lock that disables interrupts.
Introduce a replacement to svc_rdma_op_ctxt's that is built
especially for the svcrdma Send path.
Subsequent patches will take advantage of this new structure by
allocating real resources which are then cached in these objects.
The allocations are freed when the transport is torn down.
I've renamed the structure so that static type checking can be used
to ensure that uses of op_ctxt and send_ctxt are not confused. As an
additional clean up, structure fields are renamed to conform with
kernel coding conventions.
Additional clean ups:
- Handle svc_rdma_send_ctxt_get allocation failure at each call
site, rather than pre-allocating and hoping we guessed correctly
- All send_ctxt_put call-sites request page freeing, so remove
the @free_pages argument
- All send_ctxt_put call-sites unmap SGEs, so fold that into
svc_rdma_send_ctxt_put
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
Diffstat (limited to 'net/sunrpc/xprtrdma/svc_rdma_sendto.c')
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_sendto.c | 254 |
1 files changed, 212 insertions, 42 deletions
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index 4591017adc1e..b286d6a6e429 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -75,11 +75,11 @@ * DMA-unmap the pages under I/O for that Write segment. The Write * completion handler does not release any pages. * - * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt. + * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt. * The ownership of all of the Reply's pages are transferred into that * ctxt, the Send WR is posted, and sendto returns. * - * The svc_rdma_op_ctxt is presented when the Send WR completes. The + * The svc_rdma_send_ctxt is presented when the Send WR completes. The * Send completion handler finally releases the Reply's pages. * * This mechanism also assumes that completions on the transport's Send @@ -114,6 +114,184 @@ #define RPCDBG_FACILITY RPCDBG_SVCXPRT +static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc); + +static inline struct svc_rdma_send_ctxt * +svc_rdma_next_send_ctxt(struct list_head *list) +{ + return list_first_entry_or_null(list, struct svc_rdma_send_ctxt, + sc_list); +} + +static struct svc_rdma_send_ctxt * +svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_send_ctxt *ctxt; + int i; + + ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL); + if (!ctxt) + return NULL; + + ctxt->sc_cqe.done = svc_rdma_wc_send; + ctxt->sc_send_wr.next = NULL; + ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe; + ctxt->sc_send_wr.sg_list = ctxt->sc_sges; + ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED; + for (i = 0; i < ARRAY_SIZE(ctxt->sc_sges); i++) + ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey; + return ctxt; +} + +/** + * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt + * @rdma: svcxprt_rdma being torn down + * + */ +void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_send_ctxt *ctxt; + + while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) { + list_del(&ctxt->sc_list); + kfree(ctxt); + } +} + +/** + * svc_rdma_send_ctxt_get - Get a free send_ctxt + * @rdma: controlling svcxprt_rdma + * + * Returns a ready-to-use send_ctxt, or NULL if none are + * available and a fresh one cannot be allocated. + */ +struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_send_ctxt *ctxt; + + spin_lock(&rdma->sc_send_lock); + ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts); + if (!ctxt) + goto out_empty; + list_del(&ctxt->sc_list); + spin_unlock(&rdma->sc_send_lock); + +out: + ctxt->sc_send_wr.num_sge = 0; + ctxt->sc_page_count = 0; + return ctxt; + +out_empty: + spin_unlock(&rdma->sc_send_lock); + ctxt = svc_rdma_send_ctxt_alloc(rdma); + if (!ctxt) + return NULL; + goto out; +} + +/** + * svc_rdma_send_ctxt_put - Return send_ctxt to free list + * @rdma: controlling svcxprt_rdma + * @ctxt: object to return to the free list + * + * Pages left in sc_pages are DMA unmapped and released. + */ +void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt) +{ + struct ib_device *device = rdma->sc_cm_id->device; + unsigned int i; + + for (i = 0; i < ctxt->sc_send_wr.num_sge; i++) + ib_dma_unmap_page(device, + ctxt->sc_sges[i].addr, + ctxt->sc_sges[i].length, + DMA_TO_DEVICE); + + for (i = 0; i < ctxt->sc_page_count; ++i) + put_page(ctxt->sc_pages[i]); + + spin_lock(&rdma->sc_send_lock); + list_add(&ctxt->sc_list, &rdma->sc_send_ctxts); + spin_unlock(&rdma->sc_send_lock); +} + +/** + * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC + * @cq: Completion Queue context + * @wc: Work Completion object + * + * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that + * the Send completion handler could be running. + */ +static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) +{ + struct svcxprt_rdma *rdma = cq->cq_context; + struct ib_cqe *cqe = wc->wr_cqe; + struct svc_rdma_send_ctxt *ctxt; + + trace_svcrdma_wc_send(wc); + + atomic_inc(&rdma->sc_sq_avail); + wake_up(&rdma->sc_send_wait); + + ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe); + svc_rdma_send_ctxt_put(rdma, ctxt); + + if (unlikely(wc->status != IB_WC_SUCCESS)) { + set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); + svc_xprt_enqueue(&rdma->sc_xprt); + if (wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("svcrdma: Send: %s (%u/0x%x)\n", + ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); + } + + svc_xprt_put(&rdma->sc_xprt); +} + +int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr) +{ + struct ib_send_wr *bad_wr, *n_wr; + int wr_count; + int i; + int ret; + + wr_count = 1; + for (n_wr = wr->next; n_wr; n_wr = n_wr->next) + wr_count++; + + /* If the SQ is full, wait until an SQ entry is available */ + while (1) { + if ((atomic_sub_return(wr_count, &rdma->sc_sq_avail) < 0)) { + atomic_inc(&rdma_stat_sq_starve); + trace_svcrdma_sq_full(rdma); + atomic_add(wr_count, &rdma->sc_sq_avail); + wait_event(rdma->sc_send_wait, + atomic_read(&rdma->sc_sq_avail) > wr_count); + if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) + return -ENOTCONN; + trace_svcrdma_sq_retry(rdma); + continue; + } + /* Take a transport ref for each WR posted */ + for (i = 0; i < wr_count; i++) + svc_xprt_get(&rdma->sc_xprt); + + /* Bump used SQ WR count and post */ + ret = ib_post_send(rdma->sc_qp, wr, &bad_wr); + trace_svcrdma_post_send(wr, ret); + if (ret) { + set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); + for (i = 0; i < wr_count; i++) + svc_xprt_put(&rdma->sc_xprt); + wake_up(&rdma->sc_send_wait); + } + break; + } + return ret; +} + static u32 xdr_padsize(u32 len) { return (len & 3) ? (4 - (len & 3)) : 0; @@ -303,7 +481,7 @@ static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp, } static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, + struct svc_rdma_send_ctxt *ctxt, unsigned int sge_no, struct page *page, unsigned long offset, @@ -316,10 +494,9 @@ static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, if (ib_dma_mapping_error(dev, dma_addr)) goto out_maperr; - ctxt->sge[sge_no].addr = dma_addr; - ctxt->sge[sge_no].length = len; - ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey; - ctxt->mapped_sges++; + ctxt->sc_sges[sge_no].addr = dma_addr; + ctxt->sc_sges[sge_no].length = len; + ctxt->sc_send_wr.num_sge++; return 0; out_maperr: @@ -331,7 +508,7 @@ out_maperr: * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively. */ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, + struct svc_rdma_send_ctxt *ctxt, unsigned int sge_no, unsigned char *base, unsigned int len) @@ -352,14 +529,13 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma, * %-EIO if DMA mapping failed. */ int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, + struct svc_rdma_send_ctxt *ctxt, __be32 *rdma_resp, unsigned int len) { - ctxt->direction = DMA_TO_DEVICE; - ctxt->pages[0] = virt_to_page(rdma_resp); - ctxt->count = 1; - return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len); + ctxt->sc_pages[0] = virt_to_page(rdma_resp); + ctxt->sc_page_count++; + return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->sc_pages[0], 0, len); } /* Load the xdr_buf into the ctxt's sge array, and DMA map each @@ -368,7 +544,7 @@ int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma, * Returns zero on success, or a negative errno on failure. */ static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, + struct svc_rdma_send_ctxt *ctxt, struct xdr_buf *xdr, __be32 *wr_lst) { unsigned int len, sge_no, remaining; @@ -436,13 +612,13 @@ tail: * so they are released by the Send completion handler. */ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp, - struct svc_rdma_op_ctxt *ctxt) + struct svc_rdma_send_ctxt *ctxt) { int i, pages = rqstp->rq_next_page - rqstp->rq_respages; - ctxt->count += pages; + ctxt->sc_page_count += pages; for (i = 0; i < pages; i++) { - ctxt->pages[i + 1] = rqstp->rq_respages[i]; + ctxt->sc_pages[i + 1] = rqstp->rq_respages[i]; rqstp->rq_respages[i] = NULL; } rqstp->rq_next_page = rqstp->rq_respages + 1; @@ -461,37 +637,29 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp, * %-ENOMEM if ib_post_send failed. */ int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, + struct svc_rdma_send_ctxt *ctxt, u32 inv_rkey) { - struct ib_send_wr *send_wr = &ctxt->send_wr; - dprintk("svcrdma: posting Send WR with %u sge(s)\n", - ctxt->mapped_sges); - - send_wr->next = NULL; - ctxt->cqe.done = svc_rdma_wc_send; - send_wr->wr_cqe = &ctxt->cqe; - send_wr->sg_list = ctxt->sge; - send_wr->num_sge = ctxt->mapped_sges; - send_wr->send_flags = IB_SEND_SIGNALED; + ctxt->sc_send_wr.num_sge); + if (inv_rkey) { - send_wr->opcode = IB_WR_SEND_WITH_INV; - send_wr->ex.invalidate_rkey = inv_rkey; + ctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV; + ctxt->sc_send_wr.ex.invalidate_rkey = inv_rkey; } else { - send_wr->opcode = IB_WR_SEND; + ctxt->sc_send_wr.opcode = IB_WR_SEND; } - return svc_rdma_send(rdma, send_wr); + return svc_rdma_send(rdma, &ctxt->sc_send_wr); } /* Prepare the portion of the RPC Reply that will be transmitted * via RDMA Send. The RPC-over-RDMA transport header is prepared - * in sge[0], and the RPC xdr_buf is prepared in following sges. + * in sc_sges[0], and the RPC xdr_buf is prepared in following sges. * * Depending on whether a Write list or Reply chunk is present, * the server may send all, a portion of, or none of the xdr_buf. - * In the latter case, only the transport header (sge[0]) is + * In the latter case, only the transport header (sc_sges[0]) is * transmitted. * * RDMA Send is the last step of transmitting an RPC reply. Pages @@ -508,11 +676,13 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp, __be32 *wr_lst, __be32 *rp_ch) { - struct svc_rdma_op_ctxt *ctxt; + struct svc_rdma_send_ctxt *ctxt; u32 inv_rkey; int ret; - ctxt = svc_rdma_get_context(rdma); + ctxt = svc_rdma_send_ctxt_get(rdma); + if (!ctxt) + return -ENOMEM; ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, svc_rdma_reply_hdr_len(rdma_resp)); @@ -538,8 +708,7 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, return 0; err: - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); + svc_rdma_send_ctxt_put(rdma, ctxt); return ret; } @@ -553,11 +722,13 @@ err: static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma, __be32 *rdma_resp, struct svc_rqst *rqstp) { - struct svc_rdma_op_ctxt *ctxt; + struct svc_rdma_send_ctxt *ctxt; __be32 *p; int ret; - ctxt = svc_rdma_get_context(rdma); + ctxt = svc_rdma_send_ctxt_get(rdma); + if (!ctxt) + return -ENOMEM; /* Replace the original transport header with an * RDMA_ERROR response. XID etc are preserved. @@ -580,8 +751,7 @@ static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma, return 0; err: - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); + svc_rdma_send_ctxt_put(rdma, ctxt); return ret; } |