diff options
Diffstat (limited to 'net/sunrpc/xprtsock.c')
-rw-r--r-- | net/sunrpc/xprtsock.c | 238 |
1 files changed, 87 insertions, 151 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 87ce7e8bb8dc..66891e32c5e3 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -63,6 +63,8 @@ static unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE; static unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT; static unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT; +#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) + #define XS_TCP_LINGER_TO (15U * HZ) static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO; @@ -75,8 +77,6 @@ static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO; * someone else's file names! */ -#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) - static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE; static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE; static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT; @@ -627,7 +627,7 @@ process_status: * @xprt: transport * * Initiates a graceful shutdown of the TCP socket by calling the - * equivalent of shutdown(SHUT_WR); + * equivalent of shutdown(SHUT_RDWR); */ static void xs_tcp_shutdown(struct rpc_xprt *xprt) { @@ -635,7 +635,7 @@ static void xs_tcp_shutdown(struct rpc_xprt *xprt) struct socket *sock = transport->sock; if (sock != NULL) { - kernel_sock_shutdown(sock, SHUT_WR); + kernel_sock_shutdown(sock, SHUT_RDWR); trace_rpc_socket_shutdown(xprt, sock); } } @@ -718,9 +718,9 @@ static int xs_tcp_send_request(struct rpc_task *task) dprintk("RPC: sendmsg returned unrecognized error %d\n", -status); case -ECONNRESET: - xs_tcp_shutdown(xprt); case -ECONNREFUSED: case -ENOTCONN: + case -EADDRINUSE: case -EPIPE: clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); } @@ -773,6 +773,21 @@ static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *s sk->sk_error_report = transport->old_error_report; } +static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) +{ + smp_mb__before_atomic(); + clear_bit(XPRT_CLOSE_WAIT, &xprt->state); + clear_bit(XPRT_CLOSING, &xprt->state); + smp_mb__after_atomic(); +} + +static void xs_sock_mark_closed(struct rpc_xprt *xprt) +{ + xs_sock_reset_connection_flags(xprt); + /* Mark transport as closed and wake up all pending tasks */ + xprt_disconnect_done(xprt); +} + /** * xs_error_report - callback to handle TCP socket state errors * @sk: socket @@ -792,11 +807,12 @@ static void xs_error_report(struct sock *sk) err = -sk->sk_err; if (err == 0) goto out; + /* Is this a reset event? */ + if (sk->sk_state == TCP_CLOSE) + xs_sock_mark_closed(xprt); dprintk("RPC: xs_error_report client %p, error=%d...\n", xprt, -err); trace_rpc_socket_error(xprt, sk->sk_socket, err); - if (test_bit(XPRT_CONNECTION_REUSE, &xprt->state)) - goto out; xprt_wake_pending_tasks(xprt, err); out: read_unlock_bh(&sk->sk_callback_lock); @@ -806,12 +822,11 @@ static void xs_reset_transport(struct sock_xprt *transport) { struct socket *sock = transport->sock; struct sock *sk = transport->inet; + struct rpc_xprt *xprt = &transport->xprt; if (sk == NULL) return; - transport->srcport = 0; - write_lock_bh(&sk->sk_callback_lock); transport->inet = NULL; transport->sock = NULL; @@ -820,8 +835,9 @@ static void xs_reset_transport(struct sock_xprt *transport) xs_restore_old_callbacks(transport, sk); write_unlock_bh(&sk->sk_callback_lock); + xs_sock_reset_connection_flags(xprt); - trace_rpc_socket_close(&transport->xprt, sock); + trace_rpc_socket_close(xprt, sock); sock_release(sock); } @@ -841,27 +857,12 @@ static void xs_close(struct rpc_xprt *xprt) dprintk("RPC: xs_close xprt %p\n", xprt); - cancel_delayed_work_sync(&transport->connect_worker); - xs_reset_transport(transport); xprt->reestablish_timeout = 0; - smp_mb__before_atomic(); - clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); - clear_bit(XPRT_CLOSE_WAIT, &xprt->state); - clear_bit(XPRT_CLOSING, &xprt->state); - smp_mb__after_atomic(); xprt_disconnect_done(xprt); } -static void xs_tcp_close(struct rpc_xprt *xprt) -{ - if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state)) - xs_close(xprt); - else - xs_tcp_shutdown(xprt); -} - static void xs_xprt_free(struct rpc_xprt *xprt) { xs_free_peer_addresses(xprt); @@ -1032,7 +1033,6 @@ static void xs_udp_data_ready(struct sock *sk) */ static void xs_tcp_force_close(struct rpc_xprt *xprt) { - set_bit(XPRT_CONNECTION_CLOSE, &xprt->state); xprt_force_disconnect(xprt); } @@ -1425,54 +1425,6 @@ out: read_unlock_bh(&sk->sk_callback_lock); } -/* - * Do the equivalent of linger/linger2 handling for dealing with - * broken servers that don't close the socket in a timely - * fashion - */ -static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt, - unsigned long timeout) -{ - struct sock_xprt *transport; - - if (xprt_test_and_set_connecting(xprt)) - return; - set_bit(XPRT_CONNECTION_ABORT, &xprt->state); - transport = container_of(xprt, struct sock_xprt, xprt); - queue_delayed_work(rpciod_workqueue, &transport->connect_worker, - timeout); -} - -static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt) -{ - struct sock_xprt *transport; - - transport = container_of(xprt, struct sock_xprt, xprt); - - if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) || - !cancel_delayed_work(&transport->connect_worker)) - return; - clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); - xprt_clear_connecting(xprt); -} - -static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) -{ - smp_mb__before_atomic(); - clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); - clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state); - clear_bit(XPRT_CLOSE_WAIT, &xprt->state); - clear_bit(XPRT_CLOSING, &xprt->state); - smp_mb__after_atomic(); -} - -static void xs_sock_mark_closed(struct rpc_xprt *xprt) -{ - xs_sock_reset_connection_flags(xprt); - /* Mark transport as closed and wake up all pending tasks */ - xprt_disconnect_done(xprt); -} - /** * xs_tcp_state_change - callback to handle TCP socket state changes * @sk: socket whose state has changed @@ -1521,7 +1473,6 @@ static void xs_tcp_state_change(struct sock *sk) clear_bit(XPRT_CONNECTED, &xprt->state); clear_bit(XPRT_CLOSE_WAIT, &xprt->state); smp_mb__after_atomic(); - xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout); break; case TCP_CLOSE_WAIT: /* The server initiated a shutdown of the socket */ @@ -1538,13 +1489,11 @@ static void xs_tcp_state_change(struct sock *sk) break; case TCP_LAST_ACK: set_bit(XPRT_CLOSING, &xprt->state); - xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout); smp_mb__before_atomic(); clear_bit(XPRT_CONNECTED, &xprt->state); smp_mb__after_atomic(); break; case TCP_CLOSE: - xs_tcp_cancel_linger_timeout(xprt); xs_sock_mark_closed(xprt); } out: @@ -1667,6 +1616,40 @@ static unsigned short xs_get_random_port(void) } /** + * xs_set_reuseaddr_port - set the socket's port and address reuse options + * @sock: socket + * + * Note that this function has to be called on all sockets that share the + * same port, and it must be called before binding. + */ +static void xs_sock_set_reuseport(struct socket *sock) +{ + int opt = 1; + + kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, + (char *)&opt, sizeof(opt)); +} + +static unsigned short xs_sock_getport(struct socket *sock) +{ + struct sockaddr_storage buf; + int buflen; + unsigned short port = 0; + + if (kernel_getsockname(sock, (struct sockaddr *)&buf, &buflen) < 0) + goto out; + switch (buf.ss_family) { + case AF_INET6: + port = ntohs(((struct sockaddr_in6 *)&buf)->sin6_port); + break; + case AF_INET: + port = ntohs(((struct sockaddr_in *)&buf)->sin_port); + } +out: + return port; +} + +/** * xs_set_port - reset the port number in the remote endpoint address * @xprt: generic transport * @port: new port number @@ -1680,6 +1663,12 @@ static void xs_set_port(struct rpc_xprt *xprt, unsigned short port) xs_update_peer_port(xprt); } +static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock) +{ + if (transport->srcport == 0) + transport->srcport = xs_sock_getport(sock); +} + static unsigned short xs_get_srcport(struct sock_xprt *transport) { unsigned short port = transport->srcport; @@ -1833,7 +1822,8 @@ static void xs_dummy_setup_socket(struct work_struct *work) } static struct socket *xs_create_sock(struct rpc_xprt *xprt, - struct sock_xprt *transport, int family, int type, int protocol) + struct sock_xprt *transport, int family, int type, + int protocol, bool reuseport) { struct socket *sock; int err; @@ -1846,6 +1836,9 @@ static struct socket *xs_create_sock(struct rpc_xprt *xprt, } xs_reclassify_socket(family, sock); + if (reuseport) + xs_sock_set_reuseport(sock); + err = xs_bind(transport, sock); if (err) { sock_release(sock); @@ -1903,7 +1896,6 @@ static int xs_local_setup_socket(struct sock_xprt *transport) struct socket *sock; int status = -EIO; - clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); status = __sock_create(xprt->xprt_net, AF_LOCAL, SOCK_STREAM, 0, &sock, 1); if (status < 0) { @@ -2044,10 +2036,9 @@ static void xs_udp_setup_socket(struct work_struct *work) struct socket *sock = transport->sock; int status = -EIO; - /* Start by resetting any existing state */ - xs_reset_transport(transport); sock = xs_create_sock(xprt, transport, - xs_addr(xprt)->sa_family, SOCK_DGRAM, IPPROTO_UDP); + xs_addr(xprt)->sa_family, SOCK_DGRAM, + IPPROTO_UDP, false); if (IS_ERR(sock)) goto out; @@ -2061,61 +2052,11 @@ static void xs_udp_setup_socket(struct work_struct *work) trace_rpc_socket_connect(xprt, sock, 0); status = 0; out: + xprt_unlock_connect(xprt, transport); xprt_clear_connecting(xprt); xprt_wake_pending_tasks(xprt, status); } -/* - * We need to preserve the port number so the reply cache on the server can - * find our cached RPC replies when we get around to reconnecting. - */ -static void xs_abort_connection(struct sock_xprt *transport) -{ - int result; - struct sockaddr any; - - dprintk("RPC: disconnecting xprt %p to reuse port\n", transport); - - /* - * Disconnect the transport socket by doing a connect operation - * with AF_UNSPEC. This should return immediately... - */ - memset(&any, 0, sizeof(any)); - any.sa_family = AF_UNSPEC; - result = kernel_connect(transport->sock, &any, sizeof(any), 0); - trace_rpc_socket_reset_connection(&transport->xprt, - transport->sock, result); - if (!result) - xs_sock_reset_connection_flags(&transport->xprt); - dprintk("RPC: AF_UNSPEC connect return code %d\n", result); -} - -static void xs_tcp_reuse_connection(struct sock_xprt *transport) -{ - unsigned int state = transport->inet->sk_state; - - if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED) { - /* we don't need to abort the connection if the socket - * hasn't undergone a shutdown - */ - if (transport->inet->sk_shutdown == 0) - return; - dprintk("RPC: %s: TCP_CLOSEd and sk_shutdown set to %d\n", - __func__, transport->inet->sk_shutdown); - } - if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT)) { - /* we don't need to abort the connection if the socket - * hasn't undergone a shutdown - */ - if (transport->inet->sk_shutdown == 0) - return; - dprintk("RPC: %s: ESTABLISHED/SYN_SENT " - "sk_shutdown set to %d\n", - __func__, transport->inet->sk_shutdown); - } - xs_abort_connection(transport); -} - static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); @@ -2149,9 +2090,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) sk->sk_allocation = GFP_ATOMIC; /* socket options */ - sk->sk_userlocks |= SOCK_BINDPORT_LOCK; sock_reset_flag(sk, SOCK_LINGER); - tcp_sk(sk)->linger2 = 0; tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF; xprt_clear_connected(xprt); @@ -2174,6 +2113,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK); switch (ret) { case 0: + xs_set_srcport(transport, sock); case -EINPROGRESS: /* SYN_SENT! */ if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) @@ -2200,25 +2140,13 @@ static void xs_tcp_setup_socket(struct work_struct *work) int status = -EIO; if (!sock) { - clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); sock = xs_create_sock(xprt, transport, - xs_addr(xprt)->sa_family, SOCK_STREAM, IPPROTO_TCP); + xs_addr(xprt)->sa_family, SOCK_STREAM, + IPPROTO_TCP, true); if (IS_ERR(sock)) { status = PTR_ERR(sock); goto out; } - } else { - int abort_and_exit; - - abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT, - &xprt->state); - /* "close" the socket, preserving the local port */ - set_bit(XPRT_CONNECTION_REUSE, &xprt->state); - xs_tcp_reuse_connection(transport); - clear_bit(XPRT_CONNECTION_REUSE, &xprt->state); - - if (abort_and_exit) - goto out_eagain; } dprintk("RPC: worker connecting xprt %p via %s to " @@ -2245,6 +2173,7 @@ static void xs_tcp_setup_socket(struct work_struct *work) case 0: case -EINPROGRESS: case -EALREADY: + xprt_unlock_connect(xprt, transport); xprt_clear_connecting(xprt); return; case -EINVAL: @@ -2254,13 +2183,15 @@ static void xs_tcp_setup_socket(struct work_struct *work) case -ECONNREFUSED: case -ECONNRESET: case -ENETUNREACH: + case -EADDRINUSE: case -ENOBUFS: /* retry with existing socket, after a delay */ + xs_tcp_force_close(xprt); goto out; } -out_eagain: status = -EAGAIN; out: + xprt_unlock_connect(xprt, transport); xprt_clear_connecting(xprt); xprt_wake_pending_tasks(xprt, status); } @@ -2283,6 +2214,11 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + WARN_ON_ONCE(!xprt_lock_connect(xprt, task, transport)); + + /* Start by resetting any existing state */ + xs_reset_transport(transport); + if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) { dprintk("RPC: xs_connect delayed xprt %p for %lu " "seconds\n", @@ -2559,7 +2495,7 @@ static struct rpc_xprt_ops xs_tcp_ops = { .buf_free = rpc_free, .send_request = xs_tcp_send_request, .set_retrans_timeout = xprt_set_retrans_timeout_def, - .close = xs_tcp_close, + .close = xs_tcp_shutdown, .destroy = xs_destroy, .print_stats = xs_tcp_print_stats, }; |