Commit 00415bd9 authored by Barry Perlman's avatar Barry Perlman Committed by Yoni Fogel

Addresses #1987, #1936 refs[t:1987] refs[t:1936] Insert broadcast commit on...

Addresses #1987, #1936 refs[t:1987] refs[t:1936] Insert broadcast commit on upgrade from version 10 to 11, record original version in new header field.  Also fixed incorrect message type for broadcast commit.

git-svn-id: file:///svn/toku/tokudb@14430 c7de825b-a66e-492c-adef-691d508d4ae1
parent 71a5cdc4
......@@ -399,6 +399,7 @@ upgrade_brtheader_10_11(struct brt_header **brth_10, struct brt_header ** brth_1
*brth_11 = *brth_10;
*brth_10 = NULL;
(*brth_11)->layout_version = BRT_LAYOUT_VERSION_11;
(*brth_11)->layout_version_original = BRT_LAYOUT_VERSION_10;
return 0;
}
......
......@@ -161,6 +161,9 @@ struct brt_header {
int panic; // If nonzero there was a write error. Don't write any more, because it probably only gets worse. This is the error code.
char *panic_string; // A malloced string that can indicate what went wrong.
int layout_version;
int layout_version_original; // different (<) from layout_version if upgraded from a previous version (useful for debugging)
int layout_version_read_from_disk; // transient, not serialized to disk
BOOL upgrade_brt_performed; // initially FALSE, set TRUE when brt has been fully updated (even though nodes may not have been)
unsigned int nodesize;
BLOCKNUM root; // roots of the dictionary
struct remembered_hash root_hash; // hash of the root offset.
......@@ -337,7 +340,7 @@ int toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **error_stri
int toku_brtheader_begin_checkpoint (CACHEFILE cachefile, LSN checkpoint_lsn, void *header_v);
int toku_brtheader_checkpoint (CACHEFILE cachefile, void *header_v);
int toku_brtheader_end_checkpoint (CACHEFILE cachefile, void *header_v);
int toku_maybe_upgrade_brt(BRT t);
int toku_db_badformat(void);
#endif
......@@ -1054,6 +1054,38 @@ cleanup:
}
int
toku_maybe_upgrade_brt(BRT t) { // possibly do some work to complete the version upgrade of brt
int r = 0;
int version = t->h->layout_version_read_from_disk;
if (!t->h->upgrade_brt_performed) {
switch (version) {
case BRT_LAYOUT_VERSION_10:
r = toku_brt_broadcast_commit_all(t);
//Fall through on purpose.
case BRT_LAYOUT_VERSION:
if (r == 0) {
t->h->upgrade_brt_performed = TRUE;
}
break;
default:
assert(FALSE);
}
}
if (r) {
if (t->h->panic==0) {
char *e = strerror(r);
int l = 200 + strlen(e);
char s[l];
t->h->panic=r;
snprintf(s, l-1, "While upgrading brt version, error %d (%s)", r, e);
t->h->panic_string = toku_strdup(s);
}
}
return r;
}
// ################
......@@ -1101,25 +1133,27 @@ void toku_verify_counts (BRTNODE node) {
static u_int32_t
serialize_brt_header_min_size (u_int32_t version) {
u_int32_t size;
u_int32_t size = 0;
switch(version) {
case BRT_LAYOUT_VERSION_10:
case BRT_LAYOUT_VERSION_11:
size = (+8 // "tokudata"
+4 // version
+4 // size
+8 // byte order verification
+8 // checkpoint_count
+8 // checkpoint_lsn
+4 // tree's nodesize
+8 // translation_size_on_disk
+8 // translation_address_on_disk
+4 // checksum
);
size+=(+8 // diskoff
+4 // flags
);
break;
size += 4; // original_version
// fall through to add up bytes in previous version
case BRT_LAYOUT_VERSION_10:
size += (+8 // "tokudata"
+4 // version
+4 // size
+8 // byte order verification
+8 // checkpoint_count
+8 // checkpoint_lsn
+4 // tree's nodesize
+8 // translation_size_on_disk
+8 // translation_address_on_disk
+4 // checksum
);
size+=(+8 // diskoff
+4 // flags
);
break;
default:
assert(FALSE);
}
......@@ -1129,7 +1163,7 @@ serialize_brt_header_min_size (u_int32_t version) {
int toku_serialize_brt_header_size (struct brt_header *h) {
u_int32_t size = serialize_brt_header_min_size(h->layout_version);
//Add any dynamic data.
//There is no dynamic data.
assert(size <= BLOCK_ALLOCATOR_HEADER_RESERVE);
return size;
}
......@@ -1149,10 +1183,11 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h,
wbuf_DISKOFF(wbuf, translation_location_on_disk);
wbuf_DISKOFF(wbuf, translation_size_on_disk);
wbuf_BLOCKNUM(wbuf, h->root);
wbuf_int (wbuf, h->flags);
wbuf_int(wbuf, h->flags);
wbuf_int(wbuf, h->layout_version_original);
u_int32_t checksum = x1764_finish(&wbuf->checksum);
wbuf_int(wbuf, checksum);
assert(wbuf->ndone<=wbuf->size);
assert(wbuf->ndone == wbuf->size);
return 0;
}
......@@ -1403,6 +1438,7 @@ deserialize_brtheader (int fd, struct rbuf *rb, struct brt_header **brth) {
h->root_hash.valid = FALSE;
h->flags = rbuf_int(&rc);
deserialize_descriptor_from(fd, h, &h->descriptor);
h->layout_version_original = rbuf_int(&rc);
(void)rbuf_int(&rc); //Read in checksum and ignore (already verified).
if (rc.ndone!=rc.size) {ret = EINVAL; goto died1;}
toku_free(rc.buf);
......@@ -1442,8 +1478,11 @@ deserialize_brtheader_versioned (int fd, struct rbuf *rb, struct brt_header **br
default:
assert(FALSE);
}
if (rval == 0)
if (rval == 0) {
assert((*brth)->layout_version == BRT_LAYOUT_VERSION);
(*brth)->layout_version_read_from_disk = version;
(*brth)->upgrade_brt_performed = FALSE;
}
return rval;
}
......
......@@ -2587,7 +2587,7 @@ toku_brt_broadcast_commit_all (BRT brt)
XIDS message_xids = xids_get_root_xids();
static DBT zero; //Want a zeroed DBT for key, val. Never changes so can be re-used.
BRT_MSG_S brtcmd = { BRT_INSERT, message_xids, .u.id={&zero,&zero}};
BRT_MSG_S brtcmd = { BRT_COMMIT_BROADCAST_ALL, message_xids, .u.id={&zero,&zero}};
r = toku_brt_root_put_cmd(brt, &brtcmd);
if (r!=0) return r;
return r;
......@@ -2883,6 +2883,8 @@ int toku_brt_alloc_init_header(BRT t) {
}
t->h->layout_version = BRT_LAYOUT_VERSION;
t->h->layout_version_original = BRT_LAYOUT_VERSION;
t->h->layout_version_read_from_disk = BRT_LAYOUT_VERSION; // fake, prevent unnecessary upgrade logic
memset(&t->h->descriptor, 0, sizeof(t->h->descriptor));
......@@ -3044,6 +3046,10 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, int is_cre
t->temp_descriptor.dbt.data = NULL;
t->did_set_descriptor = 0;
}
r = toku_maybe_upgrade_brt(t); // possibly do some work to complete the version upgrade of brt
if (r!=0) goto died_after_read_and_pin;
r = brtheader_note_brt_open(t);
if (r!=0) goto died_after_read_and_pin;
if (t->db) t->db->descriptor = &t->h->descriptor.dbt;
......
......@@ -29,8 +29,9 @@ dump_header (int f, struct brt_header **header) {
int r;
r = toku_deserialize_brtheader_from (f, &h); assert(r==0);
printf("brtheader:\n");
if (h->layout_version==BRT_LAYOUT_VERSION_6) printf(" layout_version<=6\n");
else printf(" layout_version=%d\n", h->layout_version);
printf(" layout_version=%d\n", h->layout_version);
printf(" layout_version_original=%d\n", h->layout_version_original);
printf(" layout_version_read_from_disk=%d\n", h->layout_version_read_from_disk);
printf(" dirty=%d\n", h->dirty);
printf(" nodesize=%u\n", h->nodesize);
printf(" unnamed_root=%" PRId64 "\n", h->root.b);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment