From 42d42a5b0cd263757f8e519debbc744fdaefdaf7 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 23 May 2016 09:24:55 -0400 Subject: SUNRPC: Small optimisation of client receive Do not queue the client receive work if we're still processing. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 44 +++++++++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 11 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 2d3e0c42361e..2f2178027761 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -755,11 +755,19 @@ static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *s sk->sk_error_report = transport->old_error_report; } +static void xs_sock_reset_state_flags(struct rpc_xprt *xprt) +{ + struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + + clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); +} + static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) { smp_mb__before_atomic(); clear_bit(XPRT_CLOSE_WAIT, &xprt->state); clear_bit(XPRT_CLOSING, &xprt->state); + xs_sock_reset_state_flags(xprt); smp_mb__after_atomic(); } @@ -962,10 +970,13 @@ static void xs_local_data_receive(struct sock_xprt *transport) goto out; for (;;) { skb = skb_recv_datagram(sk, 0, 1, &err); - if (skb == NULL) + if (skb != NULL) { + xs_local_data_read_skb(&transport->xprt, sk, skb); + skb_free_datagram(sk, skb); + continue; + } + if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) break; - xs_local_data_read_skb(&transport->xprt, sk, skb); - skb_free_datagram(sk, skb); } out: mutex_unlock(&transport->recv_mutex); @@ -1043,10 +1054,13 @@ static void xs_udp_data_receive(struct sock_xprt *transport) goto out; for (;;) { skb = skb_recv_datagram(sk, 0, 1, &err); - if (skb == NULL) + if (skb != NULL) { + xs_udp_data_read_skb(&transport->xprt, sk, skb); + skb_free_datagram(sk, skb); + continue; + } + if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) break; - xs_udp_data_read_skb(&transport->xprt, sk, skb); - skb_free_datagram(sk, skb); } out: mutex_unlock(&transport->recv_mutex); @@ -1074,7 +1088,8 @@ static void xs_data_ready(struct sock *sk) if (xprt != NULL) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - queue_work(rpciod_workqueue, &transport->recv_worker); + if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) + queue_work(rpciod_workqueue, &transport->recv_worker); } read_unlock_bh(&sk->sk_callback_lock); } @@ -1474,10 +1489,15 @@ static void xs_tcp_data_receive(struct sock_xprt *transport) for (;;) { lock_sock(sk); read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); - release_sock(sk); - if (read <= 0) - break; - total += read; + if (read <= 0) { + clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); + release_sock(sk); + if (!test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) + break; + } else { + release_sock(sk); + total += read; + } rd_desc.count = 65536; } out: @@ -1508,6 +1528,8 @@ static void xs_tcp_data_ready(struct sock *sk) if (!(xprt = xprt_from_sock(sk))) goto out; transport = container_of(xprt, struct sock_xprt, xprt); + if (test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) + goto out; /* Any data means we had a useful conversation, so * the we don't need to delay the next reconnect -- cgit v1.2.3 From 5157b956961d78effd78399e1574b08b9b618422 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Sun, 29 May 2016 10:13:24 -0400 Subject: SUNRPC: Consolidate xs_tcp_data_ready and xs_data_ready The only difference between the two at this point is the reset of the connection timeout, and since everyone expect tcp ignore that value, we can just throw it into the generic function. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 38 +++++++------------------------------- 1 file changed, 7 insertions(+), 31 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 2f2178027761..62b4f5a2a331 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1088,6 +1088,12 @@ static void xs_data_ready(struct sock *sk) if (xprt != NULL) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + transport->old_data_ready(sk); + /* Any data means we had a useful conversation, so + * then we don't need to delay the next reconnect + */ + if (xprt->reestablish_timeout) + xprt->reestablish_timeout = 0; if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) queue_work(rpciod_workqueue, &transport->recv_worker); } @@ -1512,36 +1518,6 @@ static void xs_tcp_data_receive_workfn(struct work_struct *work) xs_tcp_data_receive(transport); } -/** - * xs_tcp_data_ready - "data ready" callback for TCP sockets - * @sk: socket with data to read - * - */ -static void xs_tcp_data_ready(struct sock *sk) -{ - struct sock_xprt *transport; - struct rpc_xprt *xprt; - - dprintk("RPC: xs_tcp_data_ready...\n"); - - read_lock_bh(&sk->sk_callback_lock); - if (!(xprt = xprt_from_sock(sk))) - goto out; - transport = container_of(xprt, struct sock_xprt, xprt); - if (test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) - goto out; - - /* Any data means we had a useful conversation, so - * the we don't need to delay the next reconnect - */ - if (xprt->reestablish_timeout) - xprt->reestablish_timeout = 0; - queue_work(rpciod_workqueue, &transport->recv_worker); - -out: - read_unlock_bh(&sk->sk_callback_lock); -} - /** * xs_tcp_state_change - callback to handle TCP socket state changes * @sk: socket whose state has changed @@ -2263,7 +2239,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) xs_save_old_callbacks(transport, sk); sk->sk_user_data = xprt; - sk->sk_data_ready = xs_tcp_data_ready; + sk->sk_data_ready = xs_data_ready; sk->sk_state_change = xs_tcp_state_change; sk->sk_write_space = xs_tcp_write_space; sock_set_flag(sk, SOCK_FASYNC); -- cgit v1.2.3 From 40a5f1b19bacb2de7a051be952dee85e38c9e5f5 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Fri, 27 May 2016 10:39:50 -0400 Subject: SUNRPC: RPC transport queue must be low latency rpciod can easily get congested due to the long list of queued rpc_tasks. Having the receive queue wait in turn for those tasks to complete can therefore be a bottleneck. Address the problem by separating the workqueues into: - rpciod: manages rpc_tasks - xprtiod: manages transport related work. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/sched.h | 1 + net/sunrpc/sched.c | 24 ++++++++++++++++++++---- net/sunrpc/xprt.c | 8 ++++---- net/sunrpc/xprtsock.c | 6 +++--- 4 files changed, 28 insertions(+), 11 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h index 05a1809c44d9..ef780b3b5e31 100644 --- a/include/linux/sunrpc/sched.h +++ b/include/linux/sunrpc/sched.h @@ -247,6 +247,7 @@ void rpc_show_tasks(struct net *); int rpc_init_mempool(void); void rpc_destroy_mempool(void); extern struct workqueue_struct *rpciod_workqueue; +extern struct workqueue_struct *xprtiod_workqueue; void rpc_prepare_task(struct rpc_task *task); static inline int rpc_wait_for_completion_task(struct rpc_task *task) diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index fcfd48d263f6..a9f786247ffb 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -54,7 +54,8 @@ static struct rpc_wait_queue delay_queue; /* * rpciod-related stuff */ -struct workqueue_struct *rpciod_workqueue; +struct workqueue_struct *rpciod_workqueue __read_mostly; +struct workqueue_struct *xprtiod_workqueue __read_mostly; /* * Disable the timer for a given RPC task. Should be called with @@ -1071,10 +1072,22 @@ static int rpciod_start(void) * Create the rpciod thread and wait for it to start. */ dprintk("RPC: creating workqueue rpciod\n"); - /* Note: highpri because network receive is latency sensitive */ - wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0); + wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0); + if (!wq) + goto out_failed; rpciod_workqueue = wq; - return rpciod_workqueue != NULL; + /* Note: highpri because network receive is latency sensitive */ + wq = alloc_workqueue("xprtiod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0); + if (!wq) + goto free_rpciod; + xprtiod_workqueue = wq; + return 1; +free_rpciod: + wq = rpciod_workqueue; + rpciod_workqueue = NULL; + destroy_workqueue(wq); +out_failed: + return 0; } static void rpciod_stop(void) @@ -1088,6 +1101,9 @@ static void rpciod_stop(void) wq = rpciod_workqueue; rpciod_workqueue = NULL; destroy_workqueue(wq); + wq = xprtiod_workqueue; + xprtiod_workqueue = NULL; + destroy_workqueue(wq); } void diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 216a1385718a..71df082b84a9 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -220,7 +220,7 @@ static void xprt_clear_locked(struct rpc_xprt *xprt) clear_bit(XPRT_LOCKED, &xprt->state); smp_mb__after_atomic(); } else - queue_work(rpciod_workqueue, &xprt->task_cleanup); + queue_work(xprtiod_workqueue, &xprt->task_cleanup); } /* @@ -645,7 +645,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt) set_bit(XPRT_CLOSE_WAIT, &xprt->state); /* Try to schedule an autoclose RPC call */ if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) - queue_work(rpciod_workqueue, &xprt->task_cleanup); + queue_work(xprtiod_workqueue, &xprt->task_cleanup); xprt_wake_pending_tasks(xprt, -EAGAIN); spin_unlock_bh(&xprt->transport_lock); } @@ -672,7 +672,7 @@ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) set_bit(XPRT_CLOSE_WAIT, &xprt->state); /* Try to schedule an autoclose RPC call */ if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) - queue_work(rpciod_workqueue, &xprt->task_cleanup); + queue_work(xprtiod_workqueue, &xprt->task_cleanup); xprt_wake_pending_tasks(xprt, -EAGAIN); out: spin_unlock_bh(&xprt->transport_lock); @@ -689,7 +689,7 @@ xprt_init_autodisconnect(unsigned long data) if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) goto out_abort; spin_unlock(&xprt->transport_lock); - queue_work(rpciod_workqueue, &xprt->task_cleanup); + queue_work(xprtiod_workqueue, &xprt->task_cleanup); return; out_abort: spin_unlock(&xprt->transport_lock); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 62b4f5a2a331..646170d0cb86 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1095,7 +1095,7 @@ static void xs_data_ready(struct sock *sk) if (xprt->reestablish_timeout) xprt->reestablish_timeout = 0; if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) - queue_work(rpciod_workqueue, &transport->recv_worker); + queue_work(xprtiod_workqueue, &transport->recv_worker); } read_unlock_bh(&sk->sk_callback_lock); } @@ -2378,7 +2378,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) /* Start by resetting any existing state */ xs_reset_transport(transport); - queue_delayed_work(rpciod_workqueue, + queue_delayed_work(xprtiod_workqueue, &transport->connect_worker, xprt->reestablish_timeout); xprt->reestablish_timeout <<= 1; @@ -2388,7 +2388,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO; } else { dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); - queue_delayed_work(rpciod_workqueue, + queue_delayed_work(xprtiod_workqueue, &transport->connect_worker, 0); } } -- cgit v1.2.3 From 9ffadfbc092fc25d9639a019fb3079cf352ef978 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Sun, 29 May 2016 00:42:03 -0400 Subject: SUNRPC: Fix suspicious enobufs issues. The current test is racy when dealing with fast NICs. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 646170d0cb86..6b3efeb3edc5 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -642,6 +642,7 @@ static int xs_tcp_send_request(struct rpc_task *task) struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf; bool zerocopy = true; + bool vm_wait = false; int status; int sent; @@ -677,15 +678,33 @@ static int xs_tcp_send_request(struct rpc_task *task) return 0; } + WARN_ON_ONCE(sent == 0 && status == 0); + + if (status == -EAGAIN ) { + /* + * Return EAGAIN if we're sure we're hitting the + * socket send buffer limits. + */ + if (test_bit(SOCK_NOSPACE, &transport->sock->flags)) + break; + /* + * Did we hit a memory allocation failure? + */ + if (sent == 0) { + status = -ENOBUFS; + if (vm_wait) + break; + /* Retry, knowing now that we're below the + * socket send buffer limit + */ + vm_wait = true; + } + continue; + } if (status < 0) break; - if (sent == 0) { - status = -EAGAIN; - break; - } + vm_wait = false; } - if (status == -EAGAIN && sk_stream_is_writeable(transport->inet)) - status = -ENOBUFS; switch (status) { case -ENOTSOCK: -- cgit v1.2.3