Commit e4488f68 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Add the first two tests for {{{DB_ENV->log_archive}}}. Fix up some memory...

Add the first two tests for {{{DB_ENV->log_archive}}}.  Fix up some memory leaks.  Add {{{LSN*}}} to all the {{{toku_log_*}}} functions so that {{{toku_logger_txn_begin}}} can use the LSN as a txnid.  Addresses #75, #83, #392.

git-svn-id: file:///svn/tokudb@3004 c7de825b-a66e-492c-adef-691d508d4ae1
parent edb506e7
This diff is collapsed.
......@@ -74,10 +74,11 @@ int toku_logger_find_logfiles (const char *directory, char ***resultp);
struct tokutxn {
enum typ_tag tag;
u_int64_t txnid64;
u_int64_t txnid64; /* this happens to be the first lsn */
TOKULOGGER logger;
TOKUTXN parent;
LSN last_lsn; /* Everytime anything is logged, update the LSN. (We need to atomically record the LSN along with writing into the log.) */
LSN first_lsn; /* The first lsn in the transaction. */
struct roll_entry *oldest_logentry,*newest_logentry; /* Only logentries with rollbacks are here. There is a list going from newest to oldest. */
struct list live_txns_link;
};
......
......@@ -344,7 +344,7 @@ int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbu
int toku_logger_commit (TOKUTXN txn, int nosync) {
// panic handled in log_commit
int r = toku_log_commit(txn->logger, (txn->parent==0) && !nosync, txn->txnid64); // exits holding neither of the tokulogger locks.
int r = toku_log_commit(txn->logger, (LSN*)0, (txn->parent==0) && !nosync, txn->txnid64); // exits holding neither of the tokulogger locks.
if (r!=0) goto free_and_return;
if (txn->parent!=0) {
// Append the list to the front.
......@@ -378,14 +378,16 @@ int toku_logger_log_checkpoint (TOKULOGGER logger) {
if (r!=0) return r;
logger->checkpoint_lsns[1]=logger->checkpoint_lsns[0];
logger->checkpoint_lsns[0]=logger->lsn;
return toku_log_checkpoint(logger, 1);
return toku_log_checkpoint(logger, (LSN*)0, 1);
}
int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid64, TOKULOGGER logger) {
int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger) {
if (logger->is_panicked) return EINVAL;
TAGMALLOC(TOKUTXN, result);
if (result==0) return errno;
result->txnid64 = txnid64;
int r =toku_log_xbegin(logger, &result->first_lsn, 0, parent_tokutxn ? parent_tokutxn->txnid64 : 0);
if (r!=0) { toku_logger_panic(logger, r); return r; }
result->txnid64 = result->first_lsn.lsn;
result->logger = logger;
result->parent = parent_tokutxn;
result->oldest_logentry = result->newest_logentry = 0;
......@@ -398,7 +400,7 @@ int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
BYTESTRING bs = { .len=strlen(fname), .data = strdup(fname) };
int r = toku_log_fcreate (txn->logger, 0, toku_txn_get_txnid(txn), bs, mode);
int r = toku_log_fcreate (txn->logger, (LSN*)0, 0, toku_txn_get_txnid(txn), bs, mode);
if (r!=0) return r;
r = toku_logger_save_rollback_fcreate(txn, bs);
return r;
......@@ -411,7 +413,7 @@ int toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum) {
BYTESTRING bs;
bs.len = strlen(fname);
bs.data = (char*)fname;
return toku_log_fopen (txn->logger, 0, toku_txn_get_txnid(txn), bs, filenum);
return toku_log_fopen (txn->logger, (LSN*)0, 0, toku_txn_get_txnid(txn), bs, filenum);
}
int toku_fread_u_int8_t_nocrclen (FILE *f, u_int8_t *v) {
......@@ -767,20 +769,24 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
for (i=0; i<n_to_archive; i++) {
count_bytes+=1+strlen(all_logs[i]);
}
char **result = toku_malloc((1+n_to_archive)*sizeof(*result) + count_bytes);
char **result;
if (i==0) {
result=0;
} else {
result = toku_malloc((1+n_to_archive)*sizeof(*result) + count_bytes);
char *base = (char*)(result+1+n_to_archive);
for (i=0; i<n_to_archive; i++) {
int len=1+strlen(all_logs[i]);
result[i]=base;
memcpy(base, all_logs[i], len);
base+=len;
free(all_logs[i]);
}
for (; all_logs[i]; i++) {
result[n_to_archive]=0;
}
for (i=0; all_logs[i]; i++) {
free(all_logs[i]);
}
free(all_logs);
result[n_to_archive]=0;
*logs_p = result;
return 0;
}
......@@ -37,7 +37,7 @@ int toku_logger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF
int toku_logger_commit (TOKUTXN txn, int no_sync);
int toku_logger_txn_begin (TOKUTXN /*parent*/,TOKUTXN *, TXNID /*txnid64*/, TOKULOGGER /*logger*/);
int toku_logger_txn_begin (TOKUTXN /*parent*/,TOKUTXN *, TOKULOGGER /*logger*/);
int toku_logger_log_fcreate (TOKUTXN, const char */*fname*/, int /*mode*/);
......
......@@ -59,6 +59,7 @@ const struct logtype rollbacks[] = {
const struct logtype logtypes[] = {
{"checkpoint", 'x', FA{NULLFIELD}},
{"commit", 'C', FA{{"TXNID", "txnid", 0},NULLFIELD}},
{"xbegin", 'b', FA{{"TXNID", "parenttxnid", 0},NULLFIELD}},
#if 0
{"tl_delete", 'D', FA{{"FILENUM", "filenum", 0}, // tl logentries can be used, by themselves, to rebuild the whole DB from scratch.
{"DISKOFF", "diskoff", 0},
......@@ -316,7 +317,7 @@ void generate_log_free(void) {
void generate_log_writer (void) {
DO_LOGTYPES(lt, ({
fprintf2(cf, hf, "int toku_log_%s (TOKULOGGER logger, int do_fsync", lt->name);
fprintf2(cf, hf, "int toku_log_%s (TOKULOGGER logger, LSN *lsnp, int do_fsync", lt->name);
DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
fprintf(hf, ");\n");
fprintf(cf, ") {\n");
......@@ -338,6 +339,7 @@ void generate_log_writer (void) {
fprintf(cf, " LSN lsn = logger->lsn;\n");
fprintf(cf, " wbuf_LSN(&wbuf, lsn);\n");
fprintf(cf, " lbytes->lsn = lsn;\n");
fprintf(cf, " if (lsnp) *lsnp=logger->lsn;\n");
fprintf(cf, " logger->lsn.lsn++;\n");
DO_FIELDS(ft, lt,
fprintf(cf, " wbuf_%s(&wbuf, %s);\n", ft->type, ft->name));
......
......@@ -369,7 +369,7 @@ static int pma_log_distribute (TOKULOGGER logger, FILENUM filenum, DISKOFF old_d
}
}
ipa.size=j;
int r=toku_log_pmadistribute(logger, 0, filenum, old_diskoff, new_diskoff, ipa);
int r=toku_log_pmadistribute(logger, (LSN*)0, 0, filenum, old_diskoff, new_diskoff, ipa);
if (logger && oldnode_lsn) *oldnode_lsn = toku_logger_last_lsn(logger);
if (logger && newnode_lsn) *newnode_lsn = toku_logger_last_lsn(logger);
// if (0 && pma) {
......@@ -546,7 +546,7 @@ static int pma_resize_array(TOKULOGGER logger, FILENUM filenum, DISKOFF offset,
unsigned int oldN, n;
int r = pma_resize_array_nolog(pma, asksize, startz, &oldN, &n);
if (r!=0) return r;
toku_log_resizepma (logger, 0, filenum, offset, oldN, n);
toku_log_resizepma (logger, (LSN*)0, 0, filenum, offset, oldN, n);
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
return 0;
}
......@@ -734,7 +734,7 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKULOGGER logger, TXNID xid, FILE
{
const BYTESTRING key = { pair->keylen, kv_pair_key(pair) };
const BYTESTRING data = { pair->vallen, kv_pair_val(pair) };
int r = toku_log_insertinleaf (logger, 0, xid, pma->filenum, diskoff, idx, key, data);
int r = toku_log_insertinleaf (logger, (LSN*)0, 0, xid, pma->filenum, diskoff, idx, key, data);
if (r!=0) return r;
if (node_lsn) *node_lsn = toku_logger_last_lsn(logger);
}
......@@ -772,7 +772,7 @@ static int pma_log_delete (PMA pma, const char *key, int keylen, const char *val
{
const BYTESTRING deletedkey = { keylen, (char*)key };
const BYTESTRING deleteddata = { vallen, (char*)val };
int r=toku_log_deleteinleaf(logger, 0, xid, pma->filenum, diskoff, idx, deletedkey, deleteddata);
int r=toku_log_deleteinleaf(logger, (LSN*)0, 0, xid, pma->filenum, diskoff, idx, deletedkey, deleteddata);
if (r!=0) return r;
}
if (logger) {
......@@ -945,7 +945,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
{
const BYTESTRING key = { k->size, k->data };
const BYTESTRING data = { v->size, v->data };
r = toku_log_insertinleaf (logger, 0, xid, pma->filenum, diskoff, idx, key, data);
r = toku_log_insertinleaf (logger, (LSN*)0, 0, xid, pma->filenum, diskoff, idx, key, data);
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
if (r!=0) return r;
/* We don't record the insert here for rollback. The insert should have been logged at the top-level. */
......@@ -1105,7 +1105,7 @@ int toku_pma_split(TOKULOGGER logger, FILENUM filenum,
{
int r = pma_log_distribute(logger, filenum, diskoff, diskoff, spliti, &pairs[0], lsn, lsn);
if (r!=0) { toku_free(pairs); return r; }
r = toku_log_resizepma(logger, 0, filenum, diskoff, oldn_for_logging, newn_for_logging);
r = toku_log_resizepma(logger, (LSN*)0, 0, filenum, diskoff, oldn_for_logging, newn_for_logging);
if (r!=0) { toku_free(pairs); return r; }
if (logger && lsn) *lsn = toku_logger_last_lsn(logger);
......
......@@ -527,6 +527,10 @@ static int toku_recover_checkpoint (LSN UU(lsn)) {
return 0;
}
static int toku_recover_xbegin (LSN UU(lsn), TXNID UU(parent)) {
return 0;
}
int tokudb_recover(const char *data_dir, const char *log_dir) {
int r;
int entrycount=0;
......
/* Test log archive. */
#include <db.h>
#include <sys/stat.h>
#include "test.h"
int main (int argc, const char *argv[]) {
parse_args(argc, argv);
DB_ENV *env;
DB *db;
DB_TXN *txn;
int r;
system("rm -rf " ENVDIR);
r=mkdir(ENVDIR, 0777); assert(r==0);
r=db_env_create(&env, 0); CKERR(r);
env->set_errfile(env, stderr);
r=env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, 0777); CKERR(r);
r=db_create(&db, env, 0); CKERR(r);
r=env->txn_begin(env, 0, &txn, 0); CKERR(r);
r=db->open(db, txn, "foo.db", 0, DB_BTREE, DB_CREATE, 0777); CKERR(r);
r=txn->commit(txn, 0); CKERR(r);
char **list;
r=env->log_archive(env, &list, 0);
assert(list==0);
r=db->close(db, 0); CKERR(r);
r=env->close(env, 0); CKERR(r);
return 0;
}
/* Test log archive. */
#include <db.h>
#include <sys/stat.h>
#include "test.h"
int main (int argc, const char *argv[]) {
parse_args(argc, argv);
DB_ENV *env;
DB *db;
DB_TXN *txn;
int r;
system("rm -rf " ENVDIR);
r=mkdir(ENVDIR, 0777); assert(r==0);
r=db_env_create(&env, 0); CKERR(r);
env->set_errfile(env, stderr);
r=env->set_lg_max(env, 16000); CKERR(r);
r=env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, 0777); CKERR(r);
r=db_create(&db, env, 0); CKERR(r);
r=env->txn_begin(env, 0, &txn, 0); CKERR(r);
r=db->open(db, txn, "foo.db", 0, DB_BTREE, DB_CREATE, 0777); CKERR(r);
r=txn->commit(txn, 0); CKERR(r);
int i;
for (i=0; i<100; i++) {
DBT key,data;
char hello[30],there[30];
snprintf(hello, sizeof(hello), "hello%d", i);
snprintf(there, sizeof(there), "there%d", i);
r=env->txn_begin(env, 0, &txn, 0); CKERR(r);
r=db->put(db, txn,
dbt_init(&key, hello, strlen(hello)+1),
dbt_init(&data, there, strlen(there)+1),
0);
r=txn->commit(txn, 0); CKERR(r);
r=env->txn_checkpoint(env, 0, 0, 0);
}
{
char **list;
r=env->log_archive(env, &list, 0);
CKERR(r);
assert(list);
assert(list[0]);
if (verbose) printf("file[0]=%s\n", list[0]);
free(list);
}
r=db->close(db, 0); CKERR(r);
r=env->close(env, 0); CKERR(r);
return 0;
}
......@@ -810,8 +810,6 @@ static u_int32_t toku_txn_id(DB_TXN * txn) {
return -1;
}
static TXNID next_txn = 0;
static int toku_txn_abort(DB_TXN * txn) {
HANDLE_PANICKED_ENV(txn->mgrp);
int r = toku_logger_abort(txn->i->tokutxn);
......@@ -873,7 +871,7 @@ static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t f
}
}
r = toku_logger_txn_begin(stxn ? stxn->i->tokutxn : 0, &result->i->tokutxn, next_txn++, env->i->logger);
r = toku_logger_txn_begin(stxn ? stxn->i->tokutxn : 0, &result->i->tokutxn, env->i->logger);
if (r != 0)
return r;
*txn = result;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment