Commit 0515480a authored by Andreas Gruenbacher's avatar Andreas Gruenbacher Committed by Bob Peterson

gfs2: gfs2_glock_get: Wait on freeing glocks

Keep glocks in their hash table until they are freed instead of removing
them when their last reference is dropped.  This allows to wait for any
previous instances of a glock to go away in gfs2_glock_get before
creating a new glocks.

Special thanks to Andy Price for finding and fixing a problem which also
required us to delete the rcu_read_unlock from the error case in function
gfs2_glock_get.
Signed-off-by: default avatarAndreas Gruenbacher <agruenba@redhat.com>
Signed-off-by: default avatarBob Peterson <rpeterso@redhat.com>
parent 61b91cfd
......@@ -15,6 +15,7 @@
#include <linux/buffer_head.h>
#include <linux/delay.h>
#include <linux/sort.h>
#include <linux/hash.h>
#include <linux/jhash.h>
#include <linux/kallsyms.h>
#include <linux/gfs2_ondisk.h>
......@@ -80,6 +81,66 @@ static struct rhashtable_params ht_parms = {
static struct rhashtable gl_hash_table;
#define GLOCK_WAIT_TABLE_BITS 12
#define GLOCK_WAIT_TABLE_SIZE (1 << GLOCK_WAIT_TABLE_BITS)
static wait_queue_head_t glock_wait_table[GLOCK_WAIT_TABLE_SIZE] __cacheline_aligned;
struct wait_glock_queue {
struct lm_lockname *name;
wait_queue_entry_t wait;
};
static int glock_wake_function(wait_queue_entry_t *wait, unsigned int mode,
int sync, void *key)
{
struct wait_glock_queue *wait_glock =
container_of(wait, struct wait_glock_queue, wait);
struct lm_lockname *wait_name = wait_glock->name;
struct lm_lockname *wake_name = key;
if (wake_name->ln_sbd != wait_name->ln_sbd ||
wake_name->ln_number != wait_name->ln_number ||
wake_name->ln_type != wait_name->ln_type)
return 0;
return autoremove_wake_function(wait, mode, sync, key);
}
static wait_queue_head_t *glock_waitqueue(struct lm_lockname *name)
{
u32 hash = jhash2((u32 *)name, sizeof(*name) / 4, 0);
return glock_wait_table + hash_32(hash, GLOCK_WAIT_TABLE_BITS);
}
static void prepare_to_wait_on_glock(wait_queue_head_t **wq,
struct wait_glock_queue *wait,
struct lm_lockname *name)
{
wait->name = name;
init_wait(&wait->wait);
wait->wait.func = glock_wake_function;
*wq = glock_waitqueue(name);
prepare_to_wait(*wq, &wait->wait, TASK_UNINTERRUPTIBLE);
}
static void finish_wait_on_glock(wait_queue_head_t *wq,
struct wait_glock_queue *wait)
{
finish_wait(wq, &wait->wait);
}
/**
* wake_up_glock - Wake up waiters on a glock
* @gl: the glock
*/
static void wake_up_glock(struct gfs2_glock *gl)
{
wait_queue_head_t *wq = glock_waitqueue(&gl->gl_name);
if (waitqueue_active(wq))
__wake_up(wq, TASK_NORMAL, 1, &gl->gl_name);
}
static void gfs2_glock_dealloc(struct rcu_head *rcu)
{
struct gfs2_glock *gl = container_of(rcu, struct gfs2_glock, gl_rcu);
......@@ -96,6 +157,9 @@ void gfs2_glock_free(struct gfs2_glock *gl)
{
struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
rhashtable_remove_fast(&gl_hash_table, &gl->gl_node, ht_parms);
smp_mb();
wake_up_glock(gl);
call_rcu(&gl->gl_rcu, gfs2_glock_dealloc);
if (atomic_dec_and_test(&sdp->sd_glock_disposal))
wake_up(&sdp->sd_glock_wait);
......@@ -194,7 +258,6 @@ static void __gfs2_glock_put(struct gfs2_glock *gl)
gfs2_glock_remove_from_lru(gl);
spin_unlock(&gl->gl_lockref.lock);
rhashtable_remove_fast(&gl_hash_table, &gl->gl_node, ht_parms);
GLOCK_BUG_ON(gl, !list_empty(&gl->gl_holders));
GLOCK_BUG_ON(gl, mapping && mapping->nrpages);
trace_gfs2_glock_put(gl);
......@@ -679,6 +742,36 @@ static void glock_work_func(struct work_struct *work)
spin_unlock(&gl->gl_lockref.lock);
}
static struct gfs2_glock *find_insert_glock(struct lm_lockname *name,
struct gfs2_glock *new)
{
struct wait_glock_queue wait;
wait_queue_head_t *wq;
struct gfs2_glock *gl;
again:
prepare_to_wait_on_glock(&wq, &wait, name);
rcu_read_lock();
if (new) {
gl = rhashtable_lookup_get_insert_fast(&gl_hash_table,
&new->gl_node, ht_parms);
if (IS_ERR(gl))
goto out;
} else {
gl = rhashtable_lookup_fast(&gl_hash_table,
name, ht_parms);
}
if (gl && !lockref_get_not_dead(&gl->gl_lockref)) {
rcu_read_unlock();
schedule();
goto again;
}
out:
rcu_read_unlock();
finish_wait_on_glock(wq, &wait);
return gl;
}
/**
* gfs2_glock_get() - Get a glock, or create one if one doesn't exist
* @sdp: The GFS2 superblock
......@@ -705,15 +798,11 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
struct kmem_cache *cachep;
int ret = 0;
rcu_read_lock();
gl = rhashtable_lookup_fast(&gl_hash_table, &name, ht_parms);
if (gl && !lockref_get_not_dead(&gl->gl_lockref))
gl = NULL;
rcu_read_unlock();
*glp = gl;
if (gl)
gl = find_insert_glock(&name, NULL);
if (gl) {
*glp = gl;
return 0;
}
if (!create)
return -ENOENT;
......@@ -767,10 +856,7 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
mapping->writeback_index = 0;
}
again:
rcu_read_lock();
tmp = rhashtable_lookup_get_insert_fast(&gl_hash_table, &gl->gl_node,
ht_parms);
tmp = find_insert_glock(&name, gl);
if (!tmp) {
*glp = gl;
goto out;
......@@ -779,13 +865,7 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
ret = PTR_ERR(tmp);
goto out_free;
}
if (lockref_get_not_dead(&tmp->gl_lockref)) {
*glp = tmp;
goto out_free;
}
rcu_read_unlock();
cond_resched();
goto again;
*glp = tmp;
out_free:
kfree(gl->gl_lksb.sb_lvbptr);
......@@ -793,7 +873,6 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
atomic_dec(&sdp->sd_glock_disposal);
out:
rcu_read_unlock();
return ret;
}
......@@ -1806,7 +1885,7 @@ static int gfs2_sbstats_seq_show(struct seq_file *seq, void *iter_ptr)
int __init gfs2_glock_init(void)
{
int ret;
int i, ret;
ret = rhashtable_init(&gl_hash_table, &ht_parms);
if (ret < 0)
......@@ -1835,6 +1914,9 @@ int __init gfs2_glock_init(void)
return ret;
}
for (i = 0; i < GLOCK_WAIT_TABLE_SIZE; i++)
init_waitqueue_head(glock_wait_table + i);
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment