Commit e59ed34a authored by John Esmet's avatar John Esmet

FT-197 Store cachefiles in OMTs only, remove linked list pointers

parent a8cacc54
......@@ -178,8 +178,6 @@ class pair_list;
// Maps to a file on disk.
//
struct cachefile {
CACHEFILE next;
CACHEFILE prev;
// these next two fields are protected by cachetable's list lock
// they are managed whenever we add or remove a pair from
// the cachetable. As of Riddler, this linked list is only used to
......@@ -439,14 +437,12 @@ class cachefile_list {
bool evict_some_stale_pair(evictor* ev);
void free_stale_data(evictor* ev);
// access to these fields are protected by the lock
CACHEFILE m_active_head; // head of CACHEFILEs that are active
CACHEFILE m_stale_head; // head of CACHEFILEs that are stale
CACHEFILE m_stale_tail; // tail of CACHEFILEs that are stale
FILENUM m_next_filenum_to_use;
uint32_t m_next_hash_id_to_use;
toku_pthread_rwlock_t m_lock; // this field is publoc so we are still POD
toku::omt<CACHEFILE> m_active_filenum;
toku::omt<CACHEFILE> m_active_fileid;
toku::omt<CACHEFILE> m_stale_fileid;
private:
CACHEFILE find_cachefile_in_list_unlocked(CACHEFILE start, struct fileid* fileid);
};
......
......@@ -4414,43 +4414,48 @@ void checkpointer::increment_num_txns() {
m_checkpoint_num_txns++;
}
//
// Update the user data in any cachefiles in our checkpoint list.
//
void checkpointer::update_cachefiles() {
CACHEFILE cf;
for(cf = m_cf_list->m_active_head; cf; cf=cf->next) {
struct iterate_begin_checkpoint {
LSN lsn_of_checkpoint_in_progress;
iterate_begin_checkpoint(LSN lsn) : lsn_of_checkpoint_in_progress(lsn) { }
static int fn(const CACHEFILE &cf, const uint32_t UU(idx), struct iterate_begin_checkpoint *info) {
assert(cf->begin_checkpoint_userdata);
if (cf->for_checkpoint) {
cf->begin_checkpoint_userdata(m_lsn_of_checkpoint_in_progress,
cf->userdata);
cf->begin_checkpoint_userdata(info->lsn_of_checkpoint_in_progress, cf->userdata);
}
return 0;
}
};
//
// Update the user data in any cachefiles in our checkpoint list.
//
void checkpointer::update_cachefiles() {
struct iterate_begin_checkpoint iterate(m_lsn_of_checkpoint_in_progress);
int r = m_cf_list->m_active_fileid.iterate<struct iterate_begin_checkpoint,
iterate_begin_checkpoint::fn>(&iterate);
assert_zero(r);
}
struct iterate_note_pin {
static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) {
assert(cf->note_pin_by_checkpoint);
cf->note_pin_by_checkpoint(cf, cf->userdata);
cf->for_checkpoint = true;
return 0;
}
};
//
// Sets up and kicks off a checkpoint.
//
void checkpointer::begin_checkpoint() {
// 1. Initialize the accountability counters.
m_checkpoint_num_files = 0;
m_checkpoint_num_txns = 0;
// 2. Make list of cachefiles to be included in the checkpoint.
// TODO: <CER> How do we remove the non-lock cachetable reference here?
m_cf_list->read_lock();
for (CACHEFILE cf = m_cf_list->m_active_head; cf; cf = cf->next) {
// The caller must serialize open, close, and begin checkpoint.
// So we should never see a closing cachefile here.
// <CER> Is there an assert we can add here?
// Putting this check here so that this method may be called
// by cachetable tests.
assert(cf->note_pin_by_checkpoint);
cf->note_pin_by_checkpoint(cf, cf->userdata);
cf->for_checkpoint = true;
m_checkpoint_num_files++;
}
m_cf_list->m_active_fileid.iterate<void *, iterate_note_pin::fn>(nullptr);
m_checkpoint_num_files = m_cf_list->m_active_fileid.size();
m_cf_list->read_unlock();
// 3. Create log entries for this checkpoint.
......@@ -4475,6 +4480,14 @@ void checkpointer::begin_checkpoint() {
m_list->write_pending_exp_unlock();
}
struct iterate_log_fassociate {
static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) {
assert(cf->log_fassociate_during_checkpoint);
cf->log_fassociate_during_checkpoint(cf, cf->userdata);
return 0;
}
};
//
// Assuming the logger exists, this will write out the folloing
// information to the log.
......@@ -4498,10 +4511,7 @@ void checkpointer::log_begin_checkpoint() {
m_lsn_of_checkpoint_in_progress = begin_lsn;
// Log the list of open dictionaries.
for (CACHEFILE cf = m_cf_list->m_active_head; cf; cf = cf->next) {
assert(cf->log_fassociate_during_checkpoint);
cf->log_fassociate_during_checkpoint(cf, cf->userdata);
}
m_cf_list->m_active_fileid.iterate<void *, iterate_log_fassociate::fn>(nullptr);
// Write open transactions to the log.
r = toku_txn_manager_iter_over_live_txns(
......@@ -4576,17 +4586,29 @@ void checkpointer::end_checkpoint(void (*testcallback_f)(void*), void* testextr
toku_free(checkpoint_cfs);
}
void checkpointer::fill_checkpoint_cfs(CACHEFILE* checkpoint_cfs) {
m_cf_list->read_lock();
uint32_t curr_index = 0;
for (CACHEFILE cf = m_cf_list->m_active_head; cf; cf = cf->next) {
struct iterate_checkpoint_cfs {
CACHEFILE *checkpoint_cfs;
uint32_t checkpoint_num_files;
uint32_t curr_index;
iterate_checkpoint_cfs(CACHEFILE *cfs, uint32_t num_files) :
checkpoint_cfs(cfs), checkpoint_num_files(num_files), curr_index(0) {
}
static int fn(const CACHEFILE &cf, uint32_t UU(idx), struct iterate_checkpoint_cfs *info) {
if (cf->for_checkpoint) {
assert(curr_index < m_checkpoint_num_files);
checkpoint_cfs[curr_index] = cf;
curr_index++;
assert(info->curr_index < info->checkpoint_num_files);
info->checkpoint_cfs[info->curr_index] = cf;
info->curr_index++;
}
return 0;
}
assert(curr_index == m_checkpoint_num_files);
};
void checkpointer::fill_checkpoint_cfs(CACHEFILE* checkpoint_cfs) {
struct iterate_checkpoint_cfs iterate(checkpoint_cfs, m_checkpoint_num_files);
m_cf_list->read_lock();
m_cf_list->m_active_fileid.iterate<struct iterate_checkpoint_cfs, iterate_checkpoint_cfs::fn>(&iterate);
assert(iterate.curr_index == m_checkpoint_num_files);
m_cf_list->read_unlock();
}
......@@ -4671,19 +4693,18 @@ void checkpointer::remove_cachefiles(CACHEFILE* checkpoint_cfs) {
static_assert(std::is_pod<cachefile_list>::value, "cachefile_list isn't POD");
void cachefile_list::init() {
m_active_head = NULL;
m_stale_head = NULL;
m_stale_tail = NULL;
m_next_filenum_to_use.fileid = 0;
m_next_hash_id_to_use = 0;
toku_pthread_rwlock_init(&m_lock, NULL);
m_active_filenum.create();
m_active_fileid.create();
m_stale_fileid.create();
}
void cachefile_list::destroy() {
m_active_filenum.destroy();
m_active_fileid.destroy();
m_stale_fileid.destroy();
toku_pthread_rwlock_destroy(&m_lock);
}
......@@ -4702,34 +4723,31 @@ void cachefile_list::write_lock() {
void cachefile_list::write_unlock() {
toku_pthread_rwlock_wrunlock(&m_lock);
}
int cachefile_list::cachefile_of_iname_in_env(const char *iname_in_env, CACHEFILE *cf) {
read_lock();
CACHEFILE extant;
int r;
r = ENOENT;
for (extant = m_active_head; extant; extant = extant->next) {
if (extant->fname_in_env &&
!strcmp(extant->fname_in_env, iname_in_env)) {
*cf = extant;
r = 0;
break;
struct iterate_find_iname {
const char *iname_in_env;
CACHEFILE found_cf;
iterate_find_iname(const char *iname) : iname_in_env(iname), found_cf(nullptr) { }
static int fn(const CACHEFILE &cf, uint32_t UU(idx), struct iterate_find_iname *info) {
if (cf->fname_in_env && strcmp(cf->fname_in_env, info->iname_in_env) == 0) {
info->found_cf = cf;
return -1;
}
return 0;
}
read_unlock();
return r;
}
};
int cachefile_list::cachefile_of_iname_in_env(const char *iname_in_env, CACHEFILE *cf) {
struct iterate_find_iname iterate(iname_in_env);
int cachefile_list::cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf) {
read_lock();
CACHEFILE extant;
int r = ENOENT;
*cf = NULL;
for (extant = m_active_head; extant; extant = extant->next) {
if (extant->filenum.fileid==filenum.fileid) {
*cf = extant;
int r = m_active_fileid.iterate<iterate_find_iname, iterate_find_iname::fn>(&iterate);
if (iterate.found_cf != nullptr) {
assert(strcmp(iterate.found_cf->fname_in_env, iname_in_env) == 0);
*cf = iterate.found_cf;
r = 0;
break;
}
} else {
r = ENOENT;
}
read_unlock();
return r;
......@@ -4746,20 +4764,23 @@ static int cachefile_find_by_filenum(const CACHEFILE &a_cf, const FILENUM &b) {
}
}
int cachefile_list::cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf) {
read_lock();
int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(filenum, cf, nullptr);
if (r == DB_NOTFOUND) {
r = ENOENT;
} else {
invariant_zero(r);
}
read_unlock();
return r;
}
static int cachefile_find_by_fileid(const CACHEFILE &a_cf, const struct fileid &b) {
return toku_fileid_cmp(a_cf->fileid, b);
}
void cachefile_list::add_cf_unlocked(CACHEFILE cf) {
invariant(cf->next == NULL);
invariant(cf->prev == NULL);
cf->next = m_active_head;
cf->prev = NULL;
if (m_active_head) {
m_active_head->prev = cf;
}
m_active_head = cf;
int r;
r = m_active_filenum.insert<FILENUM, cachefile_find_by_filenum>(cf, cf->filenum, nullptr);
assert_zero(r);
......@@ -4769,36 +4790,13 @@ void cachefile_list::add_cf_unlocked(CACHEFILE cf) {
void cachefile_list::add_stale_cf(CACHEFILE cf) {
write_lock();
invariant(cf->next == NULL);
invariant(cf->prev == NULL);
cf->next = m_stale_head;
cf->prev = NULL;
if (m_stale_head) {
m_stale_head->prev = cf;
}
m_stale_head = cf;
if (m_stale_tail == NULL) {
m_stale_tail = cf;
}
int r = m_stale_fileid.insert<struct fileid, cachefile_find_by_fileid>(cf, cf->fileid, nullptr);
assert_zero(r);
write_unlock();
}
void cachefile_list::remove_cf(CACHEFILE cf) {
write_lock();
invariant(m_active_head != NULL);
if (cf->next) {
cf->next->prev = cf->prev;
}
if (cf->prev) {
cf->prev->next = cf->next;
}
if (cf == m_active_head) {
invariant(cf->prev == NULL);
m_active_head = cf->next;
}
cf->prev = NULL;
cf->next = NULL;
uint32_t idx;
int r;
......@@ -4816,24 +4814,12 @@ void cachefile_list::remove_cf(CACHEFILE cf) {
}
void cachefile_list::remove_stale_cf_unlocked(CACHEFILE cf) {
invariant(m_stale_head != NULL);
invariant(m_stale_tail != NULL);
if (cf->next) {
cf->next->prev = cf->prev;
}
if (cf->prev) {
cf->prev->next = cf->next;
}
if (cf == m_stale_head) {
invariant(cf->prev == NULL);
m_stale_head = cf->next;
}
if (cf == m_stale_tail) {
invariant(cf->next == NULL);
m_stale_tail = cf->prev;
}
cf->prev = NULL;
cf->next = NULL;
uint32_t idx;
int r;
r = m_stale_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(cf->fileid, nullptr, &idx);
assert_zero(r);
r = m_stale_fileid.delete_at(idx);
assert_zero(r);
}
FILENUM cachefile_list::reserve_filenum() {
......@@ -4849,11 +4835,6 @@ FILENUM cachefile_list::reserve_filenum() {
break;
}
FILENUM filenum = m_next_filenum_to_use;
#if TOKU_DEBUG_PARANOID
for (CACHEFILE extant = m_active_head; extant; extant = extant->next) {
assert(filenum.fileid != extant->filenum.fileid);
}
#endif
m_next_filenum_to_use.fileid++;
write_unlock();
return filenum;
......@@ -4865,91 +4846,77 @@ uint32_t cachefile_list::get_new_hash_id_unlocked() {
return retval;
}
CACHEFILE cachefile_list::find_cachefile_in_list_unlocked(
CACHEFILE start,
struct fileid* fileid
)
{
CACHEFILE retval = NULL;
for (CACHEFILE extant = start; extant; extant = extant->next) {
if (toku_fileids_are_equal(&extant->fileid, fileid)) {
// Clients must serialize cachefile open, close, and unlink
// So, during open, we should never see a closing cachefile
// or one that has been marked as unlink on close.
assert(!extant->unlink_on_close);
retval = extant;
goto exit;
}
}
exit:
return retval;
}
CACHEFILE cachefile_list::find_cachefile_unlocked(struct fileid* fileid) {
CACHEFILE cf = nullptr;
int r = m_active_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(*fileid, &cf, nullptr);
if (r == 0) {
assert(!cf->unlink_on_close);
}
#if TOKU_DEBUG_PARANOID
assert(cf == find_cachefile_in_list_unlocked(m_active_head, fileid));
#endif
return cf;
}
CACHEFILE cachefile_list::find_stale_cachefile_unlocked(struct fileid* fileid) {
return find_cachefile_in_list_unlocked(m_stale_head, fileid);
CACHEFILE cf = nullptr;
int r = m_stale_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(*fileid, &cf, nullptr);
if (r == 0) {
assert(!cf->unlink_on_close);
}
return cf;
}
void cachefile_list::verify_unused_filenum(FILENUM filenum) {
int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(filenum, nullptr, nullptr);
assert(r == DB_NOTFOUND);
#if TOKU_DEBUG_PARANOID
for (CACHEFILE extant = m_active_head; extant; extant = extant->next) {
invariant(extant->filenum.fileid != filenum.fileid);
}
#endif
}
// returns true if some eviction ran, false otherwise
bool cachefile_list::evict_some_stale_pair(evictor* ev) {
PAIR p = NULL;
CACHEFILE cf_to_destroy = NULL;
write_lock();
if (m_stale_tail == NULL) {
if (m_stale_fileid.size() == 0) {
write_unlock();
return false;
}
p = m_stale_tail->cf_head;
CACHEFILE stale_cf = nullptr;
int r = m_stale_fileid.fetch(0, &stale_cf);
assert_zero(r);
// we should not have a cf in the stale list
// that does not have any pairs
PAIR p = stale_cf->cf_head;
paranoid_invariant(p != NULL);
evict_pair_from_cachefile(p);
// now that we have evicted something,
// let's check if the cachefile is needed anymore
if (m_stale_tail->cf_head == NULL) {
cf_to_destroy = m_stale_tail;
remove_stale_cf_unlocked(m_stale_tail);
//
// it is not needed if the latest eviction caused
// the cf_head for that cf to become null
bool destroy_cf = stale_cf->cf_head == nullptr;
if (destroy_cf) {
remove_stale_cf_unlocked(stale_cf);
}
write_unlock();
ev->remove_pair_attr(p->attr);
cachetable_free_pair(p);
if (cf_to_destroy) {
cachefile_destroy(cf_to_destroy);
if (destroy_cf) {
cachefile_destroy(stale_cf);
}
return true;
}
void cachefile_list::free_stale_data(evictor* ev) {
write_lock();
while (m_stale_tail != NULL) {
PAIR p = m_stale_tail->cf_head;
while (m_stale_fileid.size() != 0) {
CACHEFILE stale_cf = nullptr;
int r = m_stale_fileid.fetch(0, &stale_cf);
assert_zero(r);
// we should not have a cf in the stale list
// that does not have any pairs
PAIR p = stale_cf->cf_head;
paranoid_invariant(p != NULL);
evict_pair_from_cachefile(p);
......@@ -4958,10 +4925,9 @@ void cachefile_list::free_stale_data(evictor* ev) {
// now that we have evicted something,
// let's check if the cachefile is needed anymore
if (m_stale_tail->cf_head == NULL) {
CACHEFILE cf_to_destroy = m_stale_tail;
remove_stale_cf_unlocked(m_stale_tail);
cachefile_destroy(cf_to_destroy);
if (stale_cf->cf_head == NULL) {
remove_stale_cf_unlocked(stale_cf);
cachefile_destroy(stale_cf);
}
}
write_unlock();
......
......@@ -112,6 +112,14 @@ struct checkpointer_test {
uint32_t k);
};
static void init_cachefile(CACHEFILE cf, int which_cf, bool for_checkpoint) {
memset(cf, 0, sizeof(*cf));
create_dummy_functions(cf);
cf->fileid = { 0, (unsigned) which_cf };
cf->filenum = { (unsigned) which_cf };
cf->for_checkpoint = for_checkpoint;
}
//------------------------------------------------------------------------------
// test_begin_checkpoint() -
//
......@@ -135,33 +143,28 @@ void checkpointer_test::test_begin_checkpoint() {
// 2. Call checkpoint with ONE cachefile.
//cachefile cf;
struct cachefile cf;
cf.next = NULL;
cf.for_checkpoint = false;
m_cp.m_cf_list->m_active_head = &cf;
create_dummy_functions(&cf);
init_cachefile(&cf, 0, false);
m_cp.m_cf_list->add_cf_unlocked(&cf);
m_cp.begin_checkpoint();
assert(m_cp.m_checkpoint_num_files == 1);
assert(cf.for_checkpoint == true);
m_cp.m_cf_list->remove_cf(&cf);
// 3. Call checkpoint with MANY cachefiles.
const uint32_t count = 3;
struct cachefile cfs[count];
m_cp.m_cf_list->m_active_head = &cfs[0];
for (uint32_t i = 0; i < count; ++i) {
cfs[i].for_checkpoint = false;
init_cachefile(&cfs[i], i, false);
create_dummy_functions(&cfs[i]);
if (i == count - 1) {
cfs[i].next = NULL;
} else {
cfs[i].next = &cfs[i + 1];
}
m_cp.m_cf_list->add_cf_unlocked(&cfs[i]);
}
m_cp.begin_checkpoint();
assert(m_cp.m_checkpoint_num_files == count);
for (uint32_t i = 0; i < count; ++i) {
assert(cfs[i].for_checkpoint == true);
cfl.remove_cf(&cfs[i]);
}
ctbl.list.destroy();
m_cp.destroy();
......@@ -195,10 +198,8 @@ void checkpointer_test::test_pending_bits() {
//
struct cachefile cf;
cf.cachetable = &ctbl;
memset(&cf, 0, sizeof(cf));
cf.next = NULL;
cf.for_checkpoint = true;
m_cp.m_cf_list->m_active_head = &cf;
init_cachefile(&cf, 0, true);
m_cp.m_cf_list->add_cf_unlocked(&cf);
create_dummy_functions(&cf);
CACHEKEY k;
......@@ -258,6 +259,7 @@ void checkpointer_test::test_pending_bits() {
ctbl.list.destroy();
m_cp.destroy();
cfl.remove_cf(&cf);
cfl.destroy();
}
......@@ -337,14 +339,11 @@ void checkpointer_test::test_end_checkpoint() {
cfl.init();
struct cachefile cf;
memset(&cf, 0, sizeof(cf));
cf.next = NULL;
cf.for_checkpoint = true;
create_dummy_functions(&cf);
init_cachefile(&cf, 0, true);
ZERO_STRUCT(m_cp);
m_cp.init(&ctbl.list, NULL, &ctbl.ev, &cfl);
m_cp.m_cf_list->m_active_head = &cf;
m_cp.m_cf_list->add_cf_unlocked(&cf);
// 2. Add data before running checkpoint.
const uint32_t count = 6;
......@@ -394,6 +393,7 @@ void checkpointer_test::test_end_checkpoint() {
assert(pp);
m_cp.m_list->evict_completely(pp);
}
cfl.remove_cf(&cf);
m_cp.destroy();
ctbl.list.destroy();
cfl.destroy();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment