diff options
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r-- | net/sunrpc/xprtrdma/backchannel.c | 4 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 209 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 39 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 6 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 131 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 29 |
6 files changed, 208 insertions, 210 deletions
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index a249837d6a55..1151efd09b27 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -155,9 +155,11 @@ void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs) void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) { struct rpcrdma_req *req = rpcr_to_rdmar(rqst); + struct rpcrdma_rep *rep = req->rl_reply; struct rpc_xprt *xprt = rqst->rq_xprt; + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); - rpcrdma_recv_buffer_put(req->rl_reply); + rpcrdma_rep_put(&r_xprt->rx_buf, rep); req->rl_reply = NULL; spin_lock(&xprt->bc_pa_lock); diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 766a1048a48a..229fcc9a9064 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -49,20 +49,13 @@ # define RPCDBG_FACILITY RPCDBG_TRANS #endif -/** - * frwr_release_mr - Destroy one MR - * @mr: MR allocated by frwr_mr_init - * - */ -void frwr_release_mr(struct rpcrdma_mr *mr) +static void frwr_cid_init(struct rpcrdma_ep *ep, + struct rpcrdma_mr *mr) { - int rc; + struct rpc_rdma_cid *cid = &mr->mr_cid; - rc = ib_dereg_mr(mr->frwr.fr_mr); - if (rc) - trace_xprtrdma_frwr_dereg(mr, rc); - kfree(mr->mr_sg); - kfree(mr); + cid->ci_queue_id = ep->re_attr.send_cq->res.id; + cid->ci_completion_id = mr->mr_ibmr->res.id; } static void frwr_mr_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr) @@ -75,20 +68,22 @@ static void frwr_mr_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr) } } -static void frwr_mr_recycle(struct rpcrdma_mr *mr) +/** + * frwr_mr_release - Destroy one MR + * @mr: MR allocated by frwr_mr_init + * + */ +void frwr_mr_release(struct rpcrdma_mr *mr) { - struct rpcrdma_xprt *r_xprt = mr->mr_xprt; - - trace_xprtrdma_mr_recycle(mr); - - frwr_mr_unmap(r_xprt, mr); + int rc; - spin_lock(&r_xprt->rx_buf.rb_lock); - list_del(&mr->mr_all); - r_xprt->rx_stats.mrs_recycled++; - spin_unlock(&r_xprt->rx_buf.rb_lock); + frwr_mr_unmap(mr->mr_xprt, mr); - frwr_release_mr(mr); + rc = ib_dereg_mr(mr->mr_ibmr); + if (rc) + trace_xprtrdma_frwr_dereg(mr, rc); + kfree(mr->mr_sg); + kfree(mr); } static void frwr_mr_put(struct rpcrdma_mr *mr) @@ -144,10 +139,11 @@ int frwr_mr_init(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr) goto out_list_err; mr->mr_xprt = r_xprt; - mr->frwr.fr_mr = frmr; + mr->mr_ibmr = frmr; mr->mr_device = NULL; INIT_LIST_HEAD(&mr->mr_list); - init_completion(&mr->frwr.fr_linv_done); + init_completion(&mr->mr_linv_done); + frwr_cid_init(ep, mr); sg_init_table(sg, depth); mr->mr_sg = sg; @@ -257,6 +253,7 @@ int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device) ep->re_attr.cap.max_send_wr += 1; /* for ib_drain_sq */ ep->re_attr.cap.max_recv_wr = ep->re_max_requests; ep->re_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; + ep->re_attr.cap.max_recv_wr += RPCRDMA_MAX_RECV_BATCH; ep->re_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */ ep->re_max_rdma_segs = @@ -326,7 +323,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, goto out_dmamap_err; mr->mr_device = ep->re_id->device; - ibmr = mr->frwr.fr_mr; + ibmr = mr->mr_ibmr; n = ib_map_mr_sg(ibmr, mr->mr_sg, dma_nents, NULL, PAGE_SIZE); if (n != dma_nents) goto out_mapmr_err; @@ -336,7 +333,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, key = (u8)(ibmr->rkey & 0x000000FF); ib_update_fast_reg_key(ibmr, ++key); - reg_wr = &mr->frwr.fr_regwr; + reg_wr = &mr->mr_regwr; reg_wr->mr = ibmr; reg_wr->key = ibmr->rkey; reg_wr->access = writing ? @@ -364,29 +361,19 @@ out_mapmr_err: * @cq: completion queue * @wc: WCE for a completed FastReg WR * + * Each flushed MR gets destroyed after the QP has drained. */ static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc) { struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = - container_of(cqe, struct rpcrdma_frwr, fr_cqe); + struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe); /* WARNING: Only wr_cqe and status are reliable at this point */ - trace_xprtrdma_wc_fastreg(wc, &frwr->fr_cid); - /* The MR will get recycled when the associated req is retransmitted */ + trace_xprtrdma_wc_fastreg(wc, &mr->mr_cid); rpcrdma_flush_disconnect(cq->cq_context, wc); } -static void frwr_cid_init(struct rpcrdma_ep *ep, - struct rpcrdma_frwr *frwr) -{ - struct rpc_rdma_cid *cid = &frwr->fr_cid; - - cid->ci_queue_id = ep->re_attr.send_cq->res.id; - cid->ci_completion_id = frwr->fr_mr->res.id; -} - /** * frwr_send - post Send WRs containing the RPC Call message * @r_xprt: controlling transport instance @@ -403,27 +390,36 @@ static void frwr_cid_init(struct rpcrdma_ep *ep, */ int frwr_send(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { + struct ib_send_wr *post_wr, *send_wr = &req->rl_wr; struct rpcrdma_ep *ep = r_xprt->rx_ep; - struct ib_send_wr *post_wr; struct rpcrdma_mr *mr; + unsigned int num_wrs; - post_wr = &req->rl_wr; + num_wrs = 1; + post_wr = send_wr; list_for_each_entry(mr, &req->rl_registered, mr_list) { - struct rpcrdma_frwr *frwr; - - frwr = &mr->frwr; - - frwr->fr_cqe.done = frwr_wc_fastreg; - frwr_cid_init(ep, frwr); - frwr->fr_regwr.wr.next = post_wr; - frwr->fr_regwr.wr.wr_cqe = &frwr->fr_cqe; - frwr->fr_regwr.wr.num_sge = 0; - frwr->fr_regwr.wr.opcode = IB_WR_REG_MR; - frwr->fr_regwr.wr.send_flags = 0; + trace_xprtrdma_mr_fastreg(mr); + + mr->mr_cqe.done = frwr_wc_fastreg; + mr->mr_regwr.wr.next = post_wr; + mr->mr_regwr.wr.wr_cqe = &mr->mr_cqe; + mr->mr_regwr.wr.num_sge = 0; + mr->mr_regwr.wr.opcode = IB_WR_REG_MR; + mr->mr_regwr.wr.send_flags = 0; + post_wr = &mr->mr_regwr.wr; + ++num_wrs; + } - post_wr = &frwr->fr_regwr.wr; + if ((kref_read(&req->rl_kref) > 1) || num_wrs > ep->re_send_count) { + send_wr->send_flags |= IB_SEND_SIGNALED; + ep->re_send_count = min_t(unsigned int, ep->re_send_batch, + num_wrs - ep->re_send_count); + } else { + send_wr->send_flags &= ~IB_SEND_SIGNALED; + ep->re_send_count -= num_wrs; } + trace_xprtrdma_post_send(req); return ib_post_send(ep->re_id->qp, post_wr, NULL); } @@ -440,6 +436,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs) list_for_each_entry(mr, mrs, mr_list) if (mr->mr_handle == rep->rr_inv_rkey) { list_del_init(&mr->mr_list); + trace_xprtrdma_mr_reminv(mr); frwr_mr_put(mr); break; /* only one invalidated MR per RPC */ } @@ -447,9 +444,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs) static void frwr_mr_done(struct ib_wc *wc, struct rpcrdma_mr *mr) { - if (wc->status != IB_WC_SUCCESS) - frwr_mr_recycle(mr); - else + if (likely(wc->status == IB_WC_SUCCESS)) frwr_mr_put(mr); } @@ -462,12 +457,10 @@ static void frwr_mr_done(struct ib_wc *wc, struct rpcrdma_mr *mr) static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) { struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = - container_of(cqe, struct rpcrdma_frwr, fr_cqe); - struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe); /* WARNING: Only wr_cqe and status are reliable at this point */ - trace_xprtrdma_wc_li(wc, &frwr->fr_cid); + trace_xprtrdma_wc_li(wc, &mr->mr_cid); frwr_mr_done(wc, mr); rpcrdma_flush_disconnect(cq->cq_context, wc); @@ -483,14 +476,12 @@ static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) { struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = - container_of(cqe, struct rpcrdma_frwr, fr_cqe); - struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe); /* WARNING: Only wr_cqe and status are reliable at this point */ - trace_xprtrdma_wc_li_wake(wc, &frwr->fr_cid); + trace_xprtrdma_wc_li_wake(wc, &mr->mr_cid); frwr_mr_done(wc, mr); - complete(&frwr->fr_linv_done); + complete(&mr->mr_linv_done); rpcrdma_flush_disconnect(cq->cq_context, wc); } @@ -511,7 +502,6 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) struct ib_send_wr *first, **prev, *last; struct rpcrdma_ep *ep = r_xprt->rx_ep; const struct ib_send_wr *bad_wr; - struct rpcrdma_frwr *frwr; struct rpcrdma_mr *mr; int rc; @@ -520,35 +510,34 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * Chain the LOCAL_INV Work Requests and post them with * a single ib_post_send() call. */ - frwr = NULL; prev = &first; while ((mr = rpcrdma_mr_pop(&req->rl_registered))) { trace_xprtrdma_mr_localinv(mr); r_xprt->rx_stats.local_inv_needed++; - frwr = &mr->frwr; - frwr->fr_cqe.done = frwr_wc_localinv; - frwr_cid_init(ep, frwr); - last = &frwr->fr_invwr; + last = &mr->mr_invwr; last->next = NULL; - last->wr_cqe = &frwr->fr_cqe; + last->wr_cqe = &mr->mr_cqe; last->sg_list = NULL; last->num_sge = 0; last->opcode = IB_WR_LOCAL_INV; last->send_flags = IB_SEND_SIGNALED; last->ex.invalidate_rkey = mr->mr_handle; + last->wr_cqe->done = frwr_wc_localinv; + *prev = last; prev = &last->next; } + mr = container_of(last, struct rpcrdma_mr, mr_invwr); /* Strong send queue ordering guarantees that when the * last WR in the chain completes, all WRs in the chain * are complete. */ - frwr->fr_cqe.done = frwr_wc_localinv_wake; - reinit_completion(&frwr->fr_linv_done); + last->wr_cqe->done = frwr_wc_localinv_wake; + reinit_completion(&mr->mr_linv_done); /* Transport disconnect drains the receive CQ before it * replaces the QP. The RPC reply handler won't call us @@ -562,22 +551,12 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * not happen, so don't wait in that case. */ if (bad_wr != first) - wait_for_completion(&frwr->fr_linv_done); + wait_for_completion(&mr->mr_linv_done); if (!rc) return; - /* Recycle MRs in the LOCAL_INV chain that did not get posted. - */ + /* On error, the MRs get destroyed once the QP has drained. */ trace_xprtrdma_post_linv_err(req, rc); - while (bad_wr) { - frwr = container_of(bad_wr, struct rpcrdma_frwr, - fr_invwr); - mr = container_of(frwr, struct rpcrdma_mr, frwr); - bad_wr = bad_wr->next; - - list_del_init(&mr->mr_list); - frwr_mr_recycle(mr); - } } /** @@ -589,20 +568,24 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc) { struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = - container_of(cqe, struct rpcrdma_frwr, fr_cqe); - struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); - struct rpcrdma_rep *rep = mr->mr_req->rl_reply; + struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe); + struct rpcrdma_rep *rep; /* WARNING: Only wr_cqe and status are reliable at this point */ - trace_xprtrdma_wc_li_done(wc, &frwr->fr_cid); - frwr_mr_done(wc, mr); + trace_xprtrdma_wc_li_done(wc, &mr->mr_cid); - /* Ensure @rep is generated before frwr_mr_done */ + /* Ensure that @rep is generated before the MR is released */ + rep = mr->mr_req->rl_reply; smp_rmb(); - rpcrdma_complete_rqst(rep); - rpcrdma_flush_disconnect(cq->cq_context, wc); + if (wc->status != IB_WC_SUCCESS) { + if (rep) + rpcrdma_unpin_rqst(rep); + rpcrdma_flush_disconnect(cq->cq_context, wc); + return; + } + frwr_mr_put(mr); + rpcrdma_complete_rqst(rep); } /** @@ -619,33 +602,29 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { struct ib_send_wr *first, *last, **prev; struct rpcrdma_ep *ep = r_xprt->rx_ep; - const struct ib_send_wr *bad_wr; - struct rpcrdma_frwr *frwr; struct rpcrdma_mr *mr; int rc; /* Chain the LOCAL_INV Work Requests and post them with * a single ib_post_send() call. */ - frwr = NULL; prev = &first; while ((mr = rpcrdma_mr_pop(&req->rl_registered))) { trace_xprtrdma_mr_localinv(mr); r_xprt->rx_stats.local_inv_needed++; - frwr = &mr->frwr; - frwr->fr_cqe.done = frwr_wc_localinv; - frwr_cid_init(ep, frwr); - last = &frwr->fr_invwr; + last = &mr->mr_invwr; last->next = NULL; - last->wr_cqe = &frwr->fr_cqe; + last->wr_cqe = &mr->mr_cqe; last->sg_list = NULL; last->num_sge = 0; last->opcode = IB_WR_LOCAL_INV; last->send_flags = IB_SEND_SIGNALED; last->ex.invalidate_rkey = mr->mr_handle; + last->wr_cqe->done = frwr_wc_localinv; + *prev = last; prev = &last->next; } @@ -655,31 +634,23 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * are complete. The last completion will wake up the * RPC waiter. */ - frwr->fr_cqe.done = frwr_wc_localinv_done; + last->wr_cqe->done = frwr_wc_localinv_done; /* Transport disconnect drains the receive CQ before it * replaces the QP. The RPC reply handler won't call us * unless re_id->qp is a valid pointer. */ - bad_wr = NULL; - rc = ib_post_send(ep->re_id->qp, first, &bad_wr); + rc = ib_post_send(ep->re_id->qp, first, NULL); if (!rc) return; - /* Recycle MRs in the LOCAL_INV chain that did not get posted. - */ + /* On error, the MRs get destroyed once the QP has drained. */ trace_xprtrdma_post_linv_err(req, rc); - while (bad_wr) { - frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr); - mr = container_of(frwr, struct rpcrdma_mr, frwr); - bad_wr = bad_wr->next; - - frwr_mr_recycle(mr); - } /* The final LOCAL_INV WR in the chain is supposed to - * do the wake. If it was never posted, the wake will - * not happen, so wake here in that case. + * do the wake. If it was never posted, the wake does + * not happen. Unpin the rqst in preparation for its + * retransmission. */ - rpcrdma_complete_rqst(req->rl_reply); + rpcrdma_unpin_rqst(req->rl_reply); } diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 292f066d006e..649f7d8b9733 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1326,9 +1326,35 @@ rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, return -EIO; } -/* Perform XID lookup, reconstruction of the RPC reply, and - * RPC completion while holding the transport lock to ensure - * the rep, rqst, and rq_task pointers remain stable. +/** + * rpcrdma_unpin_rqst - Release rqst without completing it + * @rep: RPC/RDMA Receive context + * + * This is done when a connection is lost so that a Reply + * can be dropped and its matching Call can be subsequently + * retransmitted on a new connection. + */ +void rpcrdma_unpin_rqst(struct rpcrdma_rep *rep) +{ + struct rpc_xprt *xprt = &rep->rr_rxprt->rx_xprt; + struct rpc_rqst *rqst = rep->rr_rqst; + struct rpcrdma_req *req = rpcr_to_rdmar(rqst); + + req->rl_reply = NULL; + rep->rr_rqst = NULL; + + spin_lock(&xprt->queue_lock); + xprt_unpin_rqst(rqst); + spin_unlock(&xprt->queue_lock); +} + +/** + * rpcrdma_complete_rqst - Pass completed rqst back to RPC + * @rep: RPC/RDMA Receive context + * + * Reconstruct the RPC reply and complete the transaction + * while @rqst is still pinned to ensure the rep, rqst, and + * rq_task pointers remain stable. */ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep) { @@ -1430,13 +1456,14 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) credits = 1; /* don't deadlock */ else if (credits > r_xprt->rx_ep->re_max_requests) credits = r_xprt->rx_ep->re_max_requests; + rpcrdma_post_recvs(r_xprt, credits + (buf->rb_bc_srv_max_requests << 1), + false); if (buf->rb_credits != credits) rpcrdma_update_cwnd(r_xprt, credits); - rpcrdma_post_recvs(r_xprt, false); req = rpcr_to_rdmar(rqst); if (unlikely(req->rl_reply)) - rpcrdma_recv_buffer_put(req->rl_reply); + rpcrdma_rep_put(buf, req->rl_reply); req->rl_reply = rep; rep->rr_rqst = rqst; @@ -1464,5 +1491,5 @@ out_shortreply: trace_xprtrdma_reply_short_err(rep); out: - rpcrdma_recv_buffer_put(rep); + rpcrdma_rep_put(buf, rep); } diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 78d29d1bcc20..09953597d055 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -262,8 +262,10 @@ xprt_rdma_connect_worker(struct work_struct *work) * xprt_rdma_inject_disconnect - inject a connection fault * @xprt: transport context * - * If @xprt is connected, disconnect it to simulate spurious connection - * loss. + * If @xprt is connected, disconnect it to simulate spurious + * connection loss. Caller must hold @xprt's send lock to + * ensure that data structures and hardware resources are + * stable during the rdma_disconnect() call. */ static void xprt_rdma_inject_disconnect(struct rpc_xprt *xprt) diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index ec912cf9c618..1e965a380896 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -101,6 +101,12 @@ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt) struct rpcrdma_ep *ep = r_xprt->rx_ep; struct rdma_cm_id *id = ep->re_id; + /* Wait for rpcrdma_post_recvs() to leave its critical + * section. + */ + if (atomic_inc_return(&ep->re_receiving) > 1) + wait_for_completion(&ep->re_done); + /* Flush Receives, then wait for deferred Reply work * to complete. */ @@ -114,22 +120,6 @@ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt) rpcrdma_ep_put(ep); } -/** - * rpcrdma_qp_event_handler - Handle one QP event (error notification) - * @event: details of the event - * @context: ep that owns QP where event occurred - * - * Called from the RDMA provider (device driver) possibly in an interrupt - * context. The QP is always destroyed before the ID, so the ID will be - * reliably available when this handler is invoked. - */ -static void rpcrdma_qp_event_handler(struct ib_event *event, void *context) -{ - struct rpcrdma_ep *ep = context; - - trace_xprtrdma_qp_event(ep, event); -} - /* Ensure xprt_force_disconnect() is invoked exactly once when a * connection is closed or lost. (The important thing is it needs * to be invoked "at least" once). @@ -205,7 +195,7 @@ static void rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) out_flushed: rpcrdma_flush_disconnect(r_xprt, wc); - rpcrdma_rep_destroy(rep); + rpcrdma_rep_put(&r_xprt->rx_buf, rep); } static void rpcrdma_update_cm_private(struct rpcrdma_ep *ep, @@ -414,6 +404,7 @@ static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt) __module_get(THIS_MODULE); device = id->device; ep->re_id = id; + reinit_completion(&ep->re_done); ep->re_max_requests = r_xprt->rx_xprt.max_reqs; ep->re_inline_send = xprt_rdma_max_inline_write; @@ -424,8 +415,6 @@ static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt) r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->re_max_requests); - ep->re_attr.event_handler = rpcrdma_qp_event_handler; - ep->re_attr.qp_context = ep; ep->re_attr.srq = NULL; ep->re_attr.cap.max_inline_data = 0; ep->re_attr.sq_sig_type = IB_SIGNAL_REQ_WR; @@ -535,7 +524,7 @@ int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt) * outstanding Receives. */ rpcrdma_ep_get(ep); - rpcrdma_post_recvs(r_xprt, true); + rpcrdma_post_recvs(r_xprt, 1, true); rc = rdma_connect(ep->re_id, &ep->re_remote_cma); if (rc) @@ -954,13 +943,11 @@ static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt) rpcrdma_req_reset(req); } -/* No locking needed here. This function is called only by the - * Receive completion handler. - */ static noinline struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp) { + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_rep *rep; rep = kzalloc(sizeof(*rep), GFP_KERNEL); @@ -987,7 +974,10 @@ struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; rep->rr_recv_wr.num_sge = 1; rep->rr_temp = temp; - list_add(&rep->rr_all, &r_xprt->rx_buf.rb_all_reps); + + spin_lock(&buf->rb_lock); + list_add(&rep->rr_all, &buf->rb_all_reps); + spin_unlock(&buf->rb_lock); return rep; out_free_regbuf: @@ -998,16 +988,23 @@ out: return NULL; } -/* No locking needed here. This function is invoked only by the - * Receive completion handler, or during transport shutdown. - */ -static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep) +static void rpcrdma_rep_free(struct rpcrdma_rep *rep) { - list_del(&rep->rr_all); rpcrdma_regbuf_free(rep->rr_rdmabuf); kfree(rep); } +static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep) +{ + struct rpcrdma_buffer *buf = &rep->rr_rxprt->rx_buf; + + spin_lock(&buf->rb_lock); + list_del(&rep->rr_all); + spin_unlock(&buf->rb_lock); + + rpcrdma_rep_free(rep); +} + static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf) { struct llist_node *node; @@ -1019,12 +1016,21 @@ static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf) return llist_entry(node, struct rpcrdma_rep, rr_node); } -static void rpcrdma_rep_put(struct rpcrdma_buffer *buf, - struct rpcrdma_rep *rep) +/** + * rpcrdma_rep_put - Release rpcrdma_rep back to free list + * @buf: buffer pool + * @rep: rep to release + * + */ +void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep) { llist_add(&rep->rr_node, &buf->rb_free_reps); } +/* Caller must ensure the QP is quiescent (RQ is drained) before + * invoking this function, to guarantee rb_all_reps is not + * changing. + */ static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; @@ -1032,7 +1038,7 @@ static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt) list_for_each_entry(rep, &buf->rb_all_reps, rr_all) { rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf); - rep->rr_temp = true; + rep->rr_temp = true; /* Mark this rep for destruction */ } } @@ -1040,8 +1046,18 @@ static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf) { struct rpcrdma_rep *rep; - while ((rep = rpcrdma_rep_get_locked(buf)) != NULL) - rpcrdma_rep_destroy(rep); + spin_lock(&buf->rb_lock); + while ((rep = list_first_entry_or_null(&buf->rb_all_reps, + struct rpcrdma_rep, + rr_all)) != NULL) { + list_del(&rep->rr_all); + spin_unlock(&buf->rb_lock); + + rpcrdma_rep_free(rep); + + spin_lock(&buf->rb_lock); + } + spin_unlock(&buf->rb_lock); } /** @@ -1104,7 +1120,7 @@ void rpcrdma_req_destroy(struct rpcrdma_req *req) list_del(&mr->mr_all); spin_unlock(&buf->rb_lock); - frwr_release_mr(mr); + frwr_mr_release(mr); } rpcrdma_regbuf_free(req->rl_recvbuf); @@ -1135,7 +1151,7 @@ static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt) list_del(&mr->mr_all); spin_unlock(&buf->rb_lock); - frwr_release_mr(mr); + frwr_mr_release(mr); spin_lock(&buf->rb_lock); } @@ -1221,17 +1237,6 @@ void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req) spin_unlock(&buffers->rb_lock); } -/** - * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list - * @rep: rep to release - * - * Used after error conditions. - */ -void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) -{ - rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep); -} - /* Returns a pointer to a rpcrdma_regbuf object, or NULL. * * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for @@ -1342,21 +1347,7 @@ static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb) */ int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { - struct ib_send_wr *send_wr = &req->rl_wr; - struct rpcrdma_ep *ep = r_xprt->rx_ep; - int rc; - - if (!ep->re_send_count || kref_read(&req->rl_kref) > 1) { - send_wr->send_flags |= IB_SEND_SIGNALED; - ep->re_send_count = ep->re_send_batch; - } else { - send_wr->send_flags &= ~IB_SEND_SIGNALED; - --ep->re_send_count; - } - - trace_xprtrdma_post_send(req); - rc = frwr_send(r_xprt, req); - if (rc) + if (frwr_send(r_xprt, req)) return -ENOTCONN; return 0; } @@ -1364,27 +1355,30 @@ int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) /** * rpcrdma_post_recvs - Refill the Receive Queue * @r_xprt: controlling transport instance - * @temp: mark Receive buffers to be deleted after use + * @needed: current credit grant + * @temp: mark Receive buffers to be deleted after one use * */ -void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) +void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed, bool temp) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ep *ep = r_xprt->rx_ep; struct ib_recv_wr *wr, *bad_wr; struct rpcrdma_rep *rep; - int needed, count, rc; + int count, rc; rc = 0; count = 0; - needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1); if (likely(ep->re_receive_count > needed)) goto out; needed -= ep->re_receive_count; if (!temp) needed += RPCRDMA_MAX_RECV_BATCH; + if (atomic_inc_return(&ep->re_receiving) > 1) + goto out; + /* fast path: all needed reps can be found on the free list */ wr = NULL; while (needed) { @@ -1410,6 +1404,9 @@ void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) rc = ib_post_recv(ep->re_id->qp, wr, (const struct ib_recv_wr **)&bad_wr); + if (atomic_dec_return(&ep->re_receiving) > 0) + complete(&ep->re_done); + out: trace_xprtrdma_post_recvs(r_xprt, count, rc); if (rc) { @@ -1418,7 +1415,7 @@ out: rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr); wr = wr->next; - rpcrdma_recv_buffer_put(rep); + rpcrdma_rep_put(buf, rep); --count; } } diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index fe3be985e239..436ad7312614 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -83,6 +83,7 @@ struct rpcrdma_ep { unsigned int re_max_inline_recv; int re_async_rc; int re_connect_status; + atomic_t re_receiving; atomic_t re_force_disconnect; struct ib_qp_init_attr re_attr; wait_queue_head_t re_connect_wait; @@ -228,31 +229,28 @@ struct rpcrdma_sendctx { * An external memory region is any buffer or page that is registered * on the fly (ie, not pre-registered). */ -struct rpcrdma_frwr { - struct ib_mr *fr_mr; - struct ib_cqe fr_cqe; - struct rpc_rdma_cid fr_cid; - struct completion fr_linv_done; - union { - struct ib_reg_wr fr_regwr; - struct ib_send_wr fr_invwr; - }; -}; - struct rpcrdma_req; struct rpcrdma_mr { struct list_head mr_list; struct rpcrdma_req *mr_req; + + struct ib_mr *mr_ibmr; struct ib_device *mr_device; struct scatterlist *mr_sg; int mr_nents; enum dma_data_direction mr_dir; - struct rpcrdma_frwr frwr; + struct ib_cqe mr_cqe; + struct completion mr_linv_done; + union { + struct ib_reg_wr mr_regwr; + struct ib_send_wr mr_invwr; + }; struct rpcrdma_xprt *mr_xprt; u32 mr_handle; u32 mr_length; u64 mr_offset; struct list_head mr_all; + struct rpc_rdma_cid mr_cid; }; /* @@ -461,7 +459,7 @@ int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt); void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt); int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); -void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp); +void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed, bool temp); /* * Buffer calls - xprtrdma/verbs.c @@ -480,7 +478,7 @@ void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt); struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *); void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req); -void rpcrdma_recv_buffer_put(struct rpcrdma_rep *); +void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep); bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags); @@ -527,7 +525,7 @@ rpcrdma_data_dir(bool writing) void frwr_reset(struct rpcrdma_req *req); int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device); int frwr_mr_init(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr); -void frwr_release_mr(struct rpcrdma_mr *mr); +void frwr_mr_release(struct rpcrdma_mr *mr); struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, int nsegs, bool writing, __be32 xid, @@ -560,6 +558,7 @@ int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst); void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep); void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt); void rpcrdma_complete_rqst(struct rpcrdma_rep *rep); +void rpcrdma_unpin_rqst(struct rpcrdma_rep *rep); void rpcrdma_reply_handler(struct rpcrdma_rep *rep); static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len) |