diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2010-03-30 01:42:25 +0400 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2010-03-30 01:42:25 +0400 |
commit | 9f321603724be7386ea39ea41fd885954db60a4a (patch) | |
tree | efd64c26c2fb2698ecd95c2f10dc1016b45ba4a4 /fs | |
parent | 9d54e2c0b0a03b0f05fc4f988323c858ec9d7740 (diff) | |
parent | 82593f87b6c1922a8f8317bb165c6c7794fa4639 (diff) | |
download | linux-9f321603724be7386ea39ea41fd885954db60a4a.tar.xz |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (28 commits)
ceph: update discussion list address in MAINTAINERS
ceph: some documentations fixes
ceph: fix use after free on mds __unregister_request
ceph: avoid loaded term 'OSD' in documention
ceph: fix possible double-free of mds request reference
ceph: fix session check on mds reply
ceph: handle kmalloc() failure
ceph: propagate mds session allocation failures to caller
ceph: make write_begin wait propagate ERESTARTSYS
ceph: fix snap rebuild condition
ceph: avoid reopening osd connections when address hasn't changed
ceph: rename r_sent_stamp r_stamp
ceph: fix connection fault con_work reentrancy problem
ceph: prevent dup stale messages to console for restarting mds
ceph: fix pg pool decoding from incremental osdmap update
ceph: fix mds sync() race with completing requests
ceph: only release unused caps with mds requests
ceph: clean up handle_cap_grant, handle_caps wrt session mutex
ceph: fix session locking in handle_caps, ceph_check_caps
ceph: drop unnecessary WARN_ON in caps migration
...
Diffstat (limited to 'fs')
-rw-r--r-- | fs/ceph/addr.c | 10 | ||||
-rw-r--r-- | fs/ceph/auth_x.c | 53 | ||||
-rw-r--r-- | fs/ceph/caps.c | 73 | ||||
-rw-r--r-- | fs/ceph/dir.c | 4 | ||||
-rw-r--r-- | fs/ceph/inode.c | 16 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 43 | ||||
-rw-r--r-- | fs/ceph/messenger.c | 19 | ||||
-rw-r--r-- | fs/ceph/messenger.h | 1 | ||||
-rw-r--r-- | fs/ceph/osd_client.c | 29 | ||||
-rw-r--r-- | fs/ceph/osd_client.h | 2 | ||||
-rw-r--r-- | fs/ceph/osdmap.c | 17 | ||||
-rw-r--r-- | fs/ceph/snap.c | 6 |
12 files changed, 182 insertions, 91 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 23bb0ceabe31..ce8ef6107727 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -919,6 +919,10 @@ static int context_is_writeable_or_written(struct inode *inode, /* * We are only allowed to write into/dirty the page if the page is * clean, or already dirty within the same snap context. + * + * called with page locked. + * return success with page locked, + * or any failure (incl -EAGAIN) with page unlocked. */ static int ceph_update_writeable_page(struct file *file, loff_t pos, unsigned len, @@ -961,9 +965,11 @@ retry_locked: snapc = ceph_get_snap_context((void *)page->private); unlock_page(page); ceph_queue_writeback(inode); - wait_event_interruptible(ci->i_cap_wq, + r = wait_event_interruptible(ci->i_cap_wq, context_is_writeable_or_written(inode, snapc)); ceph_put_snap_context(snapc); + if (r == -ERESTARTSYS) + return r; return -EAGAIN; } @@ -1035,7 +1041,7 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping, int r; do { - /* get a page*/ + /* get a page */ page = grab_cache_page_write_begin(mapping, index, 0); if (!page) return -ENOMEM; diff --git a/fs/ceph/auth_x.c b/fs/ceph/auth_x.c index f0318427b6da..8d8a84964763 100644 --- a/fs/ceph/auth_x.c +++ b/fs/ceph/auth_x.c @@ -28,6 +28,12 @@ static int ceph_x_is_authenticated(struct ceph_auth_client *ac) return (ac->want_keys & xi->have_keys) == ac->want_keys; } +static int ceph_x_encrypt_buflen(int ilen) +{ + return sizeof(struct ceph_x_encrypt_header) + ilen + 16 + + sizeof(u32); +} + static int ceph_x_encrypt(struct ceph_crypto_key *secret, void *ibuf, int ilen, void *obuf, size_t olen) { @@ -150,6 +156,11 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac, struct timespec validity; struct ceph_crypto_key old_key; void *tp, *tpend; + struct ceph_timespec new_validity; + struct ceph_crypto_key new_session_key; + struct ceph_buffer *new_ticket_blob; + unsigned long new_expires, new_renew_after; + u64 new_secret_id; ceph_decode_need(&p, end, sizeof(u32) + 1, bad); @@ -182,16 +193,16 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac, goto bad; memcpy(&old_key, &th->session_key, sizeof(old_key)); - ret = ceph_crypto_key_decode(&th->session_key, &dp, dend); + ret = ceph_crypto_key_decode(&new_session_key, &dp, dend); if (ret) goto out; - ceph_decode_copy(&dp, &th->validity, sizeof(th->validity)); - ceph_decode_timespec(&validity, &th->validity); - th->expires = get_seconds() + validity.tv_sec; - th->renew_after = th->expires - (validity.tv_sec / 4); - dout(" expires=%lu renew_after=%lu\n", th->expires, - th->renew_after); + ceph_decode_copy(&dp, &new_validity, sizeof(new_validity)); + ceph_decode_timespec(&validity, &new_validity); + new_expires = get_seconds() + validity.tv_sec; + new_renew_after = new_expires - (validity.tv_sec / 4); + dout(" expires=%lu renew_after=%lu\n", new_expires, + new_renew_after); /* ticket blob for service */ ceph_decode_8_safe(&p, end, is_enc, bad); @@ -216,10 +227,21 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac, dout(" ticket blob is %d bytes\n", dlen); ceph_decode_need(&tp, tpend, 1 + sizeof(u64), bad); struct_v = ceph_decode_8(&tp); - th->secret_id = ceph_decode_64(&tp); - ret = ceph_decode_buffer(&th->ticket_blob, &tp, tpend); + new_secret_id = ceph_decode_64(&tp); + ret = ceph_decode_buffer(&new_ticket_blob, &tp, tpend); if (ret) goto out; + + /* all is well, update our ticket */ + ceph_crypto_key_destroy(&th->session_key); + if (th->ticket_blob) + ceph_buffer_put(th->ticket_blob); + th->session_key = new_session_key; + th->ticket_blob = new_ticket_blob; + th->validity = new_validity; + th->secret_id = new_secret_id; + th->expires = new_expires; + th->renew_after = new_renew_after; dout(" got ticket service %d (%s) secret_id %lld len %d\n", type, ceph_entity_type_name(type), th->secret_id, (int)th->ticket_blob->vec.iov_len); @@ -242,7 +264,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, struct ceph_x_ticket_handler *th, struct ceph_x_authorizer *au) { - int len; + int maxlen; struct ceph_x_authorize_a *msg_a; struct ceph_x_authorize_b msg_b; void *p, *end; @@ -253,15 +275,15 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, dout("build_authorizer for %s %p\n", ceph_entity_type_name(th->service), au); - len = sizeof(*msg_a) + sizeof(msg_b) + sizeof(u32) + - ticket_blob_len + 16; - dout(" need len %d\n", len); - if (au->buf && au->buf->alloc_len < len) { + maxlen = sizeof(*msg_a) + sizeof(msg_b) + + ceph_x_encrypt_buflen(ticket_blob_len); + dout(" need len %d\n", maxlen); + if (au->buf && au->buf->alloc_len < maxlen) { ceph_buffer_put(au->buf); au->buf = NULL; } if (!au->buf) { - au->buf = ceph_buffer_new(len, GFP_NOFS); + au->buf = ceph_buffer_new(maxlen, GFP_NOFS); if (!au->buf) return -ENOMEM; } @@ -296,6 +318,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, au->buf->vec.iov_len = p - au->buf->vec.iov_base; dout(" built authorizer nonce %llx len %d\n", au->nonce, (int)au->buf->vec.iov_len); + BUG_ON(au->buf->vec.iov_len > maxlen); return 0; out_buf: diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index db122bb357b8..7d0a0d0adc18 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1407,6 +1407,7 @@ static int try_nonblocking_invalidate(struct inode *inode) */ void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_mds_session *session) + __releases(session->s_mutex) { struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode); struct ceph_mds_client *mdsc = &client->mdsc; @@ -1414,7 +1415,6 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_cap *cap; int file_wanted, used; int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */ - int drop_session_lock = session ? 0 : 1; int issued, implemented, want, retain, revoking, flushing = 0; int mds = -1; /* keep track of how far we've gone through i_caps list to avoid an infinite loop on retry */ @@ -1639,7 +1639,7 @@ ack: if (queue_invalidate) ceph_queue_invalidate(inode); - if (session && drop_session_lock) + if (session) mutex_unlock(&session->s_mutex); if (took_snap_rwsem) up_read(&mdsc->snap_rwsem); @@ -2195,18 +2195,19 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, * Handle a cap GRANT message from the MDS. (Note that a GRANT may * actually be a revocation if it specifies a smaller cap set.) * - * caller holds s_mutex. + * caller holds s_mutex and i_lock, we drop both. + * * return value: * 0 - ok * 1 - check_caps on auth cap only (writeback) * 2 - check_caps (ack revoke) */ -static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, - struct ceph_mds_session *session, - struct ceph_cap *cap, - struct ceph_buffer *xattr_buf) +static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, + struct ceph_mds_session *session, + struct ceph_cap *cap, + struct ceph_buffer *xattr_buf) __releases(inode->i_lock) - + __releases(session->s_mutex) { struct ceph_inode_info *ci = ceph_inode(inode); int mds = session->s_mds; @@ -2216,7 +2217,7 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, u64 size = le64_to_cpu(grant->size); u64 max_size = le64_to_cpu(grant->max_size); struct timespec mtime, atime, ctime; - int reply = 0; + int check_caps = 0; int wake = 0; int writeback = 0; int revoked_rdcache = 0; @@ -2329,11 +2330,12 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, if ((used & ~newcaps) & CEPH_CAP_FILE_BUFFER) writeback = 1; /* will delay ack */ else if (dirty & ~newcaps) - reply = 1; /* initiate writeback in check_caps */ + check_caps = 1; /* initiate writeback in check_caps */ else if (((used & ~newcaps) & CEPH_CAP_FILE_CACHE) == 0 || revoked_rdcache) - reply = 2; /* send revoke ack in check_caps */ + check_caps = 2; /* send revoke ack in check_caps */ cap->issued = newcaps; + cap->implemented |= newcaps; } else if (cap->issued == newcaps) { dout("caps unchanged: %s -> %s\n", ceph_cap_string(cap->issued), ceph_cap_string(newcaps)); @@ -2346,6 +2348,7 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, * pending revocation */ wake = 1; } + BUG_ON(cap->issued & ~cap->implemented); spin_unlock(&inode->i_lock); if (writeback) @@ -2359,7 +2362,14 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ceph_queue_invalidate(inode); if (wake) wake_up(&ci->i_cap_wq); - return reply; + + if (check_caps == 1) + ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY, + session); + else if (check_caps == 2) + ceph_check_caps(ci, CHECK_CAPS_NODELAY, session); + else + mutex_unlock(&session->s_mutex); } /* @@ -2548,9 +2558,8 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, ci->i_cap_exporting_issued = cap->issued; } __ceph_remove_cap(cap); - } else { - WARN_ON(!cap); } + /* else, we already released it */ spin_unlock(&inode->i_lock); } @@ -2621,9 +2630,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, u64 cap_id; u64 size, max_size; u64 tid; - int check_caps = 0; void *snaptrace; - int r; dout("handle_caps from mds%d\n", mds); @@ -2668,8 +2675,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, case CEPH_CAP_OP_IMPORT: handle_cap_import(mdsc, inode, h, session, snaptrace, le32_to_cpu(h->snap_trace_len)); - check_caps = 1; /* we may have sent a RELEASE to the old auth */ - goto done; + ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY, + session); + goto done_unlocked; } /* the rest require a cap */ @@ -2686,16 +2694,8 @@ void ceph_handle_caps(struct ceph_mds_session *session, switch (op) { case CEPH_CAP_OP_REVOKE: case CEPH_CAP_OP_GRANT: - r = handle_cap_grant(inode, h, session, cap, msg->middle); - if (r == 1) - ceph_check_caps(ceph_inode(inode), - CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY, - session); - else if (r == 2) - ceph_check_caps(ceph_inode(inode), - CHECK_CAPS_NODELAY, - session); - break; + handle_cap_grant(inode, h, session, cap, msg->middle); + goto done_unlocked; case CEPH_CAP_OP_FLUSH_ACK: handle_cap_flush_ack(inode, tid, h, session, cap); @@ -2713,9 +2713,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, done: mutex_unlock(&session->s_mutex); - - if (check_caps) - ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY, NULL); +done_unlocked: if (inode) iput(inode); return; @@ -2838,11 +2836,18 @@ int ceph_encode_inode_release(void **p, struct inode *inode, struct ceph_cap *cap; struct ceph_mds_request_release *rel = *p; int ret = 0; - - dout("encode_inode_release %p mds%d drop %s unless %s\n", inode, - mds, ceph_cap_string(drop), ceph_cap_string(unless)); + int used = 0; spin_lock(&inode->i_lock); + used = __ceph_caps_used(ci); + + dout("encode_inode_release %p mds%d used %s drop %s unless %s\n", inode, + mds, ceph_cap_string(used), ceph_cap_string(drop), + ceph_cap_string(unless)); + + /* only drop unused caps */ + drop &= ~used; + cap = __get_cap_for_mds(ci, mds); if (cap && __cap_is_valid(cap)) { if (force || diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 5107384ee029..8a9116e15b70 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -288,8 +288,10 @@ more: CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR; /* discard old result, if any */ - if (fi->last_readdir) + if (fi->last_readdir) { ceph_mdsc_put_request(fi->last_readdir); + fi->last_readdir = NULL; + } /* requery frag tree, as the frag topology may have changed */ frag = ceph_choose_frag(ceph_inode(inode), frag, NULL, NULL); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 7abe1aed819b..aca82d55cc53 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -378,6 +378,22 @@ void ceph_destroy_inode(struct inode *inode) ceph_queue_caps_release(inode); + /* + * we may still have a snap_realm reference if there are stray + * caps in i_cap_exporting_issued or i_snap_caps. + */ + if (ci->i_snap_realm) { + struct ceph_mds_client *mdsc = + &ceph_client(ci->vfs_inode.i_sb)->mdsc; + struct ceph_snap_realm *realm = ci->i_snap_realm; + + dout(" dropping residual ref to snap realm %p\n", realm); + spin_lock(&realm->inodes_with_caps_lock); + list_del_init(&ci->i_snap_realm_item); + spin_unlock(&realm->inodes_with_caps_lock); + ceph_put_snap_realm(mdsc, realm); + } + kfree(ci->i_symlink); while ((n = rb_first(&ci->i_fragtree)) != NULL) { frag = rb_entry(n, struct ceph_inode_frag, node); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index a2600101ec22..5c7920be6420 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -328,6 +328,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, struct ceph_mds_session *s; s = kzalloc(sizeof(*s), GFP_NOFS); + if (!s) + return ERR_PTR(-ENOMEM); s->s_mdsc = mdsc; s->s_mds = mds; s->s_state = CEPH_MDS_SESSION_NEW; @@ -529,7 +531,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc, { dout("__unregister_request %p tid %lld\n", req, req->r_tid); rb_erase(&req->r_node, &mdsc->request_tree); - ceph_mdsc_put_request(req); + RB_CLEAR_NODE(&req->r_node); if (req->r_unsafe_dir) { struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); @@ -538,6 +540,8 @@ static void __unregister_request(struct ceph_mds_client *mdsc, list_del_init(&req->r_unsafe_dir_item); spin_unlock(&ci->i_unsafe_lock); } + + ceph_mdsc_put_request(req); } /* @@ -862,6 +866,7 @@ static int send_renew_caps(struct ceph_mds_client *mdsc, if (time_after_eq(jiffies, session->s_cap_ttl) && time_after_eq(session->s_cap_ttl, session->s_renew_requested)) pr_info("mds%d caps stale\n", session->s_mds); + session->s_renew_requested = jiffies; /* do not try to renew caps until a recovering mds has reconnected * with its clients. */ @@ -874,7 +879,6 @@ static int send_renew_caps(struct ceph_mds_client *mdsc, dout("send_renew_caps to mds%d (%s)\n", session->s_mds, ceph_mds_state_name(state)); - session->s_renew_requested = jiffies; msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, ++session->s_renew_seq); if (IS_ERR(msg)) @@ -1566,8 +1570,13 @@ static int __do_request(struct ceph_mds_client *mdsc, /* get, open session */ session = __ceph_lookup_mds_session(mdsc, mds); - if (!session) + if (!session) { session = register_session(mdsc, mds); + if (IS_ERR(session)) { + err = PTR_ERR(session); + goto finish; + } + } dout("do_request mds%d session %p state %s\n", mds, session, session_state_name(session->s_state)); if (session->s_state != CEPH_MDS_SESSION_OPEN && @@ -1770,7 +1779,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) dout("handle_reply %p\n", req); /* correct session? */ - if (!req->r_session && req->r_session != session) { + if (req->r_session != session) { pr_err("mdsc_handle_reply got %llu on session mds%d" " not mds%d\n", tid, session->s_mds, req->r_session ? req->r_session->s_mds : -1); @@ -2682,29 +2691,41 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) */ static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) { - struct ceph_mds_request *req = NULL; + struct ceph_mds_request *req = NULL, *nextreq; struct rb_node *n; mutex_lock(&mdsc->mutex); dout("wait_unsafe_requests want %lld\n", want_tid); +restart: req = __get_oldest_req(mdsc); while (req && req->r_tid <= want_tid) { + /* find next request */ + n = rb_next(&req->r_node); + if (n) + nextreq = rb_entry(n, struct ceph_mds_request, r_node); + else + nextreq = NULL; if ((req->r_op & CEPH_MDS_OP_WRITE)) { /* write op */ ceph_mdsc_get_request(req); + if (nextreq) + ceph_mdsc_get_request(nextreq); mutex_unlock(&mdsc->mutex); dout("wait_unsafe_requests wait on %llu (want %llu)\n", req->r_tid, want_tid); wait_for_completion(&req->r_safe_completion); mutex_lock(&mdsc->mutex); - n = rb_next(&req->r_node); ceph_mdsc_put_request(req); - } else { - n = rb_next(&req->r_node); + if (!nextreq) + break; /* next dne before, so we're done! */ + if (RB_EMPTY_NODE(&nextreq->r_node)) { + /* next request was removed from tree */ + ceph_mdsc_put_request(nextreq); + goto restart; + } + ceph_mdsc_put_request(nextreq); /* won't go away */ } - if (!n) - break; - req = rb_entry(n, struct ceph_mds_request, r_node); + req = nextreq; } mutex_unlock(&mdsc->mutex); dout("wait_unsafe_requests done\n"); diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index 781656a49bf8..a32f0f896d9f 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c @@ -366,6 +366,14 @@ void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr) } /* + * return true if this connection ever successfully opened + */ +bool ceph_con_opened(struct ceph_connection *con) +{ + return con->connect_seq > 0; +} + +/* * generic get/put */ struct ceph_connection *ceph_con_get(struct ceph_connection *con) @@ -830,13 +838,6 @@ static void prepare_read_connect(struct ceph_connection *con) con->in_base_pos = 0; } -static void prepare_read_connect_retry(struct ceph_connection *con) -{ - dout("prepare_read_connect_retry %p\n", con); - con->in_base_pos = strlen(CEPH_BANNER) + sizeof(con->actual_peer_addr) - + sizeof(con->peer_addr_for_me); -} - static void prepare_read_ack(struct ceph_connection *con) { dout("prepare_read_ack %p\n", con); @@ -1146,7 +1147,7 @@ static int process_connect(struct ceph_connection *con) } con->auth_retry = 1; prepare_write_connect(con->msgr, con, 0); - prepare_read_connect_retry(con); + prepare_read_connect(con); break; case CEPH_MSGR_TAG_RESETSESSION: @@ -1843,8 +1844,6 @@ static void ceph_fault(struct ceph_connection *con) goto out; } - clear_bit(BUSY, &con->state); /* to avoid an improbable race */ - mutex_lock(&con->mutex); if (test_bit(CLOSED, &con->state)) goto out_unlock; diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h index 4caaa5911110..a343dae73cdc 100644 --- a/fs/ceph/messenger.h +++ b/fs/ceph/messenger.h @@ -223,6 +223,7 @@ extern void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con); extern void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr); +extern bool ceph_con_opened(struct ceph_connection *con); extern void ceph_con_close(struct ceph_connection *con); extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg); extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg); diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c index dbe63db9762f..c7b4dedaace6 100644 --- a/fs/ceph/osd_client.c +++ b/fs/ceph/osd_client.c @@ -413,11 +413,22 @@ static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all) */ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) { + struct ceph_osd_request *req; int ret = 0; dout("__reset_osd %p osd%d\n", osd, osd->o_osd); if (list_empty(&osd->o_requests)) { __remove_osd(osdc, osd); + } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], + &osd->o_con.peer_addr, + sizeof(osd->o_con.peer_addr)) == 0 && + !ceph_con_opened(&osd->o_con)) { + dout(" osd addr hasn't changed and connection never opened," + " letting msgr retry"); + /* touch each r_stamp for handle_timeout()'s benfit */ + list_for_each_entry(req, &osd->o_requests, r_osd_item) + req->r_stamp = jiffies; + ret = -EAGAIN; } else { ceph_con_close(&osd->o_con); ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]); @@ -633,7 +644,7 @@ static int __send_request(struct ceph_osd_client *osdc, reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ reqhead->reassert_version = req->r_reassert_version; - req->r_sent_stamp = jiffies; + req->r_stamp = jiffies; list_move_tail(&osdc->req_lru, &req->r_req_lru_item); ceph_msg_get(req->r_request); /* send consumes a ref */ @@ -660,7 +671,7 @@ static void handle_timeout(struct work_struct *work) unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ; unsigned long keepalive = osdc->client->mount_args->osd_keepalive_timeout * HZ; - unsigned long last_sent = 0; + unsigned long last_stamp = 0; struct rb_node *p; struct list_head slow_osds; @@ -697,12 +708,12 @@ static void handle_timeout(struct work_struct *work) req = list_entry(osdc->req_lru.next, struct ceph_osd_request, r_req_lru_item); - if (time_before(jiffies, req->r_sent_stamp + timeout)) + if (time_before(jiffies, req->r_stamp + timeout)) break; - BUG_ON(req == last_req && req->r_sent_stamp == last_sent); + BUG_ON(req == last_req && req->r_stamp == last_stamp); last_req = req; - last_sent = req->r_sent_stamp; + last_stamp = req->r_stamp; osd = req->r_osd; BUG_ON(!osd); @@ -718,7 +729,7 @@ static void handle_timeout(struct work_struct *work) */ INIT_LIST_HEAD(&slow_osds); list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { - if (time_before(jiffies, req->r_sent_stamp + keepalive)) + if (time_before(jiffies, req->r_stamp + keepalive)) break; osd = req->r_osd; @@ -862,7 +873,9 @@ static int __kick_requests(struct ceph_osd_client *osdc, dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1); if (kickosd) { - __reset_osd(osdc, kickosd); + err = __reset_osd(osdc, kickosd); + if (err == -EAGAIN) + return 1; } else { for (p = rb_first(&osdc->osds); p; p = n) { struct ceph_osd *osd = @@ -913,7 +926,7 @@ static int __kick_requests(struct ceph_osd_client *osdc, kick: dout("kicking %p tid %llu osd%d\n", req, req->r_tid, - req->r_osd->o_osd); + req->r_osd ? req->r_osd->o_osd : -1); req->r_flags |= CEPH_OSD_FLAG_RETRY; err = __send_request(osdc, req); if (err) { diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h index 1b1a3ca43afc..b0759911e7c3 100644 --- a/fs/ceph/osd_client.h +++ b/fs/ceph/osd_client.h @@ -70,7 +70,7 @@ struct ceph_osd_request { char r_oid[40]; /* object name */ int r_oid_len; - unsigned long r_sent_stamp; + unsigned long r_stamp; /* send OR check time */ bool r_resend; /* msg send failed, needs retry */ struct ceph_file_layout r_file_layout; diff --git a/fs/ceph/osdmap.c b/fs/ceph/osdmap.c index b83f2692b835..d82fe87c2a6e 100644 --- a/fs/ceph/osdmap.c +++ b/fs/ceph/osdmap.c @@ -480,6 +480,14 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id) return NULL; } +void __decode_pool(void **p, struct ceph_pg_pool_info *pi) +{ + ceph_decode_copy(p, &pi->v, sizeof(pi->v)); + calc_pg_masks(pi); + *p += le32_to_cpu(pi->v.num_snaps) * sizeof(u64); + *p += le32_to_cpu(pi->v.num_removed_snap_intervals) * sizeof(u64) * 2; +} + /* * decode a full map. */ @@ -526,12 +534,8 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) ev, CEPH_PG_POOL_VERSION); goto bad; } - ceph_decode_copy(p, &pi->v, sizeof(pi->v)); + __decode_pool(p, pi); __insert_pg_pool(&map->pg_pools, pi); - calc_pg_masks(pi); - *p += le32_to_cpu(pi->v.num_snaps) * sizeof(u64); - *p += le32_to_cpu(pi->v.num_removed_snap_intervals) - * sizeof(u64) * 2; } ceph_decode_32_safe(p, end, map->pool_max, bad); @@ -714,8 +718,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, pi->id = pool; __insert_pg_pool(&map->pg_pools, pi); } - ceph_decode_copy(p, &pi->v, sizeof(pi->v)); - calc_pg_masks(pi); + __decode_pool(p, pi); } /* old_pool */ diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index bf2a5f3846a4..df04e210a055 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -314,9 +314,9 @@ static int build_snap_context(struct ceph_snap_realm *realm) because we rebuild_snap_realms() works _downward_ in hierarchy after each update.) */ if (realm->cached_context && - realm->cached_context->seq <= realm->seq && + realm->cached_context->seq == realm->seq && (!parent || - realm->cached_context->seq <= parent->cached_context->seq)) { + realm->cached_context->seq >= parent->cached_context->seq)) { dout("build_snap_context %llx %p: %p seq %lld (%d snaps)" " (unchanged)\n", realm->ino, realm, realm->cached_context, @@ -818,7 +818,9 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, * queued (again) by ceph_update_snap_trace() * below. Queue it _now_, under the old context. */ + spin_lock(&realm->inodes_with_caps_lock); list_del_init(&ci->i_snap_realm_item); + spin_unlock(&realm->inodes_with_caps_lock); spin_unlock(&inode->i_lock); ceph_queue_cap_snap(ci, |