Commit 97971df8 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'dlm-6.2' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:
 "These patches include the usual cleanups and minor fixes, the removal
  of code that is no longer needed due to recent improvements, and
  improvements to processing large volumes of messages during heavy
  locking activity.

  Summary:

   - Misc code cleanup

   - Fix a couple of socket handling bugs: a double release on an error
     path and a data-ready race in an accept loop

   - Remove code for resending dir-remove messages. This code is no
     longer needed since the midcomms layer now ensures the messages are
     resent if needed

   - Add tracepoints for dlm messages

   - Improve callback queueing by replacing the fixed array with a list

   - Simplify the handling of a remove message followed by a lookup
     message by sending both without releasing a spinlock in between

   - Improve the concurrency of sending and receiving messages by
     holding locks for a shorter time, and changing how workqueues are
     used

   - Remove old code for shutting down sockets, which is no longer
     needed with the reliable connection handling that was recently
     added"

* tag 'dlm-6.2' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm: (37 commits)
  fs: dlm: fix building without lockdep
  fs: dlm: parallelize lowcomms socket handling
  fs: dlm: don't init error value
  fs: dlm: use saved sk_error_report()
  fs: dlm: use sock2con without checking null
  fs: dlm: remove dlm_node_addrs lookup list
  fs: dlm: don't put dlm_local_addrs on heap
  fs: dlm: cleanup listen sock handling
  fs: dlm: remove socket shutdown handling
  fs: dlm: use listen sock as dlm running indicator
  fs: dlm: use list_first_entry_or_null
  fs: dlm: remove twice INIT_WORK
  fs: dlm: add midcomms init/start functions
  fs: dlm: add dst nodeid for msg tracing
  fs: dlm: rename seq to h_seq for msg tracing
  fs: dlm: rename DLM_IFL_NEED_SCHED to DLM_IFL_CB_PENDING
  fs: dlm: ast do WARN_ON_ONCE() on hotpath
  fs: dlm: drop lkb ref in bug case
  fs: dlm: avoid false-positive checker warning
  fs: dlm: use WARN_ON_ONCE() instead of WARN_ON()
  ...
parents 56c003e4 7a5e9f1f
This diff is collapsed.
......@@ -11,13 +11,22 @@
#ifndef __ASTD_DOT_H__
#define __ASTD_DOT_H__
int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags, uint64_t seq);
int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_callback *cb, int *resid);
#define DLM_ENQUEUE_CALLBACK_NEED_SCHED 1
#define DLM_ENQUEUE_CALLBACK_SUCCESS 0
#define DLM_ENQUEUE_CALLBACK_FAILURE -1
int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags);
#define DLM_DEQUEUE_CALLBACK_EMPTY 2
#define DLM_DEQUEUE_CALLBACK_LAST 1
#define DLM_DEQUEUE_CALLBACK_SUCCESS 0
int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb);
void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
uint32_t sbflags);
void dlm_callback_set_last_ptr(struct dlm_callback **from,
struct dlm_callback *to);
void dlm_release_callback(struct kref *ref);
void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb);
void dlm_callback_work(struct work_struct *work);
int dlm_callback_start(struct dlm_ls *ls);
void dlm_callback_stop(struct dlm_ls *ls);
......
......@@ -183,7 +183,7 @@ static int dlm_check_protocol_and_dlm_running(unsigned int x)
return -EINVAL;
}
if (dlm_allow_conn)
if (dlm_lowcomms_is_running())
return -EBUSY;
return 0;
......@@ -194,7 +194,7 @@ static int dlm_check_zero_and_dlm_running(unsigned int x)
if (!x)
return -EINVAL;
if (dlm_allow_conn)
if (dlm_lowcomms_is_running())
return -EBUSY;
return 0;
......
......@@ -246,7 +246,7 @@ static void print_format3_lock(struct seq_file *s, struct dlm_lkb *lkb,
lkb->lkb_status,
lkb->lkb_grmode,
lkb->lkb_rqmode,
lkb->lkb_last_bast.mode,
lkb->lkb_last_bast_mode,
rsb_lookup,
lkb->lkb_wait_type,
lkb->lkb_lvbseq,
......
......@@ -211,6 +211,7 @@ struct dlm_args {
#endif
#define DLM_IFL_DEADLOCK_CANCEL 0x01000000
#define DLM_IFL_STUB_MS 0x02000000 /* magic number for m_flags */
#define DLM_IFL_CB_PENDING 0x04000000
/* least significant 2 bytes are message changed, they are full transmitted
* but at receive side only the 2 bytes LSB will be set.
*
......@@ -222,18 +223,17 @@ struct dlm_args {
#define DLM_IFL_USER 0x00000001
#define DLM_IFL_ORPHAN 0x00000002
#define DLM_CALLBACKS_SIZE 6
#define DLM_CB_CAST 0x00000001
#define DLM_CB_BAST 0x00000002
#define DLM_CB_SKIP 0x00000004
struct dlm_callback {
uint64_t seq;
uint32_t flags; /* DLM_CBF_ */
int sb_status; /* copy to lksb status */
uint8_t sb_flags; /* copy to lksb flags */
int8_t mode; /* rq mode of bast, gr mode of cast */
struct list_head list;
struct kref ref;
};
struct dlm_lkb {
......@@ -268,12 +268,13 @@ struct dlm_lkb {
unsigned long lkb_timeout_cs;
#endif
struct mutex lkb_cb_mutex;
spinlock_t lkb_cb_lock;
struct work_struct lkb_cb_work;
struct list_head lkb_cb_list; /* for ls_cb_delay or proc->asts */
struct dlm_callback lkb_callbacks[DLM_CALLBACKS_SIZE];
struct dlm_callback lkb_last_cast;
struct dlm_callback lkb_last_bast;
struct list_head lkb_callbacks;
struct dlm_callback *lkb_last_cast;
struct dlm_callback *lkb_last_cb;
int lkb_last_bast_mode;
ktime_t lkb_last_cast_time; /* for debugging */
ktime_t lkb_last_bast_time; /* for debugging */
......@@ -591,11 +592,7 @@ struct dlm_ls {
int ls_new_rsb_count;
struct list_head ls_new_rsb; /* new rsb structs */
spinlock_t ls_remove_spin;
wait_queue_head_t ls_remove_wait;
char ls_remove_name[DLM_RESNAME_MAXLEN+1];
char *ls_remove_names[DLM_REMOVE_NAMES_MAX];
int ls_remove_len;
int ls_remove_lens[DLM_REMOVE_NAMES_MAX];
struct list_head ls_nodes; /* current nodes in ls */
......@@ -631,7 +628,7 @@ struct dlm_ls {
/* recovery related */
struct mutex ls_cb_mutex;
spinlock_t ls_cb_lock;
struct list_head ls_cb_delay; /* save for queue_work later */
struct timer_list ls_timer;
struct task_struct *ls_recoverd_task;
......@@ -670,7 +667,7 @@ struct dlm_ls {
void *ls_ops_arg;
int ls_namelen;
char ls_name[1];
char ls_name[DLM_LOCKSPACE_LEN + 1];
};
/*
......
This diff is collapsed.
......@@ -17,7 +17,6 @@
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
......@@ -391,7 +390,7 @@ static int threads_start(void)
/* Thread for sending/receiving messages for all lockspace's */
error = dlm_midcomms_start();
if (error) {
log_print("cannot start dlm lowcomms %d", error);
log_print("cannot start dlm midcomms %d", error);
goto scand_fail;
}
......@@ -473,7 +472,7 @@ static int new_lockspace(const char *name, const char *cluster,
error = -ENOMEM;
ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
ls = kzalloc(sizeof(*ls), GFP_NOFS);
if (!ls)
goto out;
memcpy(ls->ls_name, name, namelen);
......@@ -524,9 +523,6 @@ static int new_lockspace(const char *name, const char *cluster,
spin_lock_init(&ls->ls_rsbtbl[i].lock);
}
spin_lock_init(&ls->ls_remove_spin);
init_waitqueue_head(&ls->ls_remove_wait);
for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
GFP_KERNEL);
......@@ -567,7 +563,7 @@ static int new_lockspace(const char *name, const char *cluster,
init_completion(&ls->ls_recovery_done);
ls->ls_recovery_result = -1;
mutex_init(&ls->ls_cb_mutex);
spin_lock_init(&ls->ls_cb_lock);
INIT_LIST_HEAD(&ls->ls_cb_delay);
ls->ls_recoverd_task = NULL;
......@@ -726,7 +722,7 @@ static int __dlm_new_lockspace(const char *name, const char *cluster,
if (!ls_count) {
dlm_scand_stop();
dlm_midcomms_shutdown();
dlm_lowcomms_stop();
dlm_midcomms_stop();
}
out:
mutex_unlock(&ls_lock);
......@@ -929,7 +925,7 @@ int dlm_release_lockspace(void *lockspace, int force)
if (!error)
ls_count--;
if (!ls_count)
dlm_lowcomms_stop();
dlm_midcomms_stop();
mutex_unlock(&ls_lock);
return error;
......
This diff is collapsed.
......@@ -29,12 +29,14 @@ static inline int nodeid_hash(int nodeid)
return nodeid & (CONN_HASH_SIZE-1);
}
/* switch to check if dlm is running */
extern int dlm_allow_conn;
/* check if dlm is running */
bool dlm_lowcomms_is_running(void);
int dlm_lowcomms_start(void);
void dlm_lowcomms_shutdown(void);
void dlm_lowcomms_shutdown_node(int nodeid, bool force);
void dlm_lowcomms_stop(void);
void dlm_lowcomms_init(void);
void dlm_lowcomms_exit(void);
int dlm_lowcomms_close(int nodeid);
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
......
......@@ -17,7 +17,7 @@
#include "user.h"
#include "memory.h"
#include "config.h"
#include "lowcomms.h"
#include "midcomms.h"
#define CREATE_TRACE_POINTS
#include <trace/events/dlm.h>
......@@ -30,6 +30,8 @@ static int __init init_dlm(void)
if (error)
goto out;
dlm_midcomms_init();
error = dlm_lockspace_init();
if (error)
goto out_mem;
......@@ -66,6 +68,7 @@ static int __init init_dlm(void)
out_lockspace:
dlm_lockspace_exit();
out_mem:
dlm_midcomms_exit();
dlm_memory_exit();
out:
return error;
......@@ -79,7 +82,7 @@ static void __exit exit_dlm(void)
dlm_config_exit();
dlm_memory_exit();
dlm_lockspace_exit();
dlm_lowcomms_exit();
dlm_midcomms_exit();
dlm_unregister_debugfs();
}
......
......@@ -573,7 +573,10 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
node = &rv->nodes[i];
if (dlm_is_member(ls, node->nodeid))
continue;
dlm_add_member(ls, node);
error = dlm_add_member(ls, node);
if (error)
return error;
log_rinfo(ls, "add member %d", node->nodeid);
}
......
......@@ -14,12 +14,14 @@
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "ast.h"
static struct kmem_cache *writequeue_cache;
static struct kmem_cache *mhandle_cache;
static struct kmem_cache *msg_cache;
static struct kmem_cache *lkb_cache;
static struct kmem_cache *rsb_cache;
static struct kmem_cache *cb_cache;
int __init dlm_memory_init(void)
......@@ -46,8 +48,16 @@ int __init dlm_memory_init(void)
if (!rsb_cache)
goto rsb;
cb_cache = kmem_cache_create("dlm_cb", sizeof(struct dlm_callback),
__alignof__(struct dlm_callback), 0,
NULL);
if (!rsb_cache)
goto cb;
return 0;
cb:
kmem_cache_destroy(rsb_cache);
rsb:
kmem_cache_destroy(msg_cache);
msg:
......@@ -67,6 +77,7 @@ void dlm_memory_exit(void)
kmem_cache_destroy(msg_cache);
kmem_cache_destroy(lkb_cache);
kmem_cache_destroy(rsb_cache);
kmem_cache_destroy(cb_cache);
}
char *dlm_allocate_lvb(struct dlm_ls *ls)
......@@ -115,12 +126,17 @@ void dlm_free_lkb(struct dlm_lkb *lkb)
kfree(ua);
}
}
/* drop references if they are set */
dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
kmem_cache_free(lkb_cache, lkb);
}
struct dlm_mhandle *dlm_allocate_mhandle(void)
struct dlm_mhandle *dlm_allocate_mhandle(gfp_t allocation)
{
return kmem_cache_alloc(mhandle_cache, GFP_NOFS);
return kmem_cache_alloc(mhandle_cache, allocation);
}
void dlm_free_mhandle(struct dlm_mhandle *mhandle)
......@@ -147,3 +163,13 @@ void dlm_free_msg(struct dlm_msg *msg)
{
kmem_cache_free(msg_cache, msg);
}
struct dlm_callback *dlm_allocate_cb(void)
{
return kmem_cache_alloc(cb_cache, GFP_ATOMIC);
}
void dlm_free_cb(struct dlm_callback *cb)
{
kmem_cache_free(cb_cache, cb);
}
......@@ -20,12 +20,14 @@ struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls);
void dlm_free_lkb(struct dlm_lkb *l);
char *dlm_allocate_lvb(struct dlm_ls *ls);
void dlm_free_lvb(char *l);
struct dlm_mhandle *dlm_allocate_mhandle(void);
struct dlm_mhandle *dlm_allocate_mhandle(gfp_t allocation);
void dlm_free_mhandle(struct dlm_mhandle *mhandle);
struct writequeue_entry *dlm_allocate_writequeue(void);
void dlm_free_writequeue(struct writequeue_entry *writequeue);
struct dlm_msg *dlm_allocate_msg(gfp_t allocation);
void dlm_free_msg(struct dlm_msg *msg);
struct dlm_callback *dlm_allocate_cb(void);
void dlm_free_cb(struct dlm_callback *cb);
#endif /* __MEMORY_DOT_H__ */
......@@ -132,6 +132,7 @@
*/
#define DLM_DEBUG_FENCE_TERMINATION 0
#include <trace/events/dlm.h>
#include <net/tcp.h>
#include "dlm_internal.h"
......@@ -194,7 +195,7 @@ struct midcomms_node {
};
struct dlm_mhandle {
const struct dlm_header *inner_hd;
const union dlm_packet *inner_p;
struct midcomms_node *node;
struct dlm_opts *opts;
struct dlm_msg *msg;
......@@ -305,11 +306,11 @@ static void dlm_send_queue_flush(struct midcomms_node *node)
pr_debug("flush midcomms send queue of node %d\n", node->nodeid);
rcu_read_lock();
spin_lock(&node->send_queue_lock);
spin_lock_bh(&node->send_queue_lock);
list_for_each_entry_rcu(mh, &node->send_queue, list) {
dlm_mhandle_delete(node, mh);
}
spin_unlock(&node->send_queue_lock);
spin_unlock_bh(&node->send_queue_lock);
rcu_read_unlock();
}
......@@ -415,7 +416,7 @@ static int dlm_send_fin(struct midcomms_node *node,
m_header->h_cmd = DLM_FIN;
pr_debug("sending fin msg to node %d\n", node->nodeid);
dlm_midcomms_commit_mhandle(mh);
dlm_midcomms_commit_mhandle(mh, NULL, 0);
set_bit(DLM_NODE_FLAG_STOP_TX, &node->flags);
return 0;
......@@ -436,7 +437,7 @@ static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
}
}
spin_lock(&node->send_queue_lock);
spin_lock_bh(&node->send_queue_lock);
list_for_each_entry_rcu(mh, &node->send_queue, list) {
if (before(mh->seq, seq)) {
dlm_mhandle_delete(node, mh);
......@@ -445,7 +446,7 @@ static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
break;
}
}
spin_unlock(&node->send_queue_lock);
spin_unlock_bh(&node->send_queue_lock);
rcu_read_unlock();
}
......@@ -468,12 +469,26 @@ static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
spin_unlock(&node->state_lock);
log_print("%s: unexpected state: %d\n",
__func__, node->state);
WARN_ON(1);
WARN_ON_ONCE(1);
return;
}
spin_unlock(&node->state_lock);
}
static void dlm_receive_buffer_3_2_trace(uint32_t seq, union dlm_packet *p)
{
switch (p->header.h_cmd) {
case DLM_MSG:
trace_dlm_recv_message(dlm_our_nodeid(), seq, &p->message);
break;
case DLM_RCOM:
trace_dlm_recv_rcom(dlm_our_nodeid(), seq, &p->rcom);
break;
default:
break;
}
}
static void dlm_midcomms_receive_buffer(union dlm_packet *p,
struct midcomms_node *node,
uint32_t seq)
......@@ -525,7 +540,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
spin_unlock(&node->state_lock);
log_print("%s: unexpected state: %d\n",
__func__, node->state);
WARN_ON(1);
WARN_ON_ONCE(1);
return;
}
spin_unlock(&node->state_lock);
......@@ -533,7 +548,8 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
break;
default:
WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
dlm_receive_buffer_3_2_trace(seq, p);
dlm_receive_buffer(p, node->nodeid);
set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
break;
......@@ -754,7 +770,7 @@ static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid)
goto out;
}
WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
dlm_receive_buffer(p, nodeid);
break;
case DLM_OPTS:
......@@ -874,12 +890,7 @@ static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid)
dlm_receive_buffer(p, nodeid);
}
/*
* Called from the low-level comms layer to process a buffer of
* commands.
*/
int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len)
{
const unsigned char *ptr = buf;
const struct dlm_header *hd;
......@@ -914,6 +925,32 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
if (msglen > len)
break;
ret += msglen;
len -= msglen;
ptr += msglen;
}
return ret;
}
/*
* Called from the low-level comms layer to process a buffer of
* commands.
*/
int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
{
const unsigned char *ptr = buf;
const struct dlm_header *hd;
uint16_t msglen;
int ret = 0;
while (len >= sizeof(struct dlm_header)) {
hd = (struct dlm_header *)ptr;
msglen = le16_to_cpu(hd->h_length);
if (msglen > len)
break;
switch (hd->h_version) {
case cpu_to_le32(DLM_VERSION_3_1):
dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid);
......@@ -1030,9 +1067,9 @@ static void midcomms_new_msg_cb(void *data)
atomic_inc(&mh->node->send_queue_cnt);
spin_lock(&mh->node->send_queue_lock);
spin_lock_bh(&mh->node->send_queue_lock);
list_add_tail_rcu(&mh->list, &mh->node->send_queue);
spin_unlock(&mh->node->send_queue_lock);
spin_unlock_bh(&mh->node->send_queue_lock);
mh->seq = mh->node->seq_send++;
}
......@@ -1055,7 +1092,7 @@ static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int node
dlm_fill_opts_header(opts, len, mh->seq);
*ppc += sizeof(*opts);
mh->inner_hd = (const struct dlm_header *)*ppc;
mh->inner_p = (const union dlm_packet *)*ppc;
return msg;
}
......@@ -1079,9 +1116,9 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
}
/* this is a bug, however we going on and hope it will be resolved */
WARN_ON(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
mh = dlm_allocate_mhandle();
mh = dlm_allocate_mhandle(allocation);
if (!mh)
goto err;
......@@ -1111,7 +1148,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
break;
default:
dlm_free_mhandle(mh);
WARN_ON(1);
WARN_ON_ONCE(1);
goto err;
}
......@@ -1130,11 +1167,32 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
}
#endif
static void dlm_midcomms_commit_msg_3_2(struct dlm_mhandle *mh)
static void dlm_midcomms_commit_msg_3_2_trace(const struct dlm_mhandle *mh,
const void *name, int namelen)
{
switch (mh->inner_p->header.h_cmd) {
case DLM_MSG:
trace_dlm_send_message(mh->node->nodeid, mh->seq,
&mh->inner_p->message,
name, namelen);
break;
case DLM_RCOM:
trace_dlm_send_rcom(mh->node->nodeid, mh->seq,
&mh->inner_p->rcom);
break;
default:
/* nothing to trace */
break;
}
}
static void dlm_midcomms_commit_msg_3_2(struct dlm_mhandle *mh,
const void *name, int namelen)
{
/* nexthdr chain for fast lookup */
mh->opts->o_nextcmd = mh->inner_hd->h_cmd;
mh->opts->o_nextcmd = mh->inner_p->header.h_cmd;
mh->committed = true;
dlm_midcomms_commit_msg_3_2_trace(mh, name, namelen);
dlm_lowcomms_commit_msg(mh->msg);
}
......@@ -1142,8 +1200,10 @@ static void dlm_midcomms_commit_msg_3_2(struct dlm_mhandle *mh)
* dlm_midcomms_get_mhandle
*/
#ifndef __CHECKER__
void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh)
void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh,
const void *name, int namelen)
{
switch (mh->node->version) {
case DLM_VERSION_3_1:
srcu_read_unlock(&nodes_srcu, mh->idx);
......@@ -1154,25 +1214,40 @@ void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh)
dlm_free_mhandle(mh);
break;
case DLM_VERSION_3_2:
dlm_midcomms_commit_msg_3_2(mh);
dlm_midcomms_commit_msg_3_2(mh, name, namelen);
srcu_read_unlock(&nodes_srcu, mh->idx);
break;
default:
srcu_read_unlock(&nodes_srcu, mh->idx);
WARN_ON(1);
WARN_ON_ONCE(1);
break;
}
}
#endif
int dlm_midcomms_start(void)
{
return dlm_lowcomms_start();
}
void dlm_midcomms_stop(void)
{
dlm_lowcomms_stop();
}
void dlm_midcomms_init(void)
{
int i;
for (i = 0; i < CONN_HASH_SIZE; i++)
INIT_HLIST_HEAD(&node_hash[i]);
return dlm_lowcomms_start();
dlm_lowcomms_init();
}
void dlm_midcomms_exit(void)
{
dlm_lowcomms_exit();
}
static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
......@@ -1201,7 +1276,7 @@ static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
spin_unlock(&node->state_lock);
log_print("%s: unexpected state: %d\n",
__func__, node->state);
WARN_ON(1);
WARN_ON_ONCE(1);
return;
}
spin_unlock(&node->state_lock);
......@@ -1319,7 +1394,7 @@ static void midcomms_node_release(struct rcu_head *rcu)
{
struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
WARN_ON(atomic_read(&node->send_queue_cnt));
WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
kfree(node);
}
......@@ -1372,11 +1447,13 @@ static void midcomms_shutdown(struct midcomms_node *node)
pr_debug("active shutdown timed out for node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
midcomms_node_reset(node);
dlm_lowcomms_shutdown_node(node->nodeid, true);
return;
}
pr_debug("active shutdown done for node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
dlm_lowcomms_shutdown_node(node->nodeid, false);
}
void dlm_midcomms_shutdown(void)
......@@ -1384,6 +1461,8 @@ void dlm_midcomms_shutdown(void)
struct midcomms_node *node;
int i, idx;
dlm_lowcomms_shutdown();
mutex_lock(&close_lock);
idx = srcu_read_lock(&nodes_srcu);
for (i = 0; i < CONN_HASH_SIZE; i++) {
......@@ -1401,8 +1480,6 @@ void dlm_midcomms_shutdown(void)
}
srcu_read_unlock(&nodes_srcu, idx);
mutex_unlock(&close_lock);
dlm_lowcomms_shutdown();
}
int dlm_midcomms_close(int nodeid)
......
......@@ -14,12 +14,17 @@
struct midcomms_node;
int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len);
int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
gfp_t allocation, char **ppc);
void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh);
void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name,
int namelen);
int dlm_midcomms_close(int nodeid);
int dlm_midcomms_start(void);
void dlm_midcomms_stop(void);
void dlm_midcomms_init(void);
void dlm_midcomms_exit(void);
void dlm_midcomms_shutdown(void);
void dlm_midcomms_add_member(int nodeid);
void dlm_midcomms_remove_member(int nodeid);
......
......@@ -91,7 +91,7 @@ static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc)
{
dlm_midcomms_commit_mhandle(mh);
dlm_midcomms_commit_mhandle(mh, NULL, 0);
}
static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc)
......@@ -516,7 +516,7 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
rf = (struct rcom_config *) rc->rc_buf;
rf->rf_lvblen = cpu_to_le32(~0U);
dlm_midcomms_commit_mhandle(mh);
dlm_midcomms_commit_mhandle(mh, NULL, 0);
return 0;
}
......
......@@ -44,7 +44,8 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
e->recover_seq = ls->ls_recover_seq & 0xFFFFFFFF;
e->nodeid = nodeid;
memcpy(&e->request, ms, le16_to_cpu(ms->m_header.h_length));
memcpy(&e->request, ms, sizeof(*ms));
memcpy(&e->request.m_extra, ms->m_extra, length);
atomic_inc(&ls->ls_requestqueue_cnt);
mutex_lock(&ls->ls_requestqueue_mutex);
......
......@@ -25,6 +25,7 @@
#include "user.h"
#include "ast.h"
#include "config.h"
#include "memory.h"
static const char name_prefix[] = "dlm";
static const struct file_operations device_fops;
......@@ -175,7 +176,7 @@ static int lkb_is_endoflife(int mode, int status)
being removed and then remove that lkb from the orphans list and free it */
void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags, uint64_t seq)
int status, uint32_t sbflags)
{
struct dlm_ls *ls;
struct dlm_user_args *ua;
......@@ -209,16 +210,22 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
spin_lock(&proc->asts_spin);
rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, seq);
if (rv < 0) {
rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
switch (rv) {
case DLM_ENQUEUE_CALLBACK_FAILURE:
spin_unlock(&proc->asts_spin);
WARN_ON_ONCE(1);
goto out;
}
if (list_empty(&lkb->lkb_cb_list)) {
case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
kref_get(&lkb->lkb_ref);
list_add_tail(&lkb->lkb_cb_list, &proc->asts);
wake_up_interruptible(&proc->wait);
break;
case DLM_ENQUEUE_CALLBACK_SUCCESS:
break;
default:
WARN_ON_ONCE(1);
break;
}
spin_unlock(&proc->asts_spin);
......@@ -800,8 +807,8 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
struct dlm_user_proc *proc = file->private_data;
struct dlm_lkb *lkb;
DECLARE_WAITQUEUE(wait, current);
struct dlm_callback cb;
int rv, resid, copy_lvb = 0;
struct dlm_callback *cb;
int rv, copy_lvb = 0;
int old_mode, new_mode;
if (count == sizeof(struct dlm_device_version)) {
......@@ -857,53 +864,58 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
without removing lkb_cb_list; so empty lkb_cb_list is always
consistent with empty lkb_callbacks */
lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_cb_list);
lkb = list_first_entry(&proc->asts, struct dlm_lkb, lkb_cb_list);
/* rem_lkb_callback sets a new lkb_last_cast */
old_mode = lkb->lkb_last_cast.mode;
old_mode = lkb->lkb_last_cast->mode;
rv = dlm_rem_lkb_callback(lkb->lkb_resource->res_ls, lkb, &cb, &resid);
if (rv < 0) {
rv = dlm_dequeue_lkb_callback(lkb, &cb);
switch (rv) {
case DLM_DEQUEUE_CALLBACK_EMPTY:
/* this shouldn't happen; lkb should have been removed from
list when resid was zero */
* list when last item was dequeued
*/
log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id);
list_del_init(&lkb->lkb_cb_list);
spin_unlock(&proc->asts_spin);
/* removes ref for proc->asts, may cause lkb to be freed */
dlm_put_lkb(lkb);
WARN_ON_ONCE(1);
goto try_another;
}
if (!resid)
case DLM_DEQUEUE_CALLBACK_LAST:
list_del_init(&lkb->lkb_cb_list);
spin_unlock(&proc->asts_spin);
if (cb.flags & DLM_CB_SKIP) {
/* removes ref for proc->asts, may cause lkb to be freed */
if (!resid)
dlm_put_lkb(lkb);
goto try_another;
lkb->lkb_flags &= ~DLM_IFL_CB_PENDING;
break;
case DLM_DEQUEUE_CALLBACK_SUCCESS:
break;
default:
WARN_ON_ONCE(1);
break;
}
spin_unlock(&proc->asts_spin);
if (cb.flags & DLM_CB_BAST) {
trace_dlm_bast(lkb->lkb_resource->res_ls, lkb, cb.mode);
} else if (cb.flags & DLM_CB_CAST) {
new_mode = cb.mode;
if (cb->flags & DLM_CB_BAST) {
trace_dlm_bast(lkb->lkb_resource->res_ls, lkb, cb->mode);
} else if (cb->flags & DLM_CB_CAST) {
new_mode = cb->mode;
if (!cb.sb_status && lkb->lkb_lksb->sb_lvbptr &&
if (!cb->sb_status && lkb->lkb_lksb->sb_lvbptr &&
dlm_lvb_operations[old_mode + 1][new_mode + 1])
copy_lvb = 1;
lkb->lkb_lksb->sb_status = cb.sb_status;
lkb->lkb_lksb->sb_flags = cb.sb_flags;
lkb->lkb_lksb->sb_status = cb->sb_status;
lkb->lkb_lksb->sb_flags = cb->sb_flags;
trace_dlm_ast(lkb->lkb_resource->res_ls, lkb);
}
rv = copy_result_to_user(lkb->lkb_ua,
test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags),
cb.flags, cb.mode, copy_lvb, buf, count);
cb->flags, cb->mode, copy_lvb, buf, count);
kref_put(&cb->ref, dlm_release_callback);
/* removes ref for proc->asts, may cause lkb to be freed */
if (!resid)
if (rv == DLM_DEQUEUE_CALLBACK_LAST)
dlm_put_lkb(lkb);
return rv;
......
......@@ -7,7 +7,7 @@
#define __USER_DOT_H__
void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags, uint64_t seq);
int status, uint32_t sbflags);
int dlm_user_init(void);
void dlm_user_exit(void);
int dlm_device_deregister(struct dlm_ls *ls);
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment