Commit 30a0c91f authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

closes #5978, merge to main

git-svn-id: file:///svn/toku/tokudb@53062 c7de825b-a66e-492c-adef-691d508d4ae1
parent 64d39005
...@@ -150,6 +150,11 @@ struct ctpair { ...@@ -150,6 +150,11 @@ struct ctpair {
// protected by PAIR->mutex // protected by PAIR->mutex
uint32_t count; // clock count uint32_t count; // clock count
uint32_t refcount; // if > 0, then this PAIR is referenced by
// callers to the cachetable, and therefore cannot
// be evicted
uint32_t num_waiting_on_refs; // number of threads waiting on refcount to go to zero
toku_cond_t refcount_wait; // cond used to wait for refcount to go to zero
// locks // locks
toku::frwlock value_rwlock; toku::frwlock value_rwlock;
......
...@@ -86,7 +86,9 @@ static PAIR_ATTR const zero_attr = { ...@@ -86,7 +86,9 @@ static PAIR_ATTR const zero_attr = {
static inline void ctpair_destroy(PAIR p) { static inline void ctpair_destroy(PAIR p) {
p->value_rwlock.deinit(); p->value_rwlock.deinit();
paranoid_invariant(p->refcount == 0);
nb_mutex_destroy(&p->disk_nb_mutex); nb_mutex_destroy(&p->disk_nb_mutex);
toku_cond_destroy(&p->refcount_wait);
toku_free(p); toku_free(p);
} }
...@@ -98,6 +100,30 @@ static inline void pair_unlock(PAIR p) { ...@@ -98,6 +100,30 @@ static inline void pair_unlock(PAIR p) {
toku_mutex_unlock(p->mutex); toku_mutex_unlock(p->mutex);
} }
// adds a reference to the PAIR
// on input and output, PAIR mutex is held
static void pair_add_ref_unlocked(PAIR p) {
p->refcount++;
}
// releases a reference to the PAIR
// on input and output, PAIR mutex is held
static void pair_release_ref_unlocked(PAIR p) {
paranoid_invariant(p->refcount > 0);
p->refcount--;
if (p->refcount == 0 && p->num_waiting_on_refs > 0) {
toku_cond_broadcast(&p->refcount_wait);
}
}
static void pair_wait_for_ref_release_unlocked(PAIR p) {
p->num_waiting_on_refs++;
while (p->refcount > 0) {
toku_cond_wait(&p->refcount_wait, p->mutex);
}
p->num_waiting_on_refs--;
}
bool toku_ctpair_is_write_locked(PAIR pair) { bool toku_ctpair_is_write_locked(PAIR pair) {
return pair->value_rwlock.writers() == 1; return pair->value_rwlock.writers() == 1;
} }
...@@ -573,7 +599,7 @@ static void cachetable_maybe_remove_and_free_pair ( ...@@ -573,7 +599,7 @@ static void cachetable_maybe_remove_and_free_pair (
) )
{ {
// this ensures that a clone running in the background first completes // this ensures that a clone running in the background first completes
if (p->value_rwlock.users() == 0) { if (p->value_rwlock.users() == 0 && p->refcount == 0) {
// assumption is that if we are about to remove the pair // assumption is that if we are about to remove the pair
// that no one has grabbed the disk_nb_mutex, // that no one has grabbed the disk_nb_mutex,
// and that there is no cloned_value_data, because // and that there is no cloned_value_data, because
...@@ -760,6 +786,9 @@ void pair_init(PAIR p, ...@@ -760,6 +786,9 @@ void pair_init(PAIR p,
p->write_extraargs = write_callback.write_extraargs; p->write_extraargs = write_callback.write_extraargs;
p->count = 0; // <CER> Is zero the correct init value? p->count = 0; // <CER> Is zero the correct init value?
p->refcount = 0;
p->num_waiting_on_refs = 0;
toku_cond_init(&p->refcount_wait, NULL);
p->checkpoint_pending = false; p->checkpoint_pending = false;
p->mutex = list->get_mutex_for_pair(fullhash); p->mutex = list->get_mutex_for_pair(fullhash);
...@@ -1851,7 +1880,6 @@ int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key, ...@@ -1851,7 +1880,6 @@ int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key,
// //
static int static int
cachetable_unpin_internal( cachetable_unpin_internal(
PAIR locked_p,
CACHEFILE cachefile, CACHEFILE cachefile,
PAIR p, PAIR p,
enum cachetable_dirty dirty, enum cachetable_dirty dirty,
...@@ -1865,9 +1893,7 @@ cachetable_unpin_internal( ...@@ -1865,9 +1893,7 @@ cachetable_unpin_internal(
bool added_data_to_cachetable = false; bool added_data_to_cachetable = false;
// hack for #3969, only exists in case where we run unlockers // hack for #3969, only exists in case where we run unlockers
if (!locked_p || locked_p->mutex != p->mutex) {
pair_lock(p); pair_lock(p);
}
PAIR_ATTR old_attr = p->attr; PAIR_ATTR old_attr = p->attr;
PAIR_ATTR new_attr = attr; PAIR_ATTR new_attr = attr;
if (dirty) { if (dirty) {
...@@ -1878,9 +1904,7 @@ cachetable_unpin_internal( ...@@ -1878,9 +1904,7 @@ cachetable_unpin_internal(
} }
bool read_lock_grabbed = p->value_rwlock.readers() != 0; bool read_lock_grabbed = p->value_rwlock.readers() != 0;
unpin_pair(p, read_lock_grabbed); unpin_pair(p, read_lock_grabbed);
if (!locked_p || locked_p->mutex != p->mutex) {
pair_unlock(p); pair_unlock(p);
}
if (attr.is_valid) { if (attr.is_valid) {
if (new_attr.size > old_attr.size) { if (new_attr.size > old_attr.size) {
...@@ -1902,18 +1926,18 @@ cachetable_unpin_internal( ...@@ -1902,18 +1926,18 @@ cachetable_unpin_internal(
} }
int toku_cachetable_unpin(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) { int toku_cachetable_unpin(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) {
return cachetable_unpin_internal(NULL, cachefile, p, dirty, attr, true); return cachetable_unpin_internal(cachefile, p, dirty, attr, true);
} }
int toku_cachetable_unpin_ct_prelocked_no_flush(PAIR locked_p, CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) { int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) {
return cachetable_unpin_internal(locked_p, cachefile, p, dirty, attr, false); return cachetable_unpin_internal(cachefile, p, dirty, attr, false);
} }
static void static void
run_unlockers (PAIR p, UNLOCKERS unlockers) { run_unlockers (UNLOCKERS unlockers) {
while (unlockers) { while (unlockers) {
assert(unlockers->locked); assert(unlockers->locked);
unlockers->locked = false; unlockers->locked = false;
unlockers->f(p, unlockers->extra); unlockers->f(unlockers->extra);
unlockers=unlockers->next; unlockers=unlockers->next;
} }
} }
...@@ -1944,19 +1968,27 @@ maybe_pin_pair( ...@@ -1944,19 +1968,27 @@ maybe_pin_pair(
// run the unlockers, as we intend to return the value to the user // run the unlockers, as we intend to return the value to the user
if (lock_type == PL_READ) { if (lock_type == PL_READ) {
if (p->value_rwlock.read_lock_is_expensive()) { if (p->value_rwlock.read_lock_is_expensive()) {
run_unlockers(p, unlockers); pair_add_ref_unlocked(p);
pair_unlock(p);
run_unlockers(unlockers);
retval = TOKUDB_TRY_AGAIN; retval = TOKUDB_TRY_AGAIN;
pair_lock(p);
pair_release_ref_unlocked(p);
} }
p->value_rwlock.read_lock(); p->value_rwlock.read_lock();
} }
else if (lock_type == PL_WRITE_EXPENSIVE || lock_type == PL_WRITE_CHEAP){ else if (lock_type == PL_WRITE_EXPENSIVE || lock_type == PL_WRITE_CHEAP){
if (p->value_rwlock.write_lock_is_expensive()) { if (p->value_rwlock.write_lock_is_expensive()) {
run_unlockers(p, unlockers); pair_add_ref_unlocked(p);
pair_unlock(p);
run_unlockers(unlockers);
// change expensive to false because // change expensive to false because
// we will unpin the pair immedietely // we will unpin the pair immedietely
// after pinning it // after pinning it
expensive = false; expensive = false;
retval = TOKUDB_TRY_AGAIN; retval = TOKUDB_TRY_AGAIN;
pair_lock(p);
pair_release_ref_unlocked(p);
} }
p->value_rwlock.write_lock(expensive); p->value_rwlock.write_lock(expensive);
} }
...@@ -2036,7 +2068,7 @@ try_again: ...@@ -2036,7 +2068,7 @@ try_again:
// will not block. // will not block.
p->value_rwlock.write_lock(true); p->value_rwlock.write_lock(true);
pair_unlock(p); pair_unlock(p);
run_unlockers(NULL, unlockers); // we hold the write list_lock. run_unlockers(unlockers); // we hold the write list_lock.
ct->list.write_list_unlock(); ct->list.write_list_unlock();
// at this point, only the pair is pinned, // at this point, only the pair is pinned,
...@@ -2075,7 +2107,7 @@ try_again: ...@@ -2075,7 +2107,7 @@ try_again:
// still check for partial fetch // still check for partial fetch
bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs); bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);
if (partial_fetch_required) { if (partial_fetch_required) {
run_unlockers(NULL, unlockers); run_unlockers(unlockers);
// we are now getting an expensive write lock, because we // we are now getting an expensive write lock, because we
// are doing a partial fetch. So, if we previously have // are doing a partial fetch. So, if we previously have
...@@ -2302,6 +2334,8 @@ void toku_cachetable_verify (CACHETABLE ct) { ...@@ -2302,6 +2334,8 @@ void toku_cachetable_verify (CACHETABLE ct) {
ct->list.verify(); ct->list.verify();
} }
struct pair_flush_for_close{ struct pair_flush_for_close{
PAIR p; PAIR p;
BACKGROUND_JOB_MANAGER bjm; BACKGROUND_JOB_MANAGER bjm;
...@@ -2406,6 +2440,7 @@ static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) { ...@@ -2406,6 +2440,7 @@ static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
assert(nb_mutex_users(&p->disk_nb_mutex) == 0); assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
assert(!p->cloned_value_data); assert(!p->cloned_value_data);
assert(p->dirty == CACHETABLE_CLEAN); assert(p->dirty == CACHETABLE_CLEAN);
assert(p->refcount == 0);
// TODO: maybe break up this function // TODO: maybe break up this function
// so that write lock does not need to be held for entire // so that write lock does not need to be held for entire
// free // free
...@@ -2481,7 +2516,7 @@ int toku_test_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, uint32_t fullh ...@@ -2481,7 +2516,7 @@ int toku_test_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, uint32_t fullh
int toku_test_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR attr) { int toku_test_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR attr) {
// We hold the cachetable mutex. // We hold the cachetable mutex.
PAIR p = test_get_pair(cachefile, key, fullhash, true); PAIR p = test_get_pair(cachefile, key, fullhash, true);
return toku_cachetable_unpin_ct_prelocked_no_flush(NULL, cachefile, p, dirty, attr); return toku_cachetable_unpin_ct_prelocked_no_flush(cachefile, p, dirty, attr);
} }
//test-only wrapper //test-only wrapper
...@@ -2597,11 +2632,15 @@ int toku_cachetable_unpin_and_remove ( ...@@ -2597,11 +2632,15 @@ int toku_cachetable_unpin_and_remove (
// //
cachetable_remove_pair(&ct->list, &ct->ev, p); cachetable_remove_pair(&ct->list, &ct->ev, p);
ct->list.write_list_unlock(); ct->list.write_list_unlock();
if (p->refcount > 0) {
pair_wait_for_ref_release_unlocked(p);
}
if (p->value_rwlock.users() > 0) { if (p->value_rwlock.users() > 0) {
// Need to wait for everyone else to leave // Need to wait for everyone else to leave
// This write lock will be granted only after all waiting // This write lock will be granted only after all waiting
// threads are done. // threads are done.
p->value_rwlock.write_lock(true); p->value_rwlock.write_lock(true);
assert(p->refcount == 0);
assert(p->value_rwlock.users() == 1); // us assert(p->value_rwlock.users() == 1); // us
assert(!p->checkpoint_pending); assert(!p->checkpoint_pending);
assert(p->attr.cache_pressure_size == 0); assert(p->attr.cache_pressure_size == 0);
...@@ -3745,7 +3784,13 @@ bool evictor::run_eviction_on_pair(PAIR curr_in_clock) { ...@@ -3745,7 +3784,13 @@ bool evictor::run_eviction_on_pair(PAIR curr_in_clock) {
goto exit; goto exit;
} }
pair_lock(curr_in_clock); pair_lock(curr_in_clock);
// these are the circumstances under which we don't run eviction on a pair:
// - if other users are waiting on the lock
// - if the PAIR is referenced by users
// - if the PAIR's disk_nb_mutex is in use, implying that it is
// undergoing a checkpoint
if (curr_in_clock->value_rwlock.users() || if (curr_in_clock->value_rwlock.users() ||
curr_in_clock->refcount > 0 ||
nb_mutex_users(&curr_in_clock->disk_nb_mutex)) nb_mutex_users(&curr_in_clock->disk_nb_mutex))
{ {
pair_unlock(curr_in_clock); pair_unlock(curr_in_clock);
......
...@@ -326,7 +326,7 @@ void toku_cachetable_pf_pinned_pair( ...@@ -326,7 +326,7 @@ void toku_cachetable_pf_pinned_pair(
struct unlockers { struct unlockers {
bool locked; bool locked;
void (*f)(PAIR p, void* extra); void (*f)(void* extra);
void *extra; void *extra;
UNLOCKERS next; UNLOCKERS next;
}; };
...@@ -385,7 +385,7 @@ int toku_cachetable_unpin(CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATT ...@@ -385,7 +385,7 @@ int toku_cachetable_unpin(CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATT
// Returns: 0 if success, otherwise returns an error number. // Returns: 0 if success, otherwise returns an error number.
// Requires: The ct is locked. // Requires: The ct is locked.
int toku_cachetable_unpin_ct_prelocked_no_flush(PAIR, CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATTR size); int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATTR size);
// Effect: The same as tokud_cachetable_unpin, except that the ct must not be locked. // Effect: The same as tokud_cachetable_unpin, except that the ct must not be locked.
// Requires: The ct is NOT locked. // Requires: The ct is NOT locked.
......
...@@ -4789,14 +4789,13 @@ struct unlock_ftnode_extra { ...@@ -4789,14 +4789,13 @@ struct unlock_ftnode_extra {
}; };
// When this is called, the cachetable lock is held // When this is called, the cachetable lock is held
static void static void
unlock_ftnode_fun (PAIR p, void *v) { unlock_ftnode_fun (void *v) {
struct unlock_ftnode_extra *x = NULL; struct unlock_ftnode_extra *x = NULL;
CAST_FROM_VOIDP(x, v); CAST_FROM_VOIDP(x, v);
FT_HANDLE brt = x->ft_handle; FT_HANDLE brt = x->ft_handle;
FTNODE node = x->node; FTNODE node = x->node;
// CT lock is held // CT lock is held
int r = toku_cachetable_unpin_ct_prelocked_no_flush( int r = toku_cachetable_unpin_ct_prelocked_no_flush(
p,
brt->ft->cf, brt->ft->cf,
node->ct_pair, node->ct_pair,
(enum cachetable_dirty) node->dirty, (enum cachetable_dirty) node->dirty,
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: cachetable-simple-verify.cc 52748 2013-01-31 21:12:42Z leifwalsh $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#include "test.h"
//
// This test verifies that if a node is pinned by a thread
// doing get_and_pin_nonblocking while another thread is trying
// to unpin_and_remove it, that nothing bad happens.
//
CACHEFILE f1;
PAIR p1;
PAIR p2;
static int
fetch_one(CACHEFILE f __attribute__((__unused__)),
PAIR UU(p),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
uint32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
void **dd __attribute__((__unused__)),
PAIR_ATTR *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = make_pair_attr(8);
assert(k.b == 1);
p1 = p;
return 0;
}
static int
fetch_two (CACHEFILE f __attribute__((__unused__)),
PAIR UU(p),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
uint32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
void **dd __attribute__((__unused__)),
PAIR_ATTR *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = make_pair_attr(8);
assert(k.b == 2);
p2 = p;
return 0;
}
toku_pthread_t unpin_and_remove_tid;
static void *unpin_and_remove_one(void *UU(arg)) {
int r = toku_cachetable_unpin_and_remove(
f1,
p1,
NULL,
NULL
);
assert_zero(r);
return arg;
}
static void
unpin_two (void* UU(v)) {
int r = toku_cachetable_unpin_ct_prelocked_no_flush(
f1,
p2,
CACHETABLE_DIRTY,
make_pair_attr(8)
);
assert_zero(r);
// at this point, we have p1 pinned, want to start a thread to do an unpin_and_remove
// on p1
r = toku_pthread_create(
&unpin_and_remove_tid,
NULL,
unpin_and_remove_one,
NULL
);
assert_zero(r);
// sleep to give a chance for the unpin_and_remove to get going
usleep(512*1024);
}
static void *repin_one(void *UU(arg)) {
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
struct unlockers unlockers = {true, unpin_two, NULL, NULL};
void* v1;
long s1;
int r = toku_cachetable_get_and_pin_nonblocking(
f1,
make_blocknum(1),
1,
&v1,
&s1,
wc,
def_fetch,
def_pf_req_callback,
def_pf_callback,
PL_WRITE_EXPENSIVE,
NULL,
&unlockers
);
assert(r == TOKUDB_TRY_AGAIN);
return arg;
}
static void
cachetable_test (void) {
const int test_limit = 1000;
int r;
toku_pair_list_set_lock_size(2); // set two bucket mutexes
CACHETABLE ct;
toku_cachetable_create(&ct, test_limit, ZERO_LSN, NULL_LOGGER);
const char *fname1 = TOKU_TEST_FILENAME;
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
long s1;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
// bring pairs 1 and 2 into memory, then unpin
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, fetch_one, def_pf_req_callback, def_pf_callback, true, NULL);
assert_zero(r);
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v1, &s1, wc, fetch_two, def_pf_req_callback, def_pf_callback, true, NULL);
assert_zero(r);
toku_pthread_t tid1;
r = toku_pthread_create(&tid1, NULL, repin_one, NULL);
assert_zero(r);
void *ret;
r = toku_pthread_join(tid1, &ret);
assert_zero(r);
r = toku_pthread_join(unpin_and_remove_tid, &ret);
assert_zero(r);
toku_cachetable_verify(ct);
toku_cachefile_close(&f1, false, ZERO_LSN);
toku_cachetable_close(&ct);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
// test ought to run bunch of times in hope of hitting bug
uint32_t num_test_runs = 1;
for (uint32_t i = 0; i < num_test_runs; i++) {
if (verbose) {
printf("starting test run %" PRIu32 " \n", i);
}
cachetable_test();
}
return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: cachetable-simple-verify.cc 52748 2013-01-31 21:12:42Z leifwalsh $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#include "test.h"
//
// This test verifies the behavior that originally caused
// #5978 is fixed. Here is what we do. We have four pairs with
// blocknums and fullhashes of 1,2,3,4. The cachetable has only
// two bucket mutexes, so 1 and 3 share a pair mutex, as do 2 and 4.
// We pin all four with expensive write locks. Then, on backgroud threads,
// we call get_and_pin_nonblocking on 3, where the unlockers unpins 2, and
// we call get_and_pin_nonblocking on 4, where the unlockers unpins 1. Run this
// enough times, and we should see a deadlock before the fix, and no deadlock
// after the fix.
//
CACHEFILE f1;
PAIR p3;
PAIR p4;
static int
fetch_three (CACHEFILE f __attribute__((__unused__)),
PAIR UU(p),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
uint32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
void **dd __attribute__((__unused__)),
PAIR_ATTR *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = make_pair_attr(8);
assert(k.b == 3);
p3 = p;
return 0;
}
static int
fetch_four (CACHEFILE f __attribute__((__unused__)),
PAIR UU(p),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
uint32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
void **dd __attribute__((__unused__)),
PAIR_ATTR *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = make_pair_attr(8);
assert(k.b == 4);
p4 = p;
return 0;
}
static void
unpin_four (void* UU(v)) {
int r = toku_cachetable_unpin_ct_prelocked_no_flush(
f1,
p3,
CACHETABLE_DIRTY,
make_pair_attr(8)
);
assert_zero(r);
}
static void
unpin_three (void* UU(v)) {
int r = toku_cachetable_unpin_ct_prelocked_no_flush(
f1,
p4,
CACHETABLE_DIRTY,
make_pair_attr(8)
);
assert_zero(r);
}
static void *repin_one(void *UU(arg)) {
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
struct unlockers unlockers = {true, unpin_four, NULL, NULL};
void* v1;
long s1;
int r = toku_cachetable_get_and_pin_nonblocking(
f1,
make_blocknum(1),
1,
&v1,
&s1,
wc,
def_fetch,
def_pf_req_callback,
def_pf_callback,
PL_WRITE_EXPENSIVE,
NULL,
&unlockers
);
assert(r == TOKUDB_TRY_AGAIN);
return arg;
}
static void *repin_two(void *UU(arg)) {
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
struct unlockers unlockers = {true, unpin_three, NULL, NULL};
void* v1;
long s1;
int r = toku_cachetable_get_and_pin_nonblocking(
f1,
make_blocknum(2),
2,
&v1,
&s1,
wc,
def_fetch,
def_pf_req_callback,
def_pf_callback,
PL_WRITE_EXPENSIVE,
NULL,
&unlockers
);
assert(r == TOKUDB_TRY_AGAIN);
return arg;
}
static void
cachetable_test (void) {
const int test_limit = 1000;
int r;
toku_pair_list_set_lock_size(2); // set two bucket mutexes
CACHETABLE ct;
toku_cachetable_create(&ct, test_limit, ZERO_LSN, NULL_LOGGER);
const char *fname1 = TOKU_TEST_FILENAME;
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
long s1;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
// bring pairs 1 and 2 into memory, then unpin
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
assert_zero(r);
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
assert_zero(r);
// now pin pairs 3 and 4
r = toku_cachetable_get_and_pin(f1, make_blocknum(3), 3, &v1, &s1, wc, fetch_three, def_pf_req_callback, def_pf_callback, true, NULL);
assert_zero(r);
r = toku_cachetable_get_and_pin(f1, make_blocknum(4), 4, &v1, &s1, wc, fetch_four, def_pf_req_callback, def_pf_callback, true, NULL);
assert_zero(r);
toku_pthread_t tid1;
toku_pthread_t tid2;
r = toku_pthread_create(&tid1, NULL, repin_one, NULL);
assert_zero(r);
r = toku_pthread_create(&tid2, NULL, repin_two, NULL);
assert_zero(r);
// unpin 1 and 2 so tid1 and tid2 can make progress
usleep(512*1024);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, make_pair_attr(8));
assert_zero(r);
r = toku_test_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_DIRTY, make_pair_attr(8));
assert_zero(r);
void *ret;
r = toku_pthread_join(tid1, &ret);
assert_zero(r);
r = toku_pthread_join(tid2, &ret);
assert_zero(r);
toku_cachetable_verify(ct);
toku_cachefile_close(&f1, false, ZERO_LSN);
toku_cachetable_close(&ct);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
// test ought to run bunch of times in hope of hitting bug
uint32_t num_test_runs = 30;
for (uint32_t i = 0; i < num_test_runs; i++) {
if (verbose) {
printf("starting test run %" PRIu32 " \n", i);
}
cachetable_test();
}
return 0;
}
...@@ -27,7 +27,7 @@ static void kibbutz_work(void *fe_v) ...@@ -27,7 +27,7 @@ static void kibbutz_work(void *fe_v)
} }
static void static void
unlock_dummy (PAIR UU(p), void* UU(v)) { unlock_dummy (void* UU(v)) {
} }
static void reset_unlockers(UNLOCKERS unlockers) { static void reset_unlockers(UNLOCKERS unlockers) {
......
...@@ -35,7 +35,7 @@ static void kibbutz_work(void *fe_v) ...@@ -35,7 +35,7 @@ static void kibbutz_work(void *fe_v)
} }
static void static void
unlock_dummy (PAIR UU(p), void* UU(v)) { unlock_dummy (void* UU(v)) {
} }
static void reset_unlockers(UNLOCKERS unlockers) { static void reset_unlockers(UNLOCKERS unlockers) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment