Commit 8de073d6 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

The recovered file is exactly the same as the original file for test_log2.tdb.

(The wrong LSN was being saved in the file.  The recovered version was right.)
Addresses #27.
(Also, it was broken, and that appears to be fixed.)


git-svn-id: file:///svn/tokudb@730 c7de825b-a66e-492c-adef-691d508d4ae1
parent 29502b52
...@@ -94,7 +94,7 @@ brt-test: ybt.o brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o he ...@@ -94,7 +94,7 @@ brt-test: ybt.o brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o he
log.o: log_header.h log-internal.h log.h wbuf.h crc.h brttypes.h $(BRT_INTERNAL_H_INCLUDES) log.o: log_header.h log-internal.h log.h wbuf.h crc.h brttypes.h $(BRT_INTERNAL_H_INCLUDES)
brt-test.o brt.o: brt.h ../include/db.h hashtable.h pma.h brttypes.h cachetable.h brt-test.o brt.o: brt.h ../include/db.h hashtable.h pma.h brttypes.h cachetable.h
brt-serialize-test.o: $(BRT_INTERNAL_H_INCLUDES) brt-serialize-test.o: $(BRT_INTERNAL_H_INCLUDES)
brt.o: $(BRT_INTERNAL_H_INCLUDES) brt.o: $(BRT_INTERNAL_H_INCLUDES) key.h log_header.h
mdict.o: pma.h mdict.o: pma.h
hashtable.o: hashtable.h brttypes.h memory.h key.h yerror.h ../include/db.h hashfun.h hashtable.o: hashtable.h brttypes.h memory.h key.h yerror.h ../include/db.h hashfun.h
memory.o: memory.h memory.o: memory.h
......
...@@ -28,7 +28,8 @@ struct brtnode { ...@@ -28,7 +28,8 @@ struct brtnode {
// BRT brt; // The containing BRT // BRT brt; // The containing BRT
unsigned int nodesize; unsigned int nodesize;
DISKOFF thisnodename; // The size of the node allocated on disk. Not all is necessarily in use. DISKOFF thisnodename; // The size of the node allocated on disk. Not all is necessarily in use.
LSN lsn; // Need the LSN as of the most recent modification. LSN disk_lsn; // The LSN as of the most recent version on disk.
LSN log_lsn; // The LSN as of the most recent log write.
int layout_version; // What version of the data structure? int layout_version; // What version of the data structure?
BRTNODE parent_brtnode; /* Invariant: The parent of an in-memory node must be in main memory. This is so we can find and update the down pointer when we change the diskoff of a node. */ BRTNODE parent_brtnode; /* Invariant: The parent of an in-memory node must be in main memory. This is so we can find and update the down pointer when we change the diskoff of a node. */
int height; /* height is always >= 0. 0 for leaf, >0 for nonleaf. */ int height; /* height is always >= 0. 0 for leaf, >0 for nonleaf. */
......
...@@ -20,7 +20,8 @@ void test_serialize(void) { ...@@ -20,7 +20,8 @@ void test_serialize(void) {
char *hello_string; char *hello_string;
sn.nodesize = nodesize; sn.nodesize = nodesize;
sn.thisnodename = sn.nodesize*20; sn.thisnodename = sn.nodesize*20;
sn.lsn.lsn = 123456; sn.disk_lsn.lsn = 789;
sn.log_lsn.lsn = 123456;
sn.layout_version = 0; sn.layout_version = 0;
sn.height = 1; sn.height = 1;
sn.rand4fingerprint = randval; sn.rand4fingerprint = randval;
...@@ -47,7 +48,7 @@ void test_serialize(void) { ...@@ -47,7 +48,7 @@ void test_serialize(void) {
assert(r==0); assert(r==0);
assert(dn->thisnodename==nodesize*20); assert(dn->thisnodename==nodesize*20);
assert(dn->lsn.lsn==123456); assert(dn->disk_lsn.lsn==123456);
assert(dn->layout_version ==0); assert(dn->layout_version ==0);
assert(dn->height == 1); assert(dn->height == 1);
assert(dn->rand4fingerprint==randval); assert(dn->rand4fingerprint==randval);
......
...@@ -99,7 +99,8 @@ void toku_serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node) ...@@ -99,7 +99,8 @@ void toku_serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node)
if (node->height==0) wbuf_literal_bytes(&w, "leaf", 4); if (node->height==0) wbuf_literal_bytes(&w, "leaf", 4);
else wbuf_literal_bytes(&w, "node", 4); else wbuf_literal_bytes(&w, "node", 4);
wbuf_int(&w, node->layout_version); wbuf_int(&w, node->layout_version);
wbuf_ulonglong(&w, node->lsn.lsn); fprintf(stderr, "%s:%d lsn at file write = %llu\n", __FILE__, __LINE__, node->log_lsn.lsn);
wbuf_ulonglong(&w, node->log_lsn.lsn);
//printf("%s:%d %lld.calculated_size=%d\n", __FILE__, __LINE__, off, calculated_size); //printf("%s:%d %lld.calculated_size=%d\n", __FILE__, __LINE__, off, calculated_size);
wbuf_int(&w, calculated_size); wbuf_int(&w, calculated_size);
wbuf_int(&w, node->height); wbuf_int(&w, node->height);
...@@ -241,7 +242,8 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl ...@@ -241,7 +242,8 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl
r=DB_BADFORMAT; r=DB_BADFORMAT;
goto died1; goto died1;
} }
result->lsn.lsn = rbuf_ulonglong(&rc); result->disk_lsn.lsn = rbuf_ulonglong(&rc);
result->log_lsn = result->disk_lsn;
{ {
unsigned int stored_size = rbuf_int(&rc); unsigned int stored_size = rbuf_int(&rc);
if (stored_size!=datasize) { r=DB_BADFORMAT; goto died1; } if (stored_size!=datasize) { r=DB_BADFORMAT; goto died1; }
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "brt-internal.h" #include "brt-internal.h"
#include "key.h" #include "key.h"
#include "log_header.h"
#include <stdlib.h> #include <stdlib.h>
#include <assert.h> #include <assert.h>
...@@ -135,7 +136,7 @@ static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck, unsigned ...@@ -135,7 +136,7 @@ static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck, unsigned
} }
void brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *brtnode_v, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p __attribute__((__unused__))) { void brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *brtnode_v, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN modified_lsn __attribute__((__unused__)) , BOOL rename_p __attribute__((__unused__))) {
BRTNODE brtnode = brtnode_v; BRTNODE brtnode = brtnode_v;
// if ((write_me || keep_me) && (brtnode->height==0)) { // if ((write_me || keep_me) && (brtnode->height==0)) {
// toku_pma_verify_fingerprint(brtnode->u.l.buffer, brtnode->rand4fingerprint, brtnode->subtree_fingerprint); // toku_pma_verify_fingerprint(brtnode->u.l.buffer, brtnode->rand4fingerprint, brtnode->subtree_fingerprint);
...@@ -145,7 +146,7 @@ void brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *brtnod ...@@ -145,7 +146,7 @@ void brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *brtnod
if (brtnode->height==0) printf(" pma=%p", brtnode->u.l.buffer); if (brtnode->height==0) printf(" pma=%p", brtnode->u.l.buffer);
printf("\n"); printf("\n");
} }
if (modified_lsn.lsn > brtnode->lsn.lsn) brtnode->lsn=modified_lsn; //if (modified_lsn.lsn > brtnode->lsn.lsn) brtnode->lsn=modified_lsn;
fix_up_parent_pointers_of_children_now_that_parent_is_gone(cachefile, brtnode); fix_up_parent_pointers_of_children_now_that_parent_is_gone(cachefile, brtnode);
assert(brtnode->thisnodename==nodename); assert(brtnode->thisnodename==nodename);
{ {
...@@ -200,7 +201,7 @@ int brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **brtnod ...@@ -200,7 +201,7 @@ int brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **brtnod
t->compare_fun, t->dup_compare); t->compare_fun, t->dup_compare);
if (r == 0) if (r == 0)
*sizep = brtnode_size(*result); *sizep = brtnode_size(*result);
*written_lsn = (*result)->lsn; *written_lsn = (*result)->disk_lsn;
//(*result)->parent_brtnode = 0; /* Don't know it right now. */ //(*result)->parent_brtnode = 0; /* Don't know it right now. */
//printf("%s:%d installed %p (offset=%lld)\n", __FILE__, __LINE__, *result, nodename); //printf("%s:%d installed %p (offset=%lld)\n", __FILE__, __LINE__, *result, nodename);
return r; return r;
...@@ -297,7 +298,8 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height) ...@@ -297,7 +298,8 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height)
// n->brt = t; // n->brt = t;
n->nodesize = t->h->nodesize; n->nodesize = t->h->nodesize;
n->thisnodename = nodename; n->thisnodename = nodename;
n->lsn.lsn = 0; // a new one can always be 0. n->disk_lsn.lsn = 0; // a new one can always be 0.
n->log_lsn = n->disk_lsn;
n->layout_version = 0; n->layout_version = 0;
n->height = height; n->height = height;
n->rand4fingerprint = random(); n->rand4fingerprint = random();
...@@ -1451,7 +1453,11 @@ static int setup_brt_root_node (BRT t, DISKOFF offset, TOKUTXN txn) { ...@@ -1451,7 +1453,11 @@ static int setup_brt_root_node (BRT t, DISKOFF offset, TOKUTXN txn) {
//printf("%s:%d created %lld\n", __FILE__, __LINE__, node->thisnodename); //printf("%s:%d created %lld\n", __FILE__, __LINE__, node->thisnodename);
toku_verify_counts(node); toku_verify_counts(node);
// verify_local_fingerprint_nonleaf(node); // verify_local_fingerprint_nonleaf(node);
tokulogger_log_newbrtnode(txn, toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint); toku_log_newbrtnode(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint);
if (txn) {
node->log_lsn = toku_txn_get_last_lsn(txn);
fprintf(stderr, "%s:%d last lsn=%llu\n", __FILE__, __LINE__, node->log_lsn.lsn);
}
r=toku_cachetable_unpin(t->cf, node->thisnodename, node->dirty, brtnode_size(node)); r=toku_cachetable_unpin(t->cf, node->thisnodename, node->dirty, brtnode_size(node));
if (r!=0) { if (r!=0) {
toku_free(node); toku_free(node);
......
...@@ -42,7 +42,15 @@ typedef struct loggedbrtheader { ...@@ -42,7 +42,15 @@ typedef struct loggedbrtheader {
DISKOFF freelist; DISKOFF freelist;
DISKOFF unused_memory; DISKOFF unused_memory;
u_int32_t n_named_roots; u_int32_t n_named_roots;
DISKOFF root; union {
struct {
char **names;
DISKOFF *roots;
} many;
struct {
DISKOFF root;
} one;
} u;
} LOGGEDBRTHEADER; } LOGGEDBRTHEADER;
#endif #endif
...@@ -37,6 +37,7 @@ struct tokutxn { ...@@ -37,6 +37,7 @@ struct tokutxn {
u_int64_t txnid64; u_int64_t txnid64;
TOKULOGGER logger; TOKULOGGER logger;
TOKUTXN parent; TOKUTXN parent;
LSN last_lsn; /* Everytime anything is logged, update the LSN. (We need to atomically record the LSN along with writing into the log.) */
}; };
int tokulogger_finish (TOKULOGGER logger, struct wbuf *wbuf); int tokulogger_finish (TOKULOGGER logger, struct wbuf *wbuf);
......
...@@ -57,7 +57,7 @@ int tokulogger_find_logfiles (const char *directory, int *n_resultsp, char ***re ...@@ -57,7 +57,7 @@ int tokulogger_find_logfiles (const char *directory, int *n_resultsp, char ***re
} }
*n_resultsp = n_results; *n_resultsp = n_results;
*resultp = result; *resultp = result;
return 0; return closedir(d);
} }
int tokulogger_create_and_open_logger (const char *directory, TOKULOGGER *resultp) { int tokulogger_create_and_open_logger (const char *directory, TOKULOGGER *resultp) {
...@@ -350,6 +350,29 @@ int tokulogger_log_unlink (TOKUTXN txn, const char *fname) { ...@@ -350,6 +350,29 @@ int tokulogger_log_unlink (TOKUTXN txn, const char *fname) {
}; };
int tokulogger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h) { int tokulogger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h) {
#if 0
LOGGEDBRTHEADER lh;
lh.size = toku_serialize_brt_header_size(h);
lh.flags = h->flags;
lh.nodesize = h->nodesize;
lh.freelist = h->freelist;
lh.unused_memory = h->unused_memory;
lh.n_named_roots = h->n_named_roots;
if (h->n_named_roots==-1) {
lh.u.one.root = h->unnamed_root;
} else {
int i;
MALLOC_N(h->n_named_roots, lh.u.many.names);
MALLOC_N(h->n_named_roots, lh.u.many.roots);
for (i=0; i<h->n_named_roots; i++) {
lh.u.many.names[i]=toku_strdup(h->names[i]);
lh.u.many.roots[i]=h->roots[i];
}
}
r = toku_log_fheader(txn, toku_txn_get_txnid(txn), filenum, lh);
toku_free(all_that_stuff);
return r;
#else
if (txn==0) return 0; if (txn==0) return 0;
int subsize=toku_serialize_brt_header_size(h); int subsize=toku_serialize_brt_header_size(h);
int buflen = (1+ int buflen = (1+
...@@ -374,35 +397,7 @@ int tokulogger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h) { ...@@ -374,35 +397,7 @@ int tokulogger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h) {
r=tokulogger_finish(txn->logger, &wbuf); r=tokulogger_finish(txn->logger, &wbuf);
toku_free(buf); toku_free(buf);
return r; return r;
} #endif
int tokulogger_log_newbrtnode (TOKUTXN txn, FILENUM filenum, DISKOFF offset, u_int32_t height, u_int32_t nodesize, char is_dup_sort_mode, u_int32_t rand4fingerprint) {
if (txn==0) return 0;
int buflen=(1+
+ 8 // lsn
+ 8 // txnid
+ 4 // filenum
+ 8 // diskoff
+ 4 // height
+ 4 // nodesize
+ 1 // is_dup_sort_mode
+ 4 // rand4fingerprint
+ 8 // crc & len
);
unsigned char buf[buflen];
struct wbuf wbuf;
wbuf_init (&wbuf, buf, buflen);
wbuf_char(&wbuf, LT_NEWBRTNODE);
wbuf_LSN (&wbuf, txn->logger->lsn);
txn->logger->lsn.lsn++;
wbuf_TXNID(&wbuf, txn->txnid64);
wbuf_FILENUM(&wbuf, filenum);
wbuf_DISKOFF(&wbuf, offset);
wbuf_int(&wbuf, height);
wbuf_int(&wbuf, nodesize);
wbuf_char(&wbuf, is_dup_sort_mode);
wbuf_int(&wbuf, rand4fingerprint);
return tokulogger_finish(txn->logger, &wbuf);
} }
/* /*
...@@ -506,7 +501,7 @@ int toku_fread_LOGGEDBRTHEADER(FILE *f, LOGGEDBRTHEADER *v, u_int32_t *crc, u_in ...@@ -506,7 +501,7 @@ int toku_fread_LOGGEDBRTHEADER(FILE *f, LOGGEDBRTHEADER *v, u_int32_t *crc, u_in
r = toku_fread_DISKOFF (f, &v->unused_memory, crc, len); if (r!=0) return r; r = toku_fread_DISKOFF (f, &v->unused_memory, crc, len); if (r!=0) return r;
r = toku_fread_u_int32_t(f, &v->n_named_roots, crc, len); if (r!=0) return r; r = toku_fread_u_int32_t(f, &v->n_named_roots, crc, len); if (r!=0) return r;
assert((signed)v->n_named_roots==-1); assert((signed)v->n_named_roots==-1);
r = toku_fread_DISKOFF (f, &v->root, crc, len); if (r!=0) return r; r = toku_fread_DISKOFF (f, &v->u.one.root, crc, len); if (r!=0) return r;
return 0; return 0;
} }
...@@ -606,3 +601,12 @@ int read_and_print_logmagic (FILE *f, u_int32_t *versionp) { ...@@ -606,3 +601,12 @@ int read_and_print_logmagic (FILE *f, u_int32_t *versionp) {
} }
return 0; return 0;
} }
TXNID toku_txn_get_txnid (TOKUTXN txn) {
if (txn==0) return 0;
else return txn->txnid64;
}
LSN toku_txn_get_last_lsn (TOKUTXN txn) {
return txn->last_lsn;
}
...@@ -51,4 +51,7 @@ int toku_logprint_LOGGEDBRTHEADER (FILE *outf, FILE *inf, const char *fieldname, ...@@ -51,4 +51,7 @@ int toku_logprint_LOGGEDBRTHEADER (FILE *outf, FILE *inf, const char *fieldname,
int read_and_print_logmagic (FILE *f, u_int32_t *version); int read_and_print_logmagic (FILE *f, u_int32_t *version);
TXNID toku_txn_get_txnid (TOKUTXN);
LSN toku_txn_get_last_lsn (TOKUTXN);
#endif #endif
...@@ -130,6 +130,7 @@ void generate_log_writer (void) { ...@@ -130,6 +130,7 @@ void generate_log_writer (void) {
fprintf2(cf, hf, ", %s %s", ft->type, ft->name)); fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
fprintf(hf, ");\n"); fprintf(hf, ");\n");
fprintf(cf, ") {\n"); fprintf(cf, ") {\n");
fprintf(cf, " if (txn==0) return 0;\n");
fprintf(cf, " const unsigned int buflen= (1 // log command\n"); fprintf(cf, " const unsigned int buflen= (1 // log command\n");
fprintf(cf, " +8 // lsn\n"); fprintf(cf, " +8 // lsn\n");
DO_FIELDS(ft, lt, DO_FIELDS(ft, lt,
...@@ -142,7 +143,8 @@ void generate_log_writer (void) { ...@@ -142,7 +143,8 @@ void generate_log_writer (void) {
fprintf(cf, " wbuf_init(&wbuf, buf, buflen);\n"); fprintf(cf, " wbuf_init(&wbuf, buf, buflen);\n");
fprintf(cf, " wbuf_char(&wbuf, '%c');\n", lt->command); fprintf(cf, " wbuf_char(&wbuf, '%c');\n", lt->command);
fprintf(cf, " wbuf_LSN(&wbuf, txn->logger->lsn);\n"); fprintf(cf, " wbuf_LSN(&wbuf, txn->logger->lsn);\n");
fprintf(cf, " txn->logger->lsn.lsn++;;\n"); fprintf(cf, " txn->last_lsn = txn->logger->lsn;\n");
fprintf(cf, " txn->logger->lsn.lsn++;\n");
DO_FIELDS(ft, lt, DO_FIELDS(ft, lt,
fprintf(cf, " wbuf_%s(&wbuf, %s);\n", ft->type, ft->name)); fprintf(cf, " wbuf_%s(&wbuf, %s);\n", ft->type, ft->name));
fprintf(cf, " int r= tokulogger_finish(txn->logger, &wbuf);\n"); fprintf(cf, " int r= tokulogger_finish(txn->logger, &wbuf);\n");
......
...@@ -49,6 +49,7 @@ static void toku_recover_fcreate (struct logtype_fcreate *c) { ...@@ -49,6 +49,7 @@ static void toku_recover_fcreate (struct logtype_fcreate *c) {
int fd = creat(fname, c->mode); int fd = creat(fname, c->mode);
assert(fd>=0); assert(fd>=0);
toku_free(fname); toku_free(fname);
toku_free(c->fname.data);
} }
static void toku_recover_fheader (struct logtype_fheader *c) { static void toku_recover_fheader (struct logtype_fheader *c) {
CACHEFILE cf = find_cachefile(c->filenum); CACHEFILE cf = find_cachefile(c->filenum);
...@@ -113,6 +114,8 @@ static void toku_recover_fopen (struct logtype_fopen *c) { ...@@ -113,6 +114,8 @@ static void toku_recover_fopen (struct logtype_fopen *c) {
} }
cf_pairs[n_cf_pairs-1].filenum = c->filenum; cf_pairs[n_cf_pairs-1].filenum = c->filenum;
cf_pairs[n_cf_pairs-1].cf = cf; cf_pairs[n_cf_pairs-1].cf = cf;
toku_free(fname);
toku_free(c->fname.data);
} }
int main (int argc, char *argv[]) { int main (int argc, char *argv[]) {
...@@ -152,6 +155,7 @@ int main (int argc, char *argv[]) { ...@@ -152,6 +155,7 @@ int main (int argc, char *argv[]) {
r = toku_cachefile_close(&cf_pairs[i].cf); r = toku_cachefile_close(&cf_pairs[i].cf);
assert(r==0); assert(r==0);
} }
toku_free(cf_pairs);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
assert(r==0); assert(r==0);
for (i=0; i<n_logfiles; i++) { for (i=0; i<n_logfiles; i++) {
......
...@@ -124,8 +124,15 @@ static inline void wbuf_LOGGEDBRTHEADER (struct wbuf *w, LOGGEDBRTHEADER h) { ...@@ -124,8 +124,15 @@ static inline void wbuf_LOGGEDBRTHEADER (struct wbuf *w, LOGGEDBRTHEADER h) {
wbuf_DISKOFF(w, h.freelist); wbuf_DISKOFF(w, h.freelist);
wbuf_DISKOFF(w, h.unused_memory); wbuf_DISKOFF(w, h.unused_memory);
wbuf_int(w, h.n_named_roots); wbuf_int(w, h.n_named_roots);
assert(h.n_named_roots==0); if ((signed)h.n_named_roots==-1) {
wbuf_DISKOFF(w, h.root); wbuf_DISKOFF(w, h.u.one.root);
} else {
unsigned int i;
for (i=0; i<h.n_named_roots; i++) {
wbuf_DISKOFF(w, h.u.many.roots[i]);
wbuf_bytes (w, h.u.many.names[i], 1+strlen(h.u.many.names[i]));
}
}
} }
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment