/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "Copyright (c) 2007-2012 Tokutek Inc.  All rights reserved."
#ident "$Id: perf_insert_multiple.cc 50140 2012-11-19 19:18:30Z esmet $"

#include <db.h>
#include <portability/toku_atomic.h>

#include "test.h"
#include "threaded_stress_test_helpers.h"

//
// This test tries to emulate iibench at the ydb layer. There is one
// unique index with an auto-increment key, plus several non-unique
// secondary indexes with random keys.
//

struct iibench_op_extra {
    uint64_t autoincrement;
};

static int UU() iibench_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra) {
    const int num_dbs = arg->cli->num_DBs;
    DB **dbs = arg->dbp;
    DB_ENV *env = arg->env;
    DBT mult_key_dbt[num_dbs];
    DBT mult_val_dbt[num_dbs];
    uint32_t mult_put_flags[num_dbs];
    memset(mult_key_dbt, 0, sizeof(mult_key_dbt));
    memset(mult_val_dbt, 0, sizeof(mult_val_dbt));

    // The first index is unique with serial autoincrement keys.
    // The rest are have keys generated with this thread's random data.
    for (int i = 0; i < num_dbs; i++) {
        mult_put_flags[i] = get_put_flags(arg->cli) | (i == 0 ? DB_NOOVERWRITE : 0);
        dbs[i]->app_private = i == 0 ? nullptr : arg->random_data;
    }

    int r = 0;
    uint8_t valbuf[arg->cli->val_size];
    ZERO_ARRAY(valbuf);

    uint64_t puts_to_increment = 0;
    for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
        fill_zeroed_array(valbuf, arg->cli->val_size, 
                arg->random_data, arg->cli->compressibility);
        struct iibench_op_extra *CAST_FROM_VOIDP(info, operation_extra);
        uint64_t pk = toku_sync_fetch_and_add(&info->autoincrement, 1);
        dbt_init(&mult_key_dbt[0], &pk, sizeof pk);
        dbt_init(&mult_val_dbt[0], valbuf, sizeof valbuf);
        r = env->put_multiple(
            env, 
            dbs[0], // source db.
            txn, 
            &mult_key_dbt[0], // source db key
            &mult_val_dbt[0], // source db value
            num_dbs, // total number of dbs
            dbs, // array of dbs
            mult_key_dbt, // array of keys
            mult_val_dbt, // array of values
            mult_put_flags // array of flags
            );
        if (r != 0) {
            goto cleanup;
        }
        puts_to_increment++;
        if (puts_to_increment == 100) {
            increment_counter(stats_extra, PUTS, puts_to_increment);
            puts_to_increment = 0;
        }
    }
cleanup:
    return r;
}

static void
stress_table(DB_ENV* env, DB** UU(dbp), struct cli_args *cli_args) {
    if (verbose) printf("starting creation of pthreads\n");
    const int num_threads = cli_args->num_put_threads;
    struct iibench_op_extra iib_extra = {
        .autoincrement = 0
    };
    struct arg myargs[num_threads];
    for (int i = 0; i < num_threads; i++) {
        arg_init(&myargs[i], dbp, env, cli_args);
        myargs[i].operation = iibench_put_op;
        myargs[i].operation_extra = &iib_extra;
    }
    // TODO: set generate row for put callback
    (void) env;

    const bool crash_at_end = false;
    run_workers(myargs, num_threads, cli_args->num_seconds, crash_at_end, cli_args);
}

static int iibench_generate_row_for_put(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *UU(src_key), const DBT *src_val) {
    invariant(src_db != dest_db);
    invariant(dest_db->app_private != nullptr);
    struct random_data *CAST_FROM_VOIDP(r_data, dest_db->app_private);
    uint64_t key = randu64(r_data);
    DBT key_dbt = { .data = &key, .size = sizeof(key) };
    *dest_key = key_dbt;
    *dest_val = *src_val;
    return 0;
}
int
test_main(int argc, char *const argv[]) {
    struct cli_args args = get_default_args_for_perf();
    args.num_elements = 0;  // want to start with empty DBs
    // In MySQL, iibench has one primary key and three secondaries and does 1k inserts per txn.
    args.num_DBs = 4;
    args.txn_size = 1000;
    args.key_size = 8;
    args.val_size = 8;
    parse_stress_test_args(argc, argv, &args);
    // when there are multiple threads, its valid for two of them to
    // generate the same key and one of them fail with DB_LOCK_NOTGRANTED
    if (args.num_put_threads > 1) {
        args.crash_on_operation_failure = false;
    }
    // TODO: Use a more complex, more expensive comparison function that
    // unpacks keys and walks a descriptor to satisfy the comparison.
    args.env_args.generate_put_callback = iibench_generate_row_for_put;
    stress_test_main_with_cmp(&args, stress_uint64_dbt_cmp);
    return 0;
}