diff options
Diffstat (limited to 'net/tipc/socket.c')
| -rw-r--r-- | net/tipc/socket.c | 171 | 
1 file changed, 129 insertions, 42 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 4b92b196cfa6..6552f986774c 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -75,6 +75,7 @@ struct sockaddr_pair {   * @conn_instance: TIPC instance used when connection was established   * @published: non-zero if port has one or more associated names   * @max_pkt: maximum packet size "hint" used when building messages sent by port + * @maxnagle: maximum size of msg which can be subject to nagle   * @portid: unique port identity in TIPC socket hash table   * @phdr: preformatted message header used when sending messages   * #cong_links: list of congested links @@ -97,6 +98,7 @@ struct tipc_sock {  	u32 conn_instance;  	int published;  	u32 max_pkt; +	u32 maxnagle;  	u32 portid;  	struct tipc_msg phdr;  	struct list_head cong_links; @@ -116,6 +118,10 @@ struct tipc_sock {  	struct tipc_mc_method mc_method;  	struct rcu_head rcu;  	struct tipc_group *group; +	u32 oneway; +	u16 snd_backlog; +	bool expect_ack; +	bool nodelay;  	bool group_is_open;  }; @@ -137,6 +143,7 @@ static int tipc_sk_insert(struct tipc_sock *tsk);  static void tipc_sk_remove(struct tipc_sock *tsk);  static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);  static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz); +static void tipc_sk_push_backlog(struct tipc_sock *tsk);  static const struct proto_ops packet_ops;  static const struct proto_ops stream_ops; @@ -227,6 +234,26 @@ static u16 tsk_inc(struct tipc_sock *tsk, int msglen)  	return 1;  } +/* tsk_set_nagle - enable/disable nagle property by manipulating maxnagle + */ +static void tsk_set_nagle(struct tipc_sock *tsk) +{ +	struct sock *sk = &tsk->sk; + +	tsk->maxnagle = 0; +	if (sk->sk_type != SOCK_STREAM) +		return; +	if (tsk->nodelay) +		return; +	if (!(tsk->peer_caps & TIPC_NAGLE)) +		return; +	/* Limit node local buffer size to avoid receive queue overflow */ +	if (tsk->max_pkt == MAX_MSG_SIZE) +		tsk->maxnagle = 1500; +	else +		
tsk->maxnagle = tsk->max_pkt; +} +  /**   * tsk_advance_rx_queue - discard first buffer in socket receive queue   * @@ -446,6 +473,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,  	tsk = tipc_sk(sk);  	tsk->max_pkt = MAX_PKT_DEFAULT; +	tsk->maxnagle = 0;  	INIT_LIST_HEAD(&tsk->publications);  	INIT_LIST_HEAD(&tsk->cong_links);  	msg = &tsk->phdr; @@ -504,7 +532,7 @@ static void __tipc_shutdown(struct socket *sock, int error)  	struct sock *sk = sock->sk;  	struct tipc_sock *tsk = tipc_sk(sk);  	struct net *net = sock_net(sk); -	long timeout = CONN_TIMEOUT_DEFAULT; +	long timeout = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT);  	u32 dnode = tsk_peer_node(tsk);  	struct sk_buff *skb; @@ -512,7 +540,9 @@ static void __tipc_shutdown(struct socket *sock, int error)  	tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&  					    !tsk_conn_cong(tsk))); -	/* Remove any pending SYN message */ +	/* Push out delayed messages if in Nagle mode */ +	tipc_sk_push_backlog(tsk); +	/* Remove pending SYN */  	__skb_queue_purge(&sk->sk_write_queue);  	/* Reject all unreceived messages, except on an active connection @@ -854,7 +884,7 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,  	/* Build message as chain of buffers */  	__skb_queue_head_init(&pkts); -	mtu = tipc_node_get_mtu(net, dnode, tsk->portid); +	mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false);  	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);  	if (unlikely(rc != dlen))  		return rc; @@ -1208,6 +1238,32 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,  	tipc_sk_rcv(net, inputq);  } +/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue + *                         when socket is in Nagle mode + */ +static void tipc_sk_push_backlog(struct tipc_sock *tsk) +{ +	struct sk_buff_head *txq = &tsk->sk.sk_write_queue; +	struct net *net = sock_net(&tsk->sk); +	u32 dnode = tsk_peer_node(tsk); +	struct sk_buff *skb = skb_peek(txq); +	int 
rc; + +	if (!skb || tsk->cong_link_cnt) +		return; + +	/* Do not send SYN again after congestion */ +	if (msg_is_syn(buf_msg(skb))) +		return; + +	tsk->snt_unacked += tsk->snd_backlog; +	tsk->snd_backlog = 0; +	tsk->expect_ack = true; +	rc = tipc_node_xmit(net, txq, dnode, tsk->portid); +	if (rc == -ELINKCONG) +		tsk->cong_link_cnt = 1; +} +  /**   * tipc_sk_conn_proto_rcv - receive a connection mng protocol message   * @tsk: receiving socket @@ -1221,7 +1277,7 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,  	u32 onode = tsk_own_node(tsk);  	struct sock *sk = &tsk->sk;  	int mtyp = msg_type(hdr); -	bool conn_cong; +	bool was_cong;  	/* Ignore if connection cannot be validated: */  	if (!tsk_peer_msg(tsk, hdr)) { @@ -1254,11 +1310,13 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,  			__skb_queue_tail(xmitq, skb);  		return;  	} else if (mtyp == CONN_ACK) { -		conn_cong = tsk_conn_cong(tsk); +		was_cong = tsk_conn_cong(tsk); +		tsk->expect_ack = false; +		tipc_sk_push_backlog(tsk);  		tsk->snt_unacked -= msg_conn_ack(hdr);  		if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)  			tsk->snd_win = msg_adv_win(hdr); -		if (conn_cong) +		if (was_cong && !tsk_conn_cong(tsk))  			sk->sk_write_space(sk);  	} else if (mtyp != CONN_PROBE_REPLY) {  		pr_warn("Received unknown CONN_PROTO msg\n"); @@ -1306,8 +1364,8 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)  	struct tipc_msg *hdr = &tsk->phdr;  	struct tipc_name_seq *seq;  	struct sk_buff_head pkts; -	u32 dport, dnode = 0; -	u32 type, inst; +	u32 dport = 0, dnode = 0; +	u32 type = 0, inst = 0;  	int mtu, rc;  	if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) @@ -1360,23 +1418,11 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)  		type = dest->addr.name.name.type;  		inst = dest->addr.name.name.instance;  		dnode = dest->addr.name.domain; -		msg_set_type(hdr, TIPC_NAMED_MSG); -		msg_set_hdr_sz(hdr, 
NAMED_H_SIZE); -		msg_set_nametype(hdr, type); -		msg_set_nameinst(hdr, inst); -		msg_set_lookup_scope(hdr, tipc_node2scope(dnode));  		dport = tipc_nametbl_translate(net, type, inst, &dnode); -		msg_set_destnode(hdr, dnode); -		msg_set_destport(hdr, dport);  		if (unlikely(!dport && !dnode))  			return -EHOSTUNREACH;  	} else if (dest->addrtype == TIPC_ADDR_ID) {  		dnode = dest->addr.id.node; -		msg_set_type(hdr, TIPC_DIRECT_MSG); -		msg_set_lookup_scope(hdr, 0); -		msg_set_destnode(hdr, dnode); -		msg_set_destport(hdr, dest->addr.id.ref); -		msg_set_hdr_sz(hdr, BASIC_H_SIZE);  	} else {  		return -EINVAL;  	} @@ -1387,13 +1433,31 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)  	if (unlikely(rc))  		return rc; +	if (dest->addrtype == TIPC_ADDR_NAME) { +		msg_set_type(hdr, TIPC_NAMED_MSG); +		msg_set_hdr_sz(hdr, NAMED_H_SIZE); +		msg_set_nametype(hdr, type); +		msg_set_nameinst(hdr, inst); +		msg_set_lookup_scope(hdr, tipc_node2scope(dnode)); +		msg_set_destnode(hdr, dnode); +		msg_set_destport(hdr, dport); +	} else { /* TIPC_ADDR_ID */ +		msg_set_type(hdr, TIPC_DIRECT_MSG); +		msg_set_lookup_scope(hdr, 0); +		msg_set_destnode(hdr, dnode); +		msg_set_destport(hdr, dest->addr.id.ref); +		msg_set_hdr_sz(hdr, BASIC_H_SIZE); +	} +  	__skb_queue_head_init(&pkts); -	mtu = tipc_node_get_mtu(net, dnode, tsk->portid); +	mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false);  	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);  	if (unlikely(rc != dlen))  		return rc; -	if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue))) +	if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue))) { +		__skb_queue_purge(&pkts);  		return -ENOMEM; +	}  	trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");  	rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); @@ -1437,15 +1501,15 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)  	struct sock *sk = sock->sk;  	
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);  	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); +	struct sk_buff_head *txq = &sk->sk_write_queue;  	struct tipc_sock *tsk = tipc_sk(sk);  	struct tipc_msg *hdr = &tsk->phdr;  	struct net *net = sock_net(sk); -	struct sk_buff_head pkts;  	u32 dnode = tsk_peer_node(tsk); +	int maxnagle = tsk->maxnagle; +	int maxpkt = tsk->max_pkt;  	int send, sent = 0; -	int rc = 0; - -	__skb_queue_head_init(&pkts); +	int blocks, rc = 0;  	if (unlikely(dlen > INT_MAX))  		return -EMSGSIZE; @@ -1467,21 +1531,35 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)  					 tipc_sk_connected(sk)));  		if (unlikely(rc))  			break; -  		send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE); -		rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts); -		if (unlikely(rc != send)) -			break; - -		trace_tipc_sk_sendstream(sk, skb_peek(&pkts), +		blocks = tsk->snd_backlog; +		if (tsk->oneway++ >= 4 && send <= maxnagle) { +			rc = tipc_msg_append(hdr, m, send, maxnagle, txq); +			if (unlikely(rc < 0)) +				break; +			blocks += rc; +			if (blocks <= 64 && tsk->expect_ack) { +				tsk->snd_backlog = blocks; +				sent += send; +				break; +			} +			tsk->expect_ack = true; +		} else { +			rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq); +			if (unlikely(rc != send)) +				break; +			blocks += tsk_inc(tsk, send + MIN_H_SIZE); +		} +		trace_tipc_sk_sendstream(sk, skb_peek(txq),  					 TIPC_DUMP_SK_SNDQ, " "); -		rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); +		rc = tipc_node_xmit(net, txq, dnode, tsk->portid);  		if (unlikely(rc == -ELINKCONG)) {  			tsk->cong_link_cnt = 1;  			rc = 0;  		}  		if (likely(!rc)) { -			tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE); +			tsk->snt_unacked += blocks; +			tsk->snd_backlog = 0;  			sent += send;  		}  	} while (sent < dlen && !rc); @@ -1526,8 +1604,9 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,  	
sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);  	tipc_set_sk_state(sk, TIPC_ESTABLISHED);  	tipc_node_add_conn(net, peer_node, tsk->portid, peer_port); -	tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid); +	tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true);  	tsk->peer_caps = tipc_node_get_capabilities(net, peer_node); +	tsk_set_nagle(tsk);  	__skb_queue_purge(&sk->sk_write_queue);  	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)  		return; @@ -1848,6 +1927,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,  	bool peek = flags & MSG_PEEK;  	int offset, required, copy, copied = 0;  	int hlen, dlen, err, rc; +	bool ack = false;  	long timeout;  	/* Catch invalid receive attempts */ @@ -1892,6 +1972,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,  		/* Copy data if msg ok, otherwise return error/partial data */  		if (likely(!err)) { +			ack = msg_ack_required(hdr);  			offset = skb_cb->bytes_read;  			copy = min_t(int, dlen - offset, buflen - copied);  			rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy); @@ -1919,7 +2000,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,  		/* Send connection flow control advertisement when applicable */  		tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen); -		if (unlikely(tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)) +		if (ack || tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)  			tipc_sk_send_ack(tsk);  		/* Exit if all requested data or FIN/error received */ @@ -1990,6 +2071,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,  		smp_wmb();  		tsk->cong_link_cnt--;  		wakeup = true; +		tipc_sk_push_backlog(tsk);  		break;  	case GROUP_PROTOCOL:  		tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq); @@ -2029,6 +2111,7 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)  	if (unlikely(msg_mcast(hdr)))  		return false; +	tsk->oneway = 0;  	switch (sk->sk_state) {  	case TIPC_CONNECTING: 
@@ -2074,6 +2157,8 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)  			return true;  		return false;  	case TIPC_ESTABLISHED: +		if (!skb_queue_empty(&sk->sk_write_queue)) +			tipc_sk_push_backlog(tsk);  		/* Accept only connection-based messages sent by peer */  		if (likely(con_msg && !err && pport == oport && pnode == onode))  			return true; @@ -2681,6 +2766,7 @@ static void tipc_sk_timeout(struct timer_list *t)  	if (sock_owned_by_user(sk)) {  		sk_reset_timer(sk, &sk->sk_timer, jiffies + HZ / 20);  		bh_unlock_sock(sk); +		sock_put(sk);  		return;  	} @@ -2804,7 +2890,7 @@ static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)  	struct tipc_sock *tsk;  	rcu_read_lock(); -	tsk = rhashtable_lookup_fast(&tn->sk_rht, &portid, tsk_rht_params); +	tsk = rhashtable_lookup(&tn->sk_rht, &portid, tsk_rht_params);  	if (tsk)  		sock_hold(&tsk->sk);  	rcu_read_unlock(); @@ -2959,6 +3045,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,  	case TIPC_SRC_DROPPABLE:  	case TIPC_DEST_DROPPABLE:  	case TIPC_CONN_TIMEOUT: +	case TIPC_NODELAY:  		if (ol < sizeof(value))  			return -EINVAL;  		if (get_user(value, (u32 __user *)ov)) @@ -3007,6 +3094,10 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,  	case TIPC_GROUP_LEAVE:  		res = tipc_sk_leave(tsk);  		break; +	case TIPC_NODELAY: +		tsk->nodelay = !!value; +		tsk_set_nagle(tsk); +		break;  	default:  		res = -EINVAL;  	} @@ -3588,13 +3679,9 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)  	struct tipc_sock *tsk;  	if (!tsk_portid) { -		struct nlattr **attrs; +		struct nlattr **attrs = genl_dumpit_info(cb)->attrs;  		struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1]; -		err = tipc_nlmsg_parse(cb->nlh, &attrs); -		if (err) -			return err; -  		if (!attrs[TIPC_NLA_SOCK])  			return -EINVAL;  | 
