Commit 4c5e9d1a authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1670 Standardized descriptor (de)serialization.

Added versioning to descriptor.
Changing a descriptor REQUIRES the version to increase.
Version 0 is reserved for a non-descriptor db. (cannot be set).
Not yet added to brtnodes.
Still possible to change descriptor with an open brt (which would not be upgraded).


git-svn-id: file:///svn/toku/tokudb@11198 c7de825b-a66e-492c-adef-691d508d4ae1
parent b3c8329d
...@@ -193,6 +193,9 @@ struct __toku_dbt { ...@@ -193,6 +193,9 @@ struct __toku_dbt {
u_int32_t flags; /* 32-bit offset=20 size=4, 64=bit offset=24 size=4 */ u_int32_t flags; /* 32-bit offset=20 size=4, 64=bit offset=24 size=4 */
/* 4 more bytes of alignment in the 64-bit case. */ /* 4 more bytes of alignment in the 64-bit case. */
}; };
typedef int (*toku_dbt_upgradef)(DB*,
u_int32_t old_version, const DBT *old_descriptor, const DBT *old_key, const DBT *old_val,
u_int32_t new_version, const DBT *new_descriptor, const DBT *new_key, const DBT *new_val);
struct __toku_db { struct __toku_db {
struct __toku_db_internal *i; struct __toku_db_internal *i;
#define db_struct_i(x) ((x)->i) #define db_struct_i(x) ((x)->i)
...@@ -207,7 +210,7 @@ struct __toku_db { ...@@ -207,7 +210,7 @@ struct __toku_db {
int (*delboth) (DB*, DB_TXN*, DBT*, DBT*, u_int32_t) /* Delete the key/value pair. */; int (*delboth) (DB*, DB_TXN*, DBT*, DBT*, u_int32_t) /* Delete the key/value pair. */;
int (*row_size_supported) (DB*, u_int32_t) /* Test whether a row size is supported. */; int (*row_size_supported) (DB*, u_int32_t) /* Test whether a row size is supported. */;
const DBT *descriptor /* saved row/dictionary descriptor for aiding in comparisons */; const DBT *descriptor /* saved row/dictionary descriptor for aiding in comparisons */;
int (*set_descriptor) (DB*, const DBT*) /* set row/dictionary descriptor for a db. Available only while db is open */; int (*set_descriptor) (DB*, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) /* set row/dictionary descriptor for a db. Available only while db is open */;
void* __toku_dummy0[22]; void* __toku_dummy0[22];
char __toku_dummy1[96]; char __toku_dummy1[96];
void *api_internal; /* 32-bit offset=236 size=4, 64=bit offset=376 size=8 */ void *api_internal; /* 32-bit offset=236 size=4, 64=bit offset=376 size=8 */
......
...@@ -203,6 +203,9 @@ struct __toku_dbt { ...@@ -203,6 +203,9 @@ struct __toku_dbt {
u_int32_t flags; /* 32-bit offset=20 size=4, 64=bit offset=24 size=4 */ u_int32_t flags; /* 32-bit offset=20 size=4, 64=bit offset=24 size=4 */
/* 4 more bytes of alignment in the 64-bit case. */ /* 4 more bytes of alignment in the 64-bit case. */
}; };
typedef int (*toku_dbt_upgradef)(DB*,
u_int32_t old_version, const DBT *old_descriptor, const DBT *old_key, const DBT *old_val,
u_int32_t new_version, const DBT *new_descriptor, const DBT *new_key, const DBT *new_val);
struct __toku_db { struct __toku_db {
struct __toku_db_internal *i; struct __toku_db_internal *i;
#define db_struct_i(x) ((x)->i) #define db_struct_i(x) ((x)->i)
...@@ -217,7 +220,7 @@ struct __toku_db { ...@@ -217,7 +220,7 @@ struct __toku_db {
int (*delboth) (DB*, DB_TXN*, DBT*, DBT*, u_int32_t) /* Delete the key/value pair. */; int (*delboth) (DB*, DB_TXN*, DBT*, DBT*, u_int32_t) /* Delete the key/value pair. */;
int (*row_size_supported) (DB*, u_int32_t) /* Test whether a row size is supported. */; int (*row_size_supported) (DB*, u_int32_t) /* Test whether a row size is supported. */;
const DBT *descriptor /* saved row/dictionary descriptor for aiding in comparisons */; const DBT *descriptor /* saved row/dictionary descriptor for aiding in comparisons */;
int (*set_descriptor) (DB*, const DBT*) /* set row/dictionary descriptor for a db. Available only while db is open */; int (*set_descriptor) (DB*, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) /* set row/dictionary descriptor for a db. Available only while db is open */;
void* __toku_dummy0[25]; void* __toku_dummy0[25];
char __toku_dummy1[96]; char __toku_dummy1[96];
void *api_internal; /* 32-bit offset=248 size=4, 64=bit offset=400 size=8 */ void *api_internal; /* 32-bit offset=248 size=4, 64=bit offset=400 size=8 */
......
...@@ -206,6 +206,9 @@ struct __toku_dbt { ...@@ -206,6 +206,9 @@ struct __toku_dbt {
u_int32_t flags; /* 32-bit offset=20 size=4, 64=bit offset=24 size=4 */ u_int32_t flags; /* 32-bit offset=20 size=4, 64=bit offset=24 size=4 */
/* 4 more bytes of alignment in the 64-bit case. */ /* 4 more bytes of alignment in the 64-bit case. */
}; };
typedef int (*toku_dbt_upgradef)(DB*,
u_int32_t old_version, const DBT *old_descriptor, const DBT *old_key, const DBT *old_val,
u_int32_t new_version, const DBT *new_descriptor, const DBT *new_key, const DBT *new_val);
struct __toku_db { struct __toku_db {
struct __toku_db_internal *i; struct __toku_db_internal *i;
#define db_struct_i(x) ((x)->i) #define db_struct_i(x) ((x)->i)
...@@ -220,7 +223,7 @@ struct __toku_db { ...@@ -220,7 +223,7 @@ struct __toku_db {
int (*delboth) (DB*, DB_TXN*, DBT*, DBT*, u_int32_t) /* Delete the key/value pair. */; int (*delboth) (DB*, DB_TXN*, DBT*, DBT*, u_int32_t) /* Delete the key/value pair. */;
int (*row_size_supported) (DB*, u_int32_t) /* Test whether a row size is supported. */; int (*row_size_supported) (DB*, u_int32_t) /* Test whether a row size is supported. */;
const DBT *descriptor /* saved row/dictionary descriptor for aiding in comparisons */; const DBT *descriptor /* saved row/dictionary descriptor for aiding in comparisons */;
int (*set_descriptor) (DB*, const DBT*) /* set row/dictionary descriptor for a db. Available only while db is open */; int (*set_descriptor) (DB*, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) /* set row/dictionary descriptor for a db. Available only while db is open */;
void* __toku_dummy0[27]; void* __toku_dummy0[27];
char __toku_dummy1[96]; char __toku_dummy1[96];
void *api_internal; /* 32-bit offset=256 size=4, 64=bit offset=416 size=8 */ void *api_internal; /* 32-bit offset=256 size=4, 64=bit offset=416 size=8 */
......
...@@ -206,6 +206,9 @@ struct __toku_dbt { ...@@ -206,6 +206,9 @@ struct __toku_dbt {
u_int32_t flags; /* 32-bit offset=24 size=4, 64=bit offset=32 size=4 */ u_int32_t flags; /* 32-bit offset=24 size=4, 64=bit offset=32 size=4 */
/* 4 more bytes of alignment in the 64-bit case. */ /* 4 more bytes of alignment in the 64-bit case. */
}; };
typedef int (*toku_dbt_upgradef)(DB*,
u_int32_t old_version, const DBT *old_descriptor, const DBT *old_key, const DBT *old_val,
u_int32_t new_version, const DBT *new_descriptor, const DBT *new_key, const DBT *new_val);
struct __toku_db { struct __toku_db {
struct __toku_db_internal *i; struct __toku_db_internal *i;
#define db_struct_i(x) ((x)->i) #define db_struct_i(x) ((x)->i)
...@@ -220,7 +223,7 @@ struct __toku_db { ...@@ -220,7 +223,7 @@ struct __toku_db {
int (*delboth) (DB*, DB_TXN*, DBT*, DBT*, u_int32_t) /* Delete the key/value pair. */; int (*delboth) (DB*, DB_TXN*, DBT*, DBT*, u_int32_t) /* Delete the key/value pair. */;
int (*row_size_supported) (DB*, u_int32_t) /* Test whether a row size is supported. */; int (*row_size_supported) (DB*, u_int32_t) /* Test whether a row size is supported. */;
const DBT *descriptor /* saved row/dictionary descriptor for aiding in comparisons */; const DBT *descriptor /* saved row/dictionary descriptor for aiding in comparisons */;
int (*set_descriptor) (DB*, const DBT*) /* set row/dictionary descriptor for a db. Available only while db is open */; int (*set_descriptor) (DB*, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) /* set row/dictionary descriptor for a db. Available only while db is open */;
void* __toku_dummy0[30]; void* __toku_dummy0[30];
char __toku_dummy1[96]; char __toku_dummy1[96];
void *api_internal; /* 32-bit offset=268 size=4, 64=bit offset=440 size=8 */ void *api_internal; /* 32-bit offset=268 size=4, 64=bit offset=440 size=8 */
......
...@@ -209,6 +209,9 @@ struct __toku_dbt { ...@@ -209,6 +209,9 @@ struct __toku_dbt {
u_int32_t flags; /* 32-bit offset=24 size=4, 64=bit offset=32 size=4 */ u_int32_t flags; /* 32-bit offset=24 size=4, 64=bit offset=32 size=4 */
/* 4 more bytes of alignment in the 64-bit case. */ /* 4 more bytes of alignment in the 64-bit case. */
}; };
typedef int (*toku_dbt_upgradef)(DB*,
u_int32_t old_version, const DBT *old_descriptor, const DBT *old_key, const DBT *old_val,
u_int32_t new_version, const DBT *new_descriptor, const DBT *new_key, const DBT *new_val);
struct __toku_db { struct __toku_db {
struct __toku_db_internal *i; struct __toku_db_internal *i;
#define db_struct_i(x) ((x)->i) #define db_struct_i(x) ((x)->i)
...@@ -224,7 +227,7 @@ struct __toku_db { ...@@ -224,7 +227,7 @@ struct __toku_db {
int (*delboth) (DB*, DB_TXN*, DBT*, DBT*, u_int32_t) /* Delete the key/value pair. */; int (*delboth) (DB*, DB_TXN*, DBT*, DBT*, u_int32_t) /* Delete the key/value pair. */;
int (*row_size_supported) (DB*, u_int32_t) /* Test whether a row size is supported. */; int (*row_size_supported) (DB*, u_int32_t) /* Test whether a row size is supported. */;
const DBT *descriptor /* saved row/dictionary descriptor for aiding in comparisons */; const DBT *descriptor /* saved row/dictionary descriptor for aiding in comparisons */;
int (*set_descriptor) (DB*, const DBT*) /* set row/dictionary descriptor for a db. Available only while db is open */; int (*set_descriptor) (DB*, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) /* set row/dictionary descriptor for a db. Available only while db is open */;
void* __toku_dummy1[34]; void* __toku_dummy1[34];
char __toku_dummy2[80]; char __toku_dummy2[80];
void *api_internal; /* 32-bit offset=276 size=4, 64=bit offset=464 size=8 */ void *api_internal; /* 32-bit offset=276 size=4, 64=bit offset=464 size=8 */
......
...@@ -366,6 +366,10 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un ...@@ -366,6 +366,10 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
assert(sizeof(dbt_fields32)==sizeof(dbt_fields64)); assert(sizeof(dbt_fields32)==sizeof(dbt_fields64));
print_struct("dbt", 0, dbt_fields32, dbt_fields64, sizeof(dbt_fields32)/sizeof(dbt_fields32[0]), 0); print_struct("dbt", 0, dbt_fields32, dbt_fields64, sizeof(dbt_fields32)/sizeof(dbt_fields32[0]), 0);
printf("typedef int (*toku_dbt_upgradef)(DB*,\n");
printf(" u_int32_t old_version, const DBT *old_descriptor, const DBT *old_key, const DBT *old_val,\n");
printf(" u_int32_t new_version, const DBT *new_descriptor, const DBT *new_key, const DBT *new_val);\n");
assert(sizeof(db_fields32)==sizeof(db_fields64)); assert(sizeof(db_fields32)==sizeof(db_fields64));
{ {
const char *extra[]={"int (*key_range64)(DB*, DB_TXN *, DBT *, u_int64_t *less, u_int64_t *equal, u_int64_t *greater, int *is_exact)", const char *extra[]={"int (*key_range64)(DB*, DB_TXN *, DBT *, u_int64_t *less, u_int64_t *equal, u_int64_t *greater, int *is_exact)",
...@@ -377,7 +381,7 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un ...@@ -377,7 +381,7 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
"int (*delboth) (DB*, DB_TXN*, DBT*, DBT*, u_int32_t) /* Delete the key/value pair. */", "int (*delboth) (DB*, DB_TXN*, DBT*, DBT*, u_int32_t) /* Delete the key/value pair. */",
"int (*row_size_supported) (DB*, u_int32_t) /* Test whether a row size is supported. */", "int (*row_size_supported) (DB*, u_int32_t) /* Test whether a row size is supported. */",
"const DBT *descriptor /* saved row/dictionary descriptor for aiding in comparisons */", "const DBT *descriptor /* saved row/dictionary descriptor for aiding in comparisons */",
"int (*set_descriptor) (DB*, const DBT*) /* set row/dictionary descriptor for a db. Available only while db is open */", "int (*set_descriptor) (DB*, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) /* set row/dictionary descriptor for a db. Available only while db is open */",
NULL}; NULL};
print_struct("db", 1, db_fields32, db_fields64, sizeof(db_fields32)/sizeof(db_fields32[0]), extra); print_struct("db", 1, db_fields32, db_fields64, sizeof(db_fields32)/sizeof(db_fields32[0]), extra);
} }
......
...@@ -177,6 +177,9 @@ struct __toku_dbt { ...@@ -177,6 +177,9 @@ struct __toku_dbt {
u_int32_t ulen; u_int32_t ulen;
u_int32_t flags; u_int32_t flags;
}; };
typedef int (*toku_dbt_upgradef)(DB*,
u_int32_t old_version, const DBT *old_descriptor, const DBT *old_key, const DBT *old_val,
u_int32_t new_version, const DBT *new_descriptor, const DBT *new_key, const DBT *new_val);
struct __toku_db { struct __toku_db {
struct __toku_db_internal *i; struct __toku_db_internal *i;
#define db_struct_i(x) ((x)->i) #define db_struct_i(x) ((x)->i)
...@@ -191,7 +194,7 @@ struct __toku_db { ...@@ -191,7 +194,7 @@ struct __toku_db {
int (*delboth) (DB*, DB_TXN*, DBT*, DBT*, u_int32_t) /* Delete the key/value pair. */; int (*delboth) (DB*, DB_TXN*, DBT*, DBT*, u_int32_t) /* Delete the key/value pair. */;
int (*row_size_supported) (DB*, u_int32_t) /* Test whether a row size is supported. */; int (*row_size_supported) (DB*, u_int32_t) /* Test whether a row size is supported. */;
const DBT *descriptor /* saved row/dictionary descriptor for aiding in comparisons */; const DBT *descriptor /* saved row/dictionary descriptor for aiding in comparisons */;
int (*set_descriptor) (DB*, const DBT*) /* set row/dictionary descriptor for a db. Available only while db is open */; int (*set_descriptor) (DB*, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) /* set row/dictionary descriptor for a db. Available only while db is open */;
void *api_internal; void *api_internal;
int (*close) (DB*, u_int32_t); int (*close) (DB*, u_int32_t);
int (*cursor) (DB *, DB_TXN *, DBC **, u_int32_t); int (*cursor) (DB *, DB_TXN *, DBC **, u_int32_t);
......
...@@ -177,6 +177,9 @@ struct __toku_dbt { ...@@ -177,6 +177,9 @@ struct __toku_dbt {
u_int32_t ulen; u_int32_t ulen;
u_int32_t flags; u_int32_t flags;
}; };
typedef int (*toku_dbt_upgradef)(DB*,
u_int32_t old_version, const DBT *old_descriptor, const DBT *old_key, const DBT *old_val,
u_int32_t new_version, const DBT *new_descriptor, const DBT *new_key, const DBT *new_val);
struct __toku_db { struct __toku_db {
struct __toku_db_internal *i; struct __toku_db_internal *i;
#define db_struct_i(x) ((x)->i) #define db_struct_i(x) ((x)->i)
...@@ -191,7 +194,7 @@ struct __toku_db { ...@@ -191,7 +194,7 @@ struct __toku_db {
int (*delboth) (DB*, DB_TXN*, DBT*, DBT*, u_int32_t) /* Delete the key/value pair. */; int (*delboth) (DB*, DB_TXN*, DBT*, DBT*, u_int32_t) /* Delete the key/value pair. */;
int (*row_size_supported) (DB*, u_int32_t) /* Test whether a row size is supported. */; int (*row_size_supported) (DB*, u_int32_t) /* Test whether a row size is supported. */;
const DBT *descriptor /* saved row/dictionary descriptor for aiding in comparisons */; const DBT *descriptor /* saved row/dictionary descriptor for aiding in comparisons */;
int (*set_descriptor) (DB*, const DBT*) /* set row/dictionary descriptor for a db. Available only while db is open */; int (*set_descriptor) (DB*, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) /* set row/dictionary descriptor for a db. Available only while db is open */;
void *api_internal; void *api_internal;
int (*close) (DB*, u_int32_t); int (*close) (DB*, u_int32_t);
int (*cursor) (DB *, DB_TXN *, DBC **, u_int32_t); int (*cursor) (DB *, DB_TXN *, DBC **, u_int32_t);
......
...@@ -168,7 +168,7 @@ struct brt_header { ...@@ -168,7 +168,7 @@ struct brt_header {
BLOCKNUM root; // roots of the dictionary BLOCKNUM root; // roots of the dictionary
struct remembered_hash root_hash; // hash of the root offset. struct remembered_hash root_hash; // hash of the root offset.
unsigned int flags; unsigned int flags;
DBT descriptor; struct descriptor descriptor;
u_int64_t root_put_counter; // the generation number of the brt u_int64_t root_put_counter; // the generation number of the brt
...@@ -187,7 +187,8 @@ struct brt { ...@@ -187,7 +187,8 @@ struct brt {
unsigned int flags; unsigned int flags;
unsigned int did_set_flags; unsigned int did_set_flags;
unsigned int did_set_descriptor; unsigned int did_set_descriptor;
DBT temp_descriptor; struct descriptor temp_descriptor;
toku_dbt_upgradef dbt_userformat_upgrade;
int (*compare_fun)(DB*,const DBT*,const DBT*); int (*compare_fun)(DB*,const DBT*,const DBT*);
int (*dup_compare)(DB*,const DBT*,const DBT*); int (*dup_compare)(DB*,const DBT*,const DBT*);
DB *db; // To pass to the compare fun, and close once transactions are done. DB *db; // To pass to the compare fun, and close once transactions are done.
...@@ -215,7 +216,7 @@ int toku_serialize_brt_header_size (struct brt_header *h); ...@@ -215,7 +216,7 @@ int toku_serialize_brt_header_size (struct brt_header *h);
int toku_serialize_brt_header_to (int fd, struct brt_header *h); int toku_serialize_brt_header_to (int fd, struct brt_header *h);
int toku_serialize_brt_header_to_wbuf (struct wbuf *, struct brt_header *h, int64_t address_translation, int64_t size_translation); int toku_serialize_brt_header_to_wbuf (struct wbuf *, struct brt_header *h, int64_t address_translation, int64_t size_translation);
int toku_deserialize_brtheader_from (int fd, struct brt_header **brth); int toku_deserialize_brtheader_from (int fd, struct brt_header **brth);
int toku_serialize_descriptor_contents_to_fd(int fd, DBT *desc, DISKOFF offset); int toku_serialize_descriptor_contents_to_fd(int fd, struct descriptor *desc, DISKOFF offset);
void toku_brtnode_free (BRTNODE *node); void toku_brtnode_free (BRTNODE *node);
......
...@@ -1062,19 +1062,38 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) { ...@@ -1062,19 +1062,38 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
return rr; return rr;
} }
u_int32_t
toku_serialize_descriptor_size(struct descriptor *desc) {
//Checksum NOT included in this. Checksum only exists in header's version.
u_int32_t size = 4+ //version
4; //size
size += desc->dbt.size;
return size;
}
static void
serialize_descriptor_contents_to_wbuf(struct wbuf *wb, struct descriptor *desc) {
if (desc->version==0) assert(desc->dbt.size==0);
wbuf_int(wb, desc->version);
wbuf_bytes(wb, desc->dbt.data, desc->dbt.size);
}
//Descriptor is written to disk during toku_brt_open iff we have a new (or changed) //Descriptor is written to disk during toku_brt_open iff we have a new (or changed)
//descriptor. //descriptor.
//Descriptors are NOT written during the header checkpoint process. //Descriptors are NOT written during the header checkpoint process.
int int
toku_serialize_descriptor_contents_to_fd(int fd, DBT *desc, DISKOFF offset) { toku_serialize_descriptor_contents_to_fd(int fd, struct descriptor *desc, DISKOFF offset) {
int r; int r;
// make the checksum // make the checksum
int64_t size = desc->size+4; //4 for checksum int64_t size = toku_serialize_descriptor_size(desc)+4; //4 for checksum
struct wbuf w; struct wbuf w;
wbuf_init(&w, toku_xmalloc(size), size); wbuf_init(&w, toku_xmalloc(size), size);
wbuf_literal_bytes(&w, desc->data, desc->size); serialize_descriptor_contents_to_wbuf(&w, desc);
u_int32_t checksum = x1764_finish(&w.checksum); {
wbuf_int(&w, checksum); //Add checksum
u_int32_t checksum = x1764_finish(&w.checksum);
wbuf_int(&w, checksum);
}
assert(w.ndone==w.size); assert(w.ndone==w.size);
{ {
lock_for_pwrite(); lock_for_pwrite();
...@@ -1089,7 +1108,25 @@ toku_serialize_descriptor_contents_to_fd(int fd, DBT *desc, DISKOFF offset) { ...@@ -1089,7 +1108,25 @@ toku_serialize_descriptor_contents_to_fd(int fd, DBT *desc, DISKOFF offset) {
} }
static void static void
deserialize_descriptor_from(int fd, struct brt_header *h, DBT *desc) { deserialize_descriptor_from_rbuf(struct rbuf *rb, struct descriptor *desc) {
desc->version = rbuf_int(rb);
u_int32_t size;
bytevec data;
rbuf_bytes(rb, &data, &size);
bytevec data_copy;
if (size>0)
data_copy = toku_memdup(data, size); //Cannot keep the reference from rbuf. Must copy.
else {
assert(size==0);
data_copy = NULL;
}
assert(data_copy);
toku_fill_dbt(&desc->dbt, data_copy, size);
if (desc->version==0) assert(desc->dbt.size==0);
}
static void
deserialize_descriptor_from(int fd, struct brt_header *h, struct descriptor *desc) {
DISKOFF offset; DISKOFF offset;
DISKOFF size; DISKOFF size;
toku_get_descriptor_offset_size(h->blocktable, &offset, &size); toku_get_descriptor_offset_size(h->blocktable, &offset, &size);
...@@ -1111,8 +1148,12 @@ deserialize_descriptor_from(int fd, struct brt_header *h, DBT *desc) { ...@@ -1111,8 +1148,12 @@ deserialize_descriptor_from(int fd, struct brt_header *h, DBT *desc) {
u_int32_t stored_x1764 = toku_dtoh32(*(int*)(dbuf + size-4)); u_int32_t stored_x1764 = toku_dtoh32(*(int*)(dbuf + size-4));
assert(x1764 == stored_x1764); assert(x1764 == stored_x1764);
} }
desc->size = size-4; {
desc->data = dbuf; //Uses 4 extra bytes, but fast. struct rbuf rb = {.buf = dbuf, .size = size, .ndone = 0};
deserialize_descriptor_from_rbuf(&rb, desc);
}
assert(toku_serialize_descriptor_size(desc)+4 == size);
toku_free(dbuf);
} }
} }
} }
......
...@@ -603,7 +603,7 @@ brtheader_destroy(struct brt_header *h) { ...@@ -603,7 +603,7 @@ brtheader_destroy(struct brt_header *h) {
else { else {
assert(h->type == BRTHEADER_CURRENT); assert(h->type == BRTHEADER_CURRENT);
toku_blocktable_destroy(&h->blocktable); toku_blocktable_destroy(&h->blocktable);
if (h->descriptor.data) toku_free(h->descriptor.data); if (h->descriptor.dbt.data) toku_free(h->descriptor.dbt.data);
} }
} }
...@@ -2993,21 +2993,28 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, int is_cre ...@@ -2993,21 +2993,28 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, int is_cre
} }
assert(t->h); assert(t->h);
if (t->did_set_descriptor) { if (t->did_set_descriptor) {
if (t->h->descriptor.size!=t->temp_descriptor.size || if (t->h->descriptor.dbt.size!=t->temp_descriptor.dbt.size ||
memcmp(t->h->descriptor.data, t->temp_descriptor.data, t->temp_descriptor.size)) { memcmp(t->h->descriptor.dbt.data, t->temp_descriptor.dbt.data, t->temp_descriptor.dbt.size)) {
if (t->temp_descriptor.version <= t->h->descriptor.version) {
//Changing descriptor requires upping the version.
r = EINVAL;
goto died_after_read_and_pin;
}
//TODO: Disallow changing if exists two brts with the same header (counting this one)
// The upgrade would be impossible/very hard!
DISKOFF offset; DISKOFF offset;
//4 for checksum //4 for checksum
toku_realloc_descriptor_on_disk(t->h->blocktable, t->temp_descriptor.size+4, &offset, t->h); toku_realloc_descriptor_on_disk(t->h->blocktable, toku_serialize_descriptor_size(&t->temp_descriptor)+4, &offset, t->h);
r = toku_serialize_descriptor_contents_to_fd(toku_cachefile_fd(t->cf), &t->temp_descriptor, offset); r = toku_serialize_descriptor_contents_to_fd(toku_cachefile_fd(t->cf), &t->temp_descriptor, offset);
if (r!=0) goto died_after_read_and_pin; if (r!=0) goto died_after_read_and_pin;
if (t->h->descriptor.data) toku_free(t->h->descriptor.data); if (t->h->descriptor.dbt.data) toku_free(t->h->descriptor.dbt.data);
toku_fill_dbt(&t->h->descriptor, t->temp_descriptor.data, t->temp_descriptor.size); t->h->descriptor = t->temp_descriptor;
} }
else toku_free(t->temp_descriptor.data); else toku_free(t->temp_descriptor.dbt.data);
t->temp_descriptor.data = NULL; t->temp_descriptor.dbt.data = NULL;
t->did_set_descriptor = 0; t->did_set_descriptor = 0;
} }
if (t->db) t->db->descriptor = &t->h->descriptor; if (t->db) t->db->descriptor = &t->h->descriptor.dbt;
//Opening a brt may restore to previous checkpoint. Truncate if necessary. //Opening a brt may restore to previous checkpoint. Truncate if necessary.
toku_maybe_truncate_cachefile_on_open(t->h->blocktable, t->h); toku_maybe_truncate_cachefile_on_open(t->h->blocktable, t->h);
...@@ -3216,7 +3223,7 @@ int toku_close_brt (BRT brt, TOKULOGGER logger, char **error_string) { ...@@ -3216,7 +3223,7 @@ int toku_close_brt (BRT brt, TOKULOGGER logger, char **error_string) {
if (r==0 && error_string) assert(*error_string == 0); if (r==0 && error_string) assert(*error_string == 0);
} }
if (brt->fname) toku_free(brt->fname); if (brt->fname) toku_free(brt->fname);
if (brt->temp_descriptor.data) toku_free(brt->temp_descriptor.data); if (brt->temp_descriptor.dbt.data) toku_free(brt->temp_descriptor.dbt.data);
toku_free(brt); toku_free(brt);
return r; return r;
} }
...@@ -3240,15 +3247,20 @@ int toku_brt_create(BRT *brt_ptr) { ...@@ -3240,15 +3247,20 @@ int toku_brt_create(BRT *brt_ptr) {
} }
int int
toku_brt_set_descriptor (BRT t, const DBT *descriptor) { toku_brt_set_descriptor (BRT t, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) {
int r; int r;
if (t->did_set_descriptor) r = EINVAL; if (t->did_set_descriptor) r = EINVAL;
else if (version==0) r = EINVAL; //0 is reserved for default (no descriptor).
else if (dbt_userformat_upgrade==NULL) r = EINVAL; //Must have an upgrade function.
else { else {
void *copy = toku_memdup(descriptor->data, descriptor->size); void *copy = toku_memdup(descriptor->data, descriptor->size);
if (!copy) r = ENOMEM; if (!copy) r = ENOMEM;
else { else {
if (t->temp_descriptor.data) toku_free(t->temp_descriptor.data); t->temp_descriptor.version = version;
toku_fill_dbt(&t->temp_descriptor, copy, descriptor->size); assert(!t->temp_descriptor.dbt.data);
toku_fill_dbt(&t->temp_descriptor.dbt, copy, descriptor->size);
assert(!t->dbt_userformat_upgrade);
t->dbt_userformat_upgrade = dbt_userformat_upgrade;
t->did_set_descriptor = 1; t->did_set_descriptor = 1;
r = 0; r = 0;
} }
......
...@@ -28,9 +28,14 @@ typedef int(*BRT_GET_STRADDLE_CALLBACK_FUNCTION)(ITEMLEN, bytevec, ITEMLEN, byte ...@@ -28,9 +28,14 @@ typedef int(*BRT_GET_STRADDLE_CALLBACK_FUNCTION)(ITEMLEN, bytevec, ITEMLEN, byte
int toku_open_brt (const char *fname, int is_create, BRT *, int nodesize, CACHETABLE, TOKUTXN, int(*)(DB*,const DBT*,const DBT*), DB*); int toku_open_brt (const char *fname, int is_create, BRT *, int nodesize, CACHETABLE, TOKUTXN, int(*)(DB*,const DBT*,const DBT*), DB*);
struct descriptor {
u_int32_t version;
DBT dbt;
};
u_int32_t toku_serialize_descriptor_size(struct descriptor *desc);
int toku_brt_create(BRT *); int toku_brt_create(BRT *);
int toku_brt_set_flags(BRT, unsigned int flags); int toku_brt_set_flags(BRT, unsigned int flags);
int toku_brt_set_descriptor (BRT t, const DBT *descriptor); int toku_brt_set_descriptor (BRT t, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade);
int toku_brt_get_flags(BRT, unsigned int *flags); int toku_brt_get_flags(BRT, unsigned int *flags);
int toku_brt_set_nodesize(BRT, unsigned int nodesize); int toku_brt_set_nodesize(BRT, unsigned int nodesize);
int toku_brt_get_nodesize(BRT, unsigned int *nodesize); int toku_brt_get_nodesize(BRT, unsigned int *nodesize);
......
...@@ -58,6 +58,13 @@ verify_int_cmp (DB *dbp, const DBT *a, const DBT *b) { ...@@ -58,6 +58,13 @@ verify_int_cmp (DB *dbp, const DBT *a, const DBT *b) {
return r; return r;
} }
static int abort_on_upgrade(DB* UU(pdb),
u_int32_t UU(old_version), const DBT *UU(old_descriptor), const DBT *UU(old_key), const DBT *UU(old_val),
u_int32_t UU(new_version), const DBT *UU(new_descriptor), const DBT *UU(new_key), const DBT *UU(new_val)) {
assert(FALSE); //Must not upgrade.
return ENOSYS;
}
static void static void
open_db(int descriptor) { open_db(int descriptor) {
/* create the dup database file */ /* create the dup database file */
...@@ -74,7 +81,8 @@ open_db(int descriptor) { ...@@ -74,7 +81,8 @@ open_db(int descriptor) {
} }
if (descriptor >= 0) { if (descriptor >= 0) {
assert(descriptor < NUM); assert(descriptor < NUM);
r = db->set_descriptor(db, &descriptors[descriptor]); u_int32_t descriptor_version = 1;
r = db->set_descriptor(db, descriptor_version, &descriptors[descriptor], abort_on_upgrade);
CKERR(r); CKERR(r);
last_open_descriptor = descriptor; last_open_descriptor = descriptor;
} }
...@@ -101,6 +109,7 @@ delete_db(void) { ...@@ -101,6 +109,7 @@ delete_db(void) {
CKERR2(r, ENOENT); //Abort deleted it CKERR2(r, ENOENT); //Abort deleted it
} }
else CKERR(r); else CKERR(r);
last_open_descriptor = -1;
} }
static void static void
...@@ -185,12 +194,19 @@ runtest(void) { ...@@ -185,12 +194,19 @@ runtest(void) {
open_db(-1); open_db(-1);
test_insert(i); test_insert(i);
close_db(); close_db();
open_db(-1);
test_insert(i);
close_db();
delete_db();
open_db(order[i]); open_db(order[i]);
test_insert(i); test_insert(i);
close_db(); close_db();
open_db(order[i]);
test_insert(i);
close_db();
delete_db();
} }
delete_db();
env->close(env, 0); env->close(env, 0);
} }
......
...@@ -3282,13 +3282,13 @@ static int toku_db_set_dup_compare(DB *db, int (*dup_compare)(DB *, const DBT *, ...@@ -3282,13 +3282,13 @@ static int toku_db_set_dup_compare(DB *db, int (*dup_compare)(DB *, const DBT *,
return r; return r;
} }
static int toku_db_set_descriptor(DB *db, const DBT *descriptor) { static int toku_db_set_descriptor(DB *db, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) {
HANDLE_PANICKED_DB(db); HANDLE_PANICKED_DB(db);
int r; int r;
if (db_opened(db)) return EINVAL; if (db_opened(db)) return EINVAL;
else if (!descriptor) r = EINVAL; else if (!descriptor) r = EINVAL;
else if (descriptor->size>0 && !descriptor->data) r = EINVAL; else if (descriptor->size>0 && !descriptor->data) r = EINVAL;
else r = toku_brt_set_descriptor(db->i->brt, descriptor); else r = toku_brt_set_descriptor(db->i->brt, version, descriptor, dbt_userformat_upgrade);
return r; return r;
} }
...@@ -3573,8 +3573,11 @@ static int locked_db_set_dup_compare(DB * db, int (*dup_compare) (DB *, const DB ...@@ -3573,8 +3573,11 @@ static int locked_db_set_dup_compare(DB * db, int (*dup_compare) (DB *, const DB
toku_ydb_lock(); int r = toku_db_set_dup_compare(db, dup_compare); toku_ydb_unlock(); return r; toku_ydb_lock(); int r = toku_db_set_dup_compare(db, dup_compare); toku_ydb_unlock(); return r;
} }
static int locked_db_set_descriptor(DB *db, const DBT *descriptor) { static int locked_db_set_descriptor(DB *db, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) {
toku_ydb_lock(); int r = toku_db_set_descriptor(db, descriptor); toku_ydb_unlock(); return r; toku_ydb_lock();
int r = toku_db_set_descriptor(db, version, descriptor, dbt_userformat_upgrade);
toku_ydb_unlock();
return r;
} }
static void locked_db_set_errfile (DB *db, FILE *errfile) { static void locked_db_set_errfile (DB *db, FILE *errfile) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment