Commit 9c146d6e authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

#refs 5133 merge undo-do algorithm changes to main. now, we read a row from...

#refs 5133 merge undo-do algorithm changes to main. now, we read a row from the source db AND gather up the leaf entrie's provisional data in the cursor callback, which makes it atomic with respect to commit and abort on that DB. had to fix a few tests after changing some leaf entry signatures


git-svn-id: file:///svn/toku/tokudb@44949 c7de825b-a66e-492c-adef-691d508d4ae1
parent 79d04799
......@@ -13,8 +13,6 @@
// - does not perform snapshot reads. it reads everything, including uncommitted.
//
// A LE_CURSOR is good for scanning a FT from beginning to end. Useful for hot indexing.
//
// It caches the key that is was last positioned over to speed up key comparisions.
struct le_cursor {
// TODO: remove DBs from the ft layer comparison function
......@@ -22,26 +20,22 @@ struct le_cursor {
// use a fake db for comparisons.
struct __toku_db fake_db;
FT_CURSOR ft_cursor;
DBT key; // the key that the le cursor is positioned at
// TODO a better implementation would fetch the key from the brt cursor
bool neg_infinity; // true when the le cursor is positioned at -infinity (initial setting)
bool pos_infinity; // true when the le cursor is positioned at +infinity (when _next returns DB_NOTFOUND)
};
int
toku_le_cursor_create(LE_CURSOR *le_cursor_result, FT_HANDLE brt, TOKUTXN txn) {
toku_le_cursor_create(LE_CURSOR *le_cursor_result, FT_HANDLE ft_handle, TOKUTXN txn) {
int result = 0;
LE_CURSOR le_cursor = (LE_CURSOR) toku_malloc(sizeof (struct le_cursor));
if (le_cursor == NULL) {
result = errno;
}
else {
result = toku_ft_cursor(brt, &le_cursor->ft_cursor, txn, false, false);
result = toku_ft_cursor(ft_handle, &le_cursor->ft_cursor, txn, false, false);
if (result == 0) {
// TODO move the leaf mode to the brt cursor constructor
// TODO move the leaf mode to the ft cursor constructor
toku_ft_cursor_set_leaf_mode(le_cursor->ft_cursor);
toku_init_dbt(&le_cursor->key);
le_cursor->key.flags = DB_DBT_REALLOC;
le_cursor->neg_infinity = true;
le_cursor->pos_infinity = false;
// zero out the fake DB. this is a rare operation so it's not too slow.
......@@ -61,45 +55,25 @@ toku_le_cursor_create(LE_CURSOR *le_cursor_result, FT_HANDLE brt, TOKUTXN txn) {
int
toku_le_cursor_close(LE_CURSOR le_cursor) {
int result = toku_ft_cursor_close(le_cursor->ft_cursor);
toku_destroy_dbt(&le_cursor->key);
toku_free(le_cursor);
return result;
}
// this implementation copies the key and leafentry into the supplied DBTs.
// this may be too slow. an alternative implementation could avoid copying the
// key by fetching the key from the brt cursor, and could avoid copying the leaf entry
// by processing the leaf entry in the brt cursor callback.
struct le_cursor_callback_arg {
DBT *key, *val;
};
// copy the key and the leaf entry to the given DBTs
static int
le_cursor_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, void *v, bool lock_only) {
if (lock_only) {
; // do nothing
} else {
struct le_cursor_callback_arg *arg = (struct le_cursor_callback_arg *) v;
toku_dbt_set(keylen, key, arg->key, NULL);
toku_dbt_set(vallen, val, arg->val, NULL);
}
return 0;
}
// Move to the next leaf entry under the LE_CURSOR
// Success: returns zero, calls the getf callback with the getf_v parameter
// Failure: returns a non-zero error number
int
toku_le_cursor_next(LE_CURSOR le_cursor, DBT *le) {
toku_le_cursor_next(LE_CURSOR le_cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) {
int result;
if (le_cursor->pos_infinity)
if (le_cursor->pos_infinity) {
result = DB_NOTFOUND;
else {
} else {
le_cursor->neg_infinity = false;
struct le_cursor_callback_arg arg = { &le_cursor->key, le };
// TODO replace this with a non deprecated function
result = toku_ft_cursor_get(le_cursor->ft_cursor, NULL, le_cursor_callback, &arg, DB_NEXT);
if (result == DB_NOTFOUND)
// TODO replace this with a non deprecated function. Which?
result = toku_ft_cursor_get(le_cursor->ft_cursor, NULL, getf, getf_v, DB_NEXT);
if (result == DB_NOTFOUND) {
le_cursor->pos_infinity = true;
}
}
return result;
}
......@@ -115,9 +89,10 @@ toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key) {
// get the comparison function and descriptor from the cursor's ft
FT_HANDLE ft_handle = le_cursor->ft_cursor->ft_handle;
ft_compare_func keycompare = toku_ft_get_bt_compare(ft_handle);
// store the descriptor in the fake DB to do a key comparison
le_cursor->fake_db.cmp_descriptor = toku_ft_get_cmp_descriptor(ft_handle);
int r = keycompare(&le_cursor->fake_db, &le_cursor->key, key);
// get the current position from the cursor and compare it to the given key.
DBT *cursor_key = &le_cursor->ft_cursor->key;
int r = keycompare(&le_cursor->fake_db, cursor_key, key);
if (r < 0) {
result = true; // key is right of the cursor key
} else {
......
......@@ -7,6 +7,8 @@
#ifndef LE_CURSOR_H
#define LE_CURSOR_H
#include "ft-ops.h"
// A leaf entry cursor (LE_CURSOR) is a special type of FT_CURSOR that visits all of the leaf entries in a tree
// and returns the leaf entry to the caller. It maintains a copy of the key that it was last positioned over to
// speed up key comparisions with a given key. For example, the hot indexing could use the _key_right_of_cursor
......@@ -27,10 +29,10 @@ int toku_le_cursor_create(LE_CURSOR *le_cursor_result, FT_HANDLE brt, TOKUTXN tx
// Failure: returns a non-zero error number
int toku_le_cursor_close(LE_CURSOR le_cursor);
// Retrieve the next leaf entry under the LE_CURSOR
// Success: returns zero, stores the leaf entry key into the key dbt, and the leaf entry into the val dbt
// Move to the next leaf entry under the LE_CURSOR
// Success: returns zero, calls the getf callback with the getf_v parameter
// Failure: returns a non-zero error number
int toku_le_cursor_next(LE_CURSOR le_cursor, DBT *le);
int toku_le_cursor_next(LE_CURSOR le_cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v);
// Return TRUE if the key is to the right of the LE_CURSOR position. that is, current cursor key < given key
// Otherwise returns FALSE when the key is at or to the left of the LE_CURSOR position. that is, current cursor key >= given key
......
......@@ -13,6 +13,21 @@
static TOKUTXN const null_txn = 0;
static DB * const null_db = 0;
static int
get_next_callback(ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN vallen, bytevec val, void *extra, bool lock_only) {
DBT *val_dbt = extra;
if (!lock_only) {
toku_dbt_set(vallen, val, val_dbt, NULL);
}
return 0;
}
static int
le_cursor_get_next(LE_CURSOR cursor, DBT *val) {
int r = toku_le_cursor_next(cursor, get_next_callback, val);
return r;
}
static int test_ft_cursor_keycompare(DB *desc __attribute__((unused)), const DBT *a, const DBT *b) {
return toku_keycompare(a->data, a->size, b->data, b->size);
}
......@@ -141,7 +156,7 @@ test_provdel(const char *logdir, const char *fname, int n) {
int i;
for (i=0; ; i++) {
error = toku_le_cursor_next(cursor, &val);
error = le_cursor_get_next(cursor, &val);
if (error != 0)
break;
......
......@@ -16,6 +16,21 @@
static TOKUTXN const null_txn = 0;
static DB * const null_db = 0;
static int
get_next_callback(ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN vallen, bytevec val, void *extra, bool lock_only) {
DBT *val_dbt = extra;
if (!lock_only) {
toku_dbt_set(vallen, val, val_dbt, NULL);
}
return 0;
}
static int
le_cursor_get_next(LE_CURSOR cursor, DBT *val) {
int r = toku_le_cursor_next(cursor, get_next_callback, val);
return r;
}
static int
test_keycompare(DB* UU(desc), const DBT *a, const DBT *b) {
return toku_keycompare(a->data, a->size, b->data, b->size);
......@@ -148,7 +163,7 @@ test_pos_infinity(const char *fname, int n) {
int i;
for (i = 0; ; i++) {
error = toku_le_cursor_next(cursor, &val);
error = le_cursor_get_next(cursor, &val);
if (error != 0)
break;
......@@ -210,7 +225,7 @@ test_between(const char *fname, int n) {
int i;
for (i = 0; ; i++) {
// move the LE_CURSOR forward
error = toku_le_cursor_next(cursor, &val);
error = le_cursor_get_next(cursor, &val);
if (error != 0)
break;
......
......@@ -13,6 +13,21 @@
static TOKUTXN const null_txn = 0;
static DB * const null_db = 0;
static int
get_next_callback(ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN vallen, bytevec val, void *extra, bool lock_only) {
DBT *val_dbt = extra;
if (!lock_only) {
toku_dbt_set(vallen, val, val_dbt, NULL);
}
return 0;
}
static int
le_cursor_get_next(LE_CURSOR cursor, DBT *val) {
int r = toku_le_cursor_next(cursor, get_next_callback, val);
return r;
}
static int test_ft_cursor_keycompare(DB *db __attribute__((unused)), const DBT *a, const DBT *b) {
return toku_keycompare(a->data, a->size, b->data, b->size);
}
......@@ -109,7 +124,7 @@ walk_tree(const char *fname, int n) {
for (i = 0; ; i++) {
error = TOKUDB_TRY_AGAIN;
while (error == TOKUDB_TRY_AGAIN) {
error = toku_le_cursor_next(cursor, &val);
error = le_cursor_get_next(cursor, &val);
}
if (error != 0)
break;
......
......@@ -23,6 +23,19 @@ struct indexer_commit_keys {
DBT *keys; // the variable length keys array
};
// a ule and all of its provisional txn info
// used by the undo-do algorithm to gather up ule provisional info in
// a cursor callback that provides exclusive access to the source DB
// with respect to txn commit and abort
struct ule_prov_info {
ULEHANDLE ule;
uint32_t num_provisional;
uint32_t num_committed;
TXNID *prov_ids;
TOKUTXN *prov_txns;
TOKUTXN_STATE *prov_states;
};
struct __toku_indexer_internal {
DB_ENV *env;
DB_TXN *txn;
......@@ -63,6 +76,6 @@ void indexer_undo_do_init(DB_INDEXER *indexer);
void indexer_undo_do_destroy(DB_INDEXER *indexer);
int indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule);
int indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule, struct ule_prov_info *prov_info);
#endif
......@@ -177,49 +177,6 @@ indexer_undo_do_committed(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
return result;
}
static void fill_prov_info(
ULEHANDLE ule,
TXNID* prov_ids,
TOKUTXN_STATE* prov_states,
TOKUTXN* prov_txns,
DB_INDEXER *indexer
)
{
uint32_t num_provisional = ule_get_num_provisional(ule);
uint32_t num_committed = ule_get_num_committed(ule);
DB_ENV *env = indexer->i->env;
TXN_MANAGER txn_manager = toku_logger_get_txn_manager(env->i->logger);
toku_txn_manager_suspend(txn_manager);
for (uint32_t i = 0; i < num_provisional; i++) {
UXRHANDLE uxr = ule_get_uxr(ule, num_committed+i);
prov_ids[i] = uxr_get_txnid(uxr);
if (indexer->i->test_xid_state) {
prov_states[i] = indexer->i->test_xid_state(indexer, prov_ids[i]);
prov_txns[i] = NULL;
}
else {
TOKUTXN txn = NULL;
toku_txn_manager_id2txn_unlocked(
txn_manager,
prov_ids[i],
&txn
);
prov_txns[i] = txn;
if (txn) {
prov_states[i] = toku_txn_get_state(txn);
if (prov_states[i] == TOKUTXN_LIVE || prov_states[i] == TOKUTXN_PREPARING) {
// pin
toku_txn_manager_pin_live_txn_unlocked(txn_manager, txn);
}
}
else {
prov_states[i] = TOKUTXN_RETIRED;
}
}
}
toku_txn_manager_resume(txn_manager);
}
static void release_txns(
ULEHANDLE ule,
TOKUTXN_STATE* prov_states,
......@@ -255,24 +212,24 @@ exit:
}
static int
indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule, struct ule_prov_info *prov_info) {
int result = 0;
uint32_t num_committed = ule_get_num_committed(ule);
uint32_t num_provisional = ule_get_num_provisional(ule);
indexer_commit_keys_set_empty(&indexer->i->commit_keys);
// init the xids to the root xid
XIDS xids = xids_get_root_xids();
TXNID prov_ids[num_provisional];
TOKUTXN_STATE prov_states[num_provisional];
TOKUTXN prov_txns[num_provisional];
memset(prov_txns, 0, sizeof(prov_txns));
uint32_t num_provisional = prov_info->num_provisional;
uint32_t num_committed = prov_info->num_committed;
TXNID *prov_ids = prov_info->prov_ids;
TOKUTXN *prov_txns = prov_info->prov_txns;
TOKUTXN_STATE *prov_states = prov_info->prov_states;
// nothing to do if there's nothing provisional
if (num_provisional == 0) {
goto exit;
}
fill_prov_info(ule, prov_ids, prov_states, prov_txns, indexer);
TXNID outermost_xid_state = prov_states[0];
// scan the provisional stack from the outermost to the innermost transaction record
......@@ -407,13 +364,14 @@ exit:
}
int
indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule, struct ule_prov_info *prov_info) {
int result = indexer_undo_do_committed(indexer, hotdb, ule);
if (result == 0)
result = indexer_undo_do_provisional(indexer, hotdb, ule);
if ( indexer->i->test_only_flags == INDEXER_TEST_ONLY_ERROR_CALLBACK )
if (result == 0) {
result = indexer_undo_do_provisional(indexer, hotdb, ule, prov_info);
}
if (indexer->i->test_only_flags == INDEXER_TEST_ONLY_ERROR_CALLBACK) {
result = EINVAL;
}
return result;
}
......
......@@ -67,10 +67,6 @@ toku_indexer_get_status(INDEXER_STATUS statp) {
#define STATUS_VALUE(x) indexer_status.status[x].value.num
#include "indexer-internal.h"
static int build_index(DB_INDEXER *indexer);
......@@ -148,6 +144,9 @@ toku_indexer_unlock(DB_INDEXER* indexer) {
toku_mutex_unlock(&indexer->i->indexer_lock);
}
// forward declare the test-only wrapper function for undo-do
static int test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule);
int
toku_indexer_create_indexer(DB_ENV *env,
DB_TXN *txn,
......@@ -176,7 +175,7 @@ toku_indexer_create_indexer(DB_ENV *env,
indexer->i->indexer_flags = indexer_flags;
indexer->i->loop_mod = 1000; // call poll_func every 1000 rows
indexer->i->estimated_rows = 0;
indexer->i->undo_do = indexer_undo_do; // TEST export the undo do function
indexer->i->undo_do = test_indexer_undo_do; // TEST export the undo do function
XCALLOC_N(N, indexer->i->fnums);
if ( !indexer->i->fnums ) { rval = ENOMEM; goto create_exit; }
......@@ -280,49 +279,176 @@ toku_indexer_is_key_right_of_le_cursor(DB_INDEXER *indexer, const DBT *key) {
return toku_le_cursor_is_key_greater(indexer->i->lec, key);
}
// initialize provisional info by allocating enough space to
// hold provisional ids, states, and txns for each of the
// provisional entries in the ule. the ule remains owned by
// the caller, not the prov info.
static void
ule_prov_info_init(struct ule_prov_info *prov_info, ULEHANDLE ule) {
prov_info->ule = ule;
prov_info->num_provisional = ule_get_num_provisional(ule);
prov_info->num_committed = ule_get_num_committed(ule);
uint32_t n = prov_info->num_provisional;
if (n > 0) {
prov_info->prov_ids = toku_malloc(n * sizeof(prov_info->prov_ids));
prov_info->prov_states = toku_malloc(n * sizeof(prov_info->prov_states));
prov_info->prov_txns = toku_malloc(n * sizeof(prov_info->prov_txns));
}
}
// clean up anything possibly created by ule_prov_info_init()
static void
ule_prov_info_destroy(struct ule_prov_info *prov_info) {
if (prov_info->num_provisional > 0) {
toku_free(prov_info->prov_ids);
toku_free(prov_info->prov_states);
toku_free(prov_info->prov_txns);
} else {
// nothing to free if there was nothing provisional
invariant(prov_info->prov_ids == NULL);
invariant(prov_info->prov_states == NULL);
invariant(prov_info->prov_txns == NULL);
}
}
static void
indexer_fill_prov_info(DB_INDEXER *indexer, struct ule_prov_info *prov_info) {
ULEHANDLE ule = prov_info->ule;
uint32_t num_provisional = prov_info->num_provisional;
uint32_t num_committed = prov_info->num_committed;
TXNID *prov_ids = prov_info->prov_ids;
TOKUTXN_STATE *prov_states = prov_info->prov_states;
TOKUTXN *prov_txns = prov_info->prov_txns;
// hold the txn manager lock while we inspect txn state
// and pin some live txns
DB_ENV *env = indexer->i->env;
TXN_MANAGER txn_manager = toku_logger_get_txn_manager(env->i->logger);
toku_txn_manager_suspend(txn_manager);
for (uint32_t i = 0; i < num_provisional; i++) {
UXRHANDLE uxr = ule_get_uxr(ule, num_committed + i);
prov_ids[i] = uxr_get_txnid(uxr);
if (indexer->i->test_xid_state) {
prov_states[i] = indexer->i->test_xid_state(indexer, prov_ids[i]);
prov_txns[i] = NULL;
}
else {
TOKUTXN txn = NULL;
toku_txn_manager_id2txn_unlocked(
txn_manager,
prov_ids[i],
&txn
);
prov_txns[i] = txn;
if (txn) {
prov_states[i] = toku_txn_get_state(txn);
if (prov_states[i] == TOKUTXN_LIVE || prov_states[i] == TOKUTXN_PREPARING) {
// pin this live txn so it can't commit or abort until we're done with it
toku_txn_manager_pin_live_txn_unlocked(txn_manager, txn);
}
}
else {
prov_states[i] = TOKUTXN_RETIRED;
}
}
}
toku_txn_manager_resume(txn_manager);
}
struct le_cursor_extra {
DB_INDEXER *indexer;
struct ule_prov_info *prov_info;
};
// cursor callback, so its synchronized with other db operations using
// cachetable pair locks. because no txn can commit on this db, read
// the provisional info for the newly read ule.
static int
le_cursor_callback(ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN vallen, bytevec val, void *extra, bool lock_only) {
if (lock_only || val == NULL) {
; // do nothing if only locking or val==NULL, meaning there are no more elements
} else {
struct le_cursor_extra *cursor_extra = extra;
struct ule_prov_info *prov_info = cursor_extra->prov_info;
// TODO(John): Do we need to actually copy this ule and save it after
// copying all of the provisional info? or is the info all we need?
void *le_buf = toku_xmemdup(val, vallen);
ULEHANDLE ule = toku_ule_create(le_buf);
invariant(ule);
ule_prov_info_init(prov_info, ule);
indexer_fill_prov_info(cursor_extra->indexer, prov_info);
}
return 0;
}
// get the next ule and fill out its provisional info in the
// prov_info struct provided. caller is responsible for cleaning
// up the ule info after it's done.
static int
get_next_ule_with_prov_info(DB_INDEXER *indexer, struct ule_prov_info *prov_info) {
struct le_cursor_extra extra = {
.indexer = indexer,
.prov_info = prov_info,
};
int r = toku_le_cursor_next(indexer->i->lec, le_cursor_callback, &extra);
return r;
}
static int
build_index(DB_INDEXER *indexer) {
int result = 0;
DBT key; toku_init_dbt_flags(&key, DB_DBT_REALLOC);
DBT le; toku_init_dbt_flags(&le, DB_DBT_REALLOC);
bool done = false;
for (uint64_t loop_count = 0; !done; loop_count++) {
toku_indexer_lock(indexer);
result = toku_le_cursor_next(indexer->i->lec, &le);
// grab the next leaf entry and get its provisional info. we'll
// need the provisional info for the undo-do algorithm, and we get
// it here so it can be read atomically with respect to txn commit
// and abort.
//
// this allocates space for the prov info, so we have to destroy it
// when we're done.
struct ule_prov_info prov_info;
memset(&prov_info, 0, sizeof(prov_info));
result = get_next_ule_with_prov_info(indexer, &prov_info);
if (result != 0) {
invariant(prov_info.ule == NULL);
done = true;
if (result == DB_NOTFOUND)
if (result == DB_NOTFOUND) {
result = 0; // all done, normal way to exit loop successfully
}
}
else {
// this code may be faster ule malloc/free is not done every time
ULEHANDLE ule = toku_ule_create(le.data);
invariant(prov_info.ule);
ULEHANDLE ule = prov_info.ule;
for (int which_db = 0; (which_db < indexer->i->N) && (result == 0); which_db++) {
DB *db = indexer->i->dest_dbs[which_db];
result = indexer_undo_do(indexer, db, ule);
if ( (result != 0) && (indexer->i->error_callback != NULL)) {
result = indexer_undo_do(indexer, db, ule, &prov_info);
if ((result != 0) && (indexer->i->error_callback != NULL)) {
// grab the key and call the error callback
DBT key; toku_init_dbt_flags(&key, DB_DBT_REALLOC);
toku_dbt_set(ule_get_keylen(ule), ule_get_key(ule), &key, NULL);
indexer->i->error_callback(db, which_db, result, &key, NULL, indexer->i->error_extra);
toku_destroy_dbt(&key);
}
toku_ule_free(prov_info.ule);
}
toku_ule_free(ule);
}
toku_indexer_unlock(indexer);
ule_prov_info_destroy(&prov_info);
if (result == 0)
if (result == 0) {
result = maybe_call_poll_func(indexer, loop_count);
if (result != 0)
}
if (result != 0) {
done = true;
}
}
toku_destroy_dbt(&key);
toku_destroy_dbt(&le);
// post index creation cleanup
// - optimize?
// - garbage collect?
......@@ -334,7 +460,6 @@ build_index(DB_INDEXER *indexer) {
(void) __sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD_FAIL), 1);
}
return result;
}
......@@ -349,11 +474,7 @@ close_indexer(DB_INDEXER *indexer) {
// to create them are not in the recovery log.)
DB_TXN *txn = indexer->i->txn;
TOKUTXN tokutxn = db_txn_struct_i(txn)->tokutxn;
//BRT brt; // caused a warning with -Wunused-but-set-variable
//DB *db;
for (int which_db = 0; which_db < indexer->i->N ; which_db++) {
//db = indexer->i->dest_dbs[which_db];
//brt = db_struct_i(db)->ft_handle;
toku_txn_require_checkpoint_on_commit(tokutxn);
}
......@@ -449,6 +570,19 @@ toku_indexer_set_test_only_flags(DB_INDEXER *indexer, int flags) {
indexer->i->test_only_flags = flags;
}
// this allows us to call the undo do function in tests using
// a convenience wrapper that gets and destroys the ule's prov info
static int
test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
struct ule_prov_info prov_info;
memset(&prov_info, 0, sizeof(prov_info));
ule_prov_info_init(&prov_info, ule);
indexer_fill_prov_info(indexer, &prov_info);
int r = indexer_undo_do(indexer, hotdb, ule, &prov_info);
ule_prov_info_destroy(&prov_info);
return r;
}
DB *
toku_indexer_get_src_db(DB_INDEXER *indexer) {
return indexer->i->src_db;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment