Commit d543d109 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#3497 merge handlerton write lock changes to main refs[t:3497]

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@31569 c7de825b-a66e-492c-adef-691d508d4ae1
parent 6b3c1ae1
...@@ -465,8 +465,7 @@ smart_dbt_callback_ir_rowread(DBT const *key, DBT const *row, void *context) { ...@@ -465,8 +465,7 @@ smart_dbt_callback_ir_rowread(DBT const *key, DBT const *row, void *context) {
// macro that modifies read flag for cursor operations depending on whether // macro that modifies read flag for cursor operations depending on whether
// we have preacquired lock or not // we have preacquired lock or not
// //
#define SET_READ_FLAG(flg) ((range_lock_grabbed) ? ((flg) | DB_PRELOCKED) : (flg)) #define SET_PRELOCK_FLAG(flg) ((flg) | (range_lock_grabbed ? (use_write_locks ? DB_PRELOCKED_WRITE : DB_PRELOCKED) : 0))
// //
// This method retrieves the value of the auto increment column of a record in MySQL format // This method retrieves the value of the auto increment column of a record in MySQL format
...@@ -4288,7 +4287,7 @@ int ha_tokudb::prepare_index_scan() { ...@@ -4288,7 +4287,7 @@ int ha_tokudb::prepare_index_scan() {
DB* db = share->key_file[active_index]; DB* db = share->key_file[active_index];
HANDLE_INVALID_CURSOR(); HANDLE_INVALID_CURSOR();
lockretryN(read_lock_wait_time){ lockretryN(read_lock_wait_time){
error = cursor->c_pre_acquire_read_lock( error = cursor->c_pre_acquire_range_lock(
cursor, cursor,
db->dbt_neg_infty(), db->dbt_neg_infty(),
db->dbt_pos_infty() db->dbt_pos_infty()
...@@ -4319,7 +4318,7 @@ int ha_tokudb::prepare_index_key_scan( const uchar * key, uint key_len ) { ...@@ -4319,7 +4318,7 @@ int ha_tokudb::prepare_index_key_scan( const uchar * key, uint key_len ) {
pack_key(&end_key, active_index, key_buff2, key, key_len, COL_POS_INF); pack_key(&end_key, active_index, key_buff2, key, key_len, COL_POS_INF);
lockretryN(read_lock_wait_time){ lockretryN(read_lock_wait_time){
error = cursor->c_pre_acquire_read_lock( error = cursor->c_pre_acquire_range_lock(
cursor, cursor,
&start_key, &start_key,
&end_key &end_key
...@@ -4380,6 +4379,8 @@ int ha_tokudb::index_init(uint keynr, bool sorted) { ...@@ -4380,6 +4379,8 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
DBUG_ASSERT(keynr <= table->s->keys); DBUG_ASSERT(keynr <= table->s->keys);
DBUG_ASSERT(share->key_file[keynr]); DBUG_ASSERT(share->key_file[keynr]);
cursor_flags = get_cursor_isolation_flags(lock.type, thd); cursor_flags = get_cursor_isolation_flags(lock.type, thd);
if (use_write_locks)
cursor_flags |= DB_RMW;
if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, cursor_flags))) { if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, cursor_flags))) {
if (error == TOKUDB_MVCC_DICTIONARY_TOO_NEW) { if (error == TOKUDB_MVCC_DICTIONARY_TOO_NEW) {
error = HA_ERR_TABLE_DEF_CHANGED; error = HA_ERR_TABLE_DEF_CHANGED;
...@@ -4633,7 +4634,7 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) { ...@@ -4633,7 +4634,7 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) {
pack_key(&curr_key, active_index, key_buff2, key, keylen, COL_ZERO); pack_key(&curr_key, active_index, key_buff2, key, keylen, COL_ZERO);
flags = SET_READ_FLAG(0); flags = SET_PRELOCK_FLAG(0);
lockretryN(read_lock_wait_time){ lockretryN(read_lock_wait_time){
error = cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info);
lockretry_wait; lockretry_wait;
...@@ -4704,7 +4705,7 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ ...@@ -4704,7 +4705,7 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
ir_info.smart_dbt_info = info; ir_info.smart_dbt_info = info;
ir_info.cmp = 0; ir_info.cmp = 0;
flags = SET_READ_FLAG(0); flags = SET_PRELOCK_FLAG(0);
switch (find_flag) { switch (find_flag) {
case HA_READ_KEY_EXACT: /* Find first record else error */ case HA_READ_KEY_EXACT: /* Find first record else error */
pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF); pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF);
...@@ -4808,7 +4809,7 @@ int ha_tokudb::index_next(uchar * buf) { ...@@ -4808,7 +4809,7 @@ int ha_tokudb::index_next(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_next"); TOKUDB_DBUG_ENTER("ha_tokudb::index_next");
int error = 0; int error = 0;
struct smart_dbt_info info; struct smart_dbt_info info;
u_int32_t flags = SET_READ_FLAG(0); u_int32_t flags = SET_PRELOCK_FLAG(0);
THD* thd = ha_thd(); THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);; tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
HANDLE_INVALID_CURSOR(); HANDLE_INVALID_CURSOR();
...@@ -4856,7 +4857,7 @@ int ha_tokudb::index_prev(uchar * buf) { ...@@ -4856,7 +4857,7 @@ int ha_tokudb::index_prev(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_next"); TOKUDB_DBUG_ENTER("ha_tokudb::index_next");
int error = 0; int error = 0;
struct smart_dbt_info info; struct smart_dbt_info info;
u_int32_t flags = SET_READ_FLAG(0); u_int32_t flags = SET_PRELOCK_FLAG(0);
THD* thd = ha_thd(); THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);; tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
HANDLE_INVALID_CURSOR(); HANDLE_INVALID_CURSOR();
...@@ -4900,7 +4901,7 @@ int ha_tokudb::index_first(uchar * buf) { ...@@ -4900,7 +4901,7 @@ int ha_tokudb::index_first(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_first"); TOKUDB_DBUG_ENTER("ha_tokudb::index_first");
int error = 0; int error = 0;
struct smart_dbt_info info; struct smart_dbt_info info;
u_int32_t flags = SET_READ_FLAG(0); u_int32_t flags = SET_PRELOCK_FLAG(0);
THD* thd = ha_thd(); THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);; tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
HANDLE_INVALID_CURSOR(); HANDLE_INVALID_CURSOR();
...@@ -4944,7 +4945,7 @@ int ha_tokudb::index_last(uchar * buf) { ...@@ -4944,7 +4945,7 @@ int ha_tokudb::index_last(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_last"); TOKUDB_DBUG_ENTER("ha_tokudb::index_last");
int error = 0; int error = 0;
struct smart_dbt_info info; struct smart_dbt_info info;
u_int32_t flags = SET_READ_FLAG(0); u_int32_t flags = SET_PRELOCK_FLAG(0);
THD* thd = ha_thd(); THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);; tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
HANDLE_INVALID_CURSOR(); HANDLE_INVALID_CURSOR();
...@@ -4995,7 +4996,7 @@ int ha_tokudb::rnd_init(bool scan) { ...@@ -4995,7 +4996,7 @@ int ha_tokudb::rnd_init(bool scan) {
if (scan) { if (scan) {
DB* db = share->key_file[primary_key]; DB* db = share->key_file[primary_key];
lockretryN(read_lock_wait_time){ lockretryN(read_lock_wait_time){
error = cursor->c_pre_acquire_read_lock(cursor, db->dbt_neg_infty(), db->dbt_pos_infty()); error = cursor->c_pre_acquire_range_lock(cursor, db->dbt_neg_infty(), db->dbt_pos_infty());
lockretry_wait; lockretry_wait;
} }
if (error) { goto cleanup; } if (error) { goto cleanup; }
...@@ -5038,7 +5039,7 @@ int ha_tokudb::rnd_end() { ...@@ -5038,7 +5039,7 @@ int ha_tokudb::rnd_end() {
int ha_tokudb::rnd_next(uchar * buf) { int ha_tokudb::rnd_next(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::ha_tokudb::rnd_next"); TOKUDB_DBUG_ENTER("ha_tokudb::ha_tokudb::rnd_next");
int error = 0; int error = 0;
u_int32_t flags = SET_READ_FLAG(0); u_int32_t flags = SET_PRELOCK_FLAG(0);
THD* thd = ha_thd(); THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);; tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
...@@ -5163,9 +5164,8 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) { ...@@ -5163,9 +5164,8 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) {
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_key) { int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_key) {
TOKUDB_DBUG_ENTER("ha_tokudb::read_range_first"); TOKUDB_DBUG_ENTER("ha_tokudb::prelock_range");
int error = 0; int error = 0;
DBT start_dbt_key; DBT start_dbt_key;
...@@ -5197,11 +5197,10 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k ...@@ -5197,11 +5197,10 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k
pack_key(&end_dbt_key, active_index, end_key_buff, end_key->key, end_key->length, COL_POS_INF); pack_key(&end_dbt_key, active_index, end_key_buff, end_key->key, end_key->length, COL_POS_INF);
break; break;
} }
} }
lockretryN(read_lock_wait_time){ lockretryN(read_lock_wait_time){
error = cursor->c_pre_acquire_read_lock( error = cursor->c_pre_acquire_range_lock(
cursor, cursor,
start_key ? &start_dbt_key : share->key_file[active_index]->dbt_neg_infty(), start_key ? &start_dbt_key : share->key_file[active_index]->dbt_neg_infty(),
end_key ? &end_dbt_key : share->key_file[active_index]->dbt_pos_infty() end_key ? &end_dbt_key : share->key_file[active_index]->dbt_pos_infty()
...@@ -5245,8 +5244,7 @@ int ha_tokudb::read_range_first( ...@@ -5245,8 +5244,7 @@ int ha_tokudb::read_range_first(
bool eq_range, bool eq_range,
bool sorted) bool sorted)
{ {
int error; int error = prelock_range(start_key, end_key);
error = prelock_range(start_key, end_key);
if (error) { goto cleanup; } if (error) { goto cleanup; }
range_lock_grabbed = true; range_lock_grabbed = true;
...@@ -5254,6 +5252,7 @@ int ha_tokudb::read_range_first( ...@@ -5254,6 +5252,7 @@ int ha_tokudb::read_range_first(
cleanup: cleanup:
return error; return error;
} }
int ha_tokudb::read_range_next() int ha_tokudb::read_range_next()
{ {
TOKUDB_DBUG_ENTER("ha_tokudb::read_range_next"); TOKUDB_DBUG_ENTER("ha_tokudb::read_range_next");
...@@ -5579,11 +5578,9 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { ...@@ -5579,11 +5578,9 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
TOKUDB_DBUG_ENTER("ha_tokudb::external_lock cmd=%d %d", thd_sql_command(thd), lock_type); TOKUDB_DBUG_ENTER("ha_tokudb::external_lock cmd=%d %d", thd_sql_command(thd), lock_type);
if (tokudb_debug & TOKUDB_DEBUG_LOCK) if (tokudb_debug & TOKUDB_DEBUG_LOCK)
TOKUDB_TRACE("%s cmd=%d %d\n", __FUNCTION__, thd_sql_command(thd), lock_type); TOKUDB_TRACE("%s cmd=%d %d\n", __FUNCTION__, thd_sql_command(thd), lock_type);
// QQQ this is here to allow experiments without transactions
int error = 0; int error = 0;
tokudb_trx_data *trx = NULL; tokudb_trx_data *trx = NULL;
trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot); trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
if (!trx) { if (!trx) {
error = create_tokudb_trx_data_instance(&trx); error = create_tokudb_trx_data_instance(&trx);
...@@ -5595,6 +5592,9 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { ...@@ -5595,6 +5592,9 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
} }
if (lock_type != F_UNLCK) { if (lock_type != F_UNLCK) {
is_fast_alter_running = false; is_fast_alter_running = false;
use_write_locks = false;
if (get_tokudb_use_write_locks(thd) && lock_type == F_WRLCK)
use_write_locks = true;
if (!trx->tokudb_lock_count++) { if (!trx->tokudb_lock_count++) {
DBUG_ASSERT(trx->stmt == 0); DBUG_ASSERT(trx->stmt == 0);
transaction = NULL; // Safety transaction = NULL; // Safety
...@@ -5720,8 +5720,8 @@ u_int32_t ha_tokudb::get_cursor_isolation_flags(enum thr_lock_type lock_type, TH ...@@ -5720,8 +5720,8 @@ u_int32_t ha_tokudb::get_cursor_isolation_flags(enum thr_lock_type lock_type, TH
} }
else if ((lock_type == TL_READ && in_lock_tables) || else if ((lock_type == TL_READ && in_lock_tables) ||
(lock_type == TL_READ_HIGH_PRIORITY && in_lock_tables) || (lock_type == TL_READ_HIGH_PRIORITY && in_lock_tables) ||
sql_command != SQLCOM_SELECT) sql_command != SQLCOM_SELECT ||
{ (sql_command == SQLCOM_SELECT && lock_type >= TL_WRITE_ALLOW_WRITE)) { // select for update
return DB_SERIALIZABLE; return DB_SERIALIZABLE;
} }
else { else {
...@@ -7019,7 +7019,7 @@ int ha_tokudb::tokudb_add_index( ...@@ -7019,7 +7019,7 @@ int ha_tokudb::tokudb_add_index(
// we intend to scan the entire thing // we intend to scan the entire thing
// //
lockretryN(read_lock_wait_time){ lockretryN(read_lock_wait_time){
error = tmp_cursor->c_pre_acquire_read_lock( error = tmp_cursor->c_pre_acquire_range_lock(
tmp_cursor, tmp_cursor,
share->file->dbt_neg_infty(), share->file->dbt_neg_infty(),
share->file->dbt_pos_infty() share->file->dbt_pos_infty()
......
...@@ -235,6 +235,8 @@ class ha_tokudb : public handler { ...@@ -235,6 +235,8 @@ class ha_tokudb : public handler {
// //
DB_TXN *transaction; DB_TXN *transaction;
bool is_fast_alter_running; bool is_fast_alter_running;
bool use_write_locks;
// //
// instance of cursor being used for init_xxx and rnd_xxx functions // instance of cursor being used for init_xxx and rnd_xxx functions
// //
...@@ -354,7 +356,7 @@ class ha_tokudb : public handler { ...@@ -354,7 +356,7 @@ class ha_tokudb : public handler {
); );
void set_query_columns(uint keynr); void set_query_columns(uint keynr);
int prelock_range ( const key_range *start_key, const key_range *end_key); int prelock_range (const key_range *start_key, const key_range *end_key);
int create_txn(THD* thd, tokudb_trx_data* trx); int create_txn(THD* thd, tokudb_trx_data* trx);
bool may_table_be_empty(); bool may_table_be_empty();
int delete_or_rename_table (const char* from_name, const char* to_name, bool is_delete); int delete_or_rename_table (const char* from_name, const char* to_name, bool is_delete);
......
...@@ -128,6 +128,13 @@ static MYSQL_THDVAR_UINT(block_size, ...@@ -128,6 +128,13 @@ static MYSQL_THDVAR_UINT(block_size,
~0L, // max ~0L, // max
1 // blocksize??? 1 // blocksize???
); );
static MYSQL_THDVAR_BOOL(use_write_locks,
0,
"if on, will use write locks instead of read locks",
NULL,
NULL,
FALSE
);
void tokudb_checkpoint_lock(THD * thd); void tokudb_checkpoint_lock(THD * thd);
void tokudb_checkpoint_unlock(THD * thd); void tokudb_checkpoint_unlock(THD * thd);
...@@ -595,6 +602,10 @@ uint get_tokudb_block_size(THD* thd) { ...@@ -595,6 +602,10 @@ uint get_tokudb_block_size(THD* thd) {
return THDVAR(thd, block_size); return THDVAR(thd, block_size);
} }
bool get_tokudb_use_write_locks(THD* thd) {
return THDVAR(thd, use_write_locks) != 0;
}
typedef struct txn_progress_info { typedef struct txn_progress_info {
char status[200]; char status[200];
THD* thd; THD* thd;
...@@ -1534,6 +1545,7 @@ static struct st_mysql_sys_var *tokudb_system_variables[] = { ...@@ -1534,6 +1545,7 @@ static struct st_mysql_sys_var *tokudb_system_variables[] = {
MYSQL_SYSVAR(fs_reserve_percent), MYSQL_SYSVAR(fs_reserve_percent),
MYSQL_SYSVAR(tmp_dir), MYSQL_SYSVAR(tmp_dir),
MYSQL_SYSVAR(block_size), MYSQL_SYSVAR(block_size),
MYSQL_SYSVAR(use_write_locks),
NULL NULL
}; };
......
...@@ -19,6 +19,7 @@ bool get_disable_slow_alter(THD* thd); ...@@ -19,6 +19,7 @@ bool get_disable_slow_alter(THD* thd);
bool get_create_index_online(THD* thd); bool get_create_index_online(THD* thd);
bool get_prelock_empty(THD* thd); bool get_prelock_empty(THD* thd);
uint get_tokudb_block_size(THD* thd); uint get_tokudb_block_size(THD* thd);
bool get_tokudb_use_write_locks(THD* thd);
extern HASH tokudb_open_tables; extern HASH tokudb_open_tables;
extern pthread_mutex_t tokudb_mutex; extern pthread_mutex_t tokudb_mutex;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment