summaryrefslogtreecommitdiff
path: root/net/sunrpc/xprtsock.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtsock.c')
-rw-r--r--net/sunrpc/xprtsock.c168
1 files changed, 111 insertions, 57 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 732d4b57411a..70e52f567b2a 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -562,10 +562,14 @@ xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
printk(KERN_WARNING "Callback slot table overflowed\n");
return -ESHUTDOWN;
}
+ if (transport->recv.copied && !req->rq_private_buf.len)
+ return -ESHUTDOWN;
ret = xs_read_stream_request(transport, msg, flags, req);
if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
xprt_complete_bc_request(req, transport->recv.copied);
+ else
+ req->rq_private_buf.len = transport->recv.copied;
return ret;
}
@@ -587,7 +591,7 @@ xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags)
/* Look up and lock the request corresponding to the given XID */
spin_lock(&xprt->queue_lock);
req = xprt_lookup_rqst(xprt, transport->recv.xid);
- if (!req) {
+ if (!req || (transport->recv.copied && !req->rq_private_buf.len)) {
msg->msg_flags |= MSG_TRUNC;
goto out;
}
@@ -599,6 +603,8 @@ xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags)
spin_lock(&xprt->queue_lock);
if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
xprt_complete_rqst(req->rq_task, transport->recv.copied);
+ else
+ req->rq_private_buf.len = transport->recv.copied;
xprt_unpin_rqst(req);
out:
spin_unlock(&xprt->queue_lock);
@@ -880,7 +886,7 @@ static int xs_nospace(struct rpc_rqst *req)
req->rq_slen);
/* Protect against races with write_space */
- spin_lock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->transport_lock);
/* Don't race with disconnect */
if (xprt_connected(xprt)) {
@@ -890,7 +896,7 @@ static int xs_nospace(struct rpc_rqst *req)
} else
ret = -ENOTCONN;
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->transport_lock);
/* Race breaker in case memory is freed before above code is called */
if (ret == -EAGAIN) {
@@ -909,6 +915,7 @@ static int xs_nospace(struct rpc_rqst *req)
static void
xs_stream_prepare_request(struct rpc_rqst *req)
{
+ xdr_free_bvec(&req->rq_rcv_buf);
req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_KERNEL);
}
@@ -950,6 +957,8 @@ static int xs_local_send_request(struct rpc_rqst *req)
struct sock_xprt *transport =
container_of(xprt, struct sock_xprt, xprt);
struct xdr_buf *xdr = &req->rq_snd_buf;
+ rpc_fraghdr rm = xs_stream_record_marker(xdr);
+ unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen;
int status;
int sent = 0;
@@ -964,9 +973,7 @@ static int xs_local_send_request(struct rpc_rqst *req)
req->rq_xtime = ktime_get();
status = xs_sendpages(transport->sock, NULL, 0, xdr,
- transport->xmit.offset,
- xs_stream_record_marker(xdr),
- &sent);
+ transport->xmit.offset, rm, &sent);
dprintk("RPC: %s(%u) = %d\n",
__func__, xdr->len - transport->xmit.offset, status);
@@ -976,7 +983,7 @@ static int xs_local_send_request(struct rpc_rqst *req)
if (likely(sent > 0) || status == 0) {
transport->xmit.offset += sent;
req->rq_bytes_sent = transport->xmit.offset;
- if (likely(req->rq_bytes_sent >= req->rq_slen)) {
+ if (likely(req->rq_bytes_sent >= msglen)) {
req->rq_xmit_bytes_sent += transport->xmit.offset;
transport->xmit.offset = 0;
return 0;
@@ -1097,6 +1104,8 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
struct rpc_xprt *xprt = req->rq_xprt;
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
struct xdr_buf *xdr = &req->rq_snd_buf;
+ rpc_fraghdr rm = xs_stream_record_marker(xdr);
+ unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen;
bool vm_wait = false;
int status;
int sent;
@@ -1122,9 +1131,7 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
while (1) {
sent = 0;
status = xs_sendpages(transport->sock, NULL, 0, xdr,
- transport->xmit.offset,
- xs_stream_record_marker(xdr),
- &sent);
+ transport->xmit.offset, rm, &sent);
dprintk("RPC: xs_tcp_send_request(%u) = %d\n",
xdr->len - transport->xmit.offset, status);
@@ -1133,7 +1140,7 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
* reset the count of bytes sent. */
transport->xmit.offset += sent;
req->rq_bytes_sent = transport->xmit.offset;
- if (likely(req->rq_bytes_sent >= req->rq_slen)) {
+ if (likely(req->rq_bytes_sent >= msglen)) {
req->rq_xmit_bytes_sent += transport->xmit.offset;
transport->xmit.offset = 0;
return 0;
@@ -1211,6 +1218,15 @@ static void xs_sock_reset_state_flags(struct rpc_xprt *xprt)
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+ clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state);
+ clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state);
+ clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state);
+}
+
+static void xs_run_error_worker(struct sock_xprt *transport, unsigned int nr)
+{
+ set_bit(nr, &transport->sock_state);
+ queue_work(xprtiod_workqueue, &transport->error_worker);
}
static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
@@ -1231,20 +1247,24 @@ static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
*/
static void xs_error_report(struct sock *sk)
{
+ struct sock_xprt *transport;
struct rpc_xprt *xprt;
- int err;
read_lock_bh(&sk->sk_callback_lock);
if (!(xprt = xprt_from_sock(sk)))
goto out;
- err = -sk->sk_err;
- if (err == 0)
+ transport = container_of(xprt, struct sock_xprt, xprt);
+ transport->xprt_err = -sk->sk_err;
+ if (transport->xprt_err == 0)
goto out;
dprintk("RPC: xs_error_report client %p, error=%d...\n",
- xprt, -err);
- trace_rpc_socket_error(xprt, sk->sk_socket, err);
- xprt_wake_pending_tasks(xprt, err);
+ xprt, -transport->xprt_err);
+ trace_rpc_socket_error(xprt, sk->sk_socket, transport->xprt_err);
+
+ /* barrier ensures xprt_err is set before XPRT_SOCK_WAKE_ERROR */
+ smp_mb__before_atomic();
+ xs_run_error_worker(transport, XPRT_SOCK_WAKE_ERROR);
out:
read_unlock_bh(&sk->sk_callback_lock);
}
@@ -1333,6 +1353,7 @@ static void xs_destroy(struct rpc_xprt *xprt)
cancel_delayed_work_sync(&transport->connect_worker);
xs_close(xprt);
cancel_work_sync(&transport->recv_worker);
+ cancel_work_sync(&transport->error_worker);
xs_xprt_free(xprt);
module_put(THIS_MODULE);
}
@@ -1386,9 +1407,9 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
}
- spin_lock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->transport_lock);
xprt_adjust_cwnd(xprt, task, copied);
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->transport_lock);
spin_lock(&xprt->queue_lock);
xprt_complete_rqst(task, copied);
__UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
@@ -1498,7 +1519,6 @@ static void xs_tcp_state_change(struct sock *sk)
trace_rpc_socket_state_change(xprt, sk->sk_socket);
switch (sk->sk_state) {
case TCP_ESTABLISHED:
- spin_lock(&xprt->transport_lock);
if (!xprt_test_and_set_connected(xprt)) {
xprt->connect_cookie++;
clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
@@ -1507,9 +1527,8 @@ static void xs_tcp_state_change(struct sock *sk)
xprt->stat.connect_count++;
xprt->stat.connect_time += (long)jiffies -
xprt->stat.connect_start;
- xprt_wake_pending_tasks(xprt, -EAGAIN);
+ xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING);
}
- spin_unlock(&xprt->transport_lock);
break;
case TCP_FIN_WAIT1:
/* The client initiated a shutdown of the socket */
@@ -1525,7 +1544,7 @@ static void xs_tcp_state_change(struct sock *sk)
/* The server initiated a shutdown of the socket */
xprt->connect_cookie++;
clear_bit(XPRT_CONNECTED, &xprt->state);
- xs_tcp_force_close(xprt);
+ xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
/* fall through */
case TCP_CLOSING:
/*
@@ -1547,7 +1566,7 @@ static void xs_tcp_state_change(struct sock *sk)
xprt_clear_connecting(xprt);
clear_bit(XPRT_CLOSING, &xprt->state);
/* Trigger the socket release */
- xs_tcp_force_close(xprt);
+ xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
}
out:
read_unlock_bh(&sk->sk_callback_lock);
@@ -1556,6 +1575,7 @@ static void xs_tcp_state_change(struct sock *sk)
static void xs_write_space(struct sock *sk)
{
struct socket_wq *wq;
+ struct sock_xprt *transport;
struct rpc_xprt *xprt;
if (!sk->sk_socket)
@@ -1564,13 +1584,14 @@ static void xs_write_space(struct sock *sk)
if (unlikely(!(xprt = xprt_from_sock(sk))))
return;
+ transport = container_of(xprt, struct sock_xprt, xprt);
rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
if (!wq || test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags) == 0)
goto out;
- if (xprt_write_space(xprt))
- sk->sk_write_pending--;
+ xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE);
+ sk->sk_write_pending--;
out:
rcu_read_unlock();
}
@@ -1664,9 +1685,9 @@ static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t
*/
static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task)
{
- spin_lock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->transport_lock);
xprt_adjust_cwnd(xprt, task, -ETIMEDOUT);
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->transport_lock);
}
static int xs_get_random_port(void)
@@ -2017,6 +2038,7 @@ static void xs_local_connect(struct rpc_xprt *xprt, struct rpc_task *task)
* we'll need to figure out how to pass a namespace to
* connect.
*/
+ task->tk_rpc_status = -ENOTCONN;
rpc_exit(task, -ENOTCONN);
return;
}
@@ -2200,13 +2222,13 @@ static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
unsigned int opt_on = 1;
unsigned int timeo;
- spin_lock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->transport_lock);
keepidle = DIV_ROUND_UP(xprt->timeout->to_initval, HZ);
keepcnt = xprt->timeout->to_retries + 1;
timeo = jiffies_to_msecs(xprt->timeout->to_initval) *
(xprt->timeout->to_retries + 1);
clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->transport_lock);
/* TCP Keepalive options */
kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
@@ -2231,7 +2253,7 @@ static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt,
struct rpc_timeout to;
unsigned long initval;
- spin_lock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->transport_lock);
if (reconnect_timeout < xprt->max_reconnect_timeout)
xprt->max_reconnect_timeout = reconnect_timeout;
if (connect_timeout < xprt->connect_timeout) {
@@ -2248,7 +2270,7 @@ static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt,
xprt->connect_timeout = connect_timeout;
}
set_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->transport_lock);
}
static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
@@ -2401,25 +2423,6 @@ out:
xprt_wake_pending_tasks(xprt, status);
}
-static unsigned long xs_reconnect_delay(const struct rpc_xprt *xprt)
-{
- unsigned long start, now = jiffies;
-
- start = xprt->stat.connect_start + xprt->reestablish_timeout;
- if (time_after(start, now))
- return start - now;
- return 0;
-}
-
-static void xs_reconnect_backoff(struct rpc_xprt *xprt)
-{
- xprt->reestablish_timeout <<= 1;
- if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
- xprt->reestablish_timeout = xprt->max_reconnect_timeout;
- if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
- xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
-}
-
/**
* xs_connect - connect a socket to a remote endpoint
* @xprt: pointer to transport structure
@@ -2449,8 +2452,8 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
/* Start by resetting any existing state */
xs_reset_transport(transport);
- delay = xs_reconnect_delay(xprt);
- xs_reconnect_backoff(xprt);
+ delay = xprt_reconnect_delay(xprt);
+ xprt_reconnect_backoff(xprt, XS_TCP_INIT_REEST_TO);
} else
dprintk("RPC: xs_connect scheduled xprt %p\n", xprt);
@@ -2460,6 +2463,53 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
delay);
}
+static void xs_wake_disconnect(struct sock_xprt *transport)
+{
+ if (test_and_clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state))
+ xs_tcp_force_close(&transport->xprt);
+}
+
+static void xs_wake_write(struct sock_xprt *transport)
+{
+ if (test_and_clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state))
+ xprt_write_space(&transport->xprt);
+}
+
+static void xs_wake_error(struct sock_xprt *transport)
+{
+ int sockerr;
+
+ if (!test_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
+ return;
+ mutex_lock(&transport->recv_mutex);
+ if (transport->sock == NULL)
+ goto out;
+ if (!test_and_clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
+ goto out;
+ sockerr = xchg(&transport->xprt_err, 0);
+ if (sockerr < 0)
+ xprt_wake_pending_tasks(&transport->xprt, sockerr);
+out:
+ mutex_unlock(&transport->recv_mutex);
+}
+
+static void xs_wake_pending(struct sock_xprt *transport)
+{
+ if (test_and_clear_bit(XPRT_SOCK_WAKE_PENDING, &transport->sock_state))
+ xprt_wake_pending_tasks(&transport->xprt, -EAGAIN);
+}
+
+static void xs_error_handle(struct work_struct *work)
+{
+ struct sock_xprt *transport = container_of(work,
+ struct sock_xprt, error_worker);
+
+ xs_wake_disconnect(transport);
+ xs_wake_write(transport);
+ xs_wake_error(transport);
+ xs_wake_pending(transport);
+}
+
/**
* xs_local_print_stats - display AF_LOCAL socket-specifc stats
* @xprt: rpc_xprt struct containing statistics
@@ -2690,7 +2740,7 @@ static const struct rpc_xprt_ops xs_local_ops = {
.buf_free = rpc_free,
.prepare_request = xs_stream_prepare_request,
.send_request = xs_local_send_request,
- .set_retrans_timeout = xprt_set_retrans_timeout_def,
+ .wait_for_reply_request = xprt_wait_for_reply_request_def,
.close = xs_close,
.destroy = xs_destroy,
.print_stats = xs_local_print_stats,
@@ -2710,7 +2760,7 @@ static const struct rpc_xprt_ops xs_udp_ops = {
.buf_alloc = rpc_malloc,
.buf_free = rpc_free,
.send_request = xs_udp_send_request,
- .set_retrans_timeout = xprt_set_retrans_timeout_rtt,
+ .wait_for_reply_request = xprt_wait_for_reply_request_rtt,
.timer = xs_udp_timer,
.release_request = xprt_release_rqst_cong,
.close = xs_close,
@@ -2733,7 +2783,7 @@ static const struct rpc_xprt_ops xs_tcp_ops = {
.buf_free = rpc_free,
.prepare_request = xs_stream_prepare_request,
.send_request = xs_tcp_send_request,
- .set_retrans_timeout = xprt_set_retrans_timeout_def,
+ .wait_for_reply_request = xprt_wait_for_reply_request_def,
.close = xs_tcp_shutdown,
.destroy = xs_destroy,
.set_connect_timeout = xs_tcp_set_connect_timeout,
@@ -2744,6 +2794,7 @@ static const struct rpc_xprt_ops xs_tcp_ops = {
#ifdef CONFIG_SUNRPC_BACKCHANNEL
.bc_setup = xprt_setup_bc,
.bc_maxpayload = xs_tcp_bc_maxpayload,
+ .bc_num_slots = xprt_bc_max_slots,
.bc_free_rqst = xprt_free_bc_rqst,
.bc_destroy = xprt_destroy_bc,
#endif
@@ -2761,7 +2812,7 @@ static const struct rpc_xprt_ops bc_tcp_ops = {
.buf_alloc = bc_malloc,
.buf_free = bc_free,
.send_request = bc_send_request,
- .set_retrans_timeout = xprt_set_retrans_timeout_def,
+ .wait_for_reply_request = xprt_wait_for_reply_request_def,
.close = bc_close,
.destroy = bc_destroy,
.print_stats = xs_tcp_print_stats,
@@ -2872,6 +2923,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
xprt->timeout = &xs_local_default_timeout;
INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
+ INIT_WORK(&transport->error_worker, xs_error_handle);
INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket);
switch (sun->sun_family) {
@@ -2942,6 +2994,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
xprt->timeout = &xs_udp_default_timeout;
INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn);
+ INIT_WORK(&transport->error_worker, xs_error_handle);
INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket);
switch (addr->sa_family) {
@@ -3023,6 +3076,7 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
(xprt->timeout->to_retries + 1);
INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
+ INIT_WORK(&transport->error_worker, xs_error_handle);
INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket);
switch (addr->sa_family) {