Commit 9c938f87 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

[t:5062] txnid becomes separate from lsn, and optimizations for read-only txns


git-svn-id: file:///svn/toku/tokudb@44591 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0826ebdd
......@@ -3035,22 +3035,30 @@ set_filenum_in_array(OMTVALUE hv, u_int32_t index, void*arrayv) {
}
static int
log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *extra) {
int r;
TOKUTXN txn = txnv;
TOKULOGGER logger = txn->logger;
FILENUMS open_filenums;
uint32_t num_filenums = toku_omt_size(txn->open_fts);
FILENUM array[num_filenums];
{
if (!txn->begin_was_logged) {
invariant(num_filenums == 0);
goto cleanup;
}
else {
CACHETABLE ct = extra;
ct->checkpoint_num_txns++;
}
open_filenums.num = num_filenums;
open_filenums.filenums = array;
//Fill in open_filenums
int r = toku_omt_iterate(txn->open_fts, set_filenum_in_array, array);
assert(r==0);
}
r = toku_omt_iterate(txn->open_fts, set_filenum_in_array, array);
invariant(r==0);
switch (toku_txn_get_state(txn)) {
case TOKUTXN_LIVE:{
int r = toku_log_xstillopen(logger, NULL, 0,
r = toku_log_xstillopen(logger, NULL, 0,
toku_txn_get_txnid(txn),
toku_txn_get_txnid(toku_logger_txn_parent(txn)),
txn->roll_info.rollentry_raw_count,
......@@ -3061,13 +3069,13 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
txn->roll_info.spilled_rollback_head,
txn->roll_info.spilled_rollback_tail,
txn->roll_info.current_rollback);
assert(r==0);
return 0;
lazy_assert_zero(r);
goto cleanup;
}
case TOKUTXN_PREPARING: {
TOKU_XA_XID xa_xid;
toku_txn_get_prepared_xa_xid(txn, &xa_xid);
int r = toku_log_xstillopenprepared(logger, NULL, 0,
r = toku_log_xstillopenprepared(logger, NULL, 0,
toku_txn_get_txnid(txn),
&xa_xid,
txn->roll_info.rollentry_raw_count,
......@@ -3078,8 +3086,8 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
txn->roll_info.spilled_rollback_head,
txn->roll_info.spilled_rollback_tail,
txn->roll_info.current_rollback);
assert(r==0);
return 0;
lazy_assert_zero(r);
goto cleanup;
}
case TOKUTXN_RETIRED:
case TOKUTXN_COMMITTING:
......@@ -3089,6 +3097,7 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
}
// default is an error
assert(0);
cleanup:
return 0;
}
......@@ -3133,7 +3142,9 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
// The checkpoint must be performed after the lock is acquired.
{
LSN begin_lsn={.lsn=-1}; // we'll need to store the lsn of the checkpoint begin in all the trees that are checkpointed.
int r = toku_log_begin_checkpoint(logger, &begin_lsn, 0, 0);
TXN_MANAGER mgr = toku_logger_get_txn_manager(logger);
TXNID last_xid = toku_txn_manager_get_last_xid(mgr);
int r = toku_log_begin_checkpoint(logger, &begin_lsn, 0, 0, last_xid);
assert(r==0);
ct->lsn_of_checkpoint_in_progress = begin_lsn;
}
......@@ -3153,11 +3164,10 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
}
// Log all the open transactions MUST BE AFTER OPEN FILES
{
ct->checkpoint_num_txns = toku_txn_manager_num_live_txns(logger->txn_manager);
int r = toku_txn_manager_iter_over_live_txns(
logger->txn_manager,
log_open_txn,
NULL
ct
);
assert(r==0);
}
......@@ -3343,7 +3353,7 @@ toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger,
if (logger) {
int r = toku_log_end_checkpoint(logger, NULL,
1, // want the end_checkpoint to be fsync'd
ct->lsn_of_checkpoint_in_progress.lsn,
ct->lsn_of_checkpoint_in_progress,
0,
ct->checkpoint_num_files,
ct->checkpoint_num_txns);
......
......@@ -2564,7 +2564,7 @@ toku_ft_load_recovery(TOKUTXN txn, FILENUM old_filenum, char const * new_iname,
r = toku_logger_save_rollback_load(txn, old_filenum, &new_iname_bs);
if (r==0 && do_log && logger) {
TXNID xid = toku_txn_get_txnid(txn);
r = toku_log_load(logger, load_lsn, do_fsync, xid, old_filenum, new_iname_bs);
r = toku_log_load(logger, load_lsn, do_fsync, txn, xid, old_filenum, new_iname_bs);
}
return r;
}
......@@ -2585,7 +2585,7 @@ toku_ft_hot_index_recovery(TOKUTXN txn, FILENUMS filenums, int do_fsync, int do_
if ( r==0 && do_log && logger) {
TXNID xid = toku_txn_get_txnid(txn);
// write to the recovery log
r = toku_log_hot_index(logger, hot_index_lsn, do_fsync, xid, filenums);
r = toku_log_hot_index(logger, hot_index_lsn, do_fsync, txn, xid, filenums);
}
return r;
}
......@@ -2648,7 +2648,7 @@ toku_ft_log_put (TOKUTXN txn, FT_HANDLE brt, const DBT *key, const DBT *val) {
BYTESTRING valbs = {.len=val->size, .data=val->data};
TXNID xid = toku_txn_get_txnid(txn);
// if (type == FT_INSERT)
r = toku_log_enq_insert(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->ft->cf), xid, keybs, valbs);
r = toku_log_enq_insert(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(brt->ft->cf), xid, keybs, valbs);
// else
// r = toku_log_enq_insert_no_overwrite(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->ft->cf), xid, keybs, valbs);
}
......@@ -2677,7 +2677,7 @@ toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int nu
BYTESTRING valbs = {.len=val->size, .data=val->data};
TXNID xid = toku_txn_get_txnid(txn);
FILENUM src_filenum = src_ft ? toku_cachefile_filenum(src_ft->ft->cf) : FILENUM_NONE;
r = toku_log_enq_insert_multiple(logger, (LSN*)0, 0, src_filenum, filenums, xid, keybs, valbs);
r = toku_log_enq_insert_multiple(logger, (LSN*)0, 0, txn, src_filenum, filenums, xid, keybs, valbs);
}
}
return r;
......@@ -2709,10 +2709,10 @@ toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, BOOL opls
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=val->size, .data=val->data};
if (type == FT_INSERT) {
r = toku_log_enq_insert(logger, (LSN*)0, 0, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs, valbs);
r = toku_log_enq_insert(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs, valbs);
}
else {
r = toku_log_enq_insert_no_overwrite(logger, (LSN*)0, 0, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs, valbs);
r = toku_log_enq_insert_no_overwrite(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs, valbs);
}
if (r!=0) return r;
}
......@@ -2756,7 +2756,7 @@ toku_ft_maybe_update(FT_HANDLE ft_h, const DBT *key, const DBT *update_function_
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING extrabs = {.len=update_function_extra->size,
.data=update_function_extra->data};
r = toku_log_enq_update(logger, NULL, 0,
r = toku_log_enq_update(logger, NULL, 0, txn,
toku_cachefile_filenum(ft_h->ft->cf),
xid, keybs, extrabs);
if (r != 0) { goto cleanup; }
......@@ -2795,7 +2795,7 @@ toku_ft_maybe_update_broadcast(FT_HANDLE ft_h, const DBT *update_function_extra,
ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
BYTESTRING extrabs = {.len=update_function_extra->size,
.data=update_function_extra->data};
r = toku_log_enq_updatebroadcast(logger, NULL, 0,
r = toku_log_enq_updatebroadcast(logger, NULL, 0, txn,
toku_cachefile_filenum(ft_h->ft->cf),
xid, extrabs, resetting);
if (r != 0) { goto cleanup; }
......@@ -2844,7 +2844,7 @@ toku_ft_log_del(TOKUTXN txn, FT_HANDLE brt, const DBT *key) {
if (logger && brt->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
BYTESTRING keybs = {.len=key->size, .data=key->data};
TXNID xid = toku_txn_get_txnid(txn);
r = toku_log_enq_delete_any(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->ft->cf), xid, keybs);
r = toku_log_enq_delete_any(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(brt->ft->cf), xid, keybs);
}
return r;
}
......@@ -2871,7 +2871,7 @@ toku_ft_log_del_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int nu
BYTESTRING valbs = {.len=val->size, .data=val->data};
TXNID xid = toku_txn_get_txnid(txn);
FILENUM src_filenum = src_ft ? toku_cachefile_filenum(src_ft->ft->cf) : FILENUM_NONE;
r = toku_log_enq_delete_multiple(logger, (LSN*)0, 0, src_filenum, filenums, xid, keybs, valbs);
r = toku_log_enq_delete_multiple(logger, (LSN*)0, 0, txn, src_filenum, filenums, xid, keybs, valbs);
}
}
return r;
......@@ -2900,7 +2900,7 @@ toku_ft_maybe_delete(FT_HANDLE ft_h, DBT *key, TOKUTXN txn, BOOL oplsn_valid, LS
if (do_logging && logger &&
ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
BYTESTRING keybs = {.len=key->size, .data=key->data};
r = toku_log_enq_delete_any(logger, (LSN*)0, 0, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs);
r = toku_log_enq_delete_any(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs);
if (r!=0) return r;
}
......@@ -3096,6 +3096,7 @@ toku_ft_change_descriptor(
TXNID xid = toku_txn_get_txnid(txn);
r = toku_log_change_fdescriptor(
logger, NULL, 0,
txn,
toku_cachefile_filenum(ft_h->ft->cf),
xid,
old_desc_bs,
......
......@@ -138,7 +138,7 @@ ft_log_suppress_rollback_during_checkpoint (CACHEFILE cf, void *header_v) {
//Only log if useful.
TOKULOGGER logger = toku_cachefile_logger(cf);
FILENUM filenum = toku_cachefile_filenum (cf);
r = toku_log_suppress_rollback(logger, NULL, 0, filenum, xid);
r = toku_log_suppress_rollback(logger, NULL, 0, NULL, filenum, xid);
}
return r;
}
......@@ -823,7 +823,7 @@ toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft_h, TOKU
TXNID xid = toku_txn_get_txnid(txn);
toku_ft_suppress_rollbacks(new_ft, txn);
r = toku_log_suppress_rollback(txn->logger, NULL, 0, new_filenum, xid);
r = toku_log_suppress_rollback(txn->logger, NULL, 0, txn, new_filenum, xid);
assert_zero(r);
}
......
......@@ -25,7 +25,7 @@ enum ft_layout_version_e {
FT_LAYOUT_VERSION_17 = 17, // Dr. No: Add STAT64INFO_S to brt_header
FT_LAYOUT_VERSION_18 = 18, // Dr. No: Add HOT info to brt_header
FT_LAYOUT_VERSION_19 = 19, // Doofenshmirtz: Add compression method, highest_unused_msn_for_upgrade
FT_LAYOUT_VERSION_20 = 20, // Clayface: Add compression method to log_fcreate
FT_LAYOUT_VERSION_20 = 20, // Clayface: Add compression method to log_fcreate, mgr_last_xid after begin checkpoint
FT_NEXT_VERSION, // the version after the current version
FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line.
FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported
......
......@@ -2065,12 +2065,12 @@ static int allocate_block (struct dbout *out, int64_t *ret_block_number)
result = errno;
out->n_translations_limit = old_n_translations_limit;
out->translation = old_translation;
goto cleanup;
}
}
if (result == 0) {
out->n_translations++;
*ret_block_number = block_number;
}
cleanup:
dbout_unlock(out);
return result;
}
......@@ -2247,7 +2247,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
out.translation[1].off = -1; // block 1 is the block translation, filled in later
out.translation[2].off = -1; // block 2 is the descriptor
seek_align(&out);
int64_t lblock;
int64_t lblock = 0; // make gcc --happy
result = allocate_block(&out, &lblock);
invariant(result == 0); // can not fail since translations reserved above
......
......@@ -40,6 +40,7 @@ typedef struct sub_block *SUB_BLOCK;
typedef struct ft *FT;
typedef struct ft_header *FT_HEADER;
typedef struct ft_options *FT_OPTIONS;
struct wbuf;
struct dbuf;
......@@ -47,7 +48,7 @@ typedef unsigned int ITEMLEN;
typedef const void *bytevec;
typedef int64_t DISKOFF; /* Offset in a disk. -1 is the NULL pointer. */
typedef u_int64_t TXNID;
typedef uint64_t TXNID;
#define TXNID_NONE_LIVING ((TXNID)0)
#define TXNID_NONE ((TXNID)0)
......
......@@ -152,6 +152,7 @@ struct tokutxn {
OMT live_root_txn_list; // the root txns live when the root ancestor (self if a root) started.
XIDS xids; // Represents the xid list
bool begin_was_logged;
// These are not read until a commit, prepare, or abort starts, and
// they're "monotonic" (only go false->true) during operation:
BOOL checkpoint_needed_before_commit;
......@@ -225,6 +226,10 @@ static inline int toku_logsizeof_BLOCKNUM (BLOCKNUM v __attribute__((__unused__)
return 8;
}
static inline int toku_logsizeof_LSN (LSN lsn __attribute__((__unused__))) {
return 8;
}
static inline int toku_logsizeof_TXNID (TXNID txnid __attribute__((__unused__))) {
return 8;
}
......
......@@ -33,6 +33,7 @@ static inline int toku_copy_BYTESTRING(BYTESTRING *target, BYTESTRING val) {
return 0;
}
static inline void toku_free_TXNID(TXNID txnid __attribute__((__unused__))) {}
static inline void toku_free_LSN(LSN lsn __attribute__((__unused__))) {}
static inline void toku_free_u_int64_t(u_int64_t u __attribute__((__unused__))) {}
static inline void toku_free_u_int32_t(u_int32_t u __attribute__((__unused__))) {}
static inline void toku_free_u_int8_t(u_int8_t u __attribute__((__unused__))) {}
......@@ -43,8 +44,6 @@ static inline void toku_free_XIDP(XIDP xidp) { toku_free(xidp); }
static inline void toku_free_BYTESTRING(BYTESTRING val) { toku_free(val.data); }
static inline void toku_free_FILENUMS(FILENUMS val) { toku_free(val.filenums); }
void toku_set_lsn_increment (uint64_t incr) __attribute__((__visibility__("default")));
int toku_maybe_upgrade_log (const char *env_dir, const char *log_dir, LSN * lsn_of_clean_shutdown, BOOL * upgrade_in_progress);
uint64_t toku_log_upgrade_get_footprint(void);
......
......@@ -17,6 +17,7 @@
#include <ctype.h>
#include <errno.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
......@@ -38,6 +39,7 @@ struct logtype {
char *name;
unsigned int command_and_flags;
struct field *fields;
bool needs_to_maybe_log_begin_txn;
};
// In the fields, don't mention the command, the LSN, the CRC or the trailing LEN.
......@@ -92,18 +94,18 @@ const struct logtype logtypes[] = {
#if 0 // no longer used, but reserve the type
{"local_txn_checkpoint", 'c', FA{{"TXNID", "xid", 0}, NULLFIELD}},
#endif
{"begin_checkpoint", 'x', FA{{"u_int64_t", "timestamp", 0}, NULLFIELD}},
{"end_checkpoint", 'X', FA{{"TXNID", "xid", 0}, // xid is LSN of begin_checkpoint
{"begin_checkpoint", 'x', FA{{"u_int64_t", "timestamp", 0}, {"TXNID", "last_xid", 0}, NULLFIELD}, false},
{"end_checkpoint", 'X', FA{{"LSN", "lsn_begin_checkpoint", 0},
{"u_int64_t", "timestamp", 0},
{"u_int32_t", "num_fassociate_entries", 0}, // how many files were checkpointed
{"u_int32_t", "num_xstillopen_entries", 0}, // how many txns were checkpointed
NULLFIELD}},
NULLFIELD}, false},
//TODO: #2037 Add dname
{"fassociate", 'f', FA{{"FILENUM", "filenum", 0},
{"u_int32_t", "treeflags", 0},
{"BYTESTRING", "iname", 0}, // pathname of file
{"u_int8_t", "unlink_on_close", 0},
NULLFIELD}},
NULLFIELD}, false},
//We do not use a TXNINFO struct since recovery log has
//FILENUMS and TOKUTXN has FTs (for open_fts)
{"xstillopen", 's', FA{{"TXNID", "xid", 0},
......@@ -116,7 +118,7 @@ const struct logtype logtypes[] = {
{"BLOCKNUM", "spilled_rollback_head", 0},
{"BLOCKNUM", "spilled_rollback_tail", 0},
{"BLOCKNUM", "current_rollback", 0},
NULLFIELD}}, // record all transactions
NULLFIELD}, false}, // record all transactions
// prepared txns need a gid
{"xstillopenprepared", 'p', FA{{"TXNID", "xid", 0},
{"XIDP", "xa_xid", 0}, // prepared transactions need a gid, and have no parentxid.
......@@ -128,15 +130,15 @@ const struct logtype logtypes[] = {
{"BLOCKNUM", "spilled_rollback_head", 0},
{"BLOCKNUM", "spilled_rollback_tail", 0},
{"BLOCKNUM", "current_rollback", 0},
NULLFIELD}}, // record all transactions
NULLFIELD}, false}, // record all transactions
{"suppress_rollback", 'S', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
NULLFIELD}},
NULLFIELD}, true},
// Records produced by transactions
{"xbegin", 'b', FA{{"TXNID", "parentxid", 0},NULLFIELD}},
{"xcommit",'C', FA{{"TXNID", "xid", 0},NULLFIELD}},
{"xprepare",'P', FA{{"TXNID", "xid", 0}, {"XIDP", "xa_xid", 0}, NULLFIELD}},
{"xabort", 'q', FA{{"TXNID", "xid", 0},NULLFIELD}},
{"xbegin", 'b', FA{{"TXNID", "xid", 0},{"TXNID", "parentxid", 0},NULLFIELD}, false},
{"xcommit",'C', FA{{"TXNID", "xid", 0},NULLFIELD}, false},
{"xprepare",'P', FA{{"TXNID", "xid", 0}, {"XIDP", "xa_xid", 0}, NULLFIELD}, false},
{"xabort", 'q', FA{{"TXNID", "xid", 0},NULLFIELD}, false},
//TODO: #2037 Add dname
{"fcreate", 'F', FA{{"TXNID", "xid", 0},
{"FILENUM", "filenum", 0},
......@@ -146,76 +148,82 @@ const struct logtype logtypes[] = {
{"u_int32_t", "nodesize", 0},
{"u_int32_t", "basementnodesize", 0},
{"u_int32_t", "compression_method", 0},
NULLFIELD}},
NULLFIELD}, true},
//TODO: #2037 Add dname
{"fopen", 'O', FA{{"BYTESTRING", "iname", 0},
{"FILENUM", "filenum", 0},
{"u_int32_t", "treeflags", 0},
NULLFIELD}},
NULLFIELD}, false},
//TODO: #2037 Add dname
{"fclose", 'e', FA{{"BYTESTRING", "iname", 0},
{"FILENUM", "filenum", 0},
NULLFIELD}},
NULLFIELD}, false},
//TODO: #2037 Add dname
{"fdelete", 'U', FA{{"TXNID", "xid", 0},
{"FILENUM", "filenum", 0},
NULLFIELD}},
NULLFIELD}, true},
{"enq_insert", 'I', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "value", 0},
NULLFIELD}},
NULLFIELD}, true},
{"enq_insert_no_overwrite", 'i', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "value", 0},
NULLFIELD}},
NULLFIELD}, true},
{"enq_delete_any", 'E', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "key", 0},
NULLFIELD}},
NULLFIELD}, true},
{"enq_insert_multiple", 'm', FA{{"FILENUM", "src_filenum", 0},
{"FILENUMS", "dest_filenums", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "src_key", 0},
{"BYTESTRING", "src_val", 0},
NULLFIELD}},
NULLFIELD}, true},
{"enq_delete_multiple", 'M', FA{{"FILENUM", "src_filenum", 0},
{"FILENUMS", "dest_filenums", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "src_key", 0},
{"BYTESTRING", "src_val", 0},
NULLFIELD}},
NULLFIELD}, true},
{"comment", 'T', FA{{"u_int64_t", "timestamp", 0},
{"BYTESTRING", "comment", 0},
NULLFIELD}},
NULLFIELD}, false},
// Note: Shutdown log entry is NOT ALLOWED TO BE CHANGED.
// Do not change the letter ('Q'), do not add fields,
// do not remove fields.
// This is how we detect clean shutdowns from OLDER VERSIONS.
// This log entry must always be readable for future versions.
// If you DO change it, you need to write a separate log upgrade mechanism.
{"shutdown", 'Q', FA{{"u_int64_t", "timestamp", 0},
NULLFIELD}},
NULLFIELD}, false},
{"load", 'l', FA{{"TXNID", "xid", 0},
{"FILENUM", "old_filenum", 0},
{"BYTESTRING", "new_iname", 0},
NULLFIELD}},
NULLFIELD}, true},
// #2954
{"hot_index", 'h', FA{{"TXNID", "xid", 0},
{"FILENUMS", "hot_index_filenums", 0},
NULLFIELD}},
NULLFIELD}, true},
{"enq_update", 'u', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "extra", 0},
NULLFIELD}},
NULLFIELD}, true},
{"enq_updatebroadcast", 'B', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "extra", 0},
{"BOOL", "is_resetting_op", 0},
NULLFIELD}},
NULLFIELD}, true},
{"change_fdescriptor", 'D', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "old_descriptor", 0},
{"BYTESTRING", "new_descriptor", 0},
{"BOOL", "update_cmp_descriptor", 0},
NULLFIELD}},
{0,0,FA{NULLFIELD}}
NULLFIELD}, true},
{0,0,FA{NULLFIELD}, false}
};
......@@ -365,21 +373,30 @@ generate_get_timestamp(void) {
static void
generate_log_writer (void) {
fprintf(cf, "static u_int64_t toku_lsn_increment=1;\nvoid toku_set_lsn_increment (uint64_t incr) { assert(incr>0 && incr< (16LL<<32)); toku_lsn_increment=incr; }\n");
generate_get_timestamp();
DO_LOGTYPES(lt, {
//TODO(yoni): The overhead variables are NOT correct for BYTESTRING, FILENUMS (or any other variable length type)
// We should switch to something like using toku_logsizeof_*.
fprintf(hf, "static const size_t toku_log_%s_overhead = (+4+1+8", lt->name);
DO_FIELDS(field_type, lt, fprintf(hf, "+%lu", sizeof (field_type->type)));
DO_FIELDS(field_type, lt, fprintf(hf, "+sizeof(%s)", field_type->type));
fprintf(hf, "+8);\n");
fprintf2(cf, hf, "int toku_log_%s (TOKULOGGER logger, LSN *lsnp, int do_fsync", lt->name);
if (lt->needs_to_maybe_log_begin_txn) {
fprintf2(cf, hf, ", TOKUTXN txn");
}
DO_FIELDS(field_type, lt, fprintf2(cf, hf, ", %s %s", field_type->type, field_type->name));
fprintf(hf, ");\n");
fprintf(cf, ") {\n");
fprintf(cf, " int r = 0;\n");
fprintf(cf, " if (logger==0) return 0;\n");
if (lt->needs_to_maybe_log_begin_txn) {
fprintf(cf, " if (txn && !txn->begin_was_logged) {\n");
fprintf(cf, " toku_maybe_log_begin_txn_for_write_operation(txn);\n");
fprintf(cf, " }\n");
}
fprintf(cf, " if (!logger->write_log_files) {\n");
fprintf(cf, " ml_lock(&logger->input_lock);\n");
fprintf(cf, " logger->lsn.lsn += toku_lsn_increment;\n");
fprintf(cf, " logger->lsn.lsn++;\n");
fprintf(cf, " if (lsnp) *lsnp=logger->lsn;\n");
fprintf(cf, " ml_unlock(&logger->input_lock);\n");
fprintf(cf, " return 0;\n");
......@@ -398,7 +415,7 @@ generate_log_writer (void) {
fprintf(cf, " wbuf_nocrc_init(&wbuf, logger->inbuf.buf+logger->inbuf.n_in_buf, buflen);\n");
fprintf(cf, " wbuf_nocrc_int(&wbuf, buflen);\n");
fprintf(cf, " wbuf_nocrc_char(&wbuf, '%c');\n", (char)(0xff&lt->command_and_flags));
fprintf(cf, " logger->lsn.lsn += toku_lsn_increment;\n");
fprintf(cf, " logger->lsn.lsn++;\n");
fprintf(cf, " LSN lsn =logger->lsn;\n");
fprintf(cf, " logger->inbuf.max_lsn_in_buf = lsn;\n");
fprintf(cf, " wbuf_nocrc_LSN(&wbuf, lsn);\n");
......
......@@ -159,6 +159,7 @@ int toku_logger_open (const char *directory, TOKULOGGER logger) {
logger->next_log_file_number = nexti;
open_logfile(logger);
toku_txn_manager_set_last_xid_from_logger(logger->txn_manager, logger);
logger->is_open = TRUE;
return 0;
......@@ -277,7 +278,15 @@ int toku_logger_close(TOKULOGGER *loggerp) {
int toku_logger_shutdown(TOKULOGGER logger) {
int r = 0;
if (logger->is_open) {
if (toku_txn_manager_num_live_txns(logger->txn_manager) == 0) {
TXN_MANAGER mgr = logger->txn_manager;
if (toku_txn_manager_num_live_txns(mgr) == 0) {
TXNID last_xid = toku_txn_manager_get_last_xid(mgr);
// Increase the LSN of the shutdown log entry if it would be smaller
// than last_xid because we use the LSN of the shutdown log entry
// to seed the last_xid on bootup.
if (logger->lsn.lsn < last_xid) {
logger->lsn.lsn = last_xid;
}
r = toku_log_shutdown(logger, NULL, TRUE, 0);
}
}
......@@ -838,7 +847,7 @@ int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, FILENUM filenum, u_
if (txn->logger->is_panicked) return EINVAL;
BYTESTRING bs_fname = { .len=strlen(fname), .data = (char *) fname };
// fsync log on fcreate
int r = toku_log_fcreate (txn->logger, (LSN*)0, 1, toku_txn_get_txnid(txn), filenum, bs_fname, mode, treeflags, nodesize, basementnodesize, compression_method);
int r = toku_log_fcreate (txn->logger, (LSN*)0, 1, txn, toku_txn_get_txnid(txn), filenum, bs_fname, mode, treeflags, nodesize, basementnodesize, compression_method);
return r;
}
......@@ -848,7 +857,7 @@ int toku_logger_log_fdelete (TOKUTXN txn, FILENUM filenum) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
//No fsync.
int r = toku_log_fdelete (txn->logger, (LSN*)0, 0, toku_txn_get_txnid(txn), filenum);
int r = toku_log_fdelete (txn->logger, (LSN*)0, 0, txn, toku_txn_get_txnid(txn), filenum);
return r;
}
......
......@@ -7,6 +7,7 @@
#include "includes.h"
#include <ft/log_header.h>
#include "checkpoint.h"
#include "txn_manager.h"
static const char recovery_lock_file[] = "/__tokudb_recoverylock_dont_delete_me";
......@@ -35,6 +36,7 @@ struct scan_state {
uint64_t checkpoint_begin_timestamp;
uint32_t checkpoint_num_fassociate;
uint32_t checkpoint_num_xstillopen;
TXNID last_xid;
};
static const char *scan_state_strings[] = {
......@@ -47,6 +49,7 @@ static void scan_state_init(struct scan_state *ss) {
ss->checkpoint_end_lsn = ZERO_LSN;
ss->checkpoint_num_fassociate = 0;
ss->checkpoint_num_xstillopen = 0;
ss->last_xid = 0;
}
static const char *scan_state_string(struct scan_state *ss) {
......@@ -200,10 +203,6 @@ static int recover_env_init (RECOVER_ENV renv,
size_t cachetable_size) {
int r;
r = toku_create_cachetable(&renv->ct, cachetable_size ? cachetable_size : 1<<25, (LSN){0}, logger);
assert(r == 0);
toku_cachetable_set_env_dir(renv->ct, env_dir);
if (keep_cachetable_callback) keep_cachetable_callback(env, renv->ct);
// If we are passed a logger use it, otherwise create one.
renv->destroy_logger_at_end = logger==NULL;
if (logger) {
......@@ -213,6 +212,10 @@ static int recover_env_init (RECOVER_ENV renv,
assert(r == 0);
}
toku_logger_write_log_files(renv->logger, FALSE);
r = toku_create_cachetable(&renv->ct, cachetable_size ? cachetable_size : 1<<25, (LSN){0}, renv->logger);
assert(r == 0);
toku_cachetable_set_env_dir(renv->ct, env_dir);
if (keep_cachetable_callback) keep_cachetable_callback(env, renv->ct);
toku_logger_set_cachetable(renv->logger, renv->ct);
renv->env = env;
renv->prepared_txn_callback = prepared_txn_callback;
......@@ -320,13 +323,23 @@ static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, BOOL must_create
static int toku_recover_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) {
int r;
TXN_MANAGER mgr = toku_logger_get_txn_manager(renv->logger);
switch (renv->ss.ss) {
case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
assert(l->lsn.lsn == renv->ss.checkpoint_begin_lsn.lsn);
invariant(renv->ss.last_xid == TXNID_NONE);
renv->ss.last_xid = l->last_xid;
toku_txn_manager_set_last_xid_from_recovered_checkpoint(mgr, l->last_xid);
r = 0;
break;
case FORWARD_NEWER_CHECKPOINT_END:
assert(l->lsn.lsn > renv->ss.checkpoint_end_lsn.lsn);
// Verify last_xid is no older than the previous begin
invariant(l->last_xid >= renv->ss.last_xid);
// Verify last_xid is no older than the newest txn
invariant(l->last_xid >= toku_txn_manager_get_last_xid(mgr));
r = 0; // ignore it (log only has a begin checkpoint)
break;
default:
......@@ -369,7 +382,7 @@ static int toku_recover_end_checkpoint (struct logtype_end_checkpoint *l, RECOVE
int r;
switch (renv->ss.ss) {
case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
assert(l->xid == renv->ss.checkpoint_begin_lsn.lsn);
assert(l->lsn_begin_checkpoint.lsn == renv->ss.checkpoint_begin_lsn.lsn);
assert(l->lsn.lsn == renv->ss.checkpoint_end_lsn.lsn);
assert(l->num_fassociate_entries == renv->ss.checkpoint_num_fassociate);
assert(l->num_xstillopen_entries == renv->ss.checkpoint_num_xstillopen);
......@@ -388,11 +401,11 @@ static int toku_recover_end_checkpoint (struct logtype_end_checkpoint *l, RECOVE
static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) {
time_t tnow = time(NULL);
fprintf(stderr, "%.24s Tokudb recovery bw_end_checkpoint at %"PRIu64" timestamp %"PRIu64" xid %"PRIu64" (%s)\n", ctime(&tnow), l->lsn.lsn, l->timestamp, l->xid, recover_state(renv));
fprintf(stderr, "%.24s Tokudb recovery bw_end_checkpoint at %"PRIu64" timestamp %"PRIu64" xid %"PRIu64" (%s)\n", ctime(&tnow), l->lsn.lsn, l->timestamp, l->lsn_begin_checkpoint.lsn, recover_state(renv));
switch (renv->ss.ss) {
case BACKWARD_NEWER_CHECKPOINT_END:
renv->ss.ss = BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END;
renv->ss.checkpoint_begin_lsn.lsn = l->xid;
renv->ss.checkpoint_begin_lsn.lsn = l->lsn_begin_checkpoint.lsn;
renv->ss.checkpoint_end_lsn.lsn = l->lsn.lsn;
renv->ss.checkpoint_end_timestamp = l->timestamp;
return 0;
......@@ -482,6 +495,9 @@ recover_transaction(TOKUTXN *txnp, TXNID xid, TXNID parentxid, TOKULOGGER logger
}
r = toku_txn_begin_with_xid(parent, &txn, logger, xid, TXN_SNAPSHOT_NONE, NULL, true);
assert(r == 0);
// We only know about it because it was logged. Restore the log bit.
// Logging is 'off' but it will still set the bit.
toku_maybe_log_begin_txn_for_write_operation(txn);
if (txnp) *txnp = txn;
return 0;
}
......@@ -506,6 +522,8 @@ static int recover_xstillopen_internal (TOKUTXN *txnp,
switch (renv->ss.ss) {
case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: {
renv->ss.checkpoint_num_xstillopen++;
invariant(renv->ss.last_xid != TXNID_NONE);
invariant(xid <= renv->ss.last_xid);
TOKUTXN txn = NULL;
{ //Create the transaction.
r = recover_transaction(&txn, xid, parentxid, renv->logger);
......@@ -637,7 +655,7 @@ static int toku_recover_backward_suppress_rollback (struct logtype_suppress_roll
static int toku_recover_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) {
int r;
r = recover_transaction(NULL, l->lsn.lsn, l->parentxid, renv->logger);
r = recover_transaction(NULL, l->xid, l->parentxid, renv->logger);
return r;
}
......@@ -1167,8 +1185,10 @@ static int toku_recover_backward_hot_index(struct logtype_hot_index *UU(l), RECO
return 0;
}
// Effects: If there are no log files, or if there is a "clean" checkpoint at the end of the log,
// then we don't need recovery to run. Skip the shutdown log entry if there is one.
// Effects: If there are no log files, or if there is a clean "shutdown" at
// the end of the log, then we don't need recovery to run.
// Requires: the shutdown log entry does not change between tokudb versions,
// must always remain readable.
// Returns: TRUE if we need recovery, otherwise FALSE.
int tokudb_needs_recovery(const char *log_dir, BOOL ignore_log_empty) {
int needs_recovery;
......@@ -1182,32 +1202,12 @@ int tokudb_needs_recovery(const char *log_dir, BOOL ignore_log_empty) {
struct log_entry *le = NULL;
r = toku_logcursor_last(logcursor, &le);
if (r == DB_NOTFOUND && ignore_log_empty) {
needs_recovery = FALSE; goto exit;
}
if (r != 0) {
needs_recovery = TRUE; goto exit;
}
if (le->cmd==LT_shutdown || le->cmd==LT_comment) {
r = toku_logcursor_prev(logcursor, &le);
if (r != 0) {
needs_recovery = TRUE; goto exit;
}
}
if (le->cmd != LT_end_checkpoint) {
needs_recovery = TRUE; goto exit;
}
struct log_entry end_checkpoint = *le;
r = toku_logcursor_prev(logcursor, &le);
if (r != 0 || le->cmd != LT_begin_checkpoint) {
needs_recovery = TRUE; goto exit;
if (r == 0) {
needs_recovery = le->cmd != LT_shutdown;
}
if (le->u.begin_checkpoint.lsn.lsn != end_checkpoint.u.end_checkpoint.xid) {
needs_recovery = TRUE; goto exit;
else {
needs_recovery = !(r == DB_NOTFOUND && ignore_log_empty);
}
needs_recovery = FALSE;
exit:
if (logcursor) {
r = toku_logcursor_destroy(&logcursor);
......
......@@ -29,9 +29,9 @@ test_main (int argc __attribute__((__unused__)),
LSN comment_lsn;
r = toku_log_comment(logger, &comment_lsn, TRUE, 0, hello);
LSN begin_lsn;
r = toku_log_begin_checkpoint(logger, &begin_lsn, TRUE, 0); assert(r == 0);
r = toku_log_begin_checkpoint(logger, &begin_lsn, TRUE, 0, 0); assert(r == 0);
LSN end_lsn;
r = toku_log_end_checkpoint(logger, &end_lsn, TRUE, begin_lsn.lsn, 0, 0, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, &end_lsn, TRUE, begin_lsn, 0, 0, 0); assert(r == 0);
r = toku_logger_maybe_trim_log(logger, begin_lsn); assert(r == 0);
r = toku_logger_close(&logger); assert(r == 0);
......
......@@ -8,10 +8,14 @@
#define TESTDIR __SRCFILE__ ".dir"
static const int magic_begin_end_checkpoint_sz = 85; // leave this many bytes in file
static int
run_test(void) {
// leave this many bytes in file
const int magic_begin_end_checkpoint_sz = 8 // "tokulogg" magic 8 byte header
+4 // version
+toku_log_begin_checkpoint_overhead
+toku_log_end_checkpoint_overhead;
int r;
int trim = 1;
toku_struct_stat st;
......@@ -31,13 +35,12 @@ run_test(void) {
r = toku_logger_open(TESTDIR, logger); assert(r == 0);
LSN beginlsn;
// all logs must contain a valid checkpoint
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn.lsn, 0, 0, 0); assert(r == 0);
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn, 0, 0, 0); assert(r == 0);
r = toku_log_comment(logger, NULL, TRUE, 0, hello); assert(r == 0);
r = toku_log_comment(logger, NULL, TRUE, 0, world); assert(r == 0);
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn.lsn, 0, 0, 0); assert(r == 0);
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn, 0, 0, 0); assert(r == 0);
r = toku_log_comment(logger, NULL, TRUE, 0, hello); assert(r == 0);
r = toku_log_comment(logger, NULL, TRUE, 0, there); assert(r == 0);
r = toku_logger_close(&logger); assert(r == 0);
......
......@@ -24,8 +24,8 @@ run_test(void) {
// add begin checkpoint, end checkpoint
LSN beginlsn;
r = toku_log_begin_checkpoint(logger, &beginlsn, FALSE, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn.lsn, 0, 0, 0); assert(r == 0);
r = toku_log_begin_checkpoint(logger, &beginlsn, FALSE, 0, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn, 0, 0, 0); assert(r == 0);
r = toku_logger_close(&logger); assert(r == 0);
// add hello
......
......@@ -20,12 +20,12 @@ run_test(void) {
r = toku_logger_create(&logger); assert(r == 0);
r = toku_logger_open(TESTDIR, logger); assert(r == 0);
LSN beginlsn;
r = toku_log_begin_checkpoint(logger, &beginlsn, FALSE, 0); assert(r == 0);
r = toku_log_begin_checkpoint(logger, &beginlsn, FALSE, 0, 0); assert(r == 0);
r = toku_logger_close(&logger); assert(r == 0);
r = toku_logger_create(&logger); assert(r == 0);
r = toku_logger_open(TESTDIR, logger); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn.lsn, 0, 0, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn, 0, 0, 0); assert(r == 0);
r = toku_logger_close(&logger); assert(r == 0);
// run recovery
......
......@@ -20,9 +20,9 @@ run_test(void) {
r = toku_logger_create(&logger); assert(r == 0);
r = toku_logger_open(TESTDIR, logger); assert(r == 0);
LSN beginlsn;
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn.lsn, 0, 0, 0); assert(r == 0);
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0); assert(r == 0);
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn, 0, 0, 0); assert(r == 0);
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0, 0); assert(r == 0);
r = toku_logger_close(&logger); assert(r == 0);
// run recovery
......
......@@ -22,10 +22,10 @@ run_test(void) {
r = toku_logger_create(&logger); assert(r == 0);
r = toku_logger_open(TESTDIR, logger); assert(r == 0);
LSN firstbegin = ZERO_LSN;
r = toku_log_begin_checkpoint(logger, &firstbegin, TRUE, 0); assert(r == 0);
r = toku_log_begin_checkpoint(logger, &firstbegin, TRUE, 0, 0); assert(r == 0);
assert(firstbegin.lsn != ZERO_LSN.lsn);
r = toku_log_end_checkpoint(logger, NULL, FALSE, firstbegin.lsn, 0, 0, 0); assert(r == 0);
r = toku_log_begin_checkpoint(logger, NULL, TRUE, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, FALSE, firstbegin, 0, 0, 0); assert(r == 0);
r = toku_log_begin_checkpoint(logger, NULL, TRUE, 0, 0); assert(r == 0);
r = toku_logger_close(&logger); assert(r == 0);
if (!verbose) {
......
......@@ -22,8 +22,8 @@ run_test(void) {
r = toku_logger_create(&logger); assert(r == 0);
r = toku_logger_open(TESTDIR, logger); assert(r == 0);
LSN beginlsn;
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn.lsn, 0, 0, 0); assert(r == 0);
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn, 0, 0, 0); assert(r == 0);
BYTESTRING iname = { strlen("missing_tokudb_file"), "missing_tokudb_file" };
FILENUM filenum = {42};
......
......@@ -24,8 +24,8 @@ run_test(void) {
BYTESTRING hello = { strlen("hello"), "hello" };
r = toku_log_comment(logger, NULL, TRUE, 0, hello); assert(r == 0);
LSN beginlsn;
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn.lsn, 0, 0, 0); assert(r == 0);
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn, 0, 0, 0); assert(r == 0);
r = toku_log_comment(logger, NULL, TRUE, 0, hello); assert(r == 0);
BYTESTRING there = { strlen("there"), "there" };
r = toku_log_comment(logger, NULL, TRUE, 0, there); assert(r == 0);
......
......@@ -41,8 +41,8 @@ run_test(void) {
r = toku_logger_open(TESTDIR, logger); assert(r == 0);
LSN beginlsn;
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn.lsn, 0, 0, 0); assert(r == 0);
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn, 0, 0, 0); assert(r == 0);
r = toku_logger_close(&logger); assert(r == 0);
......
......@@ -184,23 +184,23 @@ int create_logfiles() {
// use old x1.tdb test log as basis
//xbegin 'b': lsn=1 parenttxnid=0 crc=00005f1f len=29
r = toku_log_xbegin(logger, &lsn, NO_FSYNC, 0); assert(r==0); txnid = lsn.lsn;
r = toku_log_xbegin(logger, &lsn, 1, NO_FSYNC, 0); assert(r==0); txnid = lsn.lsn;
//fcreate 'F': lsn=2 txnid=1 filenum=0 fname={len=4 data="a.db"} mode=0777 treeflags=0 crc=18a3d525 len=49
r = toku_log_fcreate(logger, &lsn, NO_FSYNC, txnid, fn_aname, bs_aname, 0x0777, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, 0); assert(r==0);
r = toku_log_fcreate(logger, &lsn, NO_FSYNC, NULL, txnid, fn_aname, bs_aname, 0x0777, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, 0); assert(r==0);
//commit 'C': lsn=3 txnid=1 crc=00001f1e len=29
r = toku_log_xcommit(logger, &lsn, FSYNC, txnid); assert(r==0);
//xbegin 'b': lsn=4 parenttxnid=0 crc=00000a1f len=29
r = toku_log_xbegin(logger, &lsn, NO_FSYNC, 0); assert(r==0); txnid = lsn.lsn;
r = toku_log_xbegin(logger, &lsn, 2, NO_FSYNC, 0); assert(r==0); txnid = lsn.lsn;
//fcreate 'F': lsn=5 txnid=4 filenum=1 fname={len=4 data="b.db"} mode=0777 treeflags=0 crc=14a47925 len=49
r = toku_log_fcreate(logger, &lsn, NO_FSYNC, txnid, fn_bname, bs_bname, 0x0777, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, 0); assert(r==0);
r = toku_log_fcreate(logger, &lsn, NO_FSYNC, NULL, txnid, fn_bname, bs_bname, 0x0777, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, 0); assert(r==0);
//commit 'C': lsn=6 txnid=4 crc=0000c11e len=29
r = toku_log_xcommit(logger, &lsn, FSYNC, txnid); assert(r==0);
//xbegin 'b': lsn=7 parenttxnid=0 crc=0000f91f len=29
r = toku_log_xbegin(logger, &lsn, NO_FSYNC, 0); assert(r==0); txnid = lsn.lsn;
r = toku_log_xbegin(logger, &lsn, 3, NO_FSYNC, 0); assert(r==0); txnid = lsn.lsn;
//enq_insert 'I': lsn=8 filenum=0 xid=7 key={len=2 data="a\000"} value={len=2 data="b\000"} crc=40b863e4 len=45
r = toku_log_enq_insert(logger, &lsn, NO_FSYNC, fn_aname, txnid, bs_a, bs_b); assert(r==0);
r = toku_log_enq_insert(logger, &lsn, NO_FSYNC, NULL, fn_aname, txnid, bs_a, bs_b); assert(r==0);
//begin_checkpoint 'x': lsn=9 timestamp=1251309957584197 crc=cd067878 len=29
r = toku_log_begin_checkpoint(logger, &lsn, NO_FSYNC, 1251309957584197); assert(r==0); cp_txnid = lsn.lsn;
r = toku_log_begin_checkpoint(logger, &lsn, NO_FSYNC, 1251309957584197, 0); assert(r==0); cp_txnid = lsn.lsn;
//fassociate 'f': lsn=11 filenum=1 fname={len=4 data="b.db"} crc=a7126035 len=33
r = toku_log_fassociate(logger, &lsn, NO_FSYNC, fn_bname, 0, bs_bname, 0); assert(r==0);
num_fassociate++;
......@@ -217,9 +217,9 @@ int create_logfiles() {
}
num_xstillopen++;
//end_checkpoint 'X': lsn=13 txnid=9 timestamp=1251309957586872 crc=cd285c30 len=37
r = toku_log_end_checkpoint(logger, &lsn, FSYNC, cp_txnid, 1251309957586872, num_fassociate, num_xstillopen); assert(r==0);
r = toku_log_end_checkpoint(logger, &lsn, FSYNC, (LSN){cp_txnid}, 1251309957586872, num_fassociate, num_xstillopen); assert(r==0);
//enq_insert 'I': lsn=14 filenum=1 xid=7 key={len=2 data="b\000"} value={len=2 data="a\000"} crc=40388be4 len=45
r = toku_log_enq_insert(logger, &lsn, NO_FSYNC, fn_bname, txnid, bs_b, bs_a); assert(r==0);
r = toku_log_enq_insert(logger, &lsn, NO_FSYNC, NULL, fn_bname, txnid, bs_b, bs_a); assert(r==0);
//commit 'C': lsn=15 txnid=7 crc=00016d1e len=29
r = toku_log_xcommit(logger, &lsn, FSYNC, txnid); assert(r==0);
......
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "test.h"
#include "includes.h"
#include "toku_os.h"
#include "checkpoint.h"
#define TESTDIR __SRCFILE__ ".dir"
#define FILENAME "test0.ft"
static void test_it (int N) {
FT_HANDLE brt;
int r;
r = system("rm -rf " TESTDIR);
CKERR(r);
r = toku_os_mkdir(TESTDIR, S_IRWXU); CKERR(r);
TOKULOGGER logger;
r = toku_logger_create(&logger); CKERR(r);
r = toku_logger_open(TESTDIR, logger); CKERR(r);
CACHETABLE ct;
r = toku_create_cachetable(&ct, 0, ZERO_LSN, logger); CKERR(r);
toku_cachetable_set_env_dir(ct, TESTDIR);
toku_logger_set_cachetable(logger, ct);
r = toku_logger_open_rollback(logger, ct, TRUE); CKERR(r);
TOKUTXN txn;
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_open_ft_handle(FILENAME, 1, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
r = toku_txn_commit_txn(txn, FALSE, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
TXNID xid_first = txn->txnid64;
unsigned int rands[N];
for (int i=0; i<N; i++) {
char key[100],val[300];
DBT k, v;
rands[i] = random();
snprintf(key, sizeof(key), "key%x.%x", rands[i], i);
memset(val, 'v', sizeof(val));
val[sizeof(val)-1]=0;
r = toku_ft_insert(brt, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), txn);
CKERR(r);
}
{
TOKUTXN txn2;
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn2, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
// Verify the txnid has gone up only by one (even though many log entries were done)
invariant(txn2->txnid64 == xid_first + 1);
r = toku_txn_commit_txn(txn2, FALSE, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn2);
}
r = toku_txn_commit_txn(txn, FALSE, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn);
{
//TODO(yoni) #5067 will break this portion of the test. (End ids are also assigned, so it would increase by 4 instead of 2.)
// Verify the txnid has gone up only by two (even though many log entries were done)
TOKUTXN txn3;
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn3, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
invariant(txn3->txnid64 == xid_first + 2);
r = toku_txn_commit_txn(txn3, FALSE, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn3);
}
r = toku_checkpoint(ct, logger, NULL, NULL, NULL, NULL, CLIENT_CHECKPOINT); CKERR(r);
r = toku_close_ft_handle_nolsn(brt, NULL); CKERR(r);
r = toku_checkpoint(ct, logger, NULL, NULL, NULL, NULL, CLIENT_CHECKPOINT); CKERR(r);
r = toku_logger_close_rollback(logger, FALSE); CKERR(r);
r = toku_checkpoint(ct, logger, NULL, NULL, NULL, NULL, CLIENT_CHECKPOINT); CKERR(r);
r = toku_cachetable_close(&ct); CKERR(r);
r = toku_logger_close(&logger); assert(r==0);
}
int test_main (int argc, const char *argv[]) {
default_parse_args(argc, argv);
for (int i=1; i<=64; i++) {
test_it(i);
}
return 0;
}
......@@ -110,9 +110,7 @@ toku_txn_create_txn (
TOKUTXN *tokutxn,
TOKUTXN parent_tokutxn,
TOKULOGGER logger,
TXNID xid,
TXN_SNAPSHOT_TYPE snapshot_type,
XIDS xids,
DB_TXN *container_db_txn,
bool for_checkpoint
)
......@@ -122,17 +120,6 @@ toku_txn_create_txn (
}
assert(logger->rollback_cachefile);
TXNID snapshot_txnid64;
if (snapshot_type == TXN_SNAPSHOT_NONE) {
snapshot_txnid64 = TXNID_NONE;
} else if (parent_tokutxn == NULL || snapshot_type == TXN_SNAPSHOT_CHILD) {
snapshot_txnid64 = xid;
} else if (snapshot_type == TXN_SNAPSHOT_ROOT) {
snapshot_txnid64 = parent_tokutxn->snapshot_txnid64;
} else {
assert(false);
}
OMT open_fts;
{
int r = toku_omt_create(&open_fts);
......@@ -160,16 +147,18 @@ toku_txn_create_txn (
.progress_poll_fun = NULL,
.progress_poll_fun_extra = NULL,
.snapshot_type = snapshot_type,
.snapshot_txnid64 = snapshot_txnid64,
.snapshot_txnid64 = TXNID_NONE,
.container_db_txn = container_db_txn,
.force_fsync_on_commit = FALSE,
.begin_was_logged = false,
.recovered_from_checkpoint = for_checkpoint,
.checkpoint_needed_before_commit = FALSE,
.state = TOKUTXN_LIVE,
.do_fsync = FALSE,
.txnid64 = xid,
.ancestor_txnid64 = (parent_tokutxn ? parent_tokutxn->ancestor_txnid64 : xid),
.xids = xids,
.do_fsync_lsn = ZERO_LSN,
.txnid64 = TXNID_NONE,
.ancestor_txnid64 = TXNID_NONE,
.xids = NULL,
.roll_info = roll_info,
.num_pin = 0
};
......@@ -188,6 +177,39 @@ toku_txn_create_txn (
return 0;
}
void
toku_txn_update_xids_in_txn(TOKUTXN txn, TXNID xid, XIDS xids)
{
// these should not have been set yet
invariant(txn->txnid64 == TXNID_NONE);
invariant(txn->ancestor_txnid64 == TXNID_NONE);
invariant(txn->snapshot_txnid64 == TXNID_NONE);
invariant(txn->xids == NULL);
TXNID snapshot_txnid64;
if (txn->snapshot_type == TXN_SNAPSHOT_NONE) {
snapshot_txnid64 = TXNID_NONE;
} else if (txn->parent == NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD) {
snapshot_txnid64 = xid;
} else if (txn->snapshot_type == TXN_SNAPSHOT_ROOT) {
snapshot_txnid64 = txn->parent->snapshot_txnid64;
} else {
assert(false);
}
#define UNCONST(t, x) *((t *) &(x))
// we need to cast around const here in order to move
// toku_txn_create_txn outside of the txn_manager_lock in
// toku_txn_manager_start_txn
UNCONST(TXNID, txn->txnid64) = xid;
UNCONST(TXNID, txn->snapshot_txnid64) = snapshot_txnid64;
UNCONST(TXNID, txn->ancestor_txnid64) = (txn->parent ? txn->parent->ancestor_txnid64 : xid);
txn->xids = xids;
#undef UNCONST
}
//Used on recovery to recover a transaction.
int
toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) {
......@@ -260,11 +282,25 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn,
txn->progress_poll_fun = poll;
txn->progress_poll_fun_extra = poll_extra;
if (txn->begin_was_logged) {
r = toku_log_xcommit(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64);
if (r==0) {
if (r != 0) {
goto cleanup;
}
}
else {
// Did no work.
invariant(txn->roll_info.num_rollentries == 0);
// Was not prepared.
invariant(txn->do_fsync_lsn.lsn == ZERO_LSN.lsn);
}
// If !txn->begin_was_logged, we could skip toku_rollback_commit
// but it's cheap (only a number of function calls that return immediately)
// since there were no writes. Skipping it would mean we would need to be careful
// in case we added any additional required cleanup into those functions in the future.
r = toku_rollback_commit(txn, oplsn);
STATUS_VALUE(TXN_COMMIT)++;
}
cleanup:
return r;
}
......@@ -278,19 +314,34 @@ int toku_txn_abort_txn(TOKUTXN txn,
int toku_txn_abort_with_lsn(TOKUTXN txn, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
// Effect: Ammong other things, if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
// Effect: Among other things, if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{
toku_txn_manager_note_abort_txn(txn->logger->txn_manager, txn);
txn->progress_poll_fun = poll;
txn->progress_poll_fun_extra = poll_extra;
int r = 0;
int r;
txn->do_fsync = FALSE;
if (txn->begin_was_logged) {
r = toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64);
if (r==0) {
if (r != 0) {
goto cleanup;
}
}
else {
// Did no work.
invariant(txn->roll_info.num_rollentries == 0);
// Was not prepared.
invariant(txn->do_fsync_lsn.lsn == ZERO_LSN.lsn);
}
// If !txn->begin_was_logged, we could skip toku_rollback_abort
// but it's cheap (only a number of function calls that return immediately)
// since there were no writes. Skipping it would mean we would need to be careful
// in case we added any additional required cleanup into those functions in the future.
r = toku_rollback_abort(txn, oplsn);
STATUS_VALUE(TXN_ABORT)++;
}
cleanup:
return r;
}
......@@ -326,24 +377,10 @@ int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist prepl
);
}
struct txn_fsync_log_info {
TOKULOGGER logger;
LSN do_fsync_lsn;
int r;
};
static void do_txn_fsync_log(void *thunk) {
struct txn_fsync_log_info *info = (struct txn_fsync_log_info *) thunk;
info->r = toku_logger_fsync_if_lsn_not_fsynced(info->logger, info->do_fsync_lsn);
}
int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync) {
int r = 0;
if (logger && do_fsync) {
struct txn_fsync_log_info info = { .logger = logger, .do_fsync_lsn = do_fsync_lsn };
//TODO(yoni): inline do_txn_fsync_log here
do_txn_fsync_log(&info);
r = info.r;
r = toku_logger_fsync_if_lsn_not_fsynced(logger, do_fsync_lsn);
}
return r;
}
......@@ -446,6 +483,38 @@ toku_txn_get_state(TOKUTXN txn) {
return txn->state;
}
static void
maybe_log_begin_txn_for_write_operation_unlocked(TOKUTXN txn) {
// We now hold the lock.
if (txn->begin_was_logged) {
goto cleanup;
}
TOKUTXN parent = txn->parent;
TXNID xid = txn->txnid64;
TXNID pxid = 0;
if (parent) {
// Recursively log parent first if necessary.
// Transactions cannot do work if they have children,
// so the lowest level child's lock is sufficient for ancestors.
maybe_log_begin_txn_for_write_operation_unlocked(parent);
pxid = parent->txnid64;
}
int r = toku_log_xbegin(txn->logger, NULL, 0, xid, pxid);
lazy_assert_zero(r);
txn->begin_was_logged = true;
cleanup:
return;
}
void
toku_maybe_log_begin_txn_for_write_operation(TOKUTXN txn) {
toku_txn_lock(txn);
maybe_log_begin_txn_for_write_operation_unlocked(txn);
toku_txn_unlock(txn);
}
#include <valgrind/helgrind.h>
void __attribute__((__constructor__)) toku_txn_status_helgrind_ignore(void);
void
......
......@@ -37,7 +37,8 @@ int toku_txn_begin_with_xid (
);
// Allocate and initialize a txn
int toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXNID xid, TXN_SNAPSHOT_TYPE snapshot_type, XIDS xids, DB_TXN *container_db_txn, bool for_checkpoint);
int toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXN_SNAPSHOT_TYPE snapshot_type, DB_TXN *container_db_txn, bool for_checkpoint);
void toku_txn_update_xids_in_txn(TOKUTXN txn, TXNID xid, XIDS xids);
int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info);
......@@ -115,6 +116,8 @@ struct tokulogger_preplist {
};
int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags);
void toku_maybe_log_begin_txn_for_write_operation(TOKUTXN txn);
#if defined(__cplusplus) || defined(__cilkplusplus)
}
#endif
......
......@@ -29,6 +29,7 @@ struct txn_manager {
struct toku_list prepared_and_returned_txns; // transactions that have been prepared and unresolved, and have been returned through txn_recover. We need this list so that we can restart the recovery.
toku_cond_t wait_for_unpin_of_txn;
TXNID last_xid;
};
static TXN_MANAGER_STATUS_S txn_manager_status;
......@@ -189,7 +190,7 @@ void toku_txn_manager_get_status(TOKULOGGER logger, TXN_MANAGER_STATUS s) {
void toku_txn_manager_init(TXN_MANAGER* txn_managerp) {
int r = 0;
TXN_MANAGER XMALLOC(txn_manager);
TXN_MANAGER XCALLOC(txn_manager);
toku_mutex_init(&txn_manager->txn_manager_lock, NULL);
r = toku_omt_create(&txn_manager->live_txns);
assert_zero(r);
......@@ -201,6 +202,8 @@ void toku_txn_manager_init(TXN_MANAGER* txn_managerp) {
assert_zero(r);
txn_manager->oldest_living_xid = TXNID_NONE_LIVING;
txn_manager->oldest_living_starttime = 0;
txn_manager->last_xid = 0;
//TODO(yoni): #5062 get this from somewhere
toku_list_init(&txn_manager->prepared_txns);
toku_list_init(&txn_manager->prepared_and_returned_txns);
......@@ -310,6 +313,11 @@ live_list_reverse_note_txn_start(TOKUTXN txn) {
return r;
}
static TXNID
max_xid(TXNID a, TXNID b) {
return a < b ? b : a;
}
int toku_txn_manager_start_txn(
TOKUTXN *txnp,
TXN_MANAGER txn_manager,
......@@ -321,8 +329,22 @@ int toku_txn_manager_start_txn(
bool for_recovery)
{
int r;
// we take the txn_manager_lock before writing to the log,
// because the act of getting a transaction ID and adding the
// Do as much (safe) work as possible before serializing on the txn_manager lock.
XIDS parent_xids;
if (parent == NULL)
parent_xids = xids_get_root_xids();
else
parent_xids = parent->xids;
TOKUTXN txn;
r = toku_txn_create_txn(&txn, parent, logger, snapshot_type, container_db_txn, for_recovery);
if (r != 0) {
// logger is panicked
return r;
}
// the act of getting a transaction ID and adding the
// txn to the proper OMTs must be atomic. MVCC depends
// on this.
toku_mutex_lock(&txn_manager->txn_manager_lock);
......@@ -330,27 +352,22 @@ int toku_txn_manager_start_txn(
verify_snapshot_system(txn_manager);
}
if (xid == TXNID_NONE) {
LSN first_lsn;
invariant(!for_recovery);
xid = ++txn_manager->last_xid;
invariant(logger);
r = toku_log_xbegin(logger, &first_lsn, 0, parent ? parent->txnid64 : 0);
assert_zero(r);
xid = first_lsn.lsn;
}
XIDS parent_xids;
if (parent == NULL)
parent_xids = xids_get_root_xids();
else
parent_xids = parent->xids;
else {
// Recovered transactions may not come in ascending order,
// because we assign xids when transactions are created but
// log transactions only when they first perform a write.
invariant(for_recovery);
txn_manager->last_xid = max_xid(txn_manager->last_xid, xid);
}
XIDS xids;
r = xids_create_child(parent_xids, &xids, xid);
assert_zero(r);
TOKUTXN txn;
r = toku_txn_create_txn(&txn, parent, logger, xid, snapshot_type, xids, container_db_txn, for_recovery);
if (r != 0) {
// logger is panicked
return r;
}
toku_txn_update_xids_in_txn(txn, xid, xids);
if (toku_omt_size(txn_manager->live_txns) == 0) {
assert(txn_manager->oldest_living_xid == TXNID_NONE_LIVING);
......@@ -501,7 +518,7 @@ static int find_xid (OMTVALUE v, void *txnv) {
void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
int r;
toku_mutex_lock(&txn_manager->txn_manager_lock);
{
if (garbage_collection_debug) {
verify_snapshot_system(txn_manager);
}
......@@ -510,10 +527,10 @@ void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
OMTVALUE txnagain;
u_int32_t idx;
r = toku_omt_find_zero(txn_manager->live_txns, find_xid, txn, &txnagain, &idx);
assert(r==0);
assert(txn==txnagain);
invariant_zero(r);
invariant(txn==txnagain);
r = toku_omt_delete_at(txn_manager->live_txns, idx);
assert(r==0);
invariant_zero(r);
}
if (txn->parent==NULL) {
......@@ -521,11 +538,11 @@ void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
u_int32_t idx;
//Remove txn from list of live root txns
r = toku_omt_find_zero(txn_manager->live_root_txns, toku_find_xid_by_xid, (OMTVALUE)txn->txnid64, &v, &idx);
assert(r==0);
invariant_zero(r);
TXNID xid = (TXNID) v;
invariant(xid == txn->txnid64);
r = toku_omt_delete_at(txn_manager->live_root_txns, idx);
assert(r==0);
invariant_zero(r);
}
//
// if this txn created a snapshot, make necessary modifications to list of snapshot txnids and live_list_reverse
......@@ -535,25 +552,22 @@ void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
// live_list_reverse, we have trouble. We end up never removing (id, id) from live_list_reverse
//
if (txn->snapshot_type != TXN_SNAPSHOT_NONE && (txn->parent==NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD)) {
{
u_int32_t idx;
OMTVALUE v;
//Free memory used for snapshot_txnids
r = toku_omt_find_zero(txn_manager->snapshot_txnids, toku_find_xid_by_xid, (OMTVALUE) txn->txnid64, &v, &idx);
invariant(r==0);
invariant_zero(r);
TXNID xid = (TXNID) v;
invariant(xid == txn->txnid64);
r = toku_omt_delete_at(txn_manager->snapshot_txnids, idx);
invariant(r==0);
}
invariant_zero(r);
live_list_reverse_note_txn_end(txn);
{
//Free memory used for live root txns local list
invariant(toku_omt_size(txn->live_root_txn_list) > 0);
toku_omt_destroy(&txn->live_root_txn_list);
}
}
}
assert(txn_manager->oldest_living_xid <= txn->txnid64);
if (txn->txnid64 == txn_manager->oldest_living_xid) {
......@@ -802,5 +816,32 @@ void toku_txn_manager_resume(TXN_MANAGER txn_manager) {
toku_mutex_unlock(&txn_manager->txn_manager_lock);
}
void
toku_txn_manager_set_last_xid_from_logger(TXN_MANAGER txn_manager, TOKULOGGER logger) {
invariant(txn_manager->last_xid == TXNID_NONE);
LSN last_lsn = toku_logger_last_lsn(logger);
txn_manager->last_xid = last_lsn.lsn;
}
void
toku_txn_manager_set_last_xid_from_recovered_checkpoint(TXN_MANAGER txn_manager, TXNID last_xid) {
txn_manager->last_xid = last_xid;
}
TXNID
toku_txn_manager_get_last_xid(TXN_MANAGER mgr) {
toku_mutex_lock(&mgr->txn_manager_lock);
TXNID last_xid = mgr->last_xid;
toku_mutex_unlock(&mgr->txn_manager_lock);
return last_xid;
}
// Test-only function
void
toku_txn_manager_increase_last_xid(TXN_MANAGER mgr, uint64_t increment) {
toku_mutex_lock(&mgr->txn_manager_lock);
mgr->last_xid += increment;
toku_mutex_unlock(&mgr->txn_manager_lock);
}
#undef STATUS_VALUE
......@@ -84,6 +84,13 @@ void toku_txn_manager_unpin_live_txn_unlocked(TXN_MANAGER txn_manager, TOKUTXN t
void toku_txn_manager_suspend(TXN_MANAGER txn_manager);
void toku_txn_manager_resume(TXN_MANAGER txn_manager);
void toku_txn_manager_set_last_xid_from_logger(TXN_MANAGER txn_manager, TOKULOGGER logger);
void toku_txn_manager_set_last_xid_from_recovered_checkpoint(TXN_MANAGER txn_manager, TXNID last_xid);
TXNID toku_txn_manager_get_last_xid(TXN_MANAGER mgr);
// Test-only function
void toku_txn_manager_increase_last_xid(TXN_MANAGER mgr, uint64_t increment);
#if defined(__cplusplus) || defined(__cilkplusplus)
}
#endif
......
......@@ -79,6 +79,7 @@
toku_test_get_latest_lsn;
toku_test_get_checkpointing_user_data_status;
toku_indexer_set_test_only_flags;
toku_increase_last_xid;
local: *;
};
......
......@@ -7,6 +7,7 @@
#include <db.h>
#include <sys/stat.h>
#include <ft/log.h>
#include <src/ydb_txn.h>
static void
four_billion_subtransactions (int do_something_in_children, int use_big_increment) {
......@@ -14,10 +15,11 @@ four_billion_subtransactions (int do_something_in_children, int use_big_incremen
DB *db;
DB_TXN *xparent;
uint64_t extra_increment;
if (use_big_increment) {
toku_set_lsn_increment(1<<28); // 1/4 of a billion, so 16 transactions should push us over the edge.
extra_increment = (1<<28); // 1/4 of a billion, so 16 transactions should push us over the edge.
} else {
toku_set_lsn_increment(1);
extra_increment = 0; // xid is already incrementing once per txn.
}
int r;
......@@ -42,6 +44,7 @@ four_billion_subtransactions (int do_something_in_children, int use_big_incremen
long long const fourbillion = use_big_increment ? 32 : 500000; // if using the big increment we should run into trouble in only 32 transactions or less.
for (i=0; i < fourbillion + 100; i++) {
DB_TXN *xchild;
toku_increase_last_xid(env, extra_increment);
r=env->txn_begin(env, xparent, &xchild, 0); CKERR(r);
if (do_something_in_children) {
char hello[30], there[30];
......
......@@ -934,7 +934,7 @@ env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {
DB_TXN *txn=NULL;
if (using_txns) {
r = locked_txn_begin(env, 0, &txn, 0);
r = toku_txn_begin(env, 0, &txn, 0);
assert_zero(r);
}
......@@ -1177,7 +1177,7 @@ locked_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *db
DB_TXN *child_txn = NULL;
int using_txns = env->i->open_flags & DB_INIT_TXN;
if (using_txns) {
ret = locked_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
ret = toku_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
invariant_zero(ret);
}
......@@ -1206,7 +1206,7 @@ locked_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbn
DB_TXN *child_txn = NULL;
int using_txns = env->i->open_flags & DB_INIT_TXN;
if (using_txns) {
ret = locked_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
ret = toku_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
invariant_zero(ret);
}
......@@ -2216,7 +2216,7 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) {
result->get_engine_status = env_get_engine_status;
result->get_engine_status_text = env_get_engine_status_text;
result->crash = env_crash; // handlerton's call to fractal tree layer on failed assert
result->txn_begin = locked_txn_begin;
result->txn_begin = toku_txn_begin;
MALLOC(result->i);
if (result->i == 0) { r = ENOMEM; goto cleanup; }
......
......@@ -625,7 +625,7 @@ locked_db_open(DB *db, DB_TXN *txn, const char *fname, const char *dbname, DBTYP
DB_TXN *child_txn = NULL;
int using_txns = env->i->open_flags & DB_INIT_TXN;
if (using_txns) {
ret = locked_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
ret = toku_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
invariant_zero(ret);
}
......@@ -934,7 +934,7 @@ locked_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_in
DB_TXN *child_txn = NULL;
int using_txns = env->i->open_flags & DB_INIT_TXN;
if (using_txns) {
ret = locked_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
ret = toku_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
invariant_zero(ret);
}
......
......@@ -60,7 +60,7 @@ toku_db_construct_autotxn(DB* db, DB_TXN **txn, BOOL* changed, BOOL force_auto_c
}
BOOL nosync = (BOOL)(!force_auto_commit && !(env->i->open_flags & DB_AUTO_COMMIT));
u_int32_t txn_flags = DB_TXN_NOWAIT | (nosync ? DB_TXN_NOSYNC : 0);
int r = locked_txn_begin(env, NULL, txn, txn_flags);
int r = toku_txn_begin(env, NULL, txn, txn_flags);
if (r!=0) return r;
*changed = TRUE;
return 0;
......
......@@ -146,4 +146,3 @@ db_env_enable_engine_status(uint32_t enable) {
engine_status_enable = enable;
}
......@@ -18,6 +18,9 @@ extern uint32_t engine_status_enable;
// Called to use dlmalloc functions.
void setup_dlmalloc(void) __attribute__((__visibility__("default")));
// Test-only function
void toku_env_increase_last_xid(DB_ENV *env, uint64_t increment);
#if defined(__cplusplus)
}
#endif
......
......@@ -13,7 +13,6 @@
#include <valgrind/helgrind.h>
#include "ft/txn_manager.h"
static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags);
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags, TXN_PROGRESS_POLL_FUNCTION poll,
void *poll_extra, bool release_multi_operation_client_lock);
static int toku_txn_abort(DB_TXN * txn, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
......@@ -235,15 +234,6 @@ toku_txn_abort(DB_TXN * txn,
return r;
}
// Create a new transaction.
int
locked_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
toku_multi_operation_client_lock();
int r = toku_txn_begin(env, stxn, txn, flags);
toku_multi_operation_client_unlock();
return r;
}
static u_int32_t
locked_txn_id(DB_TXN *txn) {
u_int32_t r = toku_txn_id(txn);
......@@ -317,7 +307,7 @@ locked_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) {
return r;
}
static int
int
toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
HANDLE_PANICKED_ENV(env);
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, stxn); //Cannot create child while child already exists.
......@@ -397,8 +387,7 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
}
if (flags!=0) return toku_ydb_do_error(env, EINVAL, "Invalid flags passed to DB_ENV->txn_begin\n");
struct __toku_db_txn_external *XMALLOC(eresult); // so the internal stuff is stuck on the end.
memset(eresult, 0, sizeof(*eresult));
struct __toku_db_txn_external *XCALLOC(eresult); // so the internal stuff is stuck on the end.
DB_TXN *result = &eresult->external_part;
//toku_ydb_notef("parent=%p flags=0x%x\n", stxn, flags);
......@@ -416,13 +405,12 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
result->parent = stxn;
#if !TOKUDB_NATIVE_H
MALLOC(db_txn_struct_i(result));
CALLOC(db_txn_struct_i(result));
if (!db_txn_struct_i(result)) {
toku_free(result);
return ENOMEM;
}
#endif
memset(db_txn_struct_i(result), 0, sizeof *db_txn_struct_i(result));
db_txn_struct_i(result)->flags = txn_flags;
db_txn_struct_i(result)->iso = child_isolation;
......@@ -514,3 +502,9 @@ void toku_keep_prepared_txn_callback (DB_ENV *env, TOKUTXN tokutxn) {
(void) __sync_fetch_and_add(&env->i->open_txns, 1);
}
// Test-only function
void
toku_increase_last_xid(DB_ENV *env, uint64_t increment) {
toku_txn_manager_increase_last_xid(toku_logger_get_txn_manager(env->i->logger), increment);
}
......@@ -13,7 +13,7 @@ extern "C" {
// internally to synchronize with begin checkpoint. callers
// should not hold the multi operation lock.
int locked_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags);
int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags);
int locked_txn_commit(DB_TXN *txn, u_int32_t flags);
......@@ -21,6 +21,8 @@ int locked_txn_abort(DB_TXN *txn);
void toku_keep_prepared_txn_callback(DB_ENV *env, TOKUTXN tokutxn);
// Test-only function
void toku_increase_last_xid(DB_ENV *env, uint64_t increment) __attribute__((__visibility__("default")));
#if defined(__cplusplus)
}
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment