Commit bea9b8af authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#3043 merge changes from code review to main refs[t:3043]

git-svn-id: file:///svn/toku/tokudb@25904 c7de825b-a66e-492c-adef-691d508d4ae1
parent 076ee90f
......@@ -1672,37 +1672,37 @@ ule_add_placeholders(ULE ule, XIDS xids) {
}
}
int
uint64_t
ule_num_uxrs(ULE ule) {
return ule->num_cuxrs + ule->num_puxrs;
}
UXR
ule_get_uxr(ULE ule, int ith) {
invariant(0 <= ith && ith < ule_num_uxrs(ule));
ule_get_uxr(ULE ule, uint64_t ith) {
invariant(ith < ule_num_uxrs(ule));
return &ule->uxrs[ith];
}
int
uint32_t
ule_get_num_committed(ULE ule) {
return ule->num_cuxrs;
}
int
uint32_t
ule_get_num_provisional(ULE ule) {
return ule->num_puxrs;
}
int
ule_is_committed(ULE ule, int ith) {
invariant(0 <= ith && ith < ule_num_uxrs(ule));
return ith < (int) ule->num_cuxrs;
ule_is_committed(ULE ule, uint64_t ith) {
invariant(ith < ule_num_uxrs(ule));
return ith < ule->num_cuxrs;
}
int
ule_is_provisional(ULE ule, int ith) {
invariant(0 <= ith && ith < ule_num_uxrs(ule));
return ith >= (int) ule->num_cuxrs;
ule_is_provisional(ULE ule, uint64_t ith) {
invariant(ith < ule_num_uxrs(ule));
return ith >= ule->num_cuxrs;
}
void *
......
......@@ -24,12 +24,12 @@ ULEHANDLE toku_ule_create(void * le_p);
void toku_ule_free(ULEHANDLE ule_p);
int ule_num_uxrs(ULEHANDLE ule);
int ule_get_num_committed(ULEHANDLE ule);
int ule_get_num_provisional(ULEHANDLE ule);
UXRHANDLE ule_get_uxr(ULEHANDLE ule, int ith);
int ule_is_committed(ULEHANDLE ule, int ith);
int ule_is_provisional(ULEHANDLE ule, int ith);
uint64_t ule_num_uxrs(ULEHANDLE ule);
uint32_t ule_get_num_committed(ULEHANDLE ule);
uint32_t ule_get_num_provisional(ULEHANDLE ule);
UXRHANDLE ule_get_uxr(ULEHANDLE ule, uint64_t ith);
int ule_is_committed(ULEHANDLE ule, uint64_t ith);
int ule_is_provisional(ULEHANDLE ule, uint64_t ith);
void *ule_get_key(ULEHANDLE ule);
uint32_t ule_get_keylen(ULEHANDLE ule);
......
......@@ -53,12 +53,13 @@ indexer_commit_keys_add(struct indexer_commit_keys *keys, size_t length, void *p
int new_max_keys = keys->max_keys == 0 ? 8 : keys->max_keys * 2;
keys->keys = (DBT *) toku_realloc(keys->keys, new_max_keys * sizeof (DBT));
resource_assert(keys->keys);
for (int i = keys->current_keys; i < new_max_keys; i++)
for (int i = keys->current_keys; i < new_max_keys; i++) {
toku_init_dbt(&keys->keys[i]);
keys->keys[i].flags = DB_DBT_REALLOC;
}
keys->max_keys = new_max_keys;
}
DBT *key = &keys->keys[keys->current_keys];
key->flags = DB_DBT_REALLOC;
toku_dbt_set(length, ptr, key, NULL);
keys->current_keys++;
}
......@@ -70,11 +71,10 @@ indexer_commit_keys_set_empty(struct indexer_commit_keys *keys) {
}
// internal functions
static int indexer_setup_xids_committed(DB_INDEXER *indexer, TXNID xid, XIDS *xids_result);
static int indexer_setup_xids_provisional(DB_INDEXER *indexer, ULEHANDLE ule, int trindex, TXNID xid, BOOL xid_is_live, XIDS *xids_result);
static int indexer_set_xid(DB_INDEXER *indexer, TXNID xid, XIDS *xids_result);
static int indexer_append_xid(DB_INDEXER *indexer, TXNID xid, BOOL xid_is_live, XIDS *xids_result);
static int indexer_find_prev_committed(DB_INDEXER *indexer, ULEHANDLE ule, int i);
static int indexer_find_prev_provisional(DB_INDEXER *indexer, ULEHANDLE ule, int i);
static BOOL indexer_find_prev_xr(DB_INDEXER *indexer, ULEHANDLE ule, uint64_t xrindex, uint64_t *prev_xrindex);
static int indexer_generate_hot_key_val(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule, UXRHANDLE uxr, DBT *hotkey, DBT *hotval);
static int indexer_brt_delete_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
......@@ -96,9 +96,9 @@ indexer_undo_do_init(DB_INDEXER *indexer) {
// destroy the undo globals
void
indexer_undo_do_destroy(DB_INDEXER *indexer) {
indexer_commit_keys_destroy(&indexer->i->commit_keys);
toku_destroy_dbt(&indexer->i->hotkey);
toku_destroy_dbt(&indexer->i->hotval);
indexer_commit_keys_destroy(&indexer->i->commit_keys);
}
static int
......@@ -109,36 +109,33 @@ indexer_undo_do_committed(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
XIDS xids = xids_get_root_xids();
// scan the committed stack from bottom to top
int num_committed = ule_get_num_committed(ule);
for (int trindex = 0; trindex < num_committed; trindex++) {
uint32_t num_committed = ule_get_num_committed(ule);
for (uint64_t xrindex = 0; xrindex < num_committed; xrindex++) {
indexer_commit_keys_set_empty(&indexer->i->commit_keys);
// get the transaction record
UXRHANDLE uxr = ule_get_uxr(ule, trindex);
invariant(uxr);
UXRHANDLE uxr = ule_get_uxr(ule, xrindex);
// setup up the xids
TXNID this_xid = uxr_get_txnid(uxr);
result = indexer_setup_xids_committed(indexer, this_xid, &xids);
result = indexer_set_xid(indexer, this_xid, &xids);
if (result != 0)
break;
// skip placeholders
if (uxr_is_placeholder(uxr)) {
invariant(0); // not allowed
continue;
}
// placeholders in the committed stack are not allowed
if (uxr_is_placeholder(uxr))
invariant(0);
// undo
int prev_trindex = indexer_find_prev_committed(indexer, ule, trindex);
if (prev_trindex >= 0) {
UXRHANDLE prevuxr = ule_get_uxr(ule, prev_trindex);
if (xrindex > 0) {
uint64_t prev_xrindex = xrindex - 1;
UXRHANDLE prevuxr = ule_get_uxr(ule, prev_xrindex);
if (uxr_is_delete(prevuxr)) {
; // do nothing
} else if (uxr_is_insert(prevuxr)) {
// generate the hot key and val
result = indexer_generate_hot_key_val(indexer, hotdb, ule, prevuxr, &indexer->i->hotkey, &indexer->i->hotval);
// generate the hot delete key
result = indexer_generate_hot_key_val(indexer, hotdb, ule, prevuxr, &indexer->i->hotkey, NULL);
if (result == 0) {
// send the delete message
result = indexer_brt_delete_committed(indexer, hotdb, &indexer->i->hotkey, xids);
......@@ -155,14 +152,13 @@ indexer_undo_do_committed(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
if (uxr_is_delete(uxr)) {
; // do nothing
} else if (uxr_is_insert(uxr)) {
// generate the hot key and val
// generate the hot insert key and val
result = indexer_generate_hot_key_val(indexer, hotdb, ule, uxr, &indexer->i->hotkey, &indexer->i->hotval);
if (result == 0) {
// send the insert message
result = indexer_brt_insert_committed(indexer, hotdb, &indexer->i->hotkey, &indexer->i->hotval, xids);
if (result == 0) {
if (result == 0)
indexer_commit_keys_add(&indexer->i->commit_keys, indexer->i->hotkey.size, indexer->i->hotkey.data);
}
}
} else
invariant(0);
......@@ -193,23 +189,22 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
BOOL outermost_is_live = FALSE;
// scan the provisional stack from bottom to top
int num_committed = ule_get_num_committed(ule);
int num_provisional = ule_get_num_provisional(ule);
for (int trindex = num_committed; trindex < num_committed + num_provisional; trindex++) {
uint32_t num_committed = ule_get_num_committed(ule);
uint32_t num_provisional = ule_get_num_provisional(ule);
for (uint64_t xrindex = num_committed; xrindex < num_committed + num_provisional; xrindex++) {
// get the ith transaction record
UXRHANDLE uxr = ule_get_uxr(ule, trindex);
invariant(uxr);
UXRHANDLE uxr = ule_get_uxr(ule, xrindex);
TXNID this_xid = uxr_get_txnid(uxr);
BOOL this_xid_is_live = indexer_is_xid_live(indexer, this_xid);
if (trindex == num_committed) {
if (xrindex == num_committed) {
outermost_xid = this_xid;
outermost_is_live = this_xid_is_live;
}
// setup up the xids
result = indexer_setup_xids_provisional(indexer, ule, trindex, this_xid, this_xid_is_live, &xids);
result = indexer_append_xid(indexer, this_xid, this_xid_is_live, &xids);
if (result != 0)
break;
......@@ -218,14 +213,15 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
continue;
// undo
int prev_trindex = indexer_find_prev_provisional(indexer, ule, trindex);
if (prev_trindex >= 0) {
UXRHANDLE prevuxr = ule_get_uxr(ule, prev_trindex);
uint64_t prev_xrindex;
BOOL prev_xrindex_found = indexer_find_prev_xr(indexer, ule, xrindex, &prev_xrindex);
if (prev_xrindex_found) {
UXRHANDLE prevuxr = ule_get_uxr(ule, prev_xrindex);
if (uxr_is_delete(prevuxr)) {
; // do nothing
} else if (uxr_is_insert(prevuxr)) {
// generate the hot key and val
result = indexer_generate_hot_key_val(indexer, hotdb, ule, prevuxr, &indexer->i->hotkey, &indexer->i->hotval);
// generate the hot delete key
result = indexer_generate_hot_key_val(indexer, hotdb, ule, prevuxr, &indexer->i->hotkey, NULL);
if (result == 0) {
// send the delete message
if (!outermost_is_live) {
......@@ -248,7 +244,7 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
if (uxr_is_delete(uxr)) {
; // do nothing
} else if (uxr_is_insert(uxr)) {
// generate the hot key and val
// generate the hot insert key and val
result = indexer_generate_hot_key_val(indexer, hotdb, ule, uxr, &indexer->i->hotkey, &indexer->i->hotval);
if (result == 0) {
// send the insert message
......@@ -281,12 +277,9 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
return result;
}
int
indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
int result;
result = indexer_undo_do_committed(indexer, hotdb, ule);
int result = indexer_undo_do_committed(indexer, hotdb, ule);
if (result == 0)
result = indexer_undo_do_provisional(indexer, hotdb, ule);
......@@ -298,11 +291,11 @@ indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
// the committed XIDS always = [this_xid]
static int
indexer_setup_xids_committed(DB_INDEXER *UU(indexer), TXNID this_xid, XIDS *xids_result) {
indexer_set_xid(DB_INDEXER *UU(indexer), TXNID this_xid, XIDS *xids_result) {
int result = 0;
XIDS old_xids = *xids_result;
XIDS new_xids = xids_get_root_xids();
if (this_xid > 0) {
if (this_xid != TXNID_NONE) {
XIDS child_xids;
result = xids_create_child(new_xids, &child_xids, this_xid);
xids_destroy(&new_xids);
......@@ -319,8 +312,7 @@ indexer_setup_xids_committed(DB_INDEXER *UU(indexer), TXNID this_xid, XIDS *xids
// the provisional XIDS = XIDS . [this_xid] when this_xid is live or when XIDS is empty
static int
indexer_setup_xids_provisional(DB_INDEXER *UU(indexer), ULEHANDLE ule, int trindex, TXNID xid, BOOL xid_is_live, XIDS *xids_result) {
invariant(trindex >= ule_get_num_committed(ule));
indexer_append_xid(DB_INDEXER *UU(indexer), TXNID xid, BOOL xid_is_live, XIDS *xids_result) {
int result = 0;
XIDS old_xids = *xids_result;
XIDS new_xids;
......@@ -406,35 +398,30 @@ indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_
return result;
}
static int
indexer_find_prev_committed(DB_INDEXER *UU(indexer), ULEHANDLE ule, int i) {
invariant(i < ule_num_uxrs(ule));
int previ = i - 1;
return previ;
}
static int
indexer_find_prev_provisional(DB_INDEXER *UU(indexer), ULEHANDLE ule, int i) {
invariant(i < ule_num_uxrs(ule));
int previ = i - 1;
while (previ >= 0) {
UXRHANDLE uxr = ule_get_uxr(ule, previ);
if (!uxr_is_placeholder(uxr))
break;
previ -= 1;
static BOOL
indexer_find_prev_xr(DB_INDEXER *UU(indexer), ULEHANDLE ule, uint64_t xrindex, uint64_t *prev_xrindex) {
invariant(xrindex < ule_num_uxrs(ule));
BOOL prev_found = FALSE;
while (xrindex > 0) {
xrindex -= 1;
UXRHANDLE uxr = ule_get_uxr(ule, xrindex);
if (!uxr_is_placeholder(uxr)) {
*prev_xrindex = xrindex;
prev_found = TRUE;
break;
}
}
return previ;
return prev_found;
}
static TOKUTXN
indexer_get_innermost_live_txn(DB_INDEXER *indexer, XIDS xids) {
DB_ENV *env = indexer->i->env;
uint8_t num_xids = xids_get_num_xids(xids);
TXNID xid = xids_get_xid(xids, num_xids-1);
TOKUTXN txn = NULL;
for (uint8_t num_xids = xids_get_num_xids(xids); txn == NULL && num_xids > 0; num_xids--) {
TXNID xid = xids_get_xid(xids, num_xids-1);
int result = toku_txnid2txn(env->i->logger, xid, &txn);
invariant(result == 0);
}
int result = toku_txnid2txn(env->i->logger, xid, &txn);
invariant(result == 0);
return txn;
}
......
......@@ -168,18 +168,19 @@ put_callback(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *
lazy_assert(0);
}
switch (dest_data->flags) {
case 0:
lazy_assert(0);
break;
case DB_DBT_REALLOC:
dest_data->data = toku_realloc(dest_data->data, src_key->size);
memcpy(dest_data->data, src_key->data, src_key->size);
dest_data->size = src_key->size;
break;
default:
lazy_assert(0);
}
if (dest_data)
switch (dest_data->flags) {
case 0:
lazy_assert(0);
break;
case DB_DBT_REALLOC:
dest_data->data = toku_realloc(dest_data->data, src_key->size);
memcpy(dest_data->data, src_key->data, src_key->size);
dest_data->size = src_key->size;
break;
default:
lazy_assert(0);
}
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment