/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: /* * Copyright (c) 2010-2012 Tokutek Inc. All rights reserved. * The technology is licensed by the Massachusetts Institute of Technology, * Rutgers State University of New Jersey, and the Research Foundation of * State University of New York at Stony Brook under United States of America * Serial No. 11/760379 and to the patents and/or patent applications resulting from it. */ #ident "$Id$" // test the hotindexer undo do function // read a description of the live transactions and a leafentry from a test file, run the undo do function, // and print out the actions taken by the undo do function while processing the leafentry #include "test.h" #include <ft/fttypes.h> #include <ft/omt.h> #include <ft/leafentry.h> #include <ft/ule.h> #include <ft/ule-internal.h> #include <ft/le-cursor.h> #include "indexer-internal.h" #include <ft/xids-internal.h> struct txn { TXNID xid; TOKUTXN_STATE state; }; struct live { int n; int o; struct txn *txns; }; static void live_init(struct live *live) { live->n = live->o = 0; live->txns = NULL; } static void live_destroy(struct live *live) { toku_free(live->txns); } static void live_add(struct live *live, TXNID xid, TOKUTXN_STATE state) { if (live->o >= live->n) { int newn = live->n == 0 ? 1 : live->n * 2; live->txns = (struct txn *) toku_realloc(live->txns, newn * sizeof (struct txn)); resource_assert(live->txns); live->n = newn; } live->txns[live->o++] = (struct txn ) { xid, state }; } static TOKUTXN_STATE lookup_txn_state(struct live *live, TXNID xid) { TOKUTXN_STATE r = TOKUTXN_RETIRED; for (int i = 0; i < live->o; i++) { if (live->txns[i].xid == xid) { r = live->txns[i].state; break; } } return r; } // live transaction ID set struct live live_xids; static void uxr_init(UXR uxr, uint8_t type, void *val, uint32_t vallen, TXNID xid) { uxr->type = type; uxr->valp = toku_malloc(vallen); resource_assert(uxr->valp); memcpy(uxr->valp, val, vallen); uxr->vallen = vallen; uxr->xid = xid; } static void uxr_destroy(UXR uxr) { toku_free(uxr->valp); uxr->valp = NULL; } static ULE ule_init(ULE ule) { ule->num_puxrs = 0; ule->num_cuxrs = 0; ule->keyp = NULL; ule->keylen = 0; ule->uxrs = ule->uxrs_static; return ule; } static void ule_set_key(ULE ule, void *key, uint32_t keylen) { ule->keyp = toku_realloc(ule->keyp, keylen); memcpy(ule->keyp, key, keylen); ule->keylen = keylen; } static void ule_destroy(ULE ule) { for (unsigned int i = 0; i < ule->num_cuxrs + ule->num_puxrs; i++) uxr_destroy(&ule->uxrs[i]); toku_free(ule->keyp); ule->keyp = NULL; } static void ule_add_provisional(ULE ule, UXR uxr) { invariant(ule->num_cuxrs + ule->num_puxrs + 1 <= MAX_TRANSACTION_RECORDS*2); ule->uxrs[ule->num_cuxrs + ule->num_puxrs] = *uxr; ule->num_puxrs++; } static void ule_add_committed(ULE ule, UXR uxr) { lazy_assert(ule->num_puxrs == 0); invariant(ule->num_cuxrs + 1 <= MAX_TRANSACTION_RECORDS*2); ule->uxrs[ule->num_cuxrs] = *uxr; ule->num_cuxrs++; } static ULE ule_create(void) { ULE ule = (ULE) toku_calloc(1, sizeof (ULE_S)); resource_assert(ule); if (ule) ule_init(ule); return ule; } static void ule_free(ULE ule) { ule_destroy(ule); toku_free(ule); } static void print_xids(XIDS xids) { printf("["); if (xids->num_xids == 0) printf("0"); else { for (int i = 0; i < xids->num_xids; i++) { printf("%" PRIu64, xids->ids[i]); if (i+1 < xids->num_xids) printf(","); } } printf("] "); } static void print_dbt(DBT *dbt) { printf("%.*s ", dbt->size, (char *) dbt->data); } static int put_callback(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *src_key, const DBT *src_data) { (void) dest_db; (void) src_db; (void) dest_key; (void) dest_data; (void) src_key; (void) src_data; lazy_assert(src_db != NULL && dest_db != NULL); switch (dest_key->flags) { case 0: dest_key->data = src_data->data; dest_key->size = src_data->size; break; case DB_DBT_REALLOC: dest_key->data = toku_realloc(dest_key->data, src_data->size); memcpy(dest_key->data, src_data->data, src_data->size); dest_key->size = src_data->size; break; default: lazy_assert(0); } if (dest_data) switch (dest_data->flags) { case 0: lazy_assert(0); break; case DB_DBT_REALLOC: dest_data->data = toku_realloc(dest_data->data, src_key->size); memcpy(dest_data->data, src_key->data, src_key->size); dest_data->size = src_key->size; break; default: lazy_assert(0); } return 0; } static int del_callback(DB *dest_db, DB *src_db, DBT *dest_key, const DBT *src_key, const DBT *src_data) { (void) dest_db; (void) src_db; (void) dest_key; (void) src_key; (void) src_data; lazy_assert(src_db != NULL && dest_db != NULL); switch (dest_key->flags) { case 0: dest_key->data = src_data->data; dest_key->size = src_data->size; break; case DB_DBT_REALLOC: dest_key->data = toku_realloc(dest_key->data, src_data->size); memcpy(dest_key->data, src_data->data, src_data->size); dest_key->size = src_data->size; break; default: lazy_assert(0); } return 0; } static DB_INDEXER *test_indexer = NULL; static DB *test_hotdb = NULL; static TOKUTXN_STATE test_xid_state(DB_INDEXER *indexer, TXNID xid) { invariant(indexer == test_indexer); TOKUTXN_STATE r = lookup_txn_state(&live_xids, xid); return r; } static int test_lock_key(DB_INDEXER *indexer, TXNID xid, DB *hotdb, DBT *key) { invariant(indexer == test_indexer); invariant(hotdb == test_hotdb); TOKUTXN_STATE txn_state = test_xid_state(indexer, xid); invariant(txn_state == TOKUTXN_LIVE || txn_state == TOKUTXN_PREPARING); printf("lock [%" PRIu64 "] ", xid); print_dbt(key); printf("\n"); return 0; } static int test_delete_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids) { invariant(indexer == test_indexer); invariant(hotdb == test_hotdb); printf("delete_provisional "); print_xids(xids); print_dbt(hotkey); printf("\n"); return 0; } static int test_delete_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids) { invariant(indexer == test_indexer); invariant(hotdb == test_hotdb); printf("delete_committed "); print_xids(xids); print_dbt(hotkey); printf("\n"); return 0; } static int test_insert_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids) { invariant(indexer == test_indexer); invariant(hotdb == test_hotdb); printf("insert_provisional "); print_xids(xids); print_dbt(hotkey); print_dbt(hotval); printf("\n"); return 0; } static int test_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids) { invariant(indexer == test_indexer); invariant(hotdb == test_hotdb); printf("insert_committed "); print_xids(xids); print_dbt(hotkey); print_dbt(hotval); printf("\n"); return 0; } static int test_commit_any(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids) { invariant(indexer == test_indexer); invariant(hotdb == test_hotdb); printf("commit_any "); print_xids(xids); print_dbt(hotkey); printf("\n"); return 0; } static int split_fields(char *line, char *fields[], int maxfields) { int i; for (i = 0; i < maxfields; i++, line = NULL) { fields[i] = strtok(line, " "); if (fields[i] == NULL) break; } return i; } static int read_line(char **line_ptr, size_t *len_ptr, FILE *f) { char *line = *line_ptr; size_t len = 0; bool in_comment = false; while (1) { int c = fgetc(f); if (c == EOF) break; else if (c == '\n') { in_comment = false; if (len > 0) break; } else { if (c == '#') in_comment = true; if (!in_comment) { XREALLOC_N(len+1, line); line[len++] = c; } } } if (len > 0) { XREALLOC_N(len+1, line); line[len] = '\0'; } *line_ptr = line; *len_ptr = len; return len == 0 ? -1 : 0; } static int read_test(char *testname, ULE ule) { int r = 0; FILE *f = fopen(testname, "r"); if (f) { char *line = NULL; size_t len = 0; while (read_line(&line, &len, f) != -1) { // printf("%s", line); const int maxfields = 8; char *fields[maxfields]; int nfields = split_fields(line, fields, maxfields); // for (int i = 0; i < nfields; i++); printf("%s ", fields[i]); printf("\n"); if (nfields < 1) continue; // live xid... if (strcmp(fields[0], "live") == 0) { for (int i = 1; i < nfields; i++) live_add(&live_xids, atoll(fields[i]), TOKUTXN_LIVE); continue; } // xid <XID> [live|committing|aborting] if (strcmp(fields[0], "xid") == 0 && nfields == 3) { TXNID xid = atoll(fields[1]); TOKUTXN_STATE state = TOKUTXN_RETIRED; if (strcmp(fields[2], "live") == 0) state = TOKUTXN_LIVE; else if (strcmp(fields[2], "preparing") == 0) state = TOKUTXN_PREPARING; else if (strcmp(fields[2], "committing") == 0) state = TOKUTXN_COMMITTING; else if (strcmp(fields[2], "aborting") == 0) state = TOKUTXN_ABORTING; else assert(0); live_add(&live_xids, xid, state); continue; } // key KEY if (strcmp(fields[0], "key") == 0 && nfields == 2) { ule_set_key(ule, fields[1], strlen(fields[1])); continue; } // insert committed|provisional XID DATA if (strcmp(fields[0], "insert") == 0 && nfields == 4) { UXR_S uxr_s; uxr_init(&uxr_s, XR_INSERT, fields[3], strlen(fields[3]), atoll(fields[2])); if (fields[1][0] == 'p') ule_add_provisional(ule, &uxr_s); if (fields[1][0] == 'c') ule_add_committed(ule, &uxr_s); continue; } // delete committed|provisional XID if (strcmp(fields[0], "delete") == 0 && nfields == 3) { UXR_S uxr_s; uxr_init(&uxr_s, XR_DELETE, NULL, 0, atoll(fields[2])); if (fields[1][0] == 'p') ule_add_provisional(ule, &uxr_s); if (fields[1][0] == 'c') ule_add_committed(ule, &uxr_s); continue; } // placeholder XID if (strcmp(fields[0], "placeholder") == 0 && nfields == 2) { UXR_S uxr_s; uxr_init(&uxr_s, XR_PLACEHOLDER, NULL, 0, atoll(fields[1])); ule_add_provisional(ule, &uxr_s); continue; } // placeholder provisional XID if (strcmp(fields[0], "placeholder") == 0 && nfields == 3 && fields[1][0] == 'p') { UXR_S uxr_s; uxr_init(&uxr_s, XR_PLACEHOLDER, NULL, 0, atoll(fields[2])); ule_add_provisional(ule, &uxr_s); continue; } fprintf(stderr, "%s???\n", line); r = EINVAL; } toku_free(line); fclose(f); } else { r = errno; fprintf(stderr, "fopen %s errno=%d\n", testname, errno); } return r; } static int run_test(char *envdir, char *testname) { if (verbose) printf("%s\n", testname); live_init(&live_xids); int r; DB_ENV *env = NULL; r = db_env_create(&env, 0); assert_zero(r); r = env->set_redzone(env, 0); assert_zero(r); r = env->set_generate_row_callback_for_put(env, put_callback); assert_zero(r); r = env->set_generate_row_callback_for_del(env, del_callback); assert_zero(r); r = env->open(env, envdir, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r); DB *src_db = NULL; r = db_create(&src_db, env, 0); assert_zero(r); r = src_db->open(src_db, NULL, "0.tdb", NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r); DB *dest_db = NULL; r = db_create(&dest_db, env, 0); assert_zero(r); r = dest_db->open(dest_db, NULL, "1.tdb", NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r); DB_TXN *txn = NULL; r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r); DB_INDEXER *indexer = NULL; r = env->create_indexer(env, txn, &indexer, src_db, 1, &dest_db, NULL, 0); assert_zero(r); // set test callbacks indexer->i->test_xid_state = test_xid_state; indexer->i->test_lock_key = test_lock_key; indexer->i->test_delete_provisional = test_delete_provisional; indexer->i->test_delete_committed = test_delete_committed; indexer->i->test_insert_provisional = test_insert_provisional; indexer->i->test_insert_committed = test_insert_committed; indexer->i->test_commit_any = test_commit_any; // verify indexer and hotdb in the callbacks test_indexer = indexer; test_hotdb = dest_db; // create a ule ULE ule = ule_create(); ule_init(ule); // read the test r = read_test(testname, ule); if (r != 0) return r; r = indexer->i->undo_do(indexer, dest_db, ule); assert_zero(r); ule_free(ule); r = indexer->close(indexer); assert_zero(r); r = txn->abort(txn); assert_zero(r); r = src_db->close(src_db, 0); assert_zero(r); r = dest_db->close(dest_db, 0); assert_zero(r); r = env->close(env, 0); assert_zero(r); live_destroy(&live_xids); return r; } int test_main(int argc, char * const argv[]) { int r; // parse_args(argc, argv); int i; for (i = 1; i < argc; i++) { char * const arg = argv[i]; if (strcmp(arg, "-v") == 0) { verbose++; continue; } if (strcmp(arg, "-q") == 0) { verbose = 0; continue; } break; } for (r = 0 ; r == 0 && i < argc; i++) { char *testname = argv[i]; char envdir[strlen(ENVDIR) + 1 + 32 + 1]; sprintf(envdir, "%s.%d", ENVDIR, toku_os_getpid()); char syscmd[32 + strlen(envdir)]; sprintf(syscmd, "rm -rf %s", envdir); r = system(syscmd); assert_zero(r); r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r); r = run_test(envdir, testname); } return r; }