diff options
author | David S. Miller <davem@davemloft.net> | 2017-10-13 18:46:01 +0300 |
---|---|---|
committer | David S. Miller <davem@davemloft.net> | 2017-10-13 18:46:01 +0300 |
commit | a00344bd1bbea2ba40719ae0eb3b6da7fae08cf2 (patch) | |
tree | 3224d16251675b877b9a7d7b94420de7bbd36a9e | |
parent | 2d0d21c12dfa3851620f1fa9fe2d444538f1fad4 (diff) | |
parent | 04d7b574b245c66001a33cb9da2c0311063af73f (diff) | |
download | linux-a00344bd1bbea2ba40719ae0eb3b6da7fae08cf2.tar.xz |
Merge branch 'tipc-comm-groups'
Jon Maloy says:
====================
tipc: Introduce Communcation Group feature
With this commit series we introduce a 'Group Communication' feature in
order to resolve the datagram and multicast flow control problem. This
new feature makes it possible for a user to instantiate multiple private
virtual brokerless message buses by just creating and joining member
sockets.
The main features are as follows:
---------------------------------
- Sockets can join a group via a new setsockopt() call TIPC_GROUP_JOIN.
If it is the first socket of the group this implies creation of the
group. This call takes four parameters: 'type' serves as group
identifier, 'instance' serves as member identifier, and 'scope'
indicates the visibility of the group (node/cluster/zone). Finally,
'flags' indicates different options for the socket joining the group.
For the time being, there are only two such flags: 1) 'LOOPBACK'
indicates if the creator of the socket wants to receive a copy of
broadcast or multicast messages it sends to the group, 2) EVENTS
indicates if it wants to receive membership (JOINED/LEFT) events for
the other members of the group.
- Groups are closed, i.e., sockets which have not joined a group will
not be able to send messages to or receive messages from members of
the group, and vice versa. A socket can only be member of one group
at a time.
- There are four transmission modes.
1: Unicast. The sender transmits a message using the port identity
(node:port tuple) of the receiving socket.
2: Anycast. The sender transmits a message using a port name (type:
instance:scope) of one of the receiving sockets. If more than
one member socket matches the given address a destination is
selected according to a round-robin algorithm, but also considering
the destination load (advertised window size) as an additional
criteria.
3: Multicast. The sender transmits a message using a port name
(type:instance:scope) of one or more of the receiving sockets.
All sockets in the group matching the given address will receive
a copy of the message.
4: Broadcast. The sender transmits a message using the primtive
send(). All members of the group, irrespective of their member
identity (instance) number receive a copy of the message.
- TIPC broadcast is used for carrying messages in mode 3 or 4 when
this is deemed more efficient, i.e., depending on number of actual
destinations.
- All transmission modes are flow controlled, so that messages never
are dropped or rejected, just like we are used to from connection
oriented communication. A special algorithm guarantees that this is
true even for multipoint-to-point communication, i.e., at occasions
where many source sockets may decide to send simultaneously towards
the same destination socket.
- Sequence order is always guaranteed, even between the different
transmission modes.
- Member join/leave events are received in all other member sockets
in guaranteed order. I.e., a 'JOINED' (an empty message with the OOB
bit set) will always be received before the first data message from
a new member, and a 'LEAVE' (like 'JOINED', but with EOR bit set) will
always arrive after the last data message from a leaving member.
-----
v2: Reordered variable declarations in descending length order, as per
feedback from David Miller. This was done as far as permitted by the
the initialization order.
====================
Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r-- | include/uapi/linux/tipc.h | 15 | ||||
-rw-r--r-- | net/tipc/Makefile | 2 | ||||
-rw-r--r-- | net/tipc/bcast.c | 18 | ||||
-rw-r--r-- | net/tipc/core.h | 5 | ||||
-rw-r--r-- | net/tipc/group.c | 871 | ||||
-rw-r--r-- | net/tipc/group.h | 73 | ||||
-rw-r--r-- | net/tipc/link.c | 9 | ||||
-rw-r--r-- | net/tipc/msg.c | 7 | ||||
-rw-r--r-- | net/tipc/msg.h | 118 | ||||
-rw-r--r-- | net/tipc/name_table.c | 174 | ||||
-rw-r--r-- | net/tipc/name_table.h | 28 | ||||
-rw-r--r-- | net/tipc/node.c | 42 | ||||
-rw-r--r-- | net/tipc/node.h | 5 | ||||
-rw-r--r-- | net/tipc/server.c | 121 | ||||
-rw-r--r-- | net/tipc/server.h | 5 | ||||
-rw-r--r-- | net/tipc/socket.c | 787 |
16 files changed, 1997 insertions, 283 deletions
diff --git a/include/uapi/linux/tipc.h b/include/uapi/linux/tipc.h index 5351b08c897a..ef41c11a7f38 100644 --- a/include/uapi/linux/tipc.h +++ b/include/uapi/linux/tipc.h @@ -231,6 +231,21 @@ struct sockaddr_tipc { #define TIPC_SOCK_RECVQ_DEPTH 132 /* Default: none (read only) */ #define TIPC_MCAST_BROADCAST 133 /* Default: TIPC selects. No arg */ #define TIPC_MCAST_REPLICAST 134 /* Default: TIPC selects. No arg */ +#define TIPC_GROUP_JOIN 135 /* Takes struct tipc_group_req* */ +#define TIPC_GROUP_LEAVE 136 /* No argument */ + +/* + * Flag values + */ +#define TIPC_GROUP_LOOPBACK 0x1 /* Receive copy of sent msg when match */ +#define TIPC_GROUP_MEMBER_EVTS 0x2 /* Receive membership events in socket */ + +struct tipc_group_req { + __u32 type; /* group id */ + __u32 instance; /* member id */ + __u32 scope; /* zone/cluster/node */ + __u32 flags; +}; /* * Maximum sizes of TIPC bearer-related names (including terminating NULL) diff --git a/net/tipc/Makefile b/net/tipc/Makefile index 31b9f9c52974..a3af73ec0b78 100644 --- a/net/tipc/Makefile +++ b/net/tipc/Makefile @@ -8,7 +8,7 @@ tipc-y += addr.o bcast.o bearer.o \ core.o link.o discover.o msg.o \ name_distr.o subscr.o monitor.o name_table.o net.o \ netlink.o netlink_compat.o node.o socket.o eth_media.o \ - server.o socket.o + server.o socket.o group.o tipc-$(CONFIG_TIPC_MEDIA_UDP) += udp_media.o tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index a140dd4a84af..329325bd553e 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -258,20 +258,20 @@ static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts, static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts, struct tipc_nlist *dests, u16 *cong_link_cnt) { + struct tipc_dest *dst, *tmp; struct sk_buff_head _pkts; - struct u32_item *n, *tmp; - u32 dst, selector; + u32 dnode, selector; selector = msg_link_selector(buf_msg(skb_peek(pkts))); skb_queue_head_init(&_pkts); - list_for_each_entry_safe(n, tmp, &dests->list, list) { - dst = n->value; - if (!tipc_msg_pskb_copy(dst, pkts, &_pkts)) + list_for_each_entry_safe(dst, tmp, &dests->list, list) { + dnode = dst->node; + if (!tipc_msg_pskb_copy(dnode, pkts, &_pkts)) return -ENOMEM; /* Any other return value than -ELINKCONG is ignored */ - if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG) + if (tipc_node_xmit(net, &_pkts, dnode, selector) == -ELINKCONG) (*cong_link_cnt)++; } return 0; @@ -554,7 +554,7 @@ void tipc_nlist_add(struct tipc_nlist *nl, u32 node) { if (node == nl->self) nl->local = true; - else if (u32_push(&nl->list, node)) + else if (tipc_dest_push(&nl->list, node, 0)) nl->remote++; } @@ -562,13 +562,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node) { if (node == nl->self) nl->local = false; - else if (u32_del(&nl->list, node)) + else if (tipc_dest_del(&nl->list, node, 0)) nl->remote--; } void tipc_nlist_purge(struct tipc_nlist *nl) { - u32_list_purge(&nl->list); + tipc_dest_list_purge(&nl->list); nl->remote = 0; nl->local = 0; } diff --git a/net/tipc/core.h b/net/tipc/core.h index 5cc5398be722..964342689f2c 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -132,6 +132,11 @@ static inline struct list_head *tipc_nodes(struct net *net) return &tipc_net(net)->node_list; } +static inline struct tipc_server *tipc_topsrv(struct net *net) +{ + return tipc_net(net)->topsrv; +} + static inline unsigned int tipc_hashfn(u32 addr) { return addr & (NODE_HTABLE_SIZE - 1); diff --git a/net/tipc/group.c b/net/tipc/group.c new file mode 100644 index 000000000000..7821085a7dd8 --- /dev/null +++ b/net/tipc/group.c @@ -0,0 +1,871 @@ +/* + * net/tipc/group.c: TIPC group messaging code + * + * Copyright (c) 2017, Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "core.h" +#include "addr.h" +#include "group.h" +#include "bcast.h" +#include "server.h" +#include "msg.h" +#include "socket.h" +#include "node.h" +#include "name_table.h" +#include "subscr.h" + +#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) +#define ADV_IDLE ADV_UNIT +#define ADV_ACTIVE (ADV_UNIT * 12) + +enum mbr_state { + MBR_QUARANTINED, + MBR_DISCOVERED, + MBR_JOINING, + MBR_PUBLISHED, + MBR_JOINED, + MBR_PENDING, + MBR_ACTIVE, + MBR_RECLAIMING, + MBR_REMITTED, + MBR_LEAVING +}; + +struct tipc_member { + struct rb_node tree_node; + struct list_head list; + struct list_head congested; + struct sk_buff *event_msg; + struct sk_buff_head deferredq; + struct tipc_group *group; + u32 node; + u32 port; + u32 instance; + enum mbr_state state; + u16 advertised; + u16 window; + u16 bc_rcv_nxt; + u16 bc_syncpt; + u16 bc_acked; + bool usr_pending; +}; + +struct tipc_group { + struct rb_root members; + struct list_head congested; + struct list_head pending; + struct list_head active; + struct list_head reclaiming; + struct tipc_nlist dests; + struct net *net; + int subid; + u32 type; + u32 instance; + u32 domain; + u32 scope; + u32 portid; + u16 member_cnt; + u16 active_cnt; + u16 max_active; + u16 bc_snd_nxt; + u16 bc_ackers; + bool loopback; + bool events; +}; + +static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, + int mtyp, struct sk_buff_head *xmitq); + +static void tipc_group_decr_active(struct tipc_group *grp, + struct tipc_member *m) +{ + if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING) + grp->active_cnt--; +} + +static int tipc_group_rcvbuf_limit(struct tipc_group *grp) +{ + int max_active, active_pool, idle_pool; + int mcnt = grp->member_cnt + 1; + + /* Limit simultaneous reception from other members */ + max_active = min(mcnt / 8, 64); + max_active = max(max_active, 16); + grp->max_active = max_active; + + /* Reserve blocks for active and idle members */ + active_pool = max_active * ADV_ACTIVE; + idle_pool = (mcnt - max_active) * ADV_IDLE; + + /* Scale to bytes, considering worst-case truesize/msgsize ratio */ + return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4; +} + +u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) +{ + return grp->bc_snd_nxt; +} + +static bool tipc_group_is_enabled(struct tipc_member *m) +{ + return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING; +} + +static bool tipc_group_is_receiver(struct tipc_member *m) +{ + return m && m->state >= MBR_JOINED; +} + +u32 tipc_group_exclude(struct tipc_group *grp) +{ + if (!grp->loopback) + return grp->portid; + return 0; +} + +int tipc_group_size(struct tipc_group *grp) +{ + return grp->member_cnt; +} + +struct tipc_group *tipc_group_create(struct net *net, u32 portid, + struct tipc_group_req *mreq) +{ + struct tipc_group *grp; + u32 type = mreq->type; + + grp = kzalloc(sizeof(*grp), GFP_ATOMIC); + if (!grp) + return NULL; + tipc_nlist_init(&grp->dests, tipc_own_addr(net)); + INIT_LIST_HEAD(&grp->congested); + INIT_LIST_HEAD(&grp->active); + INIT_LIST_HEAD(&grp->pending); + INIT_LIST_HEAD(&grp->reclaiming); + grp->members = RB_ROOT; + grp->net = net; + grp->portid = portid; + grp->domain = addr_domain(net, mreq->scope); + grp->type = type; + grp->instance = mreq->instance; + grp->scope = mreq->scope; + grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; + grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; + if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) + return grp; + kfree(grp); + return NULL; +} + +void tipc_group_delete(struct net *net, struct tipc_group *grp) +{ + struct rb_root *tree = &grp->members; + struct tipc_member *m, *tmp; + struct sk_buff_head xmitq; + + __skb_queue_head_init(&xmitq); + + rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { + tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq); + list_del(&m->list); + kfree(m); + } + tipc_node_distr_xmit(net, &xmitq); + tipc_nlist_purge(&grp->dests); + tipc_topsrv_kern_unsubscr(net, grp->subid); + kfree(grp); +} + +struct tipc_member *tipc_group_find_member(struct tipc_group *grp, + u32 node, u32 port) +{ + struct rb_node *n = grp->members.rb_node; + u64 nkey, key = (u64)node << 32 | port; + struct tipc_member *m; + + while (n) { + m = container_of(n, struct tipc_member, tree_node); + nkey = (u64)m->node << 32 | m->port; + if (key < nkey) + n = n->rb_left; + else if (key > nkey) + n = n->rb_right; + else + return m; + } + return NULL; +} + +static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp, + u32 node, u32 port) +{ + struct tipc_member *m; + + m = tipc_group_find_member(grp, node, port); + if (m && tipc_group_is_enabled(m)) + return m; + return NULL; +} + +static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, + u32 node) +{ + struct tipc_member *m; + struct rb_node *n; + + for (n = rb_first(&grp->members); n; n = rb_next(n)) { + m = container_of(n, struct tipc_member, tree_node); + if (m->node == node) + return m; + } + return NULL; +} + +static void tipc_group_add_to_tree(struct tipc_group *grp, + struct tipc_member *m) +{ + u64 nkey, key = (u64)m->node << 32 | m->port; + struct rb_node **n, *parent = NULL; + struct tipc_member *tmp; + + n = &grp->members.rb_node; + while (*n) { + tmp = container_of(*n, struct tipc_member, tree_node); + parent = *n; + tmp = container_of(parent, struct tipc_member, tree_node); + nkey = (u64)tmp->node << 32 | tmp->port; + if (key < nkey) + n = &(*n)->rb_left; + else if (key > nkey) + n = &(*n)->rb_right; + else + return; + } + rb_link_node(&m->tree_node, parent, n); + rb_insert_color(&m->tree_node, &grp->members); +} + +static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, + u32 node, u32 port, + int state) +{ + struct tipc_member *m; + + m = kzalloc(sizeof(*m), GFP_ATOMIC); + if (!m) + return NULL; + INIT_LIST_HEAD(&m->list); + INIT_LIST_HEAD(&m->congested); + __skb_queue_head_init(&m->deferredq); + m->group = grp; + m->node = node; + m->port = port; + m->bc_acked = grp->bc_snd_nxt - 1; + grp->member_cnt++; + tipc_group_add_to_tree(grp, m); + tipc_nlist_add(&grp->dests, m->node); + m->state = state; + return m; +} + +void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port) +{ + tipc_group_create_member(grp, node, port, MBR_DISCOVERED); +} + +static void tipc_group_delete_member(struct tipc_group *grp, + struct tipc_member *m) +{ + rb_erase(&m->tree_node, &grp->members); + grp->member_cnt--; + + /* Check if we were waiting for replicast ack from this member */ + if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1)) + grp->bc_ackers--; + + list_del_init(&m->list); + list_del_init(&m->congested); + tipc_group_decr_active(grp, m); + + /* If last member on a node, remove node from dest list */ + if (!tipc_group_find_node(grp, m->node)) + tipc_nlist_del(&grp->dests, m->node); + + kfree(m); +} + +struct tipc_nlist *tipc_group_dests(struct tipc_group *grp) +{ + return &grp->dests; +} + +void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, + int *scope) +{ + seq->type = grp->type; + seq->lower = grp->instance; + seq->upper = grp->instance; + *scope = grp->scope; +} + +void tipc_group_update_member(struct tipc_member *m, int len) +{ + struct tipc_group *grp = m->group; + struct tipc_member *_m, *tmp; + + if (!tipc_group_is_enabled(m)) + return; + + m->window -= len; + + if (m->window >= ADV_IDLE) + return; + + if (!list_empty(&m->congested)) + return; + + /* Sort member into congested members' list */ + list_for_each_entry_safe(_m, tmp, &grp->congested, congested) { + if (m->window > _m->window) + continue; + list_add_tail(&m->congested, &_m->congested); + return; + } + list_add_tail(&m->congested, &grp->congested); +} + +void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) +{ + u16 prev = grp->bc_snd_nxt - 1; + struct tipc_member *m; + struct rb_node *n; + + for (n = rb_first(&grp->members); n; n = rb_next(n)) { + m = container_of(n, struct tipc_member, tree_node); + if (tipc_group_is_enabled(m)) { + tipc_group_update_member(m, len); + m->bc_acked = prev; + } + } + + /* Mark number of acknowledges to expect, if any */ + if (ack) + grp->bc_ackers = grp->member_cnt; + grp->bc_snd_nxt++; +} + +bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, + int len, struct tipc_member **mbr) +{ + struct sk_buff_head xmitq; + struct tipc_member *m; + int adv, state; + + m = tipc_group_find_dest(grp, dnode, dport); + *mbr = m; + if (!m) + return false; + if (m->usr_pending) + return true; + if (m->window >= len) + return false; + m->usr_pending = true; + + /* If not fully advertised, do it now to prevent mutual blocking */ + adv = m->advertised; + state = m->state; + if (state < MBR_JOINED) + return true; + if (state == MBR_JOINED && adv == ADV_IDLE) + return true; + if (state == MBR_ACTIVE && adv == ADV_ACTIVE) + return true; + if (state == MBR_PENDING && adv == ADV_IDLE) + return true; + skb_queue_head_init(&xmitq); + tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); + tipc_node_distr_xmit(grp->net, &xmitq); + return true; +} + +bool tipc_group_bc_cong(struct tipc_group *grp, int len) +{ + struct tipc_member *m = NULL; + + /* If prev bcast was replicast, reject until all receivers have acked */ + if (grp->bc_ackers) + return true; + + if (list_empty(&grp->congested)) + return false; + + m = list_first_entry(&grp->congested, struct tipc_member, congested); + if (m->window >= len) + return false; + + return tipc_group_cong(grp, m->node, m->port, len, &m); +} + +/* tipc_group_sort_msg() - sort msg into queue by bcast sequence number + */ +static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) +{ + struct tipc_msg *_hdr, *hdr = buf_msg(skb); + u16 bc_seqno = msg_grp_bc_seqno(hdr); + struct sk_buff *_skb, *tmp; + int mtyp = msg_type(hdr); + + /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */ + if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { + skb_queue_walk_safe(defq, _skb, tmp) { + _hdr = buf_msg(_skb); + if (!less(bc_seqno, msg_grp_bc_seqno(_hdr))) + continue; + __skb_queue_before(defq, _skb, skb); + return; + } + /* Bcast was not bypassed, - add to tail */ + } + /* Unicasts are never bypassed, - always add to tail */ + __skb_queue_tail(defq, skb); +} + +/* tipc_group_filter_msg() - determine if we should accept arriving message + */ +void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb = __skb_dequeue(inputq); + bool ack, deliver, update, leave = false; + struct sk_buff_head *defq; + struct tipc_member *m; + struct tipc_msg *hdr; + u32 node, port; + int mtyp, blks; + + if (!skb) + return; + + hdr = buf_msg(skb); + node = msg_orignode(hdr); + port = msg_origport(hdr); + + if (!msg_in_group(hdr)) + goto drop; + + m = tipc_group_find_member(grp, node, port); + if (!tipc_group_is_receiver(m)) + goto drop; + + if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) + goto drop; + + TIPC_SKB_CB(skb)->orig_member = m->instance; + defq = &m->deferredq; + tipc_group_sort_msg(skb, defq); + + while ((skb = skb_peek(defq))) { + hdr = buf_msg(skb); + mtyp = msg_type(hdr); + deliver = true; + ack = false; + update = false; + + if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) + break; + + /* Decide what to do with message */ + switch (mtyp) { + case TIPC_GRP_MCAST_MSG: + if (msg_nameinst(hdr) != grp->instance) { + update = true; + deliver = false; + } + /* Fall thru */ + case TIPC_GRP_BCAST_MSG: + m->bc_rcv_nxt++; + ack = msg_grp_bc_ack_req(hdr); + break; + case TIPC_GRP_UCAST_MSG: + break; + case TIPC_GRP_MEMBER_EVT: + if (m->state == MBR_LEAVING) + leave = true; + if (!grp->events) + deliver = false; + break; + default: + break; + } + + /* Execute decisions */ + __skb_dequeue(defq); + if (deliver) + __skb_queue_tail(inputq, skb); + else + kfree_skb(skb); + + if (ack) + tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); + + if (leave) { + tipc_group_delete_member(grp, m); + __skb_queue_purge(defq); + break; + } + if (!update) + continue; + + blks = msg_blocks(hdr); + tipc_group_update_rcv_win(grp, blks, node, port, xmitq); + } + return; +drop: + kfree_skb(skb); +} + +void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, + u32 port, struct sk_buff_head *xmitq) +{ + struct list_head *active = &grp->active; + int max_active = grp->max_active; + int reclaim_limit = max_active * 3 / 4; + int active_cnt = grp->active_cnt; + struct tipc_member *m, *rm; + + m = tipc_group_find_member(grp, node, port); + if (!m) + return; + + m->advertised -= blks; + + switch (m->state) { + case MBR_JOINED: + /* Reclaim advertised space from least active member */ + if (!list_empty(active) && active_cnt >= reclaim_limit) { + rm = list_first_entry(active, struct tipc_member, list); + rm->state = MBR_RECLAIMING; + list_move_tail(&rm->list, &grp->reclaiming); + tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); + } + /* If max active, become pending and wait for reclaimed space */ + if (active_cnt >= max_active) { + m->state = MBR_PENDING; + list_add_tail(&m->list, &grp->pending); + break; + } + /* Otherwise become active */ + m->state = MBR_ACTIVE; + list_add_tail(&m->list, &grp->active); + grp->active_cnt++; + /* Fall through */ + case MBR_ACTIVE: + if (!list_is_last(&m->list, &grp->active)) + list_move_tail(&m->list, &grp->active); + if (m->advertised > (ADV_ACTIVE * 3 / 4)) + break; + tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); + break; + case MBR_REMITTED: + if (m->advertised > ADV_IDLE) + break; + m->state = MBR_JOINED; + if (m->advertised < ADV_IDLE) { + pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); + tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); + } + break; + case MBR_RECLAIMING: + case MBR_DISCOVERED: + case MBR_JOINING: + case MBR_LEAVING: + default: + break; + } +} + +static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, + int mtyp, struct sk_buff_head *xmitq) +{ + struct tipc_msg *hdr; + struct sk_buff *skb; + int adv = 0; + + skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, + m->node, tipc_own_addr(grp->net), + m->port, grp->portid, 0); + if (!skb) + return; + + if (m->state == MBR_ACTIVE) + adv = ADV_ACTIVE - m->advertised; + else if (m->state == MBR_JOINED || m->state == MBR_PENDING) + adv = ADV_IDLE - m->advertised; + + hdr = buf_msg(skb); + + if (mtyp == GRP_JOIN_MSG) { + msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); + msg_set_adv_win(hdr, adv); + m->advertised += adv; + } else if (mtyp == GRP_LEAVE_MSG) { + msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); + } else if (mtyp == GRP_ADV_MSG) { + msg_set_adv_win(hdr, adv); + m->advertised += adv; + } else if (mtyp == GRP_ACK_MSG) { + msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); + } else if (mtyp == GRP_REMIT_MSG) { + msg_set_grp_remitted(hdr, m->window); + } + __skb_queue_tail(xmitq, skb); +} + +void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, + struct tipc_msg *hdr, struct sk_buff_head *inputq, + struct sk_buff_head *xmitq) +{ + u32 node = msg_orignode(hdr); + u32 port = msg_origport(hdr); + struct tipc_member *m, *pm; + struct tipc_msg *ehdr; + u16 remitted, in_flight; + + if (!grp) + return; + + m = tipc_group_find_member(grp, node, port); + + switch (msg_type(hdr)) { + case GRP_JOIN_MSG: + if (!m) + m = tipc_group_create_member(grp, node, port, + MBR_QUARANTINED); + if (!m) + return; + m->bc_syncpt = msg_grp_bc_syncpt(hdr); + m->bc_rcv_nxt = m->bc_syncpt; + m->window += msg_adv_win(hdr); + + /* Wait until PUBLISH event is received */ + if (m->state == MBR_DISCOVERED) { + m->state = MBR_JOINING; + } else if (m->state == MBR_PUBLISHED) { + m->state = MBR_JOINED; + *usr_wakeup = true; + m->usr_pending = false; + tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); + ehdr = buf_msg(m->event_msg); + msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); + __skb_queue_tail(inputq, m->event_msg); + } + if (m->window < ADV_IDLE) + tipc_group_update_member(m, 0); + else + list_del_init(&m->congested); + return; + case GRP_LEAVE_MSG: + if (!m) + return; + m->bc_syncpt = msg_grp_bc_syncpt(hdr); + + /* Wait until WITHDRAW event is received */ + if (m->state != MBR_LEAVING) { + tipc_group_decr_active(grp, m); + m->state = MBR_LEAVING; + return; + } + /* Otherwise deliver already received WITHDRAW event */ + ehdr = buf_msg(m->event_msg); + msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); + __skb_queue_tail(inputq, m->event_msg); + *usr_wakeup = true; + list_del_init(&m->congested); + return; + case GRP_ADV_MSG: + if (!m) + return; + m->window += msg_adv_win(hdr); + *usr_wakeup = m->usr_pending; + m->usr_pending = false; + list_del_init(&m->congested); + return; + case GRP_ACK_MSG: + if (!m) + return; + m->bc_acked = msg_grp_bc_acked(hdr); + if (--grp->bc_ackers) + break; + *usr_wakeup = true; + m->usr_pending = false; + return; + case GRP_RECLAIM_MSG: + if (!m) + return; + *usr_wakeup = m->usr_pending; + m->usr_pending = false; + tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq); + m->window = ADV_IDLE; + return; + case GRP_REMIT_MSG: + if (!m || m->state != MBR_RECLAIMING) + return; + + list_del_init(&m->list); + grp->active_cnt--; + remitted = msg_grp_remitted(hdr); + + /* Messages preceding the REMIT still in receive queue */ + if (m->advertised > remitted) { + m->state = MBR_REMITTED; + in_flight = m->advertised - remitted; + } + /* All messages preceding the REMIT have been read */ + if (m->advertised <= remitted) { + m->state = MBR_JOINED; + in_flight = 0; + } + /* ..and the REMIT overtaken by more messages => re-advertise */ + if (m->advertised < remitted) + tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); + + m->advertised = ADV_IDLE + in_flight; + + /* Set oldest pending member to active and advertise */ + if (list_empty(&grp->pending)) + return; + pm = list_first_entry(&grp->pending, struct tipc_member, list); + pm->state = MBR_ACTIVE; + list_move_tail(&pm->list, &grp->active); + grp->active_cnt++; + if (pm->advertised <= (ADV_ACTIVE * 3 / 4)) + tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); + return; + default: + pr_warn("Received unknown GROUP_PROTO message\n"); + } +} + +/* tipc_group_member_evt() - receive and handle a member up/down event + */ +void tipc_group_member_evt(struct tipc_group *grp, + bool *usr_wakeup, + int *sk_rcvbuf, + struct sk_buff *skb, + struct sk_buff_head *inputq, + struct sk_buff_head *xmitq) +{ + struct tipc_msg *hdr = buf_msg(skb); + struct tipc_event *evt = (void *)msg_data(hdr); + u32 instance = evt->found_lower; + u32 node = evt->port.node; + u32 port = evt->port.ref; + int event = evt->event; + struct tipc_member *m; + struct net *net; + bool node_up; + u32 self; + + if (!grp) + goto drop; + + net = grp->net; + self = tipc_own_addr(net); + if (!grp->loopback && node == self && port == grp->portid) + goto drop; + + /* Convert message before delivery to user */ + msg_set_hdr_sz(hdr, GROUP_H_SIZE); + msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE); + msg_set_type(hdr, TIPC_GRP_MEMBER_EVT); + msg_set_origport(hdr, port); + msg_set_orignode(hdr, node); + msg_set_nametype(hdr, grp->type); + msg_set_grp_evt(hdr, event); + + m = tipc_group_find_member(grp, node, port); + + if (event == TIPC_PUBLISHED) { + if (!m) + m = tipc_group_create_member(grp, node, port, + MBR_DISCOVERED); + if (!m) + goto drop; + + /* Hold back event if JOIN message not yet received */ + if (m->state == MBR_DISCOVERED) { + m->event_msg = skb; + m->state = MBR_PUBLISHED; + } else { + msg_set_grp_bc_seqno(hdr, m->bc_syncpt); + __skb_queue_tail(inputq, skb); + m->state = MBR_JOINED; + *usr_wakeup = true; + m->usr_pending = false; + } + m->instance = instance; + TIPC_SKB_CB(skb)->orig_member = m->instance; + tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); + if (m->window < ADV_IDLE) + tipc_group_update_member(m, 0); + else + list_del_init(&m->congested); + } else if (event == TIPC_WITHDRAWN) { + if (!m) + goto drop; + + TIPC_SKB_CB(skb)->orig_member = m->instance; + + *usr_wakeup = true; + m->usr_pending = false; + node_up = tipc_node_is_up(net, node); + + /* Hold back event if more messages might be expected */ + if (m->state != MBR_LEAVING && node_up) { + m->event_msg = skb; + tipc_group_decr_active(grp, m); + m->state = MBR_LEAVING; + } else { + if (node_up) + msg_set_grp_bc_seqno(hdr, m->bc_syncpt); + else + msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt); + __skb_queue_tail(inputq, skb); + } + list_del_init(&m->congested); + } + *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); + return; +drop: + kfree_skb(skb); +} diff --git a/net/tipc/group.h b/net/tipc/group.h new file mode 100644 index 000000000000..d525e1cd7de5 --- /dev/null +++ b/net/tipc/group.h @@ -0,0 +1,73 @@ +/* + * net/tipc/group.h: Include file for TIPC group unicast/multicast functions + * + * Copyright (c) 2017, Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _TIPC_GROUP_H +#define _TIPC_GROUP_H + +#include "core.h" + +struct tipc_group; +struct tipc_member; +struct tipc_msg; + +struct tipc_group *tipc_group_create(struct net *net, u32 portid, + struct tipc_group_req *mreq); +void tipc_group_delete(struct net *net, struct tipc_group *grp); +void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port); +struct tipc_nlist *tipc_group_dests(struct tipc_group *grp); +void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, + int *scope); +u32 tipc_group_exclude(struct tipc_group *grp); +void tipc_group_filter_msg(struct tipc_group *grp, + struct sk_buff_head *inputq, + struct sk_buff_head *xmitq); +void tipc_group_member_evt(struct tipc_group *grp, bool *wakeup, + int *sk_rcvbuf, struct sk_buff *skb, + struct sk_buff_head *inputq, + struct sk_buff_head *xmitq); +void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup, + struct tipc_msg *hdr, + struct sk_buff_head *inputq, + struct sk_buff_head *xmitq); +void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack); +bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, + int len, struct tipc_member **m); +bool tipc_group_bc_cong(struct tipc_group *grp, int len); +void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, + u32 port, struct sk_buff_head *xmitq); +u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); +void tipc_group_update_member(struct tipc_member *m, int len); +int tipc_group_size(struct tipc_group *grp); +#endif diff --git a/net/tipc/link.c b/net/tipc/link.c index ac0144f532aa..723dd6998684 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1039,6 +1039,7 @@ int tipc_link_retrans(struct tipc_link *l, struct tipc_link *nacker, static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *inputq) { + struct sk_buff_head *mc_inputq = l->bc_rcvlink->inputq; struct tipc_msg *hdr = buf_msg(skb); switch (msg_user(hdr)) { @@ -1046,12 +1047,14 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, case TIPC_MEDIUM_IMPORTANCE: case TIPC_HIGH_IMPORTANCE: case TIPC_CRITICAL_IMPORTANCE: - if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) { - skb_queue_tail(l->bc_rcvlink->inputq, skb); + if (unlikely(msg_in_group(hdr) || msg_mcast(hdr))) { + skb_queue_tail(mc_inputq, skb); return true; } case CONN_MANAGER: - skb_queue_tail(inputq, skb); + return true; + case GROUP_PROTOCOL: + skb_queue_tail(mc_inputq, skb); return true; case NAME_DISTRIBUTOR: l->bc_rcvlink->state = LINK_ESTABLISHED; diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 17146c16ee2d..1649d456e22d 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -666,3 +666,10 @@ void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, } kfree_skb(skb); } + +void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb, + struct sk_buff_head *xmitq) +{ + if (tipc_msg_reverse(tipc_own_addr(net), &skb, err)) + __skb_queue_tail(xmitq, skb); +} diff --git a/net/tipc/msg.h b/net/tipc/msg.h index c843fd2bc48d..cedf811317fb 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -1,7 +1,7 @@ /* * net/tipc/msg.h: Include file for TIPC message header routines * - * Copyright (c) 2000-2007, 2014-2015 Ericsson AB + * Copyright (c) 2000-2007, 2014-2017 Ericsson AB * Copyright (c) 2005-2008, 2010-2011, Wind River Systems * All rights reserved. * @@ -61,10 +61,14 @@ struct plist; /* * Payload message types */ -#define TIPC_CONN_MSG 0 -#define TIPC_MCAST_MSG 1 -#define TIPC_NAMED_MSG 2 -#define TIPC_DIRECT_MSG 3 +#define TIPC_CONN_MSG 0 +#define TIPC_MCAST_MSG 1 +#define TIPC_NAMED_MSG 2 +#define TIPC_DIRECT_MSG 3 +#define TIPC_GRP_MEMBER_EVT 4 +#define TIPC_GRP_BCAST_MSG 5 +#define TIPC_GRP_MCAST_MSG 6 +#define TIPC_GRP_UCAST_MSG 7 /* * Internal message users @@ -73,11 +77,13 @@ struct plist; #define MSG_BUNDLER 6 #define LINK_PROTOCOL 7 #define CONN_MANAGER 8 +#define GROUP_PROTOCOL 9 #define TUNNEL_PROTOCOL 10 #define NAME_DISTRIBUTOR 11 #define MSG_FRAGMENTER 12 #define LINK_CONFIG 13 #define SOCK_WAKEUP 14 /* pseudo user */ +#define TOP_SRV 15 /* pseudo user */ /* * Message header sizes @@ -86,6 +92,7 @@ struct plist; #define BASIC_H_SIZE 32 /* Basic payload message */ #define NAMED_H_SIZE 40 /* Named payload message */ #define MCAST_H_SIZE 44 /* Multicast payload message */ +#define GROUP_H_SIZE 44 /* Group payload message */ #define INT_H_SIZE 40 /* Internal messages */ #define MIN_H_SIZE 24 /* Smallest legal TIPC header size */ #define MAX_H_SIZE 60 /* Largest possible TIPC header size */ @@ -96,6 +103,7 @@ struct plist; struct tipc_skb_cb { u32 bytes_read; + u32 orig_member; struct sk_buff *tail; bool validated; u16 chain_imp; @@ -188,6 +196,11 @@ static inline u32 msg_size(struct tipc_msg *m) return msg_bits(m, 0, 0, 0x1ffff); } +static inline u32 msg_blocks(struct tipc_msg *m) +{ + return (msg_size(m) / 1024) + 1; +} + static inline u32 msg_data_sz(struct tipc_msg *m) { return msg_size(m) - msg_hdr_sz(m); @@ -251,6 +264,18 @@ static inline void msg_set_type(struct tipc_msg *m, u32 n) msg_set_bits(m, 1, 29, 0x7, n); } +static inline int msg_in_group(struct tipc_msg *m) +{ + int mtyp = msg_type(m); + + return mtyp >= TIPC_GRP_MEMBER_EVT && mtyp <= TIPC_GRP_UCAST_MSG; +} + +static inline bool msg_is_grp_evt(struct tipc_msg *m) +{ + return msg_type(m) == TIPC_GRP_MEMBER_EVT; +} + static inline u32 msg_named(struct tipc_msg *m) { return msg_type(m) == TIPC_NAMED_MSG; @@ -258,7 +283,10 @@ static inline u32 msg_named(struct tipc_msg *m) static inline u32 msg_mcast(struct tipc_msg *m) { - return msg_type(m) == TIPC_MCAST_MSG; + int mtyp = msg_type(m); + + return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG) || + (mtyp == TIPC_GRP_MCAST_MSG)); } static inline u32 msg_connected(struct tipc_msg *m) @@ -514,6 +542,16 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n) #define DSC_RESP_MSG 1 /* + * Group protocol message types + */ +#define GRP_JOIN_MSG 0 +#define GRP_LEAVE_MSG 1 +#define GRP_ADV_MSG 2 +#define GRP_ACK_MSG 3 +#define GRP_RECLAIM_MSG 4 +#define GRP_REMIT_MSG 5 + +/* * Word 1 */ static inline u32 msg_seq_gap(struct tipc_msg *m) @@ -764,12 +802,12 @@ static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n) msg_set_bits(m, 9, 16, 0xffff, n); } -static inline u32 msg_adv_win(struct tipc_msg *m) +static inline u16 msg_adv_win(struct tipc_msg *m) { return msg_bits(m, 9, 0, 0xffff); } -static inline void msg_set_adv_win(struct tipc_msg *m, u32 n) +static inline void msg_set_adv_win(struct tipc_msg *m, u16 n) { msg_set_bits(m, 9, 0, 0xffff, n); } @@ -794,6 +832,68 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n) msg_set_bits(m, 9, 0, 0xffff, n); } +static inline u16 msg_grp_bc_syncpt(struct tipc_msg *m) +{ + return msg_bits(m, 9, 16, 0xffff); +} + +static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n) +{ + msg_set_bits(m, 9, 16, 0xffff, n); +} + +static inline u16 msg_grp_bc_acked(struct tipc_msg *m) +{ + return msg_bits(m, 9, 16, 0xffff); +} + +static inline void msg_set_grp_bc_acked(struct tipc_msg *m, u16 n) +{ + msg_set_bits(m, 9, 16, 0xffff, n); +} + +static inline u16 msg_grp_remitted(struct tipc_msg *m) +{ + return msg_bits(m, 9, 16, 0xffff); +} + +static inline void msg_set_grp_remitted(struct tipc_msg *m, u16 n) +{ + msg_set_bits(m, 9, 16, 0xffff, n); +} + +/* Word 10 + */ +static inline u16 msg_grp_evt(struct tipc_msg *m) +{ + return msg_bits(m, 10, 0, 0x3); +} + +static inline void msg_set_grp_evt(struct tipc_msg *m, int n) +{ + msg_set_bits(m, 10, 0, 0x3, n); +} + +static inline u16 msg_grp_bc_ack_req(struct tipc_msg *m) +{ + return msg_bits(m, 10, 0, 0x1); +} + +static inline void msg_set_grp_bc_ack_req(struct tipc_msg *m, bool n) +{ + msg_set_bits(m, 10, 0, 0x1, n); +} + +static inline u16 msg_grp_bc_seqno(struct tipc_msg *m) +{ + return msg_bits(m, 10, 16, 0xffff); +} + +static inline void msg_set_grp_bc_seqno(struct tipc_msg *m, u32 n) +{ + msg_set_bits(m, 10, 16, 0xffff, n); +} + static inline bool msg_peer_link_is_up(struct tipc_msg *m) { if (likely(msg_user(m) != LINK_PROTOCOL)) @@ -818,6 +918,8 @@ static inline bool msg_is_reset(struct tipc_msg *hdr) struct sk_buff *tipc_buf_acquire(u32 size, gfp_t gfp); bool tipc_msg_validate(struct sk_buff *skb); bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, int err); +void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb, + struct sk_buff_head *xmitq); void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type, u32 hsize, u32 destnode); struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz, diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index bd0aac87b41a..2856e19e036e 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -43,6 +43,7 @@ #include "bcast.h" #include "addr.h" #include "node.h" +#include "group.h" #include <net/genetlink.h> #define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */ @@ -596,18 +597,47 @@ not_found: return ref; } -/** - * tipc_nametbl_mc_translate - find multicast destinations - * - * Creates list of all local ports that overlap the given multicast address; - * also determines if any off-node ports overlap. - * - * Note: Publications with a scope narrower than 'limit' are ignored. - * (i.e. local node-scope publications mustn't receive messages arriving - * from another node, even if the multcast link brought it here) - * - * Returns non-zero if any off-node ports overlap - */ +bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain, + struct list_head *dsts, int *dstcnt, u32 exclude, + bool all) +{ + u32 self = tipc_own_addr(net); + struct publication *publ; + struct name_info *info; + struct name_seq *seq; + struct sub_seq *sseq; + + if (!tipc_in_scope(domain, self)) + return false; + + *dstcnt = 0; + rcu_read_lock(); + seq = nametbl_find_seq(net, type); + if (unlikely(!seq)) + goto exit; + spin_lock_bh(&seq->lock); + sseq = nameseq_find_subseq(seq, instance); + if (likely(sseq)) { + info = sseq->info; + list_for_each_entry(publ, &info->zone_list, zone_list) { + if (!tipc_in_scope(domain, publ->node)) + continue; + if (publ->ref == exclude && publ->node == self) + continue; + tipc_dest_push(dsts, publ->node, publ->ref); + (*dstcnt)++; + if (all) + continue; + list_move_tail(&publ->zone_list, &info->zone_list); + break; + } + } + spin_unlock_bh(&seq->lock); +exit: + rcu_read_unlock(); + return !list_empty(dsts); +} + int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, u32 limit, struct list_head *dports) { @@ -634,7 +664,7 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, info = sseq->info; list_for_each_entry(publ, &info->node_list, node_list) { if (publ->scope <= limit) - u32_push(dports, publ->ref); + tipc_dest_push(dports, 0, publ->ref); } if (info->cluster_list_size != info->node_list_size) @@ -679,6 +709,37 @@ exit: rcu_read_unlock(); } +/* tipc_nametbl_build_group - build list of communication group members + */ +void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, + u32 type, u32 domain) +{ + struct sub_seq *sseq, *stop; + struct name_info *info; + struct publication *p; + struct name_seq *seq; + + rcu_read_lock(); + seq = nametbl_find_seq(net, type); + if (!seq) + goto exit; + + spin_lock_bh(&seq->lock); + sseq = seq->sseqs; + stop = seq->sseqs + seq->first_free; + for (; sseq != stop; sseq++) { + info = sseq->info; + list_for_each_entry(p, &info->zone_list, zone_list) { + if (!tipc_in_scope(domain, p->node)) + continue; + tipc_group_add_member(grp, p->node, p->ref); + } + } + spin_unlock_bh(&seq->lock); +exit: + rcu_read_unlock(); +} + /* * tipc_nametbl_publish - add name publication to network name tables */ @@ -1057,78 +1118,79 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb) return skb->len; } -bool u32_find(struct list_head *l, u32 value) +struct tipc_dest *tipc_dest_find(struct list_head *l, u32 node, u32 port) { - struct u32_item *item; + u64 value = (u64)node << 32 | port; + struct tipc_dest *dst; - list_for_each_entry(item, l, list) { - if (item->value == value) - return true; + list_for_each_entry(dst, l, list) { + if (dst->value != value) + continue; + return dst; } - return false; + return NULL; } -bool u32_push(struct list_head *l, u32 value) +bool tipc_dest_push(struct list_head *l, u32 node, u32 port) { - struct u32_item *item; + u64 value = (u64)node << 32 | port; + struct tipc_dest *dst; - list_for_each_entry(item, l, list) { - if (item->value == value) - return false; - } - item = kmalloc(sizeof(*item), GFP_ATOMIC); - if (unlikely(!item)) + if (tipc_dest_find(l, node, port)) return false; - item->value = value; - list_add(&item->list, l); + dst = kmalloc(sizeof(*dst), GFP_ATOMIC); + if (unlikely(!dst)) + return false; + dst->value = value; + list_add(&dst->list, l); return true; } -u32 u32_pop(struct list_head *l) +bool tipc_dest_pop(struct list_head *l, u32 *node, u32 *port) { - struct u32_item *item; - u32 value = 0; + struct tipc_dest *dst; if (list_empty(l)) - return 0; - item = list_first_entry(l, typeof(*item), list); - value = item->value; - list_del(&item->list); - kfree(item); - return value; + return false; + dst = list_first_entry(l, typeof(*dst), list); + if (port) + *port = dst->port; + if (node) + *node = dst->node; + list_del(&dst->list); + kfree(dst); + return true; } -bool u32_del(struct list_head *l, u32 value) +bool tipc_dest_del(struct list_head *l, u32 node, u32 port) { - struct u32_item *item, *tmp; + struct tipc_dest *dst; - list_for_each_entry_safe(item, tmp, l, list) { - if (item->value != value) - continue; - list_del(&item->list); - kfree(item); - return true; - } - return false; + dst = tipc_dest_find(l, node, port); + if (!dst) + return false; + list_del(&dst->list); + kfree(dst); + return true; } -void u32_list_purge(struct list_head *l) +void tipc_dest_list_purge(struct list_head *l) { - struct u32_item *item, *tmp; + struct tipc_dest *dst, *tmp; - list_for_each_entry_safe(item, tmp, l, list) { - list_del(&item->list); - kfree(item); + list_for_each_entry_safe(dst, tmp, l, list) { + list_del(&dst->list); + kfree(dst); } } -int u32_list_len(struct list_head *l) +int tipc_dest_list_len(struct list_head *l) { - struct u32_item *item; + struct tipc_dest *dst; int i = 0; - list_for_each_entry(item, l, list) { + list_for_each_entry(dst, l, list) { i++; } return i; diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index 6ebdeb1d84a5..71926e429446 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -40,6 +40,7 @@ struct tipc_subscription; struct tipc_plist; struct tipc_nlist; +struct tipc_group; /* * TIPC name types reserved for internal TIPC use (both current and planned) @@ -101,9 +102,14 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb); u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node); int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, u32 limit, struct list_head *dports); +void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, + u32 type, u32 domain); void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, u32 upper, u32 domain, struct tipc_nlist *nodes); +bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain, + struct list_head *dsts, int *dstcnt, u32 exclude, + bool all); struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower, u32 upper, u32 scope, u32 port_ref, u32 key); @@ -120,16 +126,22 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s); int tipc_nametbl_init(struct net *net); void tipc_nametbl_stop(struct net *net); -struct u32_item { +struct tipc_dest { struct list_head list; - u32 value; + union { + struct { + u32 port; + u32 node; + }; + u64 value; + }; }; -bool u32_push(struct list_head *l, u32 value); -u32 u32_pop(struct list_head *l); -bool u32_find(struct list_head *l, u32 value); -bool u32_del(struct list_head *l, u32 value); -void u32_list_purge(struct list_head *l); -int u32_list_len(struct list_head *l); +struct tipc_dest *tipc_dest_find(struct list_head *l, u32 node, u32 port); +bool tipc_dest_push(struct list_head *l, u32 node, u32 port); +bool tipc_dest_pop(struct list_head *l, u32 *node, u32 *port); +bool tipc_dest_del(struct list_head *l, u32 node, u32 port); +void tipc_dest_list_purge(struct list_head *l); +int tipc_dest_list_len(struct list_head *l); #endif diff --git a/net/tipc/node.c b/net/tipc/node.c index 198dbc7adbe1..89f8ac73bf65 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -157,7 +157,7 @@ static void tipc_node_timeout(unsigned long data); static void tipc_node_fsm_evt(struct tipc_node *n, int evt); static struct tipc_node *tipc_node_find(struct net *net, u32 addr); static void tipc_node_put(struct tipc_node *node); -static bool tipc_node_is_up(struct tipc_node *n); +static bool node_is_up(struct tipc_node *n); struct tipc_sock_conn { u32 port; @@ -657,7 +657,7 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, *slot1 = i; } - if (!tipc_node_is_up(n)) { + if (!node_is_up(n)) { if (tipc_link_peer_is_down(l)) tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT); tipc_node_fsm_evt(n, SELF_LOST_CONTACT_EVT); @@ -717,11 +717,27 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete) tipc_sk_rcv(n->net, &le->inputq); } -static bool tipc_node_is_up(struct tipc_node *n) +static bool node_is_up(struct tipc_node *n) { return n->active_links[0] != INVALID_BEARER_ID; } +bool tipc_node_is_up(struct net *net, u32 addr) +{ + struct tipc_node *n; + bool retval = false; + + if (in_own_node(net, addr)) + return true; + + n = tipc_node_find(net, addr); + if (!n) + return false; + retval = node_is_up(n); + tipc_node_put(n); + return retval; +} + void tipc_node_check_dest(struct net *net, u32 onode, struct tipc_bearer *b, u16 capabilities, u32 signature, @@ -1149,7 +1165,7 @@ static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node) if (nla_put_u32(msg->skb, TIPC_NLA_NODE_ADDR, node->addr)) goto attr_msg_full; - if (tipc_node_is_up(node)) + if (node_is_up(node)) if (nla_put_flag(msg->skb, TIPC_NLA_NODE_UP)) goto attr_msg_full; @@ -1238,6 +1254,22 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, return 0; } +/* tipc_node_distr_xmit(): send single buffer msgs to individual destinations + * Note: this is only for SYSTEM_IMPORTANCE messages, which cannot be rejected + */ +int tipc_node_distr_xmit(struct net *net, struct sk_buff_head *xmitq) +{ + struct sk_buff *skb; + u32 selector, dnode; + + while ((skb = __skb_dequeue(xmitq))) { + selector = msg_origport(buf_msg(skb)); + dnode = msg_destnode(buf_msg(skb)); + tipc_node_xmit_skb(net, skb, dnode, selector); + } + return 0; +} + void tipc_node_broadcast(struct net *net, struct sk_buff *skb) { struct sk_buff *txskb; @@ -1249,7 +1281,7 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb) dst = n->addr; if (in_own_node(net, dst)) continue; - if (!tipc_node_is_up(n)) + if (!node_is_up(n)) continue; txskb = pskb_copy(skb, GFP_ATOMIC); if (!txskb) diff --git a/net/tipc/node.h b/net/tipc/node.h index 898c22916984..acd58d23a70e 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -48,7 +48,8 @@ enum { TIPC_BCAST_SYNCH = (1 << 1), TIPC_BCAST_STATE_NACK = (1 << 2), TIPC_BLOCK_FLOWCTL = (1 << 3), - TIPC_BCAST_RCAST = (1 << 4) + TIPC_BCAST_RCAST = (1 << 4), + TIPC_MCAST_GROUPS = (1 << 5) }; #define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \ @@ -68,6 +69,7 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node, char *linkname, size_t len); int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode, int selector); +int tipc_node_distr_xmit(struct net *net, struct sk_buff_head *list); int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest, u32 selector); void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr); @@ -76,6 +78,7 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb); int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port); void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port); int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel); +bool tipc_node_is_up(struct net *net, u32 addr); u16 tipc_node_get_capabilities(struct net *net, u32 addr); int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb); int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb); diff --git a/net/tipc/server.c b/net/tipc/server.c index 3cd6402e812c..713077536d0c 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -36,6 +36,8 @@ #include "server.h" #include "core.h" #include "socket.h" +#include "addr.h" +#include "msg.h" #include <net/sock.h> #include <linux/module.h> @@ -105,13 +107,11 @@ static void tipc_conn_kref_release(struct kref *kref) kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr)); sock_release(sock); con->sock = NULL; - - spin_lock_bh(&s->idr_lock); - idr_remove(&s->conn_idr, con->conid); - s->idr_in_use--; - spin_unlock_bh(&s->idr_lock); } - + spin_lock_bh(&s->idr_lock); + idr_remove(&s->conn_idr, con->conid); + s->idr_in_use--; + spin_unlock_bh(&s->idr_lock); tipc_clean_outqueues(con); kfree(con); } @@ -197,7 +197,8 @@ static void tipc_close_conn(struct tipc_conn *con) struct tipc_server *s = con->server; if (test_and_clear_bit(CF_CONNECTED, &con->flags)) { - tipc_unregister_callbacks(con); + if (con->sock) + tipc_unregister_callbacks(con); if (con->conid) s->tipc_conn_release(con->conid, con->usr_data); @@ -207,8 +208,8 @@ static void tipc_close_conn(struct tipc_conn *con) * are harmless for us here as we have already deleted this * connection from server connection list. */ - kernel_sock_shutdown(con->sock, SHUT_RDWR); - + if (con->sock) + kernel_sock_shutdown(con->sock, SHUT_RDWR); conn_put(con); } } @@ -487,38 +488,104 @@ void tipc_conn_terminate(struct tipc_server *s, int conid) } } +bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, + u32 lower, u32 upper, int *conid) +{ + struct tipc_subscriber *scbr; + struct tipc_subscr sub; + struct tipc_server *s; + struct tipc_conn *con; + + sub.seq.type = type; + sub.seq.lower = lower; + sub.seq.upper = upper; + sub.timeout = TIPC_WAIT_FOREVER; + sub.filter = TIPC_SUB_PORTS; + *(u32 *)&sub.usr_handle = port; + + con = tipc_alloc_conn(tipc_topsrv(net)); + if (!con) + return false; + + *conid = con->conid; + s = con->server; + scbr = s->tipc_conn_new(*conid); + if (!scbr) { + tipc_close_conn(con); + return false; + } + + con->usr_data = scbr; + con->sock = NULL; + s->tipc_conn_recvmsg(net, *conid, NULL, scbr, &sub, sizeof(sub)); + return true; +} + +void tipc_topsrv_kern_unsubscr(struct net *net, int conid) +{ + struct tipc_conn *con; + + con = tipc_conn_lookup(tipc_topsrv(net), conid); + if (!con) + return; + tipc_close_conn(con); + conn_put(con); +} + +static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt) +{ + u32 port = *(u32 *)&evt->s.usr_handle; + u32 self = tipc_own_addr(net); + struct sk_buff_head evtq; + struct sk_buff *skb; + + skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt), + self, self, port, port, 0); + if (!skb) + return; + msg_set_dest_droppable(buf_msg(skb), true); + memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt)); + skb_queue_head_init(&evtq); + __skb_queue_tail(&evtq, skb); + tipc_sk_rcv(net, &evtq); +} + static void tipc_send_to_sock(struct tipc_conn *con) { - int count = 0; struct tipc_server *s = con->server; struct outqueue_entry *e; + struct tipc_event *evt; struct msghdr msg; + int count = 0; int ret; spin_lock_bh(&con->outqueue_lock); while (test_bit(CF_CONNECTED, &con->flags)) { - e = list_entry(con->outqueue.next, struct outqueue_entry, - list); + e = list_entry(con->outqueue.next, struct outqueue_entry, list); if ((struct list_head *) e == &con->outqueue) break; - spin_unlock_bh(&con->outqueue_lock); - memset(&msg, 0, sizeof(msg)); - msg.msg_flags = MSG_DONTWAIT; + spin_unlock_bh(&con->outqueue_lock); - if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) { - msg.msg_name = &e->dest; - msg.msg_namelen = sizeof(struct sockaddr_tipc); - } - ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, - e->iov.iov_len); - if (ret == -EWOULDBLOCK || ret == 0) { - cond_resched(); - goto out; - } else if (ret < 0) { - goto send_err; + if (con->sock) { + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT; + if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) { + msg.msg_name = &e->dest; + msg.msg_namelen = sizeof(struct sockaddr_tipc); + } + ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, + e->iov.iov_len); + if (ret == -EWOULDBLOCK || ret == 0) { + cond_resched(); + goto out; + } else if (ret < 0) { + goto send_err; + } + } else { + evt = e->iov.iov_base; + tipc_send_kern_top_evt(s->net, evt); } - /* Don't starve users filling buffers */ if (++count >= MAX_SEND_MSG_COUNT) { cond_resched(); diff --git a/net/tipc/server.h b/net/tipc/server.h index 34f8055afa3b..2113c9192633 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -83,13 +83,16 @@ struct tipc_server { int tipc_conn_sendmsg(struct tipc_server *s, int conid, struct sockaddr_tipc *addr, void *data, size_t len); +bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, + u32 lower, u32 upper, int *conid); +void tipc_topsrv_kern_unsubscr(struct net *net, int conid); + /** * tipc_conn_terminate - terminate connection with server * * Note: Must call it in process context since it might sleep */ void tipc_conn_terminate(struct tipc_server *s, int conid); - int tipc_server_start(struct tipc_server *s); void tipc_server_stop(struct tipc_server *s); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index d50edd6e0019..2bbab4fe2f53 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1,7 +1,7 @@ /* * net/tipc/socket.c: TIPC socket API * - * Copyright (c) 2001-2007, 2012-2016, Ericsson AB + * Copyright (c) 2001-2007, 2012-2017, Ericsson AB * Copyright (c) 2004-2008, 2010-2013, Wind River Systems * All rights reserved. * @@ -45,6 +45,7 @@ #include "socket.h" #include "bcast.h" #include "netlink.h" +#include "group.h" #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ #define CONN_PROBING_INTERVAL msecs_to_jiffies(3600000) /* [ms] => 1 h */ @@ -61,6 +62,11 @@ enum { TIPC_CONNECTING = TCP_SYN_SENT, }; +struct sockaddr_pair { + struct sockaddr_tipc sock; + struct sockaddr_tipc member; +}; + /** * struct tipc_sock - TIPC socket structure * @sk: socket - interacts with 'port' and with user via the socket API @@ -78,7 +84,7 @@ enum { * @conn_timeout: the time we can wait for an unresponded setup request * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue * @cong_link_cnt: number of congested links - * @sent_unacked: # messages sent by socket, and not yet acked by peer + * @snt_unacked: # messages sent by socket, and not yet acked by peer * @rcv_unacked: # messages read by user, but not yet acked back to peer * @peer: 'connected' peer for dgram/rdm * @node: hash table node @@ -109,9 +115,10 @@ struct tipc_sock { struct rhash_head node; struct tipc_mc_method mc_method; struct rcu_head rcu; + struct tipc_group *group; }; -static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb); +static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb); static void tipc_data_ready(struct sock *sk); static void tipc_write_space(struct sock *sk); static void tipc_sock_destruct(struct sock *sk); @@ -123,6 +130,7 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, struct tipc_name_seq const *seq); static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, struct tipc_name_seq const *seq); +static int tipc_sk_leave(struct tipc_sock *tsk); static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); static int tipc_sk_insert(struct tipc_sock *tsk); static void tipc_sk_remove(struct tipc_sock *tsk); @@ -193,6 +201,11 @@ static bool tsk_conn_cong(struct tipc_sock *tsk) return tsk->snt_unacked > tsk->snd_win; } +static u16 tsk_blocks(int len) +{ + return ((len / FLOWCTL_BLK_SZ) + 1); +} + /* tsk_blocks(): translate a buffer size in bytes to number of * advertisable blocks, taking into account the ratio truesize(len)/len * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ @@ -453,7 +466,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, msg_set_origport(msg, tsk->portid); setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk); sk->sk_shutdown = 0; - sk->sk_backlog_rcv = tipc_backlog_rcv; + sk->sk_backlog_rcv = tipc_sk_backlog_rcv; sk->sk_rcvbuf = sysctl_tipc_rmem[1]; sk->sk_data_ready = tipc_data_ready; sk->sk_write_space = tipc_write_space; @@ -559,13 +572,14 @@ static int tipc_release(struct socket *sock) __tipc_shutdown(sock, TIPC_ERR_NO_PORT); sk->sk_shutdown = SHUTDOWN_MASK; + tipc_sk_leave(tsk); tipc_sk_withdraw(tsk, 0, NULL); sk_stop_timer(sk, &sk->sk_timer); tipc_sk_remove(tsk); /* Reject any messages that accumulated in backlog queue */ release_sock(sk); - u32_list_purge(&tsk->cong_links); + tipc_dest_list_purge(&tsk->cong_links); tsk->cong_link_cnt = 0; call_rcu(&tsk->rcu, tipc_sk_callback); sock->sk = NULL; @@ -601,7 +615,10 @@ static int tipc_bind(struct socket *sock, struct sockaddr *uaddr, res = tipc_sk_withdraw(tsk, 0, NULL); goto exit; } - + if (tsk->group) { + res = -EACCES; + goto exit; + } if (uaddr_len < sizeof(struct sockaddr_tipc)) { res = -EINVAL; goto exit; @@ -697,39 +714,43 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, poll_table *wait) { struct sock *sk = sock->sk; + struct sk_buff *skb = skb_peek(&sk->sk_receive_queue); struct tipc_sock *tsk = tipc_sk(sk); - u32 mask = 0; + struct tipc_group *grp = tsk->group; + u32 revents = 0; sock_poll_wait(file, sk_sleep(sk), wait); if (sk->sk_shutdown & RCV_SHUTDOWN) - mask |= POLLRDHUP | POLLIN | POLLRDNORM; + revents |= POLLRDHUP | POLLIN | POLLRDNORM; if (sk->sk_shutdown == SHUTDOWN_MASK) - mask |= POLLHUP; + revents |= POLLHUP; switch (sk->sk_state) { case TIPC_ESTABLISHED: if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk)) - mask |= POLLOUT; + revents |= POLLOUT; /* fall thru' */ case TIPC_LISTEN: case TIPC_CONNECTING: - if (!skb_queue_empty(&sk->sk_receive_queue)) - mask |= (POLLIN | POLLRDNORM); + if (skb) + revents |= POLLIN | POLLRDNORM; break; case TIPC_OPEN: - if (!tsk->cong_link_cnt) - mask |= POLLOUT; - if (tipc_sk_type_connectionless(sk) && - (!skb_queue_empty(&sk->sk_receive_queue))) - mask |= (POLLIN | POLLRDNORM); + if (!grp || tipc_group_size(grp)) + if (!tsk->cong_link_cnt) + revents |= POLLOUT; + if (!tipc_sk_type_connectionless(sk)) + break; + if (!skb) + break; + revents |= POLLIN | POLLRDNORM; break; case TIPC_DISCONNECTING: - mask = (POLLIN | POLLRDNORM | POLLHUP); + revents = POLLIN | POLLRDNORM | POLLHUP; break; } - - return mask; + return revents; } /** @@ -757,6 +778,9 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, struct tipc_nlist dsts; int rc; + if (tsk->group) + return -EACCES; + /* Block or return if any destination link is congested */ rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); if (unlikely(rc)) @@ -794,6 +818,296 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, } /** + * tipc_send_group_msg - send a message to a member in the group + * @net: network namespace + * @m: message to send + * @mb: group member + * @dnode: destination node + * @dport: destination port + * @dlen: total length of message data + */ +static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk, + struct msghdr *m, struct tipc_member *mb, + u32 dnode, u32 dport, int dlen) +{ + u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group); + struct tipc_mc_method *method = &tsk->mc_method; + int blks = tsk_blocks(GROUP_H_SIZE + dlen); + struct tipc_msg *hdr = &tsk->phdr; + struct sk_buff_head pkts; + int mtu, rc; + + /* Complete message header */ + msg_set_type(hdr, TIPC_GRP_UCAST_MSG); + msg_set_hdr_sz(hdr, GROUP_H_SIZE); + msg_set_destport(hdr, dport); + msg_set_destnode(hdr, dnode); + msg_set_grp_bc_seqno(hdr, bc_snd_nxt); + + /* Build message as chain of buffers */ + skb_queue_head_init(&pkts); + mtu = tipc_node_get_mtu(net, dnode, tsk->portid); + rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); + if (unlikely(rc != dlen)) + return rc; + + /* Send message */ + rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); + if (unlikely(rc == -ELINKCONG)) { + tipc_dest_push(&tsk->cong_links, dnode, 0); + tsk->cong_link_cnt++; + } + + /* Update send window */ + tipc_group_update_member(mb, blks); + + /* A broadcast sent within next EXPIRE period must follow same path */ + method->rcast = true; + method->mandatory = true; + return dlen; +} + +/** + * tipc_send_group_unicast - send message to a member in the group + * @sock: socket structure + * @m: message to send + * @dlen: total length of message data + * @timeout: timeout to wait for wakeup + * + * Called from function tipc_sendmsg(), which has done all sanity checks + * Returns the number of bytes sent on success, or errno + */ +static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m, + int dlen, long timeout) +{ + struct sock *sk = sock->sk; + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); + int blks = tsk_blocks(GROUP_H_SIZE + dlen); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; + struct net *net = sock_net(sk); + struct tipc_member *mb = NULL; + u32 node, port; + int rc; + + node = dest->addr.id.node; + port = dest->addr.id.ref; + if (!port && !node) + return -EHOSTUNREACH; + + /* Block or return if destination link or member is congested */ + rc = tipc_wait_for_cond(sock, &timeout, + !tipc_dest_find(&tsk->cong_links, node, 0) && + !tipc_group_cong(grp, node, port, blks, &mb)); + if (unlikely(rc)) + return rc; + + if (unlikely(!mb)) + return -EHOSTUNREACH; + + rc = tipc_send_group_msg(net, tsk, m, mb, node, port, dlen); + + return rc ? rc : dlen; +} + +/** + * tipc_send_group_anycast - send message to any member with given identity + * @sock: socket structure + * @m: message to send + * @dlen: total length of message data + * @timeout: timeout to wait for wakeup + * + * Called from function tipc_sendmsg(), which has done all sanity checks + * Returns the number of bytes sent on success, or errno + */ +static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m, + int dlen, long timeout) +{ + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); + struct sock *sk = sock->sk; + struct tipc_sock *tsk = tipc_sk(sk); + struct list_head *cong_links = &tsk->cong_links; + int blks = tsk_blocks(GROUP_H_SIZE + dlen); + struct tipc_group *grp = tsk->group; + struct tipc_member *first = NULL; + struct tipc_member *mbr = NULL; + struct net *net = sock_net(sk); + u32 node, port, exclude; + u32 type, inst, domain; + struct list_head dsts; + int lookups = 0; + int dstcnt, rc; + bool cong; + + INIT_LIST_HEAD(&dsts); + + type = dest->addr.name.name.type; + inst = dest->addr.name.name.instance; + domain = addr_domain(net, dest->scope); + exclude = tipc_group_exclude(grp); + + while (++lookups < 4) { + first = NULL; + + /* Look for a non-congested destination member, if any */ + while (1) { + if (!tipc_nametbl_lookup(net, type, inst, domain, &dsts, + &dstcnt, exclude, false)) + return -EHOSTUNREACH; + tipc_dest_pop(&dsts, &node, &port); + cong = tipc_group_cong(grp, node, port, blks, &mbr); + if (!cong) + break; + if (mbr == first) + break; + if (!first) + first = mbr; + } + + /* Start over if destination was not in member list */ + if (unlikely(!mbr)) + continue; + + if (likely(!cong && !tipc_dest_find(cong_links, node, 0))) + break; + + /* Block or return if destination link or member is congested */ + rc = tipc_wait_for_cond(sock, &timeout, + !tipc_dest_find(cong_links, node, 0) && + !tipc_group_cong(grp, node, port, + blks, &mbr)); + if (unlikely(rc)) + return rc; + + /* Send, unless destination disappeared while waiting */ + if (likely(mbr)) + break; + } + + if (unlikely(lookups >= 4)) + return -EHOSTUNREACH; + + rc = tipc_send_group_msg(net, tsk, m, mbr, node, port, dlen); + + return rc ? rc : dlen; +} + +/** + * tipc_send_group_bcast - send message to all members in communication group + * @sk: socket structure + * @m: message to send + * @dlen: total length of message data + * @timeout: timeout to wait for wakeup + * + * Called from function tipc_sendmsg(), which has done all sanity checks + * Returns the number of bytes sent on success, or errno + */ +static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, + int dlen, long timeout) +{ + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); + struct sock *sk = sock->sk; + struct net *net = sock_net(sk); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; + struct tipc_nlist *dsts = tipc_group_dests(grp); + struct tipc_mc_method *method = &tsk->mc_method; + bool ack = method->mandatory && method->rcast; + int blks = tsk_blocks(MCAST_H_SIZE + dlen); + struct tipc_msg *hdr = &tsk->phdr; + int mtu = tipc_bcast_get_mtu(net); + struct sk_buff_head pkts; + int rc = -EHOSTUNREACH; + + if (!dsts->local && !dsts->remote) + return -EHOSTUNREACH; + + /* Block or return if any destination link or member is congested */ + rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt && + !tipc_group_bc_cong(grp, blks)); + if (unlikely(rc)) + return rc; + + /* Complete message header */ + if (dest) { + msg_set_type(hdr, TIPC_GRP_MCAST_MSG); + msg_set_nameinst(hdr, dest->addr.name.name.instance); + } else { + msg_set_type(hdr, TIPC_GRP_BCAST_MSG); + msg_set_nameinst(hdr, 0); + } + msg_set_hdr_sz(hdr, GROUP_H_SIZE); + msg_set_destport(hdr, 0); + msg_set_destnode(hdr, 0); + msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp)); + + /* Avoid getting stuck with repeated forced replicasts */ + msg_set_grp_bc_ack_req(hdr, ack); + + /* Build message as chain of buffers */ + skb_queue_head_init(&pkts); + rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); + if (unlikely(rc != dlen)) + return rc; + + /* Send message */ + rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt); + if (unlikely(rc)) + return rc; + + /* Update broadcast sequence number and send windows */ + tipc_group_update_bc_members(tsk->group, blks, ack); + + /* Broadcast link is now free to choose method for next broadcast */ + method->mandatory = false; + method->expires = jiffies; + + return dlen; +} + +/** + * tipc_send_group_mcast - send message to all members with given identity + * @sock: socket structure + * @m: message to send + * @dlen: total length of message data + * @timeout: timeout to wait for wakeup + * + * Called from function tipc_sendmsg(), which has done all sanity checks + * Returns the number of bytes sent on success, or errno + */ +static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m, + int dlen, long timeout) +{ + struct sock *sk = sock->sk; + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); + struct tipc_name_seq *seq = &dest->addr.nameseq; + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; + struct net *net = sock_net(sk); + u32 domain, exclude, dstcnt; + struct list_head dsts; + + INIT_LIST_HEAD(&dsts); + + if (seq->lower != seq->upper) + return -ENOTSUPP; + + domain = addr_domain(net, dest->scope); + exclude = tipc_group_exclude(grp); + if (!tipc_nametbl_lookup(net, seq->type, seq->lower, domain, + &dsts, &dstcnt, exclude, true)) + return -EHOSTUNREACH; + + if (dstcnt == 1) { + tipc_dest_pop(&dsts, &dest->addr.id.node, &dest->addr.id.ref); + return tipc_send_group_unicast(sock, m, dlen, timeout); + } + + tipc_dest_list_purge(&dsts); + return tipc_send_group_bcast(sock, m, dlen, timeout); +} + +/** * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets * @arrvq: queue with arriving messages, to be cloned after destination lookup * @inputq: queue with cloned messages, delivered to socket after dest lookup @@ -803,13 +1117,15 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, struct sk_buff_head *inputq) { - struct tipc_msg *msg; - struct list_head dports; - u32 portid; u32 scope = TIPC_CLUSTER_SCOPE; - struct sk_buff_head tmpq; - uint hsz; + u32 self = tipc_own_addr(net); struct sk_buff *skb, *_skb; + u32 lower = 0, upper = ~0; + struct sk_buff_head tmpq; + u32 portid, oport, onode; + struct list_head dports; + struct tipc_msg *msg; + int user, mtyp, hsz; __skb_queue_head_init(&tmpq); INIT_LIST_HEAD(&dports); @@ -817,17 +1133,32 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, skb = tipc_skb_peek(arrvq, &inputq->lock); for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { msg = buf_msg(skb); + user = msg_user(msg); + mtyp = msg_type(msg); + if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) { + spin_lock_bh(&inputq->lock); + if (skb_peek(arrvq) == skb) { + __skb_dequeue(arrvq); + __skb_queue_tail(inputq, skb); + } + refcount_dec(&skb->users); + spin_unlock_bh(&inputq->lock); + continue; + } hsz = skb_headroom(skb) + msg_hdr_sz(msg); - - if (in_own_node(net, msg_orignode(msg))) + oport = msg_origport(msg); + onode = msg_orignode(msg); + if (onode == self) scope = TIPC_NODE_SCOPE; /* Create destination port list and message clones: */ - tipc_nametbl_mc_translate(net, - msg_nametype(msg), msg_namelower(msg), - msg_nameupper(msg), scope, &dports); - portid = u32_pop(&dports); - for (; portid; portid = u32_pop(&dports)) { + if (!msg_in_group(msg)) { + lower = msg_namelower(msg); + upper = msg_nameupper(msg); + } + tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper, + scope, &dports); + while (tipc_dest_pop(&dports, NULL, &portid)) { _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); if (_skb) { msg_set_destport(buf_msg(_skb), portid); @@ -850,16 +1181,16 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, } /** - * tipc_sk_proto_rcv - receive a connection mng protocol message + * tipc_sk_conn_proto_rcv - receive a connection mng protocol message * @tsk: receiving socket * @skb: pointer to message buffer. */ -static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb, - struct sk_buff_head *xmitq) +static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb, + struct sk_buff_head *xmitq) { - struct sock *sk = &tsk->sk; - u32 onode = tsk_own_node(tsk); struct tipc_msg *hdr = buf_msg(skb); + u32 onode = tsk_own_node(tsk); + struct sock *sk = &tsk->sk; int mtyp = msg_type(hdr); bool conn_cong; @@ -931,6 +1262,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); struct list_head *clinks = &tsk->cong_links; bool syn = !tipc_sk_type_connectionless(sk); + struct tipc_group *grp = tsk->group; struct tipc_msg *hdr = &tsk->phdr; struct tipc_name_seq *seq; struct sk_buff_head pkts; @@ -941,18 +1273,31 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) return -EMSGSIZE; + if (likely(dest)) { + if (unlikely(m->msg_namelen < sizeof(*dest))) + return -EINVAL; + if (unlikely(dest->family != AF_TIPC)) + return -EINVAL; + } + + if (grp) { + if (!dest) + return tipc_send_group_bcast(sock, m, dlen, timeout); + if (dest->addrtype == TIPC_ADDR_NAME) + return tipc_send_group_anycast(sock, m, dlen, timeout); + if (dest->addrtype == TIPC_ADDR_ID) + return tipc_send_group_unicast(sock, m, dlen, timeout); + if (dest->addrtype == TIPC_ADDR_MCAST) + return tipc_send_group_mcast(sock, m, dlen, timeout); + return -EINVAL; + } + if (unlikely(!dest)) { dest = &tsk->peer; if (!syn || dest->family != AF_TIPC) return -EDESTADDRREQ; } - if (unlikely(m->msg_namelen < sizeof(*dest))) - return -EINVAL; - - if (unlikely(dest->family != AF_TIPC)) - return -EINVAL; - if (unlikely(syn)) { if (sk->sk_state == TIPC_LISTEN) return -EPIPE; @@ -985,7 +1330,6 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) msg_set_destport(hdr, dport); if (unlikely(!dport && !dnode)) return -EHOSTUNREACH; - } else if (dest->addrtype == TIPC_ADDR_ID) { dnode = dest->addr.id.node; msg_set_type(hdr, TIPC_DIRECT_MSG); @@ -996,7 +1340,8 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) } /* Block or return if destination link is congested */ - rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode)); + rc = tipc_wait_for_cond(sock, &timeout, + !tipc_dest_find(clinks, dnode, 0)); if (unlikely(rc)) return rc; @@ -1008,7 +1353,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); if (unlikely(rc == -ELINKCONG)) { - u32_push(clinks, dnode); + tipc_dest_push(clinks, dnode, 0); tsk->cong_link_cnt++; rc = 0; } @@ -1142,26 +1487,38 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port, } /** - * set_orig_addr - capture sender's address for received message + * tipc_sk_set_orig_addr - capture sender's address for received message * @m: descriptor for message info - * @msg: received message header + * @hdr: received message header * * Note: Address is not captured if not requested by receiver. */ -static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg) +static void tipc_sk_set_orig_addr(struct msghdr *m, struct sk_buff *skb) { - DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name); + DECLARE_SOCKADDR(struct sockaddr_pair *, srcaddr, m->msg_name); + struct tipc_msg *hdr = buf_msg(skb); - if (addr) { - addr->family = AF_TIPC; - addr->addrtype = TIPC_ADDR_ID; - memset(&addr->addr, 0, sizeof(addr->addr)); - addr->addr.id.ref = msg_origport(msg); - addr->addr.id.node = msg_orignode(msg); - addr->addr.name.domain = 0; /* could leave uninitialized */ - addr->scope = 0; /* could leave uninitialized */ - m->msg_namelen = sizeof(struct sockaddr_tipc); - } + if (!srcaddr) + return; + + srcaddr->sock.family = AF_TIPC; + srcaddr->sock.addrtype = TIPC_ADDR_ID; + srcaddr->sock.addr.id.ref = msg_origport(hdr); + srcaddr->sock.addr.id.node = msg_orignode(hdr); + srcaddr->sock.addr.name.domain = 0; + srcaddr->sock.scope = 0; + m->msg_namelen = sizeof(struct sockaddr_tipc); + + if (!msg_in_group(hdr)) + return; + + /* Group message users may also want to know sending member's id */ + srcaddr->member.family = AF_TIPC; + srcaddr->member.addrtype = TIPC_ADDR_NAME; + srcaddr->member.addr.name.name.type = msg_nametype(hdr); + srcaddr->member.addr.name.name.instance = TIPC_SKB_CB(skb)->orig_member; + srcaddr->member.addr.name.domain = 0; + m->msg_namelen = sizeof(*srcaddr); } /** @@ -1318,11 +1675,13 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buflen, int flags) { struct sock *sk = sock->sk; - struct tipc_sock *tsk = tipc_sk(sk); - struct sk_buff *skb; - struct tipc_msg *hdr; bool connected = !tipc_sk_type_connectionless(sk); + struct tipc_sock *tsk = tipc_sk(sk); int rc, err, hlen, dlen, copy; + struct sk_buff_head xmitq; + struct tipc_msg *hdr; + struct sk_buff *skb; + bool grp_evt; long timeout; /* Catch invalid receive requests */ @@ -1336,8 +1695,8 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, } timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); + /* Step rcv queue to first msg with data or error; wait if necessary */ do { - /* Look at first msg in receive queue; wait if necessary */ rc = tipc_wait_for_rcvmsg(sock, &timeout); if (unlikely(rc)) goto exit; @@ -1346,13 +1705,14 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, dlen = msg_data_sz(hdr); hlen = msg_hdr_sz(hdr); err = msg_errcode(hdr); + grp_evt = msg_is_grp_evt(hdr); if (likely(dlen || err)) break; tsk_advance_rx_queue(sk); } while (1); /* Collect msg meta data, including error code and rejected data */ - set_orig_addr(m, hdr); + tipc_sk_set_orig_addr(m, skb); rc = tipc_sk_anc_data_recv(m, hdr, tsk); if (unlikely(rc)) goto exit; @@ -1372,15 +1732,33 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, if (unlikely(rc)) goto exit; + /* Mark message as group event if applicable */ + if (unlikely(grp_evt)) { + if (msg_grp_evt(hdr) == TIPC_WITHDRAWN) + m->msg_flags |= MSG_EOR; + m->msg_flags |= MSG_OOB; + copy = 0; + } + /* Caption of data or error code/rejected data was successful */ if (unlikely(flags & MSG_PEEK)) goto exit; + /* Send group flow control advertisement when applicable */ + if (tsk->group && msg_in_group(hdr) && !grp_evt) { + skb_queue_head_init(&xmitq); + tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen), + msg_orignode(hdr), msg_origport(hdr), + &xmitq); + tipc_node_distr_xmit(sock_net(sk), &xmitq); + } + tsk_advance_rx_queue(sk); + if (likely(!connected)) goto exit; - /* Send connection flow control ack when applicable */ + /* Send connection flow control advertisement when applicable */ tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen); if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE) tipc_sk_send_ack(tsk); @@ -1446,7 +1824,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m, /* Collect msg meta data, incl. error code and rejected data */ if (!copied) { - set_orig_addr(m, hdr); + tipc_sk_set_orig_addr(m, skb); rc = tipc_sk_anc_data_recv(m, hdr, tsk); if (rc) break; @@ -1532,14 +1910,51 @@ static void tipc_sock_destruct(struct sock *sk) __skb_queue_purge(&sk->sk_receive_queue); } +static void tipc_sk_proto_rcv(struct sock *sk, + struct sk_buff_head *inputq, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb = __skb_dequeue(inputq); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_msg *hdr = buf_msg(skb); + struct tipc_group *grp = tsk->group; + bool wakeup = false; + + switch (msg_user(hdr)) { + case CONN_MANAGER: + tipc_sk_conn_proto_rcv(tsk, skb, xmitq); + return; + case SOCK_WAKEUP: + tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0); + tsk->cong_link_cnt--; + wakeup = true; + break; + case GROUP_PROTOCOL: + tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq); + break; + case TOP_SRV: + tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf, + skb, inputq, xmitq); + skb = NULL; + break; + default: + break; + } + + if (wakeup) + sk->sk_write_space(sk); + + kfree_skb(skb); +} + /** - * filter_connect - Handle all incoming messages for a connection-based socket + * tipc_filter_connect - Handle incoming message for a connection-based socket * @tsk: TIPC socket * @skb: pointer to message buffer. Set to NULL if buffer is consumed * * Returns true if everything ok, false otherwise */ -static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) +static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) { struct sock *sk = &tsk->sk; struct net *net = sock_net(sk); @@ -1643,6 +2058,9 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb) struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *hdr = buf_msg(skb); + if (unlikely(msg_in_group(hdr))) + return sk->sk_rcvbuf; + if (unlikely(!msg_connected(hdr))) return sk->sk_rcvbuf << msg_importance(hdr); @@ -1653,7 +2071,7 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb) } /** - * filter_rcv - validate incoming message + * tipc_sk_filter_rcv - validate incoming message * @sk: socket * @skb: pointer to message. * @@ -1662,99 +2080,71 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb) * * Called with socket lock already taken * - * Returns true if message was added to socket receive queue, otherwise false */ -static bool filter_rcv(struct sock *sk, struct sk_buff *skb, - struct sk_buff_head *xmitq) +static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, + struct sk_buff_head *xmitq) { + bool sk_conn = !tipc_sk_type_connectionless(sk); struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; struct tipc_msg *hdr = buf_msg(skb); - unsigned int limit = rcvbuf_limit(sk, skb); - int err = TIPC_OK; - int usr = msg_user(hdr); - u32 onode; + struct net *net = sock_net(sk); + struct sk_buff_head inputq; + int limit, err = TIPC_OK; - if (unlikely(msg_user(hdr) == CONN_MANAGER)) { - tipc_sk_proto_rcv(tsk, skb, xmitq); - return false; - } + TIPC_SKB_CB(skb)->bytes_read = 0; + __skb_queue_head_init(&inputq); + __skb_queue_tail(&inputq, skb); - if (unlikely(usr == SOCK_WAKEUP)) { - onode = msg_orignode(hdr); - kfree_skb(skb); - u32_del(&tsk->cong_links, onode); - tsk->cong_link_cnt--; - sk->sk_write_space(sk); - return false; - } + if (unlikely(!msg_isdata(hdr))) + tipc_sk_proto_rcv(sk, &inputq, xmitq); - /* Drop if illegal message type */ - if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) { - kfree_skb(skb); - return false; - } + if (unlikely(grp)) + tipc_group_filter_msg(grp, &inputq, xmitq); - /* Reject if wrong message type for current socket state */ - if (tipc_sk_type_connectionless(sk)) { - if (msg_connected(hdr)) { + /* Validate and add to receive buffer if there is space */ + while ((skb = __skb_dequeue(&inputq))) { + hdr = buf_msg(skb); + limit = rcvbuf_limit(sk, skb); + if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) || + (!sk_conn && msg_connected(hdr)) || + (!grp && msg_in_group(hdr))) err = TIPC_ERR_NO_PORT; - goto reject; - } - } else if (unlikely(!filter_connect(tsk, skb))) { - err = TIPC_ERR_NO_PORT; - goto reject; - } + else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) + err = TIPC_ERR_OVERLOAD; - /* Reject message if there isn't room to queue it */ - if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) { - err = TIPC_ERR_OVERLOAD; - goto reject; + if (unlikely(err)) { + tipc_skb_reject(net, err, skb, xmitq); + err = TIPC_OK; + continue; + } + __skb_queue_tail(&sk->sk_receive_queue, skb); + skb_set_owner_r(skb, sk); + sk->sk_data_ready(sk); } - - /* Enqueue message */ - TIPC_SKB_CB(skb)->bytes_read = 0; - __skb_queue_tail(&sk->sk_receive_queue, skb); - skb_set_owner_r(skb, sk); - - sk->sk_data_ready(sk); - return true; - -reject: - if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err)) - __skb_queue_tail(xmitq, skb); - return false; } /** - * tipc_backlog_rcv - handle incoming message from backlog queue + * tipc_sk_backlog_rcv - handle incoming message from backlog queue * @sk: socket * @skb: message * * Caller must hold socket lock - * - * Returns 0 */ -static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) +static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb) { - unsigned int truesize = skb->truesize; + unsigned int before = sk_rmem_alloc_get(sk); struct sk_buff_head xmitq; - u32 dnode, selector; + unsigned int added; __skb_queue_head_init(&xmitq); - if (likely(filter_rcv(sk, skb, &xmitq))) { - atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt); - return 0; - } - - if (skb_queue_empty(&xmitq)) - return 0; + tipc_sk_filter_rcv(sk, skb, &xmitq); + added = sk_rmem_alloc_get(sk) - before; + atomic_add(added, &tipc_sk(sk)->dupl_rcvcnt); - /* Send response/rejected message */ - skb = __skb_dequeue(&xmitq); - dnode = msg_destnode(buf_msg(skb)); - selector = msg_origport(buf_msg(skb)); - tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector); + /* Send pending response/rejected messages, if any */ + tipc_node_distr_xmit(sock_net(sk), &xmitq); return 0; } @@ -1786,7 +2176,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, /* Add message directly to receive queue if possible */ if (!sock_owned_by_user(sk)) { - filter_rcv(sk, skb, xmitq); + tipc_sk_filter_rcv(sk, skb, xmitq); continue; } @@ -1833,14 +2223,10 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) spin_unlock_bh(&sk->sk_lock.slock); } /* Send pending response/rejected messages, if any */ - while ((skb = __skb_dequeue(&xmitq))) { - dnode = msg_destnode(buf_msg(skb)); - tipc_node_xmit_skb(net, skb, dnode, dport); - } + tipc_node_distr_xmit(sock_net(sk), &xmitq); sock_put(sk); continue; } - /* No destination socket => dequeue skb if still there */ skb = tipc_skb_dequeue(inputq, dport); if (!skb) @@ -1903,28 +2289,32 @@ static int tipc_connect(struct socket *sock, struct sockaddr *dest, int previous; int res = 0; + if (destlen != sizeof(struct sockaddr_tipc)) + return -EINVAL; + lock_sock(sk); - /* DGRAM/RDM connect(), just save the destaddr */ - if (tipc_sk_type_connectionless(sk)) { - if (dst->family == AF_UNSPEC) { - memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc)); - } else if (destlen != sizeof(struct sockaddr_tipc)) { - res = -EINVAL; - } else { - memcpy(&tsk->peer, dest, destlen); - } + if (tsk->group) { + res = -EINVAL; goto exit; } - /* - * Reject connection attempt using multicast address - * - * Note: send_msg() validates the rest of the address fields, - * so there's no need to do it here - */ - if (dst->addrtype == TIPC_ADDR_MCAST) { + if (dst->family == AF_UNSPEC) { + memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc)); + if (!tipc_sk_type_connectionless(sk)) + res = -EINVAL; + goto exit; + } else if (dst->family != AF_TIPC) { res = -EINVAL; + } + if (dst->addrtype != TIPC_ADDR_ID && dst->addrtype != TIPC_ADDR_NAME) + res = -EINVAL; + if (res) + goto exit; + + /* DGRAM/RDM connect(), just save the destaddr */ + if (tipc_sk_type_connectionless(sk)) { + memcpy(&tsk->peer, dest, destlen); goto exit; } @@ -2345,6 +2735,56 @@ void tipc_sk_rht_destroy(struct net *net) rhashtable_destroy(&tn->sk_rht); } +static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq) +{ + struct net *net = sock_net(&tsk->sk); + u32 domain = addr_domain(net, mreq->scope); + struct tipc_group *grp = tsk->group; + struct tipc_msg *hdr = &tsk->phdr; + struct tipc_name_seq seq; + int rc; + + if (mreq->type < TIPC_RESERVED_TYPES) + return -EACCES; + if (grp) + return -EACCES; + grp = tipc_group_create(net, tsk->portid, mreq); + if (!grp) + return -ENOMEM; + tsk->group = grp; + msg_set_lookup_scope(hdr, mreq->scope); + msg_set_nametype(hdr, mreq->type); + msg_set_dest_droppable(hdr, true); + seq.type = mreq->type; + seq.lower = mreq->instance; + seq.upper = seq.lower; + tipc_nametbl_build_group(net, grp, mreq->type, domain); + rc = tipc_sk_publish(tsk, mreq->scope, &seq); + if (rc) + tipc_group_delete(net, grp); + + /* Eliminate any risk that a broadcast overtakes the sent JOIN */ + tsk->mc_method.rcast = true; + tsk->mc_method.mandatory = true; + return rc; +} + +static int tipc_sk_leave(struct tipc_sock *tsk) +{ + struct net *net = sock_net(&tsk->sk); + struct tipc_group *grp = tsk->group; + struct tipc_name_seq seq; + int scope; + + if (!grp) + return -EINVAL; + tipc_group_self(grp, &seq, &scope); + tipc_group_delete(net, grp); + tsk->group = NULL; + tipc_sk_withdraw(tsk, scope, &seq); + return 0; +} + /** * tipc_setsockopt - set socket option * @sock: socket structure @@ -2363,6 +2803,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group_req mreq; u32 value = 0; int res = 0; @@ -2378,9 +2819,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, case TIPC_CONN_TIMEOUT: if (ol < sizeof(value)) return -EINVAL; - res = get_user(value, (u32 __user *)ov); - if (res) - return res; + if (get_user(value, (u32 __user *)ov)) + return -EFAULT; + break; + case TIPC_GROUP_JOIN: + if (ol < sizeof(mreq)) + return -EINVAL; + if (copy_from_user(&mreq, ov, sizeof(mreq))) + return -EFAULT; break; default: if (ov || ol) @@ -2413,6 +2859,12 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, tsk->mc_method.rcast = true; tsk->mc_method.mandatory = true; break; + case TIPC_GROUP_JOIN: + res = tipc_sk_join(tsk, &mreq); + break; + case TIPC_GROUP_LEAVE: + res = tipc_sk_leave(tsk); + break; default: res = -EINVAL; } @@ -2440,7 +2892,8 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - int len; + struct tipc_name_seq seq; + int len, scope; u32 value; int res; @@ -2474,6 +2927,12 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, case TIPC_SOCK_RECVQ_DEPTH: value = skb_queue_len(&sk->sk_receive_queue); break; + case TIPC_GROUP_JOIN: + seq.type = 0; + if (tsk->group) + tipc_group_self(tsk->group, &seq, &scope); + value = seq.type; + break; default: res = -EINVAL; } |