Commit c47ee569 authored by unknown's avatar unknown

Clearer states in SHOW PROCESSLIST for replication threads.

For example the Binlog_dump thread (on the master) sometimes showed "Slave:".
And there were confusing messages where "binlog" was employed instead
of "relay log".


sql/log.cc:
  MYSQL_LOG::wait_for_update() is used by the binlog_dump and I/Oslave threads,
  and it updates thd->proc_info, so we need a bool to not show the same
  proc_info for 2 different things (previously we showed "Slave: etc" and that's
  bad for a binlog_dump thread).
sql/slave.cc:
  Clearer thd-proc_info for slave threads.
sql/sql_class.h:
  prototype change
sql/sql_repl.cc:
  clearer thd->proc_info for binlog_dump thread
parent 89e95d0c
...@@ -1526,6 +1526,9 @@ bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length, ...@@ -1526,6 +1526,9 @@ bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length,
SYNOPSIS SYNOPSIS
wait_for_update() wait_for_update()
thd Thread variable thd Thread variable
master_or_slave If 0, the caller is the Binlog_dump thread from master;
if 1, the caller is the SQL thread from the slave. This
influences only thd->proc_info.
NOTES NOTES
One must have a lock on LOCK_log before calling this function. One must have a lock on LOCK_log before calling this function.
...@@ -1538,11 +1541,15 @@ bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length, ...@@ -1538,11 +1541,15 @@ bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length,
*/ */
void MYSQL_LOG:: wait_for_update(THD* thd) void MYSQL_LOG:: wait_for_update(THD* thd, bool master_or_slave)
{ {
safe_mutex_assert_owner(&LOCK_log); safe_mutex_assert_owner(&LOCK_log);
const char* old_msg = thd->enter_cond(&update_cond, &LOCK_log, const char* old_msg = thd->enter_cond(&update_cond, &LOCK_log,
"Slave: waiting for binlog update"); master_or_slave ?
"Has read all relay log; waiting for \
the I/O slave thread to update it" :
"Has sent all binlog to slave; \
waiting for binlog to be updated");
pthread_cond_wait(&update_cond, &LOCK_log); pthread_cond_wait(&update_cond, &LOCK_log);
pthread_mutex_unlock(&LOCK_log); // See NOTES pthread_mutex_unlock(&LOCK_log); // See NOTES
thd->exit_cond(old_msg); thd->exit_cond(old_msg);
......
...@@ -1443,7 +1443,8 @@ static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli) ...@@ -1443,7 +1443,8 @@ static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli)
pthread_mutex_lock(&rli->log_space_lock); pthread_mutex_lock(&rli->log_space_lock);
const char* save_proc_info= thd->enter_cond(&rli->log_space_cond, const char* save_proc_info= thd->enter_cond(&rli->log_space_cond,
&rli->log_space_lock, &rli->log_space_lock,
"Waiting for relay log space to free"); "Waiting for the SQL slave \
thread to free enough relay log space");
while (rli->log_space_limit < rli->log_space_total && while (rli->log_space_limit < rli->log_space_total &&
!(slave_killed=io_slave_killed(thd,mi)) && !(slave_killed=io_slave_killed(thd,mi)) &&
!rli->ignore_log_space_limit) !rli->ignore_log_space_limit)
...@@ -1925,7 +1926,8 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name, ...@@ -1925,7 +1926,8 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
DBUG_PRINT("info",("Waiting for master update")); DBUG_PRINT("info",("Waiting for master update"));
const char* msg = thd->enter_cond(&data_cond, &data_lock, const char* msg = thd->enter_cond(&data_cond, &data_lock,
"Waiting for master update"); "Waiting for the SQL slave thread to \
advance position");
/* /*
We are going to pthread_cond_(timed)wait(); if the SQL thread stops it We are going to pthread_cond_(timed)wait(); if the SQL thread stops it
will wake us up. will wake us up.
...@@ -2016,7 +2018,7 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type) ...@@ -2016,7 +2018,7 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
#endif #endif
if (thd_type == SLAVE_THD_SQL) if (thd_type == SLAVE_THD_SQL)
thd->proc_info= "Waiting for the next event in slave queue"; thd->proc_info= "Waiting for the next event in relay log";
else else
thd->proc_info= "Waiting for master update"; thd->proc_info= "Waiting for master update";
thd->version=refresh_version; thd->version=refresh_version;
...@@ -2338,7 +2340,7 @@ slave_begin: ...@@ -2338,7 +2340,7 @@ slave_begin:
} }
thd->proc_info = "connecting to master"; thd->proc_info = "Connecting to master";
// we can get killed during safe_connect // we can get killed during safe_connect
if (!safe_connect(thd, mysql, mi)) if (!safe_connect(thd, mysql, mi))
sql_print_error("Slave I/O thread: connected to master '%s@%s:%d',\ sql_print_error("Slave I/O thread: connected to master '%s@%s:%d',\
...@@ -2385,7 +2387,7 @@ dump"); ...@@ -2385,7 +2387,7 @@ dump");
goto err; goto err;
} }
thd->proc_info = "Waiiting to reconnect after a failed dump request"; thd->proc_info= "Waiting to reconnect after a failed binlog dump request";
mc_end_server(mysql); mc_end_server(mysql);
/* /*
First time retry immediately, assuming that we can recover First time retry immediately, assuming that we can recover
...@@ -2406,7 +2408,7 @@ dump"); ...@@ -2406,7 +2408,7 @@ dump");
goto err; goto err;
} }
thd->proc_info = "Reconnecting after a failed dump request"; thd->proc_info = "Reconnecting after a failed binlog dump request";
if (!suppress_warnings) if (!suppress_warnings)
sql_print_error("Slave I/O thread: failed dump request, \ sql_print_error("Slave I/O thread: failed dump request, \
reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME, reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME,
...@@ -2425,7 +2427,13 @@ after reconnect"); ...@@ -2425,7 +2427,13 @@ after reconnect");
while (!io_slave_killed(thd,mi)) while (!io_slave_killed(thd,mi))
{ {
bool suppress_warnings= 0; bool suppress_warnings= 0;
thd->proc_info = "Reading master update"; /*
We say "waiting" because read_event() will wait if there's nothing to
read. But if there's something to read, it will not wait. The important
thing is to not confuse users by saying "reading" whereas we're in fact
receiving nothing.
*/
thd->proc_info = "Waiting for master to send event";
ulong event_len = read_event(mysql, mi, &suppress_warnings); ulong event_len = read_event(mysql, mi, &suppress_warnings);
if (io_slave_killed(thd,mi)) if (io_slave_killed(thd,mi))
{ {
...@@ -2452,7 +2460,8 @@ max_allowed_packet", ...@@ -2452,7 +2460,8 @@ max_allowed_packet",
mc_mysql_error(mysql)); mc_mysql_error(mysql));
goto err; goto err;
} }
thd->proc_info = "Waiting to reconnect after a failed read"; thd->proc_info = "Waiting to reconnect after a failed master event \
read";
mc_end_server(mysql); mc_end_server(mysql);
if (retry_count++) if (retry_count++)
{ {
...@@ -2468,7 +2477,7 @@ max_allowed_packet", ...@@ -2468,7 +2477,7 @@ max_allowed_packet",
reconnect after a failed read"); reconnect after a failed read");
goto err; goto err;
} }
thd->proc_info = "Reconnecting after a failed read"; thd->proc_info = "Reconnecting after a failed master event read";
if (!suppress_warnings) if (!suppress_warnings)
sql_print_error("Slave I/O thread: Failed reading log event, \ sql_print_error("Slave I/O thread: Failed reading log event, \
reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME, reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME,
...@@ -2485,7 +2494,7 @@ reconnect done to recover from failed read"); ...@@ -2485,7 +2494,7 @@ reconnect done to recover from failed read");
} // if (event_len == packet_error) } // if (event_len == packet_error)
retry_count=0; // ok event, reset retry counter retry_count=0; // ok event, reset retry counter
thd->proc_info = "Queueing event from master"; thd->proc_info = "Queueing master event to the relay log";
if (queue_event(mi,(const char*)mysql->net.read_pos + 1, if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
event_len)) event_len))
{ {
...@@ -2667,7 +2676,7 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, ...@@ -2667,7 +2676,7 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
while (!sql_slave_killed(thd,rli)) while (!sql_slave_killed(thd,rli))
{ {
thd->proc_info = "Processing master log event"; thd->proc_info = "Reading event from the relay log";
DBUG_ASSERT(rli->sql_thd == thd); DBUG_ASSERT(rli->sql_thd == thd);
THD_CHECK_SENTRY(thd); THD_CHECK_SENTRY(thd);
if (exec_relay_log_event(thd,rli)) if (exec_relay_log_event(thd,rli))
...@@ -3396,7 +3405,7 @@ rli->relay_log_pos=%s rli->pending=%lu", ...@@ -3396,7 +3405,7 @@ rli->relay_log_pos=%s rli->pending=%lu",
pthread_mutex_unlock(&rli->log_space_lock); pthread_mutex_unlock(&rli->log_space_lock);
pthread_cond_broadcast(&rli->log_space_cond); pthread_cond_broadcast(&rli->log_space_cond);
// Note that wait_for_update unlocks lock_log ! // Note that wait_for_update unlocks lock_log !
rli->relay_log.wait_for_update(rli->sql_thd); rli->relay_log.wait_for_update(rli->sql_thd, 1);
// re-acquire data lock since we released it earlier // re-acquire data lock since we released it earlier
pthread_mutex_lock(&rli->data_lock); pthread_mutex_lock(&rli->data_lock);
continue; continue;
......
...@@ -123,7 +123,7 @@ public: ...@@ -123,7 +123,7 @@ public:
} }
void set_max_size(ulong max_size_arg); void set_max_size(ulong max_size_arg);
void signal_update() { pthread_cond_broadcast(&update_cond);} void signal_update() { pthread_cond_broadcast(&update_cond);}
void wait_for_update(THD* thd); void wait_for_update(THD* thd, bool master_or_slave);
void set_need_start_event() { need_start_event = 1; } void set_need_start_event() { need_start_event = 1; }
void init(enum_log_type log_type_arg, void init(enum_log_type log_type_arg,
enum cache_type io_cache_type_arg, enum cache_type io_cache_type_arg,
......
...@@ -532,7 +532,7 @@ Increase max_allowed_packet on master"; ...@@ -532,7 +532,7 @@ Increase max_allowed_packet on master";
if (!thd->killed) if (!thd->killed)
{ {
/* Note that the following call unlocks lock_log */ /* Note that the following call unlocks lock_log */
mysql_bin_log.wait_for_update(thd); mysql_bin_log.wait_for_update(thd, 0);
} }
else else
pthread_mutex_unlock(log_lock); pthread_mutex_unlock(log_lock);
...@@ -547,7 +547,7 @@ Increase max_allowed_packet on master"; ...@@ -547,7 +547,7 @@ Increase max_allowed_packet on master";
if (read_packet) if (read_packet)
{ {
thd->proc_info = "sending update to slave"; thd->proc_info = "Sending binlog event to slave";
if (my_net_write(net, (char*)packet->ptr(), packet->length()) ) if (my_net_write(net, (char*)packet->ptr(), packet->length()) )
{ {
errmsg = "Failed on my_net_write()"; errmsg = "Failed on my_net_write()";
...@@ -584,7 +584,7 @@ Increase max_allowed_packet on master"; ...@@ -584,7 +584,7 @@ Increase max_allowed_packet on master";
{ {
bool loop_breaker = 0; bool loop_breaker = 0;
// need this to break out of the for loop from switch // need this to break out of the for loop from switch
thd->proc_info = "switching to next log"; thd->proc_info = "Finished reading one binlog; switching to next binlog";
switch (mysql_bin_log.find_next_log(&linfo, 1)) { switch (mysql_bin_log.find_next_log(&linfo, 1)) {
case LOG_INFO_EOF: case LOG_INFO_EOF:
loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK); loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
...@@ -623,14 +623,14 @@ end: ...@@ -623,14 +623,14 @@ end:
(void)my_close(file, MYF(MY_WME)); (void)my_close(file, MYF(MY_WME));
send_eof(&thd->net); send_eof(&thd->net);
thd->proc_info = "waiting to finalize termination"; thd->proc_info = "Waiting to finalize termination";
pthread_mutex_lock(&LOCK_thread_count); pthread_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0; thd->current_linfo = 0;
pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
err: err:
thd->proc_info = "waiting to finalize termination"; thd->proc_info = "Waiting to finalize termination";
end_io_cache(&log); end_io_cache(&log);
/* /*
Exclude iteration through thread list Exclude iteration through thread list
...@@ -866,7 +866,7 @@ int change_master(THD* thd, MASTER_INFO* mi) ...@@ -866,7 +866,7 @@ int change_master(THD* thd, MASTER_INFO* mi)
DBUG_RETURN(1); DBUG_RETURN(1);
} }
thd->proc_info = "changing master"; thd->proc_info = "Changing master";
LEX_MASTER_INFO* lex_mi = &thd->lex.mi; LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
// TODO: see if needs re-write // TODO: see if needs re-write
if (init_master_info(mi, master_info_file, relay_log_info_file, 0)) if (init_master_info(mi, master_info_file, relay_log_info_file, 0))
...@@ -932,7 +932,7 @@ int change_master(THD* thd, MASTER_INFO* mi) ...@@ -932,7 +932,7 @@ int change_master(THD* thd, MASTER_INFO* mi)
if (need_relay_log_purge) if (need_relay_log_purge)
{ {
mi->rli.skip_log_purge= 0; mi->rli.skip_log_purge= 0;
thd->proc_info="purging old relay logs"; thd->proc_info="Purging old relay logs";
if (purge_relay_logs(&mi->rli, thd, if (purge_relay_logs(&mi->rli, thd,
0 /* not only reset, but also reinit */, 0 /* not only reset, but also reinit */,
&errmsg)) &errmsg))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment