Commit cbdfffc0 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4333], add test

git-svn-id: file:///svn/toku/tokudb@39006 c7de825b-a66e-492c-adef-691d508d4ae1
parent ba7e5a49
#ident "$Id: cachetable-pin-checkpoint.c 38124 2011-12-22 19:45:52Z bkuszmaul $"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
//
// This test ensures that get_and_pin with dependent nodes works
// as intended with checkpoints, by having multiple threads changing
// values on elements in data, and ensure that checkpoints always get snapshots
// such that the sum of all the elements in data are 0.
//
// The arrays
// must be power of 2 minus 1
#define NUM_ELEMENTS 127
// must be (NUM_ELEMENTS +1)/2 - 1
#define NUM_INTERNAL 63
#define NUM_MOVER_THREADS 4
int64_t data[NUM_ELEMENTS];
int64_t checkpointed_data[NUM_ELEMENTS];
u_int32_t time_of_test;
BOOL run_test;
static void
flush (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)),
PAIR_ATTR s __attribute__((__unused__)),
PAIR_ATTR* new_size __attribute__((__unused__)),
BOOL write_me,
BOOL keep_me,
BOOL checkpoint_me
) {
int64_t val_to_write = *(int64_t *)v;
size_t data_index = (size_t)k.b;
if (write_me) {
usleep(10);
data[data_index] = val_to_write;
if (checkpoint_me) checkpointed_data[data_index] = val_to_write;
}
if (!keep_me) {
toku_free(v);
}
}
static int
fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k,
u_int32_t fullhash __attribute__((__unused__)),
void **value,
PAIR_ATTR *sizep,
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
size_t data_index = (size_t)k.b;
// assert that data_index is valid
// if it is INT64_MAX, then that means
// the block is not supposed to be in the cachetable
assert(data[data_index] != INT64_MAX);
int64_t* data_val = toku_malloc(sizeof(int64_t));
usleep(10);
*data_val = data[data_index];
*value = data_val;
*sizep = make_pair_attr(8);
return 0;
}
static void *test_time(void *arg) {
//
// if num_Seconds is set to 0, run indefinitely
//
if (time_of_test != 0) {
usleep(time_of_test*1000*1000);
if (verbose) printf("should now end test\n");
run_test = FALSE;
}
if (verbose) printf("should be ending test now\n");
return arg;
}
CACHETABLE ct;
CACHEFILE f1;
static void move_number_to_child(
int parent,
int64_t* parent_val,
enum cachetable_dirty parent_dirty
)
{
int child = 0;
int r;
child = ((random() % 2) == 0) ? (2*parent + 1) : (2*parent + 2);
void* v1;
long s1;
CACHEKEY parent_key;
parent_key.b = parent;
u_int32_t parent_fullhash = toku_cachetable_hash(f1, parent_key);
CACHEKEY child_key;
child_key.b = child;
u_int32_t child_fullhash = toku_cachetable_hash(f1, child_key);
r = toku_cachetable_get_and_pin_with_dep_pairs(
f1,
child_key,
child_fullhash,
&v1,
&s1,
flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, def_cleaner_callback,
NULL,
NULL,
1, //num_dependent_pairs
&f1,
&parent_key,
&parent_fullhash,
&parent_dirty
);
assert(r==0);
int64_t* child_val = (int64_t *)v1;
assert(child_val != parent_val); // sanity check that we are messing with different vals
assert(*parent_val != INT64_MAX);
assert(*child_val != INT64_MAX);
usleep(10);
(*parent_val)++;
(*child_val)--;
r = toku_cachetable_unpin(f1, parent_key, parent_fullhash, CACHETABLE_DIRTY, make_pair_attr(8));
assert_zero(r);
if (child < NUM_INTERNAL) {
move_number_to_child(child, child_val, CACHETABLE_DIRTY);
}
else {
r = toku_cachetable_unpin(f1, child_key, child_fullhash, CACHETABLE_DIRTY, make_pair_attr(8));
assert_zero(r);
}
}
static void *move_numbers(void *arg) {
while (run_test) {
int parent = 0;
int r;
void* v1;
long s1;
CACHEKEY parent_key;
parent_key.b = parent;
u_int32_t parent_fullhash = toku_cachetable_hash(f1, parent_key);
r = toku_cachetable_get_and_pin_with_dep_pairs(
f1,
parent_key,
parent_fullhash,
&v1,
&s1,
flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, def_cleaner_callback,
NULL,
NULL,
0, //num_dependent_pairs
NULL,
NULL,
NULL,
NULL
);
assert(r==0);
int64_t* parent_val = (int64_t *)v1;
move_number_to_child(parent, parent_val, CACHETABLE_CLEAN);
}
return arg;
}
static void remove_data(CACHEKEY* cachekey, BOOL for_checkpoint, void* UU(extra)) {
assert(cachekey->b < NUM_ELEMENTS);
data[cachekey->b] = INT64_MAX;
if (for_checkpoint) {
checkpointed_data[cachekey->b] = INT64_MAX;
}
}
static void get_data(CACHEKEY* cachekey, u_int32_t* fullhash, void* extra) {
int* key = extra;
cachekey->b = *key;
*fullhash = toku_cachetable_hash(f1, *cachekey);
data[*key] = INT64_MAX - 1;
}
static void merge_and_split_child(
int parent,
int64_t* parent_val,
enum cachetable_dirty parent_dirty
)
{
int child = 0;
int other_child = 0;
int r;
BOOL even = (random() % 2) == 0;
child = (even) ? (2*parent + 1) : (2*parent + 2);
other_child = (!even) ? (2*parent + 1) : (2*parent + 2);
assert(child != other_child);
void* v1;
long s1;
CACHEKEY parent_key;
parent_key.b = parent;
u_int32_t parent_fullhash = toku_cachetable_hash(f1, parent_key);
CACHEKEY child_key;
child_key.b = child;
u_int32_t child_fullhash = toku_cachetable_hash(f1, child_key);
enum cachetable_dirty child_dirty = CACHETABLE_CLEAN;
r = toku_cachetable_get_and_pin_with_dep_pairs(
f1,
child_key,
child_fullhash,
&v1,
&s1,
flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, def_cleaner_callback,
NULL,
NULL,
1, //num_dependent_pairs
&f1,
&parent_key,
&parent_fullhash,
&parent_dirty
);
assert(r==0);
int64_t* child_val = (int64_t *)v1;
CACHEKEY other_child_key;
other_child_key.b = other_child;
u_int32_t other_child_fullhash = toku_cachetable_hash(f1, other_child_key);
CACHEFILE cfs[2];
cfs[0] = f1;
cfs[1] = f1;
CACHEKEY keys[2];
keys[0] = parent_key;
keys[1] = child_key;
u_int32_t hashes[2];
hashes[0] = parent_fullhash;
hashes[1] = child_fullhash;
enum cachetable_dirty dirties[2];
dirties[0] = parent_dirty;
dirties[1] = child_dirty;
r = toku_cachetable_get_and_pin_with_dep_pairs(
f1,
other_child_key,
other_child_fullhash,
&v1,
&s1,
flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, def_cleaner_callback,
NULL,
NULL,
2, //num_dependent_pairs
cfs,
keys,
hashes,
dirties
);
assert(r==0);
int64_t* other_child_val = (int64_t *)v1;
assert(*parent_val != INT64_MAX);
assert(*child_val != INT64_MAX);
assert(*other_child_val != INT64_MAX);
// lets get rid of other_child_val with a merge
*child_val += *other_child_val;
*other_child_val = INT64_MAX;
toku_cachetable_unpin_and_remove(f1, other_child_key, remove_data, NULL);
dirties[1] = CACHETABLE_DIRTY;
child_dirty = CACHETABLE_DIRTY;
// now do a split
CACHEKEY new_key;
u_int32_t new_fullhash;
int64_t* data_val = toku_malloc(sizeof(int64_t));
r = toku_cachetable_put_with_dep_pairs(
f1,
get_data,
data_val,
make_pair_attr(8),
flush, def_pe_est_callback, def_pe_callback, def_cleaner_callback,
NULL, // parameter for flush_callback, pe_est_callback, pe_callback, and cleaner_callback
&other_child,
2, // number of dependent pairs that we may need to checkpoint
cfs,
keys,
hashes,
dirties,
&new_key,
&new_fullhash
);
assert(new_key.b == other_child);
assert(new_fullhash == other_child_fullhash);
*data_val = 5000;
*child_val -= 5000;
r = toku_cachetable_unpin(f1, parent_key, parent_fullhash, CACHETABLE_DIRTY, make_pair_attr(8));
assert_zero(r);
r = toku_cachetable_unpin(f1, other_child_key, other_child_fullhash, CACHETABLE_DIRTY, make_pair_attr(8));
assert_zero(r);
if (child < NUM_INTERNAL) {
merge_and_split_child(child, child_val, CACHETABLE_DIRTY);
}
else {
r = toku_cachetable_unpin(f1, child_key, child_fullhash, CACHETABLE_DIRTY, make_pair_attr(8));
assert_zero(r);
}
}
static void *merge_and_split(void *arg) {
while (run_test) {
int parent = 0;
int r;
void* v1;
long s1;
CACHEKEY parent_key;
parent_key.b = parent;
u_int32_t parent_fullhash = toku_cachetable_hash(f1, parent_key);
r = toku_cachetable_get_and_pin_with_dep_pairs(
f1,
parent_key,
parent_fullhash,
&v1,
&s1,
flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, def_cleaner_callback,
NULL,
NULL,
0, //num_dependent_pairs
NULL,
NULL,
NULL,
NULL
);
assert(r==0);
int64_t* parent_val = (int64_t *)v1;
merge_and_split_child(parent, parent_val, CACHETABLE_CLEAN);
}
return arg;
}
static int num_checkpoints = 0;
static void *checkpoints(void *arg) {
// first verify that checkpointed_data is correct;
while(run_test) {
int64_t sum = 0;
for (int i = 0; i < NUM_ELEMENTS; i++) {
if (checkpointed_data[i] != INT64_MAX) {
sum += checkpointed_data[i];
}
}
assert (sum==0);
//
// now run a checkpoint
//
int r;
r = toku_cachetable_begin_checkpoint(ct, NULL); assert(r == 0);
r = toku_cachetable_end_checkpoint(
ct,
NULL,
fake_ydb_lock,
fake_ydb_unlock,
NULL,
NULL
);
assert(r==0);
assert (sum==0);
for (int i = 0; i < NUM_ELEMENTS; i++) {
if (checkpointed_data[i] != INT64_MAX) {
sum += checkpointed_data[i];
}
}
assert (sum==0);
num_checkpoints++;
}
return arg;
}
static int
test_begin_checkpoint (
LSN UU(checkpoint_lsn),
void* UU(header_v))
{
memcpy(checkpointed_data, data, sizeof(int64_t)*NUM_ELEMENTS);
return 0;
}
static int
dummy_int_checkpoint_userdata(CACHEFILE UU(cf), int UU(n), void* UU(extra)) {
return 0;
}
static void sum_vals(void) {
int64_t sum = 0;
for (int i = 0; i < NUM_ELEMENTS; i++) {
//printf("actual: i %d val %"PRId64" \n", i, data[i]);
if (data[i] != INT64_MAX) {
sum += data[i];
}
}
if (verbose) printf("actual sum %"PRId64" \n", sum);
assert(sum == 0);
sum = 0;
for (int i = 0; i < NUM_ELEMENTS; i++) {
//printf("checkpointed: i %d val %"PRId64" \n", i, checkpointed_data[i]);
if (checkpointed_data[i] != INT64_MAX) {
sum += checkpointed_data[i];
}
}
if (verbose) printf("checkpointed sum %"PRId64" \n", sum);
assert(sum == 0);
}
static void
cachetable_test (void) {
const int test_limit = NUM_ELEMENTS;
//
// let's set up the data
//
for (int64_t i = 0; i < NUM_ELEMENTS; i++) {
data[i] = 0;
checkpointed_data[i] = 0;
}
time_of_test = 60;
int r;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test-put-checkpoint.dat";
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
toku_cachefile_set_userdata(
f1,
NULL,
NULL,
NULL,
NULL,
dummy_int_checkpoint_userdata,
test_begin_checkpoint, // called in begin_checkpoint
dummy_int_checkpoint_userdata,
NULL,
NULL
);
toku_pthread_t time_tid;
toku_pthread_t checkpoint_tid;
toku_pthread_t move_tid[NUM_MOVER_THREADS];
toku_pthread_t merge_and_split_tid[NUM_MOVER_THREADS];
run_test = TRUE;
for (int i = 0; i < NUM_MOVER_THREADS; i++) {
r = toku_pthread_create(&move_tid[i], NULL, move_numbers, NULL);
assert_zero(r);
}
for (int i = 0; i < NUM_MOVER_THREADS; i++) {
r = toku_pthread_create(&merge_and_split_tid[i], NULL, merge_and_split, NULL);
assert_zero(r);
}
r = toku_pthread_create(&checkpoint_tid, NULL, checkpoints, NULL);
assert_zero(r);
r = toku_pthread_create(&time_tid, NULL, test_time, NULL);
assert_zero(r);
void *ret;
r = toku_pthread_join(time_tid, &ret);
assert_zero(r);
r = toku_pthread_join(checkpoint_tid, &ret);
assert_zero(r);
for (int i = 0; i < NUM_MOVER_THREADS; i++) {
r = toku_pthread_join(merge_and_split_tid[i], &ret);
assert_zero(r);
}
for (int i = 0; i < NUM_MOVER_THREADS; i++) {
r = toku_pthread_join(move_tid[i], &ret);
assert_zero(r);
}
toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); lazy_assert_zero(r);
sum_vals();
if (verbose) printf("num_checkpoints %d\n", num_checkpoints);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
cachetable_test();
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment