Commit dcdaad05 authored by Alexander Aring's avatar Alexander Aring Committed by David Teigland

dlm: change to single hashtable lock

Prepare to replace our own hash table with rhashtable by replacing
the per-bucket locks in our own hash table with a single lock.
Signed-off-by: default avatarAlexander Aring <aahringo@redhat.com>
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent 700b0480
...@@ -452,7 +452,7 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos) ...@@ -452,7 +452,7 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep; tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
spin_lock_bh(&ls->ls_rsbtbl[bucket].lock); spin_lock_bh(&ls->ls_rsbtbl_lock);
if (!RB_EMPTY_ROOT(tree)) { if (!RB_EMPTY_ROOT(tree)) {
for (node = rb_first(tree); node; node = rb_next(node)) { for (node = rb_first(tree); node; node = rb_next(node)) {
r = rb_entry(node, struct dlm_rsb, res_hashnode); r = rb_entry(node, struct dlm_rsb, res_hashnode);
...@@ -460,12 +460,12 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos) ...@@ -460,12 +460,12 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
dlm_hold_rsb(r); dlm_hold_rsb(r);
ri->rsb = r; ri->rsb = r;
ri->bucket = bucket; ri->bucket = bucket;
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
return ri; return ri;
} }
} }
} }
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
/* /*
* move to the first rsb in the next non-empty bucket * move to the first rsb in the next non-empty bucket
...@@ -484,18 +484,18 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos) ...@@ -484,18 +484,18 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
} }
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep; tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
spin_lock_bh(&ls->ls_rsbtbl[bucket].lock); spin_lock_bh(&ls->ls_rsbtbl_lock);
if (!RB_EMPTY_ROOT(tree)) { if (!RB_EMPTY_ROOT(tree)) {
node = rb_first(tree); node = rb_first(tree);
r = rb_entry(node, struct dlm_rsb, res_hashnode); r = rb_entry(node, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r); dlm_hold_rsb(r);
ri->rsb = r; ri->rsb = r;
ri->bucket = bucket; ri->bucket = bucket;
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
*pos = n; *pos = n;
return ri; return ri;
} }
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
} }
} }
...@@ -516,7 +516,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos) ...@@ -516,7 +516,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
* move to the next rsb in the same bucket * move to the next rsb in the same bucket
*/ */
spin_lock_bh(&ls->ls_rsbtbl[bucket].lock); spin_lock_bh(&ls->ls_rsbtbl_lock);
rp = ri->rsb; rp = ri->rsb;
next = rb_next(&rp->res_hashnode); next = rb_next(&rp->res_hashnode);
...@@ -524,12 +524,12 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos) ...@@ -524,12 +524,12 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
r = rb_entry(next, struct dlm_rsb, res_hashnode); r = rb_entry(next, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r); dlm_hold_rsb(r);
ri->rsb = r; ri->rsb = r;
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
dlm_put_rsb(rp); dlm_put_rsb(rp);
++*pos; ++*pos;
return ri; return ri;
} }
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
dlm_put_rsb(rp); dlm_put_rsb(rp);
/* /*
...@@ -550,18 +550,18 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos) ...@@ -550,18 +550,18 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
} }
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep; tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
spin_lock_bh(&ls->ls_rsbtbl[bucket].lock); spin_lock_bh(&ls->ls_rsbtbl_lock);
if (!RB_EMPTY_ROOT(tree)) { if (!RB_EMPTY_ROOT(tree)) {
next = rb_first(tree); next = rb_first(tree);
r = rb_entry(next, struct dlm_rsb, res_hashnode); r = rb_entry(next, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r); dlm_hold_rsb(r);
ri->rsb = r; ri->rsb = r;
ri->bucket = bucket; ri->bucket = bucket;
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
*pos = n; *pos = n;
return ri; return ri;
} }
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
} }
} }
......
...@@ -204,12 +204,12 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name, ...@@ -204,12 +204,12 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
hash = jhash(name, len, 0); hash = jhash(name, len, 0);
bucket = hash & (ls->ls_rsbtbl_size - 1); bucket = hash & (ls->ls_rsbtbl_size - 1);
spin_lock_bh(&ls->ls_rsbtbl[bucket].lock); spin_lock_bh(&ls->ls_rsbtbl_lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r); rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
if (rv) if (rv)
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss, rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
name, len, &r); name, len, &r);
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
if (!rv) if (!rv)
return r; return r;
......
...@@ -105,7 +105,6 @@ do { \ ...@@ -105,7 +105,6 @@ do { \
struct dlm_rsbtable { struct dlm_rsbtable {
struct rb_root keep; struct rb_root keep;
struct rb_root toss; struct rb_root toss;
spinlock_t lock;
unsigned long flags; unsigned long flags;
}; };
...@@ -593,6 +592,7 @@ struct dlm_ls { ...@@ -593,6 +592,7 @@ struct dlm_ls {
spinlock_t ls_lkbidr_spin; spinlock_t ls_lkbidr_spin;
struct dlm_rsbtable *ls_rsbtbl; struct dlm_rsbtable *ls_rsbtbl;
spinlock_t ls_rsbtbl_lock;
uint32_t ls_rsbtbl_size; uint32_t ls_rsbtbl_size;
spinlock_t ls_waiters_lock; spinlock_t ls_waiters_lock;
......
...@@ -369,13 +369,12 @@ static inline int dlm_kref_put_lock_bh(struct kref *kref, ...@@ -369,13 +369,12 @@ static inline int dlm_kref_put_lock_bh(struct kref *kref,
static void put_rsb(struct dlm_rsb *r) static void put_rsb(struct dlm_rsb *r)
{ {
struct dlm_ls *ls = r->res_ls; struct dlm_ls *ls = r->res_ls;
uint32_t bucket = r->res_bucket;
int rv; int rv;
rv = dlm_kref_put_lock_bh(&r->res_ref, toss_rsb, rv = dlm_kref_put_lock_bh(&r->res_ref, toss_rsb,
&ls->ls_rsbtbl[bucket].lock); &ls->ls_rsbtbl_lock);
if (rv) if (rv)
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
} }
void dlm_put_rsb(struct dlm_rsb *r) void dlm_put_rsb(struct dlm_rsb *r)
...@@ -615,7 +614,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, ...@@ -615,7 +614,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
goto out; goto out;
} }
spin_lock_bh(&ls->ls_rsbtbl[b].lock); spin_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (error) if (error)
...@@ -685,7 +684,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, ...@@ -685,7 +684,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
error = get_rsb_struct(ls, name, len, &r); error = get_rsb_struct(ls, name, len, &r);
if (error == -EAGAIN) { if (error == -EAGAIN) {
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
goto retry; goto retry;
} }
if (error) if (error)
...@@ -734,7 +733,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, ...@@ -734,7 +733,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
out_add: out_add:
error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
out_unlock: out_unlock:
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
out: out:
*r_ret = r; *r_ret = r;
return error; return error;
...@@ -759,7 +758,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, ...@@ -759,7 +758,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
if (error < 0) if (error < 0)
goto out; goto out;
spin_lock_bh(&ls->ls_rsbtbl[b].lock); spin_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (error) if (error)
...@@ -817,7 +816,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, ...@@ -817,7 +816,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
error = get_rsb_struct(ls, name, len, &r); error = get_rsb_struct(ls, name, len, &r);
if (error == -EAGAIN) { if (error == -EAGAIN) {
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
goto retry; goto retry;
} }
if (error) if (error)
...@@ -832,7 +831,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, ...@@ -832,7 +831,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
out_unlock: out_unlock:
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
out: out:
*r_ret = r; *r_ret = r;
return error; return error;
...@@ -1049,7 +1048,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, ...@@ -1049,7 +1048,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
if (error < 0) if (error < 0)
return error; return error;
spin_lock_bh(&ls->ls_rsbtbl[b].lock); spin_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (!error) { if (!error) {
/* because the rsb is active, we need to lock_rsb before /* because the rsb is active, we need to lock_rsb before
...@@ -1057,7 +1056,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, ...@@ -1057,7 +1056,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
*/ */
hold_rsb(r); hold_rsb(r);
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
lock_rsb(r); lock_rsb(r);
__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false, __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
...@@ -1083,14 +1082,14 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, ...@@ -1083,14 +1082,14 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
r->res_toss_time = jiffies; r->res_toss_time = jiffies;
/* the rsb was inactive (on toss list) */ /* the rsb was inactive (on toss list) */
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
return 0; return 0;
not_found: not_found:
error = get_rsb_struct(ls, name, len, &r); error = get_rsb_struct(ls, name, len, &r);
if (error == -EAGAIN) { if (error == -EAGAIN) {
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
goto retry; goto retry;
} }
if (error) if (error)
...@@ -1108,7 +1107,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, ...@@ -1108,7 +1107,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
if (error) { if (error) {
/* should never happen */ /* should never happen */
dlm_free_rsb(r); dlm_free_rsb(r);
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
goto retry; goto retry;
} }
...@@ -1116,7 +1115,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, ...@@ -1116,7 +1115,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
*result = DLM_LU_ADD; *result = DLM_LU_ADD;
*r_nodeid = from_nodeid; *r_nodeid = from_nodeid;
out_unlock: out_unlock:
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
return error; return error;
} }
...@@ -1126,15 +1125,15 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) ...@@ -1126,15 +1125,15 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
struct dlm_rsb *r; struct dlm_rsb *r;
int i; int i;
spin_lock_bh(&ls->ls_rsbtbl_lock);
for (i = 0; i < ls->ls_rsbtbl_size; i++) { for (i = 0; i < ls->ls_rsbtbl_size; i++) {
spin_lock_bh(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) { for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode); r = rb_entry(n, struct dlm_rsb, res_hashnode);
if (r->res_hash == hash) if (r->res_hash == hash)
dlm_dump_rsb(r); dlm_dump_rsb(r);
} }
spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
} }
spin_unlock_bh(&ls->ls_rsbtbl_lock);
} }
void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len) void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
...@@ -1146,7 +1145,7 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len) ...@@ -1146,7 +1145,7 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
hash = jhash(name, len, 0); hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1); b = hash & (ls->ls_rsbtbl_size - 1);
spin_lock_bh(&ls->ls_rsbtbl[b].lock); spin_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (!error) if (!error)
goto out_dump; goto out_dump;
...@@ -1157,7 +1156,7 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len) ...@@ -1157,7 +1156,7 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
out_dump: out_dump:
dlm_dump_rsb(r); dlm_dump_rsb(r);
out: out:
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
} }
static void toss_rsb(struct kref *kref) static void toss_rsb(struct kref *kref)
...@@ -1621,10 +1620,10 @@ static void shrink_bucket(struct dlm_ls *ls, int b) ...@@ -1621,10 +1620,10 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX); memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
spin_lock_bh(&ls->ls_rsbtbl[b].lock); spin_lock_bh(&ls->ls_rsbtbl_lock);
if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) { if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
return; return;
} }
...@@ -1681,7 +1680,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b) ...@@ -1681,7 +1680,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags); set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
else else
clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags); clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
/* /*
* While searching for rsb's to free, we found some that require * While searching for rsb's to free, we found some that require
...@@ -1696,16 +1695,16 @@ static void shrink_bucket(struct dlm_ls *ls, int b) ...@@ -1696,16 +1695,16 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
name = ls->ls_remove_names[i]; name = ls->ls_remove_names[i];
len = ls->ls_remove_lens[i]; len = ls->ls_remove_lens[i];
spin_lock_bh(&ls->ls_rsbtbl[b].lock); spin_lock_bh(&ls->ls_rsbtbl_lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (rv) { if (rv) {
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
log_debug(ls, "remove_name not toss %s", name); log_debug(ls, "remove_name not toss %s", name);
continue; continue;
} }
if (r->res_master_nodeid != our_nodeid) { if (r->res_master_nodeid != our_nodeid) {
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
log_debug(ls, "remove_name master %d dir %d our %d %s", log_debug(ls, "remove_name master %d dir %d our %d %s",
r->res_master_nodeid, r->res_dir_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
our_nodeid, name); our_nodeid, name);
...@@ -1714,7 +1713,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b) ...@@ -1714,7 +1713,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
if (r->res_dir_nodeid == our_nodeid) { if (r->res_dir_nodeid == our_nodeid) {
/* should never happen */ /* should never happen */
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
log_error(ls, "remove_name dir %d master %d our %d %s", log_error(ls, "remove_name dir %d master %d our %d %s",
r->res_dir_nodeid, r->res_master_nodeid, r->res_dir_nodeid, r->res_master_nodeid,
our_nodeid, name); our_nodeid, name);
...@@ -1723,21 +1722,21 @@ static void shrink_bucket(struct dlm_ls *ls, int b) ...@@ -1723,21 +1722,21 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
if (!time_after_eq(jiffies, r->res_toss_time + if (!time_after_eq(jiffies, r->res_toss_time +
dlm_config.ci_toss_secs * HZ)) { dlm_config.ci_toss_secs * HZ)) {
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
log_debug(ls, "remove_name toss_time %lu now %lu %s", log_debug(ls, "remove_name toss_time %lu now %lu %s",
r->res_toss_time, jiffies, name); r->res_toss_time, jiffies, name);
continue; continue;
} }
if (!kref_put(&r->res_ref, kill_rsb)) { if (!kref_put(&r->res_ref, kill_rsb)) {
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
log_error(ls, "remove_name in use %s", name); log_error(ls, "remove_name in use %s", name);
continue; continue;
} }
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
send_remove(r); send_remove(r);
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
dlm_free_rsb(r); dlm_free_rsb(r);
} }
...@@ -4201,7 +4200,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) ...@@ -4201,7 +4200,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
hash = jhash(name, len, 0); hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1); b = hash & (ls->ls_rsbtbl_size - 1);
spin_lock_bh(&ls->ls_rsbtbl[b].lock); spin_lock_bh(&ls->ls_rsbtbl_lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (rv) { if (rv) {
...@@ -4211,7 +4210,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) ...@@ -4211,7 +4210,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
/* should not happen */ /* should not happen */
log_error(ls, "receive_remove from %d not found %s", log_error(ls, "receive_remove from %d not found %s",
from_nodeid, name); from_nodeid, name);
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
return; return;
} }
if (r->res_master_nodeid != from_nodeid) { if (r->res_master_nodeid != from_nodeid) {
...@@ -4219,14 +4218,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) ...@@ -4219,14 +4218,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
log_error(ls, "receive_remove keep from %d master %d", log_error(ls, "receive_remove keep from %d master %d",
from_nodeid, r->res_master_nodeid); from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r); dlm_print_rsb(r);
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
return; return;
} }
log_debug(ls, "receive_remove from %d master %d first %x %s", log_debug(ls, "receive_remove from %d master %d first %x %s",
from_nodeid, r->res_master_nodeid, r->res_first_lkid, from_nodeid, r->res_master_nodeid, r->res_first_lkid,
name); name);
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
return; return;
} }
...@@ -4234,19 +4233,19 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) ...@@ -4234,19 +4233,19 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
log_error(ls, "receive_remove toss from %d master %d", log_error(ls, "receive_remove toss from %d master %d",
from_nodeid, r->res_master_nodeid); from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r); dlm_print_rsb(r);
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
return; return;
} }
if (kref_put(&r->res_ref, kill_rsb)) { if (kref_put(&r->res_ref, kill_rsb)) {
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
dlm_free_rsb(r); dlm_free_rsb(r);
} else { } else {
log_error(ls, "receive_remove from %d rsb ref error", log_error(ls, "receive_remove from %d rsb ref error",
from_nodeid); from_nodeid);
dlm_print_rsb(r); dlm_print_rsb(r);
spin_unlock_bh(&ls->ls_rsbtbl[b].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
} }
} }
...@@ -5314,7 +5313,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) ...@@ -5314,7 +5313,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
struct rb_node *n; struct rb_node *n;
struct dlm_rsb *r; struct dlm_rsb *r;
spin_lock_bh(&ls->ls_rsbtbl[bucket].lock); spin_lock_bh(&ls->ls_rsbtbl_lock);
for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode); r = rb_entry(n, struct dlm_rsb, res_hashnode);
...@@ -5325,10 +5324,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) ...@@ -5325,10 +5324,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
continue; continue;
} }
hold_rsb(r); hold_rsb(r);
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
return r; return r;
} }
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock); spin_unlock_bh(&ls->ls_rsbtbl_lock);
return NULL; return NULL;
} }
......
...@@ -495,6 +495,7 @@ static int new_lockspace(const char *name, const char *cluster, ...@@ -495,6 +495,7 @@ static int new_lockspace(const char *name, const char *cluster,
*/ */
ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL)); ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
spin_lock_init(&ls->ls_rsbtbl_lock);
size = READ_ONCE(dlm_config.ci_rsbtbl_size); size = READ_ONCE(dlm_config.ci_rsbtbl_size);
ls->ls_rsbtbl_size = size; ls->ls_rsbtbl_size = size;
...@@ -504,7 +505,6 @@ static int new_lockspace(const char *name, const char *cluster, ...@@ -504,7 +505,6 @@ static int new_lockspace(const char *name, const char *cluster,
for (i = 0; i < size; i++) { for (i = 0; i < size; i++) {
ls->ls_rsbtbl[i].keep.rb_node = NULL; ls->ls_rsbtbl[i].keep.rb_node = NULL;
ls->ls_rsbtbl[i].toss.rb_node = NULL; ls->ls_rsbtbl[i].toss.rb_node = NULL;
spin_lock_init(&ls->ls_rsbtbl[i].lock);
} }
for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) { for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
......
...@@ -886,8 +886,8 @@ void dlm_clear_toss(struct dlm_ls *ls) ...@@ -886,8 +886,8 @@ void dlm_clear_toss(struct dlm_ls *ls)
unsigned int count = 0; unsigned int count = 0;
int i; int i;
spin_lock(&ls->ls_rsbtbl_lock);
for (i = 0; i < ls->ls_rsbtbl_size; i++) { for (i = 0; i < ls->ls_rsbtbl_size; i++) {
spin_lock_bh(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) { for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
next = rb_next(n); next = rb_next(n);
r = rb_entry(n, struct dlm_rsb, res_hashnode); r = rb_entry(n, struct dlm_rsb, res_hashnode);
...@@ -895,8 +895,8 @@ void dlm_clear_toss(struct dlm_ls *ls) ...@@ -895,8 +895,8 @@ void dlm_clear_toss(struct dlm_ls *ls)
dlm_free_rsb(r); dlm_free_rsb(r);
count++; count++;
} }
spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
} }
spin_unlock_bh(&ls->ls_rsbtbl_lock);
if (count) if (count)
log_rinfo(ls, "dlm_clear_toss %u done", count); log_rinfo(ls, "dlm_clear_toss %u done", count);
......
...@@ -33,8 +33,8 @@ static int dlm_create_masters_list(struct dlm_ls *ls) ...@@ -33,8 +33,8 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
goto out; goto out;
} }
spin_lock_bh(&ls->ls_rsbtbl_lock);
for (i = 0; i < ls->ls_rsbtbl_size; i++) { for (i = 0; i < ls->ls_rsbtbl_size; i++) {
spin_lock_bh(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) { for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode); r = rb_entry(n, struct dlm_rsb, res_hashnode);
if (r->res_nodeid) if (r->res_nodeid)
...@@ -43,8 +43,8 @@ static int dlm_create_masters_list(struct dlm_ls *ls) ...@@ -43,8 +43,8 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
list_add(&r->res_masters_list, &ls->ls_masters_list); list_add(&r->res_masters_list, &ls->ls_masters_list);
dlm_hold_rsb(r); dlm_hold_rsb(r);
} }
spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
} }
spin_unlock_bh(&ls->ls_rsbtbl_lock);
out: out:
write_unlock_bh(&ls->ls_masters_lock); write_unlock_bh(&ls->ls_masters_lock);
return error; return error;
...@@ -68,8 +68,8 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list) ...@@ -68,8 +68,8 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
struct dlm_rsb *r; struct dlm_rsb *r;
int i; int i;
spin_lock_bh(&ls->ls_rsbtbl_lock);
for (i = 0; i < ls->ls_rsbtbl_size; i++) { for (i = 0; i < ls->ls_rsbtbl_size; i++) {
spin_lock_bh(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) { for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode); r = rb_entry(n, struct dlm_rsb, res_hashnode);
list_add(&r->res_root_list, root_list); list_add(&r->res_root_list, root_list);
...@@ -78,8 +78,8 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list) ...@@ -78,8 +78,8 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss)) if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
log_error(ls, "%s toss not empty", __func__); log_error(ls, "%s toss not empty", __func__);
spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
} }
spin_unlock_bh(&ls->ls_rsbtbl_lock);
} }
static void dlm_release_root_list(struct list_head *root_list) static void dlm_release_root_list(struct list_head *root_list)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment