Commit 15f46d51 authored by Sachin Setiya's avatar Sachin Setiya

MDEV-7409 On RBR, extend the PROCESSLIST info to include at least the name of...

MDEV-7409 On RBR, extend the PROCESSLIST info to include at least the name of the recently used table

When RBR is used, add the db name to db Field and table name to Status
Field  of the "SHOW FULL PROCESSLIST" command for SQL thread.
parent b7b4c332
include/master-slave.inc
[connection master]
connection master;
create table t1(a int primary key);
connection slave;
SET GLOBAL debug_dbug="+d,should_sleep_for_mdev7409";
select * from t1;
a
connection master;
insert into t1(a) values(1);
#monitoring write rows
connection slave;
SELECT db , state FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Write_rows_log_event::write_row(%) on table t1";
db state
test Write_rows_log_event::write_row(-1) on table t1
#monitoring update rows
connection master;
update t1 set a = a + 4194304 ;
connection slave;
SELECT db, state FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::find_row(%) on table t1";
db state
test Update_rows_log_event::find_row(-1) on table t1
SELECT db, state FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::unpack_current_row(%) on table t1";
db state
test Update_rows_log_event::unpack_current_row(-1) on table t1
SELECT db, state FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::ha_update_row(%) on table t1";
db state
test Update_rows_log_event::ha_update_row(-1) on table t1
#monitoring delete rows
connection master;
delete from t1 where a>1;
connection slave;
SELECT db , state FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Delete_rows_log_event::find_row(%) on table t1";
db state
test Delete_rows_log_event::find_row(-1) on table t1
SELECT db, state FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Delete_rows_log_event::ha_delete_row(%) on table t1";
db state
test Delete_rows_log_event::ha_delete_row(-1) on table t1
connection master;
drop table t1;
connection slave;
SET GLOBAL debug_dbug="";
include/rpl_end.inc
connection server_2;
connection server_2;
connection server_2;
connection server_2;
connection server_1;
connection server_1;
connection server_1;
connection server_2;
connection server_1;
connection server_2;
connection server_2;
connection server_1;
connection server_1;
--source include/have_innodb.inc
--source include/have_binlog_format_row.inc
--source include/master-slave.inc
--enable_connect_log
--connection master
create table t1(a int primary key);
--save_master_pos
--connection slave
--sync_with_master
SET GLOBAL debug_dbug="+d,should_sleep_for_mdev7409";
select * from t1;
--connection master
insert into t1(a) values(1);
--save_master_pos
--echo #monitoring write rows
--connection slave
let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Write_rows_log_event::write_row(%) on table t1";
--source include/wait_condition.inc
SELECT db , state FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Write_rows_log_event::write_row(%) on table t1";
--sync_with_master
--echo #monitoring update rows
--connection master
update t1 set a = a + 4194304 ;
--connection slave
let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::find_row(%) on table t1";
--source include/wait_condition.inc
SELECT db, state FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::find_row(%) on table t1";
let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::unpack_current_row(%) on table t1";
--source include/wait_condition.inc
SELECT db, state FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::unpack_current_row(%) on table t1";
let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::ha_update_row(%) on table t1";
--source include/wait_condition.inc
SELECT db, state FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::ha_update_row(%) on table t1";
--source include/wait_condition.inc
--sync_with_master
--echo #monitoring delete rows
--connection master
delete from t1 where a>1;
--connection slave
let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Delete_rows_log_event::find_row(%) on table t1";
--source include/wait_condition.inc
SELECT db , state FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Delete_rows_log_event::find_row(%) on table t1";
let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Delete_rows_log_event::ha_delete_row(%) on table t1";
--source include/wait_condition.inc
SELECT db, state FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE DB = 'test' AND STATE LIKE "Delete_rows_log_event::ha_delete_row(%) on table t1";
--sync_with_master
#CleanUp
--connection master
drop table t1;
--connection slave
SET GLOBAL debug_dbug="";
--source include/rpl_end.inc
...@@ -11765,18 +11765,27 @@ Write_rows_log_event::do_exec_row(rpl_group_info *rgi) ...@@ -11765,18 +11765,27 @@ Write_rows_log_event::do_exec_row(rpl_group_info *rgi)
{ {
DBUG_ASSERT(m_table != NULL); DBUG_ASSERT(m_table != NULL);
const char *tmp= thd->get_proc_info(); const char *tmp= thd->get_proc_info();
const char *message= "Write_rows_log_event::write_row()"; char *tmp_db= thd->db;
char *message, msg[128];
my_snprintf(msg, sizeof(msg),"Write_rows_log_event::write_row() on table %s",
m_table->s->table_name.str);
thd->db= m_table->s->db.str;
message= msg;
#ifdef WSREP_PROC_INFO #ifdef WSREP_PROC_INFO
my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"Write_rows_log_event::write_row(%lld)", "Write_rows_log_event::write_row(%lld) on table %s",
(long long) wsrep_thd_trx_seqno(thd)); (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str);
message= thd->wsrep_info; message= thd->wsrep_info;
#endif /* WSREP_PROC_INFO */ #endif /* WSREP_PROC_INFO */
thd_proc_info(thd, message); thd_proc_info(thd, message);
DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{
my_sleep(500000);
};);
int error= write_row(rgi, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT); int error= write_row(rgi, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
thd_proc_info(thd, tmp); thd_proc_info(thd, tmp);
thd->db= tmp_db;
if (error && !thd->is_error()) if (error && !thd->is_error())
{ {
...@@ -12372,32 +12381,45 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi) ...@@ -12372,32 +12381,45 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
{ {
int error; int error;
const char *tmp= thd->get_proc_info(); const char *tmp= thd->get_proc_info();
const char *message= "Delete_rows_log_event::find_row()"; char *tmp_db= thd->db;
char *message, msg[128];
my_snprintf(msg, sizeof(msg),"Delete_rows_log_event::find_row() on table %s",
m_table->s->table_name.str);
thd->db= m_table->s->db.str;
message= msg;
const bool invoke_triggers= const bool invoke_triggers=
slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers; slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers;
DBUG_ASSERT(m_table != NULL); DBUG_ASSERT(m_table != NULL);
#ifdef WSREP_PROC_INFO #ifdef WSREP_PROC_INFO
my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"Delete_rows_log_event::find_row(%lld)", "Delete_rows_log_event::find_row(%lld) on table %s",
(long long) wsrep_thd_trx_seqno(thd)); (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str) ;
message= thd->wsrep_info; message= thd->wsrep_info;
#endif /* WSREP_PROC_INFO */ #endif /* WSREP_PROC_INFO */
thd_proc_info(thd, message); thd_proc_info(thd, message);
DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{
my_sleep(500000);
};);
if (!(error= find_row(rgi))) if (!(error= find_row(rgi)))
{ {
/* /*
Delete the record found, located in record[0] Delete the record found, located in record[0]
*/ */
message= "Delete_rows_log_event::ha_delete_row()"; my_snprintf(msg, sizeof(msg),"Delete_rows_log_event::ha_delete_row() on table %s",
m_table->s->table_name.str);
message= msg;
#ifdef WSREP_PROC_INFO #ifdef WSREP_PROC_INFO
snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"Delete_rows_log_event::ha_delete_row(%lld)", "Delete_rows_log_event::ha_delete_row(%lld) on table %s",
(long long) wsrep_thd_trx_seqno(thd)); (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str) ;
message= thd->wsrep_info; message= thd->wsrep_info;
#endif #endif
thd_proc_info(thd, message); thd_proc_info(thd, message);
DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{
my_sleep(500000);
};);
if (invoke_triggers && if (invoke_triggers &&
process_triggers(TRG_EVENT_DELETE, TRG_ACTION_BEFORE, FALSE)) process_triggers(TRG_EVENT_DELETE, TRG_ACTION_BEFORE, FALSE))
...@@ -12414,6 +12436,7 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi) ...@@ -12414,6 +12436,7 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
m_table->file->ha_index_or_rnd_end(); m_table->file->ha_index_or_rnd_end();
} }
thd_proc_info(thd, tmp); thd_proc_info(thd, tmp);
thd->db= tmp_db;
return error; return error;
} }
...@@ -12532,17 +12555,26 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) ...@@ -12532,17 +12555,26 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
const bool invoke_triggers= const bool invoke_triggers=
slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers; slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers;
const char *tmp= thd->get_proc_info(); const char *tmp= thd->get_proc_info();
const char *message= "Update_rows_log_event::find_row()"; char *tmp_db= thd->db;
char *message, msg[128];
DBUG_ASSERT(m_table != NULL); DBUG_ASSERT(m_table != NULL);
my_snprintf(msg, sizeof(msg),"Update_rows_log_event::find_row() on table %s",
m_table->s->table_name.str);
thd->db= m_table->s->db.str;
message= msg;
#ifdef WSREP_PROC_INFO #ifdef WSREP_PROC_INFO
my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"Update_rows_log_event::find_row(%lld)", "Update_rows_log_event::find_row(%lld) on table %s",
(long long) wsrep_thd_trx_seqno(thd)); (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str) ;
message= thd->wsrep_info; message= thd->wsrep_info;
#endif /* WSREP_PROC_INFO */ #endif /* WSREP_PROC_INFO */
thd_proc_info(thd, message); thd_proc_info(thd, message);
DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{
my_sleep(500000);
};);
int error= find_row(rgi); int error= find_row(rgi);
if (error) if (error)
{ {
...@@ -12553,6 +12585,7 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) ...@@ -12553,6 +12585,7 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
if ((m_curr_row= m_curr_row_end)) if ((m_curr_row= m_curr_row_end))
unpack_current_row(rgi, &m_cols_ai); unpack_current_row(rgi, &m_cols_ai);
thd_proc_info(thd, tmp); thd_proc_info(thd, tmp);
thd->db= tmp_db;
return error; return error;
} }
...@@ -12570,16 +12603,21 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) ...@@ -12570,16 +12603,21 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
store_record(m_table,record[1]); store_record(m_table,record[1]);
m_curr_row= m_curr_row_end; m_curr_row= m_curr_row_end;
message= "Update_rows_log_event::unpack_current_row()"; my_snprintf(msg, sizeof(msg),"Update_rows_log_event::unpack_current_row() on table %s",
m_table->s->table_name.str);
message= msg;
#ifdef WSREP_PROC_INFO #ifdef WSREP_PROC_INFO
my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"Update_rows_log_event::unpack_current_row(%lld)", "Update_rows_log_event::unpack_current_row(%lld) on table %s",
(long long) wsrep_thd_trx_seqno(thd)); (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str) ;
message= thd->wsrep_info; message= thd->wsrep_info;
#endif /* WSREP_PROC_INFO */ #endif /* WSREP_PROC_INFO */
/* this also updates m_curr_row_end */ /* this also updates m_curr_row_end */
thd_proc_info(thd, message); thd_proc_info(thd, message);
DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{
my_sleep(500000);
};);
if ((error= unpack_current_row(rgi, &m_cols_ai))) if ((error= unpack_current_row(rgi, &m_cols_ai)))
goto err; goto err;
...@@ -12597,15 +12635,20 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) ...@@ -12597,15 +12635,20 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength); DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
#endif #endif
message= "Update_rows_log_event::ha_update_row()"; my_snprintf(msg, sizeof(msg),"Update_rows_log_event::ha_update_row() on table %s",
m_table->s->table_name.str);
message= msg;
#ifdef WSREP_PROC_INFO #ifdef WSREP_PROC_INFO
my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"Update_rows_log_event::ha_update_row(%lld)", "Update_rows_log_event::ha_update_row(%lld) on table %s",
(long long) wsrep_thd_trx_seqno(thd)); (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str) ;
message= thd->wsrep_info; message= thd->wsrep_info;
#endif /* WSREP_PROC_INFO */ #endif /* WSREP_PROC_INFO */
thd_proc_info(thd, message); thd_proc_info(thd, message);
DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{
my_sleep(500000);
};);
if (invoke_triggers && if (invoke_triggers &&
process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_BEFORE, TRUE)) process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_BEFORE, TRUE))
{ {
...@@ -12627,9 +12670,9 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) ...@@ -12627,9 +12670,9 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_AFTER, TRUE)) process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_AFTER, TRUE))
error= HA_ERR_GENERIC; // in case if error is not set yet error= HA_ERR_GENERIC; // in case if error is not set yet
thd_proc_info(thd, tmp);
err: err:
thd_proc_info(thd, tmp);
thd->db= tmp_db;
m_table->file->ha_index_or_rnd_end(); m_table->file->ha_index_or_rnd_end();
return error; return error;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment