Commit e7e69eb3, authored by Kurt Hackel, committed by Mark Fasheh

ocfs2: teach dlm_restart_lock_mastery() to wait on recovery

Change behavior of dlm_restart_lock_mastery() when a node goes down.  Dump
all responses that have been collected and start over.
Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
parent e4eb0368
...@@ -867,6 +867,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, ...@@ -867,6 +867,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
spin_unlock(&dlm->master_lock); spin_unlock(&dlm->master_lock);
spin_unlock(&dlm->spinlock); spin_unlock(&dlm->spinlock);
redo_request:
while (wait_on_recovery) { while (wait_on_recovery) {
/* any cluster changes that occurred after dropping the /* any cluster changes that occurred after dropping the
* dlm spinlock would be detectable be a change on the mle, * dlm spinlock would be detectable be a change on the mle,
...@@ -904,7 +905,6 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, ...@@ -904,7 +905,6 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
if (blocked) if (blocked)
goto wait; goto wait;
redo_request:
ret = -EINVAL; ret = -EINVAL;
dlm_node_iter_init(mle->vote_map, &iter); dlm_node_iter_init(mle->vote_map, &iter);
while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
...@@ -929,6 +929,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, ...@@ -929,6 +929,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
/* keep going until the response map includes all nodes */ /* keep going until the response map includes all nodes */
ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked); ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
if (ret < 0) { if (ret < 0) {
wait_on_recovery = 1;
mlog(0, "%s:%.*s: node map changed, redo the " mlog(0, "%s:%.*s: node map changed, redo the "
"master request now, blocked=%d\n", "master request now, blocked=%d\n",
dlm->name, res->lockname.len, dlm->name, res->lockname.len,
...@@ -1210,18 +1211,6 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, ...@@ -1210,18 +1211,6 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
set_bit(node, mle->vote_map); set_bit(node, mle->vote_map);
} else { } else {
mlog(ML_ERROR, "node down! %d\n", node); mlog(ML_ERROR, "node down! %d\n", node);
/* if the node wasn't involved in mastery skip it,
* but clear it out from the maps so that it will
* not affect mastery of this lockres */
clear_bit(node, mle->response_map);
clear_bit(node, mle->vote_map);
if (!test_bit(node, mle->maybe_map))
goto next;
/* if we're already blocked on lock mastery, and the
* dead node wasn't the expected master, or there is
* another node in the maybe_map, keep waiting */
if (blocked) { if (blocked) {
int lowest = find_next_bit(mle->maybe_map, int lowest = find_next_bit(mle->maybe_map,
O2NM_MAX_NODES, 0); O2NM_MAX_NODES, 0);
...@@ -1229,54 +1218,53 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, ...@@ -1229,54 +1218,53 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
/* act like it was never there */ /* act like it was never there */
clear_bit(node, mle->maybe_map); clear_bit(node, mle->maybe_map);
if (node != lowest) if (node == lowest) {
goto next; mlog(0, "expected master %u died"
" while this node was blocked "
mlog(ML_ERROR, "expected master %u died while " "waiting on it!\n", node);
"this node was blocked waiting on it!\n", lowest = find_next_bit(mle->maybe_map,
node); O2NM_MAX_NODES,
lowest = find_next_bit(mle->maybe_map, lowest+1);
O2NM_MAX_NODES, if (lowest < O2NM_MAX_NODES) {
lowest+1); mlog(0, "%s:%.*s:still "
if (lowest < O2NM_MAX_NODES) { "blocked. waiting on %u "
mlog(0, "still blocked. waiting " "now\n", dlm->name,
"on %u now\n", lowest); res->lockname.len,
goto next; res->lockname.name,
lowest);
} else {
/* mle is an MLE_BLOCK, but
* there is now nothing left to
* block on. we need to return
* all the way back out and try
* again with an MLE_MASTER.
* dlm_do_local_recovery_cleanup
* has already run, so the mle
* refcount is ok */
mlog(0, "%s:%.*s: no "
"longer blocking. try to "
"master this here\n",
dlm->name,
res->lockname.len,
res->lockname.name);
mle->type = DLM_MLE_MASTER;
mle->u.res = res;
}
} }
/* mle is an MLE_BLOCK, but there is now
* nothing left to block on. we need to return
* all the way back out and try again with
* an MLE_MASTER. dlm_do_local_recovery_cleanup
* has already run, so the mle refcount is ok */
mlog(0, "no longer blocking. we can "
"try to master this here\n");
mle->type = DLM_MLE_MASTER;
memset(mle->maybe_map, 0,
sizeof(mle->maybe_map));
memset(mle->response_map, 0,
sizeof(mle->maybe_map));
memcpy(mle->vote_map, mle->node_map,
sizeof(mle->node_map));
mle->u.res = res;
set_bit(dlm->node_num, mle->maybe_map);
ret = -EAGAIN;
goto next;
} }
clear_bit(node, mle->maybe_map); /* now blank out everything, as if we had never
if (node > dlm->node_num) * contacted anyone */
goto next; memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
memset(mle->response_map, 0, sizeof(mle->response_map));
mlog(0, "dead node in map!\n"); /* reset the vote_map to the current node_map */
/* yuck. go back and re-contact all nodes memcpy(mle->vote_map, mle->node_map,
* in the vote_map, removing this node. */ sizeof(mle->node_map));
memset(mle->response_map, 0, /* put myself into the maybe map */
sizeof(mle->response_map)); if (mle->type != DLM_MLE_BLOCK)
set_bit(dlm->node_num, mle->maybe_map);
} }
ret = -EAGAIN; ret = -EAGAIN;
next:
node = dlm_bitmap_diff_iter_next(&bdi, &sc); node = dlm_bitmap_diff_iter_next(&bdi, &sc);
} }
return ret; return ret;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment