Commit d64dbafb authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

[t:3610] Get rid of a bunch of {{{toku_cacheable_get_and_pin}}} calls in favor...

[t:3610] Get rid of a bunch of {{{toku_cacheable_get_and_pin}}} calls in favor of the {{{toku_pin_brtnode}}} functions. Refs #3683.

git-svn-id: file:///svn/toku/tokudb@32712 c7de825b-a66e-492c-adef-691d508d4ae1
parent 99ba8643
......@@ -319,7 +319,7 @@ void toku_pin_brtnode_holding_lock (BRT brt, BLOCKNUM blocknum, u_int32_t fullha
}
static inline void
toku_verify_estimates (BRT t, BRTNODE node);
toku_verify_estimates (BRT t, BRTNODE node, ANCESTORS, struct pivot_bounds const * const bounds);
void toku_unpin_brtnode (BRT brt, BRTNODE node)
// Effect: Unpin a brt node.
......@@ -416,9 +416,29 @@ fixup_child_estimates (BRTNODE node, int childnum_of_node, BRTNODE child, BOOL d
}
}
static struct kv_pair const *prepivotkey (BRTNODE node, int childnum, struct kv_pair const * const lower_bound_exclusive) {
if (childnum==0)
return lower_bound_exclusive;
else {
return node->childkeys[childnum-1];
}
}
static struct kv_pair const *postpivotkey (BRTNODE node, int childnum, struct kv_pair const * const upper_bound_inclusive) {
if (childnum+1 == node->n_children)
return upper_bound_inclusive;
else {
return node->childkeys[childnum];
}
}
static struct pivot_bounds next_pivot_keys (BRTNODE node, int childnum, struct pivot_bounds const * const old_pb) {
struct pivot_bounds pb = {.lower_bound_exclusive = prepivotkey(node, childnum, old_pb->lower_bound_exclusive),
.upper_bound_inclusive = postpivotkey(node, childnum, old_pb->upper_bound_inclusive)};
return pb;
}
static inline void
toku_verify_estimates (BRT t, BRTNODE node) {
toku_verify_estimates (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds) {
int childnum;
for (childnum=0; childnum<node->n_children; childnum++) {
// we'll just do this estimate
......@@ -426,27 +446,14 @@ toku_verify_estimates (BRT t, BRTNODE node) {
// can only check the state of available partitions
if (BP_STATE(node, childnum) == PT_AVAIL) {
if (node->height > 0) {
struct ancestors next_ancestors = {node, childnum, ancestors};
const struct pivot_bounds next_bounds = next_pivot_keys(node, childnum, bounds);
BLOCKNUM childblocknum = BP_BLOCKNUM(node, childnum);
u_int32_t fullhash = compute_child_fullhash(t->cf, node, childnum);
void *childnode_v;
BRTNODE childnode;
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, t->h);
int r = toku_cachetable_get_and_pin(
t->cf,
childblocknum,
fullhash,
&childnode_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
&bfe,
t->h
);
assert_zero(r);
BRTNODE childnode = childnode_v;
toku_pin_brtnode_holding_lock(t, childblocknum, fullhash, &next_ancestors, &next_bounds, &bfe, &childnode);
for (int i=0; i<childnode->n_children; i++) {
child_estimate += BP_SUBTREE_EST(childnode, i).ndata;
}
......@@ -993,27 +1000,6 @@ init_childkey(BRTNODE node, int childnum, struct kv_pair *pivotkey, size_t pivot
node->totalchildkeylens += pivotkeysize;
}
static struct kv_pair const *prepivotkey (BRTNODE node, int childnum, struct kv_pair const * const lower_bound_exclusive) {
if (childnum==0)
return lower_bound_exclusive;
else {
return node->childkeys[childnum-1];
}
}
static struct kv_pair const *postpivotkey (BRTNODE node, int childnum, struct kv_pair const * const upper_bound_inclusive) {
if (childnum+1 == node->n_children)
return upper_bound_inclusive;
else {
return node->childkeys[childnum];
}
}
static struct pivot_bounds next_pivot_keys (BRTNODE node, int childnum, struct pivot_bounds const * const old_pb) {
struct pivot_bounds pb = {.lower_bound_exclusive = prepivotkey(node, childnum, old_pb->lower_bound_exclusive),
.upper_bound_inclusive = postpivotkey(node, childnum, old_pb->upper_bound_inclusive)};
return pb;
}
// Used only by test programs: append a child node to a parent node
void
toku_brt_nonleaf_append_child(BRTNODE node, BRTNODE child, struct kv_pair *pivotkey, size_t pivotkeysize) {
......@@ -1446,7 +1432,7 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
}
static void
brt_split_child (BRT t, BRTNODE node, int childnum, BOOL *did_react)
brt_split_child (BRT t, BRTNODE node, int childnum, BOOL *did_react, ANCESTORS ancestors, struct pivot_bounds const * const bounds)
{
if (0) {
printf("%s:%d Node %" PRId64 "->u.n.n_children=%d estimates=", __FILE__, __LINE__, node->thisnodename.b, node->n_children);
......@@ -1458,24 +1444,16 @@ brt_split_child (BRT t, BRTNODE node, int childnum, BOOL *did_react)
BRTNODE child;
assert(BNC_NBYTESINBUF(node, childnum)==0); // require that the buffer for this child is empty
{
void *childnode_v;
// For now, don't use toku_pin_brtnode since we aren't yet prepared to deal with the TRY_AGAIN, and we don't have to apply all the messages above to do this split operation.
struct ancestors next_ancestors = {node, childnum, ancestors};
const struct pivot_bounds next_bounds = next_pivot_keys(node, childnum, bounds);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, t->h);
int r = toku_cachetable_get_and_pin(t->cf,
BP_BLOCKNUM(node, childnum),
compute_child_fullhash(t->cf, node, childnum),
&childnode_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
&bfe,
t->h);
assert(r==0);
child = childnode_v;
toku_pin_brtnode_holding_lock(t,
BP_BLOCKNUM(node, childnum),
compute_child_fullhash(t->cf, node, childnum),
&next_ancestors, &next_bounds, &bfe,
&child);
assert(child->thisnodename.b!=0);
VERIFY_NODE(t,child);
}
......@@ -2417,47 +2395,20 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_react,
BRTNODE childa, childb;
{
void *childnode_v;
u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnuma);
struct ancestors next_ancestors = {node, childnuma, ancestors};
const struct pivot_bounds next_bounds = next_pivot_keys(node, childnuma, bounds);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, t->h);
int r = toku_cachetable_get_and_pin(
t->cf,
BP_BLOCKNUM(node, childnuma),
childfullhash,
&childnode_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
&bfe,
t->h
);
assert(r==0);
childa = childnode_v;
toku_pin_brtnode_holding_lock(t, BP_BLOCKNUM(node, childnuma), childfullhash, &next_ancestors, &next_bounds, &bfe, &childa);
}
{
void *childnode_v;
u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnumb);
struct ancestors next_ancestors = {node, childnumb, ancestors};
const struct pivot_bounds next_bounds = next_pivot_keys(node, childnumb, bounds);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, t->h);
int r = toku_cachetable_get_and_pin(
t->cf,
BP_BLOCKNUM(node, childnumb),
childfullhash, &childnode_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
&bfe,
t->h
);
assert(r==0);
childb = childnode_v;
toku_pin_brtnode_holding_lock(t, BP_BLOCKNUM(node, childnumb), childfullhash, &next_ancestors, &next_bounds, &bfe, &childb);
}
// now we have both children pinned in main memory.
......@@ -2521,7 +2472,7 @@ brt_handle_maybe_reactive_child(BRT t, BRTNODE node, int childnum, enum reactivi
*did_react = FALSE;
return;
case RE_FISSIBLE:
brt_split_child(t, node, childnum, did_react);
brt_split_child(t, node, childnum, did_react, ancestors, bounds);
return;
case RE_FUSIBLE:
brt_merge_child(t, node, childnum, did_react, ancestors, bounds);
......@@ -2990,7 +2941,7 @@ static void apply_cmd_to_in_memory_non_root_leaves (
)
{
void *node_v;
int r = toku_cachetable_get_and_pin_if_in_memory(t->cf, nodenum, fullhash, &node_v);
int r = toku_cachetable_get_and_pin_if_in_memory(t->cf, nodenum, fullhash, &node_v); // this one doesn't need to use the toku_pin_brtnode function because it doesn't bring anything in, so it cannot create a non-up-to-date leaf node.
if (r) { goto exit; }
BRTNODE node = node_v;
......@@ -5056,7 +5007,6 @@ maybe_flush_pinned_node(BRT t, BRTNODE node, int childnum, BRTNODE child) {
FIFO fifo = BNC_BUFFER(node,childnum);
if (child->height==0) {
// The child is a leaf node.
assert_leaf_up_to_date(child);
bytevec key, val;
ITEMLEN keylen, vallen;
u_int32_t type;
......@@ -5077,6 +5027,8 @@ maybe_flush_pinned_node(BRT t, BRTNODE node, int childnum, BRTNODE child) {
node->dirty=TRUE;
child->dirty=TRUE;
fixup_child_estimates(node, childnum, child, TRUE);
for (int i=0; i<child->n_children; i++)
BLB_SOFTCOPYISUPTODATE(child,i)=true;
} else {
bytevec key,val;
ITEMLEN keylen, vallen;
......@@ -6080,30 +6032,17 @@ toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN txn) {
static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename,
u_int32_t fullhash, DBT *key, u_int64_t *less, u_int64_t *equal, u_int64_t *greater) {
u_int32_t fullhash, DBT *key, u_int64_t *less, u_int64_t *equal, u_int64_t *greater,
ANCESTORS ancestors, struct pivot_bounds const * const bounds) {
BRTNODE node;
{
void *node_v;
//assert(fullhash == toku_cachetable_hash(brt->cf, nodename));
struct brtnode_fetch_extra bfe;
// TODO: (Zardosht) change this
fill_bfe_for_full_read(&bfe, brt->h);
int rr = toku_cachetable_get_and_pin(
brt->cf,
nodename,
fullhash,
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
&bfe,
brt->h
);
assert_zero(rr);
node = node_v;
toku_pin_brtnode_holding_lock(brt, nodename, fullhash,
ancestors, bounds, &bfe,
&node);
assert(node->fullhash==fullhash);
}
int n_keys = node->n_children-1;
......@@ -6130,8 +6069,11 @@ static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename,
} else {
// nextcomp>=0 and prevcomp<=0, so something in the subtree could match
// but they are not both zero, so it's not the whole subtree, so we need to recurse
struct ancestors next_ancestors = {node, i, ancestors};
const struct pivot_bounds next_bounds = next_pivot_keys(node, i, bounds);
if (node->height > 0) {
toku_brt_keyrange_internal(brt, BP_BLOCKNUM(node, i), compute_child_fullhash(brt->cf, node, i), key, less, equal, greater);
toku_brt_keyrange_internal(brt, BP_BLOCKNUM(node, i), compute_child_fullhash(brt->cf, node, i), key, less, equal, greater,
&next_ancestors, &next_bounds);
}
else {
struct cmd_leafval_heaviside_extra be = {brt, key};
......@@ -6155,7 +6097,8 @@ int toku_brt_keyrange (BRT brt, DBT *key, u_int64_t *less, u_int64_t *equal, u
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
*less = *equal = *greater = 0;
toku_brt_keyrange_internal (brt, *rootp, fullhash, key, less, equal, greater);
toku_brt_keyrange_internal (brt, *rootp, fullhash, key, less, equal, greater,
(ANCESTORS)NULL, &infinite_bounds);
return 0;
}
......@@ -6173,25 +6116,10 @@ int toku_brt_stat64 (BRT brt, TOKUTXN UU(txn), struct brtstat64_s *s) {
u_int32_t fullhash;
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
CACHEKEY root = *rootp;
void *node_v;
struct brtnode_fetch_extra bfe;
fill_bfe_for_min_read(&bfe, brt->h);
int r = toku_cachetable_get_and_pin(
brt->cf,
root,
fullhash,
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
&bfe,
brt->h
);
if (r!=0) return r;
BRTNODE node = node_v;
BRTNODE node;
toku_pin_brtnode_holding_lock(brt, root, fullhash, (ANCESTORS)NULL, &infinite_bounds, &bfe, &node);
s->nkeys = s->ndata = s->dsize = 0;
int i;
......@@ -6202,7 +6130,7 @@ int toku_brt_stat64 (BRT brt, TOKUTXN UU(txn), struct brtstat64_s *s) {
s->dsize += se->dsize;
}
r = toku_cachetable_unpin(brt->cf, root, fullhash, CACHETABLE_CLEAN, 0);
int r = toku_cachetable_unpin(brt->cf, root, fullhash, CACHETABLE_CLEAN, 0);
if (r!=0) return r;
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment