Commit c288745f authored by Alexander Aring, committed by David Teigland

dlm: avoid blocking receive at the end of recovery

The end of the recovery process transitioned to normal message
processing by temporarily blocking the receiving context,
processing saved messages, then unblocking the receiving
context.  To avoid blocking the receiving context, the old
wait_queue and mutex are replaced by a new rwlock and the new
RECV_MSG_BLOCKED flag.  Received messages are added to the
list of saved messages, protected by the rwlock, until the
flag is cleared, which happens when all saved messages have
been processed.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
parent cc396e23
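
For context, here is a minimal userspace sketch of the locking pattern the patch introduces: take the lock shared for the common receive path, and when the blocked flag is set, drop the read lock, take the write lock, and recheck the flag before saving the message, since it may have been cleared in between. This is not the kernel code; it uses POSIX rwlocks instead of rwlock_t, and the helper names (receive_message, save_message, deliver_message) are illustrative, not DLM functions.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

static pthread_rwlock_t requestqueue_lock = PTHREAD_RWLOCK_INITIALIZER;
static bool recv_msg_blocked;   /* stands in for LSFL_RECV_MSG_BLOCKED */

/* stand-ins for normal processing and for saving a message to the queue */
static void deliver_message(const char *msg) { printf("process %s\n", msg); }
static void save_message(const char *msg)    { printf("save %s\n", msg); }

static void receive_message(const char *msg)
{
try_again:
	pthread_rwlock_rdlock(&requestqueue_lock);
	if (recv_msg_blocked) {
		/* can't modify the saved-message list under a read lock:
		   drop it, take the write lock, and recheck the flag, which
		   may have been cleared while we were unlocked */
		pthread_rwlock_unlock(&requestqueue_lock);
		pthread_rwlock_wrlock(&requestqueue_lock);
		if (!recv_msg_blocked) {
			pthread_rwlock_unlock(&requestqueue_lock);
			goto try_again;
		}
		save_message(msg);
		pthread_rwlock_unlock(&requestqueue_lock);
	} else {
		/* common case: process directly under the shared lock */
		deliver_message(msg);
		pthread_rwlock_unlock(&requestqueue_lock);
	}
}

int main(void)
{
	recv_msg_blocked = true;     /* recovery running: messages are saved */
	receive_message("msg-1");
	recv_msg_blocked = false;    /* saved messages drained: flag cleared */
	receive_message("msg-2");
	return 0;
}
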
@@ -655,9 +655,7 @@ struct dlm_ls {
 	struct rw_semaphore ls_in_recovery;	/* block local requests */
 	struct rw_semaphore ls_recv_active;	/* block dlm_recv */
 	struct list_head ls_requestqueue;	/* queue remote requests */
-	atomic_t ls_requestqueue_cnt;
-	wait_queue_head_t ls_requestqueue_wait;
-	struct mutex ls_requestqueue_mutex;
+	rwlock_t ls_requestqueue_lock;
 	struct dlm_rcom *ls_recover_buf;
 	int ls_recover_nodeid;			/* for debugging */
 	unsigned int ls_recover_locks_in;	/* for log info */
@@ -717,6 +715,7 @@ struct dlm_ls {
 #define LSFL_UEVENT_WAIT	7
 #define LSFL_CB_DELAY		9
 #define LSFL_NODIR		10
+#define LSFL_RECV_MSG_BLOCKED	11
 
 #define DLM_PROC_FLAGS_CLOSING	1
 #define DLM_PROC_FLAGS_COMPAT	2
...
@@ -4752,20 +4752,32 @@ static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
 				int nodeid)
 {
-	if (dlm_locking_stopped(ls)) {
+try_again:
+	read_lock(&ls->ls_requestqueue_lock);
+	if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
 		/* If we were a member of this lockspace, left, and rejoined,
 		   other nodes may still be sending us messages from the
 		   lockspace generation before we left. */
 		if (WARN_ON_ONCE(!ls->ls_generation)) {
+			read_unlock(&ls->ls_requestqueue_lock);
 			log_limit(ls, "receive %d from %d ignore old gen",
 				  le32_to_cpu(ms->m_type), nodeid);
 			return;
 		}
 
+		read_unlock(&ls->ls_requestqueue_lock);
+		write_lock(&ls->ls_requestqueue_lock);
+		/* recheck because we hold writelock now */
+		if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
+			write_unlock_bh(&ls->ls_requestqueue_lock);
+			goto try_again;
+		}
+
 		dlm_add_requestqueue(ls, nodeid, ms);
+		write_unlock(&ls->ls_requestqueue_lock);
 	} else {
-		dlm_wait_requestqueue(ls);
 		_receive_message(ls, ms, 0);
+		read_unlock(&ls->ls_requestqueue_lock);
 	}
 }
...
@@ -554,9 +554,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	init_rwsem(&ls->ls_in_recovery);
 	init_rwsem(&ls->ls_recv_active);
 	INIT_LIST_HEAD(&ls->ls_requestqueue);
-	atomic_set(&ls->ls_requestqueue_cnt, 0);
-	init_waitqueue_head(&ls->ls_requestqueue_wait);
-	mutex_init(&ls->ls_requestqueue_mutex);
+	rwlock_init(&ls->ls_requestqueue_lock);
 	spin_lock_init(&ls->ls_clear_proc_locks);
 
 	/* Due backwards compatibility with 3.1 we need to use maximum
...
@@ -642,6 +642,11 @@ int dlm_ls_stop(struct dlm_ls *ls)
 	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
 	ls->ls_recover_seq++;
+
+	/* activate requestqueue and stop processing */
+	write_lock(&ls->ls_requestqueue_lock);
+	set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
+	write_unlock(&ls->ls_requestqueue_lock);
 	spin_unlock(&ls->ls_recover_lock);
 
 	/*
...
@@ -48,10 +48,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
 	memcpy(&e->request, ms, sizeof(*ms));
 	memcpy(&e->request.m_extra, ms->m_extra, length);
 
-	atomic_inc(&ls->ls_requestqueue_cnt);
-	mutex_lock(&ls->ls_requestqueue_mutex);
 	list_add_tail(&e->list, &ls->ls_requestqueue);
-	mutex_unlock(&ls->ls_requestqueue_mutex);
 }
 
 /*
@@ -71,16 +68,14 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
 	struct dlm_message *ms;
 	int error = 0;
 
-	mutex_lock(&ls->ls_requestqueue_mutex);
+	write_lock(&ls->ls_requestqueue_lock);
 	for (;;) {
 		if (list_empty(&ls->ls_requestqueue)) {
-			mutex_unlock(&ls->ls_requestqueue_mutex);
+			clear_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
 			error = 0;
 			break;
 		}
-		e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
-		mutex_unlock(&ls->ls_requestqueue_mutex);
+		e = list_first_entry(&ls->ls_requestqueue, struct rq_entry, list);
 
 		ms = &e->request;
@@ -93,41 +88,23 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
 			  e->recover_seq);
 
 		dlm_receive_message_saved(ls, &e->request, e->recover_seq);
-
-		mutex_lock(&ls->ls_requestqueue_mutex);
 		list_del(&e->list);
-		if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
-			wake_up(&ls->ls_requestqueue_wait);
 		kfree(e);
 
 		if (dlm_locking_stopped(ls)) {
 			log_debug(ls, "process_requestqueue abort running");
-			mutex_unlock(&ls->ls_requestqueue_mutex);
 			error = -EINTR;
 			break;
 		}
+		write_unlock(&ls->ls_requestqueue_lock);
 		schedule();
+		write_lock(&ls->ls_requestqueue_lock);
 	}
+	write_unlock(&ls->ls_requestqueue_lock);
 
 	return error;
 }
 
-/*
- * After recovery is done, locking is resumed and dlm_recoverd takes all the
- * saved requests and processes them as they would have been by dlm_recv.  At
- * the same time, dlm_recv will start receiving new requests from remote nodes.
- * We want to delay dlm_recv processing new requests until dlm_recoverd has
- * finished processing the old saved requests.  We don't check for locking
- * stopped here because dlm_ls_stop won't stop locking until it's suspended us
- * (dlm_recv).
- */
-
-void dlm_wait_requestqueue(struct dlm_ls *ls)
-{
-	wait_event(ls->ls_requestqueue_wait,
-		   atomic_read(&ls->ls_requestqueue_cnt) == 0);
-}
-
 static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
 {
 	__le32 type = ms->m_type;
@@ -158,17 +135,15 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
 	struct dlm_message *ms;
 	struct rq_entry *e, *safe;
 
-	mutex_lock(&ls->ls_requestqueue_mutex);
+	write_lock(&ls->ls_requestqueue_lock);
 	list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
 		ms = &e->request;
 
 		if (purge_request(ls, ms, e->nodeid)) {
 			list_del(&e->list);
-			if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
-				wake_up(&ls->ls_requestqueue_wait);
 			kfree(e);
 		}
 	}
-	mutex_unlock(&ls->ls_requestqueue_mutex);
+	write_unlock(&ls->ls_requestqueue_lock);
 }