Commit ae18f598 authored by Bradley C. Kuszmaul

Add XID (Addresses #242), and also fix the problem where deletes were sneaking...

Add XID (Addresses #242), and also fix the problem where deletes were sneaking around inserts (Fixes #332.)

git-svn-id: file:///svn/tokudb@2098 c7de825b-a66e-492c-adef-691d508d4ae1
parent 962d535c
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
# GCOV_FLAGS = -fprofile-arcs -ftest-coverage # GCOV_FLAGS = -fprofile-arcs -ftest-coverage
# PROF_FLAGS = -pg # PROF_FLAGS = -pg
OPTFLAGS = -O2 # OPTFLAGS = -O2
ifeq ($(VERBOSE),2) ifeq ($(VERBOSE),2)
VERBVERBOSE=-v VERBVERBOSE=-v
......
...@@ -16,7 +16,9 @@ ...@@ -16,7 +16,9 @@
enum { TREE_FANOUT = BRT_FANOUT }; enum { TREE_FANOUT = BRT_FANOUT };
enum { KEY_VALUE_OVERHEAD = 8 }; /* Must store the two lengths. */ enum { KEY_VALUE_OVERHEAD = 8 }; /* Must store the two lengths. */
enum { PMA_ITEM_OVERHEAD = 4 }; enum { PMA_ITEM_OVERHEAD = 4 };
enum { BRT_CMD_OVERHEAD = 1 }; enum { BRT_CMD_OVERHEAD = (1 // the type
+ 8) // the xid
};
enum { BRT_DEFAULT_NODE_SIZE = 1 << 20 }; enum { BRT_DEFAULT_NODE_SIZE = 1 << 20 };
struct nodeheader_in_file { struct nodeheader_in_file {
...@@ -56,7 +58,7 @@ struct brtnode { ...@@ -56,7 +58,7 @@ struct brtnode {
// When we checkpoint: Create a checkpoint record, and cause every dirty node to be written to disk. The new checkpoint record is *not* incorporated into the disk_lsn of the written nodes. // When we checkpoint: Create a checkpoint record, and cause every dirty node to be written to disk. The new checkpoint record is *not* incorporated into the disk_lsn of the written nodes.
// While we are checkpointing, someone may modify a dirty node that has not yet been written. In that case, when we unpin the node, we make the new copy (because the disk_lsn<checkpoint_lsn), just as we would usually. // While we are checkpointing, someone may modify a dirty node that has not yet been written. In that case, when we unpin the node, we make the new copy (because the disk_lsn<checkpoint_lsn), just as we would usually.
// //
int layout_version; // What version of the data structure? int layout_version; // What version of the data structure? (version 2 adds the xid to the brt cmds)
int height; /* height is always >= 0. 0 for leaf, >0 for nonleaf. */ int height; /* height is always >= 0. 0 for leaf, >0 for nonleaf. */
u_int32_t rand4fingerprint; u_int32_t rand4fingerprint;
u_int32_t local_fingerprint; /* For leaves this is everything in the buffer. For nonleaves, this is everything in the buffers, but does not include child subtree fingerprints. */ u_int32_t local_fingerprint; /* For leaves this is everything in the buffer. For nonleaves, this is everything in the buffers, but does not include child subtree fingerprints. */
...@@ -159,6 +161,7 @@ enum brt_cmd_type { ...@@ -159,6 +161,7 @@ enum brt_cmd_type {
/* tree commands */ /* tree commands */
struct brt_cmd { struct brt_cmd {
enum brt_cmd_type type; enum brt_cmd_type type;
TXNID xid;
union { union {
/* insert or delete */ /* insert or delete */
struct brt_cmd_insert_delete { struct brt_cmd_insert_delete {
...@@ -185,8 +188,8 @@ extern CACHEKEY* toku_calculate_root_offset_pointer (BRT brt); ...@@ -185,8 +188,8 @@ extern CACHEKEY* toku_calculate_root_offset_pointer (BRT brt);
static const BRTNODE null_brtnode=0; static const BRTNODE null_brtnode=0;
extern u_int32_t toku_calccrc32_kvpair (const void *key, int keylen, const void *val, int vallen); extern u_int32_t toku_calccrc32_kvpair (const void *key, int keylen, const void *val, int vallen);
extern u_int32_t toku_calccrc32_cmd (int type, const void *key, int keylen, const void *val, int vallen); extern u_int32_t toku_calccrc32_cmd (int type, TXNID xid, const void *key, int keylen, const void *val, int vallen);
extern u_int32_t toku_calccrc32_cmdstruct (BRT_CMD_S *cmd); extern u_int32_t toku_calccrc32_cmdstruct (BRT_CMD cmd);
// How long is the pivot key? // How long is the pivot key?
unsigned int toku_brt_pivot_key_len (BRT, struct kv_pair *); // Given the tree unsigned int toku_brt_pivot_key_len (BRT, struct kv_pair *); // Given the tree
......
...@@ -26,7 +26,7 @@ static void test_serialize(void) { ...@@ -26,7 +26,7 @@ static void test_serialize(void) {
sn.thisnodename = sn.nodesize*20; sn.thisnodename = sn.nodesize*20;
sn.disk_lsn.lsn = 789; sn.disk_lsn.lsn = 789;
sn.log_lsn.lsn = 123456; sn.log_lsn.lsn = 123456;
sn.layout_version = 1; sn.layout_version = 2;
sn.height = 1; sn.height = 1;
sn.rand4fingerprint = randval; sn.rand4fingerprint = randval;
sn.local_fingerprint = 0; sn.local_fingerprint = 0;
...@@ -40,9 +40,9 @@ static void test_serialize(void) { ...@@ -40,9 +40,9 @@ static void test_serialize(void) {
BNC_SUBTREE_FINGERPRINT(&sn, 1) = random(); BNC_SUBTREE_FINGERPRINT(&sn, 1) = random();
r = toku_fifo_create(&BNC_BUFFER(&sn,0)); assert(r==0); r = toku_fifo_create(&BNC_BUFFER(&sn,0)); assert(r==0);
r = toku_fifo_create(&BNC_BUFFER(&sn,1)); assert(r==0); r = toku_fifo_create(&BNC_BUFFER(&sn,1)); assert(r==0);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "a", 2, "aval", 5, BRT_NONE); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, "a", 2, "aval", 5); r = toku_fifo_enq(BNC_BUFFER(&sn,0), "a", 2, "aval", 5, BRT_NONE, (TXNID)0); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, (TXNID)0, "a", 2, "aval", 5);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "b", 2, "bval", 5, BRT_NONE); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, "b", 2, "bval", 5); r = toku_fifo_enq(BNC_BUFFER(&sn,0), "b", 2, "bval", 5, BRT_NONE, (TXNID)123); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, (TXNID)123, "b", 2, "bval", 5);
r = toku_fifo_enq(BNC_BUFFER(&sn,1), "x", 2, "xval", 5, BRT_NONE); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, "x", 2, "xval", 5); r = toku_fifo_enq(BNC_BUFFER(&sn,1), "x", 2, "xval", 5, BRT_NONE, (TXNID)234); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, (TXNID)234, "x", 2, "xval", 5);
BNC_NBYTESINBUF(&sn, 0) = 2*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5); BNC_NBYTESINBUF(&sn, 0) = 2*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5);
BNC_NBYTESINBUF(&sn, 1) = 1*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5); BNC_NBYTESINBUF(&sn, 1) = 1*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5);
{ {
...@@ -59,7 +59,7 @@ static void test_serialize(void) { ...@@ -59,7 +59,7 @@ static void test_serialize(void) {
assert(dn->thisnodename==nodesize*20); assert(dn->thisnodename==nodesize*20);
assert(dn->disk_lsn.lsn==123456); assert(dn->disk_lsn.lsn==123456);
assert(dn->layout_version ==1); assert(dn->layout_version ==2);
assert(dn->height == 1); assert(dn->height == 1);
assert(dn->rand4fingerprint==randval); assert(dn->rand4fingerprint==randval);
assert(dn->u.n.n_children==2); assert(dn->u.n.n_children==2);
......
...@@ -49,7 +49,7 @@ static unsigned int toku_serialize_brtnode_size_slow(BRTNODE node) { ...@@ -49,7 +49,7 @@ static unsigned int toku_serialize_brtnode_size_slow(BRTNODE node) {
FIFO_ITERATE(BNC_BUFFER(node,i), FIFO_ITERATE(BNC_BUFFER(node,i),
key __attribute__((__unused__)), keylen, key __attribute__((__unused__)), keylen,
data __attribute__((__unused__)), datalen, data __attribute__((__unused__)), datalen,
type __attribute__((__unused__)), type __attribute__((__unused__)), xid __attribute__((__unused__)),
(hsize+=BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+keylen+datalen)); (hsize+=BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+keylen+datalen));
} }
assert(hsize==node->u.n.n_bytes_in_buffers); assert(hsize==node->u.n.n_bytes_in_buffers);
...@@ -154,12 +154,13 @@ void toku_serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node) ...@@ -154,12 +154,13 @@ void toku_serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node)
for (i=0; i< n_buffers; i++) { for (i=0; i< n_buffers; i++) {
//printf("%s:%d p%d=%p n_entries=%d\n", __FILE__, __LINE__, i, node->mdicts[i], mdict_n_entries(node->mdicts[i])); //printf("%s:%d p%d=%p n_entries=%d\n", __FILE__, __LINE__, i, node->mdicts[i], mdict_n_entries(node->mdicts[i]));
wbuf_int(&w, toku_fifo_n_entries(BNC_BUFFER(node,i))); wbuf_int(&w, toku_fifo_n_entries(BNC_BUFFER(node,i)));
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
({ ({
wbuf_char(&w, type); wbuf_char(&w, type);
wbuf_TXNID(&w, xid);
wbuf_bytes(&w, key, keylen); wbuf_bytes(&w, key, keylen);
wbuf_bytes(&w, data, datalen); wbuf_bytes(&w, data, datalen);
check_local_fingerprint+=node->rand4fingerprint*toku_calccrc32_cmd(type, key, keylen, data, datalen); check_local_fingerprint+=node->rand4fingerprint*toku_calccrc32_cmd(type, xid, key, keylen, data, datalen);
})); }));
} }
//printf("%s:%d check_local_fingerprint=%8x\n", __FILE__, __LINE__, check_local_fingerprint); //printf("%s:%d check_local_fingerprint=%8x\n", __FILE__, __LINE__, check_local_fingerprint);
...@@ -257,7 +258,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl ...@@ -257,7 +258,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl
} }
} }
result->layout_version = rbuf_int(&rc); result->layout_version = rbuf_int(&rc);
if (result->layout_version!=1) { if (result->layout_version!=2) {
r=DB_BADFORMAT; r=DB_BADFORMAT;
goto died1; goto died1;
} }
...@@ -337,17 +338,17 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl ...@@ -337,17 +338,17 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl
//printf("%d in hash\n", n_in_hash); //printf("%d in hash\n", n_in_hash);
for (i=0; i<n_in_this_hash; i++) { for (i=0; i<n_in_this_hash; i++) {
int diff; int diff;
int type;
bytevec key; ITEMLEN keylen; bytevec key; ITEMLEN keylen;
bytevec val; ITEMLEN vallen; bytevec val; ITEMLEN vallen;
toku_verify_counts(result); toku_verify_counts(result);
type = rbuf_char(&rc); int type = rbuf_char(&rc);
TXNID xid = rbuf_ulonglong(&rc);
rbuf_bytes(&rc, &key, &keylen); /* Returns a pointer into the rbuf. */ rbuf_bytes(&rc, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(&rc, &val, &vallen); rbuf_bytes(&rc, &val, &vallen);
check_local_fingerprint += result->rand4fingerprint * toku_calccrc32_cmd(type, key, keylen, val, vallen); check_local_fingerprint += result->rand4fingerprint * toku_calccrc32_cmd(type, xid, key, keylen, val, vallen);
//printf("Found %s,%s\n", (char*)key, (char*)val); //printf("Found %s,%s\n", (char*)key, (char*)val);
{ {
r=toku_fifo_enq(BNC_BUFFER(result, cnum), key, keylen, val, vallen, type); /* Copies the data into the hash table. */ r=toku_fifo_enq(BNC_BUFFER(result, cnum), key, keylen, val, vallen, type, xid); /* Copies the data into the hash table. */
if (r!=0) { goto died_12; } if (r!=0) { goto died_12; }
} }
diff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD; diff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
......
...@@ -2287,6 +2287,7 @@ static void test_brt_delete() { ...@@ -2287,6 +2287,7 @@ static void test_brt_delete() {
test_brt_delete_cursor_first(100); toku_memory_check_all_free(); test_brt_delete_cursor_first(100); toku_memory_check_all_free();
test_brt_delete_cursor_first(500); toku_memory_check_all_free(); test_brt_delete_cursor_first(500); toku_memory_check_all_free();
test_brt_delete_cursor_first(10000); toku_memory_check_all_free(); test_brt_delete_cursor_first(10000); toku_memory_check_all_free();
test_insert_delete_lookup(2); toku_memory_check_all_free();
test_insert_delete_lookup(512); toku_memory_check_all_free(); test_insert_delete_lookup(512); toku_memory_check_all_free();
} }
...@@ -2831,6 +2832,7 @@ static void brt_blackbox_test (void) { ...@@ -2831,6 +2832,7 @@ static void brt_blackbox_test (void) {
int main (int argc , const char *argv[]) { int main (int argc , const char *argv[]) {
default_parse_args(argc, argv); default_parse_args(argc, argv);
brt_blackbox_test(); brt_blackbox_test();
toku_malloc_cleanup(); toku_malloc_cleanup();
if (verbose) printf("test ok\n"); if (verbose) printf("test ok\n");
......
...@@ -24,9 +24,9 @@ static void verify_local_fingerprint (BRTNODE node) { ...@@ -24,9 +24,9 @@ static void verify_local_fingerprint (BRTNODE node) {
int i; int i;
if (node->height>0) { if (node->height>0) {
for (i=0; i<node->u.n.n_children; i++) for (i=0; i<node->u.n.n_children; i++)
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
({ ({
fp += node->rand4fingerprint * toku_calccrc32_cmd(type, key, keylen, data, datalen); fp += node->rand4fingerprint * toku_calccrc32_cmd(type, xid, key, keylen, data, datalen);
})); }));
assert(fp==node->local_fingerprint); assert(fp==node->local_fingerprint);
} else { } else {
...@@ -69,6 +69,7 @@ int toku_verify_brtnode (BRT brt, DISKOFF off, bytevec lorange, ITEMLEN lolen, b ...@@ -69,6 +69,7 @@ int toku_verify_brtnode (BRT brt, DISKOFF off, bytevec lorange, ITEMLEN lolen, b
bytevec data __attribute__((__unused__)), bytevec data __attribute__((__unused__)),
unsigned int datalen __attribute__((__unused__)), unsigned int datalen __attribute__((__unused__)),
int type __attribute__((__unused__)), int type __attribute__((__unused__)),
TXNID xid __attribute__((__unused__)),
void *ignore __attribute__((__unused__))) { void *ignore __attribute__((__unused__))) {
if (thislorange) assert(toku_keycompare(thislorange,thislolen,key,keylen)<0); if (thislorange) assert(toku_keycompare(thislorange,thislolen,key,keylen)<0);
if (thishirange && toku_keycompare(key,keylen,thishirange,thishilen)>0) { if (thishirange && toku_keycompare(key,keylen,thishirange,thishilen)>0) {
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
* *
*/ */
#include <arpa/inet.h>
#include <assert.h> #include <assert.h>
#include <errno.h> #include <errno.h>
#include <inttypes.h> #include <inttypes.h>
...@@ -254,7 +255,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height) ...@@ -254,7 +255,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height)
n->thisnodename = nodename; n->thisnodename = nodename;
n->disk_lsn.lsn = 0; // a new one can always be 0. n->disk_lsn.lsn = 0; // a new one can always be 0.
n->log_lsn = n->disk_lsn; n->log_lsn = n->disk_lsn;
n->layout_version = 1; n->layout_version = 2;
n->height = height; n->height = height;
n->rand4fingerprint = random(); n->rand4fingerprint = random();
n->local_fingerprint = 0; n->local_fingerprint = 0;
...@@ -308,11 +309,11 @@ static void create_new_brtnode (BRT t, BRTNODE *result, int height, TOKUTXN txn) ...@@ -308,11 +309,11 @@ static void create_new_brtnode (BRT t, BRTNODE *result, int height, TOKUTXN txn)
toku_update_brtnode_lsn(n, txn); toku_update_brtnode_lsn(n, txn);
} }
static int insert_to_buffer_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v, int type) { static int insert_to_buffer_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v, int type, TXNID xid) {
unsigned int n_bytes_added = BRT_CMD_OVERHEAD + KEY_VALUE_OVERHEAD + k->size + v->size; unsigned int n_bytes_added = BRT_CMD_OVERHEAD + KEY_VALUE_OVERHEAD + k->size + v->size;
int r = toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type); int r = toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type, xid);
if (r!=0) return r; if (r!=0) return r;
node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_cmd(type, k->data, k->size, v->data, v->size); node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_cmd(type, xid, k->data, k->size, v->data, v->size);
BNC_NBYTESINBUF(node,childnum) += n_bytes_added; BNC_NBYTESINBUF(node,childnum) += n_bytes_added;
node->u.n.n_bytes_in_buffers += n_bytes_added; node->u.n.n_bytes_in_buffers += n_bytes_added;
node->dirty = 1; node->dirty = 1;
...@@ -392,21 +393,22 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node ...@@ -392,21 +393,22 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
bytevec key, data; bytevec key, data;
unsigned int keylen, datalen; unsigned int keylen, datalen;
int type; int type;
int fr = toku_fifo_peek(from_htab, &key, &keylen, &data, &datalen, &type); TXNID xid;
int fr = toku_fifo_peek(from_htab, &key, &keylen, &data, &datalen, &type, &xid);
if (fr!=0) break; if (fr!=0) break;
int n_bytes_moved = keylen+datalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD; int n_bytes_moved = keylen+datalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
BYTESTRING keybs = { .len = keylen, .data = (char*)key }; BYTESTRING keybs = { .len = keylen, .data = (char*)key };
BYTESTRING databs = { .len = datalen, .data = (char*)data }; BYTESTRING databs = { .len = datalen, .data = (char*)data };
u_int32_t old_from_fingerprint = node->local_fingerprint; u_int32_t old_from_fingerprint = node->local_fingerprint;
u_int32_t old_to_fingerprint = B->local_fingerprint; u_int32_t old_to_fingerprint = B->local_fingerprint;
u_int32_t delta = toku_calccrc32_cmd(type, key, keylen, data, datalen); u_int32_t delta = toku_calccrc32_cmd(type, xid, key, keylen, data, datalen);
u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta; u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta;
u_int32_t new_to_fingerprint = old_to_fingerprint + B->rand4fingerprint *delta; u_int32_t new_to_fingerprint = old_to_fingerprint + B->rand4fingerprint *delta;
if (r!=0) return r; if (r!=0) return r;
r = toku_log_brtdeq(txn, txnid, fnum, node->thisnodename, n_children_in_a, type, keybs, databs, old_from_fingerprint, new_from_fingerprint); r = toku_log_brtdeq(txn, xid, fnum, node->thisnodename, n_children_in_a, type, keybs, databs, old_from_fingerprint, new_from_fingerprint);
if (r!=0) return r; if (r!=0) return r;
r = toku_log_brtenq(txn, txnid, fnum, B->thisnodename, targchild, type, keybs, databs, old_to_fingerprint, new_to_fingerprint); r = toku_log_brtenq(txn, xid, fnum, B->thisnodename, targchild, type, keybs, databs, old_to_fingerprint, new_to_fingerprint);
r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type); r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type, xid);
if (r!=0) return r; if (r!=0) return r;
toku_fifo_deq(from_htab); toku_fifo_deq(from_htab);
// key and data will no longer be valid // key and data will no longer be valid
...@@ -525,7 +527,7 @@ static int push_brt_cmd_down_only_if_it_wont_push_more_else_put_here (BRT t, BRT ...@@ -525,7 +527,7 @@ static int push_brt_cmd_down_only_if_it_wont_push_more_else_put_here (BRT t, BRT
if (r!=0) return r; if (r!=0) return r;
assert(again_split==0); /* I only did the insert if I knew it wouldn't push down, and hence wouldn't split. */ assert(again_split==0); /* I only did the insert if I knew it wouldn't push down, and hence wouldn't split. */
} else { } else {
r=insert_to_buffer_in_nonleaf(node, childnum_of_node, k, v, cmd->type); r=insert_to_buffer_in_nonleaf(node, childnum_of_node, k, v, cmd->type, cmd->xid);
} }
fixup_child_fingerprint(node, childnum_of_node, child, t, txn); fixup_child_fingerprint(node, childnum_of_node, child, t, txn);
return r; return r;
...@@ -576,7 +578,7 @@ static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE ...@@ -576,7 +578,7 @@ static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE
static int split_count=0; static int split_count=0;
/* NODE is a node with a child. /* NODE is a node with a child.
* childnum was split into two nodes childa, and childb. * childnum was split into two nodes childa, and childb. childa is the same as the original child. childb is a new child.
* We must slide things around, & move things from the old table to the new tables. * We must slide things around, & move things from the old table to the new tables.
* We also move things to the new children as much as we can without doing any pushdowns or splitting of the child. * We also move things to the new children as much as we can without doing any pushdowns or splitting of the child.
* We must delete the old buffer (but the old child is already deleted.) * We must delete the old buffer (but the old child is already deleted.)
...@@ -623,8 +625,8 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -623,8 +625,8 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
BNC_NBYTESINBUF(node, childnum+1) = 0; BNC_NBYTESINBUF(node, childnum+1) = 0;
// Remove all the cmds from the local fingerprint. Some may get added in again when we try to push to the child. // Remove all the cmds from the local fingerprint. Some may get added in again when we try to push to the child.
FIFO_ITERATE(old_h, skey, skeylen, sval, svallen, type, FIFO_ITERATE(old_h, skey, skeylen, sval, svallen, type, xid,
node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_cmd(type, skey, skeylen, sval, svallen)); node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_cmd(type, xid, skey, skeylen, sval, svallen));
// Slide the keys over // Slide the keys over
{ {
...@@ -653,40 +655,44 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -653,40 +655,44 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
node->u.n.n_bytes_in_buffers -= old_count; /* By default, they are all removed. We might add them back in. */ node->u.n.n_bytes_in_buffers -= old_count; /* By default, they are all removed. We might add them back in. */
/* Keep pushing to the children, but not if the children would require a pushdown */ /* Keep pushing to the children, but not if the children would require a pushdown */
FIFO_ITERATE(old_h, skey, skeylen, sval, svallen, type, ({ FIFO_ITERATE(old_h, skey, skeylen, sval, svallen, type, xid, ({
DBT skd, svd; DBT skd, svd;
toku_fill_dbt(&skd, skey, skeylen); BRT_CMD_S brtcmd = { type, xid, .u.id= {toku_fill_dbt(&skd, skey, skeylen),
toku_fill_dbt(&svd, sval, svallen); toku_fill_dbt(&svd, sval, svallen)} };
BRT_CMD_S brtcmd;
brtcmd.type = type; brtcmd.u.id.key = &skd; brtcmd.u.id.val = &svd;
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb); //verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
int tochildnum = childnum; int tochildnum;
BRTNODE tochild = childa; BRTNODE tochild;
if (type == BRT_INSERT || type == BRT_DELETE_BOTH) { switch (type) {
int cmp = brt_compare_pivot(t, &skd, &svd, childsplitk->data); case BRT_INSERT:
if (cmp > 0) { case BRT_DELETE_BOTH:
tochildnum = childnum+1; tochild = childb; case BRT_DELETE:
} //case BRT_DELETE:
} {
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, tochild, &brtcmd, tochildnum, txn); int cmp = brt_compare_pivot(t, &skd, &svd, childsplitk->data);
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb); if (cmp > 0) {
if (type == BRT_DELETE) { tochildnum = childnum+1; tochild = childb;
int r2 = push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childb, &brtcmd, childnum+1, txn); } else {
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb); tochildnum = childnum; tochild = childa;
if (r2!=0) { }
// In this case we must put things from old_h into the new buffers.
// This code is wrong, so I'll abort.
abort();
return r2;
} }
} goto ok;
if (r!=0) { case BRT_NONE:
// In this case we must put things from old_h into the new buffers. // Don't have to do anything in this case, can just drop the command
// This code is wrong, so I'll abort. goto ok;
abort(); }
return r; printf("Bad type %d\n", type); // Don't use default: because I want a compiler warning if I forget a enum case, and I want a runtime error if the type isn't one of the expected ones.
assert(0);
ok:
// If we already have something in the buffer, we must add the new command to the buffer so that commands don't get out of order.
if (toku_fifo_n_entries(BNC_BUFFER(node,tochildnum))==0) {
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, tochild, &brtcmd, tochildnum, txn);
} else {
r=insert_to_buffer_in_nonleaf(node, tochildnum, &skd, &svd, type, xid);
} }
})); //verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
if (r!=0) printf("r=%d\n", r);
assert(r==0);
}));
toku_fifo_free(&old_h); toku_fifo_free(&old_h);
...@@ -769,17 +775,14 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum, ...@@ -769,17 +775,14 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
//printf("%s:%d Try random_pick, weight=%d \n", __FILE__, __LINE__, BNC_NBYTESINBUF(node, childnum)); //printf("%s:%d Try random_pick, weight=%d \n", __FILE__, __LINE__, BNC_NBYTESINBUF(node, childnum));
assert(toku_fifo_n_entries(BNC_BUFFER(node,childnum))>0); assert(toku_fifo_n_entries(BNC_BUFFER(node,childnum))>0);
int type; int type;
while(0==toku_fifo_peek(BNC_BUFFER(node,childnum), &key, &keylen, &val, &vallen, &type)) { TXNID xid;
while(0==toku_fifo_peek(BNC_BUFFER(node,childnum), &key, &keylen, &val, &vallen, &type, &xid)) {
int child_did_split=0; BRTNODE childa, childb; int child_did_split=0; BRTNODE childa, childb;
DBT hk,hv; DBT hk,hv;
DBT childsplitk; DBT childsplitk;
BRT_CMD_S brtcmd;
toku_fill_dbt(&hk, key, keylen); BRT_CMD_S brtcmd = { type, xid, .u.id= {toku_fill_dbt(&hk, key, keylen),
toku_fill_dbt(&hv, val, vallen); toku_fill_dbt(&hv, val, vallen)} };
brtcmd.type = type;
brtcmd.u.id.key = &hk;
brtcmd.u.id.val = &hv;
//printf("%s:%d random_picked\n", __FILE__, __LINE__); //printf("%s:%d random_picked\n", __FILE__, __LINE__);
toku_init_dbt(&childsplitk); toku_init_dbt(&childsplitk);
...@@ -792,7 +795,7 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum, ...@@ -792,7 +795,7 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
if (0){ if (0){
unsigned int sum=0; unsigned int sum=0;
FIFO_ITERATE(BNC_BUFFER(node,childnum), subhk __attribute__((__unused__)), hkl, hd __attribute__((__unused__)), hdl, subtype __attribute__((__unused__)), FIFO_ITERATE(BNC_BUFFER(node,childnum), subhk __attribute__((__unused__)), hkl, hd __attribute__((__unused__)), hdl, subtype __attribute__((__unused__)), subxid __attribute__((__unused__)),
sum+=hkl+hdl+KEY_VALUE_OVERHEAD+BRT_CMD_OVERHEAD); sum+=hkl+hdl+KEY_VALUE_OVERHEAD+BRT_CMD_OVERHEAD);
printf("%s:%d sum=%d\n", __FILE__, __LINE__, sum); printf("%s:%d sum=%d\n", __FILE__, __LINE__, sum);
assert(sum==BNC_NBYTESINBUF(node, childnum)); assert(sum==BNC_NBYTESINBUF(node, childnum));
...@@ -1030,9 +1033,9 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD cmd, ...@@ -1030,9 +1033,9 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD cmd,
DBT *v = cmd->u.id.val; DBT *v = cmd->u.id.val;
int diff = k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD; int diff = k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
int r=toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type); int r=toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type, cmd->xid);
assert(r==0); assert(r==0);
node->local_fingerprint += node->rand4fingerprint * toku_calccrc32_cmd(type, k->data, k->size, v->data, v->size); node->local_fingerprint += node->rand4fingerprint * toku_calccrc32_cmd(type, cmd->xid, k->data, k->size, v->data, v->size);
node->u.n.n_bytes_in_buffers += diff; node->u.n.n_bytes_in_buffers += diff;
BNC_NBYTESINBUF(node, childnum) += diff; BNC_NBYTESINBUF(node, childnum) += diff;
node->dirty = 1; node->dirty = 1;
...@@ -1174,9 +1177,9 @@ static void verify_local_fingerprint_nonleaf (BRTNODE node) { ...@@ -1174,9 +1177,9 @@ static void verify_local_fingerprint_nonleaf (BRTNODE node) {
int i; int i;
if (node->height==0) return; if (node->height==0) return;
for (i=0; i<node->u.n.n_children; i++) for (i=0; i<node->u.n.n_children; i++)
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
({ ({
fp += node->rand4fingerprint * toku_calccrc32_cmd(type, key, keylen, data, datalen); fp += node->rand4fingerprint * toku_calccrc32_cmd(type, xid, key, keylen, data, datalen);
})); }));
assert(fp==node->local_fingerprint); assert(fp==node->local_fingerprint);
} }
...@@ -1682,11 +1685,8 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKUTXN txn) { ...@@ -1682,11 +1685,8 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKUTXN txn) {
int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) { int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
int r; int r;
BRT_CMD_S brtcmd; BRT_CMD_S brtcmd = { BRT_INSERT, toku_txn_get_txnid(txn), .u.id={key,val}};
brtcmd.type = BRT_INSERT;
brtcmd.u.id.key = key;
brtcmd.u.id.val = val;
r = brt_root_put_cmd(brt, &brtcmd, txn); r = brt_root_put_cmd(brt, &brtcmd, txn);
return r; return r;
} }
...@@ -1708,25 +1708,15 @@ int toku_brt_lookup (BRT brt, DBT *k, DBT *v) { ...@@ -1708,25 +1708,15 @@ int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) { int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
int r; int r;
BRT_CMD_S brtcmd;
DBT val; DBT val;
BRT_CMD_S brtcmd = { BRT_DELETE, toku_txn_get_txnid(txn), .u.id={key, toku_init_dbt(&val)}};
toku_init_dbt(&val);
val.size = 0;
brtcmd.type = BRT_DELETE;
brtcmd.u.id.key = key;
brtcmd.u.id.val = &val;
r = brt_root_put_cmd(brt, &brtcmd, txn); r = brt_root_put_cmd(brt, &brtcmd, txn);
return r; return r;
} }
int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) { int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
int r; int r;
BRT_CMD_S brtcmd; BRT_CMD_S brtcmd = { BRT_DELETE_BOTH, toku_txn_get_txnid(txn), .u.id={key,val}};
brtcmd.type = BRT_DELETE_BOTH;
brtcmd.u.id.key = key;
brtcmd.u.id.val = val;
r = brt_root_put_cmd(brt, &brtcmd, txn); r = brt_root_put_cmd(brt, &brtcmd, txn);
return r; return r;
} }
...@@ -1750,19 +1740,20 @@ int toku_dump_brtnode (BRT brt, DISKOFF off, int depth, bytevec lorange, ITEMLEN ...@@ -1750,19 +1740,20 @@ int toku_dump_brtnode (BRT brt, DISKOFF off, int depth, bytevec lorange, ITEMLEN
//printf("%s %s\n", lorange ? lorange : "NULL", hirange ? hirange : "NULL"); //printf("%s %s\n", lorange ? lorange : "NULL", hirange ? hirange : "NULL");
{ {
int i; int i;
for (i=0; i< node->u.n.n_children-1; i++) { for (i=0; i< node->u.n.n_children; i++) {
printf("%*schild %d buffered (%d entries):\n", depth+1, "", i, toku_fifo_n_entries(BNC_BUFFER(node,i))); printf("%*schild %d buffered (%d entries):\n", depth+1, "", i, toku_fifo_n_entries(BNC_BUFFER(node,i)));
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
({ ({
printf("%*s %s %s %d\n", depth+2, "", (char*)key, (char*)data, type); data=data; datalen=datalen; keylen=keylen;
assert(strlen((char*)key)+1==keylen); printf("%*s xid=%lld %d (type=%d)\n", depth+2, "", xid, ntohl(*(int*)key), type);
assert(strlen((char*)data)+1==datalen); //assert(strlen((char*)key)+1==keylen);
//assert(strlen((char*)data)+1==datalen);
})); }));
} }
for (i=0; i<node->u.n.n_children; i++) { for (i=0; i<node->u.n.n_children; i++) {
printf("%*schild %d\n", depth, "", i); printf("%*schild %d\n", depth, "", i);
if (i>0) { if (i>0) {
printf("%*spivot %d=%s\n", depth+1, "", i-1, (char*)node->u.n.childkeys[i-1]); printf("%*spivot %d len=%d %d\n", depth+1, "", i-1, node->u.n.childkeys[i-1]->keylen, ntohl(*(int*)&node->u.n.childkeys[i-1]->key));
} }
toku_dump_brtnode(brt, BNC_DISKOFF(node, i), depth+4, toku_dump_brtnode(brt, BNC_DISKOFF(node, i), depth+4,
(i==0) ? lorange : node->u.n.childkeys[i-1], (i==0) ? lorange : node->u.n.childkeys[i-1],
...@@ -1774,10 +1765,10 @@ int toku_dump_brtnode (BRT brt, DISKOFF off, int depth, bytevec lorange, ITEMLEN ...@@ -1774,10 +1765,10 @@ int toku_dump_brtnode (BRT brt, DISKOFF off, int depth, bytevec lorange, ITEMLEN
} }
} }
} else { } else {
printf("%*sNode %lld nodesize=%d height=%d n_bytes_in_buffer=%d keyrange=%s %s\n", printf("%*sNode %lld nodesize=%d height=%d n_bytes_in_buffer=%d keyrange=%d %d\n",
depth, "", off, node->nodesize, node->height, node->u.l.n_bytes_in_buffer, (char*)lorange, (char*)hirange); depth, "", off, node->nodesize, node->height, node->u.l.n_bytes_in_buffer, lorange ? ntohl(*(int*)lorange) : 0, hirange ? ntohl(*(int*)hirange) : 0);
PMA_ITERATE(node->u.l.buffer, key, keylen, val, vallen, PMA_ITERATE(node->u.l.buffer, key, keylen, val __attribute__((__unused__)), vallen,
( keylen=keylen, vallen=vallen, printf(" %s:%s", (char*)key, (char*)val))); ( keylen=keylen, vallen=vallen, printf(" (%d)%d ", keylen, ntohl(*(int*)key))));
printf("\n"); printf("\n");
} }
r = toku_cachetable_unpin(brt->cf, off, 0, 0); r = toku_cachetable_unpin(brt->cf, off, 0, 0);
...@@ -1788,6 +1779,7 @@ int toku_dump_brtnode (BRT brt, DISKOFF off, int depth, bytevec lorange, ITEMLEN ...@@ -1788,6 +1779,7 @@ int toku_dump_brtnode (BRT brt, DISKOFF off, int depth, bytevec lorange, ITEMLEN
int toku_dump_brt (BRT brt) { int toku_dump_brt (BRT brt) {
int r; int r;
CACHEKEY *rootp; CACHEKEY *rootp;
struct brt_header *prev_header = brt->h;
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) { if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
if (0) { died0: toku_unpin_brt_header(brt); } if (0) { died0: toku_unpin_brt_header(brt); }
return r; return r;
...@@ -1796,6 +1788,7 @@ int toku_dump_brt (BRT brt) { ...@@ -1796,6 +1788,7 @@ int toku_dump_brt (BRT brt) {
printf("split_count=%d\n", split_count); printf("split_count=%d\n", split_count);
if ((r = toku_dump_brtnode(brt, *rootp, 0, 0, 0, 0, 0, null_brtnode))) goto died0; if ((r = toku_dump_brtnode(brt, *rootp, 0, 0, 0, 0, 0, null_brtnode))) goto died0;
if ((r = toku_unpin_brt_header(brt))!=0) return r; if ((r = toku_unpin_brt_header(brt))!=0) return r;
brt->h = prev_header;
return 0; return 0;
} }
......
...@@ -46,10 +46,10 @@ void test_fifo_enq(int n) { ...@@ -46,10 +46,10 @@ void test_fifo_enq(int n) {
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
buildkey(i); buildkey(i);
buildval(i); buildval(i);
r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, i); assert(r == 0); r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, i, (TXNID)i); assert(r == 0);
} }
void checkit(bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen, int type, void *arg) { void checkit(bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen, int type, TXNID xid, void *arg) {
if (verbose) printf("checkit %d %d\n", i, type); if (verbose) printf("checkit %d %d\n", i, type);
assert(arg == 0); assert(arg == 0);
buildkey(i); buildkey(i);
...@@ -57,6 +57,7 @@ void test_fifo_enq(int n) { ...@@ -57,6 +57,7 @@ void test_fifo_enq(int n) {
assert((int) keylen == thekeylen); assert(memcmp(key, thekey, keylen) == 0); assert((int) keylen == thekeylen); assert(memcmp(key, thekey, keylen) == 0);
assert((int) vallen == thevallen); assert(memcmp(val, theval, vallen) == 0); assert((int) vallen == thevallen); assert(memcmp(val, theval, vallen) == 0);
assert(i % 256 == type); assert(i % 256 == type);
assert((TXNID)i==xid);
i += 1; i += 1;
} }
......
...@@ -65,10 +65,11 @@ int toku_fifo_n_entries(FIFO fifo) { ...@@ -65,10 +65,11 @@ int toku_fifo_n_entries(FIFO fifo) {
return fifo->n; return fifo->n;
} }
int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, int type) { int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, int type, TXNID xid) {
struct fifo_entry *entry = toku_malloc(sizeof (struct fifo_entry) + keylen + datalen); struct fifo_entry *entry = toku_malloc(sizeof (struct fifo_entry) + keylen + datalen);
if (entry == 0) return ENOMEM; if (entry == 0) return ENOMEM;
entry->type = type; entry->type = type;
entry->xid = xid;
entry->keylen = keylen; entry->keylen = keylen;
memcpy(entry->key, key, keylen); memcpy(entry->key, key, keylen);
entry->vallen = datalen; entry->vallen = datalen;
...@@ -78,7 +79,7 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d ...@@ -78,7 +79,7 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d
} }
/* peek at the head (the oldest entry) of the fifo */ /* peek at the head (the oldest entry) of the fifo */
int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, int *type) { int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, int *type, TXNID *xid) {
struct fifo_entry *entry = fifo_peek(fifo); struct fifo_entry *entry = fifo_peek(fifo);
if (entry == 0) return -1; if (entry == 0) return -1;
*key = entry->key; *key = entry->key;
...@@ -86,6 +87,7 @@ int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, ...@@ -86,6 +87,7 @@ int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data,
*data = entry->key + entry->keylen; *data = entry->key + entry->keylen;
*datalen = entry->vallen; *datalen = entry->vallen;
*type = entry->type; *type = entry->type;
*xid = entry->xid;
return 0; return 0;
} }
...@@ -96,10 +98,10 @@ int toku_fifo_deq(FIFO fifo) { ...@@ -96,10 +98,10 @@ int toku_fifo_deq(FIFO fifo) {
return 0; return 0;
} }
void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, void*), void *arg) { void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, TXNID xid, void*), void *arg) {
struct fifo_entry *entry; struct fifo_entry *entry;
for (entry = fifo_peek(fifo); entry; entry = entry->next) for (entry = fifo_peek(fifo); entry; entry = entry->next)
f(entry->key, entry->keylen, entry->key + entry->keylen, entry->vallen, entry->type, arg); f(entry->key, entry->keylen, entry->key + entry->keylen, entry->vallen, entry->type, entry->xid, arg);
} }
......
...@@ -5,6 +5,7 @@ struct fifo_entry { ...@@ -5,6 +5,7 @@ struct fifo_entry {
unsigned int keylen; unsigned int keylen;
unsigned int vallen; unsigned int vallen;
unsigned char type; unsigned char type;
TXNID xid;
unsigned char key[]; unsigned char key[];
}; };
...@@ -18,20 +19,21 @@ typedef struct fifo *FIFO; ...@@ -18,20 +19,21 @@ typedef struct fifo *FIFO;
int toku_fifo_create(FIFO *); int toku_fifo_create(FIFO *);
void toku_fifo_free(FIFO *); void toku_fifo_free(FIFO *);
int toku_fifo_n_entries(FIFO); int toku_fifo_n_entries(FIFO);
int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type); int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, TXNID xid);
int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, int *type); int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, int *type, TXNID *xid);
int toku_fifo_deq(FIFO); int toku_fifo_deq(FIFO);
int toku_fifo_peek_deq (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, int *type); int toku_fifo_peek_deq (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, int *type);
void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, void*), void*); void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, TXNID xid, void*), void*);
#define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,body) ({ \ #define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,xidvar,body) ({ \
struct fifo_entry *entry; \ struct fifo_entry *entry; \
for (entry = fifo->head; entry; entry = entry->next) { \ for (entry = fifo->head; entry; entry = entry->next) { \
unsigned int keylenvar = entry->keylen; \ unsigned int keylenvar = entry->keylen; \
void *keyvar = entry->key; \ void *keyvar = entry->key; \
unsigned int datalenvar = entry->vallen; \ unsigned int datalenvar = entry->vallen; \
void *datavar = entry->key + entry->keylen; \ void *datavar = entry->key + entry->keylen; \
unsigned int typevar = entry->type; \ enum brt_cmd_type typevar = entry->type; \
TXNID xidvar = entry->xid; \
body; \ body; \
} \ } \
}) })
......
...@@ -21,16 +21,20 @@ u_int32_t toku_calccrc32_kvpair (const void *key, int keylen, const void *val, i ...@@ -21,16 +21,20 @@ u_int32_t toku_calccrc32_kvpair (const void *key, int keylen, const void *val, i
return toku_calc_more_crc32_kvpair(toku_null_crc, key, keylen, val, vallen); return toku_calc_more_crc32_kvpair(toku_null_crc, key, keylen, val, vallen);
} }
u_int32_t toku_calccrc32_cmd (int type, const void *key, int keylen, const void *val, int vallen) { u_int32_t toku_calccrc32_cmd (int type, TXNID xid, const void *key, int keylen, const void *val, int vallen) {
unsigned char type_c = type; unsigned char type_c = type;
return toku_calc_more_crc32_kvpair(toku_crc32(toku_null_crc, unsigned int a = htonl(xid>>32);
&type_c, 1), unsigned int b = htonl(xid&0xffffffff);
return toku_calc_more_crc32_kvpair(toku_crc32(toku_crc32(toku_crc32(toku_null_crc,
&type_c, 1),
&a, 4),
&b, 4),
key, keylen, val, vallen); key, keylen, val, vallen);
} }
u_int32_t toku_calccrc32_cmdstruct (BRT_CMD cmd) { u_int32_t toku_calccrc32_cmdstruct (BRT_CMD cmd) {
if (cmd->type <= BRT_DELETE_BOTH) if (cmd->type <= BRT_DELETE_BOTH)
return toku_calccrc32_cmd (cmd->type, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size); return toku_calccrc32_cmd (cmd->type, cmd->xid, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size);
else else
assert(0); /* Should not have come here. */ assert(0); /* Should not have come here. */
} }
...@@ -171,7 +171,7 @@ void toku_recover_newbrtnode (struct logtype_newbrtnode *c) { ...@@ -171,7 +171,7 @@ void toku_recover_newbrtnode (struct logtype_newbrtnode *c) {
n->thisnodename = c->diskoff; n->thisnodename = c->diskoff;
n->log_lsn = n->disk_lsn = c->lsn; n->log_lsn = n->disk_lsn = c->lsn;
//printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn); //printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn);
n->layout_version = 1; n->layout_version = 2;
n->height = c->height; n->height = c->height;
n->rand4fingerprint = c->rand4fingerprint; n->rand4fingerprint = c->rand4fingerprint;
n->flags = c->is_dup_sort ? TOKU_DB_DUPSORT : 0; // Don't have TOKU_DB_DUP ??? n->flags = c->is_dup_sort ? TOKU_DB_DUPSORT : 0; // Don't have TOKU_DB_DUP ???
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment