Commit 6fffab66 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'dlm-6.10' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:
 "This set includes some small fixes, and some big internal changes:

   - Fix a long standing race between the unlock callback for the last
     lkb struct, and removing the rsb that became unused after the final
     unlock. This could lead different nodes to inconsistent info about
     the rsb master node.

   - Remove unnecessary refcounting on callback structs, returning to
     the way things were done in the past.

   - Do message processing in softirq context. This allows dlm messages
     to be cleared more quickly and efficiently, reducing long lists of
     incomplete requests. A future change to run callbacks directly from
     this context will make this more effective.

   - The softirq message processing involved a number of patches
     changing mutexes to spinlocks and rwlocks, and a fair amount of
     code re-org in preparation.

   - Use an rhashtable for rsb structs, rather than our old internal
     hash table implementation. This also required some re-org of lists
     and locks preparation for the change.

   - Drop the dlm_scand kthread, and use timers to clear unused rsb
     structs. Scanning all rsb's periodically was a lot of wasted work.

   - Fix recent regression in logic for copying LVB data in user space
     lock requests"

* tag 'dlm-6.10' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm: (34 commits)
  dlm: return -ENOMEM if ls_recover_buf fails
  dlm: fix sleep in atomic context
  dlm: use rwlock for lkbidr
  dlm: use rwlock for rsb hash table
  dlm: drop dlm_scand kthread and use timers
  dlm: do not use ref counts for rsb in the toss state
  dlm: switch to use rhashtable for rsbs
  dlm: add rsb lists for iteration
  dlm: merge toss and keep hash table lists into one list
  dlm: change to single hashtable lock
  dlm: increment ls_count for dlm_scand
  dlm: do message processing in softirq context
  dlm: use spin_lock_bh for message processing
  dlm: remove schedule in receive path
  dlm: convert ls_recv_active from rw_semaphore to rwlock
  dlm: avoid blocking receive at the end of recovery
  dlm: convert res_lock to spinlock
  dlm: convert ls_waiters_mutex to spinlock
  dlm: drop mutex use in waiters recovery
  dlm: add new struct to save position in dlm_copy_master_names
  ...
parents a3d1f54d 7b72ab2c
......@@ -12,47 +12,50 @@
#include <trace/events/dlm.h>
#include "dlm_internal.h"
#include "lvb_table.h"
#include "memory.h"
#include "lock.h"
#include "user.h"
#include "ast.h"
void dlm_release_callback(struct kref *ref)
static void dlm_callback_work(struct work_struct *work)
{
struct dlm_callback *cb = container_of(ref, struct dlm_callback, ref);
struct dlm_callback *cb = container_of(work, struct dlm_callback, work);
if (cb->flags & DLM_CB_BAST) {
trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name,
cb->res_length);
cb->bastfn(cb->astparam, cb->mode);
} else if (cb->flags & DLM_CB_CAST) {
trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status,
cb->sb_flags, cb->res_name, cb->res_length);
cb->lkb_lksb->sb_status = cb->sb_status;
cb->lkb_lksb->sb_flags = cb->sb_flags;
cb->astfn(cb->astparam);
}
dlm_free_cb(cb);
}
void dlm_callback_set_last_ptr(struct dlm_callback **from,
struct dlm_callback *to)
{
if (*from)
kref_put(&(*from)->ref, dlm_release_callback);
if (to)
kref_get(&to->ref);
*from = to;
}
int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags)
int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags,
struct dlm_callback **cb)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
struct dlm_rsb *rsb = lkb->lkb_resource;
int rv = DLM_ENQUEUE_CALLBACK_SUCCESS;
struct dlm_callback *cb;
struct dlm_ls *ls = rsb->res_ls;
int copy_lvb = 0;
int prev_mode;
if (flags & DLM_CB_BAST) {
/* if cb is a bast, it should be skipped if the blocking mode is
* compatible with the last granted mode
*/
if (lkb->lkb_last_cast) {
if (dlm_modes_compat(mode, lkb->lkb_last_cast->mode)) {
if (lkb->lkb_last_cast_cb_mode != -1) {
if (dlm_modes_compat(mode, lkb->lkb_last_cast_cb_mode)) {
log_debug(ls, "skip %x bast mode %d for cast mode %d",
lkb->lkb_id, mode,
lkb->lkb_last_cast->mode);
lkb->lkb_last_cast_cb_mode);
goto out;
}
}
......@@ -63,8 +66,9 @@ int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
* is a bast for the same mode or a more restrictive mode.
* (the addional > PR check is needed for PR/CW inversion)
*/
if (lkb->lkb_last_cb && lkb->lkb_last_cb->flags & DLM_CB_BAST) {
prev_mode = lkb->lkb_last_cb->mode;
if (lkb->lkb_last_cb_mode != -1 &&
lkb->lkb_last_cb_flags & DLM_CB_BAST) {
prev_mode = lkb->lkb_last_cb_mode;
if ((prev_mode == mode) ||
(prev_mode > mode && prev_mode > DLM_LOCK_PR)) {
......@@ -73,53 +77,55 @@ int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
goto out;
}
}
lkb->lkb_last_bast_time = ktime_get();
lkb->lkb_last_bast_cb_mode = mode;
} else if (flags & DLM_CB_CAST) {
if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
prev_mode = lkb->lkb_last_cast_cb_mode;
if (!status && lkb->lkb_lksb->sb_lvbptr &&
dlm_lvb_operations[prev_mode + 1][mode + 1])
copy_lvb = 1;
}
lkb->lkb_last_cast_cb_mode = mode;
lkb->lkb_last_cast_time = ktime_get();
}
cb = dlm_allocate_cb();
if (!cb) {
lkb->lkb_last_cb_mode = mode;
lkb->lkb_last_cb_flags = flags;
*cb = dlm_allocate_cb();
if (!*cb) {
rv = DLM_ENQUEUE_CALLBACK_FAILURE;
goto out;
}
cb->flags = flags;
cb->mode = mode;
cb->sb_status = status;
cb->sb_flags = (sbflags & 0x000000FF);
kref_init(&cb->ref);
if (!test_and_set_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags))
rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
list_add_tail(&cb->list, &lkb->lkb_callbacks);
/* for tracing */
(*cb)->lkb_id = lkb->lkb_id;
(*cb)->ls_id = ls->ls_global_id;
memcpy((*cb)->res_name, rsb->res_name, rsb->res_length);
(*cb)->res_length = rsb->res_length;
if (flags & DLM_CB_CAST)
dlm_callback_set_last_ptr(&lkb->lkb_last_cast, cb);
(*cb)->flags = flags;
(*cb)->mode = mode;
(*cb)->sb_status = status;
(*cb)->sb_flags = (sbflags & 0x000000FF);
(*cb)->copy_lvb = copy_lvb;
(*cb)->lkb_lksb = lkb->lkb_lksb;
dlm_callback_set_last_ptr(&lkb->lkb_last_cb, cb);
rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
out:
out:
return rv;
}
int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb)
{
/* oldest undelivered cb is callbacks first entry */
*cb = list_first_entry_or_null(&lkb->lkb_callbacks,
struct dlm_callback, list);
if (!*cb)
return DLM_DEQUEUE_CALLBACK_EMPTY;
/* remove it from callbacks so shift others down */
list_del(&(*cb)->list);
if (list_empty(&lkb->lkb_callbacks))
return DLM_DEQUEUE_CALLBACK_LAST;
return DLM_DEQUEUE_CALLBACK_SUCCESS;
}
void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
uint32_t sbflags)
uint32_t sbflags)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
struct dlm_callback *cb;
int rv;
if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
......@@ -127,88 +133,36 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
return;
}
spin_lock(&lkb->lkb_cb_lock);
rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags,
&cb);
switch (rv) {
case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
kref_get(&lkb->lkb_ref);
spin_lock(&ls->ls_cb_lock);
if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
} else {
queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
}
spin_unlock(&ls->ls_cb_lock);
break;
case DLM_ENQUEUE_CALLBACK_FAILURE:
WARN_ON_ONCE(1);
cb->astfn = lkb->lkb_astfn;
cb->bastfn = lkb->lkb_bastfn;
cb->astparam = lkb->lkb_astparam;
INIT_WORK(&cb->work, dlm_callback_work);
spin_lock_bh(&ls->ls_cb_lock);
if (test_bit(LSFL_CB_DELAY, &ls->ls_flags))
list_add(&cb->list, &ls->ls_cb_delay);
else
queue_work(ls->ls_callback_wq, &cb->work);
spin_unlock_bh(&ls->ls_cb_lock);
break;
case DLM_ENQUEUE_CALLBACK_SUCCESS:
break;
case DLM_ENQUEUE_CALLBACK_FAILURE:
fallthrough;
default:
WARN_ON_ONCE(1);
break;
}
spin_unlock(&lkb->lkb_cb_lock);
}
void dlm_callback_work(struct work_struct *work)
{
struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
void (*castfn) (void *astparam);
void (*bastfn) (void *astparam, int mode);
struct dlm_callback *cb;
int rv;
spin_lock(&lkb->lkb_cb_lock);
rv = dlm_dequeue_lkb_callback(lkb, &cb);
if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY)) {
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
spin_unlock(&lkb->lkb_cb_lock);
goto out;
}
spin_unlock(&lkb->lkb_cb_lock);
for (;;) {
castfn = lkb->lkb_astfn;
bastfn = lkb->lkb_bastfn;
if (cb->flags & DLM_CB_BAST) {
trace_dlm_bast(ls, lkb, cb->mode);
lkb->lkb_last_bast_time = ktime_get();
lkb->lkb_last_bast_mode = cb->mode;
bastfn(lkb->lkb_astparam, cb->mode);
} else if (cb->flags & DLM_CB_CAST) {
lkb->lkb_lksb->sb_status = cb->sb_status;
lkb->lkb_lksb->sb_flags = cb->sb_flags;
trace_dlm_ast(ls, lkb);
lkb->lkb_last_cast_time = ktime_get();
castfn(lkb->lkb_astparam);
}
kref_put(&cb->ref, dlm_release_callback);
spin_lock(&lkb->lkb_cb_lock);
rv = dlm_dequeue_lkb_callback(lkb, &cb);
if (rv == DLM_DEQUEUE_CALLBACK_EMPTY) {
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
spin_unlock(&lkb->lkb_cb_lock);
break;
}
spin_unlock(&lkb->lkb_cb_lock);
}
out:
/* undo kref_get from dlm_add_callback, may cause lkb to be freed */
dlm_put_lkb(lkb);
}
int dlm_callback_start(struct dlm_ls *ls)
{
ls->ls_callback_wq = alloc_workqueue("dlm_callback",
WQ_HIGHPRI | WQ_MEM_RECLAIM, 0);
ls->ls_callback_wq = alloc_ordered_workqueue("dlm_callback",
WQ_HIGHPRI | WQ_MEM_RECLAIM);
if (!ls->ls_callback_wq) {
log_print("can't start dlm_callback workqueue");
return -ENOMEM;
......@@ -225,9 +179,9 @@ void dlm_callback_stop(struct dlm_ls *ls)
void dlm_callback_suspend(struct dlm_ls *ls)
{
if (ls->ls_callback_wq) {
spin_lock(&ls->ls_cb_lock);
spin_lock_bh(&ls->ls_cb_lock);
set_bit(LSFL_CB_DELAY, &ls->ls_flags);
spin_unlock(&ls->ls_cb_lock);
spin_unlock_bh(&ls->ls_cb_lock);
flush_workqueue(ls->ls_callback_wq);
}
......@@ -237,7 +191,7 @@ void dlm_callback_suspend(struct dlm_ls *ls)
void dlm_callback_resume(struct dlm_ls *ls)
{
struct dlm_lkb *lkb, *safe;
struct dlm_callback *cb, *safe;
int count = 0, sum = 0;
bool empty;
......@@ -245,10 +199,10 @@ void dlm_callback_resume(struct dlm_ls *ls)
return;
more:
spin_lock(&ls->ls_cb_lock);
list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
list_del_init(&lkb->lkb_cb_list);
queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
spin_lock_bh(&ls->ls_cb_lock);
list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) {
list_del(&cb->list);
queue_work(ls->ls_callback_wq, &cb->work);
count++;
if (count == MAX_CB_QUEUE)
break;
......@@ -256,7 +210,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
empty = list_empty(&ls->ls_cb_delay);
if (empty)
clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
spin_unlock(&ls->ls_cb_lock);
spin_unlock_bh(&ls->ls_cb_lock);
sum += count;
if (!empty) {
......
......@@ -14,19 +14,12 @@
#define DLM_ENQUEUE_CALLBACK_NEED_SCHED 1
#define DLM_ENQUEUE_CALLBACK_SUCCESS 0
#define DLM_ENQUEUE_CALLBACK_FAILURE -1
int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags);
#define DLM_DEQUEUE_CALLBACK_EMPTY 2
#define DLM_DEQUEUE_CALLBACK_LAST 1
#define DLM_DEQUEUE_CALLBACK_SUCCESS 0
int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb);
int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags,
struct dlm_callback **cb);
void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
uint32_t sbflags);
void dlm_callback_set_last_ptr(struct dlm_callback **from,
struct dlm_callback *to);
void dlm_release_callback(struct kref *ref);
void dlm_callback_work(struct work_struct *work);
int dlm_callback_start(struct dlm_ls *ls);
void dlm_callback_stop(struct dlm_ls *ls);
void dlm_callback_suspend(struct dlm_ls *ls);
......
......@@ -63,6 +63,14 @@ static void release_node(struct config_item *);
static struct configfs_attribute *comm_attrs[];
static struct configfs_attribute *node_attrs[];
const struct rhashtable_params dlm_rhash_rsb_params = {
.nelem_hint = 3, /* start small */
.key_len = DLM_RESNAME_MAXLEN,
.key_offset = offsetof(struct dlm_rsb, res_name),
.head_offset = offsetof(struct dlm_rsb, res_node),
.automatic_shrinking = true,
};
struct dlm_cluster {
struct config_group group;
unsigned int cl_tcp_port;
......
......@@ -21,6 +21,8 @@ struct dlm_config_node {
uint32_t comm_seq;
};
extern const struct rhashtable_params dlm_rhash_rsb_params;
#define DLM_MAX_ADDR_COUNT 3
#define DLM_PROTO_TCP 0
......
This diff is collapsed.
......@@ -47,15 +47,13 @@ int dlm_dir_nodeid(struct dlm_rsb *r)
return r->res_dir_nodeid;
}
void dlm_recover_dir_nodeid(struct dlm_ls *ls)
void dlm_recover_dir_nodeid(struct dlm_ls *ls, const struct list_head *root_list)
{
struct dlm_rsb *r;
down_read(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
list_for_each_entry(r, root_list, res_root_list) {
r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
}
up_read(&ls->ls_root_sem);
}
int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
......@@ -200,35 +198,98 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
int len)
{
struct dlm_rsb *r;
uint32_t hash, bucket;
int rv;
hash = jhash(name, len, 0);
bucket = hash & (ls->ls_rsbtbl_size - 1);
spin_lock(&ls->ls_rsbtbl[bucket].lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
if (rv)
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
name, len, &r);
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
read_lock_bh(&ls->ls_rsbtbl_lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
read_unlock_bh(&ls->ls_rsbtbl_lock);
if (!rv)
return r;
down_read(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
list_for_each_entry(r, &ls->ls_masters_list, res_masters_list) {
if (len == r->res_length && !memcmp(name, r->res_name, len)) {
up_read(&ls->ls_root_sem);
log_debug(ls, "find_rsb_root revert to root_list %s",
r->res_name);
return r;
}
}
up_read(&ls->ls_root_sem);
return NULL;
}
struct dlm_dir_dump {
/* init values to match if whole
* dump fits to one seq. Sanity check only.
*/
uint64_t seq_init;
uint64_t nodeid_init;
/* compare local pointer with last lookup,
* just a sanity check.
*/
struct list_head *last;
unsigned int sent_res; /* for log info */
unsigned int sent_msg; /* for log info */
struct list_head list;
};
static void drop_dir_ctx(struct dlm_ls *ls, int nodeid)
{
struct dlm_dir_dump *dd, *safe;
write_lock_bh(&ls->ls_dir_dump_lock);
list_for_each_entry_safe(dd, safe, &ls->ls_dir_dump_list, list) {
if (dd->nodeid_init == nodeid) {
log_error(ls, "drop dump seq %llu",
(unsigned long long)dd->seq_init);
list_del(&dd->list);
kfree(dd);
}
}
write_unlock_bh(&ls->ls_dir_dump_lock);
}
static struct dlm_dir_dump *lookup_dir_dump(struct dlm_ls *ls, int nodeid)
{
struct dlm_dir_dump *iter, *dd = NULL;
read_lock_bh(&ls->ls_dir_dump_lock);
list_for_each_entry(iter, &ls->ls_dir_dump_list, list) {
if (iter->nodeid_init == nodeid) {
dd = iter;
break;
}
}
read_unlock_bh(&ls->ls_dir_dump_lock);
return dd;
}
static struct dlm_dir_dump *init_dir_dump(struct dlm_ls *ls, int nodeid)
{
struct dlm_dir_dump *dd;
dd = lookup_dir_dump(ls, nodeid);
if (dd) {
log_error(ls, "found ongoing dir dump for node %d, will drop it",
nodeid);
drop_dir_ctx(ls, nodeid);
}
dd = kzalloc(sizeof(*dd), GFP_ATOMIC);
if (!dd)
return NULL;
dd->seq_init = ls->ls_recover_seq;
dd->nodeid_init = nodeid;
write_lock_bh(&ls->ls_dir_dump_lock);
list_add(&dd->list, &ls->ls_dir_dump_list);
write_unlock_bh(&ls->ls_dir_dump_lock);
return dd;
}
/* Find the rsb where we left off (or start again), then send rsb names
for rsb's we're master of and whose directory node matches the requesting
node. inbuf is the rsb name last sent, inlen is the name's length */
......@@ -239,27 +300,50 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
struct list_head *list;
struct dlm_rsb *r;
int offset = 0, dir_nodeid;
struct dlm_dir_dump *dd;
__be16 be_namelen;
down_read(&ls->ls_root_sem);
read_lock_bh(&ls->ls_masters_lock);
if (inlen > 1) {
dd = lookup_dir_dump(ls, nodeid);
if (!dd) {
log_error(ls, "failed to lookup dir dump context nodeid: %d",
nodeid);
goto out;
}
/* next chunk in dump */
r = find_rsb_root(ls, inbuf, inlen);
if (!r) {
log_error(ls, "copy_master_names from %d start %d %.*s",
nodeid, inlen, inlen, inbuf);
goto out;
}
list = r->res_root_list.next;
list = r->res_masters_list.next;
/* sanity checks */
if (dd->last != &r->res_masters_list ||
dd->seq_init != ls->ls_recover_seq) {
log_error(ls, "failed dir dump sanity check seq_init: %llu seq: %llu",
(unsigned long long)dd->seq_init,
(unsigned long long)ls->ls_recover_seq);
goto out;
}
} else {
list = ls->ls_root_list.next;
}
dd = init_dir_dump(ls, nodeid);
if (!dd) {
log_error(ls, "failed to allocate dir dump context");
goto out;
}
for (offset = 0; list != &ls->ls_root_list; list = list->next) {
r = list_entry(list, struct dlm_rsb, res_root_list);
if (r->res_nodeid)
continue;
/* start dump */
list = ls->ls_masters_list.next;
dd->last = list;
}
for (offset = 0; list != &ls->ls_masters_list; list = list->next) {
r = list_entry(list, struct dlm_rsb, res_masters_list);
dir_nodeid = dlm_dir_nodeid(r);
if (dir_nodeid != nodeid)
continue;
......@@ -277,7 +361,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
be_namelen = cpu_to_be16(0);
memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
offset += sizeof(__be16);
ls->ls_recover_dir_sent_msg++;
dd->sent_msg++;
goto out;
}
......@@ -286,7 +370,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
offset += sizeof(__be16);
memcpy(outbuf + offset, r->res_name, r->res_length);
offset += r->res_length;
ls->ls_recover_dir_sent_res++;
dd->sent_res++;
dd->last = list;
}
/*
......@@ -294,14 +379,22 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
* terminating record.
*/
if ((list == &ls->ls_root_list) &&
if ((list == &ls->ls_masters_list) &&
(offset + sizeof(uint16_t) <= outlen)) {
/* end dump */
be_namelen = cpu_to_be16(0xFFFF);
memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
offset += sizeof(__be16);
ls->ls_recover_dir_sent_msg++;
dd->sent_msg++;
log_rinfo(ls, "dlm_recover_directory nodeid %d sent %u res out %u messages",
nodeid, dd->sent_res, dd->sent_msg);
write_lock_bh(&ls->ls_dir_dump_lock);
list_del_init(&dd->list);
write_unlock_bh(&ls->ls_dir_dump_lock);
kfree(dd);
}
out:
up_read(&ls->ls_root_sem);
read_unlock_bh(&ls->ls_masters_lock);
}
......@@ -14,7 +14,8 @@
int dlm_dir_nodeid(struct dlm_rsb *rsb);
int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash);
void dlm_recover_dir_nodeid(struct dlm_ls *ls);
void dlm_recover_dir_nodeid(struct dlm_ls *ls,
const struct list_head *root_list);
int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq);
void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
char *outbuf, int outlen, int nodeid);
......
......@@ -16,6 +16,7 @@
* This is the main header file to be included in each DLM source file.
*/
#include <uapi/linux/dlm_device.h>
#include <linux/slab.h>
#include <linux/sched.h>
#include <linux/types.h>
......@@ -33,6 +34,7 @@
#include <linux/kernel.h>
#include <linux/jhash.h>
#include <linux/miscdevice.h>
#include <linux/rhashtable.h>
#include <linux/mutex.h>
#include <linux/idr.h>
#include <linux/ratelimit.h>
......@@ -98,17 +100,6 @@ do { \
} \
}
#define DLM_RTF_SHRINK_BIT 0
struct dlm_rsbtable {
struct rb_root keep;
struct rb_root toss;
spinlock_t lock;
unsigned long flags;
};
/*
* Lockspace member (per node in a ls)
*/
......@@ -204,8 +195,7 @@ struct dlm_args {
#define DLM_IFL_OVERLAP_CANCEL_BIT 20
#define DLM_IFL_ENDOFLIFE_BIT 21
#define DLM_IFL_DEADLOCK_CANCEL_BIT 24
#define DLM_IFL_CB_PENDING_BIT 25
#define __DLM_IFL_MAX_BIT DLM_IFL_CB_PENDING_BIT
#define __DLM_IFL_MAX_BIT DLM_IFL_DEADLOCK_CANCEL_BIT
/* lkb_dflags */
......@@ -217,14 +207,47 @@ struct dlm_args {
#define DLM_CB_CAST 0x00000001
#define DLM_CB_BAST 0x00000002
/* much of this is just saving user space pointers associated with the
* lock that we pass back to the user lib with an ast
*/
struct dlm_user_args {
struct dlm_user_proc *proc; /* each process that opens the lockspace
* device has private data
* (dlm_user_proc) on the struct file,
* the process's locks point back to it
*/
struct dlm_lksb lksb;
struct dlm_lksb __user *user_lksb;
void __user *castparam;
void __user *castaddr;
void __user *bastparam;
void __user *bastaddr;
uint64_t xid;
};
struct dlm_callback {
uint32_t flags; /* DLM_CBF_ */
int sb_status; /* copy to lksb status */
uint8_t sb_flags; /* copy to lksb flags */
int8_t mode; /* rq mode of bast, gr mode of cast */
bool copy_lvb;
struct dlm_lksb *lkb_lksb;
unsigned char lvbptr[DLM_USER_LVB_LEN];
union {
void *astparam; /* caller's ast arg */
struct dlm_user_args ua;
};
struct work_struct work;
void (*bastfn)(void *astparam, int mode);
void (*astfn)(void *astparam);
char res_name[DLM_RESNAME_MAXLEN];
size_t res_length;
uint32_t ls_id;
uint32_t lkb_id;
struct list_head list;
struct kref ref;
};
struct dlm_lkb {
......@@ -255,13 +278,10 @@ struct dlm_lkb {
struct list_head lkb_ownqueue; /* list of locks for a process */
ktime_t lkb_timestamp;
spinlock_t lkb_cb_lock;
struct work_struct lkb_cb_work;
struct list_head lkb_cb_list; /* for ls_cb_delay or proc->asts */
struct list_head lkb_callbacks;
struct dlm_callback *lkb_last_cast;
struct dlm_callback *lkb_last_cb;
int lkb_last_bast_mode;
int8_t lkb_last_cast_cb_mode;
int8_t lkb_last_bast_cb_mode;
int8_t lkb_last_cb_mode;
uint8_t lkb_last_cb_flags;
ktime_t lkb_last_cast_time; /* for debugging */
ktime_t lkb_last_bast_time; /* for debugging */
......@@ -290,7 +310,7 @@ struct dlm_lkb {
struct dlm_rsb {
struct dlm_ls *res_ls; /* the lockspace */
struct kref res_ref;
struct mutex res_mutex;
spinlock_t res_lock;
unsigned long res_flags;
int res_length; /* length of rsb name */
int res_nodeid;
......@@ -299,20 +319,22 @@ struct dlm_rsb {
int res_id; /* for ls_recover_idr */
uint32_t res_lvbseq;
uint32_t res_hash;
uint32_t res_bucket; /* rsbtbl */
unsigned long res_toss_time;
uint32_t res_first_lkid;
struct list_head res_lookup; /* lkbs waiting on first */
union {
struct list_head res_hashchain;
struct rb_node res_hashnode; /* rsbtbl */
struct rhash_head res_node; /* rsbtbl */
};
struct list_head res_grantqueue;
struct list_head res_convertqueue;
struct list_head res_waitqueue;
struct list_head res_rsbs_list;
struct list_head res_root_list; /* used for recovery */
struct list_head res_masters_list; /* used for recovery */
struct list_head res_recover_list; /* used for recovery */
struct list_head res_toss_q_list;
int res_recover_locks_count;
char *res_lvbptr;
......@@ -346,6 +368,7 @@ enum rsb_flags {
RSB_RECOVER_CONVERT,
RSB_RECOVER_GRANT,
RSB_RECOVER_LVB_INVAL,
RSB_TOSS,
};
static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
......@@ -559,24 +582,33 @@ struct dlm_ls {
struct kobject ls_kobj;
struct idr ls_lkbidr;
spinlock_t ls_lkbidr_spin;
rwlock_t ls_lkbidr_lock;
struct rhashtable ls_rsbtbl;
rwlock_t ls_rsbtbl_lock;
struct dlm_rsbtable *ls_rsbtbl;
uint32_t ls_rsbtbl_size;
struct list_head ls_toss;
struct list_head ls_keep;
struct mutex ls_waiters_mutex;
struct timer_list ls_timer;
/* this queue is ordered according the
* absolute res_toss_time jiffies time
* to mod_timer() with the first element
* if necessary.
*/
struct list_head ls_toss_q;
spinlock_t ls_toss_q_lock;
spinlock_t ls_waiters_lock;
struct list_head ls_waiters; /* lkbs needing a reply */
struct mutex ls_orphans_mutex;
spinlock_t ls_orphans_lock;
struct list_head ls_orphans;
spinlock_t ls_new_rsb_spin;
int ls_new_rsb_count;
struct list_head ls_new_rsb; /* new rsb structs */
char *ls_remove_names[DLM_REMOVE_NAMES_MAX];
int ls_remove_lens[DLM_REMOVE_NAMES_MAX];
struct list_head ls_nodes; /* current nodes in ls */
struct list_head ls_nodes_gone; /* dead node list, recovery */
int ls_num_nodes; /* number of nodes in ls */
......@@ -613,7 +645,6 @@ struct dlm_ls {
spinlock_t ls_cb_lock;
struct list_head ls_cb_delay; /* save for queue_work later */
struct timer_list ls_timer;
struct task_struct *ls_recoverd_task;
struct mutex ls_recoverd_active;
spinlock_t ls_recover_lock;
......@@ -622,15 +653,11 @@ struct dlm_ls {
uint64_t ls_recover_seq;
struct dlm_recover *ls_recover_args;
struct rw_semaphore ls_in_recovery; /* block local requests */
struct rw_semaphore ls_recv_active; /* block dlm_recv */
rwlock_t ls_recv_active; /* block dlm_recv */
struct list_head ls_requestqueue;/* queue remote requests */
atomic_t ls_requestqueue_cnt;
wait_queue_head_t ls_requestqueue_wait;
struct mutex ls_requestqueue_mutex;
rwlock_t ls_requestqueue_lock;
struct dlm_rcom *ls_recover_buf;
int ls_recover_nodeid; /* for debugging */
unsigned int ls_recover_dir_sent_res; /* for log info */
unsigned int ls_recover_dir_sent_msg; /* for log info */
unsigned int ls_recover_locks_in; /* for log info */
uint64_t ls_rcom_seq;
spinlock_t ls_rcom_spin;
......@@ -643,8 +670,10 @@ struct dlm_ls {
wait_queue_head_t ls_recover_lock_wait;
spinlock_t ls_clear_proc_locks;
struct list_head ls_root_list; /* root resources */
struct rw_semaphore ls_root_sem; /* protect root_list */
struct list_head ls_masters_list; /* root resources */
rwlock_t ls_masters_lock; /* protect root_list */
struct list_head ls_dir_dump_list; /* root resources */
rwlock_t ls_dir_dump_lock; /* protect root_list */
const struct dlm_lockspace_ops *ls_ops;
void *ls_ops_arg;
......@@ -686,23 +715,7 @@ struct dlm_ls {
#define LSFL_UEVENT_WAIT 7
#define LSFL_CB_DELAY 9
#define LSFL_NODIR 10
/* much of this is just saving user space pointers associated with the
lock that we pass back to the user lib with an ast */
struct dlm_user_args {
struct dlm_user_proc *proc; /* each process that opens the lockspace
device has private data
(dlm_user_proc) on the struct file,
the process's locks point back to it*/
struct dlm_lksb lksb;
struct dlm_lksb __user *user_lksb;
void __user *castparam;
void __user *castaddr;
void __user *bastparam;
void __user *bastaddr;
uint64_t xid;
};
#define LSFL_RECV_MSG_BLOCKED 11
#define DLM_PROC_FLAGS_CLOSING 1
#define DLM_PROC_FLAGS_COMPAT 2
......
This diff is collapsed.
......@@ -11,6 +11,7 @@
#ifndef __LOCK_DOT_H__
#define __LOCK_DOT_H__
void dlm_rsb_toss_timer(struct timer_list *timer);
void dlm_dump_rsb(struct dlm_rsb *r);
void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len);
void dlm_print_lkb(struct dlm_lkb *lkb);
......@@ -18,20 +19,23 @@ void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
uint32_t saved_seq);
void dlm_receive_buffer(const union dlm_packet *p, int nodeid);
int dlm_modes_compat(int mode1, int mode2);
void free_toss_rsb(struct dlm_rsb *r);
void dlm_put_rsb(struct dlm_rsb *r);
void dlm_hold_rsb(struct dlm_rsb *r);
int dlm_put_lkb(struct dlm_lkb *lkb);
void dlm_scan_rsbs(struct dlm_ls *ls);
int dlm_lock_recovery_try(struct dlm_ls *ls);
void dlm_lock_recovery(struct dlm_ls *ls);
void dlm_unlock_recovery(struct dlm_ls *ls);
void dlm_timer_resume(struct dlm_ls *ls);
int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
int len, unsigned int flags, int *r_nodeid, int *result);
int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
struct dlm_rsb **r_ret);
void dlm_recover_purge(struct dlm_ls *ls);
void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list);
void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
void dlm_recover_grant(struct dlm_ls *ls);
int dlm_recover_waiters_post(struct dlm_ls *ls);
......@@ -68,12 +72,12 @@ static inline int is_master(struct dlm_rsb *r)
static inline void lock_rsb(struct dlm_rsb *r)
{
mutex_lock(&r->res_mutex);
spin_lock_bh(&r->res_lock);
}
static inline void unlock_rsb(struct dlm_rsb *r)
{
mutex_unlock(&r->res_mutex);
spin_unlock_bh(&r->res_lock);
}
#endif
......
This diff is collapsed.
......@@ -204,6 +204,7 @@ static void process_dlm_messages(struct work_struct *work);
static DECLARE_WORK(process_work, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending;
static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq);
static atomic_t processqueue_count;
static LIST_HEAD(processqueue);
......@@ -248,7 +249,7 @@ struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
{
return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
return KMEM_CACHE(dlm_msg, 0);
}
/* need to held writequeue_lock */
......@@ -867,36 +868,38 @@ static void process_dlm_messages(struct work_struct *work)
{
struct processqueue_entry *pentry;
spin_lock(&processqueue_lock);
spin_lock_bh(&processqueue_lock);
pentry = list_first_entry_or_null(&processqueue,
struct processqueue_entry, list);
if (WARN_ON_ONCE(!pentry)) {
process_dlm_messages_pending = false;
spin_unlock(&processqueue_lock);
spin_unlock_bh(&processqueue_lock);
return;
}
list_del(&pentry->list);
atomic_dec(&processqueue_count);
spin_unlock(&processqueue_lock);
if (atomic_dec_and_test(&processqueue_count))
wake_up(&processqueue_wq);
spin_unlock_bh(&processqueue_lock);
for (;;) {
dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
pentry->buflen);
free_processqueue_entry(pentry);
spin_lock(&processqueue_lock);
spin_lock_bh(&processqueue_lock);
pentry = list_first_entry_or_null(&processqueue,
struct processqueue_entry, list);
if (!pentry) {
process_dlm_messages_pending = false;
spin_unlock(&processqueue_lock);
spin_unlock_bh(&processqueue_lock);
break;
}
list_del(&pentry->list);
atomic_dec(&processqueue_count);
spin_unlock(&processqueue_lock);
if (atomic_dec_and_test(&processqueue_count))
wake_up(&processqueue_wq);
spin_unlock_bh(&processqueue_lock);
}
}
......@@ -966,14 +969,14 @@ static int receive_from_sock(struct connection *con, int buflen)
memmove(con->rx_leftover_buf, pentry->buf + ret,
con->rx_leftover);
spin_lock(&processqueue_lock);
spin_lock_bh(&processqueue_lock);
ret = atomic_inc_return(&processqueue_count);
list_add_tail(&pentry->list, &processqueue);
if (!process_dlm_messages_pending) {
process_dlm_messages_pending = true;
queue_work(process_workqueue, &process_work);
}
spin_unlock(&processqueue_lock);
spin_unlock_bh(&processqueue_lock);
if (ret > DLM_MAX_PROCESS_BUFFERS)
return DLM_IO_FLUSH;
......@@ -1229,14 +1232,13 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
};
static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
gfp_t allocation, char **ppc,
void (*cb)(void *data),
char **ppc, void (*cb)(void *data),
void *data)
{
struct writequeue_entry *e;
struct dlm_msg *msg;
msg = dlm_allocate_msg(allocation);
msg = dlm_allocate_msg();
if (!msg)
return NULL;
......@@ -1261,9 +1263,8 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
* dlm_lowcomms_commit_msg which is a must call if success
*/
#ifndef __CHECKER__
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
char **ppc, void (*cb)(void *data),
void *data)
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
void (*cb)(void *data), void *data)
{
struct connection *con;
struct dlm_msg *msg;
......@@ -1284,7 +1285,7 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
return NULL;
}
msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
msg = dlm_lowcomms_new_msg_con(con, len, ppc, cb, data);
if (!msg) {
srcu_read_unlock(&connections_srcu, idx);
return NULL;
......@@ -1348,8 +1349,8 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
if (msg->retransmit)
return 1;
msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
GFP_ATOMIC, &ppc, NULL, NULL);
msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc,
NULL, NULL);
if (!msg_resend)
return -ENOMEM;
......@@ -1513,7 +1514,20 @@ static void process_recv_sockets(struct work_struct *work)
/* CF_RECV_PENDING cleared */
break;
case DLM_IO_FLUSH:
flush_workqueue(process_workqueue);
/* we can't flush the process_workqueue here because a
* WQ_MEM_RECLAIM workequeue can occurr a deadlock for a non
* WQ_MEM_RECLAIM workqueue such as process_workqueue. Instead
* we have a waitqueue to wait until all messages are
* processed.
*
* This handling is only necessary to backoff the sender and
* not queue all messages from the socket layer into DLM
* processqueue. When DLM is capable to parse multiple messages
* on an e.g. per socket basis this handling can might be
* removed. Especially in a message burst we are too slow to
* process messages and the queue will fill up memory.
*/
wait_event(processqueue_wq, !atomic_read(&processqueue_count));
fallthrough;
case DLM_IO_RESCHED:
cond_resched();
......@@ -1703,11 +1717,7 @@ static int work_start(void)
return -ENOMEM;
}
/* ordered dlm message process queue,
* should be converted to a tasklet
*/
process_workqueue = alloc_ordered_workqueue("dlm_process",
WQ_HIGHPRI | WQ_MEM_RECLAIM);
process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH, 0);
if (!process_workqueue) {
log_print("can't start dlm_process");
destroy_workqueue(io_workqueue);
......
......@@ -39,9 +39,8 @@ void dlm_lowcomms_stop(void);
void dlm_lowcomms_init(void);
void dlm_lowcomms_exit(void);
int dlm_lowcomms_close(int nodeid);
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
char **ppc, void (*cb)(void *data),
void *data);
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
void (*cb)(void *data), void *data);
void dlm_lowcomms_commit_msg(struct dlm_msg *msg);
void dlm_lowcomms_put_msg(struct dlm_msg *msg);
int dlm_lowcomms_resend_msg(struct dlm_msg *msg);
......
......@@ -630,7 +630,7 @@ int dlm_ls_stop(struct dlm_ls *ls)
* message to the requestqueue without races.
*/
down_write(&ls->ls_recv_active);
write_lock_bh(&ls->ls_recv_active);
/*
* Abort any recovery that's in progress (see RECOVER_STOP,
......@@ -638,18 +638,25 @@ int dlm_ls_stop(struct dlm_ls *ls)
* dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
*/
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
if (new)
timer_delete_sync(&ls->ls_timer);
ls->ls_recover_seq++;
spin_unlock(&ls->ls_recover_lock);
/* activate requestqueue and stop processing */
write_lock_bh(&ls->ls_requestqueue_lock);
set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
write_unlock_bh(&ls->ls_requestqueue_lock);
spin_unlock_bh(&ls->ls_recover_lock);
/*
* Let dlm_recv run again, now any normal messages will be saved on the
* requestqueue for later.
*/
up_write(&ls->ls_recv_active);
write_unlock_bh(&ls->ls_recv_active);
/*
* This in_recovery lock does two things:
......@@ -674,13 +681,13 @@ int dlm_ls_stop(struct dlm_ls *ls)
dlm_recoverd_suspend(ls);
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
kfree(ls->ls_slots);
ls->ls_slots = NULL;
ls->ls_num_slots = 0;
ls->ls_slots_size = 0;
ls->ls_recover_status = 0;
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
dlm_recoverd_resume(ls);
......@@ -714,12 +721,12 @@ int dlm_ls_start(struct dlm_ls *ls)
if (error < 0)
goto fail_rv;
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
/* the lockspace needs to be stopped before it can be started */
if (!dlm_locking_stopped(ls)) {
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
log_error(ls, "start ignored: lockspace running");
error = -EINVAL;
goto fail;
......@@ -730,7 +737,7 @@ int dlm_ls_start(struct dlm_ls *ls)
rv->seq = ++ls->ls_recover_seq;
rv_old = ls->ls_recover_args;
ls->ls_recover_args = rv;
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
if (rv_old) {
log_error(ls, "unused recovery %llx %d",
......
......@@ -84,7 +84,7 @@ char *dlm_allocate_lvb(struct dlm_ls *ls)
{
char *p;
p = kzalloc(ls->ls_lvblen, GFP_NOFS);
p = kzalloc(ls->ls_lvblen, GFP_ATOMIC);
return p;
}
......@@ -97,7 +97,7 @@ struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls)
{
struct dlm_rsb *r;
r = kmem_cache_zalloc(rsb_cache, GFP_NOFS);
r = kmem_cache_zalloc(rsb_cache, GFP_ATOMIC);
return r;
}
......@@ -112,7 +112,7 @@ struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls)
{
struct dlm_lkb *lkb;
lkb = kmem_cache_zalloc(lkb_cache, GFP_NOFS);
lkb = kmem_cache_zalloc(lkb_cache, GFP_ATOMIC);
return lkb;
}
......@@ -127,16 +127,12 @@ void dlm_free_lkb(struct dlm_lkb *lkb)
}
}
/* drop references if they are set */
dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
kmem_cache_free(lkb_cache, lkb);
}
struct dlm_mhandle *dlm_allocate_mhandle(gfp_t allocation)
struct dlm_mhandle *dlm_allocate_mhandle(void)
{
return kmem_cache_alloc(mhandle_cache, allocation);
return kmem_cache_alloc(mhandle_cache, GFP_ATOMIC);
}
void dlm_free_mhandle(struct dlm_mhandle *mhandle)
......@@ -154,9 +150,9 @@ void dlm_free_writequeue(struct writequeue_entry *writequeue)
kmem_cache_free(writequeue_cache, writequeue);
}
struct dlm_msg *dlm_allocate_msg(gfp_t allocation)
struct dlm_msg *dlm_allocate_msg(void)
{
return kmem_cache_alloc(msg_cache, allocation);
return kmem_cache_alloc(msg_cache, GFP_ATOMIC);
}
void dlm_free_msg(struct dlm_msg *msg)
......
......@@ -20,11 +20,11 @@ struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls);
void dlm_free_lkb(struct dlm_lkb *l);
char *dlm_allocate_lvb(struct dlm_ls *ls);
void dlm_free_lvb(char *l);
struct dlm_mhandle *dlm_allocate_mhandle(gfp_t allocation);
struct dlm_mhandle *dlm_allocate_mhandle(void);
void dlm_free_mhandle(struct dlm_mhandle *mhandle);
struct writequeue_entry *dlm_allocate_writequeue(void);
void dlm_free_writequeue(struct writequeue_entry *writequeue);
struct dlm_msg *dlm_allocate_msg(gfp_t allocation);
struct dlm_msg *dlm_allocate_msg(void);
void dlm_free_msg(struct dlm_msg *msg);
struct dlm_callback *dlm_allocate_cb(void);
void dlm_free_cb(struct dlm_callback *cb);
......
......@@ -226,8 +226,7 @@ static DEFINE_MUTEX(close_lock);
struct kmem_cache *dlm_midcomms_cache_create(void)
{
return kmem_cache_create("dlm_mhandle", sizeof(struct dlm_mhandle),
0, 0, NULL);
return KMEM_CACHE(dlm_mhandle, 0);
}
static inline const char *dlm_state_str(int state)
......@@ -365,9 +364,9 @@ int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
node->users = 0;
midcomms_node_reset(node);
spin_lock(&nodes_lock);
spin_lock_bh(&nodes_lock);
hlist_add_head_rcu(&node->hlist, &node_hash[r]);
spin_unlock(&nodes_lock);
spin_unlock_bh(&nodes_lock);
node->debugfs = dlm_create_debug_comms_file(nodeid, node);
return 0;
......@@ -380,8 +379,7 @@ static int dlm_send_ack(int nodeid, uint32_t seq)
struct dlm_msg *msg;
char *ppc;
msg = dlm_lowcomms_new_msg(nodeid, mb_len, GFP_ATOMIC, &ppc,
NULL, NULL);
msg = dlm_lowcomms_new_msg(nodeid, mb_len, &ppc, NULL, NULL);
if (!msg)
return -ENOMEM;
......@@ -429,7 +427,7 @@ static int dlm_send_fin(struct midcomms_node *node,
struct dlm_mhandle *mh;
char *ppc;
mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, GFP_ATOMIC, &ppc);
mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, &ppc);
if (!mh)
return -ENOMEM;
......@@ -479,7 +477,7 @@ static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
{
spin_lock(&node->state_lock);
spin_lock_bh(&node->state_lock);
pr_debug("receive passive fin ack from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
......@@ -493,13 +491,13 @@ static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
wake_up(&node->shutdown_wait);
break;
default:
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
log_print("%s: unexpected state: %d",
__func__, node->state);
WARN_ON_ONCE(1);
return;
}
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
}
static void dlm_receive_buffer_3_2_trace(uint32_t seq,
......@@ -536,7 +534,7 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
if (is_expected_seq) {
switch (p->header.h_cmd) {
case DLM_FIN:
spin_lock(&node->state_lock);
spin_lock_bh(&node->state_lock);
pr_debug("receive fin msg from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
......@@ -577,13 +575,13 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
/* probably remove_member caught it, do nothing */
break;
default:
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
log_print("%s: unexpected state: %d",
__func__, node->state);
WARN_ON_ONCE(1);
return;
}
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
break;
default:
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
......@@ -977,13 +975,13 @@ static void midcomms_new_msg_cb(void *data)
}
static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
int len, gfp_t allocation, char **ppc)
int len, char **ppc)
{
struct dlm_opts *opts;
struct dlm_msg *msg;
msg = dlm_lowcomms_new_msg(nodeid, len + DLM_MIDCOMMS_OPT_LEN,
allocation, ppc, midcomms_new_msg_cb, mh);
ppc, midcomms_new_msg_cb, mh);
if (!msg)
return NULL;
......@@ -1002,8 +1000,7 @@ static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int node
* dlm_midcomms_commit_mhandle which is a must call if success
*/
#ifndef __CHECKER__
struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
gfp_t allocation, char **ppc)
struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, char **ppc)
{
struct midcomms_node *node;
struct dlm_mhandle *mh;
......@@ -1018,7 +1015,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
/* this is a bug, however we going on and hope it will be resolved */
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
mh = dlm_allocate_mhandle(allocation);
mh = dlm_allocate_mhandle();
if (!mh)
goto err;
......@@ -1029,8 +1026,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
switch (node->version) {
case DLM_VERSION_3_1:
msg = dlm_lowcomms_new_msg(nodeid, len, allocation, ppc,
NULL, NULL);
msg = dlm_lowcomms_new_msg(nodeid, len, ppc, NULL, NULL);
if (!msg) {
dlm_free_mhandle(mh);
goto err;
......@@ -1041,8 +1037,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
/* send ack back if necessary */
dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation,
ppc);
msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, ppc);
if (!msg) {
dlm_free_mhandle(mh);
goto err;
......@@ -1187,7 +1182,7 @@ void dlm_midcomms_exit(void)
static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
{
spin_lock(&node->state_lock);
spin_lock_bh(&node->state_lock);
pr_debug("receive active fin ack from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
......@@ -1207,13 +1202,13 @@ static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
wake_up(&node->shutdown_wait);
break;
default:
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
log_print("%s: unexpected state: %d",
__func__, node->state);
WARN_ON_ONCE(1);
return;
}
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
}
void dlm_midcomms_add_member(int nodeid)
......@@ -1228,7 +1223,7 @@ void dlm_midcomms_add_member(int nodeid)
return;
}
spin_lock(&node->state_lock);
spin_lock_bh(&node->state_lock);
if (!node->users) {
pr_debug("receive add member from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
......@@ -1256,7 +1251,7 @@ void dlm_midcomms_add_member(int nodeid)
node->users++;
pr_debug("node %d users inc count %d\n", nodeid, node->users);
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
srcu_read_unlock(&nodes_srcu, idx);
}
......@@ -1274,13 +1269,13 @@ void dlm_midcomms_remove_member(int nodeid)
return;
}
spin_lock(&node->state_lock);
spin_lock_bh(&node->state_lock);
/* case of dlm_midcomms_addr() created node but
* was not added before because dlm_midcomms_close()
* removed the node
*/
if (!node->users) {
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
srcu_read_unlock(&nodes_srcu, idx);
return;
}
......@@ -1318,7 +1313,7 @@ void dlm_midcomms_remove_member(int nodeid)
break;
}
}
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
srcu_read_unlock(&nodes_srcu, idx);
}
......@@ -1356,7 +1351,7 @@ static void midcomms_shutdown(struct midcomms_node *node)
return;
}
spin_lock(&node->state_lock);
spin_lock_bh(&node->state_lock);
pr_debug("receive active shutdown for node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
switch (node->state) {
......@@ -1375,7 +1370,7 @@ static void midcomms_shutdown(struct midcomms_node *node)
*/
break;
}
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
if (DLM_DEBUG_FENCE_TERMINATION)
msleep(5000);
......@@ -1446,9 +1441,9 @@ int dlm_midcomms_close(int nodeid)
ret = dlm_lowcomms_close(nodeid);
dlm_delete_debug_comms_file(node->debugfs);
spin_lock(&nodes_lock);
spin_lock_bh(&nodes_lock);
hlist_del_rcu(&node->hlist);
spin_unlock(&nodes_lock);
spin_unlock_bh(&nodes_lock);
srcu_read_unlock(&nodes_srcu, idx);
/* wait that all readers left until flush send queue */
......@@ -1502,8 +1497,8 @@ int dlm_midcomms_rawmsg_send(struct midcomms_node *node, void *buf,
rd.node = node;
rd.buf = buf;
msg = dlm_lowcomms_new_msg(node->nodeid, buflen, GFP_NOFS,
&msgbuf, midcomms_new_rawmsg_cb, &rd);
msg = dlm_lowcomms_new_msg(node->nodeid, buflen, &msgbuf,
midcomms_new_rawmsg_cb, &rd);
if (!msg)
return -ENOMEM;
......
......@@ -16,8 +16,7 @@ struct midcomms_node;
int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len);
int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
gfp_t allocation, char **ppc);
struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, char **ppc);
void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name,
int namelen);
int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
......
......@@ -55,7 +55,7 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
struct dlm_mhandle *mh;
char *mb;
mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
if (!mh) {
log_print("%s to %d type %d len %d ENOBUFS",
__func__, to_nodeid, type, len);
......@@ -75,8 +75,7 @@ static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
struct dlm_msg *msg;
char *mb;
msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, GFP_NOFS, &mb,
NULL, NULL);
msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, &mb, NULL, NULL);
if (!msg) {
log_print("create_rcom to %d type %d len %d ENOBUFS",
to_nodeid, type, len);
......@@ -144,18 +143,18 @@ static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
static void allow_sync_reply(struct dlm_ls *ls, __le64 *new_seq)
{
spin_lock(&ls->ls_rcom_spin);
spin_lock_bh(&ls->ls_rcom_spin);
*new_seq = cpu_to_le64(++ls->ls_rcom_seq);
set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
spin_unlock(&ls->ls_rcom_spin);
spin_unlock_bh(&ls->ls_rcom_spin);
}
static void disallow_sync_reply(struct dlm_ls *ls)
{
spin_lock(&ls->ls_rcom_spin);
spin_lock_bh(&ls->ls_rcom_spin);
clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
spin_unlock(&ls->ls_rcom_spin);
spin_unlock_bh(&ls->ls_rcom_spin);
}
/*
......@@ -246,10 +245,10 @@ static void receive_rcom_status(struct dlm_ls *ls,
goto do_create;
}
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
status = ls->ls_recover_status;
num_slots = ls->ls_num_slots;
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
len += num_slots * sizeof(struct rcom_slot);
do_create:
......@@ -267,9 +266,9 @@ static void receive_rcom_status(struct dlm_ls *ls,
if (!num_slots)
goto do_send;
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
if (ls->ls_num_slots != num_slots) {
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
log_debug(ls, "receive_rcom_status num_slots %d to %d",
num_slots, ls->ls_num_slots);
rc->rc_result = 0;
......@@ -278,7 +277,7 @@ static void receive_rcom_status(struct dlm_ls *ls,
}
dlm_slots_copy_out(ls, rc);
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
do_send:
send_rcom_stateless(msg, rc);
......@@ -286,7 +285,7 @@ static void receive_rcom_status(struct dlm_ls *ls,
static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
{
spin_lock(&ls->ls_rcom_spin);
spin_lock_bh(&ls->ls_rcom_spin);
if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
le64_to_cpu(rc_in->rc_id) != ls->ls_rcom_seq) {
log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
......@@ -302,7 +301,7 @@ static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
wake_up(&ls->ls_wait_general);
out:
spin_unlock(&ls->ls_rcom_spin);
spin_unlock_bh(&ls->ls_rcom_spin);
}
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
......@@ -510,7 +509,7 @@ int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in)
char *mb;
int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
mh = dlm_midcomms_get_mhandle(nodeid, mb_len, GFP_NOFS, &mb);
mh = dlm_midcomms_get_mhandle(nodeid, mb_len, &mb);
if (!mh)
return -ENOBUFS;
......@@ -614,11 +613,11 @@ void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, int nodeid)
break;
}
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
status = ls->ls_recover_status;
stop = dlm_recovery_stopped(ls);
seq = ls->ls_recover_seq;
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
if (stop && (rc->rc_type != cpu_to_le32(DLM_RCOM_STATUS)))
goto ignore;
......
......@@ -74,9 +74,9 @@ int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
uint32_t dlm_recover_status(struct dlm_ls *ls)
{
uint32_t status;
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
status = ls->ls_recover_status;
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
return status;
}
......@@ -87,9 +87,9 @@ static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
{
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
_set_recover_status(ls, status);
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
}
static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
......@@ -188,13 +188,13 @@ int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq)
rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
if (!rv) {
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
_set_recover_status(ls, DLM_RS_NODES_ALL);
ls->ls_num_slots = num_slots;
ls->ls_slots_size = slots_size;
ls->ls_slots = slots;
ls->ls_generation = gen;
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
} else {
dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
}
......@@ -241,9 +241,9 @@ static int recover_list_empty(struct dlm_ls *ls)
{
int empty;
spin_lock(&ls->ls_recover_list_lock);
spin_lock_bh(&ls->ls_recover_list_lock);
empty = list_empty(&ls->ls_recover_list);
spin_unlock(&ls->ls_recover_list_lock);
spin_unlock_bh(&ls->ls_recover_list_lock);
return empty;
}
......@@ -252,23 +252,23 @@ static void recover_list_add(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
spin_lock(&ls->ls_recover_list_lock);
spin_lock_bh(&ls->ls_recover_list_lock);
if (list_empty(&r->res_recover_list)) {
list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
ls->ls_recover_list_count++;
dlm_hold_rsb(r);
}
spin_unlock(&ls->ls_recover_list_lock);
spin_unlock_bh(&ls->ls_recover_list_lock);
}
static void recover_list_del(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
spin_lock(&ls->ls_recover_list_lock);
spin_lock_bh(&ls->ls_recover_list_lock);
list_del_init(&r->res_recover_list);
ls->ls_recover_list_count--;
spin_unlock(&ls->ls_recover_list_lock);
spin_unlock_bh(&ls->ls_recover_list_lock);
dlm_put_rsb(r);
}
......@@ -277,7 +277,7 @@ static void recover_list_clear(struct dlm_ls *ls)
{
struct dlm_rsb *r, *s;
spin_lock(&ls->ls_recover_list_lock);
spin_lock_bh(&ls->ls_recover_list_lock);
list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
list_del_init(&r->res_recover_list);
r->res_recover_locks_count = 0;
......@@ -290,17 +290,17 @@ static void recover_list_clear(struct dlm_ls *ls)
ls->ls_recover_list_count);
ls->ls_recover_list_count = 0;
}
spin_unlock(&ls->ls_recover_list_lock);
spin_unlock_bh(&ls->ls_recover_list_lock);
}
static int recover_idr_empty(struct dlm_ls *ls)
{
int empty = 1;
spin_lock(&ls->ls_recover_idr_lock);
spin_lock_bh(&ls->ls_recover_idr_lock);
if (ls->ls_recover_list_count)
empty = 0;
spin_unlock(&ls->ls_recover_idr_lock);
spin_unlock_bh(&ls->ls_recover_idr_lock);
return empty;
}
......@@ -310,8 +310,7 @@ static int recover_idr_add(struct dlm_rsb *r)
struct dlm_ls *ls = r->res_ls;
int rv;
idr_preload(GFP_NOFS);
spin_lock(&ls->ls_recover_idr_lock);
spin_lock_bh(&ls->ls_recover_idr_lock);
if (r->res_id) {
rv = -1;
goto out_unlock;
......@@ -325,8 +324,7 @@ static int recover_idr_add(struct dlm_rsb *r)
dlm_hold_rsb(r);
rv = 0;
out_unlock:
spin_unlock(&ls->ls_recover_idr_lock);
idr_preload_end();
spin_unlock_bh(&ls->ls_recover_idr_lock);
return rv;
}
......@@ -334,11 +332,11 @@ static void recover_idr_del(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
spin_lock(&ls->ls_recover_idr_lock);
spin_lock_bh(&ls->ls_recover_idr_lock);
idr_remove(&ls->ls_recover_idr, r->res_id);
r->res_id = 0;
ls->ls_recover_list_count--;
spin_unlock(&ls->ls_recover_idr_lock);
spin_unlock_bh(&ls->ls_recover_idr_lock);
dlm_put_rsb(r);
}
......@@ -347,9 +345,9 @@ static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
{
struct dlm_rsb *r;
spin_lock(&ls->ls_recover_idr_lock);
spin_lock_bh(&ls->ls_recover_idr_lock);
r = idr_find(&ls->ls_recover_idr, (int)id);
spin_unlock(&ls->ls_recover_idr_lock);
spin_unlock_bh(&ls->ls_recover_idr_lock);
return r;
}
......@@ -358,7 +356,7 @@ static void recover_idr_clear(struct dlm_ls *ls)
struct dlm_rsb *r;
int id;
spin_lock(&ls->ls_recover_idr_lock);
spin_lock_bh(&ls->ls_recover_idr_lock);
idr_for_each_entry(&ls->ls_recover_idr, r, id) {
idr_remove(&ls->ls_recover_idr, id);
......@@ -374,7 +372,7 @@ static void recover_idr_clear(struct dlm_ls *ls)
ls->ls_recover_list_count);
ls->ls_recover_list_count = 0;
}
spin_unlock(&ls->ls_recover_idr_lock);
spin_unlock_bh(&ls->ls_recover_idr_lock);
}
......@@ -521,7 +519,8 @@ static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
* the correct dir node.
*/
int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq,
const struct list_head *root_list)
{
struct dlm_rsb *r;
unsigned int total = 0;
......@@ -531,10 +530,8 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
log_rinfo(ls, "dlm_recover_masters");
down_read(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
list_for_each_entry(r, root_list, res_root_list) {
if (dlm_recovery_stopped(ls)) {
up_read(&ls->ls_root_sem);
error = -EINTR;
goto out;
}
......@@ -548,12 +545,9 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
cond_resched();
total++;
if (error) {
up_read(&ls->ls_root_sem);
if (error)
goto out;
}
}
up_read(&ls->ls_root_sem);
log_rinfo(ls, "dlm_recover_masters %u of %u", count, total);
......@@ -658,13 +652,13 @@ static int recover_locks(struct dlm_rsb *r, uint64_t seq)
return error;
}
int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq)
int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq,
const struct list_head *root_list)
{
struct dlm_rsb *r;
int error, count = 0;
down_read(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
list_for_each_entry(r, root_list, res_root_list) {
if (is_master(r)) {
rsb_clear_flag(r, RSB_NEW_MASTER);
continue;
......@@ -675,19 +669,15 @@ int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq)
if (dlm_recovery_stopped(ls)) {
error = -EINTR;
up_read(&ls->ls_root_sem);
goto out;
}
error = recover_locks(r, seq);
if (error) {
up_read(&ls->ls_root_sem);
if (error)
goto out;
}
count += r->res_recover_locks_count;
}
up_read(&ls->ls_root_sem);
log_rinfo(ls, "dlm_recover_locks %d out", count);
......@@ -856,13 +846,12 @@ static void recover_grant(struct dlm_rsb *r)
rsb_set_flag(r, RSB_RECOVER_GRANT);
}
void dlm_recover_rsbs(struct dlm_ls *ls)
void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list)
{
struct dlm_rsb *r;
unsigned int count = 0;
down_read(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
list_for_each_entry(r, root_list, res_root_list) {
lock_rsb(r);
if (is_master(r)) {
if (rsb_flag(r, RSB_RECOVER_CONVERT))
......@@ -883,7 +872,6 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
rsb_clear_flag(r, RSB_NEW_MASTER2);
unlock_rsb(r);
}
up_read(&ls->ls_root_sem);
if (count)
log_rinfo(ls, "dlm_recover_rsbs %d done", count);
......@@ -891,66 +879,25 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
/* Create a single list of all root rsb's to be used during recovery */
int dlm_create_root_list(struct dlm_ls *ls)
{
struct rb_node *n;
struct dlm_rsb *r;
int i, error = 0;
down_write(&ls->ls_root_sem);
if (!list_empty(&ls->ls_root_list)) {
log_error(ls, "root list not empty");
error = -EINVAL;
goto out;
}
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
spin_lock(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
list_add(&r->res_root_list, &ls->ls_root_list);
dlm_hold_rsb(r);
}
if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
log_error(ls, "dlm_create_root_list toss not empty");
spin_unlock(&ls->ls_rsbtbl[i].lock);
}
out:
up_write(&ls->ls_root_sem);
return error;
}
void dlm_release_root_list(struct dlm_ls *ls)
void dlm_clear_toss(struct dlm_ls *ls)
{
struct dlm_rsb *r, *safe;
unsigned int count = 0;
down_write(&ls->ls_root_sem);
list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
list_del_init(&r->res_root_list);
dlm_put_rsb(r);
}
up_write(&ls->ls_root_sem);
}
write_lock_bh(&ls->ls_rsbtbl_lock);
list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) {
list_del(&r->res_rsbs_list);
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
dlm_rhash_rsb_params);
void dlm_clear_toss(struct dlm_ls *ls)
{
struct rb_node *n, *next;
struct dlm_rsb *r;
unsigned int count = 0;
int i;
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
spin_lock(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
next = rb_next(n);
r = rb_entry(n, struct dlm_rsb, res_hashnode);
rb_erase(n, &ls->ls_rsbtbl[i].toss);
dlm_free_rsb(r);
count++;
}
spin_unlock(&ls->ls_rsbtbl[i].lock);
/* remove it from the toss queue if its part of it */
if (!list_empty(&r->res_toss_q_list))
list_del_init(&r->res_toss_q_list);
free_toss_rsb(r);
count++;
}
write_unlock_bh(&ls->ls_rsbtbl_lock);
if (count)
log_rinfo(ls, "dlm_clear_toss %u done", count);
......
......@@ -19,14 +19,14 @@ int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq,
const struct list_head *root_list);
int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc);
int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq,
const struct list_head *root_list);
void dlm_recovered_lock(struct dlm_rsb *r);
int dlm_create_root_list(struct dlm_ls *ls);
void dlm_release_root_list(struct dlm_ls *ls);
void dlm_clear_toss(struct dlm_ls *ls);
void dlm_recover_rsbs(struct dlm_ls *ls);
void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list);
#endif /* __RECOVER_DOT_H__ */
......@@ -20,6 +20,67 @@
#include "requestqueue.h"
#include "recoverd.h"
static int dlm_create_masters_list(struct dlm_ls *ls)
{
struct dlm_rsb *r;
int error = 0;
write_lock_bh(&ls->ls_masters_lock);
if (!list_empty(&ls->ls_masters_list)) {
log_error(ls, "root list not empty");
error = -EINVAL;
goto out;
}
read_lock_bh(&ls->ls_rsbtbl_lock);
list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
if (r->res_nodeid)
continue;
list_add(&r->res_masters_list, &ls->ls_masters_list);
dlm_hold_rsb(r);
}
read_unlock_bh(&ls->ls_rsbtbl_lock);
out:
write_unlock_bh(&ls->ls_masters_lock);
return error;
}
static void dlm_release_masters_list(struct dlm_ls *ls)
{
struct dlm_rsb *r, *safe;
write_lock_bh(&ls->ls_masters_lock);
list_for_each_entry_safe(r, safe, &ls->ls_masters_list, res_masters_list) {
list_del_init(&r->res_masters_list);
dlm_put_rsb(r);
}
write_unlock_bh(&ls->ls_masters_lock);
}
static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
{
struct dlm_rsb *r;
read_lock_bh(&ls->ls_rsbtbl_lock);
list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
list_add(&r->res_root_list, root_list);
dlm_hold_rsb(r);
}
WARN_ON_ONCE(!list_empty(&ls->ls_toss));
read_unlock_bh(&ls->ls_rsbtbl_lock);
}
static void dlm_release_root_list(struct list_head *root_list)
{
struct dlm_rsb *r, *safe;
list_for_each_entry_safe(r, safe, root_list, res_root_list) {
list_del_init(&r->res_root_list);
dlm_put_rsb(r);
}
}
/* If the start for which we're re-enabling locking (seq) has been superseded
by a newer stop (ls_recover_seq), we need to leave locking disabled.
......@@ -32,24 +93,35 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
int error = -EINTR;
down_write(&ls->ls_recv_active);
write_lock_bh(&ls->ls_recv_active);
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
if (ls->ls_recover_seq == seq) {
set_bit(LSFL_RUNNING, &ls->ls_flags);
/* Schedule next timer if recovery put something on toss.
*
* The rsbs that was queued while recovery on toss hasn't
* started yet because LSFL_RUNNING was set everything
* else recovery hasn't started as well because ls_in_recovery
* is still hold. So we should not run into the case that
* dlm_timer_resume() queues a timer that can occur in
* a no op.
*/
dlm_timer_resume(ls);
/* unblocks processes waiting to enter the dlm */
up_write(&ls->ls_in_recovery);
clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
error = 0;
}
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
up_write(&ls->ls_recv_active);
write_unlock_bh(&ls->ls_recv_active);
return error;
}
static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
{
LIST_HEAD(root_list);
unsigned long start;
int error, neg = 0;
......@@ -66,7 +138,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* routines.
*/
dlm_create_root_list(ls);
dlm_create_root_list(ls, &root_list);
/*
* Add or remove nodes from the lockspace's ls_nodes list.
......@@ -82,10 +154,25 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
goto fail;
}
dlm_recover_dir_nodeid(ls);
dlm_recover_dir_nodeid(ls, &root_list);
/* Create a snapshot of all active rsbs were we are the master of.
* During the barrier between dlm_recover_members_wait() and
* dlm_recover_directory() other nodes can dump their necessary
* directory dlm_rsb (r->res_dir_nodeid == nodeid) in rcom
* communication dlm_copy_master_names() handling.
*
* TODO We should create a per lockspace list that contains rsbs
* that we are the master of. Instead of creating this list while
* recovery we keep track of those rsbs while locking handling and
* recovery can use it when necessary.
*/
error = dlm_create_masters_list(ls);
if (error) {
log_rinfo(ls, "dlm_create_masters_list error %d", error);
goto fail_root_list;
}
ls->ls_recover_dir_sent_res = 0;
ls->ls_recover_dir_sent_msg = 0;
ls->ls_recover_locks_in = 0;
dlm_set_recover_status(ls, DLM_RS_NODES);
......@@ -93,7 +180,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
error = dlm_recover_members_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_members_wait error %d", error);
goto fail;
dlm_release_masters_list(ls);
goto fail_root_list;
}
start = jiffies;
......@@ -106,7 +194,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
error = dlm_recover_directory(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_directory error %d", error);
goto fail;
dlm_release_masters_list(ls);
goto fail_root_list;
}
dlm_set_recover_status(ls, DLM_RS_DIR);
......@@ -114,11 +203,11 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
error = dlm_recover_directory_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
goto fail;
dlm_release_masters_list(ls);
goto fail_root_list;
}
log_rinfo(ls, "dlm_recover_directory %u out %u messages",
ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);
dlm_release_masters_list(ls);
/*
* We may have outstanding operations that are waiting for a reply from
......@@ -130,7 +219,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
if (dlm_recovery_stopped(ls)) {
error = -EINTR;
goto fail;
goto fail_root_list;
}
if (neg || dlm_no_directory(ls)) {
......@@ -138,27 +227,27 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* Clear lkb's for departed nodes.
*/
dlm_recover_purge(ls);
dlm_recover_purge(ls, &root_list);
/*
* Get new master nodeid's for rsb's that were mastered on
* departed nodes.
*/
error = dlm_recover_masters(ls, rv->seq);
error = dlm_recover_masters(ls, rv->seq, &root_list);
if (error) {
log_rinfo(ls, "dlm_recover_masters error %d", error);
goto fail;
goto fail_root_list;
}
/*
* Send our locks on remastered rsb's to the new masters.
*/
error = dlm_recover_locks(ls, rv->seq);
error = dlm_recover_locks(ls, rv->seq, &root_list);
if (error) {
log_rinfo(ls, "dlm_recover_locks error %d", error);
goto fail;
goto fail_root_list;
}
dlm_set_recover_status(ls, DLM_RS_LOCKS);
......@@ -166,7 +255,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
error = dlm_recover_locks_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
goto fail;
goto fail_root_list;
}
log_rinfo(ls, "dlm_recover_locks %u in",
......@@ -178,7 +267,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* settings.
*/
dlm_recover_rsbs(ls);
dlm_recover_rsbs(ls, &root_list);
} else {
/*
* Other lockspace members may be going through the "neg" steps
......@@ -190,11 +279,11 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
error = dlm_recover_locks_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
goto fail;
goto fail_root_list;
}
}
dlm_release_root_list(ls);
dlm_release_root_list(&root_list);
/*
* Purge directory-related requests that are saved in requestqueue.
......@@ -243,8 +332,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
return 0;
fail_root_list:
dlm_release_root_list(&root_list);
fail:
dlm_release_root_list(ls);
mutex_unlock(&ls->ls_recoverd_active);
return error;
......@@ -259,12 +349,12 @@ static void do_ls_recovery(struct dlm_ls *ls)
struct dlm_recover *rv = NULL;
int error;
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
rv = ls->ls_recover_args;
ls->ls_recover_args = NULL;
if (rv && ls->ls_recover_seq == rv->seq)
clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
if (rv) {
error = ls_recover(ls, rv);
......
......@@ -37,7 +37,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
int length = le16_to_cpu(ms->m_header.h_length) -
sizeof(struct dlm_message);
e = kmalloc(sizeof(struct rq_entry) + length, GFP_NOFS);
e = kmalloc(sizeof(struct rq_entry) + length, GFP_ATOMIC);
if (!e) {
log_print("dlm_add_requestqueue: out of memory len %d", length);
return;
......@@ -48,10 +48,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
memcpy(&e->request, ms, sizeof(*ms));
memcpy(&e->request.m_extra, ms->m_extra, length);
atomic_inc(&ls->ls_requestqueue_cnt);
mutex_lock(&ls->ls_requestqueue_mutex);
list_add_tail(&e->list, &ls->ls_requestqueue);
mutex_unlock(&ls->ls_requestqueue_mutex);
}
/*
......@@ -71,16 +68,14 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
struct dlm_message *ms;
int error = 0;
mutex_lock(&ls->ls_requestqueue_mutex);
write_lock_bh(&ls->ls_requestqueue_lock);
for (;;) {
if (list_empty(&ls->ls_requestqueue)) {
mutex_unlock(&ls->ls_requestqueue_mutex);
clear_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
error = 0;
break;
}
e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
mutex_unlock(&ls->ls_requestqueue_mutex);
e = list_first_entry(&ls->ls_requestqueue, struct rq_entry, list);
ms = &e->request;
......@@ -93,41 +88,23 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
e->recover_seq);
dlm_receive_message_saved(ls, &e->request, e->recover_seq);
mutex_lock(&ls->ls_requestqueue_mutex);
list_del(&e->list);
if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
wake_up(&ls->ls_requestqueue_wait);
kfree(e);
if (dlm_locking_stopped(ls)) {
log_debug(ls, "process_requestqueue abort running");
mutex_unlock(&ls->ls_requestqueue_mutex);
error = -EINTR;
break;
}
write_unlock_bh(&ls->ls_requestqueue_lock);
schedule();
write_lock_bh(&ls->ls_requestqueue_lock);
}
write_unlock_bh(&ls->ls_requestqueue_lock);
return error;
}
/*
* After recovery is done, locking is resumed and dlm_recoverd takes all the
* saved requests and processes them as they would have been by dlm_recv. At
* the same time, dlm_recv will start receiving new requests from remote nodes.
* We want to delay dlm_recv processing new requests until dlm_recoverd has
* finished processing the old saved requests. We don't check for locking
* stopped here because dlm_ls_stop won't stop locking until it's suspended us
* (dlm_recv).
*/
void dlm_wait_requestqueue(struct dlm_ls *ls)
{
wait_event(ls->ls_requestqueue_wait,
atomic_read(&ls->ls_requestqueue_cnt) == 0);
}
static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
{
__le32 type = ms->m_type;
......@@ -158,17 +135,15 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
struct dlm_message *ms;
struct rq_entry *e, *safe;
mutex_lock(&ls->ls_requestqueue_mutex);
write_lock_bh(&ls->ls_requestqueue_lock);
list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
ms = &e->request;
if (purge_request(ls, ms, e->nodeid)) {
list_del(&e->list);
if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
wake_up(&ls->ls_requestqueue_wait);
kfree(e);
}
}
mutex_unlock(&ls->ls_requestqueue_mutex);
write_unlock_bh(&ls->ls_requestqueue_lock);
}
This diff is collapsed.
......@@ -189,29 +189,25 @@ TRACE_EVENT(dlm_lock_end,
TRACE_EVENT(dlm_bast,
TP_PROTO(struct dlm_ls *ls, struct dlm_lkb *lkb, int mode),
TP_PROTO(__u32 ls_id, __u32 lkb_id, int mode,
const char *res_name, size_t res_length),
TP_ARGS(ls, lkb, mode),
TP_ARGS(ls_id, lkb_id, mode, res_name, res_length),
TP_STRUCT__entry(
__field(__u32, ls_id)
__field(__u32, lkb_id)
__field(int, mode)
__dynamic_array(unsigned char, res_name,
lkb->lkb_resource ? lkb->lkb_resource->res_length : 0)
__dynamic_array(unsigned char, res_name, res_length)
),
TP_fast_assign(
struct dlm_rsb *r;
__entry->ls_id = ls->ls_global_id;
__entry->lkb_id = lkb->lkb_id;
__entry->ls_id = ls_id;
__entry->lkb_id = lkb_id;
__entry->mode = mode;
r = lkb->lkb_resource;
if (r)
memcpy(__get_dynamic_array(res_name), r->res_name,
__get_dynamic_array_len(res_name));
memcpy(__get_dynamic_array(res_name), res_name,
__get_dynamic_array_len(res_name));
),
TP_printk("ls_id=%u lkb_id=%x mode=%s res_name=%s",
......@@ -224,31 +220,27 @@ TRACE_EVENT(dlm_bast,
TRACE_EVENT(dlm_ast,
TP_PROTO(struct dlm_ls *ls, struct dlm_lkb *lkb),
TP_PROTO(__u32 ls_id, __u32 lkb_id, __u8 sb_flags, int sb_status,
const char *res_name, size_t res_length),
TP_ARGS(ls, lkb),
TP_ARGS(ls_id, lkb_id, sb_flags, sb_status, res_name, res_length),
TP_STRUCT__entry(
__field(__u32, ls_id)
__field(__u32, lkb_id)
__field(u8, sb_flags)
__field(__u8, sb_flags)
__field(int, sb_status)
__dynamic_array(unsigned char, res_name,
lkb->lkb_resource ? lkb->lkb_resource->res_length : 0)
__dynamic_array(unsigned char, res_name, res_length)
),
TP_fast_assign(
struct dlm_rsb *r;
__entry->ls_id = ls->ls_global_id;
__entry->lkb_id = lkb->lkb_id;
__entry->sb_flags = lkb->lkb_lksb->sb_flags;
__entry->sb_status = lkb->lkb_lksb->sb_status;
__entry->ls_id = ls_id;
__entry->lkb_id = lkb_id;
__entry->sb_flags = sb_flags;
__entry->sb_status = sb_status;
r = lkb->lkb_resource;
if (r)
memcpy(__get_dynamic_array(res_name), r->res_name,
__get_dynamic_array_len(res_name));
memcpy(__get_dynamic_array(res_name), res_name,
__get_dynamic_array_len(res_name));
),
TP_printk("ls_id=%u lkb_id=%x sb_flags=%s sb_status=%d res_name=%s",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment