diff --git a/db-benchmark-test/scanrace.c b/db-benchmark-test/scanrace.c index 1012437f8df4132acde7d9cee394f9ea9ea13484..52ee9ef3055bdd2b81098be17e61940240f6f84c 100644 --- a/db-benchmark-test/scanrace.c +++ b/db-benchmark-test/scanrace.c @@ -122,7 +122,7 @@ static void scanrace_lwc (void) { r = db->cursor(db, tid, &dbc, 0); assert(r==0); long rowcounter=0; while (0 == (r = dbc->c_getf_next(dbc, DB_PRELOCKED, counttotalbytes, &e))) { - if (rowcounter%1024==0) { printf("."); fflush(stdout); } + if (rowcounter%128==0) { printf("."); fflush(stdout); } rowcounter++; if (limitcount>0 && rowcounter>=limitcount) break; } diff --git a/newbrt/brt.c b/newbrt/brt.c index 63313e1a6d0e565276bc3b655a25233d0e92ab1b..a26ef29240ea715ac1e65ce09350fc97a6a5426d 100644 --- a/newbrt/brt.c +++ b/newbrt/brt.c @@ -1688,7 +1688,7 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil if (BNC_NBYTESINBUF(node, childnum) == 0) { BLOCKNUM childblocknum = BNC_BLOCKNUM(node, childnum); u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnum); - void *child_v; + void *child_v = NULL; int r = toku_cachetable_maybe_get_and_pin(t->cf, childblocknum, childfullhash, &child_v); if (r!=0) { // It's not in main memory, so diff --git a/newbrt/cachetable.c b/newbrt/cachetable.c index f96a89daa9f170a630393a68dc2485bdcc7e25a7..2bc99e5960a42d3ea0fa610f4f380bff5dd00341 100644 --- a/newbrt/cachetable.c +++ b/newbrt/cachetable.c @@ -14,6 +14,10 @@ #include "cachetable-rwlock.h" #include "toku_worker.h" +#if !defined(TOKU_CACHETABLE_DO_EVICT_FROM_WRITER) +#error +#endif + // use worker threads 0->no 1->yes #define DO_WORKER_THREAD 1 #if DO_WORKER_THREAD @@ -51,8 +55,10 @@ struct ctpair { long size; enum ctpair_state state; enum cachetable_dirty dirty; - char verify_flag; // Used in verify_cachetable() - BOOL write_me; + + char verify_flag; // Used in verify_cachetable() + BOOL write_me; // write_pair + BOOL remove_me; // write_pair u_int32_t fullhash; @@ -60,7 +66,7 @@ struct ctpair { CACHETABLE_FETCH_CALLBACK fetch_callback; void *extraargs; - PAIR next,prev; // In LRU list. + PAIR next,prev; // In LRU list. PAIR hash_chain; LSN modified_lsn; // What was the LSN when modified (undefined if not dirty) @@ -635,7 +641,7 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p) { if (p->cq) workqueue_enq(p->cq, &p->asyncwork, 1); else - cachetable_complete_write_pair(ct, p, TRUE); + cachetable_complete_write_pair(ct, p, p->remove_me); } // complete the write of a pair by reseting the writing flag, adjusting the write @@ -665,6 +671,7 @@ static void flush_and_maybe_remove (CACHETABLE ct, PAIR p, BOOL write_me) { p->state = CTPAIR_WRITING; ct->size_writing += p->size; assert(ct->size_writing >= 0); p->write_me = write_me; + p->remove_me = TRUE; #if DO_WORKER_THREAD WORKITEM wi = &p->asyncwork; workitem_init(wi, cachetable_writer, p); @@ -673,6 +680,9 @@ static void flush_and_maybe_remove (CACHETABLE ct, PAIR p, BOOL write_me) { if (!p->write_me || (!ctpair_pinned(&p->rwlock) && !p->dirty)) { cachetable_write_pair(ct, p); } else { +#if !TOKU_CACHETABLE_DO_EVICT_FROM_WRITER + p->remove_me = FALSE; // run the remove on the main thread +#endif workqueue_enq(&ct->wq, wi, 0); } #else @@ -709,6 +719,12 @@ again: return r; } +void toku_cachetable_maybe_flush_some(CACHETABLE ct) { + cachetable_lock(ct); + maybe_flush_some(ct, 0); + cachetable_unlock(ct); +} + static PAIR cachetable_insert_at(CACHETABLE ct, CACHEFILE cachefile, CACHEKEY key, void *value, enum ctpair_state state, @@ -863,6 +879,7 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3 CACHETABLE ct = cachefile->cachetable; PAIR p; int count = 0; + int r = -1; cachetable_lock(ct); for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) { count++; @@ -870,15 +887,14 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3 *value = p->value; ctpair_read_lock(&p->rwlock, ct->mutex); lru_touch(ct,p); - cachetable_unlock(ct); - note_hash_count(count); + r = 0; //printf("%s:%d cachetable_maybe_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value); - return 0; + break; } } cachetable_unlock(ct); note_hash_count(count); - return -1; + return r; } @@ -888,6 +904,7 @@ int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, WHEN_TRACE_CT(printf("%s:%d unpin(%lld)", __FILE__, __LINE__, key)); //printf("%s:%d is dirty now=%d\n", __FILE__, __LINE__, dirty); int count = 0; + int r = -1; //assert(fullhash == toku_cachetable_hash(cachefile, key)); cachetable_lock(ct); for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) { @@ -903,20 +920,18 @@ int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, } WHEN_TRACE_CT(printf("[count=%lld]\n", p->pinned)); { - int r; if ((r=maybe_flush_some(ct, 0))) { cachetable_unlock(ct); return r; } } - cachetable_unlock(ct); - note_hash_count(count); - return 0; + r = 0; // we found one + break; } } cachetable_unlock(ct); note_hash_count(count); - return -1; + return r; } int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, diff --git a/newbrt/cachetable.h b/newbrt/cachetable.h index ac45a8146c92031c961b1e43bdea07e3f7752b98..b26b268dcfe1eb52c3df585be98d95cdf092ec05 100644 --- a/newbrt/cachetable.h +++ b/newbrt/cachetable.h @@ -216,6 +216,8 @@ int toku_graceful_delete(const char *db_fname); void toku_graceful_lock_init(void); void toku_graceful_lock_destroy(void); +#define TOKU_CACHETABLE_DO_EVICT_FROM_WRITER 0 +void toku_cachetable_maybe_flush_some(CACHETABLE ct); #endif diff --git a/newbrt/tests/cachetable-rename-test.c b/newbrt/tests/cachetable-rename-test.c index 3a5503387a017cb12dc6e113769353bbdfbd1db9..dd21843658a243c66adf5eb67b5984a0853bd393 100644 --- a/newbrt/tests/cachetable-rename-test.c +++ b/newbrt/tests/cachetable-rename-test.c @@ -26,6 +26,12 @@ static inline void test_mutex_unlock() { int r = toku_pthread_mutex_unlock(&test_mutex); assert(r == 0); } +static void maybe_flush(CACHETABLE t) { +#if !TOKU_CACHETABLE_DO_EVICT_FROM_WRITER + toku_cachetable_maybe_flush_some(t); +#endif +} + enum { KEYLIMIT = 4, TRIALLIMIT=256000 }; static CACHEKEY keys[KEYLIMIT]; static void* vals[KEYLIMIT]; @@ -100,7 +106,7 @@ static void test_rename (void) { test_mutex_lock(); while (n_keys >= KEYLIMIT) { test_mutex_unlock(); - toku_pthread_yield(); + toku_pthread_yield(); maybe_flush(t); test_mutex_lock(); } assert(n_keys<KEYLIMIT); diff --git a/newbrt/tests/cachetable-test.c b/newbrt/tests/cachetable-test.c index ae30d08c58e8c52bb1b362eb674729772dd0b31e..709a4607c5f73fddaa44fad61a3d8a694ced6b6b 100644 --- a/newbrt/tests/cachetable-test.c +++ b/newbrt/tests/cachetable-test.c @@ -155,6 +155,12 @@ static int fetch (CACHEFILE f, CACHEKEY key, u_int32_t fullhash __attribute__((_ return 0; } +static void maybe_flush(CACHETABLE t) { +#if !TOKU_CACHETABLE_DO_EVICT_FROM_WRITER + toku_cachetable_maybe_flush_some(t); +#endif +} + // verify that a sequence of cachetable operations causes a particular sequence of // callbacks @@ -218,7 +224,7 @@ static void test0 (void) { assert(r==0); test_mutex_lock(); while (expect_n_flushes != 0) { - test_mutex_unlock(); toku_pthread_yield(); test_mutex_lock(); + test_mutex_unlock(); toku_pthread_yield(); maybe_flush(t); test_mutex_lock(); } assert(expect_n_flushes==0); test_mutex_unlock(); @@ -228,7 +234,7 @@ static void test0 (void) { assert(r==0); test_mutex_lock(); while (expect_n_flushes != 0) { - test_mutex_unlock(); toku_pthread_yield(); test_mutex_lock(); + test_mutex_unlock(); toku_pthread_yield(); maybe_flush(t); test_mutex_lock(); } assert(expect_n_flushes==0); test_mutex_unlock(); @@ -260,7 +266,7 @@ static void test0 (void) { assert(strcmp(((struct item *)item_v)->something,"something")==0); test_mutex_lock(); while (expect_n_flushes != 0) { - test_mutex_unlock(); toku_pthread_yield(); test_mutex_lock(); + test_mutex_unlock(); toku_pthread_yield(); maybe_flush(t); test_mutex_lock(); } assert(expect_n_flushes==0); test_mutex_unlock(); @@ -565,10 +571,13 @@ static void test_size_flush_callback(CACHEFILE f, BOOL rename_p __attribute__((__unused__))) { if (test_size_debug && verbose) printf("test_size_flush %p %" PRId64 " %p %ld %u %u\n", f, key.b, value, size, (unsigned)do_write, (unsigned)keep); if (keep) { - assert(do_write != 0); - test_mutex_lock(); - test_size_flush_key = key; - test_mutex_unlock(); + if (do_write) { + test_mutex_lock(); + test_size_flush_key = key; + test_mutex_unlock(); + } + } else { + assert(!do_write); } } @@ -663,7 +672,7 @@ static void test_size_flush() { int n_entries, hash_size; long size_current, size_limit; toku_cachetable_get_state(t, &n_entries, &hash_size, &size_current, &size_limit); while (n_entries != min2(i+1, n)) { - toku_pthread_yield(); + toku_pthread_yield(); maybe_flush(t); toku_cachetable_get_state(t, &n_entries, 0, 0, 0); } assert(n_entries == min2(i+1, n)); diff --git a/newbrt/tests/cachetable-test2.c b/newbrt/tests/cachetable-test2.c index 5480c97f35e5b980d9891f5ca407153303a0ecd7..5852f63ca727952ac96c1e7381707a1438ddc022 100644 --- a/newbrt/tests/cachetable-test2.c +++ b/newbrt/tests/cachetable-test2.c @@ -26,6 +26,12 @@ static inline void test_mutex_unlock() { int r = toku_pthread_mutex_unlock(&test_mutex); assert(r == 0); } +static void maybe_flush(CACHETABLE t) { +#if !TOKU_CACHETABLE_DO_EVICT_FROM_WRITER + toku_cachetable_maybe_flush_some(t); +#endif +} + static const int test_object_size = 1; static CACHETABLE ct; @@ -47,10 +53,10 @@ static void print_ints(void) { printf("}\n"); } -static void item_becomes_present(CACHEFILE cf, CACHEKEY key) { +static void item_becomes_present(CACHETABLE thect, CACHEFILE cf, CACHEKEY key) { test_mutex_lock(); while (n_present >= N_PRESENT_LIMIT) { - test_mutex_unlock(); toku_pthread_yield(); test_mutex_lock(); + test_mutex_unlock(); toku_pthread_yield(); maybe_flush(thect); test_mutex_lock(); } assert(n_present<N_PRESENT_LIMIT); present_items[n_present].cf = cf; @@ -159,7 +165,7 @@ static void test_chaining (void) { u_int32_t fhash = toku_cachetable_hash(f[fnum], make_blocknum(i)); r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, fetch_forchain, (void*)i); assert(r==0); - item_becomes_present(f[fnum], make_blocknum(i)); + item_becomes_present(ct, f[fnum], make_blocknum(i)); r = toku_cachetable_unpin(f[fnum], make_blocknum(i), fhash, CACHETABLE_CLEAN, test_object_size); assert(r==0); //print_ints(); @@ -204,7 +210,7 @@ static void test_chaining (void) { r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, fetch_forchain, (void*)i); assert(r==0 || r==-1); if (r==0) { - item_becomes_present(f[fnum], make_blocknum(i)); + item_becomes_present(ct, f[fnum], make_blocknum(i)); //print_ints(); //cachetable_print_state(ct); }