Commit 89b85f6e authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#3453 merge tokudb-engine.3453 to tokudb-engine refs[t:3453]

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@35394 c7de825b-a66e-492c-adef-691d508d4ae1
parent 6a79c691
...@@ -43,24 +43,6 @@ static const char *ha_tokudb_exts[] = { ...@@ -43,24 +43,6 @@ static const char *ha_tokudb_exts[] = {
NullS NullS
}; };
#define lockretryN(N) \
for (ulonglong lockretrycount=0; lockretrycount<(N/(1<<3) + 1); lockretrycount++)
#define lockretry_wait \
if (error != DB_LOCK_NOTGRANTED) { \
break; \
} \
if (tokudb_debug & TOKUDB_DEBUG_LOCKRETRY) { \
TOKUDB_TRACE("%s count=%d\n", __FUNCTION__, (int) lockretrycount); \
} \
if (lockretrycount%200 == 0) { \
if (ha_thd()->killed) { \
error = DB_LOCK_NOTGRANTED; \
break; \
} \
} \
usleep((lockretrycount<4 ? (1<<lockretrycount) : (1<<3)) * 1024); \
// //
// This offset is calculated starting from AFTER the NULL bytes // This offset is calculated starting from AFTER the NULL bytes
// //
...@@ -1722,7 +1704,7 @@ int ha_tokudb::initialize_share( ...@@ -1722,7 +1704,7 @@ int ha_tokudb::initialize_share(
init_auto_increment(); init_auto_increment();
} }
if (may_table_be_empty()) { if (may_table_be_empty(NULL)) {
share->try_table_lock = true; share->try_table_lock = true;
} }
else { else {
...@@ -3058,15 +3040,18 @@ bool ha_tokudb::check_if_incompatible_data(HA_CREATE_INFO * info, uint table_cha ...@@ -3058,15 +3040,18 @@ bool ha_tokudb::check_if_incompatible_data(HA_CREATE_INFO * info, uint table_cha
// It is NOT meant to be a 100% check for emptiness. // It is NOT meant to be a 100% check for emptiness.
// This is used for a bulk load optimization. // This is used for a bulk load optimization.
// //
bool ha_tokudb::may_table_be_empty() { bool ha_tokudb::may_table_be_empty(DB_TXN *txn) {
int error; int error;
bool ret_val = false; bool ret_val = false;
DBC* tmp_cursor = NULL; DBC* tmp_cursor = NULL;
DB_TXN* txn = NULL; DB_TXN* tmp_txn = NULL;
error = db_env->txn_begin(db_env, 0, &txn, 0); if (txn == NULL) {
if (error) { error = db_env->txn_begin(db_env, 0, &tmp_txn, 0);
goto cleanup; if (error) {
goto cleanup;
}
txn = tmp_txn;
} }
error = share->file->cursor(share->file, txn, &tmp_cursor, 0); error = share->file->cursor(share->file, txn, &tmp_cursor, 0);
...@@ -3087,9 +3072,9 @@ cleanup: ...@@ -3087,9 +3072,9 @@ cleanup:
assert(r==0); assert(r==0);
tmp_cursor = NULL; tmp_cursor = NULL;
} }
if (txn) { if (tmp_txn) {
commit_txn(txn, 0); commit_txn(tmp_txn, 0);
txn = NULL; tmp_txn = NULL;
} }
return ret_val; return ret_val;
} }
...@@ -3108,7 +3093,7 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) { ...@@ -3108,7 +3093,7 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) {
lock_count = 0; lock_count = 0;
if (share->try_table_lock) { if (share->try_table_lock) {
if (get_prelock_empty(thd) && may_table_be_empty()) { if (get_prelock_empty(thd) && may_table_be_empty(transaction)) {
if (using_ignore || get_load_save_space(thd)) { if (using_ignore || get_load_save_space(thd)) {
acquire_table_lock(transaction, lock_write); acquire_table_lock(transaction, lock_write);
} }
...@@ -3671,22 +3656,19 @@ int ha_tokudb::insert_row_to_main_dictionary(uchar* record, DBT* pk_key, DBT* pk ...@@ -3671,22 +3656,19 @@ int ha_tokudb::insert_row_to_main_dictionary(uchar* record, DBT* pk_key, DBT* pk
u_int32_t put_flags = mult_put_flags[primary_key]; u_int32_t put_flags = mult_put_flags[primary_key];
THD *thd = ha_thd(); THD *thd = ha_thd();
uint curr_num_DBs = table->s->keys + test(hidden_primary_key); uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
ulonglong wait_lock_time = get_write_lock_wait_time(thd);
assert(curr_num_DBs == 1); assert(curr_num_DBs == 1);
set_main_dict_put_flags(thd, &put_flags, true); set_main_dict_put_flags(thd, &put_flags, true);
lockretryN(wait_lock_time){ //XXX Get rid of the lock retry logic
error = share->file->put( error = share->file->put(
share->file, share->file,
txn, txn,
pk_key, pk_key,
pk_val, pk_val,
put_flags put_flags
); );
lockretry_wait;
}
if (error) { if (error) {
last_dup_key = primary_key; last_dup_key = primary_key;
...@@ -3700,25 +3682,22 @@ cleanup: ...@@ -3700,25 +3682,22 @@ cleanup:
int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN* txn, THD* thd) { int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN* txn, THD* thd) {
int error = 0; int error = 0;
uint curr_num_DBs = share->num_DBs; uint curr_num_DBs = share->num_DBs;
ulonglong wait_lock_time = get_write_lock_wait_time(thd);
set_main_dict_put_flags(thd, &mult_put_flags[primary_key], false); set_main_dict_put_flags(thd, &mult_put_flags[primary_key], false);
lockretryN(wait_lock_time){ //XXX: Get rid of the lock retry logic
error = db_env->put_multiple( error = db_env->put_multiple(
db_env, db_env,
share->key_file[primary_key], share->key_file[primary_key],
txn, txn,
pk_key, pk_key,
pk_val, pk_val,
curr_num_DBs, curr_num_DBs,
share->key_file, share->key_file,
mult_key_dbt, mult_key_dbt,
mult_rec_dbt, mult_rec_dbt,
mult_put_flags mult_put_flags
); );
lockretry_wait;
}
// //
// We break if we hit an error, unless it is a dup key error // We break if we hit an error, unless it is a dup key error
...@@ -3923,7 +3902,6 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -3923,7 +3902,6 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
DB_TXN* txn = NULL; DB_TXN* txn = NULL;
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot); tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
uint curr_num_DBs; uint curr_num_DBs;
ulonglong wait_lock_time = get_write_lock_wait_time(thd);
LINT_INIT(error); LINT_INIT(error);
bzero((void *) &prim_key, sizeof(prim_key)); bzero((void *) &prim_key, sizeof(prim_key));
...@@ -4034,25 +4012,25 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -4034,25 +4012,25 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
if (error) { goto cleanup; } if (error) { goto cleanup; }
set_main_dict_put_flags(thd, &mult_put_flags[primary_key], false); set_main_dict_put_flags(thd, &mult_put_flags[primary_key], false);
lockretryN(wait_lock_time){
error = db_env->update_multiple( //XXX: Get rid of the lock retry logic
db_env, error = db_env->update_multiple(
share->key_file[primary_key], db_env,
txn, share->key_file[primary_key],
&old_prim_key, txn,
&old_prim_row, &old_prim_key,
&prim_key, &old_prim_row,
&prim_row, &prim_key,
curr_num_DBs, &prim_row,
share->key_file, curr_num_DBs,
mult_put_flags, share->key_file,
2*curr_num_DBs, mult_put_flags,
mult_key_dbt, 2*curr_num_DBs,
curr_num_DBs, mult_key_dbt,
mult_rec_dbt curr_num_DBs,
); mult_rec_dbt
lockretry_wait; );
}
if (error == DB_KEYEXIST) { if (error == DB_KEYEXIST) {
last_dup_key = primary_key; last_dup_key = primary_key;
} }
...@@ -4095,7 +4073,6 @@ int ha_tokudb::delete_row(const uchar * record) { ...@@ -4095,7 +4073,6 @@ int ha_tokudb::delete_row(const uchar * record) {
DBT row, prim_key; DBT row, prim_key;
bool has_null; bool has_null;
THD* thd = ha_thd(); THD* thd = ha_thd();
ulonglong wait_lock_time = get_write_lock_wait_time(thd);
uint curr_num_DBs; uint curr_num_DBs;
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);; tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
...@@ -4117,20 +4094,19 @@ int ha_tokudb::delete_row(const uchar * record) { ...@@ -4117,20 +4094,19 @@ int ha_tokudb::delete_row(const uchar * record) {
if ((error = pack_row(&row, (const uchar *) record, primary_key))){ if ((error = pack_row(&row, (const uchar *) record, primary_key))){
goto cleanup; goto cleanup;
} }
lockretryN(wait_lock_time){
error = db_env->del_multiple( //XXX: Get rid of the lock retry logic
db_env, error = db_env->del_multiple(
share->key_file[primary_key], db_env,
transaction, share->key_file[primary_key],
&prim_key, transaction,
&row, &prim_key,
curr_num_DBs, &row,
share->key_file, curr_num_DBs,
mult_key_dbt, share->key_file,
mult_del_flags mult_key_dbt,
); mult_del_flags
lockretry_wait; );
}
if (error) { if (error) {
DBUG_PRINT("error", ("Got error %d", error)); DBUG_PRINT("error", ("Got error %d", error));
...@@ -4242,7 +4218,7 @@ cleanup: ...@@ -4242,7 +4218,7 @@ cleanup:
// 0 on success // 0 on success
// error otherwise // error otherwise
// //
int ha_tokudb::prepare_index_key_scan( const uchar * key, uint key_len ) { int ha_tokudb::prepare_index_key_scan(const uchar * key, uint key_len) {
int error = 0; int error = 0;
DBT start_key, end_key; DBT start_key, end_key;
THD* thd = ha_thd(); THD* thd = ha_thd();
...@@ -4252,14 +4228,13 @@ int ha_tokudb::prepare_index_key_scan( const uchar * key, uint key_len ) { ...@@ -4252,14 +4228,13 @@ int ha_tokudb::prepare_index_key_scan( const uchar * key, uint key_len ) {
pack_key(&end_key, active_index, prelocked_right_range, key, key_len, COL_POS_INF); pack_key(&end_key, active_index, prelocked_right_range, key, key_len, COL_POS_INF);
prelocked_right_range_size = end_key.size; prelocked_right_range_size = end_key.size;
lockretryN(read_lock_wait_time){ //XXX: Get rid of the lock retry logic
error = cursor->c_pre_acquire_range_lock( error = cursor->c_pre_acquire_range_lock(
cursor, cursor,
&start_key, &start_key,
&end_key &end_key
); );
lockretry_wait;
}
if (error){ if (error){
goto cleanup; goto cleanup;
} }
...@@ -4271,7 +4246,8 @@ cleanup: ...@@ -4271,7 +4246,8 @@ cleanup:
if (error) { if (error) {
last_cursor_error = error; last_cursor_error = error;
// //
// cursor should be initialized here, but in case it is not, we still check // cursor should be initialized here, but in case it is not,
// we still check
// //
if (cursor) { if (cursor) {
int r = cursor->c_close(cursor); int r = cursor->c_close(cursor);
...@@ -4302,7 +4278,6 @@ int ha_tokudb::index_init(uint keynr, bool sorted) { ...@@ -4302,7 +4278,6 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
int error; int error;
THD* thd = ha_thd(); THD* thd = ha_thd();
DBUG_PRINT("enter", ("table: '%s' key: %d", table_share->table_name.str, keynr)); DBUG_PRINT("enter", ("table: '%s' key: %d", table_share->table_name.str, keynr));
read_lock_wait_time = get_read_lock_wait_time(ha_thd());
/* /*
Under some very rare conditions (like full joins) we may already have Under some very rare conditions (like full joins) we may already have
...@@ -4526,17 +4501,17 @@ int ha_tokudb::read_full_row(uchar * buf) { ...@@ -4526,17 +4501,17 @@ int ha_tokudb::read_full_row(uchar * buf) {
// //
// assumes key is stored in this->last_key // assumes key is stored in this->last_key
// //
lockretryN(read_lock_wait_time){
error = share->file->getf_set( //XXX: Get rid of the lock retry logic
share->file, error = share->file->getf_set(
transaction, share->file,
cursor_flags, transaction,
&last_key, cursor_flags,
smart_dbt_callback_rowread_ptquery, &last_key,
&info smart_dbt_callback_rowread_ptquery,
); &info
lockretry_wait; );
}
if (error) { if (error) {
if (error == DB_LOCK_NOTGRANTED) { if (error == DB_LOCK_NOTGRANTED) {
error = HA_ERR_LOCK_WAIT_TIMEOUT; error = HA_ERR_LOCK_WAIT_TIMEOUT;
...@@ -4634,34 +4609,31 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ ...@@ -4634,34 +4609,31 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
case HA_READ_KEY_EXACT: /* Find first record else error */ case HA_READ_KEY_EXACT: /* Find first record else error */
pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF); pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF);
ir_info.orig_key = &lookup_key; ir_info.orig_key = &lookup_key;
lockretryN(read_lock_wait_time){
error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_IR_CALLBACK, &ir_info); //XXX: Get rid of the lock retry logic
lockretry_wait; error = cursor->c_getf_set_range(cursor, flags,
} &lookup_key, SMART_DBT_IR_CALLBACK, &ir_info);
if (ir_info.cmp) { if (ir_info.cmp) {
error = DB_NOTFOUND; error = DB_NOTFOUND;
} }
break; break;
case HA_READ_AFTER_KEY: /* Find next rec. after key-record */ case HA_READ_AFTER_KEY: /* Find next rec. after key-record */
pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_POS_INF); pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_POS_INF);
lockretryN(read_lock_wait_time){ //XXX: Get rid of the lock retry logic
error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_set_range(cursor, flags,
lockretry_wait; &lookup_key, SMART_DBT_CALLBACK, &info);
}
break; break;
case HA_READ_BEFORE_KEY: /* Find next rec. before key-record */ case HA_READ_BEFORE_KEY: /* Find next rec. before key-record */
pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF); pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF);
lockretryN(read_lock_wait_time){ //XXX: Get rid of the lock retry logic
error = cursor->c_getf_set_range_reverse(cursor, flags, &lookup_key, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_set_range_reverse(cursor, flags,
lockretry_wait; &lookup_key, SMART_DBT_CALLBACK, &info);
}
break; break;
case HA_READ_KEY_OR_NEXT: /* Record or next record */ case HA_READ_KEY_OR_NEXT: /* Record or next record */
pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF); pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF);
lockretryN(read_lock_wait_time){ //XXX: Get rid of the lock retry logic
error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_set_range(cursor, flags,
lockretry_wait; &lookup_key, SMART_DBT_CALLBACK, &info);
}
break; break;
// //
// This case does not seem to ever be used, it is ok for it to be slow // This case does not seem to ever be used, it is ok for it to be slow
...@@ -4669,10 +4641,9 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ ...@@ -4669,10 +4641,9 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
case HA_READ_KEY_OR_PREV: /* Record or previous */ case HA_READ_KEY_OR_PREV: /* Record or previous */
pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF); pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF);
ir_info.orig_key = &lookup_key; ir_info.orig_key = &lookup_key;
lockretryN(read_lock_wait_time){ //XXX: Get rid of the lock retry logic
error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_IR_CALLBACK, &ir_info); error = cursor->c_getf_set_range(cursor, flags,
lockretry_wait; &lookup_key, SMART_DBT_IR_CALLBACK, &ir_info);
}
if (error == DB_NOTFOUND) { if (error == DB_NOTFOUND) {
error = cursor->c_getf_last(cursor, flags, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_last(cursor, flags, SMART_DBT_CALLBACK, &info);
} }
...@@ -4682,18 +4653,15 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ ...@@ -4682,18 +4653,15 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
break; break;
case HA_READ_PREFIX_LAST_OR_PREV: /* Last or prev key with the same prefix */ case HA_READ_PREFIX_LAST_OR_PREV: /* Last or prev key with the same prefix */
pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_POS_INF); pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_POS_INF);
lockretryN(read_lock_wait_time){ //XXX: Get rid of the lock retry logic
error = cursor->c_getf_set_range_reverse(cursor, flags, &lookup_key, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_set_range_reverse(cursor, flags,
lockretry_wait; &lookup_key, SMART_DBT_CALLBACK, &info);
}
break; break;
case HA_READ_PREFIX_LAST: case HA_READ_PREFIX_LAST:
pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_POS_INF); pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_POS_INF);
ir_info.orig_key = &lookup_key; ir_info.orig_key = &lookup_key;
lockretryN(read_lock_wait_time){ //XXX: Get rid of the lock retry logic
error = cursor->c_getf_set_range_reverse(cursor, flags, &lookup_key, SMART_DBT_IR_CALLBACK, &ir_info); error = cursor->c_getf_set_range_reverse(cursor, flags, &lookup_key, SMART_DBT_IR_CALLBACK, &ir_info);
lockretry_wait;
}
if (ir_info.cmp) { if (ir_info.cmp) {
error = DB_NOTFOUND; error = DB_NOTFOUND;
} }
...@@ -5052,16 +5020,14 @@ int ha_tokudb::get_next(uchar* buf, int direction) { ...@@ -5052,16 +5020,14 @@ int ha_tokudb::get_next(uchar* buf, int direction) {
// call c_getf_next with purpose of filling in range_query_buff // call c_getf_next with purpose of filling in range_query_buff
// //
if (direction > 0) { if (direction > 0) {
lockretryN(read_lock_wait_time){ //XXX: Get rid of the lock retry logic
error = cursor->c_getf_next(cursor, flags, smart_dbt_bf_callback, &bf_info); error = cursor->c_getf_next(cursor, flags,
lockretry_wait; smart_dbt_bf_callback, &bf_info);
}
} }
else { else {
lockretryN(read_lock_wait_time){ //XXX: Get rid of the lock retry logic
error = cursor->c_getf_prev(cursor, flags, smart_dbt_bf_callback, &bf_info); error = cursor->c_getf_prev(cursor, flags,
lockretry_wait; smart_dbt_bf_callback, &bf_info);
}
} }
error = handle_cursor_error(error, HA_ERR_END_OF_FILE,active_index); error = handle_cursor_error(error, HA_ERR_END_OF_FILE,active_index);
...@@ -5078,11 +5044,11 @@ int ha_tokudb::get_next(uchar* buf, int direction) { ...@@ -5078,11 +5044,11 @@ int ha_tokudb::get_next(uchar* buf, int direction) {
info.buf = buf; info.buf = buf;
info.keynr = active_index; info.keynr = active_index;
lockretryN(read_lock_wait_time){ //XXX: Get rid of the lock retry logic
error = cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_next(cursor, flags,
lockretry_wait; SMART_DBT_CALLBACK, &info);
} error = handle_cursor_error(error, HA_ERR_END_OF_FILE,
error = handle_cursor_error(error, HA_ERR_END_OF_FILE,active_index); active_index);
} }
} }
...@@ -5168,10 +5134,9 @@ int ha_tokudb::index_first(uchar * buf) { ...@@ -5168,10 +5134,9 @@ int ha_tokudb::index_first(uchar * buf) {
info.buf = buf; info.buf = buf;
info.keynr = active_index; info.keynr = active_index;
lockretryN(read_lock_wait_time){ //XXX: Get rid of the lock retry logic
error = cursor->c_getf_first(cursor, flags, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_first(cursor, flags,
lockretry_wait; SMART_DBT_CALLBACK, &info);
}
error = handle_cursor_error(error,HA_ERR_END_OF_FILE,active_index); error = handle_cursor_error(error,HA_ERR_END_OF_FILE,active_index);
// //
...@@ -5213,10 +5178,9 @@ int ha_tokudb::index_last(uchar * buf) { ...@@ -5213,10 +5178,9 @@ int ha_tokudb::index_last(uchar * buf) {
info.buf = buf; info.buf = buf;
info.keynr = active_index; info.keynr = active_index;
lockretryN(read_lock_wait_time){ //XXX: Get rid of the lock retry logic
error = cursor->c_getf_last(cursor, flags, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_last(cursor, flags,
lockretry_wait; SMART_DBT_CALLBACK, &info);
}
error = handle_cursor_error(error,HA_ERR_END_OF_FILE,active_index); error = handle_cursor_error(error,HA_ERR_END_OF_FILE,active_index);
// //
// still need to get entire contents of the row if operation done on // still need to get entire contents of the row if operation done on
...@@ -5245,7 +5209,6 @@ cleanup: ...@@ -5245,7 +5209,6 @@ cleanup:
int ha_tokudb::rnd_init(bool scan) { int ha_tokudb::rnd_init(bool scan) {
TOKUDB_DBUG_ENTER("ha_tokudb::rnd_init"); TOKUDB_DBUG_ENTER("ha_tokudb::rnd_init");
int error = 0; int error = 0;
read_lock_wait_time = get_read_lock_wait_time(ha_thd());
range_lock_grabbed = false; range_lock_grabbed = false;
error = index_init(primary_key, 0); error = index_init(primary_key, 0);
if (error) { goto cleanup;} if (error) { goto cleanup;}
...@@ -5368,7 +5331,6 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) { ...@@ -5368,7 +5331,6 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) {
struct smart_dbt_info info; struct smart_dbt_info info;
bool old_unpack_entire_row = unpack_entire_row; bool old_unpack_entire_row = unpack_entire_row;
DBT* key = get_pos(&db_pos, pos); DBT* key = get_pos(&db_pos, pos);
read_lock_wait_time = get_read_lock_wait_time(ha_thd());
unpack_entire_row = true; unpack_entire_row = true;
statistic_increment(table->in_use->status_var.ha_read_rnd_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_read_rnd_count, &LOCK_status);
...@@ -5378,10 +5340,10 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) { ...@@ -5378,10 +5340,10 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) {
info.buf = buf; info.buf = buf;
info.keynr = primary_key; info.keynr = primary_key;
lockretryN(read_lock_wait_time) { //XXX: Get rid of the lock retry logic
error = share->file->getf_set(share->file, transaction, get_cursor_isolation_flags(lock.type, ha_thd()), key, smart_dbt_callback_rowread_ptquery, &info); error = share->file->getf_set(share->file, transaction,
lockretry_wait; get_cursor_isolation_flags(lock.type, ha_thd()),
} key, smart_dbt_callback_rowread_ptquery, &info);
if (error == DB_NOTFOUND) { if (error == DB_NOTFOUND) {
error = HA_ERR_KEY_NOT_FOUND; error = HA_ERR_KEY_NOT_FOUND;
...@@ -5436,14 +5398,12 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k ...@@ -5436,14 +5398,12 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k
prelocked_right_range_size = 0; prelocked_right_range_size = 0;
} }
lockretryN(read_lock_wait_time){ //XXX: Get rid of the lock retry logic
error = cursor->c_pre_acquire_range_lock( error = cursor->c_pre_acquire_range_lock(
cursor, cursor,
start_key ? &start_dbt_key : share->key_file[active_index]->dbt_neg_infty(), start_key ? &start_dbt_key : share->key_file[active_index]->dbt_neg_infty(),
end_key ? &end_dbt_key : share->key_file[active_index]->dbt_pos_infty() end_key ? &end_dbt_key : share->key_file[active_index]->dbt_pos_infty()
); );
lockretry_wait;
}
if (error){ if (error){
last_cursor_error = error; last_cursor_error = error;
// //
...@@ -7145,7 +7105,6 @@ int ha_tokudb::tokudb_add_index( ...@@ -7145,7 +7105,6 @@ int ha_tokudb::tokudb_add_index(
// //
char status_msg[MAX_ALIAS_NAME + 200]; //buffer of 200 should be a good upper bound. char status_msg[MAX_ALIAS_NAME + 200]; //buffer of 200 should be a good upper bound.
ulonglong num_processed = 0; //variable that stores number of elements inserted thus far ulonglong num_processed = 0; //variable that stores number of elements inserted thus far
read_lock_wait_time = get_read_lock_wait_time(ha_thd());
thd_proc_info(thd, "Adding indexes"); thd_proc_info(thd, "Adding indexes");
...@@ -7289,14 +7248,12 @@ int ha_tokudb::tokudb_add_index( ...@@ -7289,14 +7248,12 @@ int ha_tokudb::tokudb_add_index(
// first a global read lock on the main DB, because // first a global read lock on the main DB, because
// we intend to scan the entire thing // we intend to scan the entire thing
// //
lockretryN(read_lock_wait_time){ //XXX: Get rid of the lock retry logic
error = tmp_cursor->c_pre_acquire_range_lock( error = tmp_cursor->c_pre_acquire_range_lock(
tmp_cursor, tmp_cursor,
share->file->dbt_neg_infty(), share->file->dbt_neg_infty(),
share->file->dbt_pos_infty() share->file->dbt_pos_infty()
); );
lockretry_wait;
}
if (error) { goto cleanup; } if (error) { goto cleanup; }
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &curr_pk_key, &curr_pk_val, DB_NEXT | DB_PRELOCKED); cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &curr_pk_key, &curr_pk_val, DB_NEXT | DB_PRELOCKED);
......
...@@ -370,7 +370,7 @@ private: ...@@ -370,7 +370,7 @@ private:
void set_query_columns(uint keynr); void set_query_columns(uint keynr);
int prelock_range (const key_range *start_key, const key_range *end_key); int prelock_range (const key_range *start_key, const key_range *end_key);
int create_txn(THD* thd, tokudb_trx_data* trx); int create_txn(THD* thd, tokudb_trx_data* trx);
bool may_table_be_empty(); bool may_table_be_empty(DB_TXN *txn);
int delete_or_rename_table (const char* from_name, const char* to_name, bool is_delete); int delete_or_rename_table (const char* from_name, const char* to_name, bool is_delete);
int delete_or_rename_dictionary( const char* from_name, const char* to_name, const char* index_name, bool is_key, DB_TXN* txn, bool is_delete); int delete_or_rename_dictionary( const char* from_name, const char* to_name, const char* index_name, bool is_key, DB_TXN* txn, bool is_delete);
int truncate_dictionary( uint keynr, DB_TXN* txn ); int truncate_dictionary( uint keynr, DB_TXN* txn );
......
...@@ -14,20 +14,17 @@ extern "C" { ...@@ -14,20 +14,17 @@ extern "C" {
#include "toku_time.h" #include "toku_time.h"
} }
/* We define DTRACE after mysql_priv.h in case it disabled dtrace in the main server */ /* We define DTRACE after mysql_priv.h in case it disabled dtrace in the main server */
#ifdef HAVE_DTRACE #ifdef HAVE_DTRACE
#define _DTRACE_VERSION 1 #define _DTRACE_VERSION 1
#else #else
#endif #endif
#include <mysql/plugin.h> #include <mysql/plugin.h>
#include "hatoku_hton.h" #include "hatoku_hton.h"
#include "hatoku_defines.h" #include "hatoku_defines.h"
#include "ha_tokudb.h" #include "ha_tokudb.h"
#undef PACKAGE #undef PACKAGE
#undef VERSION #undef VERSION
#undef HAVE_DTRACE #undef HAVE_DTRACE
...@@ -35,6 +32,8 @@ extern "C" { ...@@ -35,6 +32,8 @@ extern "C" {
#define TOKU_METADB_NAME "tokudb_meta" #define TOKU_METADB_NAME "tokudb_meta"
#define DEFAULT_LOCK_TIMEOUT_USEC (4UL * 1000 * 1000)
typedef struct savepoint_info { typedef struct savepoint_info {
DB_TXN* txn; DB_TXN* txn;
tokudb_trx_data* trx; tokudb_trx_data* trx;
...@@ -50,36 +49,20 @@ static inline void thd_data_set(THD *thd, int slot, void *data) { ...@@ -50,36 +49,20 @@ static inline void thd_data_set(THD *thd, int slot, void *data) {
thd->ha_data[slot].ha_ptr = data; thd->ha_data[slot].ha_ptr = data;
} }
static uchar *tokudb_get_key(TOKUDB_SHARE * share, size_t * length, my_bool not_used __attribute__ ((unused))) { static uchar *tokudb_get_key(TOKUDB_SHARE * share, size_t * length, my_bool not_used __attribute__ ((unused))) {
*length = share->table_name_length; *length = share->table_name_length;
return (uchar *) share->table_name; return (uchar *) share->table_name;
} }
static handler *tokudb_create_handler(handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root); static handler *tokudb_create_handler(handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root);
static MYSQL_THDVAR_BOOL(commit_sync, PLUGIN_VAR_THDLOCAL, "sync on txn commit", static MYSQL_THDVAR_BOOL(commit_sync,
/* check */ NULL, /* update */ NULL, /* default*/ TRUE); PLUGIN_VAR_THDLOCAL,
static MYSQL_THDVAR_ULONGLONG(write_lock_wait, "sync on txn commit",
0, /* check */ NULL,
"time waiting for write lock", /* update */ NULL,
NULL, /* default*/ TRUE
NULL, );
5000, // default
0, // min?
ULONGLONG_MAX, // max
1 // blocksize
);
static MYSQL_THDVAR_ULONGLONG(read_lock_wait,
0,
"time waiting for read lock",
NULL,
NULL,
4000, // default
0, // min?
ULONGLONG_MAX, // max
1 // blocksize
);
static MYSQL_THDVAR_UINT(pk_insert_mode, static MYSQL_THDVAR_UINT(pk_insert_mode,
0, 0,
"set the primary key insert mode", "set the primary key insert mode",
...@@ -138,10 +121,9 @@ static MYSQL_THDVAR_UINT(read_block_size, ...@@ -138,10 +121,9 @@ static MYSQL_THDVAR_UINT(read_block_size,
~0L, // max ~0L, // max
1 // blocksize??? 1 // blocksize???
); );
static MYSQL_THDVAR_UINT(read_buf_size, static MYSQL_THDVAR_UINT(read_buf_size,
0, 0,
"fractal tree read block size", "fractal tree read block size", //TODO: Is this a typo?
NULL, NULL,
NULL, NULL,
128*1024, // default 128*1024, // default
...@@ -150,7 +132,6 @@ static MYSQL_THDVAR_UINT(read_buf_size, ...@@ -150,7 +132,6 @@ static MYSQL_THDVAR_UINT(read_buf_size,
1 // blocksize??? 1 // blocksize???
); );
void tokudb_checkpoint_lock(THD * thd); void tokudb_checkpoint_lock(THD * thd);
void tokudb_checkpoint_unlock(THD * thd); void tokudb_checkpoint_unlock(THD * thd);
...@@ -180,7 +161,6 @@ static MYSQL_THDVAR_BOOL(checkpoint_lock, ...@@ -180,7 +161,6 @@ static MYSQL_THDVAR_BOOL(checkpoint_lock,
FALSE FALSE
); );
static void tokudb_print_error(const DB_ENV * db_env, const char *db_errpfx, const char *buffer); static void tokudb_print_error(const DB_ENV * db_env, const char *db_errpfx, const char *buffer);
static void tokudb_cleanup_log_files(void); static void tokudb_cleanup_log_files(void);
static int tokudb_end(handlerton * hton, ha_panic_function type); static int tokudb_end(handlerton * hton, ha_panic_function type);
...@@ -210,6 +190,7 @@ HASH tokudb_open_tables; ...@@ -210,6 +190,7 @@ HASH tokudb_open_tables;
pthread_mutex_t tokudb_mutex; pthread_mutex_t tokudb_mutex;
pthread_mutex_t tokudb_meta_mutex; pthread_mutex_t tokudb_meta_mutex;
static ulonglong tokudb_lock_timeout;
//my_bool tokudb_shared_data = FALSE; //my_bool tokudb_shared_data = FALSE;
static u_int32_t tokudb_init_flags = static u_int32_t tokudb_init_flags =
...@@ -443,6 +424,9 @@ static int tokudb_init_func(void *p) { ...@@ -443,6 +424,9 @@ static int tokudb_init_func(void *p) {
r = db_env->checkpointing_set_period(db_env, tokudb_checkpointing_period); r = db_env->checkpointing_set_period(db_env, tokudb_checkpointing_period);
assert(!r); assert(!r);
r = db_env->set_lock_timeout(db_env, DEFAULT_LOCK_TIMEOUT_USEC);
assert(r == 0);
r = db_create(&metadata_db, db_env, 0); r = db_create(&metadata_db, db_env, 0);
if (r) { if (r) {
DBUG_PRINT("info", ("failed to create metadata db %d\n", r)); DBUG_PRINT("info", ("failed to create metadata db %d\n", r));
...@@ -587,17 +571,6 @@ exit: ...@@ -587,17 +571,6 @@ exit:
TOKUDB_DBUG_RETURN(result); TOKUDB_DBUG_RETURN(result);
} }
ulonglong get_write_lock_wait_time (THD* thd) {
ulonglong ret_val = THDVAR(thd, write_lock_wait);
return (ret_val == 0) ? ULONGLONG_MAX : ret_val;
}
ulonglong get_read_lock_wait_time (THD* thd) {
ulonglong ret_val = THDVAR(thd, read_lock_wait);
return (ret_val == 0) ? ULONGLONG_MAX : ret_val;
}
uint get_pk_insert_mode(THD* thd) { uint get_pk_insert_mode(THD* thd) {
return THDVAR(thd, pk_insert_mode); return THDVAR(thd, pk_insert_mode);
} }
...@@ -1566,8 +1539,23 @@ static uint tokudb_alter_table_flags(uint flags) ...@@ -1566,8 +1539,23 @@ static uint tokudb_alter_table_flags(uint flags)
// system variables // system variables
static MYSQL_SYSVAR_ULONGLONG(cache_size, tokudb_cache_size, PLUGIN_VAR_READONLY, "TokuDB cache table size", NULL, NULL, 0, 0, ~0LL, 0); static void tokudb_lock_timeout_update(THD * thd,
struct st_mysql_sys_var * sys_var,
void * var, const void * save)
{
ulonglong * timeout = (ulonglong *) var;
*timeout = *(const ulonglong *) save;
db_env->set_lock_timeout(db_env, *timeout);
}
static MYSQL_SYSVAR_ULONGLONG(lock_timeout, tokudb_lock_timeout,
0, "TokuDB lock timeout",
NULL, tokudb_lock_timeout_update, DEFAULT_LOCK_TIMEOUT_USEC,
0, ~0LL, 0);
static MYSQL_SYSVAR_ULONGLONG(cache_size, tokudb_cache_size,
PLUGIN_VAR_READONLY, "TokuDB cache table size", NULL, NULL, 0,
0, ~0LL, 0);
static MYSQL_SYSVAR_ULONGLONG(max_lock_memory, tokudb_max_lock_memory, PLUGIN_VAR_READONLY, "TokuDB max memory for locks", NULL, NULL, 0, 0, ~0LL, 0); static MYSQL_SYSVAR_ULONGLONG(max_lock_memory, tokudb_max_lock_memory, PLUGIN_VAR_READONLY, "TokuDB max memory for locks", NULL, NULL, 0, 0, ~0LL, 0);
static MYSQL_SYSVAR_ULONG(debug, tokudb_debug, 0, "TokuDB Debug", NULL, NULL, 0, 0, ~0L, 0); static MYSQL_SYSVAR_ULONG(debug, tokudb_debug, 0, "TokuDB Debug", NULL, NULL, 0, 0, ~0L, 0);
...@@ -1592,8 +1580,14 @@ static struct st_mysql_sys_var *tokudb_system_variables[] = { ...@@ -1592,8 +1580,14 @@ static struct st_mysql_sys_var *tokudb_system_variables[] = {
MYSQL_SYSVAR(log_dir), MYSQL_SYSVAR(log_dir),
MYSQL_SYSVAR(debug), MYSQL_SYSVAR(debug),
MYSQL_SYSVAR(commit_sync), MYSQL_SYSVAR(commit_sync),
MYSQL_SYSVAR(write_lock_wait),
MYSQL_SYSVAR(read_lock_wait), // XXX: implmement a new mysql system variable in our handlerton
// called tokudb_lock_timeout. this variable defines the maximum
// time that threads will wait for a lock to be acquired
MYSQL_SYSVAR(lock_timeout),
// XXX remove the old tokudb_read_lock_wait session variable
// XXX remove the old tokudb_write_lock_wait session variable
MYSQL_SYSVAR(pk_insert_mode), MYSQL_SYSVAR(pk_insert_mode),
MYSQL_SYSVAR(load_save_space), MYSQL_SYSVAR(load_save_space),
MYSQL_SYSVAR(disable_slow_alter), MYSQL_SYSVAR(disable_slow_alter),
......
...@@ -9,10 +9,7 @@ extern handlerton *tokudb_hton; ...@@ -9,10 +9,7 @@ extern handlerton *tokudb_hton;
extern DB_ENV *db_env; extern DB_ENV *db_env;
extern DB *metadata_db; extern DB *metadata_db;
// thread variables // thread variables
ulonglong get_write_lock_wait_time (THD* thd);
ulonglong get_read_lock_wait_time (THD* thd);
uint get_pk_insert_mode(THD* thd); uint get_pk_insert_mode(THD* thd);
bool get_load_save_space(THD* thd); bool get_load_save_space(THD* thd);
bool get_disable_slow_alter(THD* thd); bool get_disable_slow_alter(THD* thd);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment