Commit df93c2e0 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:2355] Make the wait time for write locks user-controlled

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@17707 c7de825b-a66e-492c-adef-691d508d4ae1
parent c19b9fd9
...@@ -50,9 +50,9 @@ static const char *ha_tokudb_exts[] = { ...@@ -50,9 +50,9 @@ static const char *ha_tokudb_exts[] = {
}; };
// Retry loop header for lock acquisition: the statement that follows is executed
// up to (N)/8 + 1 times, with the attempt number available as lockretrycount.
// The budget is divided by 8 (1<<3) because lockretry_wait sleeps up to
// (1<<3)*1024 microseconds per attempt once the back-off has ramped up.
// N is parenthesized so that an expression argument (e.g. a + b) is divided as a whole.
#define lockretryN(N) \
    for (ulonglong lockretrycount=0; lockretrycount<((N)/(1<<3) + 1); lockretrycount++)
// Default retry budget used where no per-session wait time is supplied.
#define lockretry lockretryN(800)
#define lockretry_wait \ #define lockretry_wait \
if (error != DB_LOCK_NOTGRANTED) { \ if (error != DB_LOCK_NOTGRANTED) { \
...@@ -61,7 +61,13 @@ static const char *ha_tokudb_exts[] = { ...@@ -61,7 +61,13 @@ static const char *ha_tokudb_exts[] = {
if (tokudb_debug & TOKUDB_DEBUG_LOCKRETRY) { \ if (tokudb_debug & TOKUDB_DEBUG_LOCKRETRY) { \
TOKUDB_TRACE("%s count=%d\n", __FUNCTION__, lockretrycount); \ TOKUDB_TRACE("%s count=%d\n", __FUNCTION__, lockretrycount); \
} \ } \
usleep((lockretrycount<4 ? (1<<lockretrycount) : (1<<3)) * 1024); \ if (lockretrycount%200 == 0) { \
if (ha_thd()->killed) { \
error = DB_LOCK_NOTGRANTED; \
break; \
} \
} \
usleep((lockretrycount<4 ? (1<<lockretrycount) : (1<<3)) * 1024); \
// //
// This offset is calculated starting from AFTER the NULL bytes // This offset is calculated starting from AFTER the NULL bytes
...@@ -3009,6 +3015,7 @@ int ha_tokudb::insert_rows_to_dictionaries(uchar* record, DBT* pk_key, DBT* pk_v ...@@ -3009,6 +3015,7 @@ int ha_tokudb::insert_rows_to_dictionaries(uchar* record, DBT* pk_key, DBT* pk_v
THD *thd = ha_thd(); THD *thd = ha_thd();
bool is_replace_into; bool is_replace_into;
uint curr_num_DBs = table->s->keys + test(hidden_primary_key); uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
ulonglong wait_lock_time = get_write_lock_wait_time(thd);
is_replace_into = (thd_sql_command(thd) == SQLCOM_REPLACE) || is_replace_into = (thd_sql_command(thd) == SQLCOM_REPLACE) ||
(thd_sql_command(thd) == SQLCOM_REPLACE_SELECT); (thd_sql_command(thd) == SQLCOM_REPLACE_SELECT);
...@@ -3035,7 +3042,7 @@ int ha_tokudb::insert_rows_to_dictionaries(uchar* record, DBT* pk_key, DBT* pk_v ...@@ -3035,7 +3042,7 @@ int ha_tokudb::insert_rows_to_dictionaries(uchar* record, DBT* pk_key, DBT* pk_v
} }
lockretry { lockretryN(wait_lock_time){
error = share->file->put( error = share->file->put(
share->file, share->file,
txn, txn,
...@@ -3073,7 +3080,7 @@ int ha_tokudb::insert_rows_to_dictionaries(uchar* record, DBT* pk_key, DBT* pk_v ...@@ -3073,7 +3080,7 @@ int ha_tokudb::insert_rows_to_dictionaries(uchar* record, DBT* pk_key, DBT* pk_v
bzero((void *) &row, sizeof(row)); bzero((void *) &row, sizeof(row));
} }
lockretry { lockretryN(wait_lock_time){
error = share->key_file[keynr]->put( error = share->key_file[keynr]->put(
share->key_file[keynr], share->key_file[keynr],
txn, txn,
...@@ -3101,6 +3108,7 @@ int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN ...@@ -3101,6 +3108,7 @@ int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN
int error; int error;
bool is_replace_into; bool is_replace_into;
uint curr_num_DBs = table->s->keys + test(hidden_primary_key); uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
ulonglong wait_lock_time = get_write_lock_wait_time(thd);
is_replace_into = (thd_sql_command(thd) == SQLCOM_REPLACE) || is_replace_into = (thd_sql_command(thd) == SQLCOM_REPLACE) ||
(thd_sql_command(thd) == SQLCOM_REPLACE_SELECT); (thd_sql_command(thd) == SQLCOM_REPLACE_SELECT);
...@@ -3111,19 +3119,22 @@ int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN ...@@ -3111,19 +3119,22 @@ int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN
share->mult_put_flags[primary_key] = DB_NOOVERWRITE; share->mult_put_flags[primary_key] = DB_NOOVERWRITE;
} }
error = db_env->put_multiple( lockretryN(wait_lock_time){
db_env, error = db_env->put_multiple(
NULL, db_env,
txn, NULL,
pk_key, txn,
pk_val, pk_key,
curr_num_DBs, pk_val,
share->key_file, curr_num_DBs,
mult_key_dbt, share->key_file,
mult_rec_dbt, mult_key_dbt,
share->mult_put_flags, mult_rec_dbt,
NULL share->mult_put_flags,
); NULL
);
lockretry_wait;
}
// //
// We break if we hit an error, unless it is a dup key error // We break if we hit an error, unless it is a dup key error
...@@ -3310,6 +3321,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -3310,6 +3321,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
DBT rec_dbts[MAX_KEY + 1]; DBT rec_dbts[MAX_KEY + 1];
u_int32_t curr_db_index; u_int32_t curr_db_index;
bool use_put_multiple = share->version > 2; bool use_put_multiple = share->version > 2;
ulonglong wait_lock_time = get_write_lock_wait_time(thd);
LINT_INIT(error); LINT_INIT(error);
bzero((void *) &row, sizeof(row)); bzero((void *) &row, sizeof(row));
...@@ -3456,14 +3468,16 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -3456,14 +3468,16 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
// make sure that for clustering keys, we are using DB_YESOVERWRITE, // make sure that for clustering keys, we are using DB_YESOVERWRITE,
// therefore making this put an overwrite if the key has not changed // therefore making this put an overwrite if the key has not changed
// //
error = share->key_file[keynr]->put( lockretryN(wait_lock_time){
share->key_file[keynr], error = share->key_file[keynr]->put(
txn, share->key_file[keynr],
&key, txn,
&row, &key,
put_flags &row,
); put_flags
);
lockretry_wait;
}
// //
// We break if we hit an error, unless it is a dup key error // We break if we hit an error, unless it is a dup key error
// and MySQL told us to ignore duplicate key errors // and MySQL told us to ignore duplicate key errors
...@@ -3483,19 +3497,22 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -3483,19 +3497,22 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
} }
if (use_put_multiple) { if (use_put_multiple) {
error = db_env->put_multiple( lockretryN(wait_lock_time){
db_env, error = db_env->put_multiple(
NULL, db_env,
txn, NULL,
&prim_key, txn,
&prim_row, &prim_key,
curr_db_index, &prim_row,
dbs, curr_db_index,
key_dbts, dbs,
rec_dbts, key_dbts,
mult_put_flags, rec_dbts,
NULL mult_put_flags,
); NULL
);
lockretry_wait;
}
} }
if (!error) { if (!error) {
trx->stmt_progress.updated++; trx->stmt_progress.updated++;
...@@ -3540,18 +3557,25 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT ...@@ -3540,18 +3557,25 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT
int error; int error;
DBT key; DBT key;
bool has_null; bool has_null;
ulonglong wait_lock_time = get_write_lock_wait_time(ha_thd());
DBUG_PRINT("enter", ("index: %d", keynr)); DBUG_PRINT("enter", ("index: %d", keynr));
DBUG_PRINT("primary", ("index: %d", primary_key)); DBUG_PRINT("primary", ("index: %d", primary_key));
DBUG_DUMP("prim_key", (uchar *) prim_key->data, prim_key->size); DBUG_DUMP("prim_key", (uchar *) prim_key->data, prim_key->size);
if (keynr == primary_key) { // Unique key if (keynr == primary_key) { // Unique key
DBUG_PRINT("Primary key", ("index: %d", keynr)); DBUG_PRINT("Primary key", ("index: %d", keynr));
error = share->key_file[keynr]->del(share->key_file[keynr], trans, prim_key , DB_DELETE_ANY); lockretryN(wait_lock_time){
error = share->key_file[keynr]->del(share->key_file[keynr], trans, prim_key , DB_DELETE_ANY);
lockretry_wait;
}
} }
else { else {
DBUG_PRINT("Secondary key", ("index: %d", keynr)); DBUG_PRINT("Secondary key", ("index: %d", keynr));
create_dbt_key_from_table(&key, keynr, key_buff2, record, &has_null); create_dbt_key_from_table(&key, keynr, key_buff2, record, &has_null);
error = share->key_file[keynr]->del(share->key_file[keynr], trans, &key , DB_DELETE_ANY); lockretryN(wait_lock_time){
error = share->key_file[keynr]->del(share->key_file[keynr], trans, &key , DB_DELETE_ANY);
lockretry_wait;
}
} }
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
......
...@@ -51,6 +51,16 @@ static uchar *tokudb_get_key(TOKUDB_SHARE * share, size_t * length, my_bool not_ ...@@ -51,6 +51,16 @@ static uchar *tokudb_get_key(TOKUDB_SHARE * share, size_t * length, my_bool not_
static handler *tokudb_create_handler(handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root); static handler *tokudb_create_handler(handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root);
static MYSQL_THDVAR_BOOL(commit_sync, PLUGIN_VAR_THDLOCAL, "sync on txn commit", static MYSQL_THDVAR_BOOL(commit_sync, PLUGIN_VAR_THDLOCAL, "sync on txn commit",
/* check */ NULL, /* update */ NULL, /* default*/ TRUE); /* check */ NULL, /* update */ NULL, /* default*/ TRUE);
// Per-session variable write_lock_wait: the budget a write operation is given to
// wait for a lock. It is read through get_write_lock_wait_time(), which reports
// a setting of 0 as ULONGLONG_MAX.
// NOTE(review): the unit looks like roughly milliseconds, judging by how the retry
// macros consume it; confirm before documenting it to users.
static MYSQL_THDVAR_ULONGLONG(write_lock_wait,
    0,                              // option flags (none)
    "time waiting for write lock",
    NULL,                           // check
    NULL,                           // update
    5000,                           // default
    0,                              // min (0 is mapped to ULONGLONG_MAX by get_write_lock_wait_time)
    1ULL<<63,                       // max; the constant must be 64 bits wide, shifting a plain int by 63 is undefined
    1                               // blocksize
    );
static void tokudb_print_error(const DB_ENV * db_env, const char *db_errpfx, const char *buffer); static void tokudb_print_error(const DB_ENV * db_env, const char *db_errpfx, const char *buffer);
static void tokudb_cleanup_log_files(void); static void tokudb_cleanup_log_files(void);
...@@ -431,6 +441,11 @@ bool tokudb_flush_logs(handlerton * hton) { ...@@ -431,6 +441,11 @@ bool tokudb_flush_logs(handlerton * hton) {
TOKUDB_DBUG_RETURN(result); TOKUDB_DBUG_RETURN(result);
} }
//
// Returns the write_lock_wait setting of the given session, in the form the
// write paths pass to lockretryN().
// Parameters:
//      [in]    thd - session whose thread variable is read
// Returns:
//      the configured value, or ULONGLONG_MAX when the value is 0
//
ulonglong get_write_lock_wait_time (THD* thd) {
    const ulonglong configured = THDVAR(thd, write_lock_wait);
    if (configured != 0) {
        return configured;
    }
    // a setting of 0 is reported as the largest possible wait budget
    return ULONGLONG_MAX;
}
static int tokudb_commit(handlerton * hton, THD * thd, bool all) { static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
TOKUDB_DBUG_ENTER("tokudb_commit"); TOKUDB_DBUG_ENTER("tokudb_commit");
DBUG_PRINT("trans", ("ending transaction %s", all ? "all" : "stmt")); DBUG_PRINT("trans", ("ending transaction %s", all ? "all" : "stmt"));
...@@ -1016,6 +1031,7 @@ static struct st_mysql_sys_var *tokudb_system_variables[] = { ...@@ -1016,6 +1031,7 @@ static struct st_mysql_sys_var *tokudb_system_variables[] = {
MYSQL_SYSVAR(log_dir), MYSQL_SYSVAR(log_dir),
MYSQL_SYSVAR(debug), MYSQL_SYSVAR(debug),
MYSQL_SYSVAR(commit_sync), MYSQL_SYSVAR(commit_sync),
MYSQL_SYSVAR(write_lock_wait),
MYSQL_SYSVAR(version), MYSQL_SYSVAR(version),
MYSQL_SYSVAR(init_flags), MYSQL_SYSVAR(init_flags),
MYSQL_SYSVAR(checkpointing_period), MYSQL_SYSVAR(checkpointing_period),
......
...@@ -11,7 +11,7 @@ extern DB *metadata_db; ...@@ -11,7 +11,7 @@ extern DB *metadata_db;
// thread variables // thread variables
ulonglong get_write_lock_wait_time (THD* thd);
extern HASH tokudb_open_tables; extern HASH tokudb_open_tables;
extern pthread_mutex_t tokudb_mutex; extern pthread_mutex_t tokudb_mutex;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment