summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--net/rds/connection.c1
-rw-r--r--net/rds/rds.h1
-rw-r--r--net/rds/send.c33
3 files changed, 33 insertions, 2 deletions
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 7952a5b1b4c4..14f041398ca1 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -193,6 +193,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
}
atomic_set(&conn->c_state, RDS_CONN_DOWN);
+ conn->c_send_gen = 0;
conn->c_reconnect_jiffies = 0;
INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker);
INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker);
diff --git a/net/rds/rds.h b/net/rds/rds.h
index c2a5eef41343..02d8fd5b40c0 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -110,6 +110,7 @@ struct rds_connection {
void *c_transport_data;
atomic_t c_state;
+ unsigned long c_send_gen;
unsigned long c_flags;
unsigned long c_reconnect_jiffies;
struct delayed_work c_send_w;
diff --git a/net/rds/send.c b/net/rds/send.c
index 42f65d4305c8..49f77efd82b9 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -140,8 +140,11 @@ int rds_send_xmit(struct rds_connection *conn)
struct scatterlist *sg;
int ret = 0;
LIST_HEAD(to_be_dropped);
+ int batch_count;
+ unsigned long send_gen = 0;
restart:
+ batch_count = 0;
/*
* sendmsg calls here after having queued its message on the send
@@ -157,6 +160,17 @@ restart:
}
/*
+ * we record the send generation after doing the xmit acquire.
+ * if someone else manages to jump in and do some work, we'll use
+ * this to avoid a goto restart farther down.
+ *
+ * The acquire_in_xmit() check above ensures that only one
+ * caller can increment c_send_gen at any time.
+ */
+ conn->c_send_gen++;
+ send_gen = conn->c_send_gen;
+
+ /*
* rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
* we do the opposite to avoid races.
*/
@@ -202,6 +216,16 @@ restart:
if (!rm) {
unsigned int len;
+ batch_count++;
+
+ /* we want to process as big a batch as we can, but
+ * we also want to avoid softlockups. If we've been
+ * through a lot of messages, lets back off and see
+ * if anyone else jumps in
+ */
+ if (batch_count >= 1024)
+ goto over_batch;
+
spin_lock_irqsave(&conn->c_lock, flags);
if (!list_empty(&conn->c_send_queue)) {
@@ -357,9 +381,9 @@ restart:
}
}
+over_batch:
if (conn->c_trans->xmit_complete)
conn->c_trans->xmit_complete(conn);
-
release_in_xmit(conn);
/* Nuke any messages we decided not to retransmit. */
@@ -380,10 +404,15 @@ restart:
* If the transport cannot continue (i.e ret != 0), then it must
* call us when more room is available, such as from the tx
* completion handler.
+ *
+ * We have an extra generation check here so that if someone manages
+ * to jump in after our release_in_xmit, we'll see that they have done
+ * some work and we will skip our goto
*/
if (ret == 0) {
smp_mb();
- if (!list_empty(&conn->c_send_queue)) {
+ if (!list_empty(&conn->c_send_queue) &&
+ send_gen == conn->c_send_gen) {
rds_stats_inc(s_send_lock_queue_raced);
goto restart;
}