summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZach Brown <zach.brown@oracle.com>2010-07-15 00:55:35 +0400
committerAndy Grover <andy.grover@oracle.com>2010-09-09 05:16:40 +0400
commitf046011cd73c372267befd10242988eb744649fe (patch)
treed184275400dee81f2a5027728bda849bec338d99
parentef87b7ea39a91906218a262686bcb8bad8b6b46e (diff)
downloadlinux-f046011cd73c372267befd10242988eb744649fe.tar.xz
RDS/IB: track signaled sends
We're seeing bugs today where IB connection shutdown clears the send ring while the tasklet is processing completed sends. Implementation details cause this to dereference a null pointer. Shutdown needs to wait for send completion to stop before tearing down the connection. We can't simply wait for the ring to empty because it may contain unsignaled sends that will never be processed. This patch tracks the number of signaled sends that we've posted and waits for them to complete. It also makes sure that the tasklet has finished executing. Signed-off-by: Zach Brown <zach.brown@oracle.com>
-rw-r--r--net/rds/ib.h1
-rw-r--r--net/rds/ib_cm.c14
-rw-r--r--net/rds/ib_send.c47
3 files changed, 54 insertions, 8 deletions
diff --git a/net/rds/ib.h b/net/rds/ib.h
index acda2dbc6576..a13ced504145 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -108,6 +108,7 @@ struct rds_ib_connection {
struct rds_header *i_send_hdrs;
u64 i_send_hdrs_dma;
struct rds_ib_send_work *i_sends;
+ atomic_t i_signaled_sends;
/* rx */
struct tasklet_struct i_recv_tasklet;
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 10f6a8815cd0..123c7d33b54e 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -615,11 +615,18 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
}
/*
- * Don't wait for the send ring to be empty -- there may be completed
- * non-signaled entries sitting on there. We unmap these below.
+ * We want to wait for tx and rx completion to finish
+ * before we tear down the connection, but we have to be
+ * careful not to get stuck waiting on a send ring that
+ * only has unsignaled sends in it. We've shutdown new
+ * sends before getting here so by waiting for signaled
+ * sends to complete we're ensured that there will be no
+ * more tx processing.
*/
wait_event(rds_ib_ring_empty_wait,
- rds_ib_ring_empty(&ic->i_recv_ring));
+ rds_ib_ring_empty(&ic->i_recv_ring) &&
+ (atomic_read(&ic->i_signaled_sends) == 0));
+ tasklet_kill(&ic->i_recv_tasklet);
if (ic->i_send_hdrs)
ib_dma_free_coherent(dev,
@@ -729,6 +736,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
#ifndef KERNEL_HAS_ATOMIC64
spin_lock_init(&ic->i_ack_lock);
#endif
+ atomic_set(&ic->i_signaled_sends, 0);
/*
* rds_ib_conn_shutdown() waits for these to be emptied so they
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index e88cb4af009b..15f75692574c 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -220,6 +220,18 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
}
/*
+ * The only fast path caller always has a non-zero nr, so we don't
+ * bother testing nr before performing the atomic sub.
+ */
+static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
+{
+ if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) &&
+ waitqueue_active(&rds_ib_ring_empty_wait))
+ wake_up(&rds_ib_ring_empty_wait);
+ BUG_ON(atomic_read(&ic->i_signaled_sends) < 0);
+}
+
+/*
* The _oldest/_free ring operations here race cleanly with the alloc/unalloc
* operations performed in the send path. As the sender allocs and potentially
* unallocs the next free entry in the ring it doesn't alter which is
@@ -236,6 +248,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
u32 oldest;
u32 i = 0;
int ret;
+ int nr_sig = 0;
rdsdebug("cq %p conn %p\n", cq, conn);
rds_ib_stats_inc(s_ib_tx_cq_call);
@@ -262,6 +275,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
for (i = 0; i < completed; i++) {
send = &ic->i_sends[oldest];
+ if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+ nr_sig++;
rm = rds_ib_send_unmap_op(ic, send, wc.status);
@@ -282,6 +297,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
}
rds_ib_ring_free(&ic->i_send_ring, completed);
+ rds_ib_sub_signaled(ic, nr_sig);
+ nr_sig = 0;
if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
test_bit(0, &conn->c_map_queued))
@@ -440,9 +457,9 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
}
-static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
- struct rds_ib_send_work *send,
- bool notify)
+static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
+ struct rds_ib_send_work *send,
+ bool notify)
{
/*
* We want to delay signaling completions just enough to get
@@ -452,7 +469,9 @@ static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
if (ic->i_unsignaled_wrs-- == 0 || notify) {
ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
send->s_wr.send_flags |= IB_SEND_SIGNALED;
+ return 1;
}
+ return 0;
}
/*
@@ -488,6 +507,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
int bytes_sent = 0;
int ret;
int flow_controlled = 0;
+ int nr_sig = 0;
BUG_ON(off % RDS_FRAG_SIZE);
BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
@@ -645,6 +665,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
if (ic->i_flowctl && flow_controlled && i == (work_alloc-1))
send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
+ if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+ nr_sig++;
+
rdsdebug("send %p wr %p num_sge %u next %p\n", send,
&send->s_wr, send->s_wr.num_sge, send->s_wr.next);
@@ -689,6 +712,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
if (ic->i_flowctl && i < credit_alloc)
rds_ib_send_add_credits(conn, credit_alloc - i);
+ if (nr_sig)
+ atomic_add(nr_sig, &ic->i_signaled_sends);
+
/* XXX need to worry about failed_wr and partial sends. */
failed_wr = &first->s_wr;
ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
@@ -699,6 +725,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
"returned %d\n", &conn->c_faddr, ret);
rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+ rds_ib_sub_signaled(ic, nr_sig);
if (prev->s_op) {
ic->i_data_op = prev->s_op;
prev->s_op = NULL;
@@ -728,6 +755,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
u32 pos;
u32 work_alloc;
int ret;
+ int nr_sig = 0;
rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
@@ -752,7 +780,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
send->s_wr.wr.atomic.compare_add = op->op_swap_add;
send->s_wr.wr.atomic.swap = 0;
}
- rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+ nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
send->s_wr.num_sge = 1;
send->s_wr.next = NULL;
send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
@@ -778,6 +806,9 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
send->s_sge[0].addr, send->s_sge[0].length);
+ if (nr_sig)
+ atomic_add(nr_sig, &ic->i_signaled_sends);
+
failed_wr = &send->s_wr;
ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
@@ -787,6 +818,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
"returned %d\n", &conn->c_faddr, ret);
rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+ rds_ib_sub_signaled(ic, nr_sig);
goto out;
}
@@ -817,6 +849,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
int sent;
int ret;
int num_sge;
+ int nr_sig = 0;
/* map the op the first time we see it */
if (!op->op_mapped) {
@@ -859,7 +892,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
send->s_queued = jiffies;
send->s_op = NULL;
- rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+ nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
send->s_wr.wr.rdma.remote_addr = remote_addr;
@@ -910,6 +943,9 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
work_alloc = i;
}
+ if (nr_sig)
+ atomic_add(nr_sig, &ic->i_signaled_sends);
+
failed_wr = &first->s_wr;
ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
@@ -919,6 +955,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
"returned %d\n", &conn->c_faddr, ret);
rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+ rds_ib_sub_signaled(ic, nr_sig);
goto out;
}