diff options
Diffstat (limited to 'net/tipc')
| -rw-r--r-- | net/tipc/group.c | 148 | ||||
| -rw-r--r-- | net/tipc/group.h | 11 | ||||
| -rw-r--r-- | net/tipc/msg.h | 5 | ||||
| -rw-r--r-- | net/tipc/socket.c | 48 | 
4 files changed, 190 insertions, 22 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c index 1bfa9348b26d..b8ed70abba01 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -46,6 +46,7 @@  #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)  #define ADV_IDLE ADV_UNIT +#define ADV_ACTIVE (ADV_UNIT * 12)  enum mbr_state {  	MBR_QUARANTINED, @@ -59,16 +60,22 @@ enum mbr_state {  struct tipc_member {  	struct rb_node tree_node;  	struct list_head list; +	struct list_head congested;  	struct sk_buff *event_msg; +	struct tipc_group *group;  	u32 node;  	u32 port;  	u32 instance;  	enum mbr_state state; +	u16 advertised; +	u16 window;  	u16 bc_rcv_nxt; +	bool usr_pending;  };  struct tipc_group {  	struct rb_root members; +	struct list_head congested;  	struct tipc_nlist dests;  	struct net *net;  	int subid; @@ -86,11 +93,24 @@ struct tipc_group {  static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,  				  int mtyp, struct sk_buff_head *xmitq); +static int tipc_group_rcvbuf_limit(struct tipc_group *grp) +{ +	int mcnt = grp->member_cnt + 1; + +	/* Scale to bytes, considering worst-case truesize/msgsize ratio */ +	return mcnt * ADV_ACTIVE * FLOWCTL_BLK_SZ * 4; +} +  u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)  {  	return grp->bc_snd_nxt;  } +static bool tipc_group_is_enabled(struct tipc_member *m) +{ +	return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING; +} +  static bool tipc_group_is_receiver(struct tipc_member *m)  {  	return m && m->state >= MBR_JOINED; @@ -111,6 +131,7 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,  	if (!grp)  		return NULL;  	tipc_nlist_init(&grp->dests, tipc_own_addr(net)); +	INIT_LIST_HEAD(&grp->congested);  	grp->members = RB_ROOT;  	grp->net = net;  	grp->portid = portid; @@ -213,6 +234,8 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,  	if (!m)  		return NULL;  	INIT_LIST_HEAD(&m->list); +	INIT_LIST_HEAD(&m->congested); +	m->group = grp;  	m->node = node;  	m->port = port;  	grp->member_cnt++; @@ -233,6 +256,7 @@ static void tipc_group_delete_member(struct tipc_group *grp,  	rb_erase(&m->tree_node, &grp->members);  	grp->member_cnt--;  	list_del_init(&m->list); +	list_del_init(&m->congested);  	/* If last member on a node, remove node from dest list */  	if (!tipc_group_find_node(grp, m->node)) @@ -255,11 +279,59 @@ void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,  	*scope = grp->scope;  } -void tipc_group_update_bc_members(struct tipc_group *grp) +void tipc_group_update_member(struct tipc_member *m, int len) +{ +	struct tipc_group *grp = m->group; +	struct tipc_member *_m, *tmp; + +	if (!tipc_group_is_enabled(m)) +		return; + +	m->window -= len; + +	if (m->window >= ADV_IDLE) +		return; + +	if (!list_empty(&m->congested)) +		return; + +	/* Sort member into congested members' list */ +	list_for_each_entry_safe(_m, tmp, &grp->congested, congested) { +		if (m->window > _m->window) +			continue; +		list_add_tail(&m->congested, &_m->congested); +		return; +	} +	list_add_tail(&m->congested, &grp->congested); +} + +void tipc_group_update_bc_members(struct tipc_group *grp, int len)  { +	struct tipc_member *m; +	struct rb_node *n; + +	for (n = rb_first(&grp->members); n; n = rb_next(n)) { +		m = container_of(n, struct tipc_member, tree_node); +		if (tipc_group_is_enabled(m)) +			tipc_group_update_member(m, len); +	}  	grp->bc_snd_nxt++;  } +bool tipc_group_bc_cong(struct tipc_group *grp, int len) +{ +	struct tipc_member *m; + +	if (list_empty(&grp->congested)) +		return false; + +	m = list_first_entry(&grp->congested, struct tipc_member, congested); +	if (m->window >= len) +		return false; + +	return true; +} +  /* tipc_group_filter_msg() - determine if we should accept arriving message   */  void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, @@ -302,11 +374,36 @@ drop:  	kfree_skb(skb);  } +void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, +			       u32 port, struct sk_buff_head *xmitq) +{ +	struct tipc_member *m; + +	m = tipc_group_find_member(grp, node, port); +	if (!m) +		return; + +	m->advertised -= blks; + +	switch (m->state) { +	case MBR_JOINED: +		if (m->advertised <= (ADV_ACTIVE - ADV_UNIT)) +			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); +		break; +	case MBR_DISCOVERED: +	case MBR_JOINING: +	case MBR_LEAVING: +	default: +		break; +	} +} +  static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,  				  int mtyp, struct sk_buff_head *xmitq)  {  	struct tipc_msg *hdr;  	struct sk_buff *skb; +	int adv = 0;  	skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,  			      m->node, tipc_own_addr(grp->net), @@ -314,14 +411,24 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,  	if (!skb)  		return; +	if (m->state == MBR_JOINED) +		adv = ADV_ACTIVE - m->advertised; +  	hdr = buf_msg(skb); -	if (mtyp == GRP_JOIN_MSG) + +	if (mtyp == GRP_JOIN_MSG) {  		msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); +		msg_set_adv_win(hdr, adv); +		m->advertised += adv; +	} else if (mtyp == GRP_ADV_MSG) { +		msg_set_adv_win(hdr, adv); +		m->advertised += adv; +	}  	__skb_queue_tail(xmitq, skb);  } -void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, -			  struct sk_buff_head *inputq, +void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, +			  struct tipc_msg *hdr, struct sk_buff_head *inputq,  			  struct sk_buff_head *xmitq)  {  	u32 node = msg_orignode(hdr); @@ -341,14 +448,22 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,  		if (!m)  			return;  		m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr); +		m->window += msg_adv_win(hdr);  		/* Wait until PUBLISH event is received */  		if (m->state == MBR_DISCOVERED) {  			m->state = MBR_JOINING;  		} else if (m->state == MBR_PUBLISHED) {  			m->state = MBR_JOINED; +			*usr_wakeup = true; +			m->usr_pending = false; +			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);  			__skb_queue_tail(inputq, m->event_msg);  		} +		if (m->window < ADV_IDLE) +			tipc_group_update_member(m, 0); +		else +			list_del_init(&m->congested);  		return;  	case GRP_LEAVE_MSG:  		if (!m) @@ -361,14 +476,28 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,  		}  		/* Otherwise deliver already received WITHDRAW event */  		__skb_queue_tail(inputq, m->event_msg); +		*usr_wakeup = m->usr_pending;  		tipc_group_delete_member(grp, m); +		list_del_init(&m->congested); +		return; +	case GRP_ADV_MSG: +		if (!m) +			return; +		m->window += msg_adv_win(hdr); +		*usr_wakeup = m->usr_pending; +		m->usr_pending = false; +		list_del_init(&m->congested);  		return;  	default:  		pr_warn("Received unknown GROUP_PROTO message\n");  	}  } +/* tipc_group_member_evt() - receive and handle a member up/down event + */  void tipc_group_member_evt(struct tipc_group *grp, +			   bool *usr_wakeup, +			   int *sk_rcvbuf,  			   struct sk_buff *skb,  			   struct sk_buff_head *inputq,  			   struct sk_buff_head *xmitq) @@ -416,16 +545,25 @@ void tipc_group_member_evt(struct tipc_group *grp,  		} else {  			__skb_queue_tail(inputq, skb);  			m->state = MBR_JOINED; +			*usr_wakeup = true; +			m->usr_pending = false;  		}  		m->instance = instance;  		TIPC_SKB_CB(skb)->orig_member = m->instance;  		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); +		if (m->window < ADV_IDLE) +			tipc_group_update_member(m, 0); +		else +			list_del_init(&m->congested);  	} else if (event == TIPC_WITHDRAWN) {  		if (!m)  			goto drop;  		TIPC_SKB_CB(skb)->orig_member = m->instance; +		*usr_wakeup = m->usr_pending; +		m->usr_pending = false; +  		/* Hold back event if more messages might be expected */  		if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {  			m->event_msg = skb; @@ -434,7 +572,9 @@ void tipc_group_member_evt(struct tipc_group *grp,  			__skb_queue_tail(inputq, skb);  			tipc_group_delete_member(grp, m);  		} +		list_del_init(&m->congested);  	} +	*sk_rcvbuf = tipc_group_rcvbuf_limit(grp);  	return;  drop:  	kfree_skb(skb); diff --git a/net/tipc/group.h b/net/tipc/group.h index 5d3f10d28967..0e2740e1da90 100644 --- a/net/tipc/group.h +++ b/net/tipc/group.h @@ -52,15 +52,18 @@ void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,  void tipc_group_filter_msg(struct tipc_group *grp,  			   struct sk_buff_head *inputq,  			   struct sk_buff_head *xmitq); -void tipc_group_member_evt(struct tipc_group *grp, -			   struct sk_buff *skb, +void tipc_group_member_evt(struct tipc_group *grp, bool *wakeup, +			   int *sk_rcvbuf, struct sk_buff *skb,  			   struct sk_buff_head *inputq,  			   struct sk_buff_head *xmitq); -void tipc_group_proto_rcv(struct tipc_group *grp, +void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup,  			  struct tipc_msg *hdr,  			  struct sk_buff_head *inputq,  			  struct sk_buff_head *xmitq); -void tipc_group_update_bc_members(struct tipc_group *grp); +void tipc_group_update_bc_members(struct tipc_group *grp, int len); +bool tipc_group_bc_cong(struct tipc_group *grp, int len); +void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, +			       u32 port, struct sk_buff_head *xmitq);  u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);  int tipc_group_size(struct tipc_group *grp);  #endif diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 1b527b154e46..237d007499f9 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -538,6 +538,7 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)   */  #define GRP_JOIN_MSG         0  #define GRP_LEAVE_MSG        1 +#define GRP_ADV_MSG          2  /*   * Word 1 @@ -790,12 +791,12 @@ static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n)  	msg_set_bits(m, 9, 16, 0xffff, n);  } -static inline u32 msg_adv_win(struct tipc_msg *m) +static inline u16 msg_adv_win(struct tipc_msg *m)  {  	return msg_bits(m, 9, 0, 0xffff);  } -static inline void msg_set_adv_win(struct tipc_msg *m, u32 n) +static inline void msg_set_adv_win(struct tipc_msg *m, u16 n)  {  	msg_set_bits(m, 9, 0, 0xffff, n);  } diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 0a2eac309177..50145c95ac96 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -201,6 +201,11 @@ static bool tsk_conn_cong(struct tipc_sock *tsk)  	return tsk->snt_unacked > tsk->snd_win;  } +static u16 tsk_blocks(int len) +{ +	return ((len / FLOWCTL_BLK_SZ) + 1); +} +  /* tsk_blocks(): translate a buffer size in bytes to number of   * advertisable blocks, taking into account the ratio truesize(len)/len   * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ @@ -831,6 +836,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,  	struct tipc_group *grp = tsk->group;  	struct tipc_nlist *dsts = tipc_group_dests(grp);  	struct tipc_mc_method *method = &tsk->mc_method; +	int blks = tsk_blocks(MCAST_H_SIZE + dlen);  	struct tipc_msg *hdr = &tsk->phdr;  	int mtu = tipc_bcast_get_mtu(net);  	struct sk_buff_head pkts; @@ -839,14 +845,15 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,  	if (!dsts->local && !dsts->remote)  		return -EHOSTUNREACH; -	/* Block or return if any destination link is congested */ -	rc = tipc_wait_for_cond(sock, &timeout,	!tsk->cong_link_cnt); +	/* Block or return if any destination link or member is congested */ +	rc = tipc_wait_for_cond(sock, &timeout,	!tsk->cong_link_cnt && +				!tipc_group_bc_cong(grp, blks));  	if (unlikely(rc))  		return rc;  	/* Complete message header */  	msg_set_type(hdr, TIPC_GRP_BCAST_MSG); -	msg_set_hdr_sz(hdr, MCAST_H_SIZE); +	msg_set_hdr_sz(hdr, GROUP_H_SIZE);  	msg_set_destport(hdr, 0);  	msg_set_destnode(hdr, 0);  	msg_set_nameinst(hdr, 0); @@ -864,9 +871,8 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,  	if (unlikely(rc))  		return rc; -	/* Update broadcast sequence number */ -	tipc_group_update_bc_members(tsk->group); - +	/* Update broadcast sequence number and send windows */ +	tipc_group_update_bc_members(tsk->group, blks);  	return dlen;  } @@ -1024,7 +1030,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)  	if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))  		return -EMSGSIZE; -	if (unlikely(grp)) +	if (unlikely(grp && !dest))  		return tipc_send_group_bcast(sock, m, dlen, timeout);  	if (unlikely(!dest)) { @@ -1420,6 +1426,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,  	bool connected = !tipc_sk_type_connectionless(sk);  	struct tipc_sock *tsk = tipc_sk(sk);  	int rc, err, hlen, dlen, copy; +	struct sk_buff_head xmitq;  	struct tipc_msg *hdr;  	struct sk_buff *skb;  	bool grp_evt; @@ -1436,8 +1443,8 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,  	}  	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); +	/* Step rcv queue to first msg with data or error; wait if necessary */  	do { -		/* Look at first msg in receive queue; wait if necessary */  		rc = tipc_wait_for_rcvmsg(sock, &timeout);  		if (unlikely(rc))  			goto exit; @@ -1485,12 +1492,21 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,  	if (unlikely(flags & MSG_PEEK))  		goto exit; +	/* Send group flow control advertisement when applicable */ +	if (tsk->group && msg_in_group(hdr) && !grp_evt) { +		skb_queue_head_init(&xmitq); +		tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen), +					  msg_orignode(hdr), msg_origport(hdr), +					  &xmitq); +		tipc_node_distr_xmit(sock_net(sk), &xmitq); +	} +  	tsk_advance_rx_queue(sk);  	if (likely(!connected))  		goto exit; -	/* Send connection flow control ack when applicable */ +	/* Send connection flow control advertisement when applicable */  	tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);  	if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)  		tipc_sk_send_ack(tsk); @@ -1650,6 +1666,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,  	struct tipc_sock *tsk = tipc_sk(sk);  	struct tipc_msg *hdr = buf_msg(skb);  	struct tipc_group *grp = tsk->group; +	bool wakeup = false;  	switch (msg_user(hdr)) {  	case CONN_MANAGER: @@ -1658,19 +1675,23 @@ static void tipc_sk_proto_rcv(struct sock *sk,  	case SOCK_WAKEUP:  		tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);  		tsk->cong_link_cnt--; -		sk->sk_write_space(sk); +		wakeup = true;  		break;  	case GROUP_PROTOCOL: -		tipc_group_proto_rcv(grp, hdr, inputq, xmitq); +		tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);  		break;  	case TOP_SRV: -		tipc_group_member_evt(tsk->group, skb, inputq, xmitq); +		tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf, +				      skb, inputq, xmitq);  		skb = NULL;  		break;  	default:  		break;  	} +	if (wakeup) +		sk->sk_write_space(sk); +  	kfree_skb(skb);  } @@ -1785,6 +1806,9 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)  	struct tipc_sock *tsk = tipc_sk(sk);  	struct tipc_msg *hdr = buf_msg(skb); +	if (unlikely(msg_in_group(hdr))) +		return sk->sk_rcvbuf; +  	if (unlikely(!msg_connected(hdr)))  		return sk->sk_rcvbuf << msg_importance(hdr);  | 
