summaryrefslogtreecommitdiff
path: root/net/sunrpc/xprtsock.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtsock.c')
-rw-r--r--net/sunrpc/xprtsock.c113
1 files changed, 65 insertions, 48 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 83e6f3316149..111767ab124a 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -642,6 +642,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
struct xdr_buf *xdr = &req->rq_snd_buf;
bool zerocopy = true;
+ bool vm_wait = false;
int status;
int sent;
@@ -677,15 +678,33 @@ static int xs_tcp_send_request(struct rpc_task *task)
return 0;
}
+ WARN_ON_ONCE(sent == 0 && status == 0);
+
+ if (status == -EAGAIN ) {
+ /*
+ * Return EAGAIN if we're sure we're hitting the
+ * socket send buffer limits.
+ */
+ if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
+ break;
+ /*
+ * Did we hit a memory allocation failure?
+ */
+ if (sent == 0) {
+ status = -ENOBUFS;
+ if (vm_wait)
+ break;
+ /* Retry, knowing now that we're below the
+ * socket send buffer limit
+ */
+ vm_wait = true;
+ }
+ continue;
+ }
if (status < 0)
break;
- if (sent == 0) {
- status = -EAGAIN;
- break;
- }
+ vm_wait = false;
}
- if (status == -EAGAIN && sk_stream_is_writeable(transport->inet))
- status = -ENOBUFS;
switch (status) {
case -ENOTSOCK:
@@ -755,11 +774,19 @@ static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *s
sk->sk_error_report = transport->old_error_report;
}
+static void xs_sock_reset_state_flags(struct rpc_xprt *xprt)
+{
+ struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+
+ clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+}
+
static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
{
smp_mb__before_atomic();
clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
clear_bit(XPRT_CLOSING, &xprt->state);
+ xs_sock_reset_state_flags(xprt);
smp_mb__after_atomic();
}
@@ -962,10 +989,13 @@ static void xs_local_data_receive(struct sock_xprt *transport)
goto out;
for (;;) {
skb = skb_recv_datagram(sk, 0, 1, &err);
- if (skb == NULL)
+ if (skb != NULL) {
+ xs_local_data_read_skb(&transport->xprt, sk, skb);
+ skb_free_datagram(sk, skb);
+ continue;
+ }
+ if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
break;
- xs_local_data_read_skb(&transport->xprt, sk, skb);
- skb_free_datagram(sk, skb);
}
out:
mutex_unlock(&transport->recv_mutex);
@@ -1043,10 +1073,13 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
goto out;
for (;;) {
skb = skb_recv_datagram(sk, 0, 1, &err);
- if (skb == NULL)
+ if (skb != NULL) {
+ xs_udp_data_read_skb(&transport->xprt, sk, skb);
+ skb_free_datagram(sk, skb);
+ continue;
+ }
+ if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
break;
- xs_udp_data_read_skb(&transport->xprt, sk, skb);
- skb_free_datagram(sk, skb);
}
out:
mutex_unlock(&transport->recv_mutex);
@@ -1074,7 +1107,14 @@ static void xs_data_ready(struct sock *sk)
if (xprt != NULL) {
struct sock_xprt *transport = container_of(xprt,
struct sock_xprt, xprt);
- queue_work(rpciod_workqueue, &transport->recv_worker);
+ transport->old_data_ready(sk);
+ /* Any data means we had a useful conversation, so
+ * then we don't need to delay the next reconnect
+ */
+ if (xprt->reestablish_timeout)
+ xprt->reestablish_timeout = 0;
+ if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+ queue_work(xprtiod_workqueue, &transport->recv_worker);
}
read_unlock_bh(&sk->sk_callback_lock);
}
@@ -1474,10 +1514,15 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
for (;;) {
lock_sock(sk);
read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
- release_sock(sk);
- if (read <= 0)
- break;
- total += read;
+ if (read <= 0) {
+ clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+ release_sock(sk);
+ if (!test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+ break;
+ } else {
+ release_sock(sk);
+ total += read;
+ }
rd_desc.count = 65536;
}
out:
@@ -1493,34 +1538,6 @@ static void xs_tcp_data_receive_workfn(struct work_struct *work)
}
/**
- * xs_tcp_data_ready - "data ready" callback for TCP sockets
- * @sk: socket with data to read
- *
- */
-static void xs_tcp_data_ready(struct sock *sk)
-{
- struct sock_xprt *transport;
- struct rpc_xprt *xprt;
-
- dprintk("RPC: xs_tcp_data_ready...\n");
-
- read_lock_bh(&sk->sk_callback_lock);
- if (!(xprt = xprt_from_sock(sk)))
- goto out;
- transport = container_of(xprt, struct sock_xprt, xprt);
-
- /* Any data means we had a useful conversation, so
- * the we don't need to delay the next reconnect
- */
- if (xprt->reestablish_timeout)
- xprt->reestablish_timeout = 0;
- queue_work(rpciod_workqueue, &transport->recv_worker);
-
-out:
- read_unlock_bh(&sk->sk_callback_lock);
-}
-
-/**
* xs_tcp_state_change - callback to handle TCP socket state changes
* @sk: socket whose state has changed
*
@@ -2241,7 +2258,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
xs_save_old_callbacks(transport, sk);
sk->sk_user_data = xprt;
- sk->sk_data_ready = xs_tcp_data_ready;
+ sk->sk_data_ready = xs_data_ready;
sk->sk_state_change = xs_tcp_state_change;
sk->sk_write_space = xs_tcp_write_space;
sock_set_flag(sk, SOCK_FASYNC);
@@ -2380,7 +2397,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
/* Start by resetting any existing state */
xs_reset_transport(transport);
- queue_delayed_work(rpciod_workqueue,
+ queue_delayed_work(xprtiod_workqueue,
&transport->connect_worker,
xprt->reestablish_timeout);
xprt->reestablish_timeout <<= 1;
@@ -2390,7 +2407,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
} else {
dprintk("RPC: xs_connect scheduled xprt %p\n", xprt);
- queue_delayed_work(rpciod_workqueue,
+ queue_delayed_work(xprtiod_workqueue,
&transport->connect_worker, 0);
}
}