Commit 9bb62a9b authored by Dave Wells's avatar Dave Wells Committed by Yoni Fogel

changes from code review [t:2843]

git-svn-id: file:///svn/toku/tokudb@25761 c7de825b-a66e-492c-adef-691d508d4ae1
parent be63bc38
......@@ -16,7 +16,6 @@ struct __toku_indexer_internal {
DB *src_db;
int N;
DB **dest_dbs; /* [N] */
uint32_t *db_flags;
uint32_t indexer_flags;
void (*error_callback)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra);
void *error_extra;
......
......@@ -50,6 +50,7 @@ associate_indexer_with_hot_dbs(DB_INDEXER *indexer, DB *dest_dbs[], int N) {
int result2 = toku_db_set_indexer(dest_dbs[j], NULL);
lazy_assert(result2 == 0);
}
break;
}
}
return result;
......@@ -114,7 +115,7 @@ toku_indexer_create_indexer(DB_ENV *env,
DB *src_db,
int N,
DB *dest_dbs[N],
uint32_t db_flags[N],
uint32_t db_flags[N] UU(),
uint32_t indexer_flags)
{
int rval = 0;
......@@ -134,7 +135,6 @@ toku_indexer_create_indexer(DB_ENV *env,
indexer->i->src_db = src_db;
indexer->i->N = N;
indexer->i->dest_dbs = dest_dbs;
indexer->i->db_flags = db_flags;
indexer->i->indexer_flags = indexer_flags;
indexer->i->loop_mod = 1000; // call poll_func every 1000 rows
indexer->i->estimated_rows = 0;
......@@ -214,6 +214,8 @@ toku_indexer_set_error_callback(DB_INDEXER *indexer,
return 0;
}
// returns TRUE if right of le_cursor
// returns FALSE if left or equal to le_cursor
int
toku_indexer_is_key_right_of_le_cursor(DB_INDEXER *indexer, DB *db, const DBT *key) {
return is_key_right_of_le_cursor(indexer->i->lec, key, db);
......@@ -238,6 +240,7 @@ build_index(DB_INDEXER *indexer) {
result = 0; // all done, normal way to exit loop successfully
}
else {
// this code may be faster ule malloc/free is not done every time
ULEHANDLE ule = toku_ule_create(le.data);
for (int which_db = 0; (which_db < indexer->i->N) && (result == 0); which_db++) {
DB *db = indexer->i->dest_dbs[which_db];
......@@ -250,8 +253,11 @@ build_index(DB_INDEXER *indexer) {
toku_ule_free(ule);
}
toku_ydb_unlock_and_yield(1000); // if there is lock contention, then sleep for 1 millisecond after the unlock
// if there is lock contention, then sleep for 1 millisecond after the unlock
// note: the value 1000 was empirically determined to provide good query performance
// during hotindexing
toku_ydb_unlock_and_yield(1000);
if (result == 0)
result = maybe_call_poll_func(indexer, loop_count);
if (result != 0)
......@@ -305,12 +311,12 @@ close_indexer(DB_INDEXER *indexer) {
require_local_checkpoint(brt, tokutxn);
}
// Disassociate the indexer from the hot dbs
disassociate_indexer_from_hot_dbs(indexer);
// Disassociate the indexer from the hot db and free_indexer
// disassociate_indexer_from_hot_dbs(indexer);
free_indexer(indexer);
}
toku_ydb_unlock();
free_indexer(indexer);
if ( r == 0 ) {
(void) toku_sync_fetch_and_increment_uint64(&status.close);
} else {
......@@ -324,7 +330,13 @@ abort_indexer(DB_INDEXER *indexer) {
(void) toku_sync_fetch_and_decrement_uint32(&status.current);
(void) toku_sync_fetch_and_increment_uint64(&status.abort);
free_indexer(indexer);
toku_ydb_lock();
{
// Disassociate the indexer from the hot db and free_indexer
disassociate_indexer_from_hot_dbs(indexer);
free_indexer(indexer);
}
toku_ydb_unlock();
return 0;
}
......
......@@ -1466,6 +1466,22 @@ locked_env_checkpointing_set_period(DB_ENV * env, u_int32_t seconds) {
toku_ydb_lock(); int r = env_checkpointing_set_period(env, seconds); toku_ydb_unlock(); return r;
}
static int
locked_env_create_indexer(DB_ENV *env,
DB_TXN *txn,
DB_INDEXER **indexerp,
DB *src_db,
int N,
DB *dest_dbs[N],
uint32_t db_flags[N],
uint32_t indexer_flags) {
toku_ydb_lock();
int r = toku_indexer_create_indexer(env, txn, indexerp, src_db, N, dest_dbs, db_flags, indexer_flags);
toku_ydb_unlock();
return r;
}
static int
env_checkpointing_get_period(DB_ENV * env, u_int32_t *seconds) {
HANDLE_PANICKED_ENV(env);
......@@ -2106,9 +2122,10 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) {
SENV(txn_stat);
result->txn_begin = locked_txn_begin;
SENV(set_redzone);
#undef SENV
SENV(create_indexer);
// note : create_loader should use SENV
result->create_loader = toku_loader_create_loader;
result->create_indexer = toku_indexer_create_indexer;
#undef SENV
MALLOC(result->i);
if (result->i == 0) { r = ENOMEM; goto cleanup; }
......@@ -5608,8 +5625,15 @@ locked_db_get_fragmentation(DB * db, TOKU_DB_FRAGMENTATION report) {
int
toku_db_set_indexer(DB *db, DB_INDEXER * indexer) {
db->i->indexer = indexer;
return 0;
int r = 0;
if ( db->i->indexer != NULL && indexer != NULL ) {
// you are trying to overwrite a valid indexer
r = EINVAL;
}
else {
db->i->indexer = indexer;
}
return r;
}
static int
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment