Commit 2a7ce0ed authored by David Teigland's avatar David Teigland

dlm: remove shared message stub for recovery

kmalloc a stub message struct during recovery instead of sharing the
struct in the lockspace.  This leaves the lockspace stub_ms only for
faking downconvert replies, where it is never modified and sharing
is not a problem.

Also improve the debug messages in the same recovery function.
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent c6ff669b
...@@ -209,6 +209,7 @@ struct dlm_args { ...@@ -209,6 +209,7 @@ struct dlm_args {
#define DLM_IFL_WATCH_TIMEWARN 0x00400000 #define DLM_IFL_WATCH_TIMEWARN 0x00400000
#define DLM_IFL_TIMEOUT_CANCEL 0x00800000 #define DLM_IFL_TIMEOUT_CANCEL 0x00800000
#define DLM_IFL_DEADLOCK_CANCEL 0x01000000 #define DLM_IFL_DEADLOCK_CANCEL 0x01000000
#define DLM_IFL_STUB_MS 0x02000000 /* magic number for m_flags */
#define DLM_IFL_USER 0x00000001 #define DLM_IFL_USER 0x00000001
#define DLM_IFL_ORPHAN 0x00000002 #define DLM_IFL_ORPHAN 0x00000002
......
...@@ -1037,10 +1037,10 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) ...@@ -1037,10 +1037,10 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
struct dlm_ls *ls = lkb->lkb_resource->res_ls; struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error; int error;
if (ms != &ls->ls_stub_ms) if (ms->m_flags != DLM_IFL_STUB_MS)
mutex_lock(&ls->ls_waiters_mutex); mutex_lock(&ls->ls_waiters_mutex);
error = _remove_from_waiters(lkb, ms->m_type, ms); error = _remove_from_waiters(lkb, ms->m_type, ms);
if (ms != &ls->ls_stub_ms) if (ms->m_flags != DLM_IFL_STUB_MS)
mutex_unlock(&ls->ls_waiters_mutex); mutex_unlock(&ls->ls_waiters_mutex);
return error; return error;
} }
...@@ -1462,14 +1462,8 @@ static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb) ...@@ -1462,14 +1462,8 @@ static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
compatible with other granted locks */ compatible with other granted locks */
static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms) static void munge_demoted(struct dlm_lkb *lkb)
{ {
if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
log_print("munge_demoted %x invalid reply type %d",
lkb->lkb_id, ms->m_type);
return;
}
if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) { if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
log_print("munge_demoted %x invalid modes gr %d rq %d", log_print("munge_demoted %x invalid modes gr %d rq %d",
lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode); lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
...@@ -2966,9 +2960,9 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) ...@@ -2966,9 +2960,9 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
/* down conversions go without a reply from the master */ /* down conversions go without a reply from the master */
if (!error && down_conversion(lkb)) { if (!error && down_conversion(lkb)) {
remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
r->res_ls->ls_stub_ms.m_result = 0; r->res_ls->ls_stub_ms.m_result = 0;
r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
} }
...@@ -3156,6 +3150,9 @@ static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms) ...@@ -3156,6 +3150,9 @@ static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms) static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{ {
if (ms->m_flags == DLM_IFL_STUB_MS)
return;
lkb->lkb_sbflags = ms->m_sbflags; lkb->lkb_sbflags = ms->m_sbflags;
lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
(ms->m_flags & 0x0000FFFF); (ms->m_flags & 0x0000FFFF);
...@@ -3698,7 +3695,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, ...@@ -3698,7 +3695,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
/* convert was queued on remote master */ /* convert was queued on remote master */
receive_flags_reply(lkb, ms); receive_flags_reply(lkb, ms);
if (is_demoted(lkb)) if (is_demoted(lkb))
munge_demoted(lkb, ms); munge_demoted(lkb);
del_lkb(r, lkb); del_lkb(r, lkb);
add_lkb(r, lkb, DLM_LKSTS_CONVERT); add_lkb(r, lkb, DLM_LKSTS_CONVERT);
add_timeout(lkb); add_timeout(lkb);
...@@ -3708,7 +3705,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, ...@@ -3708,7 +3705,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
/* convert was granted on remote master */ /* convert was granted on remote master */
receive_flags_reply(lkb, ms); receive_flags_reply(lkb, ms);
if (is_demoted(lkb)) if (is_demoted(lkb))
munge_demoted(lkb, ms); munge_demoted(lkb);
grant_lock_pc(r, lkb, ms); grant_lock_pc(r, lkb, ms);
queue_cast(r, lkb, 0); queue_cast(r, lkb, 0);
break; break;
...@@ -4082,15 +4079,17 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid) ...@@ -4082,15 +4079,17 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
dlm_put_lockspace(ls); dlm_put_lockspace(ls);
} }
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_message *ms_stub)
{ {
if (middle_conversion(lkb)) { if (middle_conversion(lkb)) {
hold_lkb(lkb); hold_lkb(lkb);
ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; memset(ms_stub, 0, sizeof(struct dlm_message));
ls->ls_stub_ms.m_result = -EINPROGRESS; ms_stub->m_flags = DLM_IFL_STUB_MS;
ls->ls_stub_ms.m_flags = lkb->lkb_flags; ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; ms_stub->m_result = -EINPROGRESS;
_receive_convert_reply(lkb, &ls->ls_stub_ms); ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
_receive_convert_reply(lkb, ms_stub);
/* Same special case as in receive_rcom_lock_args() */ /* Same special case as in receive_rcom_lock_args() */
lkb->lkb_grmode = DLM_LOCK_IV; lkb->lkb_grmode = DLM_LOCK_IV;
...@@ -4131,13 +4130,27 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb) ...@@ -4131,13 +4130,27 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
void dlm_recover_waiters_pre(struct dlm_ls *ls) void dlm_recover_waiters_pre(struct dlm_ls *ls)
{ {
struct dlm_lkb *lkb, *safe; struct dlm_lkb *lkb, *safe;
struct dlm_message *ms_stub;
int wait_type, stub_unlock_result, stub_cancel_result; int wait_type, stub_unlock_result, stub_cancel_result;
ms_stub = kmalloc(GFP_KERNEL, sizeof(struct dlm_message));
if (!ms_stub) {
log_error(ls, "dlm_recover_waiters_pre no mem");
return;
}
mutex_lock(&ls->ls_waiters_mutex); mutex_lock(&ls->ls_waiters_mutex);
list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags); /* exclude debug messages about unlocks because there can be so
many and they aren't very interesting */
if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
log_debug(ls, "recover_waiter %x nodeid %d "
"msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
}
/* all outstanding lookups, regardless of destination will be /* all outstanding lookups, regardless of destination will be
resent after recovery is done */ resent after recovery is done */
...@@ -4183,26 +4196,28 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) ...@@ -4183,26 +4196,28 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
break; break;
case DLM_MSG_CONVERT: case DLM_MSG_CONVERT:
recover_convert_waiter(ls, lkb); recover_convert_waiter(ls, lkb, ms_stub);
break; break;
case DLM_MSG_UNLOCK: case DLM_MSG_UNLOCK:
hold_lkb(lkb); hold_lkb(lkb);
ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY; memset(ms_stub, 0, sizeof(struct dlm_message));
ls->ls_stub_ms.m_result = stub_unlock_result; ms_stub->m_flags = DLM_IFL_STUB_MS;
ls->ls_stub_ms.m_flags = lkb->lkb_flags; ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; ms_stub->m_result = stub_unlock_result;
_receive_unlock_reply(lkb, &ls->ls_stub_ms); ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
_receive_unlock_reply(lkb, ms_stub);
dlm_put_lkb(lkb); dlm_put_lkb(lkb);
break; break;
case DLM_MSG_CANCEL: case DLM_MSG_CANCEL:
hold_lkb(lkb); hold_lkb(lkb);
ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY; memset(ms_stub, 0, sizeof(struct dlm_message));
ls->ls_stub_ms.m_result = stub_cancel_result; ms_stub->m_flags = DLM_IFL_STUB_MS;
ls->ls_stub_ms.m_flags = lkb->lkb_flags; ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; ms_stub->m_result = stub_cancel_result;
_receive_cancel_reply(lkb, &ls->ls_stub_ms); ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
_receive_cancel_reply(lkb, ms_stub);
dlm_put_lkb(lkb); dlm_put_lkb(lkb);
break; break;
...@@ -4213,6 +4228,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) ...@@ -4213,6 +4228,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
schedule(); schedule();
} }
mutex_unlock(&ls->ls_waiters_mutex); mutex_unlock(&ls->ls_waiters_mutex);
kfree(ms_stub);
} }
static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
...@@ -4277,8 +4293,8 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) ...@@ -4277,8 +4293,8 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
ou = is_overlap_unlock(lkb); ou = is_overlap_unlock(lkb);
err = 0; err = 0;
log_debug(ls, "recover_waiters_post %x type %d flags %x %s", log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name); lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
/* At this point we assume that we won't get a reply to any /* At this point we assume that we won't get a reply to any
previous op or overlap op on this lock. First, do a big previous op or overlap op on this lock. First, do a big
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment