diff options
Diffstat (limited to 'fs/ocfs2/cluster')
-rw-r--r-- | fs/ocfs2/cluster/heartbeat.c | 14 | ||||
-rw-r--r-- | fs/ocfs2/cluster/nodemanager.c | 198 | ||||
-rw-r--r-- | fs/ocfs2/cluster/nodemanager.h | 17 | ||||
-rw-r--r-- | fs/ocfs2/cluster/quorum.c | 4 | ||||
-rw-r--r-- | fs/ocfs2/cluster/tcp.c | 240 | ||||
-rw-r--r-- | fs/ocfs2/cluster/tcp.h | 8 | ||||
-rw-r--r-- | fs/ocfs2/cluster/tcp_internal.h | 23 |
7 files changed, 410 insertions, 94 deletions
diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c index 305cba3681fe..a25ef5a50386 100644 --- a/fs/ocfs2/cluster/heartbeat.c +++ b/fs/ocfs2/cluster/heartbeat.c @@ -141,7 +141,7 @@ struct o2hb_region { * recognizes a node going up and down in one iteration */ u64 hr_generation; - struct work_struct hr_write_timeout_work; + struct delayed_work hr_write_timeout_work; unsigned long hr_last_timeout_start; /* Used during o2hb_check_slot to hold a copy of the block @@ -156,9 +156,11 @@ struct o2hb_bio_wait_ctxt { int wc_error; }; -static void o2hb_write_timeout(void *arg) +static void o2hb_write_timeout(struct work_struct *work) { - struct o2hb_region *reg = arg; + struct o2hb_region *reg = + container_of(work, struct o2hb_region, + hr_write_timeout_work.work); mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u " "milliseconds\n", reg->hr_dev_name, @@ -1404,7 +1406,7 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, goto out; } - INIT_WORK(®->hr_write_timeout_work, o2hb_write_timeout, reg); + INIT_DELAYED_WORK(®->hr_write_timeout_work, o2hb_write_timeout); /* * A node is considered live after it has beat LIVE_THRESHOLD @@ -1551,7 +1553,7 @@ static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *g struct o2hb_region *reg = NULL; struct config_item *ret = NULL; - reg = kcalloc(1, sizeof(struct o2hb_region), GFP_KERNEL); + reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL); if (reg == NULL) goto out; /* ENOMEM */ @@ -1677,7 +1679,7 @@ struct config_group *o2hb_alloc_hb_set(void) struct o2hb_heartbeat_group *hs = NULL; struct config_group *ret = NULL; - hs = kcalloc(1, sizeof(struct o2hb_heartbeat_group), GFP_KERNEL); + hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL); if (hs == NULL) goto out; diff --git a/fs/ocfs2/cluster/nodemanager.c b/fs/ocfs2/cluster/nodemanager.c index d11753c50bc1..b17333a0606b 100644 --- a/fs/ocfs2/cluster/nodemanager.c +++ b/fs/ocfs2/cluster/nodemanager.c @@ -35,7 +35,7 @@ /* for now we operate under the assertion that there can be only one * cluster active at a time. Changing this will require trickling * cluster references throughout where nodes are looked up */ -static struct o2nm_cluster *o2nm_single_cluster = NULL; +struct o2nm_cluster *o2nm_single_cluster = NULL; #define OCFS2_MAX_HB_CTL_PATH 256 static char ocfs2_hb_ctl_path[OCFS2_MAX_HB_CTL_PATH] = "/sbin/ocfs2_hb_ctl"; @@ -97,17 +97,6 @@ const char *o2nm_get_hb_ctl_path(void) } EXPORT_SYMBOL_GPL(o2nm_get_hb_ctl_path); -struct o2nm_cluster { - struct config_group cl_group; - unsigned cl_has_local:1; - u8 cl_local_node; - rwlock_t cl_nodes_lock; - struct o2nm_node *cl_nodes[O2NM_MAX_NODES]; - struct rb_root cl_node_ip_tree; - /* this bitmap is part of a hack for disk bitmap.. will go eventually. - zab */ - unsigned long cl_nodes_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; -}; - struct o2nm_node *o2nm_get_node_by_num(u8 node_num) { struct o2nm_node *node = NULL; @@ -543,6 +532,179 @@ static struct o2nm_node_group *to_o2nm_node_group(struct config_group *group) } #endif +struct o2nm_cluster_attribute { + struct configfs_attribute attr; + ssize_t (*show)(struct o2nm_cluster *, char *); + ssize_t (*store)(struct o2nm_cluster *, const char *, size_t); +}; + +static ssize_t o2nm_cluster_attr_write(const char *page, ssize_t count, + unsigned int *val) +{ + unsigned long tmp; + char *p = (char *)page; + + tmp = simple_strtoul(p, &p, 0); + if (!p || (*p && (*p != '\n'))) + return -EINVAL; + + if (tmp == 0) + return -EINVAL; + if (tmp >= (u32)-1) + return -ERANGE; + + *val = tmp; + + return count; +} + +static ssize_t o2nm_cluster_attr_idle_timeout_ms_read( + struct o2nm_cluster *cluster, char *page) +{ + return sprintf(page, "%u\n", cluster->cl_idle_timeout_ms); +} + +static ssize_t o2nm_cluster_attr_idle_timeout_ms_write( + struct o2nm_cluster *cluster, const char *page, size_t count) +{ + ssize_t ret; + unsigned int val; + + ret = o2nm_cluster_attr_write(page, count, &val); + + if (ret > 0) { + if (cluster->cl_idle_timeout_ms != val + && o2net_num_connected_peers()) { + mlog(ML_NOTICE, + "o2net: cannot change idle timeout after " + "the first peer has agreed to it." + " %d connected peers\n", + o2net_num_connected_peers()); + ret = -EINVAL; + } else if (val <= cluster->cl_keepalive_delay_ms) { + mlog(ML_NOTICE, "o2net: idle timeout must be larger " + "than keepalive delay\n"); + ret = -EINVAL; + } else { + cluster->cl_idle_timeout_ms = val; + } + } + + return ret; +} + +static ssize_t o2nm_cluster_attr_keepalive_delay_ms_read( + struct o2nm_cluster *cluster, char *page) +{ + return sprintf(page, "%u\n", cluster->cl_keepalive_delay_ms); +} + +static ssize_t o2nm_cluster_attr_keepalive_delay_ms_write( + struct o2nm_cluster *cluster, const char *page, size_t count) +{ + ssize_t ret; + unsigned int val; + + ret = o2nm_cluster_attr_write(page, count, &val); + + if (ret > 0) { + if (cluster->cl_keepalive_delay_ms != val + && o2net_num_connected_peers()) { + mlog(ML_NOTICE, + "o2net: cannot change keepalive delay after" + " the first peer has agreed to it." + " %d connected peers\n", + o2net_num_connected_peers()); + ret = -EINVAL; + } else if (val >= cluster->cl_idle_timeout_ms) { + mlog(ML_NOTICE, "o2net: keepalive delay must be " + "smaller than idle timeout\n"); + ret = -EINVAL; + } else { + cluster->cl_keepalive_delay_ms = val; + } + } + + return ret; +} + +static ssize_t o2nm_cluster_attr_reconnect_delay_ms_read( + struct o2nm_cluster *cluster, char *page) +{ + return sprintf(page, "%u\n", cluster->cl_reconnect_delay_ms); +} + +static ssize_t o2nm_cluster_attr_reconnect_delay_ms_write( + struct o2nm_cluster *cluster, const char *page, size_t count) +{ + return o2nm_cluster_attr_write(page, count, + &cluster->cl_reconnect_delay_ms); +} +static struct o2nm_cluster_attribute o2nm_cluster_attr_idle_timeout_ms = { + .attr = { .ca_owner = THIS_MODULE, + .ca_name = "idle_timeout_ms", + .ca_mode = S_IRUGO | S_IWUSR }, + .show = o2nm_cluster_attr_idle_timeout_ms_read, + .store = o2nm_cluster_attr_idle_timeout_ms_write, +}; + +static struct o2nm_cluster_attribute o2nm_cluster_attr_keepalive_delay_ms = { + .attr = { .ca_owner = THIS_MODULE, + .ca_name = "keepalive_delay_ms", + .ca_mode = S_IRUGO | S_IWUSR }, + .show = o2nm_cluster_attr_keepalive_delay_ms_read, + .store = o2nm_cluster_attr_keepalive_delay_ms_write, +}; + +static struct o2nm_cluster_attribute o2nm_cluster_attr_reconnect_delay_ms = { + .attr = { .ca_owner = THIS_MODULE, + .ca_name = "reconnect_delay_ms", + .ca_mode = S_IRUGO | S_IWUSR }, + .show = o2nm_cluster_attr_reconnect_delay_ms_read, + .store = o2nm_cluster_attr_reconnect_delay_ms_write, +}; + +static struct configfs_attribute *o2nm_cluster_attrs[] = { + &o2nm_cluster_attr_idle_timeout_ms.attr, + &o2nm_cluster_attr_keepalive_delay_ms.attr, + &o2nm_cluster_attr_reconnect_delay_ms.attr, + NULL, +}; +static ssize_t o2nm_cluster_show(struct config_item *item, + struct configfs_attribute *attr, + char *page) +{ + struct o2nm_cluster *cluster = to_o2nm_cluster(item); + struct o2nm_cluster_attribute *o2nm_cluster_attr = + container_of(attr, struct o2nm_cluster_attribute, attr); + ssize_t ret = 0; + + if (o2nm_cluster_attr->show) + ret = o2nm_cluster_attr->show(cluster, page); + return ret; +} + +static ssize_t o2nm_cluster_store(struct config_item *item, + struct configfs_attribute *attr, + const char *page, size_t count) +{ + struct o2nm_cluster *cluster = to_o2nm_cluster(item); + struct o2nm_cluster_attribute *o2nm_cluster_attr = + container_of(attr, struct o2nm_cluster_attribute, attr); + ssize_t ret; + + if (o2nm_cluster_attr->store == NULL) { + ret = -EINVAL; + goto out; + } + + ret = o2nm_cluster_attr->store(cluster, page, count); + if (ret < count) + goto out; +out: + return ret; +} + static struct config_item *o2nm_node_group_make_item(struct config_group *group, const char *name) { @@ -552,7 +714,7 @@ static struct config_item *o2nm_node_group_make_item(struct config_group *group, if (strlen(name) > O2NM_MAX_NAME_LEN) goto out; /* ENAMETOOLONG */ - node = kcalloc(1, sizeof(struct o2nm_node), GFP_KERNEL); + node = kzalloc(sizeof(struct o2nm_node), GFP_KERNEL); if (node == NULL) goto out; /* ENOMEM */ @@ -624,10 +786,13 @@ static void o2nm_cluster_release(struct config_item *item) static struct configfs_item_operations o2nm_cluster_item_ops = { .release = o2nm_cluster_release, + .show_attribute = o2nm_cluster_show, + .store_attribute = o2nm_cluster_store, }; static struct config_item_type o2nm_cluster_type = { .ct_item_ops = &o2nm_cluster_item_ops, + .ct_attrs = o2nm_cluster_attrs, .ct_owner = THIS_MODULE, }; @@ -660,8 +825,8 @@ static struct config_group *o2nm_cluster_group_make_group(struct config_group *g if (o2nm_single_cluster) goto out; /* ENOSPC */ - cluster = kcalloc(1, sizeof(struct o2nm_cluster), GFP_KERNEL); - ns = kcalloc(1, sizeof(struct o2nm_node_group), GFP_KERNEL); + cluster = kzalloc(sizeof(struct o2nm_cluster), GFP_KERNEL); + ns = kzalloc(sizeof(struct o2nm_node_group), GFP_KERNEL); defs = kcalloc(3, sizeof(struct config_group *), GFP_KERNEL); o2hb_group = o2hb_alloc_hb_set(); if (cluster == NULL || ns == NULL || o2hb_group == NULL || defs == NULL) @@ -678,6 +843,9 @@ static struct config_group *o2nm_cluster_group_make_group(struct config_group *g cluster->cl_group.default_groups[2] = NULL; rwlock_init(&cluster->cl_nodes_lock); cluster->cl_node_ip_tree = RB_ROOT; + cluster->cl_reconnect_delay_ms = O2NET_RECONNECT_DELAY_MS_DEFAULT; + cluster->cl_idle_timeout_ms = O2NET_IDLE_TIMEOUT_MS_DEFAULT; + cluster->cl_keepalive_delay_ms = O2NET_KEEPALIVE_DELAY_MS_DEFAULT; ret = &cluster->cl_group; o2nm_single_cluster = cluster; diff --git a/fs/ocfs2/cluster/nodemanager.h b/fs/ocfs2/cluster/nodemanager.h index fce8033c310f..8fb23cacc2f5 100644 --- a/fs/ocfs2/cluster/nodemanager.h +++ b/fs/ocfs2/cluster/nodemanager.h @@ -53,6 +53,23 @@ struct o2nm_node { unsigned long nd_set_attributes; }; +struct o2nm_cluster { + struct config_group cl_group; + unsigned cl_has_local:1; + u8 cl_local_node; + rwlock_t cl_nodes_lock; + struct o2nm_node *cl_nodes[O2NM_MAX_NODES]; + struct rb_root cl_node_ip_tree; + unsigned int cl_idle_timeout_ms; + unsigned int cl_keepalive_delay_ms; + unsigned int cl_reconnect_delay_ms; + + /* this bitmap is part of a hack for disk bitmap.. will go eventually. - zab */ + unsigned long cl_nodes_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; +}; + +extern struct o2nm_cluster *o2nm_single_cluster; + u8 o2nm_this_node(void); int o2nm_configured_node_map(unsigned long *map, unsigned bytes); diff --git a/fs/ocfs2/cluster/quorum.c b/fs/ocfs2/cluster/quorum.c index 7bba98fbfc15..4705d659fe57 100644 --- a/fs/ocfs2/cluster/quorum.c +++ b/fs/ocfs2/cluster/quorum.c @@ -88,7 +88,7 @@ void o2quo_disk_timeout(void) o2quo_fence_self(); } -static void o2quo_make_decision(void *arg) +static void o2quo_make_decision(struct work_struct *work) { int quorum; int lowest_hb, lowest_reachable = 0, fence = 0; @@ -306,7 +306,7 @@ void o2quo_init(void) struct o2quo_state *qs = &o2quo_state; spin_lock_init(&qs->qs_lock); - INIT_WORK(&qs->qs_work, o2quo_make_decision, NULL); + INIT_WORK(&qs->qs_work, o2quo_make_decision); } void o2quo_exit(void) diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c index b650efa8c8be..ae4ff4a6636b 100644 --- a/fs/ocfs2/cluster/tcp.c +++ b/fs/ocfs2/cluster/tcp.c @@ -140,13 +140,35 @@ static int o2net_sys_err_translations[O2NET_ERR_MAX] = [O2NET_ERR_DIED] = -EHOSTDOWN,}; /* can't quite avoid *all* internal declarations :/ */ -static void o2net_sc_connect_completed(void *arg); -static void o2net_rx_until_empty(void *arg); -static void o2net_shutdown_sc(void *arg); +static void o2net_sc_connect_completed(struct work_struct *work); +static void o2net_rx_until_empty(struct work_struct *work); +static void o2net_shutdown_sc(struct work_struct *work); static void o2net_listen_data_ready(struct sock *sk, int bytes); -static void o2net_sc_send_keep_req(void *arg); +static void o2net_sc_send_keep_req(struct work_struct *work); static void o2net_idle_timer(unsigned long data); static void o2net_sc_postpone_idle(struct o2net_sock_container *sc); +static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc); + +/* + * FIXME: These should use to_o2nm_cluster_from_node(), but we end up + * losing our parent link to the cluster during shutdown. This can be + * solved by adding a pre-removal callback to configfs, or passing + * around the cluster with the node. -jeffm + */ +static inline int o2net_reconnect_delay(struct o2nm_node *node) +{ + return o2nm_single_cluster->cl_reconnect_delay_ms; +} + +static inline int o2net_keepalive_delay(struct o2nm_node *node) +{ + return o2nm_single_cluster->cl_keepalive_delay_ms; +} + +static inline int o2net_idle_timeout(struct o2nm_node *node) +{ + return o2nm_single_cluster->cl_idle_timeout_ms; +} static inline int o2net_sys_err_to_errno(enum o2net_system_error err) { @@ -271,6 +293,8 @@ static void sc_kref_release(struct kref *kref) { struct o2net_sock_container *sc = container_of(kref, struct o2net_sock_container, sc_kref); + BUG_ON(timer_pending(&sc->sc_idle_timeout)); + sclog(sc, "releasing\n"); if (sc->sc_sock) { @@ -300,7 +324,7 @@ static struct o2net_sock_container *sc_alloc(struct o2nm_node *node) struct page *page = NULL; page = alloc_page(GFP_NOFS); - sc = kcalloc(1, sizeof(*sc), GFP_NOFS); + sc = kzalloc(sizeof(*sc), GFP_NOFS); if (sc == NULL || page == NULL) goto out; @@ -308,10 +332,10 @@ static struct o2net_sock_container *sc_alloc(struct o2nm_node *node) o2nm_node_get(node); sc->sc_node = node; - INIT_WORK(&sc->sc_connect_work, o2net_sc_connect_completed, sc); - INIT_WORK(&sc->sc_rx_work, o2net_rx_until_empty, sc); - INIT_WORK(&sc->sc_shutdown_work, o2net_shutdown_sc, sc); - INIT_WORK(&sc->sc_keepalive_work, o2net_sc_send_keep_req, sc); + INIT_WORK(&sc->sc_connect_work, o2net_sc_connect_completed); + INIT_WORK(&sc->sc_rx_work, o2net_rx_until_empty); + INIT_WORK(&sc->sc_shutdown_work, o2net_shutdown_sc); + INIT_DELAYED_WORK(&sc->sc_keepalive_work, o2net_sc_send_keep_req); init_timer(&sc->sc_idle_timeout); sc->sc_idle_timeout.function = o2net_idle_timer; @@ -342,7 +366,7 @@ static void o2net_sc_queue_work(struct o2net_sock_container *sc, sc_put(sc); } static void o2net_sc_queue_delayed_work(struct o2net_sock_container *sc, - struct work_struct *work, + struct delayed_work *work, int delay) { sc_get(sc); @@ -350,12 +374,19 @@ static void o2net_sc_queue_delayed_work(struct o2net_sock_container *sc, sc_put(sc); } static void o2net_sc_cancel_delayed_work(struct o2net_sock_container *sc, - struct work_struct *work) + struct delayed_work *work) { if (cancel_delayed_work(work)) sc_put(sc); } +static atomic_t o2net_connected_peers = ATOMIC_INIT(0); + +int o2net_num_connected_peers(void) +{ + return atomic_read(&o2net_connected_peers); +} + static void o2net_set_nn_state(struct o2net_node *nn, struct o2net_sock_container *sc, unsigned valid, int err) @@ -366,6 +397,11 @@ static void o2net_set_nn_state(struct o2net_node *nn, assert_spin_locked(&nn->nn_lock); + if (old_sc && !sc) + atomic_dec(&o2net_connected_peers); + else if (!old_sc && sc) + atomic_inc(&o2net_connected_peers); + /* the node num comparison and single connect/accept path should stop * an non-null sc from being overwritten with another */ BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc); @@ -424,9 +460,9 @@ static void o2net_set_nn_state(struct o2net_node *nn, /* delay if we're withing a RECONNECT_DELAY of the * last attempt */ delay = (nn->nn_last_connect_attempt + - msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS)) + msecs_to_jiffies(o2net_reconnect_delay(sc->sc_node))) - jiffies; - if (delay > msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS)) + if (delay > msecs_to_jiffies(o2net_reconnect_delay(sc->sc_node))) delay = 0; mlog(ML_CONN, "queueing conn attempt in %lu jiffies\n", delay); queue_delayed_work(o2net_wq, &nn->nn_connect_work, delay); @@ -564,9 +600,11 @@ static void o2net_ensure_shutdown(struct o2net_node *nn, * ourselves as state_change couldn't get the nn_lock and call set_nn_state * itself. */ -static void o2net_shutdown_sc(void *arg) +static void o2net_shutdown_sc(struct work_struct *work) { - struct o2net_sock_container *sc = arg; + struct o2net_sock_container *sc = + container_of(work, struct o2net_sock_container, + sc_shutdown_work); struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); sclog(sc, "shutting down\n"); @@ -676,7 +714,7 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len, goto out; } - nmh = kcalloc(1, sizeof(struct o2net_msg_handler), GFP_NOFS); + nmh = kzalloc(sizeof(struct o2net_msg_handler), GFP_NOFS); if (nmh == NULL) { ret = -ENOMEM; goto out; @@ -1097,13 +1135,51 @@ static int o2net_check_handshake(struct o2net_sock_container *sc) return -1; } + /* + * Ensure timeouts are consistent with other nodes, otherwise + * we can end up with one node thinking that the other must be down, + * but isn't. This can ultimately cause corruption. + */ + if (be32_to_cpu(hand->o2net_idle_timeout_ms) != + o2net_idle_timeout(sc->sc_node)) { + mlog(ML_NOTICE, SC_NODEF_FMT " uses a network idle timeout of " + "%u ms, but we use %u ms locally. disconnecting\n", + SC_NODEF_ARGS(sc), + be32_to_cpu(hand->o2net_idle_timeout_ms), + o2net_idle_timeout(sc->sc_node)); + o2net_ensure_shutdown(nn, sc, -ENOTCONN); + return -1; + } + + if (be32_to_cpu(hand->o2net_keepalive_delay_ms) != + o2net_keepalive_delay(sc->sc_node)) { + mlog(ML_NOTICE, SC_NODEF_FMT " uses a keepalive delay of " + "%u ms, but we use %u ms locally. disconnecting\n", + SC_NODEF_ARGS(sc), + be32_to_cpu(hand->o2net_keepalive_delay_ms), + o2net_keepalive_delay(sc->sc_node)); + o2net_ensure_shutdown(nn, sc, -ENOTCONN); + return -1; + } + + if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) != + O2HB_MAX_WRITE_TIMEOUT_MS) { + mlog(ML_NOTICE, SC_NODEF_FMT " uses a heartbeat timeout of " + "%u ms, but we use %u ms locally. disconnecting\n", + SC_NODEF_ARGS(sc), + be32_to_cpu(hand->o2hb_heartbeat_timeout_ms), + O2HB_MAX_WRITE_TIMEOUT_MS); + o2net_ensure_shutdown(nn, sc, -ENOTCONN); + return -1; + } + sc->sc_handshake_ok = 1; spin_lock(&nn->nn_lock); /* set valid and queue the idle timers only if it hasn't been * shut down already */ if (nn->nn_sc == sc) { - o2net_sc_postpone_idle(sc); + o2net_sc_reset_idle_timer(sc); o2net_set_nn_state(nn, sc, 1, 0); } spin_unlock(&nn->nn_lock); @@ -1129,6 +1205,23 @@ static int o2net_advance_rx(struct o2net_sock_container *sc) sclog(sc, "receiving\n"); do_gettimeofday(&sc->sc_tv_advance_start); + if (unlikely(sc->sc_handshake_ok == 0)) { + if(sc->sc_page_off < sizeof(struct o2net_handshake)) { + data = page_address(sc->sc_page) + sc->sc_page_off; + datalen = sizeof(struct o2net_handshake) - sc->sc_page_off; + ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen); + if (ret > 0) + sc->sc_page_off += ret; + } + + if (sc->sc_page_off == sizeof(struct o2net_handshake)) { + o2net_check_handshake(sc); + if (unlikely(sc->sc_handshake_ok == 0)) + ret = -EPROTO; + } + goto out; + } + /* do we need more header? */ if (sc->sc_page_off < sizeof(struct o2net_msg)) { data = page_address(sc->sc_page) + sc->sc_page_off; @@ -1136,15 +1229,6 @@ static int o2net_advance_rx(struct o2net_sock_container *sc) ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen); if (ret > 0) { sc->sc_page_off += ret; - - /* this working relies on the handshake being - * smaller than the normal message header */ - if (sc->sc_page_off >= sizeof(struct o2net_handshake)&& - !sc->sc_handshake_ok && o2net_check_handshake(sc)) { - ret = -EPROTO; - goto out; - } - /* only swab incoming here.. we can * only get here once as we cross from * being under to over */ @@ -1201,9 +1285,10 @@ out: /* this work func is triggerd by data ready. it reads until it can read no * more. it interprets 0, eof, as fatal. if data_ready hits while we're doing * our work the work struct will be marked and we'll be called again. */ -static void o2net_rx_until_empty(void *arg) +static void o2net_rx_until_empty(struct work_struct *work) { - struct o2net_sock_container *sc = arg; + struct o2net_sock_container *sc = + container_of(work, struct o2net_sock_container, sc_rx_work); int ret; do { @@ -1245,26 +1330,43 @@ static int o2net_set_nodelay(struct socket *sock) return ret; } +static void o2net_initialize_handshake(void) +{ + o2net_hand->o2hb_heartbeat_timeout_ms = cpu_to_be32( + O2HB_MAX_WRITE_TIMEOUT_MS); + o2net_hand->o2net_idle_timeout_ms = cpu_to_be32( + o2net_idle_timeout(NULL)); + o2net_hand->o2net_keepalive_delay_ms = cpu_to_be32( + o2net_keepalive_delay(NULL)); + o2net_hand->o2net_reconnect_delay_ms = cpu_to_be32( + o2net_reconnect_delay(NULL)); +} + /* ------------------------------------------------------------ */ /* called when a connect completes and after a sock is accepted. the * rx path will see the response and mark the sc valid */ -static void o2net_sc_connect_completed(void *arg) +static void o2net_sc_connect_completed(struct work_struct *work) { - struct o2net_sock_container *sc = arg; + struct o2net_sock_container *sc = + container_of(work, struct o2net_sock_container, + sc_connect_work); mlog(ML_MSG, "sc sending handshake with ver %llu id %llx\n", (unsigned long long)O2NET_PROTOCOL_VERSION, (unsigned long long)be64_to_cpu(o2net_hand->connector_id)); + o2net_initialize_handshake(); o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); sc_put(sc); } /* this is called as a work_struct func. */ -static void o2net_sc_send_keep_req(void *arg) +static void o2net_sc_send_keep_req(struct work_struct *work) { - struct o2net_sock_container *sc = arg; + struct o2net_sock_container *sc = + container_of(work, struct o2net_sock_container, + sc_keepalive_work.work); o2net_sendpage(sc, o2net_keep_req, sizeof(*o2net_keep_req)); sc_put(sc); @@ -1280,8 +1382,10 @@ static void o2net_idle_timer(unsigned long data) do_gettimeofday(&now); - printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT " has been idle for 10 " - "seconds, shutting it down.\n", SC_NODEF_ARGS(sc)); + printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT " has been idle for %u.%u " + "seconds, shutting it down.\n", SC_NODEF_ARGS(sc), + o2net_idle_timeout(sc->sc_node) / 1000, + o2net_idle_timeout(sc->sc_node) % 1000); mlog(ML_NOTICE, "here are some times that might help debug the " "situation: (tmr %ld.%ld now %ld.%ld dr %ld.%ld adv " "%ld.%ld:%ld.%ld func (%08x:%u) %ld.%ld:%ld.%ld)\n", @@ -1299,14 +1403,21 @@ static void o2net_idle_timer(unsigned long data) o2net_sc_queue_work(sc, &sc->sc_shutdown_work); } -static void o2net_sc_postpone_idle(struct o2net_sock_container *sc) +static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc) { o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work); o2net_sc_queue_delayed_work(sc, &sc->sc_keepalive_work, - O2NET_KEEPALIVE_DELAY_SECS * HZ); + msecs_to_jiffies(o2net_keepalive_delay(sc->sc_node))); do_gettimeofday(&sc->sc_tv_timer); mod_timer(&sc->sc_idle_timeout, - jiffies + (O2NET_IDLE_TIMEOUT_SECS * HZ)); + jiffies + msecs_to_jiffies(o2net_idle_timeout(sc->sc_node))); +} + +static void o2net_sc_postpone_idle(struct o2net_sock_container *sc) +{ + /* Only push out an existing timer */ + if (timer_pending(&sc->sc_idle_timeout)) + o2net_sc_reset_idle_timer(sc); } /* this work func is kicked whenever a path sets the nn state which doesn't @@ -1314,14 +1425,15 @@ static void o2net_sc_postpone_idle(struct o2net_sock_container *sc) * having a connect attempt fail, etc. This centralizes the logic which decides * if a connect attempt should be made or if we should give up and all future * transmit attempts should fail */ -static void o2net_start_connect(void *arg) +static void o2net_start_connect(struct work_struct *work) { - struct o2net_node *nn = arg; + struct o2net_node *nn = + container_of(work, struct o2net_node, nn_connect_work.work); struct o2net_sock_container *sc = NULL; struct o2nm_node *node = NULL, *mynode = NULL; struct socket *sock = NULL; struct sockaddr_in myaddr = {0, }, remoteaddr = {0, }; - int ret = 0; + int ret = 0, stop; /* if we're greater we initiate tx, otherwise we accept */ if (o2nm_this_node() <= o2net_num_from_nn(nn)) @@ -1342,10 +1454,9 @@ static void o2net_start_connect(void *arg) spin_lock(&nn->nn_lock); /* see if we already have one pending or have given up */ - if (nn->nn_sc || nn->nn_persistent_error) - arg = NULL; + stop = (nn->nn_sc || nn->nn_persistent_error); spin_unlock(&nn->nn_lock); - if (arg == NULL) /* *shrug*, needed some indicator */ + if (stop) goto out; nn->nn_last_connect_attempt = jiffies; @@ -1421,24 +1532,29 @@ out: return; } -static void o2net_connect_expired(void *arg) +static void o2net_connect_expired(struct work_struct *work) { - struct o2net_node *nn = arg; + struct o2net_node *nn = + container_of(work, struct o2net_node, nn_connect_expired.work); spin_lock(&nn->nn_lock); if (!nn->nn_sc_valid) { + struct o2nm_node *node = nn->nn_sc->sc_node; mlog(ML_ERROR, "no connection established with node %u after " - "%u seconds, giving up and returning errors.\n", - o2net_num_from_nn(nn), O2NET_IDLE_TIMEOUT_SECS); + "%u.%u seconds, giving up and returning errors.\n", + o2net_num_from_nn(nn), + o2net_idle_timeout(node) / 1000, + o2net_idle_timeout(node) % 1000); o2net_set_nn_state(nn, NULL, 0, -ENOTCONN); } spin_unlock(&nn->nn_lock); } -static void o2net_still_up(void *arg) +static void o2net_still_up(struct work_struct *work) { - struct o2net_node *nn = arg; + struct o2net_node *nn = + container_of(work, struct o2net_node, nn_still_up.work); o2quo_hb_still_up(o2net_num_from_nn(nn)); } @@ -1469,6 +1585,8 @@ static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num, if (node_num != o2nm_this_node()) o2net_disconnect_node(node); + + BUG_ON(atomic_read(&o2net_connected_peers) < 0); } static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num, @@ -1480,14 +1598,14 @@ static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num, /* ensure an immediate connect attempt */ nn->nn_last_connect_attempt = jiffies - - (msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS) + 1); + (msecs_to_jiffies(o2net_reconnect_delay(node)) + 1); if (node_num != o2nm_this_node()) { /* heartbeat doesn't work unless a local node number is * configured and doing so brings up the o2net_wq, so we can * use it.. */ queue_delayed_work(o2net_wq, &nn->nn_connect_expired, - O2NET_IDLE_TIMEOUT_SECS * HZ); + msecs_to_jiffies(o2net_idle_timeout(node))); /* believe it or not, accept and node hearbeating testing * can succeed for this node before we got here.. so @@ -1632,6 +1750,7 @@ static int o2net_accept_one(struct socket *sock) o2net_register_callbacks(sc->sc_sock->sk, sc); o2net_sc_queue_work(sc, &sc->sc_rx_work); + o2net_initialize_handshake(); o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); out: @@ -1644,9 +1763,9 @@ out: return ret; } -static void o2net_accept_many(void *arg) +static void o2net_accept_many(struct work_struct *work) { - struct socket *sock = arg; + struct socket *sock = o2net_listen_sock; while (o2net_accept_one(sock) == 0) cond_resched(); } @@ -1700,7 +1819,7 @@ static int o2net_open_listening_sock(__be16 port) write_unlock_bh(&sock->sk->sk_callback_lock); o2net_listen_sock = sock; - INIT_WORK(&o2net_listen_work, o2net_accept_many, sock); + INIT_WORK(&o2net_listen_work, o2net_accept_many); sock->sk->sk_reuse = 1; ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin)); @@ -1799,9 +1918,9 @@ int o2net_init(void) o2quo_init(); - o2net_hand = kcalloc(1, sizeof(struct o2net_handshake), GFP_KERNEL); - o2net_keep_req = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL); - o2net_keep_resp = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL); + o2net_hand = kzalloc(sizeof(struct o2net_handshake), GFP_KERNEL); + o2net_keep_req = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL); + o2net_keep_resp = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL); if (!o2net_hand || !o2net_keep_req || !o2net_keep_resp) { kfree(o2net_hand); kfree(o2net_keep_req); @@ -1819,9 +1938,10 @@ int o2net_init(void) struct o2net_node *nn = o2net_nn_from_num(i); spin_lock_init(&nn->nn_lock); - INIT_WORK(&nn->nn_connect_work, o2net_start_connect, nn); - INIT_WORK(&nn->nn_connect_expired, o2net_connect_expired, nn); - INIT_WORK(&nn->nn_still_up, o2net_still_up, nn); + INIT_DELAYED_WORK(&nn->nn_connect_work, o2net_start_connect); + INIT_DELAYED_WORK(&nn->nn_connect_expired, + o2net_connect_expired); + INIT_DELAYED_WORK(&nn->nn_still_up, o2net_still_up); /* until we see hb from a node we'll return einval */ nn->nn_persistent_error = -ENOTCONN; init_waitqueue_head(&nn->nn_sc_wq); diff --git a/fs/ocfs2/cluster/tcp.h b/fs/ocfs2/cluster/tcp.h index 616ff2b8434a..21a4e43df836 100644 --- a/fs/ocfs2/cluster/tcp.h +++ b/fs/ocfs2/cluster/tcp.h @@ -54,6 +54,13 @@ typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data) #define O2NET_MAX_PAYLOAD_BYTES (4096 - sizeof(struct o2net_msg)) +/* same as hb delay, we're waiting for another node to recognize our hb */ +#define O2NET_RECONNECT_DELAY_MS_DEFAULT 2000 + +#define O2NET_KEEPALIVE_DELAY_MS_DEFAULT 5000 +#define O2NET_IDLE_TIMEOUT_MS_DEFAULT 10000 + + /* TODO: figure this out.... */ static inline int o2net_link_down(int err, struct socket *sock) { @@ -101,6 +108,7 @@ void o2net_unregister_hb_callbacks(void); int o2net_start_listening(struct o2nm_node *node); void o2net_stop_listening(struct o2nm_node *node); void o2net_disconnect_node(struct o2nm_node *node); +int o2net_num_connected_peers(void); int o2net_init(void); void o2net_exit(void); diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h index 4b46aac7d243..b700dc9624d1 100644 --- a/fs/ocfs2/cluster/tcp_internal.h +++ b/fs/ocfs2/cluster/tcp_internal.h @@ -27,23 +27,20 @@ #define O2NET_MSG_KEEP_REQ_MAGIC ((u16)0xfa57) #define O2NET_MSG_KEEP_RESP_MAGIC ((u16)0xfa58) -/* same as hb delay, we're waiting for another node to recognize our hb */ -#define O2NET_RECONNECT_DELAY_MS O2HB_REGION_TIMEOUT_MS - /* we're delaying our quorum decision so that heartbeat will have timed * out truly dead nodes by the time we come around to making decisions * on their number */ #define O2NET_QUORUM_DELAY_MS ((o2hb_dead_threshold + 2) * O2HB_REGION_TIMEOUT_MS) -#define O2NET_KEEPALIVE_DELAY_SECS 5 -#define O2NET_IDLE_TIMEOUT_SECS 10 - /* * This version number represents quite a lot, unfortunately. It not * only represents the raw network message protocol on the wire but also * locking semantics of the file system using the protocol. It should * be somewhere else, I'm sure, but right now it isn't. * + * New in version 5: + * - Network timeout checking protocol + * * New in version 4: * - Remove i_generation from lock names for better stat performance. * @@ -54,10 +51,14 @@ * - full 64 bit i_size in the metadata lock lvbs * - introduction of "rw" lock and pushing meta/data locking down */ -#define O2NET_PROTOCOL_VERSION 4ULL +#define O2NET_PROTOCOL_VERSION 5ULL struct o2net_handshake { __be64 protocol_version; __be64 connector_id; + __be32 o2hb_heartbeat_timeout_ms; + __be32 o2net_idle_timeout_ms; + __be32 o2net_keepalive_delay_ms; + __be32 o2net_reconnect_delay_ms; }; struct o2net_node { @@ -86,18 +87,18 @@ struct o2net_node { * connect attempt fails and so can be self-arming. shutdown is * careful to first mark the nn such that no connects will be attempted * before canceling delayed connect work and flushing the queue. */ - struct work_struct nn_connect_work; + struct delayed_work nn_connect_work; unsigned long nn_last_connect_attempt; /* this is queued as nodes come up and is canceled when a connection is * established. this expiring gives up on the node and errors out * transmits */ - struct work_struct nn_connect_expired; + struct delayed_work nn_connect_expired; /* after we give up on a socket we wait a while before deciding * that it is still heartbeating and that we should do some * quorum work */ - struct work_struct nn_still_up; + struct delayed_work nn_still_up; }; struct o2net_sock_container { @@ -129,7 +130,7 @@ struct o2net_sock_container { struct work_struct sc_shutdown_work; struct timer_list sc_idle_timeout; - struct work_struct sc_keepalive_work; + struct delayed_work sc_keepalive_work; unsigned sc_handshake_ok:1; |