summaryrefslogtreecommitdiff
path: root/net/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/messenger.c31
-rw-r--r--net/ceph/osd_client.c216
-rw-r--r--net/ceph/osdmap.c19
3 files changed, 149 insertions, 117 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 3b3d33ea9ed8..c6413c360771 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -168,12 +168,6 @@ static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
static struct lock_class_key socket_class;
#endif
-/*
- * When skipping (ignoring) a block of input we read it into a "skip
- * buffer," which is this many bytes in size.
- */
-#define SKIP_BUF_SIZE 1024
-
static void queue_con(struct ceph_connection *con);
static void cancel_con(struct ceph_connection *con);
static void ceph_con_workfn(struct work_struct *);
@@ -520,12 +514,18 @@ static int ceph_tcp_connect(struct ceph_connection *con)
return 0;
}
+/*
+ * If @buf is NULL, discard up to @len bytes.
+ */
static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
{
struct kvec iov = {buf, len};
struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
int r;
+ if (!buf)
+ msg.msg_flags |= MSG_TRUNC;
+
iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, len);
r = sock_recvmsg(sock, &msg, msg.msg_flags);
if (r == -EAGAIN)
@@ -2575,9 +2575,6 @@ static int try_write(struct ceph_connection *con)
con->state != CON_STATE_OPEN)
return 0;
-more:
- dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
-
/* open the socket first? */
if (con->state == CON_STATE_PREOPEN) {
BUG_ON(con->sock);
@@ -2598,7 +2595,8 @@ more:
}
}
-more_kvec:
+more:
+ dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
BUG_ON(!con->sock);
/* kvec data queued? */
@@ -2623,7 +2621,7 @@ more_kvec:
ret = write_partial_message_data(con);
if (ret == 1)
- goto more_kvec; /* we need to send the footer, too! */
+ goto more; /* we need to send the footer, too! */
if (ret == 0)
goto out;
if (ret < 0) {
@@ -2659,8 +2657,6 @@ out:
return ret;
}
-
-
/*
* Read what we can from the socket.
*/
@@ -2721,16 +2717,11 @@ more:
if (con->in_base_pos < 0) {
/*
* skipping + discarding content.
- *
- * FIXME: there must be a better way to do this!
*/
- static char buf[SKIP_BUF_SIZE];
- int skip = min((int) sizeof (buf), -con->in_base_pos);
-
- dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
- ret = ceph_tcp_recvmsg(con->sock, buf, skip);
+ ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos);
if (ret <= 0)
goto out;
+ dout("skipped %d / %d bytes\n", ret, -con->in_base_pos);
con->in_base_pos += ret;
if (con->in_base_pos)
goto more;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 69a2581ddbba..a00c74f1154e 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -766,7 +766,7 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_extent_dup_last);
-void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
+int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
u16 opcode, const char *class, const char *method)
{
struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
@@ -778,7 +778,9 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
BUG_ON(opcode != CEPH_OSD_OP_CALL);
pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
- BUG_ON(!pagelist);
+ if (!pagelist)
+ return -ENOMEM;
+
ceph_pagelist_init(pagelist);
op->cls.class_name = class;
@@ -798,6 +800,7 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
op->indata_len = payload_len;
+ return 0;
}
EXPORT_SYMBOL(osd_req_op_cls_init);
@@ -1026,7 +1029,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
truncate_size, truncate_seq);
}
- req->r_abort_on_full = true;
req->r_flags = flags;
req->r_base_oloc.pool = layout->pool_id;
req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
@@ -1054,6 +1056,38 @@ EXPORT_SYMBOL(ceph_osdc_new_request);
DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
+/*
+ * Call @fn on each OSD request as long as @fn returns 0.
+ */
+static void for_each_request(struct ceph_osd_client *osdc,
+ int (*fn)(struct ceph_osd_request *req, void *arg),
+ void *arg)
+{
+ struct rb_node *n, *p;
+
+ for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+ struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+
+ for (p = rb_first(&osd->o_requests); p; ) {
+ struct ceph_osd_request *req =
+ rb_entry(p, struct ceph_osd_request, r_node);
+
+ p = rb_next(p);
+ if (fn(req, arg))
+ return;
+ }
+ }
+
+ for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
+ struct ceph_osd_request *req =
+ rb_entry(p, struct ceph_osd_request, r_node);
+
+ p = rb_next(p);
+ if (fn(req, arg))
+ return;
+ }
+}
+
static bool osd_homeless(struct ceph_osd *osd)
{
return osd->o_osd == CEPH_HOMELESS_OSD;
@@ -1395,7 +1429,6 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
bool recovery_deletes = ceph_osdmap_flag(osdc,
CEPH_OSDMAP_RECOVERY_DELETES);
enum calc_target_result ct_res;
- int ret;
t->epoch = osdc->osdmap->epoch;
pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
@@ -1431,14 +1464,7 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
}
}
- ret = __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc,
- &pgid);
- if (ret) {
- WARN_ON(ret != -ENOENT);
- t->osd = CEPH_HOMELESS_OSD;
- ct_res = CALC_TARGET_POOL_DNE;
- goto out;
- }
+ __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, &pgid);
last_pgid.pool = pgid.pool;
last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
@@ -2161,9 +2187,9 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
struct ceph_osd_client *osdc = req->r_osdc;
struct ceph_osd *osd;
enum calc_target_result ct_res;
+ int err = 0;
bool need_send = false;
bool promoted = false;
- bool need_abort = false;
WARN_ON(req->r_tid);
dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
@@ -2179,7 +2205,10 @@ again:
goto promote;
}
- if (osdc->osdmap->epoch < osdc->epoch_barrier) {
+ if (osdc->abort_err) {
+ dout("req %p abort_err %d\n", req, osdc->abort_err);
+ err = osdc->abort_err;
+ } else if (osdc->osdmap->epoch < osdc->epoch_barrier) {
dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
osdc->epoch_barrier);
req->r_t.paused = true;
@@ -2200,11 +2229,13 @@ again:
(ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
pool_full(osdc, req->r_t.base_oloc.pool))) {
dout("req %p full/pool_full\n", req);
- pr_warn_ratelimited("FULL or reached pool quota\n");
- req->r_t.paused = true;
- maybe_request_map(osdc);
- if (req->r_abort_on_full)
- need_abort = true;
+ if (osdc->abort_on_full) {
+ err = -ENOSPC;
+ } else {
+ pr_warn_ratelimited("FULL or reached pool quota\n");
+ req->r_t.paused = true;
+ maybe_request_map(osdc);
+ }
} else if (!osd_homeless(osd)) {
need_send = true;
} else {
@@ -2221,11 +2252,11 @@ again:
link_request(osd, req);
if (need_send)
send_request(req);
- else if (need_abort)
- complete_request(req, -ENOSPC);
+ else if (err)
+ complete_request(req, err);
mutex_unlock(&osd->lock);
- if (ct_res == CALC_TARGET_POOL_DNE)
+ if (!err && ct_res == CALC_TARGET_POOL_DNE)
send_map_check(req);
if (promoted)
@@ -2281,11 +2312,21 @@ static void finish_request(struct ceph_osd_request *req)
static void __complete_request(struct ceph_osd_request *req)
{
- if (req->r_callback) {
- dout("%s req %p tid %llu cb %pf result %d\n", __func__, req,
- req->r_tid, req->r_callback, req->r_result);
+ dout("%s req %p tid %llu cb %pf result %d\n", __func__, req,
+ req->r_tid, req->r_callback, req->r_result);
+
+ if (req->r_callback)
req->r_callback(req);
- }
+ complete_all(&req->r_completion);
+ ceph_osdc_put_request(req);
+}
+
+static void complete_request_workfn(struct work_struct *work)
+{
+ struct ceph_osd_request *req =
+ container_of(work, struct ceph_osd_request, r_complete_work);
+
+ __complete_request(req);
}
/*
@@ -2297,9 +2338,9 @@ static void complete_request(struct ceph_osd_request *req, int err)
req->r_result = err;
finish_request(req);
- __complete_request(req);
- complete_all(&req->r_completion);
- ceph_osdc_put_request(req);
+
+ INIT_WORK(&req->r_complete_work, complete_request_workfn);
+ queue_work(req->r_osdc->completion_wq, &req->r_complete_work);
}
static void cancel_map_check(struct ceph_osd_request *req)
@@ -2336,6 +2377,28 @@ static void abort_request(struct ceph_osd_request *req, int err)
complete_request(req, err);
}
+static int abort_fn(struct ceph_osd_request *req, void *arg)
+{
+ int err = *(int *)arg;
+
+ abort_request(req, err);
+ return 0; /* continue iteration */
+}
+
+/*
+ * Abort all in-flight requests with @err and arrange for all future
+ * requests to be failed immediately.
+ */
+void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err)
+{
+ dout("%s osdc %p err %d\n", __func__, osdc, err);
+ down_write(&osdc->lock);
+ for_each_request(osdc, abort_fn, &err);
+ osdc->abort_err = err;
+ up_write(&osdc->lock);
+}
+EXPORT_SYMBOL(ceph_osdc_abort_requests);
+
static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
{
if (likely(eb > osdc->epoch_barrier)) {
@@ -2363,6 +2426,30 @@ void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
/*
+ * We can end up releasing caps as a result of abort_request().
+ * In that case, we probably want to ensure that the cap release message
+ * has an updated epoch barrier in it, so set the epoch barrier prior to
+ * aborting the first request.
+ */
+static int abort_on_full_fn(struct ceph_osd_request *req, void *arg)
+{
+ struct ceph_osd_client *osdc = req->r_osdc;
+ bool *victims = arg;
+
+ if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+ (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
+ pool_full(osdc, req->r_t.base_oloc.pool))) {
+ if (!*victims) {
+ update_epoch_barrier(osdc, osdc->osdmap->epoch);
+ *victims = true;
+ }
+ abort_request(req, -ENOSPC);
+ }
+
+ return 0; /* continue iteration */
+}
+
+/*
* Drop all pending requests that are stalled waiting on a full condition to
* clear, and complete them with ENOSPC as the return code. Set the
* osdc->epoch_barrier to the latest map epoch that we've seen if any were
@@ -2370,61 +2457,11 @@ EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
*/
static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
{
- struct rb_node *n;
bool victims = false;
- dout("enter abort_on_full\n");
-
- if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc))
- goto out;
-
- /* Scan list and see if there is anything to abort */
- for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
- struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
- struct rb_node *m;
-
- m = rb_first(&osd->o_requests);
- while (m) {
- struct ceph_osd_request *req = rb_entry(m,
- struct ceph_osd_request, r_node);
- m = rb_next(m);
-
- if (req->r_abort_on_full) {
- victims = true;
- break;
- }
- }
- if (victims)
- break;
- }
-
- if (!victims)
- goto out;
-
- /*
- * Update the barrier to current epoch if it's behind that point,
- * since we know we have some calls to be aborted in the tree.
- */
- update_epoch_barrier(osdc, osdc->osdmap->epoch);
-
- for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
- struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
- struct rb_node *m;
-
- m = rb_first(&osd->o_requests);
- while (m) {
- struct ceph_osd_request *req = rb_entry(m,
- struct ceph_osd_request, r_node);
- m = rb_next(m);
-
- if (req->r_abort_on_full &&
- (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
- pool_full(osdc, req->r_t.target_oloc.pool)))
- abort_request(req, -ENOSPC);
- }
- }
-out:
- dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier);
+ if (osdc->abort_on_full &&
+ (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc)))
+ for_each_request(osdc, abort_on_full_fn, &victims);
}
static void check_pool_dne(struct ceph_osd_request *req)
@@ -3541,8 +3578,6 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
up_read(&osdc->lock);
__complete_request(req);
- complete_all(&req->r_completion);
- ceph_osdc_put_request(req);
return;
fail_request:
@@ -4927,7 +4962,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
if (ret)
goto out_put_req;
- osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
+ ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
+ if (ret)
+ goto out_put_req;
+
if (req_page)
osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
0, false, false);
@@ -4996,6 +5034,10 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
if (!osdc->notify_wq)
goto out_msgpool_reply;
+ osdc->completion_wq = create_singlethread_workqueue("ceph-completion");
+ if (!osdc->completion_wq)
+ goto out_notify_wq;
+
schedule_delayed_work(&osdc->timeout_work,
osdc->client->options->osd_keepalive_timeout);
schedule_delayed_work(&osdc->osds_timeout_work,
@@ -5003,6 +5045,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
return 0;
+out_notify_wq:
+ destroy_workqueue(osdc->notify_wq);
out_msgpool_reply:
ceph_msgpool_destroy(&osdc->msgpool_op_reply);
out_msgpool:
@@ -5017,7 +5061,7 @@ out:
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
- flush_workqueue(osdc->notify_wq);
+ destroy_workqueue(osdc->completion_wq);
destroy_workqueue(osdc->notify_wq);
cancel_delayed_work_sync(&osdc->timeout_work);
cancel_delayed_work_sync(&osdc->osds_timeout_work);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index e22820e24f50..98c0ff3d6441 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -2146,10 +2146,10 @@ bool ceph_osds_changed(const struct ceph_osds *old_acting,
* Should only be called with target_oid and target_oloc (as opposed to
* base_oid and base_oloc), since tiering isn't taken into account.
*/
-int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
- const struct ceph_object_id *oid,
- const struct ceph_object_locator *oloc,
- struct ceph_pg *raw_pgid)
+void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
+ const struct ceph_object_id *oid,
+ const struct ceph_object_locator *oloc,
+ struct ceph_pg *raw_pgid)
{
WARN_ON(pi->id != oloc->pool);
@@ -2165,11 +2165,8 @@ int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
int nsl = oloc->pool_ns->len;
size_t total = nsl + 1 + oid->name_len;
- if (total > sizeof(stack_buf)) {
- buf = kmalloc(total, GFP_NOIO);
- if (!buf)
- return -ENOMEM;
- }
+ if (total > sizeof(stack_buf))
+ buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL);
memcpy(buf, oloc->pool_ns->str, nsl);
buf[nsl] = '\037';
memcpy(buf + nsl + 1, oid->name, oid->name_len);
@@ -2181,7 +2178,6 @@ int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
oid->name, nsl, oloc->pool_ns->str,
raw_pgid->pool, raw_pgid->seed);
}
- return 0;
}
int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
@@ -2195,7 +2191,8 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
if (!pi)
return -ENOENT;
- return __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
+ __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
+ return 0;
}
EXPORT_SYMBOL(ceph_object_locator_to_pg);