Commit 5d2443fb authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

refs #5702 add verifying range queries to perf_iibench, only for the primary key


git-svn-id: file:///svn/toku/tokudb@52600 c7de825b-a66e-492c-adef-691d508d4ae1
parent c86344b1
......@@ -25,50 +25,106 @@
//
static const int iibench_num_dbs = 4;
static const size_t iibench_secondary_key_size = sizeof(uint64_t) * 2;
static const size_t iibench_secondary_key_size = 16;
struct iibench_row {
int64_t pk;
uint64_t pk;
int64_t a;
int64_t b;
int64_t c;
};
static int64_t hash(int64_t key) {
int64_t hash = 0;
char *buf = (char *) &key;
struct iibench_secondary_row {
int64_t column;
uint64_t pk;
};
static int64_t hash(uint64_t key) {
uint64_t hash = 0;
uint8_t *buf = (uint8_t *) &key;
for (int i = 0; i < 8; i++) {
hash += (((buf[i] + 1) * 17) & 0xFF) << (i * 8);
}
return hash;
}
static void iibench_generate_secondary_keys(int64_t pk, struct iibench_row *row) {
row->a = hash(pk);
row->b = hash(pk * 2);
row->c = hash(pk * 3);
static int64_t iibench_generate_column_by_pk(int64_t pk, int db_idx) {
invariant(db_idx > 0);
return hash(pk * db_idx);
}
static void iibench_generate_row(int64_t pk, struct iibench_row *row) {
row->a = iibench_generate_column_by_pk(pk, 1);
row->b = iibench_generate_column_by_pk(pk, 2);
row->c = iibench_generate_column_by_pk(pk, 3);
}
static void UU() iibench_verify_row(struct iibench_row *row) {
static void iibench_parse_row(const DBT *key, const DBT *val, struct iibench_row *row) {
char *CAST_FROM_VOIDP(val_buf, val->data);
invariant(key->size == 8);
invariant(val->size == 24);
memcpy(&row->pk, key->data, 8);
memcpy(&row->a, val_buf + 0, 8);
memcpy(&row->b, val_buf + 8, 8);
memcpy(&row->c, val_buf + 16, 8);
}
static void UU() iibench_verify_row(const struct iibench_row *row) {
struct iibench_row expected_row;
iibench_generate_secondary_keys(row->pk, &expected_row);
iibench_generate_row(row->pk, &expected_row);
invariant(row->a == expected_row.a);
invariant(row->b == expected_row.b);
invariant(row->c == expected_row.c);
}
static void iibench_fill_key_buf(int64_t pk, int64_t *buf) {
memcpy(&buf[0], &pk, sizeof(int64_t));
static void iibench_parse_secondary_row(const DBT *key, const DBT *val, struct iibench_secondary_row *row) {
char *CAST_FROM_VOIDP(key_buf, key->data);
invariant(key->size == iibench_secondary_key_size);
invariant(val->size == 0);
memcpy(&row->column, key_buf + 0, 8);
memcpy(&row->pk, key_buf + 8, 8);
}
static void UU() iibench_verify_secondary_row(const struct iibench_secondary_row *row, int db_idx) {
int64_t expected = iibench_generate_column_by_pk(row->pk, db_idx);
invariant(row->column == expected);
}
static void iibench_fill_val_buf(int64_t pk, int64_t *buf) {
static void iibench_fill_key_buf(uint64_t pk, int64_t *buf) {
memcpy(&buf[0], &pk, 8);
}
static void iibench_fill_val_buf(uint64_t pk, int64_t *buf) {
struct iibench_row row;
iibench_generate_secondary_keys(pk, &row);
iibench_generate_row(pk, &row);
memcpy(&buf[0], &row.a, sizeof(row.a));
memcpy(&buf[1], &row.b, sizeof(row.b));
memcpy(&buf[2], &row.c, sizeof(row.c));
}
static int iibench_get_db_idx(DB *db) {
DESCRIPTOR desc = db->cmp_descriptor;
invariant_notnull(desc->dbt.data);
invariant(desc->dbt.size == sizeof(int));
int db_idx;
memcpy(&db_idx, desc->dbt.data, desc->dbt.size);
return db_idx;
}
static void iibench_rangequery_cb(DB *db, const DBT *key, const DBT *val, void *extra) {
invariant_null(extra);
int db_idx = iibench_get_db_idx(db);
if (db_idx == 0) {
struct iibench_row row;
iibench_parse_row(key, val, &row);
iibench_verify_row(&row);
} else {
struct iibench_secondary_row row;
iibench_parse_secondary_row(key, val, &row);
iibench_verify_secondary_row(&row, db_idx);
}
}
struct iibench_op_extra {
uint64_t autoincrement;
};
......@@ -137,43 +193,21 @@ static int UU() iibench_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void
return r;
}
static void
stress_table(DB_ENV* env, DB **dbs, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_put_threads;
struct iibench_op_extra iib_extra = {
.autoincrement = 0
};
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbs, env, cli_args);
myargs[i].operation = iibench_put_op;
myargs[i].operation_extra = &iib_extra;
}
const bool crash_at_end = false;
run_workers(myargs, num_threads, cli_args->num_seconds, crash_at_end, cli_args);
}
static int iibench_generate_row_for_put(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *UU(src_key), const DBT *src_val) {
DESCRIPTOR desc = dest_db->cmp_descriptor;
invariant(src_db != dest_db);
invariant_notnull(src_key->data);
invariant(src_key->size == sizeof(int64_t));
invariant(src_key->size == 8);
invariant(dest_key->size == iibench_secondary_key_size);
invariant(dest_key->flags == DB_DBT_REALLOC);
invariant_notnull(desc->dbt.data);
invariant(desc->dbt.size == sizeof(int));
// Get the column index from the descriptor. This is a secondary index
// Get the db index from the descriptor. This is a secondary index
// so it has to be greater than zero (which would be the pk). Then
// grab the appropriate secondary key from the source val, which is
// an array of the 3 columns, so we have to subtract 1 from the index.
int column_index;
memcpy(&column_index, desc->dbt.data, desc->dbt.size);
invariant(column_index > 0 && column_index < 4);
int db_idx = iibench_get_db_idx(dest_db);
invariant(db_idx > 0 && db_idx < 4);
int64_t *CAST_FROM_VOIDP(columns, src_val->data);
int64_t secondary_key = columns[column_index - 1];
int64_t secondary_key = columns[db_idx - 1];
// First write down the secondary key, then the primary key (in src_key)
int64_t *CAST_FROM_VOIDP(dest_key_buf, dest_key->data);
......@@ -200,6 +234,79 @@ static DB *iibench_set_descriptor_after_db_opens(DB_ENV *env, DB *db, int idx, r
return db;
}
static int iibench_compare_keys(DB *db, const DBT *a, const DBT *b) {
int db_idx = iibench_get_db_idx(db);
if (db_idx == 0) {
invariant(a->size == 8);
invariant(b->size == 8);
uint64_t x = *(uint64_t *) a->data;
uint64_t y = *(uint64_t *) b->data;
if (x < y) {
return -1;
} else if (x == y) {
return 0;
} else {
return 1;
}
} else {
invariant(a->size == 16);
invariant(b->size == 16);
int64_t x = *(int64_t *) a->data;
int64_t y = *(int64_t *) b->data;
uint64_t pk_x = *(uint64_t *) (((char *) a->data) + 8);
uint64_t pk_y = *(uint64_t *) (((char *) b->data) + 8);
if (x < y) {
return -1;
} else if (x == y) {
if (pk_x < pk_y) {
return -1;
} else if (pk_x == pk_y) {
return 0;
} else {
return 1;
}
} else {
return 1;
}
}
}
// Do a range query over the primary index, verifying the contents of the rows
static int iibench_rangequery_op(DB_TXN *txn, ARG arg, void *UU(operation_extra), void *stats_extra) {
DB *db = arg->dbp[0];
rangequery_db(db, txn, arg, iibench_rangequery_cb, nullptr);
increment_counter(stats_extra, PTQUERIES, 1);
return 0;
}
static void
stress_table(DB_ENV* env, DB **dbs, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_put_threads + cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
// Put threads do iibench-like inserts with an auto-increment primary key
// Query threads do range queries of a certain size, verifying row contents.
struct iibench_op_extra put_extra = {
.autoincrement = 0
};
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbs, env, cli_args);
if (i < cli_args->num_put_threads) {
myargs[i].operation = iibench_put_op;
myargs[i].operation_extra = &put_extra;
} else {
myargs[i].operation = iibench_rangequery_op;
myargs[i].operation_extra = nullptr;
myargs[i].txn_flags |= DB_TXN_READ_ONLY;
myargs[i].sleep_ms = 1000; // 1 second between range queries
}
}
const bool crash_at_end = false;
run_workers(myargs, num_threads, cli_args->num_seconds, crash_at_end, cli_args);
}
int test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf();
args.num_elements = 0; // want to start with empty DBs
......@@ -217,6 +324,6 @@ int test_main(int argc, char *const argv[]) {
}
args.env_args.generate_put_callback = iibench_generate_row_for_put;
after_db_open_hook = iibench_set_descriptor_after_db_opens;
perf_test_main(&args);
perf_test_main_with_cmp(&args, iibench_compare_keys);
return 0;
}
......@@ -1005,13 +1005,22 @@ static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_ext
return r;
}
typedef void (*rangequery_row_cb)(DB *db, const DBT *key, const DBT *val, void *extra);
struct rangequery_cb_extra {
int rows_read;
int limit;
// Call cb(db, key, value, cb_extra) on up to $limit rows.
const int limit;
const rangequery_row_cb cb;
DB *const db;
void *const cb_extra;
};
static int rangequery_cb(const DBT *UU(key), const DBT *UU(value), void *extra) {
static int rangequery_cb(const DBT *key, const DBT *value, void *extra) {
struct rangequery_cb_extra *CAST_FROM_VOIDP(info, extra);
if (info->cb != nullptr) {
info->cb(info->db, key, value, info->cb_extra);
}
if (++info->rows_read >= info->limit) {
return 0;
} else {
......@@ -1019,7 +1028,7 @@ static int rangequery_cb(const DBT *UU(key), const DBT *UU(value), void *extra)
}
}
static void rangequery_db(DB *db, DB_TXN *txn, ARG arg) {
static void rangequery_db(DB *db, DB_TXN *txn, ARG arg, rangequery_row_cb cb, void *cb_extra) {
const int limit = arg->cli->range_query_limit;
int r;
......@@ -1036,10 +1045,13 @@ static void rangequery_db(DB *db, DB_TXN *txn, ARG arg) {
r = db->cursor(db, txn, &cursor, 0); CKERR(r);
r = cursor->c_pre_acquire_range_lock(cursor, &start_key, &end_key); CKERR(r);
struct rangequery_cb_extra extra;
extra.rows_read = 0;
extra.limit = limit;
struct rangequery_cb_extra extra = {
.rows_read = 0,
.limit = limit,
.cb = cb,
.db = db,
.cb_extra = cb_extra,
};
r = cursor->c_getf_set(cursor, 0, &start_key, rangequery_cb, &extra);
while (r == 0 && extra.rows_read < extra.limit && run_test) {
r = cursor->c_getf_next(cursor, 0, rangequery_cb, &extra);
......@@ -1051,7 +1063,7 @@ static void rangequery_db(DB *db, DB_TXN *txn, ARG arg) {
static int UU() rangequery_op(DB_TXN *txn, ARG arg, void *UU(operation_extra), void *stats_extra) {
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB *db = arg->dbp[db_index];
rangequery_db(db, txn, arg);
rangequery_db(db, txn, arg, nullptr, nullptr);
increment_counter(stats_extra, PTQUERIES, 1);
return 0;
}
......@@ -2647,7 +2659,7 @@ UU() stress_recover(struct cli_args *args) {
}
static void
test_main(struct cli_args *args, bool fill_with_zeroes)
open_and_stress_tables(struct cli_args *args, bool fill_with_zeroes, int (*cmp)(DB *, const DBT *, const DBT *))
{
if ((args->key_size < 8 && args->key_size != 4) ||
(args->val_size < 8 && args->val_size != 4)) {
......@@ -2666,7 +2678,7 @@ test_main(struct cli_args *args, bool fill_with_zeroes)
&env,
dbs,
args->num_DBs,
stress_cmp,
cmp,
args
);
{ int chk_r = fill_tables(env, dbs, args, fill_with_zeroes); CKERR(chk_r); }
......@@ -2676,7 +2688,7 @@ test_main(struct cli_args *args, bool fill_with_zeroes)
{ int chk_r = open_tables(&env,
dbs,
args->num_DBs,
stress_cmp,
cmp,
args); CKERR(chk_r); }
if (args->warm_cache) {
do_warm_cache(env, dbs, args);
......@@ -2690,14 +2702,21 @@ static void
UU() stress_test_main(struct cli_args *args) {
// Begin the test with fixed size values equal to zero.
// This is important for correctness testing.
test_main(args, true);
open_and_stress_tables(args, true, stress_cmp);
}
static void
UU() perf_test_main(struct cli_args *args) {
// Do not begin the test by creating a table of all zeroes.
// We want to control the row size and its compressibility.
test_main(args, false);
open_and_stress_tables(args, false, stress_cmp);
}
static void
UU() perf_test_main_with_cmp(struct cli_args *args, int (*cmp)(DB *, const DBT *, const DBT *)) {
// Do not begin the test by creating a table of all zeroes.
// We want to control the row size and its compressibility.
open_and_stress_tables(args, false, cmp);
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment