diff options
Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r-- | net/tipc/bcast.c | 57 |
1 files changed, 35 insertions, 22 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 3eaa931e2e8c..81b1fef1f5e0 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -79,6 +79,13 @@ static void tipc_bclink_unlock(struct net *net) tipc_link_reset_all(node); } +void tipc_bclink_input(struct net *net) +{ + struct tipc_net *tn = net_generic(net, tipc_net_id); + + tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq); +} + uint tipc_bclink_get_mtu(void) { return MAX_PKT_DEFAULT_MCAST; @@ -356,7 +363,7 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg) tipc_node_unlock(n_ptr); } -/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster +/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster * and to identified node local sockets * @net: the applicable net namespace * @list: chain of buffers containing message @@ -371,6 +378,8 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list) int rc = 0; int bc = 0; struct sk_buff *skb; + struct sk_buff_head arrvq; + struct sk_buff_head inputq; /* Prepare clone of message for local node */ skb = tipc_msg_reassemble(list); @@ -379,7 +388,7 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list) return -EHOSTUNREACH; } - /* Broadcast to all other nodes */ + /* Broadcast to all nodes */ if (likely(bclink)) { tipc_bclink_lock(net); if (likely(bclink->bcast_nodes.count)) { @@ -399,12 +408,15 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list) if (unlikely(!bc)) __skb_queue_purge(list); - /* Deliver message clone */ - if (likely(!rc)) - tipc_sk_mcast_rcv(net, skb); - else + if (unlikely(rc)) { kfree_skb(skb); - + return rc; + } + /* Deliver message clone */ + __skb_queue_head_init(&arrvq); + skb_queue_head_init(&inputq); + __skb_queue_tail(&arrvq, skb); + tipc_sk_mcast_rcv(net, &arrvq, &inputq); return rc; } @@ -449,7 +461,7 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf) int deferred = 0; int pos = 0; struct sk_buff *iskb; - struct sk_buff_head msgs; + struct sk_buff_head *arrvq, *inputq; /* Screen out unwanted broadcast messages */ if (msg_mc_netid(msg) != tn->net_id) @@ -486,6 +498,8 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf) /* Handle in-sequence broadcast message */ seqno = msg_seqno(msg); next_in = mod(node->bclink.last_in + 1); + arrvq = &tn->bclink->arrvq; + inputq = &tn->bclink->inputq; if (likely(seqno == next_in)) { receive: @@ -493,21 +507,26 @@ receive: if (likely(msg_isdata(msg))) { tipc_bclink_lock(net); bclink_accept_pkt(node, seqno); + spin_lock_bh(&inputq->lock); + __skb_queue_tail(arrvq, buf); + spin_unlock_bh(&inputq->lock); + node->action_flags |= TIPC_BCAST_MSG_EVT; tipc_bclink_unlock(net); tipc_node_unlock(node); - if (likely(msg_mcast(msg))) - tipc_sk_mcast_rcv(net, buf); - else - kfree_skb(buf); } else if (msg_user(msg) == MSG_BUNDLER) { tipc_bclink_lock(net); bclink_accept_pkt(node, seqno); bcl->stats.recv_bundles++; bcl->stats.recv_bundled += msg_msgcnt(msg); + pos = 0; + while (tipc_msg_extract(buf, &iskb, &pos)) { + spin_lock_bh(&inputq->lock); + __skb_queue_tail(arrvq, iskb); + spin_unlock_bh(&inputq->lock); + } + node->action_flags |= TIPC_BCAST_MSG_EVT; tipc_bclink_unlock(net); tipc_node_unlock(node); - while (tipc_msg_extract(buf, &iskb, &pos)) - tipc_sk_mcast_rcv(net, iskb); } else if (msg_user(msg) == MSG_FRAGMENTER) { tipc_buf_append(&node->bclink.reasm_buf, &buf); if (unlikely(!buf && !node->bclink.reasm_buf)) @@ -523,14 +542,6 @@ receive: } tipc_bclink_unlock(net); tipc_node_unlock(node); - } else if (msg_user(msg) == NAME_DISTRIBUTOR) { - tipc_bclink_lock(net); - bclink_accept_pkt(node, seqno); - tipc_bclink_unlock(net); - tipc_node_unlock(node); - skb_queue_head_init(&msgs); - skb_queue_tail(&msgs, buf); - tipc_named_rcv(net, &msgs); } else { tipc_bclink_lock(net); bclink_accept_pkt(node, seqno); @@ -950,6 +961,8 @@ int tipc_bclink_init(struct net *net) skb_queue_head_init(&bcl->wakeupq); bcl->next_out_no = 1; spin_lock_init(&bclink->node.lock); + __skb_queue_head_init(&bclink->arrvq); + skb_queue_head_init(&bclink->inputq); bcl->owner = &bclink->node; bcl->owner->net = net; bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; |