Commit 8ef52123 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

closes #5241, finish removal of completion queues

git-svn-id: file:///svn/toku/tokudb@45640 c7de825b-a66e-492c-adef-691d508d4ae1
parent a53f3540
...@@ -115,7 +115,6 @@ struct ctpair { ...@@ -115,7 +115,6 @@ struct ctpair {
struct nb_mutex value_nb_mutex; // single writer, protects value_data struct nb_mutex value_nb_mutex; // single writer, protects value_data
struct nb_mutex disk_nb_mutex; // single writer, protects disk_data, is used for writing cloned nodes for checkpoint struct nb_mutex disk_nb_mutex; // single writer, protects disk_data, is used for writing cloned nodes for checkpoint
struct workqueue *cq; // writers sometimes return ctpair's using this queue
struct workitem asyncwork; // work item for the worker threads struct workitem asyncwork; // work item for the worker threads
struct workitem checkpoint_asyncwork; // work item for the worker threads struct workitem checkpoint_asyncwork; // work item for the worker threads
struct toku_list next_for_cachefile; // link in the cachefile list struct toku_list next_for_cachefile; // link in the cachefile list
...@@ -1027,7 +1026,6 @@ static void cachetable_write_locked_pair(CACHETABLE ct, PAIR p) { ...@@ -1027,7 +1026,6 @@ static void cachetable_write_locked_pair(CACHETABLE ct, PAIR p) {
// references to it // references to it
static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p) { static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p) {
p->cq = 0;
nb_mutex_unlock(&p->value_nb_mutex); nb_mutex_unlock(&p->value_nb_mutex);
cachetable_maybe_remove_and_free_pair(ct, p); cachetable_maybe_remove_and_free_pair(ct, p);
} }
...@@ -1050,7 +1048,6 @@ static void cachetable_evict_pair(CACHETABLE ct, PAIR p) { ...@@ -1050,7 +1048,6 @@ static void cachetable_evict_pair(CACHETABLE ct, PAIR p) {
if (8*ct->size_evicting <= ct->size_current) { if (8*ct->size_evicting <= ct->size_current) {
workqueue_wakeup_write(&ct->wq, 0); workqueue_wakeup_write(&ct->wq, 0);
} }
assert(!p->cq);
cachetable_complete_write_pair(ct, p); cachetable_complete_write_pair(ct, p);
} }
...@@ -1116,7 +1113,6 @@ static void do_partial_eviction(CACHETABLE ct, PAIR p) { ...@@ -1116,7 +1113,6 @@ static void do_partial_eviction(CACHETABLE ct, PAIR p) {
if (8*ct->size_evicting <= ct->size_current) { if (8*ct->size_evicting <= ct->size_current) {
workqueue_wakeup_write(&ct->wq, 0); workqueue_wakeup_write(&ct->wq, 0);
} }
assert(!p->cq);
nb_mutex_unlock(&p->value_nb_mutex); nb_mutex_unlock(&p->value_nb_mutex);
} }
...@@ -1177,7 +1173,6 @@ static bool run_eviction_on_pair(PAIR curr_in_clock, CACHETABLE ct) { ...@@ -1177,7 +1173,6 @@ static bool run_eviction_on_pair(PAIR curr_in_clock, CACHETABLE ct) {
workqueue_enq(&ct->wq, wi, 0); workqueue_enq(&ct->wq, wi, 0);
} }
else { else {
assert(!curr_in_clock->cq);
nb_mutex_unlock(&curr_in_clock->value_nb_mutex); nb_mutex_unlock(&curr_in_clock->value_nb_mutex);
bjm_remove_background_job(cf->bjm); bjm_remove_background_job(cf->bjm);
} }
...@@ -1289,7 +1284,6 @@ static PAIR cachetable_insert_at(CACHETABLE ct, ...@@ -1289,7 +1284,6 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
p->clock_next = p->clock_prev = 0; p->clock_next = p->clock_prev = 0;
nb_mutex_init(&p->value_nb_mutex); nb_mutex_init(&p->value_nb_mutex);
nb_mutex_init(&p->disk_nb_mutex); nb_mutex_init(&p->disk_nb_mutex);
p->cq = 0;
pair_add_to_clock(ct, p); pair_add_to_clock(ct, p);
toku_list_push(&cachefile->pairs_for_cachefile, &p->next_for_cachefile); toku_list_push(&cachefile->pairs_for_cachefile, &p->next_for_cachefile);
u_int32_t h = fullhash & (ct->table_size-1); u_int32_t h = fullhash & (ct->table_size-1);
...@@ -1486,7 +1480,7 @@ write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p) ...@@ -1486,7 +1480,7 @@ write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p)
// On exit: the node is written out // On exit: the node is written out
// Method: take write lock // Method: take write lock
// maybe write out the node // maybe write out the node
// if p->cq, put on completion queue. Else release write lock // Else release write lock
static void static void
write_pair_for_checkpoint_thread (CACHETABLE ct, PAIR p) write_pair_for_checkpoint_thread (CACHETABLE ct, PAIR p)
{ {
...@@ -1504,10 +1498,6 @@ write_pair_for_checkpoint_thread (CACHETABLE ct, PAIR p) ...@@ -1504,10 +1498,6 @@ write_pair_for_checkpoint_thread (CACHETABLE ct, PAIR p)
// this will grab and release disk_nb_mutex // this will grab and release disk_nb_mutex
cachetable_write_locked_pair(ct, p); // keeps the PAIR's write lock cachetable_write_locked_pair(ct, p); // keeps the PAIR's write lock
} }
// if we are checkpointing a PAIR, a cq should not exist
// close cannot be running, and unpin_and_remove
// should have set the PAIR to clean
assert(!p->cq);
// now release value_nb_mutex, before we write the PAIR out // now release value_nb_mutex, before we write the PAIR out
// so that the PAIR is available to client threads // so that the PAIR is available to client threads
...@@ -1536,14 +1526,8 @@ write_pair_for_checkpoint_thread (CACHETABLE ct, PAIR p) ...@@ -1536,14 +1526,8 @@ write_pair_for_checkpoint_thread (CACHETABLE ct, PAIR p)
// and the pending lock // and the pending lock
// //
p->checkpoint_pending = FALSE; p->checkpoint_pending = FALSE;
if (p->cq) {
workitem_init(&p->asyncwork, NULL, p);
workqueue_enq(p->cq, &p->asyncwork, 1);
}
else {
nb_mutex_unlock(&p->value_nb_mutex); nb_mutex_unlock(&p->value_nb_mutex);
} }
}
} }
// //
...@@ -1695,8 +1679,7 @@ static uint64_t get_tnow(void) { ...@@ -1695,8 +1679,7 @@ static uint64_t get_tnow(void) {
// //
// cachetable lock and PAIR lock are held on entry // cachetable lock and PAIR lock are held on entry
// On exit, cachetable lock is still held, but PAIR lock // On exit, cachetable lock is still held, but PAIR lock
// is either released or ownership of PAIR lock is transferred // is either released.
// via the completion queue.
// //
static void static void
do_partial_fetch( do_partial_fetch(
...@@ -1722,7 +1705,6 @@ do_partial_fetch( ...@@ -1722,7 +1705,6 @@ do_partial_fetch(
p->attr = new_attr; p->attr = new_attr;
cachetable_change_pair_attr(ct, old_attr, new_attr); cachetable_change_pair_attr(ct, old_attr, new_attr);
nb_mutex_unlock(&p->disk_nb_mutex); nb_mutex_unlock(&p->disk_nb_mutex);
assert(!p->cq);
if (!keep_pair_locked) { if (!keep_pair_locked) {
nb_mutex_unlock(&p->value_nb_mutex); nb_mutex_unlock(&p->value_nb_mutex);
} }
...@@ -1834,7 +1816,6 @@ static void cachetable_fetch_pair( ...@@ -1834,7 +1816,6 @@ static void cachetable_fetch_pair(
p->attr = attr; p->attr = attr;
cachetable_add_pair_attr(ct, attr); cachetable_add_pair_attr(ct, attr);
nb_mutex_unlock(&p->disk_nb_mutex); nb_mutex_unlock(&p->disk_nb_mutex);
assert(!p->cq);
if (!keep_pair_locked) { if (!keep_pair_locked) {
nb_mutex_unlock(&p->value_nb_mutex); nb_mutex_unlock(&p->value_nb_mutex);
} }
...@@ -2077,15 +2058,6 @@ cachetable_unpin_internal(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, ...@@ -2077,15 +2058,6 @@ cachetable_unpin_internal(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash,
for (PAIR p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) { for (PAIR p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
if (p->key.b==key.b && p->cachefile==cachefile) { if (p->key.b==key.b && p->cachefile==cachefile) {
assert(nb_mutex_writers(&p->value_nb_mutex)>0); assert(nb_mutex_writers(&p->value_nb_mutex)>0);
// this is a client thread that is unlocking the PAIR
// That is, a cleaner, flusher, or get_and_pin thread
// So, there must not be a completion queue lying around
// cachefile closes wait for the client threads to complete,
// and unpin_and_remove cannot be running because
// unpin_and_remove starts by holding the PAIR lock
// So, we should assert that a completion queue does not
// exist
assert(!p->cq);
nb_mutex_unlock(&p->value_nb_mutex); nb_mutex_unlock(&p->value_nb_mutex);
if (dirty) p->dirty = CACHETABLE_DIRTY; if (dirty) p->dirty = CACHETABLE_DIRTY;
if (attr.is_valid) { if (attr.is_valid) {
...@@ -2205,27 +2177,7 @@ int toku_cachetable_get_and_pin_nonblocking ( ...@@ -2205,27 +2177,7 @@ int toku_cachetable_get_and_pin_nonblocking (
if (may_modify_value && p->checkpoint_pending) { if (may_modify_value && p->checkpoint_pending) {
write_locked_pair_for_checkpoint(ct, p); write_locked_pair_for_checkpoint(ct, p);
} }
// deadlock discovered in #4357 shows we need
// to do this. After running unlockers and waiting
// on the PAIR lock, a flusher thread may come
// along and try to unpin_and_remove this PAIR.
// In that case, the thread running unpin_and_remove
// sets up a completion queue and we must transfer ownership
// of this PAIR lock to that thread via the completion
// queue
if (p->cq) {
// while we wait on the PAIR lock, a thread may come in and
// call toku_cachetable_unpin_and_remove on this PAIR.
// In that case, we must do NOTHING with the PAIR, as
// it has been removed from the cachetable's data structures.
// So, we should just pass the PAIR over to the completion
// queue.
workitem_init(&p->asyncwork, NULL, p);
workqueue_enq(p->cq, &p->asyncwork, 1);
}
else {
nb_mutex_unlock(&p->value_nb_mutex); nb_mutex_unlock(&p->value_nb_mutex);
}
cachetable_unlock(ct); cachetable_unlock(ct);
return TOKUDB_TRY_AGAIN; return TOKUDB_TRY_AGAIN;
} }
...@@ -2349,11 +2301,6 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, ...@@ -2349,11 +2301,6 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
} }
} }
else if (nb_mutex_users(&p->value_nb_mutex)==0) { else if (nb_mutex_users(&p->value_nb_mutex)==0) {
// client should not be trying to prefetch a node that is either
// belongs to a cachefile being flushed or to a PAIR being
// unpinned and removed
assert(!p->cq);
// nobody else is using the node, so we should go ahead and prefetch // nobody else is using the node, so we should go ahead and prefetch
nb_mutex_lock(&p->value_nb_mutex, ct->mutex); nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
BOOL partial_fetch_required = pf_req_callback(p->value_data, read_extraargs); BOOL partial_fetch_required = pf_req_callback(p->value_data, read_extraargs);
...@@ -2372,9 +2319,6 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, ...@@ -2372,9 +2319,6 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
} }
} }
else { else {
// sanity check, we already have an assert
// before locking the PAIR
assert(!p->cq);
nb_mutex_unlock(&p->value_nb_mutex); nb_mutex_unlock(&p->value_nb_mutex);
} }
} }
...@@ -2683,9 +2627,6 @@ int toku_cachetable_unpin_and_remove ( ...@@ -2683,9 +2627,6 @@ int toku_cachetable_unpin_and_remove (
remove_key_extra remove_key_extra
); );
} }
// we must not have a completion queue
// lying around, as we may create one now
assert(!p->cq);
nb_mutex_unlock(&p->value_nb_mutex); nb_mutex_unlock(&p->value_nb_mutex);
nb_mutex_unlock(&p->disk_nb_mutex); nb_mutex_unlock(&p->disk_nb_mutex);
// //
...@@ -3430,7 +3371,6 @@ toku_cleaner_thread (void *cachetable_v) ...@@ -3430,7 +3371,6 @@ toku_cleaner_thread (void *cachetable_v)
// The cleaner callback must have unlocked the pair, so we // The cleaner callback must have unlocked the pair, so we
// don't need to unlock it if the cleaner callback is called. // don't need to unlock it if the cleaner callback is called.
if (!cleaner_callback_called) { if (!cleaner_callback_called) {
assert(!best_pair->cq);
nb_mutex_unlock(&best_pair->value_nb_mutex); nb_mutex_unlock(&best_pair->value_nb_mutex);
} }
// We need to make sure the cachefile sticks around so a close // We need to make sure the cachefile sticks around so a close
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment