Commit 7b09e824 authored by Yoni Fogel

Addresses #1924 refs[t:1924] Initial version of upgrade (10->11) logic is complete

git-svn-id: file:///svn/toku/tokudb@13938 c7de825b-a66e-492c-adef-691d508d4ae1
parent 5aea325f
......@@ -3,6 +3,8 @@
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <limits.h>
#include "ule.h"
#include "fifo.h"
/*******************
* Purpose of this file is to provide backwards compatibility with earlier versions
* of the file format.
......@@ -15,11 +17,417 @@
*
*/
// Fixed per-message overhead (in bytes) of a version 10 buffered command,
// not counting the key/value payload.
enum {
    BRT_CMD_OVERHEAD_10 = (1     // one byte for the message type
                           + 8)  // eight bytes for the xid
};
// FIFO_10 (data structure changed, so we need to provide the old one)
// Fold one key/value pair into a running x1764 checksum.
// Each item contributes its length (4 bytes, after toku_htod32) followed by its bytes,
// key first and then value.
static void toku_calc_more_murmur_kvpair (struct x1764 *mm, const void *key, int keylen, const void *val, int vallen) {
    int len_word;

    len_word = toku_htod32(keylen);
    x1764_add(mm, (void*)&len_word, 4);
    x1764_add(mm, key, keylen);

    len_word = toku_htod32(vallen);
    x1764_add(mm, (void*)&len_word, 4);
    x1764_add(mm, val, vallen);
}
// Compute the x1764 fingerprint of a version 10 buffered command.
// The checksum covers, in order: the type (1 byte), the high and low 32-bit halves of
// the xid (each passed through toku_htod32), then the key/value pair.
static u_int32_t calc_fingerprint_cmd10 (u_int32_t type, TXNID xid, const void *key, u_int32_t keylen, const void *val, u_int32_t vallen) {
    struct x1764 checksum;
    unsigned char type_byte = (unsigned char)type;
    unsigned int xid_high = toku_htod32(xid>>32);
    unsigned int xid_low  = toku_htod32(xid&0xffffffff);

    x1764_init(&checksum);
    x1764_add(&checksum, &type_byte, 1);
    x1764_add(&checksum, &xid_high, 4);
    x1764_add(&checksum, &xid_low, 4);
    toku_calc_more_murmur_kvpair(&checksum, key, keylen, val, vallen);
    return x1764_finish(&checksum);
}
// Iterate over every entry of a version 10 fifo, oldest byte offset first.
// For each entry the macro declares, under the caller-supplied names:
//   keyvar/keylenvar   - pointer to and length of the key
//   datavar/datalenvar - pointer to and length of the value (stored right after the key)
//   typevar            - the entry's type byte, widened to int
//   xidvar             - the entry's TXNID
// and then executes `body`.  The pointers refer to the fifo's own memory (no copy is made).
// The macro also introduces the names `fifo10_iterate_off` and `e`, so `body` must not
// rely on outer variables with those names.
#define FIFO10_ITERATE(fifo10,keyvar,keylenvar,datavar,datalenvar,typevar,xidvar,body) do { \
int fifo10_iterate_off; \
for (fifo10_iterate_off = toku_fifo10_iterate_internal_start(fifo10); \
toku_fifo10_iterate_internal_has_more(fifo10, fifo10_iterate_off); \
fifo10_iterate_off = toku_fifo10_iterate_internal_next(fifo10, fifo10_iterate_off)) { \
struct fifo10_entry *e = toku_fifo10_iterate_internal_get_entry(fifo10, fifo10_iterate_off); \
bytevec keyvar = e->key; \
ITEMLEN keylenvar = e->keylen; \
bytevec datavar = e->key + e->keylen; \
ITEMLEN datalenvar = e->vallen; \
int typevar = e->type; \
TXNID xidvar = e->xid; \
body; \
} } while (0)
// One message as it is stored inside a version 10 fifo's byte buffer.
// The fixed header is followed by keylen bytes of key and then vallen bytes of value.
struct fifo10_entry {
unsigned int keylen; // number of key bytes at the start of key[]
unsigned int vallen; // number of value bytes that follow the key in key[]
unsigned char type; // message type (stored as a single byte)
TXNID xid; // transaction id attached to the message
unsigned char key[]; // key bytes immediately followed by the value bytes
};
// Version 10 fifo: entries are packed back to back in one byte buffer.
// The live entries occupy bytes [memory_start, memory_start+memory_used) of `memory`.
struct fifo {
int n_items_in_fifo; // number of entries enqueued so far
char *memory; // An array of bytes into which fifo10_entries are embedded.
int memory_size; // How big is fifo10_memory
int memory_start; // Where is the first used byte?
int memory_used; // How many bytes are in use?
};
// Initial fifo buffer size in bytes.
// NOTE(review): not referenced anywhere in the visible part of this file;
// next_power_of_two() hard-codes the same value (4096) as its starting size.
const int fifo10_initial_size = 4096;
// Reset a fifo to the empty state: no buffer, no items, all counters zero.
// Does not free any buffer the fifo may already point to.
static void fifo10_init(struct fifo *fifo10) {
    fifo10->memory          = 0;
    fifo10->memory_size     = 0;
    fifo10->memory_start    = 0;
    fifo10->memory_used     = 0;
    fifo10->n_items_in_fifo = 0;
}
// Total number of bytes an entry occupies in the fifo buffer:
// the fixed header plus its key and value payload.
static int fifo10_entry_size(struct fifo10_entry *entry) {
    size_t header_bytes = sizeof (struct fifo10_entry);
    return header_bytes + entry->keylen + entry->vallen;
}
// Allocate an empty version 10 fifo and store it in *ptr.
// Returns 0 on success, or ENOMEM (leaving *ptr untouched) if the allocation fails.
static int toku_fifo10_create(FIFO *ptr) {
    struct fifo *MALLOC(new_fifo);
    if (new_fifo != 0) {
        fifo10_init(new_fifo);
        *ptr = new_fifo;
        return 0;
    }
    return ENOMEM;
}
// Release a version 10 fifo together with its entry buffer, and clear the caller's pointer.
// *ptr must refer to a valid fifo (it is dereferenced unconditionally).
static void toku_fifo10_free(FIFO *ptr) {
    FIFO doomed = *ptr;

    if (doomed->memory) {
        toku_free(doomed->memory);
    }
    doomed->memory = 0;
    toku_free(doomed);
    *ptr = 0;
}
static int next_power_of_two (int n) {
int r = 4096;
while (r < n) {
r*=2;
assert(r>0);
}
return r;
}
// Append one message (a copy of key and data, plus type and xid) to a version 10 fifo.
// The buffer is allocated on first use, grown/shrunk to a power of two when the tail
// runs out of room, or compacted in place when the existing buffer is suitably sized.
// Returns 0 on success, ENOMEM if a buffer could not be allocated (the fifo is then unchanged).
static int toku_fifo10_enq(FIFO fifo10, const void *key, unsigned int keylen, const void *data, unsigned int datalen, int type, TXNID xid) {
    int need_space_here = sizeof(struct fifo10_entry) + keylen + datalen;
    int need_space_total = fifo10->memory_used+need_space_here;
    if (fifo10->memory == NULL) {
        // First enqueue: only commit the new size once the allocation has succeeded,
        // so a failure leaves the fifo in its consistent empty state.
        int initial_size = next_power_of_two(need_space_total);
        char *initial_mem = toku_malloc(initial_size);
        if (initial_mem==0) return ENOMEM;
        fifo10->memory_size = initial_size;
        fifo10->memory = initial_mem;
    }
    if (fifo10->memory_start+need_space_total > fifo10->memory_size) {
        // Out of memory at the end.
        int next_2 = next_power_of_two(need_space_total);
        if ((2*next_2 > fifo10->memory_size)
            || (8*next_2 < fifo10->memory_size)) {
            // resize the fifo10
            char *newmem = toku_malloc(next_2);
            char *oldmem = fifo10->memory;
            if (newmem==0) return ENOMEM;
            memcpy(newmem, oldmem+fifo10->memory_start, fifo10->memory_used);
            fifo10->memory_size = next_2;
            fifo10->memory_start = 0;
            fifo10->memory = newmem;
            toku_free(oldmem);
        } else {
            // slide things over (regions may overlap, hence memmove)
            memmove(fifo10->memory, fifo10->memory+fifo10->memory_start, fifo10->memory_used);
            fifo10->memory_start = 0;
        }
    }
    // The new entry goes right after the last used byte.
    struct fifo10_entry *entry = (struct fifo10_entry *)(fifo10->memory + fifo10->memory_start + fifo10->memory_used);
    entry->type = (unsigned char)type;
    entry->xid = xid;
    entry->keylen = keylen;
    memcpy(entry->key, key, keylen);
    entry->vallen = datalen;
    memcpy(entry->key + keylen, data, datalen);
    fifo10->n_items_in_fifo++;
    fifo10->memory_used += need_space_here;
    return 0;
}
// Byte offset (into fifo10->memory) of the first entry to visit.
static int toku_fifo10_iterate_internal_start(FIFO fifo10) {
    return fifo10->memory_start;
}
// Nonzero while `off` still lies inside the used region of the fifo buffer.
static int toku_fifo10_iterate_internal_has_more(FIFO fifo10, int off) {
    int end_of_used = fifo10->memory_start + fifo10->memory_used;
    return off < end_of_used;
}
// Offset of the entry that follows the one at `off` (advances by that entry's full size).
static int toku_fifo10_iterate_internal_next(FIFO fifo10, int off) {
    char *entry_bytes = fifo10->memory + off;
    return off + fifo10_entry_size((struct fifo10_entry *)entry_bytes);
}
// View the bytes at offset `off` of the fifo buffer as a fifo10_entry.
struct fifo10_entry * toku_fifo10_iterate_internal_get_entry(FIFO fifo10, int off) {
    char *entry_bytes = fifo10->memory + off;
    return (struct fifo10_entry *)entry_bytes;
}
// LEAFENTRY ACCESSORS
//
// This ugly factorization of the macro is done so that we can do ## or not depending on which version of the
// compiler we are using, without repeating all this crufty offset calculation.
// Store a 32-bit integer at p in disk byte order (as produced by toku_htod32).
// p need not be aligned: leafentry fields start at odd offsets (e.g. lec+1), so the
// value is copied with memcpy instead of being written through a casted pointer,
// which would be an unaligned, aliasing-violating store.
static inline void putint (unsigned char *p, u_int32_t i) {
    u_int32_t disk_order = toku_htod32(i);
    memcpy(p, &disk_order, sizeof disk_order);
}
// Store a 64-bit integer at p as two 32-bit words: high word first, then low word.
static inline void putint64 (unsigned char *p, u_int64_t i) {
    u_int32_t high_word = (u_int32_t)(i>>32);
    u_int32_t low_word  = (u_int32_t)(i&0xffffffff);

    putint(p,   high_word);
    putint(p+4, low_word);
}
// Load a 32-bit integer stored at p in disk byte order (converted with toku_dtoh32).
// p need not be aligned: the bytes are copied out with memcpy instead of being read
// through a casted pointer, which would be an unaligned, aliasing-violating load.
static inline u_int32_t getint (unsigned char *p) {
    u_int32_t disk_order;
    memcpy(&disk_order, p, sizeof disk_order);
    return toku_dtoh32(disk_order);
}
// Load a 64-bit integer stored at p as two 32-bit words (high word first).
static inline u_int64_t getint64 (unsigned char *p) {
    u_int64_t high_word = getint(p);
    u_int64_t low_word  = getint(p+4);
    return (high_word<<32) | low_word;
}
// Switch-case fragment for LE_COMMITTED leafentries.
// Decodes the layout [1 state byte][4 keylen][key][4 vallen][val] and begins a call to
// funname##_le10_committed(keylen, key, vallen, val ...  The expansion is deliberately
// left unterminated: LESWITCHCALL appends any extra arguments and the closing ");" and "}".
#define DO_LE_COMMITTED(funname,le) case LE_COMMITTED: { \
unsigned char* __klenaddr = 1+(unsigned char*)le; u_int32_t __klen = getint(__klenaddr); \
unsigned char* __kvaladdr = 4 + __klenaddr; \
unsigned char* __clenaddr = __klen + __kvaladdr; u_int32_t __clen = getint(__clenaddr); \
unsigned char* __cvaladdr = 4 + __clenaddr; \
return funname ## _le10_committed(__klen, __kvaladdr, __clen, __cvaladdr
// Switch-case fragment for LE_BOTH leafentries.
// Decodes the layout [1 state byte][8 xid][4 keylen][key][4 clen][committed val][4 plen][provisional val]
// and begins a call to funname##_le10_both(xid, keylen, key, clen, cval, plen, pval ...
// The expansion is deliberately left unterminated; LESWITCHCALL completes it.
#define DO_LE_BOTH(funname,le) case LE_BOTH: { \
unsigned char* __xidaddr = 1+(unsigned char*)le; u_int64_t __xid = getint64(__xidaddr); \
unsigned char* __klenaddr = 8 + __xidaddr; u_int32_t __klen = getint(__klenaddr); \
unsigned char* __kvaladdr = 4 + __klenaddr; \
unsigned char* __clenaddr = __klen + __kvaladdr; u_int32_t __clen = getint(__clenaddr); \
unsigned char* __cvaladdr = 4 + __clenaddr; \
unsigned char* __plenaddr = __clen + __cvaladdr; u_int32_t __plen = getint(__plenaddr); \
unsigned char* __pvaladdr = 4 + __plenaddr; \
return funname ## _le10_both(__xid, __klen, __kvaladdr, __clen, __cvaladdr, __plen, __pvaladdr
// Switch-case fragment for LE_PROVDEL leafentries.
// Decodes the layout [1 state byte][8 xid][4 keylen][key][4 dlen][val] and begins a call to
// funname##_le10_provdel(xid, keylen, key, dlen, dval ...
// The expansion is deliberately left unterminated; LESWITCHCALL completes it.
#define DO_LE_PROVDEL(funname,le ) case LE_PROVDEL: { \
unsigned char* __xidaddr = 1+(unsigned char*)le; u_int64_t __xid = getint64(__xidaddr); \
unsigned char* __klenaddr = 8 + __xidaddr; u_int32_t __klen = getint(__klenaddr); \
unsigned char* __kvaladdr = 4 + __klenaddr; \
unsigned char* __dlenaddr = __klen + __kvaladdr; u_int32_t __dlen = getint(__dlenaddr); \
unsigned char* __dvaladdr = 4 + __dlenaddr; \
return funname ## _le10_provdel(__xid, __klen, __kvaladdr, __dlen, __dvaladdr
// Switch-case fragment for LE_PROVPAIR leafentries.
// Decodes the layout [1 state byte][8 xid][4 keylen][key][4 plen][provisional val] and begins
// a call to funname##_le10_provpair(xid, keylen, key, plen, pval ...
// The expansion is deliberately left unterminated; LESWITCHCALL completes it.
#define DO_LE_PROVPAIR(funname,le) case LE_PROVPAIR: { \
unsigned char* __xidaddr = 1+(unsigned char*)le; u_int64_t __xid = getint64(__xidaddr); \
unsigned char* __klenaddr = 8 + __xidaddr; u_int32_t __klen = getint(__klenaddr); \
unsigned char* __kvaladdr = 4 + __klenaddr; \
unsigned char* __plenaddr = __klen + __kvaladdr; u_int32_t __plen = getint(__plenaddr); \
unsigned char* __pvaladdr = 4 + __plenaddr; \
return funname ## _le10_provpair(__xid, __klen, __kvaladdr, __plen, __pvaladdr
// LESWITCHCALL(le, funname, extra args...):
// Switch on the state byte of a version 10 leafentry, decode its fields, and RETURN (from the
// enclosing function) the result of funname##_le10_<state>(decoded fields, extra args...).
// If the state byte matches none of the four cases, control reaches abort().
// Two variants exist only because of how the variadic arguments are pasted: the __ICL build
// uses a plain ", __VA_ARGS__" while the other build uses the ", ## __VA_ARGS__" extension
// (which lets the macro be used with no extra arguments).
#ifdef __ICL
#define LESWITCHCALL(le,funname, ...) do { \
switch(get_le_state(le)) { \
DO_LE_COMMITTED(funname,le) , __VA_ARGS__); } \
DO_LE_BOTH (funname,le) , __VA_ARGS__); } \
DO_LE_PROVDEL (funname,le) , __VA_ARGS__); } \
DO_LE_PROVPAIR (funname,le) , __VA_ARGS__); } \
} abort(); } while (0)
#else
#define LESWITCHCALL(le,funname, ...) do { \
switch(get_le_state(le)) { \
DO_LE_COMMITTED(funname,le) , ## __VA_ARGS__); } \
DO_LE_BOTH (funname,le) , ## __VA_ARGS__); } \
DO_LE_PROVDEL (funname,le) , ## __VA_ARGS__); } \
DO_LE_PROVPAIR (funname,le) , ## __VA_ARGS__); } \
} abort(); } while (0)
#endif
// In-memory size of a committed version 10 leafentry:
// state byte, two 4-byte length fields, then the key and value bytes.
static u_int32_t memsize_le10_committed (u_int32_t keylen, void *key __attribute__((__unused__)),
                                         u_int32_t vallen, void *val __attribute__((__unused__))) {
    const u_int32_t fixed_bytes = 1 + 2*4;
    return fixed_bytes + keylen + vallen;
}
// In-memory size of a version 10 leafentry holding both a committed and a provisional value:
// state byte, 8-byte xid, three 4-byte length fields, then key, committed value, provisional value.
static u_int32_t memsize_le10_both (TXNID txnid __attribute__((__unused__)),
                                    u_int32_t klen, void *kval __attribute__((__unused__)),
                                    u_int32_t clen, void *cval __attribute__((__unused__)),
                                    u_int32_t plen, void *pval __attribute__((__unused__))) {
    const u_int32_t fixed_bytes = 1 + 8 + 4*3;
    return fixed_bytes + klen + clen + plen;
}
// In-memory size of a provisionally deleted version 10 leafentry:
// state byte, 8-byte xid, two 4-byte length fields, then key and committed value.
static u_int32_t memsize_le10_provdel (TXNID txnid __attribute__((__unused__)),
                                       u_int32_t klen, void *kval __attribute__((__unused__)),
                                       u_int32_t clen, void *cval __attribute__((__unused__))) {
    const u_int32_t fixed_bytes = 1 + 8 + 4*2;
    return fixed_bytes + klen + clen;
}
// In-memory size of a provisional-only version 10 leafentry:
// state byte, 8-byte xid, two 4-byte length fields, then key and provisional value.
static u_int32_t memsize_le10_provpair (TXNID txnid __attribute__((__unused__)),
                                        u_int32_t klen, void *kval __attribute__((__unused__)),
                                        u_int32_t plen, void *pval __attribute__((__unused__))) {
    const u_int32_t fixed_bytes = 1 + 8 + 4*2;
    return fixed_bytes + klen + plen;
}
// Return the in-memory size, in bytes, of a version 10 leafentry.
// LESWITCHCALL returns from this function with the result of the memsize_le10_<state>
// helper that matches the entry's state byte; an unrecognized state byte aborts.
u_int32_t leafentry_memsize_10 (LEAFENTRY le) {
LESWITCHCALL(le, memsize);
abort(); return 0; // make certain compilers happy
}
// On-disk size of a committed version 10 leafentry:
// state byte, 4-byte key length, 4-byte value length, then the key and value bytes.
static u_int32_t disksize_le10_committed (u_int32_t keylen, void *key __attribute__((__unused__)),
                                          u_int32_t vallen, void *val __attribute__((__unused__))) {
    const u_int32_t fixed_bytes = 1 + 4 + 4;
    return fixed_bytes + keylen + vallen;
}
// On-disk size of a version 10 leafentry holding both a committed and a provisional value:
// state byte, 8-byte xid, three 4-byte length fields, then key, committed value, provisional value.
static u_int32_t disksize_le10_both (TXNID txnid __attribute__((__unused__)),
                                     u_int32_t klen, void *kval __attribute__((__unused__)),
                                     u_int32_t clen, void *cval __attribute__((__unused__)),
                                     u_int32_t plen, void *pval __attribute__((__unused__))) {
    const u_int32_t fixed_bytes = 1 + 8 + 4*3;
    return fixed_bytes + klen + clen + plen;
}
// On-disk size of a provisionally deleted version 10 leafentry:
// state byte, 8-byte xid, two 4-byte length fields, then key and committed value.
static u_int32_t disksize_le10_provdel (TXNID txnid __attribute__((__unused__)),
                                        u_int32_t klen, void *kval __attribute__((__unused__)),
                                        u_int32_t clen, void *cval __attribute__((__unused__))) {
    const u_int32_t fixed_bytes = 1 + 8 + 4 + 4;
    return fixed_bytes + klen + clen;
}
// On-disk size of a provisional-only version 10 leafentry:
// state byte, 8-byte xid, two 4-byte length fields, then key and provisional value.
static u_int32_t disksize_le10_provpair (TXNID txnid __attribute__((__unused__)),
                                         u_int32_t klen, void *kval __attribute__((__unused__)),
                                         u_int32_t plen, void *pval __attribute__((__unused__))) {
    const u_int32_t fixed_bytes = 1 + 8 + 4 + 4;
    return fixed_bytes + klen + plen;
}
// Return the on-disk size, in bytes, of a version 10 leafentry.
// LESWITCHCALL returns from this function with the result of the disksize_le10_<state>
// helper that matches the entry's state byte; an unrecognized state byte aborts.
static u_int32_t
le10_disksize_internal (LEAFENTRY le) {
LESWITCHCALL(le, disksize);
abort(); return 0; // make certain compilers happy
}
// On-disk size of a version 10 leafentry.
// Also cross-checks (by assertion) that the in-memory size agrees, since the two
// computations are currently identical for version 10 entries.
u_int32_t le10_disksize (LEAFENTRY le) {
    u_int32_t on_disk   = le10_disksize_internal(le);
    u_int32_t in_memory = leafentry_memsize_10(le);
    assert(in_memory==on_disk);
    return on_disk;
}
// Key length of a committed version 10 leafentry.
u_int32_t any_keylen_le10_committed (u_int32_t keylen, void *UU(key),
                                     u_int32_t UU(vallen), void *UU(val)) {
    return keylen;
}
// Key length of a version 10 leafentry that has both a committed and a provisional value.
u_int32_t any_keylen_le10_both (TXNID UU(xid),
                                u_int32_t klen, void *UU(kval),
                                u_int32_t UU(clen), void *UU(cval),
                                u_int32_t UU(plen), void *UU(pval)) {
    return klen;
}
// Key length of a provisionally deleted version 10 leafentry.
u_int32_t any_keylen_le10_provdel (TXNID UU(xid),
                                   u_int32_t klen, void *UU(kval),
                                   u_int32_t UU(clen), void *UU(cval)) {
    return klen;
}
// Key length of a provisional-only version 10 leafentry.
u_int32_t any_keylen_le10_provpair (TXNID UU(xid),
                                    u_int32_t klen, void *UU(kval),
                                    u_int32_t UU(plen), void *UU(pval)) {
    return klen;
}
// Return the key length of a version 10 leafentry, whatever its state.
// LESWITCHCALL returns from this function with the result of the matching
// any_keylen_le10_<state> helper; an unrecognized state byte aborts.
u_int32_t le10_any_keylen (LEAFENTRY le) {
LESWITCHCALL(le, any_keylen);
abort(); return 0; // make certain compilers happy
}
// Value length of a committed version 10 leafentry.
u_int32_t any_vallen_le10_committed (u_int32_t UU(keylen), void *UU(key),
                                     u_int32_t vallen, void *UU(val)) {
    return vallen;
}
// For an entry with both values, the "any" value length is that of the provisional value.
u_int32_t any_vallen_le10_both (TXNID UU(xid),
                                u_int32_t UU(klen), void *UU(kval),
                                u_int32_t UU(clen), void *UU(cval),
                                u_int32_t plen, void *UU(pval)) {
    return plen;
}
// For a provisional delete there is no provisional value, so the "any" value length
// reported is that of the committed value still stored in the entry (clen, not 0).
u_int32_t any_vallen_le10_provdel (TXNID UU(xid),
                                   u_int32_t UU(klen), void *UU(kval),
                                   u_int32_t clen, void *UU(cval)) {
    return clen;
}
// Value length of a provisional-only version 10 leafentry (the provisional value).
u_int32_t any_vallen_le10_provpair (TXNID UU(xid),
                                    u_int32_t UU(klen), void *UU(kval),
                                    u_int32_t plen, void *UU(pval)) {
    return plen;
}
// Return a value length for a version 10 leafentry, whatever its state.
// LESWITCHCALL returns from this function with the result of the matching
// any_vallen_le10_<state> helper; an unrecognized state byte aborts.
u_int32_t le10_any_vallen (LEAFENTRY le) {
LESWITCHCALL(le, any_vallen);
abort(); return 0; // make certain compilers happy
}
//LEAFENTRY constructors
//Constructors for version 10 leafentries, possibly needed for upgrades.
// Build a heap-allocated version 10 LE_COMMITTED leafentry:
//   [state byte][4-byte klen][key][4-byte dlen][value]
// On return *result owns the new entry, *resultsize is its allocated size and
// *disksize its serialized size.  Allocation failure trips an assert; returns 0.
int
le10_committed (u_int32_t klen, void* kval, u_int32_t dlen, void* dval, u_int32_t *resultsize, u_int32_t *disksize, LEAFENTRY *result) {
    size_t size = 9+klen+dlen;
    unsigned char *lec=toku_malloc(size);
    assert(lec);

    unsigned char *cursor = lec;
    *cursor = LE_COMMITTED;      cursor += 1;
    putint(cursor, klen);        cursor += 4;
    memcpy(cursor, kval, klen);  cursor += klen;
    putint(cursor, dlen);        cursor += 4;
    memcpy(cursor, dval, dlen);

    *resultsize = size;
    *disksize   = 1 + 4 + 4 + klen + dlen;
    *result     = (LEAFENTRY)lec;
    return 0;
}
// Build a heap-allocated version 10 LE_BOTH leafentry:
//   [state byte][8-byte xid][4-byte klen][key][4-byte clen][committed value][4-byte plen][provisional value]
// On return *result owns the new entry, *resultsize is its allocated size and
// *disksize its serialized size.  Allocation failure trips an assert; returns 0.
int
le10_both (TXNID xid, u_int32_t klen, void* kval, u_int32_t clen, void* cval, u_int32_t plen, void* pval,
           u_int32_t *resultsize, u_int32_t *disksize, LEAFENTRY *result) {
    size_t size = 1+8+4*3+klen+clen+plen;
    unsigned char *lec=toku_malloc(size);
    assert(lec);

    unsigned char *cursor = lec;
    *cursor = LE_BOTH;           cursor += 1;
    putint64(cursor, xid);       cursor += 8;
    putint(cursor, klen);        cursor += 4;
    memcpy(cursor, kval, klen);  cursor += klen;
    putint(cursor, clen);        cursor += 4;
    memcpy(cursor, cval, clen);  cursor += clen;
    putint(cursor, plen);        cursor += 4;
    memcpy(cursor, pval, plen);

    *resultsize = size;
    *disksize   = 1 + 8 + 4*3 + klen + clen + plen;
    *result     = (LEAFENTRY)lec;
    return 0;
}
// Build a heap-allocated version 10 LE_PROVDEL leafentry:
//   [state byte][8-byte xid][4-byte klen][key][4-byte dlen][committed value]
// On return *result owns the new entry, *memsize is its allocated size and
// *disksize its serialized size.  Allocation failure trips an assert; returns 0.
int
le10_provdel (TXNID xid, u_int32_t klen, void* kval, u_int32_t dlen, void* dval,
              u_int32_t *memsize, u_int32_t *disksize, LEAFENTRY *result) {
    size_t size = 1 + 8 + 2*4 + klen + dlen;
    unsigned char *lec= toku_malloc(size);
    assert(lec);

    unsigned char *cursor = lec;
    *cursor = LE_PROVDEL;        cursor += 1;
    putint64(cursor, xid);       cursor += 8;
    putint(cursor, klen);        cursor += 4;
    memcpy(cursor, kval, klen);  cursor += klen;
    putint(cursor, dlen);        cursor += 4;
    memcpy(cursor, dval, dlen);

    *memsize  = size;
    *disksize = 1 + 4 + 4 + 8 + klen + dlen;
    *result   = (LEAFENTRY)lec;
    return 0;
}
// Build a heap-allocated version 10 LE_PROVPAIR leafentry:
//   [state byte][8-byte xid][4-byte klen][key][4-byte plen][provisional value]
// On return *result owns the new entry, *memsize is its allocated size and
// *disksize its serialized size.  Allocation failure trips an assert; returns 0.
int
le10_provpair (TXNID xid, u_int32_t klen, void* kval, u_int32_t plen, void* pval, u_int32_t *memsize, u_int32_t *disksize, LEAFENTRY *result) {
    size_t size = 1 + 8 + 2*4 + klen + plen;
    unsigned char *lec= toku_malloc(size);
    assert(lec);

    unsigned char *cursor = lec;
    *cursor = LE_PROVPAIR;       cursor += 1;
    putint64(cursor, xid);       cursor += 8;
    putint(cursor, klen);        cursor += 4;
    memcpy(cursor, kval, klen);  cursor += klen;
    putint(cursor, plen);        cursor += 4;
    memcpy(cursor, pval, plen);

    *memsize  = size;
    *disksize = 1 + 4 + 4 + 8 + klen + plen;
    *result   = (LEAFENTRY)lec;
    return 0;
}
// Given a version 10 header, create a version 11 header.
// If new memory is needed for the new header, allocate it here and free the memory of the old version header.
static int
upgrade_brtheader_10_11(brt_header_10 **brth_10, brt_header_11 ** brth_11) {
upgrade_brtheader_10_11(struct brt_header **brth_10, struct brt_header ** brth_11) {
assert((*brth_10)->layout_version == BRT_LAYOUT_VERSION_10);
*brth_11 = *brth_10;
*brth_10 = NULL;
......@@ -29,7 +437,7 @@ upgrade_brtheader_10_11(brt_header_10 **brth_10, brt_header_11 ** brth_11) {
static int
deserialize_brtheader_10 (int fd, struct rbuf *rb, brt_header_10 **brth) {
deserialize_brtheader_10 (int fd, struct rbuf *rb, struct brt_header **brth) {
// We already know:
// we have an rbuf representing the header.
// The checksum has been validated
......@@ -47,7 +455,7 @@ deserialize_brtheader_10 (int fd, struct rbuf *rb, brt_header_10 **brth) {
}
brt_header_10 *CALLOC(h);
struct brt_header *CALLOC(h);
if (h==0) return errno;
int ret=-1;
if (0) { died1: toku_free(h); return ret; }
......@@ -116,13 +524,456 @@ decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf
return r;
}
// Deserialize the leaf-specific part of a version 10 node from rb into result.
// The leafentries are NOT copied: the rbuf's buffer becomes the node's mempool and the
// OMT is built from pointers into it.  On success rb->buf is set to NULL to record that
// ownership moved to the node.
// Returns 0 on success, ENOMEM if the pointer array cannot be allocated, the value of
// toku_db_badformat() for a wrong magic, a negative entry count, a checksum mismatch,
// an OMT construction failure or a block-length mismatch, or the error returned by
// toku_leaflock_borrow().
static int
deserialize_brtnode_leaf_from_rbuf_10 (BRTNODE result, bytevec magic, struct rbuf *rb) {
    //The only difference between this version and version 11 (for this function)
    //is the line that calculates size of leafentry.
    int r;
    int i;

    if (memcmp(magic, "tokuleaf", 8)!=0) {
        r = toku_db_badformat();
        return r;
    }

    result->u.l.leaf_stats.nkeys = rbuf_ulonglong(rb);
    result->u.l.leaf_stats.ndata = rbuf_ulonglong(rb);
    result->u.l.leaf_stats.dsize = rbuf_ulonglong(rb);
    result->u.l.leaf_stats.exact = TRUE;
    int n_in_buf = rbuf_int(rb);
    if (n_in_buf < 0) {
        // The count comes straight from disk; a negative value can only be corruption
        // and must not be used as an allocation size.
        return toku_db_badformat();
    }
    result->u.l.n_bytes_in_buffer = 0;
    result->u.l.seqinsert = 0;
    toku_mempool_init(&result->u.l.buffer_mempool, rb->buf, rb->size);

    u_int32_t actual_sum = 0;
    u_int32_t start_of_data = rb->ndone;
    OMTVALUE *MALLOC_N(n_in_buf, array);
    if (array == NULL && n_in_buf > 0) {
        // A zero-length request may legitimately yield NULL; anything else is out of memory.
        return ENOMEM;
    }
    for (i=0; i<n_in_buf; i++) {
        LEAFENTRY le = (LEAFENTRY)(&rb->buf[rb->ndone]);
        u_int32_t disksize = le10_disksize(le); //Only difference between 10 & 11
        rb->ndone += disksize;
        assert(rb->ndone<=rb->size);

        array[i]=(OMTVALUE)le;
        actual_sum += x1764_memory(le, disksize);
    }
    toku_trace("fill array");
    u_int32_t end_of_data = rb->ndone;
    result->u.l.n_bytes_in_buffer += end_of_data-start_of_data + n_in_buf*OMT_ITEM_OVERHEAD;
    actual_sum *= result->rand4fingerprint;
    r = toku_omt_create_steal_sorted_array(&result->u.l.buffer, &array, n_in_buf, n_in_buf);
    toku_trace("create omt");
    if (r!=0) {
        // The OMT was not created, so only the pointer array needs releasing.
        toku_free(array);
        r = toku_db_badformat();
        return r;
    }
    assert(array==NULL); // the OMT took ownership of the array

    result->u.l.buffer_mempool.frag_size = start_of_data;
    result->u.l.buffer_mempool.free_offset = end_of_data;

    if (actual_sum!=result->local_fingerprint) {
        r = toku_db_badformat();
        goto died_1;
    }

    (void)rbuf_int(rb); //Ignore the crc (already verified).
    if (rb->ndone != rb->size) { //Verify we read exactly the entire block.
        r = toku_db_badformat();
        goto died_1;
    }

    r = toku_leaflock_borrow(&result->u.l.leaflock);
    if (r!=0) goto died_1;
    rb->buf = NULL; //Buffer was used for node's mempool.
    return 0;

died_1:
    // Failure after the OMT exists: destroy it and report the error recorded in r.
    toku_omt_destroy(&result->u.l.buffer);
    return r;
}
// Deserialize the nonleaf-specific part of a version 10 node from rb into result:
// per-child fingerprints and estimates, pivot keys, child block numbers, and one
// version 10 fifo of buffered messages per child.  Verifies the local and subtree
// fingerprints and that the whole block was consumed.
// Returns 0 on success, or the value of toku_db_badformat() on any failure; on failure
// every fifo created so far is freed.
// NOTE(review): the function still begins with assert(FALSE) ("so far just the copy of
// version 11"), so it is not expected to run yet.
// NOTE(review): childinfos, childkeys and the pivot kv_pairs are not released on the
// failure paths; confirm who owns them before enabling this code.
static int
deserialize_brtnode_nonleaf_from_rbuf_10 (BRTNODE result, bytevec magic, struct rbuf *rb) {
    assert(FALSE); //This is so far just the copy of version 11
    int r;
    int i;
    int n_fifos_created = 0; // number of child fifos that must be freed if we fail

    if (memcmp(magic, "tokunode", 8)!=0) {
        r = toku_db_badformat();
        return r;
    }

    result->u.n.totalchildkeylens=0;
    u_int32_t subtree_fingerprint = rbuf_int(rb);
    u_int32_t check_subtree_fingerprint = 0;
    result->u.n.n_children = rbuf_int(rb);
    MALLOC_N(result->u.n.n_children+1, result->u.n.childinfos);
    MALLOC_N(result->u.n.n_children, result->u.n.childkeys);
    assert(result->u.n.n_children>=0);

    // Per-child fingerprint and subtree estimates.
    for (i=0; i<result->u.n.n_children; i++) {
        u_int32_t childfp = rbuf_int(rb);
        BNC_SUBTREE_FINGERPRINT(result, i)= childfp;
        check_subtree_fingerprint += childfp;
        struct subtree_estimates *se = &(BNC_SUBTREE_ESTIMATES(result, i));
        se->nkeys = rbuf_ulonglong(rb);
        se->ndata = rbuf_ulonglong(rb);
        se->dsize = rbuf_ulonglong(rb);
        se->exact = (BOOL) (rbuf_char(rb) != 0);
    }

    // Pivot keys (n_children-1 of them); DUPSORT trees store key and data in the pivot.
    for (i=0; i<result->u.n.n_children-1; i++) {
        if (result->flags & TOKU_DB_DUPSORT) {
            bytevec keyptr, dataptr;
            unsigned int keylen, datalen;
            rbuf_bytes(rb, &keyptr, &keylen);
            rbuf_bytes(rb, &dataptr, &datalen);
            result->u.n.childkeys[i] = kv_pair_malloc(keyptr, keylen, dataptr, datalen);
        } else {
            bytevec childkeyptr;
            unsigned int cklen;
            rbuf_bytes(rb, &childkeyptr, &cklen); /* Returns a pointer into the rbuf. */
            result->u.n.childkeys[i] = kv_pair_malloc((void*)childkeyptr, cklen, 0, 0);
        }
        result->u.n.totalchildkeylens+=toku_brtnode_pivot_key_len(result, result->u.n.childkeys[i]);
    }

    for (i=0; i<result->u.n.n_children; i++) {
        BNC_BLOCKNUM(result,i) = rbuf_blocknum(rb);
        BNC_HAVE_FULLHASH(result, i) = FALSE;
        BNC_NBYTESINBUF(result,i) = 0;
    }

    result->u.n.n_bytes_in_buffers = 0;
    for (i=0; i<result->u.n.n_children; i++) {
        r=toku_fifo10_create(&BNC_BUFFER(result,i));
        if (r!=0) goto died_1;
        n_fifos_created++;
    }

    {
        int cnum;
        u_int32_t check_local_fingerprint = 0;
        for (cnum=0; cnum<result->u.n.n_children; cnum++) {
            int n_in_this_hash = rbuf_int(rb);
            int msg; // separate index: `i` must not be clobbered by the message loop
            for (msg=0; msg<n_in_this_hash; msg++) {
                int diff;
                bytevec key; ITEMLEN keylen;
                bytevec val; ITEMLEN vallen;
                int type = rbuf_char(rb);
                TXNID xid = rbuf_ulonglong(rb);
                rbuf_bytes(rb, &key, &keylen); /* Returns a pointer into the rbuf. */
                rbuf_bytes(rb, &val, &vallen);
                check_local_fingerprint += result->rand4fingerprint * calc_fingerprint_cmd10(type, xid, key, keylen, val, vallen);
                r=toku_fifo10_enq(BNC_BUFFER(result, cnum), key, keylen, val, vallen, type, xid); /* Copies the data into the fifo. */
                if (r!=0) { goto died_1; }
                diff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD_10;
                result->u.n.n_bytes_in_buffers += diff;
                BNC_NBYTESINBUF(result,cnum) += diff;
            }
        }
        if (check_local_fingerprint != result->local_fingerprint) {
            fprintf(stderr, "%s:%d local fingerprint is wrong (found %8x calcualted %8x\n", __FILE__, __LINE__, result->local_fingerprint, check_local_fingerprint);
            goto died_1;
        }
        if (check_subtree_fingerprint+check_local_fingerprint != subtree_fingerprint) {
            fprintf(stderr, "%s:%d subtree fingerprint is wrong\n", __FILE__, __LINE__);
            goto died_1;
        }
    }

    (void)rbuf_int(rb); //Ignore the crc (already verified).
    if (rb->ndone != rb->size) { //Verify we read exactly the entire block.
        goto died_1;
    }
    return 0;

died_1:
    // Free exactly the fifos that were created, independent of any loop index.
    for (i=0; i<n_fifos_created; i++) toku_fifo10_free(&BNC_BUFFER(result,i));
    return toku_db_badformat();
}
// Deserialize a version 10 brtnode from rb: reads the common node header, then hands off
// to the leaf or nonleaf deserializer depending on the node's height.
// On success stores the new node in *brtnode and returns 0.  On failure the node is freed
// and the error is returned (errno if the node itself could not be allocated).
// For nonleaf nodes the rbuf's buffer is freed here; for leaves it has become the node's mempool.
// NOTE(review): assert(FALSE) below means this path is not expected to run yet.
static int
deserialize_brtnode_from_rbuf_10 (BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h, struct rbuf *rb) {
    // Self-assignments keep the parameters "used" for the compiler.
    blocknum = blocknum;
    fullhash = fullhash;
    brtnode = brtnode;
    h = h;
    rb = rb;
    assert(FALSE);

    TAGMALLOC(BRTNODE, result);
    int r;
    if (result==0) {
        r=errno;
        return r;
    }

    result->desc = &h->descriptor;
    result->ever_been_written = 1;

    // Common header: magic, layout version, lsn, descriptor, sizes and fingerprints.
    bytevec magic;
    rbuf_literal_bytes(rb, &magic, 8);
    result->layout_version = rbuf_int(rb);
    assert(result->layout_version == BRT_LAYOUT_VERSION_10);
    result->disk_lsn.lsn = rbuf_ulonglong(rb);
    {
        //Restrict scope for now since we do not support upgrades.
        struct descriptor desc;
        //desc.dbt.data is TEMPORARY.  Will be unusable when the rc buffer is freed.
        deserialize_descriptor_from_rbuf(rb, &desc, TRUE);
        assert(desc.version == result->desc->version); //We do not yet support upgrading the dbts.
    }
    result->nodesize = rbuf_int(rb);
    result->log_lsn = result->disk_lsn;
    result->thisnodename = blocknum;
    result->flags = rbuf_int(rb);
    result->height = rbuf_int(rb);
    result->rand4fingerprint = rbuf_int(rb);
    result->local_fingerprint = rbuf_int(rb);
    result->dirty = 0;
    result->fullhash = fullhash;

    if (result->height>0) {
        r = deserialize_brtnode_nonleaf_from_rbuf_10(result, magic, rb);
    } else {
        r = deserialize_brtnode_leaf_from_rbuf_10(result, magic, rb);
    }
    if (r!=0) {
        toku_free(result);
        return r;
    }

    if (result->height>0) {
        // For height==0 we used the buf inside the OMT
        toku_free(rb->buf);
        rb->buf = NULL;
    }
    toku_trace("deserial done");
    *brtnode = result;
    return 0;
}
// Unpack a committed version 10 leafentry into ule: a single committed insert of val.
static void le_unpack_le10_committed(u_int32_t klen, void *kval, u_int32_t vallen, void *val, ULE ule) {
    // A freshly initialized ule starts with a committed delete; replace it with the insert.
    toku_upgrade_ule_init_empty_ule(ule, klen, kval);
    toku_upgrade_ule_remove_innermost_uxr(ule);
    toku_upgrade_ule_push_insert_uxr(ule, 0, vallen, val);
}
// Unpack a version 10 LE_BOTH leafentry into ule.
// With a nonzero xid the ule gets the committed value plus a provisional insert by xid;
// with xid 0 the provisional value is treated as the (only) committed value.
static void le_unpack_le10_both(TXNID xid, u_int32_t klen, void *kval, u_int32_t clen, void *cval, u_int32_t plen, void *pval, ULE ule) {
    if (xid!=0) {
        toku_upgrade_ule_init_empty_ule(ule, klen, kval);
        toku_upgrade_ule_remove_innermost_uxr(ule);             // drop the initial committed delete
        toku_upgrade_ule_push_insert_uxr(ule, 0, clen, cval);   // committed value
        toku_upgrade_ule_push_insert_uxr(ule, xid, plen, pval); // provisional value
        return;
    }
    // xid 0: really committed
    le_unpack_le10_committed(klen, kval, plen, pval, ule);
}
// Unpack a version 10 LE_PROVDEL leafentry into ule.
// With xid 0 the delete is treated as committed and the ule is left as initialized;
// otherwise the ule gets the committed value plus a provisional delete by xid.
static void le_unpack_le10_provdel(TXNID xid, u_int32_t klen, void *kval, u_int32_t clen, void *cval, ULE ule) {
    toku_upgrade_ule_init_empty_ule(ule, klen, kval);
    if (xid==0) {
        // Really committed delete: the freshly initialized ule already represents it.
        return;
    }
    toku_upgrade_ule_remove_innermost_uxr(ule);            // drop the initial committed delete
    toku_upgrade_ule_push_insert_uxr(ule, 0, clen, cval);  // committed value
    toku_upgrade_ule_push_delete_uxr(ule, xid);            // provisional delete
}
// Unpack a version 10 LE_PROVPAIR leafentry into ule.
// With xid 0 the pair is treated as committed; otherwise the ule keeps its initial
// committed delete and gains a provisional insert by xid.
static void le_unpack_le10_provpair(TXNID xid, u_int32_t klen, void *kval, u_int32_t plen, void *pval, ULE ule) {
    if (xid!=0) {
        toku_upgrade_ule_init_empty_ule(ule, klen, kval);
        toku_upgrade_ule_push_insert_uxr(ule, xid, plen, pval); // provisional value
        return;
    }
    // xid 0: really committed
    le_unpack_le10_committed(klen, kval, plen, pval, ule);
}
//Used to unpack a version 10 record to ule, which can be packed to version 11.
//LESWITCHCALL decodes the entry's fields according to its state byte and returns from
//this function via the matching le_unpack_le10_<state> helper, passing ule as the extra
//argument; an unrecognized state byte aborts.
static void
le_unpack_from_version_10(ULE ule, LEAFENTRY le) {
LESWITCHCALL(le, le_unpack, ule);
abort(); return; // make certain compilers happy
}
// x1764 checksum over the full in-memory extent of a version 10 leafentry.
static u_int32_t
le10_crc(LEAFENTRY v) {
    u_int32_t nbytes = leafentry_memsize_10(v);
    return x1764_memory(v, nbytes);
}
//old_le10 is opaque data only readable by accessors (Not a 'new' LEAFENTRY)
// Upgrade the leafentry at position idx of a leaf node from version 10 to version 11.
// The old entry is unpacked into a ULE and repacked with le_pack.  If packing yields a new
// entry it replaces the old one in the OMT; if it yields nothing the entry is deleted.
// In both cases the node's byte count, fingerprint and mempool accounting are updated.
// Every size/crc of old_le10 must be taken with the *version 10* accessors.
// Returns 0 on success or the error from le_pack / the OMT operation.
static int
upgrade_single_leafentry_10_11 (BRTNODE node, u_int32_t idx, LEAFENTRY old_le10) {
    //See brt_leaf_apply_cmd_once for template
    size_t newlen=0, newdisksize=0;
    LEAFENTRY new_le = NULL;
    void *maybe_free = NULL;
    ULE_S ule;
    int r;

    assert(old_le10);
    le_unpack_from_version_10(&ule, old_le10);
    r = le_pack(&ule,                // create packed leafentry
                &newlen, &newdisksize,
                &new_le,
                node->u.l.buffer, &node->u.l.buffer_mempool, &maybe_free);
    if (r!=0) goto cleanup;

    if (new_le) {
        //Version 10 leafentry is being upgraded
        //Update size of memory information and crc
        //Subtract old version 10 leafentry information
        node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + le10_disksize(old_le10);
        node->local_fingerprint     -= node->rand4fingerprint * le10_crc(old_le10);

        u_int32_t size = leafentry_memsize_10(old_le10);

        // This mfree must occur after the mempool_malloc so that when the mempool is compressed everything is accounted for.
        // But we must compute the size before doing the mempool mfree because otherwise the le pointer is no good.
        toku_mempool_mfree(&node->u.l.buffer_mempool, 0, size); // Must pass 0, since old_le10 may be no good any more.

        assert(newdisksize == leafentry_disksize(new_le));

        //Add new version 11 leafentry information
        node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize;
        node->local_fingerprint += node->rand4fingerprint*toku_le_crc(new_le);
        if ((r = toku_omt_set_at(node->u.l.buffer, new_le, idx))) goto cleanup;
    }
    else {
        //Version 10 leafentry is being deleted
        //It was there, note that it's gone
        //(Was already removed from mempool)

        // Figure out if one of the other keys is the same key
        toku_upgrade_maybe_bump_nkeys(node, idx, old_le10, -1);

        if ((r = toku_omt_delete_at(node->u.l.buffer, idx))) goto cleanup;

        //Subtract old version 10 leafentry information
        node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + le10_disksize(old_le10);
        node->local_fingerprint     -= node->rand4fingerprint * le10_crc(old_le10);

        {
            u_int32_t oldlen = le10_any_vallen(old_le10) + le10_any_keylen(old_le10);
            assert(node->u.l.leaf_stats.dsize >= oldlen);
            node->u.l.leaf_stats.dsize -= oldlen;
        }
        assert(node->u.l.leaf_stats.dsize < (1U<<31)); // make sure we didn't underflow
        node->u.l.leaf_stats.ndata --;

        // old_le10 is a version 10 entry, so its size must come from the version 10
        // accessor (the version 11 leafentry_memsize would misparse it).
        toku_mempool_mfree(&node->u.l.buffer_mempool, 0, leafentry_memsize_10(old_le10)); // Must pass 0, since old_le10 may be no good any more.
    }

    r = 0;
cleanup:
    if (maybe_free) toku_free(maybe_free);
    return r;
}
//Upgrade each leafentry from version 10 to 11(nested transactions)
//Need to update checksums, and memory pools
// Walks the leaf's OMT by index.  Upgrading an entry either replaces it in place (advance
// to the next index) or deletes it (the next entry slides into the same index).
// Returns 0 on success, or the first error from upgrade_single_leafentry_10_11.
static int
upgrade_brtnode_leaf_10_11 (BRTNODE node) {
    u_int32_t idx = 0;
    u_int32_t n_entries = toku_omt_size(node->u.l.buffer);

    while (idx < n_entries) {
        OMTVALUE old_le10;
        int r = toku_omt_fetch(node->u.l.buffer, idx, &old_le10, NULL);
        assert(r==0);
        assert(old_le10);

        r = upgrade_single_leafentry_10_11(node, idx, old_le10);
        if (r!=0) return r;

        u_int32_t n_entries_now = toku_omt_size(node->u.l.buffer);
        if (n_entries_now == n_entries) {
            idx++; //next leafentry is at next index
            continue;
        }
        //something was deleted, next leafentry is at same index.
        assert(n_entries-1 == n_entries_now);
        n_entries = n_entries_now;
    }
    return 0;
}
// Upgrade the message buffers of a nonleaf node from version 10 to version 11.
// For every child: detach the version 10 fifo, create a new fifo with toku_fifo_create,
// and re-enqueue each old message into it.  A message with xid 0 gets the root xids;
// any other xid becomes a direct child of the root xids.  For each message the node's
// local_fingerprint and buffered-byte counters have the version 10 contribution removed
// and the version 11 contribution added.  Finally the old fifo is freed and the new one
// installed in the child slot.
// Every failure inside the loop trips an assert, so the function always returns 0.
// NOTE(review): xids_destroy() is called even when xids is the value returned by
// xids_get_root_xids() - presumably xids_destroy tolerates that; confirm.
static int
upgrade_brtnode_nonleaf_10_11 (BRTNODE node) {
int i;
int r;
for (i=0; i<node->u.n.n_children; i++) {
FIFO fifo11;
FIFO fifo10 = BNC_BUFFER(node,i);
BNC_BUFFER(node,i) = NULL;
r = toku_fifo_create(&fifo11);
assert(r==0);
FIFO10_ITERATE(fifo10, keyp, keylen, valp, vallen, type, xid,
XIDS xids;
if (xid == 0)
xids = xids_get_root_xids();
else {
//Assume all transactions have no parents.
r = xids_create_child(xids_get_root_xids(), &xids, xid);
assert(r==0);
}
//Remove checksum contribution of this fifo_entry
node->local_fingerprint -= node->rand4fingerprint * calc_fingerprint_cmd10(type, xid, keyp, keylen, valp, vallen);
//Remove bytes_in_buf contribution of this fifo_entry
u_int32_t bytes10 = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD_10;
node->u.n.n_bytes_in_buffers -= bytes10;
BNC_NBYTESINBUF(node, i) -= bytes10;
//Add checksum contribution of the new fifo_entry
node->local_fingerprint += node->rand4fingerprint * toku_calc_fingerprint_cmd(type, xids, keyp, keylen, valp, vallen);
//Add bytes_in_buf contribution of the new fifo_entry
u_int32_t bytes11 = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids);
node->u.n.n_bytes_in_buffers += bytes11;
BNC_NBYTESINBUF(node, i) += bytes11;
//Enqueue new fifo entry
r = toku_fifo_enq(fifo11, keyp, keylen, valp, vallen, type, xids);
assert(r==0);
xids_destroy(&xids);
);
toku_fifo10_free(&fifo10);
BNC_BUFFER(node,i) = fifo11;
fifo11 = NULL;
}
return 0;
}
// Structure of brtnode is same for versions 10, 11.  The only difference is in the
// contents of the leafentries and the messages.  For this reason, the outer structure
// of the brtnode is left in place (*brtnode_10 is reused.)
//
// On success: *brtnode_11 receives the upgraded node (marked dirty so the new format
// gets written out), *brtnode_10 is set to NULL, and 0 is returned.
// On failure: the error from the leaf/nonleaf upgrade is returned, *brtnode_11 is left
// untouched and *brtnode_10 still refers to the (possibly partially upgraded) node, so
// it is never handed out as a valid version 11 node.
static int
upgrade_brtnode_10_11 (BRTNODE *brtnode_10, BRTNODE *brtnode_11) {
    int r;
    if ((*brtnode_10)->height>0)
        r = upgrade_brtnode_nonleaf_10_11(*brtnode_10);
    else
        r = upgrade_brtnode_leaf_10_11(*brtnode_10);
    if (r!=0) return r; // do not report a failed upgrade as success

    *brtnode_11 = *brtnode_10;
    *brtnode_10 = NULL;
    (*brtnode_11)->dirty = 1;
    return 0;
}
......@@ -6,12 +6,28 @@
#ifndef BACKWARD_10_H
#define BACKWARD_10_H
static int deserialize_brtheader_10 (int fd, struct rbuf *rb, brt_header_10 **brth);
// Constructors for version 10 leafentries.  Each allocates a new entry, stores it in
// *result, and reports its allocated size and its serialized size through the two
// u_int32_t out-parameters (in that order).  Parameter names match the definitions.
int le10_committed (u_int32_t klen, void* kval, u_int32_t dlen, void* dval, u_int32_t *resultsize, u_int32_t *disksize, LEAFENTRY *result);
int le10_both (TXNID xid, u_int32_t klen, void* kval, u_int32_t clen, void* cval, u_int32_t plen, void* pval,
               u_int32_t *resultsize, u_int32_t *disksize, LEAFENTRY *result);
int le10_provdel (TXNID xid, u_int32_t klen, void* kval, u_int32_t dlen, void* dval,
                  u_int32_t *memsize, u_int32_t *disksize, LEAFENTRY *result);
int le10_provpair (TXNID xid, u_int32_t klen, void* kval, u_int32_t plen, void* pval, u_int32_t *memsize, u_int32_t *disksize, LEAFENTRY *result);
static int upgrade_brtheader_10_11 (brt_header_10 **brth_10, brt_header_11 **brth_11);
// State tag of a version 10 leafentry.  get_le_state() reads it from the first
// byte of the leafentry; the values start at 1.
enum le_state { LE_COMMITTED=1, // A committed pair.
LE_BOTH, // A committed pair and a provisional pair.
LE_PROVDEL, // A committed pair that has been provisionally deleted
LE_PROVPAIR }; // No committed value, but a provisional pair.
static int decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum);
static int deserialize_brtnode_from_rbuf_10 (BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h, struct rbuf *rb);
// Return the state tag of a version 10 leafentry, which is kept in its first byte.
static inline enum le_state get_le_state(LEAFENTRY le) {
    const unsigned char *first_byte = (const unsigned char *)le;
    return (enum le_state)first_byte[0];
}
#include "ule.h"
//Exposed ule functions for the purpose of upgrading
void toku_upgrade_ule_init_empty_ule(ULE ule, u_int32_t keylen, void * keyp);
void toku_upgrade_ule_remove_innermost_uxr(ULE ule);
void toku_upgrade_ule_push_insert_uxr(ULE ule, TXNID xid, u_int32_t vallen, void * valp);
void toku_upgrade_ule_push_delete_uxr(ULE ule, TXNID xid);
//Exposed brt functions for the purpose of upgrading
void toku_upgrade_maybe_bump_nkeys (BRTNODE node, u_int32_t idx, LEAFENTRY le, int direction);
#endif
......@@ -92,7 +92,7 @@ struct brtnode {
// When we checkpoint: Create a checkpoint record, and cause every dirty node to be written to disk. The new checkpoint record is *not* incorporated into the disk_lsn of the written nodes.
// While we are checkpointing, someone may modify a dirty node that has not yet been written. In that case, when we unpin the node, we make the new copy (because the disk_lsn<checkpoint_lsn), just as we would usually.
//
int layout_version; // What version of the data structure? (version 2 adds the xid to the brt cmds)
int layout_version; // What version of the data structure?
int height; /* height is always >= 0. 0 for leaf, >0 for nonleaf. */
u_int32_t rand4fingerprint;
u_int32_t local_fingerprint; /* For leaves this is everything in the buffer. For nonleaves, this is everything in the buffers, but does not include child subtree fingerprints. */
......@@ -178,10 +178,6 @@ struct brt_header {
struct list zombie_brts;
};
typedef struct brt_header brt_header_10;
typedef struct brt_header brt_header_11;
struct brt {
CACHEFILE cf;
char *fname; // the filename
......
......@@ -7,6 +7,12 @@
#include "backwards_10.h"
// NOTE: The backwards compatibility functions are in a file that is included at the END of this file.
static int deserialize_brtheader_10 (int fd, struct rbuf *rb, struct brt_header **brth);
static int upgrade_brtheader_10_11 (struct brt_header **brth_10, struct brt_header **brth_11);
static int decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum);
static int deserialize_brtnode_from_rbuf_10 (BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h, struct rbuf *rb);
static int upgrade_brtnode_10_11 (BRTNODE *brtnode_10, BRTNODE *brtnode_11);
#if 0
static u_int64_t ntohll(u_int64_t v) {
......@@ -965,13 +971,27 @@ decompress_brtnode_from_raw_block_into_rbuf_versioned(u_int32_t version, u_int8_
static int
deserialize_brtnode_from_rbuf_versioned (u_int32_t version, BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h, struct rbuf *rb) {
int r;
BRTNODE brtnode_10 = NULL;
BRTNODE brtnode_11 = NULL;
int upgrade = 0;
switch (version) {
case BRT_LAYOUT_VERSION_10:
r = deserialize_brtnode_from_rbuf_10(blocknum, fullhash, brtnode, h, rb);
break;
if (!upgrade)
r = deserialize_brtnode_from_rbuf_10(blocknum, fullhash, &brtnode_10, h, rb);
upgrade++;
if (r==0)
r = upgrade_brtnode_10_11(&brtnode_10, &brtnode_11);
//Fall through on purpose.
case BRT_LAYOUT_VERSION:
r = deserialize_brtnode_from_rbuf(blocknum, fullhash, brtnode, h, rb);
break;
if (!upgrade)
r = deserialize_brtnode_from_rbuf(blocknum, fullhash, &brtnode_11, h, rb);
if (r==0) {
assert(brtnode_11);
*brtnode = brtnode_11;
}
if (upgrade && r == 0) (*brtnode)->dirty = 1;
break; // this is the only break
default:
assert(FALSE);
}
......@@ -1397,8 +1417,8 @@ deserialize_brtheader (int fd, struct rbuf *rb, struct brt_header **brth) {
static int
deserialize_brtheader_versioned (int fd, struct rbuf *rb, struct brt_header **brth, u_int32_t version) {
int rval;
brt_header_10 *brth_10 = NULL;
brt_header_11 *brth_11 = NULL;
struct brt_header *brth_10 = NULL;
struct brt_header *brth_11 = NULL;
int upgrade = 0;
switch(version) {
......@@ -1408,6 +1428,7 @@ deserialize_brtheader_versioned (int fd, struct rbuf *rb, struct brt_header **br
upgrade++;
if (rval == 0)
rval = upgrade_brtheader_10_11(&brth_10, &brth_11);
//Fall through on purpose.
case BRT_LAYOUT_VERSION:
if (!upgrade)
rval = deserialize_brtheader (fd, rb, &brth_11);
......
......@@ -1418,7 +1418,7 @@ brt_leaf_apply_cmd_once (BRTNODE node, BRT_CMD cmd,
u_int32_t size = leafentry_memsize(le);
// This mfree must occur after the mempool_malloc so that when the mempool is compressed everything is accounted for.
// But we must compute the size before doing the mempool malloc because otherwise the le pointer is no good.
// But we must compute the size before doing the mempool mfree because otherwise the le pointer is no good.
toku_mempool_mfree(&node->u.l.buffer_mempool, 0, size); // Must pass 0, since le may be no good any more.
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize;
......@@ -1428,7 +1428,7 @@ brt_leaf_apply_cmd_once (BRTNODE node, BRT_CMD cmd,
} else {
if (le) {
// It's there, note that it's gone and remove it from the mempool
// It was there, note that it's gone and remove it from the mempool
// Figure out if one of the other keys is the same key
maybe_bump_nkeys(node, idx, le, -1);
......@@ -4751,3 +4751,11 @@ toku_brt_note_table_lock (BRT brt, TOKUTXN txn)
}
return 0;
}
//Wrapper functions for upgrading from version 10.
#include "backwards_10.h"
// Wrapper exposed for the version 10 upgrade code: forwards all four arguments,
// unchanged, to maybe_bump_nkeys().
void
toku_upgrade_maybe_bump_nkeys (BRTNODE node, u_int32_t idx, LEAFENTRY le, int direction) {
maybe_bump_nkeys(node, idx, le, direction);
}
......@@ -9,175 +9,6 @@ u_int32_t toku_le_crc(LEAFENTRY v) {
}
//TODO: #1125 delete function
// Allocate `size` bytes for a version 10 leafentry.
// With an omt the space comes from mempool_malloc_from_omt(omt, mp, size, maybe_free);
// without one it comes from toku_malloc(size) and mp/maybe_free are not used.
static void *
le10_malloc(OMT omt, struct mempool *mp, size_t size, void **maybe_free)
{
    if (!omt) {
        return toku_malloc(size);
    }
    return mempool_malloc_from_omt(omt, mp, size, maybe_free);
}
//Constructors for version 10 leafentries, possibly needed for upgrades.
// Build a version 10 LE_COMMITTED leafentry.
// Layout: state byte | klen (4 bytes) | key | dlen (4 bytes) | data.
// Outputs: *resultsize and *disksize both receive 1+4+4+klen+dlen, and *result
// points at the newly allocated entry (allocated through le10_malloc).
// Returns 0; a failed allocation is caught by the assert.
int
le10_committed (u_int32_t klen, void* kval, u_int32_t dlen, void* dval, u_int32_t *resultsize, u_int32_t *disksize, LEAFENTRY *result,
                OMT omt, struct mempool *mp, void **maybe_free) {
    size_t total = 1 + 4 + 4 + klen + dlen;
    unsigned char *entry = le10_malloc(omt, mp, total, maybe_free);
    assert(entry);

    // Write the fields front to back, advancing a cursor past each one.
    unsigned char *cursor = entry;
    *cursor++ = LE_COMMITTED;
    putint(cursor, klen);
    cursor += 4;
    memcpy(cursor, kval, klen);
    cursor += klen;
    putint(cursor, dlen);
    cursor += 4;
    memcpy(cursor, dval, dlen);

    *resultsize = total;
    *disksize = 1 + 4 + 4 + klen + dlen;
    *result = (LEAFENTRY)entry;
    return 0;
}
// Build a version 10 LE_BOTH leafentry (a committed value plus a provisional value).
// Layout: state byte | xid (8 bytes) | klen (4) | key | clen (4) | committed value |
//         plen (4) | provisional value.
// Outputs: *resultsize and *disksize both receive 1+8+4*3+klen+clen+plen, and
// *result points at the newly allocated entry (allocated through le10_malloc).
// Returns 0; a failed allocation is caught by the assert.
int
le10_both (TXNID xid, u_int32_t klen, void* kval, u_int32_t clen, void* cval, u_int32_t plen, void* pval,
           u_int32_t *resultsize, u_int32_t *disksize, LEAFENTRY *result,
           OMT omt, struct mempool *mp, void **maybe_free) {
    size_t total = 1 + 8 + 4*3 + klen + clen + plen;
    unsigned char *entry = le10_malloc(omt, mp, total, maybe_free);
    assert(entry);

    // Write the fields front to back, advancing a cursor past each one.
    unsigned char *cursor = entry;
    *cursor++ = LE_BOTH;
    putint64(cursor, xid);
    cursor += 8;
    putint(cursor, klen);
    cursor += 4;
    memcpy(cursor, kval, klen);
    cursor += klen;
    putint(cursor, clen);
    cursor += 4;
    memcpy(cursor, cval, clen);
    cursor += clen;
    putint(cursor, plen);
    cursor += 4;
    memcpy(cursor, pval, plen);

    *resultsize = total;
    *disksize = 1 + 8 + 4*3 + klen + clen + plen;
    *result = (LEAFENTRY)entry;
    return 0;
}
// Build a version 10 LE_PROVDEL leafentry (a committed value that has been
// provisionally deleted by transaction xid).
// Layout: state byte | xid (8 bytes) | klen (4) | key | dlen (4) | committed value.
// Outputs: *memsize and *disksize both receive 1+8+4+4+klen+dlen, and *result
// points at the newly allocated entry (allocated through le10_malloc).
// Returns 0; a failed allocation is caught by the assert.
int
le10_provdel (TXNID xid, u_int32_t klen, void* kval, u_int32_t dlen, void* dval,
              u_int32_t *memsize, u_int32_t *disksize, LEAFENTRY *result,
              OMT omt, struct mempool *mp, void **maybe_free) {
    size_t total = 1 + 8 + 2*4 + klen + dlen;
    unsigned char *entry = le10_malloc(omt, mp, total, maybe_free);
    assert(entry);

    // Write the fields front to back, advancing a cursor past each one.
    unsigned char *cursor = entry;
    *cursor++ = LE_PROVDEL;
    putint64(cursor, xid);
    cursor += 8;
    putint(cursor, klen);
    cursor += 4;
    memcpy(cursor, kval, klen);
    cursor += klen;
    putint(cursor, dlen);
    cursor += 4;
    memcpy(cursor, dval, dlen);

    *memsize = total;
    *disksize = 1 + 4 + 4 + 8 + klen + dlen;
    *result = (LEAFENTRY)entry;
    return 0;
}
// Build a version 10 LE_PROVPAIR leafentry (no committed value, only a
// provisional pair written by transaction xid).
// Layout: state byte | xid (8 bytes) | klen (4) | key | plen (4) | provisional value.
// Outputs: *memsize and *disksize both receive 1+8+4+4+klen+plen, and *result
// points at the newly allocated entry (allocated through le10_malloc).
// Returns 0; a failed allocation is caught by the assert.
int
le10_provpair (TXNID xid, u_int32_t klen, void* kval, u_int32_t plen, void* pval, u_int32_t *memsize, u_int32_t *disksize, LEAFENTRY *result,
               OMT omt, struct mempool *mp, void **maybe_free) {
    size_t total = 1 + 8 + 2*4 + klen + plen;
    unsigned char *entry = le10_malloc(omt, mp, total, maybe_free);
    assert(entry);

    // Write the fields front to back, advancing a cursor past each one.
    unsigned char *cursor = entry;
    *cursor++ = LE_PROVPAIR;
    putint64(cursor, xid);
    cursor += 8;
    putint(cursor, klen);
    cursor += 4;
    memcpy(cursor, kval, klen);
    cursor += klen;
    putint(cursor, plen);
    cursor += 4;
    memcpy(cursor, pval, plen);

    *memsize = total;
    *disksize = 1 + 4 + 4 + 8 + klen + plen;
    *result = (LEAFENTRY)entry;
    return 0;
}
#if 0 //Needed for upgrade (probably)
//TODO: #1125 FUNCTION NEEDED for upgrading?
// In-memory size of a version 10 LE_COMMITTED leafentry: one state byte, two
// 4-byte length fields, then the key and value bytes.  The key/val pointers are
// accepted but unused.
static u_int32_t memsize_le10_committed (u_int32_t keylen, void *key __attribute__((__unused__)),
                                         u_int32_t vallen, void *val __attribute__((__unused__))) {
    u_int32_t total = 1 + 2*4;
    total += keylen;
    total += vallen;
    return total;
}
//TODO: #1125 FUNCTION NEEDED for upgrading?
// In-memory size of a version 10 LE_BOTH leafentry: one state byte, an 8-byte
// xid, three 4-byte length fields, then the key, committed value and
// provisional value bytes.  The xid and the pointers are accepted but unused.
static u_int32_t memsize_le10_both (TXNID txnid __attribute__((__unused__)),
                                    u_int32_t klen, void *kval __attribute__((__unused__)),
                                    u_int32_t clen, void *cval __attribute__((__unused__)),
                                    u_int32_t plen, void *pval __attribute__((__unused__))) {
    u_int32_t total = 1 + 8 + 4*3;
    total += klen;
    total += clen;
    total += plen;
    return total;
}
//TODO: #1125 FUNCTION NEEDED for upgrading?
// In-memory size of a version 10 LE_PROVDEL leafentry: one state byte, an 8-byte
// xid, two 4-byte length fields, then the key and committed value bytes.
// The xid and the pointers are accepted but unused.
static u_int32_t memsize_le10_provdel (TXNID txnid __attribute__((__unused__)),
                                       u_int32_t klen, void *kval __attribute__((__unused__)),
                                       u_int32_t clen, void *cval __attribute__((__unused__))) {
    u_int32_t total = 1 + 8 + 4*2;
    total += klen;
    total += clen;
    return total;
}
//TODO: #1125 FUNCTION NEEDED for upgrading?
// In-memory size of a version 10 LE_PROVPAIR leafentry: one state byte, an 8-byte
// xid, two 4-byte length fields, then the key and provisional value bytes.
// The xid and the pointers are accepted but unused.
static u_int32_t memsize_le10_provpair (TXNID txnid __attribute__((__unused__)),
                                        u_int32_t klen, void *kval __attribute__((__unused__)),
                                        u_int32_t plen, void *pval __attribute__((__unused__))) {
    u_int32_t total = 1 + 8 + 4*2;
    total += klen;
    total += plen;
    return total;
}
// In-memory size of a version 10 leafentry.
// LESWITCHCALL switches on the leafentry's state and returns the result of the
// matching memsize_le10_<state>() function, so control never reaches the
// statements after it.
u_int32_t leafentry_memsize_10 (LEAFENTRY le) {
LESWITCHCALL(le, memsize);
abort(); return 0; // make certain compilers happy
}
//TODO: #1125 FUNCTION NEEDED for upgrading?
// On-disk size of a version 10 LE_COMMITTED leafentry: one state byte, two
// 4-byte length fields, then the key and value bytes.  The key/val pointers are
// accepted but unused.
static u_int32_t disksize_le10_committed (u_int32_t keylen, void *key __attribute__((__unused__)),
                                          u_int32_t vallen, void *val __attribute__((__unused__))) {
    u_int32_t total = 1 + 4 + 4;
    total += keylen;
    total += vallen;
    return total;
}
//TODO: #1125 FUNCTION NEEDED for upgrading?
// On-disk size of a version 10 LE_BOTH leafentry: one state byte, an 8-byte xid,
// three 4-byte length fields, then the key, committed value and provisional
// value bytes.  The xid and the pointers are accepted but unused.
static u_int32_t disksize_le10_both (TXNID txnid __attribute__((__unused__)),
                                     u_int32_t klen, void *kval __attribute__((__unused__)),
                                     u_int32_t clen, void *cval __attribute__((__unused__)),
                                     u_int32_t plen, void *pval __attribute__((__unused__))) {
    u_int32_t total = 1 + 8 + 4*3;
    total += klen;
    total += clen;
    total += plen;
    return total;
}
//TODO: #1125 FUNCTION NEEDED for upgrading?
// On-disk size of a version 10 LE_PROVDEL leafentry: one state byte, an 8-byte
// xid, two 4-byte length fields, then the key and committed value bytes.
// The xid and the pointers are accepted but unused.
static u_int32_t disksize_le10_provdel (TXNID txnid __attribute__((__unused__)),
                                        u_int32_t klen, void *kval __attribute__((__unused__)),
                                        u_int32_t clen, void *cval __attribute__((__unused__))) {
    u_int32_t total = 1 + 8 + 4 + 4;
    total += klen;
    total += clen;
    return total;
}
//TODO: #1125 FUNCTION NEEDED for upgrading?
// On-disk size of a version 10 LE_PROVPAIR leafentry: one state byte, an 8-byte
// xid, two 4-byte length fields, then the key and provisional value bytes.
// The xid and the pointers are accepted but unused.
static u_int32_t disksize_le10_provpair (TXNID txnid __attribute__((__unused__)),
                                         u_int32_t klen, void *kval __attribute__((__unused__)),
                                         u_int32_t plen, void *pval __attribute__((__unused__))) {
    u_int32_t total = 1 + 8 + 4 + 4;
    total += klen;
    total += plen;
    return total;
}
//TODO: #1125 FUNCTION NEEDED for upgrading?
// On-disk size of a version 10 leafentry.
// LESWITCHCALL switches on the leafentry's state and returns the result of the
// matching disksize_le10_<state>() function, so control never reaches the
// statements after it.
static u_int32_t
le10_disksize_internal (LEAFENTRY le) {
LESWITCHCALL(le, disksize);
abort(); return 0; // make certain compilers happy
}
//TODO: #1125 FUNCTION NEEDED for upgrading?
// Public entry point for the on-disk size of a version 10 leafentry; returns
// whatever le10_disksize_internal() computes.
u_int32_t le10_disksize (LEAFENTRY le) {
u_int32_t d = le10_disksize_internal(le);
#if 0
// this computation is currently identical to the _disksize_internal
// NOTE(review): le10_memsize() does not appear in the visible source; the
// memory-size function defined nearby is leafentry_memsize_10(). Confirm the
// name before re-enabling this check.
u_int32_t m = le10_memsize(le);
assert(m==d);
#endif
return d;
}
#endif
// Write a leafentry into the write buffer w: passes the leafentry pointer and
// its leafentry_disksize() to wbuf_literal_bytes().
void wbuf_LEAFENTRY(struct wbuf *w, LEAFENTRY le) {
wbuf_literal_bytes(w, le, leafentry_disksize(le));
}
......
......@@ -85,108 +85,6 @@ typedef struct leafentry *LEAFENTRY;
u_int32_t toku_le_crc(LEAFENTRY v);
//TODO: #1125 next four probably are not necessary once testing for new structure is done (except possibly for test-leafentry.c, rename to test-leafentry10.c)
int le10_committed (u_int32_t klen, void* kval, u_int32_t dlen, void* dval, u_int32_t *resultsize, u_int32_t *disksize, LEAFENTRY *result,
OMT, struct mempool *, void **maybe_free);
int le10_both (TXNID xid, u_int32_t cklen, void* ckval, u_int32_t cdlen, void* cdval, u_int32_t pdlen, void* pdval,
u_int32_t *memsize, u_int32_t *disksize, LEAFENTRY *result,
OMT, struct mempool *, void **maybe_free);
int le10_provdel (TXNID xid, u_int32_t klen, void* kval, u_int32_t dlen, void* dval,
u_int32_t *resultsize, u_int32_t *memsize, LEAFENTRY *result,
OMT, struct mempool *, void **maybe_free);
int le10_provpair (TXNID xid, u_int32_t klen, void* kval, u_int32_t plen, void* pval, u_int32_t *memsize, u_int32_t *disksize, LEAFENTRY *result,
OMT omt, struct mempool *mp, void **maybe_free);
// State tag of a version 10 leafentry.  get_le_state() reads it from the first
// byte of the leafentry; the values start at 1.
enum le_state { LE_COMMITTED=1, // A committed pair.
LE_BOTH, // A committed pair and a provisional pair.
LE_PROVDEL, // A committed pair that has been provisionally deleted
LE_PROVPAIR }; // No committed value, but a provisional pair.
// Return the state tag of a version 10 leafentry, which is kept in its first byte.
static inline enum le_state get_le_state(LEAFENTRY le) {
    const unsigned char *first_byte = (const unsigned char *)le;
    return (enum le_state)first_byte[0];
}
// Store the 32-bit value i in the 4 bytes at p, in the byte order produced by
// toku_htod32().
// p does not have to be aligned.  The converted value is copied out one byte at
// a time rather than stored through a cast u_int32_t pointer: leafentry fields
// sit at arbitrary byte offsets, and a store through a misaligned, type-punned
// pointer is undefined behavior.
static inline void putint (unsigned char *p, u_int32_t i) {
    u_int32_t disk_order = toku_htod32(i);
    const unsigned char *src = (const unsigned char *)&disk_order;
    unsigned int k;
    for (k = 0; k < sizeof disk_order; k++)
        p[k] = src[k];
}
// Store a 64-bit value in the 8 bytes at p as two consecutive putint() fields:
// the high 32 bits at p, then the low 32 bits at p+4.
static inline void putint64 (unsigned char *p, u_int64_t i) {
    u_int32_t high_half = (u_int32_t)(i>>32);
    u_int32_t low_half = (u_int32_t)(i&0xffffffff);
    putint(p, high_half);
    putint(p+4, low_half);
}
// Read the 4 bytes at p and return them converted with toku_dtoh32().
// p does not have to be aligned.  The bytes are gathered into a local u_int32_t
// one at a time rather than loaded through a cast u_int32_t pointer: leafentry
// fields sit at arbitrary byte offsets, and a load through a misaligned,
// type-punned pointer is undefined behavior.
static inline u_int32_t getint (unsigned char *p) {
    u_int32_t disk_order;
    unsigned char *dst = (unsigned char *)&disk_order;
    unsigned int k;
    for (k = 0; k < sizeof disk_order; k++)
        dst[k] = p[k];
    return toku_dtoh32(disk_order);
}
// Read a 64-bit value from the 8 bytes at p: getint(p) supplies the high
// 32 bits and getint(p+4) the low 32 bits.
static inline u_int64_t getint64 (unsigned char *p) {
    u_int64_t high_half = getint(p);
    u_int64_t low_half = getint(p+4);
    // The shifted high half has zero low bits, so OR-ing is the same as adding.
    return (high_half<<32) | low_half;
}
// This ugly factorization of the macro is done so that we can do ## or not depending on which version of the
// compiler we are using, without repeating all this crufty offset calculation.
//
// DO_LE_COMMITTED expands to the LE_COMMITTED case of a switch: it decodes the key
// length/address and committed value length/address from le (skipping the state byte)
// and starts a `return funname_le10_committed(...` call.
// The expansion is deliberately left open (no closing parenthesis or brace); the user
// of the macro must append any extra arguments followed by `); }`.
#define DO_LE_COMMITTED(funname,le) case LE_COMMITTED: { \
unsigned char* __klenaddr = 1+(unsigned char*)le; u_int32_t __klen = getint(__klenaddr); \
unsigned char* __kvaladdr = 4 + __klenaddr; \
unsigned char* __clenaddr = __klen + __kvaladdr; u_int32_t __clen = getint(__clenaddr); \
unsigned char* __cvaladdr = 4 + __clenaddr; \
return funname ## _le10_committed(__klen, __kvaladdr, __clen, __cvaladdr
// DO_LE_BOTH expands to the LE_BOTH case of a switch: it decodes the xid, the key,
// the committed value and the provisional value (each length-prefixed) from le and
// starts a `return funname_le10_both(...` call.
// The expansion is deliberately left open; the user of the macro must append any
// extra arguments followed by `); }`.
#define DO_LE_BOTH(funname,le) case LE_BOTH: { \
unsigned char* __xidaddr = 1+(unsigned char*)le; u_int64_t __xid = getint64(__xidaddr); \
unsigned char* __klenaddr = 8 + __xidaddr; u_int32_t __klen = getint(__klenaddr); \
unsigned char* __kvaladdr = 4 + __klenaddr; \
unsigned char* __clenaddr = __klen + __kvaladdr; u_int32_t __clen = getint(__clenaddr); \
unsigned char* __cvaladdr = 4 + __clenaddr; \
unsigned char* __plenaddr = __clen + __cvaladdr; u_int32_t __plen = getint(__plenaddr); \
unsigned char* __pvaladdr = 4 + __plenaddr; \
return funname ## _le10_both(__xid, __klen, __kvaladdr, __clen, __cvaladdr, __plen, __pvaladdr
// DO_LE_PROVDEL expands to the LE_PROVDEL case of a switch: it decodes the xid, the
// key and the (length-prefixed) value that follows it from le and starts a
// `return funname_le10_provdel(...` call.
// The expansion is deliberately left open; the user of the macro must append any
// extra arguments followed by `); }`.
#define DO_LE_PROVDEL(funname,le ) case LE_PROVDEL: { \
unsigned char* __xidaddr = 1+(unsigned char*)le; u_int64_t __xid = getint64(__xidaddr); \
unsigned char* __klenaddr = 8 + __xidaddr; u_int32_t __klen = getint(__klenaddr); \
unsigned char* __kvaladdr = 4 + __klenaddr; \
unsigned char* __dlenaddr = __klen + __kvaladdr; u_int32_t __dlen = getint(__dlenaddr); \
unsigned char* __dvaladdr = 4 + __dlenaddr; \
return funname ## _le10_provdel(__xid, __klen, __kvaladdr, __dlen, __dvaladdr
// DO_LE_PROVPAIR expands to the LE_PROVPAIR case of a switch: it decodes the xid, the
// key and the (length-prefixed) provisional value from le and starts a
// `return funname_le10_provpair(...` call.
// The expansion is deliberately left open; the user of the macro must append any
// extra arguments followed by `); }`.
#define DO_LE_PROVPAIR(funname,le) case LE_PROVPAIR: { \
unsigned char* __xidaddr = 1+(unsigned char*)le; u_int64_t __xid = getint64(__xidaddr); \
unsigned char* __klenaddr = 8 + __xidaddr; u_int32_t __klen = getint(__klenaddr); \
unsigned char* __kvaladdr = 4 + __klenaddr; \
unsigned char* __plenaddr = __klen + __kvaladdr; u_int32_t __plen = getint(__plenaddr); \
unsigned char* __pvaladdr = 4 + __plenaddr; \
return funname ## _le10_provpair(__xid, __klen, __kvaladdr, __plen, __pvaladdr
// LESWITCHCALL(le, funname, extra args...) switches on get_le_state(le) and, in the
// matching case, returns funname_le10_<state>(decoded fields..., extra args...) from
// the enclosing function.  Each DO_LE_* macro opens the case and the call; the
// `, args); }` written here closes them.  If no case matches, abort() is called.
// The two definitions differ only in how the extra arguments are appended:
// `, __VA_ARGS__` when __ICL is defined and `, ## __VA_ARGS__` otherwise.
#ifdef __ICL
#define LESWITCHCALL(le,funname, ...) do { \
switch(get_le_state(le)) { \
DO_LE_COMMITTED(funname,le) , __VA_ARGS__); } \
DO_LE_BOTH (funname,le) , __VA_ARGS__); } \
DO_LE_PROVDEL (funname,le) , __VA_ARGS__); } \
DO_LE_PROVPAIR (funname,le) , __VA_ARGS__); } \
} abort(); } while (0)
#else
#define LESWITCHCALL(le,funname, ...) do { \
switch(get_le_state(le)) { \
DO_LE_COMMITTED(funname,le) , ## __VA_ARGS__); } \
DO_LE_BOTH (funname,le) , ## __VA_ARGS__); } \
DO_LE_PROVDEL (funname,le) , ## __VA_ARGS__); } \
DO_LE_PROVPAIR (funname,le) , ## __VA_ARGS__); } \
} abort(); } while (0)
#endif
size_t leafentry_memsize (LEAFENTRY le); // the size of a leafentry in memory.
size_t leafentry_disksize (LEAFENTRY le); // this is the same as logsizeof_LEAFENTRY. The size of a leafentry on disk.
void wbuf_LEAFENTRY(struct wbuf *w, LEAFENTRY le);
......
......@@ -4,6 +4,7 @@
#include "test.h"
#include "brttypes.h"
#include "includes.h"
#include "backwards_10.h"
static char
int32_get_char(u_int32_t i, int which) {
......@@ -19,7 +20,7 @@ static void test_leafentry_1 (void) {
LEAFENTRY l;
int r;
u_int32_t msize, dsize;
r = le10_committed(4, "abc", 3, "xy", &msize, &dsize, &l, 0, 0, 0);
r = le10_committed(4, "abc", 3, "xy", &msize, &dsize, &l);
assert(r==0);
char expect[] = {LE_COMMITTED,
UINT32TOCHAR(4),
......@@ -36,7 +37,7 @@ static void test_leafentry_2 (void) {
LEAFENTRY l;
int r;
u_int32_t msize, dsize;
r = le10_both(0x0123456789abcdef0LL, 3, "ab", 4, "xyz", 5, "lmno", &msize, &dsize, &l, 0, 0, 0);
r = le10_both(0x0123456789abcdef0LL, 3, "ab", 4, "xyz", 5, "lmno", &msize, &dsize, &l);
assert(r==0);
char expect[] = {LE_BOTH,
UINT64TOCHAR(0x0123456789abcdef0LL),
......@@ -53,7 +54,7 @@ static void test_leafentry_3 (void) {
LEAFENTRY l;
int r;
u_int32_t msize, dsize;
r = le10_provdel(0x0123456789abcdef0LL, 3, "ab", 5, "lmno", &msize, &dsize, &l, 0, 0, 0);
r = le10_provdel(0x0123456789abcdef0LL, 3, "ab", 5, "lmno", &msize, &dsize, &l);
assert(r==0);
char expect[] = {LE_PROVDEL,
UINT64TOCHAR(0x0123456789abcdef0LL),
......@@ -69,7 +70,7 @@ static void test_leafentry_4 (void) {
LEAFENTRY l;
int r;
u_int32_t msize, dsize;
r = le10_provpair(0x0123456789abcdef0LL, 3, "ab", 5, "lmno", &msize, &dsize, &l, 0, 0, 0);
r = le10_provpair(0x0123456789abcdef0LL, 3, "ab", 5, "lmno", &msize, &dsize, &l);
assert(r==0);
char expect[] = {LE_PROVPAIR,
UINT64TOCHAR(0x0123456789abcdef0LL),
......@@ -101,7 +102,7 @@ static void test_leafentry_3long (void) {
LEAFENTRY l;
int r;
u_int32_t msize, dsize;
r = le10_provdel(0x0123456789abcdef0LL, 301, zeros, 1025, zeros, &msize, &dsize, &l, 0, 0, 0);
r = le10_provdel(0x0123456789abcdef0LL, 301, zeros, 1025, zeros, &msize, &dsize, &l);
assert(r==0);
assert(sizeof(expect_3long)==msize);
assert(msize==dsize);
......
......@@ -79,51 +79,6 @@ static inline BOOL uxr_is_delete(UXR uxr);
static inline BOOL uxr_is_placeholder(UXR uxr);
///////////// TEMP TEMP TEMP TEMP
///////////// scaffolding/upgrading begins here
///////////// Some of this code may be used to upgrade an old database to our new version.
//
// le_unpack_le_* functions are throwaway code as part of phase 1 (temp
// scaffolding)
//
#if 0
// Unpack a version 10 LE_COMMITTED leafentry into ule: start from an empty ule
// for the key, pop its innermost record, and push the value as an insert with
// xid 0.
static void le_unpack_le10_committed(u_int32_t klen, void *kval, u_int32_t vallen, void *val, ULE ule) {
//Committed value
ule_init_empty_ule(ule, klen, kval);
ule_remove_innermost_uxr(ule); // pop committed delete
ule_push_insert_uxr(ule, 0, vallen, val);
}
// Unpack a version 10 LE_BOTH leafentry into ule: start from an empty ule for
// the key, pop its innermost record, push the committed value as an insert with
// xid 0, then push the provisional value as an insert with the given xid.
static void le_unpack_le10_both(TXNID xid, u_int32_t klen, void *kval, u_int32_t clen, void *cval, u_int32_t plen, void *pval, ULE ule) {
//committed value and provisional insert
ule_init_empty_ule(ule, klen, kval);
ule_remove_innermost_uxr(ule); // pop committed delete
ule_push_insert_uxr(ule, 0, clen, cval); // push committed
ule_push_insert_uxr(ule, xid, plen, pval); // push provisional
}
// Unpack a version 10 LE_PROVDEL leafentry into ule: start from an empty ule for
// the key, pop its innermost record, push the committed value as an insert with
// xid 0, then push a delete with the given xid.
static void le_unpack_le10_provdel(TXNID xid, u_int32_t klen, void *kval, u_int32_t clen, void *cval, ULE ule) {
//committed value and provisional delete
ule_init_empty_ule(ule, klen, kval);
ule_remove_innermost_uxr(ule); // pop committed delete
ule_push_insert_uxr(ule, 0, clen, cval); // push committed
ule_push_delete_uxr(ule, xid); // push provisional
}
// Unpack a version 10 LE_PROVPAIR leafentry into ule: start from an empty ule
// for the key and push the provisional value as an insert with the given xid.
// Unlike the other le_unpack_le10_* functions, nothing is popped first.
static void le_unpack_le10_provpair(TXNID xid, u_int32_t klen, void *kval, u_int32_t plen, void *pval, ULE ule) {
//committed delete and provisional insert
ule_init_empty_ule(ule, klen, kval);
ule_push_insert_uxr(ule, xid, plen, pval); // push provisional
}
//Used to unpack a version 10 record to ule, which can be packed to version 11.
// LESWITCHCALL dispatches on the leafentry's state to the matching
// le_unpack_le10_<state>() function, passing ule as the trailing argument.
static void UU()
le_unpack_from_version_10(ULE ule, LEAFENTRY le) {
LESWITCHCALL(le, le_unpack, ule);
}
#endif
static void *
le_malloc(OMT omt, struct mempool *mp, size_t size, void **maybe_free)
{
......@@ -1529,5 +1484,25 @@ bool transaction_open(TXNID xid) {
#endif
// Wrapper code to support backwards compatibility with version 10 (until we don't want it).
// These wrappers should be removed if/when we remove support for version 10 leafentries.
#include "backwards_10.h"
// Version 10 upgrade wrapper: forwards its arguments unchanged to ule_init_empty_ule().
void
toku_upgrade_ule_init_empty_ule(ULE ule, u_int32_t keylen, void * keyp) {
ule_init_empty_ule(ule, keylen, keyp);
}
// Version 10 upgrade wrapper: forwards its argument unchanged to ule_remove_innermost_uxr().
void
toku_upgrade_ule_remove_innermost_uxr(ULE ule) {
ule_remove_innermost_uxr(ule);
}
// Version 10 upgrade wrapper: forwards its arguments unchanged to ule_push_insert_uxr().
void
toku_upgrade_ule_push_insert_uxr(ULE ule, TXNID xid, u_int32_t vallen, void * valp) {
ule_push_insert_uxr(ule, xid, vallen, valp);
}
// Version 10 upgrade wrapper: forwards its arguments unchanged to ule_push_delete_uxr().
void
toku_upgrade_ule_push_delete_uxr(ULE ule, TXNID xid) {
ule_push_delete_uxr(ule, xid);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment