summaryrefslogtreecommitdiff
path: root/net/sunrpc/xprtrdma/rpc_rdma.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c')
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c108
1 files changed, 78 insertions, 30 deletions
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 0f28f2d743ed..888823bb6dae 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -132,6 +132,33 @@ rpcrdma_tail_pullup(struct xdr_buf *buf)
return tlen;
}
+/* Split "vec" on page boundaries into segments. FMR registers pages,
+ * not a byte range. Other modes coalesce these segments into a single
+ * MR when they can.
+ */
+static int
+rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
+ int n, int nsegs)
+{
+ size_t page_offset;
+ u32 remaining;
+ char *base;
+
+ base = vec->iov_base;
+ page_offset = offset_in_page(base);
+ remaining = vec->iov_len;
+ while (remaining && n < nsegs) {
+ seg[n].mr_page = NULL;
+ seg[n].mr_offset = base;
+ seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
+ remaining -= seg[n].mr_len;
+ base += seg[n].mr_len;
+ ++n;
+ page_offset = 0;
+ }
+ return n;
+}
+
/*
* Chunk assembly from upper layer xdr_buf.
*
@@ -150,11 +177,10 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
int page_base;
struct page **ppages;
- if (pos == 0 && xdrbuf->head[0].iov_len) {
- seg[n].mr_page = NULL;
- seg[n].mr_offset = xdrbuf->head[0].iov_base;
- seg[n].mr_len = xdrbuf->head[0].iov_len;
- ++n;
+ if (pos == 0) {
+ n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs);
+ if (n == nsegs)
+ return -EIO;
}
len = xdrbuf->page_len;
@@ -192,13 +218,9 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
* xdr pad bytes, saving the server an RDMA operation. */
if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
return n;
+ n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs);
if (n == nsegs)
- /* Tail remains, but we're out of segments */
return -EIO;
- seg[n].mr_page = NULL;
- seg[n].mr_offset = xdrbuf->tail[0].iov_base;
- seg[n].mr_len = xdrbuf->tail[0].iov_len;
- ++n;
}
return n;
@@ -773,20 +795,17 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
__be32 *iptr;
- int rdmalen, status;
+ int rdmalen, status, rmerr;
unsigned long cwnd;
- u32 credits;
dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
if (rep->rr_len == RPCRDMA_BAD_LEN)
goto out_badstatus;
- if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
+ if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
goto out_shortreply;
headerp = rdmab_to_msg(rep->rr_rdmabuf);
- if (headerp->rm_vers != rpcrdma_version)
- goto out_badversion;
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
if (rpcrdma_is_bcall(headerp))
goto out_bcall;
@@ -809,15 +828,16 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
*/
list_del_init(&rqst->rq_list);
spin_unlock_bh(&xprt->transport_lock);
- dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
- " RPC request 0x%p xid 0x%08x\n",
- __func__, rep, req, rqst,
- be32_to_cpu(headerp->rm_xid));
+ dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
+ __func__, rep, req, be32_to_cpu(headerp->rm_xid));
/* from here on, the reply is no longer an orphan */
req->rl_reply = rep;
xprt->reestablish_timeout = 0;
+ if (headerp->rm_vers != rpcrdma_version)
+ goto out_badversion;
+
/* check for expected message types */
/* The order of some of these tests is important. */
switch (headerp->rm_type) {
@@ -878,6 +898,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
status = rdmalen;
break;
+ case rdma_error:
+ goto out_rdmaerr;
+
badheader:
default:
dprintk("%s: invalid rpcrdma reply header (type %d):"
@@ -893,6 +916,7 @@ badheader:
break;
}
+out:
/* Invalidate and flush the data payloads before waking the
* waiting application. This guarantees the memory region is
* properly fenced from the server before the application
@@ -903,15 +927,9 @@ badheader:
if (req->rl_nchunks)
r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
- credits = be32_to_cpu(headerp->rm_credit);
- if (credits == 0)
- credits = 1; /* don't deadlock */
- else if (credits > r_xprt->rx_buf.rb_max_requests)
- credits = r_xprt->rx_buf.rb_max_requests;
-
spin_lock_bh(&xprt->transport_lock);
cwnd = xprt->cwnd;
- xprt->cwnd = credits << RPC_CWNDSHIFT;
+ xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
if (xprt->cwnd > cwnd)
xprt_release_rqst_cong(rqst->rq_task);
@@ -935,13 +953,43 @@ out_bcall:
return;
#endif
-out_shortreply:
- dprintk("RPC: %s: short/invalid reply\n", __func__);
- goto repost;
-
+/* If the incoming reply terminated a pending RPC, the next
+ * RPC call will post a replacement receive buffer as it is
+ * being marshaled.
+ */
out_badversion:
dprintk("RPC: %s: invalid version %d\n",
__func__, be32_to_cpu(headerp->rm_vers));
+ status = -EIO;
+ r_xprt->rx_stats.bad_reply_count++;
+ goto out;
+
+out_rdmaerr:
+ rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
+ switch (rmerr) {
+ case ERR_VERS:
+ pr_err("%s: server reports header version error (%u-%u)\n",
+ __func__,
+ be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
+ be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
+ break;
+ case ERR_CHUNK:
+ pr_err("%s: server reports header decoding error\n",
+ __func__);
+ break;
+ default:
+ pr_err("%s: server reports unknown error %d\n",
+ __func__, rmerr);
+ }
+ status = -EREMOTEIO;
+ r_xprt->rx_stats.bad_reply_count++;
+ goto out;
+
+/* If no pending RPC transaction was matched, post a replacement
+ * receive buffer before returning.
+ */
+out_shortreply:
+ dprintk("RPC: %s: short/invalid reply\n", __func__);
goto repost;
out_nomatch: