Commit 1c614497 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

some progress on recovery close[t:1835]

git-svn-id: file:///svn/toku/tokudb@14112 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0dc181a0
......@@ -24,6 +24,7 @@ u_int32_t cachesize = 127*1024*1024;
static int do_mysql = 0;
static u_int64_t start_range = 0, end_range = 0;
static int n_experiments = 2;
static int verbose = 0;
static int print_usage (const char *argv0) {
fprintf(stderr, "Usage:\n%s [--verify-lwc | --lwc | --nohwc] [--prelock] [--prelockflag] [--prelockwriteflag] [--env DIR]\n", argv0);
......@@ -52,13 +53,21 @@ int env_open_flags_yesx = DB_CREATE|DB_PRIVATE|DB_INIT_MPOOL|DB_INIT_TXN|DB_INIT
int env_open_flags_nox = DB_CREATE|DB_PRIVATE|DB_INIT_MPOOL;
char *dbfilename = "bench.db";
static double gettime (void) {
struct timeval tv;
int r = gettimeofday(&tv, 0);
assert(r==0);
return tv.tv_sec + 1e-6*tv.tv_usec;
}
static void parse_args (int argc, const char *argv[]) {
pname=argv[0];
argc--; argv++;
int specified_run_mode=0;
while (argc>0) {
if (strcmp(*argv,"--verify-lwc")==0) {
if (strcmp(*argv,"--verbose")==0) {
verbose++;
} else if (strcmp(*argv,"--verify-lwc")==0) {
if (specified_run_mode && run_mode!=RUN_VERIFY) { two_modes: fprintf(stderr, "You specified two run modes\n"); exit(1); }
run_mode = RUN_VERIFY;
} else if (strcmp(*argv, "--lwc")==0) {
......@@ -101,6 +110,9 @@ static void parse_args (int argc, const char *argv[]) {
} else if (strcmp(*argv, "--experiments") == 0 && argc > 1) {
argc--; argv++;
n_experiments = strtol(*argv, NULL, 10);
} else if (strcmp(*argv, "--recover") == 0) {
env_open_flags_yesx |= DB_RECOVER;
env_open_flags_nox |= DB_RECOVER;
} else {
exit(print_usage(pname));
}
......@@ -139,7 +151,11 @@ static void scanscan_setup (void) {
int r;
r = db_env_create(&env, 0); assert(r==0);
r = env->set_cachesize(env, 0, cachesize, 1); assert(r==0);
double tstart = gettime();
r = env->open(env, dbdir, do_txns? env_open_flags_yesx : env_open_flags_nox, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); assert(r==0);
double tend = gettime();
if (verbose)
printf("env open %f seconds\n", tend-tstart);
r = db_create(&db, env, 0); assert(r==0);
if (do_mysql) {
r = db->set_bt_compare(db, mysql_key_compare); assert(r == 0);
......@@ -178,13 +194,6 @@ static void scanscan_shutdown (void) {
#endif
}
static double gettime (void) {
struct timeval tv;
int r = gettimeofday(&tv, 0);
assert(r==0);
return tv.tv_sec + 1e-6*tv.tv_usec;
}
static void scanscan_hwc (void) {
int r;
int counter=0;
......@@ -472,7 +481,7 @@ int main (int argc, const char *argv[]) {
scanscan_shutdown();
#if defined(TOKUDB)
if (1) {
if (verbose) {
toku_cachetable_print_hash_histogram();
}
......@@ -483,17 +492,19 @@ int main (int argc, const char *argv[]) {
}
#endif
#if defined(__linux__) && __linux__
char fname[256];
sprintf(fname, "/proc/%d/status", toku_os_getpid());
FILE *f = fopen(fname, "r");
if (f) {
char line[256];
while (fgets(line, sizeof line, f)) {
int n;
if (sscanf(line, "VmPeak: %d", &n) || sscanf(line, "VmHWM: %d", &n) || sscanf(line, "VmRSS: %d", &n))
fputs(line, stdout);
if (verbose) {
char fname[256];
sprintf(fname, "/proc/%d/status", toku_os_getpid());
FILE *f = fopen(fname, "r");
if (f) {
char line[256];
while (fgets(line, sizeof line, f)) {
int n;
if (sscanf(line, "VmPeak: %d", &n) || sscanf(line, "VmHWM: %d", &n) || sscanf(line, "VmRSS: %d", &n))
fputs(line, stdout);
}
fclose(f);
}
fclose(f);
}
#endif
return 0;
......
......@@ -67,6 +67,7 @@ BRT_SOURCES = \
toku_worker \
trace_mem \
txn \
varray \
x1764 \
xids \
ybt \
......
......@@ -1533,13 +1533,6 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *loggerv) {
return 0;
}
static u_int64_t get_timestamp(void) {
struct timeval tv;
int r = gettimeofday(&tv, NULL);
assert(r == 0);
return (tv.tv_sec * 1000000ULL) + tv.tv_usec;
}
// TODO: #1510 locking of cachetable is suspect
// verify correct algorithm overall
......@@ -1575,7 +1568,7 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
// The checkpoint must be performed after the lock is acquired.
{
LSN begin_lsn; // we'll need to store the lsn of the checkpoint begin in all the trees that are checkpointed.
int r = toku_log_begin_checkpoint(logger, &begin_lsn, 0, get_timestamp());
int r = toku_log_begin_checkpoint(logger, &begin_lsn, 0, 0);
ct->lsn_of_checkpoint_in_progress = begin_lsn;
assert(r==0);
}
......@@ -1710,7 +1703,7 @@ toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_st
if (logger) {
int r = toku_log_end_checkpoint(logger, NULL,
1, // want the end_checkpoint to be fsync'd
ct->lsn_of_checkpoint_in_progress.lsn, get_timestamp());
ct->lsn_of_checkpoint_in_progress.lsn, 0);
assert(r==0);
toku_logger_note_checkpoint(logger, ct->lsn_of_checkpoint_in_progress);
}
......
......@@ -55,7 +55,7 @@ struct tokulogger {
struct mylock input_lock, output_lock; // acquired in that order
int is_open;
int is_panicked;
int write_log_files;
BOOL write_log_files;
int panic_errno;
char *directory;
int fd;
......
......@@ -305,10 +305,22 @@ int toku_logcursor_first(TOKULOGCURSOR lc, struct log_entry **le) {
if (r!=0)
return r;
lc->cur_logfiles_index = 0;
}
r = toku_log_fread(lc->cur_fp, &(lc->entry));
if (r!=0)
return r;
}
while (1) {
r = toku_log_fread(lc->cur_fp, &(lc->entry));
if (r==0)
break;
// move to next file
r = lc_close_cur_logfile(lc);
if (r!=0)
return r;
if ( lc->cur_logfiles_index == lc->n_logfiles-1)
return DB_NOTFOUND;
lc->cur_logfiles_index++;
r = lc_open_logfile(lc, lc->cur_logfiles_index);
if (r!= 0)
return r;
}
r = lc_check_lsn(lc, LC_FIRST);
if (r!=0)
return r;
......@@ -335,13 +347,25 @@ int toku_logcursor_last(TOKULOGCURSOR lc, struct log_entry **le) {
return r;
lc->cur_logfiles_index = lc->n_logfiles-1;
}
// seek to end
r = fseek(lc->cur_fp, 0, SEEK_END);
assert(0==r);
// read backward
r = toku_log_fread_backward(lc->cur_fp, &(lc->entry));
if (r!=0)
return r;
while (1) {
// seek to end
r = fseek(lc->cur_fp, 0, SEEK_END);
assert(0==r);
// read backward
r = toku_log_fread_backward(lc->cur_fp, &(lc->entry));
if (r==0)
break;
// move to previous file
r = lc_close_cur_logfile(lc);
if (r!=0)
return r;
if ( lc->cur_logfiles_index == 0 )
return DB_NOTFOUND;
lc->cur_logfiles_index--;
r = lc_open_logfile(lc, lc->cur_logfiles_index);
if (r!=0)
return r;
}
r = lc_check_lsn(lc, LC_LAST);
if (r!=0)
return r;
......
......@@ -130,7 +130,7 @@ const struct logtype logtypes[] = {
{"BYTESTRING", "key", 0},
{"BYTESTRING", "value", 0},
NULLFIELD}},
{"shutdown", 'S', FA{NULLFIELD}},
{"shutdown", 'S', FA{{"u_int64_t", "timestamp", 0}, NULLFIELD}},
{"timestamp", 'T', FA{{"u_int64_t", "timestamp", 0},
{"BYTESTRING", "comment", 0},
NULLFIELD}},
......@@ -273,10 +273,20 @@ generate_dispatch (void) {
});
fprintf(hf, " }} while (0)\n");
}
static void
generate_get_timestamp(void) {
fprintf(cf, "static u_int64_t toku_get_timestamp(void) {\n");
fprintf(cf, " struct timeval tv; int r = gettimeofday(&tv, NULL);\n");
fprintf(cf, " assert(r==0);\n");
fprintf(cf, " return tv.tv_sec * 1000000ULL + tv.tv_usec;\n");
fprintf(cf, "}\n");
}
static void
generate_log_writer (void) {
fprintf(cf, "static u_int64_t toku_lsn_increment=1;\nvoid toku_set_lsn_increment (uint64_t incr) { assert(incr>0 && incr< (16LL<<32)); toku_lsn_increment=incr; }\n");
generate_get_timestamp();
DO_LOGTYPES(lt, {
fprintf2(cf, hf, "int toku_log_%s (TOKULOGGER logger, LSN *lsnp, int do_fsync", lt->name);
DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
......@@ -310,6 +320,8 @@ generate_log_writer (void) {
fprintf(cf, " lbytes->lsn = lsn;\n");
fprintf(cf, " if (lsnp) *lsnp=logger->lsn;\n");
DO_FIELDS(ft, lt,
if (strcmp(ft->name, "timestamp") == 0)
fprintf(cf, " if (timestamp == 0) timestamp = toku_get_timestamp();\n");
fprintf(cf, " wbuf_%s(&wbuf, %s);\n", ft->type, ft->name));
fprintf(cf, " int r= toku_logger_finish(logger, lbytes, &wbuf, do_fsync);\n");
fprintf(cf, " assert(wbuf.ndone==buflen);\n");
......
......@@ -11,7 +11,6 @@ static toku_pthread_mutex_t logger_mutex = TOKU_PTHREAD_MUTEX_INITIALIZER;
static int (*toku_os_fsync_function)(int)=fsync;
static int open_logfile (TOKULOGGER logger);
static int toku_logger_fsync_null(int fd __attribute__((__unused__)));
static int do_write (TOKULOGGER logger, int do_fsync);
int toku_logger_create (TOKULOGGER *resultp) {
......@@ -20,7 +19,7 @@ int toku_logger_create (TOKULOGGER *resultp) {
if (result==0) return errno;
result->is_open=0;
result->is_panicked=0;
result->write_log_files = 1;
result->write_log_files = TRUE;
result->lg_max = 100<<20; // 100MB default
result->head = result->tail = 0;
result->lsn = result->written_lsn = result->fsynced_lsn = (LSN){0};
......@@ -70,14 +69,24 @@ int toku_logger_open (const char *directory, TOKULOGGER logger) {
long long nexti;
r = toku_logger_find_next_unused_log_file(directory, &nexti);
if (r!=0) return r;
logger->directory = toku_strdup(directory);
if (toku_os_is_absolute_name(directory)) {
logger->directory = toku_strdup(directory);
} else {
char *cwd = getcwd(NULL, 0);
if (cwd == NULL)
return -1;
char *new_log_dir = toku_malloc(strlen(cwd) + strlen(directory) + 2);
if (new_log_dir == NULL)
return -2;
sprintf(new_log_dir, "%s/%s", cwd, directory);
logger->directory = new_log_dir;
toku_free(cwd);
}
if (logger->directory==0) return errno;
logger->next_log_file_number = nexti;
open_logfile(logger);
logger->is_open = 1;
if (!logger->write_log_files)
toku_set_func_fsync(toku_logger_fsync_null);
return 0;
}
......@@ -107,7 +116,9 @@ int toku_logger_log_bytes (TOKULOGGER logger, struct logbytes *bytes, int do_fsy
/* Our LSN has been written. We have the output lock */
if (do_fsync && logger->fsynced_lsn.lsn > bytes->lsn.lsn) {
/* But we need to fsync it. */
r = toku_os_fsync_function(logger->fd);
if (logger->write_log_files) {
r = toku_os_fsync_function(logger->fd); assert(r == 0);
}
logger->fsynced_lsn = logger->written_lsn;
}
}
......@@ -160,6 +171,16 @@ int toku_logger_close(TOKULOGGER *loggerp) {
return r;
}
int toku_logger_shutdown(TOKULOGGER logger) {
int r = 0;
// TODO: checkpoint?
// log a shutdown
if (logger->is_open)
r = toku_log_shutdown(logger, NULL, TRUE, 0);
return r;
}
#if 0
int toku_logger_log_checkpoint (TOKULOGGER logger) {
......@@ -203,8 +224,8 @@ int toku_logger_is_open(TOKULOGGER logger) {
return logger->is_open;
}
void toku_logger_set_cachetable (TOKULOGGER tl, CACHETABLE ct) {
tl->ct = ct;
void toku_logger_set_cachetable (TOKULOGGER logger, CACHETABLE ct) {
logger->ct = ct;
}
int toku_logger_set_lg_max(TOKULOGGER logger, u_int32_t lg_max) {
......@@ -244,10 +265,6 @@ int toku_logger_lock_destroy(void) {
return r;
}
static int toku_logger_fsync_null(int fd __attribute__((__unused__))) {
return 0;
}
int toku_logger_find_next_unused_log_file(const char *directory, long long *result) {
DIR *d=opendir(directory);
long long max=-1; *result = max;
......@@ -309,10 +326,6 @@ int toku_logger_find_logfiles (const char *directory, char ***resultp, int *n_lo
return d ? closedir(d) : 0;
}
void toku_logger_write_log_files (TOKULOGGER tl, int write_log_files) {
tl->write_log_files = write_log_files;
}
// Write something out. Keep trying even if partial writes occur.
// On error: Return negative with errno set.
// On success return nbytes.
......@@ -334,13 +347,14 @@ static int open_logfile (TOKULOGGER logger) {
char fname[fnamelen];
snprintf(fname, fnamelen, "%s/log%012lld.tokulog", logger->directory, logger->next_log_file_number);
if (logger->write_log_files) {
logger->fd = open(fname, O_CREAT+O_WRONLY+O_TRUNC+O_EXCL+O_BINARY, S_IRWXU); if (logger->fd==-1) return errno;
logger->fd = open(fname, O_CREAT+O_WRONLY+O_TRUNC+O_EXCL+O_BINARY, S_IRWXU);
if (logger->fd==-1) return errno;
logger->next_log_file_number++;
} else {
logger->fd = open(DEV_NULL_FILE, O_WRONLY+O_BINARY);
// printf("%s: %s %d\n", __FUNCTION__, DEV_NULL_FILE, logger->fd); fflush(stdout);
if (logger->fd==-1) return errno;
}
logger->next_log_file_number++;
r = write_it(logger->fd, "tokulogg", 8); if (r!=8) return errno;
int version_l = toku_htonl(log_format_version); //version MUST be in network byte order regardless of disk order
r = write_it(logger->fd, &version_l, 4); if (r!=4) return errno;
......@@ -351,11 +365,18 @@ static int open_logfile (TOKULOGGER logger) {
static int close_and_open_logfile (TOKULOGGER logger) {
int r;
r = toku_os_fsync_function(logger->fd); if (r!=0) return errno;
if (logger->write_log_files) {
r = toku_os_fsync_function(logger->fd); if (r!=0) return errno;
}
r = close(logger->fd); if (r!=0) return errno;
return open_logfile(logger);
}
void toku_logger_write_log_files (TOKULOGGER logger, BOOL write_log_files) {
assert(!logger->is_open);
logger->write_log_files = write_log_files;
}
// Enter holding both locks
// Exit holding only the output_lock
static int do_write (TOKULOGGER logger, int do_fsync) {
......@@ -402,7 +423,9 @@ static int do_write (TOKULOGGER logger, int do_fsync) {
if (r!=logger->n_in_buf) { r=errno; goto panic; }
logger->n_in_buf=0;
if (do_fsync) {
r = toku_os_fsync_function(logger->fd);
if (logger->write_log_files) {
r = toku_os_fsync_function(logger->fd); assert(r == 0);
}
logger->fsynced_lsn = logger->written_lsn;
}
return 0;
......@@ -411,6 +434,27 @@ static int do_write (TOKULOGGER logger, int do_fsync) {
return r;
}
int toku_logger_restart(TOKULOGGER logger, LSN lastlsn) {
int r;
// flush out the log buffer
r = ml_lock(&logger->output_lock); assert(r == 0);
r = ml_lock(&logger->input_lock); assert(r == 0);
r = do_write(logger, TRUE); assert(r == 0);
r = ml_unlock(&logger->output_lock); assert(r == 0);
// close the log file
r = close(logger->fd); assert(r == 0);
logger->fd = -1;
// reset the LSN's to the lastlsn when the logger was opened
logger->lsn = logger->written_lsn = logger->fsynced_lsn = lastlsn;
logger->write_log_files = TRUE;
// open a new log file
return open_logfile(logger);
}
int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, FILENUM filenum, int mode) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
......@@ -736,10 +780,9 @@ LSN toku_txn_get_last_lsn (TOKUTXN txn) {
if (txn==0) return (LSN){0};
return txn->last_lsn;
}
LSN toku_logger_last_lsn(TOKULOGGER logger) {
LSN result=logger->lsn;
result.lsn--;
return result;
return logger->lsn;
}
TOKULOGGER toku_txn_logger (TOKUTXN txn) {
......
......@@ -10,13 +10,14 @@ enum { TOKU_LOG_VERSION = 1 };
int toku_logger_create (TOKULOGGER *resultp);
int toku_logger_open (const char *directory, TOKULOGGER logger);
int toku_logger_log_bytes (TOKULOGGER logger, struct logbytes *bytes, int do_fsync);
int toku_logger_shutdown(TOKULOGGER logger);
int toku_logger_close(TOKULOGGER *loggerp);
int toku_logger_fsync (TOKULOGGER logger);
void toku_logger_panic (TOKULOGGER logger, int err);
int toku_logger_panicked(TOKULOGGER logger);
int toku_logger_is_open(TOKULOGGER logger);
void toku_logger_set_cachetable (TOKULOGGER tl, CACHETABLE ct);
void toku_logger_set_cachetable (TOKULOGGER logger, CACHETABLE ct);
int toku_logger_set_lg_max(TOKULOGGER logger, u_int32_t lg_max);
int toku_logger_get_lg_max(TOKULOGGER logger, u_int32_t *lg_maxp);
int toku_logger_set_lg_bsize(TOKULOGGER logger, u_int32_t bsize);
......@@ -24,7 +25,20 @@ int toku_logger_set_lg_bsize(TOKULOGGER logger, u_int32_t bsize);
int toku_logger_lock_init(void);
int toku_logger_lock_destroy(void);
void toku_logger_write_log_files (TOKULOGGER tl, int write_log_files);
void toku_logger_write_log_files (TOKULOGGER logger, BOOL write_log_files);
// Restart the logger. This function is used by recovery to really start
// logging.
// Effects: Flush the current log buffer, reset the logger's lastlsn, and
// open a new log file.
// Returns: 0 if success
int toku_logger_restart(TOKULOGGER logger, LSN lastlsn);
// Maybe trim the log entries from the log that are older than the given LSN
// Effect: find all of the log files whose largest LSN is smaller than the
// given LSN and delete them.
// Returns: 0 if success
int toku_logger_maybe_trim_log(TOKULOGGER logger, LSN lsn);
int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, FILENUM filenum, int mode);
int toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum);
......
......@@ -10,6 +10,8 @@
// ../../../newbrt/recover ../dir.test_log2.c.tdb
#include "includes.h"
#include "log_header.h"
#include "varray.h"
//#define DO_VERIFY_COUNTS
#ifdef DO_VERIFY_COUNTS
......@@ -18,11 +20,11 @@
#define VERIFY_COUNTS(n) ((void)0)
#endif
static DB * const null_db=0;
static TOKULOGGER const null_tokulogger = 0;
// These data structures really should be part of a recovery data structure. Recovery could be multithreaded (on different environments...) But this is OK since recovery can only happen in one
static CACHETABLE ct;
static CACHETABLE recover_ct;
static TOKULOGGER recover_logger;
// TODO why can't we use the cachetable to find by filenum?
static struct cf_pair {
FILENUM filenum;
CACHEFILE cf;
......@@ -31,24 +33,49 @@ static struct cf_pair {
static int n_cf_pairs=0, max_cf_pairs=0;
int toku_recover_init (void) {
int r = toku_create_cachetable(&ct, 1<<25, (LSN){0}, 0);
int r;
r = toku_create_cachetable(&recover_ct, 1<<25, (LSN){0}, 0);
assert(r == 0);
r = toku_logger_create(&recover_logger);
assert(r == 0);
toku_logger_write_log_files(recover_logger, FALSE);
toku_logger_set_cachetable(recover_logger, recover_ct);
return r;
}
void toku_recover_cleanup (void) {
static void toku_recover_close_dictionaries(void) {
int r;
int i;
for (i=0; i<n_cf_pairs; i++) {
if (cf_pairs[i].brt) {
int r = toku_close_brt(cf_pairs[i].brt, 0, 0);
r = toku_close_brt(cf_pairs[i].brt, 0, 0);
//r = toku_cachefile_close(&cf_pairs[i].cf);
assert(r==0);
assert(r == 0);
}
}
n_cf_pairs = 0;
toku_free(cf_pairs);
{
int r = toku_cachetable_close(&ct);
assert(r==0);
}
cf_pairs = NULL;
}
void toku_recover_cleanup (void) {
int r;
if (cf_pairs)
toku_recover_close_dictionaries();
r = toku_logger_close(&recover_logger);
assert(r == 0);
r = toku_cachetable_close(&recover_ct);
assert(r == 0);
}
// Null function supplied to transaction commit and abort
static void recover_yield(voidfp UU(f), void *UU(extra)) {
// nothing
}
enum backward_state { BS_INIT, BS_SAW_CKPT_END, BS_SAW_CKPT };
......@@ -56,28 +83,54 @@ struct backward_scan_state {
enum backward_state bs;
LSN checkpoint_lsn;
int n_live_txns;
LSN min_live_txn;
TXNID min_live_txn;
};
static struct backward_scan_state initial_bss = {BS_INIT,{0},0,{0}};
static struct backward_scan_state initial_bss = {BS_INIT,{0},0,0};
static void toku_recover_commit (LSN UU(lsn), TXNID xid) {
int r;
// find the transaction by transaction id
TOKUTXN txn;
r = toku_txnid2txn(recover_logger, xid, &txn);
assert(r == 0);
// commit the transaction
r = toku_txn_commit_txn(txn, TRUE, recover_yield, NULL);
assert(r == 0);
static void
toku_recover_commit (LSN UU(lsn), TXNID UU(txnid)) {
// close the transaction
toku_txn_close_txn(txn);
}
static int toku_recover_backward_commit (struct logtype_commit *UU(l), struct backward_scan_state *UU(bs)) {
// nothing
return 0;
}
static void
toku_recover_xabort (LSN UU(lsn), TXNID UU(txnid)) {
static void toku_recover_xabort (LSN UU(lsn), TXNID xid) {
int r;
// find the transaction by transaction id
TOKUTXN txn;
r = toku_txnid2txn(recover_logger, xid, &txn);
assert(r == 0);
// abort the transaction
r = toku_txn_abort_txn(txn, recover_yield, NULL);
assert(r == 0);
// close the transaction
toku_txn_close_txn(txn);
}
static int toku_recover_backward_xabort (struct logtype_xabort *UU(l), struct backward_scan_state *UU(bs)) {
// nothing
return 0;
}
static void
create_dir_from_file (const char *fname) {
static void create_dir_from_file (const char *fname) {
int i;
char *tmp=toku_strdup(fname);
char ch;
......@@ -94,7 +147,7 @@ create_dir_from_file (const char *fname) {
if (r!=0 && errno!=EEXIST) {
printf("error: %s\n", strerror(errno));
}
assert (r==0 || (errno==EEXIST));
assert (r == 0 || (errno==EEXIST));
umask(oldu);
tmp[i]=ch;
}
......@@ -103,8 +156,7 @@ create_dir_from_file (const char *fname) {
toku_free(tmp);
}
static int
toku_recover_note_cachefile (FILENUM fnum, CACHEFILE cf, BRT brt) {
static int toku_recover_note_cachefile (FILENUM fnum, CACHEFILE cf, BRT brt) {
if (max_cf_pairs==0) {
n_cf_pairs=1;
max_cf_pairs=2;
......@@ -112,8 +164,9 @@ toku_recover_note_cachefile (FILENUM fnum, CACHEFILE cf, BRT brt) {
if (cf_pairs==0) return errno;
} else {
if (n_cf_pairs>=max_cf_pairs) {
cf_pairs = toku_realloc(cf_pairs, 2*max_cf_pairs*sizeof(*cf_pairs));
assert(cf_pairs);
max_cf_pairs*=2;
cf_pairs = toku_realloc(cf_pairs, max_cf_pairs*sizeof(*cf_pairs));
}
n_cf_pairs++;
}
......@@ -134,10 +187,8 @@ static int find_cachefile (FILENUM fnum, struct cf_pair **cf_pair) {
return 1;
}
static void
internal_toku_recover_fopen_or_fcreate (int flags, int mode, char *fixedfname, FILENUM filenum)
// Open the file if it is not already open. If it is already open, then do nothing.
{
static void internal_toku_recover_fopen_or_fcreate (int flags, int mode, char *fixedfname, FILENUM filenum) {
{
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
......@@ -158,14 +209,14 @@ internal_toku_recover_fopen_or_fcreate (int flags, int mode, char *fixedfname, F
assert(fd>=0);
BRT brt=0;
int r = toku_brt_create(&brt);
assert(r==0);
assert(r == 0);
brt->fname = fixedfname;
brt->h=0;
brt->compare_fun = toku_default_compare_fun; // we'll need to set these to the right comparison function, or do without them.
brt->dup_compare = toku_default_compare_fun;
brt->db = 0;
r = toku_cachetable_openfd(&cf, ct, fd, fixedfname);
assert(r==0);
r = toku_cachetable_openfd(&cf, recover_ct, fd, fixedfname);
assert(r == 0);
brt->cf=cf;
r = toku_read_brt_header_and_store_in_cachefile(brt->cf, &brt->h);
if (r==TOKUDB_DICTIONARY_NO_HEADER) {
......@@ -174,8 +225,7 @@ internal_toku_recover_fopen_or_fcreate (int flags, int mode, char *fixedfname, F
toku_recover_note_cachefile(filenum, cf, brt);
}
static void
toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM filenum) {
static void toku_recover_fopen (LSN UU(lsn), TXNID UU(xid), BYTESTRING fname, FILENUM filenum) {
char *fixedfname = fixup_fname(&fname);
internal_toku_recover_fopen_or_fcreate(0, 0, fixedfname, filenum);
}
......@@ -184,110 +234,92 @@ static int toku_recover_backward_fopen (struct logtype_fopen *UU(l), struct back
return 0;
}
// fcreate is like fopen except that the file must be created. Also creates the dir if needed.
static void
toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid), FILENUM filenum, BYTESTRING fname,u_int32_t mode) {
static void toku_recover_fcreate (LSN UU(lsn), TXNID UU(xid), FILENUM filenum, BYTESTRING fname,u_int32_t mode) {
char *fixedfname = fixup_fname(&fname);
create_dir_from_file(fixedfname);
internal_toku_recover_fopen_or_fcreate(O_CREAT|O_TRUNC, mode, fixedfname, filenum);
}
static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), struct backward_scan_state *UU(bs)) {
// nothing
return 0;
}
static void
toku_recover_enq_insert (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val) {
static void toku_recover_enq_insert (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
struct brt_cmd cmd;
// TODO compare file LSN with this XID
TOKUTXN txn;
r = toku_txnid2txn(recover_logger, xid, &txn);
assert(r == 0);
DBT keydbt, valdbt;
cmd.type=BRT_INSERT;
//TODO: #1125 and recovery: Remove this hack
// Assume this is a root txn (not yet enough info to construct full XIDS for message)
XIDS root = xids_get_root_xids();
r = xids_create_child(root, &cmd.xids, xid);
assert(r==0);
cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len);
cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger);
assert(r==0);
xids_destroy(&cmd.xids);
xids_destroy(&root);
toku_fill_dbt(&keydbt, key.data, key.len);
toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_insert(pair->brt, &keydbt, &valdbt, txn);
assert(r == 0);
}
static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), struct backward_scan_state *UU(bs)) {
// nothing
return 0;
}
static void
toku_recover_enq_delete_both (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val) {
static void toku_recover_enq_delete_both (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
struct brt_cmd cmd;
// TODO compare file LSN with this XID
TOKUTXN txn;
r = toku_txnid2txn(recover_logger, xid, &txn);
assert(r == 0);
DBT keydbt, valdbt;
cmd.type = BRT_DELETE_BOTH;
//TODO: #1125 and recovery: Remove this hack
// Assume this is a root txn (not yet enough info to construct full XIDS for message)
XIDS root = xids_get_root_xids();
r = xids_create_child(root, &cmd.xids, xid);
assert(r==0);
cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len);
cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger);
assert(r==0);
xids_destroy(&cmd.xids);
xids_destroy(&root);
toku_fill_dbt(&keydbt, key.data, key.len);
toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_delete_both(pair->brt, &keydbt, &valdbt, txn);
assert(r == 0);
}
static int toku_recover_backward_enq_delete_both (struct logtype_enq_delete_both *UU(l), struct backward_scan_state *UU(bs)) {
// nothing
return 0;
}
static void
toku_recover_enq_delete_any (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val) {
static void toku_recover_enq_delete_any (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING UU(val)) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
struct brt_cmd cmd;
DBT keydbt, valdbt;
cmd.type = BRT_DELETE_ANY;
//TODO: #1125 and recovery: Remove this hack
// Assume this is a root txn (not yet enough info to construct full XIDS for message)
XIDS root = xids_get_root_xids();
r = xids_create_child(root, &cmd.xids, xid);
assert(r==0);
cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len);
cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger);
assert(r==0);
xids_destroy(&cmd.xids);
xids_destroy(&root);
// TODO compare file LSN with this XID
TOKUTXN txn;
r = toku_txnid2txn(recover_logger, xid, &txn);
assert(r == 0);
DBT keydbt;
toku_fill_dbt(&keydbt, key.data, key.len);
r = toku_brt_delete(pair->brt, &keydbt, txn);
assert(r == 0);
}
static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any *UU(l), struct backward_scan_state *UU(bs)) {
// nothing
return 0;
}
static void
toku_recover_fclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) {
static void toku_recover_fclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
assert(r == 0);
r = toku_close_brt(pair->brt, 0, 0);
assert(r==0);
assert(r == 0);
pair->brt=0;
}
......@@ -296,6 +328,7 @@ static int toku_recover_backward_fclose (struct logtype_fclose *UU(l), struct ba
}
static int toku_recover_begin_checkpoint (LSN UU(lsn), u_int64_t UU(timestamp)) {
// nothing
return 0;
}
......@@ -318,7 +351,8 @@ static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoi
abort();
}
static int toku_recover_end_checkpoint (LSN UU(lsn), TXNID UU(txnid), u_int64_t UU(timestamp)) {
static int toku_recover_end_checkpoint (LSN UU(lsn), TXNID UU(xid), u_int64_t UU(timestamp)) {
// nothing
return 0;
}
......@@ -338,18 +372,19 @@ static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *
abort();
}
// going backward we open it, so going forward we don't have to do anything
static int toku_recover_fassociate (LSN UU(lsn), FILENUM UU(filenum), BYTESTRING UU(fname)) {
// nothing
return 0;
}
static int toku_recover_backward_fassociate (struct logtype_fassociate *l, struct backward_scan_state *UU(bs)) {
char *fixedfname = fixup_fname(&l->fname);
internal_toku_recover_fopen_or_fcreate(0, 0, fixedfname, l->filenum);
internal_toku_recover_fopen_or_fcreate(0, 0, fixedfname, l->filenum);
return 0;
}
static int toku_recover_xstillopen (LSN UU(lsn), TXNID UU(txnid), TXNID UU(parent)) {
static int toku_recover_xstillopen (LSN UU(lsn), TXNID UU(xid), TXNID UU(parent)) {
// nothing
return 0;
}
......@@ -359,9 +394,9 @@ static int toku_recover_backward_xstillopen (struct logtype_xstillopen *l, struc
return 0; // ignore live txns from incomplete checkpoint
case BS_SAW_CKPT_END:
if (bs->n_live_txns==0) {
bs->min_live_txn = l->lsn;
bs->min_live_txn = l->txnid;
} else {
if (bs->min_live_txn.lsn > l->txnid) bs->min_live_txn = l->lsn;
if (bs->min_live_txn > l->txnid) bs->min_live_txn = l->txnid;
}
bs->n_live_txns++;
return 0;
......@@ -372,8 +407,17 @@ static int toku_recover_backward_xstillopen (struct logtype_xstillopen *l, struc
abort();
}
static int toku_recover_xbegin (LSN lsn, TXNID parent_xid) {
int r;
// TODO do something with the parent
assert(parent_xid == 0);
TOKUTXN parent = NULL;
static int toku_recover_xbegin (LSN UU(lsn), TXNID UU(parent)) {
// create a transaction and bind it to the transaction id
TOKUTXN txn;
r = toku_txn_begin_with_xid(parent, &txn, recover_logger, lsn.lsn);
assert(r == 0);
return 0;
}
......@@ -386,11 +430,11 @@ static int toku_recover_backward_xbegin (struct logtype_xbegin *l, struct backwa
case BS_SAW_CKPT:
assert(bs->n_live_txns > 0); // the only thing we are doing here is looking for a live txn, so there better be one
// If we got to the min, return nonzero
if (bs->min_live_txn.lsn >= l->lsn.lsn) {
if (bs->min_live_txn >= l->lsn.lsn) {
fprintf(stderr, "Turning around at xbegin %" PRIu64 "\n", l->lsn.lsn);
return 1;
} else {
fprintf(stderr, "scanning back at xbegin %" PRIu64 " (looking for %" PRIu64 "\n", l->lsn.lsn, bs->min_live_txn.lsn);
fprintf(stderr, "scanning back at xbegin %" PRIu64 " (looking for %" PRIu64 ")\n", l->lsn.lsn, bs->min_live_txn);
return 0;
}
}
......@@ -399,18 +443,22 @@ static int toku_recover_backward_xbegin (struct logtype_xbegin *l, struct backwa
}
static int toku_recover_timestamp (LSN UU(lsn), u_int64_t UU(timestamp), BYTESTRING UU(comment)) {
// nothing
return 0;
}
static int toku_recover_backward_timestamp (struct logtype_timestamp *UU(l), struct backward_scan_state *UU(bs)) {
// nothing
return 0;
}
static int toku_recover_shutdown (LSN UU(lsn)) {
static int toku_recover_shutdown (LSN UU(lsn), u_int64_t UU(timestamp)) {
// nothing
return 0;
}
static int toku_recover_backward_shutdown (struct logtype_shutdown *UU(l), struct backward_scan_state *UU(bs)) {
// nothing
return 0;
}
......@@ -424,7 +472,7 @@ static int toku_delete_rolltmp_files (const char *log_dir) {
while ((de=readdir(d))) {
char rolltmp_prefix[] = "__rolltmp.";
int r = memcmp(de->d_name, rolltmp_prefix, sizeof(rolltmp_prefix) - 1);
if (r==0) {
if (r == 0) {
int fnamelen = strlen(log_dir) + strlen(de->d_name) + 2; // One for the slash and one for the trailing NUL.
char fname[fnamelen];
int l = snprintf(fname, fnamelen, "%s/%s", log_dir, de->d_name);
......@@ -443,24 +491,77 @@ static int toku_delete_rolltmp_files (const char *log_dir) {
return result;
}
int tokudb_recover(const char *data_dir, const char *log_dir) {
int failresult = 0;
// Does this log environment need recovery?
// Effects: If there are no log files, or if there is a "null" checkpoint at the end of the log,
// or if there is a shutdown at the end of the log, then we don't need recovery to run.
// Returns: TRUE if it does, FALSE if it does not
static int tokudb_needs_recovery(const char *log_dir) {
int needs_recovery;
int r;
int lockfd;
TOKULOGCURSOR logcursor = NULL;
{
const char fname[] = "/__recoverylock_dont_delete_me";
int namelen=strlen(data_dir);
char lockfname[namelen+sizeof(fname)];
r = toku_logcursor_create(&logcursor, log_dir);
if (r != 0) {
needs_recovery = TRUE; goto exit;
}
struct log_entry *le = NULL;
r = toku_logcursor_last(logcursor, &le);
if (r == DB_NOTFOUND) {
needs_recovery = FALSE; goto exit;
}
if (r != 0) {
needs_recovery = TRUE; goto exit;
}
if (le->cmd == LT_shutdown) {
needs_recovery = FALSE; goto exit;
}
if (le->cmd != LT_end_checkpoint) {
needs_recovery = TRUE; goto exit;
}
struct log_entry end_checkpoint = *le;
int l = snprintf(lockfname, sizeof(lockfname), "%s%s", data_dir, fname);
assert(l+1 == (signed)(sizeof(lockfname)));
lockfd = toku_os_lock_file(lockfname);
if (lockfd<0) {
printf("Couldn't run recovery because some other process holds the recovery lock %s\n", lockfname);
return errno;
}
r = toku_logcursor_prev(logcursor, &le);
if (r != 0 || le->cmd != LT_begin_checkpoint) {
needs_recovery = TRUE; goto exit;
}
if (le->u.begin_checkpoint.lsn.lsn != end_checkpoint.u.end_checkpoint.txnid) {
needs_recovery = TRUE; goto exit;
}
needs_recovery = FALSE;
exit:
if (logcursor) {
r = toku_logcursor_destroy(&logcursor);
assert(r == 0);
}
return needs_recovery;
}
// Append a transaction to the set of live transactions
static int append_live_txn(OMTVALUE v, u_int32_t UU(i), void *extra) {
TOKUTXN txn = (TOKUTXN) v;
struct varray *live_txns = (struct varray *) extra;
varray_append(live_txns, txn);
return 0;
}
// Abort a live transaction
static void abort_live_txn(void *v, void *UU(extra)) {
TOKUTXN txn = (TOKUTXN) v;
// abort the transaction
int r = toku_txn_abort_txn(txn, recover_yield, NULL);
assert(r == 0);
// close the transaction
toku_txn_close_txn(txn);
}
static const int toku_recover_trace = 0;
static int really_do_recovery(const char *data_dir, const char *log_dir) {
int r;
char org_wd[1000];
{
......@@ -469,56 +570,117 @@ int tokudb_recover(const char *data_dir, const char *log_dir) {
//printf("%s:%d org_wd=\"%s\"\n", __FILE__, __LINE__, org_wd);
}
r = toku_delete_rolltmp_files(log_dir);
if (r!=0) { failresult=r; goto fail; }
toku_recover_init();
r = toku_logger_open(log_dir, recover_logger);
assert(r == 0);
LSN lastlsn = toku_logger_last_lsn(recover_logger);
TOKULOGCURSOR logcursor;
r = toku_logcursor_create(&logcursor, log_dir);
assert(r == 0);
toku_recover_init();
r = chdir(data_dir);
assert(r == 0);
struct log_entry *le;
// scan backwards
struct backward_scan_state bs = initial_bss;
while (1) {
le = NULL;
r = toku_logcursor_prev(logcursor, &le);
if (0) printf("%s:%d r=%d cmd=%c\n", __FUNCTION__, __LINE__, r, r==0 ? le->cmd:'?');
if (toku_recover_trace) printf("%s:%d r=%d cmd=%c\n", __FUNCTION__, __LINE__, r, le ? le->cmd : '?');
if (r != 0)
break;
logtype_dispatch_assign(le, toku_recover_backward_, r, &bs);
if (r != 0)
if (r != 0) {
if (toku_recover_trace) printf("%s:%d r=%d cmd=%c\n", __FUNCTION__, __LINE__, r, le ? le->cmd : '?');
logtype_dispatch_args(le, toku_recover_);
break;
}
}
// scan forwards
while (1) {
le = NULL;
r = toku_logcursor_next(logcursor, &le);
if (0) printf("%s:%d r=%d cmd=%c\n", __FUNCTION__, __LINE__, r, r==0 ? le->cmd:'?');
if (toku_recover_trace) printf("%s:%d r=%d cmd=%c\n", __FUNCTION__, __LINE__, r, le ? le->cmd : '?');
if (r != 0)
break;
logtype_dispatch_args(le, toku_recover_);
}
r = toku_logcursor_destroy(&logcursor);
assert(r == 0);
// restart logging
toku_logger_restart(recover_logger, lastlsn);
// abort all of the remaining live transactions
struct varray *live_txns = NULL;
r = varray_create(&live_txns, 1);
assert(r == 0);
toku_omt_iterate(recover_logger->live_txns, append_live_txn, live_txns);
varray_iterate(live_txns, abort_live_txn, NULL);
varray_destroy(&live_txns);
// close the open dictionaries
toku_recover_close_dictionaries();
// write a recovery log entry
BYTESTRING recover_comment = { strlen("recover"), "recover" };
r = toku_log_timestamp(recover_logger, NULL, TRUE, 0, recover_comment);
assert(r == 0);
// checkpoint
// TODO: checkpoint locks needed here?
r = toku_cachetable_begin_checkpoint(recover_ct, recover_logger);
assert(r == 0);
// TODO: what about the error_string?
r = toku_cachetable_end_checkpoint(recover_ct, recover_logger, NULL);
assert(r == 0);
toku_recover_cleanup();
r = chdir(org_wd);
assert(r == 0);
r = toku_logcursor_destroy(&logcursor);
assert(r == 0);
return 0;
}
int tokudb_recover(const char *data_dir, const char *log_dir) {
int r;
int lockfd;
{
const char fname[] = "/__recoverylock_dont_delete_me";
int namelen=strlen(data_dir);
char lockfname[namelen+sizeof(fname)];
int l = snprintf(lockfname, sizeof(lockfname), "%s%s", data_dir, fname);
assert(l+1 == (signed)(sizeof(lockfname)));
lockfd = toku_os_lock_file(lockfname);
if (lockfd<0) {
printf("Couldn't run recovery because some other process holds the recovery lock %s\n", lockfname);
return errno; }
}
r = toku_delete_rolltmp_files(log_dir);
if (r != 0) {
toku_os_unlock_file(lockfd);
return r;
}
r=toku_os_unlock_file(lockfd);
if (r!=0) return errno;
int rr = 0;
if (tokudb_needs_recovery(log_dir))
rr = really_do_recovery(data_dir, log_dir);
r = toku_os_unlock_file(lockfd);
if (r != 0) return errno;
//printf("%s:%d recovery successful! ls -l says\n", __FILE__, __LINE__);
//system("ls -l");
return 0;
fail:
toku_os_unlock_file(lockfd);
chdir(org_wd);
return failresult;
return rr;
}
......@@ -56,10 +56,66 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log
*tokutxn = result;
return 0;
died:
// TODO memory leak
toku_logger_panic(logger, r);
return r;
}
int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger, TXNID xid) {
if (logger->is_panicked) return EINVAL;
TAGMALLOC(TOKUTXN, result);
if (result==0)
return errno;
int r;
result->first_lsn.lsn = xid;
r = toku_omt_create(&result->open_brts);
if (r!=0) goto died;
result->txnid64 = result->first_lsn.lsn;
XIDS parent_xids;
if (parent_tokutxn==NULL)
parent_xids = xids_get_root_xids();
else
parent_xids = parent_tokutxn->xids;
if ((r=xids_create_child(parent_xids, &result->xids, result->txnid64)))
goto died;
result->logger = logger;
result->parent = parent_tokutxn;
result->oldest_logentry = result->newest_logentry = 0;
result->rollentry_arena = memarena_create();
if (toku_omt_size(logger->live_txns) == 0) {
assert(logger->oldest_living_xid == TXNID_NONE_LIVING);
logger->oldest_living_xid = result->txnid64;
}
assert(logger->oldest_living_xid <= result->txnid64);
{
//Add txn to list (omt) of live transactions
u_int32_t idx;
r = toku_omt_insert(logger->live_txns, result, find_xid, result, &idx);
if (r!=0) goto died;
if (logger->oldest_living_xid == result->txnid64)
assert(idx == 0);
else
assert(idx > 0);
}
result->rollentry_resident_bytecount=0;
result->rollentry_raw_count = 0;
result->rollentry_filename = 0;
result->rollentry_fd = -1;
result->rollentry_filesize = 0;
*tokutxn = result;
return 0;
died:
// TODO memory leak
toku_logger_panic(logger, r);
return r;
}
// Doesn't close the txn, just performs the commit operations.
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void*yieldv) {
int r;
......
......@@ -6,6 +6,7 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger);
int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger, TXNID xid);
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void*yieldv);
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void*yieldv);
void toku_txn_close_txn(TOKUTXN txn);
......
#include <varray.h>
#include <memory.h>
#include <toku_assert.h>
#include <errno.h>
struct varray {
int c; // current number in the array
int n; // size of the array
void **a; // array of pointers
};
int varray_create(struct varray **vap, int n) {
struct varray *va = toku_malloc(sizeof (struct varray));
if (va == NULL) {
int e = errno; return e;
}
va->n = n;
va->c = 0;
va->a = toku_malloc(va->n * (sizeof (void *)));
if (va->a == NULL) {
int e = errno;
toku_free(va);
return e;
}
*vap = va;
return 0;
}
void varray_destroy(struct varray **vap) {
struct varray *va = *vap; *vap = NULL;
toku_free(va->a);
toku_free(va);
}
int varray_current_size(struct varray *va) {
return va->c;
}
int varray_append(struct varray *va, void *p) {
if (va->c >= va->n) {
void *newa = toku_realloc(va->a, 2 * va->n * sizeof (void *));
if (newa == NULL) {
int e = errno;
assert(e != 0);
return e;
}
va->a = newa;
va->n *= 2;
}
va->a[va->c++] = p;
return 0;
}
void varray_iterate(struct varray *va, void (*f)(void *p, void *extra), void *extra) {
int i;
for (i=0; i<va->c; i++)
f(va->a[i], extra);
}
#ifndef TOKU_VARRAY_H
#define TOKU_VARRAY_H
// Variable sized arrays of pointers (like an STL vector<void *>)
struct varray;
// Allocate and initialize an array
// Effect: a new varray is allocated and its initial size is set to n
// Returns: 0 if success and *vap points to the new varray
int varray_create(struct varray **vap, int n);
// Returns: the current size of the array
int varray_current_size(struct varray *va);
// Free an array
// Effect: the varray at *vap is freed
void varray_destroy(struct varray **vap);
// Append an element to the end of the array
// Effect: The size of the array is 1 larger than before and the last
// element is the new element
// Returns: 0 if success
int varray_append(struct varray *va, void *p);
// Apply a function to all of the elements in the array
// Effect: The function f is called for every element in the array, with
// p set to the element and with an extra argument
void varray_iterate(struct varray *va, void (*f)(void *p, void *extra), void *extra);
#endif
......@@ -277,7 +277,7 @@ endif
%.recover: %.tdb$(BINSUF) $(PTHREAD_LOCAL)
echo doing ./$< &&\
$(VGRIND) ./$< && \
$(VGRIND) ./$< --no-shutdown && \
rm -rf dir.$*.c.tdb.recover && \
mkdir dir.$*.c.tdb.recover && \
echo doing recovery &&\
......
......@@ -34,7 +34,9 @@ static void test (void) {
r=tid->commit(tid, 0); assert(r==0);
r=db->close(db, 0); assert(r==0);
r=env->close(env, 0); assert(r==0);
// dont close the env. we want recovery to run over the entire log and rebuild the database
// r=env->close(env, 0); assert(r==0);
unlink(ENVDIR "/foo.db");
......
......@@ -44,7 +44,9 @@ static void test (void) {
r=tid->commit(tid, 0); assert(r==0);
r=db->close(db, 0); assert(r==0);
r=env->close(env, 0); assert(r==0);
// dont close the env. we want recovery to run over the entire log and rebuild the database
// r=env->close(env, 0); assert(r==0);
unlink(ENVDIR "/foo.db");
......
......@@ -53,7 +53,9 @@ static void test (void) {
r=tid->commit(tid, 0); assert(r==0);
r=db->close(db, 0); assert(r==0);
r=env->close(env, 0); assert(r==0);
// dont close the env. we want recovery to run over the entire log and rebuild the database
// r=env->close(env, 0); assert(r==0);
unlink(ENVDIR "/foo.db");
......
......@@ -26,7 +26,7 @@ struct in_db {
int maxcount = 10000;
static void insert_some (int outeri) {
static void insert_some (int outeri, BOOL close_env) {
u_int32_t create_flag = outeri%2 ? DB_CREATE : 0; // Sometimes use DB_CREATE, sometimes don't.
int r;
DB_ENV *env;
......@@ -68,10 +68,12 @@ static void insert_some (int outeri) {
}
r=tid->commit(tid, 0); assert(r==0);
r=db->close(db, 0); assert(r==0);
r=env->close(env, 0); assert(r==0);
if (close_env) {
r=env->close(env, 0); assert(r==0);
}
}
static void make_db (void) {
static void make_db (BOOL close_env) {
DB_ENV *env;
DB *db;
DB_TXN *tid;
......@@ -88,10 +90,12 @@ static void make_db (void) {
r=db->open(db, tid, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r=tid->commit(tid, 0); assert(r==0);
r=db->close(db, 0); CKERR(r);
r=env->close(env, 0); CKERR(r);
if (close_env) {
r=env->close(env, 0); CKERR(r);
}
for (i=0; i<10; i++)
insert_some(i);
insert_some(i, close_env);
while (items) {
struct in_db *next=items->next;
......@@ -101,7 +105,12 @@ static void make_db (void) {
}
int
test_main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
make_db();
test_main (int argc, char *argv[]) {
BOOL close_env = TRUE;
for (int i=1; i<argc; i++) {
if (strcmp(argv[i], "--no-shutdown") == 0)
close_env = FALSE;
}
make_db(close_env);
return 0;
}
......@@ -14,7 +14,7 @@
// ENVDIR is defined in the Makefile
static void make_db (void) {
static void make_db (BOOL close_env) {
DB_ENV *env;
DB *db;
DB_TXN *tid;
......@@ -30,11 +30,18 @@ static void make_db (void) {
r=db->open(db, tid, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r=tid->commit(tid, 0); assert(r==0);
r=db->close(db, 0); assert(r==0);
r=env->close(env, 0); assert(r==0);
if (close_env) {
r=env->close(env, 0); assert(r==0);
}
}
int
test_main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
make_db();
test_main (int argc, char *argv[]) {
BOOL close_env = TRUE;
for (int i=1; i<argc; i++) {
if (strcmp(argv[i], "--no-shutdown") == 0)
close_env = FALSE;
}
make_db(close_env);
return 0;
}
......@@ -15,7 +15,7 @@
// ENVDIR is defined in the Makefile
static void make_db (void) {
static void make_db (BOOL close_env) {
DB_ENV *env;
DB *db;
DB_TXN *tid;
......@@ -40,11 +40,18 @@ static void make_db (void) {
}
r=tid->commit(tid, 0); assert(r==0);
r=db->close(db, 0); assert(r==0);
r=env->close(env, 0); assert(r==0);
if (close_env) {
r=env->close(env, 0); assert(r==0);
}
}
int
test_main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
make_db();
test_main (int argc, char *argv[]) {
BOOL close_env = TRUE;
for (int i=1; i<argc; i++) {
if (strcmp(argv[i], "--no-shutdown") == 0)
close_env = FALSE;
}
make_db(close_env);
return 0;
}
......@@ -15,7 +15,7 @@
// ENVDIR is defined in the Makefile
static void make_db (void) {
static void make_db (BOOL close_env) {
DB_ENV *env;
DB *db;
DB_TXN *tid;
......@@ -46,11 +46,18 @@ static void make_db (void) {
}
r=tid->commit(tid, 0); assert(r==0);
r=db->close(db, 0); assert(r==0);
r=env->close(env, 0); assert(r==0);
if (close_env) {
r=env->close(env, 0); assert(r==0);
}
}
int
test_main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
make_db();
test_main (int argc, char *argv[]) {
BOOL close_env = TRUE;
for (int i=1; i<argc; i++) {
if (strcmp(argv[i], "--no-shutdown") == 0)
close_env = FALSE;
}
make_db(close_env);
return 0;
}
......@@ -22,7 +22,7 @@ struct in_db {
struct in_db *next;
} *items=0;
static void make_db (void) {
static void make_db (BOOL close_env) {
DB_ENV *env;
DB *db;
DB_TXN *tid;
......@@ -60,7 +60,9 @@ static void make_db (void) {
}
r=tid->commit(tid, 0); assert(r==0);
r=db->close(db, 0); assert(r==0);
r=env->close(env, 0); assert(r==0);
if (close_env) {
r=env->close(env, 0); assert(r==0);
}
while (items) {
struct in_db *next=items->next;
toku_free(items);
......@@ -69,7 +71,12 @@ static void make_db (void) {
}
int
test_main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
make_db();
test_main (int argc, char *argv[]) {
BOOL close_env = TRUE;
for (int i=1; i<argc; i++) {
if (strcmp(argv[i], "--no-shutdown") == 0)
close_env = FALSE;
}
make_db(close_env);
return 0;
}
......@@ -69,7 +69,7 @@ static void del_n (DB *db, DB_TXN *tid, int i) {
}
}
static void make_db (void) {
static void make_db (BOOL close_env) {
DB_ENV *env;
DB *db;
DB_TXN *tid;
......@@ -94,7 +94,9 @@ static void make_db (void) {
}
r=tid->commit(tid, 0); assert(r==0);
r=db->close(db, 0); assert(r==0);
r=env->close(env, 0); assert(r==0);
if (close_env) {
r=env->close(env, 0); assert(r==0);
}
while (items) {
struct in_db *next=items->next;
toku_free(items);
......@@ -109,7 +111,12 @@ static void make_db (void) {
}
int
test_main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
make_db();
test_main (int argc, char *argv[]) {
BOOL close_env = TRUE;
for (int i=1; i<argc; i++) {
if (strcmp(argv[i], "--no-shutdown") == 0)
close_env = FALSE;
}
make_db(close_env);
return 0;
}
......@@ -23,7 +23,7 @@ struct in_db {
struct in_db *next;
} *items=0;
static void make_db (void) {
static void make_db (BOOL close_env) {
DB_ENV *env;
DB *db;
DB_TXN *tid;
......@@ -70,7 +70,9 @@ static void make_db (void) {
}
r=tid->commit(tid, 0); assert(r==0);
r=db->close(db, 0); assert(r==0);
r=env->close(env, 0); assert(r==0);
if (close_env) {
r=env->close(env, 0); assert(r==0);
}
while (items) {
struct in_db *next=items->next;
toku_free(items);
......@@ -79,7 +81,12 @@ static void make_db (void) {
}
int
test_main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
make_db();
test_main (int argc, char *argv[]) {
BOOL close_env = TRUE;
for (int i=1; i<argc; i++) {
if (strcmp(argv[i], "--no-shutdown") == 0)
close_env = FALSE;
}
make_db(close_env);
return 0;
}
......@@ -25,7 +25,7 @@ struct in_db {
int maxcount = 10;
static void insert_some (int outeri) {
static void insert_some (int outeri, BOOL close_env) {
u_int32_t create_flag = outeri%2 ? DB_CREATE : 0; // Sometimes use DB_CREATE, sometimes don't.
int r;
DB_ENV *env;
......@@ -60,10 +60,12 @@ static void insert_some (int outeri) {
}
r=tid->commit(tid, 0); assert(r==0);
r=db->close(db, 0); assert(r==0);
r=env->close(env, 0); assert(r==0);
if (close_env) {
r=env->close(env, 0); assert(r==0);
}
}
static void make_db (void) {
static void make_db (BOOL close_env) {
DB_ENV *env;
DB *db;
DB_TXN *tid;
......@@ -80,10 +82,12 @@ static void make_db (void) {
r=db->open(db, tid, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r=tid->commit(tid, 0); assert(r==0);
r=db->close(db, 0); CKERR(r);
r=env->close(env, 0); CKERR(r);
if (close_env) {
r=env->close(env, 0); CKERR(r);
}
for (i=0; i<1; i++)
insert_some(i);
insert_some(i, close_env);
while (items) {
struct in_db *next=items->next;
......@@ -93,7 +97,12 @@ static void make_db (void) {
}
int
test_main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
make_db();
test_main (int argc, char *argv[]) {
BOOL close_env = TRUE;
for (int i=1; i<argc; i++) {
if (strcmp(argv[i], "--no-shutdown") == 0)
close_env = FALSE;
}
make_db(close_env);
return 0;
}
......@@ -25,7 +25,7 @@ struct in_db {
int maxcount = 10;
static void insert_some (int outeri) {
static void insert_some (int outeri, BOOL close_env) {
u_int32_t create_flag = outeri%2 ? DB_CREATE : 0; // Sometimes use DB_CREATE, sometimes don't.
int r;
DB_ENV *env;
......@@ -60,10 +60,12 @@ static void insert_some (int outeri) {
}
r=tid->commit(tid, 0); assert(r==0);
r=db->close(db, 0); assert(r==0);
r=env->close(env, 0); assert(r==0);
if (close_env) {
r=env->close(env, 0); assert(r==0);
}
}
static void make_db (void) {
static void make_db (BOOL close_env) {
DB_ENV *env;
DB *db;
DB_TXN *tid;
......@@ -80,10 +82,12 @@ static void make_db (void) {
r=db->open(db, tid, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r=tid->commit(tid, 0); assert(r==0);
r=db->close(db, 0); CKERR(r);
r=env->close(env, 0); CKERR(r);
if (close_env) {
r=env->close(env, 0); CKERR(r);
}
for (i=0; i<2; i++)
insert_some(i);
insert_some(i, close_env);
while (items) {
struct in_db *next=items->next;
......@@ -93,7 +97,12 @@ static void make_db (void) {
}
int
test_main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
make_db();
test_main (int argc, char *argv[]) {
BOOL close_env = TRUE;
for (int i=1; i<argc; i++) {
if (strcmp(argv[i], "--no-shutdown") == 0)
close_env = FALSE;
}
make_db(close_env);
return 0;
}
......@@ -21,8 +21,7 @@ char *nameb="b.db";
static void
do_x1_shutdown (BOOL do_commit, BOOL do_abort)
{
do_x1_shutdown (BOOL do_commit, BOOL do_abort) {
int r;
system("rm -rf " ENVDIR);
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
......@@ -48,14 +47,17 @@ do_x1_shutdown (BOOL do_commit, BOOL do_abort)
r = txn->commit(txn, 0); CKERR(r);
} else if (do_abort) {
r = txn->abort(txn); CKERR(r);
// force an fsync of the log
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
}
//printf("shutdown\n");
abort();
}
static void
do_x1_recover (BOOL did_commit)
{
do_x1_recover (BOOL did_commit) {
DB_ENV *env;
DB *dba, *dbb;
int r;
......@@ -107,6 +109,17 @@ do_x1_recover (BOOL did_commit)
exit(0);
}
static void
do_x1_recover_only (void) {
DB_ENV *env;
int r;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->close(env, 0); CKERR(r);
exit(0);
}
const char *cmd;
static void
......@@ -149,7 +162,7 @@ do_test (void) {
do_test_internal(FALSE);
}
BOOL do_commit=FALSE, do_abort=FALSE, do_explicit_abort=FALSE, do_recover_committed=FALSE, do_recover_aborted=FALSE;
BOOL do_commit=FALSE, do_abort=FALSE, do_explicit_abort=FALSE, do_recover_committed=FALSE, do_recover_aborted=FALSE, do_recover_only=FALSE;
static void
x1_parse_args (int argc, char *argv[]) {
......@@ -162,16 +175,18 @@ x1_parse_args (int argc, char *argv[]) {
} else if (strcmp(argv[0],"-q")==0) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0],"--commit")==0) {
} else if (strcmp(argv[0], "--commit")==0) {
do_commit=1;
} else if (strcmp(argv[0],"--abort")==0) {
} else if (strcmp(argv[0], "--abort")==0) {
do_abort=1;
} else if (strcmp(argv[0],"--explicit-abort")==0) {
} else if (strcmp(argv[0], "--explicit-abort")==0) {
do_explicit_abort=1;
} else if (strcmp(argv[0],"--recover-committed")==0) {
} else if (strcmp(argv[0], "--recover-committed")==0) {
do_recover_committed=1;
} else if (strcmp(argv[0],"--recover-aborted")==0) {
} else if (strcmp(argv[0], "--recover-aborted")==0) {
do_recover_aborted=1;
} else if (strcmp(argv[0], "--recover-only") == 0) {
do_recover_only=1;
} else if (strcmp(argv[0], "-h")==0) {
resultcode=0;
do_usage:
......@@ -189,9 +204,10 @@ x1_parse_args (int argc, char *argv[]) {
int n_specified=0;
if (do_commit) n_specified++;
if (do_abort) n_specified++;
if (do_explicit_abort) n_specified++;
if (do_recover_committed) n_specified++;
if (do_explicit_abort) n_specified++;
if (do_recover_committed) n_specified++;
if (do_recover_aborted) n_specified++;
if (do_recover_only) n_specified++;
if (n_specified>1) {
printf("Specify only one of --commit or --abort or --recover-committed or --recover-aborted\n");
resultcode=1;
......@@ -214,6 +230,8 @@ test_main (int argc, char *argv[])
do_x1_recover(TRUE);
} else if (do_recover_aborted) {
do_x1_recover(FALSE);
} else if (do_recover_only) {
do_x1_recover_only();
} else {
do_test();
}
......
......@@ -458,6 +458,8 @@ static int toku_env_close(DB_ENV * env, u_int32_t flags) {
}
}
if (env->i->logger) {
r1=toku_logger_shutdown(env->i->logger);
// TODO: check return
r1=toku_logger_close(&env->i->logger);
if (r0==0 && r1) {
toku_ydb_do_error(env, r0, "Cannot close environment (logger close error)\n");
......
......@@ -12,7 +12,7 @@ DEPEND_COMPILE += \
HERE = utils
include $(TOKUROOT)toku_include/Makefile.include
ifndef BDBDIR
BDBDIR=/usr/local/BerkeleyDB.4.4
BDBDIR=/usr/local/BerkeleyDB.4.6
endif
BDB_DUMP=$(BDBDIR)/bin/db_dump$(BINSUF)
BDB_LOAD=$(BDBDIR)/bin/db_load$(BINSUF)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment