lock_deadlock.c 25.1 KB
Newer Older
1 2 3
/*-
 * See the file LICENSE for redistribution information.
 *
jimw@mysql.com's avatar
jimw@mysql.com committed
4
 * Copyright (c) 1996-2005
5
 *	Sleepycat Software.  All rights reserved.
jimw@mysql.com's avatar
jimw@mysql.com committed
6
 *
jimw@mysql.com's avatar
jimw@mysql.com committed
7
 * $Id: lock_deadlock.c,v 12.10 2005/10/07 20:21:30 ubell Exp $
8 9 10 11 12 13 14 15 16 17 18
 */

#include "db_config.h"

#ifndef NO_SYSTEM_INCLUDES
#include <sys/types.h>

#include <string.h>
#endif

#include "db_int.h"
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
19 20
#include "dbinc/db_shash.h"
#include "dbinc/lock.h"
jimw@mysql.com's avatar
jimw@mysql.com committed
21
#include "dbinc/log.h"
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
22
#include "dbinc/txn.h"
23

jimw@mysql.com's avatar
jimw@mysql.com committed
24
#define	ISSET_MAP(M, N)	((M)[(N) / 32] & (1 << ((N) % 32)))
25 26 27 28 29 30 31 32

#define	CLEAR_MAP(M, N) {						\
	u_int32_t __i;							\
	for (__i = 0; __i < (N); __i++)					\
		(M)[__i] = 0;						\
}

#define	SET_MAP(M, B)	((M)[(B) / 32] |= (1 << ((B) % 32)))
jimw@mysql.com's avatar
jimw@mysql.com committed
33
#define	CLR_MAP(M, B)	((M)[(B) / 32] &= ~((u_int)1 << ((B) % 32)))
34 35 36 37 38 39 40 41 42 43

#define	OR_MAP(D, S, N)	{						\
	u_int32_t __i;							\
	for (__i = 0; __i < (N); __i++)					\
		D[__i] |= S[__i];					\
}
#define	BAD_KILLID	0xffffffff

typedef struct {
	int		valid;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
44
	int		self_wait;
jimw@mysql.com's avatar
jimw@mysql.com committed
45
	int		in_abort;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
46
	u_int32_t	count;
47
	u_int32_t	id;
jimw@mysql.com's avatar
jimw@mysql.com committed
48 49
	roff_t		last_lock;
	roff_t		last_obj;
50 51 52 53
	u_int32_t	last_locker_id;
	db_pgno_t	pgno;
} locker_info;

jimw@mysql.com's avatar
jimw@mysql.com committed
54
static int __dd_abort __P((DB_ENV *, locker_info *, int *));
jimw@mysql.com's avatar
jimw@mysql.com committed
55
static int __dd_build __P((DB_ENV *,
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
56
	    u_int32_t, u_int32_t **, u_int32_t *, u_int32_t *, locker_info **));
jimw@mysql.com's avatar
jimw@mysql.com committed
57
static int __dd_find __P((DB_ENV *,
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
58
	    u_int32_t *, locker_info *, u_int32_t, u_int32_t, u_int32_t ***));
jimw@mysql.com's avatar
jimw@mysql.com committed
59
static int __dd_isolder __P((u_int32_t, u_int32_t, u_int32_t, u_int32_t));
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
60 61
static int __dd_verify __P((locker_info *, u_int32_t *, u_int32_t *,
	    u_int32_t *, u_int32_t, u_int32_t, u_int32_t));
62 63

#ifdef DIAGNOSTIC
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
64 65
static void __dd_debug
	    __P((DB_ENV *, locker_info *, u_int32_t *, u_int32_t, u_int32_t));
66 67
#endif

ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
68
/*
jimw@mysql.com's avatar
jimw@mysql.com committed
69 70
 * __lock_detect_pp --
 *	DB_ENV->lock_detect pre/post processing.
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
71
 *
jimw@mysql.com's avatar
jimw@mysql.com committed
72
 * PUBLIC: int __lock_detect_pp __P((DB_ENV *, u_int32_t, u_int32_t, int *));
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
73
 */
74
int
jimw@mysql.com's avatar
jimw@mysql.com committed
75
__lock_detect_pp(dbenv, flags, atype, abortp)
76 77 78 79
	DB_ENV *dbenv;
	u_int32_t flags, atype;
	int *abortp;
{
jimw@mysql.com's avatar
jimw@mysql.com committed
80 81
	DB_THREAD_INFO *ip;
	int ret;
82 83

	PANIC_CHECK(dbenv);
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
84 85 86 87 88 89 90 91 92 93
	ENV_REQUIRES_CONFIG(dbenv,
	    dbenv->lk_handle, "DB_ENV->lock_detect", DB_INIT_LOCK);

	/* Validate arguments. */
	if ((ret = __db_fchk(dbenv, "DB_ENV->lock_detect", flags, 0)) != 0)
		return (ret);
	switch (atype) {
	case DB_LOCK_DEFAULT:
	case DB_LOCK_EXPIRE:
	case DB_LOCK_MAXLOCKS:
jimw@mysql.com's avatar
jimw@mysql.com committed
94
	case DB_LOCK_MAXWRITE:
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
95 96 97 98 99 100 101 102 103 104 105 106
	case DB_LOCK_MINLOCKS:
	case DB_LOCK_MINWRITE:
	case DB_LOCK_OLDEST:
	case DB_LOCK_RANDOM:
	case DB_LOCK_YOUNGEST:
		break;
	default:
		__db_err(dbenv,
	    "DB_ENV->lock_detect: unknown deadlock detection mode specified");
		return (EINVAL);
	}

jimw@mysql.com's avatar
jimw@mysql.com committed
107 108 109
	ENV_ENTER(dbenv, ip);
	REPLICATION_WRAP(dbenv, (__lock_detect(dbenv, atype, abortp)), ret);
	ENV_LEAVE(dbenv, ip);
jimw@mysql.com's avatar
jimw@mysql.com committed
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
	return (ret);
}

/*
 * __lock_detect --
 *	DB_ENV->lock_detect.
 *
 * PUBLIC: int __lock_detect __P((DB_ENV *, u_int32_t, int *));
 */
int
__lock_detect(dbenv, atype, abortp)
	DB_ENV *dbenv;
	u_int32_t atype;
	int *abortp;
{
	DB_LOCKREGION *region;
	DB_LOCKTAB *lt;
	db_timeval_t now;
	locker_info *idmap;
	u_int32_t *bitmap, *copymap, **deadp, **free_me, *tmpmap;
	u_int32_t i, cid, keeper, killid, limit, nalloc, nlockers;
	u_int32_t lock_max, txn_max;
jimw@mysql.com's avatar
jimw@mysql.com committed
132
	int ret, status;
jimw@mysql.com's avatar
jimw@mysql.com committed
133

ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
134 135 136 137 138 139 140 141
	/*
	 * If this environment is a replication client, then we must use the
	 * MINWRITE detection discipline.
	 */
	if (__rep_is_client(dbenv))
		atype = DB_LOCK_MINWRITE;

	free_me = NULL;
142 143 144 145 146 147

	lt = dbenv->lk_handle;
	if (abortp != NULL)
		*abortp = 0;

	/* Check if a detector run is necessary. */
jimw@mysql.com's avatar
jimw@mysql.com committed
148
	LOCK_SYSTEM_LOCK(dbenv);
149

ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
150 151 152
	/* Make a pass only if auto-detect would run. */
	region = lt->reginfo.primary;

jimw@mysql.com's avatar
jimw@mysql.com committed
153 154 155 156
	LOCK_SET_TIME_INVALID(&now);
	if (region->need_dd == 0 &&
	     (!LOCK_TIME_ISVALID(&region->next_timeout) ||
	     !__lock_expired(dbenv, &now, &region->next_timeout))) {
jimw@mysql.com's avatar
jimw@mysql.com committed
157
		LOCK_SYSTEM_UNLOCK(dbenv);
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
158
		return (0);
159
	}
jimw@mysql.com's avatar
jimw@mysql.com committed
160 161
	if (region->need_dd == 0)
		atype = DB_LOCK_EXPIRE;
162

ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
163 164 165
	/* Reset need_dd, so we know we've run the detector. */
	region->need_dd = 0;

166
	/* Build the waits-for bitmap. */
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
167 168
	ret = __dd_build(dbenv, atype, &bitmap, &nlockers, &nalloc, &idmap);
	lock_max = region->stat.st_cur_maxid;
jimw@mysql.com's avatar
jimw@mysql.com committed
169
	LOCK_SYSTEM_UNLOCK(dbenv);
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
170
	if (ret != 0 || atype == DB_LOCK_EXPIRE)
171 172
		return (ret);

jimw@mysql.com's avatar
jimw@mysql.com committed
173
	/* If there are no lockers, there are no deadlocks. */
174 175
	if (nlockers == 0)
		return (0);
jimw@mysql.com's avatar
jimw@mysql.com committed
176

177 178
#ifdef DIAGNOSTIC
	if (FLD_ISSET(dbenv->verbose, DB_VERB_WAITSFOR))
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
179
		__dd_debug(dbenv, idmap, bitmap, nlockers, nalloc);
180
#endif
jimw@mysql.com's avatar
jimw@mysql.com committed
181

ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
182 183 184 185 186 187 188 189 190
	/* Now duplicate the bitmaps so we can verify deadlock participants. */
	if ((ret = __os_calloc(dbenv, (size_t)nlockers,
	    sizeof(u_int32_t) * nalloc, &copymap)) != 0)
		goto err;
	memcpy(copymap, bitmap, nlockers * sizeof(u_int32_t) * nalloc);

	if ((ret = __os_calloc(dbenv, sizeof(u_int32_t), nalloc, &tmpmap)) != 0)
		goto err1;

191
	/* Find a deadlock. */
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
192 193
	if ((ret =
	    __dd_find(dbenv, bitmap, idmap, nlockers, nalloc, &deadp)) != 0)
194 195
		return (ret);

jimw@mysql.com's avatar
jimw@mysql.com committed
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
	/*
	 * We need the cur_maxid from the txn region as well.  In order
	 * to avoid tricky synchronization between the lock and txn
	 * regions, we simply unlock the lock region and then lock the
	 * txn region.  This introduces a small window during which the
	 * transaction system could then wrap.  We're willing to return
	 * the wrong answer for "oldest" or "youngest" in those rare
	 * circumstances.
	 */
	if (TXN_ON(dbenv)) {
		TXN_SYSTEM_LOCK(dbenv);
		txn_max = ((DB_TXNREGION *)((DB_TXNMGR *)
		    dbenv->tx_handle)->reginfo.primary)->cur_maxid;
		TXN_SYSTEM_UNLOCK(dbenv);
	} else
		txn_max = TXN_MAXIMUM;

213 214 215 216 217
	killid = BAD_KILLID;
	free_me = deadp;
	for (; *deadp != NULL; deadp++) {
		if (abortp != NULL)
			++*abortp;
jimw@mysql.com's avatar
jimw@mysql.com committed
218
		killid = (u_int32_t)(*deadp - bitmap) / nalloc;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
219
		limit = killid;
220

ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
221
		/*
jimw@mysql.com's avatar
jimw@mysql.com committed
222 223 224 225 226 227 228 229 230 231 232 233
		 * There are cases in which our general algorithm will
		 * fail.  Returning 1 from verify indicates that the
		 * particular locker is not only involved in a deadlock,
		 * but that killing him will allow others to make forward
		 * progress.  Unfortunately, there are cases where we need
		 * to abort someone, but killing them will not necessarily
		 * ensure forward progress (imagine N readers all trying to
		 * acquire a write lock).
		 * killid is only set to lockers that pass the db_verify test.
		 * keeper will hold the best candidate even if it does
		 * not pass db_verify.  Once we fill in killid then we do
		 * not need a keeper, but we keep updating it anyway.
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
234
		 */
jimw@mysql.com's avatar
jimw@mysql.com committed
235 236 237 238 239 240 241 242 243 244

		keeper = idmap[killid].in_abort == 0 ? killid : BAD_KILLID;
		if (keeper == BAD_KILLID ||
		    __dd_verify(idmap, *deadp,
		    tmpmap, copymap, nlockers, nalloc, keeper) == 0)
			killid = BAD_KILLID;

		if (killid != BAD_KILLID &&
		    (atype == DB_LOCK_DEFAULT || atype == DB_LOCK_RANDOM))
			goto dokill;
245

ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
246
		/*
jimw@mysql.com's avatar
jimw@mysql.com committed
247 248 249 250 251 252 253 254 255 256
		 * Start with the id that we know is deadlocked, then examine
		 * all other set bits and see if any are a better candidate
		 * for abortion and they are genuinely part of the deadlock.
		 * The definition of "best":
		 *	MAXLOCKS: maximum count
		 *	MAXWRITE: maximum write count
		 *	MINLOCKS: minimum count
		 *	MINWRITE: minimum write count
		 *	OLDEST: smallest id
		 *	YOUNGEST: largest id
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
257
		 */
jimw@mysql.com's avatar
jimw@mysql.com committed
258
		for (i = (limit + 1) % nlockers;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
259 260
		    i != limit;
		    i = (i + 1) % nlockers) {
jimw@mysql.com's avatar
jimw@mysql.com committed
261
			if (!ISSET_MAP(*deadp, i) || idmap[i].in_abort)
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
262
				continue;
jimw@mysql.com's avatar
jimw@mysql.com committed
263 264 265 266 267 268 269 270 271 272 273 274 275 276

			/*
			 * Determine if we have a verified candidate
			 * in killid, if not then compare with the
			 * non-verified candidate in keeper.
			 */
			if (killid == BAD_KILLID) {
				if (keeper == BAD_KILLID)
					goto use_next;
				else
					cid = keeper;
			} else
				cid = killid;

ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
277 278
			switch (atype) {
			case DB_LOCK_OLDEST:
jimw@mysql.com's avatar
jimw@mysql.com committed
279
				if (__dd_isolder(idmap[cid].id,
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
280 281 282 283 284
				    idmap[i].id, lock_max, txn_max))
					continue;
				break;
			case DB_LOCK_YOUNGEST:
				if (__dd_isolder(idmap[i].id,
jimw@mysql.com's avatar
jimw@mysql.com committed
285
				    idmap[cid].id, lock_max, txn_max))
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
286 287 288
					continue;
				break;
			case DB_LOCK_MAXLOCKS:
jimw@mysql.com's avatar
jimw@mysql.com committed
289 290 291 292 293
				if (idmap[i].count < idmap[cid].count)
					continue;
				break;
			case DB_LOCK_MAXWRITE:
				if (idmap[i].count < idmap[cid].count)
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
294 295 296 297
					continue;
				break;
			case DB_LOCK_MINLOCKS:
			case DB_LOCK_MINWRITE:
jimw@mysql.com's avatar
jimw@mysql.com committed
298
				if (idmap[i].count > idmap[cid].count)
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
299 300
					continue;
				break;
jimw@mysql.com's avatar
jimw@mysql.com committed
301 302 303 304
			case DB_LOCK_DEFAULT:
			case DB_LOCK_RANDOM:
				goto dokill;

ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
305 306 307 308 309
			default:
				killid = BAD_KILLID;
				ret = EINVAL;
				goto dokill;
			}
jimw@mysql.com's avatar
jimw@mysql.com committed
310 311

use_next:		keeper = i;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
312 313 314
			if (__dd_verify(idmap, *deadp,
			    tmpmap, copymap, nlockers, nalloc, i))
				killid = i;
315 316
		}

jimw@mysql.com's avatar
jimw@mysql.com committed
317 318 319 320 321 322 323 324 325 326 327 328 329
dokill:		if (killid == BAD_KILLID) {
			if (keeper == BAD_KILLID)
				/*
				 * It's conceivable that under XA, the
				 * locker could have gone away.
				 */
				continue;
			else {
				/*
				 * Removing a single locker will not
				 * break the deadlock, signal to run
				 * detection again.
				 */
jimw@mysql.com's avatar
jimw@mysql.com committed
330
				LOCK_SYSTEM_LOCK(dbenv);
jimw@mysql.com's avatar
jimw@mysql.com committed
331
				region->need_dd = 1;
jimw@mysql.com's avatar
jimw@mysql.com committed
332
				LOCK_SYSTEM_UNLOCK(dbenv);
jimw@mysql.com's avatar
jimw@mysql.com committed
333 334
				killid = keeper;
			}
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
335 336
		}

337
		/* Kill the locker with lockid idmap[killid]. */
jimw@mysql.com's avatar
jimw@mysql.com committed
338 339 340 341 342 343 344 345 346
		if ((ret = __dd_abort(dbenv, &idmap[killid], &status)) != 0)
			break;

		/*
		 * It's possible that the lock was already aborted; this isn't
		 * necessarily a problem, so do not treat it as an error.
		 */
		if (status != 0) {
			if (status != DB_ALREADY_ABORTED)
347 348 349 350
				__db_err(dbenv,
				    "warning: unable to abort locker %lx",
				    (u_long)idmap[killid].id);
		} else if (FLD_ISSET(dbenv->verbose, DB_VERB_DEADLOCK))
jimw@mysql.com's avatar
jimw@mysql.com committed
351
			__db_msg(dbenv,
352 353
			    "Aborting locker %lx", (u_long)idmap[killid].id);
	}
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
354 355 356 357 358 359 360
	__os_free(dbenv, tmpmap);
err1:	__os_free(dbenv, copymap);

err:	if (free_me != NULL)
		__os_free(dbenv, free_me);
	__os_free(dbenv, bitmap);
	__os_free(dbenv, idmap);
361 362 363 364 365 366 367 368 369

	return (ret);
}

/*
 * ========================================================================
 * Utilities
 */

jimw@mysql.com's avatar
jimw@mysql.com committed
370
#define	DD_INVALID_ID	((u_int32_t) -1)
371 372

static int
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
373
__dd_build(dbenv, atype, bmp, nlockers, allocp, idmap)
374
	DB_ENV *dbenv;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
375
	u_int32_t atype, **bmp, *nlockers, *allocp;
376 377 378 379 380 381 382 383
	locker_info **idmap;
{
	struct __db_lock *lp;
	DB_LOCKER *lip, *lockerp, *child;
	DB_LOCKOBJ *op, *lo;
	DB_LOCKREGION *region;
	DB_LOCKTAB *lt;
	locker_info *id_array;
jimw@mysql.com's avatar
jimw@mysql.com committed
384
	db_timeval_t now, min_timeout;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
385
	u_int32_t *bitmap, count, dd, *entryp, id, ndx, nentries, *tmpmap;
386
	u_int8_t *pptr;
jimw@mysql.com's avatar
jimw@mysql.com committed
387
	int is_first, ret;
388 389 390

	lt = dbenv->lk_handle;
	region = lt->reginfo.primary;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
391
	LOCK_SET_TIME_INVALID(&now);
jimw@mysql.com's avatar
jimw@mysql.com committed
392
	LOCK_SET_TIME_MAX(&min_timeout);
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
393 394

	/*
jimw@mysql.com's avatar
jimw@mysql.com committed
395 396 397 398 399 400
	 * While we always check for expired timeouts, if we are called with
	 * DB_LOCK_EXPIRE, then we are only checking for timeouts (i.e., not
	 * doing deadlock detection at all).  If we aren't doing real deadlock
	 * detection, then we can skip a significant, amount of the processing.
	 * In particular we do not build the conflict array and our caller
	 * needs to expect this.
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
401
	 */
jimw@mysql.com's avatar
jimw@mysql.com committed
402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427
	if (atype == DB_LOCK_EXPIRE) {
		for (op = SH_TAILQ_FIRST(&region->dd_objs, __db_lockobj);
		    op != NULL;
		    op = SH_TAILQ_NEXT(op, dd_links, __db_lockobj))
			for (lp = SH_TAILQ_FIRST(&op->waiters, __db_lock);
			    lp != NULL;
			    lp = SH_TAILQ_NEXT(lp, links, __db_lock)) {
				LOCKER_LOCK(lt, region, lp->holder, ndx);
				if ((ret = __lock_getlocker(lt,
				    lp->holder, ndx, 0, &lockerp)) != 0)
					continue;
				if (lp->status == DB_LSTAT_WAITING) {
					if (__lock_expired(dbenv,
					    &now, &lockerp->lk_expire)) {
						lp->status = DB_LSTAT_EXPIRED;
						MUTEX_UNLOCK(
						    dbenv, lp->mtx_lock);
						continue;
					}
					if (LOCK_TIME_GREATER(
					    &min_timeout, &lockerp->lk_expire))
						min_timeout =
						    lockerp->lk_expire;
				}
			}
		goto done;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
428
	}
429 430 431 432 433 434 435

	/*
	 * We'll check how many lockers there are, add a few more in for
	 * good measure and then allocate all the structures.  Then we'll
	 * verify that we have enough room when we go back in and get the
	 * mutex the second time.
	 */
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
436
retry:	count = region->stat.st_nlockers;
437 438 439 440 441 442
	if (count == 0) {
		*nlockers = 0;
		return (0);
	}

	if (FLD_ISSET(dbenv->verbose, DB_VERB_DEADLOCK))
jimw@mysql.com's avatar
jimw@mysql.com committed
443
		__db_msg(dbenv, "%lu lockers", (u_long)count);
444

ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
445
	count += 20;
jimw@mysql.com's avatar
jimw@mysql.com committed
446
	nentries = (u_int32_t)DB_ALIGN(count, 32) / 32;
447 448 449 450 451 452 453 454 455 456 457 458 459 460

	/*
	 * Allocate enough space for a count by count bitmap matrix.
	 *
	 * XXX
	 * We can probably save the malloc's between iterations just
	 * reallocing if necessary because count grew by too much.
	 */
	if ((ret = __os_calloc(dbenv, (size_t)count,
	    sizeof(u_int32_t) * nentries, &bitmap)) != 0)
		return (ret);

	if ((ret = __os_calloc(dbenv,
	    sizeof(u_int32_t), nentries, &tmpmap)) != 0) {
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
461
		__os_free(dbenv, bitmap);
462 463 464 465 466
		return (ret);
	}

	if ((ret = __os_calloc(dbenv,
	    (size_t)count, sizeof(locker_info), &id_array)) != 0) {
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
467 468
		__os_free(dbenv, bitmap);
		__os_free(dbenv, tmpmap);
469 470 471 472 473 474
		return (ret);
	}

	/*
	 * Now go back in and actually fill in the matrix.
	 */
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
475 476 477 478
	if (region->stat.st_nlockers > count) {
		__os_free(dbenv, bitmap);
		__os_free(dbenv, tmpmap);
		__os_free(dbenv, id_array);
479 480 481 482 483 484
		goto retry;
	}

	/*
	 * First we go through and assign each locker a deadlock detector id.
	 */
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
485 486 487 488 489 490
	for (id = 0, lip = SH_TAILQ_FIRST(&region->lockers, __db_locker);
	    lip != NULL;
	    lip = SH_TAILQ_NEXT(lip, ulinks, __db_locker)) {
		if (lip->master_locker == INVALID_ROFF) {
			lip->dd_id = id++;
			id_array[lip->dd_id].id = lip->id;
jimw@mysql.com's avatar
jimw@mysql.com committed
491 492 493
			switch (atype) {
			case DB_LOCK_MINLOCKS:
			case DB_LOCK_MAXLOCKS:
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
494
				id_array[lip->dd_id].count = lip->nlocks;
jimw@mysql.com's avatar
jimw@mysql.com committed
495 496 497
				break;
			case DB_LOCK_MINWRITE:
			case DB_LOCK_MAXWRITE:
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
498
				id_array[lip->dd_id].count = lip->nwrites;
jimw@mysql.com's avatar
jimw@mysql.com committed
499
				break;
jimw@mysql.com's avatar
jimw@mysql.com committed
500 501
			default:
				break;
jimw@mysql.com's avatar
jimw@mysql.com committed
502 503 504
			}
			if (F_ISSET(lip, DB_LOCKER_INABORT))
				id_array[lip->dd_id].in_abort = 1;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
505 506 507
		} else
			lip->dd_id = DD_INVALID_ID;

508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531
	}

	/*
	 * We only need consider objects that have waiters, so we use
	 * the list of objects with waiters (dd_objs) instead of traversing
	 * the entire hash table.  For each object, we traverse the waiters
	 * list and add an entry in the waitsfor matrix for each waiter/holder
	 * combination.
	 */
	for (op = SH_TAILQ_FIRST(&region->dd_objs, __db_lockobj);
	    op != NULL; op = SH_TAILQ_NEXT(op, dd_links, __db_lockobj)) {
		CLEAR_MAP(tmpmap, nentries);

		/*
		 * First we go through and create a bit map that
		 * represents all the holders of this object.
		 */
		for (lp = SH_TAILQ_FIRST(&op->holders, __db_lock);
		    lp != NULL;
		    lp = SH_TAILQ_NEXT(lp, links, __db_lock)) {
			LOCKER_LOCK(lt, region, lp->holder, ndx);
			if ((ret = __lock_getlocker(lt,
			    lp->holder, ndx, 0, &lockerp)) != 0)
				continue;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
532 533 534 535 536

			if (lockerp->dd_id == DD_INVALID_ID) {
				dd = ((DB_LOCKER *)R_ADDR(&lt->reginfo,
				    lockerp->master_locker))->dd_id;
				lockerp->dd_id = dd;
jimw@mysql.com's avatar
jimw@mysql.com committed
537 538 539
				switch (atype) {
				case DB_LOCK_MINLOCKS:
				case DB_LOCK_MAXLOCKS:
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
540
					id_array[dd].count += lockerp->nlocks;
jimw@mysql.com's avatar
jimw@mysql.com committed
541 542 543
					break;
				case DB_LOCK_MINWRITE:
				case DB_LOCK_MAXWRITE:
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
544
					id_array[dd].count += lockerp->nwrites;
jimw@mysql.com's avatar
jimw@mysql.com committed
545
					break;
jimw@mysql.com's avatar
jimw@mysql.com committed
546 547
				default:
					break;
jimw@mysql.com's avatar
jimw@mysql.com committed
548 549 550
				}
				if (F_ISSET(lockerp, DB_LOCKER_INABORT))
					id_array[dd].in_abort = 1;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
551 552

			} else
553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576
				dd = lockerp->dd_id;
			id_array[dd].valid = 1;

			/*
			 * If the holder has already been aborted, then
			 * we should ignore it for now.
			 */
			if (lp->status == DB_LSTAT_HELD)
				SET_MAP(tmpmap, dd);
		}

		/*
		 * Next, for each waiter, we set its row in the matrix
		 * equal to the map of holders we set up above.
		 */
		for (is_first = 1,
		    lp = SH_TAILQ_FIRST(&op->waiters, __db_lock);
		    lp != NULL;
		    is_first = 0,
		    lp = SH_TAILQ_NEXT(lp, links, __db_lock)) {
			LOCKER_LOCK(lt, region, lp->holder, ndx);
			if ((ret = __lock_getlocker(lt,
			    lp->holder, ndx, 0, &lockerp)) != 0)
				continue;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
577 578 579 580
			if (lp->status == DB_LSTAT_WAITING) {
				if (__lock_expired(dbenv,
				    &now, &lockerp->lk_expire)) {
					lp->status = DB_LSTAT_EXPIRED;
jimw@mysql.com's avatar
jimw@mysql.com committed
581
					MUTEX_UNLOCK(dbenv, lp->mtx_lock);
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
582 583
					continue;
				}
jimw@mysql.com's avatar
jimw@mysql.com committed
584 585 586
				if (LOCK_TIME_GREATER(
				    &min_timeout, &lockerp->lk_expire))
					min_timeout = lockerp->lk_expire;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
587 588 589 590 591 592
			}

			if (lockerp->dd_id == DD_INVALID_ID) {
				dd = ((DB_LOCKER *)R_ADDR(&lt->reginfo,
				    lockerp->master_locker))->dd_id;
				lockerp->dd_id = dd;
jimw@mysql.com's avatar
jimw@mysql.com committed
593 594 595
				switch (atype) {
				case DB_LOCK_MINLOCKS:
				case DB_LOCK_MAXLOCKS:
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
596
					id_array[dd].count += lockerp->nlocks;
jimw@mysql.com's avatar
jimw@mysql.com committed
597 598 599
					break;
				case DB_LOCK_MINWRITE:
				case DB_LOCK_MAXWRITE:
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
600
					id_array[dd].count += lockerp->nwrites;
jimw@mysql.com's avatar
jimw@mysql.com committed
601
					break;
jimw@mysql.com's avatar
jimw@mysql.com committed
602 603
				default:
					break;
jimw@mysql.com's avatar
jimw@mysql.com committed
604
				}
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
605
			} else
606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624
				dd = lockerp->dd_id;
			id_array[dd].valid = 1;

			/*
			 * If the transaction is pending abortion, then
			 * ignore it on this iteration.
			 */
			if (lp->status != DB_LSTAT_WAITING)
				continue;

			entryp = bitmap + (nentries * dd);
			OR_MAP(entryp, tmpmap, nentries);
			/*
			 * If this is the first waiter on the queue,
			 * then we remove the waitsfor relationship
			 * with oneself.  However, if it's anywhere
			 * else on the queue, then we have to keep
			 * it and we have an automatic deadlock.
			 */
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
625 626 627
			if (is_first) {
				if (ISSET_MAP(entryp, dd))
					id_array[dd].self_wait = 1;
628
				CLR_MAP(entryp, dd);
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
629
			}
630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654
		}
	}

	/* Now for each locker; record its last lock. */
	for (id = 0; id < count; id++) {
		if (!id_array[id].valid)
			continue;
		LOCKER_LOCK(lt, region, id_array[id].id, ndx);
		if ((ret = __lock_getlocker(lt,
		    id_array[id].id, ndx, 0, &lockerp)) != 0) {
			__db_err(dbenv,
			    "No locks for locker %lu", (u_long)id_array[id].id);
			continue;
		}

		/*
		 * If this is a master transaction, try to
		 * find one of its children's locks first,
		 * as they are probably more recent.
		 */
		child = SH_LIST_FIRST(&lockerp->child_locker, __db_locker);
		if (child != NULL) {
			do {
				lp = SH_LIST_FIRST(&child->heldby, __db_lock);
				if (lp != NULL &&
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
655
				    lp->status == DB_LSTAT_WAITING) {
656 657 658 659 660 661 662 663 664 665
					id_array[id].last_locker_id = child->id;
					goto get_lock;
				}
				child = SH_LIST_NEXT(
				    child, child_link, __db_locker);
			} while (child != NULL);
		}
		lp = SH_LIST_FIRST(&lockerp->heldby, __db_lock);
		if (lp != NULL) {
			id_array[id].last_locker_id = lockerp->id;
jimw@mysql.com's avatar
jimw@mysql.com committed
666 667
get_lock:		id_array[id].last_lock = R_OFFSET(&lt->reginfo, lp);
			id_array[id].last_obj = lp->obj;
668 669 670 671 672 673 674 675 676 677
			lo = (DB_LOCKOBJ *)((u_int8_t *)lp + lp->obj);
			pptr = SH_DBT_PTR(&lo->lockobj);
			if (lo->lockobj.size >= sizeof(db_pgno_t))
				memcpy(&id_array[id].pgno,
				    pptr, sizeof(db_pgno_t));
			else
				id_array[id].pgno = 0;
		}
	}

ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
678
	/*
jimw@mysql.com's avatar
jimw@mysql.com committed
679
	 * Pass complete, reset the deadlock detector bit.
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
680
	 */
jimw@mysql.com's avatar
jimw@mysql.com committed
681
	region->need_dd = 0;
682 683 684 685 686 687 688 689

	/*
	 * Now we can release everything except the bitmap matrix that we
	 * created.
	 */
	*nlockers = id;
	*idmap = id_array;
	*bmp = bitmap;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
690 691
	*allocp = nentries;
	__os_free(dbenv, tmpmap);
jimw@mysql.com's avatar
jimw@mysql.com committed
692 693 694 695 696 697
done:	if (LOCK_TIME_ISVALID(&region->next_timeout)) {
		if (LOCK_TIME_ISMAX(&min_timeout))
			LOCK_SET_TIME_INVALID(&region->next_timeout);
		else
			region->next_timeout = min_timeout;
	}
698 699 700 701
	return (0);
}

static int
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
702
__dd_find(dbenv, bmp, idmap, nlockers, nalloc, deadp)
703
	DB_ENV *dbenv;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
704
	u_int32_t *bmp, nlockers, nalloc;
705 706 707
	locker_info *idmap;
	u_int32_t ***deadp;
{
jimw@mysql.com's avatar
jimw@mysql.com committed
708 709 710
	u_int32_t i, j, k, *mymap, *tmpmap, **retp;
	u_int ndead, ndeadalloc;
	int ret;
711 712 713 714 715 716 717

#undef	INITIAL_DEAD_ALLOC
#define	INITIAL_DEAD_ALLOC	8

	ndeadalloc = INITIAL_DEAD_ALLOC;
	ndead = 0;
	if ((ret = __os_malloc(dbenv,
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
718
	    ndeadalloc * sizeof(u_int32_t *), &retp)) != 0)
719 720 721 722 723 724
		return (ret);

	/*
	 * For each locker, OR in the bits from the lockers on which that
	 * locker is waiting.
	 */
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
725
	for (mymap = bmp, i = 0; i < nlockers; i++, mymap += nalloc) {
726 727 728 729 730 731 732
		if (!idmap[i].valid)
			continue;
		for (j = 0; j < nlockers; j++) {
			if (!ISSET_MAP(mymap, j))
				continue;

			/* Find the map for this bit. */
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
733 734
			tmpmap = bmp + (nalloc * j);
			OR_MAP(mymap, tmpmap, nalloc);
735 736 737 738 739 740 741 742 743 744 745 746
			if (!ISSET_MAP(mymap, i))
				continue;

			/* Make sure we leave room for NULL. */
			if (ndead + 2 >= ndeadalloc) {
				ndeadalloc <<= 1;
				/*
				 * If the alloc fails, then simply return the
				 * deadlocks that we already have.
				 */
				if (__os_realloc(dbenv,
				    ndeadalloc * sizeof(u_int32_t),
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
747
				    &retp) != 0) {
748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767
					retp[ndead] = NULL;
					*deadp = retp;
					return (0);
				}
			}
			retp[ndead++] = mymap;

			/* Mark all participants in this deadlock invalid. */
			for (k = 0; k < nlockers; k++)
				if (ISSET_MAP(mymap, k))
					idmap[k].valid = 0;
			break;
		}
	}
	retp[ndead] = NULL;
	*deadp = retp;
	return (0);
}

static int
jimw@mysql.com's avatar
jimw@mysql.com committed
768
__dd_abort(dbenv, info, statusp)
769 770
	DB_ENV *dbenv;
	locker_info *info;
jimw@mysql.com's avatar
jimw@mysql.com committed
771
	int *statusp;
772 773 774 775 776 777 778 779 780
{
	struct __db_lock *lockp;
	DB_LOCKER *lockerp;
	DB_LOCKOBJ *sh_obj;
	DB_LOCKREGION *region;
	DB_LOCKTAB *lt;
	u_int32_t ndx;
	int ret;

jimw@mysql.com's avatar
jimw@mysql.com committed
781 782
	*statusp = 0;

783 784
	lt = dbenv->lk_handle;
	region = lt->reginfo.primary;
jimw@mysql.com's avatar
jimw@mysql.com committed
785
	ret = 0;
786

jimw@mysql.com's avatar
jimw@mysql.com committed
787
	LOCK_SYSTEM_LOCK(dbenv);
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
788

jimw@mysql.com's avatar
jimw@mysql.com committed
789
	/*
jimw@mysql.com's avatar
jimw@mysql.com committed
790 791
	 * Get the locker.  If it's gone or was aborted while we were
	 * detecting, return that.
jimw@mysql.com's avatar
jimw@mysql.com committed
792
	 */
793 794
	LOCKER_LOCK(lt, region, info->last_locker_id, ndx);
	if ((ret = __lock_getlocker(lt,
jimw@mysql.com's avatar
jimw@mysql.com committed
795 796 797 798
	    info->last_locker_id, ndx, 0, &lockerp)) != 0)
		goto err;
	if (lockerp == NULL || F_ISSET(lockerp, DB_LOCKER_INABORT)) {
		*statusp = DB_ALREADY_ABORTED;
799 800 801
		goto out;
	}

jimw@mysql.com's avatar
jimw@mysql.com committed
802
	/*
jimw@mysql.com's avatar
jimw@mysql.com committed
803 804
	 * Find the locker's last lock.  It is possible for this lock to have
	 * been freed, either though a timeout or another detector run.
jimw@mysql.com's avatar
jimw@mysql.com committed
805
	 */
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
806
	if ((lockp = SH_LIST_FIRST(&lockerp->heldby, __db_lock)) == NULL) {
jimw@mysql.com's avatar
jimw@mysql.com committed
807
		*statusp = DB_ALREADY_ABORTED;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
808 809 810
		goto out;
	}
	if (R_OFFSET(&lt->reginfo, lockp) != info->last_lock ||
jimw@mysql.com's avatar
jimw@mysql.com committed
811 812
	    lockp->holder != lockerp->id ||
	    lockp->obj != info->last_obj || lockp->status != DB_LSTAT_WAITING) {
jimw@mysql.com's avatar
jimw@mysql.com committed
813
		*statusp = DB_ALREADY_ABORTED;
814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832
		goto out;
	}

	sh_obj = (DB_LOCKOBJ *)((u_int8_t *)lockp + lockp->obj);

	/* Abort lock, take it off list, and wake up this lock. */
	SHOBJECT_LOCK(lt, region, sh_obj, ndx);
	lockp->status = DB_LSTAT_ABORTED;
	SH_TAILQ_REMOVE(&sh_obj->waiters, lockp, links, __db_lock);

	/*
	 * Either the waiters list is now empty, in which case we remove
	 * it from dd_objs, or it is not empty, in which case we need to
	 * do promotion.
	 */
	if (SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock) == NULL)
		SH_TAILQ_REMOVE(&region->dd_objs,
		    sh_obj, dd_links, __db_lockobj);
	else
jimw@mysql.com's avatar
jimw@mysql.com committed
833 834
		ret = __lock_promote(lt, sh_obj, NULL, 0);
	MUTEX_UNLOCK(dbenv, lockp->mtx_lock);
835

ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
836
	region->stat.st_ndeadlocks++;
jimw@mysql.com's avatar
jimw@mysql.com committed
837 838
err:
out:	LOCK_SYSTEM_UNLOCK(dbenv);
839 840 841 842 843
	return (ret);
}

#ifdef DIAGNOSTIC
static void
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
844
__dd_debug(dbenv, idmap, bitmap, nlockers, nalloc)
845 846
	DB_ENV *dbenv;
	locker_info *idmap;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
847
	u_int32_t *bitmap, nlockers, nalloc;
848
{
jimw@mysql.com's avatar
jimw@mysql.com committed
849
	DB_MSGBUF mb;
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
850
	u_int32_t i, j, *mymap;
851

jimw@mysql.com's avatar
jimw@mysql.com committed
852
	DB_MSGBUF_INIT(&mb);
853

jimw@mysql.com's avatar
jimw@mysql.com committed
854
	__db_msg(dbenv, "Waitsfor array\nWaiter:\tWaiting on:");
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
855
	for (mymap = bitmap, i = 0; i < nlockers; i++, mymap += nalloc) {
856 857
		if (!idmap[i].valid)
			continue;
jimw@mysql.com's avatar
jimw@mysql.com committed
858 859

		__db_msgadd(dbenv, &mb,				/* Waiter. */
860 861 862
		    "%lx/%lu:\t", (u_long)idmap[i].id, (u_long)idmap[i].pgno);
		for (j = 0; j < nlockers; j++)
			if (ISSET_MAP(mymap, j))
jimw@mysql.com's avatar
jimw@mysql.com committed
863 864 865 866
				__db_msgadd(dbenv,
				    &mb, " %lx", (u_long)idmap[j].id);
		__db_msgadd(dbenv, &mb, " %lu", (u_long)idmap[i].last_lock);
		DB_MSGBUF_FLUSH(dbenv, &mb);
867 868 869
	}
}
#endif
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
870 871 872 873 874

/*
 * Given a bitmap that contains a deadlock, verify that the bit
 * specified in the which parameter indicates a transaction that
 * is actually deadlocked.  Return 1 if really deadlocked, 0 otherwise.
jimw@mysql.com's avatar
jimw@mysql.com committed
875 876 877 878 879 880
 * deadmap  --  the array that identified the deadlock.
 * tmpmap   --  a copy of the initial bitmaps from the dd_build phase.
 * origmap  --  a temporary bit map into which we can OR things.
 * nlockers --  the number of actual lockers under consideration.
 * nalloc   --  the number of words allocated for the bitmap.
 * which    --  the locker in question.
ram@mysql.r18.ru's avatar
ram@mysql.r18.ru committed
881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979
 */
static int
__dd_verify(idmap, deadmap, tmpmap, origmap, nlockers, nalloc, which)
	locker_info *idmap;
	u_int32_t *deadmap, *tmpmap, *origmap;
	u_int32_t nlockers, nalloc, which;
{
	u_int32_t *tmap;
	u_int32_t j;
	int count;

	memset(tmpmap, 0, sizeof(u_int32_t) * nalloc);

	/*
	 * In order for "which" to be actively involved in
	 * the deadlock, removing him from the evaluation
	 * must remove the deadlock.  So, we OR together everyone
	 * except which; if all the participants still have their
	 * bits set, then the deadlock persists and which does
	 * not participate.  If the deadlock does not persist
	 * then "which" does participate.
	 */
	count = 0;
	for (j = 0; j < nlockers; j++) {
		if (!ISSET_MAP(deadmap, j) || j == which)
			continue;

		/* Find the map for this bit. */
		tmap = origmap + (nalloc * j);

		/*
		 * We special case the first waiter who is also a holder, so
		 * we don't automatically call that a deadlock.  However, if
		 * it really is a deadlock, we need the bit set now so that
		 * we treat the first waiter like other waiters.
		 */
		if (idmap[j].self_wait)
			SET_MAP(tmap, j);
		OR_MAP(tmpmap, tmap, nalloc);
		count++;
	}

	if (count == 1)
		return (1);

	/*
	 * Now check the resulting map and see whether
	 * all participants still have their bit set.
	 */
	for (j = 0; j < nlockers; j++) {
		if (!ISSET_MAP(deadmap, j) || j == which)
			continue;
		if (!ISSET_MAP(tmpmap, j))
			return (1);
	}
	return (0);
}

/*
 * __dd_isolder --
 *
 * Figure out the relative age of two lockers.  We make all lockers
 * older than all transactions, because that's how it's worked
 * historically (because lockers are lower ids).
 */
static int
__dd_isolder(a, b, lock_max, txn_max)
	u_int32_t	a, b;
	u_int32_t	lock_max, txn_max;
{
	u_int32_t max;

	/* Check for comparing lock-id and txnid. */
	if (a <= DB_LOCK_MAXID && b > DB_LOCK_MAXID)
		return (1);
	if (b <= DB_LOCK_MAXID && a > DB_LOCK_MAXID)
		return (0);

	/* In the same space; figure out which one. */
	max = txn_max;
	if (a <= DB_LOCK_MAXID)
		max = lock_max;

	/*
	 * We can't get a 100% correct ordering, because we don't know
	 * where the current interval started and if there were older
	 * lockers outside the interval.  We do the best we can.
	 */

	/*
	 * Check for a wrapped case with ids above max.
	 */
	if (a > max && b < max)
		return (1);
	if (b > max && a < max)
		return (0);

	return (a < b);
}