Commit 01a85de7 authored by Zardosht Kasheff, committed by Yoni Fogel

[t:2811], merge handlerton MVCC changes to main line

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@23498 c7de825b-a66e-492c-adef-691d508d4ae1
parent 199c9e9e
......@@ -580,6 +580,9 @@ inline HA_TOKU_ISO_LEVEL tx_to_toku_iso(ulong tx_isolation) {
else if (tx_isolation == ISO_READ_COMMITTED) {
return hatoku_iso_read_committed;
}
else if (tx_isolation == ISO_REPEATABLE_READ) {
return hatoku_iso_repeatable_read;
}
else {
return hatoku_iso_serializable;
}
......@@ -592,6 +595,9 @@ inline u_int32_t toku_iso_to_txn_flag (HA_TOKU_ISO_LEVEL lvl) {
else if (lvl == hatoku_iso_read_committed) {
return DB_READ_COMMITTED;
}
else if (lvl == hatoku_iso_repeatable_read) {
return DB_TXN_SNAPSHOT;
}
else {
return 0;
}
......@@ -1143,6 +1149,7 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
loader = NULL;
abort_loader = false;
bzero(&lc, sizeof(lc));
lock.type = TL_IGNORE;
}
//
......@@ -3000,7 +3007,7 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
db,
txn,
&tmp_cursor1,
0
DB_SERIALIZABLE
);
if (error) { goto cleanup; }
......@@ -3008,7 +3015,7 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
db,
txn,
&tmp_cursor2,
0
DB_SERIALIZABLE
);
if (error) { goto cleanup; }
......@@ -3169,7 +3176,7 @@ int ha_tokudb::is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint
share->key_file[dict_index],
txn,
&tmp_cursor,
0
DB_SERIALIZABLE
);
if (error) { goto cleanup; }
......@@ -3996,10 +4003,10 @@ void ha_tokudb::column_bitmaps_signal() {
int ha_tokudb::prepare_index_scan() {
int error = 0;
DB* db = share->key_file[active_index];
HANDLE_INVALID_CURSOR();
lockretryN(read_lock_wait_time){
error = db->pre_acquire_read_lock(
db,
transaction,
error = cursor->c_pre_acquire_read_lock(
cursor,
db->dbt_neg_infty(),
db->dbt_pos_infty()
);
......@@ -4024,13 +4031,13 @@ cleanup:
int ha_tokudb::prepare_index_key_scan( const uchar * key, uint key_len ) {
int error = 0;
DBT start_key, end_key;
HANDLE_INVALID_CURSOR();
pack_key(&start_key, active_index, key_buff, key, key_len, COL_NEG_INF);
pack_key(&end_key, active_index, key_buff2, key, key_len, COL_POS_INF);
lockretryN(read_lock_wait_time){
error = share->key_file[active_index]->pre_acquire_read_lock(
share->key_file[active_index],
transaction,
error = cursor->c_pre_acquire_read_lock(
cursor,
&start_key,
&end_key
);
......@@ -4089,7 +4096,12 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
range_lock_grabbed = false;
DBUG_ASSERT(keynr <= table->s->keys);
DBUG_ASSERT(share->key_file[keynr]);
if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, 0))) {
cursor_flags = get_cursor_isolation_flags(lock.type, thd);
if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, cursor_flags))) {
if (error == TOKUDB_MVCC_DICTIONARY_TOO_NEW) {
my_error(ER_TABLE_DEF_CHANGED, MYF(0));
}
table->status = STATUS_NOT_FOUND;
last_cursor_error = error;
cursor = NULL; // Safety
goto exit;
......@@ -4146,7 +4158,7 @@ int ha_tokudb::handle_cursor_error(int error, int err_to_return, uint keynr) {
cursor = NULL;
if (error == DB_NOTFOUND) {
error = err_to_return;
if ((share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, 0))) {
if ((share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, cursor_flags))) {
cursor = NULL; // Safety
}
}
......@@ -4281,7 +4293,7 @@ int ha_tokudb::read_full_row(uchar * buf) {
error = share->file->getf_set(
share->file,
transaction,
0,
cursor_flags,
&last_key,
smart_dbt_callback_rowread_ptquery,
&info
......@@ -4684,17 +4696,17 @@ int ha_tokudb::rnd_init(bool scan) {
int error = 0;
read_lock_wait_time = get_read_lock_wait_time(ha_thd());
range_lock_grabbed = false;
error = index_init(primary_key, 0);
if (error) { goto cleanup;}
if (scan) {
DB* db = share->key_file[primary_key];
lockretryN(read_lock_wait_time){
error = db->pre_acquire_read_lock(db, transaction, db->dbt_neg_infty(), db->dbt_pos_infty());
error = cursor->c_pre_acquire_read_lock(cursor, db->dbt_neg_infty(), db->dbt_pos_infty());
lockretry_wait;
}
if (error) { last_cursor_error = error; goto cleanup; }
}
error = index_init(primary_key, 0);
if (error) { goto cleanup;}
//
// only want to set range_lock_grabbed to true after index_init
// successfully executed for two reasons:
......@@ -4841,7 +4853,7 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) {
info.keynr = primary_key;
lockretryN(read_lock_wait_time) {
error = share->file->getf_set(share->file, transaction, 0, key, smart_dbt_callback_rowread_ptquery, &info);
error = share->file->getf_set(share->file, transaction, get_cursor_isolation_flags(lock.type, ha_thd()), key, smart_dbt_callback_rowread_ptquery, &info);
lockretry_wait;
}
......@@ -4867,6 +4879,7 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k
bzero((void *) &start_dbt_key, sizeof(start_dbt_key));
bzero((void *) &end_dbt_key, sizeof(end_dbt_key));
HANDLE_INVALID_CURSOR();
if (start_key) {
switch (start_key->flag) {
case HA_READ_AFTER_KEY:
......@@ -4877,6 +4890,7 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k
break;
}
}
if (end_key) {
switch (end_key->flag) {
case HA_READ_BEFORE_KEY:
......@@ -4890,9 +4904,8 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k
}
lockretryN(read_lock_wait_time){
error = share->key_file[active_index]->pre_acquire_read_lock(
share->key_file[active_index],
transaction,
error = cursor->c_pre_acquire_read_lock(
cursor,
start_key ? &start_dbt_key : share->key_file[active_index]->dbt_neg_infty(),
end_key ? &end_dbt_key : share->key_file[active_index]->dbt_pos_infty()
);
......@@ -5158,17 +5171,8 @@ int ha_tokudb::acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt) {
int error = ENOSYS;
uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
if (lt == lock_read) {
for (uint i = 0; i < curr_num_DBs; i++) {
DB* db = share->key_file[i];
error = db->pre_acquire_read_lock(
db,
trans,
db->dbt_neg_infty(),
db->dbt_pos_infty()
);
if (error) break;
}
if (error) goto cleanup;
error = 0;
goto cleanup;
}
else if (lt == lock_write) {
if (tokudb_debug & TOKUDB_DEBUG_LOCK)
......@@ -5390,6 +5394,22 @@ cleanup:
TOKUDB_DBUG_RETURN(error);
}
//
// Picks the flags value that is handed to cursor creation / point queries
// for the statement currently running on thd.
// Parameters:
//      lock_type - table lock type to evaluate
//      [in] thd - thread handle, queried for its SQL command and for
//                 whether thd_in_lock_tables() reports true
// Returns:
//      0 when the command is SQLCOM_SELECT and it is NOT the case that
//      lock_type is TL_READ or TL_READ_HIGH_PRIORITY while
//      thd_in_lock_tables() is true; DB_SERIALIZABLE in every other case
//
u_int32_t ha_tokudb::get_cursor_isolation_flags(enum thr_lock_type lock_type, THD* thd) {
    const uint stmt_command = thd_sql_command(thd);
    const bool under_lock_tables = thd_in_lock_tables(thd);
    const bool is_read_lock = (lock_type == TL_READ) || (lock_type == TL_READ_HIGH_PRIORITY);

    //
    // only a SELECT that is not a read-locked access inside lock tables
    // gets the empty flag set; everything else is serializable
    //
    if (stmt_command == SQLCOM_SELECT && !(is_read_lock && under_lock_tables)) {
        return 0;
    }
    return DB_SERIALIZABLE;
}
/*
The idea with handler::store_lock() is the following:
......@@ -6140,6 +6160,7 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range*
ha_rows ret_val = HA_TOKUDB_RANGE_COUNT;
DB *kfile = share->key_file[keynr];
u_int64_t less, equal, greater;
u_int64_t total_rows_estimate = HA_TOKUDB_RANGE_COUNT;
u_int64_t start_rows, end_rows, rows;
int is_exact;
int error;
......@@ -6152,6 +6173,14 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range*
// As a result, equal may be 0 and greater may actually be equal+greater
// So, we call key_range64 on the key, and the key that is after it.
//
if (!start_key && !end_key) {
error = estimate_num_rows(kfile, &end_rows, transaction);
if (error) {
ret_val = HA_TOKUDB_RANGE_COUNT;
goto cleanup;
}
start_rows = 0;
}
if (start_key) {
inf_byte = (start_key->flag == HA_READ_KEY_EXACT) ?
COL_NEG_INF : COL_POS_INF;
......@@ -6177,6 +6206,7 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range*
goto cleanup;
}
start_rows= less;
total_rows_estimate = less + equal + greater;
}
else {
start_rows= 0;
......@@ -6209,7 +6239,11 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range*
end_rows= less;
}
else {
end_rows = stats.records;
//
// first if-clause ensures that start_key is non-NULL
//
assert(start_key);
end_rows = total_rows_estimate;
}
rows = (end_rows > start_rows) ? end_rows - start_rows : 1;
......@@ -6437,22 +6471,6 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
if (error) { goto cleanup; }
}
//
// grab some locks to make this go faster
// first a global read lock on the main DB, because
// we intend to scan the entire thing
//
lockretryN(read_lock_wait_time){
error = share->file->pre_acquire_read_lock(
share->file,
txn,
share->file->dbt_neg_infty(),
share->file->dbt_pos_infty()
);
lockretry_wait;
}
if (error) { goto cleanup; }
error = db_env->create_loader(
db_env,
txn,
......@@ -6480,6 +6498,21 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
goto cleanup;
}
//
// grab some locks to make this go faster
// first a global read lock on the main DB, because
// we intend to scan the entire thing
//
lockretryN(read_lock_wait_time){
error = tmp_cursor->c_pre_acquire_read_lock(
tmp_cursor,
share->file->dbt_neg_infty(),
share->file->dbt_pos_infty()
);
lockretry_wait;
}
if (error) { goto cleanup; }
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &curr_pk_key, &curr_pk_val, DB_NEXT | DB_PRELOCKED);
while (cursor_ret_val != DB_NOTFOUND) {
......@@ -6762,24 +6795,21 @@ int ha_tokudb::optimize(THD * thd, HA_CHECK_OPT * check_opt) {
//
txn = trx->sub_sp_level ? trx->sub_sp_level : trx->sp_level;
if (txn == NULL) {
error = db_env->txn_begin(db_env, NULL, &txn, 0);
error = db_env->txn_begin(db_env, NULL, &txn, DB_READ_UNCOMMITTED);
if (error) {
goto cleanup;
}
do_commit = true;
}
//
// prelock so each scan goes faster
//
error = acquire_table_lock(txn,lock_read);
if (error) {
goto cleanup;
}
//
// for each DB, scan through entire table and do nothing
//
for (uint i = 0; i < curr_num_DBs; i++) {
error = share->key_file[i]->optimize(share->key_file[i]);
if (error) {
goto cleanup;
}
error = share->key_file[i]->cursor(share->key_file[i], txn, &tmp_cursor, 0);
if (error) {
tmp_cursor = NULL;
......
......@@ -211,6 +211,7 @@ private:
// instance of cursor being used for init_xxx and rnd_xxx functions
//
DBC *cursor;
u_int32_t cursor_flags; // flags for cursor
//
// flags that are returned in table_flags()
//
......@@ -432,6 +433,7 @@ public:
ha_rows records_in_range(uint inx, key_range * min_key, key_range * max_key);
u_int32_t get_cursor_isolation_flags(enum thr_lock_type lock_type, THD* thd);
THR_LOCK_DATA **store_lock(THD * thd, THR_LOCK_DATA ** to, enum thr_lock_type lock_type);
int get_status();
......
......@@ -81,6 +81,7 @@ typedef enum {
hatoku_iso_not_set = 0,
hatoku_iso_read_uncommitted,
hatoku_iso_read_committed,
hatoku_iso_repeatable_read,
hatoku_iso_serializable
} HA_TOKU_ISO_LEVEL;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment