Commit 96e95b54 authored by Monty's avatar Monty

Better SHOW PROCESSLIST for replication

- When waiting for events, start time is now counted from start of wait
- Instead of having "Connect" as "Command" for all replication threads we
  now have:
  - Slave_IO for Slave thread reading relay log
  - Slave_SQL for slave executing SQL commands or distribution queries to
    Slave workers
  - Slave_worker for slave threads executin SQL commands in parallel replication
parent eac7e575
......@@ -10,7 +10,10 @@ enum enum_server_command
COM_STMT_PREPARE, COM_STMT_EXECUTE, COM_STMT_SEND_LONG_DATA, COM_STMT_CLOSE,
COM_STMT_RESET, COM_SET_OPTION, COM_STMT_FETCH, COM_DAEMON,
COM_MDB_GAP_BEG,
COM_MDB_GAP_END=253,
COM_MDB_GAP_END=250,
COM_SLAVE_WORKER,
COM_SLAVE_IO,
COM_SLAVE_SQL,
COM_MULTI,
COM_END
};
......
......@@ -105,7 +105,10 @@ enum enum_server_command
COM_STMT_RESET, COM_SET_OPTION, COM_STMT_FETCH, COM_DAEMON,
/* don't forget to update const char *command_name[] in sql_parse.cc */
COM_MDB_GAP_BEG,
COM_MDB_GAP_END=253,
COM_MDB_GAP_END=250,
COM_SLAVE_WORKER,
COM_SLAVE_IO,
COM_SLAVE_SQL,
COM_MULTI,
/* Must be last */
COM_END
......
......@@ -307,6 +307,7 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
&stage_waiting_for_prior_transaction_to_start_commit,
old_stage);
*did_enter_cond= true;
thd->set_time_for_next_stage();
do
{
if (thd->check_killed() && !rgi->worker_error)
......@@ -369,6 +370,7 @@ do_ftwrl_wait(rpl_group_info *rgi,
thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry,
&stage_waiting_for_ftwrl, old_stage);
*did_enter_cond= true;
thd->set_time_for_next_stage();
do
{
if (entry->force_abort || rgi->worker_error)
......@@ -417,8 +419,11 @@ pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd)
*/
mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
if (thd)
{
thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool,
&stage_waiting_for_rpl_thread_pool, &old_stage);
thd->set_time_for_next_stage();
}
while (pool->busy)
{
if (thd && thd->check_killed())
......@@ -534,6 +539,7 @@ rpl_pause_for_ftwrl(THD *thd)
e->pause_sub_id= e->largest_started_sub_id;
thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
&stage_waiting_for_ftwrl_threads_to_pause, &old_stage);
thd->set_time_for_next_stage();
while (e->pause_sub_id < (uint64)ULONGLONG_MAX &&
e->last_committed_sub_id < e->pause_sub_id &&
!err)
......@@ -995,7 +1001,6 @@ handle_rpl_parallel_thread(void *arg)
*/
thd->variables.tx_isolation= ISO_REPEATABLE_READ;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->thd= thd;
......@@ -1005,8 +1010,10 @@ handle_rpl_parallel_thread(void *arg)
rpt->running= true;
mysql_cond_signal(&rpt->COND_rpl_thread);
thd->set_command(COM_SLAVE_WORKER);
while (!rpt->stop)
{
uint wait_count= 0;
rpl_parallel_thread::queued_event *qev, *next_qev;
thd->ENTER_COND(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
......@@ -1025,7 +1032,11 @@ handle_rpl_parallel_thread(void *arg)
(rpt->current_owner && !in_event_group) ||
(rpt->current_owner && group_rgi->parallel_entry->force_abort) ||
rpt->stop))
{
if (!wait_count++)
thd->set_time_for_next_stage();
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
}
rpt->dequeue1(events);
thd->EXIT_COND(&old_stage);
......
......@@ -4111,6 +4111,7 @@ pthread_handler_t handle_slave_io(void *arg)
}
DBUG_PRINT("info",("Starting reading binary log from master"));
thd->set_command(COM_SLAVE_IO);
while (!io_slave_killed(mi))
{
THD_STAGE_INFO(thd, stage_requesting_binlog_dump);
......@@ -4733,6 +4734,7 @@ pthread_handler_t handle_slave_sql(void *arg)
/* Read queries from the IO/THREAD until this thread is killed */
thd->set_command(COM_SLAVE_SQL);
while (!sql_slave_killed(serial_rgi))
{
THD_STAGE_INFO(thd, stage_reading_event_from_the_relay_log);
......
......@@ -3193,6 +3193,12 @@ class THD :public Statement,
}
ulonglong current_utime() { return microsecond_interval_timer(); }
/* Tell SHOW PROCESSLIST to show time from this point */
inline void set_time_for_next_stage()
{
utime_after_query= current_utime();
}
/**
Update server status after execution of a top level statement.
Currently only checks if a query was slow, and assigns
......@@ -3202,7 +3208,7 @@ class THD :public Statement,
*/
void update_server_status()
{
utime_after_query= current_utime();
set_time_for_next_stage();
if (utime_after_query > utime_after_lock + variables.long_query_time)
server_status|= SERVER_QUERY_WAS_SLOW;
}
......
......@@ -392,9 +392,9 @@ const LEX_STRING command_name[257]={
{ 0, 0 }, //248
{ 0, 0 }, //249
{ 0, 0 }, //250
{ 0, 0 }, //251
{ 0, 0 }, //252
{ 0, 0 }, //253
{ C_STRING_WITH_LEN("Slave_worker") }, //251
{ C_STRING_WITH_LEN("Slave_IO") }, //252
{ C_STRING_WITH_LEN("Slave_SQL") }, //253
{ C_STRING_WITH_LEN("Com_multi") }, //254
{ C_STRING_WITH_LEN("Error") } // Last command number 255
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment