Commit 3872f87b authored by Alexander Aring's avatar Alexander Aring Committed by David Teigland

fs: dlm: remove ls_remove_wait waitqueue

This patch removes the ls_remove_wait waitqueue handling. The current
handling tries to wait before a lookup is send out for a identically
resource name which is going to be removed. Hereby the remove message
should be send out before the new lookup message. The reason is that
after a lookup request and response will actually use the specific
remote rsb. A followed remove message would delete the rsb on the remote
side but it's still being used.

To reach a similar behaviour we simple send the remove message out while
the rsb lookup lock is held and the rsb is removed from the toss list.
Other find_rsb() calls would never have the change to get a rsb back to
live while a remove message will be send out (without holding the lock).

This behaviour requires a non-sleepable context which should be provided
now and might be the reason why it was not implemented so in the first
place.
Signed-off-by: default avatarAlexander Aring <aahringo@redhat.com>
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent e1711fe3
......@@ -592,11 +592,7 @@ struct dlm_ls {
int ls_new_rsb_count;
struct list_head ls_new_rsb; /* new rsb structs */
spinlock_t ls_remove_spin;
wait_queue_head_t ls_remove_wait;
char ls_remove_name[DLM_RESNAME_MAXLEN+1];
char *ls_remove_names[DLM_REMOVE_NAMES_MAX];
int ls_remove_len;
int ls_remove_lens[DLM_REMOVE_NAMES_MAX];
struct list_head ls_nodes; /* current nodes in ls */
......
......@@ -1589,37 +1589,6 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
return error;
}
/* If there's an rsb for the same resource being removed, ensure
* that the remove message is sent before the new lookup message.
*/
#define DLM_WAIT_PENDING_COND(ls, r) \
(ls->ls_remove_len && \
!rsb_cmp(r, ls->ls_remove_name, \
ls->ls_remove_len))
static void wait_pending_remove(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
restart:
spin_lock(&ls->ls_remove_spin);
if (DLM_WAIT_PENDING_COND(ls, r)) {
log_debug(ls, "delay lookup for remove dir %d %s",
r->res_dir_nodeid, r->res_name);
spin_unlock(&ls->ls_remove_spin);
wait_event(ls->ls_remove_wait, !DLM_WAIT_PENDING_COND(ls, r));
goto restart;
}
spin_unlock(&ls->ls_remove_spin);
}
/*
* ls_remove_spin protects ls_remove_name and ls_remove_len which are
* read by other threads in wait_pending_remove. ls_remove_names
* and ls_remove_lens are only used by the scan thread, so they do
* not need protection.
*/
static void shrink_bucket(struct dlm_ls *ls, int b)
{
struct rb_node *n, *next;
......@@ -1701,11 +1670,6 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
* list and sending the removal. Keeping this gap small is
* important to keep us (the master node) from being out of sync
* with the remote dir node for very long.
*
* From the time the rsb is removed from toss until just after
* send_remove, the rsb name is saved in ls_remove_name. A new
* lookup checks this to ensure that a new lookup message for the
* same resource name is not sent just before the remove message.
*/
for (i = 0; i < remote_count; i++) {
......@@ -1752,22 +1716,8 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
}
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
/* block lookup of same name until we've sent remove */
spin_lock(&ls->ls_remove_spin);
ls->ls_remove_len = len;
memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
spin_unlock(&ls->ls_remove_spin);
spin_unlock(&ls->ls_rsbtbl[b].lock);
send_remove(r);
/* allow lookup of name again */
spin_lock(&ls->ls_remove_spin);
ls->ls_remove_len = 0;
memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
spin_unlock(&ls->ls_remove_spin);
wake_up(&ls->ls_remove_wait);
spin_unlock(&ls->ls_rsbtbl[b].lock);
dlm_free_rsb(r);
}
......@@ -2718,8 +2668,6 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
return 0;
}
wait_pending_remove(r);
r->res_first_lkid = lkb->lkb_id;
send_lookup(r, lkb);
return 1;
......@@ -3813,7 +3761,7 @@ static int send_remove(struct dlm_rsb *r)
to_nodeid = dlm_dir_nodeid(r);
error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh,
GFP_NOFS);
GFP_ATOMIC);
if (error)
goto out;
......
......@@ -524,9 +524,6 @@ static int new_lockspace(const char *name, const char *cluster,
spin_lock_init(&ls->ls_rsbtbl[i].lock);
}
spin_lock_init(&ls->ls_remove_spin);
init_waitqueue_head(&ls->ls_remove_wait);
for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
GFP_KERNEL);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment