summaryrefslogtreecommitdiff
path: root/net/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/debugfs.c34
-rw-r--r--net/ceph/osd_client.c1164
2 files changed, 587 insertions, 611 deletions
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 6d3ff713edeb..61dbd9de4650 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -182,21 +182,39 @@ static void dump_request(struct seq_file *s, struct ceph_osd_request *req)
seq_putc(s, '\n');
}
+static void dump_requests(struct seq_file *s, struct ceph_osd *osd)
+{
+ struct rb_node *n;
+
+ mutex_lock(&osd->lock);
+ for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
+ struct ceph_osd_request *req =
+ rb_entry(n, struct ceph_osd_request, r_node);
+
+ dump_request(s, req);
+ }
+
+ mutex_unlock(&osd->lock);
+}
+
static int osdc_show(struct seq_file *s, void *pp)
{
struct ceph_client *client = s->private;
struct ceph_osd_client *osdc = &client->osdc;
- struct rb_node *p;
-
- mutex_lock(&osdc->request_mutex);
- for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
- struct ceph_osd_request *req;
+ struct rb_node *n;
- req = rb_entry(p, struct ceph_osd_request, r_node);
+ down_read(&osdc->lock);
+ seq_printf(s, "REQUESTS %d homeless %d\n",
+ atomic_read(&osdc->num_requests),
+ atomic_read(&osdc->num_homeless));
+ for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+ struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
- dump_request(s, req);
+ dump_requests(s, osd);
}
- mutex_unlock(&osdc->request_mutex);
+ dump_requests(s, &osdc->homeless_osd);
+
+ up_read(&osdc->lock);
return 0;
}
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index d1c8e06f1261..4c856c87b1a9 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -25,16 +25,6 @@ static struct kmem_cache *ceph_osd_request_cache;
static const struct ceph_connection_operations osd_con_ops;
-static void __send_queued(struct ceph_osd_client *osdc);
-static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
-static void __register_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req);
-static void __unregister_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req);
-static void __unregister_linger_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req);
-static void __enqueue_request(struct ceph_osd_request *req);
-
/*
* Implement client access to distributed object storage cluster.
*
@@ -53,6 +43,43 @@ static void __enqueue_request(struct ceph_osd_request *req);
* channel with an OSD is reset.
*/
+static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
+static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
+
+#if 1
+static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
+{
+ bool wrlocked = true;
+
+ if (unlikely(down_read_trylock(sem))) {
+ wrlocked = false;
+ up_read(sem);
+ }
+
+ return wrlocked;
+}
+static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
+{
+ WARN_ON(!rwsem_is_locked(&osdc->lock));
+}
+static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
+{
+ WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
+}
+static inline void verify_osd_locked(struct ceph_osd *osd)
+{
+ struct ceph_osd_client *osdc = osd->o_osdc;
+
+ WARN_ON(!(mutex_is_locked(&osd->lock) &&
+ rwsem_is_locked(&osdc->lock)) &&
+ !rwsem_is_wrlocked(&osdc->lock));
+}
+#else
+static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
+static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
+static inline void verify_osd_locked(struct ceph_osd *osd) { }
+#endif
+
/*
* calculate the mapping of a file extent onto an object, and fill out the
* request accordingly. shorten extent as necessary if it crosses an
@@ -336,18 +363,14 @@ static void ceph_osdc_release_request(struct kref *kref)
dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
req->r_request, req->r_reply);
WARN_ON(!RB_EMPTY_NODE(&req->r_node));
- WARN_ON(!list_empty(&req->r_req_lru_item));
- WARN_ON(!list_empty(&req->r_osd_item));
WARN_ON(!list_empty(&req->r_linger_item));
WARN_ON(!list_empty(&req->r_linger_osd_item));
WARN_ON(req->r_osd);
if (req->r_request)
ceph_msg_put(req->r_request);
- if (req->r_reply) {
- ceph_msg_revoke_incoming(req->r_reply);
+ if (req->r_reply)
ceph_msg_put(req->r_reply);
- }
for (which = 0; which < req->r_num_ops; which++)
osd_req_op_data_release(req, which);
@@ -418,8 +441,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
INIT_LIST_HEAD(&req->r_unsafe_item);
INIT_LIST_HEAD(&req->r_linger_item);
INIT_LIST_HEAD(&req->r_linger_osd_item);
- INIT_LIST_HEAD(&req->r_req_lru_item);
- INIT_LIST_HEAD(&req->r_osd_item);
target_init(&req->r_t);
@@ -869,141 +890,11 @@ static bool osd_homeless(struct ceph_osd *osd)
return osd->o_osd == CEPH_HOMELESS_OSD;
}
-static struct ceph_osd_request *
-__lookup_request_ge(struct ceph_osd_client *osdc,
- u64 tid)
-{
- struct ceph_osd_request *req;
- struct rb_node *n = osdc->requests.rb_node;
-
- while (n) {
- req = rb_entry(n, struct ceph_osd_request, r_node);
- if (tid < req->r_tid) {
- if (!n->rb_left)
- return req;
- n = n->rb_left;
- } else if (tid > req->r_tid) {
- n = n->rb_right;
- } else {
- return req;
- }
- }
- return NULL;
-}
-
-static void __kick_linger_request(struct ceph_osd_request *req)
-{
- struct ceph_osd_client *osdc = req->r_osdc;
- struct ceph_osd *osd = req->r_osd;
-
- /*
- * Linger requests need to be resent with a new tid to avoid
- * the dup op detection logic on the OSDs. Achieve this with
- * a re-register dance instead of open-coding.
- */
- ceph_osdc_get_request(req);
- if (!list_empty(&req->r_linger_item))
- __unregister_linger_request(osdc, req);
- else
- __unregister_request(osdc, req);
- __register_request(osdc, req);
- ceph_osdc_put_request(req);
-
- /*
- * Unless request has been registered as both normal and
- * lingering, __unregister{,_linger}_request clears r_osd.
- * However, here we need to preserve r_osd to make sure we
- * requeue on the same OSD.
- */
- WARN_ON(req->r_osd || !osd);
- req->r_osd = osd;
-
- dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid);
- __enqueue_request(req);
-}
-
-/*
- * Resubmit requests pending on the given osd.
- */
-static void __kick_osd_requests(struct ceph_osd_client *osdc,
- struct ceph_osd *osd)
-{
- struct ceph_osd_request *req, *nreq;
- LIST_HEAD(resend);
- LIST_HEAD(resend_linger);
- int err;
-
- dout("%s osd%d\n", __func__, osd->o_osd);
- err = __reset_osd(osdc, osd);
- if (err)
- return;
-
- /*
- * Build up a list of requests to resend by traversing the
- * osd's list of requests. Requests for a given object are
- * sent in tid order, and that is also the order they're
- * kept on this list. Therefore all requests that are in
- * flight will be found first, followed by all requests that
- * have not yet been sent. And to resend requests while
- * preserving this order we will want to put any sent
- * requests back on the front of the osd client's unsent
- * list.
- *
- * So we build a separate ordered list of already-sent
- * requests for the affected osd and splice it onto the
- * front of the osd client's unsent list. Once we've seen a
- * request that has not yet been sent we're done. Those
- * requests are already sitting right where they belong.
- */
- list_for_each_entry(req, &osd->o_requests, r_osd_item) {
- if (!req->r_sent)
- break;
-
- if (!req->r_linger) {
- dout("%s requeueing %p tid %llu\n", __func__, req,
- req->r_tid);
- list_move_tail(&req->r_req_lru_item, &resend);
- req->r_flags |= CEPH_OSD_FLAG_RETRY;
- } else {
- list_move_tail(&req->r_req_lru_item, &resend_linger);
- }
- }
- list_splice(&resend, &osdc->req_unsent);
-
- /*
- * Both registered and not yet registered linger requests are
- * enqueued with a new tid on the same OSD. We add/move them
- * to req_unsent/o_requests at the end to keep things in tid
- * order.
- */
- list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
- r_linger_osd_item) {
- WARN_ON(!list_empty(&req->r_req_lru_item));
- __kick_linger_request(req);
- }
-
- list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item)
- __kick_linger_request(req);
-}
-
-/*
- * If the osd connection drops, we need to resubmit all requests.
- */
-static void osd_reset(struct ceph_connection *con)
+static bool osd_registered(struct ceph_osd *osd)
{
- struct ceph_osd *osd = con->private;
- struct ceph_osd_client *osdc;
+ verify_osdc_locked(osd->o_osdc);
- if (!osd)
- return;
- dout("osd_reset osd%d\n", osd->o_osd);
- osdc = osd->o_osdc;
- down_read(&osdc->map_sem);
- mutex_lock(&osdc->request_mutex);
- __kick_osd_requests(osdc, osd);
- __send_queued(osdc);
- mutex_unlock(&osdc->request_mutex);
- up_read(&osdc->map_sem);
+ return !RB_EMPTY_NODE(&osd->o_node);
}
/*
@@ -1013,17 +904,18 @@ static void osd_init(struct ceph_osd *osd)
{
atomic_set(&osd->o_ref, 1);
RB_CLEAR_NODE(&osd->o_node);
- INIT_LIST_HEAD(&osd->o_requests);
+ osd->o_requests = RB_ROOT;
INIT_LIST_HEAD(&osd->o_linger_requests);
INIT_LIST_HEAD(&osd->o_osd_lru);
INIT_LIST_HEAD(&osd->o_keepalive_item);
osd->o_incarnation = 1;
+ mutex_init(&osd->lock);
}
static void osd_cleanup(struct ceph_osd *osd)
{
WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
- WARN_ON(!list_empty(&osd->o_requests));
+ WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
WARN_ON(!list_empty(&osd->o_linger_requests));
WARN_ON(!list_empty(&osd->o_osd_lru));
WARN_ON(!list_empty(&osd->o_keepalive_item));
@@ -1077,30 +969,6 @@ static void put_osd(struct ceph_osd *osd)
DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
-/*
- * remove an osd from our map
- */
-static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
-{
- dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
- WARN_ON(!list_empty(&osd->o_requests));
- WARN_ON(!list_empty(&osd->o_linger_requests));
-
- list_del_init(&osd->o_osd_lru);
- erase_osd(&osdc->osds, osd);
-}
-
-static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
-{
- dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
-
- if (!RB_EMPTY_NODE(&osd->o_node)) {
- ceph_con_close(&osd->o_con);
- __remove_osd(osdc, osd);
- put_osd(osd);
- }
-}
-
static void __move_osd_to_lru(struct ceph_osd *osd)
{
struct ceph_osd_client *osdc = osd->o_osdc;
@@ -1117,7 +985,7 @@ static void __move_osd_to_lru(struct ceph_osd *osd)
static void maybe_move_osd_to_lru(struct ceph_osd *osd)
{
- if (list_empty(&osd->o_requests) &&
+ if (RB_EMPTY_ROOT(&osd->o_requests) &&
list_empty(&osd->o_linger_requests))
__move_osd_to_lru(osd);
}
@@ -1135,29 +1003,63 @@ static void __remove_osd_from_lru(struct ceph_osd *osd)
}
/*
+ * Close the connection and assign any leftover requests to the
+ * homeless session.
+ */
+static void close_osd(struct ceph_osd *osd)
+{
+ struct ceph_osd_client *osdc = osd->o_osdc;
+ struct rb_node *n;
+
+ verify_osdc_wrlocked(osdc);
+ dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
+
+ ceph_con_close(&osd->o_con);
+
+ for (n = rb_first(&osd->o_requests); n; ) {
+ struct ceph_osd_request *req =
+ rb_entry(n, struct ceph_osd_request, r_node);
+
+ n = rb_next(n); /* unlink_request() */
+
+ dout(" reassigning req %p tid %llu\n", req, req->r_tid);
+ unlink_request(osd, req);
+ link_request(&osdc->homeless_osd, req);
+ }
+
+ __remove_osd_from_lru(osd);
+ erase_osd(&osdc->osds, osd);
+ put_osd(osd);
+}
+
+/*
* reset osd connect
*/
-static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static int reopen_osd(struct ceph_osd *osd)
{
struct ceph_entity_addr *peer_addr;
- dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
- if (list_empty(&osd->o_requests) &&
+ dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
+
+ if (RB_EMPTY_ROOT(&osd->o_requests) &&
list_empty(&osd->o_linger_requests)) {
- remove_osd(osdc, osd);
+ close_osd(osd);
return -ENODEV;
}
- peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
+ peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
!ceph_con_opened(&osd->o_con)) {
- struct ceph_osd_request *req;
+ struct rb_node *n;
dout("osd addr hasn't changed and connection never opened, "
"letting msgr retry\n");
/* touch each r_stamp for handle_timeout()'s benfit */
- list_for_each_entry(req, &osd->o_requests, r_osd_item)
+ for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
+ struct ceph_osd_request *req =
+ rb_entry(n, struct ceph_osd_request, r_node);
req->r_stamp = jiffies;
+ }
return -EAGAIN;
}
@@ -1169,73 +1071,84 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
return 0;
}
-/*
- * Register request, assign tid. If this is the first request, set up
- * the timeout event.
- */
-static void __register_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
-{
- req->r_tid = ++osdc->last_tid;
- req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
- dout("__register_request %p tid %lld\n", req, req->r_tid);
- insert_request(&osdc->requests, req);
- ceph_osdc_get_request(req);
- osdc->num_requests++;
-}
-
-/*
- * called under osdc->request_mutex
- */
-static void __unregister_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
+static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
+ bool wrlocked)
{
- if (RB_EMPTY_NODE(&req->r_node)) {
- dout("__unregister_request %p tid %lld not registered\n",
- req, req->r_tid);
- return;
- }
+ struct ceph_osd *osd;
- dout("__unregister_request %p tid %lld\n", req, req->r_tid);
- erase_request(&osdc->requests, req);
- osdc->num_requests--;
+ if (wrlocked)
+ verify_osdc_wrlocked(osdc);
+ else
+ verify_osdc_locked(osdc);
- if (req->r_osd) {
- /* make sure the original request isn't in flight. */
- ceph_msg_revoke(req->r_request);
+ if (o != CEPH_HOMELESS_OSD)
+ osd = lookup_osd(&osdc->osds, o);
+ else
+ osd = &osdc->homeless_osd;
+ if (!osd) {
+ if (!wrlocked)
+ return ERR_PTR(-EAGAIN);
- list_del_init(&req->r_osd_item);
- maybe_move_osd_to_lru(req->r_osd);
- if (list_empty(&req->r_linger_osd_item))
- req->r_osd = NULL;
+ osd = create_osd(osdc, o);
+ insert_osd(&osdc->osds, osd);
+ ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
+ &osdc->osdmap->osd_addr[osd->o_osd]);
}
- list_del_init(&req->r_req_lru_item);
- ceph_osdc_put_request(req);
+ dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
+ return osd;
}
/*
- * Cancel a previously queued request message
+ * Create request <-> OSD session relation.
+ *
+ * @req has to be assigned a tid, @osd may be homeless.
*/
-static void __cancel_request(struct ceph_osd_request *req)
+static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
- if (req->r_sent && req->r_osd) {
- ceph_msg_revoke(req->r_request);
- req->r_sent = 0;
- }
+ verify_osd_locked(osd);
+ WARN_ON(!req->r_tid || req->r_osd);
+ dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
+ req, req->r_tid);
+
+ if (!osd_homeless(osd))
+ __remove_osd_from_lru(osd);
+ else
+ atomic_inc(&osd->o_osdc->num_homeless);
+
+ get_osd(osd);
+ insert_request(&osd->o_requests, req);
+ req->r_osd = osd;
}
-static void __register_linger_request(struct ceph_osd_client *osdc,
+static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
+{
+ verify_osd_locked(osd);
+ WARN_ON(req->r_osd != osd);
+ dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
+ req, req->r_tid);
+
+ req->r_osd = NULL;
+ erase_request(&osd->o_requests, req);
+ put_osd(osd);
+
+ if (!osd_homeless(osd))
+ maybe_move_osd_to_lru(osd);
+ else
+ atomic_dec(&osd->o_osdc->num_homeless);
+}
+
+static void __register_linger_request(struct ceph_osd *osd,
struct ceph_osd_request *req)
{
dout("%s %p tid %llu\n", __func__, req, req->r_tid);
WARN_ON(!req->r_linger);
ceph_osdc_get_request(req);
- list_add_tail(&req->r_linger_item, &osdc->req_linger);
- if (req->r_osd)
- list_add_tail(&req->r_linger_osd_item,
- &req->r_osd->o_linger_requests);
+ list_add_tail(&req->r_linger_item, &osd->o_osdc->req_linger);
+ list_add_tail(&req->r_linger_osd_item, &osd->o_linger_requests);
+ __remove_osd_from_lru(osd);
+ req->r_osd = osd;
}
static void __unregister_linger_request(struct ceph_osd_client *osdc,
@@ -1255,7 +1168,7 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
if (req->r_osd) {
list_del_init(&req->r_linger_osd_item);
maybe_move_osd_to_lru(req->r_osd);
- if (list_empty(&req->r_osd_item))
+ if (RB_EMPTY_ROOT(&req->r_osd->o_requests))
req->r_osd = NULL;
}
ceph_osdc_put_request(req);
@@ -1291,11 +1204,20 @@ static bool have_pool_full(struct ceph_osd_client *osdc)
return false;
}
+static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
+{
+ struct ceph_pg_pool_info *pi;
+
+ pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
+ if (!pi)
+ return false;
+
+ return __pool_full(pi);
+}
+
/*
* Returns whether a request should be blocked from being sent
* based on the current osdmap and osd_client settings.
- *
- * Caller should hold map_sem for read.
*/
static bool target_should_be_paused(struct ceph_osd_client *osdc,
const struct ceph_osd_request_target *t,
@@ -1421,87 +1343,6 @@ out:
return ct_res;
}
-static void __enqueue_request(struct ceph_osd_request *req)
-{
- struct ceph_osd_client *osdc = req->r_osdc;
-
- dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid,
- req->r_osd ? req->r_osd->o_osd : -1);
-
- if (req->r_osd) {
- __remove_osd_from_lru(req->r_osd);
- list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
- list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
- } else {
- list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
- }
-}
-
-/*
- * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
- * (as needed), and set the request r_osd appropriately. If there is
- * no up osd, set r_osd to NULL. Move the request to the appropriate list
- * (unsent, homeless) or leave on in-flight lru.
- *
- * Return 0 if unchanged, 1 if changed, or negative on error.
- *
- * Caller should hold map_sem for read and request_mutex.
- */
-static int __map_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req, int force_resend)
-{
- enum calc_target_result ct_res;
- int err;
-
- dout("map_request %p tid %lld\n", req, req->r_tid);
-
- ct_res = calc_target(osdc, &req->r_t, NULL, force_resend);
- switch (ct_res) {
- case CALC_TARGET_POOL_DNE:
- list_move(&req->r_req_lru_item, &osdc->req_notarget);
- return -EIO;
- case CALC_TARGET_NO_ACTION:
- return 0; /* no change */
- default:
- BUG_ON(ct_res != CALC_TARGET_NEED_RESEND);
- }
-
- dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
- req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, req->r_t.osd,
- req->r_osd ? req->r_osd->o_osd : -1);
-
- if (req->r_osd) {
- __cancel_request(req);
- list_del_init(&req->r_osd_item);
- list_del_init(&req->r_linger_osd_item);
- req->r_osd = NULL;
- }
-
- req->r_osd = lookup_osd(&osdc->osds, req->r_t.osd);
- if (!req->r_osd && req->r_t.osd >= 0) {
- err = -ENOMEM;
- req->r_osd = create_osd(osdc, req->r_t.osd);
- if (!req->r_osd) {
- list_move(&req->r_req_lru_item, &osdc->req_notarget);
- goto out;
- }
-
- dout("map_request osd %p is osd%d\n", req->r_osd,
- req->r_osd->o_osd);
- insert_osd(&osdc->osds, req->r_osd);
-
- ceph_con_open(&req->r_osd->o_con,
- CEPH_ENTITY_TYPE_OSD, req->r_osd->o_osd,
- &osdc->osdmap->osd_addr[req->r_osd->o_osd]);
- }
-
- __enqueue_request(req);
- err = 1; /* osd or pg changed */
-
-out:
- return err;
-}
-
static void setup_request_data(struct ceph_osd_request *req,
struct ceph_msg *msg)
{
@@ -1648,8 +1489,16 @@ static void send_request(struct ceph_osd_request *req)
{
struct ceph_osd *osd = req->r_osd;
+ verify_osd_locked(osd);
WARN_ON(osd->o_osd != req->r_t.osd);
+ /*
+ * We may have a previously queued request message hanging
+ * around. Cancel it to avoid corrupting the msgr.
+ */
+ if (req->r_sent)
+ ceph_msg_revoke(req->r_request);
+
req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
if (req->r_attempts)
req->r_flags |= CEPH_OSD_FLAG_RETRY;
@@ -1671,24 +1520,11 @@ static void send_request(struct ceph_osd_request *req)
ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
}
-/*
- * Send any requests in the queue (req_unsent).
- */
-static void __send_queued(struct ceph_osd_client *osdc)
-{
- struct ceph_osd_request *req, *tmp;
-
- dout("__send_queued\n");
- list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
- list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
- send_request(req);
- }
-}
-
static void maybe_request_map(struct ceph_osd_client *osdc)
{
bool continuous = false;
+ verify_osdc_locked(osdc);
WARN_ON(!osdc->osdmap->epoch);
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
@@ -1705,38 +1541,121 @@ static void maybe_request_map(struct ceph_osd_client *osdc)
ceph_monc_renew_subs(&osdc->client->monc);
}
-/*
- * Caller should hold map_sem for read and request_mutex.
- */
-static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req,
- bool nofail)
+static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
{
- int rc;
+ struct ceph_osd_client *osdc = req->r_osdc;
+ struct ceph_osd *osd;
+ bool need_send = false;
+ bool promoted = false;
- __register_request(osdc, req);
- req->r_sent = 0;
- req->r_got_reply = 0;
- rc = __map_request(osdc, req, 0);
- if (rc < 0) {
- if (nofail) {
- dout("osdc_start_request failed map, "
- " will retry %lld\n", req->r_tid);
- rc = 0;
- } else {
- __unregister_request(osdc, req);
- }
- return rc;
+ WARN_ON(req->r_tid || req->r_got_reply);
+ dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
+
+again:
+ calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
+ osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
+ if (IS_ERR(osd)) {
+ WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
+ goto promote;
}
- if (req->r_osd == NULL) {
- dout("send_request %p no up osds in pg\n", req);
- ceph_monc_request_next_osdmap(&osdc->client->monc);
+ if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+ ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) {
+ dout("req %p pausewr\n", req);
+ req->r_t.paused = true;
+ maybe_request_map(osdc);
+ } else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
+ ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) {
+ dout("req %p pauserd\n", req);
+ req->r_t.paused = true;
+ maybe_request_map(osdc);
+ } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+ !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
+ CEPH_OSD_FLAG_FULL_FORCE)) &&
+ (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+ pool_full(osdc, req->r_t.base_oloc.pool))) {
+ dout("req %p full/pool_full\n", req);
+ pr_warn_ratelimited("FULL or reached pool quota\n");
+ req->r_t.paused = true;
+ maybe_request_map(osdc);
+ } else if (!osd_homeless(osd)) {
+ need_send = true;
} else {
- __send_queued(osdc);
+ maybe_request_map(osdc);
}
- return 0;
+ mutex_lock(&osd->lock);
+ /*
+ * Assign the tid atomically with send_request() to protect
+ * multiple writes to the same object from racing with each
+ * other, resulting in out of order ops on the OSDs.
+ */
+ req->r_tid = atomic64_inc_return(&osdc->last_tid);
+ link_request(osd, req);
+ if (need_send)
+ send_request(req);
+ mutex_unlock(&osd->lock);
+
+ if (promoted)
+ downgrade_write(&osdc->lock);
+ return;
+
+promote:
+ up_read(&osdc->lock);
+ down_write(&osdc->lock);
+ wrlocked = true;
+ promoted = true;
+ goto again;
+}
+
+static void account_request(struct ceph_osd_request *req)
+{
+ unsigned int mask = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+
+ if (req->r_flags & CEPH_OSD_FLAG_READ) {
+ WARN_ON(req->r_flags & mask);
+ req->r_flags |= CEPH_OSD_FLAG_ACK;
+ } else if (req->r_flags & CEPH_OSD_FLAG_WRITE)
+ WARN_ON(!(req->r_flags & mask));
+ else
+ WARN_ON(1);
+
+ WARN_ON(req->r_unsafe_callback && (req->r_flags & mask) != mask);
+ atomic_inc(&req->r_osdc->num_requests);
+}
+
+static void submit_request(struct ceph_osd_request *req, bool wrlocked)
+{
+ ceph_osdc_get_request(req);
+ account_request(req);
+ __submit_request(req, wrlocked);
+}
+
+static void __finish_request(struct ceph_osd_request *req)
+{
+ struct ceph_osd_client *osdc = req->r_osdc;
+ struct ceph_osd *osd = req->r_osd;
+
+ verify_osd_locked(osd);
+ dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
+
+ unlink_request(osd, req);
+ atomic_dec(&osdc->num_requests);
+
+ /*
+ * If an OSD has failed or returned and a request has been sent
+ * twice, it's possible to get a reply and end up here while the
+ * request message is queued for delivery. We will ignore the
+ * reply, so not a big deal, but better to try and catch it.
+ */
+ ceph_msg_revoke(req->r_request);
+ ceph_msg_revoke_incoming(req->r_reply);
+}
+
+static void finish_request(struct ceph_osd_request *req)
+{
+ __finish_request(req);
+ ceph_osdc_put_request(req);
}
static void __complete_request(struct ceph_osd_request *req)
@@ -1747,6 +1666,13 @@ static void __complete_request(struct ceph_osd_request *req)
complete_all(&req->r_completion);
}
+static void cancel_request(struct ceph_osd_request *req)
+{
+ dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
+
+ finish_request(req);
+}
+
/*
* Timeout callback, called every N seconds. When 1 or more OSD
* requests has been active for more than N seconds, we send a keepalive
@@ -1758,44 +1684,49 @@ static void handle_timeout(struct work_struct *work)
struct ceph_osd_client *osdc =
container_of(work, struct ceph_osd_client, timeout_work.work);
struct ceph_options *opts = osdc->client->options;
- struct ceph_osd_request *req;
- struct ceph_osd *osd;
- struct list_head slow_osds;
- dout("timeout\n");
- down_read(&osdc->map_sem);
-
- ceph_monc_request_next_osdmap(&osdc->client->monc);
+ unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
+ LIST_HEAD(slow_osds);
+ struct rb_node *n, *p;
- mutex_lock(&osdc->request_mutex);
+ dout("%s osdc %p\n", __func__, osdc);
+ down_write(&osdc->lock);
/*
* ping osds that are a bit slow. this ensures that if there
* is a break in the TCP connection we will notice, and reopen
* a connection with that osd (from the fault callback).
*/
- INIT_LIST_HEAD(&slow_osds);
- list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
- if (time_before(jiffies,
- req->r_stamp + opts->osd_keepalive_timeout))
- break;
+ for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+ struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+ bool found = false;
+
+ for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
+ struct ceph_osd_request *req =
+ rb_entry(p, struct ceph_osd_request, r_node);
+
+ if (time_before(req->r_stamp, cutoff)) {
+ dout(" req %p tid %llu on osd%d is laggy\n",
+ req, req->r_tid, osd->o_osd);
+ found = true;
+ }
+ }
- osd = req->r_osd;
- BUG_ON(!osd);
- dout(" tid %llu is slow, will send keepalive on osd%d\n",
- req->r_tid, osd->o_osd);
- list_move_tail(&osd->o_keepalive_item, &slow_osds);
+ if (found)
+ list_move_tail(&osd->o_keepalive_item, &slow_osds);
}
+
+ if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
+ maybe_request_map(osdc);
+
while (!list_empty(&slow_osds)) {
- osd = list_entry(slow_osds.next, struct ceph_osd,
- o_keepalive_item);
+ struct ceph_osd *osd = list_first_entry(&slow_osds,
+ struct ceph_osd,
+ o_keepalive_item);
list_del_init(&osd->o_keepalive_item);
ceph_con_keepalive(&osd->o_con);
}
- __send_queued(osdc);
- mutex_unlock(&osdc->request_mutex);
- up_read(&osdc->map_sem);
-
+ up_write(&osdc->lock);
schedule_delayed_work(&osdc->timeout_work,
osdc->client->options->osd_keepalive_timeout);
}
@@ -1809,18 +1740,17 @@ static void handle_osds_timeout(struct work_struct *work)
struct ceph_osd *osd, *nosd;
dout("%s osdc %p\n", __func__, osdc);
- down_read(&osdc->map_sem);
- mutex_lock(&osdc->request_mutex);
-
+ down_write(&osdc->lock);
list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
if (time_before(jiffies, osd->lru_ttl))
break;
- remove_osd(osdc, osd);
+ WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
+ WARN_ON(!list_empty(&osd->o_linger_requests));
+ close_osd(osd);
}
- mutex_unlock(&osdc->request_mutex);
- up_read(&osdc->map_sem);
+ up_write(&osdc->lock);
schedule_delayed_work(&osdc->osds_timeout_work,
round_jiffies_relative(delay));
}
@@ -2045,8 +1975,9 @@ static bool done_request(const struct ceph_osd_request *req,
* when we get the safe reply r_unsafe_cb(false), r_cb/r_completion,
* r_safe_completion r_safe_completion
*/
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
{
+ struct ceph_osd_client *osdc = osd->o_osdc;
struct ceph_osd_request *req;
struct MOSDOpReply m;
u64 tid = le64_to_cpu(msg->hdr.tid);
@@ -2057,14 +1988,19 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
dout("%s msg %p tid %llu\n", __func__, msg, tid);
- down_read(&osdc->map_sem);
- mutex_lock(&osdc->request_mutex);
- req = lookup_request(&osdc->requests, tid);
+ down_read(&osdc->lock);
+ if (!osd_registered(osd)) {
+ dout("%s osd%d unknown\n", __func__, osd->o_osd);
+ goto out_unlock_osdc;
+ }
+ WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
+
+ mutex_lock(&osd->lock);
+ req = lookup_request(&osd->o_requests, tid);
if (!req) {
- dout("%s no tid %llu\n", __func__, tid);
- goto out_unlock;
+ dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
+ goto out_unlock_session;
}
- ceph_osdc_get_request(req);
ret = decode_MOSDOpReply(msg, &m);
if (ret) {
@@ -2083,7 +2019,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
req, req->r_tid, m.retry_attempt,
req->r_attempts - 1);
- goto out_put;
+ goto out_unlock_session;
}
} else {
WARN_ON(1); /* MOSDOpReply v4 is assumed */
@@ -2092,22 +2028,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
if (!ceph_oloc_empty(&m.redirect.oloc)) {
dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
m.redirect.oloc.pool);
- __unregister_request(osdc, req);
+ unlink_request(osd, req);
+ mutex_unlock(&osd->lock);
ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
-
- /*
- * Start redirect requests with nofail=true. If
- * mapping fails, request will end up on the notarget
- * list, waiting for the new osdmap (which can take
- * a while), even though the original request mapped
- * successfully. In the future we might want to follow
- * original request's nofail setting here.
- */
- ret = __ceph_osdc_start_request(osdc, req, true);
- BUG_ON(ret);
-
- goto out_put;
+ req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
+ req->r_tid = 0;
+ __submit_request(req, false);
+ goto out_unlock_osdc;
}
if (m.num_ops != req->r_num_ops) {
@@ -2137,19 +2065,19 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
req->r_got_reply = true;
} else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) {
dout("req %p tid %llu dup ack\n", req, req->r_tid);
- goto out_put;
+ goto out_unlock_session;
}
if (done_request(req, &m)) {
- __unregister_request(osdc, req);
+ __finish_request(req);
if (req->r_linger) {
WARN_ON(req->r_unsafe_callback);
- __register_linger_request(osdc, req);
+ __register_linger_request(osd, req);
}
}
- mutex_unlock(&osdc->request_mutex);
- up_read(&osdc->map_sem);
+ mutex_unlock(&osd->lock);
+ up_read(&osdc->lock);
if (done_request(req, &m)) {
if (already_acked && req->r_unsafe_callback) {
@@ -2175,14 +2103,13 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
fail_request:
req->r_result = -EIO;
- __unregister_request(osdc, req);
+ __finish_request(req);
__complete_request(req);
complete_all(&req->r_safe_completion);
-out_put:
- ceph_osdc_put_request(req);
-out_unlock:
- mutex_unlock(&osdc->request_mutex);
- up_read(&osdc->map_sem);
+out_unlock_session:
+ mutex_unlock(&osd->lock);
+out_unlock_osdc:
+ up_read(&osdc->lock);
}
static void set_pool_was_full(struct ceph_osd_client *osdc)
@@ -2197,126 +2124,66 @@ static void set_pool_was_full(struct ceph_osd_client *osdc)
}
}
-static void reset_changed_osds(struct ceph_osd_client *osdc)
+static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
{
- struct rb_node *p, *n;
+ struct ceph_pg_pool_info *pi;
- dout("%s %p\n", __func__, osdc);
- for (p = rb_first(&osdc->osds); p; p = n) {
- struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
+ pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
+ if (!pi)
+ return false;
- n = rb_next(p);
- if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
- memcmp(&osd->o_con.peer_addr,
- ceph_osd_addr(osdc->osdmap,
- osd->o_osd),
- sizeof(struct ceph_entity_addr)) != 0)
- __reset_osd(osdc, osd);
- }
+ return pi->was_full && !__pool_full(pi);
}
/*
- * Requeue requests whose mapping to an OSD has changed. If requests map to
- * no osd, request a new map.
- *
- * Caller should hold map_sem for read.
+ * Requeue requests whose mapping to an OSD has changed.
*/
-static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
- bool force_resend_writes)
+static void scan_requests(struct ceph_osd *osd,
+ bool force_resend,
+ bool cleared_full,
+ bool check_pool_cleared_full,
+ struct rb_root *need_resend,
+ struct list_head *need_resend_linger)
{
- struct ceph_osd_request *req, *nreq;
- struct rb_node *p;
- int needmap = 0;
- int err;
- bool force_resend_req;
-
- dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "",
- force_resend_writes ? " (force resend writes)" : "");
- mutex_lock(&osdc->request_mutex);
- for (p = rb_first(&osdc->requests); p; ) {
- req = rb_entry(p, struct ceph_osd_request, r_node);
- p = rb_next(p);
-
- /*
- * For linger requests that have not yet been
- * registered, move them to the linger list; they'll
- * be sent to the osd in the loop below. Unregister
- * the request before re-registering it as a linger
- * request to ensure the __map_request() below
- * will decide it needs to be sent.
- */
- if (req->r_linger && list_empty(&req->r_linger_item)) {
- dout("%p tid %llu restart on osd%d\n",
- req, req->r_tid,
- req->r_osd ? req->r_osd->o_osd : -1);
- ceph_osdc_get_request(req);
- __unregister_request(osdc, req);
- __register_linger_request(osdc, req);
- ceph_osdc_put_request(req);
- continue;
- }
-
- force_resend_req = force_resend ||
- (force_resend_writes &&
- req->r_flags & CEPH_OSD_FLAG_WRITE);
- err = __map_request(osdc, req, force_resend_req);
- if (err < 0)
- continue; /* error */
- if (req->r_osd == NULL) {
- dout("%p tid %llu maps to no osd\n", req, req->r_tid);
- needmap++; /* request a newer map */
- } else if (err > 0) {
- if (!req->r_linger) {
- dout("%p tid %llu requeued on osd%d\n", req,
- req->r_tid,
- req->r_osd ? req->r_osd->o_osd : -1);
- req->r_flags |= CEPH_OSD_FLAG_RETRY;
- }
- }
- }
-
- list_for_each_entry_safe(req, nreq, &osdc->req_linger,
- r_linger_item) {
- dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
-
- err = __map_request(osdc, req,
- force_resend || force_resend_writes);
- dout("__map_request returned %d\n", err);
- if (err < 0)
- continue; /* hrm! */
- if (req->r_osd == NULL || err > 0) {
- if (req->r_osd == NULL) {
- dout("lingering %p tid %llu maps to no osd\n",
- req, req->r_tid);
- /*
- * A homeless lingering request makes
- * no sense, as it's job is to keep
- * a particular OSD connection open.
- * Request a newer map and kick the
- * request, knowing that it won't be
- * resent until we actually get a map
- * that can tell us where to send it.
- */
- needmap++;
- }
-
- dout("kicking lingering %p tid %llu osd%d\n", req,
- req->r_tid, req->r_osd ? req->r_osd->o_osd : -1);
- __register_request(osdc, req);
- __unregister_linger_request(osdc, req);
+ struct ceph_osd_client *osdc = osd->o_osdc;
+ struct rb_node *n;
+ bool force_resend_writes;
+
+ for (n = rb_first(&osd->o_requests); n; ) {
+ struct ceph_osd_request *req =
+ rb_entry(n, struct ceph_osd_request, r_node);
+ enum calc_target_result ct_res;
+
+ n = rb_next(n); /* unlink_request() */
+
+ dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
+ ct_res = calc_target(osdc, &req->r_t,
+ &req->r_last_force_resend, false);
+ switch (ct_res) {
+ case CALC_TARGET_NO_ACTION:
+ force_resend_writes = cleared_full ||
+ (check_pool_cleared_full &&
+ pool_cleared_full(osdc, req->r_t.base_oloc.pool));
+ if (!force_resend &&
+ (!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
+ !force_resend_writes))
+ break;
+
+ /* fall through */
+ case CALC_TARGET_NEED_RESEND:
+ unlink_request(osd, req);
+ insert_request(need_resend, req);
+ break;
+ case CALC_TARGET_POOL_DNE:
+ break;
}
}
- reset_changed_osds(osdc);
- mutex_unlock(&osdc->request_mutex);
-
- if (needmap) {
- dout("%d requests for down osds, need new map\n", needmap);
- ceph_monc_request_next_osdmap(&osdc->client->monc);
- }
}
static int handle_one_map(struct ceph_osd_client *osdc,
- void *p, void *end, bool incremental)
+ void *p, void *end, bool incremental,
+ struct rb_root *need_resend,
+ struct list_head *need_resend_linger)
{
struct ceph_osdmap *newmap;
struct rb_node *n;
@@ -2362,11 +2229,51 @@ static int handle_one_map(struct ceph_osd_client *osdc,
}
was_full &= !ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
- kick_requests(osdc, skipped_map, was_full);
+ scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
+ need_resend, need_resend_linger);
+
+ for (n = rb_first(&osdc->osds); n; ) {
+ struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+
+ n = rb_next(n); /* close_osd() */
+
+ scan_requests(osd, skipped_map, was_full, true, need_resend,
+ need_resend_linger);
+ if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
+ memcmp(&osd->o_con.peer_addr,
+ ceph_osd_addr(osdc->osdmap, osd->o_osd),
+ sizeof(struct ceph_entity_addr)))
+ close_osd(osd);
+ }
return 0;
}
+static void kick_requests(struct ceph_osd_client *osdc,
+ struct rb_root *need_resend,
+ struct list_head *need_resend_linger)
+{
+ struct rb_node *n;
+
+ for (n = rb_first(need_resend); n; ) {
+ struct ceph_osd_request *req =
+ rb_entry(n, struct ceph_osd_request, r_node);
+ struct ceph_osd *osd;
+
+ n = rb_next(n);
+ erase_request(need_resend, req); /* before link_request() */
+
+ WARN_ON(req->r_osd);
+ calc_target(osdc, &req->r_t, NULL, false);
+ osd = lookup_create_osd(osdc, req->r_t.osd, true);
+ link_request(osd, req);
+ if (!req->r_linger) {
+ if (!osd_homeless(osd) && !req->r_t.paused)
+ send_request(req);
+ }
+ }
+}
+
/*
* Process updated osd map.
*
@@ -2381,13 +2288,15 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
u32 nr_maps, maplen;
u32 epoch;
struct ceph_fsid fsid;
+ struct rb_root need_resend = RB_ROOT;
+ LIST_HEAD(need_resend_linger);
bool handled_incremental = false;
bool was_pauserd, was_pausewr;
bool pauserd, pausewr;
int err;
dout("%s have %u\n", __func__, osdc->osdmap->epoch);
- down_write(&osdc->map_sem);
+ down_write(&osdc->lock);
/* verify fsid */
ceph_decode_need(&p, end, sizeof(fsid), bad);
@@ -2412,7 +2321,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
osdc->osdmap->epoch + 1 == epoch) {
dout("applying incremental map %u len %d\n",
epoch, maplen);
- err = handle_one_map(osdc, p, p + maplen, true);
+ err = handle_one_map(osdc, p, p + maplen, true,
+ &need_resend, &need_resend_linger);
if (err)
goto bad;
handled_incremental = true;
@@ -2443,7 +2353,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
osdc->osdmap->epoch);
} else {
dout("taking full map %u len %d\n", epoch, maplen);
- err = handle_one_map(osdc, p, p + maplen, false);
+ err = handle_one_map(osdc, p, p + maplen, false,
+ &need_resend, &need_resend_linger);
if (err)
goto bad;
}
@@ -2464,20 +2375,60 @@ done:
if (was_pauserd || was_pausewr || pauserd || pausewr)
maybe_request_map(osdc);
- mutex_lock(&osdc->request_mutex);
- __send_queued(osdc);
- mutex_unlock(&osdc->request_mutex);
+ kick_requests(osdc, &need_resend, &need_resend_linger);
ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
osdc->osdmap->epoch);
- up_write(&osdc->map_sem);
+ up_write(&osdc->lock);
wake_up_all(&osdc->client->auth_wq);
return;
bad:
pr_err("osdc handle_map corrupt msg\n");
ceph_msg_dump(msg);
- up_write(&osdc->map_sem);
+ up_write(&osdc->lock);
+}
+
+/*
+ * Resubmit requests pending on the given osd.
+ */
+static void kick_osd_requests(struct ceph_osd *osd)
+{
+ struct rb_node *n;
+
+ for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
+ struct ceph_osd_request *req =
+ rb_entry(n, struct ceph_osd_request, r_node);
+
+ if (!req->r_linger) {
+ if (!req->r_t.paused)
+ send_request(req);
+ }
+ }
+}
+
+/*
+ * If the osd connection drops, we need to resubmit all requests.
+ */
+static void osd_fault(struct ceph_connection *con)
+{
+ struct ceph_osd *osd = con->private;
+ struct ceph_osd_client *osdc = osd->o_osdc;
+
+ dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
+
+ down_write(&osdc->lock);
+ if (!osd_registered(osd)) {
+ dout("%s osd%d unknown\n", __func__, osd->o_osd);
+ goto out_unlock;
+ }
+
+ if (!reopen_osd(osd))
+ kick_osd_requests(osd);
+ maybe_request_map(osdc);
+
+out_unlock:
+ up_write(&osdc->lock);
}
/*
@@ -2680,17 +2631,11 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req,
bool nofail)
{
- int rc;
-
- down_read(&osdc->map_sem);
- mutex_lock(&osdc->request_mutex);
-
- rc = __ceph_osdc_start_request(osdc, req, nofail);
+ down_read(&osdc->lock);
+ submit_request(req, false);
+ up_read(&osdc->lock);
- mutex_unlock(&osdc->request_mutex);
- up_read(&osdc->map_sem);
-
- return rc;
+ return 0;
}
EXPORT_SYMBOL(ceph_osdc_start_request);
@@ -2703,13 +2648,12 @@ void ceph_osdc_cancel_request(struct ceph_osd_request *req)
{
struct ceph_osd_client *osdc = req->r_osdc;
- mutex_lock(&osdc->request_mutex);
+ down_write(&osdc->lock);
if (req->r_linger)
__unregister_linger_request(osdc, req);
- __unregister_request(osdc, req);
- mutex_unlock(&osdc->request_mutex);
-
- dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid);
+ if (req->r_osd)
+ cancel_request(req);
+ up_write(&osdc->lock);
}
EXPORT_SYMBOL(ceph_osdc_cancel_request);
@@ -2744,32 +2688,40 @@ EXPORT_SYMBOL(ceph_osdc_wait_request);
*/
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
- struct ceph_osd_request *req;
- u64 last_tid, next_tid = 0;
+ struct rb_node *n, *p;
+ u64 last_tid = atomic64_read(&osdc->last_tid);
- mutex_lock(&osdc->request_mutex);
- last_tid = osdc->last_tid;
- while (1) {
- req = __lookup_request_ge(osdc, next_tid);
- if (!req)
- break;
- if (req->r_tid > last_tid)
- break;
+again:
+ down_read(&osdc->lock);
+ for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+ struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+
+ mutex_lock(&osd->lock);
+ for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
+ struct ceph_osd_request *req =
+ rb_entry(p, struct ceph_osd_request, r_node);
+
+ if (req->r_tid > last_tid)
+ break;
+
+ if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
+ continue;
- next_tid = req->r_tid + 1;
- if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
- continue;
+ ceph_osdc_get_request(req);
+ mutex_unlock(&osd->lock);
+ up_read(&osdc->lock);
+ dout("%s waiting on req %p tid %llu last_tid %llu\n",
+ __func__, req, req->r_tid, last_tid);
+ wait_for_completion(&req->r_safe_completion);
+ ceph_osdc_put_request(req);
+ goto again;
+ }
- ceph_osdc_get_request(req);
- mutex_unlock(&osdc->request_mutex);
- dout("sync waiting on tid %llu (last is %llu)\n",
- req->r_tid, last_tid);
- wait_for_completion(&req->r_safe_completion);
- mutex_lock(&osdc->request_mutex);
- ceph_osdc_put_request(req);
+ mutex_unlock(&osd->lock);
}
- mutex_unlock(&osdc->request_mutex);
- dout("sync done (thru tid %llu)\n", last_tid);
+
+ up_read(&osdc->lock);
+ dout("%s done last_tid %llu\n", __func__, last_tid);
}
EXPORT_SYMBOL(ceph_osdc_sync);
@@ -2793,18 +2745,14 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
dout("init\n");
osdc->client = client;
- init_rwsem(&osdc->map_sem);
- mutex_init(&osdc->request_mutex);
- osdc->last_tid = 0;
+ init_rwsem(&osdc->lock);
osdc->osds = RB_ROOT;
INIT_LIST_HEAD(&osdc->osd_lru);
spin_lock_init(&osdc->osd_lru_lock);
- osdc->requests = RB_ROOT;
- INIT_LIST_HEAD(&osdc->req_lru);
- INIT_LIST_HEAD(&osdc->req_unsent);
- INIT_LIST_HEAD(&osdc->req_notarget);
INIT_LIST_HEAD(&osdc->req_linger);
- osdc->num_requests = 0;
+ osd_init(&osdc->homeless_osd);
+ osdc->homeless_osd.o_osdc = osdc;
+ osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
spin_lock_init(&osdc->event_lock);
@@ -2861,13 +2809,19 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
cancel_delayed_work_sync(&osdc->timeout_work);
cancel_delayed_work_sync(&osdc->osds_timeout_work);
- mutex_lock(&osdc->request_mutex);
+ down_write(&osdc->lock);
while (!RB_EMPTY_ROOT(&osdc->osds)) {
struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
struct ceph_osd, o_node);
- remove_osd(osdc, osd);
+ close_osd(osd);
}
- mutex_unlock(&osdc->request_mutex);
+ up_write(&osdc->lock);
+ WARN_ON(atomic_read(&osdc->homeless_osd.o_ref) != 1);
+ osd_cleanup(&osdc->homeless_osd);
+
+ WARN_ON(!list_empty(&osdc->osd_lru));
+ WARN_ON(atomic_read(&osdc->num_requests));
+ WARN_ON(atomic_read(&osdc->num_homeless));
ceph_osdmap_destroy(osdc->osdmap);
mempool_destroy(osdc->req_mempool);
@@ -2982,19 +2936,15 @@ EXPORT_SYMBOL(ceph_osdc_cleanup);
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
struct ceph_osd *osd = con->private;
- struct ceph_osd_client *osdc;
+ struct ceph_osd_client *osdc = osd->o_osdc;
int type = le16_to_cpu(msg->hdr.type);
- if (!osd)
- goto out;
- osdc = osd->o_osdc;
-
switch (type) {
case CEPH_MSG_OSD_MAP:
ceph_osdc_handle_map(osdc, msg);
break;
case CEPH_MSG_OSD_OPREPLY:
- handle_reply(osdc, msg);
+ handle_reply(osd, msg);
break;
case CEPH_MSG_WATCH_NOTIFY:
handle_watch_notify(osdc, msg);
@@ -3004,7 +2954,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
pr_err("received unknown message type %d %s\n", type,
ceph_msg_type_name(type));
}
-out:
+
ceph_msg_put(msg);
}
@@ -3019,21 +2969,27 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
{
struct ceph_osd *osd = con->private;
struct ceph_osd_client *osdc = osd->o_osdc;
- struct ceph_msg *m;
+ struct ceph_msg *m = NULL;
struct ceph_osd_request *req;
int front_len = le32_to_cpu(hdr->front_len);
int data_len = le32_to_cpu(hdr->data_len);
- u64 tid;
+ u64 tid = le64_to_cpu(hdr->tid);
- tid = le64_to_cpu(hdr->tid);
- mutex_lock(&osdc->request_mutex);
- req = lookup_request(&osdc->requests, tid);
+ down_read(&osdc->lock);
+ if (!osd_registered(osd)) {
+ dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
+ *skip = 1;
+ goto out_unlock_osdc;
+ }
+ WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
+
+ mutex_lock(&osd->lock);
+ req = lookup_request(&osd->o_requests, tid);
if (!req) {
dout("%s osd%d tid %llu unknown, skipping\n", __func__,
osd->o_osd, tid);
- m = NULL;
*skip = 1;
- goto out;
+ goto out_unlock_session;
}
ceph_msg_revoke_incoming(req->r_reply);
@@ -3045,7 +3001,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
false);
if (!m)
- goto out;
+ goto out_unlock_session;
ceph_msg_put(req->r_reply);
req->r_reply = m;
}
@@ -3056,14 +3012,16 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
req->r_reply->data_length);
m = NULL;
*skip = 1;
- goto out;
+ goto out_unlock_session;
}
m = ceph_msg_get(req->r_reply);
dout("get_reply tid %lld %p\n", tid, m);
-out:
- mutex_unlock(&osdc->request_mutex);
+out_unlock_session:
+ mutex_unlock(&osd->lock);
+out_unlock_osdc:
+ up_read(&osdc->lock);
return m;
}
@@ -3083,8 +3041,8 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
case CEPH_MSG_OSD_OPREPLY:
return get_reply(con, hdr, skip);
default:
- pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
- osd->o_osd);
+ pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
+ osd->o_osd, type);
*skip = 1;
return NULL;
}
@@ -3188,5 +3146,5 @@ static const struct ceph_connection_operations osd_con_ops = {
.alloc_msg = alloc_msg,
.sign_message = osd_sign_message,
.check_message_signature = osd_check_message_signature,
- .fault = osd_reset,
+ .fault = osd_fault,
};