diff options
Diffstat (limited to 'net/tipc/socket.c')
| -rw-r--r-- | net/tipc/socket.c | 193 |
1 files changed, 142 insertions, 51 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c index b542f14ed444..41688da233ab 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -75,6 +75,7 @@ struct sockaddr_pair { * @conn_instance: TIPC instance used when connection was established * @published: non-zero if port has one or more associated names * @max_pkt: maximum packet size "hint" used when building messages sent by port + * @maxnagle: maximum size of msg which can be subject to nagle * @portid: unique port identity in TIPC socket hash table * @phdr: preformatted message header used when sending messages * #cong_links: list of congested links @@ -97,6 +98,7 @@ struct tipc_sock { u32 conn_instance; int published; u32 max_pkt; + u32 maxnagle; u32 portid; struct tipc_msg phdr; struct list_head cong_links; @@ -116,6 +118,10 @@ struct tipc_sock { struct tipc_mc_method mc_method; struct rcu_head rcu; struct tipc_group *group; + u32 oneway; + u16 snd_backlog; + bool expect_ack; + bool nodelay; bool group_is_open; }; @@ -137,6 +143,7 @@ static int tipc_sk_insert(struct tipc_sock *tsk); static void tipc_sk_remove(struct tipc_sock *tsk); static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz); static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz); +static void tipc_sk_push_backlog(struct tipc_sock *tsk); static const struct proto_ops packet_ops; static const struct proto_ops stream_ops; @@ -227,6 +234,26 @@ static u16 tsk_inc(struct tipc_sock *tsk, int msglen) return 1; } +/* tsk_set_nagle - enable/disable nagle property by manipulating maxnagle + */ +static void tsk_set_nagle(struct tipc_sock *tsk) +{ + struct sock *sk = &tsk->sk; + + tsk->maxnagle = 0; + if (sk->sk_type != SOCK_STREAM) + return; + if (tsk->nodelay) + return; + if (!(tsk->peer_caps & TIPC_NAGLE)) + return; + /* Limit node local buffer size to avoid receive queue overflow */ + if (tsk->max_pkt == MAX_MSG_SIZE) + tsk->maxnagle = 1500; + else + tsk->maxnagle = tsk->max_pkt; +} + /** * tsk_advance_rx_queue - discard first buffer in socket receive queue * @@ -446,6 +473,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, tsk = tipc_sk(sk); tsk->max_pkt = MAX_PKT_DEFAULT; + tsk->maxnagle = 0; INIT_LIST_HEAD(&tsk->publications); INIT_LIST_HEAD(&tsk->cong_links); msg = &tsk->phdr; @@ -486,7 +514,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, if (sock->type == SOCK_DGRAM) tsk_set_unreliable(tsk, true); } - + __skb_queue_head_init(&tsk->mc_method.deferredq); trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " "); return 0; } @@ -504,7 +532,7 @@ static void __tipc_shutdown(struct socket *sock, int error) struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); struct net *net = sock_net(sk); - long timeout = CONN_TIMEOUT_DEFAULT; + long timeout = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT); u32 dnode = tsk_peer_node(tsk); struct sk_buff *skb; @@ -512,7 +540,9 @@ static void __tipc_shutdown(struct socket *sock, int error) tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))); - /* Remove any pending SYN message */ + /* Push out delayed messages if in Nagle mode */ + tipc_sk_push_backlog(tsk); + /* Remove pending SYN */ __skb_queue_purge(&sk->sk_write_queue); /* Reject all unreceived messages, except on an active connection @@ -582,6 +612,7 @@ static int tipc_release(struct socket *sock) sk->sk_shutdown = SHUTDOWN_MASK; tipc_sk_leave(tsk); tipc_sk_withdraw(tsk, 0, NULL); + __skb_queue_purge(&tsk->mc_method.deferredq); sk_stop_timer(sk, &sk->sk_timer); tipc_sk_remove(tsk); @@ -734,12 +765,12 @@ static __poll_t tipc_poll(struct file *file, struct socket *sock, switch (sk->sk_state) { case TIPC_ESTABLISHED: - case TIPC_CONNECTING: if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk)) revents |= EPOLLOUT; /* fall through */ case TIPC_LISTEN: - if (!skb_queue_empty(&sk->sk_receive_queue)) + case TIPC_CONNECTING: + if (!skb_queue_empty_lockless(&sk->sk_receive_queue)) revents |= EPOLLIN | EPOLLRDNORM; break; case TIPC_OPEN: @@ -747,7 +778,7 @@ static __poll_t tipc_poll(struct file *file, struct socket *sock, revents |= EPOLLOUT; if (!tipc_sk_type_connectionless(sk)) break; - if (skb_queue_empty(&sk->sk_receive_queue)) + if (skb_queue_empty_lockless(&sk->sk_receive_queue)) break; revents |= EPOLLIN | EPOLLRDNORM; break; @@ -808,7 +839,7 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, msg_set_nameupper(hdr, seq->upper); /* Build message as chain of buffers */ - skb_queue_head_init(&pkts); + __skb_queue_head_init(&pkts); rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts); /* Send message if build was successful */ @@ -852,8 +883,8 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk, msg_set_grp_bc_seqno(hdr, bc_snd_nxt); /* Build message as chain of buffers */ - skb_queue_head_init(&pkts); - mtu = tipc_node_get_mtu(net, dnode, tsk->portid); + __skb_queue_head_init(&pkts); + mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false); rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); if (unlikely(rc != dlen)) return rc; @@ -1057,7 +1088,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, msg_set_grp_bc_ack_req(hdr, ack); /* Build message as chain of buffers */ - skb_queue_head_init(&pkts); + __skb_queue_head_init(&pkts); rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); if (unlikely(rc != dlen)) return rc; @@ -1207,6 +1238,32 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, tipc_sk_rcv(net, inputq); } +/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue + * when socket is in Nagle mode + */ +static void tipc_sk_push_backlog(struct tipc_sock *tsk) +{ + struct sk_buff_head *txq = &tsk->sk.sk_write_queue; + struct net *net = sock_net(&tsk->sk); + u32 dnode = tsk_peer_node(tsk); + struct sk_buff *skb = skb_peek(txq); + int rc; + + if (!skb || tsk->cong_link_cnt) + return; + + /* Do not send SYN again after congestion */ + if (msg_is_syn(buf_msg(skb))) + return; + + tsk->snt_unacked += tsk->snd_backlog; + tsk->snd_backlog = 0; + tsk->expect_ack = true; + rc = tipc_node_xmit(net, txq, dnode, tsk->portid); + if (rc == -ELINKCONG) + tsk->cong_link_cnt = 1; +} + /** * tipc_sk_conn_proto_rcv - receive a connection mng protocol message * @tsk: receiving socket @@ -1220,7 +1277,7 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb, u32 onode = tsk_own_node(tsk); struct sock *sk = &tsk->sk; int mtyp = msg_type(hdr); - bool conn_cong; + bool was_cong; /* Ignore if connection cannot be validated: */ if (!tsk_peer_msg(tsk, hdr)) { @@ -1253,11 +1310,13 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb, __skb_queue_tail(xmitq, skb); return; } else if (mtyp == CONN_ACK) { - conn_cong = tsk_conn_cong(tsk); + was_cong = tsk_conn_cong(tsk); + tsk->expect_ack = false; + tipc_sk_push_backlog(tsk); tsk->snt_unacked -= msg_conn_ack(hdr); if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) tsk->snd_win = msg_adv_win(hdr); - if (conn_cong) + if (was_cong && !tsk_conn_cong(tsk)) sk->sk_write_space(sk); } else if (mtyp != CONN_PROBE_REPLY) { pr_warn("Received unknown CONN_PROTO msg\n"); @@ -1386,13 +1445,15 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) if (unlikely(rc)) return rc; - skb_queue_head_init(&pkts); - mtu = tipc_node_get_mtu(net, dnode, tsk->portid); + __skb_queue_head_init(&pkts); + mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false); rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); if (unlikely(rc != dlen)) return rc; - if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue))) + if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue))) { + __skb_queue_purge(&pkts); return -ENOMEM; + } trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " "); rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); @@ -1436,15 +1497,15 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen) struct sock *sk = sock->sk; DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); + struct sk_buff_head *txq = &sk->sk_write_queue; struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *hdr = &tsk->phdr; struct net *net = sock_net(sk); - struct sk_buff_head pkts; u32 dnode = tsk_peer_node(tsk); + int maxnagle = tsk->maxnagle; + int maxpkt = tsk->max_pkt; int send, sent = 0; - int rc = 0; - - skb_queue_head_init(&pkts); + int blocks, rc = 0; if (unlikely(dlen > INT_MAX)) return -EMSGSIZE; @@ -1466,21 +1527,35 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen) tipc_sk_connected(sk))); if (unlikely(rc)) break; - send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE); - rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts); - if (unlikely(rc != send)) - break; - - trace_tipc_sk_sendstream(sk, skb_peek(&pkts), + blocks = tsk->snd_backlog; + if (tsk->oneway++ >= 4 && send <= maxnagle) { + rc = tipc_msg_append(hdr, m, send, maxnagle, txq); + if (unlikely(rc < 0)) + break; + blocks += rc; + if (blocks <= 64 && tsk->expect_ack) { + tsk->snd_backlog = blocks; + sent += send; + break; + } + tsk->expect_ack = true; + } else { + rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq); + if (unlikely(rc != send)) + break; + blocks += tsk_inc(tsk, send + MIN_H_SIZE); + } + trace_tipc_sk_sendstream(sk, skb_peek(txq), TIPC_DUMP_SK_SNDQ, " "); - rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); + rc = tipc_node_xmit(net, txq, dnode, tsk->portid); if (unlikely(rc == -ELINKCONG)) { tsk->cong_link_cnt = 1; rc = 0; } if (likely(!rc)) { - tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE); + tsk->snt_unacked += blocks; + tsk->snd_backlog = 0; sent += send; } } while (sent < dlen && !rc); @@ -1525,8 +1600,9 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port, sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV); tipc_set_sk_state(sk, TIPC_ESTABLISHED); tipc_node_add_conn(net, peer_node, tsk->portid, peer_port); - tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid); + tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true); tsk->peer_caps = tipc_node_get_capabilities(net, peer_node); + tsk_set_nagle(tsk); __skb_queue_purge(&sk->sk_write_queue); if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) return; @@ -1804,7 +1880,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, /* Send group flow control advertisement when applicable */ if (tsk->group && msg_in_group(hdr) && !grp_evt) { - skb_queue_head_init(&xmitq); + __skb_queue_head_init(&xmitq); tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen), msg_orignode(hdr), msg_origport(hdr), &xmitq); @@ -1847,6 +1923,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m, bool peek = flags & MSG_PEEK; int offset, required, copy, copied = 0; int hlen, dlen, err, rc; + bool ack = false; long timeout; /* Catch invalid receive attempts */ @@ -1891,6 +1968,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m, /* Copy data if msg ok, otherwise return error/partial data */ if (likely(!err)) { + ack = msg_ack_required(hdr); offset = skb_cb->bytes_read; copy = min_t(int, dlen - offset, buflen - copied); rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy); @@ -1918,7 +1996,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m, /* Send connection flow control advertisement when applicable */ tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen); - if (unlikely(tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)) + if (ack || tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE) tipc_sk_send_ack(tsk); /* Exit if all requested data or FIN/error received */ @@ -1989,6 +2067,7 @@ static void tipc_sk_proto_rcv(struct sock *sk, smp_wmb(); tsk->cong_link_cnt--; wakeup = true; + tipc_sk_push_backlog(tsk); break; case GROUP_PROTOCOL: tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq); @@ -2028,6 +2107,7 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) if (unlikely(msg_mcast(hdr))) return false; + tsk->oneway = 0; switch (sk->sk_state) { case TIPC_CONNECTING: @@ -2041,7 +2121,7 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) if (msg_data_sz(hdr)) return true; /* Empty ACK-, - wake up sleeping connect() and drop */ - sk->sk_data_ready(sk); + sk->sk_state_change(sk); msg_set_dest_droppable(hdr, 1); return false; } @@ -2073,6 +2153,8 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) return true; return false; case TIPC_ESTABLISHED: + if (!skb_queue_empty(&sk->sk_write_queue)) + tipc_sk_push_backlog(tsk); /* Accept only connection-based messages sent by peer */ if (likely(con_msg && !err && pport == oport && pnode == onode)) return true; @@ -2118,13 +2200,13 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb) struct tipc_msg *hdr = buf_msg(skb); if (unlikely(msg_in_group(hdr))) - return sk->sk_rcvbuf; + return READ_ONCE(sk->sk_rcvbuf); if (unlikely(!msg_connected(hdr))) - return sk->sk_rcvbuf << msg_importance(hdr); + return READ_ONCE(sk->sk_rcvbuf) << msg_importance(hdr); if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL)) - return sk->sk_rcvbuf; + return READ_ONCE(sk->sk_rcvbuf); return FLOWCTL_MSG_LIM; } @@ -2149,6 +2231,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, struct tipc_msg *hdr = buf_msg(skb); struct net *net = sock_net(sk); struct sk_buff_head inputq; + int mtyp = msg_type(hdr); int limit, err = TIPC_OK; trace_tipc_sk_filter_rcv(sk, skb, TIPC_DUMP_ALL, " "); @@ -2162,6 +2245,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, if (unlikely(grp)) tipc_group_filter_msg(grp, &inputq, xmitq); + if (unlikely(!grp) && mtyp == TIPC_MCAST_MSG) + tipc_mcast_filter_msg(net, &tsk->mc_method.deferredq, &inputq); + /* Validate and add to receive buffer if there is space */ while ((skb = __skb_dequeue(&inputq))) { hdr = buf_msg(skb); @@ -2669,13 +2755,14 @@ static void tipc_sk_timeout(struct timer_list *t) struct sk_buff_head list; int rc = 0; - skb_queue_head_init(&list); + __skb_queue_head_init(&list); bh_lock_sock(sk); /* Try again later if socket is busy */ if (sock_owned_by_user(sk)) { sk_reset_timer(sk, &sk->sk_timer, jiffies + HZ / 20); bh_unlock_sock(sk); + sock_put(sk); return; } @@ -2799,7 +2886,7 @@ static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid) struct tipc_sock *tsk; rcu_read_lock(); - tsk = rhashtable_lookup_fast(&tn->sk_rht, &portid, tsk_rht_params); + tsk = rhashtable_lookup(&tn->sk_rht, &portid, tsk_rht_params); if (tsk) sock_hold(&tsk->sk); rcu_read_unlock(); @@ -2954,6 +3041,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, case TIPC_SRC_DROPPABLE: case TIPC_DEST_DROPPABLE: case TIPC_CONN_TIMEOUT: + case TIPC_NODELAY: if (ol < sizeof(value)) return -EINVAL; if (get_user(value, (u32 __user *)ov)) @@ -3002,6 +3090,10 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, case TIPC_GROUP_LEAVE: res = tipc_sk_leave(tsk); break; + case TIPC_NODELAY: + tsk->nodelay = !!value; + tsk_set_nagle(tsk); + break; default: res = -EINVAL; } @@ -3064,6 +3156,9 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, case TIPC_SOCK_RECVQ_DEPTH: value = skb_queue_len(&sk->sk_receive_queue); break; + case TIPC_SOCK_RECVQ_USED: + value = sk_rmem_alloc_get(sk); + break; case TIPC_GROUP_JOIN: seq.type = 0; if (tsk->group) @@ -3264,7 +3359,7 @@ static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk) peer_node = tsk_peer_node(tsk); peer_port = tsk_peer_port(tsk); - nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON); + nest = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_CON); if (!nest) return -EMSGSIZE; @@ -3323,7 +3418,7 @@ static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb, if (!hdr) goto msg_cancel; - attrs = nla_nest_start(skb, TIPC_NLA_SOCK); + attrs = nla_nest_start_noflag(skb, TIPC_NLA_SOCK); if (!attrs) goto genlmsg_cancel; @@ -3428,7 +3523,7 @@ int tipc_sk_fill_sock_diag(struct sk_buff *skb, struct netlink_callback *cb, if (!(sk_filter_state & (1 << sk->sk_state))) return 0; - attrs = nla_nest_start(skb, TIPC_NLA_SOCK); + attrs = nla_nest_start_noflag(skb, TIPC_NLA_SOCK); if (!attrs) goto msg_cancel; @@ -3446,7 +3541,7 @@ int tipc_sk_fill_sock_diag(struct sk_buff *skb, struct netlink_callback *cb, TIPC_NLA_SOCK_PAD)) goto attr_msg_cancel; - stat = nla_nest_start(skb, TIPC_NLA_SOCK_STAT); + stat = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_STAT); if (!stat) goto attr_msg_cancel; @@ -3503,7 +3598,7 @@ static int __tipc_nl_add_sk_publ(struct sk_buff *skb, if (!hdr) goto msg_cancel; - attrs = nla_nest_start(skb, TIPC_NLA_PUBL); + attrs = nla_nest_start_noflag(skb, TIPC_NLA_PUBL); if (!attrs) goto genlmsg_cancel; @@ -3580,19 +3675,15 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb) struct tipc_sock *tsk; if (!tsk_portid) { - struct nlattr **attrs; + struct nlattr **attrs = genl_dumpit_info(cb)->attrs; struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1]; - err = tipc_nlmsg_parse(cb->nlh, &attrs); - if (err) - return err; - if (!attrs[TIPC_NLA_SOCK]) return -EINVAL; - err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX, - attrs[TIPC_NLA_SOCK], - tipc_nl_sock_policy, NULL); + err = nla_parse_nested_deprecated(sock, TIPC_NLA_SOCK_MAX, + attrs[TIPC_NLA_SOCK], + tipc_nl_sock_policy, NULL); if (err) return err; @@ -3782,7 +3873,7 @@ int tipc_sk_dump(struct sock *sk, u16 dqueues, char *buf) i += scnprintf(buf + i, sz - i, " %d", sk->sk_sndbuf); i += scnprintf(buf + i, sz - i, " | %d", sk_rmem_alloc_get(sk)); i += scnprintf(buf + i, sz - i, " %d", sk->sk_rcvbuf); - i += scnprintf(buf + i, sz - i, " | %d\n", sk->sk_backlog.len); + i += scnprintf(buf + i, sz - i, " | %d\n", READ_ONCE(sk->sk_backlog.len)); if (dqueues & TIPC_DUMP_SK_SNDQ) { i += scnprintf(buf + i, sz - i, "sk_write_queue: "); |
