Commit 6a413211 authored by Kurt Hackel's avatar Kurt Hackel Committed by Mark Fasheh

ocfs2: dlm_remaster_locks() should never exit without completing

We cannot restart recovery. Once we begin to recover a node, keep the state
of the recovery intact and follow through, regardless of any other node
deaths that may occur.
Signed-off-by: default avatarKurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: default avatarMark Fasheh <mark.fasheh@oracle.com>
parent c8df412e
...@@ -480,6 +480,7 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) ...@@ -480,6 +480,7 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
status = dlm_remaster_locks(dlm, dlm->reco.dead_node); status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
if (status < 0) { if (status < 0) {
/* we should never hit this anymore */
mlog(ML_ERROR, "error %d remastering locks for node %u, " mlog(ML_ERROR, "error %d remastering locks for node %u, "
"retrying.\n", status, dlm->reco.dead_node); "retrying.\n", status, dlm->reco.dead_node);
/* yield a bit to allow any final network messages /* yield a bit to allow any final network messages
...@@ -506,9 +507,16 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) ...@@ -506,9 +507,16 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
int destroy = 0; int destroy = 0;
int pass = 0; int pass = 0;
status = dlm_init_recovery_area(dlm, dead_node); do {
if (status < 0) /* we have become recovery master. there is no escaping
goto leave; * this, so just keep trying until we get it. */
status = dlm_init_recovery_area(dlm, dead_node);
if (status < 0) {
mlog(ML_ERROR, "%s: failed to alloc recovery area, "
"retrying\n", dlm->name);
msleep(1000);
}
} while (status != 0);
/* safe to access the node data list without a lock, since this /* safe to access the node data list without a lock, since this
* process is the only one to change the list */ * process is the only one to change the list */
...@@ -525,16 +533,36 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) ...@@ -525,16 +533,36 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
continue; continue;
} }
status = dlm_request_all_locks(dlm, ndata->node_num, dead_node); do {
if (status < 0) { status = dlm_request_all_locks(dlm, ndata->node_num,
mlog_errno(status); dead_node);
if (dlm_is_host_down(status)) if (status < 0) {
ndata->state = DLM_RECO_NODE_DATA_DEAD; mlog_errno(status);
else { if (dlm_is_host_down(status)) {
destroy = 1; /* node died, ignore it for recovery */
goto leave; status = 0;
ndata->state = DLM_RECO_NODE_DATA_DEAD;
/* wait for the domain map to catch up
* with the network state. */
wait_event_timeout(dlm->dlm_reco_thread_wq,
dlm_is_node_dead(dlm,
ndata->node_num),
msecs_to_jiffies(1000));
mlog(0, "waited 1 sec for %u, "
"dead? %s\n", ndata->node_num,
dlm_is_node_dead(dlm, ndata->node_num) ?
"yes" : "no");
} else {
/* -ENOMEM on the other node */
mlog(0, "%s: node %u returned "
"%d during recovery, retrying "
"after a short wait\n",
dlm->name, ndata->node_num,
status);
msleep(100);
}
} }
} } while (status != 0);
switch (ndata->state) { switch (ndata->state) {
case DLM_RECO_NODE_DATA_INIT: case DLM_RECO_NODE_DATA_INIT:
...@@ -546,10 +574,9 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) ...@@ -546,10 +574,9 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
mlog(0, "node %u died after requesting " mlog(0, "node %u died after requesting "
"recovery info for node %u\n", "recovery info for node %u\n",
ndata->node_num, dead_node); ndata->node_num, dead_node);
// start all over /* fine. don't need this node's info.
destroy = 1; * continue without it. */
status = -EAGAIN; break;
goto leave;
case DLM_RECO_NODE_DATA_REQUESTING: case DLM_RECO_NODE_DATA_REQUESTING:
ndata->state = DLM_RECO_NODE_DATA_REQUESTED; ndata->state = DLM_RECO_NODE_DATA_REQUESTED;
mlog(0, "now receiving recovery data from " mlog(0, "now receiving recovery data from "
...@@ -593,28 +620,12 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) ...@@ -593,28 +620,12 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
BUG(); BUG();
break; break;
case DLM_RECO_NODE_DATA_DEAD: case DLM_RECO_NODE_DATA_DEAD:
mlog(ML_NOTICE, "node %u died after " mlog(0, "node %u died after "
"requesting recovery info for " "requesting recovery info for "
"node %u\n", ndata->node_num, "node %u\n", ndata->node_num,
dead_node); dead_node);
spin_unlock(&dlm_reco_state_lock); spin_unlock(&dlm_reco_state_lock);
// start all over break;
destroy = 1;
status = -EAGAIN;
/* instead of spinning like crazy here,
* wait for the domain map to catch up
* with the network state. otherwise this
* can be hit hundreds of times before
* the node is really seen as dead. */
wait_event_timeout(dlm->dlm_reco_thread_wq,
dlm_is_node_dead(dlm,
ndata->node_num),
msecs_to_jiffies(1000));
mlog(0, "waited 1 sec for %u, "
"dead? %s\n", ndata->node_num,
dlm_is_node_dead(dlm, ndata->node_num) ?
"yes" : "no");
goto leave;
case DLM_RECO_NODE_DATA_RECEIVING: case DLM_RECO_NODE_DATA_RECEIVING:
case DLM_RECO_NODE_DATA_REQUESTED: case DLM_RECO_NODE_DATA_REQUESTED:
mlog(0, "%s: node %u still in state %s\n", mlog(0, "%s: node %u still in state %s\n",
...@@ -659,7 +670,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) ...@@ -659,7 +670,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
jiffies, dlm->reco.dead_node, jiffies, dlm->reco.dead_node,
dlm->node_num, dlm->reco.new_master); dlm->node_num, dlm->reco.new_master);
destroy = 1; destroy = 1;
status = ret; status = 0;
/* rescan everything marked dirty along the way */ /* rescan everything marked dirty along the way */
dlm_kick_thread(dlm, NULL); dlm_kick_thread(dlm, NULL);
break; break;
...@@ -672,7 +683,6 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) ...@@ -672,7 +683,6 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
} }
leave:
if (destroy) if (destroy)
dlm_destroy_recovery_area(dlm, dead_node); dlm_destroy_recovery_area(dlm, dead_node);
...@@ -832,24 +842,22 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) ...@@ -832,24 +842,22 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
if (dead_node != dlm->reco.dead_node || if (dead_node != dlm->reco.dead_node ||
reco_master != dlm->reco.new_master) { reco_master != dlm->reco.new_master) {
/* show extra debug info if the recovery state is messed */ /* worker could have been created before the recovery master
mlog(ML_ERROR, "%s: bad reco state: reco(dead=%u, master=%u), " * died. if so, do not continue, but do not error. */
"request(dead=%u, master=%u)\n", if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
dlm->name, dlm->reco.dead_node, dlm->reco.new_master, mlog(ML_NOTICE, "%s: will not send recovery state, "
dead_node, reco_master); "recovery master %u died, thread=(dead=%u,mas=%u)"
mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u " " current=(dead=%u,mas=%u)\n", dlm->name,
"entry[0]={c=%u:%llu,l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n", reco_master, dead_node, reco_master,
dlm->name, mres->lockname_len, mres->lockname, mres->master, dlm->reco.dead_node, dlm->reco.new_master);
mres->num_locks, mres->total_locks, mres->flags, } else {
dlm_get_lock_cookie_node(mres->ml[0].cookie), mlog(ML_NOTICE, "%s: reco state invalid: reco(dead=%u, "
dlm_get_lock_cookie_seq(mres->ml[0].cookie), "master=%u), request(dead=%u, master=%u)\n",
mres->ml[0].list, mres->ml[0].flags, dlm->name, dlm->reco.dead_node,
mres->ml[0].type, mres->ml[0].convert_type, dlm->reco.new_master, dead_node, reco_master);
mres->ml[0].highest_blocked, mres->ml[0].node); }
BUG(); goto leave;
} }
BUG_ON(dead_node != dlm->reco.dead_node);
BUG_ON(reco_master != dlm->reco.new_master);
/* lock resources should have already been moved to the /* lock resources should have already been moved to the
* dlm->reco.resources list. now move items from that list * dlm->reco.resources list. now move items from that list
...@@ -889,7 +897,7 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) ...@@ -889,7 +897,7 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
dlm->name, reco_master, dead_node, ret); dlm->name, reco_master, dead_node, ret);
} }
} }
leave:
free_page((unsigned long)data); free_page((unsigned long)data);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment