Commit 6706415b authored by Linus Torvalds

Merge tag 'gfs2-v6.10-rc1-fixes' of git://git.kernel.org/pub/scm/linux/kernel/git/gfs2/linux-gfs2

Pull gfs2 updates from Andreas Gruenbacher:
 "Fixes and cleanups:

   - Revise the glock reference counting model and LRU list handling to
     be more sensible

   - Several quota related fixes: clean up the quota code, add some
     missing locking, and work around the on-disk corruption that the
     reverted patch "gfs2: ignore negated quota changes" causes

   - Clean up the glock demote logic in glock_work_func()"

* tag 'gfs2-v6.10-rc1-fixes' of git://git.kernel.org/pub/scm/linux/kernel/git/gfs2/linux-gfs2: (29 commits)
  gfs2: Clean up glock demote logic
  gfs2: Revert "check for no eligible quota changes"
  gfs2: Be more careful with the quota sync generation
  gfs2: Get rid of some unnecessary quota locking
  gfs2: Add some missing quota locking
  gfs2: Fold qd_fish into gfs2_quota_sync
  gfs2: quota need_sync cleanup
  gfs2: Fix and clean up function do_qc
  gfs2: Revert "Add quota_change type"
  gfs2: Revert "ignore negated quota changes"
  gfs2: qd_check_sync cleanups
  gfs2: Revert "introduce qd_bh_get_or_undo"
  gfs2: Check quota consistency on mount
  gfs2: Minor gfs2_quota_init error path cleanup
  gfs2: Get rid of demote_ok checks
  Revert "GFS2: Don't add all glocks to the lru"
  gfs2: Revise glock reference counting model
  gfs2: Switch to a per-filesystem glock workqueue
  gfs2: Report when glocks cannot be freed for a long time
  gfs2: gfs2_glock_get cleanup
  ...
parents f097ef0e f75efefb
......@@ -40,14 +40,14 @@ shared lock mode, SH. In GFS2 the DF mode is used exclusively for direct I/O
operations. The glocks are basically a lock plus some routines which deal
with cache management. The following rules apply for the cache:
========== ========== ============== ========== ==============
Glock mode Cache data Cache Metadata Dirty Data Dirty Metadata
========== ========== ============== ========== ==============
========== ============== ========== ========== ==============
Glock mode Cache Metadata Cache data Dirty Data Dirty Metadata
========== ============== ========== ========== ==============
UN No No No No
DF Yes No No No
SH Yes Yes No No
DF No Yes No No
EX Yes Yes Yes Yes
========== ========== ============== ========== ==============
========== ============== ========== ========== ==============
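
As a quick illustration, the corrected table above can be read as a lookup from lock mode to caching capability. The following standalone C sketch is illustrative only, not GFS2 code; only the mode names (UN, DF, SH, EX) come from the documentation, everything else is invented for the example:

```c
/* Illustrative userspace encoding of the cache rules table above. */
#include <stdbool.h>
#include <stdio.h>

enum glock_mode { UN, DF, SH, EX };

struct cache_rules {
	bool cache_metadata;
	bool cache_data;
	bool dirty_data;
	bool dirty_metadata;
};

static const struct cache_rules rules[] = {
	[UN] = { false, false, false, false },
	[DF] = { true,  false, false, false },	/* direct I/O: metadata only */
	[SH] = { true,  true,  false, false },
	[EX] = { true,  true,  true,  true  },
};

int main(void)
{
	/* DF (deferred) mode may cache metadata but never data. */
	printf("DF caches metadata: %d, data: %d\n",
	       rules[DF].cache_metadata, rules[DF].cache_data);
	return 0;
}
```
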
These rules are implemented using the various glock operations which
are defined for each type of glock. Not all types of glocks use
......@@ -55,23 +55,22 @@ all the modes. Only inode glocks use the DF mode for example.
Table of glock operations and per type constants:
============= =============================================================
============== =============================================================
Field Purpose
============= =============================================================
go_xmote_th Called before remote state change (e.g. to sync dirty data)
============== =============================================================
go_sync Called before remote state change (e.g. to sync dirty data)
go_xmote_bh Called after remote state change (e.g. to refill cache)
go_inval Called if remote state change requires invalidating the cache
go_demote_ok Returns boolean value of whether its ok to demote a glock
(e.g. checks timeout, and that there is no cached data)
go_lock Called for the first local holder of a lock
go_unlock Called on the final local unlock of a lock
go_instantiate Called when a glock has been acquired
go_held Called every time a glock holder is acquired
go_dump Called to print content of object for debugfs file, or on
error to dump glock to the log.
go_type The type of the glock, ``LM_TYPE_*``
go_callback Called if the DLM sends a callback to drop this lock
go_unlocked Called when a glock is unlocked (dlm_unlock())
go_type The type of the glock, ``LM_TYPE_*``
go_flags GLOF_ASPACE is set, if the glock has an address space
associated with it
============= =============================================================
============== =============================================================
The minimum hold time for each lock is the time after a remote lock
grant for which we ignore remote demote requests. This is in order to
......@@ -82,26 +81,24 @@ to by multiple nodes. By delaying the demotion in response to a
remote callback, that gives the userspace program time to make
some progress before the pages are unmapped.
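
To make the delayed-demote behaviour concrete, here is a minimal userspace model of the minimum-hold-time check. Plain integers stand in for jiffies, wraparound is ignored, and all names are illustrative rather than the kernel symbols; the real comparison lives in glock_work_func, visible further down in this diff:

```c
/*
 * Userspace sketch of the minimum-hold-time check: a demote request
 * arriving before the hold time has expired is deferred by the
 * remaining time.  "now" and "tchange" stand in for jiffies and the
 * time of the last state change; names are illustrative only.
 */
#include <stdio.h>

static unsigned long demote_delay(unsigned long now, unsigned long tchange,
				  unsigned long hold_time)
{
	unsigned long holdtime = tchange + hold_time;

	if (now < holdtime)		/* still inside the hold time */
		return holdtime - now;	/* defer the demote */
	return 0;			/* demote immediately */
}

int main(void)
{
	/* Lock granted at t=100 with a 50-tick minimum hold time. */
	printf("%lu\n", demote_delay(120, 100, 50));	/* 30: deferred */
	printf("%lu\n", demote_delay(200, 100, 50));	/* 0: immediate */
	return 0;
}
```
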
There is a plan to try and remove the go_lock and go_unlock callbacks
if possible, in order to try and speed up the fast path though the locking.
Also, eventually we hope to make the glock "EX" mode locally shared
such that any local locking will be done with the i_mutex as required
rather than via the glock.
Eventually, we hope to make the glock "EX" mode locally shared such that any
local locking will be done with the i_mutex as required rather than via the
glock.
Locking rules for glock operations:
============= ====================== =============================
============== ====================== =============================
Operation GLF_LOCK bit lock held gl_lockref.lock spinlock held
============= ====================== =============================
go_xmote_th Yes No
============== ====================== =============================
go_sync Yes No
go_xmote_bh Yes No
go_inval Yes No
go_demote_ok Sometimes Yes
go_lock Yes No
go_unlock Yes No
go_instantiate No No
go_held No No
go_dump Sometimes Yes
go_callback Sometimes (N/A) Yes
============= ====================== =============================
go_unlocked Yes No
============== ====================== =============================
.. Note::
......
......@@ -61,12 +61,10 @@ struct gfs2_glock_iter {
typedef void (*glock_examiner) (struct gfs2_glock * gl);
static void do_xmote(struct gfs2_glock *gl, struct gfs2_holder *gh, unsigned int target);
static void __gfs2_glock_dq(struct gfs2_holder *gh);
static void handle_callback(struct gfs2_glock *gl, unsigned int state,
static void request_demote(struct gfs2_glock *gl, unsigned int state,
unsigned long delay, bool remote);
static struct dentry *gfs2_root;
static struct workqueue_struct *glock_workqueue;
static LIST_HEAD(lru_list);
static atomic_t lru_count = ATOMIC_INIT(0);
static DEFINE_SPINLOCK(lru_lock);
......@@ -218,34 +216,9 @@ struct gfs2_glock *gfs2_glock_hold(struct gfs2_glock *gl)
return gl;
}
/**
* demote_ok - Check to see if it's ok to unlock a glock
* @gl: the glock
*
* Returns: 1 if it's ok
*/
static int demote_ok(const struct gfs2_glock *gl)
static void gfs2_glock_add_to_lru(struct gfs2_glock *gl)
{
const struct gfs2_glock_operations *glops = gl->gl_ops;
if (gl->gl_state == LM_ST_UNLOCKED)
return 0;
if (!list_empty(&gl->gl_holders))
return 0;
if (glops->go_demote_ok)
return glops->go_demote_ok(gl);
return 1;
}
void gfs2_glock_add_to_lru(struct gfs2_glock *gl)
{
if (!(gl->gl_ops->go_flags & GLOF_LRU))
return;
spin_lock(&lru_lock);
list_move_tail(&gl->gl_lru, &lru_list);
if (!test_bit(GLF_LRU, &gl->gl_flags)) {
......@@ -258,9 +231,6 @@ void gfs2_glock_add_to_lru(struct gfs2_glock *gl)
static void gfs2_glock_remove_from_lru(struct gfs2_glock *gl)
{
if (!(gl->gl_ops->go_flags & GLOF_LRU))
return;
spin_lock(&lru_lock);
if (test_bit(GLF_LRU, &gl->gl_flags)) {
list_del_init(&gl->gl_lru);
......@@ -275,7 +245,9 @@ static void gfs2_glock_remove_from_lru(struct gfs2_glock *gl)
* work queue.
*/
static void gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) {
if (!queue_delayed_work(glock_workqueue, &gl->gl_work, delay)) {
struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
if (!queue_delayed_work(sdp->sd_glock_wq, &gl->gl_work, delay)) {
/*
* We are holding the lockref spinlock, and the work was still
* queued above. The queued work (glock_work_func) takes that
......@@ -305,6 +277,20 @@ static void __gfs2_glock_put(struct gfs2_glock *gl)
sdp->sd_lockstruct.ls_ops->lm_put_lock(gl);
}
static bool __gfs2_glock_put_or_lock(struct gfs2_glock *gl)
{
if (lockref_put_or_lock(&gl->gl_lockref))
return true;
GLOCK_BUG_ON(gl, gl->gl_lockref.count != 1);
if (gl->gl_state != LM_ST_UNLOCKED) {
gl->gl_lockref.count--;
gfs2_glock_add_to_lru(gl);
spin_unlock(&gl->gl_lockref.lock);
return true;
}
return false;
}
/**
* gfs2_glock_put() - Decrement reference count on glock
* @gl: The glock to put
......@@ -313,7 +299,7 @@ static void __gfs2_glock_put(struct gfs2_glock *gl)
void gfs2_glock_put(struct gfs2_glock *gl)
{
if (lockref_put_or_lock(&gl->gl_lockref))
if (__gfs2_glock_put_or_lock(gl))
return;
__gfs2_glock_put(gl);
......@@ -328,10 +314,9 @@ void gfs2_glock_put(struct gfs2_glock *gl)
*/
void gfs2_glock_put_async(struct gfs2_glock *gl)
{
if (lockref_put_or_lock(&gl->gl_lockref))
if (__gfs2_glock_put_or_lock(gl))
return;
GLOCK_BUG_ON(gl, gl->gl_lockref.count != 1);
gfs2_glock_queue_work(gl, 0);
spin_unlock(&gl->gl_lockref.lock);
}
......@@ -570,18 +555,6 @@ static inline struct gfs2_holder *find_last_waiter(const struct gfs2_glock *gl)
static void state_change(struct gfs2_glock *gl, unsigned int new_state)
{
int held1, held2;
held1 = (gl->gl_state != LM_ST_UNLOCKED);
held2 = (new_state != LM_ST_UNLOCKED);
if (held1 != held2) {
GLOCK_BUG_ON(gl, __lockref_is_dead(&gl->gl_lockref));
if (held2)
gl->gl_lockref.count++;
else
gl->gl_lockref.count--;
}
if (new_state != gl->gl_target)
/* shorten our minimum hold time */
gl->gl_hold_time = max(gl->gl_hold_time - GL_GLOCK_HOLD_DECR,
......@@ -812,7 +785,7 @@ __acquires(&gl->gl_lockref.lock)
(target != LM_ST_UNLOCKED ||
test_bit(SDF_WITHDRAW_RECOVERY, &sdp->sd_flags))) {
if (!is_system_glock(gl)) {
handle_callback(gl, LM_ST_UNLOCKED, 0, false); /* sets demote */
request_demote(gl, LM_ST_UNLOCKED, 0, false);
/*
* Ordinarily, we would call dlm and its callback would call
* finish_xmote, which would call state_change() to the new state.
......@@ -910,7 +883,6 @@ __acquires(&gl->gl_lockref.lock)
out_unlock:
clear_bit(GLF_LOCK, &gl->gl_flags);
smp_mb__after_atomic();
return;
}
/**
......@@ -1111,19 +1083,21 @@ static void glock_work_func(struct work_struct *work)
unsigned int drop_refs = 1;
spin_lock(&gl->gl_lockref.lock);
if (test_bit(GLF_REPLY_PENDING, &gl->gl_flags)) {
clear_bit(GLF_REPLY_PENDING, &gl->gl_flags);
if (test_bit(GLF_HAVE_REPLY, &gl->gl_flags)) {
clear_bit(GLF_HAVE_REPLY, &gl->gl_flags);
finish_xmote(gl, gl->gl_reply);
drop_refs++;
}
if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
gl->gl_state != LM_ST_UNLOCKED &&
gl->gl_demote_state != LM_ST_EXCLUSIVE) {
if (gl->gl_name.ln_type == LM_TYPE_INODE) {
unsigned long holdtime, now = jiffies;
holdtime = gl->gl_tchange + gl->gl_hold_time;
if (time_before(now, holdtime))
delay = holdtime - now;
}
if (!delay) {
clear_bit(GLF_PENDING_DEMOTE, &gl->gl_flags);
......@@ -1134,21 +1108,19 @@ static void glock_work_func(struct work_struct *work)
if (delay) {
/* Keep one glock reference for the work we requeue. */
drop_refs--;
if (gl->gl_name.ln_type != LM_TYPE_INODE)
delay = 0;
gfs2_glock_queue_work(gl, delay);
}
/*
* Drop the remaining glock references manually here. (Mind that
* gfs2_glock_queue_work depends on the lockref spinlock begin held
* here as well.)
*/
/* Drop the remaining glock references manually. */
GLOCK_BUG_ON(gl, gl->gl_lockref.count < drop_refs);
gl->gl_lockref.count -= drop_refs;
if (!gl->gl_lockref.count) {
if (gl->gl_state == LM_ST_UNLOCKED) {
__gfs2_glock_put(gl);
return;
}
gfs2_glock_add_to_lru(gl);
}
spin_unlock(&gl->gl_lockref.lock);
}
......@@ -1183,6 +1155,8 @@ static struct gfs2_glock *find_insert_glock(struct lm_lockname *name,
out:
rcu_read_unlock();
finish_wait(wq, &wait.wait);
if (gl)
gfs2_glock_remove_from_lru(gl);
return gl;
}
......@@ -1209,13 +1183,10 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
.ln_sbd = sdp };
struct gfs2_glock *gl, *tmp;
struct address_space *mapping;
int ret = 0;
gl = find_insert_glock(&name, NULL);
if (gl) {
*glp = gl;
return 0;
}
if (gl)
goto found;
if (!create)
return -ENOENT;
......@@ -1243,7 +1214,9 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
atomic_inc(&sdp->sd_glock_disposal);
gl->gl_node.next = NULL;
gl->gl_flags = glops->go_instantiate ? BIT(GLF_INSTANTIATE_NEEDED) : 0;
gl->gl_flags = BIT(GLF_INITIAL);
if (glops->go_instantiate)
gl->gl_flags |= BIT(GLF_INSTANTIATE_NEEDED);
gl->gl_name = name;
lockdep_set_subclass(&gl->gl_lockref.lock, glops->go_subclass);
gl->gl_lockref.count = 1;
......@@ -1275,23 +1248,19 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
}
tmp = find_insert_glock(&name, gl);
if (!tmp) {
*glp = gl;
goto out;
}
if (IS_ERR(tmp)) {
ret = PTR_ERR(tmp);
goto out_free;
}
*glp = tmp;
out_free:
if (tmp) {
gfs2_glock_dealloc(&gl->gl_rcu);
if (atomic_dec_and_test(&sdp->sd_glock_disposal))
wake_up(&sdp->sd_kill_wait);
out:
return ret;
if (IS_ERR(tmp))
return PTR_ERR(tmp);
gl = tmp;
}
found:
*glp = gl;
return 0;
}
/**
......@@ -1461,7 +1430,7 @@ int gfs2_glock_async_wait(unsigned int num_gh, struct gfs2_holder *ghs)
}
/**
* handle_callback - process a demote request
* request_demote - process a demote request
* @gl: the glock
* @state: the state the caller wants us to change to
* @delay: zero to demote immediately; otherwise pending demote
......@@ -1471,7 +1440,7 @@ int gfs2_glock_async_wait(unsigned int num_gh, struct gfs2_holder *ghs)
* practise: LM_ST_SHARED and LM_ST_UNLOCKED
*/
static void handle_callback(struct gfs2_glock *gl, unsigned int state,
static void request_demote(struct gfs2_glock *gl, unsigned int state,
unsigned long delay, bool remote)
{
if (delay)
......@@ -1636,15 +1605,12 @@ int gfs2_glock_nq(struct gfs2_holder *gh)
return error;
}
if (test_bit(GLF_LRU, &gl->gl_flags))
gfs2_glock_remove_from_lru(gl);
gh->gh_error = 0;
spin_lock(&gl->gl_lockref.lock);
add_to_queue(gh);
if (unlikely((LM_FLAG_NOEXP & gh->gh_flags) &&
test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))) {
set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
test_and_clear_bit(GLF_HAVE_FROZEN_REPLY, &gl->gl_flags))) {
set_bit(GLF_HAVE_REPLY, &gl->gl_flags);
gl->gl_lockref.count++;
gfs2_glock_queue_work(gl, 0);
}
......@@ -1688,7 +1654,7 @@ static void __gfs2_glock_dq(struct gfs2_holder *gh)
* below.
*/
if (gh->gh_flags & GL_NOCACHE)
handle_callback(gl, LM_ST_UNLOCKED, 0, false);
request_demote(gl, LM_ST_UNLOCKED, 0, false);
list_del_init(&gh->gh_list);
clear_bit(HIF_HOLDER, &gh->gh_iflags);
......@@ -1703,9 +1669,6 @@ static void __gfs2_glock_dq(struct gfs2_holder *gh)
fast_path = 1;
}
if (!test_bit(GLF_LFLUSH, &gl->gl_flags) && demote_ok(gl))
gfs2_glock_add_to_lru(gl);
if (unlikely(!fast_path)) {
gl->gl_lockref.count++;
if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
......@@ -1932,10 +1895,10 @@ void gfs2_glock_cb(struct gfs2_glock *gl, unsigned int state)
gl->gl_name.ln_type == LM_TYPE_INODE) {
if (time_before(now, holdtime))
delay = holdtime - now;
if (test_bit(GLF_REPLY_PENDING, &gl->gl_flags))
if (test_bit(GLF_HAVE_REPLY, &gl->gl_flags))
delay = gl->gl_hold_time;
}
handle_callback(gl, state, delay, true);
request_demote(gl, state, delay, true);
gfs2_glock_queue_work(gl, delay);
spin_unlock(&gl->gl_lockref.lock);
}
......@@ -1988,14 +1951,14 @@ void gfs2_glock_complete(struct gfs2_glock *gl, int ret)
if (unlikely(test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags))) {
if (gfs2_should_freeze(gl)) {
set_bit(GLF_FROZEN, &gl->gl_flags);
set_bit(GLF_HAVE_FROZEN_REPLY, &gl->gl_flags);
spin_unlock(&gl->gl_lockref.lock);
return;
}
}
gl->gl_lockref.count++;
set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
set_bit(GLF_HAVE_REPLY, &gl->gl_flags);
gfs2_glock_queue_work(gl, 0);
spin_unlock(&gl->gl_lockref.lock);
}
......@@ -2018,10 +1981,12 @@ static int glock_cmp(void *priv, const struct list_head *a,
static bool can_free_glock(struct gfs2_glock *gl)
{
bool held = gl->gl_state != LM_ST_UNLOCKED;
struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
return !test_bit(GLF_LOCK, &gl->gl_flags) &&
gl->gl_lockref.count == held;
!gl->gl_lockref.count &&
(!test_bit(GLF_LFLUSH, &gl->gl_flags) ||
test_bit(SDF_KILL, &sdp->sd_flags));
}
/**
......@@ -2063,8 +2028,8 @@ __acquires(&lru_lock)
clear_bit(GLF_LRU, &gl->gl_flags);
freed++;
gl->gl_lockref.count++;
if (demote_ok(gl))
handle_callback(gl, LM_ST_UNLOCKED, 0, false);
if (gl->gl_state != LM_ST_UNLOCKED)
request_demote(gl, LM_ST_UNLOCKED, 0, false);
gfs2_glock_queue_work(gl, 0);
spin_unlock(&gl->gl_lockref.lock);
cond_resched_lock(&lru_lock);
......@@ -2182,13 +2147,14 @@ void gfs2_flush_delete_work(struct gfs2_sbd *sdp)
static void thaw_glock(struct gfs2_glock *gl)
{
if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))
if (!test_and_clear_bit(GLF_HAVE_FROZEN_REPLY, &gl->gl_flags))
return;
if (!lockref_get_not_dead(&gl->gl_lockref))
return;
gfs2_glock_remove_from_lru(gl);
spin_lock(&gl->gl_lockref.lock);
set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
set_bit(GLF_HAVE_REPLY, &gl->gl_flags);
gfs2_glock_queue_work(gl, 0);
spin_unlock(&gl->gl_lockref.lock);
}
......@@ -2207,7 +2173,7 @@ static void clear_glock(struct gfs2_glock *gl)
if (!__lockref_is_dead(&gl->gl_lockref)) {
gl->gl_lockref.count++;
if (gl->gl_state != LM_ST_UNLOCKED)
handle_callback(gl, LM_ST_UNLOCKED, 0, false);
request_demote(gl, LM_ST_UNLOCKED, 0, false);
gfs2_glock_queue_work(gl, 0);
}
spin_unlock(&gl->gl_lockref.lock);
......@@ -2259,16 +2225,30 @@ void gfs2_gl_dq_holders(struct gfs2_sbd *sdp)
void gfs2_gl_hash_clear(struct gfs2_sbd *sdp)
{
unsigned long start = jiffies;
bool timed_out = false;
set_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags);
flush_workqueue(glock_workqueue);
flush_workqueue(sdp->sd_glock_wq);
glock_hash_walk(clear_glock, sdp);
flush_workqueue(glock_workqueue);
flush_workqueue(sdp->sd_glock_wq);
while (!timed_out) {
wait_event_timeout(sdp->sd_kill_wait,
atomic_read(&sdp->sd_glock_disposal) == 0,
HZ * 600);
!atomic_read(&sdp->sd_glock_disposal),
HZ * 60);
if (!atomic_read(&sdp->sd_glock_disposal))
break;
timed_out = time_after(jiffies, start + (HZ * 600));
fs_warn(sdp, "%u glocks left after %u seconds%s\n",
atomic_read(&sdp->sd_glock_disposal),
jiffies_to_msecs(jiffies - start) / 1000,
timed_out ? ":" : "; still waiting");
}
gfs2_lm_unmount(sdp);
gfs2_free_dead_glocks(sdp);
glock_hash_walk(dump_glock_func, sdp);
destroy_workqueue(sdp->sd_glock_wq);
}
static const char *state2str(unsigned state)
......@@ -2366,11 +2346,11 @@ static const char *gflags2str(char *buf, const struct gfs2_glock *gl)
*p++ = 'f';
if (test_bit(GLF_INVALIDATE_IN_PROGRESS, gflags))
*p++ = 'i';
if (test_bit(GLF_REPLY_PENDING, gflags))
if (test_bit(GLF_HAVE_REPLY, gflags))
*p++ = 'r';
if (test_bit(GLF_INITIAL, gflags))
*p++ = 'I';
if (test_bit(GLF_FROZEN, gflags))
*p++ = 'a';
if (test_bit(GLF_HAVE_FROZEN_REPLY, gflags))
*p++ = 'F';
if (!list_empty(&gl->gl_holders))
*p++ = 'q';
......@@ -2380,7 +2360,7 @@ static const char *gflags2str(char *buf, const struct gfs2_glock *gl)
*p++ = 'o';
if (test_bit(GLF_BLOCKING, gflags))
*p++ = 'b';
if (test_bit(GLF_FREEING, gflags))
if (test_bit(GLF_UNLOCKED, gflags))
*p++ = 'x';
if (test_bit(GLF_INSTANTIATE_NEEDED, gflags))
*p++ = 'n';
......@@ -2533,16 +2513,8 @@ int __init gfs2_glock_init(void)
if (ret < 0)
return ret;
glock_workqueue = alloc_workqueue("glock_workqueue", WQ_MEM_RECLAIM |
WQ_HIGHPRI | WQ_FREEZABLE, 0);
if (!glock_workqueue) {
rhashtable_destroy(&gl_hash_table);
return -ENOMEM;
}
glock_shrinker = shrinker_alloc(0, "gfs2-glock");
if (!glock_shrinker) {
destroy_workqueue(glock_workqueue);
rhashtable_destroy(&gl_hash_table);
return -ENOMEM;
}
......@@ -2562,7 +2534,6 @@ void gfs2_glock_exit(void)
{
shrinker_free(glock_shrinker);
rhashtable_destroy(&gl_hash_table);
destroy_workqueue(glock_workqueue);
}
static void gfs2_glock_iter_next(struct gfs2_glock_iter *gi, loff_t n)
......
......@@ -250,7 +250,6 @@ void gfs2_flush_delete_work(struct gfs2_sbd *sdp);
void gfs2_gl_hash_clear(struct gfs2_sbd *sdp);
void gfs2_gl_dq_holders(struct gfs2_sbd *sdp);
void gfs2_glock_thaw(struct gfs2_sbd *sdp);
void gfs2_glock_add_to_lru(struct gfs2_glock *gl);
void gfs2_glock_free(struct gfs2_glock *gl);
void gfs2_glock_free_later(struct gfs2_glock *gl);
......
......@@ -385,23 +385,6 @@ static void inode_go_inval(struct gfs2_glock *gl, int flags)
gfs2_clear_glop_pending(ip);
}
/**
* inode_go_demote_ok - Check to see if it's ok to unlock an inode glock
* @gl: the glock
*
* Returns: 1 if it's ok
*/
static int inode_go_demote_ok(const struct gfs2_glock *gl)
{
struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
if (sdp->sd_jindex == gl->gl_object || sdp->sd_rindex == gl->gl_object)
return 0;
return 1;
}
static int gfs2_dinode_in(struct gfs2_inode *ip, const void *buf)
{
struct gfs2_sbd *sdp = GFS2_SB(&ip->i_inode);
......@@ -648,21 +631,21 @@ static void iopen_go_callback(struct gfs2_glock *gl, bool remote)
}
/**
* inode_go_free - wake up anyone waiting for dlm's unlock ast to free it
* @gl: glock being freed
* inode_go_unlocked - wake up anyone waiting for dlm's unlock ast
* @gl: glock being unlocked
*
* For now, this is only used for the journal inode glock. In withdraw
* situations, we need to wait for the glock to be freed so that we know
* situations, we need to wait for the glock to be unlocked so that we know
* other nodes may proceed with recovery / journal replay.
*/
static void inode_go_free(struct gfs2_glock *gl)
static void inode_go_unlocked(struct gfs2_glock *gl)
{
/* Note that we cannot reference gl_object because it's already set
* to NULL by this point in its lifecycle. */
if (!test_bit(GLF_FREEING, &gl->gl_flags))
if (!test_bit(GLF_UNLOCKED, &gl->gl_flags))
return;
clear_bit_unlock(GLF_FREEING, &gl->gl_flags);
wake_up_bit(&gl->gl_flags, GLF_FREEING);
clear_bit_unlock(GLF_UNLOCKED, &gl->gl_flags);
wake_up_bit(&gl->gl_flags, GLF_UNLOCKED);
}
/**
......@@ -722,13 +705,12 @@ const struct gfs2_glock_operations gfs2_meta_glops = {
const struct gfs2_glock_operations gfs2_inode_glops = {
.go_sync = inode_go_sync,
.go_inval = inode_go_inval,
.go_demote_ok = inode_go_demote_ok,
.go_instantiate = inode_go_instantiate,
.go_held = inode_go_held,
.go_dump = inode_go_dump,
.go_type = LM_TYPE_INODE,
.go_flags = GLOF_ASPACE | GLOF_LRU | GLOF_LVB,
.go_free = inode_go_free,
.go_flags = GLOF_ASPACE | GLOF_LVB,
.go_unlocked = inode_go_unlocked,
};
const struct gfs2_glock_operations gfs2_rgrp_glops = {
......@@ -751,13 +733,13 @@ const struct gfs2_glock_operations gfs2_iopen_glops = {
.go_type = LM_TYPE_IOPEN,
.go_callback = iopen_go_callback,
.go_dump = inode_go_dump,
.go_flags = GLOF_LRU | GLOF_NONDISK,
.go_flags = GLOF_NONDISK,
.go_subclass = 1,
};
const struct gfs2_glock_operations gfs2_flock_glops = {
.go_type = LM_TYPE_FLOCK,
.go_flags = GLOF_LRU | GLOF_NONDISK,
.go_flags = GLOF_NONDISK,
};
const struct gfs2_glock_operations gfs2_nondisk_glops = {
......@@ -768,7 +750,7 @@ const struct gfs2_glock_operations gfs2_nondisk_glops = {
const struct gfs2_glock_operations gfs2_quota_glops = {
.go_type = LM_TYPE_QUOTA,
.go_flags = GLOF_LVB | GLOF_LRU | GLOF_NONDISK,
.go_flags = GLOF_LVB | GLOF_NONDISK,
};
const struct gfs2_glock_operations gfs2_journal_glops = {
......
......@@ -218,19 +218,17 @@ struct gfs2_glock_operations {
int (*go_sync) (struct gfs2_glock *gl);
int (*go_xmote_bh)(struct gfs2_glock *gl);
void (*go_inval) (struct gfs2_glock *gl, int flags);
int (*go_demote_ok) (const struct gfs2_glock *gl);
int (*go_instantiate) (struct gfs2_glock *gl);
int (*go_held)(struct gfs2_holder *gh);
void (*go_dump)(struct seq_file *seq, const struct gfs2_glock *gl,
const char *fs_id_buf);
void (*go_callback)(struct gfs2_glock *gl, bool remote);
void (*go_free)(struct gfs2_glock *gl);
void (*go_unlocked)(struct gfs2_glock *gl);
const int go_subclass;
const int go_type;
const unsigned long go_flags;
#define GLOF_ASPACE 1 /* address space attached */
#define GLOF_LVB 2 /* Lock Value Block attached */
#define GLOF_LRU 4 /* LRU managed */
#define GLOF_NONDISK 8 /* not I/O related */
};
......@@ -322,14 +320,14 @@ enum {
GLF_DIRTY = 6,
GLF_LFLUSH = 7,
GLF_INVALIDATE_IN_PROGRESS = 8,
GLF_REPLY_PENDING = 9,
GLF_HAVE_REPLY = 9,
GLF_INITIAL = 10,
GLF_FROZEN = 11,
GLF_HAVE_FROZEN_REPLY = 11,
GLF_INSTANTIATE_IN_PROG = 12, /* instantiate happening now */
GLF_LRU = 13,
GLF_OBJECT = 14, /* Used only for tracing */
GLF_BLOCKING = 15,
GLF_FREEING = 16, /* Wait for glock to be freed */
GLF_UNLOCKED = 16, /* Wait for glock to be unlocked */
GLF_TRY_TO_EVICT = 17, /* iopen glocks only */
GLF_VERIFY_EVICT = 18, /* iopen glocks only */
};
......@@ -772,6 +770,7 @@ struct gfs2_sbd {
/* Workqueue stuff */
struct workqueue_struct *sd_glock_wq;
struct workqueue_struct *sd_delete_wq;
/* Daemon stuff */
......@@ -783,7 +782,6 @@ struct gfs2_sbd {
struct list_head sd_quota_list;
atomic_t sd_quota_count;
struct mutex sd_quota_mutex;
struct mutex sd_quota_sync_mutex;
wait_queue_head_t sd_quota_wait;
......
......@@ -134,8 +134,8 @@ static void gdlm_ast(void *arg)
switch (gl->gl_lksb.sb_status) {
case -DLM_EUNLOCK: /* Unlocked, so glock can be freed */
if (gl->gl_ops->go_free)
gl->gl_ops->go_free(gl);
if (gl->gl_ops->go_unlocked)
gl->gl_ops->go_unlocked(gl);
gfs2_glock_free(gl);
return;
case -DLM_ECANCEL: /* Cancel while getting lock */
......@@ -163,11 +163,21 @@ static void gdlm_ast(void *arg)
BUG();
}
set_bit(GLF_INITIAL, &gl->gl_flags);
/*
* The GLF_INITIAL flag is initially set for new glocks. Upon the
* first successful new (non-conversion) request, we clear this flag to
* indicate that a DLM lock exists and that gl->gl_lksb.sb_lkid is the
* identifier to use for identifying it.
*
* Any failed initial requests do not create a DLM lock, so we ignore
* the gl->gl_lksb.sb_lkid values that come with such requests.
*/
clear_bit(GLF_INITIAL, &gl->gl_flags);
gfs2_glock_complete(gl, ret);
return;
out:
if (!test_bit(GLF_INITIAL, &gl->gl_flags))
if (test_bit(GLF_INITIAL, &gl->gl_flags))
gl->gl_lksb.sb_lkid = 0;
gfs2_glock_complete(gl, ret);
}
......@@ -239,7 +249,7 @@ static u32 make_flags(struct gfs2_glock *gl, const unsigned int gfs_flags,
BUG();
}
if (gl->gl_lksb.sb_lkid != 0) {
if (!test_bit(GLF_INITIAL, &gl->gl_flags)) {
lkf |= DLM_LKF_CONVERT;
if (test_bit(GLF_BLOCKING, &gl->gl_flags))
lkf |= DLM_LKF_QUECVT;
......@@ -270,14 +280,14 @@ static int gdlm_lock(struct gfs2_glock *gl, unsigned int req_state,
lkf = make_flags(gl, flags, req);
gfs2_glstats_inc(gl, GFS2_LKS_DCOUNT);
gfs2_sbstats_inc(gl, GFS2_LKS_DCOUNT);
if (gl->gl_lksb.sb_lkid) {
gfs2_update_request_times(gl);
} else {
if (test_bit(GLF_INITIAL, &gl->gl_flags)) {
memset(strname, ' ', GDLM_STRNAME_BYTES - 1);
strname[GDLM_STRNAME_BYTES - 1] = '\0';
gfs2_reverse_hex(strname + 7, gl->gl_name.ln_type);
gfs2_reverse_hex(strname + 23, gl->gl_name.ln_number);
gl->gl_dstamp = ktime_get_real();
} else {
gfs2_update_request_times(gl);
}
/*
* Submit the actual lock request.
......@@ -301,7 +311,7 @@ static void gdlm_put_lock(struct gfs2_glock *gl)
BUG_ON(!__lockref_is_dead(&gl->gl_lockref));
if (gl->gl_lksb.sb_lkid == 0) {
if (test_bit(GLF_INITIAL, &gl->gl_flags)) {
gfs2_glock_free(gl);
return;
}
......
......@@ -103,7 +103,6 @@ static struct gfs2_sbd *init_sbd(struct super_block *sb)
init_completion(&sdp->sd_journal_ready);
INIT_LIST_HEAD(&sdp->sd_quota_list);
mutex_init(&sdp->sd_quota_mutex);
mutex_init(&sdp->sd_quota_sync_mutex);
init_waitqueue_head(&sdp->sd_quota_wait);
spin_lock_init(&sdp->sd_bitmap_lock);
......@@ -1188,11 +1187,17 @@ static int gfs2_fill_super(struct super_block *sb, struct fs_context *fc)
snprintf(sdp->sd_fsname, sizeof(sdp->sd_fsname), "%s", sdp->sd_table_name);
error = -ENOMEM;
sdp->sd_glock_wq = alloc_workqueue("gfs2-glock/%s",
WQ_MEM_RECLAIM | WQ_HIGHPRI | WQ_FREEZABLE, 0,
sdp->sd_fsname);
if (!sdp->sd_glock_wq)
goto fail_free;
sdp->sd_delete_wq = alloc_workqueue("gfs2-delete/%s",
WQ_MEM_RECLAIM | WQ_FREEZABLE, 0, sdp->sd_fsname);
error = -ENOMEM;
if (!sdp->sd_delete_wq)
goto fail_free;
goto fail_glock_wq;
error = gfs2_sys_fs_add(sdp);
if (error)
......@@ -1301,6 +1306,8 @@ static int gfs2_fill_super(struct super_block *sb, struct fs_context *fc)
gfs2_sys_fs_del(sdp);
fail_delete_wq:
destroy_workqueue(sdp->sd_delete_wq);
fail_glock_wq:
destroy_workqueue(sdp->sd_glock_wq);
fail_free:
free_sbd(sdp);
sb->s_fs_info = NULL;
......
......@@ -75,9 +75,6 @@
#define GFS2_QD_HASH_SIZE BIT(GFS2_QD_HASH_SHIFT)
#define GFS2_QD_HASH_MASK (GFS2_QD_HASH_SIZE - 1)
#define QC_CHANGE 0
#define QC_SYNC 1
/* Lock order: qd_lock -> bucket lock -> qd->lockref.lock -> lru lock */
/* -> sd_bitmap_lock */
static DEFINE_SPINLOCK(qd_lock);
......@@ -319,11 +316,11 @@ static int qd_get(struct gfs2_sbd *sdp, struct kqid qid,
}
static void qd_hold(struct gfs2_quota_data *qd)
static void __qd_hold(struct gfs2_quota_data *qd)
{
struct gfs2_sbd *sdp = qd->qd_sbd;
gfs2_assert(sdp, !__lockref_is_dead(&qd->qd_lockref));
lockref_get(&qd->qd_lockref);
gfs2_assert(sdp, qd->qd_lockref.count > 0);
qd->qd_lockref.count++;
}
static void qd_put(struct gfs2_quota_data *qd)
......@@ -400,16 +397,17 @@ static int bh_get(struct gfs2_quota_data *qd)
struct inode *inode = sdp->sd_qc_inode;
struct gfs2_inode *ip = GFS2_I(inode);
unsigned int block, offset;
struct buffer_head *bh;
struct buffer_head *bh = NULL;
struct iomap iomap = { };
int error;
mutex_lock(&sdp->sd_quota_mutex);
if (qd->qd_bh_count++) {
mutex_unlock(&sdp->sd_quota_mutex);
spin_lock(&qd->qd_lockref.lock);
if (qd->qd_bh_count) {
qd->qd_bh_count++;
spin_unlock(&qd->qd_lockref.lock);
return 0;
}
spin_unlock(&qd->qd_lockref.lock);
block = qd->qd_slot / sdp->sd_qc_per_block;
offset = qd->qd_slot % sdp->sd_qc_per_block;
......@@ -418,122 +416,83 @@ static int bh_get(struct gfs2_quota_data *qd)
(loff_t)block << inode->i_blkbits,
i_blocksize(inode), &iomap);
if (error)
goto fail;
return error;
error = -ENOENT;
if (iomap.type != IOMAP_MAPPED)
goto fail;
return error;
error = gfs2_meta_read(ip->i_gl, iomap.addr >> inode->i_blkbits,
DIO_WAIT, 0, &bh);
if (error)
goto fail;
return error;
error = -EIO;
if (gfs2_metatype_check(sdp, bh, GFS2_METATYPE_QC))
goto fail_brelse;
goto out;
spin_lock(&qd->qd_lockref.lock);
if (qd->qd_bh == NULL) {
qd->qd_bh = bh;
qd->qd_bh_qc = (struct gfs2_quota_change *)
(bh->b_data + sizeof(struct gfs2_meta_header) +
offset * sizeof(struct gfs2_quota_change));
bh = NULL;
}
qd->qd_bh_count++;
spin_unlock(&qd->qd_lockref.lock);
error = 0;
mutex_unlock(&sdp->sd_quota_mutex);
return 0;
fail_brelse:
out:
brelse(bh);
fail:
qd->qd_bh_count--;
mutex_unlock(&sdp->sd_quota_mutex);
return error;
}
static void bh_put(struct gfs2_quota_data *qd)
{
struct gfs2_sbd *sdp = qd->qd_sbd;
struct buffer_head *bh = NULL;
mutex_lock(&sdp->sd_quota_mutex);
spin_lock(&qd->qd_lockref.lock);
gfs2_assert(sdp, qd->qd_bh_count);
if (!--qd->qd_bh_count) {
brelse(qd->qd_bh);
bh = qd->qd_bh;
qd->qd_bh = NULL;
qd->qd_bh_qc = NULL;
}
mutex_unlock(&sdp->sd_quota_mutex);
spin_unlock(&qd->qd_lockref.lock);
brelse(bh);
}
static int qd_check_sync(struct gfs2_sbd *sdp, struct gfs2_quota_data *qd,
u64 *sync_gen)
static bool qd_grab_sync(struct gfs2_sbd *sdp, struct gfs2_quota_data *qd,
u64 sync_gen)
{
bool ret = false;
spin_lock(&qd->qd_lockref.lock);
if (test_bit(QDF_LOCKED, &qd->qd_flags) ||
!test_bit(QDF_CHANGE, &qd->qd_flags) ||
(sync_gen && (qd->qd_sync_gen >= *sync_gen)))
return 0;
/*
* If qd_change is 0 it means a pending quota change was negated.
* We should not sync it, but we still have a qd reference and slot
* reference taken by gfs2_quota_change -> do_qc that need to be put.
*/
if (!qd->qd_change && test_and_clear_bit(QDF_CHANGE, &qd->qd_flags)) {
slot_put(qd);
qd_put(qd);
return 0;
}
qd->qd_sync_gen >= sync_gen)
goto out;
if (!lockref_get_not_dead(&qd->qd_lockref))
return 0;
if (__lockref_is_dead(&qd->qd_lockref))
goto out;
qd->qd_lockref.count++;
list_move_tail(&qd->qd_list, &sdp->sd_quota_list);
set_bit(QDF_LOCKED, &qd->qd_flags);
qd->qd_change_sync = qd->qd_change;
slot_hold(qd);
return 1;
ret = true;
out:
spin_unlock(&qd->qd_lockref.lock);
return ret;
}
static int qd_bh_get_or_undo(struct gfs2_sbd *sdp, struct gfs2_quota_data *qd)
static void qd_ungrab_sync(struct gfs2_quota_data *qd)
{
int error;
error = bh_get(qd);
if (!error)
return 0;
clear_bit(QDF_LOCKED, &qd->qd_flags);
slot_put(qd);
qd_put(qd);
return error;
}
static int qd_fish(struct gfs2_sbd *sdp, struct gfs2_quota_data **qdp)
{
struct gfs2_quota_data *qd = NULL, *iter;
int error;
*qdp = NULL;
if (sb_rdonly(sdp->sd_vfs))
return 0;
spin_lock(&qd_lock);
list_for_each_entry(iter, &sdp->sd_quota_list, qd_list) {
if (qd_check_sync(sdp, iter, &sdp->sd_quota_sync_gen)) {
qd = iter;
break;
}
}
spin_unlock(&qd_lock);
if (qd) {
error = qd_bh_get_or_undo(sdp, qd);
if (error)
return error;
*qdp = qd;
}
return 0;
}
static void qdsb_put(struct gfs2_quota_data *qd)
......@@ -545,8 +504,10 @@ static void qdsb_put(struct gfs2_quota_data *qd)
static void qd_unlock(struct gfs2_quota_data *qd)
{
spin_lock(&qd->qd_lockref.lock);
gfs2_assert_warn(qd->qd_sbd, test_bit(QDF_LOCKED, &qd->qd_flags));
clear_bit(QDF_LOCKED, &qd->qd_flags);
spin_unlock(&qd->qd_lockref.lock);
qdsb_put(qd);
}
......@@ -710,48 +671,57 @@ static int sort_qd(const void *a, const void *b)
return 0;
}
static void do_qc(struct gfs2_quota_data *qd, s64 change, int qc_type)
static void do_qc(struct gfs2_quota_data *qd, s64 change)
{
struct gfs2_sbd *sdp = qd->qd_sbd;
struct gfs2_inode *ip = GFS2_I(sdp->sd_qc_inode);
struct gfs2_quota_change *qc = qd->qd_bh_qc;
bool needs_put = false;
s64 x;
mutex_lock(&sdp->sd_quota_mutex);
gfs2_trans_add_meta(ip->i_gl, qd->qd_bh);
if (!test_bit(QDF_CHANGE, &qd->qd_flags)) {
qc->qc_change = 0;
/*
* The QDF_CHANGE flag indicates that the slot in the quota change file
* is used. Here, we use the value of qc->qc_change when the slot is
* used, and we assume a value of 0 otherwise.
*/
spin_lock(&qd->qd_lockref.lock);
x = 0;
if (test_bit(QDF_CHANGE, &qd->qd_flags))
x = be64_to_cpu(qc->qc_change);
x += change;
qd->qd_change += change;
if (!x && test_bit(QDF_CHANGE, &qd->qd_flags)) {
/* The slot in the quota change file becomes unused. */
clear_bit(QDF_CHANGE, &qd->qd_flags);
qc->qc_flags = 0;
qc->qc_id = 0;
needs_put = true;
} else if (x && !test_bit(QDF_CHANGE, &qd->qd_flags)) {
/* The slot in the quota change file becomes used. */
set_bit(QDF_CHANGE, &qd->qd_flags);
__qd_hold(qd);
slot_hold(qd);
qc->qc_flags = 0;
if (qd->qd_id.type == USRQUOTA)
qc->qc_flags = cpu_to_be32(GFS2_QCF_USER);
qc->qc_id = cpu_to_be32(from_kqid(&init_user_ns, qd->qd_id));
}
x = be64_to_cpu(qc->qc_change) + change;
qc->qc_change = cpu_to_be64(x);
spin_lock(&qd_lock);
qd->qd_change = x;
spin_unlock(&qd_lock);
spin_unlock(&qd->qd_lockref.lock);
if (qc_type == QC_CHANGE) {
if (!test_and_set_bit(QDF_CHANGE, &qd->qd_flags)) {
qd_hold(qd);
slot_hold(qd);
}
} else {
gfs2_assert_warn(sdp, test_bit(QDF_CHANGE, &qd->qd_flags));
clear_bit(QDF_CHANGE, &qd->qd_flags);
qc->qc_flags = 0;
qc->qc_id = 0;
if (needs_put) {
slot_put(qd);
qd_put(qd);
}
if (change < 0) /* Reset quiet flag if we freed some blocks */
clear_bit(QDF_QMSG_QUIET, &qd->qd_flags);
mutex_unlock(&sdp->sd_quota_mutex);
}
static int gfs2_write_buf_to_page(struct gfs2_sbd *sdp, unsigned long index,
......@@ -890,6 +860,7 @@ static int gfs2_adjust_quota(struct gfs2_sbd *sdp, loff_t loc,
be64_add_cpu(&q.qu_value, change);
if (((s64)be64_to_cpu(q.qu_value)) < 0)
q.qu_value = 0; /* Never go negative on quota usage */
spin_lock(&qd->qd_lockref.lock);
qd->qd_qb.qb_value = q.qu_value;
if (fdq) {
if (fdq->d_fieldmask & QC_SPC_SOFT) {
......@@ -905,6 +876,7 @@ static int gfs2_adjust_quota(struct gfs2_sbd *sdp, loff_t loc,
qd->qd_qb.qb_value = q.qu_value;
}
}
spin_unlock(&qd->qd_lockref.lock);
err = gfs2_write_disk_quota(sdp, &q, loc);
if (!err) {
......@@ -919,7 +891,8 @@ static int gfs2_adjust_quota(struct gfs2_sbd *sdp, loff_t loc,
return err;
}
static int do_sync(unsigned int num_qd, struct gfs2_quota_data **qda)
static int do_sync(unsigned int num_qd, struct gfs2_quota_data **qda,
u64 sync_gen)
{
struct gfs2_sbd *sdp = (*qda)->qd_sbd;
struct gfs2_inode *ip = GFS2_I(sdp->sd_quota_inode);
......@@ -992,7 +965,7 @@ static int do_sync(unsigned int num_qd, struct gfs2_quota_data **qda)
if (error)
goto out_end_trans;
do_qc(qd, -qd->qd_change_sync, QC_SYNC);
do_qc(qd, -qd->qd_change_sync);
set_bit(QDF_REFRESH, &qd->qd_flags);
}
......@@ -1010,8 +983,13 @@ static int do_sync(unsigned int num_qd, struct gfs2_quota_data **qda)
gfs2_log_flush(ip->i_gl->gl_name.ln_sbd, ip->i_gl,
GFS2_LOG_HEAD_FLUSH_NORMAL | GFS2_LFC_DO_SYNC);
if (!error) {
for (x = 0; x < num_qd; x++)
qda[x]->qd_sync_gen = sdp->sd_quota_sync_gen;
for (x = 0; x < num_qd; x++) {
qd = qda[x];
spin_lock(&qd->qd_lockref.lock);
if (qd->qd_sync_gen < sync_gen)
qd->qd_sync_gen = sync_gen;
spin_unlock(&qd->qd_lockref.lock);
}
}
return error;
}
......@@ -1036,7 +1014,9 @@ static int update_qd(struct gfs2_sbd *sdp, struct gfs2_quota_data *qd)
qlvb->qb_limit = q.qu_limit;
qlvb->qb_warn = q.qu_warn;
qlvb->qb_value = q.qu_value;
spin_lock(&qd->qd_lockref.lock);
qd->qd_qb = *qlvb;
spin_unlock(&qd->qd_lockref.lock);
return 0;
}
......@@ -1058,7 +1038,9 @@ static int do_glock(struct gfs2_quota_data *qd, int force_refresh,
if (test_and_clear_bit(QDF_REFRESH, &qd->qd_flags))
force_refresh = FORCE;
spin_lock(&qd->qd_lockref.lock);
qd->qd_qb = *(struct gfs2_quota_lvb *)qd->qd_gl->gl_lksb.sb_lvbptr;
spin_unlock(&qd->qd_lockref.lock);
if (force_refresh || qd->qd_qb.qb_magic != cpu_to_be32(GFS2_MAGIC)) {
gfs2_glock_dq_uninit(q_gh);
......@@ -1129,35 +1111,36 @@ static bool need_sync(struct gfs2_quota_data *qd)
{
struct gfs2_sbd *sdp = qd->qd_sbd;
struct gfs2_tune *gt = &sdp->sd_tune;
s64 value;
s64 value, change, limit;
unsigned int num, den;
int ret = false;
spin_lock(&qd->qd_lockref.lock);
if (!qd->qd_qb.qb_limit)
return false;
goto out;
spin_lock(&qd_lock);
value = qd->qd_change;
spin_unlock(&qd_lock);
change = qd->qd_change;
if (change <= 0)
goto out;
value = (s64)be64_to_cpu(qd->qd_qb.qb_value);
limit = (s64)be64_to_cpu(qd->qd_qb.qb_limit);
if (value >= limit)
goto out;
spin_lock(&gt->gt_spin);
num = gt->gt_quota_scale_num;
den = gt->gt_quota_scale_den;
spin_unlock(&gt->gt_spin);
if (value <= 0)
return false;
else if ((s64)be64_to_cpu(qd->qd_qb.qb_value) >=
(s64)be64_to_cpu(qd->qd_qb.qb_limit))
return false;
else {
value *= gfs2_jindex_size(sdp) * num;
value = div_s64(value, den);
value += (s64)be64_to_cpu(qd->qd_qb.qb_value);
if (value < (s64)be64_to_cpu(qd->qd_qb.qb_limit))
return false;
}
change *= gfs2_jindex_size(sdp) * num;
change = div_s64(change, den);
if (value + change < limit)
goto out;
return true;
ret = true;
out:
spin_unlock(&qd->qd_lockref.lock);
return ret;
}
void gfs2_quota_unlock(struct gfs2_inode *ip)
......@@ -1166,7 +1149,6 @@ void gfs2_quota_unlock(struct gfs2_inode *ip)
struct gfs2_quota_data *qda[2 * GFS2_MAXQUOTAS];
unsigned int count = 0;
u32 x;
int found;
if (!test_and_clear_bit(GIF_QD_LOCKED, &ip->i_flags))
return;
......@@ -1174,6 +1156,7 @@ void gfs2_quota_unlock(struct gfs2_inode *ip)
for (x = 0; x < ip->i_qadata->qa_qd_num; x++) {
struct gfs2_quota_data *qd;
bool sync;
int error;
qd = ip->i_qadata->qa_qd[x];
sync = need_sync(qd);
......@@ -1183,18 +1166,26 @@ void gfs2_quota_unlock(struct gfs2_inode *ip)
continue;
spin_lock(&qd_lock);
found = qd_check_sync(sdp, qd, NULL);
sync = qd_grab_sync(sdp, qd, U64_MAX);
spin_unlock(&qd_lock);
if (!found)
if (!sync)
continue;
if (!qd_bh_get_or_undo(sdp, qd))
gfs2_assert_warn(sdp, qd->qd_change_sync);
error = bh_get(qd);
if (error) {
qd_ungrab_sync(qd);
continue;
}
qda[count++] = qd;
}
if (count) {
do_sync(count, qda);
u64 sync_gen = READ_ONCE(sdp->sd_quota_sync_gen);
do_sync(count, qda, sync_gen);
for (x = 0; x < count; x++)
qd_unlock(qda[x]);
}
......@@ -1253,12 +1244,12 @@ int gfs2_quota_check(struct gfs2_inode *ip, kuid_t uid, kgid_t gid,
qid_eq(qd->qd_id, make_kqid_gid(gid))))
continue;
spin_lock(&qd->qd_lockref.lock);
warn = (s64)be64_to_cpu(qd->qd_qb.qb_warn);
limit = (s64)be64_to_cpu(qd->qd_qb.qb_limit);
value = (s64)be64_to_cpu(qd->qd_qb.qb_value);
spin_lock(&qd_lock);
value += qd->qd_change;
spin_unlock(&qd_lock);
spin_unlock(&qd->qd_lockref.lock);
if (limit > 0 && (limit - value) < ap->allowed)
ap->allowed = limit - value;
......@@ -1312,39 +1303,20 @@ void gfs2_quota_change(struct gfs2_inode *ip, s64 change,
if (qid_eq(qd->qd_id, make_kqid_uid(uid)) ||
qid_eq(qd->qd_id, make_kqid_gid(gid))) {
do_qc(qd, change, QC_CHANGE);
do_qc(qd, change);
}
}
}
static bool qd_changed(struct gfs2_sbd *sdp)
{
struct gfs2_quota_data *qd;
bool changed = false;
spin_lock(&qd_lock);
list_for_each_entry(qd, &sdp->sd_quota_list, qd_list) {
if (test_bit(QDF_LOCKED, &qd->qd_flags) ||
!test_bit(QDF_CHANGE, &qd->qd_flags))
continue;
changed = true;
break;
}
spin_unlock(&qd_lock);
return changed;
}
int gfs2_quota_sync(struct super_block *sb, int type)
{
struct gfs2_sbd *sdp = sb->s_fs_info;
struct gfs2_quota_data **qda;
unsigned int max_qd = PAGE_SIZE / sizeof(struct gfs2_holder);
unsigned int num_qd;
unsigned int x;
u64 sync_gen;
int error = 0;
if (!qd_changed(sdp))
if (sb_rdonly(sdp->sd_vfs))
return 0;
qda = kcalloc(max_qd, sizeof(struct gfs2_quota_data *), GFP_KERNEL);
......@@ -1352,27 +1324,44 @@ int gfs2_quota_sync(struct super_block *sb, int type)
return -ENOMEM;
mutex_lock(&sdp->sd_quota_sync_mutex);
sdp->sd_quota_sync_gen++;
sync_gen = sdp->sd_quota_sync_gen + 1;
do {
num_qd = 0;
struct gfs2_quota_data *iter;
unsigned int num_qd = 0;
unsigned int x;
for (;;) {
error = qd_fish(sdp, qda + num_qd);
if (error || !qda[num_qd])
break;
if (++num_qd == max_qd)
spin_lock(&qd_lock);
list_for_each_entry(iter, &sdp->sd_quota_list, qd_list) {
if (qd_grab_sync(sdp, iter, sync_gen)) {
qda[num_qd++] = iter;
if (num_qd == max_qd)
break;
}
}
spin_unlock(&qd_lock);
if (!num_qd)
break;
if (num_qd) {
for (x = 0; x < num_qd; x++) {
error = bh_get(qda[x]);
if (!error)
error = do_sync(num_qd, qda);
continue;
while (x < num_qd)
qd_ungrab_sync(qda[--num_qd]);
break;
}
if (!error) {
WRITE_ONCE(sdp->sd_quota_sync_gen, sync_gen);
error = do_sync(num_qd, qda, sync_gen);
}
for (x = 0; x < num_qd; x++)
qd_unlock(qda[x]);
}
} while (!error && num_qd == max_qd);
} while (!error);
mutex_unlock(&sdp->sd_quota_sync_mutex);
kfree(qda);
......@@ -1407,6 +1396,7 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
unsigned int found = 0;
unsigned int hash;
unsigned int bm_size;
struct buffer_head *bh;
u64 dblock;
u32 extlen = 0;
int error;
......@@ -1426,8 +1416,7 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
return error;
for (x = 0; x < blocks; x++) {
struct buffer_head *bh;
const struct gfs2_quota_change *qc;
struct gfs2_quota_change *qc;
unsigned int y;
if (!extlen) {
......@@ -1440,15 +1429,13 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
bh = gfs2_meta_ra(ip->i_gl, dblock, extlen);
if (!bh)
goto fail;
if (gfs2_metatype_check(sdp, bh, GFS2_METATYPE_QC)) {
brelse(bh);
goto fail;
}
if (gfs2_metatype_check(sdp, bh, GFS2_METATYPE_QC))
goto fail_brelse;
qc = (const struct gfs2_quota_change *)(bh->b_data + sizeof(struct gfs2_meta_header));
qc = (struct gfs2_quota_change *)(bh->b_data + sizeof(struct gfs2_meta_header));
for (y = 0; y < sdp->sd_qc_per_block && slot < sdp->sd_quota_slots;
y++, slot++) {
struct gfs2_quota_data *qd;
struct gfs2_quota_data *old_qd, *qd;
s64 qc_change = be64_to_cpu(qc->qc_change);
u32 qc_flags = be32_to_cpu(qc->qc_flags);
enum quota_type qtype = (qc_flags & GFS2_QCF_USER) ?
......@@ -1461,10 +1448,8 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
hash = gfs2_qd_hash(sdp, qc_id);
qd = qd_alloc(hash, sdp, qc_id);
if (qd == NULL) {
brelse(bh);
goto fail;
}
if (qd == NULL)
goto fail_brelse;
set_bit(QDF_CHANGE, &qd->qd_flags);
qd->qd_change = qc_change;
......@@ -1472,18 +1457,41 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
qd->qd_slot_ref = 1;
spin_lock(&qd_lock);
spin_lock_bucket(hash);
old_qd = gfs2_qd_search_bucket(hash, sdp, qc_id);
if (old_qd) {
fs_err(sdp, "Corruption found in quota_change%u"
"file: duplicate identifier in "
"slot %u\n",
sdp->sd_jdesc->jd_jid, slot);
spin_unlock_bucket(hash);
spin_unlock(&qd_lock);
qd_put(old_qd);
gfs2_glock_put(qd->qd_gl);
kmem_cache_free(gfs2_quotad_cachep, qd);
/* zero out the duplicate slot */
lock_buffer(bh);
memset(qc, 0, sizeof(*qc));
mark_buffer_dirty(bh);
unlock_buffer(bh);
continue;
}
BUG_ON(test_and_set_bit(slot, sdp->sd_quota_bitmap));
list_add(&qd->qd_list, &sdp->sd_quota_list);
atomic_inc(&sdp->sd_quota_count);
spin_unlock(&qd_lock);
spin_lock_bucket(hash);
hlist_bl_add_head_rcu(&qd->qd_hlist, &qd_hash_table[hash]);
spin_unlock_bucket(hash);
spin_unlock(&qd_lock);
found++;
}
if (buffer_dirty(bh))
sync_dirty_buffer(bh);
brelse(bh);
dblock++;
extlen--;
......@@ -1494,6 +1502,10 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
return 0;
fail_brelse:
if (buffer_dirty(bh))
sync_dirty_buffer(bh);
brelse(bh);
fail:
gfs2_quota_cleanup(sdp);
return error;
......
......@@ -1524,7 +1524,6 @@ static void gfs2_evict_inode(struct inode *inode)
if (ip->i_gl) {
glock_clear_object(ip->i_gl, ip);
wait_on_bit_io(&ip->i_flags, GIF_GLOP_PENDING, TASK_UNINTERRUPTIBLE);
gfs2_glock_add_to_lru(ip->i_gl);
gfs2_glock_put_eventually(ip->i_gl);
rcu_assign_pointer(ip->i_gl, NULL);
}
......
......@@ -53,9 +53,9 @@
{(1UL << GLF_DIRTY), "y" }, \
{(1UL << GLF_LFLUSH), "f" }, \
{(1UL << GLF_INVALIDATE_IN_PROGRESS), "i" }, \
{(1UL << GLF_REPLY_PENDING), "r" }, \
{(1UL << GLF_INITIAL), "I" }, \
{(1UL << GLF_FROZEN), "F" }, \
{(1UL << GLF_HAVE_REPLY), "r" }, \
{(1UL << GLF_INITIAL), "a" }, \
{(1UL << GLF_HAVE_FROZEN_REPLY), "F" }, \
{(1UL << GLF_LRU), "L" }, \
{(1UL << GLF_OBJECT), "o" }, \
{(1UL << GLF_BLOCKING), "b" })
......
......@@ -99,12 +99,12 @@ int check_journal_clean(struct gfs2_sbd *sdp, struct gfs2_jdesc *jd,
*/
int gfs2_freeze_lock_shared(struct gfs2_sbd *sdp)
{
int flags = LM_FLAG_NOEXP | GL_EXACT;
int error;
error = gfs2_glock_nq_init(sdp->sd_freeze_gl, LM_ST_SHARED,
LM_FLAG_NOEXP | GL_EXACT,
error = gfs2_glock_nq_init(sdp->sd_freeze_gl, LM_ST_SHARED, flags,
&sdp->sd_freeze_gh);
if (error)
if (error && error != GLR_TRYFAILED)
fs_err(sdp, "can't lock the freeze glock: %d\n", error);
return error;
}
......@@ -206,9 +206,9 @@ static void signal_our_withdraw(struct gfs2_sbd *sdp)
* on other nodes to be successful, otherwise we remain the owner of
* the glock as far as dlm is concerned.
*/
if (i_gl->gl_ops->go_free) {
set_bit(GLF_FREEING, &i_gl->gl_flags);
wait_on_bit(&i_gl->gl_flags, GLF_FREEING, TASK_UNINTERRUPTIBLE);
if (i_gl->gl_ops->go_unlocked) {
set_bit(GLF_UNLOCKED, &i_gl->gl_flags);
wait_on_bit(&i_gl->gl_flags, GLF_UNLOCKED, TASK_UNINTERRUPTIBLE);
}
/*
......