diff options
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r-- | fs/dlm/lock.c | 123 |
1 files changed, 63 insertions, 60 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 865dc70a9dfc..6dd3a524cd35 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -509,7 +509,7 @@ static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r) void dlm_rsb_scan(struct timer_list *timer) { - struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer); + struct dlm_ls *ls = timer_container_of(ls, timer, ls_scan_timer); int our_nodeid = dlm_our_nodeid(); struct dlm_rsb *r; int rv; @@ -741,6 +741,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, read_lock_bh(&ls->ls_rsbtbl_lock); if (!rsb_flag(r, RSB_HASHED)) { read_unlock_bh(&ls->ls_rsbtbl_lock); + error = -EBADR; goto do_new; } @@ -784,6 +785,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, } } else { write_unlock_bh(&ls->ls_rsbtbl_lock); + error = -EBADR; goto do_new; } @@ -824,9 +826,12 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, r->res_first_lkid = 0; } - /* A dir record will not be on the scan list. */ - if (r->res_dir_nodeid != our_nodeid) - del_scan(ls, r); + /* we always deactivate scan timer for the rsb, when + * we move it out of the inactive state as rsb state + * can be changed and scan timers are only for inactive + * rsbs. + */ + del_scan(ls, r); list_move(&r->res_slow_list, &ls->ls_slow_active); rsb_clear_flag(r, RSB_INACTIVE); kref_init(&r->res_ref); /* ref is now used in active state */ @@ -989,10 +994,10 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, r->res_nodeid = 0; } + del_scan(ls, r); list_move(&r->res_slow_list, &ls->ls_slow_active); rsb_clear_flag(r, RSB_INACTIVE); kref_init(&r->res_ref); - del_scan(ls, r); write_unlock_bh(&ls->ls_rsbtbl_lock); goto out; @@ -1337,9 +1342,13 @@ static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *na __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags, r_nodeid, result); - /* A dir record rsb should never be on scan list. */ - /* Try to fix this with del_scan? */ - WARN_ON(!list_empty(&r->res_scan_list)); + /* A dir record rsb should never be on scan list. + * Except when we are the dir and master node. + * This function should only be called by the dir + * node. + */ + WARN_ON(!list_empty(&r->res_scan_list) && + r->res_master_nodeid != our_nodeid); write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -1430,16 +1439,23 @@ static void deactivate_rsb(struct kref *kref) list_move(&r->res_slow_list, &ls->ls_slow_inactive); /* - * When the rsb becomes unused: - * - If it's not a dir record for a remote master rsb, - * then it is put on the scan list to be freed. - * - If it's a dir record for a remote master rsb, - * then it is kept in the inactive state until - * receive_remove() from the master node. + * When the rsb becomes unused, there are two possibilities: + * 1. Leave the inactive rsb in place (don't remove it). + * 2. Add it to the scan list to be removed. + * + * 1 is done when the rsb is acting as the dir record + * for a remotely mastered rsb. The rsb must be left + * in place as an inactive rsb to act as the dir record. + * + * 2 is done when a) the rsb is not the master and not the + * dir record, b) when the rsb is both the master and the + * dir record, c) when the rsb is master but not dir record. + * + * (If no directory is used, the rsb can always be removed.) */ - if (!dlm_no_directory(ls) && - (r->res_master_nodeid != our_nodeid) && - (dlm_dir_nodeid(r) != our_nodeid)) + if (dlm_no_directory(ls) || + (r->res_master_nodeid == our_nodeid || + dlm_dir_nodeid(r) != our_nodeid)) add_scan(ls, r); if (r->res_lvbptr) { @@ -1703,19 +1719,11 @@ static int msg_reply_type(int mstype) /* add/remove lkb from global waiters list of lkb's waiting for a reply from a remote node */ -static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) +static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; - int error = 0; spin_lock_bh(&ls->ls_waiters_lock); - - if (is_overlap_unlock(lkb) || - (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) { - error = -EINVAL; - goto out; - } - if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) { switch (mstype) { case DLM_MSG_UNLOCK: @@ -1725,7 +1733,11 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); break; default: - error = -EBUSY; + /* should never happen as validate_lock_args() checks + * on lkb_wait_type and validate_unlock_args() only + * creates UNLOCK or CANCEL messages. + */ + WARN_ON_ONCE(1); goto out; } lkb->lkb_wait_count++; @@ -1747,12 +1759,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) hold_lkb(lkb); list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); out: - if (error) - log_error(ls, "addwait error %x %d flags %x %d %d %s", - lkb->lkb_id, error, dlm_iflags_val(lkb), mstype, - lkb->lkb_wait_type, lkb->lkb_resource->res_name); spin_unlock_bh(&ls->ls_waiters_lock); - return error; } /* We clear the RESEND flag because we might be taking an lkb off the waiters @@ -2861,16 +2868,14 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, case -EINVAL: /* annoy the user because dlm usage is wrong */ WARN_ON(1); - log_error(ls, "%s %d %x %x %x %d %d %s", __func__, + log_error(ls, "%s %d %x %x %x %d %d", __func__, rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags, - lkb->lkb_status, lkb->lkb_wait_type, - lkb->lkb_resource->res_name); + lkb->lkb_status, lkb->lkb_wait_type); break; default: - log_debug(ls, "%s %d %x %x %x %d %d %s", __func__, + log_debug(ls, "%s %d %x %x %x %d %d", __func__, rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags, - lkb->lkb_status, lkb->lkb_wait_type, - lkb->lkb_resource->res_name); + lkb->lkb_status, lkb->lkb_wait_type); break; } @@ -2928,13 +2933,16 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) goto out; } + if (is_overlap_unlock(lkb)) + goto out; + /* cancel not allowed with another cancel/unlock in progress */ if (args->flags & DLM_LKF_CANCEL) { if (lkb->lkb_exflags & DLM_LKF_CANCEL) goto out; - if (is_overlap(lkb)) + if (is_overlap_cancel(lkb)) goto out; if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) { @@ -2972,9 +2980,6 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK) goto out; - if (is_overlap_unlock(lkb)) - goto out; - if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) { set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); rv = -EBUSY; @@ -3610,10 +3615,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) to_nodeid = r->res_nodeid; - error = add_to_waiters(lkb, mstype, to_nodeid); - if (error) - return error; - + add_to_waiters(lkb, mstype, to_nodeid); error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); if (error) goto fail; @@ -3716,10 +3718,7 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) to_nodeid = dlm_dir_nodeid(r); - error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid); - if (error) - return error; - + add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid); error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh); if (error) goto fail; @@ -5016,16 +5015,19 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_message *ms_local) { if (middle_conversion(lkb)) { + log_rinfo(ls, "%s %x middle convert in progress", __func__, + lkb->lkb_id); + + /* We sent this lock to the new master. The new master will + * tell us when it's granted. We no longer need a reply, so + * use a fake reply to put the lkb into the right state. + */ hold_lkb(lkb); memset(ms_local, 0, sizeof(struct dlm_message)); ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS)); ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); _receive_convert_reply(lkb, ms_local, true); - - /* Same special case as in receive_rcom_lock_args() */ - lkb->lkb_grmode = DLM_LOCK_IV; - rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT); unhold_lkb(lkb); } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) { @@ -5572,10 +5574,11 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, The real granted mode of these converting locks cannot be determined until all locks have been rebuilt on the rsb (recover_conversion) */ - if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) && - middle_conversion(lkb)) { - rl->rl_status = DLM_LKSTS_CONVERT; - lkb->lkb_grmode = DLM_LOCK_IV; + if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) { + /* We may need to adjust grmode depending on other granted locks. */ + log_limit(ls, "%s %x middle convert gr %d rq %d remote %d %x", + __func__, lkb->lkb_id, lkb->lkb_grmode, + lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid); rsb_set_flag(r, RSB_RECOVER_CONVERT); } @@ -6344,8 +6347,8 @@ int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id, if (error) return error; - error = add_to_waiters(lkb, mstype, to_nodeid); + add_to_waiters(lkb, mstype, to_nodeid); dlm_put_lkb(lkb); - return error; + return 0; } |