Commit 24a9c87e authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:2033], add process info for queries

git-svn-id: file:///svn/mysql/tokudb-engine/src@14784 c7de825b-a66e-492c-adef-691d508d4ae1
parent d04d4f1a
...@@ -1005,9 +1005,6 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t ...@@ -1005,9 +1005,6 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
using_ignore = 0; using_ignore = 0;
last_cursor_error = 0; last_cursor_error = 0;
range_lock_grabbed = false; range_lock_grabbed = false;
num_added_rows_in_stmt = 0;
num_deleted_rows_in_stmt = 0;
num_updated_rows_in_stmt = 0;
blob_buff = NULL; blob_buff = NULL;
num_blob_bytes = 0; num_blob_bytes = 0;
delay_updating_ai_metadata = false; delay_updating_ai_metadata = false;
...@@ -2635,7 +2632,7 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -2635,7 +2632,7 @@ int ha_tokudb::write_row(uchar * record) {
TOKUDB_DBUG_ENTER("ha_tokudb::write_row"); TOKUDB_DBUG_ENTER("ha_tokudb::write_row");
DBT row, prim_key, key; DBT row, prim_key, key;
int error; int error;
THD *thd = NULL; THD *thd = ha_thd();
u_int32_t put_flags; u_int32_t put_flags;
bool has_null; bool has_null;
DB_TXN* sub_trans = NULL; DB_TXN* sub_trans = NULL;
...@@ -2643,10 +2640,9 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -2643,10 +2640,9 @@ int ha_tokudb::write_row(uchar * record) {
bool is_replace_into; bool is_replace_into;
tokudb_trx_data *trx = NULL; tokudb_trx_data *trx = NULL;
uint curr_num_DBs = table->s->keys + test(hidden_primary_key); uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
declare_lockretry;
declare_lockretry;
thd = ha_thd();
is_replace_into = (thd_sql_command(thd) == SQLCOM_REPLACE) || is_replace_into = (thd_sql_command(thd) == SQLCOM_REPLACE) ||
(thd_sql_command(thd) == SQLCOM_REPLACE_SELECT); (thd_sql_command(thd) == SQLCOM_REPLACE_SELECT);
...@@ -2807,11 +2803,8 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -2807,11 +2803,8 @@ int ha_tokudb::write_row(uchar * record) {
if (!error) { if (!error) {
added_rows++; added_rows++;
num_added_rows_in_stmt++; trx->stmt_progress.inserted++;
if ((num_added_rows_in_stmt % 1000) == 0) { track_progress(thd);
sprintf(write_status_msg, "Inserted about %llu rows", num_added_rows_in_stmt);
thd_proc_info(thd, write_status_msg);
}
} }
cleanup: cleanup:
if (error == DB_KEYEXIST) { if (error == DB_KEYEXIST) {
...@@ -2915,6 +2908,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -2915,6 +2908,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
THD* thd = ha_thd(); THD* thd = ha_thd();
DB_TXN* sub_trans = NULL; DB_TXN* sub_trans = NULL;
DB_TXN* txn = NULL; DB_TXN* txn = NULL;
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
// //
// this can only fail if we have not opened the environment // this can only fail if we have not opened the environment
...@@ -3050,11 +3044,8 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -3050,11 +3044,8 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
} }
if (!error) { if (!error) {
num_updated_rows_in_stmt++; trx->stmt_progress.updated++;
if ((num_updated_rows_in_stmt % 1000) == 0) { track_progress(thd);
sprintf(write_status_msg, "Updated about %llu rows", num_updated_rows_in_stmt);
thd_proc_info(thd, write_status_msg);
}
} }
...@@ -3167,6 +3158,7 @@ int ha_tokudb::delete_row(const uchar * record) { ...@@ -3167,6 +3158,7 @@ int ha_tokudb::delete_row(const uchar * record) {
key_map keys = table_share->keys_in_use; key_map keys = table_share->keys_in_use;
bool has_null; bool has_null;
THD* thd = ha_thd(); THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
// //
// this can only fail if we have not opened the environment // this can only fail if we have not opened the environment
...@@ -3190,11 +3182,8 @@ int ha_tokudb::delete_row(const uchar * record) { ...@@ -3190,11 +3182,8 @@ int ha_tokudb::delete_row(const uchar * record) {
} }
else { else {
deleted_rows++; deleted_rows++;
num_deleted_rows_in_stmt++; trx->stmt_progress.deleted++;
if ((num_deleted_rows_in_stmt % 1000) == 0) { track_progress(thd);
sprintf(write_status_msg, "Deleted about %llu rows", num_deleted_rows_in_stmt);
thd_proc_info(thd, write_status_msg);
}
} }
{ {
int r = db_env->checkpointing_end_atomic_operation(db_env); int r = db_env->checkpointing_end_atomic_operation(db_env);
...@@ -3585,6 +3574,8 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) { ...@@ -3585,6 +3574,8 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) {
bool has_null; bool has_null;
int cmp; int cmp;
u_int32_t flags; u_int32_t flags;
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
HANDLE_INVALID_CURSOR(); HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
...@@ -3613,7 +3604,8 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) { ...@@ -3613,7 +3604,8 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) {
if (cmp) { if (cmp) {
error = HA_ERR_END_OF_FILE; error = HA_ERR_END_OF_FILE;
} }
trx->stmt_progress.queried++;
track_progress(thd);
cleanup: cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -3643,6 +3635,8 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ ...@@ -3643,6 +3635,8 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
DBT lookup_key; DBT lookup_key;
int error; int error;
u_int32_t flags = 0; u_int32_t flags = 0;
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
struct smart_dbt_info info; struct smart_dbt_info info;
struct index_read_info ir_info; struct index_read_info ir_info;
...@@ -3718,6 +3712,9 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ ...@@ -3718,6 +3712,9 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
if (error && (tokudb_debug & TOKUDB_DEBUG_ERROR)) { if (error && (tokudb_debug & TOKUDB_DEBUG_ERROR)) {
TOKUDB_TRACE("error:%d:%d\n", error, find_flag); TOKUDB_TRACE("error:%d:%d\n", error, find_flag);
} }
trx->stmt_progress.queried++;
track_progress(thd);
cleanup: cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -3738,6 +3735,8 @@ int ha_tokudb::index_next(uchar * buf) { ...@@ -3738,6 +3735,8 @@ int ha_tokudb::index_next(uchar * buf) {
int error; int error;
struct smart_dbt_info info; struct smart_dbt_info info;
u_int32_t flags = SET_READ_FLAG(0); u_int32_t flags = SET_READ_FLAG(0);
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
HANDLE_INVALID_CURSOR(); HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
...@@ -3753,6 +3752,8 @@ int ha_tokudb::index_next(uchar * buf) { ...@@ -3753,6 +3752,8 @@ int ha_tokudb::index_next(uchar * buf) {
if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) { if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
error = read_full_row(buf); error = read_full_row(buf);
} }
trx->stmt_progress.queried++;
track_progress(thd);
cleanup: cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -3777,6 +3778,8 @@ int ha_tokudb::index_prev(uchar * buf) { ...@@ -3777,6 +3778,8 @@ int ha_tokudb::index_prev(uchar * buf) {
int error; int error;
struct smart_dbt_info info; struct smart_dbt_info info;
u_int32_t flags = SET_READ_FLAG(0); u_int32_t flags = SET_READ_FLAG(0);
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
HANDLE_INVALID_CURSOR(); HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
...@@ -3792,6 +3795,8 @@ int ha_tokudb::index_prev(uchar * buf) { ...@@ -3792,6 +3795,8 @@ int ha_tokudb::index_prev(uchar * buf) {
if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) { if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
error = read_full_row(buf); error = read_full_row(buf);
} }
trx->stmt_progress.queried++;
track_progress(thd);
cleanup: cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
...@@ -3811,6 +3816,8 @@ int ha_tokudb::index_first(uchar * buf) { ...@@ -3811,6 +3816,8 @@ int ha_tokudb::index_first(uchar * buf) {
int error; int error;
struct smart_dbt_info info; struct smart_dbt_info info;
u_int32_t flags = SET_READ_FLAG(0); u_int32_t flags = SET_READ_FLAG(0);
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
HANDLE_INVALID_CURSOR(); HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_first_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_read_first_count, &LOCK_status);
...@@ -3827,6 +3834,8 @@ int ha_tokudb::index_first(uchar * buf) { ...@@ -3827,6 +3834,8 @@ int ha_tokudb::index_first(uchar * buf) {
if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) { if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
error = read_full_row(buf); error = read_full_row(buf);
} }
trx->stmt_progress.queried++;
track_progress(thd);
cleanup: cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
...@@ -3846,6 +3855,8 @@ int ha_tokudb::index_last(uchar * buf) { ...@@ -3846,6 +3855,8 @@ int ha_tokudb::index_last(uchar * buf) {
int error; int error;
struct smart_dbt_info info; struct smart_dbt_info info;
u_int32_t flags = SET_READ_FLAG(0); u_int32_t flags = SET_READ_FLAG(0);
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
HANDLE_INVALID_CURSOR(); HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_last_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_read_last_count, &LOCK_status);
...@@ -3862,6 +3873,11 @@ int ha_tokudb::index_last(uchar * buf) { ...@@ -3862,6 +3873,11 @@ int ha_tokudb::index_last(uchar * buf) {
if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) { if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
error = read_full_row(buf); error = read_full_row(buf);
} }
if (trx) {
trx->stmt_progress.queried++;
}
track_progress(thd);
cleanup: cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -3921,6 +3937,9 @@ int ha_tokudb::rnd_next(uchar * buf) { ...@@ -3921,6 +3937,9 @@ int ha_tokudb::rnd_next(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::ha_tokudb::rnd_next"); TOKUDB_DBUG_ENTER("ha_tokudb::ha_tokudb::rnd_next");
int error; int error;
u_int32_t flags = SET_READ_FLAG(0); u_int32_t flags = SET_READ_FLAG(0);
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
struct smart_dbt_info info; struct smart_dbt_info info;
HANDLE_INVALID_CURSOR(); HANDLE_INVALID_CURSOR();
...@@ -3935,11 +3954,36 @@ int ha_tokudb::rnd_next(uchar * buf) { ...@@ -3935,11 +3954,36 @@ int ha_tokudb::rnd_next(uchar * buf) {
info.keynr = primary_key; info.keynr = primary_key;
error = handle_cursor_error(cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info),HA_ERR_END_OF_FILE,primary_key); error = handle_cursor_error(cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info),HA_ERR_END_OF_FILE,primary_key);
trx->stmt_progress.queried++;
track_progress(thd);
cleanup: cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
void ha_tokudb::track_progress(THD* thd) {
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
if (trx) {
bool update_status = (trx->stmt_progress.queried % 1000) == 1 ||
(trx->stmt_progress.inserted% 1000) == 1 ||
(trx->stmt_progress.updated% 1000) == 1 ||
(trx->stmt_progress.deleted% 1000) == 1;
if (update_status) {
sprintf(
write_status_msg,
"Queried about %llu rows, inserted about %llu rows, updated about %llu rows, deleted about %llu rows",
trx->stmt_progress.queried,
trx->stmt_progress.inserted,
trx->stmt_progress.updated,
trx->stmt_progress.deleted
);
thd_proc_info(thd, write_status_msg);
}
}
}
DBT *ha_tokudb::get_pos(DBT * to, uchar * pos) { DBT *ha_tokudb::get_pos(DBT * to, uchar * pos) {
TOKUDB_DBUG_ENTER("ha_tokudb::get_pos"); TOKUDB_DBUG_ENTER("ha_tokudb::get_pos");
/* We don't need to set app_data here */ /* We don't need to set app_data here */
...@@ -4359,6 +4403,7 @@ int ha_tokudb::create_txn(THD* thd, tokudb_trx_data* trx) { ...@@ -4359,6 +4403,7 @@ int ha_tokudb::create_txn(THD* thd, tokudb_trx_data* trx) {
if (tokudb_debug & TOKUDB_DEBUG_TXN) { if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_TRACE("stmt:%p:%p\n", trx->sp_level, trx->stmt); TOKUDB_TRACE("stmt:%p:%p\n", trx->sp_level, trx->stmt);
} }
reset_stmt_progress(&trx->stmt_progress);
trans_register_ha(thd, FALSE, tokudb_hton); trans_register_ha(thd, FALSE, tokudb_hton);
cleanup: cleanup:
return error; return error;
...@@ -4392,12 +4437,6 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { ...@@ -4392,12 +4437,6 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
declare_lockretry; declare_lockretry;
#endif #endif
//
// reset per-stmt variables
//
num_added_rows_in_stmt = 0;
num_deleted_rows_in_stmt = 0;
num_updated_rows_in_stmt = 0;
trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot); trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
if (!trx) { if (!trx) {
...@@ -4447,6 +4486,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { ...@@ -4447,6 +4486,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
*/ */
DBUG_PRINT("trans", ("commiting non-updating transaction")); DBUG_PRINT("trans", ("commiting non-updating transaction"));
error = trx->stmt->commit(trx->stmt, 0); error = trx->stmt->commit(trx->stmt, 0);
reset_stmt_progress(&trx->stmt_progress);
if (tokudb_debug & TOKUDB_DEBUG_TXN) if (tokudb_debug & TOKUDB_DEBUG_TXN)
TOKUDB_TRACE("commit:%p:%d\n", trx->stmt, error); TOKUDB_TRACE("commit:%p:%d\n", trx->stmt, error);
trx->stmt = NULL; trx->stmt = NULL;
...@@ -4471,12 +4511,6 @@ int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) { ...@@ -4471,12 +4511,6 @@ int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) {
TOKUDB_DBUG_ENTER("ha_tokudb::start_stmt"); TOKUDB_DBUG_ENTER("ha_tokudb::start_stmt");
int error = 0; int error = 0;
//
// reset per-stmt variables
//
num_added_rows_in_stmt = 0;
num_deleted_rows_in_stmt = 0;
num_updated_rows_in_stmt = 0;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot); tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
DBUG_ASSERT(trx); DBUG_ASSERT(trx);
......
...@@ -196,19 +196,6 @@ class ha_tokudb : public handler { ...@@ -196,19 +196,6 @@ class ha_tokudb : public handler {
ulonglong deleted_rows; ulonglong deleted_rows;
//
// count on number of rows inserted by statement
// this is to help give user progress on what is happening
// the reason that the variables added_rows and deleted_rows
// are not used is that those variables are also used to help
// estimate the number of rows in the DB. There are tricky things that
// can happen with "lock tables", so I do not want to couple these
// two features together. There is a little duplicate work, but I think it is fine
//
ulonglong num_added_rows_in_stmt;
ulonglong num_deleted_rows_in_stmt;
ulonglong num_updated_rows_in_stmt;
uint last_dup_key; uint last_dup_key;
// //
// if set to 0, then the primary key is not hidden // if set to 0, then the primary key is not hidden
...@@ -445,6 +432,8 @@ class ha_tokudb : public handler { ...@@ -445,6 +432,8 @@ class ha_tokudb : public handler {
return tokudb_prefix_cmp_dbt_key(share->key_file[keynr], first_key, second_key); return tokudb_prefix_cmp_dbt_key(share->key_file[keynr], first_key, second_key);
} }
void track_progress(THD* thd);
int heavi_ret_val; int heavi_ret_val;
// //
......
...@@ -83,17 +83,33 @@ typedef enum { ...@@ -83,17 +83,33 @@ typedef enum {
typedef struct st_tokudb_stmt_progress {
ulonglong inserted;
ulonglong updated;
ulonglong deleted;
ulonglong queried;
} tokudb_stmt_progress;
typedef struct st_tokudb_trx_data { typedef struct st_tokudb_trx_data {
DB_TXN *all; DB_TXN *all;
DB_TXN *stmt; DB_TXN *stmt;
DB_TXN *sp_level; DB_TXN *sp_level;
uint tokudb_lock_count; uint tokudb_lock_count;
HA_TOKU_ISO_LEVEL iso_level; HA_TOKU_ISO_LEVEL iso_level;
tokudb_stmt_progress stmt_progress;
} tokudb_trx_data; } tokudb_trx_data;
extern char *tokudb_data_dir; extern char *tokudb_data_dir;
extern const char *ha_tokudb_ext; extern const char *ha_tokudb_ext;
static void reset_stmt_progress (tokudb_stmt_progress* val) {
val->deleted = 0;
val->inserted = 0;
val->updated = 0;
val->queried = 0;
}
static int get_name_length(const char *name) { static int get_name_length(const char *name) {
int n = 0; int n = 0;
const char *newname = name; const char *newname = name;
......
...@@ -442,6 +442,7 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) { ...@@ -442,6 +442,7 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
if (all) { if (all) {
trx->iso_level = hatoku_iso_not_set; trx->iso_level = hatoku_iso_not_set;
} }
reset_stmt_progress(&trx->stmt_progress);
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -467,6 +468,7 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) { ...@@ -467,6 +468,7 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) {
if (all) { if (all) {
trx->iso_level = hatoku_iso_not_set; trx->iso_level = hatoku_iso_not_set;
} }
reset_stmt_progress(&trx->stmt_progress);
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment