summaryrefslogtreecommitdiff
path: root/net/sunrpc/xprtrdma/verbs.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma/verbs.c')
-rw-r--r--net/sunrpc/xprtrdma/verbs.c194
1 files changed, 110 insertions, 84 deletions
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 3a907537e2cf..77c7dd7f05e8 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -74,17 +74,17 @@
/*
* internal functions
*/
-static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
+static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_sendctx *sc);
+static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
-static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
-static void rpcrdma_mr_free(struct rpcrdma_mr *mr);
+static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
gfp_t flags);
static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
-static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
/* Wait for outstanding transport work to finish. ib_drain_qp
* handles the drains in the wrong order for us, so open code
@@ -125,7 +125,7 @@ rpcrdma_qp_event_handler(struct ib_event *event, void *context)
/**
* rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
- * @cq: completion queue (ignored)
+ * @cq: completion queue
* @wc: completed WR
*
*/
@@ -138,7 +138,7 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
/* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_send(sc, wc);
- rpcrdma_sendctx_put_locked(sc);
+ rpcrdma_sendctx_put_locked((struct rpcrdma_xprt *)cq->cq_context, sc);
}
/**
@@ -170,7 +170,6 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
rdmab_addr(rep->rr_rdmabuf),
wc->byte_len, DMA_FROM_DEVICE);
- rpcrdma_post_recvs(r_xprt, false);
rpcrdma_reply_handler(rep);
return;
@@ -178,11 +177,11 @@ out_flushed:
rpcrdma_recv_buffer_put(rep);
}
-static void
-rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
- struct rdma_conn_param *param)
+static void rpcrdma_update_cm_private(struct rpcrdma_xprt *r_xprt,
+ struct rdma_conn_param *param)
{
const struct rpcrdma_connect_private *pmsg = param->private_data;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
unsigned int rsize, wsize;
/* Default settings for RPC-over-RDMA Version One */
@@ -198,13 +197,11 @@ rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
}
- if (rsize < r_xprt->rx_ep.rep_inline_recv)
- r_xprt->rx_ep.rep_inline_recv = rsize;
- if (wsize < r_xprt->rx_ep.rep_inline_send)
- r_xprt->rx_ep.rep_inline_send = wsize;
- dprintk("RPC: %s: max send %u, max recv %u\n", __func__,
- r_xprt->rx_ep.rep_inline_send,
- r_xprt->rx_ep.rep_inline_recv);
+ if (rsize < ep->rep_inline_recv)
+ ep->rep_inline_recv = rsize;
+ if (wsize < ep->rep_inline_send)
+ ep->rep_inline_send = wsize;
+
rpcrdma_set_max_header_sizes(r_xprt);
}
@@ -258,7 +255,8 @@ rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
case RDMA_CM_EVENT_ESTABLISHED:
++xprt->connect_cookie;
ep->rep_connected = 1;
- rpcrdma_update_connect_private(r_xprt, &event->param.conn);
+ rpcrdma_update_cm_private(r_xprt, &event->param.conn);
+ trace_xprtrdma_inline_thresh(r_xprt);
wake_up_all(&ep->rep_connect_wait);
break;
case RDMA_CM_EVENT_CONNECT_ERROR:
@@ -298,8 +296,6 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
struct rdma_cm_id *id;
int rc;
- trace_xprtrdma_conn_start(xprt);
-
init_completion(&ia->ri_done);
init_completion(&ia->ri_remove_done);
@@ -315,10 +311,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
if (rc)
goto out;
rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
- if (rc < 0) {
- trace_xprtrdma_conn_tout(xprt);
+ if (rc < 0)
goto out;
- }
rc = ia->ri_async_rc;
if (rc)
@@ -329,10 +323,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
if (rc)
goto out;
rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
- if (rc < 0) {
- trace_xprtrdma_conn_tout(xprt);
+ if (rc < 0)
goto out;
- }
rc = ia->ri_async_rc;
if (rc)
goto out;
@@ -409,8 +401,6 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_req *req;
- cancel_work_sync(&buf->rb_refresh_worker);
-
/* This is similar to rpcrdma_ep_destroy, but:
* - Don't cancel the connect worker.
* - Don't call rpcrdma_ep_disconnect, which waits
@@ -437,7 +427,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
}
- rpcrdma_mrs_destroy(buf);
+ rpcrdma_mrs_destroy(r_xprt);
ib_dealloc_pd(ia->ri_pd);
ia->ri_pd = NULL;
@@ -522,7 +512,7 @@ int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
init_waitqueue_head(&ep->rep_connect_wait);
ep->rep_receive_count = 0;
- sendcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
+ sendcq = ib_alloc_cq_any(ia->ri_id->device, r_xprt,
ep->rep_attr.cap.max_send_wr + 1,
IB_POLL_WORKQUEUE);
if (IS_ERR(sendcq)) {
@@ -630,8 +620,6 @@ static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
goto out3;
}
-
- rpcrdma_mrs_create(r_xprt);
return 0;
out3:
@@ -649,8 +637,6 @@ static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
struct rdma_cm_id *id, *old;
int err, rc;
- trace_xprtrdma_reconnect(r_xprt);
-
rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);
rc = -EHOSTUNREACH;
@@ -705,7 +691,6 @@ retry:
memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
switch (ep->rep_connected) {
case 0:
- dprintk("RPC: %s: connecting...\n", __func__);
rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
if (rc) {
rc = -ENETUNREACH;
@@ -726,6 +711,7 @@ retry:
ep->rep_connected = 0;
xprt_clear_connected(xprt);
+ rpcrdma_reset_cwnd(r_xprt);
rpcrdma_post_recvs(r_xprt, true);
rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
@@ -742,13 +728,14 @@ retry:
goto out;
}
- dprintk("RPC: %s: connected\n", __func__);
+ rpcrdma_mrs_create(r_xprt);
out:
if (rc)
ep->rep_connected = rc;
out_noupdate:
+ trace_xprtrdma_connect(r_xprt, rc);
return rc;
}
@@ -757,11 +744,8 @@ out_noupdate:
* @ep: endpoint to disconnect
* @ia: associated interface adapter
*
- * This is separate from destroy to facilitate the ability
- * to reconnect without recreating the endpoint.
- *
- * This call is not reentrant, and must not be made in parallel
- * on the same endpoint.
+ * Caller serializes. Either the transport send lock is held,
+ * or we're being called to destroy the transport.
*/
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
@@ -780,6 +764,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
trace_xprtrdma_disconnect(r_xprt, rc);
rpcrdma_xprt_drain(r_xprt);
+ rpcrdma_reqs_reset(r_xprt);
+ rpcrdma_mrs_destroy(r_xprt);
}
/* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -817,9 +803,6 @@ static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
if (!sc)
return NULL;
- sc->sc_wr.wr_cqe = &sc->sc_cqe;
- sc->sc_wr.sg_list = sc->sc_sges;
- sc->sc_wr.opcode = IB_WR_SEND;
sc->sc_cqe.done = rpcrdma_wc_send;
return sc;
}
@@ -847,7 +830,6 @@ static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
if (!sc)
return -ENOMEM;
- sc->sc_xprt = r_xprt;
buf->rb_sc_ctxs[i] = sc;
}
@@ -910,6 +892,7 @@ out_emptyq:
/**
* rpcrdma_sendctx_put_locked - Release a send context
+ * @r_xprt: controlling transport instance
* @sc: send context to release
*
* Usage: Called from Send completion to return a sendctxt
@@ -917,10 +900,10 @@ out_emptyq:
*
* The caller serializes calls to this function (per transport).
*/
-static void
-rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
+static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_sendctx *sc)
{
- struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
unsigned long next_tail;
/* Unmap SGEs of previously completed but unsignaled
@@ -938,7 +921,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
/* Paired with READ_ONCE */
smp_store_release(&buf->rb_sc_tail, next_tail);
- xprt_write_space(&sc->sc_xprt->rx_xprt);
+ xprt_write_space(&r_xprt->rx_xprt);
}
static void
@@ -965,7 +948,7 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
mr->mr_xprt = r_xprt;
spin_lock(&buf->rb_lock);
- list_add(&mr->mr_list, &buf->rb_mrs);
+ rpcrdma_mr_push(mr, &buf->rb_mrs);
list_add(&mr->mr_all, &buf->rb_all_mrs);
spin_unlock(&buf->rb_lock);
}
@@ -987,6 +970,28 @@ rpcrdma_mr_refresh_worker(struct work_struct *work)
}
/**
+ * rpcrdma_mrs_refresh - Wake the MR refresh worker
+ * @r_xprt: controlling transport instance
+ *
+ */
+void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+
+ /* If there is no underlying device, it's no use to
+ * wake the refresh worker.
+ */
+ if (ep->rep_connected != -ENODEV) {
+ /* The work is scheduled on a WQ_MEM_RECLAIM
+ * workqueue in order to prevent MR allocation
+ * from recursing into NFS during direct reclaim.
+ */
+ queue_work(xprtiod_workqueue, &buf->rb_refresh_worker);
+ }
+}
+
+/**
* rpcrdma_req_create - Allocate an rpcrdma_req object
* @r_xprt: controlling r_xprt
* @size: initial size, in bytes, of send and receive buffers
@@ -1042,6 +1047,26 @@ out1:
return NULL;
}
+/**
+ * rpcrdma_reqs_reset - Reset all reqs owned by a transport
+ * @r_xprt: controlling transport instance
+ *
+ * ASSUMPTION: the rb_allreqs list is stable for the duration,
+ * and thus can be walked without holding rb_lock. Eg. the
+ * caller is holding the transport send lock to exclude
+ * device removal or disconnection.
+ */
+static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_req *req;
+
+ list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
+ /* Credits are valid only for one connection */
+ req->rl_slot.rq_cong = 0;
+ }
+}
+
static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
bool temp)
{
@@ -1125,8 +1150,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
INIT_LIST_HEAD(&buf->rb_all_mrs);
INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
- rpcrdma_mrs_create(r_xprt);
-
INIT_LIST_HEAD(&buf->rb_send_bufs);
INIT_LIST_HEAD(&buf->rb_allreqs);
@@ -1134,14 +1157,13 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
for (i = 0; i < buf->rb_max_requests; i++) {
struct rpcrdma_req *req;
- req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE,
+ req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
GFP_KERNEL);
if (!req)
goto out;
list_add(&req->rl_list, &buf->rb_send_bufs);
}
- buf->rb_credits = 1;
init_llist_head(&buf->rb_free_reps);
rc = rpcrdma_sendctxs_create(r_xprt);
@@ -1158,15 +1180,24 @@ out:
* rpcrdma_req_destroy - Destroy an rpcrdma_req object
* @req: unused object to be destroyed
*
- * This function assumes that the caller prevents concurrent device
- * unload and transport tear-down.
+ * Relies on caller holding the transport send lock to protect
+ * removing req->rl_all from buf->rb_all_reqs safely.
*/
void rpcrdma_req_destroy(struct rpcrdma_req *req)
{
+ struct rpcrdma_mr *mr;
+
list_del(&req->rl_all);
- while (!list_empty(&req->rl_free_mrs))
- rpcrdma_mr_free(rpcrdma_mr_pop(&req->rl_free_mrs));
+ while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
+ struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
+
+ spin_lock(&buf->rb_lock);
+ list_del(&mr->mr_all);
+ spin_unlock(&buf->rb_lock);
+
+ frwr_release_mr(mr);
+ }
rpcrdma_regbuf_free(req->rl_recvbuf);
rpcrdma_regbuf_free(req->rl_sendbuf);
@@ -1174,28 +1205,33 @@ void rpcrdma_req_destroy(struct rpcrdma_req *req)
kfree(req);
}
-static void
-rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
+/**
+ * rpcrdma_mrs_destroy - Release all of a transport's MRs
+ * @r_xprt: controlling transport instance
+ *
+ * Relies on caller holding the transport send lock to protect
+ * removing mr->mr_list from req->rl_free_mrs safely.
+ */
+static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
{
- struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
- rx_buf);
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_mr *mr;
- unsigned int count;
- count = 0;
+ cancel_work_sync(&buf->rb_refresh_worker);
+
spin_lock(&buf->rb_lock);
while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
struct rpcrdma_mr,
mr_all)) != NULL) {
+ list_del(&mr->mr_list);
list_del(&mr->mr_all);
spin_unlock(&buf->rb_lock);
frwr_release_mr(mr);
- count++;
+
spin_lock(&buf->rb_lock);
}
spin_unlock(&buf->rb_lock);
- r_xprt->rx_stats.mrs_allocated = 0;
}
/**
@@ -1209,8 +1245,6 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
- cancel_work_sync(&buf->rb_refresh_worker);
-
rpcrdma_sendctxs_destroy(buf);
rpcrdma_reps_destroy(buf);
@@ -1222,8 +1256,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
list_del(&req->rl_list);
rpcrdma_req_destroy(req);
}
-
- rpcrdma_mrs_destroy(buf);
}
/**
@@ -1264,17 +1296,6 @@ void rpcrdma_mr_put(struct rpcrdma_mr *mr)
rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
}
-static void rpcrdma_mr_free(struct rpcrdma_mr *mr)
-{
- struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
- struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-
- mr->mr_req = NULL;
- spin_lock(&buf->rb_lock);
- rpcrdma_mr_push(mr, &buf->rb_mrs);
- spin_unlock(&buf->rb_lock);
-}
-
/**
* rpcrdma_buffer_get - Get a request buffer
* @buffers: Buffer pool from which to obtain a buffer
@@ -1437,7 +1458,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
struct rpcrdma_ep *ep,
struct rpcrdma_req *req)
{
- struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
+ struct ib_send_wr *send_wr = &req->rl_wr;
int rc;
if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
@@ -1455,8 +1476,13 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
return 0;
}
-static void
-rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
+/**
+ * rpcrdma_post_recvs - Refill the Receive Queue
+ * @r_xprt: controlling transport instance
+ * @temp: mark Receive buffers to be deleted after use
+ *
+ */
+void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ep *ep = &r_xprt->rx_ep;