Commit 0293b3c7 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1866 refs [t:1866] Default oldest living txnid is now TXNID_NONE_LIVING (0)

Fix bug where assert was hit when transactions are off.
Cleaned up some code.

git-svn-id: file:///svn/toku/tokudb@13625 c7de825b-a66e-492c-adef-691d508d4ae1
parent 8bc4aafe
...@@ -3314,7 +3314,6 @@ int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, TOKULOGGER logger) { ...@@ -3314,7 +3314,6 @@ int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, TOKULOGGER logger) {
cursor->current_in_omt = FALSE; cursor->current_in_omt = FALSE;
cursor->prefetching = FALSE; cursor->prefetching = FALSE;
cursor->oldest_living_xid = toku_logger_get_oldest_living_xid(logger); cursor->oldest_living_xid = toku_logger_get_oldest_living_xid(logger);
assert(cursor->oldest_living_xid != MAX_TXNID);
list_push(&brt->cursors, &cursor->cursors_link); list_push(&brt->cursors, &cursor->cursors_link);
int r = toku_omt_cursor_create(&cursor->omtcursor); int r = toku_omt_cursor_create(&cursor->omtcursor);
assert(r==0); assert(r==0);
......
...@@ -24,7 +24,7 @@ typedef const void *bytevec; ...@@ -24,7 +24,7 @@ typedef const void *bytevec;
typedef int64_t DISKOFF; /* Offset in a disk. -1 is the NULL pointer. */ typedef int64_t DISKOFF; /* Offset in a disk. -1 is the NULL pointer. */
typedef u_int64_t TXNID; typedef u_int64_t TXNID;
#define MAX_TXNID ((TXNID)UINT64_MAX) #define TXNID_NONE_LIVING ((TXNID)0)
typedef struct s_blocknum { int64_t b; } BLOCKNUM; // make a struct so that we will notice type problems. typedef struct s_blocknum { int64_t b; } BLOCKNUM; // make a struct so that we will notice type problems.
...@@ -38,6 +38,7 @@ typedef struct { ...@@ -38,6 +38,7 @@ typedef struct {
/* Make the LSN be a struct instead of an integer so that we get better type checking. */ /* Make the LSN be a struct instead of an integer so that we get better type checking. */
typedef struct __toku_lsn { u_int64_t lsn; } LSN; typedef struct __toku_lsn { u_int64_t lsn; } LSN;
#define ZERO_LSN ((LSN){0}) #define ZERO_LSN ((LSN){0})
#define MAX_LSN ((LSN){UINT64_MAX})
/* Make the FILEID a struct for the same reason. */ /* Make the FILEID a struct for the same reason. */
typedef struct __toku_fileid { u_int32_t fileid; } FILENUM; typedef struct __toku_fileid { u_int32_t fileid; } FILENUM;
......
...@@ -29,7 +29,7 @@ int toku_logger_create (TOKULOGGER *resultp) { ...@@ -29,7 +29,7 @@ int toku_logger_create (TOKULOGGER *resultp) {
result->n_in_file=0; result->n_in_file=0;
result->directory=0; result->directory=0;
result->checkpoint_lsn=(LSN){0}; result->checkpoint_lsn=(LSN){0};
result->oldest_living_xid = MAX_TXNID; result->oldest_living_xid = TXNID_NONE_LIVING;
result->write_block_size = BRT_DEFAULT_NODE_SIZE; // default logging size is the same as the default brt block size result->write_block_size = BRT_DEFAULT_NODE_SIZE; // default logging size is the same as the default brt block size
*resultp=result; *resultp=result;
r = ml_init(&result->input_lock); if (r!=0) goto died1; r = ml_init(&result->input_lock); if (r!=0) goto died1;
...@@ -813,25 +813,32 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) { ...@@ -813,25 +813,32 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
// get them into increasing order // get them into increasing order
qsort(all_logs, all_n_logs, sizeof(all_logs[0]), logfilenamecompare); qsort(all_logs, all_n_logs, sizeof(all_logs[0]), logfilenamecompare);
LSN oldest_live_txn_lsn={.lsn = toku_logger_get_oldest_living_xid(logger)}; LSN oldest_live_txn_lsn;
assert(oldest_live_txn_lsn.lsn != 0); {
TXNID oldest_living_xid = toku_logger_get_oldest_living_xid(logger);
if (oldest_living_xid == TXNID_NONE_LIVING)
oldest_live_txn_lsn = MAX_LSN;
else
oldest_live_txn_lsn.lsn = oldest_living_xid;
}
//printf("%s:%d Oldest txn is %lld\n", __FILE__, __LINE__, (long long)oldest_live_txn_lsn.lsn); //printf("%s:%d Oldest txn is %lld\n", __FILE__, __LINE__, (long long)oldest_live_txn_lsn.lsn);
// Now starting at the last one, look for archivable ones. // Now starting at the last one, look for archivable ones.
// Count the total number of bytes, because we have to return a single big array. (That's the BDB interface. Bleah...) // Count the total number of bytes, because we have to return a single big array. (That's the BDB interface. Bleah...)
LSN earliest_lsn_seen={(unsigned long long)(-1LL)}; LSN earliest_lsn_in_logfile={(unsigned long long)(-1LL)};
r = peek_at_log(logger, all_logs[all_n_logs-1], &earliest_lsn_seen); // try to find the lsn that's in the most recent log r = peek_at_log(logger, all_logs[all_n_logs-1], &earliest_lsn_in_logfile); // try to find the lsn that's in the most recent log
if ((earliest_lsn_seen.lsn <= logger->checkpoint_lsn.lsn)&& if ((earliest_lsn_in_logfile.lsn <= logger->checkpoint_lsn.lsn)&&
(earliest_lsn_seen.lsn <= oldest_live_txn_lsn.lsn)) { (earliest_lsn_in_logfile.lsn <= oldest_live_txn_lsn.lsn)) {
i=all_n_logs-1; i=all_n_logs-1;
} else { } else {
for (i=all_n_logs-2; i>=0; i--) { // start at all_n_logs-2 because we never archive the most recent log for (i=all_n_logs-2; i>=0; i--) { // start at all_n_logs-2 because we never archive the most recent log
r = peek_at_log(logger, all_logs[i], &earliest_lsn_seen); r = peek_at_log(logger, all_logs[i], &earliest_lsn_in_logfile);
if (r!=0) continue; // In case of error, just keep going if (r!=0) continue; // In case of error, just keep going
//printf("%s:%d file=%s firstlsn=%lld checkpoint_lsns={%lld %lld}\n", __FILE__, __LINE__, all_logs[i], (long long)earliest_lsn_seen.lsn, (long long)logger->checkpoint_lsns[0].lsn, (long long)logger->checkpoint_lsns[1].lsn); //printf("%s:%d file=%s firstlsn=%lld checkpoint_lsns={%lld %lld}\n", __FILE__, __LINE__, all_logs[i], (long long)earliest_lsn_in_logfile.lsn, (long long)logger->checkpoint_lsns[0].lsn, (long long)logger->checkpoint_lsns[1].lsn);
if ((earliest_lsn_seen.lsn <= logger->checkpoint_lsn.lsn)&& if ((earliest_lsn_in_logfile.lsn <= logger->checkpoint_lsn.lsn)&&
(earliest_lsn_seen.lsn <= oldest_live_txn_lsn.lsn)) { (earliest_lsn_in_logfile.lsn <= oldest_live_txn_lsn.lsn)) {
break; break;
} }
} }
......
...@@ -43,7 +43,6 @@ void toku_rollback_txn_close (TOKUTXN txn) { ...@@ -43,7 +43,6 @@ void toku_rollback_txn_close (TOKUTXN txn) {
} }
assert(txn->logger->oldest_living_xid <= txn->txnid64); assert(txn->logger->oldest_living_xid <= txn->txnid64);
assert(txn->logger->oldest_living_xid < MAX_TXNID);
if (txn->txnid64 == txn->logger->oldest_living_xid) { if (txn->txnid64 == txn->logger->oldest_living_xid) {
TOKULOGGER logger = txn->logger; TOKULOGGER logger = txn->logger;
...@@ -58,7 +57,7 @@ void toku_rollback_txn_close (TOKUTXN txn) { ...@@ -58,7 +57,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
else { else {
//No living transactions //No living transactions
assert(r==EINVAL); assert(r==EINVAL);
txn->logger->oldest_living_xid = MAX_TXNID; txn->logger->oldest_living_xid = TXNID_NONE_LIVING;
} }
} }
......
...@@ -12,21 +12,10 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log ...@@ -12,21 +12,10 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log
if (result==0) if (result==0)
return errno; return errno;
int r; int r;
if (0) {
died0:
toku_logger_panic(logger, r);
toku_free(result);
return r;
}
r = toku_log_xbegin(logger, &result->first_lsn, 0, parent_tokutxn ? parent_tokutxn->txnid64 : 0); r = toku_log_xbegin(logger, &result->first_lsn, 0, parent_tokutxn ? parent_tokutxn->txnid64 : 0);
if (r!=0) goto died0; if (r!=0) goto died;
r = toku_omt_create(&result->open_brts); r = toku_omt_create(&result->open_brts);
if (r!=0) goto died0; if (r!=0) goto died;
if (0) {
died1:
toku_omt_destroy(&result->open_brts);
goto died0;
}
result->txnid64 = result->first_lsn.lsn; result->txnid64 = result->first_lsn.lsn;
XIDS parent_xids; XIDS parent_xids;
if (parent_tokutxn==NULL) if (parent_tokutxn==NULL)
...@@ -34,12 +23,7 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log ...@@ -34,12 +23,7 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log
else else
parent_xids = parent_tokutxn->xids; parent_xids = parent_tokutxn->xids;
if ((r=xids_create_child(parent_xids, &result->xids, result->txnid64))) if ((r=xids_create_child(parent_xids, &result->xids, result->txnid64)))
goto died1; goto died;
if (0) {
died2:
xids_destroy(&result->xids);
goto died1;
}
result->logger = logger; result->logger = logger;
result->parent = parent_tokutxn; result->parent = parent_tokutxn;
result->oldest_logentry = result->newest_logentry = 0; result->oldest_logentry = result->newest_logentry = 0;
...@@ -47,17 +31,16 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log ...@@ -47,17 +31,16 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log
result->rollentry_arena = memarena_create(); result->rollentry_arena = memarena_create();
if (toku_omt_size(logger->live_txns) == 0) { if (toku_omt_size(logger->live_txns) == 0) {
assert(logger->oldest_living_xid == MAX_TXNID); assert(logger->oldest_living_xid == TXNID_NONE_LIVING);
logger->oldest_living_xid = result->txnid64; logger->oldest_living_xid = result->txnid64;
} }
assert(logger->oldest_living_xid < MAX_TXNID);
assert(logger->oldest_living_xid <= result->txnid64); assert(logger->oldest_living_xid <= result->txnid64);
{ {
//Add txn to list (omt) of live transactions //Add txn to list (omt) of live transactions
u_int32_t idx; u_int32_t idx;
r = toku_omt_insert(logger->live_txns, result, find_xid, result, &idx); r = toku_omt_insert(logger->live_txns, result, find_xid, result, &idx);
if (r!=0) goto died2; if (r!=0) goto died;
if (logger->oldest_living_xid == result->txnid64) if (logger->oldest_living_xid == result->txnid64)
assert(idx == 0); assert(idx == 0);
...@@ -72,6 +55,9 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log ...@@ -72,6 +55,9 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log
result->rollentry_filesize = 0; result->rollentry_filesize = 0;
*tokutxn = result; *tokutxn = result;
return 0; return 0;
died:
toku_logger_panic(logger, r);
return r;
} }
// Doesn't close the txn, just performs the commit operations. // Doesn't close the txn, just performs the commit operations.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment