diff options
Diffstat (limited to 'net/rds/send.c')
-rw-r--r-- | net/rds/send.c | 355 |
1 files changed, 204 insertions, 151 deletions
diff --git a/net/rds/send.c b/net/rds/send.c index c9cdb358ea88..896626b9a0ef 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -62,14 +62,14 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status); * Reset the send state. Callers must ensure that this doesn't race with * rds_send_xmit(). */ -void rds_send_reset(struct rds_connection *conn) +void rds_send_path_reset(struct rds_conn_path *cp) { struct rds_message *rm, *tmp; unsigned long flags; - if (conn->c_xmit_rm) { - rm = conn->c_xmit_rm; - conn->c_xmit_rm = NULL; + if (cp->cp_xmit_rm) { + rm = cp->cp_xmit_rm; + cp->cp_xmit_rm = NULL; /* Tell the user the RDMA op is no longer mapped by the * transport. This isn't entirely true (it's flushed out * independently) but as the connection is down, there's @@ -78,36 +78,37 @@ void rds_send_reset(struct rds_connection *conn) rds_message_put(rm); } - conn->c_xmit_sg = 0; - conn->c_xmit_hdr_off = 0; - conn->c_xmit_data_off = 0; - conn->c_xmit_atomic_sent = 0; - conn->c_xmit_rdma_sent = 0; - conn->c_xmit_data_sent = 0; + cp->cp_xmit_sg = 0; + cp->cp_xmit_hdr_off = 0; + cp->cp_xmit_data_off = 0; + cp->cp_xmit_atomic_sent = 0; + cp->cp_xmit_rdma_sent = 0; + cp->cp_xmit_data_sent = 0; - conn->c_map_queued = 0; + cp->cp_conn->c_map_queued = 0; - conn->c_unacked_packets = rds_sysctl_max_unacked_packets; - conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes; + cp->cp_unacked_packets = rds_sysctl_max_unacked_packets; + cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes; /* Mark messages as retransmissions, and move them to the send q */ - spin_lock_irqsave(&conn->c_lock, flags); - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { + spin_lock_irqsave(&cp->cp_lock, flags); + list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) { set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags); } - list_splice_init(&conn->c_retrans, &conn->c_send_queue); - spin_unlock_irqrestore(&conn->c_lock, flags); + list_splice_init(&cp->cp_retrans, &cp->cp_send_queue); + spin_unlock_irqrestore(&cp->cp_lock, flags); } +EXPORT_SYMBOL_GPL(rds_send_path_reset); -static int acquire_in_xmit(struct rds_connection *conn) +static int acquire_in_xmit(struct rds_conn_path *cp) { - return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0; + return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0; } -static void release_in_xmit(struct rds_connection *conn) +static void release_in_xmit(struct rds_conn_path *cp) { - clear_bit(RDS_IN_XMIT, &conn->c_flags); + clear_bit(RDS_IN_XMIT, &cp->cp_flags); smp_mb__after_atomic(); /* * We don't use wait_on_bit()/wake_up_bit() because our waking is in a @@ -115,8 +116,8 @@ static void release_in_xmit(struct rds_connection *conn) * the system-wide hashed waitqueue buckets in the fast path only to * almost never find waiters. */ - if (waitqueue_active(&conn->c_waitq)) - wake_up_all(&conn->c_waitq); + if (waitqueue_active(&cp->cp_waitq)) + wake_up_all(&cp->cp_waitq); } /* @@ -133,8 +134,9 @@ static void release_in_xmit(struct rds_connection *conn) * - small message latency is higher behind queued large messages * - large message latency isn't starved by intervening small sends */ -int rds_send_xmit(struct rds_connection *conn) +int rds_send_xmit(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_message *rm; unsigned long flags; unsigned int tmp; @@ -154,7 +156,7 @@ restart: * avoids blocking the caller and trading per-connection data between * caches per message. */ - if (!acquire_in_xmit(conn)) { + if (!acquire_in_xmit(cp)) { rds_stats_inc(s_send_lock_contention); ret = -ENOMEM; goto out; @@ -168,21 +170,21 @@ restart: * The acquire_in_xmit() check above ensures that only one * caller can increment c_send_gen at any time. */ - conn->c_send_gen++; - send_gen = conn->c_send_gen; + cp->cp_send_gen++; + send_gen = cp->cp_send_gen; /* * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT, * we do the opposite to avoid races. */ - if (!rds_conn_up(conn)) { - release_in_xmit(conn); + if (!rds_conn_path_up(cp)) { + release_in_xmit(cp); ret = 0; goto out; } - if (conn->c_trans->xmit_prepare) - conn->c_trans->xmit_prepare(conn); + if (conn->c_trans->xmit_path_prepare) + conn->c_trans->xmit_path_prepare(cp); /* * spin trying to push headers and data down the connection until @@ -190,7 +192,7 @@ restart: */ while (1) { - rm = conn->c_xmit_rm; + rm = cp->cp_xmit_rm; /* * If between sending messages, we can send a pending congestion @@ -203,14 +205,16 @@ restart: break; } rm->data.op_active = 1; + rm->m_inc.i_conn_path = cp; + rm->m_inc.i_conn = cp->cp_conn; - conn->c_xmit_rm = rm; + cp->cp_xmit_rm = rm; } /* * If not already working on one, grab the next message. * - * c_xmit_rm holds a ref while we're sending this message down + * cp_xmit_rm holds a ref while we're sending this message down * the connction. We can use this ref while holding the * send_sem.. rds_send_reset() is serialized with it. */ @@ -227,10 +231,10 @@ restart: if (batch_count >= send_batch_count) goto over_batch; - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); - if (!list_empty(&conn->c_send_queue)) { - rm = list_entry(conn->c_send_queue.next, + if (!list_empty(&cp->cp_send_queue)) { + rm = list_entry(cp->cp_send_queue.next, struct rds_message, m_conn_item); rds_message_addref(rm); @@ -239,10 +243,11 @@ restart: * Move the message from the send queue to the retransmit * list right away. */ - list_move_tail(&rm->m_conn_item, &conn->c_retrans); + list_move_tail(&rm->m_conn_item, + &cp->cp_retrans); } - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); if (!rm) break; @@ -256,32 +261,34 @@ restart: */ if (rm->rdma.op_active && test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) { - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) list_move(&rm->m_conn_item, &to_be_dropped); - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); continue; } /* Require an ACK every once in a while */ len = ntohl(rm->m_inc.i_hdr.h_len); - if (conn->c_unacked_packets == 0 || - conn->c_unacked_bytes < len) { + if (cp->cp_unacked_packets == 0 || + cp->cp_unacked_bytes < len) { __set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); - conn->c_unacked_packets = rds_sysctl_max_unacked_packets; - conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes; + cp->cp_unacked_packets = + rds_sysctl_max_unacked_packets; + cp->cp_unacked_bytes = + rds_sysctl_max_unacked_bytes; rds_stats_inc(s_send_ack_required); } else { - conn->c_unacked_bytes -= len; - conn->c_unacked_packets--; + cp->cp_unacked_bytes -= len; + cp->cp_unacked_packets--; } - conn->c_xmit_rm = rm; + cp->cp_xmit_rm = rm; } /* The transport either sends the whole rdma or none of it */ - if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { + if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) { rm->m_final_op = &rm->rdma; /* The transport owns the mapped memory for now. * You can't unmap it while it's on the send queue @@ -293,11 +300,11 @@ restart: wake_up_interruptible(&rm->m_flush_wait); break; } - conn->c_xmit_rdma_sent = 1; + cp->cp_xmit_rdma_sent = 1; } - if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { + if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) { rm->m_final_op = &rm->atomic; /* The transport owns the mapped memory for now. * You can't unmap it while it's on the send queue @@ -309,7 +316,7 @@ restart: wake_up_interruptible(&rm->m_flush_wait); break; } - conn->c_xmit_atomic_sent = 1; + cp->cp_xmit_atomic_sent = 1; } @@ -335,41 +342,42 @@ restart: rm->data.op_active = 0; } - if (rm->data.op_active && !conn->c_xmit_data_sent) { + if (rm->data.op_active && !cp->cp_xmit_data_sent) { rm->m_final_op = &rm->data; + ret = conn->c_trans->xmit(conn, rm, - conn->c_xmit_hdr_off, - conn->c_xmit_sg, - conn->c_xmit_data_off); + cp->cp_xmit_hdr_off, + cp->cp_xmit_sg, + cp->cp_xmit_data_off); if (ret <= 0) break; - if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) { + if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) { tmp = min_t(int, ret, sizeof(struct rds_header) - - conn->c_xmit_hdr_off); - conn->c_xmit_hdr_off += tmp; + cp->cp_xmit_hdr_off); + cp->cp_xmit_hdr_off += tmp; ret -= tmp; } - sg = &rm->data.op_sg[conn->c_xmit_sg]; + sg = &rm->data.op_sg[cp->cp_xmit_sg]; while (ret) { tmp = min_t(int, ret, sg->length - - conn->c_xmit_data_off); - conn->c_xmit_data_off += tmp; + cp->cp_xmit_data_off); + cp->cp_xmit_data_off += tmp; ret -= tmp; - if (conn->c_xmit_data_off == sg->length) { - conn->c_xmit_data_off = 0; + if (cp->cp_xmit_data_off == sg->length) { + cp->cp_xmit_data_off = 0; sg++; - conn->c_xmit_sg++; - BUG_ON(ret != 0 && - conn->c_xmit_sg == rm->data.op_nents); + cp->cp_xmit_sg++; + BUG_ON(ret != 0 && cp->cp_xmit_sg == + rm->data.op_nents); } } - if (conn->c_xmit_hdr_off == sizeof(struct rds_header) && - (conn->c_xmit_sg == rm->data.op_nents)) - conn->c_xmit_data_sent = 1; + if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) && + (cp->cp_xmit_sg == rm->data.op_nents)) + cp->cp_xmit_data_sent = 1; } /* @@ -377,23 +385,23 @@ restart: * if there is a data op. Thus, if the data is sent (or there was * none), then we're done with the rm. */ - if (!rm->data.op_active || conn->c_xmit_data_sent) { - conn->c_xmit_rm = NULL; - conn->c_xmit_sg = 0; - conn->c_xmit_hdr_off = 0; - conn->c_xmit_data_off = 0; - conn->c_xmit_rdma_sent = 0; - conn->c_xmit_atomic_sent = 0; - conn->c_xmit_data_sent = 0; + if (!rm->data.op_active || cp->cp_xmit_data_sent) { + cp->cp_xmit_rm = NULL; + cp->cp_xmit_sg = 0; + cp->cp_xmit_hdr_off = 0; + cp->cp_xmit_data_off = 0; + cp->cp_xmit_rdma_sent = 0; + cp->cp_xmit_atomic_sent = 0; + cp->cp_xmit_data_sent = 0; rds_message_put(rm); } } over_batch: - if (conn->c_trans->xmit_complete) - conn->c_trans->xmit_complete(conn); - release_in_xmit(conn); + if (conn->c_trans->xmit_path_complete) + conn->c_trans->xmit_path_complete(cp); + release_in_xmit(cp); /* Nuke any messages we decided not to retransmit. */ if (!list_empty(&to_be_dropped)) { @@ -421,12 +429,12 @@ over_batch: if (ret == 0) { smp_mb(); if ((test_bit(0, &conn->c_map_queued) || - !list_empty(&conn->c_send_queue)) && - send_gen == conn->c_send_gen) { + !list_empty(&cp->cp_send_queue)) && + send_gen == cp->cp_send_gen) { rds_stats_inc(s_send_lock_queue_raced); if (batch_count < send_batch_count) goto restart; - queue_delayed_work(rds_wq, &conn->c_send_w, 1); + queue_delayed_work(rds_wq, &cp->cp_send_w, 1); } } out: @@ -559,42 +567,6 @@ __rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status) } /* - * This is called from the IB send completion when we detect - * a RDMA operation that failed with remote access error. - * So speed is not an issue here. - */ -struct rds_message *rds_send_get_message(struct rds_connection *conn, - struct rm_rdma_op *op) -{ - struct rds_message *rm, *tmp, *found = NULL; - unsigned long flags; - - spin_lock_irqsave(&conn->c_lock, flags); - - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { - if (&rm->rdma == op) { - atomic_inc(&rm->m_refcount); - found = rm; - goto out; - } - } - - list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) { - if (&rm->rdma == op) { - atomic_inc(&rm->m_refcount); - found = rm; - break; - } - } - -out: - spin_unlock_irqrestore(&conn->c_lock, flags); - - return found; -} -EXPORT_SYMBOL_GPL(rds_send_get_message); - -/* * This removes messages from the socket's list if they're on it. The list * argument must be private to the caller, we must be able to modify it * without locks. The messages must have a reference held for their @@ -684,16 +656,16 @@ unlock_and_drop: * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked * checks the RDS_MSG_HAS_ACK_SEQ bit. */ -void rds_send_drop_acked(struct rds_connection *conn, u64 ack, - is_acked_func is_acked) +void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack, + is_acked_func is_acked) { struct rds_message *rm, *tmp; unsigned long flags; LIST_HEAD(list); - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { + list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) { if (!rds_send_is_acked(rm, ack, is_acked)) break; @@ -705,17 +677,26 @@ void rds_send_drop_acked(struct rds_connection *conn, u64 ack, if (!list_empty(&list)) smp_mb__after_atomic(); - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); /* now remove the messages from the sock list as needed */ rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS); } +EXPORT_SYMBOL_GPL(rds_send_path_drop_acked); + +void rds_send_drop_acked(struct rds_connection *conn, u64 ack, + is_acked_func is_acked) +{ + WARN_ON(conn->c_trans->t_mp_capable); + rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked); +} EXPORT_SYMBOL_GPL(rds_send_drop_acked); void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) { struct rds_message *rm, *tmp; struct rds_connection *conn; + struct rds_conn_path *cp; unsigned long flags; LIST_HEAD(list); @@ -744,22 +725,26 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) list_for_each_entry(rm, &list, m_sock_item) { conn = rm->m_inc.i_conn; + if (conn->c_trans->t_mp_capable) + cp = rm->m_inc.i_conn_path; + else + cp = &conn->c_path[0]; - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); /* * Maybe someone else beat us to removing rm from the conn. * If we race with their flag update we'll get the lock and * then really see that the flag has been cleared. */ if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) { - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); spin_lock_irqsave(&rm->m_rs_lock, flags); rm->m_rs = NULL; spin_unlock_irqrestore(&rm->m_rs_lock, flags); continue; } list_del_init(&rm->m_conn_item); - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); /* * Couldn't grab m_rs_lock in top loop (lock ordering), @@ -808,6 +793,7 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) * message from the flow with RDS_CANCEL_SENT_TO. */ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn, + struct rds_conn_path *cp, struct rds_message *rm, __be16 sport, __be16 dport, int *queued) { @@ -851,13 +837,14 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn, trying to minimize the time we hold c_lock */ rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0); rm->m_inc.i_conn = conn; + rm->m_inc.i_conn_path = cp; rds_message_addref(rm); - spin_lock(&conn->c_lock); - rm->m_inc.i_hdr.h_sequence = cpu_to_be64(conn->c_next_tx_seq++); - list_add_tail(&rm->m_conn_item, &conn->c_send_queue); + spin_lock(&cp->cp_lock); + rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++); + list_add_tail(&rm->m_conn_item, &cp->cp_send_queue); set_bit(RDS_MSG_ON_CONN, &rm->m_flags); - spin_unlock(&conn->c_lock); + spin_unlock(&cp->cp_lock); rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n", rm, len, rs, rs->rs_snd_bytes, @@ -976,6 +963,29 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, return ret; } +static void rds_send_ping(struct rds_connection *conn); + +static int rds_send_mprds_hash(struct rds_sock *rs, struct rds_connection *conn) +{ + int hash; + + if (conn->c_npaths == 0) + hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS); + else + hash = RDS_MPATH_HASH(rs, conn->c_npaths); + if (conn->c_npaths == 0 && hash != 0) { + rds_send_ping(conn); + + if (conn->c_npaths == 0) { + wait_event_interruptible(conn->c_hs_waitq, + (conn->c_npaths != 0)); + } + if (conn->c_npaths == 1) + hash = 0; + } + return hash; +} + int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) { struct sock *sk = sock->sk; @@ -989,6 +999,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) int queued = 0, allocated_mr = 0; int nonblock = msg->msg_flags & MSG_DONTWAIT; long timeo = sock_sndtimeo(sk, nonblock); + struct rds_conn_path *cpath; /* Mirror Linux UDP mirror of BSD error message compatibility */ /* XXX: Perhaps MSG_MORE someday */ @@ -1087,15 +1098,19 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) goto out; } - rds_conn_connect_if_down(conn); + if (conn->c_trans->t_mp_capable) + cpath = &conn->c_path[rds_send_mprds_hash(rs, conn)]; + else + cpath = &conn->c_path[0]; + + rds_conn_path_connect_if_down(cpath); ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs); if (ret) { rs->rs_seen_congestion = 1; goto out; } - - while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port, + while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port, dport, &queued)) { rds_stats_inc(s_send_queue_full); @@ -1105,7 +1120,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) } timeo = wait_event_interruptible_timeout(*sk_sleep(sk), - rds_send_queue_rm(rs, conn, rm, + rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port, dport, &queued), @@ -1126,9 +1141,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) */ rds_stats_inc(s_send_queued); - ret = rds_send_xmit(conn); + ret = rds_send_xmit(cpath); if (ret == -ENOMEM || ret == -EAGAIN) - queue_delayed_work(rds_wq, &conn->c_send_w, 1); + queue_delayed_work(rds_wq, &cpath->cp_send_w, 1); rds_message_put(rm); return payload_len; @@ -1146,10 +1161,16 @@ out: } /* - * Reply to a ping packet. + * send out a probe. Can be shared by rds_send_ping, + * rds_send_pong, rds_send_hb. + * rds_send_hb should use h_flags + * RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED + * or + * RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED */ int -rds_send_pong(struct rds_connection *conn, __be16 dport) +rds_send_probe(struct rds_conn_path *cp, __be16 sport, + __be16 dport, u8 h_flags) { struct rds_message *rm; unsigned long flags; @@ -1161,31 +1182,41 @@ rds_send_pong(struct rds_connection *conn, __be16 dport) goto out; } - rm->m_daddr = conn->c_faddr; + rm->m_daddr = cp->cp_conn->c_faddr; rm->data.op_active = 1; - rds_conn_connect_if_down(conn); + rds_conn_path_connect_if_down(cp); - ret = rds_cong_wait(conn->c_fcong, dport, 1, NULL); + ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL); if (ret) goto out; - spin_lock_irqsave(&conn->c_lock, flags); - list_add_tail(&rm->m_conn_item, &conn->c_send_queue); + spin_lock_irqsave(&cp->cp_lock, flags); + list_add_tail(&rm->m_conn_item, &cp->cp_send_queue); set_bit(RDS_MSG_ON_CONN, &rm->m_flags); rds_message_addref(rm); - rm->m_inc.i_conn = conn; + rm->m_inc.i_conn = cp->cp_conn; + rm->m_inc.i_conn_path = cp; + + rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, + cp->cp_next_tx_seq); + rm->m_inc.i_hdr.h_flags |= h_flags; + cp->cp_next_tx_seq++; + + if (RDS_HS_PROBE(sport, dport) && cp->cp_conn->c_trans->t_mp_capable) { + u16 npaths = RDS_MPATH_WORKERS; - rds_message_populate_header(&rm->m_inc.i_hdr, 0, dport, - conn->c_next_tx_seq); - conn->c_next_tx_seq++; - spin_unlock_irqrestore(&conn->c_lock, flags); + rds_message_add_extension(&rm->m_inc.i_hdr, + RDS_EXTHDR_NPATHS, &npaths, + sizeof(npaths)); + } + spin_unlock_irqrestore(&cp->cp_lock, flags); rds_stats_inc(s_send_queued); rds_stats_inc(s_send_pong); /* schedule the send work on rds_wq */ - queue_delayed_work(rds_wq, &conn->c_send_w, 1); + queue_delayed_work(rds_wq, &cp->cp_send_w, 1); rds_message_put(rm); return 0; @@ -1195,3 +1226,25 @@ out: rds_message_put(rm); return ret; } + +int +rds_send_pong(struct rds_conn_path *cp, __be16 dport) +{ + return rds_send_probe(cp, 0, dport, 0); +} + +void +rds_send_ping(struct rds_connection *conn) +{ + unsigned long flags; + struct rds_conn_path *cp = &conn->c_path[0]; + + spin_lock_irqsave(&cp->cp_lock, flags); + if (conn->c_ping_triggered) { + spin_unlock_irqrestore(&cp->cp_lock, flags); + return; + } + conn->c_ping_triggered = 1; + spin_unlock_irqrestore(&cp->cp_lock, flags); + rds_send_probe(&conn->c_path[0], RDS_FLAG_PROBE_PORT, 0, 0); +} |