Commit 3af2681d authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:3038], move changes to main

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@25704 c7de825b-a66e-492c-adef-691d508d4ae1
parent afe751ef
......@@ -174,6 +174,7 @@ static TOKUDB_SHARE *get_share(const char *table_name, TABLE_SHARE* table_share)
}
thr_lock_init(&share->lock);
pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
my_rwlock_init(&share->num_DBs_lock, 0);
}
exit:
......@@ -241,6 +242,7 @@ static int free_share(TOKUDB_SHARE * share, bool mutex_is_locked) {
my_hash_delete(&tokudb_open_tables, (uchar *) share);
thr_lock_delete(&share->lock);
pthread_mutex_destroy(&share->mutex);
rwlock_destroy(&share->num_DBs_lock);
my_free((uchar *) share, MYF(0));
}
pthread_mutex_unlock(&tokudb_mutex);
......@@ -323,6 +325,17 @@ typedef struct index_read_info {
} *INDEX_READ_INFO;
int ai_poll_fun(void *extra, float progress) {
LOADER_CONTEXT context = (LOADER_CONTEXT)extra;
if (context->thd->killed) {
sprintf(context->write_status_msg, "The process has been killed, aborting add index.");
return 1;
}
sprintf(context->write_status_msg, "Adding of indexes about %.1f%% done", progress*100);
thd_proc_info(context->thd, context->write_status_msg);
return 0;
}
int poll_fun(void *extra, float progress) {
LOADER_CONTEXT context = (LOADER_CONTEXT)extra;
if (context->thd->killed) {
......@@ -1640,6 +1653,8 @@ int ha_tokudb::initialize_share(
share->try_table_lock = false;
}
share->num_DBs = table_share->keys + test(hidden_primary_key);
error = 0;
exit:
return error;
......@@ -2984,7 +2999,7 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) {
}
exit_try_table_lock:
pthread_mutex_lock(&share->mutex);
share->try_table_lock = false;
share->try_table_lock = false; // RFP what good is the mutex?
pthread_mutex_unlock(&share->mutex);
}
DBUG_VOID_RETURN;
......@@ -3564,7 +3579,7 @@ int ha_tokudb::write_row(uchar * record) {
DB_TXN* sub_trans = NULL;
DB_TXN* txn = NULL;
tokudb_trx_data *trx = NULL;
uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
uint curr_num_DBs;
bool create_sub_trans = false;
//
......@@ -3604,6 +3619,11 @@ int ha_tokudb::write_row(uchar * record) {
pthread_mutex_unlock(&share->mutex);
}
//
// grab reader lock on numDBs_lock
//
rw_rdlock(&share->num_DBs_lock);
curr_num_DBs = share->num_DBs;
if (hidden_primary_key) {
get_auto_primary_key(current_ident);
......@@ -3668,6 +3688,7 @@ int ha_tokudb::write_row(uchar * record) {
track_progress(thd);
}
cleanup:
rw_unlock(&share->num_DBs_lock);
if (error == DB_KEYEXIST) {
error = HA_ERR_FOUND_DUPP_KEY;
}
......@@ -3726,7 +3747,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
DB_TXN* sub_trans = NULL;
DB_TXN* txn = NULL;
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
uint curr_num_DBs;
ulonglong wait_lock_time = get_write_lock_wait_time(thd);
LINT_INIT(error);
......@@ -3763,6 +3784,12 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
pthread_mutex_unlock(&share->mutex);
}
//
// grab reader lock on numDBs_lock
//
rw_rdlock(&share->num_DBs_lock);
curr_num_DBs = share->num_DBs;
if (using_ignore) {
error = db_env->txn_begin(db_env, transaction, &sub_trans, DB_INHERIT_ISOLATION);
if (error) {
......@@ -3861,6 +3888,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
cleanup:
rw_unlock(&share->num_DBs_lock);
if (error == DB_KEYEXIST) {
error = HA_ERR_FOUND_DUPP_KEY;
}
......@@ -3878,48 +3906,6 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
TOKUDB_DBUG_RETURN(error);
}
//
//
// Delete one key in key_file[keynr]
// This uses key_buff2, when keynr != primary key, so it's important that
// a function that calls this doesn't use this buffer for anything else.
// Parameters:
// [in] trans - transaction to be used for the delete
// keynr - index for which a key needs to be deleted
// [in] record - row in MySQL format. Must delete a key for this row
// [in] prim_key - key for record in primary table
// Returns:
// 0 on success
// error otherwise
//
int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key) {
TOKUDB_DBUG_ENTER("ha_tokudb::remove_key");
int error = 0;
DBT key;
bool has_null;
ulonglong wait_lock_time = get_write_lock_wait_time(ha_thd());
DBUG_PRINT("enter", ("index: %d", keynr));
DBUG_PRINT("primary", ("index: %d", primary_key));
DBUG_DUMP("prim_key", (uchar *) prim_key->data, prim_key->size);
if (keynr == primary_key) { // Unique key
DBUG_PRINT("Primary key", ("index: %d", keynr));
lockretryN(wait_lock_time){
error = share->key_file[keynr]->del(share->key_file[keynr], trans, prim_key , DB_DELETE_ANY);
lockretry_wait;
}
}
else {
DBUG_PRINT("Secondary key", ("index: %d", keynr));
create_dbt_key_from_table(&key, keynr, key_buff2, record, &has_null);
lockretryN(wait_lock_time){
error = share->key_file[keynr]->del(share->key_file[keynr], trans, &key , DB_DELETE_ANY);
lockretry_wait;
}
}
TOKUDB_DBUG_RETURN(error);
}
//
// Deletes a row in the table, called when handling a DELETE query
// Parameters:
......@@ -3935,11 +3921,17 @@ int ha_tokudb::delete_row(const uchar * record) {
bool has_null;
THD* thd = ha_thd();
ulonglong wait_lock_time = get_write_lock_wait_time(thd);
uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
uint curr_num_DBs;
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
statistic_increment(table->in_use->status_var.ha_delete_count, &LOCK_status);
//
// grab reader lock on numDBs_lock
//
rw_rdlock(&share->num_DBs_lock);
curr_num_DBs = share->num_DBs;
create_dbt_key_from_table(&prim_key, primary_key, key_buff, record, &has_null);
if (table_share->blob_fields) {
if (fix_rec_buff_for_blob(max_row_length(record))) {
......@@ -3974,6 +3966,7 @@ int ha_tokudb::delete_row(const uchar * record) {
track_progress(thd);
}
cleanup:
rw_unlock(&share->num_DBs_lock);
TOKUDB_DBUG_RETURN(error);
}
......@@ -5230,7 +5223,8 @@ int ha_tokudb::reset(void) {
//
int ha_tokudb::acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt) {
int error = ENOSYS;
uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
rw_rdlock(&share->num_DBs_lock);
uint curr_num_DBs = share->num_DBs;
if (lt == lock_read) {
error = 0;
goto cleanup;
......@@ -5256,6 +5250,7 @@ int ha_tokudb::acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt) {
error = 0;
cleanup:
rw_unlock(&share->num_DBs_lock);
return error;
}
......@@ -5507,9 +5502,22 @@ u_int32_t ha_tokudb::get_cursor_isolation_flags(enum thr_lock_type lock_type, TH
THR_LOCK_DATA **ha_tokudb::store_lock(THD * thd, THR_LOCK_DATA ** to, enum thr_lock_type lock_type) {
TOKUDB_DBUG_ENTER("ha_tokudb::store_lock, lock_type=%d cmd=%d", lock_type, thd_sql_command(thd));
if (tokudb_debug & TOKUDB_DEBUG_LOCK)
if (tokudb_debug & TOKUDB_DEBUG_LOCK) {
TOKUDB_TRACE("%s lock_type=%d cmd=%d\n", __FUNCTION__, lock_type, thd_sql_command(thd));
if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) {
}
if (lock_type == TL_WRITE_ALLOW_READ &&
get_create_index_online(thd) &&
thd_sql_command(thd)== SQLCOM_CREATE_INDEX
)
{
rw_rdlock(&share->num_DBs_lock);
if (share->num_DBs == (table->s->keys + test(hidden_primary_key))) {
lock_type = TL_WRITE_ALLOW_WRITE;
}
lock.type = lock_type;
rw_unlock(&share->num_DBs_lock);
}
else if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) {
/* If we are not doing a LOCK TABLE, then allow multiple writers */
if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && lock_type <= TL_WRITE) &&
!thd->in_lock_tables && thd_sql_command(thd) != SQLCOM_TRUNCATE && !thd_tablespace_op(thd)) {
......@@ -6451,14 +6459,20 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
DB_TXN* txn = NULL;
THD* thd = ha_thd();
DB_LOADER* loader = NULL;
DB_INDEXER* indexer = NULL;
bool loader_use_puts = get_load_save_space(thd);
bool use_hot_index = (lock.type == TL_WRITE_ALLOW_WRITE);
u_int32_t loader_flags = loader_use_puts ? LOADER_USE_PUTS : 0;
u_int32_t indexer_flags = 0;
u_int32_t mult_db_flags[MAX_KEY + 1] = {0};
u_int32_t mult_put_flags[MAX_KEY + 1];
u_int32_t mult_dbt_flags[MAX_KEY + 1];
bool incremented_numDBs = false;
struct loader_context lc = {0};
lc.thd = thd;
lc.ha = this;
loader_error = 0;
bool rw_lock_taken = false;
for (u_int32_t i = 0; i < MAX_KEY+1; i++) {
mult_put_flags[i] = DB_YESOVERWRITE;
mult_dbt_flags[i] = DB_DBT_REALLOC;
......@@ -6501,6 +6515,8 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
}
}
rw_wrlock(&share->num_DBs_lock);
rw_lock_taken = true;
//
// open all the DB files and set the appropriate variables in share
// they go to the end of share->key_file
......@@ -6543,84 +6559,125 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
if (error) { goto cleanup; }
}
error = db_env->create_loader(
db_env,
txn,
&loader,
NULL, // no src_db needed
num_of_keys,
&share->key_file[curr_num_DBs],
mult_put_flags,
mult_dbt_flags,
loader_flags
);
if (error) { goto cleanup; }
if (use_hot_index && num_of_keys == 1 && (key_info[0].flags & HA_NOSAME) == 0) {
if (share->num_DBs > curr_num_DBs) {
//
// already have hot index in progress, get out
//
error = HA_ERR_INTERNAL_ERROR;
rw_unlock(&share->num_DBs_lock);
goto cleanup;
}
share->num_DBs++;
incremented_numDBs = true;
error = db_env->create_indexer(
db_env,
txn,
&indexer,
share->file,
num_of_keys,
&share->key_file[curr_num_DBs],
mult_db_flags,
indexer_flags
);
if (error) { goto cleanup; }
error = loader->set_poll_function(loader, poll_fun, &lc);
if (error) { goto cleanup; }
error = indexer->set_poll_function(indexer, ai_poll_fun, &lc);
if (error) { goto cleanup; }
error = loader->set_error_callback(loader, loader_ai_err_fun, &lc);
if (error) { goto cleanup; }
error = indexer->set_error_callback(indexer, loader_ai_err_fun, &lc);
if (error) { goto cleanup; }
//
// scan primary table, create each secondary key, add to each DB
//
if ((error = share->file->cursor(share->file, txn, &tmp_cursor, 0))) {
tmp_cursor = NULL; // Safety
goto cleanup;
}
rw_unlock(&share->num_DBs_lock);
rw_lock_taken = false;
error = indexer->build(indexer);
if (error) { goto cleanup; }
//
// grab some locks to make this go faster
// first a global read lock on the main DB, because
// we intend to scan the entire thing
//
lockretryN(read_lock_wait_time){
error = tmp_cursor->c_pre_acquire_read_lock(
tmp_cursor,
share->file->dbt_neg_infty(),
share->file->dbt_pos_infty()
);
lockretry_wait;
error = indexer->close(indexer);
if (error) { goto cleanup; }
indexer = NULL;
}
if (error) { goto cleanup; }
else {
rw_unlock(&share->num_DBs_lock);
rw_lock_taken = false;
error = db_env->create_loader(
db_env,
txn,
&loader,
NULL, // no src_db needed
num_of_keys,
&share->key_file[curr_num_DBs],
mult_put_flags,
mult_dbt_flags,
loader_flags
);
if (error) { goto cleanup; }
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &curr_pk_key, &curr_pk_val, DB_NEXT | DB_PRELOCKED);
error = loader->set_poll_function(loader, poll_fun, &lc);
if (error) { goto cleanup; }
while (cursor_ret_val != DB_NOTFOUND) {
if (cursor_ret_val) {
error = cursor_ret_val;
error = loader->set_error_callback(loader, loader_ai_err_fun, &lc);
if (error) { goto cleanup; }
//
// scan primary table, create each secondary key, add to each DB
//
if ((error = share->file->cursor(share->file, txn, &tmp_cursor, 0))) {
tmp_cursor = NULL; // Safety
goto cleanup;
}
error = loader->put(loader, &curr_pk_key, &curr_pk_val);
//
// grab some locks to make this go faster
// first a global read lock on the main DB, because
// we intend to scan the entire thing
//
lockretryN(read_lock_wait_time){
error = tmp_cursor->c_pre_acquire_read_lock(
tmp_cursor,
share->file->dbt_neg_infty(),
share->file->dbt_pos_infty()
);
lockretry_wait;
}
if (error) { goto cleanup; }
num_processed++;
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &curr_pk_key, &curr_pk_val, DB_NEXT | DB_PRELOCKED);
if ((num_processed % 1000) == 0) {
if (loader_use_puts) {
sprintf(status_msg, "Adding indexes: Processed %llu of about %llu rows.", num_processed, (long long unsigned) share->rows);
}
else {
sprintf(status_msg, "Adding indexes: Fetched %llu of about %llu rows, loading of data still remains.", num_processed, (long long unsigned) share->rows);
}
thd_proc_info(thd, status_msg);
if (thd->killed) {
error = ER_ABORTING_CONNECTION;
while (cursor_ret_val != DB_NOTFOUND) {
if (cursor_ret_val) {
error = cursor_ret_val;
goto cleanup;
}
}
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &curr_pk_key, &curr_pk_val, DB_NEXT | DB_PRELOCKED);
}
error = tmp_cursor->c_close(tmp_cursor);
assert(error==0);
tmp_cursor = NULL;
error = loader->close(loader);
loader = NULL;
if (error) goto cleanup;
error = loader->put(loader, &curr_pk_key, &curr_pk_val);
if (error) { goto cleanup; }
num_processed++;
if ((num_processed % 1000) == 0) {
if (loader_use_puts) {
sprintf(status_msg, "Adding indexes: Processed %llu of about %llu rows.", num_processed, (long long unsigned) share->rows);
}
else {
sprintf(status_msg, "Adding indexes: Fetched %llu of about %llu rows, loading of data still remains.", num_processed, (long long unsigned) share->rows);
}
thd_proc_info(thd, status_msg);
if (thd->killed) {
error = ER_ABORTING_CONNECTION;
goto cleanup;
}
}
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &curr_pk_key, &curr_pk_val, DB_NEXT | DB_PRELOCKED);
}
error = tmp_cursor->c_close(tmp_cursor);
assert(error==0);
tmp_cursor = NULL;
error = loader->close(loader);
loader = NULL;
if (error) goto cleanup;
}
curr_index = curr_num_DBs;
for (uint i = 0; i < num_of_keys; i++, curr_index++) {
if (key_info[i].flags & HA_NOSAME) {
......@@ -6658,6 +6715,10 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
error = 0;
cleanup:
if (rw_lock_taken) {
rw_unlock(&share->num_DBs_lock);
rw_lock_taken = false;
}
if (tmp_cursor) {
int r = tmp_cursor->c_close(tmp_cursor);
assert(r==0);
......@@ -6668,6 +6729,11 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
thd_proc_info(thd, status_msg);
loader->abort(loader);
}
if (indexer != NULL) {
sprintf(status_msg, "aborting creation of indexes.");
thd_proc_info(thd, status_msg);
indexer->abort(indexer);
}
if (txn) {
if (error) {
curr_index = curr_num_DBs;
......@@ -6692,6 +6758,14 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
another transaction has accessed the table. \
To add indexes, make sure no transactions touch the table.", share->table_name);
}
//
// need to restore num_DBs
//
if (error && incremented_numDBs) {
rw_wrlock(&share->num_DBs_lock);
share->num_DBs--;
rw_unlock(&share->num_DBs_lock);
}
TOKUDB_DBUG_RETURN(error ? error : loader_error);
}
......
......@@ -79,6 +79,8 @@ typedef struct st_tokudb_share {
bool has_unique_keys;
bool replace_into_fast;
rw_lock_t num_DBs_lock;
u_int32_t num_DBs;
} TOKUDB_SHARE;
#define HA_TOKU_VERSION 3
......@@ -309,7 +311,6 @@ class ha_tokudb : public handler {
DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
DBT* create_dbt_key_for_lookup(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length, int8_t inf_byte);
int remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key);
int key_cmp(uint keynr, const uchar * old_row, const uchar * new_row);
int handle_cursor_error(int error, int err_to_return, uint keynr);
DBT *get_pos(DBT * to, uchar * pos);
......
......@@ -98,6 +98,13 @@ static MYSQL_THDVAR_BOOL(load_save_space,
NULL,
FALSE
);
static MYSQL_THDVAR_BOOL(create_index_online,
0,
"if on, create index done online",
NULL,
NULL,
FALSE
);
static MYSQL_THDVAR_BOOL(prelock_empty,
0,
"Tokudb Prelock Empty Table",
......@@ -545,6 +552,10 @@ bool get_load_save_space(THD* thd) {
return (THDVAR(thd, load_save_space) != 0);
}
bool get_create_index_online(THD* thd) {
return (THDVAR(thd, create_index_online) != 0);
}
bool get_prelock_empty(THD* thd) {
return (THDVAR(thd, prelock_empty) != 0);
}
......@@ -1423,6 +1434,7 @@ static struct st_mysql_sys_var *tokudb_system_variables[] = {
MYSQL_SYSVAR(read_lock_wait),
MYSQL_SYSVAR(pk_insert_mode),
MYSQL_SYSVAR(load_save_space),
MYSQL_SYSVAR(create_index_online),
MYSQL_SYSVAR(version),
MYSQL_SYSVAR(init_flags),
MYSQL_SYSVAR(checkpointing_period),
......
......@@ -15,6 +15,7 @@ ulonglong get_write_lock_wait_time (THD* thd);
ulonglong get_read_lock_wait_time (THD* thd);
uint get_pk_insert_mode(THD* thd);
bool get_load_save_space(THD* thd);
bool get_create_index_online(THD* thd);
bool get_prelock_empty(THD* thd);
extern HASH tokudb_open_tables;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment