Commit b3603fcb authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'dlm-6.9' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:

 - Fix mistaken variable assignment that caused a refcounting problem

 - Revert a recent change that began using atomic counters where they
   were not needed (for lkb wait_count)

 - Add comments around forced state reset for waiting lock operations
   during recovery

* tag 'dlm-6.9' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  dlm: add comments about forced waiters reset
  dlm: revert atomic_t lkb_wait_count
  dlm: fix user space lkb refcounting
parents 6207b37e c53309b9
......@@ -246,7 +246,7 @@ struct dlm_lkb {
int8_t lkb_highbast; /* highest mode bast sent for */
int8_t lkb_wait_type; /* type of reply waiting for */
atomic_t lkb_wait_count;
int8_t lkb_wait_count;
int lkb_wait_nodeid; /* for debugging */
struct list_head lkb_statequeue; /* rsb g/c/w list */
......
......@@ -1407,7 +1407,6 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error = 0;
int wc;
mutex_lock(&ls->ls_waiters_mutex);
......@@ -1429,17 +1428,20 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
error = -EBUSY;
goto out;
}
wc = atomic_inc_return(&lkb->lkb_wait_count);
lkb->lkb_wait_count++;
hold_lkb(lkb);
log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
lkb->lkb_id, lkb->lkb_wait_type, mstype, wc,
dlm_iflags_val(lkb));
lkb->lkb_id, lkb->lkb_wait_type, mstype,
lkb->lkb_wait_count, dlm_iflags_val(lkb));
goto out;
}
wc = atomic_fetch_inc(&lkb->lkb_wait_count);
DLM_ASSERT(!wc, dlm_print_lkb(lkb); printk("wait_count %d\n", wc););
DLM_ASSERT(!lkb->lkb_wait_count,
dlm_print_lkb(lkb);
printk("wait_count %d\n", lkb->lkb_wait_count););
lkb->lkb_wait_count++;
lkb->lkb_wait_type = mstype;
lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
hold_lkb(lkb);
......@@ -1502,7 +1504,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
lkb->lkb_id);
lkb->lkb_wait_type = 0;
atomic_dec(&lkb->lkb_wait_count);
lkb->lkb_wait_count--;
unhold_lkb(lkb);
goto out_del;
}
......@@ -1529,15 +1531,16 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
if (overlap_done && lkb->lkb_wait_type) {
log_error(ls, "remwait error %x reply %d wait_type %d overlap",
lkb->lkb_id, mstype, lkb->lkb_wait_type);
atomic_dec(&lkb->lkb_wait_count);
lkb->lkb_wait_count--;
unhold_lkb(lkb);
lkb->lkb_wait_type = 0;
}
DLM_ASSERT(atomic_read(&lkb->lkb_wait_count), dlm_print_lkb(lkb););
DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
if (atomic_dec_and_test(&lkb->lkb_wait_count))
lkb->lkb_wait_count--;
if (!lkb->lkb_wait_count)
list_del_init(&lkb->lkb_wait_reply);
unhold_lkb(lkb);
return 0;
......@@ -2666,7 +2669,7 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
goto out;
/* lock not allowed if there's any op in progress */
if (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count))
if (lkb->lkb_wait_type || lkb->lkb_wait_count)
goto out;
if (is_overlap(lkb))
......@@ -2728,7 +2731,7 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
/* normal unlock not allowed if there's any op in progress */
if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
(lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count)))
(lkb->lkb_wait_type || lkb->lkb_wait_count))
goto out;
/* an lkb may be waiting for an rsb lookup to complete where the
......@@ -5011,21 +5014,32 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
return lkb;
}
/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
master or dir-node for r. Processing the lkb may result in it being placed
back on waiters. */
/* We do this after normal locking has been enabled and any saved messages
(in requestqueue) have been processed. We should be confident that at
this point we won't get or process a reply to any of these waiting
operations. But, new ops may be coming in on the rsbs/locks here from
userspace or remotely. */
/* there may have been an overlap unlock/cancel prior to recovery or after
recovery. if before, the lkb may still have a pos wait_count; if after, the
overlap flag would just have been set and nothing new sent. we can be
confident here than any replies to either the initial op or overlap ops
prior to recovery have been received. */
/*
* Forced state reset for locks that were in the middle of remote operations
* when recovery happened (i.e. lkbs that were on the waiters list, waiting
* for a reply from a remote operation.) The lkbs remaining on the waiters
* list need to be reevaluated; some may need resending to a different node
* than previously, and some may now need local handling rather than remote.
*
* First, the lkb state for the voided remote operation is forcibly reset,
* equivalent to what remove_from_waiters() would normally do:
* . lkb removed from ls_waiters list
* . lkb wait_type cleared
* . lkb waiters_count cleared
* . lkb ref count decremented for each waiters_count (almost always 1,
* but possibly 2 in case of cancel/unlock overlapping, which means
* two remote replies were being expected for the lkb.)
*
* Second, the lkb is reprocessed like an original operation would be,
* by passing it to _request_lock or _convert_lock, which will either
* process the lkb operation locally, or send it to a remote node again
* and put the lkb back onto the waiters list.
*
* When reprocessing the lkb, we may find that it's flagged for an overlapping
* force-unlock or cancel, either from before recovery began, or after recovery
* finished. If this is the case, the unlock/cancel is done directly, and the
* original operation is not initiated again (no _request_lock/_convert_lock.)
*/
int dlm_recover_waiters_post(struct dlm_ls *ls)
{
......@@ -5040,6 +5054,11 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
break;
}
/*
* Find an lkb from the waiters list that's been affected by
* recovery node changes, and needs to be reprocessed. Does
* hold_lkb(), adding a refcount.
*/
lkb = find_resend_waiter(ls);
if (!lkb)
break;
......@@ -5048,6 +5067,11 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
hold_rsb(r);
lock_rsb(r);
/*
* If the lkb has been flagged for a force unlock or cancel,
* then the reprocessing below will be replaced by just doing
* the unlock/cancel directly.
*/
mstype = lkb->lkb_wait_type;
oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
&lkb->lkb_iflags);
......@@ -5061,22 +5085,40 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
dlm_dir_nodeid(r), oc, ou);
/* At this point we assume that we won't get a reply to any
previous op or overlap op on this lock. First, do a big
remove_from_waiters() for all previous ops. */
/*
* No reply to the pre-recovery operation will now be received,
* so a forced equivalent of remove_from_waiters() is needed to
* reset the waiters state that was in place before recovery.
*/
clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
/* Forcibly clear wait_type */
lkb->lkb_wait_type = 0;
/* drop all wait_count references we still
* hold a reference for this iteration.
/*
* Forcibly reset wait_count and associated refcount. The
* wait_count will almost always be 1, but in case of an
* overlapping unlock/cancel it could be 2: see where
* add_to_waiters() finds the lkb is already on the waiters
* list and does lkb_wait_count++; hold_lkb().
*/
while (!atomic_dec_and_test(&lkb->lkb_wait_count))
while (lkb->lkb_wait_count) {
lkb->lkb_wait_count--;
unhold_lkb(lkb);
}
/* Forcibly remove from waiters list */
mutex_lock(&ls->ls_waiters_mutex);
list_del_init(&lkb->lkb_wait_reply);
mutex_unlock(&ls->ls_waiters_mutex);
/*
* The lkb is now clear of all prior waiters state and can be
* processed locally, or sent to remote node again, or directly
* cancelled/unlocked.
*/
if (oc || ou) {
/* do an unlock or cancel instead of resending */
switch (mstype) {
......
......@@ -806,7 +806,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
struct dlm_lkb *lkb;
DECLARE_WAITQUEUE(wait, current);
struct dlm_callback *cb;
int rv, copy_lvb = 0;
int rv, ret, copy_lvb = 0;
int old_mode, new_mode;
if (count == sizeof(struct dlm_device_version)) {
......@@ -906,9 +906,9 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
trace_dlm_ast(lkb->lkb_resource->res_ls, lkb);
}
rv = copy_result_to_user(lkb->lkb_ua,
test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags),
cb->flags, cb->mode, copy_lvb, buf, count);
ret = copy_result_to_user(lkb->lkb_ua,
test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags),
cb->flags, cb->mode, copy_lvb, buf, count);
kref_put(&cb->ref, dlm_release_callback);
......@@ -916,7 +916,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
if (rv == DLM_DEQUEUE_CALLBACK_LAST)
dlm_put_lkb(lkb);
return rv;
return ret;
}
static __poll_t device_poll(struct file *file, poll_table *wait)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment