diff options
Diffstat (limited to 'net/tipc')
-rw-r--r-- | net/tipc/bcast.c | 31 | ||||
-rw-r--r-- | net/tipc/bcast.h | 1 | ||||
-rw-r--r-- | net/tipc/bearer.c | 30 | ||||
-rw-r--r-- | net/tipc/bearer.h | 3 | ||||
-rw-r--r-- | net/tipc/core.h | 10 | ||||
-rw-r--r-- | net/tipc/discover.c | 130 | ||||
-rw-r--r-- | net/tipc/link.c | 2030 | ||||
-rw-r--r-- | net/tipc/link.h | 109 | ||||
-rw-r--r-- | net/tipc/msg.c | 86 | ||||
-rw-r--r-- | net/tipc/msg.h | 112 | ||||
-rw-r--r-- | net/tipc/name_distr.c | 6 | ||||
-rw-r--r-- | net/tipc/netlink_compat.c | 2 | ||||
-rw-r--r-- | net/tipc/node.c | 964 | ||||
-rw-r--r-- | net/tipc/node.h | 84 | ||||
-rw-r--r-- | net/tipc/socket.c | 385 | ||||
-rw-r--r-- | net/tipc/socket.h | 2 | ||||
-rw-r--r-- | net/tipc/udp_media.c | 3 |
17 files changed, 2200 insertions, 1788 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index a816382fc8af..8b010c976b2f 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -316,6 +316,29 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, } } +void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr) +{ + u16 last = msg_last_bcast(hdr); + int mtyp = msg_type(hdr); + + if (unlikely(msg_user(hdr) != LINK_PROTOCOL)) + return; + if (mtyp == STATE_MSG) { + tipc_bclink_update_link_state(n, last); + return; + } + /* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization, + * and transfer synch info in LINK_PROTOCOL messages. + */ + if (tipc_node_is_up(n)) + return; + if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG)) + return; + n->bclink.last_sent = last; + n->bclink.last_in = last; + n->bclink.oos_state = 0; +} + /** * bclink_peek_nack - monitor retransmission requests sent by other nodes * @@ -358,10 +381,9 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list) /* Prepare clone of message for local node */ skb = tipc_msg_reassemble(list); - if (unlikely(!skb)) { - __skb_queue_purge(list); + if (unlikely(!skb)) return -EHOSTUNREACH; - } + /* Broadcast to all nodes */ if (likely(bclink)) { tipc_bclink_lock(net); @@ -413,7 +435,7 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) * all nodes in the cluster don't ACK at the same time */ if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) { - tipc_link_proto_xmit(node->active_links[node->addr & 1], + tipc_link_proto_xmit(node_active_link(node, node->addr), STATE_MSG, 0, 0, 0, 0); tn->bcl->stats.sent_acks++; } @@ -925,7 +947,6 @@ int tipc_bclink_init(struct net *net) tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); bcl->bearer_id = MAX_BEARERS; rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer); - bcl->state = WORKING_WORKING; bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg; msg_set_prevnode(bcl->pmsg, tn->own_addr); strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index 3c290a48f720..d74c69bcf60b 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -133,5 +133,6 @@ void tipc_bclink_wakeup_users(struct net *net); int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg); int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]); void tipc_bclink_input(struct net *net); +void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *msg); #endif diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 00bc0e620532..ce9f7bfc0b92 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -343,7 +343,7 @@ restart: static int tipc_reset_bearer(struct net *net, struct tipc_bearer *b_ptr) { pr_info("Resetting bearer <%s>\n", b_ptr->name); - tipc_link_delete_list(net, b_ptr->identity); + tipc_node_delete_links(net, b_ptr->identity); tipc_disc_reset(net, b_ptr); return 0; } @@ -361,7 +361,7 @@ static void bearer_disable(struct net *net, struct tipc_bearer *b_ptr) pr_info("Disabling bearer <%s>\n", b_ptr->name); b_ptr->media->disable_media(b_ptr); - tipc_link_delete_list(net, b_ptr->identity); + tipc_node_delete_links(net, b_ptr->identity); if (b_ptr->link_req) tipc_disc_delete(b_ptr->link_req); @@ -470,6 +470,32 @@ void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf, rcu_read_unlock(); } +/* tipc_bearer_xmit() -send buffer to destination over bearer + */ +void tipc_bearer_xmit(struct net *net, u32 bearer_id, + struct sk_buff_head *xmitq, + struct tipc_media_addr *dst) +{ + struct tipc_net *tn = net_generic(net, tipc_net_id); + struct tipc_bearer *b; + struct sk_buff *skb, *tmp; + + if (skb_queue_empty(xmitq)) + return; + + rcu_read_lock(); + b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); + if (likely(b)) { + skb_queue_walk_safe(xmitq, skb, tmp) { + __skb_dequeue(xmitq); + b->media->send_msg(net, skb, b, dst); + /* Until we remove cloning in tipc_l2_send_msg(): */ + kfree_skb(skb); + } + } + rcu_read_unlock(); +} + /** * tipc_l2_rcv_msg - handle incoming TIPC message from an interface * @buf: the received packet diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index dc714d977768..6426f242f626 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -217,5 +217,8 @@ void tipc_bearer_cleanup(void); void tipc_bearer_stop(struct net *net); void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf, struct tipc_media_addr *dest); +void tipc_bearer_xmit(struct net *net, u32 bearer_id, + struct sk_buff_head *xmitq, + struct tipc_media_addr *dst); #endif /* _TIPC_BEARER_H */ diff --git a/net/tipc/core.h b/net/tipc/core.h index 0fcf133d5cb7..b96b41eabf12 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -109,6 +109,11 @@ struct tipc_net { atomic_t subscription_count; }; +static inline struct tipc_net *tipc_net(struct net *net) +{ + return net_generic(net, tipc_net_id); +} + static inline u16 mod(u16 x) { return x & 0xffffu; @@ -129,6 +134,11 @@ static inline int less(u16 left, u16 right) return less_eq(left, right) && (mod(right) != mod(left)); } +static inline int in_range(u16 val, u16 min, u16 max) +{ + return !less(val, min) && !more(val, max); +} + #ifdef CONFIG_SYSCTL int tipc_register_sysctl(void); void tipc_unregister_sysctl(void); diff --git a/net/tipc/discover.c b/net/tipc/discover.c index 967e292f53c8..d14e0a4aa9af 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -35,7 +35,7 @@ */ #include "core.h" -#include "link.h" +#include "node.h" #include "discover.h" /* min delay during bearer start up */ @@ -120,30 +120,24 @@ static void disc_dupl_alert(struct tipc_bearer *b_ptr, u32 node_addr, * @buf: buffer containing message * @bearer: bearer that message arrived on */ -void tipc_disc_rcv(struct net *net, struct sk_buff *buf, +void tipc_disc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *bearer) { struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_node *node; - struct tipc_link *link; struct tipc_media_addr maddr; - struct sk_buff *rbuf; - struct tipc_msg *msg = buf_msg(buf); - u32 ddom = msg_dest_domain(msg); - u32 onode = msg_prevnode(msg); - u32 net_id = msg_bc_netid(msg); - u32 mtyp = msg_type(msg); - u32 signature = msg_node_sig(msg); - u16 caps = msg_node_capabilities(msg); - bool addr_match = false; - bool sign_match = false; - bool link_up = false; - bool accept_addr = false; - bool accept_sign = false; + struct sk_buff *rskb; + struct tipc_msg *hdr = buf_msg(skb); + u32 ddom = msg_dest_domain(hdr); + u32 onode = msg_prevnode(hdr); + u32 net_id = msg_bc_netid(hdr); + u32 mtyp = msg_type(hdr); + u32 signature = msg_node_sig(hdr); + u16 caps = msg_node_capabilities(hdr); bool respond = false; + bool dupl_addr = false; - bearer->media->msg2addr(bearer, &maddr, msg_media_addr(msg)); - kfree_skb(buf); + bearer->media->msg2addr(bearer, &maddr, msg_media_addr(hdr)); + kfree_skb(skb); /* Ensure message from node is valid and communication is permitted */ if (net_id != tn->net_id) @@ -165,102 +159,20 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf, if (!tipc_in_scope(bearer->domain, onode)) return; - node = tipc_node_create(net, onode); - if (!node) - return; - tipc_node_lock(node); - node->capabilities = caps; - link = node->links[bearer->identity]; - - /* Prepare to validate requesting node's signature and media address */ - sign_match = (signature == node->signature); - addr_match = link && !memcmp(&link->media_addr, &maddr, sizeof(maddr)); - link_up = link && tipc_link_is_up(link); - - - /* These three flags give us eight permutations: */ - - if (sign_match && addr_match && link_up) { - /* All is fine. Do nothing. */ - } else if (sign_match && addr_match && !link_up) { - /* Respond. The link will come up in due time */ - respond = true; - } else if (sign_match && !addr_match && link_up) { - /* Peer has changed i/f address without rebooting. - * If so, the link will reset soon, and the next - * discovery will be accepted. So we can ignore it. - * It may also be an cloned or malicious peer having - * chosen the same node address and signature as an - * existing one. - * Ignore requests until the link goes down, if ever. - */ - disc_dupl_alert(bearer, onode, &maddr); - } else if (sign_match && !addr_match && !link_up) { - /* Peer link has changed i/f address without rebooting. - * It may also be a cloned or malicious peer; we can't - * distinguish between the two. - * The signature is correct, so we must accept. - */ - accept_addr = true; - respond = true; - } else if (!sign_match && addr_match && link_up) { - /* Peer node rebooted. Two possibilities: - * - Delayed re-discovery; this link endpoint has already - * reset and re-established contact with the peer, before - * receiving a discovery message from that node. - * (The peer happened to receive one from this node first). - * - The peer came back so fast that our side has not - * discovered it yet. Probing from this side will soon - * reset the link, since there can be no working link - * endpoint at the peer end, and the link will re-establish. - * Accept the signature, since it comes from a known peer. - */ - accept_sign = true; - } else if (!sign_match && addr_match && !link_up) { - /* The peer node has rebooted. - * Accept signature, since it is a known peer. - */ - accept_sign = true; - respond = true; - } else if (!sign_match && !addr_match && link_up) { - /* Peer rebooted with new address, or a new/duplicate peer. - * Ignore until the link goes down, if ever. - */ + tipc_node_check_dest(net, onode, bearer, caps, signature, + &maddr, &respond, &dupl_addr); + if (dupl_addr) disc_dupl_alert(bearer, onode, &maddr); - } else if (!sign_match && !addr_match && !link_up) { - /* Peer rebooted with new address, or it is a new peer. - * Accept signature and address. - */ - accept_sign = true; - accept_addr = true; - respond = true; - } - - if (accept_sign) - node->signature = signature; - - if (accept_addr) { - if (!link) - link = tipc_link_create(node, bearer, &maddr); - if (link) { - memcpy(&link->media_addr, &maddr, sizeof(maddr)); - tipc_link_reset(link); - } else { - respond = false; - } - } /* Send response, if necessary */ if (respond && (mtyp == DSC_REQ_MSG)) { - rbuf = tipc_buf_acquire(MAX_H_SIZE); - if (rbuf) { - tipc_disc_init_msg(net, rbuf, DSC_RESP_MSG, bearer); - tipc_bearer_send(net, bearer->identity, rbuf, &maddr); - kfree_skb(rbuf); + rskb = tipc_buf_acquire(MAX_H_SIZE); + if (rskb) { + tipc_disc_init_msg(net, rskb, DSC_RESP_MSG, bearer); + tipc_bearer_send(net, bearer->identity, rskb, &maddr); + kfree_skb(rskb); } } - tipc_node_unlock(node); - tipc_node_put(node); } /** diff --git a/net/tipc/link.c b/net/tipc/link.c index eaa9fe54b4ae..f067e5425560 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -48,9 +48,8 @@ /* * Error message prefixes */ -static const char *link_co_err = "Link changeover error, "; +static const char *link_co_err = "Link tunneling error, "; static const char *link_rst_msg = "Resetting link "; -static const char *link_unk_evt = "Unknown link event "; static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = { [TIPC_NLA_LINK_UNSPEC] = { .type = NLA_UNSPEC }, @@ -77,256 +76,413 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = { }; /* + * Interval between NACKs when packets arrive out of order + */ +#define TIPC_NACK_INTV (TIPC_MIN_LINK_WIN * 2) +/* * Out-of-range value for link session numbers */ -#define INVALID_SESSION 0x10000 +#define WILDCARD_SESSION 0x10000 -/* - * Link state events: +/* Link FSM states: */ -#define STARTING_EVT 856384768 /* link processing trigger */ -#define TRAFFIC_MSG_EVT 560815u /* rx'd ??? */ -#define SILENCE_EVT 560817u /* timer dicovered silence from peer */ +enum { + LINK_ESTABLISHED = 0xe, + LINK_ESTABLISHING = 0xe << 4, + LINK_RESET = 0x1 << 8, + LINK_RESETTING = 0x2 << 12, + LINK_PEER_RESET = 0xd << 16, + LINK_FAILINGOVER = 0xf << 20, + LINK_SYNCHING = 0xc << 24 +}; -/* - * State value stored in 'failover_pkts' +/* Link FSM state checking routines */ -#define FIRST_FAILOVER 0xffffu - -static void link_handle_out_of_seq_msg(struct tipc_link *link, - struct sk_buff *skb); -static void tipc_link_proto_rcv(struct tipc_link *link, - struct sk_buff *skb); -static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol); -static void link_state_event(struct tipc_link *l_ptr, u32 event); +static int link_is_up(struct tipc_link *l) +{ + return l->state & (LINK_ESTABLISHED | LINK_SYNCHING); +} + +static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, + struct sk_buff_head *xmitq); +static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, + u16 rcvgap, int tolerance, int priority, + struct sk_buff_head *xmitq); static void link_reset_statistics(struct tipc_link *l_ptr); static void link_print(struct tipc_link *l_ptr, const char *str); -static void tipc_link_sync_xmit(struct tipc_link *l); static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); -static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb); -static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb); -static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb); -static void link_set_timer(struct tipc_link *link, unsigned long time); + /* - * Simple link routines + * Simple non-static link routines (i.e. referenced outside this file) */ -static unsigned int align(unsigned int i) +bool tipc_link_is_up(struct tipc_link *l) { - return (i + 3) & ~3u; + return link_is_up(l); } -static void tipc_link_release(struct kref *kref) +bool tipc_link_is_reset(struct tipc_link *l) { - kfree(container_of(kref, struct tipc_link, ref)); + return l->state & (LINK_RESET | LINK_FAILINGOVER | LINK_ESTABLISHING); } -static void tipc_link_get(struct tipc_link *l_ptr) +bool tipc_link_is_synching(struct tipc_link *l) { - kref_get(&l_ptr->ref); + return l->state == LINK_SYNCHING; } -static void tipc_link_put(struct tipc_link *l_ptr) +bool tipc_link_is_failingover(struct tipc_link *l) { - kref_put(&l_ptr->ref, tipc_link_release); + return l->state == LINK_FAILINGOVER; } -static struct tipc_link *tipc_parallel_link(struct tipc_link *l) +bool tipc_link_is_blocked(struct tipc_link *l) { - if (l->owner->active_links[0] != l) - return l->owner->active_links[0]; - return l->owner->active_links[1]; + return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER); } -/* - * Simple non-static link routines (i.e. referenced outside this file) - */ -int tipc_link_is_up(struct tipc_link *l_ptr) +int tipc_link_is_active(struct tipc_link *l) { - if (!l_ptr) - return 0; - return link_working_working(l_ptr) || link_working_unknown(l_ptr); + struct tipc_node *n = l->owner; + + return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l); } -int tipc_link_is_active(struct tipc_link *l_ptr) +static u32 link_own_addr(struct tipc_link *l) { - return (l_ptr->owner->active_links[0] == l_ptr) || - (l_ptr->owner->active_links[1] == l_ptr); + return msg_prevnode(l->pmsg); } /** - * link_timeout - handle expiration of link timer - * @l_ptr: pointer to link + * tipc_link_create - create a new link + * @n: pointer to associated node + * @b: pointer to associated bearer + * @ownnode: identity of own node + * @peer: identity of peer node + * @maddr: media address to be used + * @inputq: queue to put messages ready for delivery + * @namedq: queue to put binding table update messages ready for delivery + * @link: return value, pointer to put the created link + * + * Returns true if link was created, otherwise false */ -static void link_timeout(unsigned long data) +bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session, + u32 ownnode, u32 peer, struct tipc_media_addr *maddr, + struct sk_buff_head *inputq, struct sk_buff_head *namedq, + struct tipc_link **link) { - struct tipc_link *l_ptr = (struct tipc_link *)data; - struct sk_buff *skb; + struct tipc_link *l; + struct tipc_msg *hdr; + char *if_name; + + l = kzalloc(sizeof(*l), GFP_ATOMIC); + if (!l) + return false; + *link = l; + + /* Note: peer i/f name is completed by reset/activate message */ + if_name = strchr(b->name, ':') + 1; + sprintf(l->name, "%u.%u.%u:%s-%u.%u.%u:unknown", + tipc_zone(ownnode), tipc_cluster(ownnode), tipc_node(ownnode), + if_name, tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); + + l->addr = peer; + l->media_addr = maddr; + l->owner = n; + l->peer_session = WILDCARD_SESSION; + l->bearer_id = b->identity; + l->tolerance = b->tolerance; + l->net_plane = b->net_plane; + l->advertised_mtu = b->mtu; + l->mtu = b->mtu; + l->priority = b->priority; + tipc_link_set_queue_limits(l, b->window); + l->inputq = inputq; + l->namedq = namedq; + l->state = LINK_RESETTING; + l->pmsg = (struct tipc_msg *)&l->proto_msg; + hdr = l->pmsg; + tipc_msg_init(ownnode, hdr, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, peer); + msg_set_size(hdr, sizeof(l->proto_msg)); + msg_set_session(hdr, session); + msg_set_bearer_id(hdr, l->bearer_id); + strcpy((char *)msg_data(hdr), if_name); + __skb_queue_head_init(&l->transmq); + __skb_queue_head_init(&l->backlogq); + __skb_queue_head_init(&l->deferdq); + skb_queue_head_init(&l->wakeupq); + skb_queue_head_init(l->inputq); + return true; +} - tipc_node_lock(l_ptr->owner); +/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints. + * + * Give a newly added peer node the sequence number where it should + * start receiving and acking broadcast packets. + */ +void tipc_link_build_bcast_sync_msg(struct tipc_link *l, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb; + struct sk_buff_head list; + u16 last_sent; - /* update counters used in statistical profiling of send traffic */ - l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->transmq); - l_ptr->stats.queue_sz_counts++; + skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, + 0, l->addr, link_own_addr(l), 0, 0, 0); + if (!skb) + return; + last_sent = tipc_bclink_get_last_sent(l->owner->net); + msg_set_last_bcast(buf_msg(skb), last_sent); + __skb_queue_head_init(&list); + __skb_queue_tail(&list, skb); + tipc_link_xmit(l, &list, xmitq); +} - skb = skb_peek(&l_ptr->transmq); - if (skb) { - struct tipc_msg *msg = buf_msg(skb); - u32 length = msg_size(msg); +/** + * tipc_link_fsm_evt - link finite state machine + * @l: pointer to link + * @evt: state machine event to be processed + */ +int tipc_link_fsm_evt(struct tipc_link *l, int evt) +{ + int rc = 0; - if ((msg_user(msg) == MSG_FRAGMENTER) && - (msg_type(msg) == FIRST_FRAGMENT)) { - length = msg_size(msg_get_wrapped(msg)); + switch (l->state) { + case LINK_RESETTING: + switch (evt) { + case LINK_PEER_RESET_EVT: + l->state = LINK_PEER_RESET; + break; + case LINK_RESET_EVT: + l->state = LINK_RESET; + break; + case LINK_FAILURE_EVT: + case LINK_FAILOVER_BEGIN_EVT: + case LINK_ESTABLISH_EVT: + case LINK_FAILOVER_END_EVT: + case LINK_SYNCH_BEGIN_EVT: + case LINK_SYNCH_END_EVT: + default: + goto illegal_evt; } - if (length) { - l_ptr->stats.msg_lengths_total += length; - l_ptr->stats.msg_length_counts++; - if (length <= 64) - l_ptr->stats.msg_length_profile[0]++; - else if (length <= 256) - l_ptr->stats.msg_length_profile[1]++; - else if (length <= 1024) - l_ptr->stats.msg_length_profile[2]++; - else if (length <= 4096) - l_ptr->stats.msg_length_profile[3]++; - else if (length <= 16384) - l_ptr->stats.msg_length_profile[4]++; - else if (length <= 32768) - l_ptr->stats.msg_length_profile[5]++; - else - l_ptr->stats.msg_length_profile[6]++; + break; + case LINK_RESET: + switch (evt) { + case LINK_PEER_RESET_EVT: + l->state = LINK_ESTABLISHING; + break; + case LINK_FAILOVER_BEGIN_EVT: + l->state = LINK_FAILINGOVER; + case LINK_FAILURE_EVT: + case LINK_RESET_EVT: + case LINK_ESTABLISH_EVT: + case LINK_FAILOVER_END_EVT: + break; + case LINK_SYNCH_BEGIN_EVT: + case LINK_SYNCH_END_EVT: + default: + goto illegal_evt; + } + break; + case LINK_PEER_RESET: + switch (evt) { + case LINK_RESET_EVT: + l->state = LINK_ESTABLISHING; + break; + case LINK_PEER_RESET_EVT: + case LINK_ESTABLISH_EVT: + case LINK_FAILURE_EVT: + break; + case LINK_SYNCH_BEGIN_EVT: + case LINK_SYNCH_END_EVT: + case LINK_FAILOVER_BEGIN_EVT: + case LINK_FAILOVER_END_EVT: + default: + goto illegal_evt; } + break; + case LINK_FAILINGOVER: + switch (evt) { + case LINK_FAILOVER_END_EVT: + l->state = LINK_RESET; + break; + case LINK_PEER_RESET_EVT: + case LINK_RESET_EVT: + case LINK_ESTABLISH_EVT: + case LINK_FAILURE_EVT: + break; + case LINK_FAILOVER_BEGIN_EVT: + case LINK_SYNCH_BEGIN_EVT: + case LINK_SYNCH_END_EVT: + default: + goto illegal_evt; + } + break; + case LINK_ESTABLISHING: + switch (evt) { + case LINK_ESTABLISH_EVT: + l->state = LINK_ESTABLISHED; + rc |= TIPC_LINK_UP_EVT; + break; + case LINK_FAILOVER_BEGIN_EVT: + l->state = LINK_FAILINGOVER; + break; + case LINK_PEER_RESET_EVT: + case LINK_RESET_EVT: + case LINK_FAILURE_EVT: + case LINK_SYNCH_BEGIN_EVT: + case LINK_FAILOVER_END_EVT: + break; + case LINK_SYNCH_END_EVT: + default: + goto illegal_evt; + } + break; + case LINK_ESTABLISHED: + switch (evt) { + case LINK_PEER_RESET_EVT: + l->state = LINK_PEER_RESET; + rc |= TIPC_LINK_DOWN_EVT; + break; + case LINK_FAILURE_EVT: + l->state = LINK_RESETTING; + rc |= TIPC_LINK_DOWN_EVT; + break; + case LINK_RESET_EVT: + l->state = LINK_RESET; + break; + case LINK_ESTABLISH_EVT: + break; + case LINK_SYNCH_BEGIN_EVT: + l->state = LINK_SYNCHING; + break; + case LINK_SYNCH_END_EVT: + case LINK_FAILOVER_BEGIN_EVT: + case LINK_FAILOVER_END_EVT: + default: + goto illegal_evt; + } + break; + case LINK_SYNCHING: + switch (evt) { + case LINK_PEER_RESET_EVT: + l->state = LINK_PEER_RESET; + rc |= TIPC_LINK_DOWN_EVT; + break; + case LINK_FAILURE_EVT: + l->state = LINK_RESETTING; + rc |= TIPC_LINK_DOWN_EVT; + break; + case LINK_RESET_EVT: + l->state = LINK_RESET; + break; + case LINK_ESTABLISH_EVT: + case LINK_SYNCH_BEGIN_EVT: + break; + case LINK_SYNCH_END_EVT: + l->state = LINK_ESTABLISHED; + break; + case LINK_FAILOVER_BEGIN_EVT: + case LINK_FAILOVER_END_EVT: + default: + goto illegal_evt; + } + break; + default: + pr_err("Unknown FSM state %x in %s\n", l->state, l->name); } - - /* do all other link processing performed on a periodic basis */ - if (l_ptr->silent_intv_cnt || tipc_bclink_acks_missing(l_ptr->owner)) - link_state_event(l_ptr, SILENCE_EVT); - l_ptr->silent_intv_cnt++; - if (skb_queue_len(&l_ptr->backlogq)) - tipc_link_push_packets(l_ptr); - link_set_timer(l_ptr, l_ptr->keepalive_intv); - tipc_node_unlock(l_ptr->owner); - tipc_link_put(l_ptr); -} - -static void link_set_timer(struct tipc_link *link, unsigned long time) -{ - if (!mod_timer(&link->timer, jiffies + time)) - tipc_link_get(link); + return rc; +illegal_evt: + pr_err("Illegal FSM event %x in state %x on link %s\n", + evt, l->state, l->name); + return rc; } -/** - * tipc_link_create - create a new link - * @n_ptr: pointer to associated node - * @b_ptr: pointer to associated bearer - * @media_addr: media address to use when sending messages over link - * - * Returns pointer to link. +/* link_profile_stats - update statistical profiling of traffic */ -struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, - struct tipc_bearer *b_ptr, - const struct tipc_media_addr *media_addr) +static void link_profile_stats(struct tipc_link *l) { - struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id); - struct tipc_link *l_ptr; + struct sk_buff *skb; struct tipc_msg *msg; - char *if_name; - char addr_string[16]; - u32 peer = n_ptr->addr; + int length; - if (n_ptr->link_cnt >= MAX_BEARERS) { - tipc_addr_string_fill(addr_string, n_ptr->addr); - pr_err("Cannot establish %uth link to %s. Max %u allowed.\n", - n_ptr->link_cnt, addr_string, MAX_BEARERS); - return NULL; - } + /* Update counters used in statistical profiling of send traffic */ + l->stats.accu_queue_sz += skb_queue_len(&l->transmq); + l->stats.queue_sz_counts++; - if (n_ptr->links[b_ptr->identity]) { - tipc_addr_string_fill(addr_string, n_ptr->addr); - pr_err("Attempt to establish second link on <%s> to %s\n", - b_ptr->name, addr_string); - return NULL; - } + skb = skb_peek(&l->transmq); + if (!skb) + return; + msg = buf_msg(skb); + length = msg_size(msg); - l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC); - if (!l_ptr) { - pr_warn("Link creation failed, no memory\n"); - return NULL; + if (msg_user(msg) == MSG_FRAGMENTER) { + if (msg_type(msg) != FIRST_FRAGMENT) + return; + length = msg_size(msg_get_wrapped(msg)); } - kref_init(&l_ptr->ref); - l_ptr->addr = peer; - if_name = strchr(b_ptr->name, ':') + 1; - sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown", - tipc_zone(tn->own_addr), tipc_cluster(tn->own_addr), - tipc_node(tn->own_addr), - if_name, - tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); - /* note: peer i/f name is updated by reset/activate message */ - memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr)); - l_ptr->owner = n_ptr; - l_ptr->peer_session = INVALID_SESSION; - l_ptr->bearer_id = b_ptr->identity; - link_set_supervision_props(l_ptr, b_ptr->tolerance); - l_ptr->state = RESET_UNKNOWN; - - l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg; - msg = l_ptr->pmsg; - tipc_msg_init(tn->own_addr, msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, - l_ptr->addr); - msg_set_size(msg, sizeof(l_ptr->proto_msg)); - msg_set_session(msg, (tn->random & 0xffff)); - msg_set_bearer_id(msg, b_ptr->identity); - strcpy((char *)msg_data(msg), if_name); - l_ptr->net_plane = b_ptr->net_plane; - l_ptr->advertised_mtu = b_ptr->mtu; - l_ptr->mtu = l_ptr->advertised_mtu; - l_ptr->priority = b_ptr->priority; - tipc_link_set_queue_limits(l_ptr, b_ptr->window); - l_ptr->snd_nxt = 1; - __skb_queue_head_init(&l_ptr->transmq); - __skb_queue_head_init(&l_ptr->backlogq); - __skb_queue_head_init(&l_ptr->deferdq); - skb_queue_head_init(&l_ptr->wakeupq); - skb_queue_head_init(&l_ptr->inputq); - skb_queue_head_init(&l_ptr->namedq); - link_reset_statistics(l_ptr); - tipc_node_attach_link(n_ptr, l_ptr); - setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr); - link_state_event(l_ptr, STARTING_EVT); - - return l_ptr; + l->stats.msg_lengths_total += length; + l->stats.msg_length_counts++; + if (length <= 64) + l->stats.msg_length_profile[0]++; + else if (length <= 256) + l->stats.msg_length_profile[1]++; + else if (length <= 1024) + l->stats.msg_length_profile[2]++; + else if (length <= 4096) + l->stats.msg_length_profile[3]++; + else if (length <= 16384) + l->stats.msg_length_profile[4]++; + else if (length <= 32768) + l->stats.msg_length_profile[5]++; + else + l->stats.msg_length_profile[6]++; } -/** - * tipc_link_delete - Delete a link - * @l: link to be deleted +/* tipc_link_timeout - perform periodic task as instructed from node timeout */ -void tipc_link_delete(struct tipc_link *l) +int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) { - tipc_link_reset(l); - if (del_timer(&l->timer)) - tipc_link_put(l); - l->flags |= LINK_STOPPED; - /* Delete link now, or when timer is finished: */ - tipc_link_reset_fragments(l); - tipc_node_detach_link(l->owner, l); - tipc_link_put(l); -} + int rc = 0; + int mtyp = STATE_MSG; + bool xmit = false; + bool prb = false; + + link_profile_stats(l); + + switch (l->state) { + case LINK_ESTABLISHED: + case LINK_SYNCHING: + if (!l->silent_intv_cnt) { + if (tipc_bclink_acks_missing(l->owner)) + xmit = true; + } else if (l->silent_intv_cnt <= l->abort_limit) { + xmit = true; + prb = true; + } else { + rc |= tipc_link_fsm_evt(l, LINK_FAILURE_EVT); + } + l->silent_intv_cnt++; + break; + case LINK_RESET: + xmit = true; + mtyp = RESET_MSG; + break; + case LINK_ESTABLISHING: + xmit = true; + mtyp = ACTIVATE_MSG; + break; + case LINK_PEER_RESET: + case LINK_RESETTING: + case LINK_FAILINGOVER: + break; + default: + break; + } -void tipc_link_delete_list(struct net *net, unsigned int bearer_id) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_link *link; - struct tipc_node *node; + if (xmit) + tipc_link_build_proto_msg(l, mtyp, prb, 0, 0, 0, xmitq); - rcu_read_lock(); - list_for_each_entry_rcu(node, &tn->node_list, list) { - tipc_node_lock(node); - link = node->links[bearer_id]; - if (link) - tipc_link_delete(link); - tipc_node_unlock(node); - } - rcu_read_unlock(); + return rc; } /** @@ -334,7 +490,7 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id) * @link: congested link * @list: message that was attempted sent * Create pseudo msg to send back to user when congestion abates - * Only consumes message if there is an error + * Does not consume buffer list */ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list) { @@ -347,8 +503,7 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list) /* This really cannot happen... */ if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) { pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); - tipc_link_reset(link); - goto err; + return -ENOBUFS; } /* Non-blocking sender: */ if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending) @@ -358,15 +513,12 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list) skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, addr, addr, oport, 0, 0); if (!skb) - goto err; + return -ENOBUFS; TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list); TIPC_SKB_CB(skb)->chain_imp = imp; skb_queue_tail(&link->wakeupq, skb); link->stats.link_congs++; return -ELINKCONG; -err: - __skb_queue_purge(list); - return -ENOBUFS; } /** @@ -388,9 +540,7 @@ void link_prepare_wakeup(struct tipc_link *l) if ((pnd[imp] + l->backlog[imp].len) >= lim) break; skb_unlink(skb, &l->wakeupq); - skb_queue_tail(&l->inputq, skb); - l->owner->inputq = &l->inputq; - l->owner->action_flags |= TIPC_MSG_EVT; + skb_queue_tail(l->inputq, skb); } } @@ -426,208 +576,36 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr) tipc_link_reset_fragments(l_ptr); } -void tipc_link_reset(struct tipc_link *l_ptr) +void tipc_link_reset(struct tipc_link *l) { - u32 prev_state = l_ptr->state; - int was_active_link = tipc_link_is_active(l_ptr); - struct tipc_node *owner = l_ptr->owner; - struct tipc_link *pl = tipc_parallel_link(l_ptr); - - msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff)); + tipc_link_fsm_evt(l, LINK_RESET_EVT); /* Link is down, accept any session */ - l_ptr->peer_session = INVALID_SESSION; - - /* Prepare for renewed mtu size negotiation */ - l_ptr->mtu = l_ptr->advertised_mtu; - - l_ptr->state = RESET_UNKNOWN; + l->peer_session = WILDCARD_SESSION; - if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET)) - return; - - tipc_node_link_down(l_ptr->owner, l_ptr); - tipc_bearer_remove_dest(owner->net, l_ptr->bearer_id, l_ptr->addr); + /* If peer is up, it only accepts an incremented session number */ + msg_set_session(l->pmsg, msg_session(l->pmsg) + 1); - if (was_active_link && tipc_node_is_up(l_ptr->owner) && (pl != l_ptr)) { - l_ptr->flags |= LINK_FAILINGOVER; - l_ptr->failover_checkpt = l_ptr->rcv_nxt; - pl->failover_pkts = FIRST_FAILOVER; - pl->failover_checkpt = l_ptr->rcv_nxt; - pl->failover_skb = l_ptr->reasm_buf; - } else { - kfree_skb(l_ptr->reasm_buf); - } - /* Clean up all queues, except inputq: */ - __skb_queue_purge(&l_ptr->transmq); - __skb_queue_purge(&l_ptr->deferdq); - if (!owner->inputq) - owner->inputq = &l_ptr->inputq; - skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq); - if (!skb_queue_empty(owner->inputq)) - owner->action_flags |= TIPC_MSG_EVT; - tipc_link_purge_backlog(l_ptr); - l_ptr->reasm_buf = NULL; - l_ptr->rcv_unacked = 0; - l_ptr->snd_nxt = 1; - l_ptr->silent_intv_cnt = 0; - l_ptr->stale_count = 0; - link_reset_statistics(l_ptr); -} - -static void link_activate(struct tipc_link *link) -{ - struct tipc_node *node = link->owner; - - link->rcv_nxt = 1; - link->stats.recv_info = 1; - link->silent_intv_cnt = 0; - tipc_node_link_up(node, link); - tipc_bearer_add_dest(node->net, link->bearer_id, link->addr); -} - -/** - * link_state_event - link finite state machine - * @l_ptr: pointer to link - * @event: state machine event to process - */ -static void link_state_event(struct tipc_link *l_ptr, unsigned int event) -{ - struct tipc_link *other; - unsigned long timer_intv = l_ptr->keepalive_intv; - - if (l_ptr->flags & LINK_STOPPED) - return; - - if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT)) - return; /* Not yet. */ - - if (l_ptr->flags & LINK_FAILINGOVER) - return; - - switch (l_ptr->state) { - case WORKING_WORKING: - switch (event) { - case TRAFFIC_MSG_EVT: - case ACTIVATE_MSG: - l_ptr->silent_intv_cnt = 0; - break; - case SILENCE_EVT: - if (!l_ptr->silent_intv_cnt) { - if (tipc_bclink_acks_missing(l_ptr->owner)) - tipc_link_proto_xmit(l_ptr, STATE_MSG, - 0, 0, 0, 0); - break; - } - l_ptr->state = WORKING_UNKNOWN; - tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0); - break; - case RESET_MSG: - pr_debug("%s<%s>, requested by peer\n", - link_rst_msg, l_ptr->name); - tipc_link_reset(l_ptr); - l_ptr->state = RESET_RESET; - tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, - 0, 0, 0, 0); - break; - default: - pr_debug("%s%u in WW state\n", link_unk_evt, event); - } - break; - case WORKING_UNKNOWN: - switch (event) { - case TRAFFIC_MSG_EVT: - case ACTIVATE_MSG: - l_ptr->state = WORKING_WORKING; - l_ptr->silent_intv_cnt = 0; - break; - case RESET_MSG: - pr_debug("%s<%s>, requested by peer while probing\n", - link_rst_msg, l_ptr->name); - tipc_link_reset(l_ptr); - l_ptr->state = RESET_RESET; - tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, - 0, 0, 0, 0); - break; - case SILENCE_EVT: - if (!l_ptr->silent_intv_cnt) { - l_ptr->state = WORKING_WORKING; - if (tipc_bclink_acks_missing(l_ptr->owner)) - tipc_link_proto_xmit(l_ptr, STATE_MSG, - 0, 0, 0, 0); - } else if (l_ptr->silent_intv_cnt < - l_ptr->abort_limit) { - tipc_link_proto_xmit(l_ptr, STATE_MSG, - 1, 0, 0, 0); - } else { /* Link has failed */ - pr_debug("%s<%s>, peer not responding\n", - link_rst_msg, l_ptr->name); - tipc_link_reset(l_ptr); - l_ptr->state = RESET_UNKNOWN; - tipc_link_proto_xmit(l_ptr, RESET_MSG, - 0, 0, 0, 0); - } - break; - default: - pr_err("%s%u in WU state\n", link_unk_evt, event); - } - break; - case RESET_UNKNOWN: - switch (event) { - case TRAFFIC_MSG_EVT: - break; - case ACTIVATE_MSG: - other = l_ptr->owner->active_links[0]; - if (other && link_working_unknown(other)) - break; - l_ptr->state = WORKING_WORKING; - link_activate(l_ptr); - tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0); - if (l_ptr->owner->working_links == 1) - tipc_link_sync_xmit(l_ptr); - break; - case RESET_MSG: - l_ptr->state = RESET_RESET; - tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, - 1, 0, 0, 0); - break; - case STARTING_EVT: - l_ptr->flags |= LINK_STARTED; - link_set_timer(l_ptr, timer_intv); - break; - case SILENCE_EVT: - tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0); - break; - default: - pr_err("%s%u in RU state\n", link_unk_evt, event); - } - break; - case RESET_RESET: - switch (event) { - case TRAFFIC_MSG_EVT: - case ACTIVATE_MSG: - other = l_ptr->owner->active_links[0]; - if (other && link_working_unknown(other)) - break; - l_ptr->state = WORKING_WORKING; - link_activate(l_ptr); - tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0); - if (l_ptr->owner->working_links == 1) - tipc_link_sync_xmit(l_ptr); - break; - case RESET_MSG: - break; - case SILENCE_EVT: - tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, - 0, 0, 0, 0); - break; - default: - pr_err("%s%u in RR state\n", link_unk_evt, event); - } - break; - default: - pr_err("Unknown link state %u/%u\n", l_ptr->state, event); - } + /* Prepare for renewed mtu size negotiation */ + l->mtu = l->advertised_mtu; + + /* Clean up all queues: */ + __skb_queue_purge(&l->transmq); + __skb_queue_purge(&l->deferdq); + skb_queue_splice_init(&l->wakeupq, l->inputq); + + tipc_link_purge_backlog(l); + kfree_skb(l->reasm_buf); + kfree_skb(l->failover_reasm_skb); + l->reasm_buf = NULL; + l->failover_reasm_skb = NULL; + l->rcv_unacked = 0; + l->snd_nxt = 1; + l->rcv_nxt = 1; + l->silent_intv_cnt = 0; + l->stats.recv_info = 0; + l->stale_count = 0; + link_reset_statistics(l); } /** @@ -635,8 +613,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) * @link: link to use * @list: chain of buffers containing message * - * Consumes the buffer chain, except when returning -ELINKCONG, - * since the caller then may want to make more send attempts. + * Consumes the buffer chain, except when returning an error code, * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted */ @@ -650,7 +627,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link, u16 ack = mod(link->rcv_nxt - 1); u16 seqno = link->snd_nxt; u16 bc_last_in = link->owner->bclink.last_in; - struct tipc_media_addr *addr = &link->media_addr; + struct tipc_media_addr *addr = link->media_addr; struct sk_buff_head *transmq = &link->transmq; struct sk_buff_head *backlogq = &link->backlogq; struct sk_buff *skb, *bskb; @@ -660,10 +637,9 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link, if (unlikely(link->backlog[i].len >= link->backlog[i].limit)) return link_schedule_user(link, list); } - if (unlikely(msg_size(msg) > mtu)) { - __skb_queue_purge(list); + if (unlikely(msg_size(msg) > mtu)) return -EMSGSIZE; - } + /* Prepare each packet for sending, and add to relevant queue: */ while (skb_queue_len(list)) { skb = skb_peek(list); @@ -700,101 +676,76 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link, return 0; } -static void skb2list(struct sk_buff *skb, struct sk_buff_head *list) -{ - skb_queue_head_init(list); - __skb_queue_tail(list, skb); -} - -static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb) -{ - struct sk_buff_head head; - - skb2list(skb, &head); - return __tipc_link_xmit(link->owner->net, link, &head); -} - -/* tipc_link_xmit_skb(): send single buffer to destination - * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE - * messages, which will not be rejected - * The only exception is datagram messages rerouted after secondary - * lookup, which are rare and safe to dispose of anyway. - * TODO: Return real return value, and let callers use - * tipc_wait_for_sendpkt() where applicable - */ -int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, - u32 selector) -{ - struct sk_buff_head head; - int rc; - - skb2list(skb, &head); - rc = tipc_link_xmit(net, &head, dnode, selector); - if (rc == -ELINKCONG) - kfree_skb(skb); - return 0; -} - /** - * tipc_link_xmit() is the general link level function for message sending - * @net: the applicable net namespace + * tipc_link_xmit(): enqueue buffer list according to queue situation + * @link: link to use * @list: chain of buffers containing message - * @dsz: amount of user data to be sent - * @dnode: address of destination node - * @selector: a number used for deterministic link selection - * Consumes the buffer chain, except when returning -ELINKCONG - * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE + * @xmitq: returned list of packets to be sent by caller + * + * Consumes the buffer chain, except when returning -ELINKCONG, + * since the caller then may want to make more send attempts. + * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS + * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted */ -int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode, - u32 selector) +int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, + struct sk_buff_head *xmitq) { - struct tipc_link *link = NULL; - struct tipc_node *node; - int rc = -EHOSTUNREACH; + struct tipc_msg *hdr = buf_msg(skb_peek(list)); + unsigned int maxwin = l->window; + unsigned int i, imp = msg_importance(hdr); + unsigned int mtu = l->mtu; + u16 ack = l->rcv_nxt - 1; + u16 seqno = l->snd_nxt; + u16 bc_last_in = l->owner->bclink.last_in; + struct sk_buff_head *transmq = &l->transmq; + struct sk_buff_head *backlogq = &l->backlogq; + struct sk_buff *skb, *_skb, *bskb; - node = tipc_node_find(net, dnode); - if (node) { - tipc_node_lock(node); - link = node->active_links[selector & 1]; - if (link) - rc = __tipc_link_xmit(net, link, list); - tipc_node_unlock(node); - tipc_node_put(node); - } - if (link) - return rc; - - if (likely(in_own_node(net, dnode))) { - tipc_sk_rcv(net, list); - return 0; + /* Match msg importance against this and all higher backlog limits: */ + for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) { + if (unlikely(l->backlog[i].len >= l->backlog[i].limit)) + return link_schedule_user(l, list); } + if (unlikely(msg_size(hdr) > mtu)) + return -EMSGSIZE; - __skb_queue_purge(list); - return rc; -} - -/* - * tipc_link_sync_xmit - synchronize broadcast link endpoints. - * - * Give a newly added peer node the sequence number where it should - * start receiving and acking broadcast packets. - * - * Called with node locked - */ -static void tipc_link_sync_xmit(struct tipc_link *link) -{ - struct sk_buff *skb; - struct tipc_msg *msg; - - skb = tipc_buf_acquire(INT_H_SIZE); - if (!skb) - return; + /* Prepare each packet for sending, and add to relevant queue: */ + while (skb_queue_len(list)) { + skb = skb_peek(list); + hdr = buf_msg(skb); + msg_set_seqno(hdr, seqno); + msg_set_ack(hdr, ack); + msg_set_bcast_ack(hdr, bc_last_in); - msg = buf_msg(skb); - tipc_msg_init(link_own_addr(link), msg, BCAST_PROTOCOL, STATE_MSG, - INT_H_SIZE, link->addr); - msg_set_last_bcast(msg, link->owner->bclink.acked); - __tipc_link_xmit_skb(link, skb); + if (likely(skb_queue_len(transmq) < maxwin)) { + _skb = skb_clone(skb, GFP_ATOMIC); + if (!_skb) + return -ENOBUFS; + __skb_dequeue(list); + __skb_queue_tail(transmq, skb); + __skb_queue_tail(xmitq, _skb); + l->rcv_unacked = 0; + seqno++; + continue; + } + if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) { + kfree_skb(__skb_dequeue(list)); + l->stats.sent_bundled++; + continue; + } + if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) { + kfree_skb(__skb_dequeue(list)); + __skb_queue_tail(backlogq, bskb); + l->backlog[msg_importance(buf_msg(bskb))].len++; + l->stats.sent_bundled++; + l->stats.sent_bundles++; + continue; + } + l->backlog[imp].len += skb_queue_len(list); + skb_queue_splice_tail_init(list, backlogq); + } + l->snd_nxt = seqno; + return 0; } /* @@ -842,29 +793,37 @@ void tipc_link_push_packets(struct tipc_link *link) link->rcv_unacked = 0; __skb_queue_tail(&link->transmq, skb); tipc_bearer_send(link->owner->net, link->bearer_id, - skb, &link->media_addr); + skb, link->media_addr); } link->snd_nxt = seqno; } -void tipc_link_reset_all(struct tipc_node *node) +void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) { - char addr_string[16]; - u32 i; - - tipc_node_lock(node); + struct sk_buff *skb, *_skb; + struct tipc_msg *hdr; + u16 seqno = l->snd_nxt; + u16 ack = l->rcv_nxt - 1; - pr_warn("Resetting all links to %s\n", - tipc_addr_string_fill(addr_string, node->addr)); - - for (i = 0; i < MAX_BEARERS; i++) { - if (node->links[i]) { - link_print(node->links[i], "Resetting link\n"); - tipc_link_reset(node->links[i]); - } + while (skb_queue_len(&l->transmq) < l->window) { + skb = skb_peek(&l->backlogq); + if (!skb) + break; + _skb = skb_clone(skb, GFP_ATOMIC); + if (!_skb) + break; + __skb_dequeue(&l->backlogq); + hdr = buf_msg(skb); + l->backlog[msg_importance(hdr)].len--; + __skb_queue_tail(&l->transmq, skb); + __skb_queue_tail(xmitq, _skb); + msg_set_ack(hdr, ack); + msg_set_seqno(hdr, seqno); + msg_set_bcast_ack(hdr, l->owner->bclink.last_in); + l->rcv_unacked = 0; + seqno++; } - - tipc_node_unlock(node); + l->snd_nxt = seqno; } static void link_retransmit_failure(struct tipc_link *l_ptr, @@ -877,9 +836,12 @@ static void link_retransmit_failure(struct tipc_link *l_ptr, if (l_ptr->addr) { /* Handle failure on standard link */ - link_print(l_ptr, "Resetting link\n"); - tipc_link_reset(l_ptr); - + link_print(l_ptr, "Resetting link "); + pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n", + msg_user(msg), msg_type(msg), msg_size(msg), + msg_errcode(msg)); + pr_info("sqno %u, prev: %x, src: %x\n", + msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg)); } else { /* Handle failure on broadcast link */ struct tipc_node *n_ptr; @@ -934,191 +896,45 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb, msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1)); msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, skb, - &l_ptr->media_addr); + l_ptr->media_addr); retransmits--; l_ptr->stats.retransmitted++; } } -/* link_synch(): check if all packets arrived before the synch - * point have been consumed - * Returns true if the parallel links are synched, otherwise false - */ -static bool link_synch(struct tipc_link *l) +static int tipc_link_retransm(struct tipc_link *l, int retransm, + struct sk_buff_head *xmitq) { - unsigned int post_synch; - struct tipc_link *pl; + struct sk_buff *_skb, *skb = skb_peek(&l->transmq); + struct tipc_msg *hdr; - pl = tipc_parallel_link(l); - if (pl == l) - goto synched; - - /* Was last pre-synch packet added to input queue ? */ - if (less_eq(pl->rcv_nxt, l->synch_point)) - return false; - - /* Is it still in the input queue ? */ - post_synch = mod(pl->rcv_nxt - l->synch_point) - 1; - if (skb_queue_len(&pl->inputq) > post_synch) - return false; -synched: - l->flags &= ~LINK_SYNCHING; - return true; -} - -static void link_retrieve_defq(struct tipc_link *link, - struct sk_buff_head *list) -{ - u16 seq_no; - - if (skb_queue_empty(&link->deferdq)) - return; - - seq_no = buf_seqno(skb_peek(&link->deferdq)); - if (seq_no == link->rcv_nxt) - skb_queue_splice_tail_init(&link->deferdq, list); -} - -/** - * tipc_rcv - process TIPC packets/messages arriving from off-node - * @net: the applicable net namespace - * @skb: TIPC packet - * @b_ptr: pointer to bearer message arrived on - * - * Invoked with no locks held. Bearer pointer must point to a valid bearer - * structure (i.e. cannot be NULL), but bearer can be inactive. - */ -void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct sk_buff_head head; - struct tipc_node *n_ptr; - struct tipc_link *l_ptr; - struct sk_buff *skb1, *tmp; - struct tipc_msg *msg; - u16 seq_no; - u16 ackd; - u32 released; - - skb2list(skb, &head); - - while ((skb = __skb_dequeue(&head))) { - /* Ensure message is well-formed */ - if (unlikely(!tipc_msg_validate(skb))) - goto discard; - - /* Handle arrival of a non-unicast link message */ - msg = buf_msg(skb); - if (unlikely(msg_non_seq(msg))) { - if (msg_user(msg) == LINK_CONFIG) - tipc_disc_rcv(net, skb, b_ptr); - else - tipc_bclink_rcv(net, skb); - continue; - } - - /* Discard unicast link messages destined for another node */ - if (unlikely(!msg_short(msg) && - (msg_destnode(msg) != tn->own_addr))) - goto discard; - - /* Locate neighboring node that sent message */ - n_ptr = tipc_node_find(net, msg_prevnode(msg)); - if (unlikely(!n_ptr)) - goto discard; - - tipc_node_lock(n_ptr); - /* Locate unicast link endpoint that should handle message */ - l_ptr = n_ptr->links[b_ptr->identity]; - if (unlikely(!l_ptr)) - goto unlock; - - /* Verify that communication with node is currently allowed */ - if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) && - msg_user(msg) == LINK_PROTOCOL && - (msg_type(msg) == RESET_MSG || - msg_type(msg) == ACTIVATE_MSG) && - !msg_redundant_link(msg)) - n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN; - - if (tipc_node_blocked(n_ptr)) - goto unlock; - - /* Validate message sequence number info */ - seq_no = msg_seqno(msg); - ackd = msg_ack(msg); - - /* Release acked messages */ - if (unlikely(n_ptr->bclink.acked != msg_bcast_ack(msg))) - tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); - - released = 0; - skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) { - if (more(buf_seqno(skb1), ackd)) - break; - __skb_unlink(skb1, &l_ptr->transmq); - kfree_skb(skb1); - released = 1; - } - - /* Try sending any messages link endpoint has pending */ - if (unlikely(skb_queue_len(&l_ptr->backlogq))) - tipc_link_push_packets(l_ptr); - - if (released && !skb_queue_empty(&l_ptr->wakeupq)) - link_prepare_wakeup(l_ptr); - - /* Process the incoming packet */ - if (unlikely(!link_working_working(l_ptr))) { - if (msg_user(msg) == LINK_PROTOCOL) { - tipc_link_proto_rcv(l_ptr, skb); - link_retrieve_defq(l_ptr, &head); - skb = NULL; - goto unlock; - } - - /* Traffic message. Conditionally activate link */ - link_state_event(l_ptr, TRAFFIC_MSG_EVT); - - if (link_working_working(l_ptr)) { - /* Re-insert buffer in front of queue */ - __skb_queue_head(&head, skb); - skb = NULL; - goto unlock; - } - goto unlock; - } - - /* Link is now in state WORKING_WORKING */ - if (unlikely(seq_no != l_ptr->rcv_nxt)) { - link_handle_out_of_seq_msg(l_ptr, skb); - link_retrieve_defq(l_ptr, &head); - skb = NULL; - goto unlock; - } - l_ptr->silent_intv_cnt = 0; + if (!skb) + return 0; - /* Synchronize with parallel link if applicable */ - if (unlikely((l_ptr->flags & LINK_SYNCHING) && !msg_dup(msg))) { - if (!link_synch(l_ptr)) - goto unlock; - } - l_ptr->rcv_nxt++; - if (unlikely(!skb_queue_empty(&l_ptr->deferdq))) - link_retrieve_defq(l_ptr, &head); - if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) { - l_ptr->stats.sent_acks++; - tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0); - } - tipc_link_input(l_ptr, skb); - skb = NULL; -unlock: - tipc_node_unlock(n_ptr); - tipc_node_put(n_ptr); -discard: - if (unlikely(skb)) - kfree_skb(skb); + /* Detect repeated retransmit failures on same packet */ + if (likely(l->last_retransm != buf_seqno(skb))) { + l->last_retransm = buf_seqno(skb); + l->stale_count = 1; + } else if (++l->stale_count > 100) { + link_retransmit_failure(l, skb); + return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); + } + skb_queue_walk(&l->transmq, skb) { + if (!retransm) + return 0; + hdr = buf_msg(skb); + _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC); + if (!_skb) + return 0; + hdr = buf_msg(_skb); + msg_set_ack(hdr, l->rcv_nxt - 1); + msg_set_bcast_ack(hdr, l->owner->bclink.last_in); + _skb->priority = TC_PRIO_CONTROL; + __skb_queue_tail(xmitq, _skb); + retransm--; + l->stats.retransmitted++; } + return 0; } /* tipc_data_input - deliver data and name distr msgs to upper layer @@ -1126,29 +942,22 @@ discard: * Consumes buffer if message is of right type * Node lock must be held */ -static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb) +static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, + struct sk_buff_head *inputq) { struct tipc_node *node = link->owner; - struct tipc_msg *msg = buf_msg(skb); - u32 dport = msg_destport(msg); - switch (msg_user(msg)) { + switch (msg_user(buf_msg(skb))) { case TIPC_LOW_IMPORTANCE: case TIPC_MEDIUM_IMPORTANCE: case TIPC_HIGH_IMPORTANCE: case TIPC_CRITICAL_IMPORTANCE: case CONN_MANAGER: - if (tipc_skb_queue_tail(&link->inputq, skb, dport)) { - node->inputq = &link->inputq; - node->action_flags |= TIPC_MSG_EVT; - } + __skb_queue_tail(inputq, skb); return true; case NAME_DISTRIBUTOR: node->bclink.recv_permitted = true; - node->namedq = &link->namedq; - skb_queue_tail(&link->namedq, skb); - if (skb_queue_len(&link->namedq) == 1) - node->action_flags |= TIPC_NAMED_MSG_EVT; + skb_queue_tail(link->namedq, skb); return true; case MSG_BUNDLER: case TUNNEL_PROTOCOL: @@ -1165,54 +974,160 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb) /* tipc_link_input - process packet that has passed link protocol check * * Consumes buffer - * Node lock must be held */ -static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb) +static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, + struct sk_buff_head *inputq) { - struct tipc_node *node = link->owner; - struct tipc_msg *msg = buf_msg(skb); + struct tipc_node *node = l->owner; + struct tipc_msg *hdr = buf_msg(skb); + struct sk_buff **reasm_skb = &l->reasm_buf; struct sk_buff *iskb; + int usr = msg_user(hdr); + int rc = 0; int pos = 0; + int ipos = 0; - if (likely(tipc_data_input(link, skb))) - return; + if (unlikely(usr == TUNNEL_PROTOCOL)) { + if (msg_type(hdr) == SYNCH_MSG) { + __skb_queue_purge(&l->deferdq); + goto drop; + } + if (!tipc_msg_extract(skb, &iskb, &ipos)) + return rc; + kfree_skb(skb); + skb = iskb; + hdr = buf_msg(skb); + if (less(msg_seqno(hdr), l->drop_point)) + goto drop; + if (tipc_data_input(l, skb, inputq)) + return rc; + usr = msg_user(hdr); + reasm_skb = &l->failover_reasm_skb; + } - switch (msg_user(msg)) { - case TUNNEL_PROTOCOL: - if (msg_dup(msg)) { - link->flags |= LINK_SYNCHING; - link->synch_point = msg_seqno(msg_get_wrapped(msg)); - kfree_skb(skb); - break; + if (usr == MSG_BUNDLER) { + l->stats.recv_bundles++; + l->stats.recv_bundled += msg_msgcnt(hdr); + while (tipc_msg_extract(skb, &iskb, &pos)) + tipc_data_input(l, iskb, inputq); + return 0; + } else if (usr == MSG_FRAGMENTER) { + l->stats.recv_fragments++; + if (tipc_buf_append(reasm_skb, &skb)) { + l->stats.recv_fragmented++; + tipc_data_input(l, skb, inputq); + } else if (!*reasm_skb) { + return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); } - if (!tipc_link_failover_rcv(link, &skb)) - break; - if (msg_user(buf_msg(skb)) != MSG_BUNDLER) { - tipc_data_input(link, skb); + return 0; + } else if (usr == BCAST_PROTOCOL) { + tipc_link_sync_rcv(node, skb); + return 0; + } +drop: + kfree_skb(skb); + return 0; +} + +static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked) +{ + bool released = false; + struct sk_buff *skb, *tmp; + + skb_queue_walk_safe(&l->transmq, skb, tmp) { + if (more(buf_seqno(skb), acked)) break; + __skb_unlink(skb, &l->transmq); + kfree_skb(skb); + released = true; + } + return released; +} + +/* tipc_link_rcv - process TIPC packets/messages arriving from off-node + * @link: the link that should handle the message + * @skb: TIPC packet + * @xmitq: queue to place packets to be sent after this call + */ +int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, + struct sk_buff_head *xmitq) +{ + struct sk_buff_head *arrvq = &l->deferdq; + struct sk_buff_head tmpq; + struct tipc_msg *hdr; + u16 seqno, rcv_nxt; + int rc = 0; + + __skb_queue_head_init(&tmpq); + + if (unlikely(!__tipc_skb_queue_sorted(arrvq, skb))) { + if (!(skb_queue_len(arrvq) % TIPC_NACK_INTV)) + tipc_link_build_proto_msg(l, STATE_MSG, 0, + 0, 0, 0, xmitq); + return rc; + } + + while ((skb = skb_peek(arrvq))) { + hdr = buf_msg(skb); + + /* Verify and update link state */ + if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) { + __skb_dequeue(arrvq); + rc = tipc_link_proto_rcv(l, skb, xmitq); + continue; } - case MSG_BUNDLER: - link->stats.recv_bundles++; - link->stats.recv_bundled += msg_msgcnt(msg); - while (tipc_msg_extract(skb, &iskb, &pos)) - tipc_data_input(link, iskb); - break; - case MSG_FRAGMENTER: - link->stats.recv_fragments++; - if (tipc_buf_append(&link->reasm_buf, &skb)) { - link->stats.recv_fragmented++; - tipc_data_input(link, skb); - } else if (!link->reasm_buf) { - tipc_link_reset(link); + if (unlikely(!link_is_up(l))) { + rc = tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); + if (!link_is_up(l)) { + kfree_skb(__skb_dequeue(arrvq)); + goto exit; + } } - break; - case BCAST_PROTOCOL: - tipc_link_sync_rcv(node, skb); - break; - default: - break; - }; + + l->silent_intv_cnt = 0; + + /* Forward queues and wake up waiting users */ + if (likely(tipc_link_release_pkts(l, msg_ack(hdr)))) { + tipc_link_advance_backlog(l, xmitq); + if (unlikely(!skb_queue_empty(&l->wakeupq))) + link_prepare_wakeup(l); + } + + /* Defer reception if there is a gap in the sequence */ + seqno = msg_seqno(hdr); + rcv_nxt = l->rcv_nxt; + if (unlikely(less(rcv_nxt, seqno))) { + l->stats.deferred_recv++; + goto exit; + } + + __skb_dequeue(arrvq); + + /* Drop if packet already received */ + if (unlikely(more(rcv_nxt, seqno))) { + l->stats.duplicates++; + kfree_skb(skb); + goto exit; + } + + /* Packet can be delivered */ + l->rcv_nxt++; + l->stats.recv_info++; + if (unlikely(!tipc_data_input(l, skb, &tmpq))) + rc = tipc_link_input(l, skb, &tmpq); + + /* Ack at regular intervals */ + if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) { + l->rcv_unacked = 0; + l->stats.sent_acks++; + tipc_link_build_proto_msg(l, STATE_MSG, + 0, 0, 0, 0, xmitq); + } + } +exit: + tipc_skb_queue_splice_tail(&tmpq, l->inputq); + return rc; } /** @@ -1255,458 +1170,249 @@ u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb) } /* - * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet + * Send protocol message to the other endpoint. */ -static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, - struct sk_buff *buf) +void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg, + u32 gap, u32 tolerance, u32 priority) { - u32 seq_no = buf_seqno(buf); + struct sk_buff *skb = NULL; + struct sk_buff_head xmitq; - if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) { - tipc_link_proto_rcv(l_ptr, buf); + __skb_queue_head_init(&xmitq); + tipc_link_build_proto_msg(l, msg_typ, probe_msg, gap, + tolerance, priority, &xmitq); + skb = __skb_dequeue(&xmitq); + if (!skb) return; - } - - /* Record OOS packet arrival */ - l_ptr->silent_intv_cnt = 0; + tipc_bearer_send(l->owner->net, l->bearer_id, skb, l->media_addr); + l->rcv_unacked = 0; + kfree_skb(skb); +} - /* - * Discard packet if a duplicate; otherwise add it to deferred queue - * and notify peer of gap as per protocol specification - */ - if (less(seq_no, l_ptr->rcv_nxt)) { - l_ptr->stats.duplicates++; - kfree_skb(buf); +/* tipc_link_build_proto_msg: prepare link protocol message for transmission + */ +static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, + u16 rcvgap, int tolerance, int priority, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb = NULL; + struct tipc_msg *hdr = l->pmsg; + u16 snd_nxt = l->snd_nxt; + u16 rcv_nxt = l->rcv_nxt; + u16 rcv_last = rcv_nxt - 1; + int node_up = l->owner->bclink.recv_permitted; + + /* Don't send protocol message during reset or link failover */ + if (tipc_link_is_blocked(l)) return; - } - if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) { - l_ptr->stats.deferred_recv++; - if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1) - tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0); + msg_set_type(hdr, mtyp); + msg_set_net_plane(hdr, l->net_plane); + msg_set_bcast_ack(hdr, l->owner->bclink.last_in); + msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net)); + msg_set_link_tolerance(hdr, tolerance); + msg_set_linkprio(hdr, priority); + msg_set_redundant_link(hdr, node_up); + msg_set_seq_gap(hdr, 0); + + /* Compatibility: created msg must not be in sequence with pkt flow */ + msg_set_seqno(hdr, snd_nxt + U16_MAX / 2); + + if (mtyp == STATE_MSG) { + if (!tipc_link_is_up(l)) + return; + msg_set_next_sent(hdr, snd_nxt); + + /* Override rcvgap if there are packets in deferred queue */ + if (!skb_queue_empty(&l->deferdq)) + rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt; + if (rcvgap) { + msg_set_seq_gap(hdr, rcvgap); + l->stats.sent_nacks++; + } + msg_set_ack(hdr, rcv_last); + msg_set_probe(hdr, probe); + if (probe) + l->stats.sent_probes++; + l->stats.sent_states++; } else { - l_ptr->stats.duplicates++; + /* RESET_MSG or ACTIVATE_MSG */ + msg_set_max_pkt(hdr, l->advertised_mtu); + msg_set_ack(hdr, l->rcv_nxt - 1); + msg_set_next_sent(hdr, 1); } + skb = tipc_buf_acquire(msg_size(hdr)); + if (!skb) + return; + skb_copy_to_linear_data(skb, hdr, msg_size(hdr)); + skb->priority = TC_PRIO_CONTROL; + __skb_queue_tail(xmitq, skb); } -/* - * Send protocol message to the other endpoint. +/* tipc_link_tnl_prepare(): prepare and return a list of tunnel packets + * with contents of the link's tranmsit and backlog queues. */ -void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, - u32 gap, u32 tolerance, u32 priority) +void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, + int mtyp, struct sk_buff_head *xmitq) { - struct sk_buff *buf = NULL; - struct tipc_msg *msg = l_ptr->pmsg; - u32 msg_size = sizeof(l_ptr->proto_msg); - int r_flag; - u16 last_rcv; - - /* Don't send protocol message during link failover */ - if (l_ptr->flags & LINK_FAILINGOVER) - return; + struct sk_buff *skb, *tnlskb; + struct tipc_msg *hdr, tnlhdr; + struct sk_buff_head *queue = &l->transmq; + struct sk_buff_head tmpxq, tnlq; + u16 pktlen, pktcnt, seqno = l->snd_nxt; - /* Abort non-RESET send if communication with node is prohibited */ - if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG)) + if (!tnl) return; - /* Create protocol message with "out-of-sequence" sequence number */ - msg_set_type(msg, msg_typ); - msg_set_net_plane(msg, l_ptr->net_plane); - msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - msg_set_last_bcast(msg, tipc_bclink_get_last_sent(l_ptr->owner->net)); - - if (msg_typ == STATE_MSG) { - u16 next_sent = l_ptr->snd_nxt; + skb_queue_head_init(&tnlq); + skb_queue_head_init(&tmpxq); - if (!tipc_link_is_up(l_ptr)) + /* At least one packet required for safe algorithm => add dummy */ + skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG, + BASIC_H_SIZE, 0, l->addr, link_own_addr(l), + 0, 0, TIPC_ERR_NO_PORT); + if (!skb) { + pr_warn("%sunable to create tunnel packet\n", link_co_err); + return; + } + skb_queue_tail(&tnlq, skb); + tipc_link_xmit(l, &tnlq, &tmpxq); + __skb_queue_purge(&tmpxq); + + /* Initialize reusable tunnel packet header */ + tipc_msg_init(link_own_addr(l), &tnlhdr, TUNNEL_PROTOCOL, + mtyp, INT_H_SIZE, l->addr); + pktcnt = skb_queue_len(&l->transmq) + skb_queue_len(&l->backlogq); + msg_set_msgcnt(&tnlhdr, pktcnt); + msg_set_bearer_id(&tnlhdr, l->peer_bearer_id); +tnl: + /* Wrap each packet into a tunnel packet */ + skb_queue_walk(queue, skb) { + hdr = buf_msg(skb); + if (queue == &l->backlogq) + msg_set_seqno(hdr, seqno++); + pktlen = msg_size(hdr); + msg_set_size(&tnlhdr, pktlen + INT_H_SIZE); + tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE); + if (!tnlskb) { + pr_warn("%sunable to send packet\n", link_co_err); return; - msg_set_next_sent(msg, next_sent); - if (!skb_queue_empty(&l_ptr->deferdq)) { - last_rcv = buf_seqno(skb_peek(&l_ptr->deferdq)); - gap = mod(last_rcv - l_ptr->rcv_nxt); } - msg_set_seq_gap(msg, gap); - if (gap) - l_ptr->stats.sent_nacks++; - msg_set_link_tolerance(msg, tolerance); - msg_set_linkprio(msg, priority); - msg_set_max_pkt(msg, l_ptr->mtu); - msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1)); - msg_set_probe(msg, probe_msg != 0); - if (probe_msg) - l_ptr->stats.sent_probes++; - l_ptr->stats.sent_states++; - } else { /* RESET_MSG or ACTIVATE_MSG */ - msg_set_ack(msg, mod(l_ptr->failover_checkpt - 1)); - msg_set_seq_gap(msg, 0); - msg_set_next_sent(msg, 1); - msg_set_probe(msg, 0); - msg_set_link_tolerance(msg, l_ptr->tolerance); - msg_set_linkprio(msg, l_ptr->priority); - msg_set_max_pkt(msg, l_ptr->advertised_mtu); + skb_copy_to_linear_data(tnlskb, &tnlhdr, INT_H_SIZE); + skb_copy_to_linear_data_offset(tnlskb, INT_H_SIZE, hdr, pktlen); + __skb_queue_tail(&tnlq, tnlskb); + } + if (queue != &l->backlogq) { + queue = &l->backlogq; + goto tnl; } - r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr)); - msg_set_redundant_link(msg, r_flag); - msg_set_linkprio(msg, l_ptr->priority); - msg_set_size(msg, msg_size); - - msg_set_seqno(msg, mod(l_ptr->snd_nxt + (0xffff / 2))); - - buf = tipc_buf_acquire(msg_size); - if (!buf) - return; + tipc_link_xmit(tnl, &tnlq, xmitq); - skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); - buf->priority = TC_PRIO_CONTROL; - tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf, - &l_ptr->media_addr); - l_ptr->rcv_unacked = 0; - kfree_skb(buf); + if (mtyp == FAILOVER_MSG) { + tnl->drop_point = l->rcv_nxt; + tnl->failover_reasm_skb = l->reasm_buf; + l->reasm_buf = NULL; + } } -/* - * Receive protocol message : +/* tipc_link_proto_rcv(): receive link level protocol message : * Note that network plane id propagates through the network, and may - * change at any time. The node with lowest address rules + * change at any time. The node with lowest numerical id determines + * network plane */ -static void tipc_link_proto_rcv(struct tipc_link *l_ptr, - struct sk_buff *buf) +static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, + struct sk_buff_head *xmitq) { - u32 rec_gap = 0; - u32 msg_tol; - struct tipc_msg *msg = buf_msg(buf); + struct tipc_msg *hdr = buf_msg(skb); + u16 rcvgap = 0; + u16 nacked_gap = msg_seq_gap(hdr); + u16 peers_snd_nxt = msg_next_sent(hdr); + u16 peers_tol = msg_link_tolerance(hdr); + u16 peers_prio = msg_linkprio(hdr); + char *if_name; + int rc = 0; - if (l_ptr->flags & LINK_FAILINGOVER) + if (tipc_link_is_blocked(l)) goto exit; - if (l_ptr->net_plane != msg_net_plane(msg)) - if (link_own_addr(l_ptr) > msg_prevnode(msg)) - l_ptr->net_plane = msg_net_plane(msg); - - switch (msg_type(msg)) { + if (link_own_addr(l) > msg_prevnode(hdr)) + l->net_plane = msg_net_plane(hdr); + switch (msg_type(hdr)) { case RESET_MSG: - if (!link_working_unknown(l_ptr) && - (l_ptr->peer_session != INVALID_SESSION)) { - if (less_eq(msg_session(msg), l_ptr->peer_session)) - break; /* duplicate or old reset: ignore */ - } - - if (!msg_redundant_link(msg) && (link_working_working(l_ptr) || - link_working_unknown(l_ptr))) { - /* - * peer has lost contact -- don't allow peer's links - * to reactivate before we recognize loss & clean up - */ - l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN; - } - - link_state_event(l_ptr, RESET_MSG); + /* Ignore duplicate RESET with old session number */ + if ((less_eq(msg_session(hdr), l->peer_session)) && + (l->peer_session != WILDCARD_SESSION)) + break; /* fall thru' */ - case ACTIVATE_MSG: - /* Update link settings according other endpoint's values */ - strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg)); - msg_tol = msg_link_tolerance(msg); - if (msg_tol > l_ptr->tolerance) - link_set_supervision_props(l_ptr, msg_tol); - - if (msg_linkprio(msg) > l_ptr->priority) - l_ptr->priority = msg_linkprio(msg); - - if (l_ptr->mtu > msg_max_pkt(msg)) - l_ptr->mtu = msg_max_pkt(msg); - - /* Synchronize broadcast link info, if not done previously */ - if (!tipc_node_is_up(l_ptr->owner)) { - l_ptr->owner->bclink.last_sent = - l_ptr->owner->bclink.last_in = - msg_last_bcast(msg); - l_ptr->owner->bclink.oos_state = 0; - } - - l_ptr->peer_session = msg_session(msg); - l_ptr->peer_bearer_id = msg_bearer_id(msg); - - if (msg_type(msg) == ACTIVATE_MSG) - link_state_event(l_ptr, ACTIVATE_MSG); - break; - case STATE_MSG: + case ACTIVATE_MSG: - msg_tol = msg_link_tolerance(msg); - if (msg_tol) - link_set_supervision_props(l_ptr, msg_tol); - - if (msg_linkprio(msg) && - (msg_linkprio(msg) != l_ptr->priority)) { - pr_debug("%s<%s>, priority change %u->%u\n", - link_rst_msg, l_ptr->name, - l_ptr->priority, msg_linkprio(msg)); - l_ptr->priority = msg_linkprio(msg); - tipc_link_reset(l_ptr); /* Enforce change to take effect */ + /* Complete own link name with peer's interface name */ + if_name = strrchr(l->name, ':') + 1; + if (sizeof(l->name) - (if_name - l->name) <= TIPC_MAX_IF_NAME) break; - } - - /* Record reception; force mismatch at next timeout: */ - l_ptr->silent_intv_cnt = 0; - - link_state_event(l_ptr, TRAFFIC_MSG_EVT); - l_ptr->stats.recv_states++; - if (link_reset_unknown(l_ptr)) + if (msg_data_sz(hdr) < TIPC_MAX_IF_NAME) break; + strncpy(if_name, msg_data(hdr), TIPC_MAX_IF_NAME); - if (less_eq(l_ptr->rcv_nxt, msg_next_sent(msg))) - rec_gap = mod(msg_next_sent(msg) - l_ptr->rcv_nxt); + /* Update own tolerance if peer indicates a non-zero value */ + if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL)) + l->tolerance = peers_tol; - if (msg_probe(msg)) - l_ptr->stats.recv_probes++; + /* Update own priority if peer's priority is higher */ + if (in_range(peers_prio, l->priority + 1, TIPC_MAX_LINK_PRI)) + l->priority = peers_prio; - /* Protocol message before retransmits, reduce loss risk */ - if (l_ptr->owner->bclink.recv_permitted) - tipc_bclink_update_link_state(l_ptr->owner, - msg_last_bcast(msg)); - - if (rec_gap || (msg_probe(msg))) { - tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, - rec_gap, 0, 0); - } - if (msg_seq_gap(msg)) { - l_ptr->stats.recv_nacks++; - tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq), - msg_seq_gap(msg)); + if (msg_type(hdr) == RESET_MSG) { + rc |= tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); + } else if (!link_is_up(l)) { + tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); + rc |= tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); } + l->peer_session = msg_session(hdr); + l->peer_bearer_id = msg_bearer_id(hdr); + if (l->mtu > msg_max_pkt(hdr)) + l->mtu = msg_max_pkt(hdr); break; - } -exit: - kfree_skb(buf); -} - - -/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to - * a different bearer. Owner node is locked. - */ -static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr, - struct tipc_msg *tunnel_hdr, - struct tipc_msg *msg, - u32 selector) -{ - struct tipc_link *tunnel; - struct sk_buff *skb; - u32 length = msg_size(msg); - - tunnel = l_ptr->owner->active_links[selector & 1]; - if (!tipc_link_is_up(tunnel)) { - pr_warn("%stunnel link no longer available\n", link_co_err); - return; - } - msg_set_size(tunnel_hdr, length + INT_H_SIZE); - skb = tipc_buf_acquire(length + INT_H_SIZE); - if (!skb) { - pr_warn("%sunable to send tunnel msg\n", link_co_err); - return; - } - skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE); - skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length); - __tipc_link_xmit_skb(tunnel, skb); -} - - -/* tipc_link_failover_send_queue(): A link has gone down, but a second - * link is still active. We can do failover. Tunnel the failing link's - * whole send queue via the remaining link. This way, we don't lose - * any packets, and sequence order is preserved for subsequent traffic - * sent over the remaining link. Owner node is locked. - */ -void tipc_link_failover_send_queue(struct tipc_link *l_ptr) -{ - int msgcount; - struct tipc_link *tunnel = l_ptr->owner->active_links[0]; - struct tipc_msg tunnel_hdr; - struct sk_buff *skb; - int split_bundles; - - if (!tunnel) - return; - tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, TUNNEL_PROTOCOL, - FAILOVER_MSG, INT_H_SIZE, l_ptr->addr); - - skb_queue_walk(&l_ptr->backlogq, skb) { - msg_set_seqno(buf_msg(skb), l_ptr->snd_nxt); - l_ptr->snd_nxt = mod(l_ptr->snd_nxt + 1); - } - skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq); - tipc_link_purge_backlog(l_ptr); - msgcount = skb_queue_len(&l_ptr->transmq); - msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); - msg_set_msgcnt(&tunnel_hdr, msgcount); - - if (skb_queue_empty(&l_ptr->transmq)) { - skb = tipc_buf_acquire(INT_H_SIZE); - if (skb) { - skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE); - msg_set_size(&tunnel_hdr, INT_H_SIZE); - __tipc_link_xmit_skb(tunnel, skb); - } else { - pr_warn("%sunable to send changeover msg\n", - link_co_err); - } - return; - } - - split_bundles = (l_ptr->owner->active_links[0] != - l_ptr->owner->active_links[1]); + case STATE_MSG: - skb_queue_walk(&l_ptr->transmq, skb) { - struct tipc_msg *msg = buf_msg(skb); + /* Update own tolerance if peer indicates a non-zero value */ + if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL)) + l->tolerance = peers_tol; - if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) { - struct tipc_msg *m = msg_get_wrapped(msg); - unchar *pos = (unchar *)m; + l->silent_intv_cnt = 0; + l->stats.recv_states++; + if (msg_probe(hdr)) + l->stats.recv_probes++; + rc = tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); + if (!link_is_up(l)) + break; - msgcount = msg_msgcnt(msg); - while (msgcount--) { - msg_set_seqno(m, msg_seqno(msg)); - tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m, - msg_link_selector(m)); - pos += align(msg_size(m)); - m = (struct tipc_msg *)pos; - } - } else { - tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg, - msg_link_selector(msg)); + /* Send NACK if peer has sent pkts we haven't received yet */ + if (more(peers_snd_nxt, l->rcv_nxt)) + rcvgap = peers_snd_nxt - l->rcv_nxt; + if (rcvgap || (msg_probe(hdr))) + tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap, + 0, 0, xmitq); + tipc_link_release_pkts(l, msg_ack(hdr)); + + /* If NACK, retransmit will now start at right position */ + if (nacked_gap) { + rc = tipc_link_retransm(l, nacked_gap, xmitq); + l->stats.recv_nacks++; } - } -} - -/* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a - * duplicate of the first link's send queue via the new link. This way, we - * are guaranteed that currently queued packets from a socket are delivered - * before future traffic from the same socket, even if this is using the - * new link. The last arriving copy of each duplicate packet is dropped at - * the receiving end by the regular protocol check, so packet cardinality - * and sequence order is preserved per sender/receiver socket pair. - * Owner node is locked. - */ -void tipc_link_dup_queue_xmit(struct tipc_link *link, - struct tipc_link *tnl) -{ - struct sk_buff *skb; - struct tipc_msg tnl_hdr; - struct sk_buff_head *queue = &link->transmq; - int mcnt; - u16 seqno; - - tipc_msg_init(link_own_addr(link), &tnl_hdr, TUNNEL_PROTOCOL, - SYNCH_MSG, INT_H_SIZE, link->addr); - mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq); - msg_set_msgcnt(&tnl_hdr, mcnt); - msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id); - -tunnel_queue: - skb_queue_walk(queue, skb) { - struct sk_buff *outskb; - struct tipc_msg *msg = buf_msg(skb); - u32 len = msg_size(msg); - msg_set_ack(msg, mod(link->rcv_nxt - 1)); - msg_set_bcast_ack(msg, link->owner->bclink.last_in); - msg_set_size(&tnl_hdr, len + INT_H_SIZE); - outskb = tipc_buf_acquire(len + INT_H_SIZE); - if (outskb == NULL) { - pr_warn("%sunable to send duplicate msg\n", - link_co_err); - return; - } - skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE); - skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, - skb->data, len); - __tipc_link_xmit_skb(tnl, outskb); - if (!tipc_link_is_up(link)) - return; - } - if (queue == &link->backlogq) - return; - seqno = link->snd_nxt; - skb_queue_walk(&link->backlogq, skb) { - msg_set_seqno(buf_msg(skb), seqno); - seqno = mod(seqno + 1); - } - queue = &link->backlogq; - goto tunnel_queue; -} - -/* tipc_link_failover_rcv(): Receive a tunnelled FAILOVER_MSG packet - * Owner node is locked. - */ -static bool tipc_link_failover_rcv(struct tipc_link *link, - struct sk_buff **skb) -{ - struct tipc_msg *msg = buf_msg(*skb); - struct sk_buff *iskb = NULL; - struct tipc_link *pl = NULL; - int bearer_id = msg_bearer_id(msg); - int pos = 0; - - if (msg_type(msg) != FAILOVER_MSG) { - pr_warn("%sunknown tunnel pkt received\n", link_co_err); - goto exit; - } - if (bearer_id >= MAX_BEARERS) - goto exit; - - if (bearer_id == link->bearer_id) - goto exit; - - pl = link->owner->links[bearer_id]; - if (pl && tipc_link_is_up(pl)) - tipc_link_reset(pl); - - if (link->failover_pkts == FIRST_FAILOVER) - link->failover_pkts = msg_msgcnt(msg); - - /* Should we expect an inner packet? */ - if (!link->failover_pkts) - goto exit; - - if (!tipc_msg_extract(*skb, &iskb, &pos)) { - pr_warn("%sno inner failover pkt\n", link_co_err); - *skb = NULL; - goto exit; - } - link->failover_pkts--; - *skb = NULL; - - /* Was this packet already delivered? */ - if (less(buf_seqno(iskb), link->failover_checkpt)) { - kfree_skb(iskb); - iskb = NULL; - goto exit; - } - if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) { - link->stats.recv_fragments++; - tipc_buf_append(&link->failover_skb, &iskb); + tipc_link_advance_backlog(l, xmitq); + if (unlikely(!skb_queue_empty(&l->wakeupq))) + link_prepare_wakeup(l); } exit: - if (!link->failover_pkts && pl) - pl->flags &= ~LINK_FAILINGOVER; - kfree_skb(*skb); - *skb = iskb; - return *skb; -} - -static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol) -{ - unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4; - - if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL)) - return; - - l_ptr->tolerance = tol; - l_ptr->keepalive_intv = msecs_to_jiffies(intv); - l_ptr->abort_limit = tol / (jiffies_to_msecs(l_ptr->keepalive_intv)); + kfree_skb(skb); + return rc; } void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) @@ -1743,7 +1449,7 @@ static struct tipc_node *tipc_link_find_owner(struct net *net, list_for_each_entry_rcu(n_ptr, &tn->node_list, list) { tipc_node_lock(n_ptr); for (i = 0; i < MAX_BEARERS; i++) { - l_ptr = n_ptr->links[i]; + l_ptr = n_ptr->links[i].link; if (l_ptr && !strcmp(l_ptr->name, link_name)) { *bearer_id = i; found_node = n_ptr; @@ -1770,27 +1476,16 @@ static void link_reset_statistics(struct tipc_link *l_ptr) l_ptr->stats.recv_info = l_ptr->rcv_nxt; } -static void link_print(struct tipc_link *l_ptr, const char *str) +static void link_print(struct tipc_link *l, const char *str) { - struct tipc_net *tn = net_generic(l_ptr->owner->net, tipc_net_id); - struct tipc_bearer *b_ptr; - - rcu_read_lock(); - b_ptr = rcu_dereference_rtnl(tn->bearer_list[l_ptr->bearer_id]); - if (b_ptr) - pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name); - rcu_read_unlock(); - - if (link_working_unknown(l_ptr)) - pr_cont(":WU\n"); - else if (link_reset_reset(l_ptr)) - pr_cont(":RR\n"); - else if (link_reset_unknown(l_ptr)) - pr_cont(":RU\n"); - else if (link_working_working(l_ptr)) - pr_cont(":WW\n"); - else - pr_cont("\n"); + struct sk_buff *hskb = skb_peek(&l->transmq); + u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt; + u16 tail = l->snd_nxt - 1; + + pr_info("%s Link <%s> state %x\n", str, l->name, l->state); + pr_info("XMTQ: %u [%u-%u], BKLGQ: %u, SNDNX: %u, RCVNX: %u\n", + skb_queue_len(&l->transmq), head, tail, + skb_queue_len(&l->backlogq), l->snd_nxt, l->rcv_nxt); } /* Parse and validate nested (link) properties valid for media, bearer and link @@ -1865,7 +1560,7 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info) tipc_node_lock(node); - link = node->links[bearer_id]; + link = node->links[bearer_id].link; if (!link) { res = -EINVAL; goto out; @@ -1885,7 +1580,7 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info) u32 tol; tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]); - link_set_supervision_props(link, tol); + link->tolerance = tol; tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0); } if (props[TIPC_NLA_PROP_PRIO]) { @@ -2055,10 +1750,11 @@ static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg, for (i = *prev_link; i < MAX_BEARERS; i++) { *prev_link = i; - if (!node->links[i]) + if (!node->links[i].link) continue; - err = __tipc_nl_add_link(net, msg, node->links[i], NLM_F_MULTI); + err = __tipc_nl_add_link(net, msg, + node->links[i].link, NLM_F_MULTI); if (err) return err; } @@ -2172,7 +1868,7 @@ int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info) return -EINVAL; tipc_node_lock(node); - link = node->links[bearer_id]; + link = node->links[bearer_id].link; if (!link) { tipc_node_unlock(node); nlmsg_free(msg.skb); @@ -2227,7 +1923,7 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info) tipc_node_lock(node); - link = node->links[bearer_id]; + link = node->links[bearer_id].link; if (!link) { tipc_node_unlock(node); return -EINVAL; diff --git a/net/tipc/link.h b/net/tipc/link.h index ae0a0ea572f2..39ff8b6919a4 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -49,19 +49,25 @@ */ #define INVALID_LINK_SEQ 0x10000 -/* Link working states +/* Link FSM events: */ -#define WORKING_WORKING 560810u -#define WORKING_UNKNOWN 560811u -#define RESET_UNKNOWN 560812u -#define RESET_RESET 560813u +enum { + LINK_ESTABLISH_EVT = 0xec1ab1e, + LINK_PEER_RESET_EVT = 0x9eed0e, + LINK_FAILURE_EVT = 0xfa110e, + LINK_RESET_EVT = 0x10ca1d0e, + LINK_FAILOVER_BEGIN_EVT = 0xfa110bee, + LINK_FAILOVER_END_EVT = 0xfa110ede, + LINK_SYNCH_BEGIN_EVT = 0xc1ccbee, + LINK_SYNCH_END_EVT = 0xc1ccede +}; -/* Link endpoint execution states +/* Events returned from link at packet reception or at timeout */ -#define LINK_STARTED 0x0001 -#define LINK_STOPPED 0x0002 -#define LINK_SYNCHING 0x0004 -#define LINK_FAILINGOVER 0x0008 +enum { + TIPC_LINK_UP_EVT = 1, + TIPC_LINK_DOWN_EVT = (1 << 1) +}; /* Starting value for maximum packet size negotiation on unicast links * (unless bearer MTU is less) @@ -106,7 +112,6 @@ struct tipc_stats { * @timer: link timer * @owner: pointer to peer node * @refcnt: reference counter for permanent references (owner node & timer) - * @flags: execution state flags for link endpoint instance * @peer_session: link session # being used by peer end of link * @peer_bearer_id: bearer id used by link's peer endpoint * @bearer_id: local bearer id used by link @@ -143,20 +148,17 @@ struct tipc_stats { struct tipc_link { u32 addr; char name[TIPC_MAX_LINK_NAME]; - struct tipc_media_addr media_addr; - struct timer_list timer; + struct tipc_media_addr *media_addr; struct tipc_node *owner; - struct kref ref; /* Management and link supervision data */ - unsigned int flags; u32 peer_session; u32 peer_bearer_id; u32 bearer_id; u32 tolerance; unsigned long keepalive_intv; u32 abort_limit; - int state; + u32 state; u32 silent_intv_cnt; struct { unchar hdr[INT_H_SIZE]; @@ -165,12 +167,10 @@ struct tipc_link { struct tipc_msg *pmsg; u32 priority; char net_plane; - u16 synch_point; - /* Failover */ - u16 failover_pkts; - u16 failover_checkpt; - struct sk_buff *failover_skb; + /* Failover/synch */ + u16 drop_point; + struct sk_buff *failover_reasm_skb; /* Max packet negotiation */ u16 mtu; @@ -192,8 +192,8 @@ struct tipc_link { u16 rcv_nxt; u32 rcv_unacked; struct sk_buff_head deferdq; - struct sk_buff_head inputq; - struct sk_buff_head namedq; + struct sk_buff_head *inputq; + struct sk_buff_head *namedq; /* Congestion handling */ struct sk_buff_head wakeupq; @@ -205,28 +205,29 @@ struct tipc_link { struct tipc_stats stats; }; -struct tipc_port; - -struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, - struct tipc_bearer *b_ptr, - const struct tipc_media_addr *media_addr); -void tipc_link_delete(struct tipc_link *link); -void tipc_link_delete_list(struct net *net, unsigned int bearer_id); -void tipc_link_failover_send_queue(struct tipc_link *l_ptr); -void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, struct tipc_link *dest); +bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session, + u32 ownnode, u32 peer, struct tipc_media_addr *maddr, + struct sk_buff_head *inputq, struct sk_buff_head *namedq, + struct tipc_link **link); +void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, + int mtyp, struct sk_buff_head *xmitq); +void tipc_link_build_bcast_sync_msg(struct tipc_link *l, + struct sk_buff_head *xmitq); +int tipc_link_fsm_evt(struct tipc_link *l, int evt); void tipc_link_reset_fragments(struct tipc_link *l_ptr); -int tipc_link_is_up(struct tipc_link *l_ptr); +bool tipc_link_is_up(struct tipc_link *l); +bool tipc_link_is_reset(struct tipc_link *l); +bool tipc_link_is_synching(struct tipc_link *l); +bool tipc_link_is_failingover(struct tipc_link *l); +bool tipc_link_is_blocked(struct tipc_link *l); int tipc_link_is_active(struct tipc_link *l_ptr); void tipc_link_purge_queues(struct tipc_link *l_ptr); void tipc_link_purge_backlog(struct tipc_link *l); -void tipc_link_reset_all(struct tipc_node *node); void tipc_link_reset(struct tipc_link *l_ptr); -int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest, - u32 selector); -int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest, - u32 selector); int __tipc_link_xmit(struct net *net, struct tipc_link *link, struct sk_buff_head *list); +int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list, + struct sk_buff_head *xmitq); void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, u32 gap, u32 tolerance, u32 priority); void tipc_link_push_packets(struct tipc_link *l_ptr); @@ -242,34 +243,8 @@ int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info); int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info); int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info); int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]); -void link_prepare_wakeup(struct tipc_link *l); - -static inline u32 link_own_addr(struct tipc_link *l) -{ - return msg_prevnode(l->pmsg); -} - -/* - * Link status checking routines - */ -static inline int link_working_working(struct tipc_link *l_ptr) -{ - return l_ptr->state == WORKING_WORKING; -} - -static inline int link_working_unknown(struct tipc_link *l_ptr) -{ - return l_ptr->state == WORKING_UNKNOWN; -} - -static inline int link_reset_unknown(struct tipc_link *l_ptr) -{ - return l_ptr->state == RESET_UNKNOWN; -} - -static inline int link_reset_reset(struct tipc_link *l_ptr) -{ - return l_ptr->state == RESET_RESET; -} +int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq); +int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, + struct sk_buff_head *xmitq); #endif diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 08b4cc7d496d..562c926a51cc 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -463,60 +463,72 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg, /** * tipc_msg_reverse(): swap source and destination addresses and add error code - * @buf: buffer containing message to be reversed - * @dnode: return value: node where to send message after reversal - * @err: error code to be set in message - * Consumes buffer if failure + * @own_node: originating node id for reversed message + * @skb: buffer containing message to be reversed; may be replaced. + * @err: error code to be set in message, if any + * Consumes buffer at failure * Returns true if success, otherwise false */ -bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode, - int err) +bool tipc_msg_reverse(u32 own_node, struct sk_buff **skb, int err) { - struct tipc_msg *msg = buf_msg(buf); + struct sk_buff *_skb = *skb; + struct tipc_msg *hdr = buf_msg(_skb); struct tipc_msg ohdr; - uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE); + int dlen = min_t(uint, msg_data_sz(hdr), MAX_FORWARD_SIZE); - if (skb_linearize(buf)) + if (skb_linearize(_skb)) goto exit; - msg = buf_msg(buf); - if (msg_dest_droppable(msg)) + hdr = buf_msg(_skb); + if (msg_dest_droppable(hdr)) goto exit; - if (msg_errcode(msg)) + if (msg_errcode(hdr)) goto exit; - memcpy(&ohdr, msg, msg_hdr_sz(msg)); - msg_set_errcode(msg, err); - msg_set_origport(msg, msg_destport(&ohdr)); - msg_set_destport(msg, msg_origport(&ohdr)); - msg_set_prevnode(msg, own_addr); - if (!msg_short(msg)) { - msg_set_orignode(msg, msg_destnode(&ohdr)); - msg_set_destnode(msg, msg_orignode(&ohdr)); + + /* Take a copy of original header before altering message */ + memcpy(&ohdr, hdr, msg_hdr_sz(hdr)); + + /* Never return SHORT header; expand by replacing buffer if necessary */ + if (msg_short(hdr)) { + *skb = tipc_buf_acquire(BASIC_H_SIZE + dlen); + if (!*skb) + goto exit; + memcpy((*skb)->data + BASIC_H_SIZE, msg_data(hdr), dlen); + kfree_skb(_skb); + _skb = *skb; + hdr = buf_msg(_skb); + memcpy(hdr, &ohdr, BASIC_H_SIZE); + msg_set_hdr_sz(hdr, BASIC_H_SIZE); } - msg_set_size(msg, msg_hdr_sz(msg) + rdsz); - skb_trim(buf, msg_size(msg)); - skb_orphan(buf); - *dnode = msg_orignode(&ohdr); + + /* Now reverse the concerned fields */ + msg_set_errcode(hdr, err); + msg_set_origport(hdr, msg_destport(&ohdr)); + msg_set_destport(hdr, msg_origport(&ohdr)); + msg_set_destnode(hdr, msg_prevnode(&ohdr)); + msg_set_prevnode(hdr, own_node); + msg_set_orignode(hdr, own_node); + msg_set_size(hdr, msg_hdr_sz(hdr) + dlen); + skb_trim(_skb, msg_size(hdr)); + skb_orphan(_skb); return true; exit: - kfree_skb(buf); - *dnode = 0; + kfree_skb(_skb); + *skb = NULL; return false; } /** * tipc_msg_lookup_dest(): try to find new destination for named message * @skb: the buffer containing the message. - * @dnode: return value: next-hop node, if destination found - * @err: return value: error code to use, if message to be rejected + * @err: error code to be used by caller if lookup fails * Does not consume buffer * Returns true if a destination is found, false otherwise */ -bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, - u32 *dnode, int *err) +bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err) { struct tipc_msg *msg = buf_msg(skb); - u32 dport; - u32 own_addr = tipc_own_addr(net); + u32 dport, dnode; + u32 onode = tipc_own_addr(net); if (!msg_isdata(msg)) return false; @@ -529,15 +541,15 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, return false; if (msg_reroute_cnt(msg)) return false; - *dnode = addr_domain(net, msg_lookup_scope(msg)); + dnode = addr_domain(net, msg_lookup_scope(msg)); dport = tipc_nametbl_translate(net, msg_nametype(msg), - msg_nameinst(msg), dnode); + msg_nameinst(msg), &dnode); if (!dport) return false; msg_incr_reroute_cnt(msg); - if (*dnode != own_addr) - msg_set_prevnode(msg, own_addr); - msg_set_destnode(msg, *dnode); + if (dnode != onode) + msg_set_prevnode(msg, onode); + msg_set_destnode(msg, dnode); msg_set_destport(msg, dport); *err = TIPC_OK; return true; diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 19c45fb66238..a82c5848d4bc 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -38,6 +38,7 @@ #define _TIPC_MSG_H #include <linux/tipc.h> +#include "core.h" /* * Constants and routines used to read and write TIPC payload message headers @@ -109,7 +110,6 @@ struct tipc_skb_cb { struct sk_buff *tail; bool validated; bool wakeup_pending; - bool bundling; u16 chain_sz; u16 chain_imp; }; @@ -558,15 +558,6 @@ static inline void msg_set_node_capabilities(struct tipc_msg *m, u32 n) msg_set_bits(m, 1, 15, 0x1fff, n); } -static inline bool msg_dup(struct tipc_msg *m) -{ - if (likely(msg_user(m) != TUNNEL_PROTOCOL)) - return false; - if (msg_type(m) != SYNCH_MSG) - return false; - return true; -} - /* * Word 2 */ @@ -620,12 +611,12 @@ static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n) } -static inline u32 msg_next_sent(struct tipc_msg *m) +static inline u16 msg_next_sent(struct tipc_msg *m) { return msg_bits(m, 4, 0, 0xffff); } -static inline void msg_set_next_sent(struct tipc_msg *m, u32 n) +static inline void msg_set_next_sent(struct tipc_msg *m, u16 n) { msg_set_bits(m, 4, 0, 0xffff, n); } @@ -658,12 +649,12 @@ static inline void msg_set_link_selector(struct tipc_msg *m, u32 n) /* * Word 5 */ -static inline u32 msg_session(struct tipc_msg *m) +static inline u16 msg_session(struct tipc_msg *m) { return msg_bits(m, 5, 16, 0xffff); } -static inline void msg_set_session(struct tipc_msg *m, u32 n) +static inline void msg_set_session(struct tipc_msg *m, u16 n) { msg_set_bits(m, 5, 16, 0xffff, n); } @@ -726,12 +717,12 @@ static inline char *msg_media_addr(struct tipc_msg *m) /* * Word 9 */ -static inline u32 msg_msgcnt(struct tipc_msg *m) +static inline u16 msg_msgcnt(struct tipc_msg *m) { return msg_bits(m, 9, 16, 0xffff); } -static inline void msg_set_msgcnt(struct tipc_msg *m, u32 n) +static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n) { msg_set_bits(m, 9, 16, 0xffff, n); } @@ -766,10 +757,25 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n) msg_set_bits(m, 9, 0, 0xffff, n); } +static inline bool msg_peer_link_is_up(struct tipc_msg *m) +{ + if (likely(msg_user(m) != LINK_PROTOCOL)) + return true; + if (msg_type(m) == STATE_MSG) + return true; + return false; +} + +static inline bool msg_peer_node_is_up(struct tipc_msg *m) +{ + if (msg_peer_link_is_up(m)) + return true; + return msg_redundant_link(m); +} + struct sk_buff *tipc_buf_acquire(u32 size); bool tipc_msg_validate(struct sk_buff *skb); -bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode, - int err); +bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, int err); void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type, u32 hsize, u32 destnode); struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz, @@ -782,8 +788,7 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg, bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos); int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, int dsz, int mtu, struct sk_buff_head *list); -bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode, - int *err); +bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); static inline u16 buf_seqno(struct sk_buff *skb) @@ -857,26 +862,65 @@ static inline struct sk_buff *tipc_skb_dequeue(struct sk_buff_head *list, return skb; } -/* tipc_skb_queue_tail(): add buffer to tail of list; +/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number * @list: list to be appended to - * @skb: buffer to append. Always appended - * @dport: the destination port of the buffer - * returns true if dport differs from previous destination + * @skb: buffer to add + * Returns true if queue should treated further, otherwise false */ -static inline bool tipc_skb_queue_tail(struct sk_buff_head *list, - struct sk_buff *skb, u32 dport) +static inline bool __tipc_skb_queue_sorted(struct sk_buff_head *list, + struct sk_buff *skb) { - struct sk_buff *_skb = NULL; - bool rv = false; + struct sk_buff *_skb, *tmp; + struct tipc_msg *hdr = buf_msg(skb); + u16 seqno = msg_seqno(hdr); - spin_lock_bh(&list->lock); - _skb = skb_peek_tail(list); - if (!_skb || (msg_destport(buf_msg(_skb)) != dport) || - (skb_queue_len(list) > 32)) - rv = true; + if (skb_queue_empty(list) || (msg_user(hdr) == LINK_PROTOCOL)) { + __skb_queue_head(list, skb); + return true; + } + if (likely(less(seqno, buf_seqno(skb_peek(list))))) { + __skb_queue_head(list, skb); + return true; + } + if (!more(seqno, buf_seqno(skb_peek_tail(list)))) { + skb_queue_walk_safe(list, _skb, tmp) { + if (likely(less(seqno, buf_seqno(_skb)))) { + __skb_queue_before(list, _skb, skb); + return true; + } + } + } __skb_queue_tail(list, skb); + return false; +} + +/* tipc_skb_queue_splice_tail - append an skb list to lock protected list + * @list: the new list to append. Not lock protected + * @head: target list. Lock protected. + */ +static inline void tipc_skb_queue_splice_tail(struct sk_buff_head *list, + struct sk_buff_head *head) +{ + spin_lock_bh(&head->lock); + skb_queue_splice_tail(list, head); + spin_unlock_bh(&head->lock); +} + +/* tipc_skb_queue_splice_tail_init - merge two lock protected skb lists + * @list: the new list to add. Lock protected. Will be reinitialized + * @head: target list. Lock protected. + */ +static inline void tipc_skb_queue_splice_tail_init(struct sk_buff_head *list, + struct sk_buff_head *head) +{ + struct sk_buff_head tmp; + + __skb_queue_head_init(&tmp); + + spin_lock_bh(&list->lock); + skb_queue_splice_tail_init(list, &tmp); spin_unlock_bh(&list->lock); - return rv; + tipc_skb_queue_splice_tail(&tmp, head); } #endif diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index 41e7b7e4dda0..e6018b7eb197 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -96,13 +96,13 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb) dnode = node->addr; if (in_own_node(net, dnode)) continue; - if (!tipc_node_active_links(node)) + if (!tipc_node_is_up(node)) continue; oskb = pskb_copy(skb, GFP_ATOMIC); if (!oskb) break; msg_set_destnode(buf_msg(oskb), dnode); - tipc_link_xmit_skb(net, oskb, dnode, dnode); + tipc_node_xmit_skb(net, oskb, dnode, dnode); } rcu_read_unlock(); @@ -223,7 +223,7 @@ void tipc_named_node_up(struct net *net, u32 dnode) &tn->nametbl->publ_list[TIPC_ZONE_SCOPE]); rcu_read_unlock(); - tipc_link_xmit(net, &head, dnode, dnode); + tipc_node_xmit(net, &head, dnode, dnode); } static void tipc_publ_subscribe(struct net *net, struct publication *publ, diff --git a/net/tipc/netlink_compat.c b/net/tipc/netlink_compat.c index 53e0fee80086..1eadc95e1132 100644 --- a/net/tipc/netlink_compat.c +++ b/net/tipc/netlink_compat.c @@ -1114,7 +1114,7 @@ static int tipc_nl_compat_recv(struct sk_buff *skb, struct genl_info *info) } len = nlmsg_attrlen(req_nlh, GENL_HDRLEN + TIPC_GENL_HDRLEN); - if (TLV_GET_LEN(msg.req) && !TLV_OK(msg.req, len)) { + if (len && !TLV_OK(msg.req, len)) { msg.rep = tipc_get_err_tlv(TIPC_CFG_NOT_SUPPORTED); err = -EOPNOTSUPP; goto send; diff --git a/net/tipc/node.c b/net/tipc/node.c index 0b1d61a5f853..7c191641b44f 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -40,10 +40,42 @@ #include "name_distr.h" #include "socket.h" #include "bcast.h" +#include "discover.h" -static void node_lost_contact(struct tipc_node *n_ptr); +/* Node FSM states and events: + */ +enum { + SELF_DOWN_PEER_DOWN = 0xdd, + SELF_UP_PEER_UP = 0xaa, + SELF_DOWN_PEER_LEAVING = 0xd1, + SELF_UP_PEER_COMING = 0xac, + SELF_COMING_PEER_UP = 0xca, + SELF_LEAVING_PEER_DOWN = 0x1d, + NODE_FAILINGOVER = 0xf0, + NODE_SYNCHING = 0xcc +}; + +enum { + SELF_ESTABL_CONTACT_EVT = 0xece, + SELF_LOST_CONTACT_EVT = 0x1ce, + PEER_ESTABL_CONTACT_EVT = 0x9ece, + PEER_LOST_CONTACT_EVT = 0x91ce, + NODE_FAILOVER_BEGIN_EVT = 0xfbe, + NODE_FAILOVER_END_EVT = 0xfee, + NODE_SYNCH_BEGIN_EVT = 0xcbe, + NODE_SYNCH_END_EVT = 0xcee +}; + +static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, + struct sk_buff_head *xmitq, + struct tipc_media_addr **maddr); +static void tipc_node_link_down(struct tipc_node *n, int bearer_id, + bool delete); +static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq); static void node_established_contact(struct tipc_node *n_ptr); static void tipc_node_delete(struct tipc_node *node); +static void tipc_node_timeout(unsigned long data); +static void tipc_node_fsm_evt(struct tipc_node *n, int evt); struct tipc_sock_conn { u32 port; @@ -110,7 +142,7 @@ struct tipc_node *tipc_node_find(struct net *net, u32 addr) return NULL; } -struct tipc_node *tipc_node_create(struct net *net, u32 addr) +struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities) { struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_node *n_ptr, *temp_node; @@ -126,12 +158,14 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr) } n_ptr->addr = addr; n_ptr->net = net; + n_ptr->capabilities = capabilities; kref_init(&n_ptr->kref); spin_lock_init(&n_ptr->lock); INIT_HLIST_NODE(&n_ptr->hash); INIT_LIST_HEAD(&n_ptr->list); INIT_LIST_HEAD(&n_ptr->publ_list); INIT_LIST_HEAD(&n_ptr->conn_sks); + skb_queue_head_init(&n_ptr->bclink.namedq); __skb_queue_head_init(&n_ptr->bclink.deferdq); hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]); list_for_each_entry_rcu(temp_node, &tn->node_list, list) { @@ -139,14 +173,32 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr) break; } list_add_tail_rcu(&n_ptr->list, &temp_node->list); - n_ptr->action_flags = TIPC_WAIT_PEER_LINKS_DOWN; + n_ptr->state = SELF_DOWN_PEER_LEAVING; n_ptr->signature = INVALID_NODE_SIG; + n_ptr->active_links[0] = INVALID_BEARER_ID; + n_ptr->active_links[1] = INVALID_BEARER_ID; tipc_node_get(n_ptr); + setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr); + n_ptr->keepalive_intv = U32_MAX; exit: spin_unlock_bh(&tn->node_list_lock); return n_ptr; } +static void tipc_node_calculate_timer(struct tipc_node *n, struct tipc_link *l) +{ + unsigned long tol = l->tolerance; + unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4; + unsigned long keepalive_intv = msecs_to_jiffies(intv); + + /* Link with lowest tolerance determines timer interval */ + if (keepalive_intv < n->keepalive_intv) + n->keepalive_intv = keepalive_intv; + + /* Ensure link's abort limit corresponds to current interval */ + l->abort_limit = l->tolerance / jiffies_to_msecs(n->keepalive_intv); +} + static void tipc_node_delete(struct tipc_node *node) { list_del_rcu(&node->list); @@ -160,8 +212,11 @@ void tipc_node_stop(struct net *net) struct tipc_node *node, *t_node; spin_lock_bh(&tn->node_list_lock); - list_for_each_entry_safe(node, t_node, &tn->node_list, list) + list_for_each_entry_safe(node, t_node, &tn->node_list, list) { + if (del_timer(&node->timer)) + tipc_node_put(node); tipc_node_put(node); + } spin_unlock_bh(&tn->node_list_lock); } @@ -219,158 +274,547 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port) tipc_node_put(node); } +/* tipc_node_timeout - handle expiration of node timer + */ +static void tipc_node_timeout(unsigned long data) +{ + struct tipc_node *n = (struct tipc_node *)data; + struct tipc_link_entry *le; + struct sk_buff_head xmitq; + int bearer_id; + int rc = 0; + + __skb_queue_head_init(&xmitq); + + for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) { + tipc_node_lock(n); + le = &n->links[bearer_id]; + if (le->link) { + /* Link tolerance may change asynchronously: */ + tipc_node_calculate_timer(n, le->link); + rc = tipc_link_timeout(le->link, &xmitq); + } + tipc_node_unlock(n); + tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr); + if (rc & TIPC_LINK_DOWN_EVT) + tipc_node_link_down(n, bearer_id, false); + } + if (!mod_timer(&n->timer, jiffies + n->keepalive_intv)) + tipc_node_get(n); + tipc_node_put(n); +} + /** - * tipc_node_link_up - handle addition of link - * + * __tipc_node_link_up - handle addition of link + * Node lock must be held by caller * Link becomes active (alone or shared) or standby, depending on its priority. */ -void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr) +static void __tipc_node_link_up(struct tipc_node *n, int bearer_id, + struct sk_buff_head *xmitq) { - struct tipc_link **active = &n_ptr->active_links[0]; + int *slot0 = &n->active_links[0]; + int *slot1 = &n->active_links[1]; + struct tipc_link *ol = node_active_link(n, 0); + struct tipc_link *nl = n->links[bearer_id].link; - n_ptr->working_links++; - n_ptr->action_flags |= TIPC_NOTIFY_LINK_UP; - n_ptr->link_id = l_ptr->peer_bearer_id << 16 | l_ptr->bearer_id; + if (!nl || !tipc_link_is_up(nl)) + return; - pr_debug("Established link <%s> on network plane %c\n", - l_ptr->name, l_ptr->net_plane); + n->working_links++; + n->action_flags |= TIPC_NOTIFY_LINK_UP; + n->link_id = nl->peer_bearer_id << 16 | bearer_id; - if (!active[0]) { - active[0] = active[1] = l_ptr; - node_established_contact(n_ptr); - goto exit; - } - if (l_ptr->priority < active[0]->priority) { - pr_debug("New link <%s> becomes standby\n", l_ptr->name); - goto exit; + /* Leave room for tunnel header when returning 'mtu' to users: */ + n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE; + + tipc_bearer_add_dest(n->net, bearer_id, n->addr); + + pr_debug("Established link <%s> on network plane %c\n", + nl->name, nl->net_plane); + + /* First link? => give it both slots */ + if (!ol) { + *slot0 = bearer_id; + *slot1 = bearer_id; + tipc_link_build_bcast_sync_msg(nl, xmitq); + node_established_contact(n); + return; } - tipc_link_dup_queue_xmit(active[0], l_ptr); - if (l_ptr->priority == active[0]->priority) { - active[0] = l_ptr; - goto exit; + + /* Second link => redistribute slots */ + if (nl->priority > ol->priority) { + pr_debug("Old link <%s> becomes standby\n", ol->name); + *slot0 = bearer_id; + *slot1 = bearer_id; + } else if (nl->priority == ol->priority) { + *slot0 = bearer_id; + } else { + pr_debug("New link <%s> is standby\n", nl->name); } - pr_debug("Old link <%s> becomes standby\n", active[0]->name); - if (active[1] != active[0]) - pr_debug("Old link <%s> becomes standby\n", active[1]->name); - active[0] = active[1] = l_ptr; -exit: - /* Leave room for changeover header when returning 'mtu' to users: */ - n_ptr->act_mtus[0] = active[0]->mtu - INT_H_SIZE; - n_ptr->act_mtus[1] = active[1]->mtu - INT_H_SIZE; + + /* Prepare synchronization with first link */ + tipc_link_tnl_prepare(ol, nl, SYNCH_MSG, xmitq); } /** - * node_select_active_links - select active link + * tipc_node_link_up - handle addition of link + * + * Link becomes active (alone or shared) or standby, depending on its priority. */ -static void node_select_active_links(struct tipc_node *n_ptr) +static void tipc_node_link_up(struct tipc_node *n, int bearer_id, + struct sk_buff_head *xmitq) { - struct tipc_link **active = &n_ptr->active_links[0]; - u32 i; - u32 highest_prio = 0; + tipc_node_lock(n); + __tipc_node_link_up(n, bearer_id, xmitq); + tipc_node_unlock(n); +} - active[0] = active[1] = NULL; +/** + * __tipc_node_link_down - handle loss of link + */ +static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, + struct sk_buff_head *xmitq, + struct tipc_media_addr **maddr) +{ + struct tipc_link_entry *le = &n->links[*bearer_id]; + int *slot0 = &n->active_links[0]; + int *slot1 = &n->active_links[1]; + int i, highest = 0; + struct tipc_link *l, *_l, *tnl; + + l = n->links[*bearer_id].link; + if (!l || tipc_link_is_reset(l)) + return; - for (i = 0; i < MAX_BEARERS; i++) { - struct tipc_link *l_ptr = n_ptr->links[i]; + n->working_links--; + n->action_flags |= TIPC_NOTIFY_LINK_DOWN; + n->link_id = l->peer_bearer_id << 16 | *bearer_id; - if (!l_ptr || !tipc_link_is_up(l_ptr) || - (l_ptr->priority < highest_prio)) - continue; + tipc_bearer_remove_dest(n->net, *bearer_id, n->addr); + + pr_debug("Lost link <%s> on network plane %c\n", + l->name, l->net_plane); - if (l_ptr->priority > highest_prio) { - highest_prio = l_ptr->priority; - active[0] = active[1] = l_ptr; - } else { - active[1] = l_ptr; + /* Select new active link if any available */ + *slot0 = INVALID_BEARER_ID; + *slot1 = INVALID_BEARER_ID; + for (i = 0; i < MAX_BEARERS; i++) { + _l = n->links[i].link; + if (!_l || !tipc_link_is_up(_l)) + continue; + if (_l == l) + continue; + if (_l->priority < highest) + continue; + if (_l->priority > highest) { + highest = _l->priority; + *slot0 = i; + *slot1 = i; + continue; } + *slot1 = i; + } + + if (!tipc_node_is_up(n)) { + tipc_link_reset(l); + node_lost_contact(n, &le->inputq); + return; } + + /* There is still a working link => initiate failover */ + tnl = node_active_link(n, 0); + n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1); + tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq); + tipc_link_reset(l); + tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); + tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT); + *maddr = &n->links[tnl->bearer_id].maddr; + *bearer_id = tnl->bearer_id; } -/** - * tipc_node_link_down - handle loss of link - */ -void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr) +static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete) { - struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id); - struct tipc_link **active; + struct tipc_link_entry *le = &n->links[bearer_id]; + struct tipc_media_addr *maddr; + struct sk_buff_head xmitq; + + __skb_queue_head_init(&xmitq); + + tipc_node_lock(n); + __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr); + if (delete && le->link) { + kfree(le->link); + le->link = NULL; + n->link_cnt--; + } + tipc_node_unlock(n); - n_ptr->working_links--; - n_ptr->action_flags |= TIPC_NOTIFY_LINK_DOWN; - n_ptr->link_id = l_ptr->peer_bearer_id << 16 | l_ptr->bearer_id; + tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr); + tipc_sk_rcv(n->net, &le->inputq); +} - if (!tipc_link_is_active(l_ptr)) { - pr_debug("Lost standby link <%s> on network plane %c\n", - l_ptr->name, l_ptr->net_plane); - return; - } - pr_debug("Lost link <%s> on network plane %c\n", - l_ptr->name, l_ptr->net_plane); - - active = &n_ptr->active_links[0]; - if (active[0] == l_ptr) - active[0] = active[1]; - if (active[1] == l_ptr) - active[1] = active[0]; - if (active[0] == l_ptr) - node_select_active_links(n_ptr); - if (tipc_node_is_up(n_ptr)) - tipc_link_failover_send_queue(l_ptr); - else - node_lost_contact(n_ptr); - - /* Leave room for changeover header when returning 'mtu' to users: */ - if (active[0]) { - n_ptr->act_mtus[0] = active[0]->mtu - INT_H_SIZE; - n_ptr->act_mtus[1] = active[1]->mtu - INT_H_SIZE; +bool tipc_node_is_up(struct tipc_node *n) +{ + return n->active_links[0] != INVALID_BEARER_ID; +} + +void tipc_node_check_dest(struct net *net, u32 onode, + struct tipc_bearer *b, + u16 capabilities, u32 signature, + struct tipc_media_addr *maddr, + bool *respond, bool *dupl_addr) +{ + struct tipc_node *n; + struct tipc_link *l; + struct tipc_link_entry *le; + bool addr_match = false; + bool sign_match = false; + bool link_up = false; + bool accept_addr = false; + bool reset = true; + + *dupl_addr = false; + *respond = false; + + n = tipc_node_create(net, onode, capabilities); + if (!n) return; + + tipc_node_lock(n); + + le = &n->links[b->identity]; + + /* Prepare to validate requesting node's signature and media address */ + l = le->link; + link_up = l && tipc_link_is_up(l); + addr_match = l && !memcmp(&le->maddr, maddr, sizeof(*maddr)); + sign_match = (signature == n->signature); + + /* These three flags give us eight permutations: */ + + if (sign_match && addr_match && link_up) { + /* All is fine. Do nothing. */ + reset = false; + } else if (sign_match && addr_match && !link_up) { + /* Respond. The link will come up in due time */ + *respond = true; + } else if (sign_match && !addr_match && link_up) { + /* Peer has changed i/f address without rebooting. + * If so, the link will reset soon, and the next + * discovery will be accepted. So we can ignore it. + * It may also be an cloned or malicious peer having + * chosen the same node address and signature as an + * existing one. + * Ignore requests until the link goes down, if ever. + */ + *dupl_addr = true; + } else if (sign_match && !addr_match && !link_up) { + /* Peer link has changed i/f address without rebooting. + * It may also be a cloned or malicious peer; we can't + * distinguish between the two. + * The signature is correct, so we must accept. + */ + accept_addr = true; + *respond = true; + } else if (!sign_match && addr_match && link_up) { + /* Peer node rebooted. Two possibilities: + * - Delayed re-discovery; this link endpoint has already + * reset and re-established contact with the peer, before + * receiving a discovery message from that node. + * (The peer happened to receive one from this node first). + * - The peer came back so fast that our side has not + * discovered it yet. Probing from this side will soon + * reset the link, since there can be no working link + * endpoint at the peer end, and the link will re-establish. + * Accept the signature, since it comes from a known peer. + */ + n->signature = signature; + } else if (!sign_match && addr_match && !link_up) { + /* The peer node has rebooted. + * Accept signature, since it is a known peer. + */ + n->signature = signature; + *respond = true; + } else if (!sign_match && !addr_match && link_up) { + /* Peer rebooted with new address, or a new/duplicate peer. + * Ignore until the link goes down, if ever. + */ + *dupl_addr = true; + } else if (!sign_match && !addr_match && !link_up) { + /* Peer rebooted with new address, or it is a new peer. + * Accept signature and address. + */ + n->signature = signature; + accept_addr = true; + *respond = true; } - /* Loopback link went down? No fragmentation needed from now on. */ - if (n_ptr->addr == tn->own_addr) { - n_ptr->act_mtus[0] = MAX_MSG_SIZE; - n_ptr->act_mtus[1] = MAX_MSG_SIZE; + + if (!accept_addr) + goto exit; + + /* Now create new link if not already existing */ + if (!l) { + if (n->link_cnt == 2) { + pr_warn("Cannot establish 3rd link to %x\n", n->addr); + goto exit; + } + if (!tipc_link_create(n, b, mod(tipc_net(net)->random), + tipc_own_addr(net), onode, &le->maddr, + &le->inputq, &n->bclink.namedq, &l)) { + *respond = false; + goto exit; + } + tipc_link_reset(l); + le->link = l; + n->link_cnt++; + tipc_node_calculate_timer(n, l); + if (n->link_cnt == 1) + if (!mod_timer(&n->timer, jiffies + n->keepalive_intv)) + tipc_node_get(n); } + memcpy(&le->maddr, maddr, sizeof(*maddr)); +exit: + tipc_node_unlock(n); + if (reset) + tipc_node_link_down(n, b->identity, false); + tipc_node_put(n); } -int tipc_node_active_links(struct tipc_node *n_ptr) +void tipc_node_delete_links(struct net *net, int bearer_id) { - return n_ptr->active_links[0] != NULL; + struct tipc_net *tn = net_generic(net, tipc_net_id); + struct tipc_node *n; + + rcu_read_lock(); + list_for_each_entry_rcu(n, &tn->node_list, list) { + tipc_node_link_down(n, bearer_id, true); + } + rcu_read_unlock(); } -int tipc_node_is_up(struct tipc_node *n_ptr) +static void tipc_node_reset_links(struct tipc_node *n) { - return tipc_node_active_links(n_ptr); + char addr_string[16]; + int i; + + pr_warn("Resetting all links to %s\n", + tipc_addr_string_fill(addr_string, n->addr)); + + for (i = 0; i < MAX_BEARERS; i++) { + tipc_node_link_down(n, i, false); + } } -void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) +/* tipc_node_fsm_evt - node finite state machine + * Determines when contact is allowed with peer node + */ +static void tipc_node_fsm_evt(struct tipc_node *n, int evt) { - n_ptr->links[l_ptr->bearer_id] = l_ptr; - n_ptr->link_cnt++; + int state = n->state; + + switch (state) { + case SELF_DOWN_PEER_DOWN: + switch (evt) { + case SELF_ESTABL_CONTACT_EVT: + state = SELF_UP_PEER_COMING; + break; + case PEER_ESTABL_CONTACT_EVT: + state = SELF_COMING_PEER_UP; + break; + case SELF_LOST_CONTACT_EVT: + case PEER_LOST_CONTACT_EVT: + break; + case NODE_SYNCH_END_EVT: + case NODE_SYNCH_BEGIN_EVT: + case NODE_FAILOVER_BEGIN_EVT: + case NODE_FAILOVER_END_EVT: + default: + goto illegal_evt; + } + break; + case SELF_UP_PEER_UP: + switch (evt) { + case SELF_LOST_CONTACT_EVT: + state = SELF_DOWN_PEER_LEAVING; + break; + case PEER_LOST_CONTACT_EVT: + state = SELF_LEAVING_PEER_DOWN; + break; + case NODE_SYNCH_BEGIN_EVT: + state = NODE_SYNCHING; + break; + case NODE_FAILOVER_BEGIN_EVT: + state = NODE_FAILINGOVER; + break; + case SELF_ESTABL_CONTACT_EVT: + case PEER_ESTABL_CONTACT_EVT: + case NODE_SYNCH_END_EVT: + case NODE_FAILOVER_END_EVT: + break; + default: + goto illegal_evt; + } + break; + case SELF_DOWN_PEER_LEAVING: + switch (evt) { + case PEER_LOST_CONTACT_EVT: + state = SELF_DOWN_PEER_DOWN; + break; + case SELF_ESTABL_CONTACT_EVT: + case PEER_ESTABL_CONTACT_EVT: + case SELF_LOST_CONTACT_EVT: + break; + case NODE_SYNCH_END_EVT: + case NODE_SYNCH_BEGIN_EVT: + case NODE_FAILOVER_BEGIN_EVT: + case NODE_FAILOVER_END_EVT: + default: + goto illegal_evt; + } + break; + case SELF_UP_PEER_COMING: + switch (evt) { + case PEER_ESTABL_CONTACT_EVT: + state = SELF_UP_PEER_UP; + break; + case SELF_LOST_CONTACT_EVT: + state = SELF_DOWN_PEER_LEAVING; + break; + case SELF_ESTABL_CONTACT_EVT: + case PEER_LOST_CONTACT_EVT: + break; + case NODE_SYNCH_END_EVT: + case NODE_SYNCH_BEGIN_EVT: + case NODE_FAILOVER_BEGIN_EVT: + case NODE_FAILOVER_END_EVT: + default: + goto illegal_evt; + } + break; + case SELF_COMING_PEER_UP: + switch (evt) { + case SELF_ESTABL_CONTACT_EVT: + state = SELF_UP_PEER_UP; + break; + case PEER_LOST_CONTACT_EVT: + state = SELF_LEAVING_PEER_DOWN; + break; + case SELF_LOST_CONTACT_EVT: + case PEER_ESTABL_CONTACT_EVT: + break; + case NODE_SYNCH_END_EVT: + case NODE_SYNCH_BEGIN_EVT: + case NODE_FAILOVER_BEGIN_EVT: + case NODE_FAILOVER_END_EVT: + default: + goto illegal_evt; + } + break; + case SELF_LEAVING_PEER_DOWN: + switch (evt) { + case SELF_LOST_CONTACT_EVT: + state = SELF_DOWN_PEER_DOWN; + break; + case SELF_ESTABL_CONTACT_EVT: + case PEER_ESTABL_CONTACT_EVT: + case PEER_LOST_CONTACT_EVT: + break; + case NODE_SYNCH_END_EVT: + case NODE_SYNCH_BEGIN_EVT: + case NODE_FAILOVER_BEGIN_EVT: + case NODE_FAILOVER_END_EVT: + default: + goto illegal_evt; + } + break; + case NODE_FAILINGOVER: + switch (evt) { + case SELF_LOST_CONTACT_EVT: + state = SELF_DOWN_PEER_LEAVING; + break; + case PEER_LOST_CONTACT_EVT: + state = SELF_LEAVING_PEER_DOWN; + break; + case NODE_FAILOVER_END_EVT: + state = SELF_UP_PEER_UP; + break; + case NODE_FAILOVER_BEGIN_EVT: + case SELF_ESTABL_CONTACT_EVT: + case PEER_ESTABL_CONTACT_EVT: + break; + case NODE_SYNCH_BEGIN_EVT: + case NODE_SYNCH_END_EVT: + default: + goto illegal_evt; + } + break; + case NODE_SYNCHING: + switch (evt) { + case SELF_LOST_CONTACT_EVT: + state = SELF_DOWN_PEER_LEAVING; + break; + case PEER_LOST_CONTACT_EVT: + state = SELF_LEAVING_PEER_DOWN; + break; + case NODE_SYNCH_END_EVT: + state = SELF_UP_PEER_UP; + break; + case NODE_FAILOVER_BEGIN_EVT: + state = NODE_FAILINGOVER; + break; + case NODE_SYNCH_BEGIN_EVT: + case SELF_ESTABL_CONTACT_EVT: + case PEER_ESTABL_CONTACT_EVT: + break; + case NODE_FAILOVER_END_EVT: + default: + goto illegal_evt; + } + break; + default: + pr_err("Unknown node fsm state %x\n", state); + break; + } + n->state = state; + return; + +illegal_evt: + pr_err("Illegal node fsm evt %x in state %x\n", evt, state); } -void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) +bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr) { - int i; + int state = n->state; - for (i = 0; i < MAX_BEARERS; i++) { - if (l_ptr != n_ptr->links[i]) - continue; - n_ptr->links[i] = NULL; - n_ptr->link_cnt--; + if (likely(state == SELF_UP_PEER_UP)) + return true; + + if (state == SELF_LEAVING_PEER_DOWN) + return false; + + if (state == SELF_DOWN_PEER_LEAVING) { + if (msg_peer_node_is_up(hdr)) + return false; } + + return true; } static void node_established_contact(struct tipc_node *n_ptr) { + tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT); n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP; n_ptr->bclink.oos_state = 0; n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net); tipc_bclink_add_node(n_ptr->net, n_ptr->addr); } -static void node_lost_contact(struct tipc_node *n_ptr) +static void node_lost_contact(struct tipc_node *n_ptr, + struct sk_buff_head *inputq) { char addr_string[16]; struct tipc_sock_conn *conn, *safe; + struct tipc_link *l; struct list_head *conns = &n_ptr->conn_sks; struct sk_buff *skb; struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id); @@ -396,21 +840,13 @@ static void node_lost_contact(struct tipc_node *n_ptr) /* Abort any ongoing link failover */ for (i = 0; i < MAX_BEARERS; i++) { - struct tipc_link *l_ptr = n_ptr->links[i]; - if (!l_ptr) - continue; - l_ptr->flags &= ~LINK_FAILINGOVER; - l_ptr->failover_checkpt = 0; - l_ptr->failover_pkts = 0; - kfree_skb(l_ptr->failover_skb); - l_ptr->failover_skb = NULL; - tipc_link_reset_fragments(l_ptr); + l = n_ptr->links[i].link; + if (l) + tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT); } - n_ptr->action_flags &= ~TIPC_WAIT_OWN_LINKS_DOWN; - /* Prevent re-contact with node until cleanup is done */ - n_ptr->action_flags |= TIPC_WAIT_PEER_LINKS_DOWN; + tipc_node_fsm_evt(n_ptr, SELF_LOST_CONTACT_EVT); /* Notify publications from this node */ n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN; @@ -421,10 +857,8 @@ static void node_lost_contact(struct tipc_node *n_ptr) SHORT_H_SIZE, 0, tn->own_addr, conn->peer_node, conn->port, conn->peer_port, TIPC_ERR_NO_NODE); - if (likely(skb)) { - skb_queue_tail(n_ptr->inputq, skb); - n_ptr->action_flags |= TIPC_MSG_EVT; - } + if (likely(skb)) + skb_queue_tail(inputq, skb); list_del(&conn->list); kfree(conn); } @@ -453,7 +887,7 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr, goto exit; tipc_node_lock(node); - link = node->links[bearer_id]; + link = node->links[bearer_id].link; if (link) { strncpy(linkname, link->name, len); err = 0; @@ -471,27 +905,20 @@ void tipc_node_unlock(struct tipc_node *node) u32 flags = node->action_flags; u32 link_id = 0; struct list_head *publ_list; - struct sk_buff_head *inputq = node->inputq; - struct sk_buff_head *namedq; - if (likely(!flags || (flags == TIPC_MSG_EVT))) { - node->action_flags = 0; + if (likely(!flags)) { spin_unlock_bh(&node->lock); - if (flags == TIPC_MSG_EVT) - tipc_sk_rcv(net, inputq); return; } addr = node->addr; link_id = node->link_id; - namedq = node->namedq; publ_list = &node->publ_list; - node->action_flags &= ~(TIPC_MSG_EVT | - TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP | + node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP | TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT | - TIPC_NAMED_MSG_EVT | TIPC_BCAST_RESET); + TIPC_BCAST_RESET); spin_unlock_bh(&node->lock); @@ -512,17 +939,11 @@ void tipc_node_unlock(struct tipc_node *node) tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr, link_id, addr); - if (flags & TIPC_MSG_EVT) - tipc_sk_rcv(net, inputq); - - if (flags & TIPC_NAMED_MSG_EVT) - tipc_named_rcv(net, namedq); - if (flags & TIPC_BCAST_MSG_EVT) tipc_bclink_input(net); if (flags & TIPC_BCAST_RESET) - tipc_link_reset_all(node); + tipc_node_reset_links(node); } /* Caller should hold node lock for the passed node */ @@ -559,6 +980,279 @@ msg_full: return -EMSGSIZE; } +static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel, + int *bearer_id, + struct tipc_media_addr **maddr) +{ + int id = n->active_links[sel & 1]; + + if (unlikely(id < 0)) + return NULL; + + *bearer_id = id; + *maddr = &n->links[id].maddr; + return n->links[id].link; +} + +/** + * tipc_node_xmit() is the general link level function for message sending + * @net: the applicable net namespace + * @list: chain of buffers containing message + * @dnode: address of destination node + * @selector: a number used for deterministic link selection + * Consumes the buffer chain, except when returning -ELINKCONG + * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE + */ +int tipc_node_xmit(struct net *net, struct sk_buff_head *list, + u32 dnode, int selector) +{ + struct tipc_link *l = NULL; + struct tipc_node *n; + struct sk_buff_head xmitq; + struct tipc_media_addr *maddr; + int bearer_id; + int rc = -EHOSTUNREACH; + + __skb_queue_head_init(&xmitq); + n = tipc_node_find(net, dnode); + if (likely(n)) { + tipc_node_lock(n); + l = tipc_node_select_link(n, selector, &bearer_id, &maddr); + if (likely(l)) + rc = tipc_link_xmit(l, list, &xmitq); + tipc_node_unlock(n); + if (unlikely(rc == -ENOBUFS)) + tipc_node_link_down(n, bearer_id, false); + tipc_node_put(n); + } + if (likely(!rc)) { + tipc_bearer_xmit(net, bearer_id, &xmitq, maddr); + return 0; + } + if (likely(in_own_node(net, dnode))) { + tipc_sk_rcv(net, list); + return 0; + } + return rc; +} + +/* tipc_node_xmit_skb(): send single buffer to destination + * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE + * messages, which will not be rejected + * The only exception is datagram messages rerouted after secondary + * lookup, which are rare and safe to dispose of anyway. + * TODO: Return real return value, and let callers use + * tipc_wait_for_sendpkt() where applicable + */ +int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, + u32 selector) +{ + struct sk_buff_head head; + int rc; + + skb_queue_head_init(&head); + __skb_queue_tail(&head, skb); + rc = tipc_node_xmit(net, &head, dnode, selector); + if (rc == -ELINKCONG) + kfree_skb(skb); + return 0; +} + +/** + * tipc_node_check_state - check and if necessary update node state + * @skb: TIPC packet + * @bearer_id: identity of bearer delivering the packet + * Returns true if state is ok, otherwise consumes buffer and returns false + */ +static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, + int bearer_id, struct sk_buff_head *xmitq) +{ + struct tipc_msg *hdr = buf_msg(skb); + int usr = msg_user(hdr); + int mtyp = msg_type(hdr); + u16 oseqno = msg_seqno(hdr); + u16 iseqno = msg_seqno(msg_get_wrapped(hdr)); + u16 exp_pkts = msg_msgcnt(hdr); + u16 rcv_nxt, syncpt, dlv_nxt; + int state = n->state; + struct tipc_link *l, *pl = NULL; + struct tipc_media_addr *maddr; + int i, pb_id; + + l = n->links[bearer_id].link; + if (!l) + return false; + rcv_nxt = l->rcv_nxt; + + + if (likely((state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) + return true; + + /* Find parallel link, if any */ + for (i = 0; i < MAX_BEARERS; i++) { + if ((i != bearer_id) && n->links[i].link) { + pl = n->links[i].link; + break; + } + } + + /* Update node accesibility if applicable */ + if (state == SELF_UP_PEER_COMING) { + if (!tipc_link_is_up(l)) + return true; + if (!msg_peer_link_is_up(hdr)) + return true; + tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT); + } + + if (state == SELF_DOWN_PEER_LEAVING) { + if (msg_peer_node_is_up(hdr)) + return false; + tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT); + } + + /* Ignore duplicate packets */ + if (less(oseqno, rcv_nxt)) + return true; + + /* Initiate or update failover mode if applicable */ + if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) { + syncpt = oseqno + exp_pkts - 1; + if (pl && tipc_link_is_up(pl)) { + pb_id = pl->bearer_id; + __tipc_node_link_down(n, &pb_id, xmitq, &maddr); + tipc_skb_queue_splice_tail_init(pl->inputq, l->inputq); + } + /* If pkts arrive out of order, use lowest calculated syncpt */ + if (less(syncpt, n->sync_point)) + n->sync_point = syncpt; + } + + /* Open parallel link when tunnel link reaches synch point */ + if ((n->state == NODE_FAILINGOVER) && !tipc_link_is_failingover(l)) { + if (!more(rcv_nxt, n->sync_point)) + return true; + tipc_node_fsm_evt(n, NODE_FAILOVER_END_EVT); + if (pl) + tipc_link_fsm_evt(pl, LINK_FAILOVER_END_EVT); + return true; + } + + /* Initiate or update synch mode if applicable */ + if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) { + syncpt = iseqno + exp_pkts - 1; + if (!tipc_link_is_up(l)) { + tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); + __tipc_node_link_up(n, bearer_id, xmitq); + } + if (n->state == SELF_UP_PEER_UP) { + n->sync_point = syncpt; + tipc_link_fsm_evt(l, LINK_SYNCH_BEGIN_EVT); + tipc_node_fsm_evt(n, NODE_SYNCH_BEGIN_EVT); + } + if (less(syncpt, n->sync_point)) + n->sync_point = syncpt; + } + + /* Open tunnel link when parallel link reaches synch point */ + if ((n->state == NODE_SYNCHING) && tipc_link_is_synching(l)) { + if (pl) + dlv_nxt = mod(pl->rcv_nxt - skb_queue_len(pl->inputq)); + if (!pl || more(dlv_nxt, n->sync_point)) { + tipc_link_fsm_evt(l, LINK_SYNCH_END_EVT); + tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT); + return true; + } + if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) + return true; + if (usr == LINK_PROTOCOL) + return true; + return false; + } + return true; +} + +/** + * tipc_rcv - process TIPC packets/messages arriving from off-node + * @net: the applicable net namespace + * @skb: TIPC packet + * @bearer: pointer to bearer message arrived on + * + * Invoked with no locks held. Bearer pointer must point to a valid bearer + * structure (i.e. cannot be NULL), but bearer can be inactive. + */ +void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) +{ + struct sk_buff_head xmitq; + struct tipc_node *n; + struct tipc_msg *hdr = buf_msg(skb); + int usr = msg_user(hdr); + int bearer_id = b->identity; + struct tipc_link_entry *le; + int rc = 0; + + __skb_queue_head_init(&xmitq); + + /* Ensure message is well-formed */ + if (unlikely(!tipc_msg_validate(skb))) + goto discard; + + /* Handle arrival of a non-unicast link packet */ + if (unlikely(msg_non_seq(hdr))) { + if (usr == LINK_CONFIG) + tipc_disc_rcv(net, skb, b); + else + tipc_bclink_rcv(net, skb); + return; + } + + /* Locate neighboring node that sent packet */ + n = tipc_node_find(net, msg_prevnode(hdr)); + if (unlikely(!n)) + goto discard; + le = &n->links[bearer_id]; + + tipc_node_lock(n); + + /* Is reception permitted at the moment ? */ + if (!tipc_node_filter_pkt(n, hdr)) + goto unlock; + + if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) + tipc_bclink_sync_state(n, hdr); + + /* Release acked broadcast packets */ + if (unlikely(n->bclink.acked != msg_bcast_ack(hdr))) + tipc_bclink_acknowledge(n, msg_bcast_ack(hdr)); + + /* Check and if necessary update node state */ + if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) { + rc = tipc_link_rcv(le->link, skb, &xmitq); + skb = NULL; + } +unlock: + tipc_node_unlock(n); + + if (unlikely(rc & TIPC_LINK_UP_EVT)) + tipc_node_link_up(n, bearer_id, &xmitq); + + if (unlikely(rc & TIPC_LINK_DOWN_EVT)) + tipc_node_link_down(n, bearer_id, false); + + if (unlikely(!skb_queue_empty(&n->bclink.namedq))) + tipc_named_rcv(net, &n->bclink.namedq); + + if (!skb_queue_empty(&le->inputq)) + tipc_sk_rcv(net, &le->inputq); + + if (!skb_queue_empty(&xmitq)) + tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); + + tipc_node_put(n); +discard: + kfree_skb(skb); +} + int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb) { int err; diff --git a/net/tipc/node.h b/net/tipc/node.h index 5a834cf142c8..344b3e7594fd 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -45,23 +45,19 @@ /* Out-of-range value for node signature */ #define INVALID_NODE_SIG 0x10000 +#define INVALID_BEARER_ID -1 + /* Flags used to take different actions according to flag type - * TIPC_WAIT_PEER_LINKS_DOWN: wait to see that peer's links are down - * TIPC_WAIT_OWN_LINKS_DOWN: wait until peer node is declared down * TIPC_NOTIFY_NODE_DOWN: notify node is down * TIPC_NOTIFY_NODE_UP: notify node is up * TIPC_DISTRIBUTE_NAME: publish or withdraw link state name type */ enum { - TIPC_MSG_EVT = 1, - TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1), - TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2), TIPC_NOTIFY_NODE_DOWN = (1 << 3), TIPC_NOTIFY_NODE_UP = (1 << 4), TIPC_WAKEUP_BCAST_USERS = (1 << 5), TIPC_NOTIFY_LINK_UP = (1 << 6), TIPC_NOTIFY_LINK_DOWN = (1 << 7), - TIPC_NAMED_MSG_EVT = (1 << 8), TIPC_BCAST_MSG_EVT = (1 << 9), TIPC_BCAST_RESET = (1 << 10) }; @@ -85,10 +81,17 @@ struct tipc_node_bclink { u32 deferred_size; struct sk_buff_head deferdq; struct sk_buff *reasm_buf; - int inputq_map; + struct sk_buff_head namedq; bool recv_permitted; }; +struct tipc_link_entry { + struct tipc_link *link; + u32 mtu; + struct sk_buff_head inputq; + struct tipc_media_addr maddr; +}; + /** * struct tipc_node - TIPC node structure * @addr: network address of node @@ -98,11 +101,12 @@ struct tipc_node_bclink { * @hash: links to adjacent nodes in unsorted hash chain * @inputq: pointer to input queue containing messages for msg event * @namedq: pointer to name table input queue with name table messages - * @curr_link: the link holding the node lock, if any - * @active_links: pointers to active links to node - * @links: pointers to all links to node + * @active_links: bearer ids of active links, used as index into links[] array + * @links: array containing references to all links to node * @action_flags: bit mask of different types of node actions * @bclink: broadcast-related info + * @state: connectivity state vs peer node + * @sync_point: sequence number where synch/failover is finished * @list: links to adjacent nodes in sorted list of cluster's nodes * @working_links: number of working links to node (both active and standby) * @link_cnt: number of links to node @@ -118,14 +122,13 @@ struct tipc_node { spinlock_t lock; struct net *net; struct hlist_node hash; - struct sk_buff_head *inputq; - struct sk_buff_head *namedq; - struct tipc_link *active_links[2]; - u32 act_mtus[2]; - struct tipc_link *links[MAX_BEARERS]; + int active_links[2]; + struct tipc_link_entry links[MAX_BEARERS]; int action_flags; struct tipc_node_bclink bclink; struct list_head list; + int state; + u16 sync_point; int link_cnt; u16 working_links; u16 capabilities; @@ -133,25 +136,32 @@ struct tipc_node { u32 link_id; struct list_head publ_list; struct list_head conn_sks; + unsigned long keepalive_intv; + struct timer_list timer; struct rcu_head rcu; }; struct tipc_node *tipc_node_find(struct net *net, u32 addr); void tipc_node_put(struct tipc_node *node); -struct tipc_node *tipc_node_create(struct net *net, u32 addr); void tipc_node_stop(struct net *net); +void tipc_node_check_dest(struct net *net, u32 onode, + struct tipc_bearer *bearer, + u16 capabilities, u32 signature, + struct tipc_media_addr *maddr, + bool *respond, bool *dupl_addr); +void tipc_node_delete_links(struct net *net, int bearer_id); void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr); void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr); -void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr); -void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr); -int tipc_node_active_links(struct tipc_node *n_ptr); -int tipc_node_is_up(struct tipc_node *n_ptr); +bool tipc_node_is_up(struct tipc_node *n); int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node, char *linkname, size_t len); void tipc_node_unlock(struct tipc_node *node); +int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode, + int selector); +int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest, + u32 selector); int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port); void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port); - int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb); static inline void tipc_node_lock(struct tipc_node *node) @@ -159,26 +169,30 @@ static inline void tipc_node_lock(struct tipc_node *node) spin_lock_bh(&node->lock); } -static inline bool tipc_node_blocked(struct tipc_node *node) +static inline struct tipc_link *node_active_link(struct tipc_node *n, int sel) { - return (node->action_flags & (TIPC_WAIT_PEER_LINKS_DOWN | - TIPC_NOTIFY_NODE_DOWN | TIPC_WAIT_OWN_LINKS_DOWN)); + int bearer_id = n->active_links[sel & 1]; + + if (unlikely(bearer_id == INVALID_BEARER_ID)) + return NULL; + + return n->links[bearer_id].link; } -static inline uint tipc_node_get_mtu(struct net *net, u32 addr, u32 selector) +static inline unsigned int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel) { - struct tipc_node *node; - u32 mtu; - - node = tipc_node_find(net, addr); + struct tipc_node *n; + int bearer_id; + unsigned int mtu = MAX_MSG_SIZE; - if (likely(node)) { - mtu = node->act_mtus[selector & 1]; - tipc_node_put(node); - } else { - mtu = MAX_MSG_SIZE; - } + n = tipc_node_find(net, addr); + if (unlikely(!n)) + return mtu; + bearer_id = n->active_links[sel & 1]; + if (likely(bearer_id != INVALID_BEARER_ID)) + mtu = n->links[bearer_id].mtu; + tipc_node_put(n); return mtu; } diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 3a7567f690f3..1060d52ff23e 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -248,6 +248,22 @@ static void tsk_advance_rx_queue(struct sock *sk) kfree_skb(__skb_dequeue(&sk->sk_receive_queue)); } +/* tipc_sk_respond() : send response message back to sender + */ +static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err) +{ + u32 selector; + u32 dnode; + u32 onode = tipc_own_addr(sock_net(sk)); + + if (!tipc_msg_reverse(onode, &skb, err)) + return; + + dnode = msg_destnode(buf_msg(skb)); + selector = msg_origport(buf_msg(skb)); + tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector); +} + /** * tsk_rej_rx_queue - reject all buffers in socket receive queue * @@ -256,13 +272,9 @@ static void tsk_advance_rx_queue(struct sock *sk) static void tsk_rej_rx_queue(struct sock *sk) { struct sk_buff *skb; - u32 dnode; - u32 own_node = tsk_own_node(tipc_sk(sk)); - while ((skb = __skb_dequeue(&sk->sk_receive_queue))) { - if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT)) - tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0); - } + while ((skb = __skb_dequeue(&sk->sk_receive_queue))) + tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT); } /* tsk_peer_msg - verify if message was sent by connected port's peer @@ -441,9 +453,7 @@ static int tipc_release(struct socket *sock) tsk->connected = 0; tipc_node_remove_conn(net, dnode, tsk->portid); } - if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, - TIPC_ERR_NO_PORT)) - tipc_link_xmit_skb(net, skb, dnode, 0); + tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT); } } @@ -456,7 +466,7 @@ static int tipc_release(struct socket *sock) tsk_own_node(tsk), tsk_peer_port(tsk), tsk->portid, TIPC_ERR_NO_PORT); if (skb) - tipc_link_xmit_skb(net, skb, dnode, tsk->portid); + tipc_node_xmit_skb(net, skb, dnode, tsk->portid); tipc_node_remove_conn(net, dnode, tsk->portid); } @@ -686,21 +696,22 @@ new_mtu: do { rc = tipc_bclink_xmit(net, pktchain); - if (likely(rc >= 0)) { - rc = dsz; - break; + if (likely(!rc)) + return dsz; + + if (rc == -ELINKCONG) { + tsk->link_cong = 1; + rc = tipc_wait_for_sndmsg(sock, &timeo); + if (!rc) + continue; } + __skb_queue_purge(pktchain); if (rc == -EMSGSIZE) { msg->msg_iter = save; goto new_mtu; } - if (rc != -ELINKCONG) - break; - tipc_sk(sk)->link_cong = 1; - rc = tipc_wait_for_sndmsg(sock, &timeo); - if (rc) - __skb_queue_purge(pktchain); - } while (!rc); + break; + } while (1); return rc; } @@ -763,35 +774,35 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, /** * tipc_sk_proto_rcv - receive a connection mng protocol message * @tsk: receiving socket - * @skb: pointer to message buffer. Set to NULL if buffer is consumed. + * @skb: pointer to message buffer. */ -static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb) +static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb) { - struct tipc_msg *msg = buf_msg(*skb); + struct sock *sk = &tsk->sk; + struct tipc_msg *hdr = buf_msg(skb); + int mtyp = msg_type(hdr); int conn_cong; - u32 dnode; - u32 own_node = tsk_own_node(tsk); + /* Ignore if connection cannot be validated: */ - if (!tsk_peer_msg(tsk, msg)) + if (!tsk_peer_msg(tsk, hdr)) goto exit; tsk->probing_state = TIPC_CONN_OK; - if (msg_type(msg) == CONN_ACK) { + if (mtyp == CONN_PROBE) { + msg_set_type(hdr, CONN_PROBE_REPLY); + tipc_sk_respond(sk, skb, TIPC_OK); + return; + } else if (mtyp == CONN_ACK) { conn_cong = tsk_conn_cong(tsk); - tsk->sent_unacked -= msg_msgcnt(msg); + tsk->sent_unacked -= msg_msgcnt(hdr); if (conn_cong) - tsk->sk.sk_write_space(&tsk->sk); - } else if (msg_type(msg) == CONN_PROBE) { - if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) { - msg_set_type(msg, CONN_PROBE_REPLY); - return; - } + sk->sk_write_space(sk); + } else if (mtyp != CONN_PROBE_REPLY) { + pr_warn("Received unknown CONN_PROTO msg\n"); } - /* Do nothing if msg_type() == CONN_PROBE_REPLY */ exit: - kfree_skb(*skb); - *skb = NULL; + kfree_skb(skb); } static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) @@ -924,24 +935,25 @@ new_mtu: do { skb = skb_peek(pktchain); TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; - rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid); - if (likely(rc >= 0)) { + rc = tipc_node_xmit(net, pktchain, dnode, tsk->portid); + if (likely(!rc)) { if (sock->state != SS_READY) sock->state = SS_CONNECTING; - rc = dsz; - break; + return dsz; } + if (rc == -ELINKCONG) { + tsk->link_cong = 1; + rc = tipc_wait_for_sndmsg(sock, &timeo); + if (!rc) + continue; + } + __skb_queue_purge(pktchain); if (rc == -EMSGSIZE) { m->msg_iter = save; goto new_mtu; } - if (rc != -ELINKCONG) - break; - tsk->link_cong = 1; - rc = tipc_wait_for_sndmsg(sock, &timeo); - if (rc) - __skb_queue_purge(pktchain); - } while (!rc); + break; + } while (1); return rc; } @@ -1043,15 +1055,16 @@ next: return rc; do { if (likely(!tsk_conn_cong(tsk))) { - rc = tipc_link_xmit(net, pktchain, dnode, portid); + rc = tipc_node_xmit(net, pktchain, dnode, portid); if (likely(!rc)) { tsk->sent_unacked++; sent += send; if (sent == dsz) - break; + return dsz; goto next; } if (rc == -EMSGSIZE) { + __skb_queue_purge(pktchain); tsk->max_pkt = tipc_node_get_mtu(net, dnode, portid); m->msg_iter = save; @@ -1059,13 +1072,13 @@ next: } if (rc != -ELINKCONG) break; + tsk->link_cong = 1; } rc = tipc_wait_for_sndpkt(sock, &timeo); - if (rc) - __skb_queue_purge(pktchain); } while (!rc); + __skb_queue_purge(pktchain); return sent ? sent : rc; } @@ -1221,7 +1234,7 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) return; msg = buf_msg(skb); msg_set_msgcnt(msg, ack); - tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg)); + tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg)); } static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) @@ -1507,82 +1520,81 @@ static void tipc_data_ready(struct sock *sk) * @tsk: TIPC socket * @skb: pointer to message buffer. Set to NULL if buffer is consumed * - * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise + * Returns true if everything ok, false otherwise */ -static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb) +static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) { struct sock *sk = &tsk->sk; struct net *net = sock_net(sk); struct socket *sock = sk->sk_socket; - struct tipc_msg *msg = buf_msg(*skb); - int retval = -TIPC_ERR_NO_PORT; + struct tipc_msg *hdr = buf_msg(skb); - if (msg_mcast(msg)) - return retval; + if (unlikely(msg_mcast(hdr))) + return false; switch ((int)sock->state) { case SS_CONNECTED: + /* Accept only connection-based messages sent by peer */ - if (tsk_peer_msg(tsk, msg)) { - if (unlikely(msg_errcode(msg))) { - sock->state = SS_DISCONNECTING; - tsk->connected = 0; - /* let timer expire on it's own */ - tipc_node_remove_conn(net, tsk_peer_node(tsk), - tsk->portid); - } - retval = TIPC_OK; + if (unlikely(!tsk_peer_msg(tsk, hdr))) + return false; + + if (unlikely(msg_errcode(hdr))) { + sock->state = SS_DISCONNECTING; + tsk->connected = 0; + /* Let timer expire on it's own */ + tipc_node_remove_conn(net, tsk_peer_node(tsk), + tsk->portid); } - break; + return true; + case SS_CONNECTING: - /* Accept only ACK or NACK message */ - if (unlikely(!msg_connected(msg))) - break; + /* Accept only ACK or NACK message */ + if (unlikely(!msg_connected(hdr))) + return false; - if (unlikely(msg_errcode(msg))) { + if (unlikely(msg_errcode(hdr))) { sock->state = SS_DISCONNECTING; sk->sk_err = ECONNREFUSED; - retval = TIPC_OK; - break; + return true; } - if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) { + if (unlikely(!msg_isdata(hdr))) { sock->state = SS_DISCONNECTING; sk->sk_err = EINVAL; - retval = TIPC_OK; - break; + return true; } - tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg)); - msg_set_importance(&tsk->phdr, msg_importance(msg)); + tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr)); + msg_set_importance(&tsk->phdr, msg_importance(hdr)); sock->state = SS_CONNECTED; - /* If an incoming message is an 'ACK-', it should be - * discarded here because it doesn't contain useful - * data. In addition, we should try to wake up - * connect() routine if sleeping. - */ - if (msg_data_sz(msg) == 0) { - kfree_skb(*skb); - *skb = NULL; - if (waitqueue_active(sk_sleep(sk))) - wake_up_interruptible(sk_sleep(sk)); - } - retval = TIPC_OK; - break; + /* If 'ACK+' message, add to socket receive queue */ + if (msg_data_sz(hdr)) + return true; + + /* If empty 'ACK-' message, wake up sleeping connect() */ + if (waitqueue_active(sk_sleep(sk))) + wake_up_interruptible(sk_sleep(sk)); + + /* 'ACK-' message is neither accepted nor rejected: */ + msg_set_dest_droppable(hdr, 1); + return false; + case SS_LISTENING: case SS_UNCONNECTED: + /* Accept only SYN message */ - if (!msg_connected(msg) && !(msg_errcode(msg))) - retval = TIPC_OK; + if (!msg_connected(hdr) && !(msg_errcode(hdr))) + return true; break; case SS_DISCONNECTING: break; default: pr_err("Unknown socket state %u\n", sock->state); } - return retval; + return false; } /** @@ -1617,61 +1629,70 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf) /** * filter_rcv - validate incoming message * @sk: socket - * @skb: pointer to message. Set to NULL if buffer is consumed. + * @skb: pointer to message. * * Enqueues message on receive queue if acceptable; optionally handles * disconnect indication for a connected socket. * * Called with socket lock already taken * - * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected + * Returns true if message was added to socket receive queue, otherwise false */ -static int filter_rcv(struct sock *sk, struct sk_buff **skb) +static bool filter_rcv(struct sock *sk, struct sk_buff *skb) { struct socket *sock = sk->sk_socket; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_msg *msg = buf_msg(*skb); - unsigned int limit = rcvbuf_limit(sk, *skb); - int rc = TIPC_OK; + struct tipc_msg *hdr = buf_msg(skb); + unsigned int limit = rcvbuf_limit(sk, skb); + int err = TIPC_OK; + int usr = msg_user(hdr); - if (unlikely(msg_user(msg) == CONN_MANAGER)) { + if (unlikely(msg_user(hdr) == CONN_MANAGER)) { tipc_sk_proto_rcv(tsk, skb); - return TIPC_OK; + return false; } - if (unlikely(msg_user(msg) == SOCK_WAKEUP)) { - kfree_skb(*skb); + if (unlikely(usr == SOCK_WAKEUP)) { + kfree_skb(skb); tsk->link_cong = 0; sk->sk_write_space(sk); - *skb = NULL; - return TIPC_OK; + return false; } - /* Reject message if it is wrong sort of message for socket */ - if (msg_type(msg) > TIPC_DIRECT_MSG) - return -TIPC_ERR_NO_PORT; + /* Drop if illegal message type */ + if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) { + kfree_skb(skb); + return false; + } - if (sock->state == SS_READY) { - if (msg_connected(msg)) - return -TIPC_ERR_NO_PORT; - } else { - rc = filter_connect(tsk, skb); - if (rc != TIPC_OK || !*skb) - return rc; + /* Reject if wrong message type for current socket state */ + if (unlikely(sock->state == SS_READY)) { + if (msg_connected(hdr)) { + err = TIPC_ERR_NO_PORT; + goto reject; + } + } else if (unlikely(!filter_connect(tsk, skb))) { + err = TIPC_ERR_NO_PORT; + goto reject; } /* Reject message if there isn't room to queue it */ - if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit) - return -TIPC_ERR_OVERLOAD; + if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) { + err = TIPC_ERR_OVERLOAD; + goto reject; + } /* Enqueue message */ - TIPC_SKB_CB(*skb)->handle = NULL; - __skb_queue_tail(&sk->sk_receive_queue, *skb); - skb_set_owner_r(*skb, sk); + TIPC_SKB_CB(skb)->handle = NULL; + __skb_queue_tail(&sk->sk_receive_queue, skb); + skb_set_owner_r(skb, sk); sk->sk_data_ready(sk); - *skb = NULL; - return TIPC_OK; + return true; + +reject: + tipc_sk_respond(sk, skb, err); + return false; } /** @@ -1685,22 +1706,10 @@ static int filter_rcv(struct sock *sk, struct sk_buff **skb) */ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) { - int err; - atomic_t *dcnt; - u32 dnode; - struct tipc_sock *tsk = tipc_sk(sk); - struct net *net = sock_net(sk); - uint truesize = skb->truesize; + unsigned int truesize = skb->truesize; - err = filter_rcv(sk, &skb); - if (likely(!skb)) { - dcnt = &tsk->dupl_rcvcnt; - if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT) - atomic_add(truesize, dcnt); - return 0; - } - if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err)) - tipc_link_xmit_skb(net, skb, dnode, tsk->portid); + if (likely(filter_rcv(sk, skb))) + atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt); return 0; } @@ -1710,45 +1719,43 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) * @inputq: list of incoming buffers with potentially different destinations * @sk: socket where the buffers should be enqueued * @dport: port number for the socket - * @_skb: returned buffer to be forwarded or rejected, if applicable * * Caller must hold socket lock - * - * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD - * or -TIPC_ERR_NO_PORT */ -static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, - u32 dport, struct sk_buff **_skb) +static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, + u32 dport) { unsigned int lim; atomic_t *dcnt; - int err; struct sk_buff *skb; unsigned long time_limit = jiffies + 2; while (skb_queue_len(inputq)) { if (unlikely(time_after_eq(jiffies, time_limit))) - return TIPC_OK; + return; + skb = tipc_skb_dequeue(inputq, dport); if (unlikely(!skb)) - return TIPC_OK; + return; + + /* Add message directly to receive queue if possible */ if (!sock_owned_by_user(sk)) { - err = filter_rcv(sk, &skb); - if (likely(!skb)) - continue; - *_skb = skb; - return err; + filter_rcv(sk, skb); + continue; } + + /* Try backlog, compensating for double-counted bytes */ dcnt = &tipc_sk(sk)->dupl_rcvcnt; if (sk->sk_backlog.len) atomic_set(dcnt, 0); lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); if (likely(!sk_add_backlog(sk, skb, lim))) continue; - *_skb = skb; - return -TIPC_ERR_OVERLOAD; + + /* Overload => reject message back to sender */ + tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD); + break; } - return TIPC_OK; } /** @@ -1756,49 +1763,46 @@ static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, * @inputq: buffer list containing the buffers * Consumes all buffers in list until inputq is empty * Note: may be called in multiple threads referring to the same queue - * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH - * Only node local calls check the return value, sending single-buffer queues */ -int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) +void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) { u32 dnode, dport = 0; int err; - struct sk_buff *skb; struct tipc_sock *tsk; - struct tipc_net *tn; struct sock *sk; + struct sk_buff *skb; while (skb_queue_len(inputq)) { - err = -TIPC_ERR_NO_PORT; - skb = NULL; dport = tipc_skb_peek_port(inputq, dport); tsk = tipc_sk_lookup(net, dport); + if (likely(tsk)) { sk = &tsk->sk; if (likely(spin_trylock_bh(&sk->sk_lock.slock))) { - err = tipc_sk_enqueue(inputq, sk, dport, &skb); + tipc_sk_enqueue(inputq, sk, dport); spin_unlock_bh(&sk->sk_lock.slock); - dport = 0; } sock_put(sk); - } else { - skb = tipc_skb_dequeue(inputq, dport); - } - if (likely(!skb)) continue; - if (tipc_msg_lookup_dest(net, skb, &dnode, &err)) - goto xmit; - if (!err) { - dnode = msg_destnode(buf_msg(skb)); - goto xmit; } - tn = net_generic(net, tipc_net_id); - if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err)) + + /* No destination socket => dequeue skb if still there */ + skb = tipc_skb_dequeue(inputq, dport); + if (!skb) + return; + + /* Try secondary lookup if unresolved named message */ + err = TIPC_ERR_NO_PORT; + if (tipc_msg_lookup_dest(net, skb, &err)) + goto xmit; + + /* Prepare for message rejection */ + if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err)) continue; xmit: - tipc_link_xmit_skb(net, skb, dnode, dport); + dnode = msg_destnode(buf_msg(skb)); + tipc_node_xmit_skb(net, skb, dnode, dport); } - return err ? -EHOSTUNREACH : 0; } static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) @@ -2067,7 +2071,10 @@ static int tipc_shutdown(struct socket *sock, int how) struct net *net = sock_net(sk); struct tipc_sock *tsk = tipc_sk(sk); struct sk_buff *skb; - u32 dnode; + u32 dnode = tsk_peer_node(tsk); + u32 dport = tsk_peer_port(tsk); + u32 onode = tipc_own_addr(net); + u32 oport = tsk->portid; int res; if (how != SHUT_RDWR) @@ -2080,6 +2087,8 @@ static int tipc_shutdown(struct socket *sock, int how) case SS_CONNECTED: restart: + dnode = tsk_peer_node(tsk); + /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */ skb = __skb_dequeue(&sk->sk_receive_queue); if (skb) { @@ -2087,19 +2096,13 @@ restart: kfree_skb(skb); goto restart; } - if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, - TIPC_CONN_SHUTDOWN)) - tipc_link_xmit_skb(net, skb, dnode, - tsk->portid); + tipc_sk_respond(sk, skb, TIPC_CONN_SHUTDOWN); } else { - dnode = tsk_peer_node(tsk); - skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, SHORT_H_SIZE, - 0, dnode, tsk_own_node(tsk), - tsk_peer_port(tsk), - tsk->portid, TIPC_CONN_SHUTDOWN); - tipc_link_xmit_skb(net, skb, dnode, tsk->portid); + 0, dnode, onode, dport, oport, + TIPC_CONN_SHUTDOWN); + tipc_node_xmit_skb(net, skb, dnode, tsk->portid); } tsk->connected = 0; sock->state = SS_DISCONNECTING; @@ -2161,7 +2164,7 @@ static void tipc_sk_timeout(unsigned long data) } bh_unlock_sock(sk); if (skb) - tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid); + tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid); exit: sock_put(sk); } diff --git a/net/tipc/socket.h b/net/tipc/socket.h index bf6551389522..4241f22069dc 100644 --- a/net/tipc/socket.h +++ b/net/tipc/socket.h @@ -44,7 +44,7 @@ SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) int tipc_socket_init(void); void tipc_socket_stop(void); -int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq); +void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq); void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, struct sk_buff_head *inputq); void tipc_sk_reinit(struct net *net); diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c index 66deebc66aa1..c170d3138953 100644 --- a/net/tipc/udp_media.c +++ b/net/tipc/udp_media.c @@ -194,7 +194,8 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb, .saddr = src->ipv6, .flowi6_proto = IPPROTO_UDP }; - err = ipv6_stub->ipv6_dst_lookup(ub->ubsock->sk, &ndst, &fl6); + err = ipv6_stub->ipv6_dst_lookup(net, ub->ubsock->sk, &ndst, + &fl6); if (err) goto tx_error; ttl = ip6_dst_hoplimit(ndst); |