// test that an update calls back into the update function

#include "test.h"

const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE;

DB_ENV *env;

BOOL cmp_desc_is_four;
u_int32_t four_byte_desc = 0xffffffff;
u_int64_t eight_byte_desc = 0x12345678ffffffff;


static int generate_row_for_put(
    DB *UU(dest_db), 
    DB *UU(src_db), 
    DBT *dest_key, 
    DBT *dest_val, 
    const DBT *src_key, 
    const DBT *src_val
    ) 
{    
    dest_key->data = src_key->data;
    dest_key->size = src_key->size;
    dest_key->flags = 0;
    dest_val->data = src_val->data;
    dest_val->size = src_val->size;
    dest_val->flags = 0;
    return 0;
}
static void assert_cmp_desc_valid (DB* db) {
    if (cmp_desc_is_four) {
        assert(db->cmp_descriptor->dbt.size == sizeof(four_byte_desc));
    }
    else {
        assert(db->cmp_descriptor->dbt.size == sizeof(eight_byte_desc));
    }
    unsigned char* cmp_desc_data = db->cmp_descriptor->dbt.data;
    assert(cmp_desc_data[0] == 0xff);
    assert(cmp_desc_data[1] == 0xff);
    assert(cmp_desc_data[2] == 0xff);
    assert(cmp_desc_data[3] == 0xff);
}

static void assert_desc_four (DB* db) {
    assert(db->descriptor->dbt.size == sizeof(four_byte_desc));
    assert(*(u_int32_t *)(db->descriptor->dbt.data) == four_byte_desc);
}
static void assert_desc_eight (DB* db) {
    assert(db->descriptor->dbt.size == sizeof(eight_byte_desc));
    assert(*(u_int64_t *)(db->descriptor->dbt.data) == eight_byte_desc);
}

static int
desc_int64_dbt_cmp (DB *db, const DBT *a, const DBT *b) {
    assert_cmp_desc_valid(db);
    assert(a);
    assert(b);

    assert(a->size == sizeof(int64_t));
    assert(b->size == sizeof(int64_t));

    int64_t x = *(int64_t *) a->data;
    int64_t y = *(int64_t *) b->data;

    if (x<y) return -1;
    if (x>y) return 1;
    return 0;
}

static void setup (void) {
    int r;
    CHK(system("rm -rf " ENVDIR));
    CHK(toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO));
    CHK(db_env_create(&env, 0));
    env->set_errfile(env, stderr);
    r = env->set_default_bt_compare(env, desc_int64_dbt_cmp); CKERR(r);
    //r = env->set_cachesize(env, 0, 500000, 1); CKERR(r);
    r = env->set_generate_row_callback_for_put(env, generate_row_for_put); CKERR(r);
    CHK(env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO));
}

static void cleanup (void) {
    CHK(env->close(env, 0));
}

static void do_inserts_and_queries(DB* db) {
    int r = 0;
    DB_TXN* write_txn = NULL;
    r = env->txn_begin(env, NULL, &write_txn, 0);
    CKERR(r);
    for (int i = 0; i < 2000; i++) {
        u_int64_t key_data = random();
        u_int64_t val_data = random();
        DBT key, val;
        dbt_init(&key, &key_data, sizeof(key_data));
        dbt_init(&val, &val_data, sizeof(val_data));
        CHK(db->put(db, write_txn, &key, &val, 0));
    }
    r = write_txn->commit(write_txn, 0);
    CKERR(r);
    for (int i = 0; i < 2; i++) {
        DB_TXN* read_txn = NULL;
        r = env->txn_begin(env, NULL, &read_txn, 0);
        CKERR(r);
        DBC* cursor = NULL;
        r = db->cursor(db, read_txn, &cursor, 0);
        CKERR(r);
        if (i == 0) { 
            r = cursor->c_pre_acquire_range_lock(
                cursor,
                db->dbt_neg_infty(),
                db->dbt_pos_infty()
                );
            CKERR(r);
        }
        while(r != DB_NOTFOUND) {
            DBT key, val;
            memset(&key, 0, sizeof(key));
            memset(&val, 0, sizeof(val));
            r = cursor->c_get(cursor, &key, &val, DB_NEXT);
            assert(r == 0 || r == DB_NOTFOUND);
        }
        r = cursor->c_close(cursor);
        CKERR(r);
        r = read_txn->commit(read_txn, 0);
        CKERR(r);
    }
}

static void run_test(void) {
    DB* db = NULL;
    int r;
    cmp_desc_is_four = TRUE;

    DBT orig_desc;
    memset(&orig_desc, 0, sizeof(orig_desc));
    orig_desc.size = sizeof(four_byte_desc);
    orig_desc.data = &four_byte_desc;

    DBT other_desc;
    memset(&other_desc, 0, sizeof(other_desc));
    other_desc.size = sizeof(eight_byte_desc);
    other_desc.data = &eight_byte_desc;

    DB_LOADER *loader = NULL;    
    DBT key, val;
    u_int64_t k = 0;
    u_int64_t v = 0;
    IN_TXN_COMMIT(env, NULL, txn_create, 0, {
            CHK(db_create(&db, env, 0));
            assert(db->descriptor == NULL);
            r = db->set_pagesize(db, 2048);
            CKERR(r);
            r = db->set_readpagesize(db, 1024);
            CKERR(r);
            CHK(db->open(db, txn_create, "foo.db", NULL, DB_BTREE, DB_CREATE, 0666));
            assert(db->descriptor->dbt.size == 0);
            assert(db->cmp_descriptor->dbt.size == 0);
            CHK(db->change_descriptor(db, txn_create, &orig_desc, DB_UPDATE_CMP_DESCRIPTOR));
            assert_desc_four(db);
            assert_cmp_desc_valid(db);
            r = env->create_loader(env, txn_create, &loader, db, 1, &db, NULL, NULL, 0); 
            CKERR(r);
	    dbt_init(&key, &k, sizeof k);
	    dbt_init(&val, &v, sizeof v);
            r = loader->put(loader, &key, &val); 
            CKERR(r);
            r = loader->close(loader);
            CKERR(r);
            assert_cmp_desc_valid(db);    
        });
    assert_cmp_desc_valid(db);    
    CKERR(r);
    do_inserts_and_queries(db);
    IN_TXN_COMMIT(env, NULL, txn_1, 0, {
        CHK(db->change_descriptor(db, txn_1, &other_desc, 0));
        assert_desc_eight(db);
        assert_cmp_desc_valid(db);
    });
    assert_desc_eight(db);
    assert_cmp_desc_valid(db);
    do_inserts_and_queries(db);

    IN_TXN_ABORT(env, NULL, txn_1, 0, {
        CHK(db->change_descriptor(db, txn_1, &orig_desc, 0));
        assert_desc_four(db);
        assert_cmp_desc_valid(db);
    });
    assert_desc_eight(db);
    assert_cmp_desc_valid(db);
    do_inserts_and_queries(db);
    
    CHK(db->close(db, 0));

    // verify that after close and reopen, cmp_descriptor is now
    // latest descriptor
    cmp_desc_is_four = FALSE;
    CHK(db_create(&db, env, 0));
    CHK(db->open(db, NULL, "foo.db", NULL, DB_BTREE, DB_AUTO_COMMIT, 0666));
    assert_desc_eight(db);
    assert_cmp_desc_valid(db);
    do_inserts_and_queries(db);
    CHK(db->close(db, 0));

    cmp_desc_is_four = TRUE;
    CHK(db_create(&db, env, 0));
    CHK(db->open(db, NULL, "foo.db", NULL, DB_BTREE, DB_AUTO_COMMIT, 0666));
    IN_TXN_COMMIT(env, NULL, txn_1, 0, {
        CHK(db->change_descriptor(db, txn_1, &orig_desc, DB_UPDATE_CMP_DESCRIPTOR));
        assert_desc_four(db);
        assert_cmp_desc_valid(db);
    });
    assert_desc_four(db);
    assert_cmp_desc_valid(db);
    do_inserts_and_queries(db);
    CHK(db->close(db, 0));
    
}

int test_main (int argc, char * const argv[]) {
    parse_args(argc, argv);
    setup();
    run_test();
    cleanup();
    return 0;
}