Commit e9215c3f authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:2811], merge handlerton MVCC changes to main line

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@23498 c7de825b-a66e-492c-adef-691d508d4ae1
parent b14084bb
...@@ -580,6 +580,9 @@ inline HA_TOKU_ISO_LEVEL tx_to_toku_iso(ulong tx_isolation) { ...@@ -580,6 +580,9 @@ inline HA_TOKU_ISO_LEVEL tx_to_toku_iso(ulong tx_isolation) {
else if (tx_isolation == ISO_READ_COMMITTED) { else if (tx_isolation == ISO_READ_COMMITTED) {
return hatoku_iso_read_committed; return hatoku_iso_read_committed;
} }
else if (tx_isolation == ISO_REPEATABLE_READ) {
return hatoku_iso_repeatable_read;
}
else { else {
return hatoku_iso_serializable; return hatoku_iso_serializable;
} }
...@@ -592,6 +595,9 @@ inline u_int32_t toku_iso_to_txn_flag (HA_TOKU_ISO_LEVEL lvl) { ...@@ -592,6 +595,9 @@ inline u_int32_t toku_iso_to_txn_flag (HA_TOKU_ISO_LEVEL lvl) {
else if (lvl == hatoku_iso_read_committed) { else if (lvl == hatoku_iso_read_committed) {
return DB_READ_COMMITTED; return DB_READ_COMMITTED;
} }
else if (lvl == hatoku_iso_repeatable_read) {
return DB_TXN_SNAPSHOT;
}
else { else {
return 0; return 0;
} }
...@@ -1143,6 +1149,7 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t ...@@ -1143,6 +1149,7 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
loader = NULL; loader = NULL;
abort_loader = false; abort_loader = false;
bzero(&lc, sizeof(lc)); bzero(&lc, sizeof(lc));
lock.type = TL_IGNORE;
} }
// //
...@@ -3000,7 +3007,7 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in ...@@ -3000,7 +3007,7 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
db, db,
txn, txn,
&tmp_cursor1, &tmp_cursor1,
0 DB_SERIALIZABLE
); );
if (error) { goto cleanup; } if (error) { goto cleanup; }
...@@ -3008,7 +3015,7 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in ...@@ -3008,7 +3015,7 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
db, db,
txn, txn,
&tmp_cursor2, &tmp_cursor2,
0 DB_SERIALIZABLE
); );
if (error) { goto cleanup; } if (error) { goto cleanup; }
...@@ -3169,7 +3176,7 @@ int ha_tokudb::is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint ...@@ -3169,7 +3176,7 @@ int ha_tokudb::is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint
share->key_file[dict_index], share->key_file[dict_index],
txn, txn,
&tmp_cursor, &tmp_cursor,
0 DB_SERIALIZABLE
); );
if (error) { goto cleanup; } if (error) { goto cleanup; }
...@@ -3996,11 +4003,11 @@ void ha_tokudb::column_bitmaps_signal() { ...@@ -3996,11 +4003,11 @@ void ha_tokudb::column_bitmaps_signal() {
int ha_tokudb::prepare_index_scan() { int ha_tokudb::prepare_index_scan() {
int error = 0; int error = 0;
DB* db = share->key_file[active_index]; DB* db = share->key_file[active_index];
HANDLE_INVALID_CURSOR();
lockretryN(read_lock_wait_time){ lockretryN(read_lock_wait_time){
error = db->pre_acquire_read_lock( error = cursor->c_pre_acquire_read_lock(
db, cursor,
transaction, db->dbt_neg_infty(),
db->dbt_neg_infty(),
db->dbt_pos_infty() db->dbt_pos_infty()
); );
lockretry_wait; lockretry_wait;
...@@ -4024,15 +4031,15 @@ int ha_tokudb::prepare_index_scan() { ...@@ -4024,15 +4031,15 @@ int ha_tokudb::prepare_index_scan() {
int ha_tokudb::prepare_index_key_scan( const uchar * key, uint key_len ) { int ha_tokudb::prepare_index_key_scan( const uchar * key, uint key_len ) {
int error = 0; int error = 0;
DBT start_key, end_key; DBT start_key, end_key;
HANDLE_INVALID_CURSOR();
pack_key(&start_key, active_index, key_buff, key, key_len, COL_NEG_INF); pack_key(&start_key, active_index, key_buff, key, key_len, COL_NEG_INF);
pack_key(&end_key, active_index, key_buff2, key, key_len, COL_POS_INF); pack_key(&end_key, active_index, key_buff2, key, key_len, COL_POS_INF);
lockretryN(read_lock_wait_time){ lockretryN(read_lock_wait_time){
error = share->key_file[active_index]->pre_acquire_read_lock( error = cursor->c_pre_acquire_read_lock(
share->key_file[active_index], cursor,
transaction,
&start_key, &start_key,
&end_key &end_key
); );
lockretry_wait; lockretry_wait;
} }
...@@ -4089,7 +4096,12 @@ int ha_tokudb::index_init(uint keynr, bool sorted) { ...@@ -4089,7 +4096,12 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
range_lock_grabbed = false; range_lock_grabbed = false;
DBUG_ASSERT(keynr <= table->s->keys); DBUG_ASSERT(keynr <= table->s->keys);
DBUG_ASSERT(share->key_file[keynr]); DBUG_ASSERT(share->key_file[keynr]);
if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, 0))) { cursor_flags = get_cursor_isolation_flags(lock.type, thd);
if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, cursor_flags))) {
if (error == TOKUDB_MVCC_DICTIONARY_TOO_NEW) {
my_error(ER_TABLE_DEF_CHANGED, MYF(0));
}
table->status = STATUS_NOT_FOUND;
last_cursor_error = error; last_cursor_error = error;
cursor = NULL; // Safety cursor = NULL; // Safety
goto exit; goto exit;
...@@ -4146,7 +4158,7 @@ int ha_tokudb::handle_cursor_error(int error, int err_to_return, uint keynr) { ...@@ -4146,7 +4158,7 @@ int ha_tokudb::handle_cursor_error(int error, int err_to_return, uint keynr) {
cursor = NULL; cursor = NULL;
if (error == DB_NOTFOUND) { if (error == DB_NOTFOUND) {
error = err_to_return; error = err_to_return;
if ((share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, 0))) { if ((share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, cursor_flags))) {
cursor = NULL; // Safety cursor = NULL; // Safety
} }
} }
...@@ -4281,7 +4293,7 @@ int ha_tokudb::read_full_row(uchar * buf) { ...@@ -4281,7 +4293,7 @@ int ha_tokudb::read_full_row(uchar * buf) {
error = share->file->getf_set( error = share->file->getf_set(
share->file, share->file,
transaction, transaction,
0, cursor_flags,
&last_key, &last_key,
smart_dbt_callback_rowread_ptquery, smart_dbt_callback_rowread_ptquery,
&info &info
...@@ -4684,17 +4696,17 @@ int ha_tokudb::rnd_init(bool scan) { ...@@ -4684,17 +4696,17 @@ int ha_tokudb::rnd_init(bool scan) {
int error = 0; int error = 0;
read_lock_wait_time = get_read_lock_wait_time(ha_thd()); read_lock_wait_time = get_read_lock_wait_time(ha_thd());
range_lock_grabbed = false; range_lock_grabbed = false;
error = index_init(primary_key, 0);
if (error) { goto cleanup;}
if (scan) { if (scan) {
DB* db = share->key_file[primary_key]; DB* db = share->key_file[primary_key];
lockretryN(read_lock_wait_time){ lockretryN(read_lock_wait_time){
error = db->pre_acquire_read_lock(db, transaction, db->dbt_neg_infty(), db->dbt_pos_infty()); error = cursor->c_pre_acquire_read_lock(cursor, db->dbt_neg_infty(), db->dbt_pos_infty());
lockretry_wait; lockretry_wait;
} }
if (error) { last_cursor_error = error; goto cleanup; } if (error) { last_cursor_error = error; goto cleanup; }
} }
error = index_init(primary_key, 0);
if (error) { goto cleanup;}
// //
// only want to set range_lock_grabbed to true after index_init // only want to set range_lock_grabbed to true after index_init
// successfully executed for two reasons: // successfully executed for two reasons:
...@@ -4841,7 +4853,7 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) { ...@@ -4841,7 +4853,7 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) {
info.keynr = primary_key; info.keynr = primary_key;
lockretryN(read_lock_wait_time) { lockretryN(read_lock_wait_time) {
error = share->file->getf_set(share->file, transaction, 0, key, smart_dbt_callback_rowread_ptquery, &info); error = share->file->getf_set(share->file, transaction, get_cursor_isolation_flags(lock.type, ha_thd()), key, smart_dbt_callback_rowread_ptquery, &info);
lockretry_wait; lockretry_wait;
} }
...@@ -4867,6 +4879,7 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k ...@@ -4867,6 +4879,7 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k
bzero((void *) &start_dbt_key, sizeof(start_dbt_key)); bzero((void *) &start_dbt_key, sizeof(start_dbt_key));
bzero((void *) &end_dbt_key, sizeof(end_dbt_key)); bzero((void *) &end_dbt_key, sizeof(end_dbt_key));
HANDLE_INVALID_CURSOR();
if (start_key) { if (start_key) {
switch (start_key->flag) { switch (start_key->flag) {
case HA_READ_AFTER_KEY: case HA_READ_AFTER_KEY:
...@@ -4877,6 +4890,7 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k ...@@ -4877,6 +4890,7 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k
break; break;
} }
} }
if (end_key) { if (end_key) {
switch (end_key->flag) { switch (end_key->flag) {
case HA_READ_BEFORE_KEY: case HA_READ_BEFORE_KEY:
...@@ -4890,9 +4904,8 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k ...@@ -4890,9 +4904,8 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k
} }
lockretryN(read_lock_wait_time){ lockretryN(read_lock_wait_time){
error = share->key_file[active_index]->pre_acquire_read_lock( error = cursor->c_pre_acquire_read_lock(
share->key_file[active_index], cursor,
transaction,
start_key ? &start_dbt_key : share->key_file[active_index]->dbt_neg_infty(), start_key ? &start_dbt_key : share->key_file[active_index]->dbt_neg_infty(),
end_key ? &end_dbt_key : share->key_file[active_index]->dbt_pos_infty() end_key ? &end_dbt_key : share->key_file[active_index]->dbt_pos_infty()
); );
...@@ -5158,17 +5171,8 @@ int ha_tokudb::acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt) { ...@@ -5158,17 +5171,8 @@ int ha_tokudb::acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt) {
int error = ENOSYS; int error = ENOSYS;
uint curr_num_DBs = table->s->keys + test(hidden_primary_key); uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
if (lt == lock_read) { if (lt == lock_read) {
for (uint i = 0; i < curr_num_DBs; i++) { error = 0;
DB* db = share->key_file[i]; goto cleanup;
error = db->pre_acquire_read_lock(
db,
trans,
db->dbt_neg_infty(),
db->dbt_pos_infty()
);
if (error) break;
}
if (error) goto cleanup;
} }
else if (lt == lock_write) { else if (lt == lock_write) {
if (tokudb_debug & TOKUDB_DEBUG_LOCK) if (tokudb_debug & TOKUDB_DEBUG_LOCK)
...@@ -5390,6 +5394,22 @@ int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) { ...@@ -5390,6 +5394,22 @@ int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) {
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
u_int32_t ha_tokudb::get_cursor_isolation_flags(enum thr_lock_type lock_type, THD* thd) {
    //
    // Decide which isolation flag a new cursor should carry.
    // Plain SELECT statements get 0, meaning the cursor inherits the
    // transaction's own isolation level. Everything else — any non-SELECT
    // statement, or a read taken while LOCK TABLES is in effect — is
    // forced to DB_SERIALIZABLE so the read actually acquires locks.
    //
    const uint cmd = thd_sql_command(thd);
    const bool in_lock_tables = thd_in_lock_tables(thd);
    const bool locked_read =
        in_lock_tables &&
        (lock_type == TL_READ || lock_type == TL_READ_HIGH_PRIORITY);
    if (locked_read || cmd != SQLCOM_SELECT) {
        return DB_SERIALIZABLE;
    }
    return 0;
}
/* /*
The idea with handler::store_lock() is the following: The idea with handler::store_lock() is the following:
...@@ -6140,6 +6160,7 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range* ...@@ -6140,6 +6160,7 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range*
ha_rows ret_val = HA_TOKUDB_RANGE_COUNT; ha_rows ret_val = HA_TOKUDB_RANGE_COUNT;
DB *kfile = share->key_file[keynr]; DB *kfile = share->key_file[keynr];
u_int64_t less, equal, greater; u_int64_t less, equal, greater;
u_int64_t total_rows_estimate = HA_TOKUDB_RANGE_COUNT;
u_int64_t start_rows, end_rows, rows; u_int64_t start_rows, end_rows, rows;
int is_exact; int is_exact;
int error; int error;
...@@ -6152,6 +6173,14 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range* ...@@ -6152,6 +6173,14 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range*
// As a result, equal may be 0 and greater may actually be equal+greater // As a result, equal may be 0 and greater may actually be equal+greater
// So, we call key_range64 on the key, and the key that is after it. // So, we call key_range64 on the key, and the key that is after it.
// //
if (!start_key && !end_key) {
error = estimate_num_rows(kfile, &end_rows, transaction);
if (error) {
ret_val = HA_TOKUDB_RANGE_COUNT;
goto cleanup;
}
start_rows = 0;
}
if (start_key) { if (start_key) {
inf_byte = (start_key->flag == HA_READ_KEY_EXACT) ? inf_byte = (start_key->flag == HA_READ_KEY_EXACT) ?
COL_NEG_INF : COL_POS_INF; COL_NEG_INF : COL_POS_INF;
...@@ -6177,6 +6206,7 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range* ...@@ -6177,6 +6206,7 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range*
goto cleanup; goto cleanup;
} }
start_rows= less; start_rows= less;
total_rows_estimate = less + equal + greater;
} }
else { else {
start_rows= 0; start_rows= 0;
...@@ -6209,7 +6239,11 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range* ...@@ -6209,7 +6239,11 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range*
end_rows= less; end_rows= less;
} }
else { else {
end_rows = stats.records; //
// first if-clause ensures that start_key is non-NULL
//
assert(start_key);
end_rows = total_rows_estimate;
} }
rows = (end_rows > start_rows) ? end_rows - start_rows : 1; rows = (end_rows > start_rows) ? end_rows - start_rows : 1;
...@@ -6437,22 +6471,6 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -6437,22 +6471,6 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
if (error) { goto cleanup; } if (error) { goto cleanup; }
} }
//
// grab some locks to make this go faster
// first a global read lock on the main DB, because
// we intend to scan the entire thing
//
lockretryN(read_lock_wait_time){
error = share->file->pre_acquire_read_lock(
share->file,
txn,
share->file->dbt_neg_infty(),
share->file->dbt_pos_infty()
);
lockretry_wait;
}
if (error) { goto cleanup; }
error = db_env->create_loader( error = db_env->create_loader(
db_env, db_env,
txn, txn,
...@@ -6480,6 +6498,21 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -6480,6 +6498,21 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
goto cleanup; goto cleanup;
} }
//
// grab some locks to make this go faster
// first a global read lock on the main DB, because
// we intend to scan the entire thing
//
lockretryN(read_lock_wait_time){
error = tmp_cursor->c_pre_acquire_read_lock(
tmp_cursor,
share->file->dbt_neg_infty(),
share->file->dbt_pos_infty()
);
lockretry_wait;
}
if (error) { goto cleanup; }
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &curr_pk_key, &curr_pk_val, DB_NEXT | DB_PRELOCKED); cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &curr_pk_key, &curr_pk_val, DB_NEXT | DB_PRELOCKED);
while (cursor_ret_val != DB_NOTFOUND) { while (cursor_ret_val != DB_NOTFOUND) {
...@@ -6761,25 +6794,22 @@ int ha_tokudb::optimize(THD * thd, HA_CHECK_OPT * check_opt) { ...@@ -6761,25 +6794,22 @@ int ha_tokudb::optimize(THD * thd, HA_CHECK_OPT * check_opt) {
// this is a bit hacky, but it is the best we have right now // this is a bit hacky, but it is the best we have right now
// //
txn = trx->sub_sp_level ? trx->sub_sp_level : trx->sp_level; txn = trx->sub_sp_level ? trx->sub_sp_level : trx->sp_level;
if (txn == NULL) { if (txn == NULL) {
error = db_env->txn_begin(db_env, NULL, &txn, 0); error = db_env->txn_begin(db_env, NULL, &txn, DB_READ_UNCOMMITTED);
if (error) { if (error) {
goto cleanup; goto cleanup;
} }
do_commit = true; do_commit = true;
} }
//
// prelock so each scan goes faster
//
error = acquire_table_lock(txn,lock_read);
if (error) {
goto cleanup;
}
// //
// for each DB, scan through entire table and do nothing // for each DB, scan through entire table and do nothing
// //
for (uint i = 0; i < curr_num_DBs; i++) { for (uint i = 0; i < curr_num_DBs; i++) {
error = share->key_file[i]->optimize(share->key_file[i]);
if (error) {
goto cleanup;
}
error = share->key_file[i]->cursor(share->key_file[i], txn, &tmp_cursor, 0); error = share->key_file[i]->cursor(share->key_file[i], txn, &tmp_cursor, 0);
if (error) { if (error) {
tmp_cursor = NULL; tmp_cursor = NULL;
......
...@@ -211,6 +211,7 @@ class ha_tokudb : public handler { ...@@ -211,6 +211,7 @@ class ha_tokudb : public handler {
// instance of cursor being used for init_xxx and rnd_xxx functions // instance of cursor being used for init_xxx and rnd_xxx functions
// //
DBC *cursor; DBC *cursor;
u_int32_t cursor_flags; // flags for cursor
// //
// flags that are returned in table_flags() // flags that are returned in table_flags()
// //
...@@ -432,6 +433,7 @@ class ha_tokudb : public handler { ...@@ -432,6 +433,7 @@ class ha_tokudb : public handler {
ha_rows records_in_range(uint inx, key_range * min_key, key_range * max_key); ha_rows records_in_range(uint inx, key_range * min_key, key_range * max_key);
u_int32_t get_cursor_isolation_flags(enum thr_lock_type lock_type, THD* thd);
THR_LOCK_DATA **store_lock(THD * thd, THR_LOCK_DATA ** to, enum thr_lock_type lock_type); THR_LOCK_DATA **store_lock(THD * thd, THR_LOCK_DATA ** to, enum thr_lock_type lock_type);
int get_status(); int get_status();
......
...@@ -81,6 +81,7 @@ typedef enum { ...@@ -81,6 +81,7 @@ typedef enum {
hatoku_iso_not_set = 0, hatoku_iso_not_set = 0,
hatoku_iso_read_uncommitted, hatoku_iso_read_uncommitted,
hatoku_iso_read_committed, hatoku_iso_read_committed,
hatoku_iso_repeatable_read,
hatoku_iso_serializable hatoku_iso_serializable
} HA_TOKU_ISO_LEVEL; } HA_TOKU_ISO_LEVEL;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment