Commit 9777c3aa authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:5071], do some cleanup in cachetable

git-svn-id: file:///svn/toku/tokudb@44512 c7de825b-a66e-492c-adef-691d508d4ae1
parent b9b269da
......@@ -4,7 +4,6 @@
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_portability.h>
#include <stdlib.h>
#include <string.h>
......@@ -24,15 +23,6 @@
#include "log-internal.h"
#include "kibbutz.h"
#include "ft-internal.h"
#define TRACE_CACHETABLE 0
#if TRACE_CACHETABLE
#define WHEN_TRACE_CT(x) x
#else
#define WHEN_TRACE_CT(x) ((void)0)
#endif
///////////////////////////////////////////////////////////////////////////////////
// Engine status
//
......@@ -44,9 +34,6 @@
// so they are still easily available to the debugger and to save lots of typing.
static u_int64_t cachetable_miss;
static u_int64_t cachetable_misstime; // time spent waiting for disk read
static u_int64_t cachetable_waittime; // time spent waiting for another thread to release lock (e.g. prefetch, writing)
static u_int64_t cachetable_wait_reading; // how many times does get_and_pin() wait for a node to be read?
static u_int64_t cachetable_wait_writing; // how many times does get_and_pin() wait for a node to be written?
static u_int64_t cachetable_puts; // how many times has a newly created node been put into the cachetable?
static u_int64_t cachetable_prefetches; // how many times has a block been prefetched into the cachetable?
static u_int64_t cachetable_maybe_get_and_pins; // how many times has maybe_get_and_pin(_clean) been called?
......@@ -56,10 +43,8 @@ static u_int64_t cleaner_executions; // number of times the cleaner thread's loo
static CACHETABLE_STATUS_S ct_status;
// Note, toku_cachetable_get_status() is below, after declaration of cachetable.
#define STATUS_INIT(k,t,l) { \
ct_status.status[k].keyname = #k; \
ct_status.status[k].type = t; \
......@@ -73,9 +58,6 @@ status_init(void) {
STATUS_INIT(CT_MISS, UINT64, "miss");
STATUS_INIT(CT_MISSTIME, UINT64, "miss time");
STATUS_INIT(CT_WAITTIME, UINT64, "wait time");
STATUS_INIT(CT_WAIT_READING, UINT64, "wait reading");
STATUS_INIT(CT_WAIT_WRITING, UINT64, "wait writing");
STATUS_INIT(CT_PUTS, UINT64, "puts (new nodes created)");
STATUS_INIT(CT_PREFETCHES, UINT64, "prefetches");
STATUS_INIT(CT_MAYBE_GET_AND_PINS, UINT64, "maybe_get_and_pin");
......@@ -98,14 +80,6 @@ status_init(void) {
#define STATUS_VALUE(x) ct_status.status[x].value.num
enum ctpair_state {
CTPAIR_IDLE = 1, // in memory
CTPAIR_READING = 2, // being read into memory
CTPAIR_WRITING = 3, // being written from memory
};
/* The workqueue pointer cq is set in:
* cachetable_complete_write_pair() cq is cleared, called from many paths, cachetable lock is held during this function
* cachetable_flush_cachefile() called during close and truncate, cachetable lock is held during this function
......@@ -122,14 +96,6 @@ struct ctpair {
void* disk_data; // data used to fetch/flush value_data to and from disk.
PAIR_ATTR attr;
//
// This variable used to be used by various functions
// to select a course of action. This is NO LONGER the case.
// From now, this variable is for informational purposes
// only, and should not be relied upon for any action
// It is a helpful variable to peek at in the debugger.
//
enum ctpair_state state;
enum cachetable_dirty dirty;
char verify_flag; // Used in verify_cachetable()
......@@ -265,9 +231,6 @@ toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
STATUS_VALUE(CT_MISS) = cachetable_miss;
STATUS_VALUE(CT_MISSTIME) = cachetable_misstime;
STATUS_VALUE(CT_WAITTIME) = cachetable_waittime;
STATUS_VALUE(CT_WAIT_READING) = cachetable_wait_reading;
STATUS_VALUE(CT_WAIT_WRITING) = cachetable_wait_writing;
STATUS_VALUE(CT_PUTS) = cachetable_puts;
STATUS_VALUE(CT_PREFETCHES) = cachetable_prefetches;
STATUS_VALUE(CT_MAYBE_GET_AND_PINS) = cachetable_maybe_get_and_pins;
......@@ -323,7 +286,7 @@ static inline void cachetable_unlock(CACHETABLE ct __attribute__((unused))) {
// Wait for cache table space to become available
// size_current is number of bytes currently occupied by data (referred to by pairs)
// size_evicting is number of bytes queued up to be written out (sum of sizes of pairs in CTPAIR_WRITING state)
// size_evicting is number of bytes queued up to be evicted
static inline void cachetable_wait_write(CACHETABLE ct) {
// if we're writing more than half the data in the cachetable
while (2*ct->size_evicting > ct->size_current) {
......@@ -892,13 +855,6 @@ u_int32_t toku_cachetable_hash (CACHEFILE cachefile, BLOCKNUM key)
return final(cachefile->filenum.fileid, (u_int32_t)(key.b>>32), (u_int32_t)key.b);
}
#if 0
static unsigned int hashit (CACHETABLE ct, CACHEKEY key, CACHEFILE cachefile) {
assert(0==(ct->table_size & (ct->table_size -1))); // make sure table is power of two
return (toku_cachetable_hash(key,cachefile))&(ct->table_size-1);
}
#endif
// has ct locked on entry
// This function MUST NOT release and reacquire the cachetable lock
// Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior.
......@@ -1224,7 +1180,6 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p, BOOL remove_me) {
if (p->cq)
workqueue_enq(p->cq, &p->asyncwork, 1);
else {
p->state = CTPAIR_IDLE;
BOOL destroyed;
cachetable_complete_write_pair(ct, p, remove_me, &destroyed);
}
......@@ -1249,7 +1204,6 @@ static void try_evict_pair(CACHETABLE ct, PAIR p) {
// be trying to evict something this thread is trying to read
if (!nb_mutex_users(&p->value_nb_mutex)) {
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
p->state = CTPAIR_WRITING;
assert(ct->size_evicting >= 0);
ct->size_evicting += p->attr.size;
......@@ -1276,7 +1230,6 @@ static void try_evict_pair(CACHETABLE ct, PAIR p) {
// a thread pool.
static void flush_and_maybe_remove (CACHETABLE ct, PAIR p) {
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
p->state = CTPAIR_WRITING;
// this needs to be done here regardless of whether the eviction occurs on the main thread or on
// a writer thread, because there may be a completion queue that needs access to this information
WORKITEM wi = &p->asyncwork;
......@@ -1302,10 +1255,6 @@ static void flush_and_maybe_remove (CACHETABLE ct, PAIR p) {
}
static void do_partial_eviction(CACHETABLE ct, PAIR p) {
// This really should be something else, but need to set it to something
// other than CTPAIR_IDLE so that other threads know to not hold
// ydb lock while waiting on this node
p->state = CTPAIR_WRITING;
PAIR_ATTR new_attr;
PAIR_ATTR old_attr = p->attr;
......@@ -1322,7 +1271,6 @@ static void do_partial_eviction(CACHETABLE ct, PAIR p) {
workqueue_wakeup_write(&ct->wq, 0);
}
p->state = CTPAIR_IDLE;
if (p->cq) {
workitem_init(&p->asyncwork, NULL, p);
workqueue_enq(p->cq, &p->asyncwork, 1);
......@@ -1404,14 +1352,6 @@ static void maybe_flush_some (CACHETABLE ct, long size) {
if (bytes_freed_estimate > 0) {
curr_in_clock->size_evicting_estimate = bytes_freed_estimate;
ct->size_evicting += bytes_freed_estimate;
// set the state before we post on writer thread (and give up cachetable lock)
// if we do not set the state here, any thread that tries
// to access this node in between when this function exits and the
// workitem gets placed on a writer thread will block (because it will think
// that the PAIR's lock will be released soon), and keep blocking
// until the workitem is done. This would be bad, as that thread may
// be holding the ydb lock.
curr_in_clock->state = CTPAIR_WRITING;
WORKITEM wi = &curr_in_clock->asyncwork;
workitem_init(wi, cachetable_partial_eviction, curr_in_clock);
workqueue_enq(&ct->wq, wi, 0);
......@@ -1466,7 +1406,6 @@ void toku_cachetable_maybe_flush_some(CACHETABLE ct) {
// Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior.
static PAIR cachetable_insert_at(CACHETABLE ct,
CACHEFILE cachefile, CACHEKEY key, void *value,
enum ctpair_state state,
u_int32_t fullhash,
PAIR_ATTR attr,
CACHETABLE_WRITE_CALLBACK write_callback,
......@@ -1484,7 +1423,6 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
p->fullhash = fullhash;
p->dirty = dirty;
p->attr = attr;
p->state = state;
p->flush_callback = write_callback.flush_callback;
p->pe_callback = write_callback.pe_callback;
p->pe_est_callback = write_callback.pe_est_callback;
......@@ -1523,11 +1461,9 @@ static int cachetable_put_internal(
)
{
CACHETABLE ct = cachefile->cachetable;
int count=0;
{
PAIR p;
for (p=ct->table[fullhash&(cachefile->cachetable->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
// Ideally, we would like to just assert(FALSE) here
// and not return an error, but as of Dr. Noga,
......@@ -1550,8 +1486,7 @@ static int cachetable_put_internal(
ct,
cachefile,
key,
value,
CTPAIR_IDLE,
value,
fullhash,
attr,
write_callback,
......@@ -1568,10 +1503,8 @@ static int cachetable_put_internal(
static int cachetable_get_pair (CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, PAIR* pv) {
CACHETABLE ct = cachefile->cachetable;
PAIR p;
int count = 0;
int r = -1;
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
*pv = p;
r = 0;
......@@ -1681,7 +1614,6 @@ write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p)
// The pair is not cloneable, just write the pair to disk
// we already have p->value_nb_mutex and we just do the write in our own thread.
p->state = CTPAIR_WRITING;
cachetable_write_locked_pair(ct, p); // keeps the PAIR's write lock
}
}
......@@ -1717,7 +1649,6 @@ write_pair_for_checkpoint_thread (CACHETABLE ct, PAIR p)
// The pair is not cloneable, just write the pair to disk
// we already have p->value_nb_mutex and we just do the write in our own thread.
// this will grab and release disk_nb_mutex
p->state = CTPAIR_WRITING;
cachetable_write_locked_pair(ct, p); // keeps the PAIR's write lock
}
// if we are checkpointing a PAIR, a cq should not exist
......@@ -1886,7 +1817,6 @@ int toku_cachetable_put_with_dep_pairs(
int toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void*value, PAIR_ATTR attr,
CACHETABLE_WRITE_CALLBACK write_callback
) {
WHEN_TRACE_CT(printf("%s:%d CT cachetable_put(%lld)=%p\n", __FILE__, __LINE__, key, value));
CACHETABLE ct = cachefile->cachetable;
cachetable_lock(ct);
cachetable_wait_write(ct);
......@@ -1930,7 +1860,6 @@ do_partial_fetch(
// As of Dr. No, only clean PAIRs may have pieces missing,
// so we do a sanity check here.
assert(!p->dirty);
p->state = CTPAIR_READING;
nb_mutex_lock(&p->disk_nb_mutex, ct->mutex);
cachetable_unlock(ct);
......@@ -1939,7 +1868,6 @@ do_partial_fetch(
cachetable_lock(ct);
p->attr = new_attr;
cachetable_change_pair_attr(ct, old_attr, new_attr);
p->state = CTPAIR_IDLE;
nb_mutex_unlock(&p->disk_nb_mutex);
if (keep_pair_locked) {
// if the caller wants the pair to remain locked
......@@ -2046,8 +1974,6 @@ static void cachetable_fetch_pair(
// FIXME this should be enum cachetable_dirty, right?
int dirty = 0;
WHEN_TRACE_CT(printf("%s:%d CT: fetch_callback(%lld...)\n", __FILE__, __LINE__, key));
nb_mutex_lock(&p->disk_nb_mutex, ct->mutex);
cachetable_unlock(ct);
......@@ -2066,7 +1992,6 @@ static void cachetable_fetch_pair(
p->disk_data = disk_data;
p->attr = attr;
cachetable_add_pair_attr(ct, attr);
p->state = CTPAIR_IDLE;
nb_mutex_unlock(&p->disk_nb_mutex);
if (keep_pair_locked) {
// if the caller wants the pair to remain locked
......@@ -2161,27 +2086,12 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
{
CACHETABLE ct = cachefile->cachetable;
PAIR p;
int count=0;
cachetable_lock(ct);
cachetable_wait_write(ct);
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
// still have the cachetable lock
//
// at this point, we know the node is at least partially in memory,
// but we do not know if the user requires a partial fetch (because
// some basement node is missing or some message buffer needs
// to be decompressed. So, we check to see if a partial fetch is required
//
if (p->state == CTPAIR_READING) {
cachetable_wait_reading++;
}
else if (p->state == CTPAIR_WRITING) {
cachetable_wait_writing++;
}
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
pair_touch(p);
if (may_modify_value) {
......@@ -2216,11 +2126,9 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
// As of Dr. No, only clean PAIRs may have pieces missing,
// so we do a sanity check here.
assert(!p->dirty);
p->state = CTPAIR_READING;
do_partial_fetch(ct, cachefile, p, pf_callback, read_extraargs, TRUE);
}
WHEN_TRACE_CT(printf("%s:%d cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value));
goto got_value;
}
}
......@@ -2233,7 +2141,6 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
cachefile,
key,
zero_value,
CTPAIR_READING,
fullhash,
zero_attr,
write_callback,
......@@ -2267,7 +2174,6 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
if (sizep) *sizep = p->attr.size;
maybe_flush_some(ct, 0);
cachetable_unlock(ct);
WHEN_TRACE_CT(printf("%s:%d did fetch: cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value));
return 0;
}
......@@ -2284,12 +2190,10 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void**value) {
CACHETABLE ct = cachefile->cachetable;
PAIR p;
int count = 0;
int r = -1;
cachetable_lock(ct);
cachetable_maybe_get_and_pins++;
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
if (!p->checkpoint_pending && //If checkpoint pending, we would need to first write it, which would make it clean
p->dirty &&
......@@ -2316,12 +2220,10 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3
int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void**value) {
CACHETABLE ct = cachefile->cachetable;
PAIR p;
int count = 0;
int r = -1;
cachetable_lock(ct);
cachetable_maybe_get_and_pins++;
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
if (!p->checkpoint_pending && //If checkpoint pending, we would need to first write it, which would make it clean (if the pin would be used for writes. If would be used for read-only we could return it, but that would increase complexity)
nb_mutex_users(&p->value_nb_mutex) == 0
......@@ -2345,14 +2247,11 @@ cachetable_unpin_internal(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash,
// size==0 means that the size didn't change.
{
CACHETABLE ct = cachefile->cachetable;
WHEN_TRACE_CT(printf("%s:%d unpin(%lld)", __FILE__, __LINE__, key));
//printf("%s:%d is dirty now=%d\n", __FILE__, __LINE__, dirty);
int count = 0;
int r = -1;
//assert(fullhash == toku_cachetable_hash(cachefile, key));
if (!have_ct_lock) cachetable_lock(ct);
for (PAIR p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
assert(nb_mutex_writers(&p->value_nb_mutex)>0);
// this is a client thread that is unlocking the PAIR
......@@ -2372,11 +2271,8 @@ cachetable_unpin_internal(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash,
cachetable_change_pair_attr(ct, old_attr, new_attr);
p->attr = attr;
}
WHEN_TRACE_CT(printf("[count=%lld]\n", p->pinned));
{
if (flush) {
maybe_flush_some(ct, 0);
}
if (flush) {
maybe_flush_some(ct, 0);
}
r = 0; // we found one
break;
......@@ -2426,18 +2322,14 @@ int toku_cachetable_get_and_pin_nonblocking (
cachetable_lock(ct);
//
// Even though there is a risk that cachetable_wait_write may wait on a bunch
// of I/O to complete, we call this with the ydb lock held, because if we
// of I/O to complete, we call this because if we
// are in this situation where a lot of data is being evicted on writer threads
// then we are in a screw case anyway, so it should be ok to hold the ydb lock,
// until a bunch of data is written out.
// then we are in a screw case anyway.
//
cachetable_wait_write(ct);
int count = 0;
PAIR p;
for (p = ct->table[fullhash&(ct->table_size-1)]; p; p = p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cf) {
//
// In Doofenshmirtz, we keep the root to leaf path pinned
// as we perform a query on a dictionary at any given time.
......@@ -2446,14 +2338,13 @@ int toku_cachetable_get_and_pin_nonblocking (
// So, if there is a write lock grabbed
// on the PAIR that we want to lock, then some expensive operation
// MUST be happening (read from disk, write to disk, flush, etc...),
// and we should run the unlockers and release the ydb lock
// and we should run the unlockers.
// Otherwise, if there is no write lock grabbed, we know there will
// be no stall, so we grab the lock and return to the user
//
if (!nb_mutex_writers(&p->value_nb_mutex) &&
(!may_modify_value || resolve_checkpointing_fast(p)))
{
//cachetable_hit++;
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
if (may_modify_value && p->checkpoint_pending) {
write_locked_pair_for_checkpoint(ct, p);
......@@ -2473,7 +2364,6 @@ int toku_cachetable_get_and_pin_nonblocking (
//
if (partial_fetch_required) {
cachetable_lock(ct);
p->state = CTPAIR_READING;
run_unlockers(unlockers); // The contract says the unlockers are run with the ct lock being held.
// Now wait for the I/O to occur.
do_partial_fetch(ct, cf, p, pf_callback, read_extraargs, FALSE);
......@@ -2488,13 +2378,6 @@ int toku_cachetable_get_and_pin_nonblocking (
else {
run_unlockers(unlockers); // The contract says the unlockers are run with the ct lock being held.
// Now wait for the I/O to occur.
// We need to obtain the lock (waiting for the write to finish), but then we only waited so we could wake up again
if (p->state == CTPAIR_READING) {
cachetable_wait_reading++;
}
else if (p->state == CTPAIR_WRITING) {
cachetable_wait_writing++;
}
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
if (may_modify_value && p->checkpoint_pending) {
write_locked_pair_for_checkpoint(ct, p);
......@@ -2533,7 +2416,6 @@ int toku_cachetable_get_and_pin_nonblocking (
cf,
key,
zero_value,
CTPAIR_READING,
fullhash,
zero_attr,
write_callback,
......@@ -2632,7 +2514,6 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
cf,
key,
zero_value,
CTPAIR_READING,
fullhash,
zero_attr,
write_callback,
......@@ -2661,7 +2542,6 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
BOOL partial_fetch_required = pf_req_callback(p->value_data, read_extraargs);
if (partial_fetch_required) {
p->state = CTPAIR_READING;
struct cachefile_partial_prefetch_args *MALLOC(cpargs);
cpargs->p = p;
cpargs->pf_callback = pf_callback;
......@@ -2688,13 +2568,11 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newkey) {
CACHETABLE ct = cachefile->cachetable;
PAIR *ptr_to_p,p;
int count = 0;
u_int32_t fullhash = toku_cachetable_hash(cachefile, oldkey);
cachetable_lock(ct);
for (ptr_to_p = &ct->table[fullhash&(ct->table_size-1)], p = *ptr_to_p;
p;
ptr_to_p = &p->hash_chain, p = *ptr_to_p) {
count++;
if (p->key.b==oldkey.b && p->cachefile==cachefile) {
*ptr_to_p = p->hash_chain;
p->key = newkey;
......@@ -2715,62 +2593,6 @@ void toku_cachefile_verify (CACHEFILE cf) {
toku_cachetable_verify(cf->cachetable);
}
// for gdb
int64_t UU() toku_cachetable_size_slowslow (CACHETABLE ct) {
// DANGER DANGER DANGER
// This only works if every entry in the cachetable is actually a
// FTNODE. Don't say you weren't warned.
PAIR p;
BOOL is_first = TRUE;
int64_t ret = 0;
for (p=ct->clock_head; ct->clock_head!=NULL && (p!=ct->clock_head || is_first); p=p->clock_next) {
is_first=FALSE;
ret += ftnode_memory_size((FTNODE) p->value_data);
}
return ret;
}
int64_t UU() toku_cachetable_size_discrepancy (CACHETABLE ct) {
// DANGER DANGER DANGER
// This only works if every entry in the cachetable is actually a
// FTNODE. Don't say you weren't warned.
PAIR p;
BOOL is_first = TRUE;
int64_t ret = 0;
for (p=ct->clock_head; ct->clock_head!=NULL && (p!=ct->clock_head || is_first); p=p->clock_next) {
is_first=FALSE;
ret += ftnode_memory_size((FTNODE) p->value_data) - p->attr.size;
}
return ret;
}
int64_t UU() toku_cachetable_size_discrepancy_pinned (CACHETABLE ct) {
// DANGER DANGER DANGER
// This only works if every entry in the cachetable is actually a
// FTNODE. Don't say you weren't warned.
PAIR p;
BOOL is_first = TRUE;
int64_t ret = 0;
for (p=ct->clock_head; ct->clock_head!=NULL && (p!=ct->clock_head || is_first); p=p->clock_next) {
is_first=FALSE;
if (nb_mutex_writers(&p->value_nb_mutex)) {
ret += ftnode_memory_size((FTNODE) p->value_data) - p->attr.size;
}
}
return ret;
}
int64_t UU() toku_cachetable_size_slow (CACHETABLE ct) {
PAIR p;
BOOL is_first = TRUE;
int64_t ret = 0;
for (p=ct->clock_head; ct->clock_head!=NULL && (p!=ct->clock_head || is_first); p=p->clock_next) {
is_first=FALSE;
ret += p->attr.size;
}
return ret;
}
void toku_cachetable_verify (CACHETABLE ct) {
cachetable_lock(ct);
......@@ -2994,7 +2816,7 @@ static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
}
}
/* Requires that no locks be held that are used by the checkpoint logic (ydb, etc.) */
/* Requires that no locks be held that are used by the checkpoint logic */
void
toku_cachetable_minicron_shutdown(CACHETABLE ct) {
int r = toku_minicron_shutdown(&ct->checkpointer);
......@@ -3049,11 +2871,9 @@ int toku_cachetable_unpin_and_remove (
// Removing something already present is OK.
CACHETABLE ct = cachefile->cachetable;
PAIR p;
int count = 0;
cachetable_lock(ct);
u_int32_t fullhash = toku_cachetable_hash(cachefile, key);
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
p->dirty = CACHETABLE_CLEAN; // clear the dirty bit. We're just supposed to remove it.
assert(nb_mutex_writers(&p->value_nb_mutex));
......@@ -3272,9 +3092,6 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
return 0;
}
// TODO: #1510 locking of cachetable is suspect
// verify correct algorithm overall
int
toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
// Requires: All three checkpoint-relevant locks must be held (see checkpoint.c).
......@@ -3282,7 +3099,6 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
// Use the begin_checkpoint callback to take necessary snapshots (header, btt)
// Mark every dirty node as "pending." ("Pending" means that the node must be
// written to disk before it can be modified.)
{
unsigned i;
cachetable_lock(ct);
......@@ -3405,12 +3221,6 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
ct->checkpoint_is_beginning = TRUE; // detect threadsafety bugs, must set checkpoint_is_beginning ...
invariant(ct->checkpoint_prohibited == 0); // ... before testing checkpoint_prohibited
invariant(ct->n_checkpoint_clones_running == 0);
u_int64_t leaf_sum = 0;
u_int64_t nonleaf_sum = 0;
u_int64_t rollback_sum = 0;
u_int64_t maybe_leaf_sum = 0;
u_int64_t maybe_nonleaf_sum = 0;
u_int64_t maybe_rollback_sum = 0;
for (i=0; i < ct->table_size; i++) {
PAIR p;
for (p = ct->table[i]; p; p=p->hash_chain) {
......@@ -3429,16 +3239,6 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
// we may end up clearing the pending bit before the
// current lock is ever released.
if (p->dirty || nb_mutex_writers(&p->value_nb_mutex)) {
if (p->dirty) {
leaf_sum += p->attr.leaf_size;
nonleaf_sum += p->attr.nonleaf_size;
rollback_sum += p->attr.rollback_size;
}
else {
maybe_leaf_sum += p->attr.leaf_size;
maybe_nonleaf_sum += p->attr.nonleaf_size;
maybe_rollback_sum += p->attr.rollback_size;
}
p->checkpoint_pending = TRUE;
if (ct->pending_head) {
ct->pending_head->pending_prev = p;
......@@ -3450,15 +3250,6 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
}
}
}
if (0) {
printf("leaf_sum: %"PRIu64"\n", leaf_sum);
printf("nonleaf_sum: %"PRIu64"\n", nonleaf_sum);
printf("rollback_sum: %"PRIu64"\n", rollback_sum);
printf("maybe_leaf_sum: %"PRIu64"\n", maybe_leaf_sum);
printf("maybe_nonleaf: %"PRIu64"\n", maybe_nonleaf_sum);
printf("maybe_rollback: %"PRIu64"\n", maybe_rollback_sum);
printf("*****************************\n");
}
rwlock_write_unlock(&ct->pending_lock);
//begin_checkpoint_userdata must be called AFTER all the pairs are marked as pending.
......@@ -3689,12 +3480,10 @@ void toku_cachetable_get_state (CACHETABLE ct, int *num_entries_ptr, int *hash_s
int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, void **value_ptr,
int *dirty_ptr, long long *pin_ptr, long *size_ptr) {
PAIR p;
int count = 0;
int r = -1;
u_int32_t fullhash = toku_cachetable_hash(cf, key);
cachetable_lock(ct);
for (p = ct->table[fullhash&(ct->table_size-1)]; p; p = p->hash_chain) {
count++;
if (p->key.b == key.b && p->cachefile == cf) {
if (value_ptr)
*value_ptr = p->value_data;
......@@ -3941,9 +3730,6 @@ void
toku_cachetable_helgrind_ignore(void) {
VALGRIND_HG_DISABLE_CHECKING(&cachetable_miss, sizeof cachetable_miss);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_misstime, sizeof cachetable_misstime);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_waittime, sizeof cachetable_waittime);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_wait_reading, sizeof cachetable_wait_reading);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_wait_writing, sizeof cachetable_wait_writing);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_puts, sizeof cachetable_puts);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_prefetches, sizeof cachetable_prefetches);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_maybe_get_and_pins, sizeof cachetable_maybe_get_and_pins);
......
......@@ -161,7 +161,7 @@ typedef int (*CACHETABLE_PARTIAL_EVICTION_CALLBACK)(void *ftnode_pv, PAIR_ATTR o
// a partial fetch. If this function returns FALSE, then the PAIR's value is returned to the caller as is.
//
// An alternative to having this callback is to always call CACHETABLE_PARTIAL_FETCH_CALLBACK, and let
// CACHETABLE_PARTIAL_FETCH_CALLBACK decide whether to possibly release the ydb lock and perform I/O.
// CACHETABLE_PARTIAL_FETCH_CALLBACK decide whether to do any partial fetching or not.
// There is no particular reason why this alternative was not chosen.
// Requires: a read lock to be held on the PAIR
typedef BOOL (*CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK)(void *ftnode_pv, void *read_extraargs);
......@@ -459,11 +459,6 @@ int toku_cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, CACHEFILE cf,
// Verify the whole cachetable that the cachefile is in. Slow.
void toku_cachefile_verify (CACHEFILE cf);
int64_t toku_cachetable_size_slowslow (CACHETABLE t);
int64_t toku_cachetable_size_discrepancy (CACHETABLE t);
int64_t toku_cachetable_size_discrepancy_pinned (CACHETABLE t);
int64_t toku_cachetable_size_slow (CACHETABLE t);
// Verify the cachetable. Slow.
void toku_cachetable_verify (CACHETABLE t);
......@@ -480,9 +475,6 @@ u_int64_t toku_cachefile_size(CACHEFILE cf);
typedef enum {
CT_MISS = 0,
CT_MISSTIME, // how many usec spent waiting for disk read because of cache miss
CT_WAITTIME, // how many usec spent waiting for another thread to release cache line
CT_WAIT_READING,
CT_WAIT_WRITING,
CT_PUTS, // how many times has a newly created node been put into the cachetable?
CT_PREFETCHES, // how many times has a block been prefetched into the cachetable?
CT_MAYBE_GET_AND_PINS, // how many times has maybe_get_and_pin(_clean) been called?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment