Commit 12b0208d authored by Yoni Fogel's avatar Yoni Fogel

[t:4875] Removed cruft from ft (dictionary_opened). Moved options in...

[t:4875] Removed cruft from ft (dictionary_opened).  Moved options in ft_handle to an options struct

git-svn-id: file:///svn/toku/tokudb@43769 c7de825b-a66e-492c-adef-691d508d4ae1
parent 8aa19fbf
......@@ -350,7 +350,6 @@ struct ft {
// counter is effectively a boolean which alternates with each checkpoint.
LSN checkpoint_lsn; // LSN of creation of "checkpoint-begin" record in log.
int dirty;
BOOL dictionary_opened; // True once this header has been associated with a dictionary (a brt fully opened)
DICTIONARY_ID dict_id; // unique id for dictionary
int panic; // If nonzero there was a write error. Don't write any more, because it probably only gets worse. This is the error code.
char *panic_string; // A malloced string that can indicate what went wrong.
......@@ -409,21 +408,25 @@ static inline void setup_fake_db (DB *fake_db, DESCRIPTOR orig_desc) {
}
#define FAKE_DB(db, desc) struct __toku_db db; setup_fake_db(&db, (desc))
struct ft_handle {
// The header is shared. It is also ephemeral.
FT h;
struct ft_options {
unsigned int nodesize;
unsigned int basementnodesize;
enum toku_compression_method compression_method;
unsigned int flags;
BOOL did_set_flags;
ft_compare_func compare_fun;
ft_update_func update_fun;
on_redirect_callback redirect_callback;
void* redirect_callback_extra;
};
struct ft_handle {
// The header is shared. It is also ephemeral.
FT h;
on_redirect_callback redirect_callback;
void *redirect_callback_extra;
struct toku_list live_ft_handle_link;
BOOL did_set_flags;
struct ft_options options;
};
// FIXME needs toku prefix
......
......@@ -3062,20 +3062,20 @@ static int ft_open_file(const char *fname, int *fdp) {
int
toku_ft_set_compression_method(FT_HANDLE t, enum toku_compression_method method)
{
t->compression_method = method;
t->options.compression_method = method;
return 0;
}
int
toku_ft_get_compression_method(FT_HANDLE t, enum toku_compression_method *methodp)
{
*methodp = t->compression_method;
*methodp = t->options.compression_method;
return 0;
}
static int
verify_builtin_comparisons_consistent(FT_HANDLE t, u_int32_t flags) {
if ((flags & TOKU_DB_KEYCMP_BUILTIN) && (t->compare_fun != toku_builtin_compare_fun))
if ((flags & TOKU_DB_KEYCMP_BUILTIN) && (t->options.compare_fun != toku_builtin_compare_fun))
return EINVAL;
return 0;
}
......@@ -3141,6 +3141,20 @@ toku_ft_change_descriptor(
return r;
}
static void
toku_ft_handle_inherit_options(FT_HANDLE t, FT ft) {
struct ft_options options = {
.nodesize = ft->nodesize,
.basementnodesize = ft->basementnodesize,
.compression_method = ft->compression_method,
.flags = ft->flags,
.compare_fun = ft->compare_fun,
.update_fun = ft->update_fun
};
t->options = options;
t->did_set_flags = TRUE;
}
// This is the actual open, used for various purposes, such as normal use, recovery, and redirect.
// fname_in_env is the iname, relative to the env_dir (data_dir is already in iname as prefix).
// The checkpointed version (checkpoint_lsn) of the dictionary must be no later than max_acceptable_lsn .
......@@ -3150,10 +3164,11 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
BOOL txn_created = FALSE;
char *fname_in_cwd = NULL;
CACHEFILE cf = NULL;
FT ft = NULL;
BOOL did_create = FALSE;
if (t->did_set_flags) {
r = verify_builtin_comparisons_consistent(t, t->flags);
r = verify_builtin_comparisons_consistent(t, t->options.flags);
if (r!=0) { goto exit; }
}
if (txn && txn->logger->is_panicked) {
......@@ -3179,7 +3194,7 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
assert_zero(r);
}
txn_created = (BOOL)(txn!=NULL);
r = toku_logger_log_fcreate(txn, fname_in_env, reserved_filenum, mode, t->flags, t->nodesize, t->basementnodesize, t->compression_method);
r = toku_logger_log_fcreate(txn, fname_in_env, reserved_filenum, mode, t->options.flags, t->options.nodesize, t->options.basementnodesize, t->options.compression_method);
assert_zero(r); // only possible failure is panic, which we check above
r = ft_create_file(t, fname_in_cwd, &fd);
assert_zero(r);
......@@ -3188,12 +3203,12 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
r=toku_cachetable_openfd_with_filenum(&cf, cachetable, fd, fname_in_env, reserved_filenum);
if (r) { goto exit; }
}
assert(t->nodesize>0);
assert(t->options.nodesize>0);
BOOL was_already_open;
if (is_create) {
r = toku_read_ft_and_store_in_cachefile(t, cf, max_acceptable_lsn, &t->h, &was_already_open);
r = toku_read_ft_and_store_in_cachefile(t, cf, max_acceptable_lsn, &ft, &was_already_open);
if (r==TOKUDB_DICTIONARY_NO_HEADER) {
r = toku_create_new_ft(t, cf, txn);
r = toku_create_new_ft(&ft, &t->options, cf, txn);
if (r) { goto exit; }
}
else if (r!=0) {
......@@ -3208,27 +3223,21 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
// so it is ok for toku_read_ft_and_store_in_cachefile to have read
// the header via toku_read_ft_and_store_in_cachefile
} else {
r = toku_read_ft_and_store_in_cachefile(t, cf, max_acceptable_lsn, &t->h, &was_already_open);
r = toku_read_ft_and_store_in_cachefile(t, cf, max_acceptable_lsn, &ft, &was_already_open);
if (r) { goto exit; }
}
t->nodesize = t->h->nodesize; /* inherit the pagesize from the file */
t->basementnodesize = t->h->basementnodesize;
t->compression_method = t->h->compression_method;
if (!t->did_set_flags) {
r = verify_builtin_comparisons_consistent(t, t->flags);
r = verify_builtin_comparisons_consistent(t, t->options.flags);
if (r) { goto exit; }
t->flags = t->h->flags;
t->did_set_flags = TRUE;
} else {
if (t->flags != t->h->flags) { /* if flags have been set then flags must match */
r = EINVAL;
goto exit;
}
} else if (t->options.flags != ft->flags) { /* if flags have been set then flags must match */
r = EINVAL;
goto exit;
}
toku_ft_handle_inherit_options(t, ft);
if (!was_already_open) {
if (!did_create) { //Only log the fopen that OPENs the file. If it was already open, don't log.
r = toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(cf), t->flags);
r = toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(cf), t->options.flags);
assert_zero(r);
}
}
......@@ -3241,36 +3250,36 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
else {
dict_id = next_dict_id();
}
t->h->dict_id = dict_id;
ft->dict_id = dict_id;
}
else {
// dict_id is already in header
if (use_reserved_dict_id) {
assert(t->h->dict_id.dictid == use_dictionary_id.dictid);
assert(ft->dict_id.dictid == use_dictionary_id.dictid);
}
}
assert(t->h);
assert(t->h->dict_id.dictid != DICTIONARY_ID_NONE.dictid);
assert(t->h->dict_id.dictid < dict_id_serial);
assert(ft);
assert(ft->dict_id.dictid != DICTIONARY_ID_NONE.dictid);
assert(ft->dict_id.dictid < dict_id_serial);
// important note here,
// after this point, where we associate the header
// with the brt, the function is not allowed to fail
// Code that handles failure (located below "exit"),
// depends on this
toku_ft_note_ft_handle_open(t);
toku_ft_note_ft_handle_open(ft, t);
if (txn_created) {
assert(txn);
toku_ft_suppress_rollbacks(t->h, txn);
r = toku_txn_note_ft(txn, t->h);
toku_ft_suppress_rollbacks(ft, txn);
r = toku_txn_note_ft(txn, ft);
assert_zero(r);
}
//Opening a brt may restore to previous checkpoint. Truncate if necessary.
{
int fd = toku_cachefile_get_and_pin_fd (t->h->cf);
toku_maybe_truncate_cachefile_on_open(t->h->blocktable, fd, t->h);
toku_cachefile_unpin_fd(t->h->cf);
int fd = toku_cachefile_get_and_pin_fd (ft->cf);
toku_maybe_truncate_cachefile_on_open(ft->blocktable, fd, ft);
toku_cachefile_unpin_fd(ft->cf);
}
r = 0;
......@@ -3279,17 +3288,17 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
toku_free(fname_in_cwd);
}
if (r != 0 && cf) {
if (t->h) {
if (ft) {
// we only call toku_ft_note_ft_handle_open
// when the function succeeds, so if we are here,
// then that means we have a reference to the header
// but we have not linked it to this brt. So,
// we can simply try to remove the header.
// We don't need to unlink this brt from the header
if (!toku_ft_needed(t->h)) {
if (!toku_ft_needed(ft)) {
//Close immediately.
char *error_string = NULL;
r = toku_remove_ft(t->h, &error_string, false, ZERO_LSN);
r = toku_remove_ft(ft, &error_string, false, ZERO_LSN);
lazy_assert_zero(r);
}
}
......@@ -3357,24 +3366,24 @@ toku_ft_get_dictionary_id(FT_HANDLE brt) {
int toku_ft_set_flags(FT_HANDLE brt, unsigned int flags) {
assert(flags==(flags&TOKU_DB_KEYCMP_BUILTIN)); // make sure there are no extraneous flags
brt->did_set_flags = TRUE;
brt->flags = flags;
brt->options.flags = flags;
return 0;
}
int toku_ft_get_flags(FT_HANDLE brt, unsigned int *flags) {
*flags = brt->flags;
assert(brt->flags==(brt->flags&TOKU_DB_KEYCMP_BUILTIN)); // make sure there are no extraneous flags
*flags = brt->options.flags;
assert(brt->options.flags==(brt->options.flags&TOKU_DB_KEYCMP_BUILTIN)); // make sure there are no extraneous flags
return 0;
}
int toku_ft_set_nodesize(FT_HANDLE brt, unsigned int nodesize) {
brt->nodesize = nodesize;
brt->options.nodesize = nodesize;
return 0;
}
int toku_ft_get_nodesize(FT_HANDLE brt, unsigned int *nodesize) {
*nodesize = brt->nodesize;
*nodesize = brt->options.nodesize;
return 0;
}
......@@ -3386,17 +3395,17 @@ void toku_ft_get_maximum_advised_key_value_lengths (unsigned int *max_key_len, u
}
int toku_ft_set_basementnodesize(FT_HANDLE brt, unsigned int basementnodesize) {
brt->basementnodesize = basementnodesize;
brt->options.basementnodesize = basementnodesize;
return 0;
}
int toku_ft_get_basementnodesize(FT_HANDLE brt, unsigned int *basementnodesize) {
*basementnodesize = brt->basementnodesize;
*basementnodesize = brt->options.basementnodesize;
return 0;
}
int toku_ft_set_bt_compare(FT_HANDLE brt, int (*bt_compare)(DB*, const DBT*, const DBT*)) {
brt->compare_fun = bt_compare;
brt->options.compare_fun = bt_compare;
return 0;
}
......@@ -3407,12 +3416,12 @@ void toku_ft_set_redirect_callback(FT_HANDLE brt, on_redirect_callback redir_cb,
int toku_ft_set_update(FT_HANDLE brt, ft_update_func update_fun) {
brt->update_fun = update_fun;
brt->options.update_fun = update_fun;
return 0;
}
ft_compare_func toku_ft_get_bt_compare (FT_HANDLE brt) {
return brt->compare_fun;
return brt->options.compare_fun;
}
int
......@@ -3455,13 +3464,13 @@ int toku_ft_handle_create(FT_HANDLE *ft_handle_ptr) {
return ENOMEM;
memset(brt, 0, sizeof *brt);
toku_list_init(&brt->live_ft_handle_link);
brt->flags = 0;
brt->options.flags = 0;
brt->did_set_flags = FALSE;
brt->nodesize = FT_DEFAULT_NODE_SIZE;
brt->basementnodesize = FT_DEFAULT_BASEMENT_NODE_SIZE;
brt->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
brt->compare_fun = toku_builtin_compare_fun;
brt->update_fun = NULL;
brt->options.nodesize = FT_DEFAULT_NODE_SIZE;
brt->options.basementnodesize = FT_DEFAULT_BASEMENT_NODE_SIZE;
brt->options.compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
brt->options.compare_fun = toku_builtin_compare_fun;
brt->options.update_fun = NULL;
*ft_handle_ptr = brt;
return 0;
}
......@@ -3729,7 +3738,7 @@ copy_to_stale(OMTVALUE v, u_int32_t UU(idx), void *extrap)
entry->is_fresh = false;
DBT keydbt;
DBT *key = fill_dbt_for_fifo_entry(&keydbt, entry);
struct toku_fifo_entry_key_msn_heaviside_extra heaviside_extra = { .desc = &extra->ft_handle->h->cmp_descriptor, .cmp = extra->ft_handle->compare_fun, .fifo = extra->bnc->buffer, .key = key, .msn = entry->msn };
struct toku_fifo_entry_key_msn_heaviside_extra heaviside_extra = { .desc = &extra->ft_handle->h->cmp_descriptor, .cmp = extra->ft_handle->h->compare_fun, .fifo = extra->bnc->buffer, .key = key, .msn = entry->msn };
int r = toku_omt_insert(extra->bnc->stale_message_tree, (OMTVALUE) offset, toku_fifo_entry_key_msn_heaviside, &heaviside_extra, NULL);
assert_zero(r);
return r;
......@@ -3798,8 +3807,8 @@ do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, FTNODE ancestor, int childnum, con
DBT hv;
FT_MSG_S ftcmd = { type, msn, xids, .u.id = { &hk, toku_fill_dbt(&hv, val, vallen) } };
toku_ft_bn_apply_cmd(
t->compare_fun,
t->update_fun,
t->h->compare_fun,
t->h->update_fun,
&t->h->cmp_descriptor,
bn,
&ftcmd,
......@@ -3956,13 +3965,13 @@ bnc_apply_messages_to_basement_node(
STAT64INFO_S stats_delta = {0,0};
u_int32_t stale_lbi, stale_ube;
if (!bn->stale_ancestor_messages_applied) {
find_bounds_within_message_tree(&t->h->cmp_descriptor, t->compare_fun, bnc->stale_message_tree, bnc->buffer, bounds, &stale_lbi, &stale_ube);
find_bounds_within_message_tree(&t->h->cmp_descriptor, t->h->compare_fun, bnc->stale_message_tree, bnc->buffer, bounds, &stale_lbi, &stale_ube);
} else {
stale_lbi = 0;
stale_ube = 0;
}
u_int32_t fresh_lbi, fresh_ube;
find_bounds_within_message_tree(&t->h->cmp_descriptor, t->compare_fun, bnc->fresh_message_tree, bnc->buffer, bounds, &fresh_lbi, &fresh_ube);
find_bounds_within_message_tree(&t->h->cmp_descriptor, t->h->compare_fun, bnc->fresh_message_tree, bnc->buffer, bounds, &fresh_lbi, &fresh_ube);
// We now know where all the messages we must apply are, so one of the
// following 4 cases will do the application, depending on which of
......@@ -4033,7 +4042,7 @@ bnc_apply_messages_to_basement_node(
assert_zero(r);
// This comparison extra struct won't change during iteration.
struct toku_fifo_entry_key_msn_cmp_extra extra = { .desc= &t->h->cmp_descriptor, .cmp = t->compare_fun, .fifo = bnc->buffer };
struct toku_fifo_entry_key_msn_cmp_extra extra = { .desc= &t->h->cmp_descriptor, .cmp = t->h->compare_fun, .fifo = bnc->buffer };
// Iterate over both lists, applying the smaller (in (key, msn)
// order) message at each step
......@@ -4826,7 +4835,7 @@ ft_cursor_search(FT_CURSOR cursor, ft_search_t *search, FT_GET_CALLBACK_FUNCTION
static inline int compare_k_x(FT_HANDLE brt, const DBT *k, const DBT *x) {
FAKE_DB(db, &brt->h->cmp_descriptor);
return brt->compare_fun(&db, k, x);
return brt->h->compare_fun(&db, k, x);
}
static int
......@@ -5172,7 +5181,7 @@ keyrange_compare (OMTVALUE lev, void *extra) {
struct keyrange_compare_s *s = extra;
// TODO: maybe put a const fake_db in the header
FAKE_DB(db, &s->ft_handle->h->cmp_descriptor);
return s->ft_handle->compare_fun(&db, &omt_dbt, s->key);
return s->ft_handle->h->compare_fun(&db, &omt_dbt, s->key);
}
static void
......@@ -5217,7 +5226,7 @@ toku_ft_keyrange_internal (FT_HANDLE brt, FTNODE node,
{
int r = 0;
// if KEY is NULL then use the leftmost key.
int child_number = key ? toku_ftnode_which_child (node, key, &brt->h->cmp_descriptor, brt->compare_fun) : 0;
int child_number = key ? toku_ftnode_which_child (node, key, &brt->h->cmp_descriptor, brt->h->compare_fun) : 0;
uint64_t rows_per_child = estimated_num_rows / node->n_children;
if (node->height == 0) {
......
......@@ -203,7 +203,7 @@ int toku_testsetup_insert_to_nonleaf (FT_HANDLE brt, BLOCKNUM blocknum, enum ft_
DBT k;
int childnum = toku_ftnode_which_child(node,
toku_fill_dbt(&k, key, keylen),
&brt->h->cmp_descriptor, brt->compare_fun);
&brt->h->cmp_descriptor, brt->h->compare_fun);
XIDS xids_0 = xids_get_root_xids();
MSN msn = next_dummymsn();
......
......@@ -19,7 +19,7 @@
static int
compare_pairs (FT_HANDLE brt, const DBT *a, const DBT *b) {
FAKE_DB(db, &brt->h->cmp_descriptor);
int cmp = brt->compare_fun(&db, a, b);
int cmp = brt->h->compare_fun(&db, a, b);
return cmp;
}
......@@ -27,7 +27,7 @@ static int
compare_leafentries (FT_HANDLE brt, LEAFENTRY a, LEAFENTRY b) {
DBT x,y;
FAKE_DB(db, &brt->h->cmp_descriptor);
int cmp = brt->compare_fun(&db,
int cmp = brt->h->compare_fun(&db,
toku_fill_dbt(&x, le_key(a), le_keylen(a)),
toku_fill_dbt(&y, le_key(b), le_keylen(b)));
return cmp;
......@@ -37,7 +37,7 @@ static int
compare_pair_to_leafentry (FT_HANDLE brt, const DBT *a, LEAFENTRY b) {
DBT y;
FAKE_DB(db, &brt->h->cmp_descriptor);
int cmp = brt->compare_fun(&db, a, toku_fill_dbt(&y, le_key(b), le_keylen(b)));
int cmp = brt->h->compare_fun(&db, a, toku_fill_dbt(&y, le_key(b), le_keylen(b)));
return cmp;
}
......@@ -45,7 +45,7 @@ static int
compare_pair_to_key (FT_HANDLE brt, const DBT *a, bytevec key, ITEMLEN keylen) {
DBT y;
FAKE_DB(db, &brt->h->cmp_descriptor);
int cmp = brt->compare_fun(&db, a, toku_fill_dbt(&y, key, keylen));
int cmp = brt->h->compare_fun(&db, a, toku_fill_dbt(&y, key, keylen));
return cmp;
}
......@@ -171,7 +171,7 @@ verify_sorted_by_key_msn(FT_HANDLE brt, FIFO fifo, OMT mt) {
assert_zero(r);
size_t offset = (size_t) v;
if (i > 0) {
struct toku_fifo_entry_key_msn_cmp_extra extra = { .desc = &brt->h->cmp_descriptor, .cmp = brt->compare_fun, .fifo = fifo };
struct toku_fifo_entry_key_msn_cmp_extra extra = { .desc = &brt->h->cmp_descriptor, .cmp = brt->h->compare_fun, .fifo = fifo };
if (toku_fifo_entry_key_msn_cmp(&extra, &last_offset, &offset) >= 0) {
result = TOKUDB_NEEDS_REPAIR;
break;
......@@ -185,7 +185,7 @@ verify_sorted_by_key_msn(FT_HANDLE brt, FIFO fifo, OMT mt) {
static int
count_eq_key_msn(FT_HANDLE brt, FIFO fifo, OMT mt, const DBT *key, MSN msn) {
struct toku_fifo_entry_key_msn_heaviside_extra extra = {
.desc = &brt->h->cmp_descriptor, .cmp = brt->compare_fun, .fifo = fifo, .key = key, .msn = msn
.desc = &brt->h->cmp_descriptor, .cmp = brt->h->compare_fun, .fifo = fifo, .key = key, .msn = msn
};
OMTVALUE v; u_int32_t idx;
int r = toku_omt_find_zero(mt, toku_fifo_entry_key_msn_heaviside, &extra, &v, &idx);
......@@ -311,7 +311,7 @@ toku_verify_ftnode (FT_HANDLE brt,
}
struct count_msgs_extra extra = { .count = 0, .key = &keydbt,
.msn = msn, .fifo = bnc->buffer,
.desc = &brt->h->cmp_descriptor, .cmp = brt->compare_fun };
.desc = &brt->h->cmp_descriptor, .cmp = brt->h->compare_fun };
extra.count = 0;
toku_omt_iterate(bnc->broadcast_list, count_msgs, &extra);
if (ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type)) {
......
......@@ -254,7 +254,7 @@ ft_close (CACHEFILE cachefile, int fd, void *header_v, char **malloced_error_str
int r = 0;
if (h->panic) {
r = h->panic;
} else if (h->dictionary_opened) { //Otherwise header has never fully been created.
} else {
assert(h->cf == cachefile);
TOKULOGGER logger = toku_cachefile_logger(cachefile);
LSN lsn = ZERO_LSN;
......@@ -357,33 +357,40 @@ static int setup_initial_ft_root_node (FT h, BLOCKNUM blocknum) {
return r;
}
// TODO: (Zardosht) move this functionality to ft_init
// No need in having ft_init call this function
static int
ft_init_partial (FT_HANDLE t, CACHEFILE cf, TOKUTXN txn) {
int r;
t->h->flags = t->flags;
if (t->h->cf!=NULL) assert(t->h->cf == cf);
t->h->cf = cf;
t->h->nodesize = t->nodesize;
t->h->basementnodesize = t->basementnodesize;
t->h->compression_method = t->compression_method;
t->h->root_xid_that_created = txn ? txn->ancestor_txnid64 : TXNID_NONE;
t->h->compare_fun = t->compare_fun;
t->h->update_fun = t->update_fun;
t->h->in_memory_stats = ZEROSTATS;
t->h->on_disk_stats = ZEROSTATS;
t->h->checkpoint_staging_stats = ZEROSTATS;
t->h->highest_unused_msn_for_upgrade.msn = MIN_MSN.msn - 1;
BLOCKNUM root = t->h->root_blocknum;
r = setup_initial_ft_root_node(t->h, root);
ft_init (FT ft, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) {
ft->type = FT_CURRENT;
ft->checkpoint_header = NULL;
toku_ft_init_treelock(ft);
toku_blocktable_create_new(&ft->blocktable);
//Assign blocknum for root block, also dirty the header
toku_allocate_blocknum(ft->blocktable, &ft->root_blocknum, ft);
toku_list_init(&ft->live_ft_handles);
int r = toku_omt_create(&ft->txns);
assert_zero(r);
ft->flags = options->flags;
ft->nodesize = options->nodesize;
ft->basementnodesize = options->basementnodesize;
ft->compression_method = options->compression_method;
ft->compare_fun = options->compare_fun;
ft->update_fun = options->update_fun;
if (ft->cf!=NULL) assert(ft->cf == cf);
ft->cf = cf;
ft->root_xid_that_created = txn ? txn->ancestor_txnid64 : TXNID_NONE;
ft->in_memory_stats = ZEROSTATS;
ft->on_disk_stats = ZEROSTATS;
ft->checkpoint_staging_stats = ZEROSTATS;
ft->highest_unused_msn_for_upgrade.msn = MIN_MSN.msn - 1;
r = setup_initial_ft_root_node(ft, ft->root_blocknum);
if (r != 0) {
goto exit;
}
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0);
toku_cachefile_set_userdata(t->h->cf,
t->h,
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, ft, 0);
toku_cachefile_set_userdata(ft->cf,
ft,
ft_log_fassociate_during_checkpoint,
ft_log_suppress_rollback_during_checkpoint,
ft_close,
......@@ -392,67 +399,51 @@ ft_init_partial (FT_HANDLE t, CACHEFILE cf, TOKUTXN txn) {
ft_end_checkpoint,
ft_note_pin_by_checkpoint,
ft_note_unpin_by_checkpoint);
exit:
return r;
}
static int
ft_init (FT_HANDLE t, CACHEFILE cf, TOKUTXN txn) {
t->h->type = FT_CURRENT;
t->h->checkpoint_header = NULL;
toku_ft_init_treelock(t->h);
toku_blocktable_create_new(&t->h->blocktable);
BLOCKNUM root;
//Assign blocknum for root block, also dirty the header
toku_allocate_blocknum(t->h->blocktable, &root, t->h);
t->h->root_blocknum = root;
toku_list_init(&t->h->live_ft_handles);
int r = toku_omt_create(&t->h->txns);
assert_zero(r);
r = ft_init_partial(t, cf, txn);
if (r==0) toku_block_verify_no_free_blocknums(t->h->blocktable);
toku_block_verify_no_free_blocknums(ft->blocktable);
r = 0;
exit:
return r;
}
// allocate and initialize a brt header.
// t->h->cf is not set to anything.
// TODO: (Zardosht) make this function return a header and set
// it to t->h in the caller
int
toku_create_new_ft(FT_HANDLE t, CACHEFILE cf, TOKUTXN txn) {
toku_create_new_ft(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) {
int r;
invariant(ftp);
FT XCALLOC(ft);
assert (!t->h);
XCALLOC(t->h);
t->h->layout_version = FT_LAYOUT_VERSION;
t->h->layout_version_original = FT_LAYOUT_VERSION;
t->h->layout_version_read_from_disk = FT_LAYOUT_VERSION; // fake, prevent unnecessary upgrade logic
ft->layout_version = FT_LAYOUT_VERSION;
ft->layout_version_original = FT_LAYOUT_VERSION;
ft->layout_version_read_from_disk = FT_LAYOUT_VERSION; // fake, prevent unnecessary upgrade logic
t->h->build_id = BUILD_ID;
t->h->build_id_original = BUILD_ID;
ft->build_id = BUILD_ID;
ft->build_id_original = BUILD_ID;
uint64_t now = (uint64_t) time(NULL);
t->h->time_of_creation = now;
t->h->time_of_last_modification = now;
t->h->time_of_last_verification = 0;
ft->time_of_creation = now;
ft->time_of_last_modification = now;
ft->time_of_last_verification = 0;
memset(&t->h->descriptor, 0, sizeof(t->h->descriptor));
memset(&t->h->cmp_descriptor, 0, sizeof(t->h->cmp_descriptor));
memset(&ft->descriptor, 0, sizeof(ft->descriptor));
memset(&ft->cmp_descriptor, 0, sizeof(ft->cmp_descriptor));
r = ft_init(t, cf, txn);
r = ft_init(ft, options, cf, txn);
if (r != 0) {
goto exit;
}
*ftp = ft;
r = 0;
exit:
if (r != 0) {
if (t->h) {
toku_free(t->h);
t->h = NULL;
if (ft) {
toku_free(ft);
ft = NULL;
}
return r;
}
......@@ -470,8 +461,8 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac
if ((h=toku_cachefile_get_userdata(cf))!=0) {
*header = h;
*was_open = TRUE;
assert(brt->update_fun == h->update_fun);
assert(brt->compare_fun == h->compare_fun);
assert(brt->options.update_fun == h->update_fun);
assert(brt->options.compare_fun == h->compare_fun);
return 0;
}
}
......@@ -495,8 +486,8 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac
}
if (r!=0) return r;
h->cf = cf;
h->compare_fun = brt->compare_fun;
h->update_fun = brt->update_fun;
h->compare_fun = brt->options.compare_fun;
h->update_fun = brt->options.update_fun;
toku_cachefile_set_userdata(cf,
(void*)h,
ft_log_fassociate_during_checkpoint,
......@@ -512,12 +503,11 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac
}
void
toku_ft_note_ft_handle_open(FT_HANDLE live) {
FT h = live->h;
toku_ft_lock(h);
toku_list_push(&h->live_ft_handles, &live->live_ft_handle_link);
h->dictionary_opened = TRUE;
toku_ft_unlock(h);
toku_ft_note_ft_handle_open(FT ft, FT_HANDLE live) {
toku_ft_lock(ft);
live->h = ft;
toku_list_push(&ft->live_ft_handles, &live->live_ft_handle_link);
toku_ft_unlock(ft);
}
int
......@@ -674,18 +664,16 @@ dictionary_redirect_internal(const char *dst_fname_in_env, FT src_h, TOKUTXN txn
// we want to change it to dummy_dst
while (!toku_list_empty(&src_h->live_ft_handles)) {
list = src_h->live_ft_handles.next;
FT_HANDLE src_ft = NULL;
src_ft = toku_list_struct(list, struct ft_handle, live_ft_handle_link);
FT_HANDLE src_handle = NULL;
src_handle = toku_list_struct(list, struct ft_handle, live_ft_handle_link);
toku_ft_lock(src_h);
toku_list_remove(&src_ft->live_ft_handle_link);
toku_list_remove(&src_handle->live_ft_handle_link);
toku_ft_unlock(src_h);
src_ft->h = dst_h;
toku_ft_note_ft_handle_open(src_ft);
if (src_ft->redirect_callback) {
src_ft->redirect_callback(src_ft, src_ft->redirect_callback_extra);
toku_ft_note_ft_handle_open(dst_h, src_handle);
if (src_handle->redirect_callback) {
src_handle->redirect_callback(src_handle, src_handle->redirect_callback_extra);
}
}
assert(dst_h);
......
......@@ -22,11 +22,11 @@ void toku_ft_destroy_treelock(FT h);
void toku_ft_grab_treelock(FT h);
void toku_ft_release_treelock(FT h);
int toku_create_new_ft(FT_HANDLE t, CACHEFILE cf, TOKUTXN txn);
int toku_create_new_ft(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn);
void toku_ft_free (FT h);
int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_acceptable_lsn, FT *header, BOOL* was_open);
void toku_ft_note_ft_handle_open(FT_HANDLE live);
void toku_ft_note_ft_handle_open(FT ft, FT_HANDLE live);
int toku_ft_needed(FT h);
int toku_remove_ft (FT h, char **error_string, BOOL oplsn_valid, LSN oplsn) __attribute__ ((warn_unused_result));
......
......@@ -38,6 +38,7 @@ typedef struct ftnode_leaf_basement_node *BASEMENTNODE;
typedef struct ftnode_nonleaf_childinfo *NONLEAF_CHILDINFO;
typedef struct sub_block *SUB_BLOCK;
typedef struct ft *FT;
typedef struct ft_options *FT_OPTIONS;
struct wbuf;
struct dbuf;
......
......@@ -66,7 +66,7 @@ insert_into_child_buffer(FT_HANDLE brt, FTNODE node, int childnum, int minkey, i
unsigned int key = htonl(val);
DBT thekey; toku_fill_dbt(&thekey, &key, sizeof key);
DBT theval; toku_fill_dbt(&theval, &val, sizeof val);
toku_ft_append_to_child_buffer(brt->compare_fun, NULL, node, childnum, FT_INSERT, msn, xids_get_root_xids(), true, &thekey, &theval);
toku_ft_append_to_child_buffer(brt->h->compare_fun, NULL, node, childnum, FT_INSERT, msn, xids_get_root_xids(), true, &thekey, &theval);
node->max_msn_applied_to_node_on_disk = msn;
}
}
......
......@@ -48,7 +48,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, size_t keylen, void *val,
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
u_int64_t workdone=0;
toku_ft_leaf_apply_cmd(brt->compare_fun, brt->update_fun, &brt->h->cmp_descriptor, leafnode, &cmd, &workdone, NULL);
toku_ft_leaf_apply_cmd(brt->h->compare_fun, brt->h->update_fun, &brt->h->cmp_descriptor, leafnode, &cmd, &workdone, NULL);
{
int r = toku_ft_lookup(brt, &thekey, lookup_checkf, &pair);
assert(r==0);
......@@ -56,7 +56,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, size_t keylen, void *val,
}
FT_MSG_S badcmd = { FT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &badval } };
toku_ft_leaf_apply_cmd(brt->compare_fun, brt->update_fun, &brt->h->cmp_descriptor, leafnode, &badcmd, &workdone, NULL);
toku_ft_leaf_apply_cmd(brt->h->compare_fun, brt->h->update_fun, &brt->h->cmp_descriptor, leafnode, &badcmd, &workdone, NULL);
// message should be rejected for duplicate msn, row should still have original val
......@@ -69,7 +69,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, size_t keylen, void *val,
// now verify that message with proper msn gets through
msn = next_dummymsn();
FT_MSG_S cmd2 = { FT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &val2 } };
toku_ft_leaf_apply_cmd(brt->compare_fun, brt->update_fun, &brt->h->cmp_descriptor, leafnode, &cmd2, &workdone, NULL);
toku_ft_leaf_apply_cmd(brt->h->compare_fun, brt->h->update_fun, &brt->h->cmp_descriptor, leafnode, &cmd2, &workdone, NULL);
// message should be accepted, val should have new value
{
......@@ -81,7 +81,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, size_t keylen, void *val,
// now verify that message with lesser (older) msn is rejected
msn.msn = msn.msn - 10;
FT_MSG_S cmd3 = { FT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &badval } };
toku_ft_leaf_apply_cmd(brt->compare_fun, brt->update_fun, &brt->h->cmp_descriptor, leafnode, &cmd3, &workdone, NULL);
toku_ft_leaf_apply_cmd(brt->h->compare_fun, brt->h->update_fun, &brt->h->cmp_descriptor, leafnode, &cmd3, &workdone, NULL);
// message should be rejected, val should still have value in pair2
{
......
......@@ -126,7 +126,7 @@ insert_random_message_to_bn(FT_HANDLE t, BASEMENTNODE blb, LEAFENTRY *save, XIDS
int64_t numbytes;
int r = apply_msg_to_leafentry(&msg, NULL, &memsize, save, NULL, NULL, NULL, &numbytes);
assert_zero(r);
toku_ft_bn_apply_cmd(t->compare_fun, t->update_fun, NULL, blb, &msg, NULL, NULL);
toku_ft_bn_apply_cmd(t->h->compare_fun, t->h->update_fun, NULL, blb, &msg, NULL, NULL);
if (msn.msn > blb->max_msn_applied.msn) {
blb->max_msn_applied = msn;
}
......@@ -167,11 +167,11 @@ insert_same_message_to_bns(FT_HANDLE t, BASEMENTNODE blb1, BASEMENTNODE blb2, LE
int64_t numbytes;
int r = apply_msg_to_leafentry(&msg, NULL, &memsize, save, NULL, NULL, NULL, &numbytes);
assert_zero(r);
toku_ft_bn_apply_cmd(t->compare_fun, t->update_fun, NULL, blb1, &msg, NULL, NULL);
toku_ft_bn_apply_cmd(t->h->compare_fun, t->h->update_fun, NULL, blb1, &msg, NULL, NULL);
if (msn.msn > blb1->max_msn_applied.msn) {
blb1->max_msn_applied = msn;
}
toku_ft_bn_apply_cmd(t->compare_fun, t->update_fun, NULL, blb2, &msg, NULL, NULL);
toku_ft_bn_apply_cmd(t->h->compare_fun, t->h->update_fun, NULL, blb2, &msg, NULL, NULL);
if (msn.msn > blb2->max_msn_applied.msn) {
blb2->max_msn_applied = msn;
}
......@@ -583,7 +583,7 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) {
if (make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) {
if (!parent_messages_is_fresh[i]) {
toku_ft_leaf_apply_cmd(t->compare_fun, t->update_fun, &t->h->descriptor, child, parent_messages[i], NULL, NULL);
toku_ft_leaf_apply_cmd(t->h->compare_fun, t->h->update_fun, &t->h->descriptor, child, parent_messages[i], NULL, NULL);
}
}
for (i = 0; i < 8; ++i) {
......@@ -807,7 +807,7 @@ flush_to_leaf_with_keyrange(FT_HANDLE t, bool make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) {
if (dummy_cmp(NULL, parent_messages[i]->u.id.key, &childkeys[7]) <= 0 &&
!parent_messages_is_fresh[i]) {
toku_ft_leaf_apply_cmd(t->compare_fun, t->update_fun, &t->h->descriptor, child, parent_messages[i], NULL, NULL);
toku_ft_leaf_apply_cmd(t->h->compare_fun, t->h->update_fun, &t->h->descriptor, child, parent_messages[i], NULL, NULL);
}
}
for (i = 0; i < 8; ++i) {
......@@ -1000,8 +1000,8 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) {
if (make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) {
if (!parent_messages_is_fresh[i]) {
toku_ft_leaf_apply_cmd(t->compare_fun, t->update_fun, &t->h->descriptor, child1, parent_messages[i], NULL, NULL);
toku_ft_leaf_apply_cmd(t->compare_fun, t->update_fun, &t->h->descriptor, child2, parent_messages[i], NULL, NULL);
toku_ft_leaf_apply_cmd(t->h->compare_fun, t->h->update_fun, &t->h->descriptor, child1, parent_messages[i], NULL, NULL);
toku_ft_leaf_apply_cmd(t->h->compare_fun, t->h->update_fun, &t->h->descriptor, child2, parent_messages[i], NULL, NULL);
}
}
for (i = 0; i < 8; ++i) {
......
......@@ -59,7 +59,7 @@ doit (void) {
assert(r==0);
toku_free(fname);
brt->update_fun = update_func;
brt->h->update_fun = update_func;
brt->h->update_fun = update_func;
toku_testsetup_initialize(); // must precede any other toku_testsetup calls
......
......@@ -59,7 +59,7 @@ doit (BOOL keep_other_bn_in_memory) {
assert(r==0);
toku_free(fname);
brt->update_fun = update_func;
brt->options.update_fun = update_func;
brt->h->update_fun = update_func;
toku_testsetup_initialize(); // must precede any other toku_testsetup calls
......
......@@ -58,7 +58,7 @@ doit (void) {
assert(r==0);
toku_free(fname);
brt->update_fun = update_func;
brt->options.update_fun = update_func;
brt->h->update_fun = update_func;
toku_testsetup_initialize(); // must precede any other toku_testsetup calls
......
......@@ -70,7 +70,7 @@ insert_into_child_buffer(FT_HANDLE brt, FTNODE node, int childnum, int minkey, i
unsigned int key = htonl(val);
DBT thekey; toku_fill_dbt(&thekey, &key, sizeof key);
DBT theval; toku_fill_dbt(&theval, &val, sizeof val);
toku_ft_append_to_child_buffer(brt->compare_fun, NULL, node, childnum, FT_INSERT, msn, xids_get_root_xids(), true, &thekey, &theval);
toku_ft_append_to_child_buffer(brt->h->compare_fun, NULL, node, childnum, FT_INSERT, msn, xids_get_root_xids(), true, &thekey, &theval);
// Create bad tree (don't do following):
// node->max_msn_applied_to_node = msn;
......
......@@ -58,7 +58,7 @@ insert_into_child_buffer(FT_HANDLE brt, FTNODE node, int childnum, int minkey, i
DBT thekey; toku_fill_dbt(&thekey, &key, sizeof key);
DBT theval; toku_fill_dbt(&theval, &val, sizeof val);
MSN msn = next_dummymsn();
toku_ft_append_to_child_buffer(brt->compare_fun, NULL, node, childnum, FT_INSERT, msn, xids_get_root_xids(), true, &thekey, &theval);
toku_ft_append_to_child_buffer(brt->h->compare_fun, NULL, node, childnum, FT_INSERT, msn, xids_get_root_xids(), true, &thekey, &theval);
}
}
......
......@@ -34,7 +34,7 @@ static inline int db_opened(DB *db) {
static inline toku_dbt_cmp
toku_db_get_compare_fun(DB* db) {
return db->i->ft_handle->compare_fun;
return db->i->ft_handle->h->compare_fun;
}
int toku_db_pre_acquire_fileops_lock(DB *db, DB_TXN *txn);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment