Commit 96e12721 authored by Bradley C. Kuszmaul, committed by Yoni Fogel

[t:3219] Merge the 3219 fixes onto the main line. Refs #3219.

{{{
svn merge -r28558:28575 https://svn.tokutek.com/tokudb/toku/tokudb.3219c
}}}


git-svn-id: file:///svn/toku/tokudb@28588 c7de825b-a66e-492c-adef-691d508d4ae1
parent 70c82da3
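In outline: these changes thread a chain of "unlockers" through the BRT search path. Each level of a search pushes a stack-allocated frame that knows how to unpin its node; when the cachetable would otherwise block on I/O, it runs the whole chain (newest frame first), releases the ydb lock, and the search returns TOKUDB_TRY_AGAIN and restarts from the root. The hunks below appear to come, in order, from brt.c, cachetable.c, cachetable.h, the tests Makefile, and the test3219 stress test.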
@@ -1889,7 +1889,6 @@ static int brt_nonleaf_cmd_once (BRT t, BRTNODE node, BRT_MSG cmd,
 verify_local_fingerprint_nonleaf(node);
-verify_local_fingerprint_nonleaf(node);
 /* find the right subtree */
 //TODO: accesses key, val directly
 unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, t);
@@ -4558,7 +4557,7 @@ brt_search_leaf_node(BRTNODE node, brt_search_t *search, BRT_GET_CALLBACK_FUNCTI
 }
 static int
-brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *re, BOOL *doprefetch, BRT_CURSOR brtcursor);
+brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *re, BOOL *doprefetch, BRT_CURSOR brtcursor, UNLOCKERS unlockers);
 // the number of nodes to prefetch
 #define TOKU_DO_PREFETCH 2
@@ -4586,9 +4585,24 @@ brt_node_maybe_prefetch(BRT brt, BRTNODE node, int childnum, BRT_CURSOR brtcurso
 #endif
+struct unlock_brtnode_extra {
+    BRT brt;
+    BRTNODE node;
+};
+// When this is called, the cachetable lock is held
+static void
+unlock_brtnode_fun (void *v) {
+    struct unlock_brtnode_extra *x = v;
+    BRT brt = x->brt;
+    BRTNODE node = x->node;
+    // CT lock is held
+    int r = toku_cachetable_unpin_ct_prelocked(brt->cf, node->thisnodename, node->fullhash, (enum cachetable_dirty) node->dirty, brtnode_memory_size(node));
+    assert(r==0);
+}
 /* search in a node's child */
 static int
-brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *parent_re, BOOL *doprefetch, BRT_CURSOR brtcursor, BOOL *did_react)
+brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *parent_re, BOOL *doprefetch, BRT_CURSOR brtcursor, BOOL *did_react, UNLOCKERS unlockers)
 // Effect: Search in a node's child.
 // If we change the shape, set *did_react = TRUE. Else set *did_react = FALSE.
 {
@@ -4608,16 +4622,21 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
 BLOCKNUM childblocknum = BNC_BLOCKNUM(node,childnum);
 u_int32_t fullhash = compute_child_fullhash(brt->cf, node, childnum);
 {
-    int rr = toku_cachetable_get_and_pin_nonblocking(brt->cf, childblocknum, fullhash, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h);
+    int rr = toku_cachetable_get_and_pin_nonblocking(brt->cf, childblocknum, fullhash, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h, unlockers);
     if (rr==TOKUDB_TRY_AGAIN) return rr;
     lazy_assert_zero(rr);
 }
 BRTNODE childnode = node_v;
+struct unlock_brtnode_extra unlock_extra = {brt,childnode};
+struct unlockers next_unlockers = {TRUE, unlock_brtnode_fun, (void*)&unlock_extra, unlockers};
 verify_local_fingerprint_nonleaf(node);
 verify_local_fingerprint_nonleaf(childnode);
 enum reactivity child_re = RE_STABLE;
-int r = brt_search_node(brt, childnode, search, getf, getf_v, &child_re, doprefetch, brtcursor);
+int r = brt_search_node(brt, childnode, search, getf, getf_v, &child_re, doprefetch, brtcursor, &next_unlockers);
+if (r!=TOKUDB_TRY_AGAIN) {
 // Even if r is reactive, we want to handle the maybe reactive child.
 verify_local_fingerprint_nonleaf(node);
 verify_local_fingerprint_nonleaf(childnode);
@@ -4628,11 +4647,11 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
 brt_node_maybe_prefetch(brt, node, childnum, brtcursor, doprefetch);
 #endif
+assert(next_unlockers.locked);
 {
     int rr = toku_unpin_brtnode(brt, childnode); // unpin the childnode before handling the reactive child (because that may make the childnode disappear.)
     if (rr!=0) r = rr;
 }
 {
     BOOL did_io = FALSE;
     int rr = brt_handle_maybe_reactive_child(brt, node, childnum, child_re, &did_io, did_react);
@@ -4642,12 +4661,16 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
 *parent_re = get_nonleaf_reactivity(node);
 verify_local_fingerprint_nonleaf(node);
+} else {
+    // try again.
+    assert(!next_unlockers.locked);
+}
 return r;
 }
 static int
-brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *re, BOOL *doprefetch, BRT_CURSOR brtcursor)
+brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *re, BOOL *doprefetch, BRT_CURSOR brtcursor, UNLOCKERS unlockers)
 {
 int count=0;
 again:
@@ -4672,10 +4695,10 @@ brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_CAL
 toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)))) {
 BOOL did_change_shape = FALSE;
 verify_local_fingerprint_nonleaf(node);
-int r = brt_search_child(brt, node, child[c], search, getf, getf_v, re, doprefetch, brtcursor, &did_change_shape);
+int r = brt_search_child(brt, node, child[c], search, getf, getf_v, re, doprefetch, brtcursor, &did_change_shape, unlockers);
 lazy_assert(r != EAGAIN);
 if (r == 0) return r; //Success
-if (r != DB_NOTFOUND) return r; //Error (or message to quit early, such as TOKUDB_FOUND_BUT_REJECTED)
+if (r != DB_NOTFOUND) return r; //Error (or message to quit early, such as TOKUDB_FOUND_BUT_REJECTED or TOKUDB_TRY_AGAIN)
 if (did_change_shape) goto again;
 }
 }
@@ -4683,16 +4706,16 @@ brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_CAL
 /* check the first (left) or last (right) node if nothing has been found */
 BOOL ignore_did_change_shape; // ignore this
 verify_local_fingerprint_nonleaf(node);
-return brt_search_child(brt, node, child[c], search, getf, getf_v, re, doprefetch, brtcursor, &ignore_did_change_shape);
+return brt_search_child(brt, node, child[c], search, getf, getf_v, re, doprefetch, brtcursor, &ignore_did_change_shape, unlockers);
 }
 }
 static int
-brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *re, BOOL *doprefetch, BRT_CURSOR brtcursor)
+brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *re, BOOL *doprefetch, BRT_CURSOR brtcursor, UNLOCKERS unlockers)
 {
 verify_local_fingerprint_nonleaf(node);
 if (node->height > 0)
-    return brt_search_nonleaf_node(brt, node, search, getf, getf_v, re, doprefetch, brtcursor);
+    return brt_search_nonleaf_node(brt, node, search, getf, getf_v, re, doprefetch, brtcursor, unlockers);
 else {
     return brt_search_leaf_node(node, search, getf, getf_v, re, doprefetch, brtcursor);
 }
@@ -4723,21 +4746,30 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf,
 BRTNODE node = node_v;
+struct unlock_brtnode_extra unlock_extra = {brt,node};
+struct unlockers unlockers = {TRUE, unlock_brtnode_fun, (void*)&unlock_extra, (UNLOCKERS)NULL};
 {
     enum reactivity re = RE_STABLE;
     BOOL doprefetch = FALSE;
     //static int counter = 0; counter++;
-    r = brt_search_node(brt, node, search, getf, getf_v, &re, &doprefetch, brtcursor);
+    r = brt_search_node(brt, node, search, getf, getf_v, &re, &doprefetch, brtcursor, &unlockers);
+    if (r==TOKUDB_TRY_AGAIN) {
+        assert(!unlockers.locked);
+        goto try_again;
+    } else {
+        assert(unlockers.locked);
+    }
+    if (r!=0) goto return_r;
     r = brt_handle_maybe_reactive_child_at_root(brt, rootp, &node, re);
 }
+return_r:
+assert(unlockers.locked);
 rr = toku_unpin_brtnode(brt, node);
 lazy_assert_zero(rr);
-if (r==TOKUDB_TRY_AGAIN) goto try_again;
 //Heaviside function (+direction) queries define only a lower or upper
 //bound. Some queries require both an upper and lower bound.
......
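The control flow that toku_brt_search now follows can be condensed into a small, self-contained sketch. This is not TokuDB code: pin_root, search_down, and unpin are hypothetical stand-ins for the pinned-node machinery, and the TOKUDB_TRY_AGAIN value is illustrative. The point is the invariant asserted in the hunks above: on TOKUDB_TRY_AGAIN the unlocker chain has already unpinned everything (locked is FALSE), otherwise the root is still pinned and the caller unpins it.
{{{
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

typedef int BOOL;
#define TRUE 1
#define FALSE 0
#define TOKUDB_TRY_AGAIN (-100001)   /* illustrative value only */

typedef struct unlockers *UNLOCKERS;
struct unlockers {
    BOOL locked;
    void (*f)(void *extra);
    void *extra;
    UNLOCKERS next;
};

/* Hypothetical stand-ins for the real pin/search machinery. */
static int the_root;
static void *pin_root(void) { return &the_root; }
static void unpin(void *node) { (void)node; }
static int search_down(void *node, UNLOCKERS unlockers) {
    (void)node; (void)unlockers;
    return 0;                        /* pretend nothing blocked on I/O */
}
static void unpin_cb(void *node) { unpin(node); }

static int search_with_retry(void) {
    int r;
try_again:
    ;
    void *root = pin_root();
    /* One stack-allocated frame per pinned node, newest at the head. */
    struct unlockers unlockers = {TRUE, unpin_cb, root, NULL};
    r = search_down(root, &unlockers);
    if (r == TOKUDB_TRY_AGAIN) {
        assert(!unlockers.locked);   /* the chain already unpinned everything */
        goto try_again;
    }
    assert(unlockers.locked);        /* nothing blocked: root is still pinned */
    unpin(root);
    return r;
}

int main(void) {
    printf("search returned %d\n", search_with_retry());
    return 0;
}
}}}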
@@ -1600,7 +1600,8 @@ int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key,
 }
-int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, enum cachetable_dirty dirty, long size)
+static int
+toku_cachetable_unpin_internal(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, enum cachetable_dirty dirty, long size, BOOL have_ct_lock)
 // size==0 means that the size didn't change.
 {
 CACHETABLE ct = cachefile->cachetable;
@@ -1610,7 +1611,7 @@ int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash,
 int count = 0;
 int r = -1;
 //assert(fullhash == toku_cachetable_hash(cachefile, key));
-cachetable_lock(ct);
+if (!have_ct_lock) cachetable_lock(ct);
 for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
     count++;
     if (p->key.b==key.b && p->cachefile==cachefile) {
@@ -1634,16 +1635,36 @@ int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash,
     }
 }
 note_hash_count(count);
-cachetable_unlock(ct);
+if (!have_ct_lock) cachetable_unlock(ct);
 return r;
 }
+int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, enum cachetable_dirty dirty, long size) {
+    // By default we don't have the lock
+    return toku_cachetable_unpin_internal(cachefile, key, fullhash, dirty, size, FALSE);
+}
+int toku_cachetable_unpin_ct_prelocked(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, enum cachetable_dirty dirty, long size) {
+    // Here the caller already holds the ct lock
+    return toku_cachetable_unpin_internal(cachefile, key, fullhash, dirty, size, TRUE);
+}
+static void
+run_unlockers (UNLOCKERS unlockers) {
+    while (unlockers) {
+        assert(unlockers->locked);
+        unlockers->locked = FALSE;
+        unlockers->f(unlockers->extra);
+        unlockers=unlockers->next;
+    }
+}
 int toku_cachetable_get_and_pin_nonblocking (CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, void**value, long *sizep,
                                              CACHETABLE_FLUSH_CALLBACK flush_callback,
-                                             CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs)
+                                             CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs,
+                                             UNLOCKERS unlockers)
 // Effect: If the block is in the cachetable, then pin it and return it.
-// Otherwise call the lock_unlock_callback (to unlock), fetch the data (but don't pin it, since we'll just end up pinning it again later), and the call (to lock)
-// and return TOKU_DB_TRYAGAIN.
+// Otherwise run the unlockers (to unlock), fetch the data (but don't pin it, since we'll just end up pinning it again later), relock,
+// and return TOKUDB_TRY_AGAIN.
 {
 CACHETABLE ct = cf->cachetable;
 cachetable_lock(ct);
@@ -1670,9 +1691,11 @@ int toku_cachetable_get_and_pin_nonblocking (CACHEFILE cf, CACHEKEY key, u_int32
 case CTPAIR_INVALID: assert(0);
 case CTPAIR_READING:
 case CTPAIR_WRITING:
+    run_unlockers(unlockers); // The contract says the unlockers are run with the ct lock being held.
     if (ct->ydb_unlock_callback) ct->ydb_unlock_callback();
     // Now wait for the I/O to occur.
     // We need to obtain the read lock (waiting for the write to finish), but then we only waited so we could wake up again. So rather than locking the read lock and then releasing it, we call this function.
-    rwlock_read_lock_and_unlock(&p->rwlock, ct->mutex); // recall that this lock releases and reacquires the ct->mutex.
+    rwlock_read_lock_and_unlock(&p->rwlock, ct->mutex); // recall that this lock releases and reacquires the ct->mutex, letting writers finish up first.
     cachetable_unlock(ct);
    if (ct->ydb_lock_callback) ct->ydb_lock_callback();
    return TOKUDB_TRY_AGAIN;
@@ -1694,6 +1717,7 @@ int toku_cachetable_get_and_pin_nonblocking (CACHEFILE cf, CACHEKEY key, u_int32
 p = cachetable_insert_at(ct, cf, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, fetch_callback, extraargs, CACHETABLE_CLEAN);
 assert(p);
 rwlock_write_lock(&p->rwlock, ct->mutex);
+run_unlockers(unlockers); // we hold the ct mutex.
 if (ct->ydb_unlock_callback) ct->ydb_unlock_callback();
 int r = cachetable_fetch_pair(ct, cf, p);
 cachetable_unlock(ct);
......
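The unpin_internal/have_ct_lock split above is small but load-bearing: unlock_brtnode_fun is invoked by run_unlockers while the cachetable mutex is already held, so it needs an unpin that skips the lock/unlock pair. Here is a minimal standalone sketch of that pattern, simplified to a plain pthread mutex; this is not the real cachetable, and unpin/unpin_prelocked/main are illustrative names.
{{{
#include <pthread.h>
#include <stdio.h>

typedef int BOOL;
#define TRUE 1
#define FALSE 0

static pthread_mutex_t ct_mutex = PTHREAD_MUTEX_INITIALIZER;

/* One worker does the bookkeeping; the flag says whether the caller
 * already holds the table lock. */
static int unpin_internal(int key, BOOL have_ct_lock) {
    if (!have_ct_lock) pthread_mutex_lock(&ct_mutex);
    printf("unpinned key %d (lock %s)\n", key,
           have_ct_lock ? "held by caller" : "taken here");
    if (!have_ct_lock) pthread_mutex_unlock(&ct_mutex);
    return 0;
}

/* Normal entry point: the caller does NOT hold the lock. */
static int unpin(int key) { return unpin_internal(key, FALSE); }

/* Entry point for unlocker callbacks, which run under the lock. */
static int unpin_prelocked(int key) { return unpin_internal(key, TRUE); }

int main(void) {
    unpin(1);
    pthread_mutex_lock(&ct_mutex);   /* e.g. inside run_unlockers */
    unpin_prelocked(2);
    pthread_mutex_unlock(&ct_mutex);
    return 0;
}
}}}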
@@ -165,12 +165,21 @@ int toku_cachetable_get_and_pin(CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/,
                                 CACHETABLE_FLUSH_CALLBACK flush_callback,
                                 CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs);
+typedef struct unlockers *UNLOCKERS;
+struct unlockers {
+    BOOL locked;
+    void (*f)(void*extra);
+    void *extra;
+    UNLOCKERS next;
+};
 // Effect: If the block is in the cachetable, then return it.
-// Otherwise call the release_lock_callback, fetch the data (but don't pin it, since we'll just end up pinning it again later),
+// Otherwise call the release_lock_callback, call the functions in unlockers, fetch the data (but don't pin it, since we'll just end up pinning it again later),
 // and return TOKUDB_TRY_AGAIN.
 int toku_cachetable_get_and_pin_nonblocking (CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void**value, long *sizep,
                                              CACHETABLE_FLUSH_CALLBACK flush_callback,
-                                             CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs);
+                                             CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs,
+                                             UNLOCKERS unlockers);
 #define CAN_RELEASE_LOCK_DURING_IO
// Maybe get and pin a memory object.
@@ -189,11 +198,16 @@ enum cachetable_dirty {
 CACHETABLE_DIRTY=1, // the cached object is dirty WRT the cachefile
 };
-// Unpin a memory object
 int toku_cachetable_unpin(CACHEFILE, CACHEKEY, u_int32_t fullhash, enum cachetable_dirty dirty, long size);
+// Effect: Unpin a memory object.
+// Effects: If the memory object is in the cachetable, then OR in the dirty flag,
+// update the size, and release the read lock on the memory object.
+// Returns: 0 if success, otherwise an error number.
+// Requires: The ct is NOT locked (the lock is taken internally).
+int toku_cachetable_unpin_ct_prelocked(CACHEFILE, CACHEKEY, u_int32_t fullhash, enum cachetable_dirty dirty, long size);
+// Effect: The same as toku_cachetable_unpin, except that the caller already holds the ct lock.
+// Requires: The ct IS locked.
 int toku_cachetable_unpin_and_remove (CACHEFILE, CACHEKEY); /* Removing something already present is OK. */
 // Effect: Remove an object from the cachetable. Don't write it back.
......
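The struct unlockers declared above is an intrusive, stack-allocated singly linked list: each level of the search pushes one frame, so the head of the list is always the most recently pinned node. When the cachetable decides it must block, run_unlockers fires each callback exactly once, deepest node first, marking each frame unlocked as it goes. A runnable toy demonstration of that ordering follows; unlock_level and main are illustrative, not from the patch.
{{{
#include <assert.h>
#include <stdio.h>

typedef int BOOL;
#define TRUE 1
#define FALSE 0

typedef struct unlockers *UNLOCKERS;
struct unlockers {
    BOOL locked;
    void (*f)(void *extra);
    void *extra;
    UNLOCKERS next;
};

/* Same shape as the run_unlockers in cachetable.c above. */
static void run_unlockers(UNLOCKERS unlockers) {
    while (unlockers) {
        assert(unlockers->locked);
        unlockers->locked = FALSE;   /* each frame fires at most once */
        unlockers->f(unlockers->extra);
        unlockers = unlockers->next;
    }
}

static void unlock_level(void *extra) {
    printf("unpinning node at depth %d\n", *(int*)extra);
}

int main(void) {
    int root_depth = 0, child_depth = 1;
    /* The root pushes its frame first; the child links in ahead of it. */
    struct unlockers root_frame  = {TRUE, unlock_level, &root_depth, NULL};
    struct unlockers child_frame = {TRUE, unlock_level, &child_depth, &root_frame};
    run_unlockers(&child_frame);     /* prints depth 1, then depth 0 */
    assert(!child_frame.locked && !root_frame.locked);
    return 0;
}
}}}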
@@ -37,7 +37,6 @@ TRANSPARENT_UPGRADE_SRCS = $(wildcard upgrade*.c)
 NONSTANDARD_SRCS= \
     $(RECOVER_SRCS) \
     $(LOADER_SRCS) \
-    test3219.c \
 SRCS = $(sort $(wildcard *.c))
 # To patch out upgrade tests, replace line above with line below
......
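A note on the Makefile hunk above: since SRCS is built from $(wildcard *.c), dropping test3219.c from NONSTANDARD_SRCS promotes it to a standard test that builds and runs by default, which is presumably why the hunks below also make the test shorter (smaller N_TXNS) and quiet unless verbose is set.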
@@ -14,8 +14,8 @@
 #include <pthread.h>
-DB_ENV *env;
-DB *db;
+static DB_ENV *env;
+static DB *db;
 static void
 insert(int i, DB_TXN *txn)
@@ -52,7 +52,7 @@ lookup(int i, DB_TXN *txn)
 }
 #define N_ROWS 1000000
-#define N_TXNS 1000000
+#define N_TXNS 10000
 #define N_ROWS_PER_TXN 1
 #define INITIAL_SIZE 1000
@@ -112,8 +112,10 @@ static void*
 start_b (void *arg __attribute__((__unused__))) {
 int r;
 for (int j=0; j<N_TXNS; j++) {
+    if (verbose) {
         printf("."); fflush(stdout);
+        if (j%(N_TXNS/10)==0) printf("\n");
+    }
     DB_TXN *txn;
     r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
     for (int i=0; i<N_ROWS_PER_TXN; i++) {
@@ -160,8 +162,9 @@ run_test (void)
 finish();
 }
-int test_main (int argc __attribute__((__unused__)), char*const argv[] __attribute__((__unused__))) {
+int test_main (int argc, char*const argv[]) {
+    parse_args(argc, argv);
     run_test();
-    printf("\n");
+    if (verbose) printf("\n");
     return 0;
 }