summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Documentation/filesystems/gfs2-glocks.rst55
-rw-r--r--fs/gfs2/glock.c227
-rw-r--r--fs/gfs2/glock.h1
-rw-r--r--fs/gfs2/glops.c42
-rw-r--r--fs/gfs2/incore.h12
-rw-r--r--fs/gfs2/lock_dlm.c28
-rw-r--r--fs/gfs2/ops_fstype.c13
-rw-r--r--fs/gfs2/quota.c388
-rw-r--r--fs/gfs2/super.c1
-rw-r--r--fs/gfs2/trace_gfs2.h6
-rw-r--r--fs/gfs2/util.c12
11 files changed, 380 insertions, 405 deletions
diff --git a/Documentation/filesystems/gfs2-glocks.rst b/Documentation/filesystems/gfs2-glocks.rst
index 8a5842929b60..adc0d4c4d979 100644
--- a/Documentation/filesystems/gfs2-glocks.rst
+++ b/Documentation/filesystems/gfs2-glocks.rst
@@ -40,14 +40,14 @@ shared lock mode, SH. In GFS2 the DF mode is used exclusively for direct I/O
operations. The glocks are basically a lock plus some routines which deal
with cache management. The following rules apply for the cache:
-========== ========== ============== ========== ==============
-Glock mode Cache data Cache Metadata Dirty Data Dirty Metadata
-========== ========== ============== ========== ==============
- UN No No No No
- SH Yes Yes No No
- DF No Yes No No
- EX Yes Yes Yes Yes
-========== ========== ============== ========== ==============
+========== ============== ========== ========== ==============
+Glock mode Cache Metadata Cache data Dirty Data Dirty Metadata
+========== ============== ========== ========== ==============
+ UN No No No No
+ DF Yes No No No
+ SH Yes Yes No No
+ EX Yes Yes Yes Yes
+========== ============== ========== ========== ==============
These rules are implemented using the various glock operations which
are defined for each type of glock. Not all types of glocks use
@@ -55,23 +55,22 @@ all the modes. Only inode glocks use the DF mode for example.
Table of glock operations and per type constants:
-============= =============================================================
+============== =============================================================
Field Purpose
-============= =============================================================
-go_xmote_th Called before remote state change (e.g. to sync dirty data)
+============== =============================================================
+go_sync Called before remote state change (e.g. to sync dirty data)
go_xmote_bh Called after remote state change (e.g. to refill cache)
go_inval Called if remote state change requires invalidating the cache
-go_demote_ok Returns boolean value of whether its ok to demote a glock
- (e.g. checks timeout, and that there is no cached data)
-go_lock Called for the first local holder of a lock
-go_unlock Called on the final local unlock of a lock
+go_instantiate Called when a glock has been acquired
+go_held Called every time a glock holder is acquired
go_dump Called to print content of object for debugfs file, or on
error to dump glock to the log.
-go_type The type of the glock, ``LM_TYPE_*``
go_callback Called if the DLM sends a callback to drop this lock
+go_unlocked Called when a glock is unlocked (dlm_unlock())
+go_type The type of the glock, ``LM_TYPE_*``
go_flags GLOF_ASPACE is set, if the glock has an address space
associated with it
-============= =============================================================
+============== =============================================================
The minimum hold time for each lock is the time after a remote lock
grant for which we ignore remote demote requests. This is in order to
@@ -82,26 +81,24 @@ to by multiple nodes. By delaying the demotion in response to a
remote callback, that gives the userspace program time to make
some progress before the pages are unmapped.
-There is a plan to try and remove the go_lock and go_unlock callbacks
-if possible, in order to try and speed up the fast path though the locking.
-Also, eventually we hope to make the glock "EX" mode locally shared
-such that any local locking will be done with the i_mutex as required
-rather than via the glock.
+Eventually, we hope to make the glock "EX" mode locally shared such that any
+local locking will be done with the i_mutex as required rather than via the
+glock.
Locking rules for glock operations:
-============= ====================== =============================
+============== ====================== =============================
Operation GLF_LOCK bit lock held gl_lockref.lock spinlock held
-============= ====================== =============================
-go_xmote_th Yes No
+============== ====================== =============================
+go_sync Yes No
go_xmote_bh Yes No
go_inval Yes No
-go_demote_ok Sometimes Yes
-go_lock Yes No
-go_unlock Yes No
+go_instantiate No No
+go_held No No
go_dump Sometimes Yes
go_callback Sometimes (N/A) Yes
-============= ====================== =============================
+go_unlocked Yes No
+============== ====================== =============================
.. Note::
diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c
index 4ea6c8bfb4e6..12a769077ea0 100644
--- a/fs/gfs2/glock.c
+++ b/fs/gfs2/glock.c
@@ -61,12 +61,10 @@ struct gfs2_glock_iter {
typedef void (*glock_examiner) (struct gfs2_glock * gl);
static void do_xmote(struct gfs2_glock *gl, struct gfs2_holder *gh, unsigned int target);
-static void __gfs2_glock_dq(struct gfs2_holder *gh);
-static void handle_callback(struct gfs2_glock *gl, unsigned int state,
- unsigned long delay, bool remote);
+static void request_demote(struct gfs2_glock *gl, unsigned int state,
+ unsigned long delay, bool remote);
static struct dentry *gfs2_root;
-static struct workqueue_struct *glock_workqueue;
static LIST_HEAD(lru_list);
static atomic_t lru_count = ATOMIC_INIT(0);
static DEFINE_SPINLOCK(lru_lock);
@@ -218,34 +216,9 @@ struct gfs2_glock *gfs2_glock_hold(struct gfs2_glock *gl)
return gl;
}
-/**
- * demote_ok - Check to see if it's ok to unlock a glock
- * @gl: the glock
- *
- * Returns: 1 if it's ok
- */
-
-static int demote_ok(const struct gfs2_glock *gl)
-{
- const struct gfs2_glock_operations *glops = gl->gl_ops;
-
- if (gl->gl_state == LM_ST_UNLOCKED)
- return 0;
- if (!list_empty(&gl->gl_holders))
- return 0;
- if (glops->go_demote_ok)
- return glops->go_demote_ok(gl);
- return 1;
-}
-
-
-void gfs2_glock_add_to_lru(struct gfs2_glock *gl)
+static void gfs2_glock_add_to_lru(struct gfs2_glock *gl)
{
- if (!(gl->gl_ops->go_flags & GLOF_LRU))
- return;
-
spin_lock(&lru_lock);
-
list_move_tail(&gl->gl_lru, &lru_list);
if (!test_bit(GLF_LRU, &gl->gl_flags)) {
@@ -258,9 +231,6 @@ void gfs2_glock_add_to_lru(struct gfs2_glock *gl)
static void gfs2_glock_remove_from_lru(struct gfs2_glock *gl)
{
- if (!(gl->gl_ops->go_flags & GLOF_LRU))
- return;
-
spin_lock(&lru_lock);
if (test_bit(GLF_LRU, &gl->gl_flags)) {
list_del_init(&gl->gl_lru);
@@ -275,7 +245,9 @@ static void gfs2_glock_remove_from_lru(struct gfs2_glock *gl)
* work queue.
*/
static void gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) {
- if (!queue_delayed_work(glock_workqueue, &gl->gl_work, delay)) {
+ struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
+
+ if (!queue_delayed_work(sdp->sd_glock_wq, &gl->gl_work, delay)) {
/*
* We are holding the lockref spinlock, and the work was still
* queued above. The queued work (glock_work_func) takes that
@@ -305,6 +277,20 @@ static void __gfs2_glock_put(struct gfs2_glock *gl)
sdp->sd_lockstruct.ls_ops->lm_put_lock(gl);
}
+static bool __gfs2_glock_put_or_lock(struct gfs2_glock *gl)
+{
+ if (lockref_put_or_lock(&gl->gl_lockref))
+ return true;
+ GLOCK_BUG_ON(gl, gl->gl_lockref.count != 1);
+ if (gl->gl_state != LM_ST_UNLOCKED) {
+ gl->gl_lockref.count--;
+ gfs2_glock_add_to_lru(gl);
+ spin_unlock(&gl->gl_lockref.lock);
+ return true;
+ }
+ return false;
+}
+
/**
* gfs2_glock_put() - Decrement reference count on glock
* @gl: The glock to put
@@ -313,7 +299,7 @@ static void __gfs2_glock_put(struct gfs2_glock *gl)
void gfs2_glock_put(struct gfs2_glock *gl)
{
- if (lockref_put_or_lock(&gl->gl_lockref))
+ if (__gfs2_glock_put_or_lock(gl))
return;
__gfs2_glock_put(gl);
@@ -328,10 +314,9 @@ void gfs2_glock_put(struct gfs2_glock *gl)
*/
void gfs2_glock_put_async(struct gfs2_glock *gl)
{
- if (lockref_put_or_lock(&gl->gl_lockref))
+ if (__gfs2_glock_put_or_lock(gl))
return;
- GLOCK_BUG_ON(gl, gl->gl_lockref.count != 1);
gfs2_glock_queue_work(gl, 0);
spin_unlock(&gl->gl_lockref.lock);
}
@@ -570,18 +555,6 @@ static inline struct gfs2_holder *find_last_waiter(const struct gfs2_glock *gl)
static void state_change(struct gfs2_glock *gl, unsigned int new_state)
{
- int held1, held2;
-
- held1 = (gl->gl_state != LM_ST_UNLOCKED);
- held2 = (new_state != LM_ST_UNLOCKED);
-
- if (held1 != held2) {
- GLOCK_BUG_ON(gl, __lockref_is_dead(&gl->gl_lockref));
- if (held2)
- gl->gl_lockref.count++;
- else
- gl->gl_lockref.count--;
- }
if (new_state != gl->gl_target)
/* shorten our minimum hold time */
gl->gl_hold_time = max(gl->gl_hold_time - GL_GLOCK_HOLD_DECR,
@@ -812,7 +785,7 @@ skip_inval:
(target != LM_ST_UNLOCKED ||
test_bit(SDF_WITHDRAW_RECOVERY, &sdp->sd_flags))) {
if (!is_system_glock(gl)) {
- handle_callback(gl, LM_ST_UNLOCKED, 0, false); /* sets demote */
+ request_demote(gl, LM_ST_UNLOCKED, 0, false);
/*
* Ordinarily, we would call dlm and its callback would call
* finish_xmote, which would call state_change() to the new state.
@@ -910,7 +883,6 @@ out_sched:
out_unlock:
clear_bit(GLF_LOCK, &gl->gl_flags);
smp_mb__after_atomic();
- return;
}
/**
@@ -1111,19 +1083,21 @@ static void glock_work_func(struct work_struct *work)
unsigned int drop_refs = 1;
spin_lock(&gl->gl_lockref.lock);
- if (test_bit(GLF_REPLY_PENDING, &gl->gl_flags)) {
- clear_bit(GLF_REPLY_PENDING, &gl->gl_flags);
+ if (test_bit(GLF_HAVE_REPLY, &gl->gl_flags)) {
+ clear_bit(GLF_HAVE_REPLY, &gl->gl_flags);
finish_xmote(gl, gl->gl_reply);
drop_refs++;
}
if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
gl->gl_state != LM_ST_UNLOCKED &&
gl->gl_demote_state != LM_ST_EXCLUSIVE) {
- unsigned long holdtime, now = jiffies;
+ if (gl->gl_name.ln_type == LM_TYPE_INODE) {
+ unsigned long holdtime, now = jiffies;
- holdtime = gl->gl_tchange + gl->gl_hold_time;
- if (time_before(now, holdtime))
- delay = holdtime - now;
+ holdtime = gl->gl_tchange + gl->gl_hold_time;
+ if (time_before(now, holdtime))
+ delay = holdtime - now;
+ }
if (!delay) {
clear_bit(GLF_PENDING_DEMOTE, &gl->gl_flags);
@@ -1134,20 +1108,18 @@ static void glock_work_func(struct work_struct *work)
if (delay) {
/* Keep one glock reference for the work we requeue. */
drop_refs--;
- if (gl->gl_name.ln_type != LM_TYPE_INODE)
- delay = 0;
gfs2_glock_queue_work(gl, delay);
}
- /*
- * Drop the remaining glock references manually here. (Mind that
- * gfs2_glock_queue_work depends on the lockref spinlock begin held
- * here as well.)
- */
+ /* Drop the remaining glock references manually. */
+ GLOCK_BUG_ON(gl, gl->gl_lockref.count < drop_refs);
gl->gl_lockref.count -= drop_refs;
if (!gl->gl_lockref.count) {
- __gfs2_glock_put(gl);
- return;
+ if (gl->gl_state == LM_ST_UNLOCKED) {
+ __gfs2_glock_put(gl);
+ return;
+ }
+ gfs2_glock_add_to_lru(gl);
}
spin_unlock(&gl->gl_lockref.lock);
}
@@ -1183,6 +1155,8 @@ again:
out:
rcu_read_unlock();
finish_wait(wq, &wait.wait);
+ if (gl)
+ gfs2_glock_remove_from_lru(gl);
return gl;
}
@@ -1209,13 +1183,10 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
.ln_sbd = sdp };
struct gfs2_glock *gl, *tmp;
struct address_space *mapping;
- int ret = 0;
gl = find_insert_glock(&name, NULL);
- if (gl) {
- *glp = gl;
- return 0;
- }
+ if (gl)
+ goto found;
if (!create)
return -ENOENT;
@@ -1243,7 +1214,9 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
atomic_inc(&sdp->sd_glock_disposal);
gl->gl_node.next = NULL;
- gl->gl_flags = glops->go_instantiate ? BIT(GLF_INSTANTIATE_NEEDED) : 0;
+ gl->gl_flags = BIT(GLF_INITIAL);
+ if (glops->go_instantiate)
+ gl->gl_flags |= BIT(GLF_INSTANTIATE_NEEDED);
gl->gl_name = name;
lockdep_set_subclass(&gl->gl_lockref.lock, glops->go_subclass);
gl->gl_lockref.count = 1;
@@ -1275,23 +1248,19 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
}
tmp = find_insert_glock(&name, gl);
- if (!tmp) {
- *glp = gl;
- goto out;
- }
- if (IS_ERR(tmp)) {
- ret = PTR_ERR(tmp);
- goto out_free;
- }
- *glp = tmp;
+ if (tmp) {
+ gfs2_glock_dealloc(&gl->gl_rcu);
+ if (atomic_dec_and_test(&sdp->sd_glock_disposal))
+ wake_up(&sdp->sd_kill_wait);
-out_free:
- gfs2_glock_dealloc(&gl->gl_rcu);
- if (atomic_dec_and_test(&sdp->sd_glock_disposal))
- wake_up(&sdp->sd_kill_wait);
+ if (IS_ERR(tmp))
+ return PTR_ERR(tmp);
+ gl = tmp;
+ }
-out:
- return ret;
+found:
+ *glp = gl;
+ return 0;
}
/**
@@ -1461,7 +1430,7 @@ out:
}
/**
- * handle_callback - process a demote request
+ * request_demote - process a demote request
* @gl: the glock
* @state: the state the caller wants us to change to
* @delay: zero to demote immediately; otherwise pending demote
@@ -1471,8 +1440,8 @@ out:
* practise: LM_ST_SHARED and LM_ST_UNLOCKED
*/
-static void handle_callback(struct gfs2_glock *gl, unsigned int state,
- unsigned long delay, bool remote)
+static void request_demote(struct gfs2_glock *gl, unsigned int state,
+ unsigned long delay, bool remote)
{
if (delay)
set_bit(GLF_PENDING_DEMOTE, &gl->gl_flags);
@@ -1636,15 +1605,12 @@ unlock:
return error;
}
- if (test_bit(GLF_LRU, &gl->gl_flags))
- gfs2_glock_remove_from_lru(gl);
-
gh->gh_error = 0;
spin_lock(&gl->gl_lockref.lock);
add_to_queue(gh);
if (unlikely((LM_FLAG_NOEXP & gh->gh_flags) &&
- test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))) {
- set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
+ test_and_clear_bit(GLF_HAVE_FROZEN_REPLY, &gl->gl_flags))) {
+ set_bit(GLF_HAVE_REPLY, &gl->gl_flags);
gl->gl_lockref.count++;
gfs2_glock_queue_work(gl, 0);
}
@@ -1688,7 +1654,7 @@ static void __gfs2_glock_dq(struct gfs2_holder *gh)
* below.
*/
if (gh->gh_flags & GL_NOCACHE)
- handle_callback(gl, LM_ST_UNLOCKED, 0, false);
+ request_demote(gl, LM_ST_UNLOCKED, 0, false);
list_del_init(&gh->gh_list);
clear_bit(HIF_HOLDER, &gh->gh_iflags);
@@ -1703,9 +1669,6 @@ static void __gfs2_glock_dq(struct gfs2_holder *gh)
fast_path = 1;
}
- if (!test_bit(GLF_LFLUSH, &gl->gl_flags) && demote_ok(gl))
- gfs2_glock_add_to_lru(gl);
-
if (unlikely(!fast_path)) {
gl->gl_lockref.count++;
if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
@@ -1932,10 +1895,10 @@ void gfs2_glock_cb(struct gfs2_glock *gl, unsigned int state)
gl->gl_name.ln_type == LM_TYPE_INODE) {
if (time_before(now, holdtime))
delay = holdtime - now;
- if (test_bit(GLF_REPLY_PENDING, &gl->gl_flags))
+ if (test_bit(GLF_HAVE_REPLY, &gl->gl_flags))
delay = gl->gl_hold_time;
}
- handle_callback(gl, state, delay, true);
+ request_demote(gl, state, delay, true);
gfs2_glock_queue_work(gl, delay);
spin_unlock(&gl->gl_lockref.lock);
}
@@ -1988,14 +1951,14 @@ void gfs2_glock_complete(struct gfs2_glock *gl, int ret)
if (unlikely(test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags))) {
if (gfs2_should_freeze(gl)) {
- set_bit(GLF_FROZEN, &gl->gl_flags);
+ set_bit(GLF_HAVE_FROZEN_REPLY, &gl->gl_flags);
spin_unlock(&gl->gl_lockref.lock);
return;
}
}
gl->gl_lockref.count++;
- set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
+ set_bit(GLF_HAVE_REPLY, &gl->gl_flags);
gfs2_glock_queue_work(gl, 0);
spin_unlock(&gl->gl_lockref.lock);
}
@@ -2018,10 +1981,12 @@ static int glock_cmp(void *priv, const struct list_head *a,
static bool can_free_glock(struct gfs2_glock *gl)
{
- bool held = gl->gl_state != LM_ST_UNLOCKED;
+ struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
return !test_bit(GLF_LOCK, &gl->gl_flags) &&
- gl->gl_lockref.count == held;
+ !gl->gl_lockref.count &&
+ (!test_bit(GLF_LFLUSH, &gl->gl_flags) ||
+ test_bit(SDF_KILL, &sdp->sd_flags));
}
/**
@@ -2063,8 +2028,8 @@ add_back_to_lru:
clear_bit(GLF_LRU, &gl->gl_flags);
freed++;
gl->gl_lockref.count++;
- if (demote_ok(gl))
- handle_callback(gl, LM_ST_UNLOCKED, 0, false);
+ if (gl->gl_state != LM_ST_UNLOCKED)
+ request_demote(gl, LM_ST_UNLOCKED, 0, false);
gfs2_glock_queue_work(gl, 0);
spin_unlock(&gl->gl_lockref.lock);
cond_resched_lock(&lru_lock);
@@ -2182,13 +2147,14 @@ void gfs2_flush_delete_work(struct gfs2_sbd *sdp)
static void thaw_glock(struct gfs2_glock *gl)
{
- if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))
+ if (!test_and_clear_bit(GLF_HAVE_FROZEN_REPLY, &gl->gl_flags))
return;
if (!lockref_get_not_dead(&gl->gl_lockref))
return;
+ gfs2_glock_remove_from_lru(gl);
spin_lock(&gl->gl_lockref.lock);
- set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
+ set_bit(GLF_HAVE_REPLY, &gl->gl_flags);
gfs2_glock_queue_work(gl, 0);
spin_unlock(&gl->gl_lockref.lock);
}
@@ -2207,7 +2173,7 @@ static void clear_glock(struct gfs2_glock *gl)
if (!__lockref_is_dead(&gl->gl_lockref)) {
gl->gl_lockref.count++;
if (gl->gl_state != LM_ST_UNLOCKED)
- handle_callback(gl, LM_ST_UNLOCKED, 0, false);
+ request_demote(gl, LM_ST_UNLOCKED, 0, false);
gfs2_glock_queue_work(gl, 0);
}
spin_unlock(&gl->gl_lockref.lock);
@@ -2259,16 +2225,30 @@ void gfs2_gl_dq_holders(struct gfs2_sbd *sdp)
void gfs2_gl_hash_clear(struct gfs2_sbd *sdp)
{
+ unsigned long start = jiffies;
+ bool timed_out = false;
+
set_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags);
- flush_workqueue(glock_workqueue);
+ flush_workqueue(sdp->sd_glock_wq);
glock_hash_walk(clear_glock, sdp);
- flush_workqueue(glock_workqueue);
- wait_event_timeout(sdp->sd_kill_wait,
- atomic_read(&sdp->sd_glock_disposal) == 0,
- HZ * 600);
+ flush_workqueue(sdp->sd_glock_wq);
+
+ while (!timed_out) {
+ wait_event_timeout(sdp->sd_kill_wait,
+ !atomic_read(&sdp->sd_glock_disposal),
+ HZ * 60);
+ if (!atomic_read(&sdp->sd_glock_disposal))
+ break;
+ timed_out = time_after(jiffies, start + (HZ * 600));
+ fs_warn(sdp, "%u glocks left after %u seconds%s\n",
+ atomic_read(&sdp->sd_glock_disposal),
+ jiffies_to_msecs(jiffies - start) / 1000,
+ timed_out ? ":" : "; still waiting");
+ }
gfs2_lm_unmount(sdp);
gfs2_free_dead_glocks(sdp);
glock_hash_walk(dump_glock_func, sdp);
+ destroy_workqueue(sdp->sd_glock_wq);
}
static const char *state2str(unsigned state)
@@ -2366,11 +2346,11 @@ static const char *gflags2str(char *buf, const struct gfs2_glock *gl)
*p++ = 'f';
if (test_bit(GLF_INVALIDATE_IN_PROGRESS, gflags))
*p++ = 'i';
- if (test_bit(GLF_REPLY_PENDING, gflags))
+ if (test_bit(GLF_HAVE_REPLY, gflags))
*p++ = 'r';
if (test_bit(GLF_INITIAL, gflags))
- *p++ = 'I';
- if (test_bit(GLF_FROZEN, gflags))
+ *p++ = 'a';
+ if (test_bit(GLF_HAVE_FROZEN_REPLY, gflags))
*p++ = 'F';
if (!list_empty(&gl->gl_holders))
*p++ = 'q';
@@ -2380,7 +2360,7 @@ static const char *gflags2str(char *buf, const struct gfs2_glock *gl)
*p++ = 'o';
if (test_bit(GLF_BLOCKING, gflags))
*p++ = 'b';
- if (test_bit(GLF_FREEING, gflags))
+ if (test_bit(GLF_UNLOCKED, gflags))
*p++ = 'x';
if (test_bit(GLF_INSTANTIATE_NEEDED, gflags))
*p++ = 'n';
@@ -2533,16 +2513,8 @@ int __init gfs2_glock_init(void)
if (ret < 0)
return ret;
- glock_workqueue = alloc_workqueue("glock_workqueue", WQ_MEM_RECLAIM |
- WQ_HIGHPRI | WQ_FREEZABLE, 0);
- if (!glock_workqueue) {
- rhashtable_destroy(&gl_hash_table);
- return -ENOMEM;
- }
-
glock_shrinker = shrinker_alloc(0, "gfs2-glock");
if (!glock_shrinker) {
- destroy_workqueue(glock_workqueue);
rhashtable_destroy(&gl_hash_table);
return -ENOMEM;
}
@@ -2562,7 +2534,6 @@ void gfs2_glock_exit(void)
{
shrinker_free(glock_shrinker);
rhashtable_destroy(&gl_hash_table);
- destroy_workqueue(glock_workqueue);
}
static void gfs2_glock_iter_next(struct gfs2_glock_iter *gi, loff_t n)
diff --git a/fs/gfs2/glock.h b/fs/gfs2/glock.h
index 19aef6d53267..adf0091cc98f 100644
--- a/fs/gfs2/glock.h
+++ b/fs/gfs2/glock.h
@@ -250,7 +250,6 @@ void gfs2_flush_delete_work(struct gfs2_sbd *sdp);
void gfs2_gl_hash_clear(struct gfs2_sbd *sdp);
void gfs2_gl_dq_holders(struct gfs2_sbd *sdp);
void gfs2_glock_thaw(struct gfs2_sbd *sdp);
-void gfs2_glock_add_to_lru(struct gfs2_glock *gl);
void gfs2_glock_free(struct gfs2_glock *gl);
void gfs2_glock_free_later(struct gfs2_glock *gl);
diff --git a/fs/gfs2/glops.c b/fs/gfs2/glops.c
index 68677fb69a73..95d8081681dc 100644
--- a/fs/gfs2/glops.c
+++ b/fs/gfs2/glops.c
@@ -385,23 +385,6 @@ static void inode_go_inval(struct gfs2_glock *gl, int flags)
gfs2_clear_glop_pending(ip);
}
-/**
- * inode_go_demote_ok - Check to see if it's ok to unlock an inode glock
- * @gl: the glock
- *
- * Returns: 1 if it's ok
- */
-
-static int inode_go_demote_ok(const struct gfs2_glock *gl)
-{
- struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
-
- if (sdp->sd_jindex == gl->gl_object || sdp->sd_rindex == gl->gl_object)
- return 0;
-
- return 1;
-}
-
static int gfs2_dinode_in(struct gfs2_inode *ip, const void *buf)
{
struct gfs2_sbd *sdp = GFS2_SB(&ip->i_inode);
@@ -648,21 +631,21 @@ static void iopen_go_callback(struct gfs2_glock *gl, bool remote)
}
/**
- * inode_go_free - wake up anyone waiting for dlm's unlock ast to free it
- * @gl: glock being freed
+ * inode_go_unlocked - wake up anyone waiting for dlm's unlock ast
+ * @gl: glock being unlocked
*
* For now, this is only used for the journal inode glock. In withdraw
- * situations, we need to wait for the glock to be freed so that we know
+ * situations, we need to wait for the glock to be unlocked so that we know
* other nodes may proceed with recovery / journal replay.
*/
-static void inode_go_free(struct gfs2_glock *gl)
+static void inode_go_unlocked(struct gfs2_glock *gl)
{
/* Note that we cannot reference gl_object because it's already set
* to NULL by this point in its lifecycle. */
- if (!test_bit(GLF_FREEING, &gl->gl_flags))
+ if (!test_bit(GLF_UNLOCKED, &gl->gl_flags))
return;
- clear_bit_unlock(GLF_FREEING, &gl->gl_flags);
- wake_up_bit(&gl->gl_flags, GLF_FREEING);
+ clear_bit_unlock(GLF_UNLOCKED, &gl->gl_flags);
+ wake_up_bit(&gl->gl_flags, GLF_UNLOCKED);
}
/**
@@ -722,13 +705,12 @@ const struct gfs2_glock_operations gfs2_meta_glops = {
const struct gfs2_glock_operations gfs2_inode_glops = {
.go_sync = inode_go_sync,
.go_inval = inode_go_inval,
- .go_demote_ok = inode_go_demote_ok,
.go_instantiate = inode_go_instantiate,
.go_held = inode_go_held,
.go_dump = inode_go_dump,
.go_type = LM_TYPE_INODE,
- .go_flags = GLOF_ASPACE | GLOF_LRU | GLOF_LVB,
- .go_free = inode_go_free,
+ .go_flags = GLOF_ASPACE | GLOF_LVB,
+ .go_unlocked = inode_go_unlocked,
};
const struct gfs2_glock_operations gfs2_rgrp_glops = {
@@ -751,13 +733,13 @@ const struct gfs2_glock_operations gfs2_iopen_glops = {
.go_type = LM_TYPE_IOPEN,
.go_callback = iopen_go_callback,
.go_dump = inode_go_dump,
- .go_flags = GLOF_LRU | GLOF_NONDISK,
+ .go_flags = GLOF_NONDISK,
.go_subclass = 1,
};
const struct gfs2_glock_operations gfs2_flock_glops = {
.go_type = LM_TYPE_FLOCK,
- .go_flags = GLOF_LRU | GLOF_NONDISK,
+ .go_flags = GLOF_NONDISK,
};
const struct gfs2_glock_operations gfs2_nondisk_glops = {
@@ -768,7 +750,7 @@ const struct gfs2_glock_operations gfs2_nondisk_glops = {
const struct gfs2_glock_operations gfs2_quota_glops = {
.go_type = LM_TYPE_QUOTA,
- .go_flags = GLOF_LVB | GLOF_LRU | GLOF_NONDISK,
+ .go_flags = GLOF_LVB | GLOF_NONDISK,
};
const struct gfs2_glock_operations gfs2_journal_glops = {
diff --git a/fs/gfs2/incore.h b/fs/gfs2/incore.h
index 60abd7050c99..aa4ef67a34e0 100644
--- a/fs/gfs2/incore.h
+++ b/fs/gfs2/incore.h
@@ -218,19 +218,17 @@ struct gfs2_glock_operations {
int (*go_sync) (struct gfs2_glock *gl);
int (*go_xmote_bh)(struct gfs2_glock *gl);
void (*go_inval) (struct gfs2_glock *gl, int flags);
- int (*go_demote_ok) (const struct gfs2_glock *gl);
int (*go_instantiate) (struct gfs2_glock *gl);
int (*go_held)(struct gfs2_holder *gh);
void (*go_dump)(struct seq_file *seq, const struct gfs2_glock *gl,
const char *fs_id_buf);
void (*go_callback)(struct gfs2_glock *gl, bool remote);
- void (*go_free)(struct gfs2_glock *gl);
+ void (*go_unlocked)(struct gfs2_glock *gl);
const int go_subclass;
const int go_type;
const unsigned long go_flags;
#define GLOF_ASPACE 1 /* address space attached */
#define GLOF_LVB 2 /* Lock Value Block attached */
-#define GLOF_LRU 4 /* LRU managed */
#define GLOF_NONDISK 8 /* not I/O related */
};
@@ -322,14 +320,14 @@ enum {
GLF_DIRTY = 6,
GLF_LFLUSH = 7,
GLF_INVALIDATE_IN_PROGRESS = 8,
- GLF_REPLY_PENDING = 9,
+ GLF_HAVE_REPLY = 9,
GLF_INITIAL = 10,
- GLF_FROZEN = 11,
+ GLF_HAVE_FROZEN_REPLY = 11,
GLF_INSTANTIATE_IN_PROG = 12, /* instantiate happening now */
GLF_LRU = 13,
GLF_OBJECT = 14, /* Used only for tracing */
GLF_BLOCKING = 15,
- GLF_FREEING = 16, /* Wait for glock to be freed */
+ GLF_UNLOCKED = 16, /* Wait for glock to be unlocked */
GLF_TRY_TO_EVICT = 17, /* iopen glocks only */
GLF_VERIFY_EVICT = 18, /* iopen glocks only */
};
@@ -772,6 +770,7 @@ struct gfs2_sbd {
/* Workqueue stuff */
+ struct workqueue_struct *sd_glock_wq;
struct workqueue_struct *sd_delete_wq;
/* Daemon stuff */
@@ -783,7 +782,6 @@ struct gfs2_sbd {
struct list_head sd_quota_list;
atomic_t sd_quota_count;
- struct mutex sd_quota_mutex;
struct mutex sd_quota_sync_mutex;
wait_queue_head_t sd_quota_wait;
diff --git a/fs/gfs2/lock_dlm.c b/fs/gfs2/lock_dlm.c
index 49059274a528..fa5134df985f 100644
--- a/fs/gfs2/lock_dlm.c
+++ b/fs/gfs2/lock_dlm.c
@@ -134,8 +134,8 @@ static void gdlm_ast(void *arg)
switch (gl->gl_lksb.sb_status) {
case -DLM_EUNLOCK: /* Unlocked, so glock can be freed */
- if (gl->gl_ops->go_free)
- gl->gl_ops->go_free(gl);
+ if (gl->gl_ops->go_unlocked)
+ gl->gl_ops->go_unlocked(gl);
gfs2_glock_free(gl);
return;
case -DLM_ECANCEL: /* Cancel while getting lock */
@@ -163,11 +163,21 @@ static void gdlm_ast(void *arg)
BUG();
}
- set_bit(GLF_INITIAL, &gl->gl_flags);
+ /*
+ * The GLF_INITIAL flag is initially set for new glocks. Upon the
+ * first successful new (non-conversion) request, we clear this flag to
+ * indicate that a DLM lock exists and that gl->gl_lksb.sb_lkid is the
+ * identifier to use for identifying it.
+ *
+ * Any failed initial requests do not create a DLM lock, so we ignore
+ * the gl->gl_lksb.sb_lkid values that come with such requests.
+ */
+
+ clear_bit(GLF_INITIAL, &gl->gl_flags);
gfs2_glock_complete(gl, ret);
return;
out:
- if (!test_bit(GLF_INITIAL, &gl->gl_flags))
+ if (test_bit(GLF_INITIAL, &gl->gl_flags))
gl->gl_lksb.sb_lkid = 0;
gfs2_glock_complete(gl, ret);
}
@@ -239,7 +249,7 @@ static u32 make_flags(struct gfs2_glock *gl, const unsigned int gfs_flags,
BUG();
}
- if (gl->gl_lksb.sb_lkid != 0) {
+ if (!test_bit(GLF_INITIAL, &gl->gl_flags)) {
lkf |= DLM_LKF_CONVERT;
if (test_bit(GLF_BLOCKING, &gl->gl_flags))
lkf |= DLM_LKF_QUECVT;
@@ -270,14 +280,14 @@ static int gdlm_lock(struct gfs2_glock *gl, unsigned int req_state,
lkf = make_flags(gl, flags, req);
gfs2_glstats_inc(gl, GFS2_LKS_DCOUNT);
gfs2_sbstats_inc(gl, GFS2_LKS_DCOUNT);
- if (gl->gl_lksb.sb_lkid) {
- gfs2_update_request_times(gl);
- } else {
+ if (test_bit(GLF_INITIAL, &gl->gl_flags)) {
memset(strname, ' ', GDLM_STRNAME_BYTES - 1);
strname[GDLM_STRNAME_BYTES - 1] = '\0';
gfs2_reverse_hex(strname + 7, gl->gl_name.ln_type);
gfs2_reverse_hex(strname + 23, gl->gl_name.ln_number);
gl->gl_dstamp = ktime_get_real();
+ } else {
+ gfs2_update_request_times(gl);
}
/*
* Submit the actual lock request.
@@ -301,7 +311,7 @@ static void gdlm_put_lock(struct gfs2_glock *gl)
BUG_ON(!__lockref_is_dead(&gl->gl_lockref));
- if (gl->gl_lksb.sb_lkid == 0) {
+ if (test_bit(GLF_INITIAL, &gl->gl_flags)) {
gfs2_glock_free(gl);
return;
}
diff --git a/fs/gfs2/ops_fstype.c b/fs/gfs2/ops_fstype.c
index 05975ec76d35..ff1f3e3dc65c 100644
--- a/fs/gfs2/ops_fstype.c
+++ b/fs/gfs2/ops_fstype.c
@@ -103,7 +103,6 @@ static struct gfs2_sbd *init_sbd(struct super_block *sb)
init_completion(&sdp->sd_journal_ready);
INIT_LIST_HEAD(&sdp->sd_quota_list);
- mutex_init(&sdp->sd_quota_mutex);
mutex_init(&sdp->sd_quota_sync_mutex);
init_waitqueue_head(&sdp->sd_quota_wait);
spin_lock_init(&sdp->sd_bitmap_lock);
@@ -1188,11 +1187,17 @@ static int gfs2_fill_super(struct super_block *sb, struct fs_context *fc)
snprintf(sdp->sd_fsname, sizeof(sdp->sd_fsname), "%s", sdp->sd_table_name);
+ error = -ENOMEM;
+ sdp->sd_glock_wq = alloc_workqueue("gfs2-glock/%s",
+ WQ_MEM_RECLAIM | WQ_HIGHPRI | WQ_FREEZABLE, 0,
+ sdp->sd_fsname);
+ if (!sdp->sd_glock_wq)
+ goto fail_free;
+
sdp->sd_delete_wq = alloc_workqueue("gfs2-delete/%s",
WQ_MEM_RECLAIM | WQ_FREEZABLE, 0, sdp->sd_fsname);
- error = -ENOMEM;
if (!sdp->sd_delete_wq)
- goto fail_free;
+ goto fail_glock_wq;
error = gfs2_sys_fs_add(sdp);
if (error)
@@ -1301,6 +1306,8 @@ fail_debug:
gfs2_sys_fs_del(sdp);
fail_delete_wq:
destroy_workqueue(sdp->sd_delete_wq);
+fail_glock_wq:
+ destroy_workqueue(sdp->sd_glock_wq);
fail_free:
free_sbd(sdp);
sb->s_fs_info = NULL;
diff --git a/fs/gfs2/quota.c b/fs/gfs2/quota.c
index aa9cf0102848..2e6bc77f4f81 100644
--- a/fs/gfs2/quota.c
+++ b/fs/gfs2/quota.c
@@ -75,9 +75,6 @@
#define GFS2_QD_HASH_SIZE BIT(GFS2_QD_HASH_SHIFT)
#define GFS2_QD_HASH_MASK (GFS2_QD_HASH_SIZE - 1)
-#define QC_CHANGE 0
-#define QC_SYNC 1
-
/* Lock order: qd_lock -> bucket lock -> qd->lockref.lock -> lru lock */
/* -> sd_bitmap_lock */
static DEFINE_SPINLOCK(qd_lock);
@@ -319,11 +316,11 @@ static int qd_get(struct gfs2_sbd *sdp, struct kqid qid,
}
-static void qd_hold(struct gfs2_quota_data *qd)
+static void __qd_hold(struct gfs2_quota_data *qd)
{
struct gfs2_sbd *sdp = qd->qd_sbd;
- gfs2_assert(sdp, !__lockref_is_dead(&qd->qd_lockref));
- lockref_get(&qd->qd_lockref);
+ gfs2_assert(sdp, qd->qd_lockref.count > 0);
+ qd->qd_lockref.count++;
}
static void qd_put(struct gfs2_quota_data *qd)
@@ -400,16 +397,17 @@ static int bh_get(struct gfs2_quota_data *qd)
struct inode *inode = sdp->sd_qc_inode;
struct gfs2_inode *ip = GFS2_I(inode);
unsigned int block, offset;
- struct buffer_head *bh;
+ struct buffer_head *bh = NULL;
struct iomap iomap = { };
int error;
- mutex_lock(&sdp->sd_quota_mutex);
-
- if (qd->qd_bh_count++) {
- mutex_unlock(&sdp->sd_quota_mutex);
+ spin_lock(&qd->qd_lockref.lock);
+ if (qd->qd_bh_count) {
+ qd->qd_bh_count++;
+ spin_unlock(&qd->qd_lockref.lock);
return 0;
}
+ spin_unlock(&qd->qd_lockref.lock);
block = qd->qd_slot / sdp->sd_qc_per_block;
offset = qd->qd_slot % sdp->sd_qc_per_block;
@@ -418,122 +416,83 @@ static int bh_get(struct gfs2_quota_data *qd)
(loff_t)block << inode->i_blkbits,
i_blocksize(inode), &iomap);
if (error)
- goto fail;
+ return error;
error = -ENOENT;
if (iomap.type != IOMAP_MAPPED)
- goto fail;
+ return error;
error = gfs2_meta_read(ip->i_gl, iomap.addr >> inode->i_blkbits,
DIO_WAIT, 0, &bh);
if (error)
- goto fail;
+ return error;
error = -EIO;
if (gfs2_metatype_check(sdp, bh, GFS2_METATYPE_QC))
- goto fail_brelse;
-
- qd->qd_bh = bh;
- qd->qd_bh_qc = (struct gfs2_quota_change *)
- (bh->b_data + sizeof(struct gfs2_meta_header) +
- offset * sizeof(struct gfs2_quota_change));
-
- mutex_unlock(&sdp->sd_quota_mutex);
+ goto out;
- return 0;
+ spin_lock(&qd->qd_lockref.lock);
+ if (qd->qd_bh == NULL) {
+ qd->qd_bh = bh;
+ qd->qd_bh_qc = (struct gfs2_quota_change *)
+ (bh->b_data + sizeof(struct gfs2_meta_header) +
+ offset * sizeof(struct gfs2_quota_change));
+ bh = NULL;
+ }
+ qd->qd_bh_count++;
+ spin_unlock(&qd->qd_lockref.lock);
+ error = 0;
-fail_brelse:
+out:
brelse(bh);
-fail:
- qd->qd_bh_count--;
- mutex_unlock(&sdp->sd_quota_mutex);
return error;
}
static void bh_put(struct gfs2_quota_data *qd)
{
struct gfs2_sbd *sdp = qd->qd_sbd;
+ struct buffer_head *bh = NULL;
- mutex_lock(&sdp->sd_quota_mutex);
+ spin_lock(&qd->qd_lockref.lock);
gfs2_assert(sdp, qd->qd_bh_count);
if (!--qd->qd_bh_count) {
- brelse(qd->qd_bh);
+ bh = qd->qd_bh;
qd->qd_bh = NULL;
qd->qd_bh_qc = NULL;
}
- mutex_unlock(&sdp->sd_quota_mutex);
+ spin_unlock(&qd->qd_lockref.lock);
+ brelse(bh);
}
-static int qd_check_sync(struct gfs2_sbd *sdp, struct gfs2_quota_data *qd,
- u64 *sync_gen)
+static bool qd_grab_sync(struct gfs2_sbd *sdp, struct gfs2_quota_data *qd,
+ u64 sync_gen)
{
+ bool ret = false;
+
+ spin_lock(&qd->qd_lockref.lock);
if (test_bit(QDF_LOCKED, &qd->qd_flags) ||
!test_bit(QDF_CHANGE, &qd->qd_flags) ||
- (sync_gen && (qd->qd_sync_gen >= *sync_gen)))
- return 0;
-
- /*
- * If qd_change is 0 it means a pending quota change was negated.
- * We should not sync it, but we still have a qd reference and slot
- * reference taken by gfs2_quota_change -> do_qc that need to be put.
- */
- if (!qd->qd_change && test_and_clear_bit(QDF_CHANGE, &qd->qd_flags)) {
- slot_put(qd);
- qd_put(qd);
- return 0;
- }
+ qd->qd_sync_gen >= sync_gen)
+ goto out;
- if (!lockref_get_not_dead(&qd->qd_lockref))
- return 0;
+ if (__lockref_is_dead(&qd->qd_lockref))
+ goto out;
+ qd->qd_lockref.count++;
list_move_tail(&qd->qd_list, &sdp->sd_quota_list);
set_bit(QDF_LOCKED, &qd->qd_flags);
qd->qd_change_sync = qd->qd_change;
slot_hold(qd);
- return 1;
+ ret = true;
+
+out:
+ spin_unlock(&qd->qd_lockref.lock);
+ return ret;
}
-static int qd_bh_get_or_undo(struct gfs2_sbd *sdp, struct gfs2_quota_data *qd)
+static void qd_ungrab_sync(struct gfs2_quota_data *qd)
{
- int error;
-
- error = bh_get(qd);
- if (!error)
- return 0;
-
clear_bit(QDF_LOCKED, &qd->qd_flags);
slot_put(qd);
qd_put(qd);
- return error;
-}
-
-static int qd_fish(struct gfs2_sbd *sdp, struct gfs2_quota_data **qdp)
-{
- struct gfs2_quota_data *qd = NULL, *iter;
- int error;
-
- *qdp = NULL;
-
- if (sb_rdonly(sdp->sd_vfs))
- return 0;
-
- spin_lock(&qd_lock);
-
- list_for_each_entry(iter, &sdp->sd_quota_list, qd_list) {
- if (qd_check_sync(sdp, iter, &sdp->sd_quota_sync_gen)) {
- qd = iter;
- break;
- }
- }
-
- spin_unlock(&qd_lock);
-
- if (qd) {
- error = qd_bh_get_or_undo(sdp, qd);
- if (error)
- return error;
- *qdp = qd;
- }
-
- return 0;
}
static void qdsb_put(struct gfs2_quota_data *qd)
@@ -545,8 +504,10 @@ static void qdsb_put(struct gfs2_quota_data *qd)
static void qd_unlock(struct gfs2_quota_data *qd)
{
+ spin_lock(&qd->qd_lockref.lock);
gfs2_assert_warn(qd->qd_sbd, test_bit(QDF_LOCKED, &qd->qd_flags));
clear_bit(QDF_LOCKED, &qd->qd_flags);
+ spin_unlock(&qd->qd_lockref.lock);
qdsb_put(qd);
}
@@ -710,48 +671,57 @@ static int sort_qd(const void *a, const void *b)
return 0;
}
-static void do_qc(struct gfs2_quota_data *qd, s64 change, int qc_type)
+static void do_qc(struct gfs2_quota_data *qd, s64 change)
{
struct gfs2_sbd *sdp = qd->qd_sbd;
struct gfs2_inode *ip = GFS2_I(sdp->sd_qc_inode);
struct gfs2_quota_change *qc = qd->qd_bh_qc;
+ bool needs_put = false;
s64 x;
- mutex_lock(&sdp->sd_quota_mutex);
gfs2_trans_add_meta(ip->i_gl, qd->qd_bh);
- if (!test_bit(QDF_CHANGE, &qd->qd_flags)) {
- qc->qc_change = 0;
+ /*
+ * The QDF_CHANGE flag indicates that the slot in the quota change file
+ * is used. Here, we use the value of qc->qc_change when the slot is
+ * used, and we assume a value of 0 otherwise.
+ */
+
+ spin_lock(&qd->qd_lockref.lock);
+
+ x = 0;
+ if (test_bit(QDF_CHANGE, &qd->qd_flags))
+ x = be64_to_cpu(qc->qc_change);
+ x += change;
+ qd->qd_change += change;
+
+ if (!x && test_bit(QDF_CHANGE, &qd->qd_flags)) {
+ /* The slot in the quota change file becomes unused. */
+ clear_bit(QDF_CHANGE, &qd->qd_flags);
+ qc->qc_flags = 0;
+ qc->qc_id = 0;
+ needs_put = true;
+ } else if (x && !test_bit(QDF_CHANGE, &qd->qd_flags)) {
+ /* The slot in the quota change file becomes used. */
+ set_bit(QDF_CHANGE, &qd->qd_flags);
+ __qd_hold(qd);
+ slot_hold(qd);
+
qc->qc_flags = 0;
if (qd->qd_id.type == USRQUOTA)
qc->qc_flags = cpu_to_be32(GFS2_QCF_USER);
qc->qc_id = cpu_to_be32(from_kqid(&init_user_ns, qd->qd_id));
}
-
- x = be64_to_cpu(qc->qc_change) + change;
qc->qc_change = cpu_to_be64(x);
- spin_lock(&qd_lock);
- qd->qd_change = x;
- spin_unlock(&qd_lock);
+ spin_unlock(&qd->qd_lockref.lock);
- if (qc_type == QC_CHANGE) {
- if (!test_and_set_bit(QDF_CHANGE, &qd->qd_flags)) {
- qd_hold(qd);
- slot_hold(qd);
- }
- } else {
- gfs2_assert_warn(sdp, test_bit(QDF_CHANGE, &qd->qd_flags));
- clear_bit(QDF_CHANGE, &qd->qd_flags);
- qc->qc_flags = 0;
- qc->qc_id = 0;
+ if (needs_put) {
slot_put(qd);
qd_put(qd);
}
-
if (change < 0) /* Reset quiet flag if we freed some blocks */
clear_bit(QDF_QMSG_QUIET, &qd->qd_flags);
- mutex_unlock(&sdp->sd_quota_mutex);
}
static int gfs2_write_buf_to_page(struct gfs2_sbd *sdp, unsigned long index,
@@ -890,6 +860,7 @@ static int gfs2_adjust_quota(struct gfs2_sbd *sdp, loff_t loc,
be64_add_cpu(&q.qu_value, change);
if (((s64)be64_to_cpu(q.qu_value)) < 0)
q.qu_value = 0; /* Never go negative on quota usage */
+ spin_lock(&qd->qd_lockref.lock);
qd->qd_qb.qb_value = q.qu_value;
if (fdq) {
if (fdq->d_fieldmask & QC_SPC_SOFT) {
@@ -905,6 +876,7 @@ static int gfs2_adjust_quota(struct gfs2_sbd *sdp, loff_t loc,
qd->qd_qb.qb_value = q.qu_value;
}
}
+ spin_unlock(&qd->qd_lockref.lock);
err = gfs2_write_disk_quota(sdp, &q, loc);
if (!err) {
@@ -919,7 +891,8 @@ static int gfs2_adjust_quota(struct gfs2_sbd *sdp, loff_t loc,
return err;
}
-static int do_sync(unsigned int num_qd, struct gfs2_quota_data **qda)
+static int do_sync(unsigned int num_qd, struct gfs2_quota_data **qda,
+ u64 sync_gen)
{
struct gfs2_sbd *sdp = (*qda)->qd_sbd;
struct gfs2_inode *ip = GFS2_I(sdp->sd_quota_inode);
@@ -992,7 +965,7 @@ static int do_sync(unsigned int num_qd, struct gfs2_quota_data **qda)
if (error)
goto out_end_trans;
- do_qc(qd, -qd->qd_change_sync, QC_SYNC);
+ do_qc(qd, -qd->qd_change_sync);
set_bit(QDF_REFRESH, &qd->qd_flags);
}
@@ -1010,8 +983,13 @@ out_dq:
gfs2_log_flush(ip->i_gl->gl_name.ln_sbd, ip->i_gl,
GFS2_LOG_HEAD_FLUSH_NORMAL | GFS2_LFC_DO_SYNC);
if (!error) {
- for (x = 0; x < num_qd; x++)
- qda[x]->qd_sync_gen = sdp->sd_quota_sync_gen;
+ for (x = 0; x < num_qd; x++) {
+ qd = qda[x];
+ spin_lock(&qd->qd_lockref.lock);
+ if (qd->qd_sync_gen < sync_gen)
+ qd->qd_sync_gen = sync_gen;
+ spin_unlock(&qd->qd_lockref.lock);
+ }
}
return error;
}
@@ -1036,7 +1014,9 @@ static int update_qd(struct gfs2_sbd *sdp, struct gfs2_quota_data *qd)
qlvb->qb_limit = q.qu_limit;
qlvb->qb_warn = q.qu_warn;
qlvb->qb_value = q.qu_value;
+ spin_lock(&qd->qd_lockref.lock);
qd->qd_qb = *qlvb;
+ spin_unlock(&qd->qd_lockref.lock);
return 0;
}
@@ -1058,7 +1038,9 @@ restart:
if (test_and_clear_bit(QDF_REFRESH, &qd->qd_flags))
force_refresh = FORCE;
+ spin_lock(&qd->qd_lockref.lock);
qd->qd_qb = *(struct gfs2_quota_lvb *)qd->qd_gl->gl_lksb.sb_lvbptr;
+ spin_unlock(&qd->qd_lockref.lock);
if (force_refresh || qd->qd_qb.qb_magic != cpu_to_be32(GFS2_MAGIC)) {
gfs2_glock_dq_uninit(q_gh);
@@ -1129,35 +1111,36 @@ static bool need_sync(struct gfs2_quota_data *qd)
{
struct gfs2_sbd *sdp = qd->qd_sbd;
struct gfs2_tune *gt = &sdp->sd_tune;
- s64 value;
+ s64 value, change, limit;
unsigned int num, den;
+ int ret = false;
+ spin_lock(&qd->qd_lockref.lock);
if (!qd->qd_qb.qb_limit)
- return false;
+ goto out;
- spin_lock(&qd_lock);
- value = qd->qd_change;
- spin_unlock(&qd_lock);
+ change = qd->qd_change;
+ if (change <= 0)
+ goto out;
+ value = (s64)be64_to_cpu(qd->qd_qb.qb_value);
+ limit = (s64)be64_to_cpu(qd->qd_qb.qb_limit);
+ if (value >= limit)
+ goto out;
spin_lock(&gt->gt_spin);
num = gt->gt_quota_scale_num;
den = gt->gt_quota_scale_den;
spin_unlock(&gt->gt_spin);
- if (value <= 0)
- return false;
- else if ((s64)be64_to_cpu(qd->qd_qb.qb_value) >=
- (s64)be64_to_cpu(qd->qd_qb.qb_limit))
- return false;
- else {
- value *= gfs2_jindex_size(sdp) * num;
- value = div_s64(value, den);
- value += (s64)be64_to_cpu(qd->qd_qb.qb_value);
- if (value < (s64)be64_to_cpu(qd->qd_qb.qb_limit))
- return false;
- }
+ change *= gfs2_jindex_size(sdp) * num;
+ change = div_s64(change, den);
+ if (value + change < limit)
+ goto out;
- return true;
+ ret = true;
+out:
+ spin_unlock(&qd->qd_lockref.lock);
+ return ret;
}
void gfs2_quota_unlock(struct gfs2_inode *ip)
@@ -1166,7 +1149,6 @@ void gfs2_quota_unlock(struct gfs2_inode *ip)
struct gfs2_quota_data *qda[2 * GFS2_MAXQUOTAS];
unsigned int count = 0;
u32 x;
- int found;
if (!test_and_clear_bit(GIF_QD_LOCKED, &ip->i_flags))
return;
@@ -1174,6 +1156,7 @@ void gfs2_quota_unlock(struct gfs2_inode *ip)
for (x = 0; x < ip->i_qadata->qa_qd_num; x++) {
struct gfs2_quota_data *qd;
bool sync;
+ int error;
qd = ip->i_qadata->qa_qd[x];
sync = need_sync(qd);
@@ -1183,18 +1166,26 @@ void gfs2_quota_unlock(struct gfs2_inode *ip)
continue;
spin_lock(&qd_lock);
- found = qd_check_sync(sdp, qd, NULL);
+ sync = qd_grab_sync(sdp, qd, U64_MAX);
spin_unlock(&qd_lock);
- if (!found)
+ if (!sync)
continue;
- if (!qd_bh_get_or_undo(sdp, qd))
- qda[count++] = qd;
+ gfs2_assert_warn(sdp, qd->qd_change_sync);
+ error = bh_get(qd);
+ if (error) {
+ qd_ungrab_sync(qd);
+ continue;
+ }
+
+ qda[count++] = qd;
}
if (count) {
- do_sync(count, qda);
+ u64 sync_gen = READ_ONCE(sdp->sd_quota_sync_gen);
+
+ do_sync(count, qda, sync_gen);
for (x = 0; x < count; x++)
qd_unlock(qda[x]);
}
@@ -1253,12 +1244,12 @@ int gfs2_quota_check(struct gfs2_inode *ip, kuid_t uid, kgid_t gid,
qid_eq(qd->qd_id, make_kqid_gid(gid))))
continue;
+ spin_lock(&qd->qd_lockref.lock);
warn = (s64)be64_to_cpu(qd->qd_qb.qb_warn);
limit = (s64)be64_to_cpu(qd->qd_qb.qb_limit);
value = (s64)be64_to_cpu(qd->qd_qb.qb_value);
- spin_lock(&qd_lock);
value += qd->qd_change;
- spin_unlock(&qd_lock);
+ spin_unlock(&qd->qd_lockref.lock);
if (limit > 0 && (limit - value) < ap->allowed)
ap->allowed = limit - value;
@@ -1312,39 +1303,20 @@ void gfs2_quota_change(struct gfs2_inode *ip, s64 change,
if (qid_eq(qd->qd_id, make_kqid_uid(uid)) ||
qid_eq(qd->qd_id, make_kqid_gid(gid))) {
- do_qc(qd, change, QC_CHANGE);
+ do_qc(qd, change);
}
}
}
-static bool qd_changed(struct gfs2_sbd *sdp)
-{
- struct gfs2_quota_data *qd;
- bool changed = false;
-
- spin_lock(&qd_lock);
- list_for_each_entry(qd, &sdp->sd_quota_list, qd_list) {
- if (test_bit(QDF_LOCKED, &qd->qd_flags) ||
- !test_bit(QDF_CHANGE, &qd->qd_flags))
- continue;
-
- changed = true;
- break;
- }
- spin_unlock(&qd_lock);
- return changed;
-}
-
int gfs2_quota_sync(struct super_block *sb, int type)
{
struct gfs2_sbd *sdp = sb->s_fs_info;
struct gfs2_quota_data **qda;
unsigned int max_qd = PAGE_SIZE / sizeof(struct gfs2_holder);
- unsigned int num_qd;
- unsigned int x;
+ u64 sync_gen;
int error = 0;
- if (!qd_changed(sdp))
+ if (sb_rdonly(sdp->sd_vfs))
return 0;
qda = kcalloc(max_qd, sizeof(struct gfs2_quota_data *), GFP_KERNEL);
@@ -1352,27 +1324,44 @@ int gfs2_quota_sync(struct super_block *sb, int type)
return -ENOMEM;
mutex_lock(&sdp->sd_quota_sync_mutex);
- sdp->sd_quota_sync_gen++;
+ sync_gen = sdp->sd_quota_sync_gen + 1;
do {
- num_qd = 0;
+ struct gfs2_quota_data *iter;
+ unsigned int num_qd = 0;
+ unsigned int x;
- for (;;) {
- error = qd_fish(sdp, qda + num_qd);
- if (error || !qda[num_qd])
- break;
- if (++num_qd == max_qd)
- break;
+ spin_lock(&qd_lock);
+ list_for_each_entry(iter, &sdp->sd_quota_list, qd_list) {
+ if (qd_grab_sync(sdp, iter, sync_gen)) {
+ qda[num_qd++] = iter;
+ if (num_qd == max_qd)
+ break;
+ }
}
+ spin_unlock(&qd_lock);
- if (num_qd) {
+ if (!num_qd)
+ break;
+
+ for (x = 0; x < num_qd; x++) {
+ error = bh_get(qda[x]);
if (!error)
- error = do_sync(num_qd, qda);
+ continue;
+
+ while (x < num_qd)
+ qd_ungrab_sync(qda[--num_qd]);
+ break;
+ }
- for (x = 0; x < num_qd; x++)
- qd_unlock(qda[x]);
+ if (!error) {
+ WRITE_ONCE(sdp->sd_quota_sync_gen, sync_gen);
+ error = do_sync(num_qd, qda, sync_gen);
}
- } while (!error && num_qd == max_qd);
+
+ for (x = 0; x < num_qd; x++)
+ qd_unlock(qda[x]);
+ } while (!error);
mutex_unlock(&sdp->sd_quota_sync_mutex);
kfree(qda);
@@ -1407,6 +1396,7 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
unsigned int found = 0;
unsigned int hash;
unsigned int bm_size;
+ struct buffer_head *bh;
u64 dblock;
u32 extlen = 0;
int error;
@@ -1426,8 +1416,7 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
return error;
for (x = 0; x < blocks; x++) {
- struct buffer_head *bh;
- const struct gfs2_quota_change *qc;
+ struct gfs2_quota_change *qc;
unsigned int y;
if (!extlen) {
@@ -1440,15 +1429,13 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
bh = gfs2_meta_ra(ip->i_gl, dblock, extlen);
if (!bh)
goto fail;
- if (gfs2_metatype_check(sdp, bh, GFS2_METATYPE_QC)) {
- brelse(bh);
- goto fail;
- }
+ if (gfs2_metatype_check(sdp, bh, GFS2_METATYPE_QC))
+ goto fail_brelse;
- qc = (const struct gfs2_quota_change *)(bh->b_data + sizeof(struct gfs2_meta_header));
+ qc = (struct gfs2_quota_change *)(bh->b_data + sizeof(struct gfs2_meta_header));
for (y = 0; y < sdp->sd_qc_per_block && slot < sdp->sd_quota_slots;
y++, slot++) {
- struct gfs2_quota_data *qd;
+ struct gfs2_quota_data *old_qd, *qd;
s64 qc_change = be64_to_cpu(qc->qc_change);
u32 qc_flags = be32_to_cpu(qc->qc_flags);
enum quota_type qtype = (qc_flags & GFS2_QCF_USER) ?
@@ -1461,10 +1448,8 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
hash = gfs2_qd_hash(sdp, qc_id);
qd = qd_alloc(hash, sdp, qc_id);
- if (qd == NULL) {
- brelse(bh);
- goto fail;
- }
+ if (qd == NULL)
+ goto fail_brelse;
set_bit(QDF_CHANGE, &qd->qd_flags);
qd->qd_change = qc_change;
@@ -1472,18 +1457,41 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
qd->qd_slot_ref = 1;
spin_lock(&qd_lock);
+ spin_lock_bucket(hash);
+ old_qd = gfs2_qd_search_bucket(hash, sdp, qc_id);
+ if (old_qd) {
+ fs_err(sdp, "Corruption found in quota_change%u"
+ "file: duplicate identifier in "
+ "slot %u\n",
+ sdp->sd_jdesc->jd_jid, slot);
+
+ spin_unlock_bucket(hash);
+ spin_unlock(&qd_lock);
+ qd_put(old_qd);
+
+ gfs2_glock_put(qd->qd_gl);
+ kmem_cache_free(gfs2_quotad_cachep, qd);
+
+ /* zero out the duplicate slot */
+ lock_buffer(bh);
+ memset(qc, 0, sizeof(*qc));
+ mark_buffer_dirty(bh);
+ unlock_buffer(bh);
+
+ continue;
+ }
BUG_ON(test_and_set_bit(slot, sdp->sd_quota_bitmap));
list_add(&qd->qd_list, &sdp->sd_quota_list);
atomic_inc(&sdp->sd_quota_count);
- spin_unlock(&qd_lock);
-
- spin_lock_bucket(hash);
hlist_bl_add_head_rcu(&qd->qd_hlist, &qd_hash_table[hash]);
spin_unlock_bucket(hash);
+ spin_unlock(&qd_lock);
found++;
}
+ if (buffer_dirty(bh))
+ sync_dirty_buffer(bh);
brelse(bh);
dblock++;
extlen--;
@@ -1494,6 +1502,10 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
return 0;
+fail_brelse:
+ if (buffer_dirty(bh))
+ sync_dirty_buffer(bh);
+ brelse(bh);
fail:
gfs2_quota_cleanup(sdp);
return error;
diff --git a/fs/gfs2/super.c b/fs/gfs2/super.c
index 7a5aedfcd52a..6678060ed4d2 100644
--- a/fs/gfs2/super.c
+++ b/fs/gfs2/super.c
@@ -1524,7 +1524,6 @@ out:
if (ip->i_gl) {
glock_clear_object(ip->i_gl, ip);
wait_on_bit_io(&ip->i_flags, GIF_GLOP_PENDING, TASK_UNINTERRUPTIBLE);
- gfs2_glock_add_to_lru(ip->i_gl);
gfs2_glock_put_eventually(ip->i_gl);
rcu_assign_pointer(ip->i_gl, NULL);
}
diff --git a/fs/gfs2/trace_gfs2.h b/fs/gfs2/trace_gfs2.h
index a5deb9f86831..8eae8d62a413 100644
--- a/fs/gfs2/trace_gfs2.h
+++ b/fs/gfs2/trace_gfs2.h
@@ -53,9 +53,9 @@
{(1UL << GLF_DIRTY), "y" }, \
{(1UL << GLF_LFLUSH), "f" }, \
{(1UL << GLF_INVALIDATE_IN_PROGRESS), "i" }, \
- {(1UL << GLF_REPLY_PENDING), "r" }, \
- {(1UL << GLF_INITIAL), "I" }, \
- {(1UL << GLF_FROZEN), "F" }, \
+ {(1UL << GLF_HAVE_REPLY), "r" }, \
+ {(1UL << GLF_INITIAL), "a" }, \
+ {(1UL << GLF_HAVE_FROZEN_REPLY), "F" }, \
{(1UL << GLF_LRU), "L" }, \
{(1UL << GLF_OBJECT), "o" }, \
{(1UL << GLF_BLOCKING), "b" })
diff --git a/fs/gfs2/util.c b/fs/gfs2/util.c
index af4758d8d894..13be8d1d228b 100644
--- a/fs/gfs2/util.c
+++ b/fs/gfs2/util.c
@@ -99,12 +99,12 @@ out_unlock:
*/
int gfs2_freeze_lock_shared(struct gfs2_sbd *sdp)
{
+ int flags = LM_FLAG_NOEXP | GL_EXACT;
int error;
- error = gfs2_glock_nq_init(sdp->sd_freeze_gl, LM_ST_SHARED,
- LM_FLAG_NOEXP | GL_EXACT,
+ error = gfs2_glock_nq_init(sdp->sd_freeze_gl, LM_ST_SHARED, flags,
&sdp->sd_freeze_gh);
- if (error)
+ if (error && error != GLR_TRYFAILED)
fs_err(sdp, "can't lock the freeze glock: %d\n", error);
return error;
}
@@ -206,9 +206,9 @@ static void signal_our_withdraw(struct gfs2_sbd *sdp)
* on other nodes to be successful, otherwise we remain the owner of
* the glock as far as dlm is concerned.
*/
- if (i_gl->gl_ops->go_free) {
- set_bit(GLF_FREEING, &i_gl->gl_flags);
- wait_on_bit(&i_gl->gl_flags, GLF_FREEING, TASK_UNINTERRUPTIBLE);
+ if (i_gl->gl_ops->go_unlocked) {
+ set_bit(GLF_UNLOCKED, &i_gl->gl_flags);
+ wait_on_bit(&i_gl->gl_flags, GLF_UNLOCKED, TASK_UNINTERRUPTIBLE);
}
/*