author    | Chuck Lever <chuck.lever@oracle.com>       | 2015-10-25 00:27:10 +0300
committer | Anna Schumaker <Anna.Schumaker@Netapp.com> | 2015-11-02 21:45:15 +0300
commit    | fe97b47cd623ebbaa55a163c336abc47153526d1 (patch)
tree      | d936169799d99d924f92b385d1af31d62d083b90 /net/sunrpc/xprtrdma/verbs.c
parent    | 1e465fd4ff475cc29c866ee75496c941b3908e69 (diff)
download  | linux-fe97b47cd623ebbaa55a163c336abc47153526d1.tar.xz
xprtrdma: Use workqueue to process RPC/RDMA replies
The reply tasklet is fast, but it's single-threaded. Once reply
traffic saturates a single CPU, there's no more reply processing
capacity.
Replace the tasklet with a workqueue to spread reply handling across
all CPUs. This also moves RPC/RDMA reply handling out of soft IRQ
context and into a context where sleeping is allowed.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Sagi Grimberg <sagig@mellanox.com>
Tested-By: Devesh Sharma <devesh.sharma@avagotech.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
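
The pattern at the heart of this change: each reply buffer embeds its own work_struct, and each completed receive is queued individually to an unbound workqueue instead of being batched onto a list for a single tasklet. Below is a minimal sketch of that pattern in isolation, not the driver code itself; all demo_* names are illustrative placeholders, not xprtrdma identifiers.

/* Minimal sketch of the tasklet-to-workqueue pattern described above.
 * All demo_* names are illustrative; they are not xprtrdma identifiers.
 */
#include <linux/workqueue.h>

struct demo_rep {
	struct work_struct	rr_work;	/* one work item per reply buffer */
	/* ... reply payload fields ... */
};

static struct workqueue_struct *demo_receive_wq;

/* Runs in process context on whichever CPU the workqueue picks, so
 * sleeping is allowed here, unlike in the tasklet (soft IRQ) path. */
static void demo_reply_worker(struct work_struct *work)
{
	struct demo_rep *rep = container_of(work, struct demo_rep, rr_work);

	/* ... parse and complete the RPC reply carried by rep ... */
	(void)rep;
}

static int demo_wq_init(void)
{
	/* WQ_UNBOUND lets work run on any CPU rather than the submitting one;
	 * WQ_MEM_RECLAIM provides a rescuer thread so the queue keeps making
	 * progress under memory pressure; WQ_HIGHPRI uses high-priority workers. */
	demo_receive_wq = alloc_workqueue("demo_receive",
					  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI, 0);
	return demo_receive_wq ? 0 : -ENOMEM;
}

/* Completion side: instead of list_add_tail() plus tasklet_schedule(),
 * each completed reply is queued as its own work item. */
static void demo_receive_done(struct demo_rep *rep)
{
	INIT_WORK(&rep->rr_work, demo_reply_worker);
	queue_work(demo_receive_wq, &rep->rr_work);
}

In the patch itself, INIT_WORK() is done once when the rep is created (see the rpcrdma_create_rep() hunk below), not on every completion.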
Diffstat (limited to 'net/sunrpc/xprtrdma/verbs.c')
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 54
1 file changed, 44 insertions(+), 10 deletions(-)
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index c09f1b6c3f0a..5c20629544bb 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -100,6 +100,35 @@ rpcrdma_run_tasklet(unsigned long data)
 
 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
 
+static struct workqueue_struct *rpcrdma_receive_wq;
+
+int
+rpcrdma_alloc_wq(void)
+{
+	struct workqueue_struct *recv_wq;
+
+	recv_wq = alloc_workqueue("xprtrdma_receive",
+				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
+				  0);
+	if (!recv_wq)
+		return -ENOMEM;
+
+	rpcrdma_receive_wq = recv_wq;
+	return 0;
+}
+
+void
+rpcrdma_destroy_wq(void)
+{
+	struct workqueue_struct *wq;
+
+	if (rpcrdma_receive_wq) {
+		wq = rpcrdma_receive_wq;
+		rpcrdma_receive_wq = NULL;
+		destroy_workqueue(wq);
+	}
+}
+
 static void
 rpcrdma_schedule_tasklet(struct list_head *sched_list)
 {
@@ -196,7 +225,16 @@ rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
 }
 
 static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
+rpcrdma_receive_worker(struct work_struct *work)
+{
+	struct rpcrdma_rep *rep =
+			container_of(work, struct rpcrdma_rep, rr_work);
+
+	rpcrdma_reply_handler(rep);
+}
+
+static void
+rpcrdma_recvcq_process_wc(struct ib_wc *wc)
 {
 	struct rpcrdma_rep *rep =
 			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
@@ -219,8 +257,9 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
 	prefetch(rdmab_to_msg(rep->rr_rdmabuf));
 
 out_schedule:
-	list_add_tail(&rep->rr_list, sched_list);
+	queue_work(rpcrdma_receive_wq, &rep->rr_work);
 	return;
+
 out_fail:
 	if (wc->status != IB_WC_WR_FLUSH_ERR)
 		pr_err("RPC: %s: rep %p: %s\n",
@@ -239,7 +278,6 @@ static void
 rpcrdma_recvcq_poll(struct ib_cq *cq)
 {
 	struct ib_wc *pos, wcs[4];
-	LIST_HEAD(sched_list);
 	int count, rc;
 
 	do {
@@ -251,10 +289,8 @@ rpcrdma_recvcq_poll(struct ib_cq *cq)
 
 		count = rc;
 		while (count-- > 0)
-			rpcrdma_recvcq_process_wc(pos++, &sched_list);
+			rpcrdma_recvcq_process_wc(pos++);
 	} while (rc == ARRAY_SIZE(wcs));
-
-	rpcrdma_schedule_tasklet(&sched_list);
 }
 
 /* Handle provider receive completion upcalls.
@@ -272,12 +308,9 @@ static void
 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
 {
 	struct ib_wc wc;
-	LIST_HEAD(sched_list);
 
 	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
-		rpcrdma_recvcq_process_wc(&wc, &sched_list);
-	if (!list_empty(&sched_list))
-		rpcrdma_schedule_tasklet(&sched_list);
+		rpcrdma_recvcq_process_wc(&wc);
 	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
 		rpcrdma_sendcq_process_wc(&wc);
 }
@@ -913,6 +946,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 
 	rep->rr_device = ia->ri_device;
 	rep->rr_rxprt = r_xprt;
+	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
 	return rep;
 
 out_free:
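
rpcrdma_alloc_wq() and rpcrdma_destroy_wq() are non-static, so the workqueue is meant to be created once and torn down once by code outside this file; those call sites are not part of the hunks shown here. A hedged sketch of how a caller might wire them into module init/exit follows; the my_xprtrdma_* names are hypothetical, not the transport's real entry points.

/* Hypothetical wiring for the helpers added above; the real call sites
 * live outside net/sunrpc/xprtrdma/verbs.c and are not shown in this diff. */
#include <linux/module.h>

int rpcrdma_alloc_wq(void);
void rpcrdma_destroy_wq(void);

static int __init my_xprtrdma_init(void)
{
	int rc;

	/* The receive workqueue must exist before any reply can complete. */
	rc = rpcrdma_alloc_wq();
	if (rc)
		return rc;

	/* ... register the RPC/RDMA transport ... */
	return 0;
}

static void __exit my_xprtrdma_exit(void)
{
	/* ... unregister the transport first so no new replies are queued ... */

	/* destroy_workqueue() inside rpcrdma_destroy_wq() drains pending work
	 * before freeing the queue. */
	rpcrdma_destroy_wq();
}

module_init(my_xprtrdma_init);
module_exit(my_xprtrdma_exit);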