Commit c322ded9 authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

fixes #5924 add perf range query. the range query limit is specified via the command line.


git-svn-id: file:///svn/toku/tokudb@52432 c7de825b-a66e-492c-adef-691d508d4ae1
parent df9d3534
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#ident "$Id: perf_rangequery.cc 52406 2013-01-24 00:45:19Z esmet $"
#include "test.h"
#include <stdio.h>
#include <stdlib.h>
#include <toku_pthread.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>
#include "threaded_stress_test_helpers.h"
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbp, env, cli_args);
myargs[i].operation = rangequery_op;
myargs[i].txn_flags |= DB_TXN_READ_ONLY;
}
run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args);
}
int
test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args);
perf_test_main(&args);
return 0;
}
...@@ -98,6 +98,7 @@ struct cli_args { ...@@ -98,6 +98,7 @@ struct cli_args {
bool do_recover; // true if we should run recover bool do_recover; // true if we should run recover
int num_update_threads; // number of threads running updates int num_update_threads; // number of threads running updates
int num_put_threads; // number of threads running puts int num_put_threads; // number of threads running puts
int range_query_limit; // how many rows to look at for range queries
bool serial_insert; bool serial_insert;
bool interleave; // for insert benchmarks, whether to interleave separate threads' puts (or segregate them) bool interleave; // for insert benchmarks, whether to interleave separate threads' puts (or segregate them)
bool crash_on_operation_failure; bool crash_on_operation_failure;
...@@ -635,6 +636,15 @@ static int generate_row_for_put( ...@@ -635,6 +636,15 @@ static int generate_row_for_put(
// int if the key size is exactly 4, and it treats // int if the key size is exactly 4, and it treats
// the key as an 8 byte int if the key size is 8 or more. // the key as an 8 byte int if the key size is 8 or more.
static int64_t random_bounded_key(struct random_data *random_data, ARG arg) {
// Effect: Returns a random key in the table, possible bounded by the number of elements.
int64_t key = myrandom_r(random_data);
if (arg->bounded_element_range && arg->cli->num_elements > 0) {
key = key % arg->cli->num_elements;
}
return key;
}
static int64_t breverse(int64_t v) static int64_t breverse(int64_t v)
// Effect: return the bits in i, reversed // Effect: return the bits in i, reversed
// Notes: implementation taken from http://graphics.stanford.edu/~seander/bithacks.html#BitReverseObvious // Notes: implementation taken from http://graphics.stanford.edu/~seander/bithacks.html#BitReverseObvious
...@@ -677,10 +687,7 @@ fill_key_buf_random(struct random_data *random_data, uint8_t *data, ARG arg) { ...@@ -677,10 +687,7 @@ fill_key_buf_random(struct random_data *random_data, uint8_t *data, ARG arg) {
// Effect: Fill data with a random, little-endian, 4 or 8 byte integer, possibly // Effect: Fill data with a random, little-endian, 4 or 8 byte integer, possibly
// bounded by the size of the table, and padded with zeroes until key_size. // bounded by the size of the table, and padded with zeroes until key_size.
// Requires, Notes: see fill_key_buf() // Requires, Notes: see fill_key_buf()
int64_t key = myrandom_r(random_data); int64_t key = random_bounded_key(random_data, arg);
if (arg->bounded_element_range && arg->cli->num_elements > 0) {
key = key % arg->cli->num_elements;
}
fill_key_buf(key, data, arg->cli); fill_key_buf(key, data, arg->cli);
} }
...@@ -997,6 +1004,57 @@ static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_ext ...@@ -997,6 +1004,57 @@ static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_ext
return r; return r;
} }
struct rangequery_cb_extra {
int rows_read;
int limit;
};
static int rangequery_cb(const DBT *UU(key), const DBT *UU(value), void *extra) {
struct rangequery_cb_extra *CAST_FROM_VOIDP(info, extra);
if (++info->rows_read >= info->limit) {
return 0;
} else {
return TOKUDB_CURSOR_CONTINUE;
}
}
static void rangequery_db(DB *db, DB_TXN *txn, ARG arg) {
const int limit = arg->cli->range_query_limit;
int r;
DBC *cursor;
DBT start_key, end_key;
uint8_t start_keybuf[arg->cli->key_size];
uint8_t end_keybuf[arg->cli->key_size];
dbt_init(&start_key, start_keybuf, sizeof start_keybuf);
dbt_init(&end_key, end_keybuf, sizeof end_keybuf);
const uint64_t start_k = random_bounded_key(arg->random_data, arg);
fill_key_buf(start_k, start_keybuf, arg->cli);
fill_key_buf(start_k + limit, end_keybuf, arg->cli);
r = db->cursor(db, txn, &cursor, 0); CKERR(r);
r = cursor->c_pre_acquire_range_lock(cursor, &start_key, &end_key); CKERR(r);
struct rangequery_cb_extra extra;
extra.rows_read = 0;
extra.limit = limit;
r = cursor->c_getf_set(cursor, 0, &start_key, rangequery_cb, &extra);
while (r == 0 && extra.rows_read < extra.limit && run_test) {
r = cursor->c_getf_next(cursor, 0, rangequery_cb, &extra);
}
r = cursor->c_close(cursor); CKERR(r);
}
static int UU() rangequery_op(DB_TXN *txn, ARG arg, void *UU(operation_extra), void *stats_extra) {
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB *db = arg->dbp[db_index];
rangequery_db(db, txn, arg);
increment_counter(stats_extra, PTQUERIES, 1);
return 0;
}
static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int db_index = arg->cli->num_DBs > 1 ? myrandom_r(arg->random_data)%arg->cli->num_DBs : 0; int db_index = arg->cli->num_DBs > 1 ? myrandom_r(arg->random_data)%arg->cli->num_DBs : 0;
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
...@@ -1173,10 +1231,7 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU( ...@@ -1173,10 +1231,7 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
for (uint32_t i = 0; i < arg->cli->txn_size; i++) { for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
if (arg->prelock_updates) { if (arg->prelock_updates) {
if (i == 0) { if (i == 0) {
update_key = myrandom_r(arg->random_data); update_key = random_bounded_key(arg->random_data, arg);
if (arg->bounded_element_range) {
update_key = update_key % arg->cli->num_elements;
}
const int max_key_in_table = arg->cli->num_elements - 1; const int max_key_in_table = arg->cli->num_elements - 1;
const bool range_wraps = (update_key + (int) arg->cli->txn_size - 1) > max_key_in_table; const bool range_wraps = (update_key + (int) arg->cli->txn_size - 1) > max_key_in_table;
...@@ -1968,6 +2023,7 @@ static struct cli_args UU() get_default_args(void) { ...@@ -1968,6 +2023,7 @@ static struct cli_args UU() get_default_args(void) {
.do_recover = false, .do_recover = false,
.num_update_threads = 1, .num_update_threads = 1,
.num_put_threads = 1, .num_put_threads = 1,
.range_query_limit = 100,
.serial_insert = false, .serial_insert = false,
.interleave = false, .interleave = false,
.crash_on_operation_failure = true, .crash_on_operation_failure = true,
...@@ -2340,6 +2396,7 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -2340,6 +2396,7 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
INT32_ARG_NONNEG("--num_ptquery_threads", num_ptquery_threads, " threads"), INT32_ARG_NONNEG("--num_ptquery_threads", num_ptquery_threads, " threads"),
INT32_ARG_NONNEG("--num_put_threads", num_put_threads, " threads"), INT32_ARG_NONNEG("--num_put_threads", num_put_threads, " threads"),
INT32_ARG_NONNEG("--num_update_threads", num_update_threads, " threads"), INT32_ARG_NONNEG("--num_update_threads", num_update_threads, " threads"),
INT32_ARG_NONNEG("--range_query_limit", range_query_limit, " rows"),
UINT32_ARG("--txn_size", txn_size, " rows"), UINT32_ARG("--txn_size", txn_size, " rows"),
UINT32_ARG("--num_bucket_mutexes", env_args.num_bucket_mutexes, " mutexes"), UINT32_ARG("--num_bucket_mutexes", env_args.num_bucket_mutexes, " mutexes"),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment