summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/uapi/linux/tipc.h1
-rw-r--r--net/tipc/group.c60
-rw-r--r--net/tipc/group.h2
-rw-r--r--net/tipc/msg.h22
-rw-r--r--net/tipc/socket.c49
5 files changed, 101 insertions, 33 deletions
diff --git a/include/uapi/linux/tipc.h b/include/uapi/linux/tipc.h
index 5f7b2c4a09ab..ef41c11a7f38 100644
--- a/include/uapi/linux/tipc.h
+++ b/include/uapi/linux/tipc.h
@@ -238,6 +238,7 @@ struct sockaddr_tipc {
* Flag values
*/
#define TIPC_GROUP_LOOPBACK 0x1 /* Receive copy of sent msg when match */
+#define TIPC_GROUP_MEMBER_EVTS 0x2 /* Receive membership events in socket */
struct tipc_group_req {
__u32 type; /* group id */
diff --git a/net/tipc/group.c b/net/tipc/group.c
index beb214a3420c..1bfa9348b26d 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -59,6 +59,7 @@ enum mbr_state {
struct tipc_member {
struct rb_node tree_node;
struct list_head list;
+ struct sk_buff *event_msg;
u32 node;
u32 port;
u32 instance;
@@ -79,6 +80,7 @@ struct tipc_group {
u16 member_cnt;
u16 bc_snd_nxt;
bool loopback;
+ bool events;
};
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
@@ -117,6 +119,7 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
grp->instance = mreq->instance;
grp->scope = mreq->scope;
grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
+ grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
return grp;
kfree(grp);
@@ -279,6 +282,13 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
if (!msg_in_group(hdr))
goto drop;
+ if (mtyp == TIPC_GRP_MEMBER_EVT) {
+ if (!grp->events)
+ goto drop;
+ __skb_queue_tail(inputq, skb);
+ return;
+ }
+
m = tipc_group_find_member(grp, node, port);
if (!tipc_group_is_receiver(m))
goto drop;
@@ -311,6 +321,7 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
}
void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
+ struct sk_buff_head *inputq,
struct sk_buff_head *xmitq)
{
u32 node = msg_orignode(hdr);
@@ -332,10 +343,12 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
/* Wait until PUBLISH event is received */
- if (m->state == MBR_DISCOVERED)
+ if (m->state == MBR_DISCOVERED) {
m->state = MBR_JOINING;
- else if (m->state == MBR_PUBLISHED)
+ } else if (m->state == MBR_PUBLISHED) {
m->state = MBR_JOINED;
+ __skb_queue_tail(inputq, m->event_msg);
+ }
return;
case GRP_LEAVE_MSG:
if (!m)
@@ -347,6 +360,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
return;
}
/* Otherwise deliver already received WITHDRAW event */
+ __skb_queue_tail(inputq, m->event_msg);
tipc_group_delete_member(grp, m);
return;
default:
@@ -354,16 +368,17 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
}
}
-/* tipc_group_member_evt() - receive and handle a member up/down event
- */
void tipc_group_member_evt(struct tipc_group *grp,
struct sk_buff *skb,
+ struct sk_buff_head *inputq,
struct sk_buff_head *xmitq)
{
struct tipc_msg *hdr = buf_msg(skb);
struct tipc_event *evt = (void *)msg_data(hdr);
+ u32 instance = evt->found_lower;
u32 node = evt->port.node;
u32 port = evt->port.ref;
+ int event = evt->event;
struct tipc_member *m;
struct net *net;
u32 self;
@@ -376,32 +391,51 @@ void tipc_group_member_evt(struct tipc_group *grp,
if (!grp->loopback && node == self && port == grp->portid)
goto drop;
+ /* Convert message before delivery to user */
+ msg_set_hdr_sz(hdr, GROUP_H_SIZE);
+ msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
+ msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
+ msg_set_origport(hdr, port);
+ msg_set_orignode(hdr, node);
+ msg_set_nametype(hdr, grp->type);
+ msg_set_grp_evt(hdr, event);
+
m = tipc_group_find_member(grp, node, port);
- if (evt->event == TIPC_PUBLISHED) {
+ if (event == TIPC_PUBLISHED) {
if (!m)
m = tipc_group_create_member(grp, node, port,
MBR_DISCOVERED);
if (!m)
goto drop;
- /* Wait if JOIN message not yet received */
- if (m->state == MBR_DISCOVERED)
+ /* Hold back event if JOIN message not yet received */
+ if (m->state == MBR_DISCOVERED) {
+ m->event_msg = skb;
m->state = MBR_PUBLISHED;
- else
+ } else {
+ __skb_queue_tail(inputq, skb);
m->state = MBR_JOINED;
- m->instance = evt->found_lower;
+ }
+ m->instance = instance;
+ TIPC_SKB_CB(skb)->orig_member = m->instance;
tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
- } else if (evt->event == TIPC_WITHDRAWN) {
+ } else if (event == TIPC_WITHDRAWN) {
if (!m)
goto drop;
- /* Keep back event if more messages might be expected */
- if (m->state != MBR_LEAVING && tipc_node_is_up(net, node))
+ TIPC_SKB_CB(skb)->orig_member = m->instance;
+
+ /* Hold back event if more messages might be expected */
+ if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
+ m->event_msg = skb;
m->state = MBR_LEAVING;
- else
+ } else {
+ __skb_queue_tail(inputq, skb);
tipc_group_delete_member(grp, m);
+ }
}
+ return;
drop:
kfree_skb(skb);
}
diff --git a/net/tipc/group.h b/net/tipc/group.h
index 9bdf4479fc03..5d3f10d28967 100644
--- a/net/tipc/group.h
+++ b/net/tipc/group.h
@@ -54,9 +54,11 @@ void tipc_group_filter_msg(struct tipc_group *grp,
struct sk_buff_head *xmitq);
void tipc_group_member_evt(struct tipc_group *grp,
struct sk_buff *skb,
+ struct sk_buff_head *inputq,
struct sk_buff_head *xmitq);
void tipc_group_proto_rcv(struct tipc_group *grp,
struct tipc_msg *hdr,
+ struct sk_buff_head *inputq,
struct sk_buff_head *xmitq);
void tipc_group_update_bc_members(struct tipc_group *grp);
u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index e438716d2372..1b527b154e46 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -65,7 +65,8 @@ struct plist;
#define TIPC_MCAST_MSG 1
#define TIPC_NAMED_MSG 2
#define TIPC_DIRECT_MSG 3
-#define TIPC_GRP_BCAST_MSG 4
+#define TIPC_GRP_MEMBER_EVT 4
+#define TIPC_GRP_BCAST_MSG 5
/*
* Internal message users
@@ -258,7 +259,14 @@ static inline void msg_set_type(struct tipc_msg *m, u32 n)
static inline int msg_in_group(struct tipc_msg *m)
{
- return (msg_type(m) == TIPC_GRP_BCAST_MSG);
+ int mtyp = msg_type(m);
+
+ return (mtyp == TIPC_GRP_BCAST_MSG) || (mtyp == TIPC_GRP_MEMBER_EVT);
+}
+
+static inline bool msg_is_grp_evt(struct tipc_msg *m)
+{
+ return msg_type(m) == TIPC_GRP_MEMBER_EVT;
}
static inline u32 msg_named(struct tipc_msg *m)
@@ -824,6 +832,16 @@ static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n)
/* Word 10
*/
+static inline u16 msg_grp_evt(struct tipc_msg *m)
+{
+ return msg_bits(m, 10, 0, 0x3);
+}
+
+static inline void msg_set_grp_evt(struct tipc_msg *m, int n)
+{
+ msg_set_bits(m, 10, 0, 0x3, n);
+}
+
static inline u16 msg_grp_bc_seqno(struct tipc_msg *m)
{
return msg_bits(m, 10, 16, 0xffff);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 25ecf1201527..0a2eac309177 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -709,41 +709,43 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
poll_table *wait)
{
struct sock *sk = sock->sk;
+ struct sk_buff *skb = skb_peek(&sk->sk_receive_queue);
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_group *grp = tsk->group;
- u32 mask = 0;
+ u32 revents = 0;
sock_poll_wait(file, sk_sleep(sk), wait);
if (sk->sk_shutdown & RCV_SHUTDOWN)
- mask |= POLLRDHUP | POLLIN | POLLRDNORM;
+ revents |= POLLRDHUP | POLLIN | POLLRDNORM;
if (sk->sk_shutdown == SHUTDOWN_MASK)
- mask |= POLLHUP;
+ revents |= POLLHUP;
switch (sk->sk_state) {
case TIPC_ESTABLISHED:
if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
- mask |= POLLOUT;
+ revents |= POLLOUT;
/* fall thru' */
case TIPC_LISTEN:
case TIPC_CONNECTING:
- if (!skb_queue_empty(&sk->sk_receive_queue))
- mask |= (POLLIN | POLLRDNORM);
+ if (skb)
+ revents |= POLLIN | POLLRDNORM;
break;
case TIPC_OPEN:
if (!grp || tipc_group_size(grp))
if (!tsk->cong_link_cnt)
- mask |= POLLOUT;
- if (tipc_sk_type_connectionless(sk) &&
- (!skb_queue_empty(&sk->sk_receive_queue)))
- mask |= (POLLIN | POLLRDNORM);
+ revents |= POLLOUT;
+ if (!tipc_sk_type_connectionless(sk))
+ break;
+ if (!skb)
+ break;
+ revents |= POLLIN | POLLRDNORM;
break;
case TIPC_DISCONNECTING:
- mask = (POLLIN | POLLRDNORM | POLLHUP);
+ revents = POLLIN | POLLRDNORM | POLLHUP;
break;
}
-
- return mask;
+ return revents;
}
/**
@@ -1415,11 +1417,12 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
size_t buflen, int flags)
{
struct sock *sk = sock->sk;
- struct tipc_sock *tsk = tipc_sk(sk);
- struct sk_buff *skb;
- struct tipc_msg *hdr;
bool connected = !tipc_sk_type_connectionless(sk);
+ struct tipc_sock *tsk = tipc_sk(sk);
int rc, err, hlen, dlen, copy;
+ struct tipc_msg *hdr;
+ struct sk_buff *skb;
+ bool grp_evt;
long timeout;
/* Catch invalid receive requests */
@@ -1443,6 +1446,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
dlen = msg_data_sz(hdr);
hlen = msg_hdr_sz(hdr);
err = msg_errcode(hdr);
+ grp_evt = msg_is_grp_evt(hdr);
if (likely(dlen || err))
break;
tsk_advance_rx_queue(sk);
@@ -1469,11 +1473,20 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
if (unlikely(rc))
goto exit;
+ /* Mark message as group event if applicable */
+ if (unlikely(grp_evt)) {
+ if (msg_grp_evt(hdr) == TIPC_WITHDRAWN)
+ m->msg_flags |= MSG_EOR;
+ m->msg_flags |= MSG_OOB;
+ copy = 0;
+ }
+
/* Caption of data or error code/rejected data was successful */
if (unlikely(flags & MSG_PEEK))
goto exit;
tsk_advance_rx_queue(sk);
+
if (likely(!connected))
goto exit;
@@ -1648,10 +1661,10 @@ static void tipc_sk_proto_rcv(struct sock *sk,
sk->sk_write_space(sk);
break;
case GROUP_PROTOCOL:
- tipc_group_proto_rcv(grp, hdr, xmitq);
+ tipc_group_proto_rcv(grp, hdr, inputq, xmitq);
break;
case TOP_SRV:
- tipc_group_member_evt(tsk->group, skb, xmitq);
+ tipc_group_member_evt(tsk->group, skb, inputq, xmitq);
skb = NULL;
break;
default: