diff options
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma.c | 32 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 11 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 211 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_rw.c | 450 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_sendto.c | 96 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_transport.c | 36 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 2 |
7 files changed, 470 insertions, 368 deletions
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c index f0d5eeed4c88..f86970733eb0 100644 --- a/net/sunrpc/xprtrdma/svc_rdma.c +++ b/net/sunrpc/xprtrdma/svc_rdma.c @@ -256,28 +256,44 @@ out_err: return rc; } +struct workqueue_struct *svcrdma_wq; + void svc_rdma_cleanup(void) { - dprintk("SVCRDMA Module Removed, deregister RPC RDMA transport\n"); svc_unreg_xprt_class(&svc_rdma_class); svc_rdma_proc_cleanup(); + if (svcrdma_wq) { + struct workqueue_struct *wq = svcrdma_wq; + + svcrdma_wq = NULL; + destroy_workqueue(wq); + } + + dprintk("SVCRDMA Module Removed, deregister RPC RDMA transport\n"); } int svc_rdma_init(void) { + struct workqueue_struct *wq; int rc; - dprintk("SVCRDMA Module Init, register RPC RDMA transport\n"); - dprintk("\tsvcrdma_ord : %d\n", svcrdma_ord); - dprintk("\tmax_requests : %u\n", svcrdma_max_requests); - dprintk("\tmax_bc_requests : %u\n", svcrdma_max_bc_requests); - dprintk("\tmax_inline : %d\n", svcrdma_max_req_size); + wq = alloc_workqueue("svcrdma", WQ_UNBOUND, 0); + if (!wq) + return -ENOMEM; rc = svc_rdma_proc_init(); - if (rc) + if (rc) { + destroy_workqueue(wq); return rc; + } - /* Register RDMA with the SVC transport switch */ + svcrdma_wq = wq; svc_reg_xprt_class(&svc_rdma_class); + + dprintk("SVCRDMA Module Init, register RPC RDMA transport\n"); + dprintk("\tsvcrdma_ord : %d\n", svcrdma_ord); + dprintk("\tmax_requests : %u\n", svcrdma_max_requests); + dprintk("\tmax_bc_requests : %u\n", svcrdma_max_bc_requests); + dprintk("\tmax_inline : %d\n", svcrdma_max_req_size); return 0; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index 7420a2c990c7..c9be6778643b 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -76,15 +76,12 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst, struct svc_rdma_send_ctxt *sctxt) { - struct svc_rdma_recv_ctxt *rctxt; + struct svc_rdma_pcl empty_pcl; int ret; - rctxt = svc_rdma_recv_ctxt_get(rdma); - if (!rctxt) - return -EIO; - - ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqst->rq_snd_buf); - svc_rdma_recv_ctxt_put(rdma, rctxt); + pcl_init(&empty_pcl); + ret = svc_rdma_map_reply_msg(rdma, sctxt, &empty_pcl, &empty_pcl, + &rqst->rq_snd_buf); if (ret < 0) return -EIO; diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index 3b05f90a3e50..d72953f29258 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -115,13 +115,6 @@ svc_rdma_next_recv_ctxt(struct list_head *list) rc_list); } -static void svc_rdma_recv_cid_init(struct svcxprt_rdma *rdma, - struct rpc_rdma_cid *cid) -{ - cid->ci_queue_id = rdma->sc_rq_cq->res.id; - cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids); -} - static struct svc_rdma_recv_ctxt * svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma) { @@ -130,7 +123,7 @@ svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma) dma_addr_t addr; void *buffer; - ctxt = kmalloc_node(sizeof(*ctxt), GFP_KERNEL, node); + ctxt = kzalloc_node(sizeof(*ctxt), GFP_KERNEL, node); if (!ctxt) goto fail0; buffer = kmalloc_node(rdma->sc_max_req_size, GFP_KERNEL, node); @@ -156,6 +149,7 @@ svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma) ctxt->rc_recv_sge.length = rdma->sc_max_req_size; ctxt->rc_recv_sge.lkey = rdma->sc_pd->local_dma_lkey; ctxt->rc_recv_buf = buffer; + svc_rdma_cc_init(rdma, &ctxt->rc_cc); return ctxt; fail2: @@ -204,18 +198,11 @@ struct svc_rdma_recv_ctxt *svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma) node = llist_del_first(&rdma->sc_recv_ctxts); if (!node) - goto out_empty; - ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node); + return NULL; -out: + ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node); ctxt->rc_page_count = 0; return ctxt; - -out_empty: - ctxt = svc_rdma_recv_ctxt_alloc(rdma); - if (!ctxt) - return NULL; - goto out; } /** @@ -227,6 +214,13 @@ out_empty: void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma, struct svc_rdma_recv_ctxt *ctxt) { + svc_rdma_cc_release(rdma, &ctxt->rc_cc, DMA_FROM_DEVICE); + + /* @rc_page_count is normally zero here, but error flows + * can leave pages in @rc_pages. + */ + release_pages(ctxt->rc_pages, ctxt->rc_page_count); + pcl_free(&ctxt->rc_call_pcl); pcl_free(&ctxt->rc_read_pcl); pcl_free(&ctxt->rc_write_pcl); @@ -271,13 +265,13 @@ static bool svc_rdma_refresh_recvs(struct svcxprt_rdma *rdma, if (!ctxt) break; - trace_svcrdma_post_recv(ctxt); + trace_svcrdma_post_recv(&ctxt->rc_cid); ctxt->rc_recv_wr.next = recv_chain; recv_chain = &ctxt->rc_recv_wr; rdma->sc_pending_recvs++; } if (!recv_chain) - return false; + return true; ret = ib_post_recv(rdma->sc_qp, recv_chain, &bad_wr); if (ret) @@ -301,10 +295,27 @@ err_free: * svc_rdma_post_recvs - Post initial set of Recv WRs * @rdma: fresh svcxprt_rdma * - * Returns true if successful, otherwise false. + * Return values: + * %true: Receive Queue initialization successful + * %false: memory allocation or DMA error */ bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma) { + unsigned int total; + + /* For each credit, allocate enough recv_ctxts for one + * posted Receive and one RPC in process. + */ + total = (rdma->sc_max_requests * 2) + rdma->sc_recv_batch; + while (total--) { + struct svc_rdma_recv_ctxt *ctxt; + + ctxt = svc_rdma_recv_ctxt_alloc(rdma); + if (!ctxt) + return false; + llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts); + } + return svc_rdma_refresh_recvs(rdma, rdma->sc_max_requests); } @@ -373,6 +384,10 @@ void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma) { struct svc_rdma_recv_ctxt *ctxt; + while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_read_complete_q))) { + list_del(&ctxt->rc_list); + svc_rdma_recv_ctxt_put(rdma, ctxt); + } while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_rq_dto_q))) { list_del(&ctxt->rc_list); svc_rdma_recv_ctxt_put(rdma, ctxt); @@ -754,6 +769,122 @@ static bool svc_rdma_is_reverse_direction_reply(struct svc_xprt *xprt, return true; } +/* Finish constructing the RPC Call message in rqstp::rq_arg. + * + * The incoming RPC/RDMA message is an RDMA_MSG type message + * with a single Read chunk (only the upper layer data payload + * was conveyed via RDMA Read). + */ +static void svc_rdma_read_complete_one(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *ctxt) +{ + struct svc_rdma_chunk *chunk = pcl_first_chunk(&ctxt->rc_read_pcl); + struct xdr_buf *buf = &rqstp->rq_arg; + unsigned int length; + + /* Split the Receive buffer between the head and tail + * buffers at Read chunk's position. XDR roundup of the + * chunk is not included in either the pagelist or in + * the tail. + */ + buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position; + buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position; + buf->head[0].iov_len = chunk->ch_position; + + /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2). + * + * If the client already rounded up the chunk length, the + * length does not change. Otherwise, the length of the page + * list is increased to include XDR round-up. + * + * Currently these chunks always start at page offset 0, + * thus the rounded-up length never crosses a page boundary. + */ + buf->pages = &rqstp->rq_pages[0]; + length = xdr_align_size(chunk->ch_length); + buf->page_len = length; + buf->len += length; + buf->buflen += length; +} + +/* Finish constructing the RPC Call message in rqstp::rq_arg. + * + * The incoming RPC/RDMA message is an RDMA_MSG type message + * with payload in multiple Read chunks and no PZRC. + */ +static void svc_rdma_read_complete_multiple(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *ctxt) +{ + struct xdr_buf *buf = &rqstp->rq_arg; + + buf->len += ctxt->rc_readbytes; + buf->buflen += ctxt->rc_readbytes; + + buf->head[0].iov_base = page_address(rqstp->rq_pages[0]); + buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, ctxt->rc_readbytes); + buf->pages = &rqstp->rq_pages[1]; + buf->page_len = ctxt->rc_readbytes - buf->head[0].iov_len; +} + +/* Finish constructing the RPC Call message in rqstp::rq_arg. + * + * The incoming RPC/RDMA message is an RDMA_NOMSG type message + * (the RPC message body was conveyed via RDMA Read). + */ +static void svc_rdma_read_complete_pzrc(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *ctxt) +{ + struct xdr_buf *buf = &rqstp->rq_arg; + + buf->len += ctxt->rc_readbytes; + buf->buflen += ctxt->rc_readbytes; + + buf->head[0].iov_base = page_address(rqstp->rq_pages[0]); + buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, ctxt->rc_readbytes); + buf->pages = &rqstp->rq_pages[1]; + buf->page_len = ctxt->rc_readbytes - buf->head[0].iov_len; +} + +static noinline void svc_rdma_read_complete(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *ctxt) +{ + unsigned int i; + + /* Transfer the Read chunk pages into @rqstp.rq_pages, replacing + * the rq_pages that were already allocated for this rqstp. + */ + release_pages(rqstp->rq_respages, ctxt->rc_page_count); + for (i = 0; i < ctxt->rc_page_count; i++) + rqstp->rq_pages[i] = ctxt->rc_pages[i]; + + /* Update @rqstp's result send buffer to start after the + * last page in the RDMA Read payload. + */ + rqstp->rq_respages = &rqstp->rq_pages[ctxt->rc_page_count]; + rqstp->rq_next_page = rqstp->rq_respages + 1; + + /* Prevent svc_rdma_recv_ctxt_put() from releasing the + * pages in ctxt::rc_pages a second time. + */ + ctxt->rc_page_count = 0; + + /* Finish constructing the RPC Call message. The exact + * procedure for that depends on what kind of RPC/RDMA + * chunks were provided by the client. + */ + rqstp->rq_arg = ctxt->rc_saved_arg; + if (pcl_is_empty(&ctxt->rc_call_pcl)) { + if (ctxt->rc_read_pcl.cl_count == 1) + svc_rdma_read_complete_one(rqstp, ctxt); + else + svc_rdma_read_complete_multiple(rqstp, ctxt); + } else { + svc_rdma_read_complete_pzrc(rqstp, ctxt); + } + + trace_svcrdma_read_finished(&ctxt->rc_cid); +} + /** * svc_rdma_recvfrom - Receive an RPC call * @rqstp: request structure into which to receive an RPC Call @@ -798,8 +929,15 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) rqstp->rq_xprt_ctxt = NULL; - ctxt = NULL; spin_lock(&rdma_xprt->sc_rq_dto_lock); + ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_read_complete_q); + if (ctxt) { + list_del(&ctxt->rc_list); + spin_unlock(&rdma_xprt->sc_rq_dto_lock); + svc_xprt_received(xprt); + svc_rdma_read_complete(rqstp, ctxt); + goto complete; + } ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_rq_dto_q); if (ctxt) list_del(&ctxt->rc_list); @@ -831,12 +969,10 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) svc_rdma_get_inv_rkey(rdma_xprt, ctxt); if (!pcl_is_empty(&ctxt->rc_read_pcl) || - !pcl_is_empty(&ctxt->rc_call_pcl)) { - ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt); - if (ret < 0) - goto out_readfail; - } + !pcl_is_empty(&ctxt->rc_call_pcl)) + goto out_readlist; +complete: rqstp->rq_xprt_ctxt = ctxt; rqstp->rq_prot = IPPROTO_MAX; svc_xprt_copy_addrs(rqstp, xprt); @@ -848,12 +984,23 @@ out_err: svc_rdma_recv_ctxt_put(rdma_xprt, ctxt); return 0; -out_readfail: - if (ret == -EINVAL) - svc_rdma_send_error(rdma_xprt, ctxt, ret); - svc_rdma_recv_ctxt_put(rdma_xprt, ctxt); - svc_xprt_deferred_close(xprt); - return -ENOTCONN; +out_readlist: + /* This @rqstp is about to be recycled. Save the work + * already done constructing the Call message in rq_arg + * so it can be restored when the RDMA Reads have + * completed. + */ + ctxt->rc_saved_arg = rqstp->rq_arg; + + ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt); + if (ret < 0) { + if (ret == -EINVAL) + svc_rdma_send_error(rdma_xprt, ctxt, ret); + svc_rdma_recv_ctxt_put(rdma_xprt, ctxt); + svc_xprt_deferred_close(xprt); + return ret; + } + return 0; out_backchannel: svc_rdma_handle_bc_reply(rqstp, ctxt); diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c index e460e25a1d6d..c00fcce61d1e 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_rw.c +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c @@ -39,6 +39,7 @@ struct svc_rdma_rw_ctxt { struct list_head rw_list; struct rdma_rw_ctx rw_ctx; unsigned int rw_nents; + unsigned int rw_first_sgl_nents; struct sg_table rw_sg_table; struct scatterlist rw_first_sgl[]; }; @@ -53,6 +54,8 @@ svc_rdma_next_ctxt(struct list_head *list) static struct svc_rdma_rw_ctxt * svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges) { + struct ib_device *dev = rdma->sc_cm_id->device; + unsigned int first_sgl_nents = dev->attrs.max_send_sge; struct svc_rdma_rw_ctxt *ctxt; struct llist_node *node; @@ -62,32 +65,33 @@ svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges) if (node) { ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node); } else { - ctxt = kmalloc_node(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE), - GFP_KERNEL, ibdev_to_node(rdma->sc_cm_id->device)); + ctxt = kmalloc_node(struct_size(ctxt, rw_first_sgl, first_sgl_nents), + GFP_KERNEL, ibdev_to_node(dev)); if (!ctxt) goto out_noctx; INIT_LIST_HEAD(&ctxt->rw_list); + ctxt->rw_first_sgl_nents = first_sgl_nents; } ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl; if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges, ctxt->rw_sg_table.sgl, - SG_CHUNK_SIZE)) + first_sgl_nents)) goto out_free; return ctxt; out_free: kfree(ctxt); out_noctx: - trace_svcrdma_no_rwctx_err(rdma, sges); + trace_svcrdma_rwctx_empty(rdma, sges); return NULL; } static void __svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt, struct llist_head *list) { - sg_free_table_chained(&ctxt->rw_sg_table, SG_CHUNK_SIZE); + sg_free_table_chained(&ctxt->rw_sg_table, ctxt->rw_first_sgl_nents); llist_add(&ctxt->rw_node, list); } @@ -135,57 +139,40 @@ static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma, ctxt->rw_sg_table.sgl, ctxt->rw_nents, 0, offset, handle, direction); if (unlikely(ret < 0)) { + trace_svcrdma_dma_map_rw_err(rdma, offset, handle, + ctxt->rw_nents, ret); svc_rdma_put_rw_ctxt(rdma, ctxt); - trace_svcrdma_dma_map_rw_err(rdma, ctxt->rw_nents, ret); } return ret; } -/* A chunk context tracks all I/O for moving one Read or Write - * chunk. This is a set of rdma_rw's that handle data movement - * for all segments of one chunk. - * - * These are small, acquired with a single allocator call, and - * no more than one is needed per chunk. They are allocated on - * demand, and not cached. +/** + * svc_rdma_cc_init - Initialize an svc_rdma_chunk_ctxt + * @rdma: controlling transport instance + * @cc: svc_rdma_chunk_ctxt to be initialized */ -struct svc_rdma_chunk_ctxt { - struct rpc_rdma_cid cc_cid; - struct ib_cqe cc_cqe; - struct svcxprt_rdma *cc_rdma; - struct list_head cc_rwctxts; - ktime_t cc_posttime; - int cc_sqecount; - enum ib_wc_status cc_status; - struct completion cc_done; -}; - -static void svc_rdma_cc_cid_init(struct svcxprt_rdma *rdma, - struct rpc_rdma_cid *cid) +void svc_rdma_cc_init(struct svcxprt_rdma *rdma, + struct svc_rdma_chunk_ctxt *cc) { - cid->ci_queue_id = rdma->sc_sq_cq->res.id; - cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids); -} + struct rpc_rdma_cid *cid = &cc->cc_cid; -static void svc_rdma_cc_init(struct svcxprt_rdma *rdma, - struct svc_rdma_chunk_ctxt *cc) -{ - svc_rdma_cc_cid_init(rdma, &cc->cc_cid); - cc->cc_rdma = rdma; + if (unlikely(!cid->ci_completion_id)) + svc_rdma_send_cid_init(rdma, cid); INIT_LIST_HEAD(&cc->cc_rwctxts); cc->cc_sqecount = 0; } -/* - * The consumed rw_ctx's are cleaned and placed on a local llist so - * that only one atomic llist operation is needed to put them all - * back on the free list. +/** + * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt + * @rdma: controlling transport instance + * @cc: svc_rdma_chunk_ctxt to be released + * @dir: DMA direction */ -static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc, - enum dma_data_direction dir) +void svc_rdma_cc_release(struct svcxprt_rdma *rdma, + struct svc_rdma_chunk_ctxt *cc, + enum dma_data_direction dir) { - struct svcxprt_rdma *rdma = cc->cc_rdma; struct llist_node *first, *last; struct svc_rdma_rw_ctxt *ctxt; LLIST_HEAD(free); @@ -215,6 +202,8 @@ static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc, * - Stores arguments for the SGL constructor functions */ struct svc_rdma_write_info { + struct svcxprt_rdma *wi_rdma; + const struct svc_rdma_chunk *wi_chunk; /* write state of this chunk */ @@ -227,6 +216,7 @@ struct svc_rdma_write_info { unsigned int wi_next_off; struct svc_rdma_chunk_ctxt wi_cc; + struct work_struct wi_work; }; static struct svc_rdma_write_info * @@ -235,25 +225,33 @@ svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, { struct svc_rdma_write_info *info; - info = kmalloc_node(sizeof(*info), GFP_KERNEL, + info = kzalloc_node(sizeof(*info), GFP_KERNEL, ibdev_to_node(rdma->sc_cm_id->device)); if (!info) return info; + info->wi_rdma = rdma; info->wi_chunk = chunk; - info->wi_seg_off = 0; - info->wi_seg_no = 0; svc_rdma_cc_init(rdma, &info->wi_cc); info->wi_cc.cc_cqe.done = svc_rdma_write_done; return info; } -static void svc_rdma_write_info_free(struct svc_rdma_write_info *info) +static void svc_rdma_write_info_free_async(struct work_struct *work) { - svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE); + struct svc_rdma_write_info *info; + + info = container_of(work, struct svc_rdma_write_info, wi_work); + svc_rdma_cc_release(info->wi_rdma, &info->wi_cc, DMA_TO_DEVICE); kfree(info); } +static void svc_rdma_write_info_free(struct svc_rdma_write_info *info) +{ + INIT_WORK(&info->wi_work, svc_rdma_write_info_free_async); + queue_work(svcrdma_wq, &info->wi_work); +} + /** * svc_rdma_write_done - Write chunk completion * @cq: controlling Completion Queue @@ -263,16 +261,16 @@ static void svc_rdma_write_info_free(struct svc_rdma_write_info *info) */ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc) { + struct svcxprt_rdma *rdma = cq->cq_context; struct ib_cqe *cqe = wc->wr_cqe; struct svc_rdma_chunk_ctxt *cc = container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe); - struct svcxprt_rdma *rdma = cc->cc_rdma; struct svc_rdma_write_info *info = container_of(cc, struct svc_rdma_write_info, wi_cc); switch (wc->status) { case IB_WC_SUCCESS: - trace_svcrdma_wc_write(wc, &cc->cc_cid); + trace_svcrdma_wc_write(&cc->cc_cid); break; case IB_WC_WR_FLUSH_ERR: trace_svcrdma_wc_write_flush(wc, &cc->cc_cid); @@ -289,39 +287,6 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc) svc_rdma_write_info_free(info); } -/* State for pulling a Read chunk. - */ -struct svc_rdma_read_info { - struct svc_rqst *ri_rqst; - struct svc_rdma_recv_ctxt *ri_readctxt; - unsigned int ri_pageno; - unsigned int ri_pageoff; - unsigned int ri_totalbytes; - - struct svc_rdma_chunk_ctxt ri_cc; -}; - -static struct svc_rdma_read_info * -svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma) -{ - struct svc_rdma_read_info *info; - - info = kmalloc_node(sizeof(*info), GFP_KERNEL, - ibdev_to_node(rdma->sc_cm_id->device)); - if (!info) - return info; - - svc_rdma_cc_init(rdma, &info->ri_cc); - info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done; - return info; -} - -static void svc_rdma_read_info_free(struct svc_rdma_read_info *info) -{ - svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE); - kfree(info); -} - /** * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx * @cq: controlling Completion Queue @@ -330,17 +295,27 @@ static void svc_rdma_read_info_free(struct svc_rdma_read_info *info) */ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc) { + struct svcxprt_rdma *rdma = cq->cq_context; struct ib_cqe *cqe = wc->wr_cqe; struct svc_rdma_chunk_ctxt *cc = container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe); - struct svc_rdma_read_info *info; + struct svc_rdma_recv_ctxt *ctxt; + svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount); + + ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc); switch (wc->status) { case IB_WC_SUCCESS: - info = container_of(cc, struct svc_rdma_read_info, ri_cc); - trace_svcrdma_wc_read(wc, &cc->cc_cid, info->ri_totalbytes, + trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes, cc->cc_posttime); - break; + + spin_lock(&rdma->sc_rq_dto_lock); + list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q); + /* the unlock pairs with the smp_rmb in svc_xprt_ready */ + set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags); + spin_unlock(&rdma->sc_rq_dto_lock); + svc_xprt_enqueue(&rdma->sc_xprt); + return; case IB_WC_WR_FLUSH_ERR: trace_svcrdma_wc_read_flush(wc, &cc->cc_cid); break; @@ -348,10 +323,13 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc) trace_svcrdma_wc_read_err(wc, &cc->cc_cid); } - svc_rdma_wake_send_waiters(cc->cc_rdma, cc->cc_sqecount); - cc->cc_status = wc->status; - complete(&cc->cc_done); - return; + /* The RDMA Read has flushed, so the incoming RPC message + * cannot be constructed and must be dropped. Signal the + * loss to the client by closing the connection. + */ + svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE); + svc_rdma_recv_ctxt_put(rdma, ctxt); + svc_xprt_deferred_close(&rdma->sc_xprt); } /* @@ -360,9 +338,9 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc) * even if one or more WRs are flushed. This is true when posting * an rdma_rw_ctx or when posting a single signaled WR. */ -static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc) +static int svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma, + struct svc_rdma_chunk_ctxt *cc) { - struct svcxprt_rdma *rdma = cc->cc_rdma; struct ib_send_wr *first_wr; const struct ib_send_wr *bad_wr; struct list_head *tmp; @@ -396,14 +374,14 @@ static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc) } percpu_counter_inc(&svcrdma_stat_sq_starve); - trace_svcrdma_sq_full(rdma); + trace_svcrdma_sq_full(rdma, &cc->cc_cid); atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail); wait_event(rdma->sc_send_wait, atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount); - trace_svcrdma_sq_retry(rdma); + trace_svcrdma_sq_retry(rdma, &cc->cc_cid); } while (1); - trace_svcrdma_sq_post_err(rdma, ret); + trace_svcrdma_sq_post_err(rdma, &cc->cc_cid, ret); svc_xprt_deferred_close(&rdma->sc_xprt); /* If even one was posted, there will be a completion. */ @@ -473,7 +451,7 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info, unsigned int remaining) { struct svc_rdma_chunk_ctxt *cc = &info->wi_cc; - struct svcxprt_rdma *rdma = cc->cc_rdma; + struct svcxprt_rdma *rdma = info->wi_rdma; const struct svc_rdma_segment *seg; struct svc_rdma_rw_ctxt *ctxt; int ret; @@ -516,7 +494,7 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info, return 0; out_overflow: - trace_svcrdma_small_wrch_err(rdma, remaining, info->wi_seg_no, + trace_svcrdma_small_wrch_err(&cc->cc_cid, remaining, info->wi_seg_no, info->wi_chunk->ch_segcount); return -E2BIG; } @@ -633,7 +611,7 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, goto out_err; trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount); - ret = svc_rdma_post_chunk_ctxt(cc); + ret = svc_rdma_post_chunk_ctxt(rdma, cc); if (ret < 0) goto out_err; return xdr->len; @@ -680,7 +658,7 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, goto out_err; trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount); - ret = svc_rdma_post_chunk_ctxt(cc); + ret = svc_rdma_post_chunk_ctxt(rdma, cc); if (ret < 0) goto out_err; @@ -693,7 +671,8 @@ out_err: /** * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment - * @info: context for ongoing I/O + * @rqstp: RPC transaction context + * @head: context for ongoing I/O * @segment: co-ordinates of remote memory to be read * * Returns: @@ -702,20 +681,20 @@ out_err: * %-ENOMEM: allocating a local resources failed * %-EIO: a DMA mapping error occurred */ -static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info, +static int svc_rdma_build_read_segment(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *head, const struct svc_rdma_segment *segment) { - struct svc_rdma_recv_ctxt *head = info->ri_readctxt; - struct svc_rdma_chunk_ctxt *cc = &info->ri_cc; - struct svc_rqst *rqstp = info->ri_rqst; + struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp); + struct svc_rdma_chunk_ctxt *cc = &head->rc_cc; unsigned int sge_no, seg_len, len; struct svc_rdma_rw_ctxt *ctxt; struct scatterlist *sg; int ret; len = segment->rs_length; - sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT; - ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no); + sge_no = PAGE_ALIGN(head->rc_pageoff + len) >> PAGE_SHIFT; + ctxt = svc_rdma_get_rw_ctxt(rdma, sge_no); if (!ctxt) return -ENOMEM; ctxt->rw_nents = sge_no; @@ -723,29 +702,27 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info, sg = ctxt->rw_sg_table.sgl; for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) { seg_len = min_t(unsigned int, len, - PAGE_SIZE - info->ri_pageoff); + PAGE_SIZE - head->rc_pageoff); - if (!info->ri_pageoff) + if (!head->rc_pageoff) head->rc_page_count++; - sg_set_page(sg, rqstp->rq_pages[info->ri_pageno], - seg_len, info->ri_pageoff); + sg_set_page(sg, rqstp->rq_pages[head->rc_curpage], + seg_len, head->rc_pageoff); sg = sg_next(sg); - info->ri_pageoff += seg_len; - if (info->ri_pageoff == PAGE_SIZE) { - info->ri_pageno++; - info->ri_pageoff = 0; + head->rc_pageoff += seg_len; + if (head->rc_pageoff == PAGE_SIZE) { + head->rc_curpage++; + head->rc_pageoff = 0; } len -= seg_len; - /* Safety check */ - if (len && - &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end) + if (len && ((head->rc_curpage + 1) > ARRAY_SIZE(rqstp->rq_pages))) goto out_overrun; } - ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, segment->rs_offset, + ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset, segment->rs_handle, DMA_FROM_DEVICE); if (ret < 0) return -EIO; @@ -756,13 +733,14 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info, return 0; out_overrun: - trace_svcrdma_page_overrun_err(cc->cc_rdma, rqstp, info->ri_pageno); + trace_svcrdma_page_overrun_err(&cc->cc_cid, head->rc_curpage); return -EINVAL; } /** * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk - * @info: context for ongoing I/O + * @rqstp: RPC transaction context + * @head: context for ongoing I/O * @chunk: Read chunk to pull * * Return values: @@ -771,7 +749,8 @@ out_overrun: * %-ENOMEM: allocating a local resources failed * %-EIO: a DMA mapping error occurred */ -static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info, +static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *head, const struct svc_rdma_chunk *chunk) { const struct svc_rdma_segment *segment; @@ -779,56 +758,56 @@ static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info, ret = -EINVAL; pcl_for_each_segment(segment, chunk) { - ret = svc_rdma_build_read_segment(info, segment); + ret = svc_rdma_build_read_segment(rqstp, head, segment); if (ret < 0) break; - info->ri_totalbytes += segment->rs_length; + head->rc_readbytes += segment->rs_length; } return ret; } /** * svc_rdma_copy_inline_range - Copy part of the inline content into pages - * @info: context for RDMA Reads + * @rqstp: RPC transaction context + * @head: context for ongoing I/O * @offset: offset into the Receive buffer of region to copy * @remaining: length of region to copy * * Take a page at a time from rqstp->rq_pages and copy the inline * content from the Receive buffer into that page. Update - * info->ri_pageno and info->ri_pageoff so that the next RDMA Read + * head->rc_curpage and head->rc_pageoff so that the next RDMA Read * result will land contiguously with the copied content. * * Return values: * %0: Inline content was successfully copied * %-EINVAL: offset or length was incorrect */ -static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info, +static int svc_rdma_copy_inline_range(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *head, unsigned int offset, unsigned int remaining) { - struct svc_rdma_recv_ctxt *head = info->ri_readctxt; unsigned char *dst, *src = head->rc_recv_buf; - struct svc_rqst *rqstp = info->ri_rqst; unsigned int page_no, numpages; - numpages = PAGE_ALIGN(info->ri_pageoff + remaining) >> PAGE_SHIFT; + numpages = PAGE_ALIGN(head->rc_pageoff + remaining) >> PAGE_SHIFT; for (page_no = 0; page_no < numpages; page_no++) { unsigned int page_len; page_len = min_t(unsigned int, remaining, - PAGE_SIZE - info->ri_pageoff); + PAGE_SIZE - head->rc_pageoff); - if (!info->ri_pageoff) + if (!head->rc_pageoff) head->rc_page_count++; - dst = page_address(rqstp->rq_pages[info->ri_pageno]); - memcpy(dst + info->ri_pageno, src + offset, page_len); + dst = page_address(rqstp->rq_pages[head->rc_curpage]); + memcpy(dst + head->rc_curpage, src + offset, page_len); - info->ri_totalbytes += page_len; - info->ri_pageoff += page_len; - if (info->ri_pageoff == PAGE_SIZE) { - info->ri_pageno++; - info->ri_pageoff = 0; + head->rc_readbytes += page_len; + head->rc_pageoff += page_len; + if (head->rc_pageoff == PAGE_SIZE) { + head->rc_curpage++; + head->rc_pageoff = 0; } remaining -= page_len; offset += page_len; @@ -839,7 +818,8 @@ static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info, /** * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks - * @info: context for RDMA Reads + * @rqstp: RPC transaction context + * @head: context for ongoing I/O * * The chunk data lands in rqstp->rq_arg as a series of contiguous pages, * like an incoming TCP call. @@ -851,11 +831,11 @@ static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info, * %-ENOTCONN: posting failed (connection is lost), * %-EIO: rdma_rw initialization failed (DMA mapping, etc). */ -static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *info) +static noinline int +svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *head) { - struct svc_rdma_recv_ctxt *head = info->ri_readctxt; const struct svc_rdma_pcl *pcl = &head->rc_read_pcl; - struct xdr_buf *buf = &info->ri_rqst->rq_arg; struct svc_rdma_chunk *chunk, *next; unsigned int start, length; int ret; @@ -863,12 +843,12 @@ static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *inf start = 0; chunk = pcl_first_chunk(pcl); length = chunk->ch_position; - ret = svc_rdma_copy_inline_range(info, start, length); + ret = svc_rdma_copy_inline_range(rqstp, head, start, length); if (ret < 0) return ret; pcl_for_each_chunk(chunk, pcl) { - ret = svc_rdma_build_read_chunk(info, chunk); + ret = svc_rdma_build_read_chunk(rqstp, head, chunk); if (ret < 0) return ret; @@ -877,31 +857,21 @@ static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *inf break; start += length; - length = next->ch_position - info->ri_totalbytes; - ret = svc_rdma_copy_inline_range(info, start, length); + length = next->ch_position - head->rc_readbytes; + ret = svc_rdma_copy_inline_range(rqstp, head, start, length); if (ret < 0) return ret; } start += length; length = head->rc_byte_len - start; - ret = svc_rdma_copy_inline_range(info, start, length); - if (ret < 0) - return ret; - - buf->len += info->ri_totalbytes; - buf->buflen += info->ri_totalbytes; - - buf->head[0].iov_base = page_address(info->ri_rqst->rq_pages[0]); - buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes); - buf->pages = &info->ri_rqst->rq_pages[1]; - buf->page_len = info->ri_totalbytes - buf->head[0].iov_len; - return 0; + return svc_rdma_copy_inline_range(rqstp, head, start, length); } /** * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks - * @info: context for RDMA Reads + * @rqstp: RPC transaction context + * @head: context for ongoing I/O * * The chunk data lands in the page list of rqstp->rq_arg.pages. * @@ -916,50 +886,17 @@ static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *inf * %-ENOTCONN: posting failed (connection is lost), * %-EIO: rdma_rw initialization failed (DMA mapping, etc). */ -static int svc_rdma_read_data_item(struct svc_rdma_read_info *info) +static int svc_rdma_read_data_item(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *head) { - struct svc_rdma_recv_ctxt *head = info->ri_readctxt; - struct xdr_buf *buf = &info->ri_rqst->rq_arg; - struct svc_rdma_chunk *chunk; - unsigned int length; - int ret; - - chunk = pcl_first_chunk(&head->rc_read_pcl); - ret = svc_rdma_build_read_chunk(info, chunk); - if (ret < 0) - goto out; - - /* Split the Receive buffer between the head and tail - * buffers at Read chunk's position. XDR roundup of the - * chunk is not included in either the pagelist or in - * the tail. - */ - buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position; - buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position; - buf->head[0].iov_len = chunk->ch_position; - - /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2). - * - * If the client already rounded up the chunk length, the - * length does not change. Otherwise, the length of the page - * list is increased to include XDR round-up. - * - * Currently these chunks always start at page offset 0, - * thus the rounded-up length never crosses a page boundary. - */ - buf->pages = &info->ri_rqst->rq_pages[0]; - length = xdr_align_size(chunk->ch_length); - buf->page_len = length; - buf->len += length; - buf->buflen += length; - -out: - return ret; + return svc_rdma_build_read_chunk(rqstp, head, + pcl_first_chunk(&head->rc_read_pcl)); } /** - * svc_rdma_read_chunk_range - Build RDMA Read WQEs for portion of a chunk - * @info: context for RDMA Reads + * svc_rdma_read_chunk_range - Build RDMA Read WRs for portion of a chunk + * @rqstp: RPC transaction context + * @head: context for ongoing I/O * @chunk: parsed Call chunk to pull * @offset: offset of region to pull * @length: length of region to pull @@ -971,7 +908,8 @@ out: * %-ENOTCONN: posting failed (connection is lost), * %-EIO: rdma_rw initialization failed (DMA mapping, etc). */ -static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info, +static int svc_rdma_read_chunk_range(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *head, const struct svc_rdma_chunk *chunk, unsigned int offset, unsigned int length) { @@ -991,11 +929,11 @@ static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info, dummy.rs_length = min_t(u32, length, segment->rs_length) - offset; dummy.rs_offset = segment->rs_offset + offset; - ret = svc_rdma_build_read_segment(info, &dummy); + ret = svc_rdma_build_read_segment(rqstp, head, &dummy); if (ret < 0) break; - info->ri_totalbytes += dummy.rs_length; + head->rc_readbytes += dummy.rs_length; length -= dummy.rs_length; offset = 0; } @@ -1004,7 +942,8 @@ static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info, /** * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message - * @info: context for RDMA Reads + * @rqstp: RPC transaction context + * @head: context for ongoing I/O * * Return values: * %0: RDMA Read WQEs were successfully built @@ -1013,9 +952,9 @@ static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info, * %-ENOTCONN: posting failed (connection is lost), * %-EIO: rdma_rw initialization failed (DMA mapping, etc). */ -static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info) +static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *head) { - struct svc_rdma_recv_ctxt *head = info->ri_readctxt; const struct svc_rdma_chunk *call_chunk = pcl_first_chunk(&head->rc_call_pcl); const struct svc_rdma_pcl *pcl = &head->rc_read_pcl; @@ -1024,17 +963,18 @@ static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info) int ret; if (pcl_is_empty(pcl)) - return svc_rdma_build_read_chunk(info, call_chunk); + return svc_rdma_build_read_chunk(rqstp, head, call_chunk); start = 0; chunk = pcl_first_chunk(pcl); length = chunk->ch_position; - ret = svc_rdma_read_chunk_range(info, call_chunk, start, length); + ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk, + start, length); if (ret < 0) return ret; pcl_for_each_chunk(chunk, pcl) { - ret = svc_rdma_build_read_chunk(info, chunk); + ret = svc_rdma_build_read_chunk(rqstp, head, chunk); if (ret < 0) return ret; @@ -1043,8 +983,8 @@ static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info) break; start += length; - length = next->ch_position - info->ri_totalbytes; - ret = svc_rdma_read_chunk_range(info, call_chunk, + length = next->ch_position - head->rc_readbytes; + ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk, start, length); if (ret < 0) return ret; @@ -1052,12 +992,14 @@ static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info) start += length; length = call_chunk->ch_length - start; - return svc_rdma_read_chunk_range(info, call_chunk, start, length); + return svc_rdma_read_chunk_range(rqstp, head, call_chunk, + start, length); } /** * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message - * @info: context for RDMA Reads + * @rqstp: RPC transaction context + * @head: context for ongoing I/O * * The start of the data lands in the first page just after the * Transport header, and the rest lands in rqstp->rq_arg.pages. @@ -1073,25 +1015,31 @@ static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info) * %-ENOTCONN: posting failed (connection is lost), * %-EIO: rdma_rw initialization failed (DMA mapping, etc). */ -static noinline int svc_rdma_read_special(struct svc_rdma_read_info *info) +static noinline int svc_rdma_read_special(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *head) { - struct xdr_buf *buf = &info->ri_rqst->rq_arg; - int ret; - - ret = svc_rdma_read_call_chunk(info); - if (ret < 0) - goto out; - - buf->len += info->ri_totalbytes; - buf->buflen += info->ri_totalbytes; + return svc_rdma_read_call_chunk(rqstp, head); +} - buf->head[0].iov_base = page_address(info->ri_rqst->rq_pages[0]); - buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes); - buf->pages = &info->ri_rqst->rq_pages[1]; - buf->page_len = info->ri_totalbytes - buf->head[0].iov_len; +/* Pages under I/O have been copied to head->rc_pages. Ensure that + * svc_xprt_release() does not put them when svc_rdma_recvfrom() + * returns. This has to be done after all Read WRs are constructed + * to properly handle a page that happens to be part of I/O on behalf + * of two different RDMA segments. + * + * Note: if the subsequent post_send fails, these pages have already + * been moved to head->rc_pages and thus will be cleaned up by + * svc_rdma_recv_ctxt_put(). + */ +static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *head) +{ + unsigned int i; -out: - return ret; + for (i = 0; i < head->rc_page_count; i++) { + head->rc_pages[i] = rqstp->rq_pages[i]; + rqstp->rq_pages[i] = NULL; + } } /** @@ -1121,49 +1069,27 @@ int svc_rdma_process_read_list(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp, struct svc_rdma_recv_ctxt *head) { - struct svc_rdma_read_info *info; - struct svc_rdma_chunk_ctxt *cc; + struct svc_rdma_chunk_ctxt *cc = &head->rc_cc; int ret; - info = svc_rdma_read_info_alloc(rdma); - if (!info) - return -ENOMEM; - cc = &info->ri_cc; - info->ri_rqst = rqstp; - info->ri_readctxt = head; - info->ri_pageno = 0; - info->ri_pageoff = 0; - info->ri_totalbytes = 0; + cc->cc_cqe.done = svc_rdma_wc_read_done; + cc->cc_sqecount = 0; + head->rc_pageoff = 0; + head->rc_curpage = 0; + head->rc_readbytes = 0; if (pcl_is_empty(&head->rc_call_pcl)) { if (head->rc_read_pcl.cl_count == 1) - ret = svc_rdma_read_data_item(info); + ret = svc_rdma_read_data_item(rqstp, head); else - ret = svc_rdma_read_multiple_chunks(info); + ret = svc_rdma_read_multiple_chunks(rqstp, head); } else - ret = svc_rdma_read_special(info); + ret = svc_rdma_read_special(rqstp, head); + svc_rdma_clear_rqst_pages(rqstp, head); if (ret < 0) - goto out_err; + return ret; trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount); - init_completion(&cc->cc_done); - ret = svc_rdma_post_chunk_ctxt(cc); - if (ret < 0) - goto out_err; - - ret = 1; - wait_for_completion(&cc->cc_done); - if (cc->cc_status != IB_WC_SUCCESS) - ret = -EIO; - - /* rq_respages starts after the last arg page */ - rqstp->rq_respages = &rqstp->rq_pages[head->rc_page_count]; - rqstp->rq_next_page = rqstp->rq_respages + 1; - - /* Ensure svc_rdma_recv_ctxt_put() does not try to release pages */ - head->rc_page_count = 0; - -out_err: - svc_rdma_read_info_free(info); - return ret; + ret = svc_rdma_post_chunk_ctxt(rdma, cc); + return ret < 0 ? ret : 1; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index c6644cca52c5..1a49b7f02041 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -113,13 +113,6 @@ static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc); -static void svc_rdma_send_cid_init(struct svcxprt_rdma *rdma, - struct rpc_rdma_cid *cid) -{ - cid->ci_queue_id = rdma->sc_sq_cq->res.id; - cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids); -} - static struct svc_rdma_send_ctxt * svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma) { @@ -129,7 +122,7 @@ svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma) void *buffer; int i; - ctxt = kmalloc_node(struct_size(ctxt, sc_sges, rdma->sc_max_send_sges), + ctxt = kzalloc_node(struct_size(ctxt, sc_sges, rdma->sc_max_send_sges), GFP_KERNEL, node); if (!ctxt) goto fail0; @@ -143,6 +136,7 @@ svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma) svc_rdma_send_cid_init(rdma, &ctxt->sc_cid); + ctxt->sc_rdma = rdma; ctxt->sc_send_wr.next = NULL; ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe; ctxt->sc_send_wr.sg_list = ctxt->sc_sges; @@ -200,10 +194,11 @@ struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma) spin_lock(&rdma->sc_send_lock); node = llist_del_first(&rdma->sc_send_ctxts); + spin_unlock(&rdma->sc_send_lock); if (!node) goto out_empty; + ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node); - spin_unlock(&rdma->sc_send_lock); out: rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0); @@ -216,22 +211,14 @@ out: return ctxt; out_empty: - spin_unlock(&rdma->sc_send_lock); ctxt = svc_rdma_send_ctxt_alloc(rdma); if (!ctxt) return NULL; goto out; } -/** - * svc_rdma_send_ctxt_put - Return send_ctxt to free list - * @rdma: controlling svcxprt_rdma - * @ctxt: object to return to the free list - * - * Pages left in sc_pages are DMA unmapped and released. - */ -void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma, - struct svc_rdma_send_ctxt *ctxt) +static void svc_rdma_send_ctxt_release(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt) { struct ib_device *device = rdma->sc_cm_id->device; unsigned int i; @@ -243,18 +230,40 @@ void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma, * remains mapped until @ctxt is destroyed. */ for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) { + trace_svcrdma_dma_unmap_page(&ctxt->sc_cid, + ctxt->sc_sges[i].addr, + ctxt->sc_sges[i].length); ib_dma_unmap_page(device, ctxt->sc_sges[i].addr, ctxt->sc_sges[i].length, DMA_TO_DEVICE); - trace_svcrdma_dma_unmap_page(rdma, - ctxt->sc_sges[i].addr, - ctxt->sc_sges[i].length); } llist_add(&ctxt->sc_node, &rdma->sc_send_ctxts); } +static void svc_rdma_send_ctxt_put_async(struct work_struct *work) +{ + struct svc_rdma_send_ctxt *ctxt; + + ctxt = container_of(work, struct svc_rdma_send_ctxt, sc_work); + svc_rdma_send_ctxt_release(ctxt->sc_rdma, ctxt); +} + +/** + * svc_rdma_send_ctxt_put - Return send_ctxt to free list + * @rdma: controlling svcxprt_rdma + * @ctxt: object to return to the free list + * + * Pages left in sc_pages are DMA unmapped and released. + */ +void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt) +{ + INIT_WORK(&ctxt->sc_work, svc_rdma_send_ctxt_put_async); + queue_work(svcrdma_wq, &ctxt->sc_work); +} + /** * svc_rdma_wake_send_waiters - manage Send Queue accounting * @rdma: controlling transport @@ -289,7 +298,7 @@ static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) if (unlikely(wc->status != IB_WC_SUCCESS)) goto flushed; - trace_svcrdma_wc_send(wc, &ctxt->sc_cid); + trace_svcrdma_wc_send(&ctxt->sc_cid); svc_rdma_send_ctxt_put(rdma, ctxt); return; @@ -327,13 +336,13 @@ int svc_rdma_send(struct svcxprt_rdma *rdma, struct svc_rdma_send_ctxt *ctxt) while (1) { if ((atomic_dec_return(&rdma->sc_sq_avail) < 0)) { percpu_counter_inc(&svcrdma_stat_sq_starve); - trace_svcrdma_sq_full(rdma); + trace_svcrdma_sq_full(rdma, &ctxt->sc_cid); atomic_inc(&rdma->sc_sq_avail); wait_event(rdma->sc_send_wait, atomic_read(&rdma->sc_sq_avail) > 1); if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) return -ENOTCONN; - trace_svcrdma_sq_retry(rdma); + trace_svcrdma_sq_retry(rdma, &ctxt->sc_cid); continue; } @@ -344,7 +353,7 @@ int svc_rdma_send(struct svcxprt_rdma *rdma, struct svc_rdma_send_ctxt *ctxt) return 0; } - trace_svcrdma_sq_post_err(rdma, ret); + trace_svcrdma_sq_post_err(rdma, &ctxt->sc_cid, ret); svc_xprt_deferred_close(&rdma->sc_xprt); wake_up(&rdma->sc_send_wait); return ret; @@ -534,14 +543,14 @@ static int svc_rdma_page_dma_map(void *data, struct page *page, if (ib_dma_mapping_error(dev, dma_addr)) goto out_maperr; - trace_svcrdma_dma_map_page(rdma, dma_addr, len); + trace_svcrdma_dma_map_page(&ctxt->sc_cid, dma_addr, len); ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr; ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len; ctxt->sc_send_wr.num_sge++; return 0; out_maperr: - trace_svcrdma_dma_map_err(rdma, dma_addr, len); + trace_svcrdma_dma_map_err(&ctxt->sc_cid, dma_addr, len); return -EIO; } @@ -653,7 +662,7 @@ static int svc_rdma_xb_count_sges(const struct xdr_buf *xdr, * svc_rdma_pull_up_needed - Determine whether to use pull-up * @rdma: controlling transport * @sctxt: send_ctxt for the Send WR - * @rctxt: Write and Reply chunks provided by client + * @write_pcl: Write chunk list provided by client * @xdr: xdr_buf containing RPC message to transmit * * Returns: @@ -662,7 +671,7 @@ static int svc_rdma_xb_count_sges(const struct xdr_buf *xdr, */ static bool svc_rdma_pull_up_needed(const struct svcxprt_rdma *rdma, const struct svc_rdma_send_ctxt *sctxt, - const struct svc_rdma_recv_ctxt *rctxt, + const struct svc_rdma_pcl *write_pcl, const struct xdr_buf *xdr) { /* Resources needed for the transport header */ @@ -672,7 +681,7 @@ static bool svc_rdma_pull_up_needed(const struct svcxprt_rdma *rdma, }; int ret; - ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr, + ret = pcl_process_nonpayloads(write_pcl, xdr, svc_rdma_xb_count_sges, &args); if (ret < 0) return false; @@ -728,7 +737,7 @@ static int svc_rdma_xb_linearize(const struct xdr_buf *xdr, * svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer * @rdma: controlling transport * @sctxt: send_ctxt for the Send WR; xprt hdr is already prepared - * @rctxt: Write and Reply chunks provided by client + * @write_pcl: Write chunk list provided by client * @xdr: prepared xdr_buf containing RPC message * * The device is not capable of sending the reply directly. @@ -743,7 +752,7 @@ static int svc_rdma_xb_linearize(const struct xdr_buf *xdr, */ static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma, struct svc_rdma_send_ctxt *sctxt, - const struct svc_rdma_recv_ctxt *rctxt, + const struct svc_rdma_pcl *write_pcl, const struct xdr_buf *xdr) { struct svc_rdma_pullup_data args = { @@ -751,7 +760,7 @@ static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma, }; int ret; - ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr, + ret = pcl_process_nonpayloads(write_pcl, xdr, svc_rdma_xb_linearize, &args); if (ret < 0) return ret; @@ -764,7 +773,8 @@ static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma, /* svc_rdma_map_reply_msg - DMA map the buffer holding RPC message * @rdma: controlling transport * @sctxt: send_ctxt for the Send WR - * @rctxt: Write and Reply chunks provided by client + * @write_pcl: Write chunk list provided by client + * @reply_pcl: Reply chunk provided by client * @xdr: prepared xdr_buf containing RPC message * * Returns: @@ -776,7 +786,8 @@ static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma, */ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, struct svc_rdma_send_ctxt *sctxt, - const struct svc_rdma_recv_ctxt *rctxt, + const struct svc_rdma_pcl *write_pcl, + const struct svc_rdma_pcl *reply_pcl, const struct xdr_buf *xdr) { struct svc_rdma_map_data args = { @@ -789,18 +800,18 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len; /* If there is a Reply chunk, nothing follows the transport - * header, and we're done here. + * header, so there is nothing to map. */ - if (!pcl_is_empty(&rctxt->rc_reply_pcl)) + if (!pcl_is_empty(reply_pcl)) return 0; /* For pull-up, svc_rdma_send() will sync the transport header. * No additional DMA mapping is necessary. */ - if (svc_rdma_pull_up_needed(rdma, sctxt, rctxt, xdr)) - return svc_rdma_pull_up_reply_msg(rdma, sctxt, rctxt, xdr); + if (svc_rdma_pull_up_needed(rdma, sctxt, write_pcl, xdr)) + return svc_rdma_pull_up_reply_msg(rdma, sctxt, write_pcl, xdr); - return pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr, + return pcl_process_nonpayloads(write_pcl, xdr, svc_rdma_xb_dma_map, &args); } @@ -848,7 +859,8 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, { int ret; - ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqstp->rq_res); + ret = svc_rdma_map_reply_msg(rdma, sctxt, &rctxt->rc_write_pcl, + &rctxt->rc_reply_pcl, &rqstp->rq_res); if (ret < 0) return ret; diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index 2abd895046ee..4f27325ace4a 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -125,6 +125,9 @@ static void qp_event_handler(struct ib_event *event, void *context) static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, struct net *net, int node) { + static struct lock_class_key svcrdma_rwctx_lock; + static struct lock_class_key svcrdma_sctx_lock; + static struct lock_class_key svcrdma_dto_lock; struct svcxprt_rdma *cma_xprt; cma_xprt = kzalloc_node(sizeof(*cma_xprt), GFP_KERNEL, node); @@ -134,6 +137,7 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, svc_xprt_init(net, &svc_rdma_class, &cma_xprt->sc_xprt, serv); INIT_LIST_HEAD(&cma_xprt->sc_accept_q); INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q); + INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q); init_llist_head(&cma_xprt->sc_send_ctxts); init_llist_head(&cma_xprt->sc_recv_ctxts); init_llist_head(&cma_xprt->sc_rw_ctxts); @@ -141,8 +145,11 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, spin_lock_init(&cma_xprt->sc_lock); spin_lock_init(&cma_xprt->sc_rq_dto_lock); + lockdep_set_class(&cma_xprt->sc_rq_dto_lock, &svcrdma_dto_lock); spin_lock_init(&cma_xprt->sc_send_lock); + lockdep_set_class(&cma_xprt->sc_send_lock, &svcrdma_sctx_lock); spin_lock_init(&cma_xprt->sc_rw_ctxt_lock); + lockdep_set_class(&cma_xprt->sc_rw_ctxt_lock, &svcrdma_rwctx_lock); /* * Note that this implies that the underlying transport support @@ -391,37 +398,35 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) dev = newxprt->sc_cm_id->device; newxprt->sc_port_num = newxprt->sc_cm_id->port_num; - /* Qualify the transport resource defaults with the - * capabilities of this particular device */ + newxprt->sc_max_req_size = svcrdma_max_req_size; + newxprt->sc_max_requests = svcrdma_max_requests; + newxprt->sc_max_bc_requests = svcrdma_max_bc_requests; + newxprt->sc_recv_batch = RPCRDMA_MAX_RECV_BATCH; + newxprt->sc_fc_credits = cpu_to_be32(newxprt->sc_max_requests); + + /* Qualify the transport's resource defaults with the + * capabilities of this particular device. + */ + /* Transport header, head iovec, tail iovec */ newxprt->sc_max_send_sges = 3; /* Add one SGE per page list entry */ newxprt->sc_max_send_sges += (svcrdma_max_req_size / PAGE_SIZE) + 1; if (newxprt->sc_max_send_sges > dev->attrs.max_send_sge) newxprt->sc_max_send_sges = dev->attrs.max_send_sge; - newxprt->sc_max_req_size = svcrdma_max_req_size; - newxprt->sc_max_requests = svcrdma_max_requests; - newxprt->sc_max_bc_requests = svcrdma_max_bc_requests; - newxprt->sc_recv_batch = RPCRDMA_MAX_RECV_BATCH; rq_depth = newxprt->sc_max_requests + newxprt->sc_max_bc_requests + newxprt->sc_recv_batch; if (rq_depth > dev->attrs.max_qp_wr) { - pr_warn("svcrdma: reducing receive depth to %d\n", - dev->attrs.max_qp_wr); rq_depth = dev->attrs.max_qp_wr; newxprt->sc_recv_batch = 1; newxprt->sc_max_requests = rq_depth - 2; newxprt->sc_max_bc_requests = 2; } - newxprt->sc_fc_credits = cpu_to_be32(newxprt->sc_max_requests); ctxts = rdma_rw_mr_factor(dev, newxprt->sc_port_num, RPCSVC_MAXPAGES); ctxts *= newxprt->sc_max_requests; newxprt->sc_sq_depth = rq_depth + ctxts; - if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr) { - pr_warn("svcrdma: reducing send depth to %d\n", - dev->attrs.max_qp_wr); + if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr) newxprt->sc_sq_depth = dev->attrs.max_qp_wr; - } atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth); newxprt->sc_pd = ib_alloc_pd(dev, 0); @@ -451,8 +456,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) qp_attr.qp_type = IB_QPT_RC; qp_attr.send_cq = newxprt->sc_sq_cq; qp_attr.recv_cq = newxprt->sc_rq_cq; - dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n", - newxprt->sc_cm_id, newxprt->sc_pd); dprintk(" cap.max_send_wr = %d, cap.max_recv_wr = %d\n", qp_attr.cap.max_send_wr, qp_attr.cap.max_recv_wr); dprintk(" cap.max_send_sge = %d, cap.max_recv_sge = %d\n", @@ -506,7 +509,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) } #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) - dprintk("svcrdma: new connection %p accepted:\n", newxprt); + dprintk("svcrdma: new connection accepted on device %s:\n", dev->name); sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr; dprintk(" local address : %pIS:%u\n", sap, rpc_get_port(sap)); sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr; @@ -547,6 +550,7 @@ static void __svc_rdma_free(struct work_struct *work) /* This blocks until the Completion Queues are empty */ if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) ib_drain_qp(rdma->sc_qp); + flush_workqueue(svcrdma_wq); svc_rdma_flush_recv_queues(rdma); diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 28c0771c4e8c..4f8d7efa469f 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -1364,7 +1364,7 @@ void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed, bool temp) } rep->rr_cid.ci_queue_id = ep->re_attr.recv_cq->res.id; - trace_xprtrdma_post_recv(rep); + trace_xprtrdma_post_recv(&rep->rr_cid); rep->rr_recv_wr.next = wr; wr = &rep->rr_recv_wr; --needed; |