diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/ceph/ceph_common.c | 1 | ||||
-rw-r--r-- | net/ceph/crypto.c | 4 | ||||
-rw-r--r-- | net/ceph/messenger.c | 83 | ||||
-rw-r--r-- | net/ceph/mon_client.c | 37 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 51 | ||||
-rw-r--r-- | net/ceph/osdmap.c | 2 | ||||
-rw-r--r-- | net/sunrpc/sched.c | 14 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 6 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 15 |
9 files changed, 134 insertions, 79 deletions
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 69a4d30a9ccf..54a00d66509e 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -357,6 +357,7 @@ ceph_parse_options(char *options, const char *dev_name, opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT; opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; + opt->monc_ping_timeout = CEPH_MONC_PING_TIMEOUT_DEFAULT; /* get mon ip(s) */ /* ip1[:port1][,ip2[:port2]...] */ diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c index 790fe89d90c0..4440edcce0d6 100644 --- a/net/ceph/crypto.c +++ b/net/ceph/crypto.c @@ -79,10 +79,6 @@ int ceph_crypto_key_unarmor(struct ceph_crypto_key *key, const char *inkey) return 0; } - - -#define AES_KEY_SIZE 16 - static struct crypto_blkcipher *ceph_crypto_alloc_cipher(void) { return crypto_alloc_blkcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC); diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index e3be1d22a247..b9b0e3b5da49 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -163,6 +163,7 @@ static struct kmem_cache *ceph_msg_data_cache; static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; +static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2; #ifdef CONFIG_LOCKDEP static struct lock_class_key socket_class; @@ -176,7 +177,7 @@ static struct lock_class_key socket_class; static void queue_con(struct ceph_connection *con); static void cancel_con(struct ceph_connection *con); -static void con_work(struct work_struct *); +static void ceph_con_workfn(struct work_struct *); static void con_fault(struct ceph_connection *con); /* @@ -276,22 +277,22 @@ static void _ceph_msgr_exit(void) ceph_msgr_wq = NULL; } - ceph_msgr_slab_exit(); - BUG_ON(zero_page == NULL); page_cache_release(zero_page); zero_page = NULL; + + ceph_msgr_slab_exit(); } int ceph_msgr_init(void) { + if (ceph_msgr_slab_init()) + return -ENOMEM; + BUG_ON(zero_page != NULL); zero_page = ZERO_PAGE(0); page_cache_get(zero_page); - if (ceph_msgr_slab_init()) - return -ENOMEM; - /* * The number of active work items is limited by the number of * connections, so leave @max_active at default. @@ -749,7 +750,7 @@ void ceph_con_init(struct ceph_connection *con, void *private, mutex_init(&con->mutex); INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); - INIT_DELAYED_WORK(&con->work, con_work); + INIT_DELAYED_WORK(&con->work, ceph_con_workfn); con->state = CON_STATE_CLOSED; } @@ -1351,7 +1352,16 @@ static void prepare_write_keepalive(struct ceph_connection *con) { dout("prepare_write_keepalive %p\n", con); con_out_kvec_reset(con); - con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); + if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) { + struct timespec now = CURRENT_TIME; + + con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); + ceph_encode_timespec(&con->out_temp_keepalive2, &now); + con_out_kvec_add(con, sizeof(con->out_temp_keepalive2), + &con->out_temp_keepalive2); + } else { + con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); + } con_flag_set(con, CON_FLAG_WRITE_PENDING); } @@ -1625,6 +1635,12 @@ static void prepare_read_tag(struct ceph_connection *con) con->in_tag = CEPH_MSGR_TAG_READY; } +static void prepare_read_keepalive_ack(struct ceph_connection *con) +{ + dout("prepare_read_keepalive_ack %p\n", con); + con->in_base_pos = 0; +} + /* * Prepare to read a message. */ @@ -2322,13 +2338,6 @@ static int read_partial_message(struct ceph_connection *con) return ret; BUG_ON(!con->in_msg ^ skip); - if (con->in_msg && data_len > con->in_msg->data_length) { - pr_warn("%s skipping long message (%u > %zd)\n", - __func__, data_len, con->in_msg->data_length); - ceph_msg_put(con->in_msg); - con->in_msg = NULL; - skip = 1; - } if (skip) { /* skip this message */ dout("alloc_msg said skip message\n"); @@ -2457,6 +2466,17 @@ static void process_message(struct ceph_connection *con) mutex_lock(&con->mutex); } +static int read_keepalive_ack(struct ceph_connection *con) +{ + struct ceph_timespec ceph_ts; + size_t size = sizeof(ceph_ts); + int ret = read_partial(con, size, size, &ceph_ts); + if (ret <= 0) + return ret; + ceph_decode_timespec(&con->last_keepalive_ack, &ceph_ts); + prepare_read_tag(con); + return 1; +} /* * Write something to the socket. Called in a worker thread when the @@ -2526,6 +2546,10 @@ more_kvec: do_next: if (con->state == CON_STATE_OPEN) { + if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { + prepare_write_keepalive(con); + goto more; + } /* is anything else pending? */ if (!list_empty(&con->out_queue)) { prepare_write_message(con); @@ -2535,10 +2559,6 @@ do_next: prepare_write_ack(con); goto more; } - if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { - prepare_write_keepalive(con); - goto more; - } } /* Nothing to do! */ @@ -2641,6 +2661,9 @@ more: case CEPH_MSGR_TAG_ACK: prepare_read_ack(con); break; + case CEPH_MSGR_TAG_KEEPALIVE2_ACK: + prepare_read_keepalive_ack(con); + break; case CEPH_MSGR_TAG_CLOSE: con_close_socket(con); con->state = CON_STATE_CLOSED; @@ -2684,6 +2707,12 @@ more: process_ack(con); goto more; } + if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { + ret = read_keepalive_ack(con); + if (ret <= 0) + goto out; + goto more; + } out: dout("try_read done on %p ret %d\n", con, ret); @@ -2799,7 +2828,7 @@ static void con_fault_finish(struct ceph_connection *con) /* * Do some work on a connection. Drop a connection ref when we're done. */ -static void con_work(struct work_struct *work) +static void ceph_con_workfn(struct work_struct *work) { struct ceph_connection *con = container_of(work, struct ceph_connection, work.work); @@ -3101,6 +3130,20 @@ void ceph_con_keepalive(struct ceph_connection *con) } EXPORT_SYMBOL(ceph_con_keepalive); +bool ceph_con_keepalive_expired(struct ceph_connection *con, + unsigned long interval) +{ + if (interval > 0 && + (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) { + struct timespec now = CURRENT_TIME; + struct timespec ts; + jiffies_to_timespec(interval, &ts); + ts = timespec_add(con->last_keepalive_ack, ts); + return timespec_compare(&now, &ts) >= 0; + } + return false; +} + static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type) { struct ceph_msg_data *data; diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 9d6ff1215928..edda01626a45 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -149,6 +149,10 @@ static int __open_session(struct ceph_mon_client *monc) CEPH_ENTITY_TYPE_MON, monc->cur_mon, &monc->monmap->mon_inst[monc->cur_mon].addr); + /* send an initial keepalive to ensure our timestamp is + * valid by the time we are in an OPENED state */ + ceph_con_keepalive(&monc->con); + /* initiatiate authentication handshake */ ret = ceph_auth_build_hello(monc->auth, monc->m_auth->front.iov_base, @@ -170,14 +174,19 @@ static bool __sub_expired(struct ceph_mon_client *monc) */ static void __schedule_delayed(struct ceph_mon_client *monc) { - unsigned int delay; + struct ceph_options *opt = monc->client->options; + unsigned long delay; - if (monc->cur_mon < 0 || __sub_expired(monc)) + if (monc->cur_mon < 0 || __sub_expired(monc)) { delay = 10 * HZ; - else + } else { delay = 20 * HZ; - dout("__schedule_delayed after %u\n", delay); - schedule_delayed_work(&monc->delayed_work, delay); + if (opt->monc_ping_timeout > 0) + delay = min(delay, opt->monc_ping_timeout / 3); + } + dout("__schedule_delayed after %lu\n", delay); + schedule_delayed_work(&monc->delayed_work, + round_jiffies_relative(delay)); } /* @@ -743,11 +752,23 @@ static void delayed_work(struct work_struct *work) __close_session(monc); __open_session(monc); /* continue hunting */ } else { - ceph_con_keepalive(&monc->con); + struct ceph_options *opt = monc->client->options; + int is_auth = ceph_auth_is_authenticated(monc->auth); + if (ceph_con_keepalive_expired(&monc->con, + opt->monc_ping_timeout)) { + dout("monc keepalive timeout\n"); + is_auth = 0; + __close_session(monc); + monc->hunting = true; + __open_session(monc); + } - __validate_auth(monc); + if (!monc->hunting) { + ceph_con_keepalive(&monc->con); + __validate_auth(monc); + } - if (ceph_auth_is_authenticated(monc->auth)) + if (is_auth) __send_subscribe(monc); } __schedule_delayed(monc); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 50033677c0fa..80b94e37c94a 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2817,8 +2817,9 @@ out: } /* - * lookup and return message for incoming reply. set up reply message - * pages. + * Lookup and return message for incoming reply. Don't try to do + * anything about a larger than preallocated data portion of the + * message at the moment - for now, just skip the message. */ static struct ceph_msg *get_reply(struct ceph_connection *con, struct ceph_msg_header *hdr, @@ -2836,10 +2837,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, mutex_lock(&osdc->request_mutex); req = __lookup_request(osdc, tid); if (!req) { - *skip = 1; + pr_warn("%s osd%d tid %llu unknown, skipping\n", + __func__, osd->o_osd, tid); m = NULL; - dout("get_reply unknown tid %llu from osd%d\n", tid, - osd->o_osd); + *skip = 1; goto out; } @@ -2849,10 +2850,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, ceph_msg_revoke_incoming(req->r_reply); if (front_len > req->r_reply->front_alloc_len) { - pr_warn("get_reply front %d > preallocated %d (%u#%llu)\n", - front_len, req->r_reply->front_alloc_len, - (unsigned int)con->peer_name.type, - le64_to_cpu(con->peer_name.num)); + pr_warn("%s osd%d tid %llu front %d > preallocated %d\n", + __func__, osd->o_osd, req->r_tid, front_len, + req->r_reply->front_alloc_len); m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, false); if (!m) @@ -2860,37 +2860,22 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, ceph_msg_put(req->r_reply); req->r_reply = m; } - m = ceph_msg_get(req->r_reply); - - if (data_len > 0) { - struct ceph_osd_data *osd_data; - /* - * XXX This is assuming there is only one op containing - * XXX page data. Probably OK for reads, but this - * XXX ought to be done more generally. - */ - osd_data = osd_req_op_extent_osd_data(req, 0); - if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { - if (osd_data->pages && - unlikely(osd_data->length < data_len)) { - - pr_warn("tid %lld reply has %d bytes we had only %llu bytes ready\n", - tid, data_len, osd_data->length); - *skip = 1; - ceph_msg_put(m); - m = NULL; - goto out; - } - } + if (data_len > req->r_reply->data_length) { + pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n", + __func__, osd->o_osd, req->r_tid, data_len, + req->r_reply->data_length); + m = NULL; + *skip = 1; + goto out; } - *skip = 0; + + m = ceph_msg_get(req->r_reply); dout("get_reply tid %lld %p\n", tid, m); out: mutex_unlock(&osdc->request_mutex); return m; - } static struct ceph_msg *alloc_msg(struct ceph_connection *con, diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 4a3125836b64..7d8f581d9f1f 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1300,7 +1300,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, ceph_decode_addr(&addr); pr_info("osd%d up\n", osd); BUG_ON(osd >= map->max_osd); - map->osd_state[osd] |= CEPH_OSD_UP; + map->osd_state[osd] |= CEPH_OSD_UP | CEPH_OSD_EXISTS; map->osd_addr[osd] = addr; } diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index b140c092d226..f14f24ee9983 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -297,7 +297,7 @@ static int rpc_complete_task(struct rpc_task *task) clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate); ret = atomic_dec_and_test(&task->tk_count); if (waitqueue_active(wq)) - __wake_up_locked_key(wq, TASK_NORMAL, 1, &k); + __wake_up_locked_key(wq, TASK_NORMAL, &k); spin_unlock_irqrestore(&wq->lock, flags); return ret; } @@ -1092,14 +1092,10 @@ void rpc_destroy_mempool(void) { rpciod_stop(); - if (rpc_buffer_mempool) - mempool_destroy(rpc_buffer_mempool); - if (rpc_task_mempool) - mempool_destroy(rpc_task_mempool); - if (rpc_task_slabp) - kmem_cache_destroy(rpc_task_slabp); - if (rpc_buffer_slabp) - kmem_cache_destroy(rpc_buffer_slabp); + mempool_destroy(rpc_buffer_mempool); + mempool_destroy(rpc_task_mempool); + kmem_cache_destroy(rpc_task_slabp); + kmem_cache_destroy(rpc_buffer_slabp); rpc_destroy_wait_queue(&delay_queue); } diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index ab5dd621ae0c..2e98f4a243e5 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -614,6 +614,7 @@ static void xprt_autoclose(struct work_struct *work) clear_bit(XPRT_CLOSE_WAIT, &xprt->state); xprt->ops->close(xprt); xprt_release_write(xprt, NULL); + wake_up_bit(&xprt->state, XPRT_LOCKED); } /** @@ -723,6 +724,7 @@ void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) xprt->ops->release_xprt(xprt, NULL); out: spin_unlock_bh(&xprt->transport_lock); + wake_up_bit(&xprt->state, XPRT_LOCKED); } /** @@ -1394,6 +1396,10 @@ out: static void xprt_destroy(struct rpc_xprt *xprt) { dprintk("RPC: destroying transport %p\n", xprt); + + /* Exclude transport connect/disconnect handlers */ + wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE); + del_timer_sync(&xprt->timer); rpc_xprt_debugfs_unregister(xprt); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 7be90bc1a7c2..1a85e0ed0b48 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -777,7 +777,6 @@ static void xs_sock_mark_closed(struct rpc_xprt *xprt) xs_sock_reset_connection_flags(xprt); /* Mark transport as closed and wake up all pending tasks */ xprt_disconnect_done(xprt); - xprt_force_disconnect(xprt); } /** @@ -881,8 +880,11 @@ static void xs_xprt_free(struct rpc_xprt *xprt) */ static void xs_destroy(struct rpc_xprt *xprt) { + struct sock_xprt *transport = container_of(xprt, + struct sock_xprt, xprt); dprintk("RPC: xs_destroy xprt %p\n", xprt); + cancel_delayed_work_sync(&transport->connect_worker); xs_close(xprt); xs_xprt_free(xprt); module_put(THIS_MODULE); @@ -1435,6 +1437,7 @@ out: static void xs_tcp_state_change(struct sock *sk) { struct rpc_xprt *xprt; + struct sock_xprt *transport; read_lock_bh(&sk->sk_callback_lock); if (!(xprt = xprt_from_sock(sk))) @@ -1446,13 +1449,12 @@ static void xs_tcp_state_change(struct sock *sk) sock_flag(sk, SOCK_ZAPPED), sk->sk_shutdown); + transport = container_of(xprt, struct sock_xprt, xprt); trace_rpc_socket_state_change(xprt, sk->sk_socket); switch (sk->sk_state) { case TCP_ESTABLISHED: spin_lock(&xprt->transport_lock); if (!xprt_test_and_set_connected(xprt)) { - struct sock_xprt *transport = container_of(xprt, - struct sock_xprt, xprt); /* Reset TCP record info */ transport->tcp_offset = 0; @@ -1461,6 +1463,8 @@ static void xs_tcp_state_change(struct sock *sk) transport->tcp_flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; xprt->connect_cookie++; + clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); + xprt_clear_connecting(xprt); xprt_wake_pending_tasks(xprt, -EAGAIN); } @@ -1496,6 +1500,9 @@ static void xs_tcp_state_change(struct sock *sk) smp_mb__after_atomic(); break; case TCP_CLOSE: + if (test_and_clear_bit(XPRT_SOCK_CONNECTING, + &transport->sock_state)) + xprt_clear_connecting(xprt); xs_sock_mark_closed(xprt); } out: @@ -2179,6 +2186,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) /* Tell the socket layer to start connecting... */ xprt->stat.connect_count++; xprt->stat.connect_start = jiffies; + set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK); switch (ret) { case 0: @@ -2240,7 +2248,6 @@ static void xs_tcp_setup_socket(struct work_struct *work) case -EINPROGRESS: case -EALREADY: xprt_unlock_connect(xprt, transport); - xprt_clear_connecting(xprt); return; case -EINVAL: /* Happens, for instance, if the user specified a link |