/* -*- mode: C; c-basic-offset: 4 -*- */ #include <toku_portability.h> #ident "Copyright (c) 2007 Tokutek Inc. All rights reserved." /* Like test_log6 except abort. * And abort some stuff, but not others (unlike test_log6_abort which aborts everything) */ #include <assert.h> #include <toku_portability.h> #include <db.h> #include <stdlib.h> #include <search.h> #include <sys/stat.h> #include <sys/types.h> #include <memory.h> #include "test.h" #ifndef DB_DELETE_ANY #define DB_DELETE_ANY 0 #endif // ENVDIR is defined in the Makefile // How many iterations are we going to do insertions and deletions. This is a bound to the number of distinct keys in the DB. #define N 1000 static int n_keys_mentioned=0; static int random_keys_mentioned[N]; static DB *pending_i, *pending_d, *committed; // Keep track of what's in the committed database separately struct pair {int x,y;}; static void insert_in_mem (int x, int y, int *count, struct pair *pairs) { assert(*count<N); pairs[(*count)++]=(struct pair){x,y}; } static void delete_in_mem (int x, int *count, struct pair *pairs) { int i; for (i=0; i<*count; i++) { if (pairs[i].x==x) { pairs[i]=pairs[--(*count)]; return; } } } static int com_count=0, pend_count=0, peni_count=0; static struct pair com_data[N], pend_data[N], peni_data[N]; static void insert_pending (int key, int val, DB_TXN *bookx) { DBT keyd,datad; //printf("IP %u,%u\n", key,val); insert_in_mem(key, val, &peni_count, peni_data); pending_i->put(pending_i, bookx, dbt_init(&keyd, &key, sizeof(key)), dbt_init(&datad, &val, sizeof(val)), 0); delete_in_mem(key, &pend_count, pend_data); pending_d->del(pending_d, bookx, dbt_init(&keyd, &key, sizeof(key)), 0); } static void put_a_random_item (DB *db, DB_TXN *tid, int i, DB_TXN *bookx) { char hello[30], there[30]; DBT key,data; int randv = myrandom(); random_keys_mentioned[n_keys_mentioned++] = randv; insert_pending(randv, i, bookx); //printf("Insert %u\n", randv); snprintf(hello, sizeof(hello), "hello%d.%d", randv, i); snprintf(there, sizeof(hello), "there%d", i); memset(&key, 0, sizeof(key)); memset(&data, 0, sizeof(data)); key.data = hello; key.size=strlen(hello)+1; data.data = there; data.size=strlen(there)+1; int r=db->put(db, tid, &key, &data, 0); if (r!=0) printf("%s:%d i=%d r=%d (%s)\n", __FILE__, __LINE__, i, r, strerror(r)); assert(r==0); } static void delete_a_random_item (DB *db, DB_TXN *tid, DB_TXN *bookx) { if (n_keys_mentioned==0) return; int ridx = myrandom()%n_keys_mentioned; int randv = random_keys_mentioned[ridx]; DBT keyd; DBT vald; //printf("Delete %u\n", randv); dbt_init(&keyd, &randv, sizeof(randv)); dbt_init(&vald, &randv, sizeof(randv)); pending_i->del(pending_i, bookx, &keyd, 0); delete_in_mem(randv, &peni_count, peni_data); pending_d->put(pending_d, bookx, &keyd, &vald, 0); insert_in_mem(randv, randv, &pend_count, pend_data); db->del(db, tid, &keyd, DB_DELETE_ANY); } static void commit_items (DB_ENV *env, int UU(i)) { //printf("commit_items %d\n", i); DB_TXN *txn; int r=env->txn_begin(env, 0, &txn, 0); assert(r==0); DBC *cursor; r = pending_i->cursor(pending_i, txn, &cursor, 0); assert(r==0); DBT k,v; memset(&k,0,sizeof(k)); memset(&v,0,sizeof(v)); //printf("%d items in peni\n", peni_count); while (cursor->c_get(cursor, &k, &v, DB_FIRST)==0) { assert(k.size==4); assert(v.size==4); int ki=*(int*)k.data; int vi=*(int*)v.data; //printf(" put %u %u\n", ki, vi); r=committed->put(committed, txn, dbt_init(&k, &ki, sizeof(ki)), dbt_init(&v, &vi, sizeof(vi)), 0); insert_in_mem(ki, vi, &com_count, com_data); assert(r==0); r=pending_i->del(pending_i, txn, &k, 0); assert(r==0); } r=cursor->c_close(cursor); assert(r==0); r = pending_d->cursor(pending_d, txn, &cursor, 0); assert(r==0); memset(&k,0,sizeof(k)); memset(&v,0,sizeof(v)); while (cursor->c_get(cursor, &k, &v, DB_FIRST)==0) { assert(k.size==4); assert(v.size==4); int ki=*(int*)k.data; int vi=*(int*)v.data; assert(ki==vi); //printf(" del %u\n", ki); committed->del(committed, txn, dbt_init(&k, &ki, sizeof(ki)), 0); delete_in_mem(ki, &com_count, com_data); // ignore result from that del r=pending_d->del(pending_d, txn, &k, 0); assert(r==0); } r=cursor->c_close(cursor); assert(r==0); r=txn->commit(txn, 0); assert(r==0); } static void abort_items (DB_ENV *env) { DB_TXN *txn; int r=env->txn_begin(env, 0, &txn, 0); assert(r==0); //printf("abort_items\n"); DBC *cursor; r = pending_i->cursor(pending_i, txn, &cursor, 0); assert(r==0); DBT k,v; memset(&k,0,sizeof(k)); memset(&v,0,sizeof(v)); while (cursor->c_get(cursor, &k, &v, DB_FIRST)==0) { assert(k.size==4); assert(v.size==4); int ki=*(int*)k.data; //printf("Deleting %u\n", ki); r=pending_i->del(pending_i, txn, dbt_init(&k, &ki, sizeof(ki)), 0); assert(r==0); } r=cursor->c_close(cursor); assert(r==0); r = pending_d->cursor(pending_d, txn, &cursor, 0); assert(r==0); memset(&k,0,sizeof(k)); memset(&v,0,sizeof(v)); while (cursor->c_get(cursor, &k, &v, DB_FIRST)==0) { assert(k.size==4); assert(v.size==4); int ki=*(int*)k.data; r=pending_d->del(pending_d, txn, dbt_init(&k, &ki, sizeof(ki)), 0); assert(r==0); } r=cursor->c_close(cursor); assert(r==0); r=txn->commit(txn, 0); assert(r==0); } static int compare_pairs (const void *a, const void *b) { return memcmp(a,b,4); } static void verify_items (DB_ENV *env, DB *db) { DB_TXN *txn; int r=env->txn_begin(env, 0, &txn, 0); assert(r==0); DBC *cursor; DBT k,v; memset(&k,0,sizeof(k)); memset(&v,0,sizeof(v)); #if 0 r=db->cursor(db, txn, &cursor, 0); assert(r==0); while (cursor->c_get(cursor, &k, &v, DB_NEXT)==0) { } r=cursor->c_close(cursor); assert(r==0); #endif r = committed->cursor(committed, txn, &cursor, 0); assert(r==0); qsort(com_data, com_count, sizeof(com_data[0]), compare_pairs); int curscount=0; //printf(" count=%d\n", com_count); while (cursor->c_get(cursor, &k, &v, DB_NEXT)==0) { int kv=*(int*)k.data; int dv=*(int*)v.data; //printf(" sorted com_data[%d]=%d, cursor got %d\n", curscount, com_data[curscount].x, kv); assert(com_data[curscount].x==kv); DBT k2,v2; memset(&k2, 0, sizeof(k2)); memset(&v2, 0, sizeof(v2)); char hello[30], there[30]; snprintf(hello, sizeof(hello), "hello%d.%d", kv, dv); snprintf(there, sizeof(hello), "there%d", dv); k2.data = hello; k2.size=strlen(hello)+1; //printf("committed: %u,%u\n", kv, dv); r=db->get(db, txn, &k2, &v2, 0); assert(r==0); assert(strcmp(v2.data, there)==0); curscount++; } assert(curscount==com_count); r=cursor->c_close(cursor); assert(r==0); r=txn->commit(txn, 0); assert(r==0); } static void make_db (void) { DB_ENV *env; DB *db; DB_TXN *tid, *bookx; int r; int i; system("rm -rf " ENVDIR); r=toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0); r=db_env_create(&env, 0); assert(r==0); env->set_errfile(env, stderr); r=env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r=db_create(&db, env, 0); CKERR(r); r=db_create(&pending_i, env, 0); CKERR(r); r=db_create(&pending_d, env, 0); CKERR(r); r=db_create(&committed, env, 0); CKERR(r); r=env->txn_begin(env, 0, &tid, 0); assert(r==0); r=db->open(db, tid, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r=pending_i->open(pending_i, tid, "pending_i.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r=pending_d->open(pending_d, tid, "pending_d.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r=committed->open(committed, tid, "committed.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r=tid->commit(tid, 0); assert(r==0); r=env->txn_begin(env, 0, &tid, 0); assert(r==0); r=env->txn_begin(env, 0, &bookx, 0); assert(r==0); for (i=0; i<N; i++) { int randv = myrandom(); //if (i%10000==0) printf("."); if (randv%100==0) { r=tid->abort(tid); assert(r==0); r=bookx->commit(bookx, 0); assert(r==0); r=env->txn_begin(env, 0, &bookx, 0); assert(r==0); abort_items(env); r=env->txn_begin(env, 0, &tid, 0); assert(r==0); } else if (randv%1000==1) { r=tid->commit(tid, 0); assert(r==0); r=bookx->commit(bookx, 0); assert(r==0); r=env->txn_begin(env, 0, &bookx, 0); assert(r==0); commit_items(env, i); r=env->txn_begin(env, 0, &tid, 0); assert(r==0); } else if (randv%3==0) { delete_a_random_item(db, tid, bookx); } else { put_a_random_item(db, tid, i, bookx); } } r=tid->commit(tid, 0); assert(r==0); r=bookx->commit(bookx, 0); assert(r==0); commit_items(env, i); verify_items(env, db); r=pending_i->close(pending_i, 0); assert(r==0); r=pending_d->close(pending_d, 0); assert(r==0); r=committed->close(committed, 0); assert(r==0); r=db->close(db, 0); assert(r==0); r=env->close(env, 0); assert(r==0); } int test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute__((__unused__))) { make_db(); return 0; }