Commit c04fecb4 authored by David Teigland's avatar David Teigland

dlm: use rsbtbl as resource directory

Remove the dir hash table (dirtbl), and use
the rsb hash table (rsbtbl) as the resource
directory.  It has always been an unnecessary
duplication of information.

This improves efficiency by using a single rsbtbl
lookup in many cases where both rsbtbl and dirtbl
lookups were needed previously.

This eliminates the need to handle cases of rsbtbl
and dirtbl being out of sync.

In many cases there will be memory savings because
the dir hash table no longer exists.
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent ecc72846
...@@ -96,7 +96,6 @@ struct dlm_cluster { ...@@ -96,7 +96,6 @@ struct dlm_cluster {
unsigned int cl_tcp_port; unsigned int cl_tcp_port;
unsigned int cl_buffer_size; unsigned int cl_buffer_size;
unsigned int cl_rsbtbl_size; unsigned int cl_rsbtbl_size;
unsigned int cl_dirtbl_size;
unsigned int cl_recover_timer; unsigned int cl_recover_timer;
unsigned int cl_toss_secs; unsigned int cl_toss_secs;
unsigned int cl_scan_secs; unsigned int cl_scan_secs;
...@@ -113,7 +112,6 @@ enum { ...@@ -113,7 +112,6 @@ enum {
CLUSTER_ATTR_TCP_PORT = 0, CLUSTER_ATTR_TCP_PORT = 0,
CLUSTER_ATTR_BUFFER_SIZE, CLUSTER_ATTR_BUFFER_SIZE,
CLUSTER_ATTR_RSBTBL_SIZE, CLUSTER_ATTR_RSBTBL_SIZE,
CLUSTER_ATTR_DIRTBL_SIZE,
CLUSTER_ATTR_RECOVER_TIMER, CLUSTER_ATTR_RECOVER_TIMER,
CLUSTER_ATTR_TOSS_SECS, CLUSTER_ATTR_TOSS_SECS,
CLUSTER_ATTR_SCAN_SECS, CLUSTER_ATTR_SCAN_SECS,
...@@ -189,7 +187,6 @@ __CONFIGFS_ATTR(name, 0644, name##_read, name##_write) ...@@ -189,7 +187,6 @@ __CONFIGFS_ATTR(name, 0644, name##_read, name##_write)
CLUSTER_ATTR(tcp_port, 1); CLUSTER_ATTR(tcp_port, 1);
CLUSTER_ATTR(buffer_size, 1); CLUSTER_ATTR(buffer_size, 1);
CLUSTER_ATTR(rsbtbl_size, 1); CLUSTER_ATTR(rsbtbl_size, 1);
CLUSTER_ATTR(dirtbl_size, 1);
CLUSTER_ATTR(recover_timer, 1); CLUSTER_ATTR(recover_timer, 1);
CLUSTER_ATTR(toss_secs, 1); CLUSTER_ATTR(toss_secs, 1);
CLUSTER_ATTR(scan_secs, 1); CLUSTER_ATTR(scan_secs, 1);
...@@ -204,7 +201,6 @@ static struct configfs_attribute *cluster_attrs[] = { ...@@ -204,7 +201,6 @@ static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr, [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
[CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr, [CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr,
[CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr, [CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr,
[CLUSTER_ATTR_DIRTBL_SIZE] = &cluster_attr_dirtbl_size.attr,
[CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr, [CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr,
[CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr, [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr,
[CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr, [CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr,
...@@ -478,7 +474,6 @@ static struct config_group *make_cluster(struct config_group *g, ...@@ -478,7 +474,6 @@ static struct config_group *make_cluster(struct config_group *g,
cl->cl_tcp_port = dlm_config.ci_tcp_port; cl->cl_tcp_port = dlm_config.ci_tcp_port;
cl->cl_buffer_size = dlm_config.ci_buffer_size; cl->cl_buffer_size = dlm_config.ci_buffer_size;
cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size; cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size;
cl->cl_dirtbl_size = dlm_config.ci_dirtbl_size;
cl->cl_recover_timer = dlm_config.ci_recover_timer; cl->cl_recover_timer = dlm_config.ci_recover_timer;
cl->cl_toss_secs = dlm_config.ci_toss_secs; cl->cl_toss_secs = dlm_config.ci_toss_secs;
cl->cl_scan_secs = dlm_config.ci_scan_secs; cl->cl_scan_secs = dlm_config.ci_scan_secs;
...@@ -1050,7 +1045,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num) ...@@ -1050,7 +1045,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
#define DEFAULT_TCP_PORT 21064 #define DEFAULT_TCP_PORT 21064
#define DEFAULT_BUFFER_SIZE 4096 #define DEFAULT_BUFFER_SIZE 4096
#define DEFAULT_RSBTBL_SIZE 1024 #define DEFAULT_RSBTBL_SIZE 1024
#define DEFAULT_DIRTBL_SIZE 1024
#define DEFAULT_RECOVER_TIMER 5 #define DEFAULT_RECOVER_TIMER 5
#define DEFAULT_TOSS_SECS 10 #define DEFAULT_TOSS_SECS 10
#define DEFAULT_SCAN_SECS 5 #define DEFAULT_SCAN_SECS 5
...@@ -1066,7 +1060,6 @@ struct dlm_config_info dlm_config = { ...@@ -1066,7 +1060,6 @@ struct dlm_config_info dlm_config = {
.ci_tcp_port = DEFAULT_TCP_PORT, .ci_tcp_port = DEFAULT_TCP_PORT,
.ci_buffer_size = DEFAULT_BUFFER_SIZE, .ci_buffer_size = DEFAULT_BUFFER_SIZE,
.ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE, .ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE,
.ci_dirtbl_size = DEFAULT_DIRTBL_SIZE,
.ci_recover_timer = DEFAULT_RECOVER_TIMER, .ci_recover_timer = DEFAULT_RECOVER_TIMER,
.ci_toss_secs = DEFAULT_TOSS_SECS, .ci_toss_secs = DEFAULT_TOSS_SECS,
.ci_scan_secs = DEFAULT_SCAN_SECS, .ci_scan_secs = DEFAULT_SCAN_SECS,
......
...@@ -27,7 +27,6 @@ struct dlm_config_info { ...@@ -27,7 +27,6 @@ struct dlm_config_info {
int ci_tcp_port; int ci_tcp_port;
int ci_buffer_size; int ci_buffer_size;
int ci_rsbtbl_size; int ci_rsbtbl_size;
int ci_dirtbl_size;
int ci_recover_timer; int ci_recover_timer;
int ci_toss_secs; int ci_toss_secs;
int ci_scan_secs; int ci_scan_secs;
......
...@@ -344,6 +344,45 @@ static int print_format3(struct dlm_rsb *r, struct seq_file *s) ...@@ -344,6 +344,45 @@ static int print_format3(struct dlm_rsb *r, struct seq_file *s)
return rv; return rv;
} }
static int print_format4(struct dlm_rsb *r, struct seq_file *s)
{
int our_nodeid = dlm_our_nodeid();
int print_name = 1;
int i, rv;
lock_rsb(r);
rv = seq_printf(s, "rsb %p %d %d %d %d %lu %lx %d ",
r,
r->res_nodeid,
r->res_master_nodeid,
r->res_dir_nodeid,
our_nodeid,
r->res_toss_time,
r->res_flags,
r->res_length);
if (rv)
goto out;
for (i = 0; i < r->res_length; i++) {
if (!isascii(r->res_name[i]) || !isprint(r->res_name[i]))
print_name = 0;
}
seq_printf(s, "%s", print_name ? "str " : "hex");
for (i = 0; i < r->res_length; i++) {
if (print_name)
seq_printf(s, "%c", r->res_name[i]);
else
seq_printf(s, " %02x", (unsigned char)r->res_name[i]);
}
rv = seq_printf(s, "\n");
out:
unlock_rsb(r);
return rv;
}
struct rsbtbl_iter { struct rsbtbl_iter {
struct dlm_rsb *rsb; struct dlm_rsb *rsb;
unsigned bucket; unsigned bucket;
...@@ -382,6 +421,13 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr) ...@@ -382,6 +421,13 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr)
} }
rv = print_format3(ri->rsb, seq); rv = print_format3(ri->rsb, seq);
break; break;
case 4:
if (ri->header) {
seq_printf(seq, "version 4 rsb 2\n");
ri->header = 0;
}
rv = print_format4(ri->rsb, seq);
break;
} }
return rv; return rv;
...@@ -390,15 +436,18 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr) ...@@ -390,15 +436,18 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr)
static const struct seq_operations format1_seq_ops; static const struct seq_operations format1_seq_ops;
static const struct seq_operations format2_seq_ops; static const struct seq_operations format2_seq_ops;
static const struct seq_operations format3_seq_ops; static const struct seq_operations format3_seq_ops;
static const struct seq_operations format4_seq_ops;
static void *table_seq_start(struct seq_file *seq, loff_t *pos) static void *table_seq_start(struct seq_file *seq, loff_t *pos)
{ {
struct rb_root *tree;
struct rb_node *node; struct rb_node *node;
struct dlm_ls *ls = seq->private; struct dlm_ls *ls = seq->private;
struct rsbtbl_iter *ri; struct rsbtbl_iter *ri;
struct dlm_rsb *r; struct dlm_rsb *r;
loff_t n = *pos; loff_t n = *pos;
unsigned bucket, entry; unsigned bucket, entry;
int toss = (seq->op == &format4_seq_ops);
bucket = n >> 32; bucket = n >> 32;
entry = n & ((1LL << 32) - 1); entry = n & ((1LL << 32) - 1);
...@@ -417,11 +466,14 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos) ...@@ -417,11 +466,14 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
ri->format = 2; ri->format = 2;
if (seq->op == &format3_seq_ops) if (seq->op == &format3_seq_ops)
ri->format = 3; ri->format = 3;
if (seq->op == &format4_seq_ops)
ri->format = 4;
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
spin_lock(&ls->ls_rsbtbl[bucket].lock); spin_lock(&ls->ls_rsbtbl[bucket].lock);
if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) { if (!RB_EMPTY_ROOT(tree)) {
for (node = rb_first(&ls->ls_rsbtbl[bucket].keep); node; for (node = rb_first(tree); node; node = rb_next(node)) {
node = rb_next(node)) {
r = rb_entry(node, struct dlm_rsb, res_hashnode); r = rb_entry(node, struct dlm_rsb, res_hashnode);
if (!entry--) { if (!entry--) {
dlm_hold_rsb(r); dlm_hold_rsb(r);
...@@ -449,10 +501,11 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos) ...@@ -449,10 +501,11 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
kfree(ri); kfree(ri);
return NULL; return NULL;
} }
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
spin_lock(&ls->ls_rsbtbl[bucket].lock); spin_lock(&ls->ls_rsbtbl[bucket].lock);
if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) { if (!RB_EMPTY_ROOT(tree)) {
node = rb_first(&ls->ls_rsbtbl[bucket].keep); node = rb_first(tree);
r = rb_entry(node, struct dlm_rsb, res_hashnode); r = rb_entry(node, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r); dlm_hold_rsb(r);
ri->rsb = r; ri->rsb = r;
...@@ -469,10 +522,12 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos) ...@@ -469,10 +522,12 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
{ {
struct dlm_ls *ls = seq->private; struct dlm_ls *ls = seq->private;
struct rsbtbl_iter *ri = iter_ptr; struct rsbtbl_iter *ri = iter_ptr;
struct rb_root *tree;
struct rb_node *next; struct rb_node *next;
struct dlm_rsb *r, *rp; struct dlm_rsb *r, *rp;
loff_t n = *pos; loff_t n = *pos;
unsigned bucket; unsigned bucket;
int toss = (seq->op == &format4_seq_ops);
bucket = n >> 32; bucket = n >> 32;
...@@ -511,10 +566,11 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos) ...@@ -511,10 +566,11 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
kfree(ri); kfree(ri);
return NULL; return NULL;
} }
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
spin_lock(&ls->ls_rsbtbl[bucket].lock); spin_lock(&ls->ls_rsbtbl[bucket].lock);
if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) { if (!RB_EMPTY_ROOT(tree)) {
next = rb_first(&ls->ls_rsbtbl[bucket].keep); next = rb_first(tree);
r = rb_entry(next, struct dlm_rsb, res_hashnode); r = rb_entry(next, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r); dlm_hold_rsb(r);
ri->rsb = r; ri->rsb = r;
...@@ -558,9 +614,17 @@ static const struct seq_operations format3_seq_ops = { ...@@ -558,9 +614,17 @@ static const struct seq_operations format3_seq_ops = {
.show = table_seq_show, .show = table_seq_show,
}; };
static const struct seq_operations format4_seq_ops = {
.start = table_seq_start,
.next = table_seq_next,
.stop = table_seq_stop,
.show = table_seq_show,
};
static const struct file_operations format1_fops; static const struct file_operations format1_fops;
static const struct file_operations format2_fops; static const struct file_operations format2_fops;
static const struct file_operations format3_fops; static const struct file_operations format3_fops;
static const struct file_operations format4_fops;
static int table_open(struct inode *inode, struct file *file) static int table_open(struct inode *inode, struct file *file)
{ {
...@@ -573,6 +637,8 @@ static int table_open(struct inode *inode, struct file *file) ...@@ -573,6 +637,8 @@ static int table_open(struct inode *inode, struct file *file)
ret = seq_open(file, &format2_seq_ops); ret = seq_open(file, &format2_seq_ops);
else if (file->f_op == &format3_fops) else if (file->f_op == &format3_fops)
ret = seq_open(file, &format3_seq_ops); ret = seq_open(file, &format3_seq_ops);
else if (file->f_op == &format4_fops)
ret = seq_open(file, &format4_seq_ops);
if (ret) if (ret)
return ret; return ret;
...@@ -606,6 +672,14 @@ static const struct file_operations format3_fops = { ...@@ -606,6 +672,14 @@ static const struct file_operations format3_fops = {
.release = seq_release .release = seq_release
}; };
static const struct file_operations format4_fops = {
.owner = THIS_MODULE,
.open = table_open,
.read = seq_read,
.llseek = seq_lseek,
.release = seq_release
};
/* /*
* dump lkb's on the ls_waiters list * dump lkb's on the ls_waiters list
*/ */
...@@ -652,6 +726,8 @@ void dlm_delete_debug_file(struct dlm_ls *ls) ...@@ -652,6 +726,8 @@ void dlm_delete_debug_file(struct dlm_ls *ls)
debugfs_remove(ls->ls_debug_locks_dentry); debugfs_remove(ls->ls_debug_locks_dentry);
if (ls->ls_debug_all_dentry) if (ls->ls_debug_all_dentry)
debugfs_remove(ls->ls_debug_all_dentry); debugfs_remove(ls->ls_debug_all_dentry);
if (ls->ls_debug_toss_dentry)
debugfs_remove(ls->ls_debug_toss_dentry);
} }
int dlm_create_debug_file(struct dlm_ls *ls) int dlm_create_debug_file(struct dlm_ls *ls)
...@@ -694,6 +770,19 @@ int dlm_create_debug_file(struct dlm_ls *ls) ...@@ -694,6 +770,19 @@ int dlm_create_debug_file(struct dlm_ls *ls)
if (!ls->ls_debug_all_dentry) if (!ls->ls_debug_all_dentry)
goto fail; goto fail;
/* format 4 */
memset(name, 0, sizeof(name));
snprintf(name, DLM_LOCKSPACE_LEN+8, "%s_toss", ls->ls_name);
ls->ls_debug_toss_dentry = debugfs_create_file(name,
S_IFREG | S_IRUGO,
dlm_root,
ls,
&format4_fops);
if (!ls->ls_debug_toss_dentry)
goto fail;
memset(name, 0, sizeof(name)); memset(name, 0, sizeof(name));
snprintf(name, DLM_LOCKSPACE_LEN+8, "%s_waiters", ls->ls_name); snprintf(name, DLM_LOCKSPACE_LEN+8, "%s_waiters", ls->ls_name);
......
...@@ -23,50 +23,6 @@ ...@@ -23,50 +23,6 @@
#include "lock.h" #include "lock.h"
#include "dir.h" #include "dir.h"
static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
{
spin_lock(&ls->ls_recover_list_lock);
list_add(&de->list, &ls->ls_recover_list);
spin_unlock(&ls->ls_recover_list_lock);
}
static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
{
int found = 0;
struct dlm_direntry *de;
spin_lock(&ls->ls_recover_list_lock);
list_for_each_entry(de, &ls->ls_recover_list, list) {
if (de->length == len) {
list_del(&de->list);
de->master_nodeid = 0;
memset(de->name, 0, len);
found = 1;
break;
}
}
spin_unlock(&ls->ls_recover_list_lock);
if (!found)
de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS);
return de;
}
void dlm_clear_free_entries(struct dlm_ls *ls)
{
struct dlm_direntry *de;
spin_lock(&ls->ls_recover_list_lock);
while (!list_empty(&ls->ls_recover_list)) {
de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
list);
list_del(&de->list);
kfree(de);
}
spin_unlock(&ls->ls_recover_list_lock);
}
/* /*
* We use the upper 16 bits of the hash value to select the directory node. * We use the upper 16 bits of the hash value to select the directory node.
* Low bits are used for distribution of rsb's among hash buckets on each node. * Low bits are used for distribution of rsb's among hash buckets on each node.
...@@ -78,144 +34,53 @@ void dlm_clear_free_entries(struct dlm_ls *ls) ...@@ -78,144 +34,53 @@ void dlm_clear_free_entries(struct dlm_ls *ls)
int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash) int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
{ {
struct list_head *tmp; uint32_t node;
struct dlm_member *memb = NULL;
uint32_t node, n = 0;
int nodeid;
if (ls->ls_num_nodes == 1) {
nodeid = dlm_our_nodeid();
goto out;
}
if (ls->ls_node_array) { if (ls->ls_num_nodes == 1)
return dlm_our_nodeid();
else {
node = (hash >> 16) % ls->ls_total_weight; node = (hash >> 16) % ls->ls_total_weight;
nodeid = ls->ls_node_array[node]; return ls->ls_node_array[node];
goto out;
}
/* make_member_array() failed to kmalloc ls_node_array... */
node = (hash >> 16) % ls->ls_num_nodes;
list_for_each(tmp, &ls->ls_nodes) {
if (n++ != node)
continue;
memb = list_entry(tmp, struct dlm_member, list);
break;
} }
DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
ls->ls_num_nodes, n, node););
nodeid = memb->nodeid;
out:
return nodeid;
} }
int dlm_dir_nodeid(struct dlm_rsb *r) int dlm_dir_nodeid(struct dlm_rsb *r)
{ {
return dlm_hash2nodeid(r->res_ls, r->res_hash); return r->res_dir_nodeid;
}
static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
{
uint32_t val;
val = jhash(name, len, 0);
val &= (ls->ls_dirtbl_size - 1);
return val;
}
static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
{
uint32_t bucket;
bucket = dir_hash(ls, de->name, de->length);
list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
} }
static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name, void dlm_recover_dir_nodeid(struct dlm_ls *ls)
int namelen, uint32_t bucket)
{ {
struct dlm_direntry *de; struct dlm_rsb *r;
list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
if (de->length == namelen && !memcmp(name, de->name, namelen))
goto out;
}
de = NULL;
out:
return de;
}
void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
{
struct dlm_direntry *de;
uint32_t bucket;
bucket = dir_hash(ls, name, namelen);
spin_lock(&ls->ls_dirtbl[bucket].lock);
de = search_bucket(ls, name, namelen, bucket);
if (!de) {
log_error(ls, "remove fr %u none", nodeid);
goto out;
}
if (de->master_nodeid != nodeid) {
log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
goto out;
}
list_del(&de->list);
kfree(de);
out:
spin_unlock(&ls->ls_dirtbl[bucket].lock);
}
void dlm_dir_clear(struct dlm_ls *ls) down_read(&ls->ls_root_sem);
{ list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
struct list_head *head; r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
struct dlm_direntry *de;
int i;
DLM_ASSERT(list_empty(&ls->ls_recover_list), );
for (i = 0; i < ls->ls_dirtbl_size; i++) {
spin_lock(&ls->ls_dirtbl[i].lock);
head = &ls->ls_dirtbl[i].list;
while (!list_empty(head)) {
de = list_entry(head->next, struct dlm_direntry, list);
list_del(&de->list);
put_free_de(ls, de);
}
spin_unlock(&ls->ls_dirtbl[i].lock);
} }
up_read(&ls->ls_root_sem);
} }
int dlm_recover_directory(struct dlm_ls *ls) int dlm_recover_directory(struct dlm_ls *ls)
{ {
struct dlm_member *memb; struct dlm_member *memb;
struct dlm_direntry *de;
char *b, *last_name = NULL; char *b, *last_name = NULL;
int error = -ENOMEM, last_len, count = 0; int error = -ENOMEM, last_len, nodeid, result;
uint16_t namelen; uint16_t namelen;
unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
log_debug(ls, "dlm_recover_directory"); log_debug(ls, "dlm_recover_directory");
if (dlm_no_directory(ls)) if (dlm_no_directory(ls))
goto out_status; goto out_status;
dlm_dir_clear(ls);
last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS); last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
if (!last_name) if (!last_name)
goto out; goto out;
list_for_each_entry(memb, &ls->ls_nodes, list) { list_for_each_entry(memb, &ls->ls_nodes, list) {
if (memb->nodeid == dlm_our_nodeid())
continue;
memset(last_name, 0, DLM_RESNAME_MAXLEN); memset(last_name, 0, DLM_RESNAME_MAXLEN);
last_len = 0; last_len = 0;
...@@ -230,7 +95,7 @@ int dlm_recover_directory(struct dlm_ls *ls) ...@@ -230,7 +95,7 @@ int dlm_recover_directory(struct dlm_ls *ls)
if (error) if (error)
goto out_free; goto out_free;
schedule(); cond_resched();
/* /*
* pick namelen/name pairs out of received buffer * pick namelen/name pairs out of received buffer
...@@ -267,87 +132,71 @@ int dlm_recover_directory(struct dlm_ls *ls) ...@@ -267,87 +132,71 @@ int dlm_recover_directory(struct dlm_ls *ls)
if (namelen > DLM_RESNAME_MAXLEN) if (namelen > DLM_RESNAME_MAXLEN)
goto out_free; goto out_free;
error = -ENOMEM; error = dlm_master_lookup(ls, memb->nodeid,
de = get_free_de(ls, namelen); b, namelen,
if (!de) DLM_LU_RECOVER_DIR,
&nodeid, &result);
if (error) {
log_error(ls, "recover_dir lookup %d",
error);
goto out_free; goto out_free;
}
/* The name was found in rsbtbl, but the
* master nodeid is different from
* memb->nodeid which says it is the master.
* This should not happen. */
if (result == DLM_LU_MATCH &&
nodeid != memb->nodeid) {
count_bad++;
log_error(ls, "recover_dir lookup %d "
"nodeid %d memb %d bad %u",
result, nodeid, memb->nodeid,
count_bad);
print_hex_dump_bytes("dlm_recover_dir ",
DUMP_PREFIX_NONE,
b, namelen);
}
/* The name was found in rsbtbl, and the
* master nodeid matches memb->nodeid. */
if (result == DLM_LU_MATCH &&
nodeid == memb->nodeid) {
count_match++;
}
/* The name was not found in rsbtbl and was
* added with memb->nodeid as the master. */
if (result == DLM_LU_ADD) {
count_add++;
}
de->master_nodeid = memb->nodeid;
de->length = namelen;
last_len = namelen; last_len = namelen;
memcpy(de->name, b, namelen);
memcpy(last_name, b, namelen); memcpy(last_name, b, namelen);
b += namelen; b += namelen;
left -= namelen; left -= namelen;
add_entry_to_hash(ls, de);
count++; count++;
} }
} }
done: done:
; ;
} }
out_status: out_status:
error = 0; error = 0;
log_debug(ls, "dlm_recover_directory %d entries", count); dlm_set_recover_status(ls, DLM_RS_DIR);
log_debug(ls, "dlm_recover_directory %u in %u new",
count, count_add);
out_free: out_free:
kfree(last_name); kfree(last_name);
out: out:
dlm_clear_free_entries(ls);
return error; return error;
} }
static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
int namelen, int *r_nodeid)
{
struct dlm_direntry *de, *tmp;
uint32_t bucket;
bucket = dir_hash(ls, name, namelen);
spin_lock(&ls->ls_dirtbl[bucket].lock);
de = search_bucket(ls, name, namelen, bucket);
if (de) {
*r_nodeid = de->master_nodeid;
spin_unlock(&ls->ls_dirtbl[bucket].lock);
if (*r_nodeid == nodeid)
return -EEXIST;
return 0;
}
spin_unlock(&ls->ls_dirtbl[bucket].lock);
if (namelen > DLM_RESNAME_MAXLEN)
return -EINVAL;
de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS);
if (!de)
return -ENOMEM;
de->master_nodeid = nodeid;
de->length = namelen;
memcpy(de->name, name, namelen);
spin_lock(&ls->ls_dirtbl[bucket].lock);
tmp = search_bucket(ls, name, namelen, bucket);
if (tmp) {
kfree(de);
de = tmp;
} else {
list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
}
*r_nodeid = de->master_nodeid;
spin_unlock(&ls->ls_dirtbl[bucket].lock);
return 0;
}
int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
int *r_nodeid)
{
return get_entry(ls, nodeid, name, namelen, r_nodeid);
}
static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
{ {
struct dlm_rsb *r; struct dlm_rsb *r;
...@@ -358,10 +207,10 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) ...@@ -358,10 +207,10 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
bucket = hash & (ls->ls_rsbtbl_size - 1); bucket = hash & (ls->ls_rsbtbl_size - 1);
spin_lock(&ls->ls_rsbtbl[bucket].lock); spin_lock(&ls->ls_rsbtbl[bucket].lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, 0, &r); rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
if (rv) if (rv)
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss, rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
name, len, 0, &r); name, len, &r);
spin_unlock(&ls->ls_rsbtbl[bucket].lock); spin_unlock(&ls->ls_rsbtbl[bucket].lock);
if (!rv) if (!rv)
...@@ -371,7 +220,7 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) ...@@ -371,7 +220,7 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
list_for_each_entry(r, &ls->ls_root_list, res_root_list) { list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
if (len == r->res_length && !memcmp(name, r->res_name, len)) { if (len == r->res_length && !memcmp(name, r->res_name, len)) {
up_read(&ls->ls_root_sem); up_read(&ls->ls_root_sem);
log_error(ls, "find_rsb_root revert to root_list %s", log_debug(ls, "find_rsb_root revert to root_list %s",
r->res_name); r->res_name);
return r; return r;
} }
...@@ -429,6 +278,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, ...@@ -429,6 +278,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
be_namelen = cpu_to_be16(0); be_namelen = cpu_to_be16(0);
memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
offset += sizeof(__be16); offset += sizeof(__be16);
ls->ls_recover_dir_sent_msg++;
goto out; goto out;
} }
...@@ -437,6 +287,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, ...@@ -437,6 +287,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
offset += sizeof(__be16); offset += sizeof(__be16);
memcpy(outbuf + offset, r->res_name, r->res_length); memcpy(outbuf + offset, r->res_name, r->res_length);
offset += r->res_length; offset += r->res_length;
ls->ls_recover_dir_sent_res++;
} }
/* /*
...@@ -449,8 +300,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, ...@@ -449,8 +300,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
be_namelen = cpu_to_be16(0xFFFF); be_namelen = cpu_to_be16(0xFFFF);
memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
offset += sizeof(__be16); offset += sizeof(__be16);
ls->ls_recover_dir_sent_msg++;
} }
out: out:
up_read(&ls->ls_root_sem); up_read(&ls->ls_root_sem);
} }
......
...@@ -14,15 +14,10 @@ ...@@ -14,15 +14,10 @@
#ifndef __DIR_DOT_H__ #ifndef __DIR_DOT_H__
#define __DIR_DOT_H__ #define __DIR_DOT_H__
int dlm_dir_nodeid(struct dlm_rsb *rsb); int dlm_dir_nodeid(struct dlm_rsb *rsb);
int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash); int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash);
void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int len); void dlm_recover_dir_nodeid(struct dlm_ls *ls);
void dlm_dir_clear(struct dlm_ls *ls);
void dlm_clear_free_entries(struct dlm_ls *ls);
int dlm_recover_directory(struct dlm_ls *ls); int dlm_recover_directory(struct dlm_ls *ls);
int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
int *r_nodeid);
void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
char *outbuf, int outlen, int nodeid); char *outbuf, int outlen, int nodeid);
......
...@@ -55,8 +55,6 @@ struct dlm_lkb; ...@@ -55,8 +55,6 @@ struct dlm_lkb;
struct dlm_rsb; struct dlm_rsb;
struct dlm_member; struct dlm_member;
struct dlm_rsbtable; struct dlm_rsbtable;
struct dlm_dirtable;
struct dlm_direntry;
struct dlm_recover; struct dlm_recover;
struct dlm_header; struct dlm_header;
struct dlm_message; struct dlm_message;
...@@ -98,18 +96,6 @@ do { \ ...@@ -98,18 +96,6 @@ do { \
} }
struct dlm_direntry {
struct list_head list;
uint32_t master_nodeid;
uint16_t length;
char name[1];
};
struct dlm_dirtable {
struct list_head list;
spinlock_t lock;
};
struct dlm_rsbtable { struct dlm_rsbtable {
struct rb_root keep; struct rb_root keep;
struct rb_root toss; struct rb_root toss;
...@@ -283,6 +269,15 @@ struct dlm_lkb { ...@@ -283,6 +269,15 @@ struct dlm_lkb {
}; };
}; };
/*
* res_master_nodeid is "normal": 0 is unset/invalid, non-zero is the real
* nodeid, even when nodeid is our_nodeid.
*
* res_nodeid is "odd": -1 is unset/invalid, zero means our_nodeid,
* greater than zero when another nodeid.
*
* (TODO: remove res_nodeid and only use res_master_nodeid)
*/
struct dlm_rsb { struct dlm_rsb {
struct dlm_ls *res_ls; /* the lockspace */ struct dlm_ls *res_ls; /* the lockspace */
...@@ -291,6 +286,8 @@ struct dlm_rsb { ...@@ -291,6 +286,8 @@ struct dlm_rsb {
unsigned long res_flags; unsigned long res_flags;
int res_length; /* length of rsb name */ int res_length; /* length of rsb name */
int res_nodeid; int res_nodeid;
int res_master_nodeid;
int res_dir_nodeid;
uint32_t res_lvbseq; uint32_t res_lvbseq;
uint32_t res_hash; uint32_t res_hash;
uint32_t res_bucket; /* rsbtbl */ uint32_t res_bucket; /* rsbtbl */
...@@ -313,10 +310,21 @@ struct dlm_rsb { ...@@ -313,10 +310,21 @@ struct dlm_rsb {
char res_name[DLM_RESNAME_MAXLEN+1]; char res_name[DLM_RESNAME_MAXLEN+1];
}; };
/* dlm_master_lookup() flags */
#define DLM_LU_RECOVER_DIR 1
#define DLM_LU_RECOVER_MASTER 2
/* dlm_master_lookup() results */
#define DLM_LU_MATCH 1
#define DLM_LU_ADD 2
/* find_rsb() flags */ /* find_rsb() flags */
#define R_MASTER 1 /* only return rsb if it's a master */ #define R_REQUEST 0x00000001
#define R_CREATE 2 /* create/add rsb if not found */ #define R_RECEIVE_REQUEST 0x00000002
#define R_RECEIVE_RECOVER 0x00000004
/* rsb_flags */ /* rsb_flags */
...@@ -509,9 +517,6 @@ struct dlm_ls { ...@@ -509,9 +517,6 @@ struct dlm_ls {
struct dlm_rsbtable *ls_rsbtbl; struct dlm_rsbtable *ls_rsbtbl;
uint32_t ls_rsbtbl_size; uint32_t ls_rsbtbl_size;
struct dlm_dirtable *ls_dirtbl;
uint32_t ls_dirtbl_size;
struct mutex ls_waiters_mutex; struct mutex ls_waiters_mutex;
struct list_head ls_waiters; /* lkbs needing a reply */ struct list_head ls_waiters; /* lkbs needing a reply */
...@@ -545,6 +550,7 @@ struct dlm_ls { ...@@ -545,6 +550,7 @@ struct dlm_ls {
struct dentry *ls_debug_waiters_dentry; /* debugfs */ struct dentry *ls_debug_waiters_dentry; /* debugfs */
struct dentry *ls_debug_locks_dentry; /* debugfs */ struct dentry *ls_debug_locks_dentry; /* debugfs */
struct dentry *ls_debug_all_dentry; /* debugfs */ struct dentry *ls_debug_all_dentry; /* debugfs */
struct dentry *ls_debug_toss_dentry; /* debugfs */
wait_queue_head_t ls_uevent_wait; /* user part of join/leave */ wait_queue_head_t ls_uevent_wait; /* user part of join/leave */
int ls_uevent_result; int ls_uevent_result;
...@@ -573,6 +579,8 @@ struct dlm_ls { ...@@ -573,6 +579,8 @@ struct dlm_ls {
struct mutex ls_requestqueue_mutex; struct mutex ls_requestqueue_mutex;
struct dlm_rcom *ls_recover_buf; struct dlm_rcom *ls_recover_buf;
int ls_recover_nodeid; /* for debugging */ int ls_recover_nodeid; /* for debugging */
unsigned int ls_recover_dir_sent_res; /* for log info */
unsigned int ls_recover_dir_sent_msg; /* for log info */
unsigned int ls_recover_locks_in; /* for log info */ unsigned int ls_recover_locks_in; /* for log info */
uint64_t ls_rcom_seq; uint64_t ls_rcom_seq;
spinlock_t ls_rcom_spin; spinlock_t ls_rcom_spin;
......
This diff is collapsed.
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#define __LOCK_DOT_H__ #define __LOCK_DOT_H__
void dlm_dump_rsb(struct dlm_rsb *r); void dlm_dump_rsb(struct dlm_rsb *r);
void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len);
void dlm_print_lkb(struct dlm_lkb *lkb); void dlm_print_lkb(struct dlm_lkb *lkb);
void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
uint32_t saved_seq); uint32_t saved_seq);
...@@ -28,9 +29,11 @@ void dlm_unlock_recovery(struct dlm_ls *ls); ...@@ -28,9 +29,11 @@ void dlm_unlock_recovery(struct dlm_ls *ls);
void dlm_scan_waiters(struct dlm_ls *ls); void dlm_scan_waiters(struct dlm_ls *ls);
void dlm_scan_timeout(struct dlm_ls *ls); void dlm_scan_timeout(struct dlm_ls *ls);
void dlm_adjust_timeouts(struct dlm_ls *ls); void dlm_adjust_timeouts(struct dlm_ls *ls);
int dlm_master_lookup(struct dlm_ls *ls, int nodeid, char *name, int len,
unsigned int flags, int *r_nodeid, int *result);
int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len, int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
unsigned int flags, struct dlm_rsb **r_ret); struct dlm_rsb **r_ret);
void dlm_recover_purge(struct dlm_ls *ls); void dlm_recover_purge(struct dlm_ls *ls);
void dlm_purge_mstcpy_locks(struct dlm_rsb *r); void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
......
...@@ -509,17 +509,6 @@ static int new_lockspace(const char *name, const char *cluster, ...@@ -509,17 +509,6 @@ static int new_lockspace(const char *name, const char *cluster,
idr_init(&ls->ls_lkbidr); idr_init(&ls->ls_lkbidr);
spin_lock_init(&ls->ls_lkbidr_spin); spin_lock_init(&ls->ls_lkbidr_spin);
size = dlm_config.ci_dirtbl_size;
ls->ls_dirtbl_size = size;
ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size);
if (!ls->ls_dirtbl)
goto out_lkbfree;
for (i = 0; i < size; i++) {
INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
spin_lock_init(&ls->ls_dirtbl[i].lock);
}
INIT_LIST_HEAD(&ls->ls_waiters); INIT_LIST_HEAD(&ls->ls_waiters);
mutex_init(&ls->ls_waiters_mutex); mutex_init(&ls->ls_waiters_mutex);
INIT_LIST_HEAD(&ls->ls_orphans); INIT_LIST_HEAD(&ls->ls_orphans);
...@@ -567,7 +556,7 @@ static int new_lockspace(const char *name, const char *cluster, ...@@ -567,7 +556,7 @@ static int new_lockspace(const char *name, const char *cluster,
ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS); ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
if (!ls->ls_recover_buf) if (!ls->ls_recover_buf)
goto out_dirfree; goto out_lkbfree;
ls->ls_slot = 0; ls->ls_slot = 0;
ls->ls_num_slots = 0; ls->ls_num_slots = 0;
...@@ -648,8 +637,6 @@ static int new_lockspace(const char *name, const char *cluster, ...@@ -648,8 +637,6 @@ static int new_lockspace(const char *name, const char *cluster,
list_del(&ls->ls_list); list_del(&ls->ls_list);
spin_unlock(&lslist_lock); spin_unlock(&lslist_lock);
kfree(ls->ls_recover_buf); kfree(ls->ls_recover_buf);
out_dirfree:
vfree(ls->ls_dirtbl);
out_lkbfree: out_lkbfree:
idr_destroy(&ls->ls_lkbidr); idr_destroy(&ls->ls_lkbidr);
vfree(ls->ls_rsbtbl); vfree(ls->ls_rsbtbl);
...@@ -778,13 +765,6 @@ static int release_lockspace(struct dlm_ls *ls, int force) ...@@ -778,13 +765,6 @@ static int release_lockspace(struct dlm_ls *ls, int force)
kfree(ls->ls_recover_buf); kfree(ls->ls_recover_buf);
/*
* Free direntry structs.
*/
dlm_dir_clear(ls);
vfree(ls->ls_dirtbl);
/* /*
* Free all lkb's in idr * Free all lkb's in idr
*/ */
...@@ -826,7 +806,6 @@ static int release_lockspace(struct dlm_ls *ls, int force) ...@@ -826,7 +806,6 @@ static int release_lockspace(struct dlm_ls *ls, int force)
dlm_purge_requestqueue(ls); dlm_purge_requestqueue(ls);
kfree(ls->ls_recover_args); kfree(ls->ls_recover_args);
dlm_clear_free_entries(ls);
dlm_clear_members(ls); dlm_clear_members(ls);
dlm_clear_members_gone(ls); dlm_clear_members_gone(ls);
kfree(ls->ls_node_array); kfree(ls->ls_node_array);
......
...@@ -23,8 +23,6 @@ ...@@ -23,8 +23,6 @@
#include "memory.h" #include "memory.h"
#include "lock.h" #include "lock.h"
#include "util.h" #include "util.h"
#include "member.h"
static int rcom_response(struct dlm_ls *ls) static int rcom_response(struct dlm_ls *ls)
{ {
...@@ -275,19 +273,9 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) ...@@ -275,19 +273,9 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
struct dlm_rcom *rc; struct dlm_rcom *rc;
struct dlm_mhandle *mh; struct dlm_mhandle *mh;
int error = 0; int error = 0;
int max_size = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
ls->ls_recover_nodeid = nodeid; ls->ls_recover_nodeid = nodeid;
if (nodeid == dlm_our_nodeid()) {
ls->ls_recover_buf->rc_header.h_length =
dlm_config.ci_buffer_size;
dlm_copy_master_names(ls, last_name, last_len,
ls->ls_recover_buf->rc_buf,
max_size, nodeid);
goto out;
}
error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh); error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
if (error) if (error)
goto out; goto out;
...@@ -344,6 +332,25 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) ...@@ -344,6 +332,25 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
return error; return error;
} }
int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
struct dlm_ls *ls = r->res_ls;
int error;
error = create_rcom(ls, to_nodeid, DLM_RCOM_LOOKUP, r->res_length,
&rc, &mh);
if (error)
goto out;
memcpy(rc->rc_buf, r->res_name, r->res_length);
rc->rc_id = 0xFFFFFFFF;
send_rcom(ls, mh, rc);
out:
return error;
}
static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
{ {
struct dlm_rcom *rc; struct dlm_rcom *rc;
...@@ -355,7 +362,14 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) ...@@ -355,7 +362,14 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
if (error) if (error)
return; return;
error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid); if (rc_in->rc_id == 0xFFFFFFFF) {
log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
return;
}
error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
if (error) if (error)
ret_nodeid = error; ret_nodeid = error;
rc->rc_result = ret_nodeid; rc->rc_result = ret_nodeid;
...@@ -486,17 +500,76 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) ...@@ -486,17 +500,76 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
return 0; return 0;
} }
/*
* Ignore messages for stage Y before we set
* recover_status bit for stage X:
*
* recover_status = 0
*
* dlm_recover_members()
* - send nothing
* - recv nothing
* - ignore NAMES, NAMES_REPLY
* - ignore LOOKUP, LOOKUP_REPLY
* - ignore LOCK, LOCK_REPLY
*
* recover_status |= NODES
*
* dlm_recover_members_wait()
*
* dlm_recover_directory()
* - send NAMES
* - recv NAMES_REPLY
* - ignore LOOKUP, LOOKUP_REPLY
* - ignore LOCK, LOCK_REPLY
*
* recover_status |= DIR
*
* dlm_recover_directory_wait()
*
* dlm_recover_masters()
* - send LOOKUP
* - recv LOOKUP_REPLY
*
* dlm_recover_locks()
* - send LOCKS
* - recv LOCKS_REPLY
*
* recover_status |= LOCKS
*
* dlm_recover_locks_wait()
*
* recover_status |= DONE
*/
/* Called by dlm_recv; corresponds to dlm_receive_message() but special /* Called by dlm_recv; corresponds to dlm_receive_message() but special
recovery-only comms are sent through here. */ recovery-only comms are sent through here. */
void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
{ {
int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock); int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
int stop, reply = 0, lock = 0; int stop, reply = 0, names = 0, lookup = 0, lock = 0;
uint32_t status; uint32_t status;
uint64_t seq; uint64_t seq;
switch (rc->rc_type) { switch (rc->rc_type) {
case DLM_RCOM_STATUS_REPLY:
reply = 1;
break;
case DLM_RCOM_NAMES:
names = 1;
break;
case DLM_RCOM_NAMES_REPLY:
names = 1;
reply = 1;
break;
case DLM_RCOM_LOOKUP:
lookup = 1;
break;
case DLM_RCOM_LOOKUP_REPLY:
lookup = 1;
reply = 1;
break;
case DLM_RCOM_LOCK: case DLM_RCOM_LOCK:
lock = 1; lock = 1;
break; break;
...@@ -504,10 +577,6 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) ...@@ -504,10 +577,6 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
lock = 1; lock = 1;
reply = 1; reply = 1;
break; break;
case DLM_RCOM_STATUS_REPLY:
case DLM_RCOM_NAMES_REPLY:
case DLM_RCOM_LOOKUP_REPLY:
reply = 1;
}; };
spin_lock(&ls->ls_recover_lock); spin_lock(&ls->ls_recover_lock);
...@@ -516,19 +585,17 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) ...@@ -516,19 +585,17 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
seq = ls->ls_recover_seq; seq = ls->ls_recover_seq;
spin_unlock(&ls->ls_recover_lock); spin_unlock(&ls->ls_recover_lock);
if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) || if (stop && (rc->rc_type != DLM_RCOM_STATUS))
(reply && (rc->rc_seq_reply != seq)) || goto ignore;
(lock && !(status & DLM_RS_DIR))) {
log_limit(ls, "dlm_receive_rcom ignore msg %d " if (reply && (rc->rc_seq_reply != seq))
"from %d %llu %llu recover seq %llu sts %x gen %u", goto ignore;
rc->rc_type,
nodeid, if (!(status & DLM_RS_NODES) && (names || lookup || lock))
(unsigned long long)rc->rc_seq, goto ignore;
(unsigned long long)rc->rc_seq_reply,
(unsigned long long)seq, if (!(status & DLM_RS_DIR) && (lookup || lock))
status, ls->ls_generation); goto ignore;
goto out;
}
switch (rc->rc_type) { switch (rc->rc_type) {
case DLM_RCOM_STATUS: case DLM_RCOM_STATUS:
...@@ -570,10 +637,20 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) ...@@ -570,10 +637,20 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
default: default:
log_error(ls, "receive_rcom bad type %d", rc->rc_type); log_error(ls, "receive_rcom bad type %d", rc->rc_type);
} }
out: return;
ignore:
log_limit(ls, "dlm_receive_rcom ignore msg %d "
"from %d %llu %llu recover seq %llu sts %x gen %u",
rc->rc_type,
nodeid,
(unsigned long long)rc->rc_seq,
(unsigned long long)rc->rc_seq_reply,
(unsigned long long)seq,
status, ls->ls_generation);
return; return;
Eshort: Eshort:
log_error(ls, "recovery message %x from %d is too short", log_error(ls, "recovery message %d from %d is too short",
rc->rc_type, nodeid); rc->rc_type, nodeid);
} }
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags); int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags);
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len); int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid); int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid);
int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid); void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid);
int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in); int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in);
......
...@@ -361,9 +361,8 @@ static void set_master_lkbs(struct dlm_rsb *r) ...@@ -361,9 +361,8 @@ static void set_master_lkbs(struct dlm_rsb *r)
* rsb's to consider. * rsb's to consider.
*/ */
static void set_new_master(struct dlm_rsb *r, int nodeid) static void set_new_master(struct dlm_rsb *r)
{ {
r->res_nodeid = nodeid;
set_master_lkbs(r); set_master_lkbs(r);
rsb_set_flag(r, RSB_NEW_MASTER); rsb_set_flag(r, RSB_NEW_MASTER);
rsb_set_flag(r, RSB_NEW_MASTER2); rsb_set_flag(r, RSB_NEW_MASTER2);
...@@ -372,31 +371,48 @@ static void set_new_master(struct dlm_rsb *r, int nodeid) ...@@ -372,31 +371,48 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
/* /*
* We do async lookups on rsb's that need new masters. The rsb's * We do async lookups on rsb's that need new masters. The rsb's
* waiting for a lookup reply are kept on the recover_list. * waiting for a lookup reply are kept on the recover_list.
*
* Another node recovering the master may have sent us a rcom lookup,
* and our dlm_master_lookup() set it as the new master, along with
* NEW_MASTER so that we'll recover it here (this implies dir_nodeid
* equals our_nodeid below).
*/ */
static int recover_master(struct dlm_rsb *r) static int recover_master(struct dlm_rsb *r, unsigned int *count)
{ {
struct dlm_ls *ls = r->res_ls; struct dlm_ls *ls = r->res_ls;
int error, ret_nodeid; int our_nodeid, dir_nodeid;
int our_nodeid = dlm_our_nodeid(); int is_removed = 0;
int dir_nodeid = dlm_dir_nodeid(r); int error;
if (is_master(r))
return 0;
is_removed = dlm_is_removed(ls, r->res_nodeid);
if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
return 0;
our_nodeid = dlm_our_nodeid();
dir_nodeid = dlm_dir_nodeid(r);
if (dir_nodeid == our_nodeid) { if (dir_nodeid == our_nodeid) {
error = dlm_dir_lookup(ls, our_nodeid, r->res_name, if (is_removed) {
r->res_length, &ret_nodeid); r->res_master_nodeid = our_nodeid;
if (error) r->res_nodeid = 0;
log_error(ls, "recover dir lookup error %d", error); }
if (ret_nodeid == our_nodeid) /* set master of lkbs to ourself when is_removed, or to
ret_nodeid = 0; another new master which we set along with NEW_MASTER
lock_rsb(r); in dlm_master_lookup */
set_new_master(r, ret_nodeid); set_new_master(r);
unlock_rsb(r); error = 0;
} else { } else {
recover_list_add(r); recover_list_add(r);
error = dlm_send_rcom_lookup(r, dir_nodeid); error = dlm_send_rcom_lookup(r, dir_nodeid);
} }
(*count)++;
return error; return error;
} }
...@@ -415,7 +431,7 @@ static int recover_master(struct dlm_rsb *r) ...@@ -415,7 +431,7 @@ static int recover_master(struct dlm_rsb *r)
* resent. * resent.
*/ */
static int recover_master_static(struct dlm_rsb *r) static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
{ {
int dir_nodeid = dlm_dir_nodeid(r); int dir_nodeid = dlm_dir_nodeid(r);
int new_master = dir_nodeid; int new_master = dir_nodeid;
...@@ -423,11 +439,12 @@ static int recover_master_static(struct dlm_rsb *r) ...@@ -423,11 +439,12 @@ static int recover_master_static(struct dlm_rsb *r)
if (dir_nodeid == dlm_our_nodeid()) if (dir_nodeid == dlm_our_nodeid())
new_master = 0; new_master = 0;
lock_rsb(r);
dlm_purge_mstcpy_locks(r); dlm_purge_mstcpy_locks(r);
set_new_master(r, new_master); r->res_master_nodeid = dir_nodeid;
unlock_rsb(r); r->res_nodeid = new_master;
return 1; set_new_master(r);
(*count)++;
return 0;
} }
/* /*
...@@ -443,7 +460,10 @@ static int recover_master_static(struct dlm_rsb *r) ...@@ -443,7 +460,10 @@ static int recover_master_static(struct dlm_rsb *r)
int dlm_recover_masters(struct dlm_ls *ls) int dlm_recover_masters(struct dlm_ls *ls)
{ {
struct dlm_rsb *r; struct dlm_rsb *r;
int error = 0, count = 0; unsigned int total = 0;
unsigned int count = 0;
int nodir = dlm_no_directory(ls);
int error;
log_debug(ls, "dlm_recover_masters"); log_debug(ls, "dlm_recover_masters");
...@@ -455,20 +475,23 @@ int dlm_recover_masters(struct dlm_ls *ls) ...@@ -455,20 +475,23 @@ int dlm_recover_masters(struct dlm_ls *ls)
goto out; goto out;
} }
if (dlm_no_directory(ls)) lock_rsb(r);
count += recover_master_static(r); if (nodir)
else if (!is_master(r) && error = recover_master_static(r, &count);
(dlm_is_removed(ls, r->res_nodeid) || else
rsb_flag(r, RSB_NEW_MASTER))) { error = recover_master(r, &count);
recover_master(r); unlock_rsb(r);
count++; cond_resched();
} total++;
schedule(); if (error) {
up_read(&ls->ls_root_sem);
goto out;
}
} }
up_read(&ls->ls_root_sem); up_read(&ls->ls_root_sem);
log_debug(ls, "dlm_recover_masters %d resources", count); log_debug(ls, "dlm_recover_masters %u of %u", count, total);
error = dlm_wait_function(ls, &recover_list_empty); error = dlm_wait_function(ls, &recover_list_empty);
out: out:
...@@ -480,7 +503,7 @@ int dlm_recover_masters(struct dlm_ls *ls) ...@@ -480,7 +503,7 @@ int dlm_recover_masters(struct dlm_ls *ls)
int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
{ {
struct dlm_rsb *r; struct dlm_rsb *r;
int nodeid; int ret_nodeid, new_master;
r = recover_list_find(ls, rc->rc_id); r = recover_list_find(ls, rc->rc_id);
if (!r) { if (!r) {
...@@ -489,12 +512,17 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) ...@@ -489,12 +512,17 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
goto out; goto out;
} }
nodeid = rc->rc_result; ret_nodeid = rc->rc_result;
if (nodeid == dlm_our_nodeid())
nodeid = 0; if (ret_nodeid == dlm_our_nodeid())
new_master = 0;
else
new_master = ret_nodeid;
lock_rsb(r); lock_rsb(r);
set_new_master(r, nodeid); r->res_master_nodeid = ret_nodeid;
r->res_nodeid = new_master;
set_new_master(r);
unlock_rsb(r); unlock_rsb(r);
recover_list_del(r); recover_list_del(r);
...@@ -791,20 +819,8 @@ int dlm_create_root_list(struct dlm_ls *ls) ...@@ -791,20 +819,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
dlm_hold_rsb(r); dlm_hold_rsb(r);
} }
/* If we're using a directory, add tossed rsbs to the root if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
list; they'll have entries created in the new directory, log_error(ls, "dlm_create_root_list toss not empty");
but no other recovery steps should do anything with them. */
if (dlm_no_directory(ls)) {
spin_unlock(&ls->ls_rsbtbl[i].lock);
continue;
}
for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
list_add(&r->res_root_list, &ls->ls_root_list);
dlm_hold_rsb(r);
}
spin_unlock(&ls->ls_rsbtbl[i].lock); spin_unlock(&ls->ls_rsbtbl[i].lock);
} }
out: out:
...@@ -824,28 +840,26 @@ void dlm_release_root_list(struct dlm_ls *ls) ...@@ -824,28 +840,26 @@ void dlm_release_root_list(struct dlm_ls *ls)
up_write(&ls->ls_root_sem); up_write(&ls->ls_root_sem);
} }
/* If not using a directory, clear the entire toss list, there's no benefit to void dlm_clear_toss(struct dlm_ls *ls)
caching the master value since it's fixed. If we are using a dir, keep the
rsb's we're the master of. Recovery will add them to the root list and from
there they'll be entered in the rebuilt directory. */
void dlm_clear_toss_list(struct dlm_ls *ls)
{ {
struct rb_node *n, *next; struct rb_node *n, *next;
struct dlm_rsb *rsb; struct dlm_rsb *r;
unsigned int count = 0;
int i; int i;
for (i = 0; i < ls->ls_rsbtbl_size; i++) { for (i = 0; i < ls->ls_rsbtbl_size; i++) {
spin_lock(&ls->ls_rsbtbl[i].lock); spin_lock(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) { for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
next = rb_next(n);; next = rb_next(n);
rsb = rb_entry(n, struct dlm_rsb, res_hashnode); r = rb_entry(n, struct dlm_rsb, res_hashnode);
if (dlm_no_directory(ls) || !is_master(rsb)) { rb_erase(n, &ls->ls_rsbtbl[i].toss);
rb_erase(n, &ls->ls_rsbtbl[i].toss); dlm_free_rsb(r);
dlm_free_rsb(rsb); count++;
}
} }
spin_unlock(&ls->ls_rsbtbl[i].lock); spin_unlock(&ls->ls_rsbtbl[i].lock);
} }
if (count)
log_debug(ls, "dlm_clear_toss %u done", count);
} }
...@@ -27,7 +27,7 @@ int dlm_recover_locks(struct dlm_ls *ls); ...@@ -27,7 +27,7 @@ int dlm_recover_locks(struct dlm_ls *ls);
void dlm_recovered_lock(struct dlm_rsb *r); void dlm_recovered_lock(struct dlm_rsb *r);
int dlm_create_root_list(struct dlm_ls *ls); int dlm_create_root_list(struct dlm_ls *ls);
void dlm_release_root_list(struct dlm_ls *ls); void dlm_release_root_list(struct dlm_ls *ls);
void dlm_clear_toss_list(struct dlm_ls *ls); void dlm_clear_toss(struct dlm_ls *ls);
void dlm_recover_rsbs(struct dlm_ls *ls); void dlm_recover_rsbs(struct dlm_ls *ls);
#endif /* __RECOVER_DOT_H__ */ #endif /* __RECOVER_DOT_H__ */
......
...@@ -60,12 +60,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) ...@@ -60,12 +60,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_callback_suspend(ls); dlm_callback_suspend(ls);
/* dlm_clear_toss(ls);
* Free non-master tossed rsb's. Master rsb's are kept on toss
* list and put on root list to be included in resdir recovery.
*/
dlm_clear_toss_list(ls);
/* /*
* This list of root rsb's will be the basis of most of the recovery * This list of root rsb's will be the basis of most of the recovery
...@@ -84,6 +79,10 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) ...@@ -84,6 +79,10 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
goto fail; goto fail;
} }
dlm_recover_dir_nodeid(ls);
ls->ls_recover_dir_sent_res = 0;
ls->ls_recover_dir_sent_msg = 0;
ls->ls_recover_locks_in = 0; ls->ls_recover_locks_in = 0;
dlm_set_recover_status(ls, DLM_RS_NODES); dlm_set_recover_status(ls, DLM_RS_NODES);
...@@ -115,6 +114,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) ...@@ -115,6 +114,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
goto fail; goto fail;
} }
log_debug(ls, "dlm_recover_directory %u out %u messages",
ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);
/* /*
* We may have outstanding operations that are waiting for a reply from * We may have outstanding operations that are waiting for a reply from
* a failed node. Mark these to be resent after recovery. Unlock and * a failed node. Mark these to be resent after recovery. Unlock and
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment