// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "dir.h"
#include "ast.h"
#include "recover.h"
#include "lowcomms.h"
#include "lock.h"
#include "requestqueue.h"
#include "recoverd.h"

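/* Build a list of the rsbs this node masters (res_nodeid == 0), holding a
   reference on each so they stay valid while other nodes read master names
   from us during recovery. */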
static int dlm_create_masters_list(struct dlm_ls *ls)
{
	struct rb_node *n;
	struct dlm_rsb *r;
	int i, error = 0;

	write_lock(&ls->ls_masters_lock);
	if (!list_empty(&ls->ls_masters_list)) {
		log_error(ls, "root list not empty");
		error = -EINVAL;
		goto out;
	}

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			if (r->res_nodeid)
				continue;

			list_add(&r->res_masters_list, &ls->ls_masters_list);
			dlm_hold_rsb(r);
		}
		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
	}
 out:
	write_unlock(&ls->ls_masters_lock);
	return error;
}

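/* Undo dlm_create_masters_list(): empty ls_masters_list and drop the
   references it took. */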
static void dlm_release_masters_list(struct dlm_ls *ls)
{
	struct dlm_rsb *r, *safe;

	write_lock(&ls->ls_masters_lock);
	list_for_each_entry_safe(r, safe, &ls->ls_masters_list, res_masters_list) {
		list_del_init(&r->res_masters_list);
		dlm_put_rsb(r);
	}
	write_unlock(&ls->ls_masters_lock);
}

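/* Collect every active rsb from the keep trees onto root_list, holding a
   reference on each; most of the recovery routines below walk this list. */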
static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
{
	struct rb_node *n;
	struct dlm_rsb *r;
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			list_add(&r->res_root_list, root_list);
			dlm_hold_rsb(r);
		}

		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
			log_error(ls, "%s toss not empty", __func__);
		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
	}
}

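/* Undo dlm_create_root_list(): empty root_list and drop the references. */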
static void dlm_release_root_list(struct list_head *root_list)
{
	struct dlm_rsb *r, *safe;

	list_for_each_entry_safe(r, safe, root_list, res_root_list) {
		list_del_init(&r->res_root_list);
		dlm_put_rsb(r);
	}
}

/* If the start for which we're re-enabling locking (seq) has been superseded
   by a newer stop (ls_recover_seq), we need to leave locking disabled.

   We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
   locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
   enables locking and clears the requestqueue between a and b. */

static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
	int error = -EINTR;

	write_lock(&ls->ls_recv_active);

	spin_lock(&ls->ls_recover_lock);
	if (ls->ls_recover_seq == seq) {
		set_bit(LSFL_RUNNING, &ls->ls_flags);
		/* unblocks processes waiting to enter the dlm */
		up_write(&ls->ls_in_recovery);
		clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
		error = 0;
	}
	spin_unlock(&ls->ls_recover_lock);

	write_unlock(&ls->ls_recv_active);
	return error;
}

static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
{
	LIST_HEAD(root_list);
	unsigned long start;
	int error, neg = 0;

	log_rinfo(ls, "dlm_recover %llu", (unsigned long long)rv->seq);

	mutex_lock(&ls->ls_recoverd_active);

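	/* stop delivering callbacks to lock users until recovery completes */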
	dlm_callback_suspend(ls);

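	/* free unused (tossed) rsbs; they play no part in recovery */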
	dlm_clear_toss(ls);

	/*
	 * This list of root rsb's will be the basis of most of the recovery
	 * routines.
	 */

	dlm_create_root_list(ls, &root_list);

	/*
	 * Add or remove nodes from the lockspace's ls_nodes list.
	 *
	 * Due to the fact that we must report all membership changes to lsops
	 * or midcomms layer, it is not permitted to abort ls_recover() until
	 * this is done.
	 */

	error = dlm_recover_members(ls, rv, &neg);
	if (error) {
		log_rinfo(ls, "dlm_recover_members error %d", error);
		goto fail;
	}

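	/* recompute each rsb's directory nodeid for the new membership */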
	dlm_recover_dir_nodeid(ls, &root_list);

	/* Create a snapshot of all active rsbs for which we are the master.
	 * During the barrier between dlm_recover_members_wait() and
	 * dlm_recover_directory(), other nodes can fetch the directory
	 * entries they need (dlm_rsb with r->res_dir_nodeid == nodeid) via
	 * the rcom communication handled in dlm_copy_master_names().
	 *
	 * TODO We should keep a per-lockspace list of the rsbs we are the
	 * master of, maintained during normal lock handling instead of being
	 * built here during recovery, so recovery can use it when necessary.
	 */
	error = dlm_create_masters_list(ls);
	if (error) {
		log_rinfo(ls, "dlm_create_masters_list error %d", error);
		goto fail_root_list;
	}

	ls->ls_recover_locks_in = 0;

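	/* all nodes must reach the DLM_RS_NODES barrier before the directory
	   is rebuilt */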
	dlm_set_recover_status(ls, DLM_RS_NODES);

	error = dlm_recover_members_wait(ls, rv->seq);
	if (error) {
		log_rinfo(ls, "dlm_recover_members_wait error %d", error);
		dlm_release_masters_list(ls);
		goto fail_root_list;
	}

	start = jiffies;

	/*
	 * Rebuild our own share of the directory by collecting from all other
	 * nodes their master rsb names that hash to us.
	 */

	error = dlm_recover_directory(ls, rv->seq);
	if (error) {
		log_rinfo(ls, "dlm_recover_directory error %d", error);
		dlm_release_masters_list(ls);
		goto fail_root_list;
	}

	dlm_set_recover_status(ls, DLM_RS_DIR);

	error = dlm_recover_directory_wait(ls, rv->seq);
	if (error) {
		log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
		dlm_release_masters_list(ls);
		goto fail_root_list;
	}

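	/* the directory barrier has passed, so other nodes no longer need to
	   read master names from us; drop the snapshot */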
	dlm_release_masters_list(ls);

	/*
	 * We may have outstanding operations that are waiting for a reply from
	 * a failed node.  Mark these to be resent after recovery.  Unlock and
	 * cancel ops can just be completed.
	 */

	dlm_recover_waiters_pre(ls);

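	/* dlm_ls_stop() may have arrived since this recovery started; abort
	   and let the next dlm_ls_start() queue another recovery */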
	if (dlm_recovery_stopped(ls)) {
		error = -EINTR;
		goto fail_root_list;
	}

	if (neg || dlm_no_directory(ls)) {
		/*
		 * Clear lkb's for departed nodes.
		 */

		dlm_recover_purge(ls, &root_list);

		/*
		 * Get new master nodeid's for rsb's that were mastered on
		 * departed nodes.
		 */

		error = dlm_recover_masters(ls, rv->seq, &root_list);
		if (error) {
			log_rinfo(ls, "dlm_recover_masters error %d", error);
			goto fail_root_list;
		}

		/*
		 * Send our locks on remastered rsb's to the new masters.
		 */

		error = dlm_recover_locks(ls, rv->seq, &root_list);
		if (error) {
			log_rinfo(ls, "dlm_recover_locks error %d", error);
			goto fail_root_list;
		}

		dlm_set_recover_status(ls, DLM_RS_LOCKS);

		error = dlm_recover_locks_wait(ls, rv->seq);
		if (error) {
			log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
			goto fail_root_list;
		}

		log_rinfo(ls, "dlm_recover_locks %u in",
			  ls->ls_recover_locks_in);

		/*
		 * Finalize state in master rsb's now that all locks can be
		 * checked.  This includes conversion resolution and lvb
		 * settings.
		 */

		dlm_recover_rsbs(ls, &root_list);
	} else {
		/*
		 * Other lockspace members may be going through the "neg" steps
		 * while also adding us to the lockspace, in which case they'll
		 * be doing the recover_locks (RS_LOCKS) barrier.
		 */
		dlm_set_recover_status(ls, DLM_RS_LOCKS);

		error = dlm_recover_locks_wait(ls, rv->seq);
		if (error) {
			log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
			goto fail_root_list;
		}
	}

	dlm_release_root_list(&root_list);

	/*
	 * Purge directory-related requests that are saved in requestqueue.
	 * All dir requests from before recovery are invalid now due to the dir
	 * rebuild and will be resent by the requesting nodes.
	 */

	dlm_purge_requestqueue(ls);

	dlm_set_recover_status(ls, DLM_RS_DONE);

	error = dlm_recover_done_wait(ls, rv->seq);
	if (error) {
		log_rinfo(ls, "dlm_recover_done_wait error %d", error);
		goto fail;
	}

	dlm_clear_members_gone(ls);

	dlm_callback_resume(ls);

	error = enable_locking(ls, rv->seq);
	if (error) {
		log_rinfo(ls, "enable_locking error %d", error);
		goto fail;
	}

	error = dlm_process_requestqueue(ls);
	if (error) {
		log_rinfo(ls, "dlm_process_requestqueue error %d", error);
		goto fail;
	}

	error = dlm_recover_waiters_post(ls);
	if (error) {
		log_rinfo(ls, "dlm_recover_waiters_post error %d", error);
		goto fail;
	}

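	/* grant any locks that have become grantable as a result of recovery */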
	dlm_recover_grant(ls);

	log_rinfo(ls, "dlm_recover %llu generation %u done: %u ms",
		  (unsigned long long)rv->seq, ls->ls_generation,
		  jiffies_to_msecs(jiffies - start));
	mutex_unlock(&ls->ls_recoverd_active);

	return 0;

 fail_root_list:
	dlm_release_root_list(&root_list);
 fail:
	mutex_unlock(&ls->ls_recoverd_active);

	return error;
}

/* The dlm_ls_start() that created the rv we take here may already have been
   stopped via dlm_ls_stop(); in that case we need to leave the RECOVERY_STOP
   flag set. */

static void do_ls_recovery(struct dlm_ls *ls)
{
	struct dlm_recover *rv = NULL;
	int error;

	spin_lock(&ls->ls_recover_lock);
	rv = ls->ls_recover_args;
	ls->ls_recover_args = NULL;
	if (rv && ls->ls_recover_seq == rv->seq)
		clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
	spin_unlock(&ls->ls_recover_lock);

	if (rv) {
		error = ls_recover(ls, rv);
		switch (error) {
		case 0:
			ls->ls_recovery_result = 0;
			complete(&ls->ls_recovery_done);

			dlm_lsop_recover_done(ls);
			break;
		case -EINTR:
			/* if recovery was interrupted with -EINTR, we wait for
			 * the next ls_recover() iteration until it hopefully
			 * succeeds.
			 */
			log_rinfo(ls, "%s %llu interrupted and should be queued to run again",
				  __func__, (unsigned long long)rv->seq);
			break;
		default:
			log_rinfo(ls, "%s %llu error %d", __func__,
				  (unsigned long long)rv->seq, error);

			/* make new_lockspace() aware of the critical error */
			ls->ls_recovery_result = error;
			complete(&ls->ls_recovery_done);
			break;
		}

		kfree(rv->nodes);
		kfree(rv);
	}
}

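/* Per-lockspace recovery daemon.  It starts with locking blocked, then loops
   waiting for dlm_ls_stop()/dlm_ls_start() to set LSFL_RECOVER_DOWN or
   LSFL_RECOVER_WORK, blocking locking or running do_ls_recovery()
   accordingly, until kthread_stop() is called. */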
static int dlm_recoverd(void *arg)
{
	struct dlm_ls *ls;

	ls = dlm_find_lockspace_local(arg);
	if (!ls) {
		log_print("dlm_recoverd: no lockspace %p", arg);
		return -1;
	}

	down_write(&ls->ls_in_recovery);
	set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
	wake_up(&ls->ls_recover_lock_wait);

	while (1) {
		/*
		 * We call kthread_should_stop() after set_current_state().
		 * This is because it works correctly if kthread_stop() is
		 * called just before set_current_state().
		 */
		set_current_state(TASK_INTERRUPTIBLE);
		if (kthread_should_stop()) {
			set_current_state(TASK_RUNNING);
			break;
		}
		if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
		    !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
			if (kthread_should_stop())
				break;
			schedule();
		}
		set_current_state(TASK_RUNNING);

		if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
			down_write(&ls->ls_in_recovery);
			set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
			wake_up(&ls->ls_recover_lock_wait);
		}

		if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags))
			do_ls_recovery(ls);
	}

	if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags))
		up_write(&ls->ls_in_recovery);

	dlm_put_lockspace(ls);
	return 0;
}

int dlm_recoverd_start(struct dlm_ls *ls)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		ls->ls_recoverd_task = p;
	return error;
}

void dlm_recoverd_stop(struct dlm_ls *ls)
{
	kthread_stop(ls->ls_recoverd_task);
}

void dlm_recoverd_suspend(struct dlm_ls *ls)
{
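	/* wake anything sleeping in dlm_wait_function() so the current
	   recovery can stop at a wait point, then hold ls_recoverd_active to
	   keep dlm_recoverd from running ls_recover() until resume */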
	wake_up(&ls->ls_wait_general);
	mutex_lock(&ls->ls_recoverd_active);
}

void dlm_recoverd_resume(struct dlm_ls *ls)
{
	mutex_unlock(&ls->ls_recoverd_active);
}