Commit 6aaf4404 authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/dlm

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/dlm:
  dlm: don't limit active work items
  dlm: use workqueue for callbacks
  dlm: remove deadlock debug print
  dlm: improve rsb searches
  dlm: keep lkbs in idr
  dlm: fix kmalloc args
  dlm: don't do pointless NULL check, use kzalloc and fix order of arguments
  dlm: dump address of unknown node
  dlm: use vmalloc for hash tables
  dlm: show addresses in configfs
parents ba1f9db9 10d1459f
......@@ -14,17 +14,9 @@
#include "dlm_internal.h"
#include "lock.h"
#include "user.h"
#include "ast.h"
#define WAKE_ASTS 0
static uint64_t ast_seq_count;
static struct list_head ast_queue;
static spinlock_t ast_queue_lock;
static struct task_struct * astd_task;
static unsigned long astd_wakeflags;
static struct mutex astd_running;
static uint64_t dlm_cb_seq;
static spinlock_t dlm_cb_seq_spin;
static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
{
......@@ -57,21 +49,13 @@ static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
}
}
void dlm_del_ast(struct dlm_lkb *lkb)
{
spin_lock(&ast_queue_lock);
if (!list_empty(&lkb->lkb_astqueue))
list_del_init(&lkb->lkb_astqueue);
spin_unlock(&ast_queue_lock);
}
int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags, uint64_t seq)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
uint64_t prev_seq;
int prev_mode;
int i;
int i, rv;
for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
if (lkb->lkb_callbacks[i].seq)
......@@ -100,7 +84,8 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
mode,
(unsigned long long)prev_seq,
prev_mode);
return 0;
rv = 0;
goto out;
}
}
......@@ -109,6 +94,7 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
lkb->lkb_callbacks[i].mode = mode;
lkb->lkb_callbacks[i].sb_status = status;
lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF);
rv = 0;
break;
}
......@@ -117,21 +103,24 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
lkb->lkb_id, (unsigned long long)seq,
flags, mode, status, sbflags);
dlm_dump_lkb_callbacks(lkb);
return -1;
rv = -1;
goto out;
}
return 0;
out:
return rv;
}
int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_callback *cb, int *resid)
{
int i;
int i, rv;
*resid = 0;
if (!lkb->lkb_callbacks[0].seq)
return -ENOENT;
if (!lkb->lkb_callbacks[0].seq) {
rv = -ENOENT;
goto out;
}
/* oldest undelivered cb is callbacks[0] */
......@@ -163,7 +152,8 @@ int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
cb->mode,
(unsigned long long)lkb->lkb_last_cast.seq,
lkb->lkb_last_cast.mode);
return 0;
rv = 0;
goto out;
}
}
......@@ -176,84 +166,85 @@ int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
lkb->lkb_last_bast_time = ktime_get();
}
return 0;
rv = 0;
out:
return rv;
}
void dlm_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
uint32_t sbflags)
{
uint64_t seq;
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
uint64_t new_seq, prev_seq;
int rv;
spin_lock(&ast_queue_lock);
seq = ++ast_seq_count;
spin_lock(&dlm_cb_seq_spin);
new_seq = ++dlm_cb_seq;
spin_unlock(&dlm_cb_seq_spin);
if (lkb->lkb_flags & DLM_IFL_USER) {
spin_unlock(&ast_queue_lock);
dlm_user_add_ast(lkb, flags, mode, status, sbflags, seq);
dlm_user_add_ast(lkb, flags, mode, status, sbflags, new_seq);
return;
}
rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, seq);
if (rv < 0) {
spin_unlock(&ast_queue_lock);
return;
}
mutex_lock(&lkb->lkb_cb_mutex);
prev_seq = lkb->lkb_callbacks[0].seq;
if (list_empty(&lkb->lkb_astqueue)) {
rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq);
if (rv < 0)
goto out;
if (!prev_seq) {
kref_get(&lkb->lkb_ref);
list_add_tail(&lkb->lkb_astqueue, &ast_queue);
}
spin_unlock(&ast_queue_lock);
set_bit(WAKE_ASTS, &astd_wakeflags);
wake_up_process(astd_task);
if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
mutex_lock(&ls->ls_cb_mutex);
list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
mutex_unlock(&ls->ls_cb_mutex);
} else {
queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
}
}
out:
mutex_unlock(&lkb->lkb_cb_mutex);
}
static void process_asts(void)
void dlm_callback_work(struct work_struct *work)
{
struct dlm_ls *ls = NULL;
struct dlm_rsb *r = NULL;
struct dlm_lkb *lkb;
struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
void (*castfn) (void *astparam);
void (*bastfn) (void *astparam, int mode);
struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
int i, rv, resid;
repeat:
spin_lock(&ast_queue_lock);
list_for_each_entry(lkb, &ast_queue, lkb_astqueue) {
r = lkb->lkb_resource;
ls = r->res_ls;
if (dlm_locking_stopped(ls))
continue;
/* we remove from astqueue list and remove everything in
lkb_callbacks before releasing the spinlock so empty
lkb_astqueue is always consistent with empty lkb_callbacks */
list_del_init(&lkb->lkb_astqueue);
castfn = lkb->lkb_astfn;
bastfn = lkb->lkb_bastfn;
memset(&callbacks, 0, sizeof(callbacks));
mutex_lock(&lkb->lkb_cb_mutex);
if (!lkb->lkb_callbacks[0].seq) {
/* no callback work exists, shouldn't happen */
log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id);
dlm_print_lkb(lkb);
dlm_dump_lkb_callbacks(lkb);
}
for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
if (rv < 0)
break;
}
spin_unlock(&ast_queue_lock);
if (resid) {
/* shouldn't happen, for loop should have removed all */
log_error(ls, "callback resid %d lkb %x",
resid, lkb->lkb_id);
/* cbs remain, loop should have removed all, shouldn't happen */
log_error(ls, "dlm_callback_work %x resid %d", lkb->lkb_id,
resid);
dlm_print_lkb(lkb);
dlm_dump_lkb_callbacks(lkb);
}
mutex_unlock(&lkb->lkb_cb_mutex);
castfn = lkb->lkb_astfn;
bastfn = lkb->lkb_bastfn;
for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
if (!callbacks[i].seq)
......@@ -269,78 +260,56 @@ static void process_asts(void)
}
}
/* removes ref for ast_queue, may cause lkb to be freed */
/* undo kref_get from dlm_add_callback, may cause lkb to be freed */
dlm_put_lkb(lkb);
cond_resched();
goto repeat;
}
spin_unlock(&ast_queue_lock);
}
static inline int no_asts(void)
int dlm_callback_start(struct dlm_ls *ls)
{
int ret;
spin_lock(&ast_queue_lock);
ret = list_empty(&ast_queue);
spin_unlock(&ast_queue_lock);
return ret;
}
static int dlm_astd(void *data)
{
while (!kthread_should_stop()) {
set_current_state(TASK_INTERRUPTIBLE);
if (!test_bit(WAKE_ASTS, &astd_wakeflags))
schedule();
set_current_state(TASK_RUNNING);
mutex_lock(&astd_running);
if (test_and_clear_bit(WAKE_ASTS, &astd_wakeflags))
process_asts();
mutex_unlock(&astd_running);
ls->ls_callback_wq = alloc_workqueue("dlm_callback",
WQ_UNBOUND |
WQ_MEM_RECLAIM |
WQ_NON_REENTRANT,
0);
if (!ls->ls_callback_wq) {
log_print("can't start dlm_callback workqueue");
return -ENOMEM;
}
return 0;
}
void dlm_astd_wake(void)
void dlm_callback_stop(struct dlm_ls *ls)
{
if (!no_asts()) {
set_bit(WAKE_ASTS, &astd_wakeflags);
wake_up_process(astd_task);
}
if (ls->ls_callback_wq)
destroy_workqueue(ls->ls_callback_wq);
}
int dlm_astd_start(void)
void dlm_callback_suspend(struct dlm_ls *ls)
{
struct task_struct *p;
int error = 0;
INIT_LIST_HEAD(&ast_queue);
spin_lock_init(&ast_queue_lock);
mutex_init(&astd_running);
p = kthread_run(dlm_astd, NULL, "dlm_astd");
if (IS_ERR(p))
error = PTR_ERR(p);
else
astd_task = p;
return error;
}
set_bit(LSFL_CB_DELAY, &ls->ls_flags);
void dlm_astd_stop(void)
{
kthread_stop(astd_task);
if (ls->ls_callback_wq)
flush_workqueue(ls->ls_callback_wq);
}
void dlm_astd_suspend(void)
void dlm_callback_resume(struct dlm_ls *ls)
{
mutex_lock(&astd_running);
}
struct dlm_lkb *lkb, *safe;
int count = 0;
void dlm_astd_resume(void)
{
mutex_unlock(&astd_running);
clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
if (!ls->ls_callback_wq)
return;
mutex_lock(&ls->ls_cb_mutex);
list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
list_del_init(&lkb->lkb_cb_list);
queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
count++;
}
mutex_unlock(&ls->ls_cb_mutex);
log_debug(ls, "dlm_callback_resume %d", count);
}
......@@ -18,14 +18,15 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags, uint64_t seq);
int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_callback *cb, int *resid);
void dlm_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
uint32_t sbflags);
void dlm_astd_wake(void);
int dlm_astd_start(void);
void dlm_astd_stop(void);
void dlm_astd_suspend(void);
void dlm_astd_resume(void);
void dlm_callback_work(struct work_struct *work);
int dlm_callback_start(struct dlm_ls *ls);
void dlm_callback_stop(struct dlm_ls *ls);
void dlm_callback_suspend(struct dlm_ls *ls);
void dlm_callback_resume(struct dlm_ls *ls);
#endif
......@@ -28,7 +28,8 @@
* /config/dlm/<cluster>/spaces/<space>/nodes/<node>/weight
* /config/dlm/<cluster>/comms/<comm>/nodeid
* /config/dlm/<cluster>/comms/<comm>/local
* /config/dlm/<cluster>/comms/<comm>/addr
* /config/dlm/<cluster>/comms/<comm>/addr (write only)
* /config/dlm/<cluster>/comms/<comm>/addr_list (read only)
* The <cluster> level is useless, but I haven't figured out how to avoid it.
*/
......@@ -80,6 +81,7 @@ static ssize_t comm_local_write(struct dlm_comm *cm, const char *buf,
size_t len);
static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf,
size_t len);
static ssize_t comm_addr_list_read(struct dlm_comm *cm, char *buf);
static ssize_t node_nodeid_read(struct dlm_node *nd, char *buf);
static ssize_t node_nodeid_write(struct dlm_node *nd, const char *buf,
size_t len);
......@@ -92,7 +94,6 @@ struct dlm_cluster {
unsigned int cl_tcp_port;
unsigned int cl_buffer_size;
unsigned int cl_rsbtbl_size;
unsigned int cl_lkbtbl_size;
unsigned int cl_dirtbl_size;
unsigned int cl_recover_timer;
unsigned int cl_toss_secs;
......@@ -101,13 +102,13 @@ struct dlm_cluster {
unsigned int cl_protocol;
unsigned int cl_timewarn_cs;
unsigned int cl_waitwarn_us;
unsigned int cl_new_rsb_count;
};
enum {
CLUSTER_ATTR_TCP_PORT = 0,
CLUSTER_ATTR_BUFFER_SIZE,
CLUSTER_ATTR_RSBTBL_SIZE,
CLUSTER_ATTR_LKBTBL_SIZE,
CLUSTER_ATTR_DIRTBL_SIZE,
CLUSTER_ATTR_RECOVER_TIMER,
CLUSTER_ATTR_TOSS_SECS,
......@@ -116,6 +117,7 @@ enum {
CLUSTER_ATTR_PROTOCOL,
CLUSTER_ATTR_TIMEWARN_CS,
CLUSTER_ATTR_WAITWARN_US,
CLUSTER_ATTR_NEW_RSB_COUNT,
};
struct cluster_attribute {
......@@ -160,7 +162,6 @@ __CONFIGFS_ATTR(name, 0644, name##_read, name##_write)
CLUSTER_ATTR(tcp_port, 1);
CLUSTER_ATTR(buffer_size, 1);
CLUSTER_ATTR(rsbtbl_size, 1);
CLUSTER_ATTR(lkbtbl_size, 1);
CLUSTER_ATTR(dirtbl_size, 1);
CLUSTER_ATTR(recover_timer, 1);
CLUSTER_ATTR(toss_secs, 1);
......@@ -169,12 +170,12 @@ CLUSTER_ATTR(log_debug, 0);
CLUSTER_ATTR(protocol, 0);
CLUSTER_ATTR(timewarn_cs, 1);
CLUSTER_ATTR(waitwarn_us, 0);
CLUSTER_ATTR(new_rsb_count, 0);
static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
[CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr,
[CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr,
[CLUSTER_ATTR_LKBTBL_SIZE] = &cluster_attr_lkbtbl_size.attr,
[CLUSTER_ATTR_DIRTBL_SIZE] = &cluster_attr_dirtbl_size.attr,
[CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr,
[CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr,
......@@ -183,6 +184,7 @@ static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr,
[CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs.attr,
[CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us.attr,
[CLUSTER_ATTR_NEW_RSB_COUNT] = &cluster_attr_new_rsb_count.attr,
NULL,
};
......@@ -190,6 +192,7 @@ enum {
COMM_ATTR_NODEID = 0,
COMM_ATTR_LOCAL,
COMM_ATTR_ADDR,
COMM_ATTR_ADDR_LIST,
};
struct comm_attribute {
......@@ -217,14 +220,22 @@ static struct comm_attribute comm_attr_local = {
static struct comm_attribute comm_attr_addr = {
.attr = { .ca_owner = THIS_MODULE,
.ca_name = "addr",
.ca_mode = S_IRUGO | S_IWUSR },
.ca_mode = S_IWUSR },
.store = comm_addr_write,
};
static struct comm_attribute comm_attr_addr_list = {
.attr = { .ca_owner = THIS_MODULE,
.ca_name = "addr_list",
.ca_mode = S_IRUGO },
.show = comm_addr_list_read,
};
static struct configfs_attribute *comm_attrs[] = {
[COMM_ATTR_NODEID] = &comm_attr_nodeid.attr,
[COMM_ATTR_LOCAL] = &comm_attr_local.attr,
[COMM_ATTR_ADDR] = &comm_attr_addr.attr,
[COMM_ATTR_ADDR_LIST] = &comm_attr_addr_list.attr,
NULL,
};
......@@ -435,7 +446,6 @@ static struct config_group *make_cluster(struct config_group *g,
cl->cl_tcp_port = dlm_config.ci_tcp_port;
cl->cl_buffer_size = dlm_config.ci_buffer_size;
cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size;
cl->cl_lkbtbl_size = dlm_config.ci_lkbtbl_size;
cl->cl_dirtbl_size = dlm_config.ci_dirtbl_size;
cl->cl_recover_timer = dlm_config.ci_recover_timer;
cl->cl_toss_secs = dlm_config.ci_toss_secs;
......@@ -444,6 +454,7 @@ static struct config_group *make_cluster(struct config_group *g,
cl->cl_protocol = dlm_config.ci_protocol;
cl->cl_timewarn_cs = dlm_config.ci_timewarn_cs;
cl->cl_waitwarn_us = dlm_config.ci_waitwarn_us;
cl->cl_new_rsb_count = dlm_config.ci_new_rsb_count;
space_list = &sps->ss_group;
comm_list = &cms->cs_group;
......@@ -720,6 +731,50 @@ static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, size_t len)
return len;
}
static ssize_t comm_addr_list_read(struct dlm_comm *cm, char *buf)
{
ssize_t s;
ssize_t allowance;
int i;
struct sockaddr_storage *addr;
struct sockaddr_in *addr_in;
struct sockaddr_in6 *addr_in6;
/* Taken from ip6_addr_string() defined in lib/vsprintf.c */
char buf0[sizeof("AF_INET6 xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:255.255.255.255\n")];
/* Derived from SIMPLE_ATTR_SIZE of fs/configfs/file.c */
allowance = 4096;
buf[0] = '\0';
for (i = 0; i < cm->addr_count; i++) {
addr = cm->addr[i];
switch(addr->ss_family) {
case AF_INET:
addr_in = (struct sockaddr_in *)addr;
s = sprintf(buf0, "AF_INET %pI4\n", &addr_in->sin_addr.s_addr);
break;
case AF_INET6:
addr_in6 = (struct sockaddr_in6 *)addr;
s = sprintf(buf0, "AF_INET6 %pI6\n", &addr_in6->sin6_addr);
break;
default:
s = sprintf(buf0, "%s\n", "<UNKNOWN>");
break;
}
allowance -= s;
if (allowance >= 0)
strcat(buf, buf0);
else {
allowance += s;
break;
}
}
return 4096 - allowance;
}
static ssize_t show_node(struct config_item *i, struct configfs_attribute *a,
char *buf)
{
......@@ -983,7 +1038,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
#define DEFAULT_TCP_PORT 21064
#define DEFAULT_BUFFER_SIZE 4096
#define DEFAULT_RSBTBL_SIZE 1024
#define DEFAULT_LKBTBL_SIZE 1024
#define DEFAULT_DIRTBL_SIZE 1024
#define DEFAULT_RECOVER_TIMER 5
#define DEFAULT_TOSS_SECS 10
......@@ -992,12 +1046,12 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
#define DEFAULT_PROTOCOL 0
#define DEFAULT_TIMEWARN_CS 500 /* 5 sec = 500 centiseconds */
#define DEFAULT_WAITWARN_US 0
#define DEFAULT_NEW_RSB_COUNT 128
struct dlm_config_info dlm_config = {
.ci_tcp_port = DEFAULT_TCP_PORT,
.ci_buffer_size = DEFAULT_BUFFER_SIZE,
.ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE,
.ci_lkbtbl_size = DEFAULT_LKBTBL_SIZE,
.ci_dirtbl_size = DEFAULT_DIRTBL_SIZE,
.ci_recover_timer = DEFAULT_RECOVER_TIMER,
.ci_toss_secs = DEFAULT_TOSS_SECS,
......@@ -1005,6 +1059,7 @@ struct dlm_config_info dlm_config = {
.ci_log_debug = DEFAULT_LOG_DEBUG,
.ci_protocol = DEFAULT_PROTOCOL,
.ci_timewarn_cs = DEFAULT_TIMEWARN_CS,
.ci_waitwarn_us = DEFAULT_WAITWARN_US
.ci_waitwarn_us = DEFAULT_WAITWARN_US,
.ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT
};
......@@ -20,7 +20,6 @@ struct dlm_config_info {
int ci_tcp_port;
int ci_buffer_size;
int ci_rsbtbl_size;
int ci_lkbtbl_size;
int ci_dirtbl_size;
int ci_recover_timer;
int ci_toss_secs;
......@@ -29,6 +28,7 @@ struct dlm_config_info {
int ci_protocol;
int ci_timewarn_cs;
int ci_waitwarn_us;
int ci_new_rsb_count;
};
extern struct dlm_config_info dlm_config;
......
......@@ -37,6 +37,7 @@
#include <linux/jhash.h>
#include <linux/miscdevice.h>
#include <linux/mutex.h>
#include <linux/idr.h>
#include <asm/uaccess.h>
#include <linux/dlm.h>
......@@ -52,7 +53,6 @@ struct dlm_ls;
struct dlm_lkb;
struct dlm_rsb;
struct dlm_member;
struct dlm_lkbtable;
struct dlm_rsbtable;
struct dlm_dirtable;
struct dlm_direntry;
......@@ -108,11 +108,6 @@ struct dlm_rsbtable {
spinlock_t lock;
};
struct dlm_lkbtable {
struct list_head list;
rwlock_t lock;
uint16_t counter;
};
/*
* Lockspace member (per node in a ls)
......@@ -248,17 +243,18 @@ struct dlm_lkb {
int8_t lkb_wait_count;
int lkb_wait_nodeid; /* for debugging */
struct list_head lkb_idtbl_list; /* lockspace lkbtbl */
struct list_head lkb_statequeue; /* rsb g/c/w list */
struct list_head lkb_rsb_lookup; /* waiting for rsb lookup */
struct list_head lkb_wait_reply; /* waiting for remote reply */
struct list_head lkb_astqueue; /* need ast to be sent */
struct list_head lkb_ownqueue; /* list of locks for a process */
struct list_head lkb_time_list;
ktime_t lkb_timestamp;
ktime_t lkb_wait_time;
unsigned long lkb_timeout_cs;
struct mutex lkb_cb_mutex;
struct work_struct lkb_cb_work;
struct list_head lkb_cb_list; /* for ls_cb_delay or proc->asts */
struct dlm_callback lkb_callbacks[DLM_CALLBACKS_SIZE];
struct dlm_callback lkb_last_cast;
struct dlm_callback lkb_last_bast;
......@@ -299,7 +295,7 @@ struct dlm_rsb {
int res_recover_locks_count;
char *res_lvbptr;
char res_name[1];
char res_name[DLM_RESNAME_MAXLEN+1];
};
/* find_rsb() flags */
......@@ -465,12 +461,12 @@ struct dlm_ls {
unsigned long ls_scan_time;
struct kobject ls_kobj;
struct idr ls_lkbidr;
spinlock_t ls_lkbidr_spin;
struct dlm_rsbtable *ls_rsbtbl;
uint32_t ls_rsbtbl_size;
struct dlm_lkbtable *ls_lkbtbl;
uint32_t ls_lkbtbl_size;
struct dlm_dirtable *ls_dirtbl;
uint32_t ls_dirtbl_size;
......@@ -483,6 +479,10 @@ struct dlm_ls {
struct mutex ls_timeout_mutex;
struct list_head ls_timeout;
spinlock_t ls_new_rsb_spin;
int ls_new_rsb_count;
struct list_head ls_new_rsb; /* new rsb structs */
struct list_head ls_nodes; /* current nodes in ls */
struct list_head ls_nodes_gone; /* dead node list, recovery */
int ls_num_nodes; /* number of nodes in ls */
......@@ -506,8 +506,12 @@ struct dlm_ls {
struct miscdevice ls_device;
struct workqueue_struct *ls_callback_wq;
/* recovery related */
struct mutex ls_cb_mutex;
struct list_head ls_cb_delay; /* save for queue_work later */
struct timer_list ls_timer;
struct task_struct *ls_recoverd_task;
struct mutex ls_recoverd_active;
......@@ -544,6 +548,7 @@ struct dlm_ls {
#define LSFL_RCOM_WAIT 4
#define LSFL_UEVENT_WAIT 5
#define LSFL_TIMEWARN 6
#define LSFL_CB_DELAY 7
/* much of this is just saving user space pointers associated with the
lock that we pass back to the user lib with an ast */
......
......@@ -305,7 +305,7 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
rv = -EDEADLK;
}
dlm_add_ast(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
}
static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
......@@ -319,7 +319,7 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
if (is_master_copy(lkb)) {
send_bast(r, lkb, rqmode);
} else {
dlm_add_ast(lkb, DLM_CB_BAST, rqmode, 0, 0);
dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
}
}
......@@ -327,19 +327,68 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
* Basic operations on rsb's and lkb's
*/
static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
static int pre_rsb_struct(struct dlm_ls *ls)
{
struct dlm_rsb *r1, *r2;
int count = 0;
spin_lock(&ls->ls_new_rsb_spin);
if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
spin_unlock(&ls->ls_new_rsb_spin);
return 0;
}
spin_unlock(&ls->ls_new_rsb_spin);
r1 = dlm_allocate_rsb(ls);
r2 = dlm_allocate_rsb(ls);
spin_lock(&ls->ls_new_rsb_spin);
if (r1) {
list_add(&r1->res_hashchain, &ls->ls_new_rsb);
ls->ls_new_rsb_count++;
}
if (r2) {
list_add(&r2->res_hashchain, &ls->ls_new_rsb);
ls->ls_new_rsb_count++;
}
count = ls->ls_new_rsb_count;
spin_unlock(&ls->ls_new_rsb_spin);
if (!count)
return -ENOMEM;
return 0;
}
/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
unlock any spinlocks, go back and call pre_rsb_struct again.
Otherwise, take an rsb off the list and return it. */
static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
struct dlm_rsb **r_ret)
{
struct dlm_rsb *r;
int count;
r = dlm_allocate_rsb(ls, len);
if (!r)
return NULL;
spin_lock(&ls->ls_new_rsb_spin);
if (list_empty(&ls->ls_new_rsb)) {
count = ls->ls_new_rsb_count;
spin_unlock(&ls->ls_new_rsb_spin);
log_debug(ls, "find_rsb retry %d %d %s",
count, dlm_config.ci_new_rsb_count, name);
return -EAGAIN;
}
r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
list_del(&r->res_hashchain);
ls->ls_new_rsb_count--;
spin_unlock(&ls->ls_new_rsb_spin);
r->res_ls = ls;
r->res_length = len;
memcpy(r->res_name, name, len);
mutex_init(&r->res_mutex);
INIT_LIST_HEAD(&r->res_hashchain);
INIT_LIST_HEAD(&r->res_lookup);
INIT_LIST_HEAD(&r->res_grantqueue);
INIT_LIST_HEAD(&r->res_convertqueue);
......@@ -347,7 +396,8 @@ static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
INIT_LIST_HEAD(&r->res_root_list);
INIT_LIST_HEAD(&r->res_recover_list);
return r;
*r_ret = r;
return 0;
}
static int search_rsb_list(struct list_head *head, char *name, int len,
......@@ -405,16 +455,6 @@ static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
return error;
}
static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
unsigned int flags, struct dlm_rsb **r_ret)
{
int error;
spin_lock(&ls->ls_rsbtbl[b].lock);
error = _search_rsb(ls, name, len, b, flags, r_ret);
spin_unlock(&ls->ls_rsbtbl[b].lock);
return error;
}
/*
* Find rsb in rsbtbl and potentially create/add one
*
......@@ -432,35 +472,48 @@ static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
unsigned int flags, struct dlm_rsb **r_ret)
{
struct dlm_rsb *r = NULL, *tmp;
struct dlm_rsb *r = NULL;
uint32_t hash, bucket;
int error = -EINVAL;
int error;
if (namelen > DLM_RESNAME_MAXLEN)
if (namelen > DLM_RESNAME_MAXLEN) {
error = -EINVAL;
goto out;
}
if (dlm_no_directory(ls))
flags |= R_CREATE;
error = 0;
hash = jhash(name, namelen, 0);
bucket = hash & (ls->ls_rsbtbl_size - 1);
error = search_rsb(ls, name, namelen, bucket, flags, &r);
if (!error)
retry:
if (flags & R_CREATE) {
error = pre_rsb_struct(ls);
if (error < 0)
goto out;
}
spin_lock(&ls->ls_rsbtbl[bucket].lock);
error = _search_rsb(ls, name, namelen, bucket, flags, &r);
if (!error)
goto out_unlock;
if (error == -EBADR && !(flags & R_CREATE))
goto out;
goto out_unlock;
/* the rsb was found but wasn't a master copy */
if (error == -ENOTBLK)
goto out;
goto out_unlock;
error = -ENOMEM;
r = create_rsb(ls, name, namelen);
if (!r)
goto out;
error = get_rsb_struct(ls, name, namelen, &r);
if (error == -EAGAIN) {
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
goto retry;
}
if (error)
goto out_unlock;
r->res_hash = hash;
r->res_bucket = bucket;
......@@ -474,18 +527,10 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
nodeid = 0;
r->res_nodeid = nodeid;
}
spin_lock(&ls->ls_rsbtbl[bucket].lock);
error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
if (!error) {
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
dlm_free_rsb(r);
r = tmp;
goto out;
}
list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
error = 0;
out_unlock:
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
out:
*r_ret = r;
return error;
......@@ -580,9 +625,8 @@ static void detach_lkb(struct dlm_lkb *lkb)
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
struct dlm_lkb *lkb, *tmp;
uint32_t lkid = 0;
uint16_t bucket;
struct dlm_lkb *lkb;
int rv, id;
lkb = dlm_allocate_lkb(ls);
if (!lkb)
......@@ -594,60 +638,42 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
INIT_LIST_HEAD(&lkb->lkb_ownqueue);
INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
INIT_LIST_HEAD(&lkb->lkb_time_list);
INIT_LIST_HEAD(&lkb->lkb_astqueue);
get_random_bytes(&bucket, sizeof(bucket));
bucket &= (ls->ls_lkbtbl_size - 1);
INIT_LIST_HEAD(&lkb->lkb_cb_list);
mutex_init(&lkb->lkb_cb_mutex);
INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
write_lock(&ls->ls_lkbtbl[bucket].lock);
retry:
rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
if (!rv)
return -ENOMEM;
/* counter can roll over so we must verify lkid is not in use */
spin_lock(&ls->ls_lkbidr_spin);
rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
if (!rv)
lkb->lkb_id = id;
spin_unlock(&ls->ls_lkbidr_spin);
while (lkid == 0) {
lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
if (rv == -EAGAIN)
goto retry;
list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
lkb_idtbl_list) {
if (tmp->lkb_id != lkid)
continue;
lkid = 0;
break;
}
if (rv < 0) {
log_error(ls, "create_lkb idr error %d", rv);
return rv;
}
lkb->lkb_id = lkid;
list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
write_unlock(&ls->ls_lkbtbl[bucket].lock);
*lkb_ret = lkb;
return 0;
}
static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
struct dlm_lkb *lkb;
uint16_t bucket = (lkid >> 16);
list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
if (lkb->lkb_id == lkid)
return lkb;
}
return NULL;
}
static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
struct dlm_lkb *lkb;
uint16_t bucket = (lkid >> 16);
if (bucket >= ls->ls_lkbtbl_size)
return -EBADSLT;
read_lock(&ls->ls_lkbtbl[bucket].lock);
lkb = __find_lkb(ls, lkid);
spin_lock(&ls->ls_lkbidr_spin);
lkb = idr_find(&ls->ls_lkbidr, lkid);
if (lkb)
kref_get(&lkb->lkb_ref);
read_unlock(&ls->ls_lkbtbl[bucket].lock);
spin_unlock(&ls->ls_lkbidr_spin);
*lkb_ret = lkb;
return lkb ? 0 : -ENOENT;
......@@ -668,12 +694,12 @@ static void kill_lkb(struct kref *kref)
static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
uint16_t bucket = (lkb->lkb_id >> 16);
uint32_t lkid = lkb->lkb_id;
write_lock(&ls->ls_lkbtbl[bucket].lock);
spin_lock(&ls->ls_lkbidr_spin);
if (kref_put(&lkb->lkb_ref, kill_lkb)) {
list_del(&lkb->lkb_idtbl_list);
write_unlock(&ls->ls_lkbtbl[bucket].lock);
idr_remove(&ls->ls_lkbidr, lkid);
spin_unlock(&ls->ls_lkbidr_spin);
detach_lkb(lkb);
......@@ -683,7 +709,7 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
dlm_free_lkb(lkb);
return 1;
} else {
write_unlock(&ls->ls_lkbtbl[bucket].lock);
spin_unlock(&ls->ls_lkbidr_spin);
return 0;
}
}
......@@ -849,9 +875,7 @@ void dlm_scan_waiters(struct dlm_ls *ls)
if (!num_nodes) {
num_nodes = ls->ls_num_nodes;
warned = kmalloc(GFP_KERNEL, num_nodes * sizeof(int));
if (warned)
memset(warned, 0, num_nodes * sizeof(int));
warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
}
if (!warned)
continue;
......@@ -863,8 +887,6 @@ void dlm_scan_waiters(struct dlm_ls *ls)
dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
}
mutex_unlock(&ls->ls_waiters_mutex);
if (warned)
kfree(warned);
if (debug_expired)
......@@ -2401,9 +2423,6 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
if (deadlk) {
/* it's left on the granted queue */
log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
revert_lock(r, lkb);
queue_cast(r, lkb, -EDEADLK);
error = -EDEADLK;
......@@ -3993,8 +4012,6 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
default:
log_error(ls, "unknown message type %d", ms->m_type);
}
dlm_astd_wake();
}
/* If the lockspace is in recovery mode (locking stopped), then normal
......@@ -4133,7 +4150,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
struct dlm_message *ms_stub;
int wait_type, stub_unlock_result, stub_cancel_result;
ms_stub = kmalloc(GFP_KERNEL, sizeof(struct dlm_message));
ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
if (!ms_stub) {
log_error(ls, "dlm_recover_waiters_pre no mem");
return;
......@@ -4809,7 +4826,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
goto out_put;
spin_lock(&ua->proc->locks_spin);
/* dlm_user_add_ast() may have already taken lkb off the proc list */
/* dlm_user_add_cb() may have already taken lkb off the proc list */
if (!list_empty(&lkb->lkb_ownqueue))
list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
spin_unlock(&ua->proc->locks_spin);
......@@ -4946,7 +4963,7 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
(which does lock_rsb) due to deadlock with receiving a message that does
lock_rsb followed by dlm_user_add_ast() */
lock_rsb followed by dlm_user_add_cb() */
static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
struct dlm_user_proc *proc)
......@@ -4969,7 +4986,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
return lkb;
}
/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
which we clear here. */
......@@ -5011,10 +5028,10 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
dlm_put_lkb(lkb);
}
list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
memset(&lkb->lkb_callbacks, 0,
sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
list_del_init(&lkb->lkb_astqueue);
list_del_init(&lkb->lkb_cb_list);
dlm_put_lkb(lkb);
}
......@@ -5053,10 +5070,10 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
spin_unlock(&proc->locks_spin);
spin_lock(&proc->asts_spin);
list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
memset(&lkb->lkb_callbacks, 0,
sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
list_del_init(&lkb->lkb_astqueue);
list_del_init(&lkb->lkb_cb_list);
dlm_put_lkb(lkb);
}
spin_unlock(&proc->asts_spin);
......
......@@ -15,7 +15,6 @@
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "ast.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
......@@ -24,6 +23,7 @@
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"
static int ls_count;
static struct mutex ls_lock;
......@@ -359,17 +359,10 @@ static int threads_start(void)
{
int error;
/* Thread which process lock requests for all lockspace's */
error = dlm_astd_start();
if (error) {
log_print("cannot start dlm_astd thread %d", error);
goto fail;
}
error = dlm_scand_start();
if (error) {
log_print("cannot start dlm_scand thread %d", error);
goto astd_fail;
goto fail;
}
/* Thread for sending/receiving messages for all lockspace's */
......@@ -383,8 +376,6 @@ static int threads_start(void)
scand_fail:
dlm_scand_stop();
astd_fail:
dlm_astd_stop();
fail:
return error;
}
......@@ -393,7 +384,6 @@ static void threads_stop(void)
{
dlm_scand_stop();
dlm_lowcomms_stop();
dlm_astd_stop();
}
static int new_lockspace(const char *name, int namelen, void **lockspace,
......@@ -463,7 +453,7 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
size = dlm_config.ci_rsbtbl_size;
ls->ls_rsbtbl_size = size;
ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_NOFS);
ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
if (!ls->ls_rsbtbl)
goto out_lsfree;
for (i = 0; i < size; i++) {
......@@ -472,22 +462,13 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
spin_lock_init(&ls->ls_rsbtbl[i].lock);
}
size = dlm_config.ci_lkbtbl_size;
ls->ls_lkbtbl_size = size;
ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_NOFS);
if (!ls->ls_lkbtbl)
goto out_rsbfree;
for (i = 0; i < size; i++) {
INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
rwlock_init(&ls->ls_lkbtbl[i].lock);
ls->ls_lkbtbl[i].counter = 1;
}
idr_init(&ls->ls_lkbidr);
spin_lock_init(&ls->ls_lkbidr_spin);
size = dlm_config.ci_dirtbl_size;
ls->ls_dirtbl_size = size;
ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_NOFS);
ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size);
if (!ls->ls_dirtbl)
goto out_lkbfree;
for (i = 0; i < size; i++) {
......@@ -502,6 +483,9 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
INIT_LIST_HEAD(&ls->ls_timeout);
mutex_init(&ls->ls_timeout_mutex);
INIT_LIST_HEAD(&ls->ls_new_rsb);
spin_lock_init(&ls->ls_new_rsb_spin);
INIT_LIST_HEAD(&ls->ls_nodes);
INIT_LIST_HEAD(&ls->ls_nodes_gone);
ls->ls_num_nodes = 0;
......@@ -520,6 +504,9 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
init_completion(&ls->ls_members_done);
ls->ls_members_result = -1;
mutex_init(&ls->ls_cb_mutex);
INIT_LIST_HEAD(&ls->ls_cb_delay);
ls->ls_recoverd_task = NULL;
mutex_init(&ls->ls_recoverd_active);
spin_lock_init(&ls->ls_recover_lock);
......@@ -553,18 +540,26 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
list_add(&ls->ls_list, &lslist);
spin_unlock(&lslist_lock);
if (flags & DLM_LSFL_FS) {
error = dlm_callback_start(ls);
if (error) {
log_error(ls, "can't start dlm_callback %d", error);
goto out_delist;
}
}
/* needs to find ls in lslist */
error = dlm_recoverd_start(ls);
if (error) {
log_error(ls, "can't start dlm_recoverd %d", error);
goto out_delist;
goto out_callback;
}
ls->ls_kobj.kset = dlm_kset;
error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
"%s", ls->ls_name);
if (error)
goto out_stop;
goto out_recoverd;
kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
/* let kobject handle freeing of ls if there's an error */
......@@ -578,7 +573,7 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
error = do_uevent(ls, 1);
if (error)
goto out_stop;
goto out_recoverd;
wait_for_completion(&ls->ls_members_done);
error = ls->ls_members_result;
......@@ -595,19 +590,20 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
do_uevent(ls, 0);
dlm_clear_members(ls);
kfree(ls->ls_node_array);
out_stop:
out_recoverd:
dlm_recoverd_stop(ls);
out_callback:
dlm_callback_stop(ls);
out_delist:
spin_lock(&lslist_lock);
list_del(&ls->ls_list);
spin_unlock(&lslist_lock);
kfree(ls->ls_recover_buf);
out_dirfree:
kfree(ls->ls_dirtbl);
vfree(ls->ls_dirtbl);
out_lkbfree:
kfree(ls->ls_lkbtbl);
out_rsbfree:
kfree(ls->ls_rsbtbl);
idr_destroy(&ls->ls_lkbidr);
vfree(ls->ls_rsbtbl);
out_lsfree:
if (do_unreg)
kobject_put(&ls->ls_kobj);
......@@ -641,50 +637,64 @@ int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
return error;
}
/* Return 1 if the lockspace still has active remote locks,
* 2 if the lockspace still has active local locks.
*/
static int lockspace_busy(struct dlm_ls *ls)
static int lkb_idr_is_local(int id, void *p, void *data)
{
int i, lkb_found = 0;
struct dlm_lkb *lkb;
struct dlm_lkb *lkb = p;
/* NOTE: We check the lockidtbl here rather than the resource table.
This is because there may be LKBs queued as ASTs that have been
unlinked from their RSBs and are pending deletion once the AST has
been delivered */
if (!lkb->lkb_nodeid)
return 1;
return 0;
}
for (i = 0; i < ls->ls_lkbtbl_size; i++) {
read_lock(&ls->ls_lkbtbl[i].lock);
if (!list_empty(&ls->ls_lkbtbl[i].list)) {
lkb_found = 1;
list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
lkb_idtbl_list) {
if (!lkb->lkb_nodeid) {
read_unlock(&ls->ls_lkbtbl[i].lock);
return 2;
}
}
}
read_unlock(&ls->ls_lkbtbl[i].lock);
static int lkb_idr_is_any(int id, void *p, void *data)
{
return 1;
}
static int lkb_idr_free(int id, void *p, void *data)
{
struct dlm_lkb *lkb = p;
if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
dlm_free_lvb(lkb->lkb_lvbptr);
dlm_free_lkb(lkb);
return 0;
}
/* NOTE: We check the lkbidr here rather than the resource table.
This is because there may be LKBs queued as ASTs that have been unlinked
from their RSBs and are pending deletion once the AST has been delivered */
static int lockspace_busy(struct dlm_ls *ls, int force)
{
int rv;
spin_lock(&ls->ls_lkbidr_spin);
if (force == 0) {
rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
} else if (force == 1) {
rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
} else {
rv = 0;
}
return lkb_found;
spin_unlock(&ls->ls_lkbidr_spin);
return rv;
}
static int release_lockspace(struct dlm_ls *ls, int force)
{
struct dlm_lkb *lkb;
struct dlm_rsb *rsb;
struct list_head *head;
int i, busy, rv;
busy = lockspace_busy(ls);
busy = lockspace_busy(ls, force);
spin_lock(&lslist_lock);
if (ls->ls_create_count == 1) {
if (busy > force)
if (busy) {
rv = -EBUSY;
else {
} else {
/* remove_lockspace takes ls off lslist */
ls->ls_create_count = 0;
rv = 0;
......@@ -708,12 +718,12 @@ static int release_lockspace(struct dlm_ls *ls, int force)
dlm_recoverd_stop(ls);
dlm_callback_stop(ls);
remove_lockspace(ls);
dlm_delete_debug_file(ls);
dlm_astd_suspend();
kfree(ls->ls_recover_buf);
/*
......@@ -721,31 +731,15 @@ static int release_lockspace(struct dlm_ls *ls, int force)
*/
dlm_dir_clear(ls);
kfree(ls->ls_dirtbl);
vfree(ls->ls_dirtbl);
/*
* Free all lkb's on lkbtbl[] lists.
* Free all lkb's in idr
*/
for (i = 0; i < ls->ls_lkbtbl_size; i++) {
head = &ls->ls_lkbtbl[i].list;
while (!list_empty(head)) {
lkb = list_entry(head->next, struct dlm_lkb,
lkb_idtbl_list);
list_del(&lkb->lkb_idtbl_list);
dlm_del_ast(lkb);
if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
dlm_free_lvb(lkb->lkb_lvbptr);
dlm_free_lkb(lkb);
}
}
dlm_astd_resume();
kfree(ls->ls_lkbtbl);
idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
idr_remove_all(&ls->ls_lkbidr);
idr_destroy(&ls->ls_lkbidr);
/*
* Free all rsb's on rsbtbl[] lists
......@@ -770,7 +764,14 @@ static int release_lockspace(struct dlm_ls *ls, int force)
}
}
kfree(ls->ls_rsbtbl);
vfree(ls->ls_rsbtbl);
while (!list_empty(&ls->ls_new_rsb)) {
rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
res_hashchain);
list_del(&rsb->res_hashchain);
dlm_free_rsb(rsb);
}
/*
* Free structures on any other lists
......
......@@ -512,12 +512,10 @@ static void process_sctp_notification(struct connection *con,
}
make_sockaddr(&prim.ssp_addr, 0, &addr_len);
if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
int i;
unsigned char *b=(unsigned char *)&prim.ssp_addr;
log_print("reject connect from unknown addr");
for (i=0; i<sizeof(struct sockaddr_storage);i++)
printk("%02x ", b[i]);
printk("\n");
print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
b, sizeof(struct sockaddr_storage));
sctp_send_shutdown(prim.ssp_assoc_id);
return;
}
......@@ -748,7 +746,10 @@ static int tcp_accept_from_sock(struct connection *con)
/* Get the new node's NODEID */
make_sockaddr(&peeraddr, 0, &len);
if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
unsigned char *b=(unsigned char *)&peeraddr;
log_print("connect from non cluster node");
print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
b, sizeof(struct sockaddr_storage));
sock_release(newsock);
mutex_unlock(&con->sock_mutex);
return -1;
......
......@@ -16,6 +16,7 @@
#include "memory.h"
static struct kmem_cache *lkb_cache;
static struct kmem_cache *rsb_cache;
int __init dlm_memory_init(void)
......@@ -26,6 +27,14 @@ int __init dlm_memory_init(void)
__alignof__(struct dlm_lkb), 0, NULL);
if (!lkb_cache)
ret = -ENOMEM;
rsb_cache = kmem_cache_create("dlm_rsb", sizeof(struct dlm_rsb),
__alignof__(struct dlm_rsb), 0, NULL);
if (!rsb_cache) {
kmem_cache_destroy(lkb_cache);
ret = -ENOMEM;
}
return ret;
}
......@@ -33,6 +42,8 @@ void dlm_memory_exit(void)
{
if (lkb_cache)
kmem_cache_destroy(lkb_cache);
if (rsb_cache)
kmem_cache_destroy(rsb_cache);
}
char *dlm_allocate_lvb(struct dlm_ls *ls)
......@@ -48,16 +59,11 @@ void dlm_free_lvb(char *p)
kfree(p);
}
/* FIXME: have some minimal space built-in to rsb for the name and
kmalloc a separate name if needed, like dentries are done */
struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls, int namelen)
struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls)
{
struct dlm_rsb *r;
DLM_ASSERT(namelen <= DLM_RESNAME_MAXLEN,);
r = kzalloc(sizeof(*r) + namelen, GFP_NOFS);
r = kmem_cache_zalloc(rsb_cache, GFP_NOFS);
return r;
}
......@@ -65,7 +71,7 @@ void dlm_free_rsb(struct dlm_rsb *r)
{
if (r->res_lvbptr)
dlm_free_lvb(r->res_lvbptr);
kfree(r);
kmem_cache_free(rsb_cache, r);
}
struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls)
......
......@@ -16,7 +16,7 @@
int dlm_memory_init(void);
void dlm_memory_exit(void);
struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls, int namelen);
struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls);
void dlm_free_rsb(struct dlm_rsb *r);
struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls);
void dlm_free_lkb(struct dlm_lkb *l);
......
......@@ -58,13 +58,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
mutex_lock(&ls->ls_recoverd_active);
/*
* Suspending and resuming dlm_astd ensures that no lkb's from this ls
* will be processed by dlm_astd during recovery.
*/
dlm_astd_suspend();
dlm_astd_resume();
dlm_callback_suspend(ls);
/*
* Free non-master tossed rsb's. Master rsb's are kept on toss
......@@ -202,6 +196,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_adjust_timeouts(ls);
dlm_callback_resume(ls);
error = enable_locking(ls, rv->seq);
if (error) {
log_debug(ls, "enable_locking failed %d", error);
......@@ -222,8 +218,6 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_grant_after_purge(ls);
dlm_astd_wake();
log_debug(ls, "recover %llx done: %u ms",
(unsigned long long)rv->seq,
jiffies_to_msecs(jiffies - start));
......
......@@ -213,9 +213,9 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
goto out;
}
if (list_empty(&lkb->lkb_astqueue)) {
if (list_empty(&lkb->lkb_cb_list)) {
kref_get(&lkb->lkb_ref);
list_add_tail(&lkb->lkb_astqueue, &proc->asts);
list_add_tail(&lkb->lkb_cb_list, &proc->asts);
wake_up_interruptible(&proc->wait);
}
spin_unlock(&proc->asts_spin);
......@@ -832,24 +832,24 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
}
/* if we empty lkb_callbacks, we don't want to unlock the spinlock
without removing lkb_astqueue; so empty lkb_astqueue is always
without removing lkb_cb_list; so empty lkb_cb_list is always
consistent with empty lkb_callbacks */
lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_astqueue);
lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_cb_list);
rv = dlm_rem_lkb_callback(lkb->lkb_resource->res_ls, lkb, &cb, &resid);
if (rv < 0) {
/* this shouldn't happen; lkb should have been removed from
list when resid was zero */
log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id);
list_del_init(&lkb->lkb_astqueue);
list_del_init(&lkb->lkb_cb_list);
spin_unlock(&proc->asts_spin);
/* removes ref for proc->asts, may cause lkb to be freed */
dlm_put_lkb(lkb);
goto try_another;
}
if (!resid)
list_del_init(&lkb->lkb_astqueue);
list_del_init(&lkb->lkb_cb_list);
spin_unlock(&proc->asts_spin);
if (cb.flags & DLM_CB_SKIP) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment