summaryrefslogtreecommitdiff
path: root/net/tipc/group.c
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2018-02-01 01:31:10 +0300
committerLinus Torvalds <torvalds@linux-foundation.org>2018-02-01 01:31:10 +0300
commitb2fe5fa68642860e7de76167c3111623aa0d5de1 (patch)
treeb7f9b89b7039ecefbc35fe3c8e73a6ff972641dd /net/tipc/group.c
parenta103950e0dd2058df5e8a8d4a915707bdcf205f0 (diff)
parenta54667f6728c2714a400f3c884727da74b6d1717 (diff)
downloadlinux-b2fe5fa68642860e7de76167c3111623aa0d5de1.tar.xz
Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next
Pull networking updates from David Miller: 1) Significantly shrink the core networking routing structures. Result of http://vger.kernel.org/~davem/seoul2017_netdev_keynote.pdf 2) Add netdevsim driver for testing various offloads, from Jakub Kicinski. 3) Support cross-chip FDB operations in DSA, from Vivien Didelot. 4) Add a 2nd listener hash table for TCP, similar to what was done for UDP. From Martin KaFai Lau. 5) Add eBPF based queue selection to tun, from Jason Wang. 6) Lockless qdisc support, from John Fastabend. 7) SCTP stream interleave support, from Xin Long. 8) Smoother TCP receive autotuning, from Eric Dumazet. 9) Lots of erspan tunneling enhancements, from William Tu. 10) Add true function call support to BPF, from Alexei Starovoitov. 11) Add explicit support for GRO HW offloading, from Michael Chan. 12) Support extack generation in more netlink subsystems. From Alexander Aring, Quentin Monnet, and Jakub Kicinski. 13) Add 1000BaseX, flow control, and EEE support to mvneta driver. From Russell King. 14) Add flow table abstraction to netfilter, from Pablo Neira Ayuso. 15) Many improvements and simplifications to the NFP driver bpf JIT, from Jakub Kicinski. 16) Support for ipv6 non-equal cost multipath routing, from Ido Schimmel. 17) Add resource abstration to devlink, from Arkadi Sharshevsky. 18) Packet scheduler classifier shared filter block support, from Jiri Pirko. 19) Avoid locking in act_csum, from Davide Caratti. 20) devinet_ioctl() simplifications from Al viro. 21) More TCP bpf improvements from Lawrence Brakmo. 22) Add support for onlink ipv6 route flag, similar to ipv4, from David Ahern. * git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next: (1925 commits) tls: Add support for encryption using async offload accelerator ip6mr: fix stale iterator net/sched: kconfig: Remove blank help texts openvswitch: meter: Use 64-bit arithmetic instead of 32-bit tcp_nv: fix potential integer overflow in tcpnv_acked r8169: fix RTL8168EP take too long to complete driver initialization. qmi_wwan: Add support for Quectel EP06 rtnetlink: enable IFLA_IF_NETNSID for RTM_NEWLINK ipmr: Fix ptrdiff_t print formatting ibmvnic: Wait for device response when changing MAC qlcnic: fix deadlock bug tcp: release sk_frag.page in tcp_disconnect ipv4: Get the address of interface correctly. net_sched: gen_estimator: fix lockdep splat net: macb: Handle HRESP error net/mlx5e: IPoIB, Fix copy-paste bug in flow steering refactoring ipv6: addrconf: break critical section in addrconf_verify_rtnl() ipv6: change route cache aging logic i40e/i40evf: Update DESC_NEEDED value to reflect larger value bnxt_en: cleanup DIM work on device shutdown ...
Diffstat (limited to 'net/tipc/group.c')
-rw-r--r--net/tipc/group.c371
1 files changed, 197 insertions, 174 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c
index 5f4ffae807ee..122162a31816 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -49,8 +49,6 @@
#define ADV_ACTIVE (ADV_UNIT * 12)
enum mbr_state {
- MBR_QUARANTINED,
- MBR_DISCOVERED,
MBR_JOINING,
MBR_PUBLISHED,
MBR_JOINED,
@@ -64,8 +62,7 @@ enum mbr_state {
struct tipc_member {
struct rb_node tree_node;
struct list_head list;
- struct list_head congested;
- struct sk_buff *event_msg;
+ struct list_head small_win;
struct sk_buff_head deferredq;
struct tipc_group *group;
u32 node;
@@ -77,21 +74,18 @@ struct tipc_member {
u16 bc_rcv_nxt;
u16 bc_syncpt;
u16 bc_acked;
- bool usr_pending;
};
struct tipc_group {
struct rb_root members;
- struct list_head congested;
+ struct list_head small_win;
struct list_head pending;
struct list_head active;
- struct list_head reclaiming;
struct tipc_nlist dests;
struct net *net;
int subid;
u32 type;
u32 instance;
- u32 domain;
u32 scope;
u32 portid;
u16 member_cnt;
@@ -99,6 +93,7 @@ struct tipc_group {
u16 max_active;
u16 bc_snd_nxt;
u16 bc_ackers;
+ bool *open;
bool loopback;
bool events;
};
@@ -106,6 +101,16 @@ struct tipc_group {
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
int mtyp, struct sk_buff_head *xmitq);
+static void tipc_group_open(struct tipc_member *m, bool *wakeup)
+{
+ *wakeup = false;
+ if (list_empty(&m->small_win))
+ return;
+ list_del_init(&m->small_win);
+ *m->group->open = true;
+ *wakeup = true;
+}
+
static void tipc_group_decr_active(struct tipc_group *grp,
struct tipc_member *m)
{
@@ -137,14 +142,14 @@ u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
return grp->bc_snd_nxt;
}
-static bool tipc_group_is_enabled(struct tipc_member *m)
+static bool tipc_group_is_receiver(struct tipc_member *m)
{
- return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING;
+ return m && m->state != MBR_JOINING && m->state != MBR_LEAVING;
}
-static bool tipc_group_is_receiver(struct tipc_member *m)
+static bool tipc_group_is_sender(struct tipc_member *m)
{
- return m && m->state >= MBR_JOINED;
+ return m && m->state != MBR_JOINING && m->state != MBR_PUBLISHED;
}
u32 tipc_group_exclude(struct tipc_group *grp)
@@ -160,8 +165,11 @@ int tipc_group_size(struct tipc_group *grp)
}
struct tipc_group *tipc_group_create(struct net *net, u32 portid,
- struct tipc_group_req *mreq)
+ struct tipc_group_req *mreq,
+ bool *group_is_open)
{
+ u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS;
+ bool global = mreq->scope != TIPC_NODE_SCOPE;
struct tipc_group *grp;
u32 type = mreq->type;
@@ -169,25 +177,41 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
if (!grp)
return NULL;
tipc_nlist_init(&grp->dests, tipc_own_addr(net));
- INIT_LIST_HEAD(&grp->congested);
+ INIT_LIST_HEAD(&grp->small_win);
INIT_LIST_HEAD(&grp->active);
INIT_LIST_HEAD(&grp->pending);
- INIT_LIST_HEAD(&grp->reclaiming);
grp->members = RB_ROOT;
grp->net = net;
grp->portid = portid;
- grp->domain = addr_domain(net, mreq->scope);
grp->type = type;
grp->instance = mreq->instance;
grp->scope = mreq->scope;
grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
- if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
+ grp->open = group_is_open;
+ filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE;
+ if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0,
+ filter, &grp->subid))
return grp;
kfree(grp);
return NULL;
}
+void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf)
+{
+ struct rb_root *tree = &grp->members;
+ struct tipc_member *m, *tmp;
+ struct sk_buff_head xmitq;
+
+ skb_queue_head_init(&xmitq);
+ rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
+ tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq);
+ tipc_group_update_member(m, 0);
+ }
+ tipc_node_distr_xmit(net, &xmitq);
+ *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
+}
+
void tipc_group_delete(struct net *net, struct tipc_group *grp)
{
struct rb_root *tree = &grp->members;
@@ -233,7 +257,7 @@ static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp,
struct tipc_member *m;
m = tipc_group_find_member(grp, node, port);
- if (m && tipc_group_is_enabled(m))
+ if (m && tipc_group_is_receiver(m))
return m;
return NULL;
}
@@ -278,7 +302,7 @@ static void tipc_group_add_to_tree(struct tipc_group *grp,
static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
u32 node, u32 port,
- int state)
+ u32 instance, int state)
{
struct tipc_member *m;
@@ -286,11 +310,12 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
if (!m)
return NULL;
INIT_LIST_HEAD(&m->list);
- INIT_LIST_HEAD(&m->congested);
+ INIT_LIST_HEAD(&m->small_win);
__skb_queue_head_init(&m->deferredq);
m->group = grp;
m->node = node;
m->port = port;
+ m->instance = instance;
m->bc_acked = grp->bc_snd_nxt - 1;
grp->member_cnt++;
tipc_group_add_to_tree(grp, m);
@@ -299,9 +324,10 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
return m;
}
-void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port)
+void tipc_group_add_member(struct tipc_group *grp, u32 node,
+ u32 port, u32 instance)
{
- tipc_group_create_member(grp, node, port, MBR_DISCOVERED);
+ tipc_group_create_member(grp, node, port, instance, MBR_PUBLISHED);
}
static void tipc_group_delete_member(struct tipc_group *grp,
@@ -315,7 +341,7 @@ static void tipc_group_delete_member(struct tipc_group *grp,
grp->bc_ackers--;
list_del_init(&m->list);
- list_del_init(&m->congested);
+ list_del_init(&m->small_win);
tipc_group_decr_active(grp, m);
/* If last member on a node, remove node from dest list */
@@ -344,7 +370,7 @@ void tipc_group_update_member(struct tipc_member *m, int len)
struct tipc_group *grp = m->group;
struct tipc_member *_m, *tmp;
- if (!tipc_group_is_enabled(m))
+ if (!tipc_group_is_receiver(m))
return;
m->window -= len;
@@ -352,16 +378,14 @@ void tipc_group_update_member(struct tipc_member *m, int len)
if (m->window >= ADV_IDLE)
return;
- list_del_init(&m->congested);
+ list_del_init(&m->small_win);
- /* Sort member into congested members' list */
- list_for_each_entry_safe(_m, tmp, &grp->congested, congested) {
- if (m->window > _m->window)
- continue;
- list_add_tail(&m->congested, &_m->congested);
- return;
+ /* Sort member into small_window members' list */
+ list_for_each_entry_safe(_m, tmp, &grp->small_win, small_win) {
+ if (_m->window > m->window)
+ break;
}
- list_add_tail(&m->congested, &grp->congested);
+ list_add_tail(&m->small_win, &_m->small_win);
}
void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
@@ -373,7 +397,7 @@ void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
for (n = rb_first(&grp->members); n; n = rb_next(n)) {
m = container_of(n, struct tipc_member, tree_node);
- if (tipc_group_is_enabled(m)) {
+ if (tipc_group_is_receiver(m)) {
tipc_group_update_member(m, len);
m->bc_acked = prev;
ackers++;
@@ -394,20 +418,20 @@ bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
int adv, state;
m = tipc_group_find_dest(grp, dnode, dport);
- *mbr = m;
- if (!m)
+ if (!tipc_group_is_receiver(m)) {
+ *mbr = NULL;
return false;
- if (m->usr_pending)
- return true;
+ }
+ *mbr = m;
+
if (m->window >= len)
return false;
- m->usr_pending = true;
+
+ *grp->open = false;
/* If not fully advertised, do it now to prevent mutual blocking */
adv = m->advertised;
state = m->state;
- if (state < MBR_JOINED)
- return true;
if (state == MBR_JOINED && adv == ADV_IDLE)
return true;
if (state == MBR_ACTIVE && adv == ADV_ACTIVE)
@@ -425,13 +449,14 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len)
struct tipc_member *m = NULL;
/* If prev bcast was replicast, reject until all receivers have acked */
- if (grp->bc_ackers)
+ if (grp->bc_ackers) {
+ *grp->open = false;
return true;
-
- if (list_empty(&grp->congested))
+ }
+ if (list_empty(&grp->small_win))
return false;
- m = list_first_entry(&grp->congested, struct tipc_member, congested);
+ m = list_first_entry(&grp->small_win, struct tipc_member, small_win);
if (m->window >= len)
return false;
@@ -486,7 +511,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
goto drop;
m = tipc_group_find_member(grp, node, port);
- if (!tipc_group_is_receiver(m))
+ if (!tipc_group_is_sender(m))
goto drop;
if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
@@ -573,24 +598,34 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
switch (m->state) {
case MBR_JOINED:
- /* Reclaim advertised space from least active member */
- if (!list_empty(active) && active_cnt >= reclaim_limit) {
+ /* First, decide if member can go active */
+ if (active_cnt <= max_active) {
+ m->state = MBR_ACTIVE;
+ list_add_tail(&m->list, active);
+ grp->active_cnt++;
+ tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
+ } else {
+ m->state = MBR_PENDING;
+ list_add_tail(&m->list, &grp->pending);
+ }
+
+ if (active_cnt < reclaim_limit)
+ break;
+
+ /* Reclaim from oldest active member, if possible */
+ if (!list_empty(active)) {
rm = list_first_entry(active, struct tipc_member, list);
rm->state = MBR_RECLAIMING;
- list_move_tail(&rm->list, &grp->reclaiming);
+ list_del_init(&rm->list);
tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq);
- }
- /* If max active, become pending and wait for reclaimed space */
- if (active_cnt >= max_active) {
- m->state = MBR_PENDING;
- list_add_tail(&m->list, &grp->pending);
break;
}
- /* Otherwise become active */
- m->state = MBR_ACTIVE;
- list_add_tail(&m->list, &grp->active);
- grp->active_cnt++;
- /* Fall through */
+ /* Nobody to reclaim from; - revert oldest pending to JOINED */
+ pm = list_first_entry(&grp->pending, struct tipc_member, list);
+ list_del_init(&pm->list);
+ pm->state = MBR_JOINED;
+ tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
+ break;
case MBR_ACTIVE:
if (!list_is_last(&m->list, &grp->active))
list_move_tail(&m->list, &grp->active);
@@ -602,12 +637,12 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
if (m->advertised > ADV_IDLE)
break;
m->state = MBR_JOINED;
+ grp->active_cnt--;
if (m->advertised < ADV_IDLE) {
pr_warn_ratelimited("Rcv unexpected msg after REMIT\n");
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
}
- grp->active_cnt--;
- list_del_init(&m->list);
+
if (list_empty(&grp->pending))
return;
@@ -619,7 +654,6 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
break;
case MBR_RECLAIMING:
- case MBR_DISCOVERED:
case MBR_JOINING:
case MBR_LEAVING:
default:
@@ -627,6 +661,40 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
}
}
+static void tipc_group_create_event(struct tipc_group *grp,
+ struct tipc_member *m,
+ u32 event, u16 seqno,
+ struct sk_buff_head *inputq)
+{ u32 dnode = tipc_own_addr(grp->net);
+ struct tipc_event evt;
+ struct sk_buff *skb;
+ struct tipc_msg *hdr;
+
+ evt.event = event;
+ evt.found_lower = m->instance;
+ evt.found_upper = m->instance;
+ evt.port.ref = m->port;
+ evt.port.node = m->node;
+ evt.s.seq.type = grp->type;
+ evt.s.seq.lower = m->instance;
+ evt.s.seq.upper = m->instance;
+
+ skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT,
+ GROUP_H_SIZE, sizeof(evt), dnode, m->node,
+ grp->portid, m->port, 0);
+ if (!skb)
+ return;
+
+ hdr = buf_msg(skb);
+ msg_set_nametype(hdr, grp->type);
+ msg_set_grp_evt(hdr, event);
+ msg_set_dest_droppable(hdr, true);
+ msg_set_grp_bc_seqno(hdr, seqno);
+ memcpy(msg_data(hdr), &evt, sizeof(evt));
+ TIPC_SKB_CB(skb)->orig_member = m->instance;
+ __skb_queue_tail(inputq, skb);
+}
+
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
int mtyp, struct sk_buff_head *xmitq)
{
@@ -672,83 +740,73 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
u32 node = msg_orignode(hdr);
u32 port = msg_origport(hdr);
struct tipc_member *m, *pm;
- struct tipc_msg *ehdr;
u16 remitted, in_flight;
if (!grp)
return;
+ if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net))
+ return;
+
m = tipc_group_find_member(grp, node, port);
switch (msg_type(hdr)) {
case GRP_JOIN_MSG:
if (!m)
m = tipc_group_create_member(grp, node, port,
- MBR_QUARANTINED);
+ 0, MBR_JOINING);
if (!m)
return;
m->bc_syncpt = msg_grp_bc_syncpt(hdr);
m->bc_rcv_nxt = m->bc_syncpt;
m->window += msg_adv_win(hdr);
- /* Wait until PUBLISH event is received */
- if (m->state == MBR_DISCOVERED) {
- m->state = MBR_JOINING;
- } else if (m->state == MBR_PUBLISHED) {
- m->state = MBR_JOINED;
- *usr_wakeup = true;
- m->usr_pending = false;
- tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
- ehdr = buf_msg(m->event_msg);
- msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
- __skb_queue_tail(inputq, m->event_msg);
- }
- list_del_init(&m->congested);
+ /* Wait until PUBLISH event is received if necessary */
+ if (m->state != MBR_PUBLISHED)
+ return;
+
+ /* Member can be taken into service */
+ m->state = MBR_JOINED;
+ tipc_group_open(m, usr_wakeup);
tipc_group_update_member(m, 0);
+ tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
+ tipc_group_create_event(grp, m, TIPC_PUBLISHED,
+ m->bc_syncpt, inputq);
return;
case GRP_LEAVE_MSG:
if (!m)
return;
m->bc_syncpt = msg_grp_bc_syncpt(hdr);
list_del_init(&m->list);
- list_del_init(&m->congested);
- *usr_wakeup = true;
-
- /* Wait until WITHDRAW event is received */
- if (m->state != MBR_LEAVING) {
- tipc_group_decr_active(grp, m);
- m->state = MBR_LEAVING;
- return;
- }
- /* Otherwise deliver already received WITHDRAW event */
- ehdr = buf_msg(m->event_msg);
- msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
- __skb_queue_tail(inputq, m->event_msg);
+ tipc_group_open(m, usr_wakeup);
+ tipc_group_decr_active(grp, m);
+ m->state = MBR_LEAVING;
+ tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
+ m->bc_syncpt, inputq);
return;
case GRP_ADV_MSG:
if (!m)
return;
m->window += msg_adv_win(hdr);
- *usr_wakeup = m->usr_pending;
- m->usr_pending = false;
- list_del_init(&m->congested);
+ tipc_group_open(m, usr_wakeup);
return;
case GRP_ACK_MSG:
if (!m)
return;
m->bc_acked = msg_grp_bc_acked(hdr);
if (--grp->bc_ackers)
- break;
+ return;
+ list_del_init(&m->small_win);
+ *m->group->open = true;
*usr_wakeup = true;
- m->usr_pending = false;
+ tipc_group_update_member(m, 0);
return;
case GRP_RECLAIM_MSG:
if (!m)
return;
- *usr_wakeup = m->usr_pending;
- m->usr_pending = false;
tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq);
m->window = ADV_IDLE;
+ tipc_group_open(m, usr_wakeup);
return;
case GRP_REMIT_MSG:
if (!m || m->state != MBR_RECLAIMING)
@@ -763,18 +821,14 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
m->advertised = ADV_IDLE + in_flight;
return;
}
- /* All messages preceding the REMIT have been read */
- if (m->advertised <= remitted) {
- m->state = MBR_JOINED;
- in_flight = 0;
- }
- /* ..and the REMIT overtaken by more messages => re-advertise */
+ /* This should never happen */
if (m->advertised < remitted)
- tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
+ pr_warn_ratelimited("Unexpected REMIT msg\n");
- m->advertised = ADV_IDLE + in_flight;
+ /* All messages preceding the REMIT have been read */
+ m->state = MBR_JOINED;
grp->active_cnt--;
- list_del_init(&m->list);
+ m->advertised = ADV_IDLE;
/* Set oldest pending member to active and advertise */
if (list_empty(&grp->pending))
@@ -796,11 +850,10 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
void tipc_group_member_evt(struct tipc_group *grp,
bool *usr_wakeup,
int *sk_rcvbuf,
- struct sk_buff *skb,
+ struct tipc_msg *hdr,
struct sk_buff_head *inputq,
struct sk_buff_head *xmitq)
{
- struct tipc_msg *hdr = buf_msg(skb);
struct tipc_event *evt = (void *)msg_data(hdr);
u32 instance = evt->found_lower;
u32 node = evt->port.node;
@@ -808,89 +861,59 @@ void tipc_group_member_evt(struct tipc_group *grp,
int event = evt->event;
struct tipc_member *m;
struct net *net;
- bool node_up;
u32 self;
if (!grp)
- goto drop;
+ return;
net = grp->net;
self = tipc_own_addr(net);
if (!grp->loopback && node == self && port == grp->portid)
- goto drop;
-
- /* Convert message before delivery to user */
- msg_set_hdr_sz(hdr, GROUP_H_SIZE);
- msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
- msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
- msg_set_origport(hdr, port);
- msg_set_orignode(hdr, node);
- msg_set_nametype(hdr, grp->type);
- msg_set_grp_evt(hdr, event);
+ return;
m = tipc_group_find_member(grp, node, port);
- if (event == TIPC_PUBLISHED) {
- if (!m)
- m = tipc_group_create_member(grp, node, port,
- MBR_DISCOVERED);
- if (!m)
- goto drop;
-
- /* Hold back event if JOIN message not yet received */
- if (m->state == MBR_DISCOVERED) {
- m->event_msg = skb;
- m->state = MBR_PUBLISHED;
- } else {
- msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
- __skb_queue_tail(inputq, skb);
- m->state = MBR_JOINED;
- *usr_wakeup = true;
- m->usr_pending = false;
+ switch (event) {
+ case TIPC_PUBLISHED:
+ /* Send and wait for arrival of JOIN message if necessary */
+ if (!m) {
+ m = tipc_group_create_member(grp, node, port, instance,
+ MBR_PUBLISHED);
+ if (!m)
+ break;
+ tipc_group_update_member(m, 0);
+ tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
+ break;
}
+
+ if (m->state != MBR_JOINING)
+ break;
+
+ /* Member can be taken into service */
m->instance = instance;
- TIPC_SKB_CB(skb)->orig_member = m->instance;
+ m->state = MBR_JOINED;
+ tipc_group_open(m, usr_wakeup);
+ tipc_group_update_member(m, 0);
tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
- if (m->window < ADV_IDLE)
- tipc_group_update_member(m, 0);
- else
- list_del_init(&m->congested);
- } else if (event == TIPC_WITHDRAWN) {
+ tipc_group_create_event(grp, m, TIPC_PUBLISHED,
+ m->bc_syncpt, inputq);
+ break;
+ case TIPC_WITHDRAWN:
if (!m)
- goto drop;
-
- TIPC_SKB_CB(skb)->orig_member = m->instance;
+ break;
- *usr_wakeup = true;
- m->usr_pending = false;
- node_up = tipc_node_is_up(net, node);
- m->event_msg = NULL;
-
- if (node_up) {
- /* Hold back event if a LEAVE msg should be expected */
- if (m->state != MBR_LEAVING) {
- m->event_msg = skb;
- tipc_group_decr_active(grp, m);
- m->state = MBR_LEAVING;
- } else {
- msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
- __skb_queue_tail(inputq, skb);
- }
- } else {
- if (m->state != MBR_LEAVING) {
- tipc_group_decr_active(grp, m);
- m->state = MBR_LEAVING;
- msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt);
- } else {
- msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
- }
- __skb_queue_tail(inputq, skb);
- }
+ tipc_group_decr_active(grp, m);
+ m->state = MBR_LEAVING;
list_del_init(&m->list);
- list_del_init(&m->congested);
+ tipc_group_open(m, usr_wakeup);
+
+ /* Only send event if no LEAVE message can be expected */
+ if (!tipc_node_is_up(net, node))
+ tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
+ m->bc_rcv_nxt, inputq);
+ break;
+ default:
+ break;
}
*sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
- return;
-drop:
- kfree_skb(skb);
}