/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc.  All rights reserved."
#ident "$Id: test_stress1.c 39258 2012-01-27 13:51:58Z zardosht $"
#include "test.h"

#include <stdio.h>
#include <stdlib.h>

#include <toku_pthread.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>

#include "threaded_stress_test_helpers.h"

// Most recently recorded totals of engine-status counters. Both checkpoint
// callbacks in this file overwrite them with freshly summed values;
// checkpoint_callback_1 additionally prints the growth since the previous
// recorded totals.
u_int64_t num_basements_decompressed;  // sum of the FT_NUM_BASEMENTS_DECOMPRESSED_* values
u_int64_t num_buffers_decompressed;    // sum of the FT_NUM_MSG_BUFFER_DECOMPRESSED_* values
u_int64_t num_basements_fetched;       // sum of the FT_NUM_BASEMENTS_FETCHED_* values
u_int64_t num_buffers_fetched;         // sum of the FT_NUM_MSG_BUFFER_FETCHED_* values
u_int64_t num_pivots_fetched;          // sum of the FT_NUM_PIVOTS_FETCHED_* values

static void checkpoint_callback_1(void * extra) {
    DB_ENV* env = extra;
    u_int64_t old_num_basements_decompressed = num_basements_decompressed;
    u_int64_t old_num_buffers_decompressed = num_buffers_decompressed;
    u_int64_t old_num_basements_fetched = num_basements_fetched;
    u_int64_t old_num_buffers_fetched = num_buffers_fetched;
    u_int64_t old_num_pivots_fetched = num_pivots_fetched;

    num_basements_decompressed = 
        get_engine_status_val(env, "FT_NUM_BASEMENTS_DECOMPRESSED_NORMAL") +
        get_engine_status_val(env, "FT_NUM_BASEMENTS_DECOMPRESSED_AGGRESSIVE") +
        get_engine_status_val(env, "FT_NUM_BASEMENTS_DECOMPRESSED_PREFETCH") +
        get_engine_status_val(env, "FT_NUM_BASEMENTS_DECOMPRESSED_WRITE");
        
    num_buffers_decompressed = 
        get_engine_status_val(env, "FT_NUM_MSG_BUFFER_DECOMPRESSED_NORMAL") +
        get_engine_status_val(env, "FT_NUM_MSG_BUFFER_DECOMPRESSED_AGGRESSIVE") +
        get_engine_status_val(env, "FT_NUM_MSG_BUFFER_DECOMPRESSED_PREFETCH") +
        get_engine_status_val(env, "FT_NUM_MSG_BUFFER_DECOMPRESSED_WRITE");

    num_basements_fetched = 
        get_engine_status_val(env, "FT_NUM_BASEMENTS_FETCHED_NORMAL") +
        get_engine_status_val(env, "FT_NUM_BASEMENTS_FETCHED_AGGRESSIVE") +
        get_engine_status_val(env, "FT_NUM_BASEMENTS_FETCHED_PREFETCH") +
        get_engine_status_val(env, "FT_NUM_BASEMENTS_FETCHED_WRITE");

    num_buffers_fetched = 
        get_engine_status_val(env, "FT_NUM_MSG_BUFFER_FETCHED_NORMAL") +
        get_engine_status_val(env, "FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE") +
        get_engine_status_val(env, "FT_NUM_MSG_BUFFER_FETCHED_PREFETCH") +
        get_engine_status_val(env, "FT_NUM_MSG_BUFFER_FETCHED_WRITE");

    num_pivots_fetched = 
        get_engine_status_val(env, "FT_NUM_PIVOTS_FETCHED_QUERY") +
        get_engine_status_val(env, "FT_NUM_PIVOTS_FETCHED_PREFETCH") +
        get_engine_status_val(env, "FT_NUM_PIVOTS_FETCHED_WRITE");
        
    printf("basements decompressed %"PRIu64" \n", num_basements_decompressed - old_num_basements_decompressed);
    printf("buffers   decompressed %"PRIu64" \n", num_buffers_decompressed- old_num_buffers_decompressed);
    printf("basements fetched      %"PRIu64" \n", num_basements_fetched - old_num_basements_fetched);
    printf("buffers fetched        %"PRIu64" \n", num_buffers_fetched - old_num_buffers_fetched);
    printf("pivots fetched         %"PRIu64" \n", num_pivots_fetched - old_num_pivots_fetched);
    printf("************************************************************\n");
}

static void checkpoint_callback_2(void * extra) {
    DB_ENV* env = extra;
    num_basements_decompressed = 
        get_engine_status_val(env, "FT_NUM_BASEMENTS_DECOMPRESSED_NORMAL") +
        get_engine_status_val(env, "FT_NUM_BASEMENTS_DECOMPRESSED_AGGRESSIVE") +
        get_engine_status_val(env, "FT_NUM_BASEMENTS_DECOMPRESSED_PREFETCH") +
        get_engine_status_val(env, "FT_NUM_BASEMENTS_DECOMPRESSED_WRITE");
        
    num_buffers_decompressed = 
        get_engine_status_val(env, "FT_NUM_MSG_BUFFER_DECOMPRESSED_NORMAL") +
        get_engine_status_val(env, "FT_NUM_MSG_BUFFER_DECOMPRESSED_AGGRESSIVE") +
        get_engine_status_val(env, "FT_NUM_MSG_BUFFER_DECOMPRESSED_PREFETCH") +
        get_engine_status_val(env, "FT_NUM_MSG_BUFFER_DECOMPRESSED_WRITE");
    
    num_basements_fetched = 
        get_engine_status_val(env, "FT_NUM_BASEMENTS_FETCHED_NORMAL") +
        get_engine_status_val(env, "FT_NUM_BASEMENTS_FETCHED_AGGRESSIVE") +
        get_engine_status_val(env, "FT_NUM_BASEMENTS_FETCHED_PREFETCH") +
        get_engine_status_val(env, "FT_NUM_BASEMENTS_FETCHED_WRITE");
    
    num_buffers_fetched = 
        get_engine_status_val(env, "FT_NUM_MSG_BUFFER_FETCHED_NORMAL") +
        get_engine_status_val(env, "FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE") +
        get_engine_status_val(env, "FT_NUM_MSG_BUFFER_FETCHED_PREFETCH") +
        get_engine_status_val(env, "FT_NUM_MSG_BUFFER_FETCHED_WRITE");
    
    num_pivots_fetched = 
        get_engine_status_val(env, "FT_NUM_PIVOTS_FETCHED_QUERY") +
        get_engine_status_val(env, "FT_NUM_PIVOTS_FETCHED_PREFETCH") +
        get_engine_status_val(env, "FT_NUM_PIVOTS_FETCHED_WRITE");
}



//
// This test is a form of stress that does operations on a single dictionary:
// We create a dictionary bigger than the cachetable (around 4x greater).
// Then, we spawn a bunch of pthreads that do the following:
//  - scan dictionary forward with bulk fetch
//  - scan dictionary forward slowly
//  - scan dictionary backward with bulk fetch
//  - scan dictionary backward slowly
//  - Grow the dictionary with insertions
//  - do random point queries into the dictionary
// With the small cachetable, this should produce quite a bit of churn in reading in and evicting nodes.
// If the test runs to completion without crashing, we consider it a success. It also tests that snapshots
// work correctly by verifying that table scans sum their vals to 0.
//
// This does NOT test:
//  - splits and merges
//  - multiple DBs
//
// Variables that are interesting to tweak and run:
//  - small cachetable
//  - number of elements
//
// NOTE(review): the description above appears to be inherited from test_stress1.c
// and does not match the code in this file. Every worker started by stress_table
// runs checkpoint_var (10 point queries followed by 20 random puts into one randomly
// chosen DB), no scan threads are created here, and test_main sets num_DBs to 4.
// TODO: confirm and rewrite this description.
//

/*
 * Worker operation: pick one of the test's DBs at random, run a batch of
 * point queries against it, then overwrite a batch of random keys with an
 * all-zero value.
 *
 * txn:             transaction handed to every query and put.
 * arg:             worker arguments; num_DBs, dbp and num_elements are read.
 * operation_extra: points to an int holding the value size, in bytes, used
 *                  for the puts.
 *
 * Returns 0 when every put succeeded, otherwise the code of the first put
 * that failed (the remaining puts are skipped). Results of the point queries
 * are not inspected.
 */
static int checkpoint_var(DB_TXN *txn, ARG arg, void* operation_extra) {
    enum { PTQUERIES_PER_CALL = 10, PUTS_PER_CALL = 20 };

    const int which_db = random() % arg->num_DBs;
    DB *target = arg->dbp[which_db];

    // Zero-filled payload shared by all puts of this call.
    const int payload_len = *(int *)operation_extra;
    char payload[payload_len];
    memset(payload, 0, sizeof(payload));

    for (int q = 0; q < PTQUERIES_PER_CALL; q++) {
        // NOTE(review): the trailing FALSE presumably disables result checking — confirm against the helper.
        ptquery_and_maybe_check_op(target, txn, arg, FALSE);
    }

    for (int p = 0; p < PUTS_PER_CALL; p++) {
        int rand_key = random() % arg->num_elements;
        DBT key_dbt, val_dbt;
        DBT *key_ptr = dbt_init(&key_dbt, &rand_key, sizeof(rand_key));
        DBT *val_ptr = dbt_init(&val_dbt, payload, sizeof(payload));

        int put_result = target->put(target, txn, key_ptr, val_ptr, 0);
        if (put_result != 0) {
            // Stop at the first failure and report it to the caller.
            return put_result;
        }
    }
    return 0;
}


/*
 * Body of the stress test: installs the two checkpoint callbacks on `env`,
 * then runs cli_args->num_ptquery_threads workers, each of which repeatedly
 * executes checkpoint_var, for cli_args->time_of_test.
 *
 * env:      environment passed to the callbacks and to every worker.
 * dbp:      array of open DB handles passed to every worker.
 * cli_args: test parameters (num_elements, val_size, num_ptquery_threads,
 *           time_of_test are read here).
 */
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
    // Hook up the status-reporting callbacks before any worker starts.
    db_env_set_checkpoint_callback(checkpoint_callback_1, env);
    db_env_set_checkpoint_callback2(checkpoint_callback_2, env);

    int element_count = cli_args->num_elements;

    if (verbose) {
        printf("starting creation of pthreads\n");
    }

    // Every worker reads the value size through this one local; it stays
    // alive until run_workers returns.
    int put_val_size = cli_args->val_size;
    const int worker_count = cli_args->num_ptquery_threads;
    struct arg workers[worker_count];

    for (int w = 0; w < worker_count; w++) {
        struct arg *cur = &workers[w];
        arg_init(cur, element_count, dbp, env, cli_args);
        cur->operation = checkpoint_var;
        // A failing operation is reported through its return code instead of aborting the run.
        cur->crash_on_operation_failure = false;
        cur->operation_extra = &put_val_size;
    }

    run_workers(workers, worker_count, cli_args->time_of_test, false, cli_args);
}

int
test_main(int argc, char *const argv[]) {
    struct cli_args args = get_default_args_for_perf();
    args.env_args.checkpointing_period = 30;
    args.num_DBs = 4;
    args.num_ptquery_threads = 4;
    parse_stress_test_args(argc, argv, &args);
    stress_test_main(&args);
    return 0;
}