Commit 72689d50 authored by Bradley C. Kuszmaul, committed by Yoni Fogel

[t:3604] Merge everything from the tokudb.3312c+3524+msn branch. Refs #3604.

git-svn-id: file:///svn/toku/tokudb@32294 c7de825b-a66e-492c-adef-691d508d4ae1
parent e78bd42f
@@ -103,6 +103,7 @@ typedef struct __toku_engine_status {
   u_int64_t prefetches;                /* how many times has a block been prefetched into the cachetable */
   u_int64_t maybe_get_and_pins;        /* how many times has maybe_get_and_pin(_clean) been called */
   u_int64_t maybe_get_and_pin_hits;    /* how many times has get_and_pin(_clean) returned with a node */
+  u_int64_t maybe_get_and_pin_if_in_memorys; /* how many times has get_and_pin_if_in_memory been called */
   int64_t   cachetable_size_current;   /* sum of the sizes of the nodes represented in the cachetable */
   int64_t   cachetable_size_limit;     /* the limit to the sum of the node sizes */
   int64_t   cachetable_size_writing;   /* the sum of the sizes of the nodes being written */
@@ -495,6 +495,7 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__
     printf("  u_int64_t prefetches;                /* how many times has a block been prefetched into the cachetable */ \n");
     printf("  u_int64_t maybe_get_and_pins;        /* how many times has maybe_get_and_pin(_clean) been called */ \n");
     printf("  u_int64_t maybe_get_and_pin_hits;    /* how many times has get_and_pin(_clean) returned with a node */ \n");
+    printf("  u_int64_t maybe_get_and_pin_if_in_memorys; /* how many times has get_and_pin_if_in_memory been called */ \n");
     printf("  int64_t   cachetable_size_current;   /* sum of the sizes of the nodes represented in the cachetable */ \n");
     printf("  int64_t   cachetable_size_limit;     /* the limit to the sum of the node sizes */ \n");
     printf("  int64_t   cachetable_size_writing;   /* the sum of the sizes of the nodes being written */ \n");
@@ -19,6 +19,7 @@ SKIP_NEWBRTRULE=1
 include $(TOKUROOT)toku_include/Makefile.include
 LDFLAGS+=-L$(TOKUROOT)lib -Wl,-rpath,$(shell pwd)/$(TOKUROOT)lib
 LDLIBS+=-lnewbrt -ltokuportability
 # When debugging, try: valgrind --show-reachable=yes --leak-check=full ./brt-test
 BINS_RAW= \
@@ -51,7 +52,6 @@ BRT_SOURCES = \
 	fifo \
 	key \
 	leafentry \
-	leaflock \
 	le-cursor \
 	logfilemgr \
 	logger \
@@ -60,7 +60,6 @@ BRT_SOURCES = \
 	log_print \
 	logcursor \
 	memarena \
-	mempool \
 	minicron \
 	omt \
 	pqueue \
@@ -109,7 +108,7 @@ brtloader.$(OEXT): $(DEPEND_COMPILE)
 $(NEWBRT_O_FILES): VISIBILITY=
 $(NEWBRT_SO): $(NEWBRT_O_FILES)
 	echo $(patsubst %,newbrt/%,$(NEWBRT_O_FILES)) > ../lib/newbrt.olist
-	$(TOKULINKER) $(SHARED) $(SYMBOLS) $(GCOV_FLAGS) $(SKIP_WARNING) $(NEWBRT_O_FILES) -o$(NEWBRT_SO) -nostdlib $(LCILKRTS)
+	$(TOKULINKER) $(SHARED) $(SYMBOLS) $(GCOV_FLAGS) $(SKIP_WARNING) $(NEWBRT_O_FILES) -o$(NEWBRT_SO) $(LINUX_NOSTDLIB) $(LCILKRTS)
 log_code.$(OEXT): log_header.h wbuf.h log-internal.h rbuf.h
@@ -30,14 +30,45 @@ typedef struct brt_search {
     enum brt_search_direction_e direction;
     const DBT *k;
     void *context;
+
+    // To fix #3522, we need to remember the pivots that we have searched unsuccessfully.
+    //  For example, when searching right (left), we call search->compare() on the ith pivot key.  If search->compare() returns
+    //  nonzero, then we search the ith subtree.  If that subsearch returns DB_NOTFOUND then maybe the key isn't present in the
+    //  tree.  But maybe we are doing a DB_NEXT (DB_PREV), and everything was deleted.  So we remember the pivot, and later we
+    //  will only search subtrees which contain keys that are bigger than (less than) the pivot.
+    // The code is a kludge (even before this fix), and interacts strangely with TOKUDB_FOUND_BUT_REJECTED (which is there
+    //  because on a failed DB_GET we would keep searching the rest of the tree).  We probably should write the various lookup
+    //  codes (NEXT, PREV, CURRENT, etc) more directly, and we should probably use a binary search within a node to search the
+    //  pivots so that we can support a larger fanout.
+    // These changes (3312+3522) also (probably) introduce an isolation error (#3529).
+    //  We must make sure we lock the right range for the proper isolation level.
+    //  There's probably a bug in which the following could happen.
+    //    Thread A:  Searches through deleted keys A,B,D,E and finds nothing, so searches the next leaf, releasing the YDB lock.
+    //    Thread B:  Inserts key C, and acquires the write lock, then commits.
+    //    Thread A:  Resumes, searching F,G,H and returns success.  Thread A then read-locks the range A-H, and doesn't notice
+    //      the value C inserted by thread B.  Thus a failure of serialization.
+    //    See #3529.
+    // There also remains a potential thrashing problem.  When we get a TOKUDB_TRY_AGAIN, we unpin everything.  There's
+    //  no guarantee that we will get everything pinned again.  We ought to keep nodes pinned when we retry, except that on the
+    //  way out with a DB_NOTFOUND we ought to unpin those nodes.  See #3528.
+    BOOL have_pivot_bound;
+    DBT  pivot_bound;
 } brt_search_t;

 /* initialize the search compare object */
 static inline brt_search_t *brt_search_init(brt_search_t *so, brt_search_compare_func_t compare, enum brt_search_direction_e direction, const DBT *k, void *context) {
-    so->compare = compare; so->direction = direction; so->k = k; so->context = context;
+    so->compare = compare;
+    so->direction = direction;
+    so->k = k;
+    so->context = context;
+    so->have_pivot_bound = FALSE;
     return so;
 }

+static inline void brt_search_finish(brt_search_t *so) {
+    if (so->have_pivot_bound) toku_free(so->pivot_bound.data);
+}
+
 #if defined(__cplusplus) || defined(__cilkplusplus)
 };
 #endif
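To make the pivot-bound bookkeeping above concrete, here is a minimal standalone C sketch of the lifecycle the new fields imply (simplified stand-in types; search_save_bound is a hypothetical helper, not part of this patch): the bound is recorded when a subtree search comes up empty, consulted on retry to skip subtrees already known to be empty, and the copied key is freed by the finish call, mirroring brt_search_finish.

#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

typedef struct { void *data; unsigned size; } DBT;   // simplified stand-in for the real DBT

typedef struct {
    bool have_pivot_bound;  // set once some pivot has been searched unsuccessfully
    DBT  pivot_bound;       // deep copy of that pivot key
} search_t;

static void search_init(search_t *so) { so->have_pivot_bound = false; }

// Remember a pivot whose subtree returned DB_NOTFOUND, so a retry
// (e.g. after TOKUDB_TRY_AGAIN) only descends into subtrees beyond it.
static void search_save_bound(search_t *so, const void *key, unsigned len) {
    if (so->have_pivot_bound) free(so->pivot_bound.data);  // replace an older bound
    so->pivot_bound.data = malloc(len);
    memcpy(so->pivot_bound.data, key, len);
    so->pivot_bound.size = len;
    so->have_pivot_bound = true;
}

// Mirrors brt_search_finish in the patch: release the copied key, if any.
static void search_finish(search_t *so) {
    if (so->have_pivot_bound) free(so->pivot_bound.data);
}

int main(void) {
    search_t so;
    search_init(&so);
    search_save_bound(&so, "pivot-42", 8);  // on retry, search only right of "pivot-42"
    search_finish(&so);
    return 0;
}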
@@ -5,47 +5,65 @@
 #include "includes.h"
 #include "ule.h"

+// dummymsn needed to simulate msn because messages are injected at a lower level than toku_brt_root_put_cmd()
+#define MIN_DUMMYMSN ((MSN) {(uint64_t)1<<48})
+static MSN dummymsn;
+static int testsetup_initialized = 0;
+
+// Must be called before any other test_setup_xxx() functions are called.
+void
+toku_testsetup_initialize(void) {
+    if (testsetup_initialized == 0) {
+	testsetup_initialized = 1;
+	dummymsn = MIN_DUMMYMSN;
+    }
+}
+
+static MSN
+next_dummymsn(void) {
+    ++(dummymsn.msn);
+    return dummymsn;
+}
+
 BOOL ignore_if_was_already_open;

 int toku_testsetup_leaf(BRT brt, BLOCKNUM *blocknum) {
     BRTNODE node;
+    assert(testsetup_initialized);
     int r = toku_read_brt_header_and_store_in_cachefile(brt->cf, MAX_LSN, &brt->h, &ignore_if_was_already_open);
     if (r!=0) return r;
-    toku_create_new_brtnode(brt, &node, 0, 0);
+    toku_create_new_brtnode(brt, &node, 0, 1);
     *blocknum = node->thisnodename;
-    r = toku_unpin_brtnode(brt, node);
-    if (r!=0) return r;
+    toku_unpin_brtnode(brt, node);
     return 0;
 }

 // Don't bother to clean up carefully if something goes wrong.  (E.g., it's OK to have malloced stuff that hasn't been freed.)
 int toku_testsetup_nonleaf (BRT brt, int height, BLOCKNUM *blocknum, int n_children, BLOCKNUM *children, char **keys, int *keylens) {
     BRTNODE node;
+    assert(testsetup_initialized);
     assert(n_children<=BRT_FANOUT);
     int r = toku_read_brt_header_and_store_in_cachefile(brt->cf, MAX_LSN, &brt->h, &ignore_if_was_already_open);
     if (r!=0) return r;
-    toku_create_new_brtnode(brt, &node, height, 0);
-    node->u.n.n_children=n_children;
-    MALLOC_N(n_children+1, node->u.n.childinfos);
-    MALLOC_N(n_children, node->u.n.childkeys);
-    node->u.n.totalchildkeylens=0;
-    node->u.n.n_bytes_in_buffers=0;
+    toku_create_new_brtnode(brt, &node, height, n_children);
     int i;
     for (i=0; i<n_children; i++) {
-	node->u.n.childinfos[i] = (struct brtnode_nonleaf_childinfo){ .subtree_estimates = zero_estimates,
-								      .blocknum = children[i],
-								      .n_bytes_in_buffer = 0 };
-	r = toku_fifo_create(&BNC_BUFFER(node,i)); if (r!=0) return r;
+	node->u.n.childinfos[i].blocknum = children[i];
     }
     for (i=0; i+1<n_children; i++) {
-	node->u.n.childkeys[i] = kv_pair_malloc(keys[i], keylens[i], 0, 0);
-	node->u.n.totalchildkeylens += keylens[i];
+	node->childkeys[i] = kv_pair_malloc(keys[i], keylens[i], 0, 0);
+	node->totalchildkeylens += keylens[i];
     }
     *blocknum = node->thisnodename;
-    return toku_unpin_brtnode(brt, node);
+    toku_unpin_brtnode(brt, node);
+    return 0;
 }

 int toku_testsetup_root(BRT brt, BLOCKNUM blocknum) {
+    assert(testsetup_initialized);
     int r = toku_read_brt_header_and_store_in_cachefile(brt->cf, MAX_LSN, &brt->h, &ignore_if_was_already_open);
     if (r!=0) return r;
     brt->h->root = blocknum;
@@ -55,21 +73,24 @@ int toku_testsetup_root(BRT brt, BLOCKNUM blocknum) {

 int toku_testsetup_get_sersize(BRT brt, BLOCKNUM diskoff) // Return the size on disk
 {
+    assert(testsetup_initialized);
     void *node_v;
     int r = toku_cachetable_get_and_pin(brt->cf, diskoff, toku_cachetable_hash(brt->cf, diskoff), &node_v, NULL,
-					toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
+					toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, brt);
     assert(r==0);
     int size = toku_serialize_brtnode_size(node_v);
-    r = toku_unpin_brtnode(brt, node_v);
-    assert(r==0);
+    toku_unpin_brtnode(brt, node_v);
     return size;
 }

 int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int keylen, char *val, int vallen) {
     void *node_v;
     int r;
+    assert(testsetup_initialized);
     r = toku_cachetable_get_and_pin(brt->cf, blocknum, toku_cachetable_hash(brt->cf, blocknum), &node_v, NULL,
-				    toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
+				    toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, brt);
     if (r!=0) return r;
     BRTNODE node=node_v;
     toku_verify_or_set_counts(node);
@@ -80,47 +101,51 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
     OMTVALUE storeddatav;
     u_int32_t idx;
     DBT keydbt,valdbt;
-    BRT_MSG_S cmd = {BRT_INSERT, xids_get_root_xids(),
+    MSN msn = next_dummymsn();
+    BRT_MSG_S cmd = {BRT_INSERT, msn, xids_get_root_xids(),
		     .u.id={toku_fill_dbt(&keydbt, key, keylen),
			    toku_fill_dbt(&valdbt, val, vallen)}};

     //Generate a leafentry (committed insert key,val)
     r = apply_msg_to_leafentry(&cmd, NULL, //No old leafentry
			       &lesize, &disksize, &leafentry,
-			       node->u.l.buffer, &node->u.l.buffer_mempool, 0, NULL, NULL);
+			       NULL, NULL);
     assert(r==0);

-    struct cmd_leafval_heaviside_extra be = {brt, &cmd};
-    r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_heaviside, &be, &storeddatav, &idx, NULL);
+    struct cmd_leafval_heaviside_extra be = {brt, &keydbt};
+    r = toku_omt_find_zero(node->u.l.bn[0].buffer, toku_cmd_leafval_heaviside, &be, &storeddatav, &idx, NULL);

     if (r==0) {
	LEAFENTRY storeddata=storeddatav;
	// It's already there.  So now we have to remove it and put the new one back in.
-	node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(storeddata);
-	toku_mempool_mfree(&node->u.l.buffer_mempool, storeddata, leafentry_memsize(storeddata));
+	node->u.l.bn[0].n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(storeddata);
+	toku_free(storeddata);
	// Now put the new kv in.
-	toku_omt_set_at(node->u.l.buffer, leafentry, idx);
+	toku_omt_set_at(node->u.l.bn[0].buffer, leafentry, idx);
     } else {
-	r = toku_omt_insert(node->u.l.buffer, leafentry, toku_cmd_leafval_heaviside, &be, 0);
+	r = toku_omt_insert(node->u.l.bn[0].buffer, leafentry, toku_cmd_leafval_heaviside, &be, 0);
	assert(r==0);
     }

-    node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + disksize;
+    node->u.l.bn[0].n_bytes_in_buffer += OMT_ITEM_OVERHEAD + disksize;

     node->dirty=1;

     toku_verify_or_set_counts(node);

-    r = toku_unpin_brtnode(brt, node_v);
-    return r;
+    toku_unpin_brtnode(brt, node_v);
+    return 0;
 }

 int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_type cmdtype, char *key, int keylen, char *val, int vallen) {
     void *node_v;
     int r;
+    assert(testsetup_initialized);
     r = toku_cachetable_get_and_pin(brt->cf, blocknum, toku_cachetable_hash(brt->cf, blocknum), &node_v, NULL,
-				    toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
+				    toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, brt);
     if (r!=0) return r;
     BRTNODE node=node_v;
     assert(node->height>0);
@@ -131,13 +156,14 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_t
				    brt);

     XIDS xids_0 = xids_get_root_xids();
-    r = toku_fifo_enq(BNC_BUFFER(node, childnum), key, keylen, val, vallen, cmdtype, xids_0);
+    MSN msn = next_dummymsn();
+    r = toku_fifo_enq(BNC_BUFFER(node, childnum), key, keylen, val, vallen, cmdtype, msn, xids_0);
     assert(r==0);
     int sizediff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids_0);
     node->u.n.n_bytes_in_buffers += sizediff;
     BNC_NBYTESINBUF(node, childnum) += sizediff;

     node->dirty = 1;

-    r = toku_unpin_brtnode(brt, node_v);
-    return r;
+    toku_unpin_brtnode(brt, node_v);
+    return 0;
 }
@@ -282,10 +282,6 @@ int toku_brt_zombie_needed (BRT brt) __attribute__ ((warn_unused_result));

 int toku_brt_get_fragmentation(BRT brt, TOKU_DB_FRAGMENTATION report) __attribute__ ((warn_unused_result));
 int toku_brt_header_set_panic(struct brt_header *h, int panic, char *panic_string) __attribute__ ((warn_unused_result));

-BOOL toku_brt_is_empty (BRT brt, BOOL *try_again) __attribute__ ((warn_unused_result));
-// Effect: Return TRUE iff the tree is empty.  (However if *try_again is set to TRUE by toku_brt_is_empty, then the answer is inconclusive, and the function should
-//  be tried again.  It's a good idea to release the big ydb lock in this case.)
-
 BOOL toku_brt_is_empty_fast (BRT brt);
 // Effect: Return TRUE if there are no messages or leaf entries in the tree.  If so, it's empty.  If there are messages or leaf entries, we say it's not empty
 //  even though if we were to optimize the tree it might turn out that they are empty.

@@ -295,10 +291,6 @@ BOOL toku_brt_is_empty_fast (BRT brt) __attribute__ ((warn_unused_result));
 //  even though if we were to optimize the tree it might turn out that they are empty.

 BOOL toku_brt_is_recovery_logging_suppressed (BRT) __attribute__ ((warn_unused_result));

-//TODO: #1485 once we have multiple main threads, restore this code, analyze performance.
-#ifndef TOKU_MULTIPLE_MAIN_THREADS
-#define TOKU_MULTIPLE_MAIN_THREADS 0
-#endif
-
 void toku_brt_leaf_reset_calc_leaf_stats(BRTNODE node);
@@ -18,6 +18,7 @@ enum brt_layout_version_e {
     BRT_LAYOUT_VERSION_12 = 12,	// Diff from 11 to 12: Added BRT_CMD 'BRT_INSERT_NO_OVERWRITE', compressed block format, num old blocks
     BRT_LAYOUT_VERSION_13 = 13,	// Diff from 12 to 13: Fixed loader pivot bug, added build_id to every node, timestamps to brtheader
     BRT_LAYOUT_VERSION_14 = 14,	// Diff from 13 to 14: Added MVCC; deprecated TOKU_DB_VALCMP_BUILTIN(_13); Remove fingerprints; Support QUICKLZ; add end-to-end checksum on uncompressed data.
+    BRT_LAYOUT_VERSION_15 = 15,	// Diff from 14 to 15: TODO
     BRT_NEXT_VERSION,		// the version after the current version
     BRT_LAYOUT_VERSION = BRT_NEXT_VERSION-1, // A hack so I don't have to change this line.
     BRT_LAYOUT_MIN_SUPPORTED_VERSION = BRT_LAYOUT_VERSION_13, // Minimum version supported
@@ -25,6 +26,7 @@ enum brt_layout_version_e {
     // Define these symbolically so the knowledge of exactly which layout version got rid of fingerprints isn't spread all over the code.
     BRT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT = BRT_LAYOUT_VERSION_13,
     BRT_FIRST_LAYOUT_VERSION_WITH_END_TO_END_CHECKSUM = BRT_LAYOUT_VERSION_14,
+    BRT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES = BRT_LAYOUT_VERSION_15,
 };

 #endif
@@ -136,36 +136,43 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
     printf(" layout_version_original=%d\n", n->layout_version_original);
     printf(" layout_version_read_from_disk=%d\n", n->layout_version_read_from_disk);
     printf(" build_id=%d\n", n->build_id);
-    if (n->height>0) {
-	printf(" n_children=%d\n", n->u.n.n_children);
-	printf(" total_childkeylens=%u\n", n->u.n.totalchildkeylens);
+    printf(" max_msn_applied_to_node=%"PRId64" (0x%"PRIx64")\n", n->max_msn_applied_to_node.msn, n->max_msn_applied_to_node.msn);
+    printf(" n_children=%d\n", n->n_children);
+    printf(" total_childkeylens=%u\n", n->totalchildkeylens);
+    if (n->height > 0) {
	printf(" n_bytes_in_buffers=%u\n", n->u.n.n_bytes_in_buffers);
+    }
     int i;
     printf(" subleafentry_estimates={");
-    for (i=0; i<n->u.n.n_children; i++) {
+    for (i=0; i<n->n_children; i++) {
	if (i>0) printf(" ");
-	struct subtree_estimates *est = &(BNC_SUBTREE_ESTIMATES(n, i));
+	struct subtree_estimates *est = &n->subtree_estimates[i];
	printf("{nkey=%" PRIu64 " ndata=%" PRIu64 " dsize=%" PRIu64 " %s }", est->nkeys, est->ndata, est->dsize, est->exact ? "T" : "F");
     }
     printf("}\n");
     printf(" pivots:\n");
-    for (i=0; i<n->u.n.n_children-1; i++) {
-	struct kv_pair *piv = n->u.n.childkeys[i];
+    for (i=0; i<n->n_children-1; i++) {
+	struct kv_pair *piv = n->childkeys[i];
	printf(" pivot %d:", i);
	assert(n->flags == 0);
	print_item(kv_pair_key_const(piv), kv_pair_keylen(piv));
	printf("\n");
     }
     printf(" children:\n");
-    for (i=0; i<n->u.n.n_children; i++) {
+    for (i=0; i<n->n_children; i++) {
+	if (n->height > 0) {
	    printf(" child %d: %" PRId64 "\n", i, BNC_BLOCKNUM(n, i).b);
	    unsigned int n_bytes = BNC_NBYTESINBUF(n, i);
	    int n_entries = toku_fifo_n_entries(BNC_BUFFER(n, i));
-	    if (n_bytes > 0 || n_entries > 0)
+	    if (n_bytes > 0 || n_entries > 0) {
		printf(" buffer contains %u bytes (%d items)\n", n_bytes, n_entries);
+	    }
	    if (dump_data) {
-		FIFO_ITERATE(BNC_BUFFER(n,i), key, keylen, data, datalen, typ, xids,
+		FIFO_ITERATE(BNC_BUFFER(n,i), key, keylen, data, datalen, typ, msn, xids,
			     {
+				 printf(" msn=%"PRIu64" (0x%"PRIx64") ", msn.msn, msn.msn);
				 printf(" TYPE=");
				 switch ((enum brt_msg_type)typ) {
				 case BRT_NONE: printf("NONE"); goto ok;
@@ -196,15 +203,14 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
				 }
			     );
	    }
-	}
-    } else {
-	struct subtree_estimates *est = &n->u.l.leaf_stats;
-	printf("{nkey=%" PRIu64 " ndata=%" PRIu64 " dsize=%" PRIu64 " %s }\n", est->nkeys, est->ndata, est->dsize, est->exact ? "T" : "F");
-	printf(" optimized_for_upgrade=%u\n", n->u.l.optimized_for_upgrade);
-	printf(" n_bytes_in_buffer=%u\n", n->u.l.n_bytes_in_buffer);
-	printf(" items_in_buffer =%u\n", toku_omt_size(n->u.l.buffer));
-	if (dump_data) toku_omt_iterate(n->u.l.buffer, print_le, 0);
-    }
+	} else {
+	    printf(" optimized_for_upgrade=%u\n", n->u.l.bn[i].optimized_for_upgrade);
+	    printf(" n_bytes_in_buffer=%u\n", n->u.l.bn[i].n_bytes_in_buffer);
+	    printf(" items_in_buffer =%u\n", toku_omt_size(n->u.l.bn[i].buffer));
+	    if (dump_data) toku_omt_iterate(n->u.l.bn[i].buffer, print_le, 0);
+	}
+    }
     toku_brtnode_free(&n);
 }

 static void
@@ -510,5 +516,3 @@ main (int argc, const char *const argv[]) {
     toku_brtheader_free(h);
     return 0;
 }
@@ -3104,13 +3104,14 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
	node->layout_version = BRT_LAYOUT_VERSION;
	node->layout_version_original = BRT_LAYOUT_VERSION;
	node->build_id = BUILD_ID;
+	node->max_msn_applied_to_node = MIN_MSN;
	node->height=height;
-	node->u.n.n_children = n_children;
+	node->n_children = n_children;
	node->flags = 0;
-	XMALLOC_N(n_children-1, node->u.n.childkeys);
+	XMALLOC_N(n_children-1, node->childkeys);
	for (int i=0; i<n_children-1; i++)
-	    node->u.n.childkeys[i] = NULL;
+	    node->childkeys[i] = NULL;
	unsigned int totalchildkeylens = 0;
	for (int i=0; i<n_children-1; i++) {
	    struct kv_pair *childkey = kv_pair_malloc(pivots[i].data, pivots[i].size, NULL, 0);
@@ -3118,16 +3119,17 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
		result = errno;
		break;
	    }
-	    node->u.n.childkeys[i] = childkey;
+	    node->childkeys[i] = childkey;
	    totalchildkeylens += kv_pair_keylen(childkey);
	}
	node->u.n.n_bytes_in_buffers = 0;
-	node->u.n.totalchildkeylens = totalchildkeylens;
+	node->totalchildkeylens = totalchildkeylens;
	XMALLOC_N(n_children, node->u.n.childinfos);
+	XMALLOC_N(n_children, node->subtree_estimates);
	for (int i=0; i<n_children; i++) {
	    struct brtnode_nonleaf_childinfo *ci = &node->u.n.childinfos[i];
-	    ci->subtree_estimates = subtree_info[i].subtree_estimates;
	    ci->blocknum = make_blocknum(subtree_info[i].block);
+	    node->subtree_estimates[i] = subtree_info[i].subtree_estimates;
	    ci->have_fullhash = FALSE;
	    ci->fullhash = 0;
	    ci->buffer = NULL;
@@ -3141,7 +3143,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
	    size_t n_bytes;
	    char *bytes;
	    int r;
-	    r = toku_serialize_brtnode_to_memory(node, 1, 1, &n_bytes, &bytes);
+	    r = toku_serialize_brtnode_to_memory(node, &n_bytes, &bytes);
	    if (r) {
		result = r;
	    } else {
@@ -3162,7 +3164,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu

	for (int i=0; i<n_children-1; i++) {
	    toku_free(pivots[i].data);
-	    toku_free(node->u.n.childkeys[i]);
+	    toku_free(node->childkeys[i]);
	}
	for (int i=0; i<n_children; i++) {
	    if (node->u.n.childinfos[i].buffer) {
@@ -3172,7 +3174,8 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
	}
	toku_free(pivots);
	toku_free(node->u.n.childinfos);
-	toku_free(node->u.n.childkeys);
+	toku_free(node->childkeys);
+	toku_free(node->subtree_estimates);
	toku_free(node);
	toku_free(subtree_info);
@@ -20,6 +20,8 @@ extern "C" {

 typedef struct brt *BRT;
 typedef struct brtnode *BRTNODE;
+typedef struct brtnode_leaf_basement_node *BASEMENTNODE;
+typedef struct subtree_estimates *SUBTREE_EST;
 struct brt_header;
 struct wbuf;
 struct dbuf;
@@ -43,11 +45,19 @@ typedef struct {
     char *data;
 } BYTESTRING;

-/* Make the LSN be a struct instead of an integer so that we get better type checking. */
+/* Log Sequence Number (LSN)
+ * Make the LSN be a struct instead of an integer so that we get better type checking. */
 typedef struct __toku_lsn { u_int64_t lsn; } LSN;
 #define ZERO_LSN ((LSN){0})
 #define MAX_LSN  ((LSN){UINT64_MAX})

+/* Message Sequence Number (MSN)
+ * Make the MSN be a struct instead of an integer so that we get better type checking. */
+typedef struct __toku_msn { u_int64_t msn; } MSN;
+#define ZERO_MSN ((MSN){0})                 // dummy used for message construction, to be filled in when msg is applied to tree
+#define MIN_MSN  ((MSN){(u_int64_t)1<<32})  // first 2**32 values reserved for messages created before Dr. No (for upgrade)
+#define MAX_MSN  ((MSN){UINT64_MAX})
+
 /* At the brt layer, a FILENUM uniquely identifies an open file.
  * At the ydb layer, a DICTIONARY_ID uniquely identifies an open dictionary.
  * With the introduction of the loader (ticket 2216), it is possible for the file that holds
@@ -112,6 +122,7 @@ typedef struct fifo_msg_t *FIFO_MSG;
 /* tree commands */
 struct brt_msg {
     enum brt_msg_type type;
+    MSN          msn;          // message sequence number
     XIDS         xids;
     union {
	/* insert or delete */
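The struct wrapper buys the same compile-time separation for MSNs that LSNs already have: code that passes one counter where the other is expected fails to typecheck, which a bare u_int64_t typedef would not catch. A minimal standalone sketch of the idea (not the patch's headers; msn_newer is a hypothetical helper):

#include <stdint.h>
#include <stdio.h>

typedef struct { uint64_t lsn; } LSN;  // log sequence number
typedef struct { uint64_t msn; } MSN;  // message sequence number

// Compare two MSNs; an LSN cannot be passed here by accident.
static int msn_newer(MSN a, MSN b) { return a.msn > b.msn; }

int main(void) {
    MSN m1 = { (uint64_t)1 << 32 };        // like MIN_MSN above
    MSN m2 = { ((uint64_t)1 << 32) + 1 };
    LSN l  = { 42 };
    (void)l;                               // msn_newer(l, m1) would not compile
    printf("%d\n", msn_newer(m2, m1));     // prints 1
    return 0;
}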
@@ -9,7 +9,6 @@

 #include <fcntl.h>
 #include "brttypes.h"
 #include "workqueue.h"
-#include "leaflock.h"

 #if defined(__cplusplus) || defined(__cilkplusplus)
 extern "C" {
@@ -124,6 +123,8 @@ typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, int fd, CACHEKEY key, void
 // Can access fd (fd is protected by a readlock during call)
 typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, int fd, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, int *dirtyp, void *extraargs);

+typedef int (*CACHETABLE_PARTIAL_EVICTION_CALLBACK)(void *brtnode_pv, long bytes_to_free, long* bytes_freed, void *extraargs);
+
 void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata,
     int (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
     int (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*),
@@ -153,7 +154,10 @@ CACHETABLE toku_cachefile_get_cachetable(CACHEFILE cf);
 int toku_cachetable_put(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
			void *value, long size,
			CACHETABLE_FLUSH_CALLBACK flush_callback,
-			CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs);
+			CACHETABLE_FETCH_CALLBACK fetch_callback,
+			CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
+			void *extraargs
+			);

 // Get and pin a memory object.
 // Effects: If the memory object is in the cachetable, acquire a read lock on it.
@@ -163,7 +167,9 @@ int toku_cachetable_put(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
 int toku_cachetable_get_and_pin(CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/,
				void **/*value*/, long *sizep,
				CACHETABLE_FLUSH_CALLBACK flush_callback,
-				CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs);
+				CACHETABLE_FETCH_CALLBACK fetch_callback,
+				CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
+				void *extraargs);

 typedef struct unlockers *UNLOCKERS;
 struct unlockers {
@@ -178,19 +184,24 @@ struct unlockers {
 // and return TOKU_DB_TRYAGAIN.
 int toku_cachetable_get_and_pin_nonblocking (CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void**value, long *sizep,
					     CACHETABLE_FLUSH_CALLBACK flush_callback,
-					     CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs,
+					     CACHETABLE_FETCH_CALLBACK fetch_callback,
+					     CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
+					     void *extraargs,
					     UNLOCKERS unlockers);
 #define CAN_RELEASE_LOCK_DURING_IO

-// Maybe get and pin a memory object.
-// Effects: This function is identical to the get_and_pin function except that it
-//  will not attempt to fetch a memory object that is not in the cachetable.
-// Returns: If the item is already in memory, then return 0 and store it in the
-//  void**.  If the item is not in memory, then return a nonzero error number.
-int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/, void**);
-
-// Like maybe get and pin, but may pin a clean pair.
-int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/, void**);
+int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/, void**);
+// Effect: Maybe get and pin a memory object.
+//  This function is similar to the get_and_pin function except that it
+//  will not attempt to fetch a memory object that is not in the cachetable, or that requires any kind of blocking to get it.
+// Returns: If the item is already in memory, then return 0 and store it in the
+//  void**.  If the item is not in memory, then return a nonzero error number.
+
+int toku_cachetable_get_and_pin_if_in_memory (CACHEFILE /*cachefile*/, CACHEKEY /*key*/, u_int32_t /*fullhash*/, void**/*value*/);
+// Effect: Get and pin an object if it is in memory (even if doing so would require blocking, e.g., to wait on a checkpoint).
+//  This is similar to maybe_get_and_pin except that maybe_get_and_pin won't block waiting on a checkpoint.
+// Returns: 0 iff the item is in memory (otherwise return an error).
+// Modifies: *value (if returning 0, then the pointer to the value is stored in *value).

 // cachetable pair clean or dirty WRT external memory
 enum cachetable_dirty {
@@ -200,7 +211,7 @@ enum cachetable_dirty {

 int toku_cachetable_unpin(CACHEFILE, CACHEKEY, u_int32_t fullhash, enum cachetable_dirty dirty, long size);
 // Effect: Unpin a memory object
-// Effects: If the memory object is in the cachetable, then OR the dirty flag,
+// Modifies: If the memory object is in the cachetable, then OR the dirty flag,
 //  update the size, and release the read lock on the memory object.
 // Returns: 0 if success, otherwise returns an error number.
 // Requires: The ct is locked.
@@ -216,6 +227,7 @@ int toku_cachetable_unpin_and_remove (CACHEFILE, CACHEKEY); /* Removing somethin
 int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
			    CACHETABLE_FLUSH_CALLBACK flush_callback,
			    CACHETABLE_FETCH_CALLBACK fetch_callback,
+			    CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
			    void *extraargs);
 // Effect: Prefetch a memory object for a given key into the cachetable
 // Precondition: The cachetable mutex is NOT held.
@@ -349,6 +361,7 @@ typedef struct cachetable_status {
     u_int64_t prefetches;		// how many times has a block been prefetched into the cachetable?
     u_int64_t maybe_get_and_pins;	// how many times has maybe_get_and_pin(_clean) been called?
     u_int64_t maybe_get_and_pin_hits;	// how many times has maybe_get_and_pin(_clean) returned with a node?
+    u_int64_t get_and_pin_if_in_memorys; // how many times has get_and_pin_if_in_memory been called?
     int64_t   size_current;		// the sum of the sizes of the nodes represented in the cachetable
     int64_t   size_limit;		// the limit to the sum of the node sizes
     int64_t   size_writing;		// the sum of the sizes of the nodes being written
@@ -360,8 +373,6 @@ typedef struct cachetable_status {

 void toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS s);

-LEAFLOCK_POOL toku_cachefile_leaflock_pool(CACHEFILE cf);
-
 void toku_cachetable_set_env_dir(CACHETABLE ct, const char *env_dir);
 char * toku_construct_full_name(int count, ...);
 char * toku_cachetable_get_fname_in_cwd(CACHETABLE ct, const char * fname_in_env);
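A toy model of the contrast documented above between maybe_get_and_pin and get_and_pin_if_in_memory, under the assumption (stated in the comments) that the only blocking case is a pending checkpoint. The types and functions here are standalone stand-ins, not the cachetable API:

#include <stdio.h>

// An entry may be absent, resident, or resident-but-checkpoint-pending
// (pinning a pending entry would require waiting on the checkpoint).
typedef enum { ABSENT, RESIDENT, CHECKPOINT_PENDING } state_t;

typedef struct { state_t state; int value; } entry_t;

// Like maybe_get_and_pin: succeed only if no blocking of any kind is needed.
static int maybe_get_and_pin(entry_t *e, int *out) {
    if (e->state != RESIDENT) return -1;   // absent OR would block: give up
    *out = e->value;
    return 0;
}

// Like get_and_pin_if_in_memory: "wait out" a pending checkpoint,
// but still refuse to fetch an absent entry from disk.
static int get_and_pin_if_in_memory(entry_t *e, int *out) {
    if (e->state == ABSENT) return -1;                        // not in memory: error
    if (e->state == CHECKPOINT_PENDING) e->state = RESIDENT;  // block until done
    *out = e->value;
    return 0;
}

int main(void) {
    entry_t e = { CHECKPOINT_PENDING, 7 };
    int v;
    int rc1 = maybe_get_and_pin(&e, &v);        // -1: pinning would mean waiting
    printf("maybe_get_and_pin: rc=%d\n", rc1);
    int rc2 = get_and_pin_if_in_memory(&e, &v); // 0: waited out the checkpoint
    printf("get_and_pin_if_in_memory: rc=%d v=%d\n", rc2, v);
    return 0;
}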
@@ -69,7 +69,7 @@ void toku_fifo_size_hint(FIFO fifo, size_t size) {
     }
 }

-int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, int type, XIDS xids) {
+int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, int type, MSN msn, XIDS xids) {
     int need_space_here = sizeof(struct fifo_entry)
			  + keylen + datalen
			  + xids_get_size(xids)
@@ -101,6 +101,7 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d
     }
     struct fifo_entry *entry = (struct fifo_entry *)(fifo->memory + fifo->memory_start + fifo->memory_used);
     entry->type = (unsigned char)type;
+    entry->msn = msn;
     xids_cpy(&entry->xids_s, xids);
     entry->keylen = keylen;
     unsigned char *e_key = xids_get_end_of_array(&entry->xids_s);
@@ -113,11 +114,11 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d
 }

 int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_MSG cmd) {
-    return toku_fifo_enq(fifo, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->type, cmd->xids);
+    return toku_fifo_enq(fifo, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->type, cmd->msn, cmd->xids);
 }

 /* peek at the head (the oldest entry) of the fifo */
-int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, u_int32_t *type, XIDS *xids) {
+int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, u_int32_t *type, MSN *msn, XIDS *xids) {
     struct fifo_entry *entry = fifo_peek(fifo);
     if (entry == 0) return -1;
     unsigned char *e_key = xids_get_end_of_array(&entry->xids_s);
@@ -126,6 +127,7 @@ int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data,
     *data = e_key + entry->keylen;
     *datalen = entry->vallen;
     *type = entry->type;
+    *msn = entry->msn;
     *xids = &entry->xids_s;
     return 0;
 }
@@ -168,10 +170,10 @@ struct fifo_entry * toku_fifo_iterate_internal_get_entry(FIFO fifo, int off) {
     return (struct fifo_entry *)(fifo->memory + off);
 }

-void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, XIDS xids, void*), void *arg) {
+void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, MSN msn, XIDS xids, void*), void *arg) {
     FIFO_ITERATE(fifo,
-		 key, keylen, data, datalen, type, xids,
-		 f(key,keylen,data,datalen,type,xids, arg));
+		 key, keylen, data, datalen, type, msn, xids,
+		 f(key,keylen,data,datalen,type,msn,xids, arg));
 }

 unsigned long toku_fifo_memory_size(FIFO fifo) {
...@@ -21,6 +21,7 @@ struct __attribute__((__packed__)) fifo_entry { ...@@ -21,6 +21,7 @@ struct __attribute__((__packed__)) fifo_entry {
unsigned int keylen; unsigned int keylen;
unsigned int vallen; unsigned int vallen;
unsigned char type; unsigned char type;
MSN msn;
XIDS_S xids_s; XIDS_S xids_s;
}; };
...@@ -42,9 +43,9 @@ int toku_fifo_n_entries(FIFO); ...@@ -42,9 +43,9 @@ int toku_fifo_n_entries(FIFO);
int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_MSG cmd); int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_MSG cmd);
int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, XIDS xids); int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, MSN msn, XIDS xids);
int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, XIDS *xids); int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, MSN *msn, XIDS *xids);
// int toku_fifo_peek_cmdstruct (FIFO, BRT_MSG, DBT*, DBT*); // fill in the BRT_MSG, using the two DBTs for the DBT part. // int toku_fifo_peek_cmdstruct (FIFO, BRT_MSG, DBT*, DBT*); // fill in the BRT_MSG, using the two DBTs for the DBT part.
int toku_fifo_deq(FIFO); int toku_fifo_deq(FIFO);
...@@ -54,9 +55,9 @@ unsigned long toku_fifo_memory_size(FIFO); // return how much memory the fifo us ...@@ -54,9 +55,9 @@ unsigned long toku_fifo_memory_size(FIFO); // return how much memory the fifo us
//These two are problematic, since I don't want to malloc() the bytevecs, but dequeueing the fifo frees the memory. //These two are problematic, since I don't want to malloc() the bytevecs, but dequeueing the fifo frees the memory.
//int toku_fifo_peek_deq (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid); //int toku_fifo_peek_deq (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid);
//int toku_fifo_peek_deq_cmdstruct (FIFO, BRT_MSG, DBT*, DBT*); // fill in the BRT_MSG, using the two DBTs for the DBT part. //int toku_fifo_peek_deq_cmdstruct (FIFO, BRT_MSG, DBT*, DBT*); // fill in the BRT_MSG, using the two DBTs for the DBT part.
void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, XIDS xids, void*), void*); void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, MSN msn, XIDS xids, void*), void*);
#define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,xidsvar,body) do { \ #define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,msnvar,xidsvar,body) do { \
int fifo_iterate_off; \ int fifo_iterate_off; \
for (fifo_iterate_off = toku_fifo_iterate_internal_start(fifo); \ for (fifo_iterate_off = toku_fifo_iterate_internal_start(fifo); \
toku_fifo_iterate_internal_has_more(fifo, fifo_iterate_off); \ toku_fifo_iterate_internal_has_more(fifo, fifo_iterate_off); \
...@@ -65,6 +66,7 @@ void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,I ...@@ -65,6 +66,7 @@ void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,I
ITEMLEN keylenvar = e->keylen; \ ITEMLEN keylenvar = e->keylen; \
ITEMLEN datalenvar = e->vallen; \ ITEMLEN datalenvar = e->vallen; \
enum brt_msg_type typevar = (enum brt_msg_type)e->type; \ enum brt_msg_type typevar = (enum brt_msg_type)e->type; \
MSN msnvar = e->msn; \
XIDS xidsvar = &e->xids_s; \ XIDS xidsvar = &e->xids_s; \
bytevec keyvar = xids_get_end_of_array(xidsvar); \ bytevec keyvar = xids_get_end_of_array(xidsvar); \
bytevec datavar = (const u_int8_t*)keyvar + e->keylen; \ bytevec datavar = (const u_int8_t*)keyvar + e->keylen; \
......
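With this change every buffered message carries its MSN through the whole FIFO API: toku_fifo_enq takes one more argument, fifo_entry stores it, and iterate callbacks receive it between the type and the xids. A minimal sketch of an updated iteration callback, assuming only the signatures shown above (count_msgs and its counter are hypothetical):

static void count_msgs(bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen,
                       int type, MSN msn, XIDS xids, void *arg) {
    // The MSN now rides along with each message; this callback just counts entries.
    (void)key; (void)keylen; (void)data; (void)datalen; (void)type; (void)msn; (void)xids;
    int *n = arg;
    (*n)++;
}

// usage: int n = 0; toku_fifo_iterate(fifo, count_msgs, &n);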
...@@ -44,7 +44,6 @@ ...@@ -44,7 +44,6 @@
#include "log_header.h" #include "log_header.h"
#include "logcursor.h" #include "logcursor.h"
#include "logfilemgr.h" #include "logfilemgr.h"
#include "mempool.h"
#include "rbuf.h" #include "rbuf.h"
#include "threadpool.h" #include "threadpool.h"
#include "toku_assert.h" #include "toku_assert.h"
......
...@@ -67,6 +67,12 @@ static inline unsigned int kv_pair_keylen(const struct kv_pair *pair) { ...@@ -67,6 +67,12 @@ static inline unsigned int kv_pair_keylen(const struct kv_pair *pair) {
return pair->keylen; return pair->keylen;
} }
static inline DBT kv_pair_key_to_dbt (const struct kv_pair *pair) {
const DBT d = {.data = (void*)kv_pair_key_const(pair),
.size = kv_pair_keylen(pair)};
return d;
}
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus) || defined(__cilkplusplus)
}; };
#endif #endif
......
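kv_pair_key_to_dbt wraps a pair's key in a stack DBT without copying, so the DBT must not outlive the pair. A hedged usage sketch (keys_equal is a hypothetical helper; memcmp needs <string.h>):

#include <string.h>

static int keys_equal(const struct kv_pair *a, const struct kv_pair *b) {
    DBT da = kv_pair_key_to_dbt(a);   // points into a; no allocation
    DBT db = kv_pair_key_to_dbt(b);   // points into b; no allocation
    return da.size == db.size && memcmp(da.data, db.data, da.size) == 0;
}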
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_portability.h>
#include "toku_pthread.h"
#include "leaflock.h"
#include "toku_assert.h"
#include "memory.h"
#include <errno.h>
#include <string.h>
// See ticket 1423.
//
// Purpose of this file is to manage a pool of locks that are used
// to lock brt leaf nodes and the cursors that point to them.
// Each lock protects one set of cursors, the cursors that point
// to a single brt leaf node. (Actually, the cursors point to
// a leaf node's omt.)
// Because the cursors that point to a brt leaf node are organized
// in a linked list (whose head is in the brt leaf node), the
// operations listed below are not thread-safe.
//
// It is necessary to hold the lock around the following operations:
// - associate cursor with brt block (brt_cursor_update())
// [ puts cursor on linked list ]
// - invalidate cursor (done only by search path)
// [ removes cursor from linked list ]
// - invalidate all cursors associated with a brt block
// (done only by (a) writer thread or (b) insert/delete,
// at least until we have brt-node-level locks)
// [ removes all cursors from linked list ]
//
// When a leaf is created, it borrows ownership of a leaflock.
// The leaf has a reference to the leaflock.
//
// When a leaf is evicted, ownership of the leaflock returns to the
// pool of available leaflocks.
//
// The reason an unused leaflock (which is no longer associated with
// any particular leaf node) is kept in a pool (rather than destroyed)
// is that some client thread may be waiting on the lock or about to
// request the lock.
//
// The brt leaf node has a reference to the leaflock, and there is
// a reference to the leaflock in every cursor that references the
// brt leaf node.
//
struct leaflock {
toku_pthread_mutex_t lock;
LEAFLOCK next;
int id;
};
struct leaflock_pool {
LEAFLOCK free_list;
toku_pthread_mutex_t pool_mutex;
u_int32_t numlocks; // how many locks ever created?
u_int32_t pool_high_water_mark; // max number of locks ever in pool
u_int32_t num_in_pool; // number of locks currently in pool
};
static void
leaflock_pool_lock(LEAFLOCK_POOL pool) {
int r = toku_pthread_mutex_lock(&pool->pool_mutex); assert(r==0);
}
static void
leaflock_pool_unlock(LEAFLOCK_POOL pool) {
int r = toku_pthread_mutex_unlock(&pool->pool_mutex); assert(r==0);
}
int
toku_leaflock_create(LEAFLOCK_POOL* pool) {
int r;
LEAFLOCK_POOL XCALLOC(result);
if (!result) r = ENOMEM;
else {
r = toku_pthread_mutex_init(&result->pool_mutex, NULL);
assert(r == 0);
*pool = result;
}
return r;
}
int
toku_leaflock_destroy(LEAFLOCK_POOL* pool_p) {
LEAFLOCK_POOL pool = *pool_p;
*pool_p = NULL;
leaflock_pool_lock(pool);
int r;
assert(pool->num_in_pool==pool->numlocks);
while (pool->free_list) {
LEAFLOCK to_free = pool->free_list;
pool->free_list = pool->free_list->next;
r = toku_pthread_mutex_destroy(&to_free->lock); assert(r==0);
toku_free(to_free);
}
leaflock_pool_unlock(pool);
r = toku_pthread_mutex_destroy(&pool->pool_mutex); assert(r == 0);
toku_free(pool);
return r;
}
int
toku_leaflock_borrow(LEAFLOCK_POOL pool, LEAFLOCK *leaflockp) {
leaflock_pool_lock(pool);
LEAFLOCK loaner;
int r;
if (pool->free_list) {
assert(pool->num_in_pool>0);
pool->num_in_pool--;
loaner = pool->free_list;
pool->free_list = pool->free_list->next;
r = 0;
}
else {
pool->numlocks++;
//Create one
CALLOC(loaner);
if (loaner==NULL) r = ENOMEM;
else {
loaner->id = pool->numlocks;
r = toku_pthread_mutex_init(&loaner->lock, NULL); assert(r==0);
}
}
if (r==0) {
loaner->next = NULL;
*leaflockp = loaner;
}
leaflock_pool_unlock(pool);
return r;
}
//Caller of this function must be holding the lock being returned.
void
toku_leaflock_unlock_and_return(LEAFLOCK_POOL pool, LEAFLOCK *leaflockp) {
leaflock_pool_lock(pool);
LEAFLOCK loaner = *leaflockp;
*leaflockp = NULL; //Take away caller's reference for good hygiene.
toku_leaflock_unlock_by_leaf(loaner);
pool->num_in_pool++;
if (pool->num_in_pool > pool->pool_high_water_mark)
pool->pool_high_water_mark = pool->num_in_pool;
assert (pool->num_in_pool <= pool->numlocks);
loaner->next = pool->free_list;
pool->free_list = loaner;
leaflock_pool_unlock(pool);
}
void
toku_leaflock_lock_by_leaf(LEAFLOCK leaflock) {
assert(leaflock->next==NULL);
int r = toku_pthread_mutex_lock(&leaflock->lock); assert(r==0);
}
void
toku_leaflock_unlock_by_leaf(LEAFLOCK leaflock) {
assert(leaflock->next==NULL);
int r = toku_pthread_mutex_unlock(&leaflock->lock); assert(r==0);
}
void
toku_leaflock_lock_by_cursor(LEAFLOCK leaflock) {
int r = toku_pthread_mutex_lock(&leaflock->lock); assert(r==0);
}
void
toku_leaflock_unlock_by_cursor(LEAFLOCK leaflock) {
int r = toku_pthread_mutex_unlock(&leaflock->lock); assert(r==0);
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef TOKU_LEAFLOCK_H
#define TOKU_LEAFLOCK_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
typedef struct leaflock *LEAFLOCK;
typedef struct leaflock_pool *LEAFLOCK_POOL;
int toku_leaflock_create(LEAFLOCK_POOL* pool);
int toku_leaflock_destroy(LEAFLOCK_POOL* pool);
int toku_leaflock_borrow(LEAFLOCK_POOL pool, LEAFLOCK *leaflockp);
void toku_leaflock_unlock_and_return(LEAFLOCK_POOL pool, LEAFLOCK *leaflockp);
void toku_leaflock_lock_by_leaf(LEAFLOCK leaflock);
void toku_leaflock_unlock_by_leaf(LEAFLOCK leaflock);
void toku_leaflock_lock_by_cursor(LEAFLOCK leaflock);
void toku_leaflock_unlock_by_cursor(LEAFLOCK leaflock);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
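Taken together with the comments in leaflock.c, the intended lifecycle looks like the sketch below (error handling elided; the ordering is an inference from those comments, not shown verbatim in the source):

LEAFLOCK_POOL pool;
int r = toku_leaflock_create(&pool); assert(r == 0);

LEAFLOCK lock;
r = toku_leaflock_borrow(pool, &lock); assert(r == 0);  // a new leaf borrows a lock

toku_leaflock_lock_by_leaf(lock);
/* ... modify the leaf's linked list of cursors ... */
toku_leaflock_unlock_by_leaf(lock);

// On eviction the leaf must hold the lock while handing it back;
// unlock_and_return unlocks it and NULLs the caller's reference.
toku_leaflock_lock_by_leaf(lock);
toku_leaflock_unlock_and_return(pool, &lock);

r = toku_leaflock_destroy(&pool); assert(r == 0);       // all locks must be back in the pool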
...@@ -177,12 +177,8 @@ toku_logger_open_rollback(TOKULOGGER logger, CACHETABLE cachetable, BOOL create) ...@@ -177,12 +177,8 @@ toku_logger_open_rollback(TOKULOGGER logger, CACHETABLE cachetable, BOOL create)
//Must have no data blocks (rollback logs or otherwise). //Must have no data blocks (rollback logs or otherwise).
toku_block_verify_no_data_blocks_except_root_unlocked(t->h->blocktable, t->h->root); toku_block_verify_no_data_blocks_except_root_unlocked(t->h->blocktable, t->h->root);
toku_brtheader_unlock(t->h); toku_brtheader_unlock(t->h);
BOOL try_again = TRUE;;
BOOL is_empty; BOOL is_empty;
while (try_again) { is_empty = toku_brt_is_empty_fast(t);
try_again = FALSE;
is_empty = toku_brt_is_empty(t, &try_again);
}
assert(is_empty); assert(is_empty);
return r; return r;
} }
...@@ -219,12 +215,8 @@ toku_logger_close_rollback(TOKULOGGER logger, BOOL recovery_failed) { ...@@ -219,12 +215,8 @@ toku_logger_close_rollback(TOKULOGGER logger, BOOL recovery_failed) {
{ {
// This almost doesn't work. If there were anything in there, then the header might get dirtied by // This almost doesn't work. If there were anything in there, then the header might get dirtied by
// toku_brt_is_empty(). But it turns out absolutely nothing is in there, so it's OK to assert that it's empty. // toku_brt_is_empty(). But it turns out absolutely nothing is in there, so it's OK to assert that it's empty.
BOOL try_again = TRUE;
BOOL is_empty; BOOL is_empty;
while (try_again) { is_empty = toku_brt_is_empty_fast(brt_to_close);
try_again = FALSE;
is_empty = toku_brt_is_empty(brt_to_close, &try_again);
}
assert(is_empty); assert(is_empty);
} }
assert(!h->dirty); // it should not have been dirtied by the toku_brt_is_empty test. assert(!h->dirty); // it should not have been dirtied by the toku_brt_is_empty test.
......
...@@ -6,9 +6,6 @@ ...@@ -6,9 +6,6 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
/* We have too many memory management tricks: /* We have too many memory management tricks:
* mempool for a collection of objects that are all allocated together.
* It's pretty rigid about what happens when you run out of memory.
* There's a callback to compress data.
* memarena (this code) is for a collection of objects that cannot be moved. * memarena (this code) is for a collection of objects that cannot be moved.
* The pattern is allocate more and more stuff. * The pattern is allocate more and more stuff.
* Don't free items as you go. * Don't free items as you go.
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
void toku_mempool_init(struct mempool *mp, void *base, size_t size) {
// printf("mempool_init %p %p %d\n", mp, base, size);
assert(base != 0 && size<(1U<<31)); // used to be assert(size >= 0), but changed to size_t so now let's make sure it's not more than 2GB...
mp->base = base;
mp->size = size;
mp->free_offset = 0;
mp->frag_size = 0;
}
void toku_mempool_fini(struct mempool *mp __attribute__((unused))) {
// printf("mempool_fini %p %p %d %d\n", mp, mp->base, mp->size, mp->frag_size);
}
void *toku_mempool_get_base(struct mempool *mp) {
return mp->base;
}
size_t toku_mempool_get_size(struct mempool *mp) {
return mp->size;
}
size_t toku_mempool_get_frag_size(struct mempool *mp) {
return mp->frag_size;
}
void *toku_mempool_malloc(struct mempool *mp, size_t size, int alignment) {
assert(mp->free_offset <= mp->size);
void *vp;
size_t offset = (mp->free_offset + (alignment-1)) & ~(alignment-1);
//printf("mempool_malloc size=%ld base=%p free_offset=%ld mp->size=%ld offset=%ld\n", size, mp->base, mp->free_offset, mp->size, offset);
if (offset + size > mp->size) {
vp = 0;
} else {
vp = (char *)mp->base + offset;
mp->free_offset = offset + size;
}
assert(mp->free_offset <= mp->size);
assert(((long)vp & (alignment-1)) == 0);
assert(vp == 0 || toku_mempool_inrange(mp, vp, size));
//printf("mempool returning %p\n", vp);
return vp;
}
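The rounding in toku_mempool_malloc is the usual power-of-two align-up trick; a worked instance with illustrative numbers:

// free_offset = 13, alignment = 8:
//   offset = (13 + 7) & ~7 = 20 & ~7 = 16
// so the allocation starts at base + 16 (8-byte aligned) and the
// 3 bytes between offsets 13 and 16 are simply skipped.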
// if vp is null then we are freeing something, but not specifying what. The data won't be freed until compression is done.
void toku_mempool_mfree(struct mempool *mp, void *vp, size_t size) {
if (vp) assert(toku_mempool_inrange(mp, vp, size));
mp->frag_size += size;
assert(mp->frag_size <= mp->size);
}
#ifndef _TOKU_MEMPOOL_H
#define _TOKU_MEMPOOL_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
/* a memory pool is a contiguous region of memory that supports single
allocations from the pool. these allocated regions are never recycled.
when the memory pool no longer has free space, the allocated chunks
must be relocated by the application to a new memory pool. */
#include <sys/types.h>
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
struct mempool;
struct mempool {
void *base; /* the base address of the memory */
size_t free_offset; /* the offset of the memory pool free space */
size_t size; /* the size of the memory */
size_t frag_size; /* the size of the fragmented memory */
};
/* initialize the memory pool with the base address and size of a
contiguous chunk of memory */
void toku_mempool_init(struct mempool *mp, void *base, size_t size);
/* finalize the memory pool */
void toku_mempool_fini(struct mempool *mp);
/* get the base address of the memory pool */
void *toku_mempool_get_base(struct mempool *mp);
/* get the size of the memory pool */
size_t toku_mempool_get_size(struct mempool *mp);
/* get the amount of fragmented space in the memory pool */
size_t toku_mempool_get_frag_size(struct mempool *mp);
/* allocate a chunk of memory from the memory pool suitably aligned */
void *toku_mempool_malloc(struct mempool *mp, size_t size, int alignment);
/* free a previously allocated chunk of memory. the free only updates
a count of the amount of free space in the memory pool. the memory
pool does not keep track of the locations of the free chunks */
void toku_mempool_mfree(struct mempool *mp, void *vp, size_t size);
/* verify that a memory range is contained within a mempool */
static inline int toku_mempool_inrange(struct mempool *mp, void *vp, size_t size) {
return (mp->base <= vp) && ((char *)vp + size <= (char *)mp->base + mp->size);
}
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
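A minimal usage sketch of the pool, assuming only the API above (buffer and sizes are illustrative):

char buf[1024];
struct mempool mp;
toku_mempool_init(&mp, buf, sizeof buf);

void *a = toku_mempool_malloc(&mp, 100, 8);   // 8-byte aligned; NULL if the pool is full
void *b = toku_mempool_malloc(&mp, 200, 8);
assert(a && b && toku_mempool_inrange(&mp, a, 100));

toku_mempool_mfree(&mp, a, 100);              // accounting only; the space is not reused
assert(toku_mempool_get_frag_size(&mp) == 100);

toku_mempool_fini(&mp);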
...@@ -255,7 +255,7 @@ static int omt_convert_to_tree(OMT omt) { ...@@ -255,7 +255,7 @@ static int omt_convert_to_tree(OMT omt) {
omt->is_array = FALSE; omt->is_array = FALSE;
omt->i.t.nodes = new_nodes; omt->i.t.nodes = new_nodes;
omt->capacity = new_size; omt->capacity = new_size;
omt->i.t.free_idx = 0; /* Allocating from mempool starts over. */ omt->i.t.free_idx = 0;
omt->i.t.root = NODE_NULL; omt->i.t.root = NODE_NULL;
rebuild_from_sorted_array(omt, &omt->i.t.root, tmp_values, num_nodes); rebuild_from_sorted_array(omt, &omt->i.t.root, tmp_values, num_nodes);
toku_free(values); toku_free(values);
...@@ -787,6 +787,20 @@ int toku_omt_fetch(OMT V, u_int32_t i, OMTVALUE *v, OMTCURSOR c) { ...@@ -787,6 +787,20 @@ int toku_omt_fetch(OMT V, u_int32_t i, OMTVALUE *v, OMTCURSOR c) {
return 0; return 0;
} }
static int
free_item (OMTVALUE lev, u_int32_t UU(idx), void *vsi) {
assert(vsi == NULL);
toku_free(lev);
return 0;
}
void toku_omt_free_items(OMT omt) {
invalidate_cursors(omt);
int r = toku_omt_iterate(omt, free_item, NULL);
lazy_assert_zero(r);
}
int toku_omt_iterate(OMT omt, int (*f)(OMTVALUE, u_int32_t, void*), void*v) { int toku_omt_iterate(OMT omt, int (*f)(OMTVALUE, u_int32_t, void*), void*v) {
if (omt->is_array) { if (omt->is_array) {
return iterate_internal_array(omt, 0, omt_size(omt), f, v); return iterate_internal_array(omt, 0, omt_size(omt), f, v);
......
...@@ -232,6 +232,13 @@ int toku_omt_iterate_on_range(OMT omt, u_int32_t left, u_int32_t right, int (*f) ...@@ -232,6 +232,13 @@ int toku_omt_iterate_on_range(OMT omt, u_int32_t left, u_int32_t right, int (*f)
// Performance: time=O(i+\log N) where i is the number of times f is called, and N is the number of elements in omt. // Performance: time=O(i+\log N) where i is the number of times f is called, and N is the number of elements in omt.
// Rational: Although the functional iterator requires defining another function (as opposed to C++ style iterator), it is much easier to read. // Rational: Although the functional iterator requires defining another function (as opposed to C++ style iterator), it is much easier to read.
void toku_omt_free_items(OMT omt);
// Effect: Iterate over the values of the omt, from left to right, freeing each value with toku_free.
// Requires: every item in the OMT was allocated with toku_malloc.
// Rationale: This function was added due to a problem encountered in brt.c. We needed to free the elements and then
// destroy the OMT. However, destroying the OMT requires invalidating cursors, which cannot be done once the values of
// the OMT have been freed. So this function invalidates the cursors first and then frees the items.
int toku_omt_iterate(OMT omt, int (*f)(OMTVALUE, u_int32_t, void*), void*v); int toku_omt_iterate(OMT omt, int (*f)(OMTVALUE, u_int32_t, void*), void*v);
// Effect: Iterate over the values of the omt, from left to right, calling f on each value. // Effect: Iterate over the values of the omt, from left to right, calling f on each value.
// The second argument passed to f is the index of the value. // The second argument passed to f is the index of the value.
......
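A sketch of the teardown pattern this enables, assuming the usual OMT destroy entry point (toku_omt_destroy is not part of this hunk):

// given: OMT omt whose values were each allocated with toku_malloc
toku_omt_free_items(omt);   // invalidates cursors first, then frees every value
toku_omt_destroy(&omt);     // safe now: no cursor can touch the freed values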
...@@ -105,6 +105,11 @@ static inline LSN rbuf_lsn (struct rbuf *r) { ...@@ -105,6 +105,11 @@ static inline LSN rbuf_lsn (struct rbuf *r) {
return lsn; return lsn;
} }
static inline MSN rbuf_msn (struct rbuf *r) {
MSN msn = {rbuf_ulonglong(r)};
return msn;
}
static inline BLOCKNUM rbuf_blocknum (struct rbuf *r) { static inline BLOCKNUM rbuf_blocknum (struct rbuf *r) {
BLOCKNUM result = make_blocknum(rbuf_longlong(r)); BLOCKNUM result = make_blocknum(rbuf_longlong(r));
return result; return result;
......
...@@ -164,7 +164,7 @@ static int do_insertion (enum brt_msg_type type, FILENUM filenum, BYTESTRING key ...@@ -164,7 +164,7 @@ static int do_insertion (enum brt_msg_type type, FILENUM filenum, BYTESTRING key
DBT key_dbt,data_dbt; DBT key_dbt,data_dbt;
XIDS xids = toku_txn_get_xids(txn); XIDS xids = toku_txn_get_xids(txn);
BRT_MSG_S brtcmd = { type, xids, BRT_MSG_S brtcmd = { type, ZERO_MSN, xids,
.u.id={(key.len > 0) .u.id={(key.len > 0)
? toku_fill_dbt(&key_dbt, key.data, key.len) ? toku_fill_dbt(&key_dbt, key.data, key.len)
: toku_init_dbt(&key_dbt), : toku_init_dbt(&key_dbt),
......
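The BRT_MSG_S initializer gains an MSN field right after the message type; recovery passes ZERO_MSN here. A sketch of the widened layout in isolation (BRT_INSERT, keybuf/keylen, and valbuf/vallen are illustrative stand-ins):

DBT k, v;
XIDS xids = toku_txn_get_xids(txn);
BRT_MSG_S msg = { BRT_INSERT, ZERO_MSN, xids,
                  .u.id = { toku_fill_dbt(&k, keybuf, keylen),
                            toku_fill_dbt(&v, valbuf, vallen) } };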
...@@ -510,6 +510,22 @@ static int toku_rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM l ...@@ -510,6 +510,22 @@ static int toku_rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM l
return r; return r;
} }
// callback for partially evicting a cachetable entry
static int toku_rollback_pe_callback (
void *rollback_v,
long bytes_to_free,
long* bytes_freed,
void* UU(extraargs)
)
{
assert(bytes_to_free > 0);
assert(rollback_v != NULL);
*bytes_freed = 0;
return 0;
}
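This pe_callback is the new partial-eviction hook: the cachetable passes how many bytes it would like back, and the entry reports how many it actually freed. Rollback log nodes decline by reporting zero. For contrast, a hypothetical entry that can shed a detachable buffer might implement it like this (struct and field names are invented for illustration):

struct cached_entry {       // hypothetical entry layout
    void *spare_buffer;     // rebuildable memory that may be dropped under pressure
    long  spare_size;
};

static int entry_pe_callback(void *entry_v, long bytes_to_free, long *bytes_freed, void *extraargs) {
    struct cached_entry *e = entry_v;
    (void)bytes_to_free; (void)extraargs;
    *bytes_freed = 0;
    if (e->spare_buffer) {
        *bytes_freed = e->spare_size;
        toku_free(e->spare_buffer);
        e->spare_buffer = NULL;
        e->spare_size = 0;
    }
    return 0;
}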
static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t older_hash, ROLLBACK_LOG_NODE *result) { static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t older_hash, ROLLBACK_LOG_NODE *result) {
ROLLBACK_LOG_NODE MALLOC(log); ROLLBACK_LOG_NODE MALLOC(log);
assert(log); assert(log);
...@@ -536,7 +552,9 @@ static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t o ...@@ -536,7 +552,9 @@ static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t o
*result = log; *result = log;
r=toku_cachetable_put(cf, log->thislogname, log->thishash, r=toku_cachetable_put(cf, log->thislogname, log->thishash,
log, rollback_memory_size(log), log, rollback_memory_size(log),
toku_rollback_flush_callback, toku_rollback_fetch_callback, toku_rollback_flush_callback,
toku_rollback_fetch_callback,
toku_rollback_pe_callback,
h); h);
assert(r==0); assert(r==0);
txn->current_rollback = log->thislogname; txn->current_rollback = log->thislogname;
...@@ -751,6 +769,7 @@ toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) { ...@@ -751,6 +769,7 @@ toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
r = toku_cachefile_prefetch(cf, name, hash, r = toku_cachefile_prefetch(cf, name, hash,
toku_rollback_flush_callback, toku_rollback_flush_callback,
toku_rollback_fetch_callback, toku_rollback_fetch_callback,
toku_rollback_pe_callback,
h); h);
assert(r==0); assert(r==0);
} }
...@@ -774,7 +793,9 @@ int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLO ...@@ -774,7 +793,9 @@ int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLO
struct brt_header *h = toku_cachefile_get_userdata(cf); struct brt_header *h = toku_cachefile_get_userdata(cf);
r = toku_cachetable_get_and_pin(cf, name, hash, r = toku_cachetable_get_and_pin(cf, name, hash,
&log_v, NULL, &log_v, NULL,
toku_rollback_flush_callback, toku_rollback_fetch_callback, toku_rollback_flush_callback,
toku_rollback_fetch_callback,
toku_rollback_pe_callback,
h); h);
assert(r==0); assert(r==0);
log = (ROLLBACK_LOG_NODE)log_v; log = (ROLLBACK_LOG_NODE)log_v;
......
...@@ -32,6 +32,11 @@ sub_block_header_size(int n_sub_blocks) { ...@@ -32,6 +32,11 @@ sub_block_header_size(int n_sub_blocks) {
return sizeof (u_int32_t) + n_sub_blocks * sizeof (struct stored_sub_block); return sizeof (u_int32_t) + n_sub_blocks * sizeof (struct stored_sub_block);
} }
void
set_compressed_size_bound(struct sub_block *se) {
se->compressed_size_bound = compressBound(se->uncompressed_size);
}
// get the sum of the sub block compressed sizes // get the sum of the sub block compressed sizes
size_t size_t
get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[]) { get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[]) {
...@@ -88,6 +93,19 @@ choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_siz ...@@ -88,6 +93,19 @@ choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_siz
return 0; return 0;
} }
// Choose the right size of basement nodes. For now, just align up to
// 256k blocks and hope it compresses well enough.
int
choose_basement_node_size(int total_size, int *sub_block_size_ret, int *n_sub_blocks_ret) {
if (total_size < 0)
return EINVAL;
*n_sub_blocks_ret = (total_size + max_basement_node_uncompressed_size - 1) / max_basement_node_uncompressed_size;
*sub_block_size_ret = max_basement_node_uncompressed_size;
return 0;
}
void void
set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]) { set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]) {
int size_left = total_size; int size_left = total_size;
...@@ -133,19 +151,37 @@ void toku_set_default_compression_method (enum toku_compression_method a) { ...@@ -133,19 +151,37 @@ void toku_set_default_compression_method (enum toku_compression_method a) {
assert(0); assert(0);
} }
//
void // takes the uncompressed contents of sub_block
compress_sub_block(struct sub_block *sub_block) { // and compresses them into sb_compressed_ptr
// cs_bound is the compressed size bound
// Returns the size of the compressed data
//
u_int32_t
compress_nocrc_sub_block(
struct sub_block *sub_block,
void* sb_compressed_ptr,
u_int32_t cs_bound
)
{
// compress it // compress it
Bytef *uncompressed_ptr = (Bytef *) sub_block->uncompressed_ptr; Bytef *uncompressed_ptr = (Bytef *) sub_block->uncompressed_ptr;
Bytef *compressed_ptr = (Bytef *) sub_block->compressed_ptr; Bytef *compressed_ptr = (Bytef *) sb_compressed_ptr;
uLongf uncompressed_len = sub_block->uncompressed_size; uLongf uncompressed_len = sub_block->uncompressed_size;
uLongf real_compressed_len = sub_block->compressed_size_bound; uLongf real_compressed_len = cs_bound;
toku_compress(toku_compress_method, toku_compress(toku_compress_method,
compressed_ptr, &real_compressed_len, compressed_ptr, &real_compressed_len,
uncompressed_ptr, uncompressed_len); uncompressed_ptr, uncompressed_len);
sub_block->compressed_size = real_compressed_len; // replace the compressed size estimate with the real size return real_compressed_len;
}
void
compress_sub_block(struct sub_block *sub_block) {
sub_block->compressed_size = compress_nocrc_sub_block(
sub_block,
sub_block->compressed_ptr,
sub_block->compressed_size_bound
);
// checksum it // checksum it
sub_block->xsum = x1764_memory(sub_block->compressed_ptr, sub_block->compressed_size); sub_block->xsum = x1764_memory(sub_block->compressed_ptr, sub_block->compressed_size);
} }
......
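Two consequences worth spelling out. choose_basement_node_size simply rounds up to whole 256 KiB sub-blocks, e.g. total_size = 600*1024 yields n_sub_blocks_ret = 3 with sub_block_size_ret = 256*1024. And compress_sub_block is now a thin wrapper over compress_nocrc_sub_block plus a checksum, so callers can compress into a buffer of their own. A sketch using only names from this diff (src and src_size are illustrative):

struct sub_block sb;
sub_block_init(&sb);
sb.uncompressed_ptr  = src;
sb.uncompressed_size = src_size;
set_compressed_size_bound(&sb);   // fills sb.compressed_size_bound via compressBound

void *dst = toku_malloc(sb.compressed_size_bound);
u_int32_t csize = compress_nocrc_sub_block(&sb, dst, sb.compressed_size_bound);
// csize is the real compressed length; no checksum has been computed yet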
...@@ -16,6 +16,9 @@ void toku_set_default_compression_method (enum toku_compression_method a); ...@@ -16,6 +16,9 @@ void toku_set_default_compression_method (enum toku_compression_method a);
static const int max_sub_blocks = 8; static const int max_sub_blocks = 8;
static const int target_sub_block_size = 512*1024; static const int target_sub_block_size = 512*1024;
static const int max_basement_nodes = 32;
static const int max_basement_node_uncompressed_size = 256*1024;
static const int max_basement_node_compressed_size = 64*1024;
struct sub_block { struct sub_block {
void *uncompressed_ptr; void *uncompressed_ptr;
...@@ -41,6 +44,9 @@ sub_block_init(struct sub_block *sub_block); ...@@ -41,6 +44,9 @@ sub_block_init(struct sub_block *sub_block);
size_t size_t
sub_block_header_size(int n_sub_blocks); sub_block_header_size(int n_sub_blocks);
void
set_compressed_size_bound(struct sub_block *se);
// get the sum of the sub block compressed sizes // get the sum of the sub block compressed sizes
size_t size_t
get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[]); get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[]);
...@@ -54,6 +60,9 @@ get_sum_uncompressed_size(int n_sub_blocks, struct sub_block sub_block[]); ...@@ -54,6 +60,9 @@ get_sum_uncompressed_size(int n_sub_blocks, struct sub_block sub_block[]);
int int
choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret); choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret);
int
choose_basement_node_size(int total_size, int *sub_block_size_ret, int *n_sub_blocks_ret);
void void
set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]); set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]);
...@@ -72,6 +81,13 @@ struct compress_work { ...@@ -72,6 +81,13 @@ struct compress_work {
void void
compress_work_init(struct compress_work *w, struct sub_block *sub_block); compress_work_init(struct compress_work *w, struct sub_block *sub_block);
u_int32_t
compress_nocrc_sub_block(
struct sub_block *sub_block,
void* sb_compressed_ptr,
u_int32_t cs_bound
);
void void
compress_sub_block(struct sub_block *sub_block); compress_sub_block(struct sub_block *sub_block);
......
...@@ -50,6 +50,7 @@ static void test5 (void) { ...@@ -50,6 +50,7 @@ static void test5 (void) {
} }
if (verbose) printf("\n"); if (verbose) printf("\n");
toku_free(values); toku_free(values);
r = toku_verify_brt(t); assert(r==0);
r = toku_close_brt(t, 0); assert(r==0); r = toku_close_brt(t, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
......
#include "includes.h"
#include "test.h"
static void
flush (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)),
long s __attribute__((__unused__)),
BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)),
BOOL c __attribute__((__unused__))
) {
/* report the flush; when write_me is set, stall to simulate a slow write */
printf("FLUSH: %d write_me %d\n", (int)k.b, w);
if (w) {
usleep (5*1024*1024); // ~5.2 seconds
}
}
static int
fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
long *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = 8;
return 0;
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
long bytes_to_free __attribute__((__unused__)),
long* bytes_freed,
void* extraargs __attribute__((__unused__))
)
{
*bytes_freed = 0;
return 0;
}
static void
cachetable_test (void) {
const int test_limit = 12;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test1.dat";
unlink(fname1);
CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
void* v2;
long s1, s2;
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, 8);
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_callback, NULL);
// usleep (2*1024*1024);
//r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, 8);
//r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, 8);
r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
cachetable_test();
return 0;
}
...@@ -52,6 +52,19 @@ fetch (CACHEFILE UU(thiscf), int UU(fd), CACHEKEY UU(key), u_int32_t UU(fullhash ...@@ -52,6 +52,19 @@ fetch (CACHEFILE UU(thiscf), int UU(fd), CACHEKEY UU(key), u_int32_t UU(fullhash
return 0; return 0;
} }
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
long bytes_to_free __attribute__((__unused__)),
long* bytes_freed,
void* extraargs __attribute__((__unused__))
)
{
*bytes_freed = 0;
return 0;
}
static void* static void*
do_update (void *UU(ignore)) do_update (void *UU(ignore))
{ {
...@@ -62,7 +75,7 @@ do_update (void *UU(ignore)) ...@@ -62,7 +75,7 @@ do_update (void *UU(ignore))
u_int32_t hi = toku_cachetable_hash(cf, key); u_int32_t hi = toku_cachetable_hash(cf, key);
void *vv; void *vv;
long size; long size;
int r = toku_cachetable_get_and_pin(cf, key, hi, &vv, &size, flush, fetch, 0); int r = toku_cachetable_get_and_pin(cf, key, hi, &vv, &size, flush, fetch, pe_callback, 0);
//printf("g"); //printf("g");
assert(r==0); assert(r==0);
assert(size==sizeof(int)); assert(size==sizeof(int));
...@@ -111,7 +124,7 @@ static void checkpoint_pending(void) { ...@@ -111,7 +124,7 @@ static void checkpoint_pending(void) {
CACHEKEY key = make_blocknum(i); CACHEKEY key = make_blocknum(i);
u_int32_t hi = toku_cachetable_hash(cf, key); u_int32_t hi = toku_cachetable_hash(cf, key);
values[i] = 42; values[i] = 42;
r = toku_cachetable_put(cf, key, hi, &values[i], sizeof(int), flush, fetch, 0); r = toku_cachetable_put(cf, key, hi, &values[i], sizeof(int), flush, fetch, pe_callback, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_unpin(cf, key, hi, CACHETABLE_DIRTY, item_size); r = toku_cachetable_unpin(cf, key, hi, CACHETABLE_DIRTY, item_size);
......
...@@ -29,6 +29,19 @@ static int fetch(CACHEFILE cf, int UU(fd), CACHEKEY key, u_int32_t fullhash, voi ...@@ -29,6 +29,19 @@ static int fetch(CACHEFILE cf, int UU(fd), CACHEKEY key, u_int32_t fullhash, voi
return 0; return 0;
} }
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
long bytes_to_free __attribute__((__unused__)),
long* bytes_freed,
void* extraargs __attribute__((__unused__))
)
{
*bytes_freed = 0;
return 0;
}
static int callback_was_called = 0; static int callback_was_called = 0;
static int callback2_was_called = 0; static int callback2_was_called = 0;
...@@ -69,7 +82,7 @@ static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) { ...@@ -69,7 +82,7 @@ static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) {
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
CACHEKEY key = make_blocknum(i); CACHEKEY key = make_blocknum(i);
u_int32_t hi = toku_cachetable_hash(f1, key); u_int32_t hi = toku_cachetable_hash(f1, key);
r = toku_cachetable_put(f1, key, hi, (void *)(long)i, 1, flush, fetch, 0); r = toku_cachetable_put(f1, key, hi, (void *)(long)i, 1, flush, fetch, pe_callback, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_unpin(f1, key, hi, dirty, item_size); r = toku_cachetable_unpin(f1, key, hi, dirty, item_size);
......
#include "includes.h"
#include "test.h"
int num_entries;
BOOL flush_may_occur;
int expected_flushed_key;
BOOL check_flush;
static void
flush (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)),
long s __attribute__((__unused__)),
BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)),
BOOL c __attribute__((__unused__))
) {
/* when check_flush is set, verify that evicted (not kept) entries are clean and leave in the expected key order */
if (check_flush && !keep) {
printf("FLUSH: %d write_me %d\n", (int)k.b, w);
assert(flush_may_occur);
assert(!w);
assert(expected_flushed_key == (int)k.b);
expected_flushed_key--;
}
}
static int
fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
long *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = 1;
return 0;
}
static int
big_fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
long *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = 4;
return 0;
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
long bytes_to_free __attribute__((__unused__)),
long* bytes_freed,
void* extraargs __attribute__((__unused__))
)
{
*bytes_freed = 0;
return 0;
}
static void
cachetable_test (void) {
const int test_limit = 4;
num_entries = 0;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test1.dat";
unlink(fname1);
CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
void* v2;
long s1, s2;
flush_may_occur = FALSE;
check_flush = TRUE;
for (int i = 0; i < 100000; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, 1);
}
for (int i = 0; i < 8; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_callback, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, 1);
}
for (int i = 0; i < 4; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(3), 3, &v2, &s2, flush, fetch, pe_callback, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(3), 3, CACHETABLE_CLEAN, 1);
}
for (int i = 0; i < 2; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(4), 4, &v2, &s2, flush, fetch, pe_callback, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(4), 4, CACHETABLE_CLEAN, 1);
}
flush_may_occur = TRUE;
expected_flushed_key = 4;
r = toku_cachetable_put(f1, make_blocknum(5), 5, NULL, 4, flush, big_fetch, pe_callback, NULL);
flush_may_occur = TRUE;
expected_flushed_key = 5;
r = toku_cachetable_unpin(f1, make_blocknum(5), 5, CACHETABLE_CLEAN, 4);
check_flush = FALSE;
r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
cachetable_test();
return 0;
}
#include "includes.h"
#include "test.h"
BOOL flush_may_occur;
long expected_bytes_to_free;
static void
flush (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
void *v,
void *e __attribute__((__unused__)),
long s __attribute__((__unused__)),
BOOL w __attribute__((__unused__)),
BOOL keep,
BOOL c __attribute__((__unused__))
) {
assert(flush_may_occur);
if (!keep) {
int* foo = v;
assert(*foo == 3);
toku_free(v);
}
}
static int
fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
long *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
int* foo = toku_malloc(sizeof(int));
*value = foo;
*sizep = 4;
*foo = 4;
return 0;
}
static void
other_flush (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)),
long s __attribute__((__unused__)),
BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)),
BOOL c __attribute__((__unused__))
) {
}
static int
other_fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
long *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
return 0;
}
static int
pe_callback (
void *brtnode_pv,
long bytes_to_free,
long* bytes_freed,
void* extraargs __attribute__((__unused__))
)
{
assert(expected_bytes_to_free == bytes_to_free);
*bytes_freed = 1;
expected_bytes_to_free--;
int* foo = brtnode_pv;
int blah = *foo;
*foo = blah-1;
return 0;
}
static int
other_pe_callback (
void *brtnode_pv __attribute__((__unused__)),
long bytes_to_free __attribute__((__unused__)),
long* bytes_freed __attribute__((__unused__)),
void* extraargs __attribute__((__unused__))
)
{
return 0;
}
static void
cachetable_test (void) {
const int test_limit = 16;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test1.dat";
unlink(fname1);
CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
void* v2;
long s1, s2;
flush_may_occur = FALSE;
for (int i = 0; i < 100000; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, 4);
}
for (int i = 0; i < 8; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_callback, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, 4);
}
for (int i = 0; i < 4; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(3), 3, &v2, &s2, flush, fetch, pe_callback, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(3), 3, CACHETABLE_CLEAN, 4);
}
for (int i = 0; i < 2; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(4), 4, &v2, &s2, flush, fetch, pe_callback, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(4), 4, CACHETABLE_CLEAN, 4);
}
flush_may_occur = FALSE;
expected_bytes_to_free = 4;
r = toku_cachetable_put(f1, make_blocknum(5), 5, NULL, 4, other_flush, other_fetch, other_pe_callback, NULL);
flush_may_occur = TRUE;
r = toku_cachetable_unpin(f1, make_blocknum(5), 5, CACHETABLE_CLEAN, 4);
r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
cachetable_test();
return 0;
}
...@@ -28,6 +28,19 @@ fetch (CACHEFILE f __attribute__((__unused__)), ...@@ -28,6 +28,19 @@ fetch (CACHEFILE f __attribute__((__unused__)),
return 0; return 0;
} }
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
long bytes_to_free __attribute__((__unused__)),
long* bytes_freed,
void* extraargs __attribute__((__unused__))
)
{
*bytes_freed = 0;
return 0;
}
static void static void
cachetable_count_pinned_test (int n) { cachetable_count_pinned_test (int n) {
const int test_limit = 2*n; const int test_limit = 2*n;
...@@ -43,7 +56,7 @@ cachetable_count_pinned_test (int n) { ...@@ -43,7 +56,7 @@ cachetable_count_pinned_test (int n) {
for (i=1; i<=n; i++) { for (i=1; i<=n; i++) {
u_int32_t hi; u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i)); hi = toku_cachetable_hash(f1, make_blocknum(i));
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, fetch, 0); r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, fetch, pe_callback, 0);
assert(r == 0); assert(r == 0);
assert(toku_cachefile_count_pinned(f1, 0) == i); assert(toku_cachefile_count_pinned(f1, 0) == i);
......
...@@ -28,6 +28,19 @@ fetch (CACHEFILE f __attribute__((__unused__)), ...@@ -28,6 +28,19 @@ fetch (CACHEFILE f __attribute__((__unused__)),
return 0; return 0;
} }
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
long bytes_to_free __attribute__((__unused__)),
long* bytes_freed,
void* extraargs __attribute__((__unused__))
)
{
*bytes_freed = 0;
return 0;
}
static void static void
cachetable_debug_test (int n) { cachetable_debug_test (int n) {
const int test_limit = n; const int test_limit = n;
...@@ -51,7 +64,7 @@ cachetable_debug_test (int n) { ...@@ -51,7 +64,7 @@ cachetable_debug_test (int n) {
const int item_size = 1; const int item_size = 1;
u_int32_t hi; u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i)); hi = toku_cachetable_hash(f1, make_blocknum(i));
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, item_size, flush, fetch, 0); r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, item_size, flush, fetch, pe_callback, 0);
assert(r == 0); assert(r == 0);
void *v; int dirty; long long pinned; long pair_size; void *v; int dirty; long long pinned; long pair_size;
......
...@@ -31,6 +31,8 @@ doit (void) { ...@@ -31,6 +31,8 @@ doit (void) {
assert(r==0); assert(r==0);
toku_free(fname); toku_free(fname);
toku_testsetup_initialize(); // must precede any other toku_testsetup calls
r = toku_testsetup_leaf(t, &nodea); r = toku_testsetup_leaf(t, &nodea);
assert(r==0); assert(r==0);
......
...@@ -60,6 +60,8 @@ doit (int ksize __attribute__((__unused__))) { ...@@ -60,6 +60,8 @@ doit (int ksize __attribute__((__unused__))) {
assert(r==0); assert(r==0);
toku_free(fname); toku_free(fname);
toku_testsetup_initialize(); // must precede any other toku_testsetup calls
for (i=0; i<BRT_FANOUT; i++) { for (i=0; i<BRT_FANOUT; i++) {
r=toku_testsetup_leaf(t, &cnodes[i]); r=toku_testsetup_leaf(t, &cnodes[i]);
assert(r==0); assert(r==0);
......