Commit 4e695636 authored by Zardosht Kasheff's avatar Zardosht Kasheff

refs #61, several code simplifications:

 - break up cachetable_flush_cachefile into more digestable functions,
 - decouple hash_id from filenum
 - break up close_userdata into close_userdata and free_userdata
parent e68aba30
...@@ -184,7 +184,7 @@ struct cachefile { ...@@ -184,7 +184,7 @@ struct cachefile {
// they are managed whenever we add or remove a pair from // they are managed whenever we add or remove a pair from
// the cachetable. As of Riddler, this linked list is only used to // the cachetable. As of Riddler, this linked list is only used to
// make cachetable_flush_cachefile more efficient // make cachetable_flush_cachefile more efficient
PAIR cf_head; PAIR cf_head; // doubly linked list that is NOT circular
uint32_t num_pairs; // count on number of pairs in the cachetable belong to this cachefile uint32_t num_pairs; // count on number of pairs in the cachetable belong to this cachefile
bool for_checkpoint; //True if part of the in-progress checkpoint bool for_checkpoint; //True if part of the in-progress checkpoint
...@@ -209,6 +209,7 @@ struct cachefile { ...@@ -209,6 +209,7 @@ struct cachefile {
void *userdata; void *userdata;
void (*log_fassociate_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log all open files. void (*log_fassociate_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log all open files.
void (*close_userdata)(CACHEFILE cf, int fd, void *userdata, bool lsnvalid, LSN); // when closing the last reference to a cachefile, first call this function. void (*close_userdata)(CACHEFILE cf, int fd, void *userdata, bool lsnvalid, LSN); // when closing the last reference to a cachefile, first call this function.
void (*free_userdata)(CACHEFILE cf, void *userdata); // when closing the last reference to a cachefile, first call this function.
void (*begin_checkpoint_userdata)(LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function. void (*begin_checkpoint_userdata)(LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function.
void (*checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // when checkpointing a cachefile, call this function. void (*checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // when checkpointing a cachefile, call this function.
void (*end_checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // after checkpointing cachefiles call this function. void (*end_checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // after checkpointing cachefiles call this function.
...@@ -427,11 +428,13 @@ public: ...@@ -427,11 +428,13 @@ public:
void add_cf_unlocked(CACHEFILE newcf); void add_cf_unlocked(CACHEFILE newcf);
void remove_cf(CACHEFILE cf); void remove_cf(CACHEFILE cf);
FILENUM reserve_filenum(); FILENUM reserve_filenum();
uint32_t get_new_hash_id_unlocked();
CACHEFILE find_cachefile_unlocked(struct fileid* fileid); CACHEFILE find_cachefile_unlocked(struct fileid* fileid);
void verify_unused_filenum(FILENUM filenum); void verify_unused_filenum(FILENUM filenum);
// access to these fields are protected by the lock // access to these fields are protected by the lock
CACHEFILE m_head; CACHEFILE m_active_head; // head of CACHEFILEs that are active
FILENUM m_next_filenum_to_use; FILENUM m_next_filenum_to_use;
uint32_t m_next_hash_id_to_use;
toku_pthread_rwlock_t m_lock; // this field is publoc so we are still POD toku_pthread_rwlock_t m_lock; // this field is publoc so we are still POD
}; };
......
...@@ -378,10 +378,11 @@ static void create_new_cachefile( ...@@ -378,10 +378,11 @@ static void create_new_cachefile(
CACHEFILE newcf = NULL; CACHEFILE newcf = NULL;
XCALLOC(newcf); XCALLOC(newcf);
newcf->cachetable = ct; newcf->cachetable = ct;
newcf->filenum = filenum;
newcf->hash_id = hash_id; newcf->hash_id = hash_id;
newcf->fd = fd;
newcf->fileid = fileid; newcf->fileid = fileid;
newcf->filenum = filenum;
newcf->fd = fd;
newcf->fname_in_env = toku_xstrdup(fname_in_env); newcf->fname_in_env = toku_xstrdup(fname_in_env);
bjm_init(&newcf->bjm); bjm_init(&newcf->bjm);
*cfptr = newcf; *cfptr = newcf;
...@@ -413,8 +414,15 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd ...@@ -413,8 +414,15 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
goto exit; goto exit;
} }
ct->cf_list.verify_unused_filenum(filenum); ct->cf_list.verify_unused_filenum(filenum);
create_new_cachefile(
create_new_cachefile(ct, filenum, filenum.fileid, fd, fname_in_env, fileid, &newcf); ct,
filenum,
ct->cf_list.get_new_hash_id_unlocked(),
fd,
fname_in_env,
fileid,
&newcf
);
ct->cf_list.add_cf_unlocked(newcf); ct->cf_list.add_cf_unlocked(newcf);
...@@ -468,7 +476,9 @@ void toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) { ...@@ -468,7 +476,9 @@ void toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) {
// Call the close userdata callback to notify the client this cachefile // Call the close userdata callback to notify the client this cachefile
// and its underlying file are going to be closed // and its underlying file are going to be closed
if (cf->close_userdata) { if (cf->close_userdata) {
invariant(cf->free_userdata);
cf->close_userdata(cf, cf->fd, cf->userdata, oplsn_valid, oplsn); cf->close_userdata(cf, cf->fd, cf->userdata, oplsn_valid, oplsn);
cf->free_userdata(cf, cf->userdata);
} }
ct->cf_list.remove_cf(cf); ct->cf_list.remove_cf(cf);
...@@ -2381,50 +2391,28 @@ static void remove_pair_for_close(PAIR p, CACHETABLE ct) { ...@@ -2381,50 +2391,28 @@ static void remove_pair_for_close(PAIR p, CACHETABLE ct) {
cachetable_maybe_remove_and_free_pair(&ct->list, &ct->ev, p); cachetable_maybe_remove_and_free_pair(&ct->list, &ct->ev, p);
} }
// Flush (write to disk) all of the pairs that belong to a cachefile (or all pairs if // helper function for cachetable_flush_cachefile, which happens on a close
// the cachefile is NULL. // writes out the dirty pairs on background threads and returns when
// Must be holding cachetable lock on entry. // the writing is done
// static void write_dirty_pairs_for_close(CACHETABLE ct, CACHEFILE cf) {
// This function assumes that no client thread is accessing or
// trying to access the cachefile while this function is executing.
// This implies no client thread will be trying to lock any nodes
// belonging to the cachefile.
//
// This function also assumes that the cachefile is not in the process
// of being used by a checkpoint. If a checkpoint is currently happening,
// it does NOT include this cachefile.
//
static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
//
// Because work on a kibbutz is always done by the client thread,
// and this function assumes that no client thread is doing any work
// on the cachefile, we assume that no client thread will be adding jobs
// to this cachefile's kibbutz.
//
// The caller of this function must ensure that there are
// no jobs added to the kibbutz. This implies that the only work other
// threads may be doing is work by the writer threads.
//
// first write out dirty PAIRs
BACKGROUND_JOB_MANAGER bjm = NULL; BACKGROUND_JOB_MANAGER bjm = NULL;
bjm_init(&bjm); bjm_init(&bjm);
ct->list.write_list_lock(); // TODO: (Zardosht), verify that this lock is unnecessary to take here ct->list.write_list_lock(); // TODO: (Zardosht), verify that this lock is unnecessary to take here
PAIR p = NULL; PAIR p = NULL;
uint32_t i;
// write out dirty PAIRs // write out dirty PAIRs
uint32_t i;
if (cf) { if (cf) {
for (i = 0, p = cf->cf_head; for (i = 0, p = cf->cf_head;
i < cf->num_pairs; i < cf->num_pairs;
i++, p = p->cf_next) i++, p = p->cf_next)
{ {
flush_pair_for_close_on_background_thread(p, bjm, ct); flush_pair_for_close_on_background_thread(p, bjm, ct);
} }
} }
else { else {
for (i = 0, p = ct->list.m_checkpoint_head; for (i = 0, p = ct->list.m_checkpoint_head;
i < ct->list.m_n_in_table; i < ct->list.m_n_in_table;
i++, p = p->clock_next) i++, p = p->clock_next)
{ {
flush_pair_for_close_on_background_thread(p, bjm, ct); flush_pair_for_close_on_background_thread(p, bjm, ct);
} }
...@@ -2432,40 +2420,79 @@ static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) { ...@@ -2432,40 +2420,79 @@ static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
ct->list.write_list_unlock(); ct->list.write_list_unlock();
bjm_wait_for_jobs_to_finish(bjm); bjm_wait_for_jobs_to_finish(bjm);
bjm_destroy(bjm); bjm_destroy(bjm);
}
// now that everything is clean, get rid of everything
static void remove_all_pairs_for_close(CACHETABLE ct, CACHEFILE cf) {
ct->list.write_list_lock(); ct->list.write_list_lock();
if (cf) { if (cf) {
while (cf->num_pairs > 0) { while (cf->num_pairs > 0) {
p = cf->cf_head; PAIR p = cf->cf_head;
remove_pair_for_close(p, ct); remove_pair_for_close(p, ct);
} }
} }
else { else {
while (ct->list.m_n_in_table > 0) { while (ct->list.m_n_in_table > 0) {
p = ct->list.m_checkpoint_head; PAIR p = ct->list.m_checkpoint_head;
remove_pair_for_close(p, ct); remove_pair_for_close(p, ct);
} }
} }
ct->list.write_list_unlock();
}
static void verify_cachefile_flushed(CACHETABLE ct, CACHEFILE cf) {
// assert here that cachefile is flushed by checking // assert here that cachefile is flushed by checking
// pair_list and finding no pairs belonging to this cachefile // pair_list and finding no pairs belonging to this cachefile
// Make a list of pairs that belong to this cachefile. // Make a list of pairs that belong to this cachefile.
#ifdef TOKU_DEBUG_PARANOID #ifdef TOKU_DEBUG_PARANOID
if (cf) { if (cf) {
ct->list.write_list_lock();
// assert here that cachefile is flushed by checking // assert here that cachefile is flushed by checking
// pair_list and finding no pairs belonging to this cachefile // pair_list and finding no pairs belonging to this cachefile
// Make a list of pairs that belong to this cachefile. // Make a list of pairs that belong to this cachefile.
uint32_t i;
PAIR p = NULL;
for (i = 0, p = ct->list.m_checkpoint_head; for (i = 0, p = ct->list.m_checkpoint_head;
i < ct->list.m_n_in_table; i < ct->list.m_n_in_table;
i++, p = p->clock_next) i++, p = p->clock_next)
{ {
assert(p->cachefile != cf); assert(p->cachefile != cf);
} }
ct->list.write_list_unlock();
} }
#endif #endif
}
// Flush (write to disk) all of the pairs that belong to a cachefile (or all pairs if
// the cachefile is NULL.
// Must be holding cachetable lock on entry.
//
// This function assumes that no client thread is accessing or
// trying to access the cachefile while this function is executing.
// This implies no client thread will be trying to lock any nodes
// belonging to the cachefile.
//
// This function also assumes that the cachefile is not in the process
// of being used by a checkpoint. If a checkpoint is currently happening,
// it does NOT include this cachefile.
//
static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
//
// Because work on a kibbutz is always done by the client thread,
// and this function assumes that no client thread is doing any work
// on the cachefile, we assume that no client thread will be adding jobs
// to this cachefile's kibbutz.
//
// The caller of this function must ensure that there are
// no jobs added to the kibbutz. This implies that the only work other
// threads may be doing is work by the writer threads.
//
// first write out dirty PAIRs
write_dirty_pairs_for_close(ct, cf);
ct->list.write_list_unlock(); // now that everything is clean, get rid of everything
remove_all_pairs_for_close(ct, cf);
verify_cachefile_flushed(ct, cf);
if (cf) { if (cf) {
bjm_reset(cf->bjm); bjm_reset(cf->bjm);
} }
...@@ -2871,6 +2898,7 @@ toku_cachefile_set_userdata (CACHEFILE cf, ...@@ -2871,6 +2898,7 @@ toku_cachefile_set_userdata (CACHEFILE cf,
void *userdata, void *userdata,
void (*log_fassociate_during_checkpoint)(CACHEFILE, void*), void (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
void (*close_userdata)(CACHEFILE, int, void*, bool, LSN), void (*close_userdata)(CACHEFILE, int, void*, bool, LSN),
void (*free_userdata)(CACHEFILE, void*),
void (*checkpoint_userdata)(CACHEFILE, int, void*), void (*checkpoint_userdata)(CACHEFILE, int, void*),
void (*begin_checkpoint_userdata)(LSN, void*), void (*begin_checkpoint_userdata)(LSN, void*),
void (*end_checkpoint_userdata)(CACHEFILE, int, void*), void (*end_checkpoint_userdata)(CACHEFILE, int, void*),
...@@ -2879,6 +2907,7 @@ toku_cachefile_set_userdata (CACHEFILE cf, ...@@ -2879,6 +2907,7 @@ toku_cachefile_set_userdata (CACHEFILE cf,
cf->userdata = userdata; cf->userdata = userdata;
cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint; cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint;
cf->close_userdata = close_userdata; cf->close_userdata = close_userdata;
cf->free_userdata = free_userdata;
cf->checkpoint_userdata = checkpoint_userdata; cf->checkpoint_userdata = checkpoint_userdata;
cf->begin_checkpoint_userdata = begin_checkpoint_userdata; cf->begin_checkpoint_userdata = begin_checkpoint_userdata;
cf->end_checkpoint_userdata = end_checkpoint_userdata; cf->end_checkpoint_userdata = end_checkpoint_userdata;
...@@ -4286,7 +4315,7 @@ void checkpointer::increment_num_txns() { ...@@ -4286,7 +4315,7 @@ void checkpointer::increment_num_txns() {
// //
void checkpointer::update_cachefiles() { void checkpointer::update_cachefiles() {
CACHEFILE cf; CACHEFILE cf;
for(cf = m_cf_list->m_head; cf; cf=cf->next) { for(cf = m_cf_list->m_active_head; cf; cf=cf->next) {
assert(cf->begin_checkpoint_userdata); assert(cf->begin_checkpoint_userdata);
if (cf->for_checkpoint) { if (cf->for_checkpoint) {
cf->begin_checkpoint_userdata(m_lsn_of_checkpoint_in_progress, cf->begin_checkpoint_userdata(m_lsn_of_checkpoint_in_progress,
...@@ -4306,7 +4335,7 @@ void checkpointer::begin_checkpoint() { ...@@ -4306,7 +4335,7 @@ void checkpointer::begin_checkpoint() {
// 2. Make list of cachefiles to be included in the checkpoint. // 2. Make list of cachefiles to be included in the checkpoint.
// TODO: <CER> How do we remove the non-lock cachetable reference here? // TODO: <CER> How do we remove the non-lock cachetable reference here?
m_cf_list->read_lock(); m_cf_list->read_lock();
for (CACHEFILE cf = m_cf_list->m_head; cf; cf = cf->next) { for (CACHEFILE cf = m_cf_list->m_active_head; cf; cf = cf->next) {
// The caller must serialize open, close, and begin checkpoint. // The caller must serialize open, close, and begin checkpoint.
// So we should never see a closing cachefile here. // So we should never see a closing cachefile here.
// <CER> Is there an assert we can add here? // <CER> Is there an assert we can add here?
...@@ -4365,7 +4394,7 @@ void checkpointer::log_begin_checkpoint() { ...@@ -4365,7 +4394,7 @@ void checkpointer::log_begin_checkpoint() {
m_lsn_of_checkpoint_in_progress = begin_lsn; m_lsn_of_checkpoint_in_progress = begin_lsn;
// Log the list of open dictionaries. // Log the list of open dictionaries.
for (CACHEFILE cf = m_cf_list->m_head; cf; cf = cf->next) { for (CACHEFILE cf = m_cf_list->m_active_head; cf; cf = cf->next) {
assert(cf->log_fassociate_during_checkpoint); assert(cf->log_fassociate_during_checkpoint);
cf->log_fassociate_during_checkpoint(cf, cf->userdata); cf->log_fassociate_during_checkpoint(cf, cf->userdata);
} }
...@@ -4446,7 +4475,7 @@ void checkpointer::end_checkpoint(void (*testcallback_f)(void*), void* testextr ...@@ -4446,7 +4475,7 @@ void checkpointer::end_checkpoint(void (*testcallback_f)(void*), void* testextr
void checkpointer::fill_checkpoint_cfs(CACHEFILE* checkpoint_cfs) { void checkpointer::fill_checkpoint_cfs(CACHEFILE* checkpoint_cfs) {
m_cf_list->read_lock(); m_cf_list->read_lock();
uint32_t curr_index = 0; uint32_t curr_index = 0;
for (CACHEFILE cf = m_cf_list->m_head; cf; cf = cf->next) { for (CACHEFILE cf = m_cf_list->m_active_head; cf; cf = cf->next) {
if (cf->for_checkpoint) { if (cf->for_checkpoint) {
assert(curr_index < m_checkpoint_num_files); assert(curr_index < m_checkpoint_num_files);
checkpoint_cfs[curr_index] = cf; checkpoint_cfs[curr_index] = cf;
...@@ -4538,8 +4567,9 @@ void checkpointer::remove_cachefiles(CACHEFILE* checkpoint_cfs) { ...@@ -4538,8 +4567,9 @@ void checkpointer::remove_cachefiles(CACHEFILE* checkpoint_cfs) {
static_assert(std::is_pod<cachefile_list>::value, "cachefile_list isn't POD"); static_assert(std::is_pod<cachefile_list>::value, "cachefile_list isn't POD");
void cachefile_list::init() { void cachefile_list::init() {
m_head = NULL; m_active_head = NULL;
m_next_filenum_to_use.fileid = 0; m_next_filenum_to_use.fileid = 0;
m_next_hash_id_to_use = 0;
toku_pthread_rwlock_init(&m_lock, NULL); toku_pthread_rwlock_init(&m_lock, NULL);
} }
...@@ -4567,7 +4597,7 @@ int cachefile_list::cachefile_of_iname_in_env(const char *iname_in_env, CACHEFIL ...@@ -4567,7 +4597,7 @@ int cachefile_list::cachefile_of_iname_in_env(const char *iname_in_env, CACHEFIL
CACHEFILE extant; CACHEFILE extant;
int r; int r;
r = ENOENT; r = ENOENT;
for (extant = m_head; extant; extant = extant->next) { for (extant = m_active_head; extant; extant = extant->next) {
if (extant->fname_in_env && if (extant->fname_in_env &&
!strcmp(extant->fname_in_env, iname_in_env)) { !strcmp(extant->fname_in_env, iname_in_env)) {
*cf = extant; *cf = extant;
...@@ -4584,7 +4614,7 @@ int cachefile_list::cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf) { ...@@ -4584,7 +4614,7 @@ int cachefile_list::cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf) {
CACHEFILE extant; CACHEFILE extant;
int r = ENOENT; int r = ENOENT;
*cf = NULL; *cf = NULL;
for (extant = m_head; extant; extant = extant->next) { for (extant = m_active_head; extant; extant = extant->next) {
if (extant->filenum.fileid==filenum.fileid) { if (extant->filenum.fileid==filenum.fileid) {
*cf = extant; *cf = extant;
r = 0; r = 0;
...@@ -4596,27 +4626,27 @@ int cachefile_list::cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf) { ...@@ -4596,27 +4626,27 @@ int cachefile_list::cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf) {
} }
void cachefile_list::add_cf_unlocked(CACHEFILE cf) { void cachefile_list::add_cf_unlocked(CACHEFILE cf) {
cf->next = m_head; cf->next = m_active_head;
cf->prev = NULL; cf->prev = NULL;
if (m_head) { if (m_active_head) {
m_head->prev = cf; m_active_head->prev = cf;
} }
m_head = cf; m_active_head = cf;
} }
void cachefile_list::remove_cf(CACHEFILE cf) { void cachefile_list::remove_cf(CACHEFILE cf) {
write_lock(); write_lock();
invariant(m_head != NULL); invariant(m_active_head != NULL);
if (cf->next) { if (cf->next) {
cf->next->prev = cf->prev; cf->next->prev = cf->prev;
} }
if (cf->prev) { if (cf->prev) {
cf->prev->next = cf->next; cf->prev->next = cf->next;
} }
if (cf == m_head) { if (cf == m_active_head) {
invariant(cf->prev == NULL); invariant(cf->prev == NULL);
m_head = cf->next; m_active_head = cf->next;
} }
write_unlock(); write_unlock();
} }
...@@ -4627,7 +4657,7 @@ FILENUM cachefile_list::reserve_filenum() { ...@@ -4627,7 +4657,7 @@ FILENUM cachefile_list::reserve_filenum() {
// taking a write lock because we are modifying next_filenum_to_use // taking a write lock because we are modifying next_filenum_to_use
write_lock(); write_lock();
try_again: try_again:
for (extant = m_head; extant; extant = extant->next) { for (extant = m_active_head; extant; extant = extant->next) {
if (m_next_filenum_to_use.fileid==extant->filenum.fileid) { if (m_next_filenum_to_use.fileid==extant->filenum.fileid) {
m_next_filenum_to_use.fileid++; m_next_filenum_to_use.fileid++;
goto try_again; goto try_again;
...@@ -4639,9 +4669,15 @@ try_again: ...@@ -4639,9 +4669,15 @@ try_again:
return filenum; return filenum;
} }
uint32_t cachefile_list::get_new_hash_id_unlocked() {
uint32_t retval = m_next_hash_id_to_use;
m_next_hash_id_to_use++;
return retval;
}
CACHEFILE cachefile_list::find_cachefile_unlocked(struct fileid* fileid) { CACHEFILE cachefile_list::find_cachefile_unlocked(struct fileid* fileid) {
CACHEFILE retval = NULL; CACHEFILE retval = NULL;
for (CACHEFILE extant = m_head; extant; extant = extant->next) { for (CACHEFILE extant = m_active_head; extant; extant = extant->next) {
if (toku_fileids_are_equal(&extant->fileid, fileid)) { if (toku_fileids_are_equal(&extant->fileid, fileid)) {
// Clients must serialize cachefile open, close, and unlink // Clients must serialize cachefile open, close, and unlink
// So, during open, we should never see a closing cachefile // So, during open, we should never see a closing cachefile
...@@ -4656,7 +4692,7 @@ exit: ...@@ -4656,7 +4692,7 @@ exit:
} }
void cachefile_list::verify_unused_filenum(FILENUM filenum) { void cachefile_list::verify_unused_filenum(FILENUM filenum) {
for (CACHEFILE extant = m_head; extant; extant = extant->next) { for (CACHEFILE extant = m_active_head; extant; extant = extant->next) {
invariant(extant->filenum.fileid != filenum.fileid); invariant(extant->filenum.fileid != filenum.fileid);
} }
} }
......
...@@ -275,6 +275,7 @@ typedef void (*CACHETABLE_REMOVE_KEY)(CACHEKEY* cachekey, bool for_checkpoint, v ...@@ -275,6 +275,7 @@ typedef void (*CACHETABLE_REMOVE_KEY)(CACHEKEY* cachekey, bool for_checkpoint, v
void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata,
void (*log_fassociate_during_checkpoint)(CACHEFILE, void*), void (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
void (*close_userdata)(CACHEFILE, int, void*, bool, LSN), void (*close_userdata)(CACHEFILE, int, void*, bool, LSN),
void (*free_userdata)(CACHEFILE, void*),
void (*checkpoint_userdata)(CACHEFILE, int, void*), void (*checkpoint_userdata)(CACHEFILE, int, void*),
void (*begin_checkpoint_userdata)(LSN, void*), void (*begin_checkpoint_userdata)(LSN, void*),
void (*end_checkpoint_userdata)(CACHEFILE, int, void*), void (*end_checkpoint_userdata)(CACHEFILE, int, void*),
......
...@@ -317,6 +317,11 @@ static void ft_close(CACHEFILE cachefile, int fd, void *header_v, bool oplsn_val ...@@ -317,6 +317,11 @@ static void ft_close(CACHEFILE cachefile, int fd, void *header_v, bool oplsn_val
ft_end_checkpoint(cachefile, fd, header_v); ft_end_checkpoint(cachefile, fd, header_v);
assert(!ft->h->dirty); // dirty bit should be cleared by begin_checkpoint and never set again (because we're closing the dictionary) assert(!ft->h->dirty); // dirty bit should be cleared by begin_checkpoint and never set again (because we're closing the dictionary)
} }
}
// maps to cf->free_userdata
static void ft_free(CACHEFILE cachefile UU(), void *header_v) {
FT ft = (FT) header_v;
toku_ft_free(ft); toku_ft_free(ft);
} }
...@@ -392,6 +397,7 @@ static void ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) { ...@@ -392,6 +397,7 @@ static void ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) {
ft, ft,
ft_log_fassociate_during_checkpoint, ft_log_fassociate_during_checkpoint,
ft_close, ft_close,
ft_free,
ft_checkpoint, ft_checkpoint,
ft_begin_checkpoint, ft_begin_checkpoint,
ft_end_checkpoint, ft_end_checkpoint,
...@@ -494,6 +500,7 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac ...@@ -494,6 +500,7 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac
(void*)h, (void*)h,
ft_log_fassociate_during_checkpoint, ft_log_fassociate_during_checkpoint,
ft_close, ft_close,
ft_free,
ft_checkpoint, ft_checkpoint,
ft_begin_checkpoint, ft_begin_checkpoint,
ft_end_checkpoint, ft_end_checkpoint,
......
...@@ -136,7 +136,7 @@ void checkpointer_test::test_begin_checkpoint() { ...@@ -136,7 +136,7 @@ void checkpointer_test::test_begin_checkpoint() {
struct cachefile cf; struct cachefile cf;
cf.next = NULL; cf.next = NULL;
cf.for_checkpoint = false; cf.for_checkpoint = false;
m_cp.m_cf_list->m_head = &cf; m_cp.m_cf_list->m_active_head = &cf;
create_dummy_functions(&cf); create_dummy_functions(&cf);
m_cp.begin_checkpoint(); m_cp.begin_checkpoint();
...@@ -146,7 +146,7 @@ void checkpointer_test::test_begin_checkpoint() { ...@@ -146,7 +146,7 @@ void checkpointer_test::test_begin_checkpoint() {
// 3. Call checkpoint with MANY cachefiles. // 3. Call checkpoint with MANY cachefiles.
const uint32_t count = 3; const uint32_t count = 3;
struct cachefile cfs[count]; struct cachefile cfs[count];
m_cp.m_cf_list->m_head = &cfs[0]; m_cp.m_cf_list->m_active_head = &cfs[0];
for (uint32_t i = 0; i < count; ++i) { for (uint32_t i = 0; i < count; ++i) {
cfs[i].for_checkpoint = false; cfs[i].for_checkpoint = false;
create_dummy_functions(&cfs[i]); create_dummy_functions(&cfs[i]);
...@@ -196,7 +196,7 @@ void checkpointer_test::test_pending_bits() { ...@@ -196,7 +196,7 @@ void checkpointer_test::test_pending_bits() {
memset(&cf, 0, sizeof(cf)); memset(&cf, 0, sizeof(cf));
cf.next = NULL; cf.next = NULL;
cf.for_checkpoint = true; cf.for_checkpoint = true;
m_cp.m_cf_list->m_head = &cf; m_cp.m_cf_list->m_active_head = &cf;
create_dummy_functions(&cf); create_dummy_functions(&cf);
CACHEKEY k; CACHEKEY k;
...@@ -341,7 +341,7 @@ void checkpointer_test::test_end_checkpoint() { ...@@ -341,7 +341,7 @@ void checkpointer_test::test_end_checkpoint() {
ZERO_STRUCT(m_cp); ZERO_STRUCT(m_cp);
m_cp.init(&ctbl.list, NULL, &ctbl.ev, &cfl); m_cp.init(&ctbl.list, NULL, &ctbl.ev, &cfl);
m_cp.m_cf_list->m_head = &cf; m_cp.m_cf_list->m_active_head = &cf;
// 2. Add data before running checkpoint. // 2. Add data before running checkpoint.
const uint32_t count = 6; const uint32_t count = 6;
......
...@@ -422,6 +422,7 @@ cachetable_test (void) { ...@@ -422,6 +422,7 @@ cachetable_test (void) {
NULL, NULL,
&dummy_log_fassociate, &dummy_log_fassociate,
&dummy_close_usr, &dummy_close_usr,
&dummy_free_usr,
&dummy_chckpnt_usr, &dummy_chckpnt_usr,
&test_begin_checkpoint, &test_begin_checkpoint,
&dummy_end, &dummy_end,
......
...@@ -554,6 +554,7 @@ cachetable_test (void) { ...@@ -554,6 +554,7 @@ cachetable_test (void) {
NULL, NULL,
&dummy_log_fassociate, &dummy_log_fassociate,
&dummy_close_usr, &dummy_close_usr,
&dummy_free_usr,
&dummy_chckpnt_usr, &dummy_chckpnt_usr,
test_begin_checkpoint, // called in begin_checkpoint test_begin_checkpoint, // called in begin_checkpoint
&dummy_end, &dummy_end,
......
...@@ -95,6 +95,7 @@ PATENT RIGHTS GRANT: ...@@ -95,6 +95,7 @@ PATENT RIGHTS GRANT:
// //
static void dummy_log_fassociate(CACHEFILE UU(cf), void* UU(p)) { } static void dummy_log_fassociate(CACHEFILE UU(cf), void* UU(p)) { }
static void dummy_close_usr(CACHEFILE UU(cf), int UU(i), void* UU(p), bool UU(b), LSN UU(lsn)) { } static void dummy_close_usr(CACHEFILE UU(cf), int UU(i), void* UU(p), bool UU(b), LSN UU(lsn)) { }
static void dummy_free_usr(CACHEFILE UU(cf), void* UU(p)) { }
static void dummy_chckpnt_usr(CACHEFILE UU(cf), int UU(i), void* UU(p)) { } static void dummy_chckpnt_usr(CACHEFILE UU(cf), int UU(i), void* UU(p)) { }
static void dummy_begin(LSN UU(lsn), void* UU(p)) { } static void dummy_begin(LSN UU(lsn), void* UU(p)) { }
static void dummy_end(CACHEFILE UU(cf), int UU(i), void* UU(p)) { } static void dummy_end(CACHEFILE UU(cf), int UU(i), void* UU(p)) { }
...@@ -112,6 +113,7 @@ create_dummy_functions(CACHEFILE cf) ...@@ -112,6 +113,7 @@ create_dummy_functions(CACHEFILE cf)
ud, ud,
&dummy_log_fassociate, &dummy_log_fassociate,
&dummy_close_usr, &dummy_close_usr,
&dummy_free_usr,
&dummy_chckpnt_usr, &dummy_chckpnt_usr,
&dummy_begin, &dummy_begin,
&dummy_end, &dummy_end,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment