Diffstat (limited to 'fs/dlm')
-rw-r--r-- | fs/dlm/Kconfig     |   1
-rw-r--r-- | fs/dlm/ast.c       |   2
-rw-r--r-- | fs/dlm/config.c    | 175
-rw-r--r-- | fs/dlm/config.h    |  28
-rw-r--r-- | fs/dlm/lock.c      | 123
-rw-r--r-- | fs/dlm/lockspace.c |   2
-rw-r--r-- | fs/dlm/lowcomms.c  |  22
-rw-r--r-- | fs/dlm/member.c    |   2
-rw-r--r-- | fs/dlm/recover.c   |  35
-rw-r--r-- | fs/dlm/recoverd.c  |   2
10 files changed, 213 insertions(+), 179 deletions(-)
diff --git a/fs/dlm/Kconfig b/fs/dlm/Kconfig
index f82a4952769d..b46165df5a91 100644
--- a/fs/dlm/Kconfig
+++ b/fs/dlm/Kconfig
@@ -3,7 +3,6 @@ menuconfig DLM
 	tristate "Distributed Lock Manager (DLM)"
 	depends on INET
 	depends on SYSFS && CONFIGFS_FS && (IPV6 || IPV6=n)
-	select IP_SCTP
 	help
 	  A general purpose distributed lock manager for kernel
 	  or userspace applications.
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 742b30b61c19..0fe8d80ce5e8 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -30,7 +30,7 @@ static void dlm_run_callback(uint32_t ls_id, uint32_t lkb_id, int8_t mode,
 		trace_dlm_bast(ls_id, lkb_id, mode, res_name, res_length);
 		bastfn(astparam, mode);
 	} else if (flags & DLM_CB_CAST) {
-		trace_dlm_ast(ls_id, lkb_id, sb_status, sb_flags, res_name,
+		trace_dlm_ast(ls_id, lkb_id, sb_flags, sb_status, res_name,
 			      res_length);
 		lksb->sb_status = sb_status;
 		lksb->sb_flags = sb_flags;
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index eac96f1c1d74..a23fd524a6ee 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -24,9 +24,9 @@
 #include "lowcomms.h"
 
 /*
- * /config/dlm/<cluster>/spaces/<space>/nodes/<node>/nodeid
+ * /config/dlm/<cluster>/spaces/<space>/nodes/<node>/nodeid (refers to <node>)
  * /config/dlm/<cluster>/spaces/<space>/nodes/<node>/weight
- * /config/dlm/<cluster>/comms/<comm>/nodeid
+ * /config/dlm/<cluster>/comms/<comm>/nodeid (refers to <comm>)
  * /config/dlm/<cluster>/comms/<comm>/local
 * /config/dlm/<cluster>/comms/<comm>/addr (write only)
 * /config/dlm/<cluster>/comms/<comm>/addr_list (read only)
@@ -73,20 +73,6 @@ const struct rhashtable_params dlm_rhash_rsb_params = {
 
 struct dlm_cluster {
 	struct config_group group;
-	unsigned int cl_tcp_port;
-	unsigned int cl_buffer_size;
-	unsigned int cl_rsbtbl_size;
-	unsigned int cl_recover_timer;
-	unsigned int cl_toss_secs;
-	unsigned int cl_scan_secs;
-	unsigned int cl_log_debug;
-	unsigned int cl_log_info;
-	unsigned int cl_protocol;
-	unsigned int cl_mark;
-	unsigned int cl_new_rsb_count;
-	unsigned int cl_recover_callbacks;
-	char cl_cluster_name[DLM_LOCKSPACE_LEN];
-
 	struct dlm_spaces *sps;
 	struct dlm_comms *cms;
 };
@@ -115,25 +101,60 @@ enum {
 
 static ssize_t cluster_cluster_name_show(struct config_item *item, char *buf)
 {
-	struct dlm_cluster *cl = config_item_to_cluster(item);
-	return sprintf(buf, "%s\n", cl->cl_cluster_name);
+	return sprintf(buf, "%s\n", dlm_config.ci_cluster_name);
 }
 
 static ssize_t cluster_cluster_name_store(struct config_item *item,
 					  const char *buf, size_t len)
 {
-	struct dlm_cluster *cl = config_item_to_cluster(item);
-
 	strscpy(dlm_config.ci_cluster_name, buf,
-		sizeof(dlm_config.ci_cluster_name));
-	strscpy(cl->cl_cluster_name, buf, sizeof(cl->cl_cluster_name));
+		sizeof(dlm_config.ci_cluster_name));
 	return len;
 }
 
 CONFIGFS_ATTR(cluster_, cluster_name);
 
-static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
-			   int *info_field, int (*check_cb)(unsigned int x),
+static ssize_t cluster_tcp_port_show(struct config_item *item, char *buf)
+{
+	return sprintf(buf, "%u\n", be16_to_cpu(dlm_config.ci_tcp_port));
+}
+
+static int dlm_check_zero_and_dlm_running(unsigned int x)
+{
+	if (!x)
+		return -EINVAL;
+
+	if (dlm_lowcomms_is_running())
+		return -EBUSY;
+
+	return 0;
+}
+
+static ssize_t cluster_tcp_port_store(struct config_item *item,
+				      const char *buf, size_t len)
+{
+	int rc;
+	u16 x;
+
+	if (!capable(CAP_SYS_ADMIN))
+		return -EPERM;
+
+	rc = kstrtou16(buf, 0, &x);
+	if (rc)
+		return rc;
+
+	rc = dlm_check_zero_and_dlm_running(x);
+	if (rc)
+		return rc;
+
+	dlm_config.ci_tcp_port = cpu_to_be16(x);
+	return len;
+}
+
+CONFIGFS_ATTR(cluster_, tcp_port);
+
+static ssize_t cluster_set(unsigned int *info_field,
+			   int (*check_cb)(unsigned int x),
 			   const char *buf, size_t len)
 {
 	unsigned int x;
@@ -151,7 +172,6 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
 			return rc;
 	}
 
-	*cl_field = x;
 	*info_field = x;
 
 	return len;
@@ -161,14 +181,11 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
 static ssize_t cluster_##name##_store(struct config_item *item, \
 				      const char *buf, size_t len) \
 { \
-	struct dlm_cluster *cl = config_item_to_cluster(item); \
-	return cluster_set(cl, &cl->cl_##name, &dlm_config.ci_##name, \
-			   check_cb, buf, len); \
+	return cluster_set(&dlm_config.ci_##name, check_cb, buf, len); \
 } \
 static ssize_t cluster_##name##_show(struct config_item *item, char *buf) \
 { \
-	struct dlm_cluster *cl = config_item_to_cluster(item); \
-	return snprintf(buf, PAGE_SIZE, "%u\n", cl->cl_##name); \
+	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_config.ci_##name); \
 } \
 CONFIGFS_ATTR(cluster_, name);
 
@@ -180,6 +197,9 @@ static int dlm_check_protocol_and_dlm_running(unsigned int x)
 		break;
 	case 1:
 		/* SCTP */
+		if (!IS_ENABLED(CONFIG_IP_SCTP))
+			return -EOPNOTSUPP;
+
 		break;
 	default:
 		return -EINVAL;
@@ -191,17 +211,6 @@ static int dlm_check_protocol_and_dlm_running(unsigned int x)
 	return 0;
 }
 
-static int dlm_check_zero_and_dlm_running(unsigned int x)
-{
-	if (!x)
-		return -EINVAL;
-
-	if (dlm_lowcomms_is_running())
-		return -EBUSY;
-
-	return 0;
-}
-
 static int dlm_check_zero(unsigned int x)
 {
 	if (!x)
@@ -218,7 +227,6 @@ static int dlm_check_buffer_size(unsigned int x)
 	return 0;
 }
 
-CLUSTER_ATTR(tcp_port, dlm_check_zero_and_dlm_running);
 CLUSTER_ATTR(buffer_size, dlm_check_buffer_size);
 CLUSTER_ATTR(rsbtbl_size, dlm_check_zero);
 CLUSTER_ATTR(recover_timer, dlm_check_zero);
@@ -423,20 +431,6 @@ static struct config_group *make_cluster(struct config_group *g,
 	configfs_add_default_group(&sps->ss_group, &cl->group);
 	configfs_add_default_group(&cms->cs_group, &cl->group);
 
-	cl->cl_tcp_port = dlm_config.ci_tcp_port;
-	cl->cl_buffer_size = dlm_config.ci_buffer_size;
-	cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size;
-	cl->cl_recover_timer = dlm_config.ci_recover_timer;
-	cl->cl_toss_secs = dlm_config.ci_toss_secs;
-	cl->cl_scan_secs = dlm_config.ci_scan_secs;
-	cl->cl_log_debug = dlm_config.ci_log_debug;
-	cl->cl_log_info = dlm_config.ci_log_info;
-	cl->cl_protocol = dlm_config.ci_protocol;
-	cl->cl_new_rsb_count = dlm_config.ci_new_rsb_count;
-	cl->cl_recover_callbacks = dlm_config.ci_recover_callbacks;
-	memcpy(cl->cl_cluster_name, dlm_config.ci_cluster_name,
-	       DLM_LOCKSPACE_LEN);
-
 	space_list = &sps->ss_group;
 	comm_list = &cms->cs_group;
 	return &cl->group;
@@ -517,6 +511,12 @@ static void release_space(struct config_item *i)
 static struct config_item *make_comm(struct config_group *g, const char *name)
 {
 	struct dlm_comm *cm;
+	unsigned int nodeid;
+	int rv;
+
+	rv = kstrtouint(name, 0, &nodeid);
+	if (rv)
+		return ERR_PTR(rv);
 
 	cm = kzalloc(sizeof(struct dlm_comm), GFP_NOFS);
 	if (!cm)
@@ -528,7 +528,7 @@ static struct config_item *make_comm(struct config_group *g, const char *name)
 	if (!cm->seq)
 		cm->seq = dlm_comm_count++;
 
-	cm->nodeid = -1;
+	cm->nodeid = nodeid;
 	cm->local = 0;
 	cm->addr_count = 0;
 	cm->mark = 0;
@@ -555,16 +555,25 @@ static void release_comm(struct config_item *i)
 static struct config_item *make_node(struct config_group *g, const char *name)
 {
 	struct dlm_space *sp = config_item_to_space(g->cg_item.ci_parent);
+	unsigned int nodeid;
 	struct dlm_node *nd;
+	uint32_t seq = 0;
+	int rv;
+
+	rv = kstrtouint(name, 0, &nodeid);
+	if (rv)
+		return ERR_PTR(rv);
 
 	nd = kzalloc(sizeof(struct dlm_node), GFP_NOFS);
 	if (!nd)
 		return ERR_PTR(-ENOMEM);
 
 	config_item_init_type_name(&nd->item, name, &node_type);
-	nd->nodeid = -1;
+	nd->nodeid = nodeid;
 	nd->weight = 1;  /* default weight of 1 if none is set */
 	nd->new = 1;  /* set to 0 once it's been read by dlm_nodeid_list() */
+	dlm_comm_seq(nodeid, &seq, true);
+	nd->comm_seq = seq;
 
 	mutex_lock(&sp->members_lock);
 	list_add(&nd->list, &sp->members);
@@ -622,16 +631,19 @@ void dlm_config_exit(void)
 
 static ssize_t comm_nodeid_show(struct config_item *item, char *buf)
 {
-	return sprintf(buf, "%d\n", config_item_to_comm(item)->nodeid);
+	unsigned int nodeid;
+	int rv;
+
+	rv = kstrtouint(config_item_name(item), 0, &nodeid);
+	if (WARN_ON(rv))
+		return rv;
+
+	return sprintf(buf, "%u\n", nodeid);
 }
 
 static ssize_t comm_nodeid_store(struct config_item *item, const char *buf,
 				 size_t len)
 {
-	int rc = kstrtoint(buf, 0, &config_item_to_comm(item)->nodeid);
-
-	if (rc)
-		return rc;
 	return len;
 }
 
@@ -772,20 +784,19 @@ static struct configfs_attribute *comm_attrs[] = {
 
 static ssize_t node_nodeid_show(struct config_item *item, char *buf)
 {
-	return sprintf(buf, "%d\n", config_item_to_node(item)->nodeid);
+	unsigned int nodeid;
+	int rv;
+
+	rv = kstrtouint(config_item_name(item), 0, &nodeid);
+	if (WARN_ON(rv))
+		return rv;
+
+	return sprintf(buf, "%u\n", nodeid);
 }
 
 static ssize_t node_nodeid_store(struct config_item *item, const char *buf,
 				 size_t len)
 {
-	struct dlm_node *nd = config_item_to_node(item);
-	uint32_t seq = 0;
-	int rc = kstrtoint(buf, 0, &nd->nodeid);
-
-	if (rc)
-		return rc;
-	dlm_comm_seq(nd->nodeid, &seq);
-	nd->comm_seq = seq;
 	return len;
 }
 
@@ -845,7 +856,7 @@ static struct dlm_comm *get_comm(int nodeid)
 	if (!comm_list)
 		return NULL;
 
-	mutex_lock(&clusters_root.subsys.su_mutex);
+	WARN_ON_ONCE(!mutex_is_locked(&clusters_root.subsys.su_mutex));
 
 	list_for_each_entry(i, &comm_list->cg_children, ci_entry) {
 		cm = config_item_to_comm(i);
@@ -856,7 +867,6 @@ static struct dlm_comm *get_comm(int nodeid)
 		config_item_get(i);
 		break;
 	}
-	mutex_unlock(&clusters_root.subsys.su_mutex);
 
 	if (!found)
 		cm = NULL;
@@ -916,11 +926,20 @@ int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
 	return rv;
 }
 
-int dlm_comm_seq(int nodeid, uint32_t *seq)
+int dlm_comm_seq(int nodeid, uint32_t *seq, bool locked)
 {
-	struct dlm_comm *cm = get_comm(nodeid);
+	struct dlm_comm *cm;
+
+	if (locked) {
+		cm = get_comm(nodeid);
+	} else {
+		mutex_lock(&clusters_root.subsys.su_mutex);
+		cm = get_comm(nodeid);
+		mutex_unlock(&clusters_root.subsys.su_mutex);
+	}
 	if (!cm)
-		return -EEXIST;
+		return -ENOENT;
+
 	*seq = cm->seq;
 	put_comm(cm);
 	return 0;
@@ -957,7 +976,7 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
 #define DEFAULT_CLUSTER_NAME ""
 
 struct dlm_config_info dlm_config = {
-	.ci_tcp_port = DEFAULT_TCP_PORT,
+	.ci_tcp_port = cpu_to_be16(DEFAULT_TCP_PORT),
 	.ci_buffer_size = DLM_MAX_SOCKET_BUFSIZE,
 	.ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE,
 	.ci_recover_timer = DEFAULT_RECOVER_TIMER,
diff --git a/fs/dlm/config.h b/fs/dlm/config.h
index ed237d910208..13a3d0b26194 100644
--- a/fs/dlm/config.h
+++ b/fs/dlm/config.h
@@ -23,24 +23,24 @@ struct dlm_config_node {
 
 extern const struct rhashtable_params dlm_rhash_rsb_params;
 
-#define DLM_MAX_ADDR_COUNT 3
+#define DLM_MAX_ADDR_COUNT 8
 
 #define DLM_PROTO_TCP	0
 #define DLM_PROTO_SCTP	1
 
 struct dlm_config_info {
-	int ci_tcp_port;
-	int ci_buffer_size;
-	int ci_rsbtbl_size;
-	int ci_recover_timer;
-	int ci_toss_secs;
-	int ci_scan_secs;
-	int ci_log_debug;
-	int ci_log_info;
-	int ci_protocol;
-	int ci_mark;
-	int ci_new_rsb_count;
-	int ci_recover_callbacks;
+	__be16 ci_tcp_port;
+	unsigned int ci_buffer_size;
+	unsigned int ci_rsbtbl_size;
+	unsigned int ci_recover_timer;
+	unsigned int ci_toss_secs;
+	unsigned int ci_scan_secs;
+	unsigned int ci_log_debug;
+	unsigned int ci_log_info;
+	unsigned int ci_protocol;
+	unsigned int ci_mark;
+	unsigned int ci_new_rsb_count;
+	unsigned int ci_recover_callbacks;
 	char ci_cluster_name[DLM_LOCKSPACE_LEN];
 };
 
@@ -50,7 +50,7 @@ int dlm_config_init(void);
 void dlm_config_exit(void);
 int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
 		     int *count_out);
-int dlm_comm_seq(int nodeid, uint32_t *seq);
+int dlm_comm_seq(int nodeid, uint32_t *seq, bool locked);
 int dlm_our_nodeid(void);
 int dlm_our_addr(struct sockaddr_storage *addr, int num);
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 865dc70a9dfc..6dd3a524cd35 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -509,7 +509,7 @@ static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
 
 void dlm_rsb_scan(struct timer_list *timer)
 {
-	struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
+	struct dlm_ls *ls = timer_container_of(ls, timer, ls_scan_timer);
 	int our_nodeid = dlm_our_nodeid();
 	struct dlm_rsb *r;
 	int rv;
@@ -741,6 +741,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 	read_lock_bh(&ls->ls_rsbtbl_lock);
 	if (!rsb_flag(r, RSB_HASHED)) {
 		read_unlock_bh(&ls->ls_rsbtbl_lock);
+		error = -EBADR;
 		goto do_new;
 	}
 
@@ -784,6 +785,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		}
 	} else {
 		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		error = -EBADR;
 		goto do_new;
 	}
 
@@ -824,9 +826,12 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		r->res_first_lkid = 0;
 	}
 
-	/* A dir record will not be on the scan list. */
-	if (r->res_dir_nodeid != our_nodeid)
-		del_scan(ls, r);
+	/* we always deactivate scan timer for the rsb, when
+	 * we move it out of the inactive state as rsb state
+	 * can be changed and scan timers are only for inactive
+	 * rsbs.
+	 */
+	del_scan(ls, r);
 	list_move(&r->res_slow_list, &ls->ls_slow_active);
 	rsb_clear_flag(r, RSB_INACTIVE);
 	kref_init(&r->res_ref); /* ref is now used in active state */
@@ -989,10 +994,10 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 		r->res_nodeid = 0;
 	}
 
+	del_scan(ls, r);
 	list_move(&r->res_slow_list, &ls->ls_slow_active);
 	rsb_clear_flag(r, RSB_INACTIVE);
 	kref_init(&r->res_ref);
-	del_scan(ls, r);
 	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	goto out;
@@ -1337,9 +1342,13 @@ static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *na
 	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
 			    r_nodeid, result);
 
-	/* A dir record rsb should never be on scan list. */
-	/* Try to fix this with del_scan? */
-	WARN_ON(!list_empty(&r->res_scan_list));
+	/* A dir record rsb should never be on scan list.
+	 * Except when we are the dir and master node.
+	 * This function should only be called by the dir
+	 * node.
+	 */
+	WARN_ON(!list_empty(&r->res_scan_list) &&
+		r->res_master_nodeid != our_nodeid);
 
 	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
@@ -1430,16 +1439,23 @@ static void deactivate_rsb(struct kref *kref)
 	list_move(&r->res_slow_list, &ls->ls_slow_inactive);
 
 	/*
-	 * When the rsb becomes unused:
-	 * - If it's not a dir record for a remote master rsb,
-	 *   then it is put on the scan list to be freed.
-	 * - If it's a dir record for a remote master rsb,
-	 *   then it is kept in the inactive state until
-	 *   receive_remove() from the master node.
+	 * When the rsb becomes unused, there are two possibilities:
+	 * 1. Leave the inactive rsb in place (don't remove it).
+	 * 2. Add it to the scan list to be removed.
+	 *
+	 * 1 is done when the rsb is acting as the dir record
+	 * for a remotely mastered rsb.  The rsb must be left
+	 * in place as an inactive rsb to act as the dir record.
+	 *
+	 * 2 is done when a) the rsb is not the master and not the
+	 * dir record, b) when the rsb is both the master and the
+	 * dir record, c) when the rsb is master but not dir record.
+	 *
+	 * (If no directory is used, the rsb can always be removed.)
 	 */
-	if (!dlm_no_directory(ls) &&
-	    (r->res_master_nodeid != our_nodeid) &&
-	    (dlm_dir_nodeid(r) != our_nodeid))
+	if (dlm_no_directory(ls) ||
+	    (r->res_master_nodeid == our_nodeid ||
+	     dlm_dir_nodeid(r) != our_nodeid))
 		add_scan(ls, r);
 
 	if (r->res_lvbptr) {
@@ -1703,19 +1719,11 @@ static int msg_reply_type(int mstype)
 /* add/remove lkb from global waiters list of lkb's waiting for
    a reply from a remote node */
 
-static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
+static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 {
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
-	int error = 0;
 
 	spin_lock_bh(&ls->ls_waiters_lock);
-
-	if (is_overlap_unlock(lkb) ||
-	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
-		error = -EINVAL;
-		goto out;
-	}
-
 	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
 		switch (mstype) {
 		case DLM_MSG_UNLOCK:
@@ -1725,7 +1733,11 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
 			break;
 		default:
-			error = -EBUSY;
+			/* should never happen as validate_lock_args() checks
+			 * on lkb_wait_type and validate_unlock_args() only
+			 * creates UNLOCK or CANCEL messages.
+			 */
+			WARN_ON_ONCE(1);
 			goto out;
 		}
 		lkb->lkb_wait_count++;
@@ -1747,12 +1759,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 	hold_lkb(lkb);
 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
  out:
-	if (error)
-		log_error(ls, "addwait error %x %d flags %x %d %d %s",
-			  lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
-			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
 	spin_unlock_bh(&ls->ls_waiters_lock);
-	return error;
 }
 
 /* We clear the RESEND flag because we might be taking an lkb off the waiters
@@ -2861,16 +2868,14 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 	case -EINVAL:
 		/* annoy the user because dlm usage is wrong */
 		WARN_ON(1);
-		log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
+		log_error(ls, "%s %d %x %x %x %d %d", __func__,
 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
-			  lkb->lkb_status, lkb->lkb_wait_type,
-			  lkb->lkb_resource->res_name);
+			  lkb->lkb_status, lkb->lkb_wait_type);
 		break;
 	default:
-		log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
+		log_debug(ls, "%s %d %x %x %x %d %d", __func__,
 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
-			  lkb->lkb_status, lkb->lkb_wait_type,
-			  lkb->lkb_resource->res_name);
+			  lkb->lkb_status, lkb->lkb_wait_type);
 		break;
 	}
 
@@ -2928,13 +2933,16 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
 		goto out;
 	}
 
+	if (is_overlap_unlock(lkb))
+		goto out;
+
 	/* cancel not allowed with another cancel/unlock in progress */
 
 	if (args->flags & DLM_LKF_CANCEL) {
 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
 			goto out;
 
-		if (is_overlap(lkb))
+		if (is_overlap_cancel(lkb))
 			goto out;
 
 		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
@@ -2972,9 +2980,6 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
 			goto out;
 
-		if (is_overlap_unlock(lkb))
-			goto out;
-
 		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
 			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
 			rv = -EBUSY;
@@ -3610,10 +3615,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
 
 	to_nodeid = r->res_nodeid;
 
-	error = add_to_waiters(lkb, mstype, to_nodeid);
-	if (error)
-		return error;
-
+	add_to_waiters(lkb, mstype, to_nodeid);
 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
 	if (error)
 		goto fail;
@@ -3716,10 +3718,7 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
 	to_nodeid = dlm_dir_nodeid(r);
 
-	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
-	if (error)
-		return error;
-
+	add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
 	if (error)
 		goto fail;
@@ -5016,16 +5015,19 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
 				   struct dlm_message *ms_local)
 {
 	if (middle_conversion(lkb)) {
+		log_rinfo(ls, "%s %x middle convert in progress", __func__,
+			  lkb->lkb_id);
+
+		/* We sent this lock to the new master. The new master will
+		 * tell us when it's granted.  We no longer need a reply, so
+		 * use a fake reply to put the lkb into the right state.
+		 */
 		hold_lkb(lkb);
 		memset(ms_local, 0, sizeof(struct dlm_message));
 		ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
 		ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
 		ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
 		_receive_convert_reply(lkb, ms_local, true);
-
-		/* Same special case as in receive_rcom_lock_args() */
-		lkb->lkb_grmode = DLM_LOCK_IV;
-		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
 		unhold_lkb(lkb);
 
 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
@@ -5572,10 +5574,11 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 	   The real granted mode of these converting locks cannot be determined
 	   until all locks have been rebuilt on the rsb (recover_conversion) */
 
-	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
-	    middle_conversion(lkb)) {
-		rl->rl_status = DLM_LKSTS_CONVERT;
-		lkb->lkb_grmode = DLM_LOCK_IV;
+	if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) {
+		/* We may need to adjust grmode depending on other granted locks. */
+		log_limit(ls, "%s %x middle convert gr %d rq %d remote %d %x",
+			  __func__, lkb->lkb_id, lkb->lkb_grmode,
+			  lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid);
 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
 	}
 
@@ -6344,8 +6347,8 @@ int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
 	if (error)
 		return error;
 
-	error = add_to_waiters(lkb, mstype, to_nodeid);
+	add_to_waiters(lkb, mstype, to_nodeid);
 	dlm_put_lkb(lkb);
-	return error;
+	return 0;
 }
 
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 8afac6e2dff0..1929327ffbe1 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -576,7 +576,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
 
 	error = do_uevent(ls, 1);
-	if (error)
+	if (error < 0)
 		goto out_recoverd;
 
 	/* wait until recovery is successful or failed */
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index cb3a10b041c2..e4373bce1bc2 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -160,6 +160,7 @@ struct dlm_proto_ops {
 	bool try_new_addr;
 	const char *name;
 	int proto;
+	int how;
 
 	void (*sockopts)(struct socket *sock);
 	int (*bind)(struct socket *sock);
@@ -462,7 +463,8 @@ static bool dlm_lowcomms_con_has_addr(const struct connection *con,
 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr)
 {
 	struct connection *con;
-	bool ret, idx;
+	bool ret;
+	int idx;
 
 	idx = srcu_read_lock(&connections_srcu);
 	con = nodeid2con(nodeid, GFP_NOFS);
@@ -532,7 +534,7 @@ static void lowcomms_state_change(struct sock *sk)
 	/* SCTP layer is not calling sk_data_ready when the connection
 	 * is done, so we catch the signal through here.
 	 */
-	if (sk->sk_shutdown == RCV_SHUTDOWN)
+	if (sk->sk_shutdown & RCV_SHUTDOWN)
 		lowcomms_data_ready(sk);
 }
 
@@ -660,18 +662,18 @@ static void add_sock(struct socket *sock, struct connection *con)
 /* Add the port number to an IPv6 or 4 sockaddr and return the address length */
-static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
+static void make_sockaddr(struct sockaddr_storage *saddr, __be16 port,
 			  int *addr_len)
 {
 	saddr->ss_family = dlm_local_addr[0].ss_family;
 	if (saddr->ss_family == AF_INET) {
 		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
 
-		in4_addr->sin_port = cpu_to_be16(port);
+		in4_addr->sin_port = port;
 		*addr_len = sizeof(struct sockaddr_in);
 		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
 	} else {
 		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
 
-		in6_addr->sin6_port = cpu_to_be16(port);
+		in6_addr->sin6_port = port;
 		*addr_len = sizeof(struct sockaddr_in6);
 	}
 	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
@@ -809,7 +811,7 @@ static void shutdown_connection(struct connection *con, bool and_other)
 		return;
 	}
 
-	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
+	ret = kernel_sock_shutdown(con->sock, dlm_proto_ops->how);
 	up_read(&con->sock_lock);
 	if (ret) {
 		log_print("Connection %p failed to shutdown: %d will force close",
@@ -1121,7 +1123,7 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
 /*
  * sctp_bind_addrs - bind a SCTP socket to all our addresses
  */
-static int sctp_bind_addrs(struct socket *sock, uint16_t port)
+static int sctp_bind_addrs(struct socket *sock, __be16 port)
 {
 	struct sockaddr_storage localaddr;
 	struct sockaddr *addr = (struct sockaddr *)&localaddr;
@@ -1825,8 +1827,8 @@ static int dlm_tcp_listen_validate(void)
 {
 	/* We don't support multi-homed hosts */
 	if (dlm_local_count > 1) {
-		log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
-		return -EINVAL;
+		log_print("Detect multi-homed hosts but use only the first IP address.");
+		log_print("Try SCTP, if you want to enable multi-link.");
 	}
 
 	return 0;
@@ -1857,6 +1859,7 @@ static int dlm_tcp_listen_bind(struct socket *sock)
 static const struct dlm_proto_ops dlm_tcp_ops = {
 	.name = "TCP",
 	.proto = IPPROTO_TCP,
+	.how = SHUT_WR,
 	.sockopts = dlm_tcp_sockopts,
 	.bind = dlm_tcp_bind,
 	.listen_validate = dlm_tcp_listen_validate,
@@ -1895,6 +1898,7 @@ static void dlm_sctp_sockopts(struct socket *sock)
 static const struct dlm_proto_ops dlm_sctp_ops = {
 	.name = "SCTP",
 	.proto = IPPROTO_SCTP,
+	.how = SHUT_RDWR,
 	.try_new_addr = true,
 	.sockopts = dlm_sctp_sockopts,
 	.bind = dlm_sctp_bind,
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index c9661906568a..b0864c93230f 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -493,7 +493,7 @@ static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
 	   we consider the node to have failed (versus
 	   being removed due to dlm_release_lockspace) */
 
-	error = dlm_comm_seq(memb->nodeid, &seq);
+	error = dlm_comm_seq(memb->nodeid, &seq, false);
 	if (!error && seq == memb->comm_seq)
 		return;
 
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 2e1169c81c6e..be4240f09abd 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -811,33 +811,42 @@ static void recover_lvb(struct dlm_rsb *r)
 }
 
 /* All master rsb's flagged RECOVER_CONVERT need to be looked at.  The locks
-   converting PR->CW or CW->PR need to have their lkb_grmode set. */
+ * converting PR->CW or CW->PR may need to have their lkb_grmode changed.
+ */
 
 static void recover_conversion(struct dlm_rsb *r)
 {
 	struct dlm_ls *ls = r->res_ls;
+	uint32_t other_lkid = 0;
+	int other_grmode = -1;
 	struct dlm_lkb *lkb;
-	int grmode = -1;
 
 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
 		if (lkb->lkb_grmode == DLM_LOCK_PR ||
 		    lkb->lkb_grmode == DLM_LOCK_CW) {
-			grmode = lkb->lkb_grmode;
+			other_grmode = lkb->lkb_grmode;
+			other_lkid = lkb->lkb_id;
 			break;
 		}
 	}
 
+	if (other_grmode == -1)
+		return;
+
 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
-		if (lkb->lkb_grmode != DLM_LOCK_IV)
-			continue;
-		if (grmode == -1) {
-			log_debug(ls, "recover_conversion %x set gr to rq %d",
-				  lkb->lkb_id, lkb->lkb_rqmode);
-			lkb->lkb_grmode = lkb->lkb_rqmode;
-		} else {
-			log_debug(ls, "recover_conversion %x set gr %d",
-				  lkb->lkb_id, grmode);
-			lkb->lkb_grmode = grmode;
+		/* Lock recovery created incompatible granted modes, so
+		 * change the granted mode of the converting lock to
+		 * NL. The rqmode of the converting lock should be CW,
+		 * which means the converting lock should be granted at
+		 * the end of recovery.
+		 */
+		if (((lkb->lkb_grmode == DLM_LOCK_PR) && (other_grmode == DLM_LOCK_CW)) ||
+		    ((lkb->lkb_grmode == DLM_LOCK_CW) && (other_grmode == DLM_LOCK_PR))) {
+			log_limit(ls, "%s %x gr %d rq %d, remote %d %x, other_lkid %u, other gr %d, set gr=NL",
+				  __func__, lkb->lkb_id, lkb->lkb_grmode,
+				  lkb->lkb_rqmode, lkb->lkb_nodeid,
+				  lkb->lkb_remid, other_lkid, other_grmode);
+			lkb->lkb_grmode = DLM_LOCK_NL;
 		}
 	}
 }
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 34f4f9f49a6c..12272a8f6d75 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -151,7 +151,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 	error = dlm_recover_members(ls, rv, &neg);
 	if (error) {
 		log_rinfo(ls, "dlm_recover_members error %d", error);
-		goto fail;
+		goto fail_root_list;
 	}
 
 	dlm_recover_dir_nodeid(ls, &root_list);