summaryrefslogtreecommitdiff
path: root/net/sunrpc/xprtrdma/rpc_rdma.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c')
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c125
1 files changed, 81 insertions, 44 deletions
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 694e9b13ecf0..ca4d6e4528f3 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -141,7 +141,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
if (xdr->page_len) {
remaining = xdr->page_len;
- offset = xdr->page_base & ~PAGE_MASK;
+ offset = offset_in_page(xdr->page_base);
count = 0;
while (remaining) {
remaining -= min_t(unsigned int,
@@ -222,7 +222,7 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
len = xdrbuf->page_len;
ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
- page_base = xdrbuf->page_base & ~PAGE_MASK;
+ page_base = offset_in_page(xdrbuf->page_base);
p = 0;
while (len && n < RPCRDMA_MAX_SEGS) {
if (!ppages[p]) {
@@ -540,7 +540,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
goto out;
page = virt_to_page(xdr->tail[0].iov_base);
- page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+ page_base = offset_in_page(xdr->tail[0].iov_base);
/* If the content in the page list is an odd length,
* xdr_write_pages() has added a pad at the beginning
@@ -557,7 +557,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
*/
if (xdr->page_len) {
ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
- page_base = xdr->page_base & ~PAGE_MASK;
+ page_base = offset_in_page(xdr->page_base);
remaining = xdr->page_len;
while (remaining) {
sge_no++;
@@ -587,7 +587,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
*/
if (xdr->tail[0].iov_len) {
page = virt_to_page(xdr->tail[0].iov_base);
- page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+ page_base = offset_in_page(xdr->tail[0].iov_base);
len = xdr->tail[0].iov_len;
map_tail:
@@ -734,6 +734,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
rpclen = 0;
}
+ req->rl_xid = rqst->rq_xid;
+ rpcrdma_insert_req(&r_xprt->rx_buf, req);
+
/* This implementation supports the following combinations
* of chunk lists in one RPC-over-RDMA Call message:
*
@@ -875,9 +878,9 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
srcp += curlen;
copy_len -= curlen;
- page_base = rqst->rq_rcv_buf.page_base;
- ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
- page_base &= ~PAGE_MASK;
+ ppages = rqst->rq_rcv_buf.pages +
+ (rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
+ page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
fixup_copy_count = 0;
if (copy_len && rqst->rq_rcv_buf.page_len) {
int pagelist_len;
@@ -928,6 +931,24 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
return fixup_copy_count;
}
+/* Caller must guarantee @rep remains stable during this call.
+ */
+static void
+rpcrdma_mark_remote_invalidation(struct list_head *mws,
+ struct rpcrdma_rep *rep)
+{
+ struct rpcrdma_mw *mw;
+
+ if (!(rep->rr_wc_flags & IB_WC_WITH_INVALIDATE))
+ return;
+
+ list_for_each_entry(mw, mws, mw_list)
+ if (mw->mw_handle == rep->rr_inv_rkey) {
+ mw->mw_flags = RPCRDMA_MW_F_RI;
+ break; /* only one invalidated MR per RPC */
+ }
+}
+
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/* By convention, backchannel calls arrive via rdma_msg type
* messages, and never populate the chunk lists. This makes
@@ -969,14 +990,16 @@ rpcrdma_reply_handler(struct work_struct *work)
{
struct rpcrdma_rep *rep =
container_of(work, struct rpcrdma_rep, rr_work);
+ struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
struct rpcrdma_msg *headerp;
struct rpcrdma_req *req;
struct rpc_rqst *rqst;
- struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
- struct rpc_xprt *xprt = &r_xprt->rx_xprt;
__be32 *iptr;
int rdmalen, status, rmerr;
unsigned long cwnd;
+ struct list_head mws;
dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
@@ -994,27 +1017,45 @@ rpcrdma_reply_handler(struct work_struct *work)
/* Match incoming rpcrdma_rep to an rpcrdma_req to
* get context for handling any incoming chunks.
*/
- spin_lock_bh(&xprt->transport_lock);
- rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
- if (!rqst)
+ spin_lock(&buf->rb_lock);
+ req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf,
+ headerp->rm_xid);
+ if (!req)
goto out_nomatch;
-
- req = rpcr_to_rdmar(rqst);
if (req->rl_reply)
goto out_duplicate;
- /* Sanity checking has passed. We are now committed
- * to complete this transaction.
+ list_replace_init(&req->rl_registered, &mws);
+ rpcrdma_mark_remote_invalidation(&mws, rep);
+
+ /* Avoid races with signals and duplicate replies
+ * by marking this req as matched.
*/
- list_del_init(&rqst->rq_list);
- spin_unlock_bh(&xprt->transport_lock);
+ req->rl_reply = rep;
+ spin_unlock(&buf->rb_lock);
+
dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
__func__, rep, req, be32_to_cpu(headerp->rm_xid));
- /* from here on, the reply is no longer an orphan */
- req->rl_reply = rep;
- xprt->reestablish_timeout = 0;
+ /* Invalidate and unmap the data payloads before waking the
+ * waiting application. This guarantees the memory regions
+ * are properly fenced from the server before the application
+ * accesses the data. It also ensures proper send flow control:
+ * waking the next RPC waits until this RPC has relinquished
+ * all its Send Queue entries.
+ */
+ if (!list_empty(&mws))
+ r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
+ /* Perform XID lookup, reconstruction of the RPC reply, and
+ * RPC completion while holding the transport lock to ensure
+ * the rep, rqst, and rq_task pointers remain stable.
+ */
+ spin_lock_bh(&xprt->transport_lock);
+ rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+ if (!rqst)
+ goto out_norqst;
+ xprt->reestablish_timeout = 0;
if (headerp->rm_vers != rpcrdma_version)
goto out_badversion;
@@ -1024,12 +1065,9 @@ rpcrdma_reply_handler(struct work_struct *work)
case rdma_msg:
/* never expect read chunks */
/* never expect reply chunks (two ways to check) */
- /* never expect write chunks without having offered RDMA */
if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
(headerp->rm_body.rm_chunks[1] == xdr_zero &&
- headerp->rm_body.rm_chunks[2] != xdr_zero) ||
- (headerp->rm_body.rm_chunks[1] != xdr_zero &&
- list_empty(&req->rl_registered)))
+ headerp->rm_body.rm_chunks[2] != xdr_zero))
goto badheader;
if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
/* count any expected write chunks in read reply */
@@ -1066,8 +1104,7 @@ rpcrdma_reply_handler(struct work_struct *work)
/* never expect read or write chunks, always reply chunks */
if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
headerp->rm_body.rm_chunks[1] != xdr_zero ||
- headerp->rm_body.rm_chunks[2] != xdr_one ||
- list_empty(&req->rl_registered))
+ headerp->rm_body.rm_chunks[2] != xdr_one)
goto badheader;
iptr = (__be32 *)((unsigned char *)headerp +
RPCRDMA_HDRLEN_MIN);
@@ -1093,17 +1130,6 @@ badheader:
}
out:
- /* Invalidate and flush the data payloads before waking the
- * waiting application. This guarantees the memory region is
- * properly fenced from the server before the application
- * accesses the data. It also ensures proper send flow
- * control: waking the next RPC waits until this RPC has
- * relinquished all its Send Queue entries.
- */
- if (!list_empty(&req->rl_registered))
- r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
-
- spin_lock_bh(&xprt->transport_lock);
cwnd = xprt->cwnd;
xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
if (xprt->cwnd > cwnd)
@@ -1112,7 +1138,7 @@ out:
xprt_complete_rqst(rqst->rq_task, status);
spin_unlock_bh(&xprt->transport_lock);
dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
- __func__, xprt, rqst, status);
+ __func__, xprt, rqst, status);
return;
out_badstatus:
@@ -1161,26 +1187,37 @@ out_rdmaerr:
r_xprt->rx_stats.bad_reply_count++;
goto out;
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
+/* The req was still available, but by the time the transport_lock
+ * was acquired, the rqst and task had been released. Thus the RPC
+ * has already been terminated.
*/
+out_norqst:
+ spin_unlock_bh(&xprt->transport_lock);
+ rpcrdma_buffer_put(req);
+ dprintk("RPC: %s: race, no rqst left for req %p\n",
+ __func__, req);
+ return;
+
out_shortreply:
dprintk("RPC: %s: short/invalid reply\n", __func__);
goto repost;
out_nomatch:
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&buf->rb_lock);
dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
__func__, be32_to_cpu(headerp->rm_xid),
rep->rr_len);
goto repost;
out_duplicate:
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&buf->rb_lock);
dprintk("RPC: %s: "
"duplicate reply %p to RPC request %p: xid 0x%08x\n",
__func__, rep, req, be32_to_cpu(headerp->rm_xid));
+/* If no pending RPC transaction was matched, post a replacement
+ * receive buffer before returning.
+ */
repost:
r_xprt->rx_stats.bad_reply_count++;
if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))