Commit 4e76c1b7 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Add len before each log entry. Closes #23.

Also gets rid of most of the manual log code.  The remaining code is tough to excise, so I'll call this closed.  Closes #32.


git-svn-id: file:///svn/tokudb@735 c7de825b-a66e-492c-adef-691d508d4ae1
parent e2ffd764
...@@ -285,50 +285,18 @@ int tokulogger_log_block_rename (TOKULOGGER logger, FILENUM fileid, DISKOFF oldd ...@@ -285,50 +285,18 @@ int tokulogger_log_block_rename (TOKULOGGER logger, FILENUM fileid, DISKOFF oldd
} }
int tokulogger_log_fcreate (TOKUTXN txn, const char *fname, int mode) { int tokulogger_log_fcreate (TOKUTXN txn, const char *fname, int mode) {
if (txn==0) return 0; BYTESTRING bs;
const int fnamelen = strlen(fname); bs.len = strlen(fname);
const int buflen = (+1 // log command bs.data = (char*)fname;
+8 // lsn return toku_log_fcreate (txn, toku_txn_get_txnid(txn), bs, mode);
+8 // txnid
+4 // length of fname
+fnamelen
+4 // mode
+8 // crc & len
);
unsigned char buf[buflen];
struct wbuf wbuf;
wbuf_init (&wbuf, buf, buflen);
wbuf_char (&wbuf, LT_FCREATE);
wbuf_LSN (&wbuf, txn->logger->lsn);
txn->logger->lsn.lsn++;
wbuf_TXNID(&wbuf, txn->txnid64);
wbuf_bytes(&wbuf, fname, fnamelen);
wbuf_int (&wbuf, mode);
return tokulogger_finish(txn->logger, &wbuf);
} }
/* fopen isn't really an action. It's just for bookkeeping. We need to know the filename that goes with a filenum. */ /* fopen isn't really an action. It's just for bookkeeping. We need to know the filename that goes with a filenum. */
int tokulogger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum) { int tokulogger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum) {
if (txn==0) return 0; BYTESTRING bs;
const int fnamelen = strlen(fname); bs.len = strlen(fname);
const int buflen = (+1 // log command bs.data = (char*)fname;
+8 // lsn return toku_log_fopen (txn,toku_txn_get_txnid(txn), bs, filenum);
+8 // txnid
+4 // length of fname
+fnamelen
+4 // filenum len
+8 // crc & len
);
unsigned char buf[buflen];
struct wbuf wbuf;
wbuf_init (&wbuf, buf, buflen);
wbuf_char (&wbuf, LT_FOPEN);
wbuf_LSN (&wbuf, txn->logger->lsn);
txn->logger->lsn.lsn++;
wbuf_TXNID(&wbuf, txn->txnid64);
wbuf_bytes(&wbuf, fname, fnamelen);
wbuf_FILENUM(&wbuf, filenum);
return tokulogger_finish(txn->logger, &wbuf);
} }
...@@ -375,7 +343,8 @@ int tokulogger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h) { ...@@ -375,7 +343,8 @@ int tokulogger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h) {
#else #else
if (txn==0) return 0; if (txn==0) return 0;
int subsize=toku_serialize_brt_header_size(h); int subsize=toku_serialize_brt_header_size(h);
int buflen = (1+ int buflen = (4 // firstlen
+ 1 //cmd
+ 8 // lsn + 8 // lsn
+ 8 // txnid + 8 // txnid
+ 4 // filenum + 4 // filenum
...@@ -387,6 +356,7 @@ int tokulogger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h) { ...@@ -387,6 +356,7 @@ int tokulogger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h) {
if (buf==0) return errno; if (buf==0) return errno;
struct wbuf wbuf; struct wbuf wbuf;
wbuf_init(&wbuf, buf, buflen); wbuf_init(&wbuf, buf, buflen);
wbuf_int (&wbuf, buflen);
wbuf_char(&wbuf, LT_FHEADER); wbuf_char(&wbuf, LT_FHEADER);
wbuf_LSN (&wbuf, txn->logger->lsn); wbuf_LSN (&wbuf, txn->logger->lsn);
txn->logger->lsn.lsn++; txn->logger->lsn.lsn++;
...@@ -556,6 +526,7 @@ int toku_logprint_BYTESTRING (FILE *outf, FILE *inf, const char *fieldname, u_in ...@@ -556,6 +526,7 @@ int toku_logprint_BYTESTRING (FILE *outf, FILE *inf, const char *fieldname, u_in
else fprintf(outf, "\\0%03o", bs.data[i]); else fprintf(outf, "\\0%03o", bs.data[i]);
} }
} }
fprintf(outf, "\"");
toku_free(bs.data); toku_free(bs.data);
return 0; return 0;
} }
......
...@@ -77,11 +77,19 @@ void transcribe_filenum(void) { ...@@ -77,11 +77,19 @@ void transcribe_filenum(void) {
printf(" filenum=%d", value); printf(" filenum=%d", value);
} }
u_int32_t len1;
void transcribe_len1 (void) {
len1 = get_uint32();
//printf(" len=%d", len1);
}
void transcribe_len (void) { void transcribe_len (void) {
u_int32_t l = get_uint32(); u_int32_t l = get_uint32();
printf(" len=%d", l); printf(" len=%d", l);
if (l!=actual_len) printf(" actual_len=%d", actual_len); if (l!=actual_len) printf(" actual_len=%d", actual_len);
assert(l==actual_len); assert(l==actual_len);
assert(len1==actual_len);
} }
...@@ -139,12 +147,15 @@ static void oldmain (int count) { ...@@ -139,12 +147,15 @@ static void oldmain (int count) {
u_int32_t version; u_int32_t version;
int r = read_and_print_logmagic(stdin, &version); int r = read_and_print_logmagic(stdin, &version);
assert(r==0); assert(r==0);
for (i=0; for (i=0; i!=count; i++) {
i!=count && (crc=0,actual_len=0,cmd=get_char())!=EOF; actual_len=0;
i++) { crc=0;
transcribe_len1();
if ((cmd=get_char())==EOF) break;
switch ((enum lt_command)cmd) { switch ((enum lt_command)cmd) {
case LT_INSERT_WITH_NO_OVERWRITE: case LT_INSERT_WITH_NO_OVERWRITE:
printf("INSERT_WITH_NO_OVERWRITE:"); printf("INSERT_WITH_NO_OVERWRITE:");
transcribe_len1();
transcribe_lsn(); transcribe_lsn();
transcribe_txnid(); transcribe_txnid();
transcribe_fileid(); transcribe_fileid();
......
...@@ -131,7 +131,8 @@ void generate_log_writer (void) { ...@@ -131,7 +131,8 @@ void generate_log_writer (void) {
fprintf(hf, ");\n"); fprintf(hf, ");\n");
fprintf(cf, ") {\n"); fprintf(cf, ") {\n");
fprintf(cf, " if (txn==0) return 0;\n"); fprintf(cf, " if (txn==0) return 0;\n");
fprintf(cf, " const unsigned int buflen= (1 // log command\n"); fprintf(cf, " const unsigned int buflen= (+4 // len at the beginning\n");
fprintf(cf, " +1 // log command\n");
fprintf(cf, " +8 // lsn\n"); fprintf(cf, " +8 // lsn\n");
DO_FIELDS(ft, lt, DO_FIELDS(ft, lt,
fprintf(cf, " +toku_logsizeof_%s(%s)\n", ft->type, ft->name)); fprintf(cf, " +toku_logsizeof_%s(%s)\n", ft->type, ft->name));
...@@ -141,6 +142,7 @@ void generate_log_writer (void) { ...@@ -141,6 +142,7 @@ void generate_log_writer (void) {
fprintf(cf, " char *buf = toku_malloc(buflen);\n"); fprintf(cf, " char *buf = toku_malloc(buflen);\n");
fprintf(cf, " if (buf==0) return errno;\n"); fprintf(cf, " if (buf==0) return errno;\n");
fprintf(cf, " wbuf_init(&wbuf, buf, buflen);\n"); fprintf(cf, " wbuf_init(&wbuf, buf, buflen);\n");
fprintf(cf, " wbuf_int(&wbuf, buflen);\n");
fprintf(cf, " wbuf_char(&wbuf, '%c');\n", lt->command); fprintf(cf, " wbuf_char(&wbuf, '%c');\n", lt->command);
fprintf(cf, " wbuf_LSN(&wbuf, txn->logger->lsn);\n"); fprintf(cf, " wbuf_LSN(&wbuf, txn->logger->lsn);\n");
fprintf(cf, " txn->last_lsn = txn->logger->lsn;\n"); fprintf(cf, " txn->last_lsn = txn->logger->lsn;\n");
...@@ -165,12 +167,11 @@ void generate_log_writer (void) { ...@@ -165,12 +167,11 @@ void generate_log_writer (void) {
void generate_log_reader (void) { void generate_log_reader (void) {
DO_LOGTYPES(lt, ({ DO_LOGTYPES(lt, ({
fprintf2(cf, hf, "int tokulog_fread_%s (FILE *infile, struct logtype_%s *data, char cmd)", lt->name, lt->name); fprintf(cf, "static int tokulog_fread_%s (FILE *infile, struct logtype_%s *data, u_int32_t crc)", lt->name, lt->name);
fprintf(hf, ";\n"); fprintf(hf, ";\n");
fprintf(cf, " {\n"); fprintf(cf, " {\n");
fprintf(cf, " int r=0;\n"); fprintf(cf, " int r=0;\n");
fprintf(cf, " u_int32_t actual_len=1;\n"); fprintf(cf, " u_int32_t actual_len=5; // 1 for the command, 4 for the first len.\n");
fprintf(cf, " u_int32_t crc = toku_crc32(0, &cmd, 1);\n");
fprintf(cf, " r=toku_fread_%-16s(infile, &data->%-16s, &crc, &actual_len); if (r!=0) return r;\n", "LSN", "lsn"); fprintf(cf, " r=toku_fread_%-16s(infile, &data->%-16s, &crc, &actual_len); if (r!=0) return r;\n", "LSN", "lsn");
DO_FIELDS(ft, lt, DO_FIELDS(ft, lt,
fprintf(cf, " r=toku_fread_%-16s(infile, &data->%-16s, &crc, &actual_len); if (r!=0) return r;\n", ft->type, ft->name)); fprintf(cf, " r=toku_fread_%-16s(infile, &data->%-16s, &crc, &actual_len); if (r!=0) return r;\n", ft->type, ft->name));
...@@ -184,13 +185,18 @@ void generate_log_reader (void) { ...@@ -184,13 +185,18 @@ void generate_log_reader (void) {
fprintf2(cf, hf, "int tokulog_fread (FILE *infile, struct log_entry *le)"); fprintf2(cf, hf, "int tokulog_fread (FILE *infile, struct log_entry *le)");
fprintf(hf, ";\n"); fprintf(hf, ";\n");
fprintf(cf, " {\n"); fprintf(cf, " {\n");
fprintf(cf, " u_int32_t len1; int r;\n");
fprintf(cf, " u_int32_t crc=0,ignorelen=0;\n");
fprintf(cf, " r = toku_fread_u_int32_t(infile, &len1,&crc,&ignorelen); if (r!=0) return r;\n");
fprintf(cf, " int cmd=fgetc(infile);\n"); fprintf(cf, " int cmd=fgetc(infile);\n");
fprintf(cf, " if (cmd==EOF) return EOF;\n"); fprintf(cf, " if (cmd==EOF) return EOF;\n");
fprintf(cf, " char cmdchar = cmd;\n");
fprintf(cf, " crc = toku_crc32(crc, &cmdchar, 1);\n");
fprintf(cf, " le->cmd=cmd;\n"); fprintf(cf, " le->cmd=cmd;\n");
fprintf(cf, " switch ((enum lt_cmd)cmd) {\n"); fprintf(cf, " switch ((enum lt_cmd)cmd) {\n");
DO_LOGTYPES(lt, ({ DO_LOGTYPES(lt, ({
fprintf(cf, " case LT_%s:\n", lt->name); fprintf(cf, " case LT_%s:\n", lt->name);
fprintf(cf, " return tokulog_fread_%s (infile, &le->u.%s, cmd);\n", lt->name, lt->name); fprintf(cf, " return tokulog_fread_%s (infile, &le->u.%s, crc);\n", lt->name, lt->name);
})); }));
fprintf(cf, " };\n"); fprintf(cf, " };\n");
fprintf(cf, " return DB_BADFORMAT;\n"); // Should read past the record using the len field. fprintf(cf, " return DB_BADFORMAT;\n"); // Should read past the record using the len field.
...@@ -202,11 +208,15 @@ void generate_logprint (void) { ...@@ -202,11 +208,15 @@ void generate_logprint (void) {
fprintf(hf, ";\n"); fprintf(hf, ";\n");
fprintf(cf, " {\n"); fprintf(cf, " {\n");
fprintf(cf, " int cmd, r;\n"); fprintf(cf, " int cmd, r;\n");
fprintf(cf, " u_int32_t len1, crc_in_file;\n");
fprintf(cf, " u_int32_t crc = 0, ignorelen=0;\n");
fprintf(cf, " r=toku_fread_u_int32_t(f, &len1, &crc, &ignorelen);\n");
fprintf(cf, " if (r==EOF) return EOF;\n");
fprintf(cf, " cmd=fgetc(f);\n"); fprintf(cf, " cmd=fgetc(f);\n");
fprintf(cf, " if (cmd==EOF) return EOF;\n"); fprintf(cf, " if (cmd==EOF) return DB_BADFORMAT;\n");
fprintf(cf, " u_int32_t len_in_file, len=1;\n"); fprintf(cf, " u_int32_t len_in_file, len=1+4; // cmd + len1\n");
fprintf(cf, " char charcmd = cmd;\n"); fprintf(cf, " char charcmd = cmd;\n");
fprintf(cf, " u_int32_t crc_in_file, crc = toku_crc32(0, &charcmd, 1);\n"); fprintf(cf, " crc = toku_crc32(crc, &charcmd, 1);\n");
fprintf(cf, " switch ((enum lt_cmd)cmd) {\n"); fprintf(cf, " switch ((enum lt_cmd)cmd) {\n");
DO_LOGTYPES(lt, ({ DO_LOGTYPES(lt, ({
fprintf(cf, " case LT_%s: \n", lt->name); fprintf(cf, " case LT_%s: \n", lt->name);
...@@ -216,8 +226,8 @@ void generate_logprint (void) { ...@@ -216,8 +226,8 @@ void generate_logprint (void) {
DO_FIELDS(ft, lt, DO_FIELDS(ft, lt,
fprintf(cf, " r = toku_logprint_%-16s(outf, f, \"%s\", &crc, &len); if (r!=0) return r;\n", ft->type, ft->name)); fprintf(cf, " r = toku_logprint_%-16s(outf, f, \"%s\", &crc, &len); if (r!=0) return r;\n", ft->type, ft->name));
fprintf(cf, " r = toku_fread_u_int32_t_nocrclen (f, &crc_in_file); len+=4; if (r!=0) return r;\n"); fprintf(cf, " r = toku_fread_u_int32_t_nocrclen (f, &crc_in_file); len+=4; if (r!=0) return r;\n");
fprintf(cf, " fprintf(outf, \" crc=%%d\", crc_in_file);\n"); fprintf(cf, " fprintf(outf, \" crc=%%08x\", crc_in_file);\n");
fprintf(cf, " if (crc_in_file!=crc) fprintf(outf, \" actual_crc=%%d\", crc);\n"); fprintf(cf, " if (crc_in_file!=crc) fprintf(outf, \" actual_crc=%%08x\", crc);\n");
fprintf(cf, " r = toku_fread_u_int32_t_nocrclen (f, &len_in_file); len+=4; if (r!=0) return r;\n"); fprintf(cf, " r = toku_fread_u_int32_t_nocrclen (f, &len_in_file); len+=4; if (r!=0) return r;\n");
fprintf(cf, " fprintf(outf, \" len=%%d\", len_in_file);\n"); fprintf(cf, " fprintf(outf, \" len=%%d\", len_in_file);\n");
fprintf(cf, " if (len_in_file!=len) fprintf(outf, \" actual_len=%%d\", len);\n"); fprintf(cf, " if (len_in_file!=len) fprintf(outf, \" actual_len=%%d\", len);\n");
......
...@@ -62,7 +62,11 @@ static void toku_recover_fheader (struct logtype_fheader *c) { ...@@ -62,7 +62,11 @@ static void toku_recover_fheader (struct logtype_fheader *c) {
h->freelist = c->header.freelist; h->freelist = c->header.freelist;
h->unused_memory = c->header.unused_memory; h->unused_memory = c->header.unused_memory;
h->n_named_roots = c->header.n_named_roots; h->n_named_roots = c->header.n_named_roots;
h->unnamed_root = c->header.root; if ((signed)c->header.n_named_roots==-1) {
h->unnamed_root = c->header.u.one.root;
} else {
assert(0);
}
toku_cachetable_put(cf, 0, h, 0, brtheader_flush_callback, brtheader_fetch_callback, 0); toku_cachetable_put(cf, 0, h, 0, brtheader_flush_callback, brtheader_fetch_callback, 0);
} }
...@@ -73,7 +77,7 @@ static void toku_recover_newbrtnode (struct logtype_newbrtnode *c) { ...@@ -73,7 +77,7 @@ static void toku_recover_newbrtnode (struct logtype_newbrtnode *c) {
TAGMALLOC(BRTNODE, n); TAGMALLOC(BRTNODE, n);
n->nodesize = c->nodesize; n->nodesize = c->nodesize;
n->thisnodename = c->diskoff; n->thisnodename = c->diskoff;
n->lsn = c->lsn; n->log_lsn = n->disk_lsn = c->lsn;
n->layout_version = 0; n->layout_version = 0;
n->parent_brtnode = 0; n->parent_brtnode = 0;
n->height = c->height; n->height = c->height;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment