summaryrefslogtreecommitdiff
path: root/fs/dlm/lock.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--fs/dlm/lock.c123
1 files changed, 63 insertions, 60 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 865dc70a9dfc..6dd3a524cd35 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -509,7 +509,7 @@ static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
void dlm_rsb_scan(struct timer_list *timer)
{
- struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
+ struct dlm_ls *ls = timer_container_of(ls, timer, ls_scan_timer);
int our_nodeid = dlm_our_nodeid();
struct dlm_rsb *r;
int rv;
@@ -741,6 +741,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
read_lock_bh(&ls->ls_rsbtbl_lock);
if (!rsb_flag(r, RSB_HASHED)) {
read_unlock_bh(&ls->ls_rsbtbl_lock);
+ error = -EBADR;
goto do_new;
}
@@ -784,6 +785,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
}
} else {
write_unlock_bh(&ls->ls_rsbtbl_lock);
+ error = -EBADR;
goto do_new;
}
@@ -824,9 +826,12 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
r->res_first_lkid = 0;
}
- /* A dir record will not be on the scan list. */
- if (r->res_dir_nodeid != our_nodeid)
- del_scan(ls, r);
+ /* we always deactivate scan timer for the rsb, when
+ * we move it out of the inactive state as rsb state
+ * can be changed and scan timers are only for inactive
+ * rsbs.
+ */
+ del_scan(ls, r);
list_move(&r->res_slow_list, &ls->ls_slow_active);
rsb_clear_flag(r, RSB_INACTIVE);
kref_init(&r->res_ref); /* ref is now used in active state */
@@ -989,10 +994,10 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
r->res_nodeid = 0;
}
+ del_scan(ls, r);
list_move(&r->res_slow_list, &ls->ls_slow_active);
rsb_clear_flag(r, RSB_INACTIVE);
kref_init(&r->res_ref);
- del_scan(ls, r);
write_unlock_bh(&ls->ls_rsbtbl_lock);
goto out;
@@ -1337,9 +1342,13 @@ static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *na
__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
r_nodeid, result);
- /* A dir record rsb should never be on scan list. */
- /* Try to fix this with del_scan? */
- WARN_ON(!list_empty(&r->res_scan_list));
+ /* A dir record rsb should never be on scan list.
+ * Except when we are the dir and master node.
+ * This function should only be called by the dir
+ * node.
+ */
+ WARN_ON(!list_empty(&r->res_scan_list) &&
+ r->res_master_nodeid != our_nodeid);
write_unlock_bh(&ls->ls_rsbtbl_lock);
@@ -1430,16 +1439,23 @@ static void deactivate_rsb(struct kref *kref)
list_move(&r->res_slow_list, &ls->ls_slow_inactive);
/*
- * When the rsb becomes unused:
- * - If it's not a dir record for a remote master rsb,
- * then it is put on the scan list to be freed.
- * - If it's a dir record for a remote master rsb,
- * then it is kept in the inactive state until
- * receive_remove() from the master node.
+ * When the rsb becomes unused, there are two possibilities:
+ * 1. Leave the inactive rsb in place (don't remove it).
+ * 2. Add it to the scan list to be removed.
+ *
+ * 1 is done when the rsb is acting as the dir record
+ * for a remotely mastered rsb. The rsb must be left
+ * in place as an inactive rsb to act as the dir record.
+ *
+ * 2 is done when a) the rsb is not the master and not the
+ * dir record, b) when the rsb is both the master and the
+ * dir record, c) when the rsb is master but not dir record.
+ *
+ * (If no directory is used, the rsb can always be removed.)
*/
- if (!dlm_no_directory(ls) &&
- (r->res_master_nodeid != our_nodeid) &&
- (dlm_dir_nodeid(r) != our_nodeid))
+ if (dlm_no_directory(ls) ||
+ (r->res_master_nodeid == our_nodeid ||
+ dlm_dir_nodeid(r) != our_nodeid))
add_scan(ls, r);
if (r->res_lvbptr) {
@@ -1703,19 +1719,11 @@ static int msg_reply_type(int mstype)
/* add/remove lkb from global waiters list of lkb's waiting for
a reply from a remote node */
-static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
+static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
- int error = 0;
spin_lock_bh(&ls->ls_waiters_lock);
-
- if (is_overlap_unlock(lkb) ||
- (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
- error = -EINVAL;
- goto out;
- }
-
if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
switch (mstype) {
case DLM_MSG_UNLOCK:
@@ -1725,7 +1733,11 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
break;
default:
- error = -EBUSY;
+ /* should never happen as validate_lock_args() checks
+ * on lkb_wait_type and validate_unlock_args() only
+ * creates UNLOCK or CANCEL messages.
+ */
+ WARN_ON_ONCE(1);
goto out;
}
lkb->lkb_wait_count++;
@@ -1747,12 +1759,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
hold_lkb(lkb);
list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
out:
- if (error)
- log_error(ls, "addwait error %x %d flags %x %d %d %s",
- lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
- lkb->lkb_wait_type, lkb->lkb_resource->res_name);
spin_unlock_bh(&ls->ls_waiters_lock);
- return error;
}
/* We clear the RESEND flag because we might be taking an lkb off the waiters
@@ -2861,16 +2868,14 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
case -EINVAL:
/* annoy the user because dlm usage is wrong */
WARN_ON(1);
- log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
+ log_error(ls, "%s %d %x %x %x %d %d", __func__,
rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
- lkb->lkb_status, lkb->lkb_wait_type,
- lkb->lkb_resource->res_name);
+ lkb->lkb_status, lkb->lkb_wait_type);
break;
default:
- log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
+ log_debug(ls, "%s %d %x %x %x %d %d", __func__,
rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
- lkb->lkb_status, lkb->lkb_wait_type,
- lkb->lkb_resource->res_name);
+ lkb->lkb_status, lkb->lkb_wait_type);
break;
}
@@ -2928,13 +2933,16 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
goto out;
}
+ if (is_overlap_unlock(lkb))
+ goto out;
+
/* cancel not allowed with another cancel/unlock in progress */
if (args->flags & DLM_LKF_CANCEL) {
if (lkb->lkb_exflags & DLM_LKF_CANCEL)
goto out;
- if (is_overlap(lkb))
+ if (is_overlap_cancel(lkb))
goto out;
if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
@@ -2972,9 +2980,6 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
goto out;
- if (is_overlap_unlock(lkb))
- goto out;
-
if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
rv = -EBUSY;
@@ -3610,10 +3615,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
to_nodeid = r->res_nodeid;
- error = add_to_waiters(lkb, mstype, to_nodeid);
- if (error)
- return error;
-
+ add_to_waiters(lkb, mstype, to_nodeid);
error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
if (error)
goto fail;
@@ -3716,10 +3718,7 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
to_nodeid = dlm_dir_nodeid(r);
- error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
- if (error)
- return error;
-
+ add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
if (error)
goto fail;
@@ -5016,16 +5015,19 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_message *ms_local)
{
if (middle_conversion(lkb)) {
+ log_rinfo(ls, "%s %x middle convert in progress", __func__,
+ lkb->lkb_id);
+
+ /* We sent this lock to the new master. The new master will
+ * tell us when it's granted. We no longer need a reply, so
+ * use a fake reply to put the lkb into the right state.
+ */
hold_lkb(lkb);
memset(ms_local, 0, sizeof(struct dlm_message));
ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
_receive_convert_reply(lkb, ms_local, true);
-
- /* Same special case as in receive_rcom_lock_args() */
- lkb->lkb_grmode = DLM_LOCK_IV;
- rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
unhold_lkb(lkb);
} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
@@ -5572,10 +5574,11 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
The real granted mode of these converting locks cannot be determined
until all locks have been rebuilt on the rsb (recover_conversion) */
- if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
- middle_conversion(lkb)) {
- rl->rl_status = DLM_LKSTS_CONVERT;
- lkb->lkb_grmode = DLM_LOCK_IV;
+ if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) {
+ /* We may need to adjust grmode depending on other granted locks. */
+ log_limit(ls, "%s %x middle convert gr %d rq %d remote %d %x",
+ __func__, lkb->lkb_id, lkb->lkb_grmode,
+ lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid);
rsb_set_flag(r, RSB_RECOVER_CONVERT);
}
@@ -6344,8 +6347,8 @@ int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
if (error)
return error;
- error = add_to_waiters(lkb, mstype, to_nodeid);
+ add_to_waiters(lkb, mstype, to_nodeid);
dlm_put_lkb(lkb);
- return error;
+ return 0;
}