threaded_stress_test_helpers.h 41.4 KB
Newer Older
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/* -*- mode: C; c-basic-offset: 4 -*- */

#ident "Copyright (c) 2009 Tokutek Inc.  All rights reserved."
#ident "$Id$"

#ifndef _THREADED_STRESS_TEST_HELPERS_H_
#define _THREADED_STRESS_TEST_HELPERS_H_

#include "test.h"

#include "rwlock.h"

#include <stdio.h>
#include <stdlib.h>

#include <toku_pthread.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>
21
#include <malloc.h>
Zardosht Kasheff's avatar
Zardosht Kasheff committed
22

23 24 25 26 27 28 29 30 31
/*
 * Return the next pseudo-random value produced by random_r() from the
 * caller-supplied generator state `buf`.  A non-zero status from random_r()
 * is handed to CKERR.
 */
static inline int32_t
myrandom_r(struct random_data *buf)
{
    int32_t value;
    int status = random_r(buf, &value);
    CKERR(status);
    return value;
}

32
// Global "keep running" flag: run_workers() sets it to true before creating
// the threads, worker() uses it as its loop condition, and test_time() clears
// it to end the test.
volatile bool run_test; // should be volatile since we are communicating through this variable.
Zardosht Kasheff's avatar
Zardosht Kasheff committed
33 34

// Pointer to the per-thread operation arguments (struct arg).
typedef struct arg *ARG;
35
// Signature of a stress-test operation.  worker() invokes it with the current
// transaction, the thread's ARG and that ARG's operation_extra; a non-zero
// return value is treated by worker() as an operation failure.
typedef int (*operation_t)(DB_TXN *txn, ARG arg, void* operation_extra);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
36 37 38 39 40 41 42 43 44 45

// Signature of an update callback (the type of env_args.update_function).
// The callback receives the key, the old value (may be NULL, see
// update_op_callback), the caller's extra DBT, and a set_val function it
// calls with the new value.
typedef int (*test_update_callback_f)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra);

/* Kind of operation lock a worker takes around each operation. */
enum stress_lock_type {
    STRESS_LOCK_NONE   = 0, /* run the operation without taking the operation lock */
    STRESS_LOCK_SHARED = 1, /* take the operation lock as a reader */
    STRESS_LOCK_EXCL   = 2  /* take the operation lock as a writer */
};

struct arg {
46 47 48 49 50 51 52 53 54 55 56 57 58 59
    int num_elements; // number of elements per DB
    DB **dbp; // array of DBs
    int num_DBs; // number of DBs
    DB_ENV* env; // environment used
    bool bounded_element_range; // true if elements in dictionary are bounded
                                // by num_elements, that is, all keys in each
                                // DB are in [0, num_elements)
                                // false otherwise
    int sleep_ms; // number of milliseconds to sleep between operations
    u_int32_t txn_type; // isolation level for txn running operation
    operation_t operation; // function that is the operation to be run
    void* operation_extra; // extra parameter passed to operation
    enum stress_lock_type lock_type; // states if operation must be exclusive, shared, or does not require locking
    bool crash_on_operation_failure; // true if we should crash if operation returns non-zero, false otherwise
60
    struct random_data *random_data; // state for random_r
61
    bool single_txn;
62 63
};

64
struct env_args {
65 66 67 68 69
    int node_size;
    int basement_node_size;
    int checkpointing_period;
    int cleaner_period;
    int cleaner_iterations;
70
    u_int64_t cachetable_size;
71
    char *envdir;
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
    test_update_callback_f update_function; // update callback function
};

/*
 * Settings for one stress-test run.  arg_init() copies num_DBs,
 * crash_on_update_failure and single_txn into each thread's struct arg.
 */
struct cli_args {
    int num_elements; // number of elements per DB
    int num_DBs; // number of DBs
    int time_of_test; // how long test should run
    bool only_create; // true if want to only create DBs but not run stress
    bool only_stress; // true if DBs are already created and want to only run stress
    int update_broadcast_period_ms; // specific to test_stress3
    int num_ptquery_threads; // number of threads to run point queries
    bool do_test_and_crash; // true if we should crash after running stress test. For recovery tests.
    bool do_recover; // true if we should run recover
    int num_update_threads; // number of threads running updates
    bool crash_on_update_failure; // becomes arg->crash_on_operation_failure (see arg_init)
    bool print_performance; // true if test_time should print aggregate operation counts
    bool print_thread_performance; // true if test_time should print per-thread operation counts
    int performance_period; // seconds between performance reports in test_time
    u_int32_t update_txn_size; // for clients that do updates, specifies number of updates per txn
    u_int32_t key_size; // number of bytes in keys. Must be at least 4
    u_int32_t val_size; // number of bytes in vals. Must be at least 4
    struct env_args env_args; // specifies environment settings
    bool single_txn; // becomes arg->single_txn (see arg_init)
};

// Null transaction handle; remove_and_recreate_me passes it as the txn
// argument to dbremove() and open().
DB_TXN * const null_txn = 0;

99 100
/*
 * Fill `arg` with the default settings for a worker thread.
 *
 * Values supplied by the caller (num_elements, dbp, env) and by cli_args
 * (num_DBs, crash_on_update_failure, single_txn) are copied in; the
 * remaining fields touched here get fixed defaults.  `operation` and
 * `random_data` are not modified by this function.
 */
static void arg_init(struct arg *arg, int num_elements, DB **dbp, DB_ENV *env, struct cli_args *cli_args) {
    /* what to operate on */
    arg->env = env;
    arg->dbp = dbp;
    arg->num_DBs = cli_args->num_DBs;
    arg->num_elements = num_elements;

    /* fixed defaults */
    arg->bounded_element_range = true;
    arg->sleep_ms = 0;
    arg->txn_type = DB_TXN_SNAPSHOT;
    arg->lock_type = STRESS_LOCK_NONE;
    arg->operation_extra = NULL;

    /* behaviour taken from the test-wide settings */
    arg->crash_on_operation_failure = cli_args->crash_on_update_failure;
    arg->single_txn = cli_args->single_txn;
}

struct worker_extra {
114
    struct arg* thread_arg;
115 116
    toku_pthread_mutex_t *operation_lock_mutex;
    struct rwlock *operation_lock;
117 118
    int64_t num_operations_completed;
    int64_t pad[4]; // pad to 64 bytes
119 120 121 122 123
};

static void lock_worker_op(struct worker_extra* we) {
    ARG arg = we->thread_arg;
    if (arg->lock_type != STRESS_LOCK_NONE) {
124
        if (0) toku_pthread_mutex_lock(we->operation_lock_mutex);
125 126 127 128 129 130 131
        if (arg->lock_type == STRESS_LOCK_SHARED) {
            rwlock_read_lock(we->operation_lock, we->operation_lock_mutex);
        } else if (arg->lock_type == STRESS_LOCK_EXCL) {
            rwlock_write_lock(we->operation_lock, we->operation_lock_mutex);
        } else {
            assert(false);
        }
132
        if (0) toku_pthread_mutex_unlock(we->operation_lock_mutex);
133 134 135 136 137
    }
}

static void unlock_worker_op(struct worker_extra* we) {
    ARG arg = we->thread_arg;
138
    if (arg->lock_type != STRESS_LOCK_NONE) {
139
        if (0) toku_pthread_mutex_lock(we->operation_lock_mutex);
140 141 142 143 144 145 146
        if (arg->lock_type == STRESS_LOCK_SHARED) {
            rwlock_read_unlock(we->operation_lock);
        } else if (arg->lock_type == STRESS_LOCK_EXCL) {
            rwlock_write_unlock(we->operation_lock);
        } else {
            assert(false);
        }
147
        if (0) toku_pthread_mutex_unlock(we->operation_lock_mutex);
148
    }
Zardosht Kasheff's avatar
Zardosht Kasheff committed
149 150 151
}

static void *worker(void *arg_v) {
152
    int r;
153 154
    struct worker_extra* we = arg_v;
    ARG arg = we->thread_arg;
155
    struct random_data random_data;
156
    memset(&random_data, 0, sizeof random_data);
157
    char *random_buf = toku_xmalloc(8);
158
    memset(random_buf, 0, 8);
159 160 161
    r = initstate_r(random(), random_buf, 8, &random_data);
    assert_zero(r);
    arg->random_data = &random_data;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
162 163
    DB_ENV *env = arg->env;
    DB_TXN *txn = NULL;
164
    if (verbose) {
165
        printf("%lu starting %p\n", toku_pthread_self(), arg->operation);
166
    }
167 168 169
    if (arg->single_txn) {
        r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r);
    }
Zardosht Kasheff's avatar
Zardosht Kasheff committed
170
    while (run_test) {
171
        lock_worker_op(we);
172 173 174
        if (!arg->single_txn) {
            r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r);
        }
175
        r = arg->operation(txn, arg, arg->operation_extra);
176
        if (r == 0) {
177 178 179
            if (!arg->single_txn) {
                CHK(txn->commit(txn,0));
            }
180
        } else {
181
            if (arg->crash_on_operation_failure) {
182 183
                CKERR(r);
            } else {
184 185 186
                if (!arg->single_txn) {
                    CHK(txn->abort(txn));
                }
187 188
            }
        }
189 190
        unlock_worker_op(we);
        we->num_operations_completed++;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
191 192 193 194
        if (arg->sleep_ms) {
            usleep(arg->sleep_ms * 1000);
        }
    }
195 196 197
    if (arg->single_txn) {
        CHK(txn->commit(txn, 0));
    }
198 199
    if (verbose)
        printf("%lu returning\n", toku_pthread_self());
200
    toku_free(random_buf);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
201 202 203 204 205 206 207 208 209 210
    return arg;
}

// State shared between scan_op_and_maybe_check_sum and its cursor callback
// scan_cb.
typedef struct scan_cb_extra *SCAN_CB_EXTRA;
struct scan_cb_extra {
    bool fast;            // when true scan_cb returns TOKUDB_CURSOR_CONTINUE instead of 0
    int64_t curr_sum;     // running sum of the leading int of every value seen
    int64_t num_elements; // number of rows seen
};

211 212 213 214 215 216
// Options for a scan operation (the operation_extra of scan_op and
// scan_op_no_check).
struct scan_op_extra {
    bool fast;     // copied into scan_cb_extra.fast
    bool fwd;      // true: scan with c_getf_next; false: scan with c_getf_prev
    bool prefetch; // true: pre-acquire a range lock from -infinity to +infinity before scanning
};

Zardosht Kasheff's avatar
Zardosht Kasheff committed
217 218 219 220 221 222 223 224 225 226 227 228
/*
 * Cursor callback used by the scan operations.  Adds the leading int of the
 * value `b` to the running sum, counts the row, and returns
 * TOKUDB_CURSOR_CONTINUE when the scan is in "fast" mode, 0 otherwise.
 */
static int
scan_cb(const DBT *a, const DBT *b, void *arg_v) {
    SCAN_CB_EXTRA state = arg_v;
    assert(a);
    assert(b);
    assert(state);
    assert(b->size >= sizeof(int));

    state->curr_sum += *(int *)b->data;
    state->num_elements++;

    if (state->fast) {
        return TOKUDB_CURSOR_CONTINUE;
    }
    return 0;
}

229 230 231 232 233 234 235
/*
 * Scan every row of `db` inside `txn` with a cursor, forwards or backwards
 * according to sce->fwd, optionally pre-acquiring a range lock over the
 * whole key space (sce->prefetch).
 *
 * While scanning, scan_cb sums the leading int of every value.  When
 * check_sum is true and that sum is non-zero, the sum and row count are
 * printed and the function asserts.  Returns 0.
 */
static int scan_op_and_maybe_check_sum(
    DB* db, 
    DB_TXN *txn, 
    struct scan_op_extra* sce, 
    bool check_sum
    ) 
{
    int r = 0;
    DBC* cursor = NULL;
    struct scan_cb_extra e = {
        .fast = sce->fast,
        .curr_sum = 0,
        .num_elements = 0,
    };

    CHK(db->cursor(db, txn, &cursor, 0));
    if (sce->prefetch) {
        r = cursor->c_pre_acquire_range_lock(cursor, db->dbt_neg_infty(), db->dbt_pos_infty());
        assert(r == 0);
    }

    // step the cursor until it reports the end of the dictionary
    while (r != DB_NOTFOUND) {
        r = sce->fwd
            ? cursor->c_getf_next(cursor, 0, scan_cb, &e)
            : cursor->c_getf_prev(cursor, 0, scan_cb, &e);
        assert(r==0 || r==DB_NOTFOUND);
    }
    CHK(cursor->c_close(cursor));

    if (r == DB_NOTFOUND) {
        r = 0;
    }
    if (check_sum && e.curr_sum) {
        printf("e.curr_sum: %"PRId64" e.num_elements: %"PRId64" \n", e.curr_sum, e.num_elements);
        assert(false);
    }
    return r;
}

/*
 * Row generator that passes the source row through unchanged: the
 * destination key/value are made to point at the source key/value buffers
 * (no copy) and their flags are cleared.  Always returns 0.
 */
static int generate_row_for_put(
    DB *UU(dest_db), 
    DB *UU(src_db), 
    DBT *dest_key, 
    DBT *dest_val, 
    const DBT *src_key, 
    const DBT *src_val
    ) 
{
    // key: alias the source buffer
    dest_key->flags = 0;
    dest_key->size = src_key->size;
    dest_key->data = src_key->data;

    // value: alias the source buffer
    dest_val->flags = 0;
    dest_val->size = src_val->size;
    dest_val->data = src_val->data;

    return 0;
}

287 288 289 290
// Operation that does nothing and reports success.
static int UU() nop(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra)) {
    return 0;
}

291 292 293 294 295 296 297 298 299 300 301 302 303 304
// Operation that allocates 256 bytes with toku_xmalloc and immediately frees
// them with toku_free.  Always returns 0.
static int UU() xmalloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra)) {
    const size_t alloc_size = 256;
    void *block = toku_xmalloc(alloc_size);
    toku_free(block);
    return 0;
}

// Operation that allocates 256 bytes with malloc and immediately frees them.
// Always returns 0.
static int UU() malloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra)) {
    const size_t alloc_size = 256;
    void *block = malloc(alloc_size);
    free(block);
    return 0;
}

305
/*
 * Operation that exercises the loader twice: once with loader flags 0 and
 * once with LOADER_USE_PUTS.  Each pass creates and opens "loader-db",
 * loads 1000 rows (keys 0..999, random values), closes the loader and the
 * DB, and removes the DB again.  Always returns 0; failures are caught by
 * assert/CKERR.
 */
static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra)) {
    DB_ENV* env = arg->env;
    for (int pass = 0; pass < 2; pass++) {
        int r;
        DB *db_load;
        uint32_t db_flags = 0;
        uint32_t dbt_flags = 0;

        r = db_create(&db_load, env, 0);
        assert(r == 0);
        r = db_load->open(db_load, txn, "loader-db", NULL, DB_BTREE, DB_CREATE, 0666);
        assert(r == 0);

        // first pass: default loader; second pass: loader implemented with puts
        u_int32_t loader_flags = 0;
        if (pass != 0) {
            loader_flags = LOADER_USE_PUTS;
        }
        DB_LOADER *loader;
        r = env->create_loader(env, txn, &loader, db_load, 1, &db_load, &db_flags, &dbt_flags, loader_flags);
        CKERR(r);

        for (int row = 0; row < 1000; row++) {
            int row_key = row;
            int row_val = myrandom_r(arg->random_data);
            DBT key, val;
            dbt_init(&key, &row_key, sizeof(row_key));
            dbt_init(&val, &row_val, sizeof(row_val));
            r = loader->put(loader, &key, &val);
            CKERR(r);
        }

        r = loader->close(loader);
        CKERR(r);
        r = db_load->close(db_load, 0);
        CKERR(r);
        r = env->dbremove(env, txn, "loader-db", NULL, 0);
        CKERR(r);
    }
    return 0;
}

337
/*
 * Operation that calls key_range64 on a randomly chosen DB for a random
 * key (reduced modulo num_elements when the element range is bounded).
 * Asserts that the call succeeds and returns its result.
 */
static int UU() keyrange_op(DB_TXN *txn, ARG arg, void* UU(operation_extra)) {
    DB *db = arg->dbp[myrandom_r(arg->random_data) % arg->num_DBs];

    int probe_key = myrandom_r(arg->random_data);
    if (arg->bounded_element_range) {
        probe_key %= arg->num_elements;
    }

    DBT key;
    dbt_init(&key, &probe_key, sizeof probe_key);

    u_int64_t less, equal, greater;
    int is_exact;
    int r = db->key_range64(db, txn, &key, &less, &equal, &greater, &is_exact);
    assert(r == 0);
    return r;
}
Zardosht Kasheff's avatar
Zardosht Kasheff committed
356

357
/*
 * Operation that runs verify_with_progress on every DB of the thread.
 *
 * Each DB's result is checked with CKERR as soon as it is available, so a
 * failure in any DB is caught, not only one in the last DB.  Returns 0 when
 * there are no DBs (r is initialized), otherwise the last verify result.
 */
static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra)) {
    int r = 0;
    for (int i = 0; i < arg->num_DBs; i++) {
        DB* db = arg->dbp[i];
        r = db->verify_with_progress(db, NULL, NULL, 0, 0);
        CKERR(r);
    }
    return r;
}

367 368 369 370 371 372 373
/*
 * Operation that scans every DB of the thread and checks that the values
 * sum to zero (see scan_op_and_maybe_check_sum).  operation_extra is a
 * struct scan_op_extra*.  Always returns 0.
 */
static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra) {
    struct scan_op_extra *scan_opts = operation_extra;
    int db_idx = 0;
    while (db_idx < arg->num_DBs) {
        int r = scan_op_and_maybe_check_sum(arg->dbp[db_idx], txn, scan_opts, true);
        assert_zero(r);
        db_idx++;
    }
    return 0;
}

376 377 378 379 380 381 382
/*
 * Operation that scans every DB of the thread without checking the sum of
 * the values (see scan_op_and_maybe_check_sum).  operation_extra is a
 * struct scan_op_extra*.  Always returns 0.
 */
static int UU() scan_op_no_check(DB_TXN *txn, ARG arg, void* operation_extra) {
    struct scan_op_extra *scan_opts = operation_extra;
    int db_idx = 0;
    while (db_idx < arg->num_DBs) {
        int r = scan_op_and_maybe_check_sum(arg->dbp[db_idx], txn, scan_opts, false);
        assert_zero(r);
        db_idx++;
    }
    return 0;
}

385
/*
 * Point-query `db` for one random key (reduced modulo num_elements when the
 * element range is bounded).  When `check` is set, asserts that the key was
 * found.  Any other result of the get is ignored; always returns 0.
 */
static int UU() ptquery_and_maybe_check_op(DB* db, DB_TXN *txn, ARG arg, BOOL check) {
    int lookup_key = myrandom_r(arg->random_data);
    if (arg->bounded_element_range) {
        lookup_key %= arg->num_elements;
    }

    DBT key, val;
    memset(&val, 0, sizeof val);
    dbt_init(&key, &lookup_key, sizeof lookup_key);

    int r = db->get(db, txn, &key, &val, 0);
    if (check) {
        assert(r != DB_NOTFOUND);
    }
    r = 0;
    return r;
}

400
// Operation: point-query a random key in a randomly chosen DB and assert
// that the key exists.
static int UU() ptquery_op(DB_TXN *txn, ARG arg, void* UU(operation_extra)) {
    const int chosen = myrandom_r(arg->random_data) % arg->num_DBs;
    return ptquery_and_maybe_check_op(arg->dbp[chosen], txn, arg, TRUE);
}

406
// Operation: point-query a random key in a randomly chosen DB without
// requiring the key to exist.
static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_extra)) {
    const int chosen = myrandom_r(arg->random_data) % arg->num_DBs;
    return ptquery_and_maybe_check_op(arg->dbp[chosen], txn, arg, FALSE);
}

412 413 414 415 416 417 418 419 420
/*
 * Operation that opens a cursor on one DB and closes it again.
 *
 * With more than one DB the DB is picked at random using the thread's own
 * random_r() state (arg->random_data), like every other operation in this
 * file, instead of the process-wide random() generator.  Always returns 0;
 * failures are caught by assert.
 */
static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra)) {
    int db_index = arg->num_DBs > 1 ? myrandom_r(arg->random_data)%arg->num_DBs : 0;
    DB* db = arg->dbp[db_index];
    DBC* cursor = NULL;
    int r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
    r = cursor->c_close(cursor); assert(r == 0);
    return 0;
}

Zardosht Kasheff's avatar
Zardosht Kasheff committed
421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442
#define MAX_RANDOM_VAL 10000

/* Kind of update that update_op_callback applies to a stored value. */
enum update_type {
    UPDATE_ADD_DIFF     = 0, /* new = old + diff */
    UPDATE_NEGATE       = 1, /* new = -old */
    UPDATE_WITH_HISTORY = 2  /* assert old == expected, then store the given new value */
};

// Message passed as the "extra" DBT of db->update / db->update_broadcast and
// interpreted by update_op_callback.
struct update_op_extra {
    enum update_type type; // which update to apply
    int pad_bytes;         // number of zero bytes appended after the int in the new value
    union {
        // used when type == UPDATE_ADD_DIFF
        struct {
            int diff;      // amount added to the old value
        } d;
        // used when type == UPDATE_WITH_HISTORY
        struct {
            int expected;  // value the old value is asserted to equal
            int new;       // value stored as the new value
        } h;
    } u;
};

443 444 445 446 447 448
// operation_extra for update_op, update_op2 and update_with_history_op.
struct update_op_args {
    int *update_history_buffer; // last value written per key; required (asserted non-NULL) by update_with_history_op
    u_int32_t update_txn_size;  // number of updates performed by the loop in each operation
    int update_pad_frequency;   // when non-zero, controls which operations add padding to their values
};

449
/*
 * Build the update_op_args for an update operation from the test settings:
 * the given history buffer, the configured updates-per-transaction, and a
 * pad frequency of num_elements/100.
 */
static struct update_op_args UU() get_update_op_args(struct cli_args* cli_args, int* update_history_buffer) {    
    struct update_op_args result = {
        .update_history_buffer = update_history_buffer,
        .update_txn_size = cli_args->update_txn_size,
        .update_pad_frequency = cli_args->num_elements/100, // arbitrary
    };
    return result;
}

Zardosht Kasheff's avatar
Zardosht Kasheff committed
457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497
// Number of update operations started so far; bumped with a plain ++ by
// update_op, update_op2 and update_with_history_op and used to decide when to
// pad values.
// NOTE(review): the increment is not atomic -- confirm that is acceptable
// when several update threads run concurrently.
static u_int64_t update_count = 0;

/*
 * Update callback shared by the update operations in this file.
 *
 * old_val   previous value of the row, or NULL; its leading int is the old
 *           integer value (0 when old_val is NULL).
 * extra     must hold exactly one struct update_op_extra describing the
 *           update (add a diff, negate, or replace after checking the
 *           expected old value).
 * set_val   called once with the new value: the new int followed by
 *           pad_bytes zero bytes.
 *
 * Always returns 0; an unknown update type or a history mismatch asserts.
 */
static int update_op_callback(DB *UU(db), const DBT *UU(key),
                              const DBT *old_val,
                              const DBT *extra,
                              void (*set_val)(const DBT *new_val,
                                              void *set_extra),
                              void *set_extra)
{
    int old_int_val = 0;
    if (old_val) {
        // memcpy rather than a cast: the stored bytes need not be int-aligned
        memcpy(&old_int_val, old_val->data, sizeof(old_int_val));
    }
    assert(extra->size == sizeof(struct update_op_extra));
    struct update_op_extra *e = extra->data;

    // initialized so the value is defined even if the default-case assert
    // is compiled out
    int new_int_val = 0;
    switch (e->type) {
    case UPDATE_ADD_DIFF:
        new_int_val = old_int_val + e->u.d.diff;
        break;
    case UPDATE_NEGATE:
        new_int_val = -old_int_val;
        break;
    case UPDATE_WITH_HISTORY:
        assert(old_int_val == e->u.h.expected);
        new_int_val = e->u.h.new;
        break;
    default:
        assert(FALSE);
    }

    DBT new_val;
    u_int32_t data_size = sizeof(int) + e->pad_bytes;
    // byte buffer (not an array of pointers) of exactly data_size bytes
    char data[data_size];
    memset(data, 0, data_size);
    memcpy(data, &new_int_val, sizeof(new_int_val));
    set_val(dbt_init(&new_val, data, data_size), set_extra);
    return 0;
}

498
/*
 * Update operation that keeps the sum of all values unchanged by updating
 * keys in pairs: for each of update_txn_size iterations it adds +1 to a
 * random key and -1 to the key (num_elements - that key), in a randomly
 * chosen DB.
 *
 * operation_extra is a struct update_op_args*.  Returns 0 on success
 * (including when update_txn_size is 0) or the first non-zero result of
 * db->update.
 */
static int UU()update_op2(DB_TXN* txn, ARG arg, void* operation_extra) {
    int r = 0; // defined result even when the loop body never runs
    int db_index = myrandom_r(arg->random_data)%arg->num_DBs;
    DB* db = arg->dbp[db_index];
    DBT key, val;
    int rand_key;
    int rand_key2;
    struct update_op_args* op_args = operation_extra;
    update_count++;
    struct update_op_extra extra;
    memset(&extra, 0, sizeof(extra));
    extra.type = UPDATE_ADD_DIFF;
    extra.pad_bytes = 0;
    for (u_int32_t i = 0; i < op_args->update_txn_size; i++) {
        rand_key = myrandom_r(arg->random_data);
        if (arg->bounded_element_range) {
            rand_key = rand_key % (arg->num_elements/2);
        }
        // partner key that receives the compensating update
        rand_key2 = arg->num_elements - rand_key;
        assert(rand_key != rand_key2);
        extra.u.d.diff = 1;
        r = db->update(
            db,
            txn,
            dbt_init(&key, &rand_key, sizeof rand_key),
            dbt_init(&val, &extra, sizeof extra),
            0
            );
        if (r != 0) {
            return r;
        }
        extra.u.d.diff = -1;
        r = db->update(
            db,
            txn,
            dbt_init(&key, &rand_key2, sizeof rand_key2),
            dbt_init(&val, &extra, sizeof extra),
            0
            );
        if (r != 0) {
            return r;
        }
    }
    return r;
}

546
/*
 * Update operation that keeps the sum of all values unchanged: it applies
 * update_txn_size random diffs (every other one negated) to random keys of
 * a randomly chosen DB, then applies one more update whose diff is minus
 * the sum of the diffs applied so far.
 *
 * Depending on update_count and update_pad_frequency the values written by
 * this call carry 100 bytes of padding.  operation_extra is a
 * struct update_op_args*.  Returns 0 on success or the first non-zero
 * result of db->update.
 */
static int UU()update_op(DB_TXN *txn, ARG arg, void* operation_extra) {
    DB *db = arg->dbp[myrandom_r(arg->random_data) % arg->num_DBs];
    struct update_op_args *op_args = operation_extra;
    DBT key, val;
    int r;
    int target_key;
    int running_sum = 0;

    update_count++;

    struct update_op_extra extra;
    memset(&extra, 0, sizeof(extra));
    extra.type = UPDATE_ADD_DIFF;
    extra.pad_bytes = 0;
    if (op_args->update_pad_frequency &&
        update_count % (2*op_args->update_pad_frequency) == update_count%op_args->update_pad_frequency) {
        extra.pad_bytes = 100;
    }

    for (u_int32_t i = 0; i < op_args->update_txn_size; i++) {
        target_key = myrandom_r(arg->random_data);
        if (arg->bounded_element_range) {
            target_key %= arg->num_elements;
        }
        extra.u.d.diff = myrandom_r(arg->random_data) % MAX_RANDOM_VAL;
        // negate the diff on even iterations
        if (i%2 == 0) {
            extra.u.d.diff = -extra.u.d.diff;
        }
        running_sum += extra.u.d.diff;
        r = db->update(
            db,
            txn,
            dbt_init(&key, &target_key, sizeof target_key),
            dbt_init(&val, &extra, sizeof extra),
            0
            );
        if (r != 0) {
            return r;
        }
    }

    // one compensating update so that the overall sum stays 0
    extra.u.d.diff = -running_sum;
    target_key = myrandom_r(arg->random_data);
    if (arg->bounded_element_range) {
        target_key %= arg->num_elements;
    }
    r = db->update(
        db,
        txn,
        dbt_init(&key, &target_key, sizeof target_key),
        dbt_init(&val, &extra, sizeof extra),
        0
        );
    return r;
}

608 609 610 611
/*
 * Update operation that replaces values and lets the update callback check
 * the previous value: for each update it sends the value last recorded for
 * the key in update_history_buffer as "expected" and records the new value
 * in the buffer.
 *
 * It writes update_txn_size random values (every other one negated) to
 * random keys of a randomly chosen DB, then one more value equal to minus
 * the sum of the values written so far.  Requires a bounded element range
 * and a history buffer (both asserted).  Depending on update_count and
 * update_pad_frequency the values carry 500 bytes of padding.  Returns 0 on
 * success or the first non-zero result of db->update.
 */
static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_extra) {
    struct update_op_args* op_args = operation_extra;
    assert(arg->bounded_element_range);
    assert(op_args->update_history_buffer);

    DB *db = arg->dbp[myrandom_r(arg->random_data) % arg->num_DBs];
    int *history = op_args->update_history_buffer;
    DBT key, val;
    int r;
    int target_key;
    int running_sum = 0;

    struct update_op_extra extra;
    memset(&extra, 0, sizeof(extra));
    extra.type = UPDATE_WITH_HISTORY;
    update_count++;
    extra.pad_bytes = 0;
    if (op_args->update_pad_frequency &&
        update_count % (2*op_args->update_pad_frequency) != update_count%op_args->update_pad_frequency) {
        extra.pad_bytes = 500;
    }

    for (u_int32_t i = 0; i < op_args->update_txn_size; i++) {
        target_key = myrandom_r(arg->random_data) % arg->num_elements;
        extra.u.h.new = myrandom_r(arg->random_data) % MAX_RANDOM_VAL;
        // negate the value on even iterations
        if (i%2 == 0) {
            extra.u.h.new = -extra.u.h.new;
        }
        running_sum += extra.u.h.new;
        // remember what the callback should find, then record the new value
        extra.u.h.expected = history[target_key];
        history[target_key] = extra.u.h.new;
        r = db->update(
            db,
            txn,
            dbt_init(&key, &target_key, sizeof target_key),
            dbt_init(&val, &extra, sizeof extra),
            0
            );
        if (r != 0) {
            return r;
        }
    }

    // one compensating value, minus the sum of the values written above
    extra.u.h.new = -running_sum;
    target_key = myrandom_r(arg->random_data);
    if (arg->bounded_element_range) {
        target_key %= arg->num_elements;
    }
    extra.u.h.expected = history[target_key];
    history[target_key] = extra.u.h.new;
    r = db->update(
        db,
        txn,
        dbt_init(&key, &target_key, sizeof target_key),
        dbt_init(&val, &extra, sizeof extra),
        0
        );
    return r;
}

674
/*
 * Operation that broadcasts an UPDATE_NEGATE message (no padding) to a
 * randomly chosen DB.  The result of update_broadcast is checked with CKERR
 * and returned.
 */
static int UU() update_broadcast_op(DB_TXN *txn, ARG arg, void* UU(operation_extra)) {
    struct update_op_extra negate_msg;
    memset(&negate_msg, 0, sizeof(negate_msg));

    DB *db = arg->dbp[myrandom_r(arg->random_data) % arg->num_DBs];

    negate_msg.type = UPDATE_NEGATE;
    negate_msg.pad_bytes = 0;

    DBT val;
    int r = db->update_broadcast(db, txn, dbt_init(&val, &negate_msg, sizeof negate_msg), 0);
    CKERR(r);
    return r;
}

687
/*
 * Operation that runs hot_optimize on every DB of the thread, checking each
 * result with CKERR.  Always returns 0.
 */
static int UU() hot_op(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra)) {
    int db_idx = 0;
    while (db_idx < arg->num_DBs) {
        DB *db = arg->dbp[db_idx];
        int r = db->hot_optimize(db, NULL, NULL);
        CKERR(r);
        db_idx++;
    }
    return 0;
}

697
/*
 * Operation that closes a randomly chosen DB, removes its dictionary
 * ("main<index>") from the environment, and then creates and opens a fresh
 * DB of the same name in the same slot of arg->dbp.  The remove and open
 * use null_txn.  Always returns 0; failures are caught by CKERR/assert.
 */
static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra)) {
    int r;
    const int db_index = myrandom_r(arg->random_data)%arg->num_DBs;
    DB **slot = &arg->dbp[db_index];

    DB *old_db = *slot;
    r = old_db->close(old_db, 0);
    CKERR(r);

    char name[30] = {0};
    snprintf(name, sizeof(name), "main%d", db_index);

    r = arg->env->dbremove(arg->env, null_txn, name, NULL, 0);
    CKERR(r);

    r = db_create(slot, arg->env, 0);
    assert(r == 0);
    r = (*slot)->open(*slot, null_txn, name, NULL, DB_BTREE, DB_CREATE, 0666);
    assert(r == 0);
    return 0;
}

717
/*
 * Operation that truncates every DB of the thread inside `txn`.
 *
 * Each iteration truncates arg->dbp[i], so all num_DBs dictionaries are
 * emptied (not just the first one repeatedly).  Always returns 0; a failed
 * truncate is caught by assert.
 */
static int UU() truncate_me(DB_TXN *txn, ARG UU(arg), void* UU(operation_extra)) {
    int r;
    for ( int i = 0; i < arg->num_DBs; i++) {
        DB *db = arg->dbp[i];
        u_int32_t row_count = 0;
        r = db->truncate(db, txn, &row_count, 0);
        assert(r == 0);
    }
    return 0;
}



Zardosht Kasheff's avatar
Zardosht Kasheff committed
729 730 731
/* Argument block for the test_time() timer thread; filled in by run_workers(). */
struct test_time_extra {
    /* how long the test runs, in seconds; 0 means run indefinitely */
    int num_seconds;
    /* true: call toku_hard_crash_on_purpose() once the time is up */
    bool crash_at_end;
    /* the worker threads' bookkeeping (array of num_wes entries) */
    struct worker_extra *wes;
    int num_wes;
    /* print the aggregate operation counts */
    bool print_performance;
    /* print the per-thread operation counts */
    bool print_thread_performance;
    /* seconds between two performance reports */
    int performance_period;
};

/*
 * Timer thread: lets the workers run for the configured time, periodically
 * reporting how many operations they completed, then clears run_test so the
 * workers stop.
 *
 * arg is a struct test_time_extra*.  A num_seconds of 0 means "run
 * indefinitely" (INT32_MAX seconds).  A performance_period that is not
 * positive is treated as 1 second so the reporting loop always makes
 * progress.  Operation counts are kept and printed as 64-bit values.  When
 * crash_at_end is set the process is crashed on purpose after run_test has
 * been cleared.  Returns arg.
 */
static void *test_time(void *arg) {
    struct test_time_extra* tte = arg;
    int num_seconds = tte->num_seconds;

    //
    // if num_seconds is set to 0, run indefinitely
    //
    if (num_seconds == 0) {
        num_seconds = INT32_MAX;
    }
    if (verbose) {
        printf("Sleeping for %d seconds\n", num_seconds);
    }

    // a non-positive period would never advance the loop below
    int period = tte->performance_period;
    if (period <= 0) {
        period = 1;
    }

    int64_t num_operations_completed_total[tte->num_wes];
    memset(num_operations_completed_total, 0, sizeof num_operations_completed_total);

    int elapsed = 0;
    while (elapsed < num_seconds) {
        // sleep() takes whole seconds: no microsecond multiplication that
        // could overflow or exceed what usleep() accepts
        sleep(period);
        int64_t total_operations_in_period = 0;
        for (int we = 0; we < tte->num_wes; ++we) {
            int64_t last = num_operations_completed_total[we];
            int64_t current = __sync_fetch_and_add(&tte->wes[we].num_operations_completed, 0);
            if (tte->print_thread_performance) {
                printf("Thread %d Iteration %d Operations %"PRId64"\n", we, elapsed, current - last);
            }
            total_operations_in_period += (current - last);
            num_operations_completed_total[we] = current;
        }
        if (tte->print_performance) {
            printf("Iteration %d Total_Operations %"PRId64"\n", elapsed, total_operations_in_period);
        }
        // stop before `elapsed + period` could reach the limit or overflow int
        if (period >= num_seconds - elapsed) {
            break;
        }
        elapsed += period;
    }

    int64_t total_operations_in_test = 0;
    for (int we = 0; we < tte->num_wes; ++we) {
        int64_t current = __sync_fetch_and_add(&tte->wes[we].num_operations_completed, 0);
        if (tte->print_thread_performance) {
            printf("TOTAL Thread %d Operations %"PRId64"\n", we, current);
        }
        total_operations_in_test += current;
    }
    if (tte->print_performance) {
        printf("Total_Operations %"PRId64"\n", total_operations_in_test);
    }

    if (verbose) {
        printf("should now end test\n");
    }
    __sync_bool_compare_and_swap(&run_test, true, false); // make this atomic to make valgrind --tool=drd happy.
    if (verbose) {
        printf("run_test %d\n", run_test);
    }
    if (tte->crash_at_end) {
        toku_hard_crash_on_purpose();
    }
    return arg;
}

795 796 797 798 799 800 801 802
/*
 * Run one stress-test round: start num_threads worker threads (each running
 * `worker` on its own entry of thread_args) plus one timer thread running
 * `test_time`, then wait for all of them to finish.
 *
 * thread_args   array of num_threads per-thread arguments
 * num_threads   number of worker threads to start
 * num_seconds   test duration handed to the timer thread
 * crash_at_end  handed to the timer thread, which crashes the process on
 *               purpose at the end of the test when this is set
 * cli_args      source of the performance-reporting settings
 *
 * Every thread creation and join is checked, so a failure aborts the test.
 * Returns the result of the last join, which is 0 whenever the function
 * returns at all.
 */
static int run_workers(
    struct arg *thread_args,
    int num_threads,
    u_int32_t num_seconds,
    bool crash_at_end,
    struct cli_args* cli_args
    )
{
    int r;
    toku_pthread_mutex_t mutex;
    toku_pthread_mutex_init(&mutex, NULL);
    struct rwlock rwlock;
    rwlock_init(&rwlock);
    toku_pthread_t tids[num_threads];
    toku_pthread_t time_tid;

    // allocate worker_extra's on cache line boundaries
    struct worker_extra *worker_extra = (struct worker_extra *)
        memalign(64, num_threads * sizeof (struct worker_extra));
    // Every slot is written below; fail loudly here instead of dereferencing
    // NULL when the allocation fails.
    assert(worker_extra != NULL || num_threads == 0);

    // The timer thread reads the per-worker counters through tte.wes.
    struct test_time_extra tte;
    tte.num_seconds = num_seconds;
    tte.crash_at_end = crash_at_end;
    tte.wes = worker_extra;
    tte.num_wes = num_threads;
    tte.print_performance = cli_args->print_performance;
    tte.print_thread_performance = cli_args->print_thread_performance;
    tte.performance_period = cli_args->performance_period;

    // Must be set before any thread starts; test_time clears it at the end.
    run_test = true;
    for (int i = 0; i < num_threads; ++i) {
        worker_extra[i].thread_arg = &thread_args[i];
        worker_extra[i].operation_lock = &rwlock;
        worker_extra[i].operation_lock_mutex = &mutex;
        worker_extra[i].num_operations_completed = 0;
        CHK(toku_pthread_create(&tids[i], NULL, worker, &worker_extra[i]));
        if (verbose) {
            printf("%lu created\n", tids[i]);
        }
    }
    CHK(toku_pthread_create(&time_tid, NULL, test_time, &tte));
    if (verbose) {
        printf("%lu created\n", time_tid);
    }

    // Join the timer first, then the workers.
    void *ret;
    r = toku_pthread_join(time_tid, &ret); assert_zero(r);
    if (verbose) {
        printf("%lu joined\n", time_tid);
    }
    for (int i = 0; i < num_threads; ++i) {
        r = toku_pthread_join(tids[i], &ret); assert_zero(r);
        if (verbose) {
            printf("%lu joined\n", tids[i]);
        }
    }
    if (verbose) {
        printf("ending test, pthreads have joined\n");
    }

    rwlock_destroy(&rwlock);
    toku_pthread_mutex_destroy(&mutex);
    toku_free(worker_extra);
    return r;
}


851
static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
Zardosht Kasheff's avatar
Zardosht Kasheff committed
852
                        int (*bt_compare)(DB *, const DBT *, const DBT *),
853 854
                        struct env_args env_args
) {
Zardosht Kasheff's avatar
Zardosht Kasheff committed
855 856
    int r;

857
    char rmcmd[32 + strlen(env_args.envdir)]; sprintf(rmcmd, "rm -rf %s", env_args.envdir);
858
    r = system(rmcmd);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
859
    CKERR(r);
860
    r = toku_os_mkdir(env_args.envdir, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
861 862 863

    DB_ENV *env;
    r = db_env_create(&env, 0); assert(r == 0);
864
    r = env->set_redzone(env, 0); CKERR(r);
865
    r = env->set_default_bt_compare(env, bt_compare); CKERR(r);
866
    r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
867
    r = env->set_generate_row_callback_for_put(env, generate_row_for_put); CKERR(r);
868 869 870 871 872
    r = env->open(env, env_args.envdir, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
    r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r);
    r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r);
    r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r);
    *env_res = env;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
873 874


875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891
    for (int i = 0; i < num_DBs; i++) {
        DB *db;
        char name[30];
        memset(name, 0, sizeof(name));
        snprintf(name, sizeof(name), "main%d", i);
        r = db_create(&db, env, 0);
        CKERR(r);
        r = db->set_flags(db, 0);
        CKERR(r);
        r = db->set_pagesize(db, env_args.node_size);
        CKERR(r);
        r = db->set_readpagesize(db, env_args.basement_node_size);
        CKERR(r);
        r = db->open(db, null_txn, name, NULL, DB_BTREE, DB_CREATE, 0666);
        CKERR(r);
        db_res[i] = db;
    }
Zardosht Kasheff's avatar
Zardosht Kasheff committed
892 893 894
    return r;
}

895
/*
 * Insert num_elements rows into db, asking `callback` for the contents of
 * each row.
 *
 * For every index in [0, num_elements) the callback is handed zeroed key and
 * value buffers of key_bufsz and val_bufsz bytes and reports how many bytes it
 * used through keysz/valsz. The reported sizes are only checked against the
 * buffer sizes: the row that gets stored always has a key of key_bufsz bytes
 * and a value of val_bufsz bytes (whatever the callback wrote, followed by
 * zero padding).
 *
 * A failed put aborts via assert. Returns the result of the last put, or 0
 * when num_elements is not positive.
 */
static int fill_table_from_fun(DB *db, int num_elements, int key_bufsz, int val_bufsz,
                               void (*callback)(int idx, void *extra,
                                                void *key, int *keysz,
                                                void *val, int *valsz),
                               void *extra) {
    int put_result = 0;
    for (long idx = 0; idx < num_elements; ++idx) {
        // fresh, zero-filled buffers for each row
        char keybuf[key_bufsz];
        char valbuf[val_bufsz];
        memset(keybuf, 0, sizeof(keybuf));
        memset(valbuf, 0, sizeof(valbuf));

        int used_key_bytes;
        int used_val_bytes;
        callback(idx, extra, keybuf, &used_key_bytes, valbuf, &used_val_bytes);
        // the callback must not have produced more than the buffers can hold
        assert(used_key_bytes <= key_bufsz);
        assert(used_val_bytes <= val_bufsz);

        // Store the whole buffers: the sizes passed to this function are
        // used, not the sizes the callback reported.
        DBT key_dbt;
        DBT val_dbt;
        put_result = db->put(db,
                             null_txn,
                             dbt_init(&key_dbt, keybuf, key_bufsz),
                             dbt_init(&val_dbt, valbuf, val_bufsz),
                             0);
        assert(put_result == 0);
    }
    return put_result;
}

/*
 * Row generator for fill_table_from_fun: the key starts with the int `idx`
 * and the value starts with the int 0. Both reported sizes are sizeof(int).
 *
 * keyv and valv must each point to at least sizeof(int) writable bytes.
 * The ints are stored with memcpy rather than through an int* cast because
 * the buffers handed in are plain char arrays with no int alignment guarantee.
 */
static void zero_element_callback(int idx, void *UU(extra), void *keyv, int *keysz, void *valv, int *valsz) {
    const int zero = 0;
    memcpy(keyv, &idx, sizeof idx);
    memcpy(valv, &zero, sizeof zero);
    *keysz = sizeof(int);
    *valsz = sizeof(int);
}

935 936 937 938 939 940 941 942 943 944 945 946 947
/*
 * Populate each of the num_DBs dictionaries in dbs with num_elements rows
 * produced by zero_element_callback, using keys of key_size bytes and values
 * of val_size bytes. Both sizes must be at least sizeof(int).
 *
 * Failures abort via assert/CKERR; returns 0.
 */
static int fill_tables_with_zeroes(DB **dbs, int num_DBs, int num_elements, u_int32_t key_size, u_int32_t val_size) {
    int db_idx = 0;
    while (db_idx < num_DBs) {
        // each row must have room for the int the callback stores
        assert(key_size >= sizeof(int));
        assert(val_size >= sizeof(int));

        DB *target = dbs[db_idx];
        int fill_result = fill_table_from_fun(target, num_elements, key_size, val_size,
                                              zero_element_callback, NULL);
        CKERR(fill_result);
        db_idx++;
    }
    return 0;
}

952
static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
Zardosht Kasheff's avatar
Zardosht Kasheff committed
953
                      int (*bt_compare)(DB *, const DBT *, const DBT *),
954
                      struct env_args env_args) {
Zardosht Kasheff's avatar
Zardosht Kasheff committed
955 956 957 958 959
    int r;

    /* create the dup database file */
    DB_ENV *env;
    r = db_env_create(&env, 0); assert(r == 0);
960
    r = env->set_redzone(env, 0); CKERR(r);
961
    r = env->set_default_bt_compare(env, bt_compare); CKERR(r);
962
    env->set_update(env, env_args.update_function);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
963
    // set the cache size to 10MB
964
    r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
965
    r = env->set_generate_row_callback_for_put(env, generate_row_for_put); CKERR(r);
966 967 968 969
    r = env->open(env, env_args.envdir, DB_RECOVER|DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
    r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r);
    r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r);
    r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
970
    *env_res = env;
971 972 973 974 975 976 977 978 979 980 981 982 983

    
    for (int i = 0; i < num_DBs; i++) {
        DB *db;
        char name[30];
        memset(name, 0, sizeof(name));
        snprintf(name, sizeof(name), "main%d", i);
        r = db_create(&db, env, 0);
        CKERR(r);
        r = db->open(db, null_txn, name, NULL, DB_BTREE, 0, 0666);
        CKERR(r);
        db_res[i] = db;
    }
Zardosht Kasheff's avatar
Zardosht Kasheff committed
984 985 986
    return r;
}

987
/*
 * Close the num_DBs dictionaries in dbs, then the environment env.
 * Each close is checked with CKERR; returns the result of closing env.
 */
static int close_tables(DB_ENV *env, DB**  dbs, int num_DBs) {
    int rc;
    // the dictionaries are closed before the environment
    for (int db_idx = 0; db_idx < num_DBs; db_idx++) {
        DB *db = dbs[db_idx];
        rc = db->close(db, 0);
        CKERR(rc);
    }
    rc = env->close(env, 0);
    CKERR(rc);
    return rc;
}

996
static const struct env_args DEFAULT_ENV_ARGS = {
Zardosht Kasheff's avatar
Zardosht Kasheff committed
997 998 999
    .node_size = 4096,
    .basement_node_size = 1024,
    .checkpointing_period = 10,
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1000 1001
    .cleaner_period = 1,
    .cleaner_iterations = 1,
1002
    .cachetable_size = 300000,
1003
    .envdir = ENVDIR,
1004
    .update_function = update_op_callback,
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1005 1006
};

1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017
// Environment settings used by get_default_args_for_perf(): 4MB nodes,
// 128KB basement nodes, a 1GB cachetable and no update callback.
static const struct env_args DEFAULT_PERF_ENV_ARGS = {
    // location and callbacks
    .envdir = ENVDIR,
    .update_function = NULL,

    // sizes, in bytes
    .node_size = 4*1024*1024,
    .basement_node_size = 128*1024,
    .cachetable_size = 1<<30,

    // background work
    .checkpointing_period = 60,
    .cleaner_period = 1,
    .cleaner_iterations = 5,
};

1018 1019
// Smallest key and value sizes accepted by the helpers in this file:
// zero_element_callback stores an int at the start of every key and value,
// and stress_int_dbt_cmp reads an int from the start of every key.
// Both macros have type size_t.
#define MIN_VAL_SIZE sizeof(int)
#define MIN_KEY_SIZE sizeof(int)
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1020
/*
 * Returns the command-line defaults for a stress test: one DB of 150000
 * elements, a 180-second run, one point-query thread and one update thread,
 * minimum-size keys and values, and DEFAULT_ENV_ARGS for the environment.
 */
static struct cli_args UU() get_default_args(void) {
    struct cli_args defaults = {
        // data set
        .num_DBs = 1,
        .num_elements = 150000,
        .key_size = MIN_KEY_SIZE,
        .val_size = MIN_VAL_SIZE,

        // what to run, and for how long (seconds)
        .time_of_test = 180,
        .only_create = false,
        .only_stress = false,
        .do_test_and_crash = false,
        .do_recover = false,
        .single_txn = false,

        // worker configuration
        .num_ptquery_threads = 1,
        .num_update_threads = 1,
        .update_txn_size = 1000,
        .update_broadcast_period_ms = 2000,
        .crash_on_update_failure = true,

        // performance reporting
        .print_performance = false,
        .print_thread_performance = false,
        .performance_period = 1,

        // environment
        .env_args = DEFAULT_ENV_ARGS,
    };
    return defaults;
}

Zardosht Kasheff's avatar
Zardosht Kasheff committed
1045
/*
 * Returns the defaults for performance runs: the values of get_default_args()
 * with one million elements, performance printing switched on, and
 * DEFAULT_PERF_ENV_ARGS for the environment.
 */
static struct cli_args UU() get_default_args_for_perf(void) {
    struct cli_args perf_defaults = get_default_args();
    perf_defaults.env_args = DEFAULT_PERF_ENV_ARGS;
    perf_defaults.print_performance = true;
    perf_defaults.num_elements = 1000000; // 1M elements
    return perf_defaults;
}

Zardosht Kasheff's avatar
Zardosht Kasheff committed
1053
static inline void parse_stress_test_args (int argc, char *const argv[], struct cli_args *args) {
1054
    struct cli_args default_args = *args;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1055 1056 1057
    const char *argv0=argv[0];
    while (argc>1) {
        int resultcode=0;
1058
        if (strcmp(argv[1], "-v") == 0 || strcmp(argv[1], "--verbose") == 0) {
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1059 1060 1061 1062 1063 1064 1065 1066 1067
            verbose++;
        } 
        else if (strcmp(argv[1], "-q")==0) {
            verbose=0;
        } 
        else if (strcmp(argv[1], "-h")==0) {
        do_usage:
            fprintf(stderr, "Usage:\n%s [-h|-v|-q] [OPTIONS] [--only_create|--only_stress]\n", argv0);
            fprintf(stderr, "OPTIONS are among:\n");
1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083
            fprintf(stderr, "\t--num_elements                  INT (default %d)\n", default_args.num_elements);
            fprintf(stderr, "\t--num_DBs                       INT (default %d)\n", default_args.num_DBs);
            fprintf(stderr, "\t--num_seconds                   INT (default %ds)\n", default_args.time_of_test);
            fprintf(stderr, "\t--node_size                     INT (default %d bytes)\n", default_args.env_args.node_size);
            fprintf(stderr, "\t--basement_node_size            INT (default %d bytes)\n", default_args.env_args.basement_node_size);
            fprintf(stderr, "\t--cachetable_size               INT (default %ld bytes)\n", default_args.env_args.cachetable_size);
            fprintf(stderr, "\t--checkpointing_period          INT (default %ds)\n",      default_args.env_args.checkpointing_period);
            fprintf(stderr, "\t--cleaner_period                INT (default %ds)\n",      default_args.env_args.cleaner_period);
            fprintf(stderr, "\t--cleaner_iterations            INT (default %ds)\n",      default_args.env_args.cleaner_iterations);
            fprintf(stderr, "\t--update_broadcast_period       INT (default %dms)\n",     default_args.update_broadcast_period_ms);
            fprintf(stderr, "\t--num_ptquery_threads           INT (default %d threads)\n", default_args.num_ptquery_threads);
            fprintf(stderr, "\t--num_update_threads            INT (default %d threads)\n", default_args.num_update_threads);
            fprintf(stderr, "\t--update_txn_size               INT (default %d rows)\n", default_args.update_txn_size);
            fprintf(stderr, "\t--key_size                      INT (default %d, minimum %ld)\n", default_args.key_size, MIN_KEY_SIZE);
            fprintf(stderr, "\t--val_size                      INT (default %d, minimum %ld)\n", default_args.val_size, MIN_VAL_SIZE);
            fprintf(stderr, "\t--[no-]crash_on_update_failure  BOOL (default %s)\n", default_args.crash_on_update_failure ? "yes" : "no");
1084 1085
            fprintf(stderr, "\t--print_performance             \n");
            fprintf(stderr, "\t--print_thread_performance      \n");
1086
            fprintf(stderr, "\t--performance_period            INT (default %d)\n", default_args.performance_period);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1087 1088 1089
            exit(resultcode);
        }
        else if (strcmp(argv[1], "--num_elements") == 0) {
1090
            argc--; argv++;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1091 1092
            args->num_elements = atoi(argv[1]);
        }
1093 1094 1095 1096
        else if (strcmp(argv[1], "--num_DBs") == 0) {
            argc--; argv++;
            args->num_DBs = atoi(argv[1]);
        }
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1097
        else if (strcmp(argv[1], "--num_seconds") == 0) {
1098
            argc--; argv++;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1099 1100 1101
            args->time_of_test = atoi(argv[1]);
        }
        else if (strcmp(argv[1], "--node_size") == 0) {
1102
            argc--; argv++;
1103
            args->env_args.node_size = atoi(argv[1]);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1104 1105
        }
        else if (strcmp(argv[1], "--basement_node_size") == 0) {
1106
            argc--; argv++;
1107
            args->env_args.basement_node_size = atoi(argv[1]);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1108 1109
        }
        else if (strcmp(argv[1], "--cachetable_size") == 0) {
1110
            argc--; argv++;
1111
            args->env_args.cachetable_size = strtoll(argv[1], NULL, 0);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1112 1113
        }
        else if (strcmp(argv[1], "--checkpointing_period") == 0) {
1114
            argc--; argv++;
1115
            args->env_args.checkpointing_period = atoi(argv[1]);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1116
        }
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1117 1118
        else if (strcmp(argv[1], "--cleaner_period") == 0) {
            argc--; argv++;
1119
            args->env_args.cleaner_period = atoi(argv[1]);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1120 1121 1122
        }
        else if (strcmp(argv[1], "--cleaner_iterations") == 0) {
            argc--; argv++;
1123
            args->env_args.cleaner_iterations = atoi(argv[1]);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1124
        }
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1125
        else if (strcmp(argv[1], "--update_broadcast_period") == 0) {
1126
            argc--; argv++;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1127 1128
            args->update_broadcast_period_ms = atoi(argv[1]);
        }
1129
        else if (strcmp(argv[1], "--num_ptquery_threads") == 0 || strcmp(argv[1], "--num_threads") == 0) {
1130
            argc--; argv++;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1131 1132
            args->num_ptquery_threads = atoi(argv[1]);
        }
1133 1134 1135 1136 1137 1138 1139 1140 1141 1142
        else if (strcmp(argv[1], "--num_update_threads") == 0) {
            argc--; argv++;
            args->num_update_threads = atoi(argv[1]);
        }
        else if (strcmp(argv[1], "--crash_on_update_failure") == 0) {
            args->crash_on_update_failure = true;
        }
        else if (strcmp(argv[1], "--no-crash_on_update_failure") == 0) {
            args->crash_on_update_failure = false;
        }
1143 1144 1145 1146 1147 1148 1149 1150 1151 1152
        else if (strcmp(argv[1], "--print_performance") == 0) {
            args->print_performance = true;
        }
        else if (strcmp(argv[1], "--print_thread_performance") == 0) {
            args->print_thread_performance = true;
        }
        else if (strcmp(argv[1], "--performance_period") == 0) {
            argc--; argv++;
            args->performance_period = atoi(argv[1]);
        }
1153 1154 1155 1156
        else if (strcmp(argv[1], "--update_txn_size") == 0) {
            argc--; argv++;
            args->update_txn_size = atoi(argv[1]);
        }
1157 1158 1159 1160 1161 1162 1163 1164 1165 1166
        else if (strcmp(argv[1], "--key_size") == 0) {
            argc--; argv++;
            args->key_size = atoi(argv[1]);
            assert(args->key_size >= MIN_KEY_SIZE);
        }
        else if (strcmp(argv[1], "--val_size") == 0) {
            argc--; argv++;
            args->val_size = atoi(argv[1]);
            assert(args->val_size >= MIN_VAL_SIZE);
        }
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178
        else if (strcmp(argv[1], "--only_create") == 0) {
            args->only_create = true;
        }
        else if (strcmp(argv[1], "--only_stress") == 0) {
            args->only_stress = true;
        }
        else if (strcmp(argv[1], "--test") == 0) {
            args->do_test_and_crash = true;
        }
        else if (strcmp(argv[1], "--recover") == 0) {
            args->do_recover = true;
        }
1179 1180
        else if (strcmp(argv[1], "--envdir") == 0 && argc > 1) {
            argc--; argv++;
1181
            args->env_args.envdir = argv[1];
1182
        }
1183 1184 1185
        else if (strcmp(argv[1], "--single_txn") == 0) {
            args->single_txn = true;
        }
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200
        else {
            resultcode=1;
            goto do_usage;
        }
        argc--;
        argv++;
    }
    if (args->only_create && args->only_stress) {
        goto do_usage;
    }
}

// Forward declaration of the routine that stress_test_main runs between
// open_tables and close_tables. No definition is visible in this part of the
// file -- presumably each test that includes this header supplies one
// (TODO confirm).
static void
stress_table(DB_ENV *, DB **, struct cli_args *);

1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215
/*
 * Key comparison function: orders two keys by the int stored in their first
 * sizeof(int) bytes. Any bytes after that are ignored.
 *
 * db, a and b must be non-NULL, and both keys must be at least sizeof(int)
 * bytes long (checked with assert).
 *
 * Returns -1, 0 or 1 when a's int is less than, equal to or greater than b's.
 */
static int
stress_int_dbt_cmp (DB *db, const DBT *a, const DBT *b) {
    assert(db && a && b);
    assert(a->size >= sizeof(int));
    assert(b->size >= sizeof(int));

    // Copy the ints out instead of dereferencing a cast pointer: nothing
    // here guarantees that the key data is aligned for int access.
    int x;
    int y;
    memcpy(&x, a->data, sizeof x);
    memcpy(&y, b->data, sizeof y);

    if (x<y) return -1;
    if (x>y) return 1;
    return 0;
}


Zardosht Kasheff's avatar
Zardosht Kasheff committed
1216 1217 1218 1219
static void
stress_test_main(struct cli_args *args)
{
    DB_ENV* env = NULL;
1220 1221
    DB* dbs[args->num_DBs];
    memset(dbs, 0, sizeof(dbs));
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1222
    if (!args->only_stress) {
1223
        create_tables(
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1224
            &env,
1225 1226 1227 1228 1229 1230 1231
            dbs,
            args->num_DBs,
            stress_int_dbt_cmp,
            args->env_args
            );
        CHK(fill_tables_with_zeroes(dbs, args->num_DBs, args->num_elements, args->key_size, args->val_size));
        CHK(close_tables(env, dbs, args->num_DBs));
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1232 1233
    }
    if (!args->only_create) {
1234 1235 1236 1237 1238 1239 1240
        CHK(open_tables(&env,
                       dbs,
                       args->num_DBs,
                       stress_int_dbt_cmp,
                       args->env_args));
        stress_table(env, dbs, args);
        CHK(close_tables(env, dbs, args->num_DBs));
Zardosht Kasheff's avatar
Zardosht Kasheff committed
1241 1242 1243 1244 1245 1246
    }
}

/*
 * Reopen the environment described by args (open_tables opens it with
 * DB_RECOVER), run one fast forward scan without prefetching inside a single
 * transaction, commit, and close everything.
 *
 * Every step is checked with CHK/CKERR.
 */
static void
UU() stress_recover(struct cli_args *args) {
    DB_ENV* recovered_env = NULL;
    DB* recovered_dbs[args->num_DBs];
    memset(recovered_dbs, 0, sizeof(recovered_dbs));
    CHK(open_tables(&recovered_env, recovered_dbs, args->num_DBs, stress_int_dbt_cmp, args->env_args));

    // scan settings: fast, forward, no prefetching
    struct scan_op_extra scan_settings;
    scan_settings.prefetch = FALSE;
    scan_settings.fwd = TRUE;
    scan_settings.fast = TRUE;

    // the scan runs in a transaction of the type chosen by arg_init
    struct arg recover_args;
    arg_init(&recover_args, args->num_elements, recovered_dbs, recovered_env, args);
    DB_TXN* scan_txn = NULL;
    int rc = recovered_env->txn_begin(recovered_env, 0, &scan_txn, recover_args.txn_type);
    CKERR(rc);

    rc = scan_op(scan_txn, &recover_args, &scan_settings);
    CKERR(rc);

    CHK(scan_txn->commit(scan_txn,0));
    CHK(close_tables(recovered_env, recovered_dbs, args->num_DBs));
}

#endif