Commit 4b00b3f0 authored by unknown's avatar unknown

BUG#23171 (Illegal slave restart position):

Third patch of the bug fix where the code for skipping events and for
executing events is factored out into three functions:
- shall_skip() to decide if the event shall be skipped and the
  reason for it;
- do_apply_event(), where the event is applied to the database; and
- do_update_pos(), which updates the actual relay log position and
  group positions.


mysql-test/r/rpl_row_tabledefs_2myisam.result:
  Result change.
mysql-test/r/rpl_row_tabledefs_3innodb.result:
  Result change.
sql/log_event.cc:
  Creating shall_skip(), do_update_pos(), and do_apply_event()
  functions for each event by factoring out the previous code.
  Adding debug code and fixing some error codes that were not correct.
sql/rpl_rli.cc:
  Renaming unsafe_to_stop_at into last_event_start_time.
  Adding debug code.
sql/rpl_rli.h:
  Renaming unsafe_to_stop_at into last_event_start_time.
sql/slave.cc:
  Renaming unsafe_to_stop_at into last_event_start_time.
parent baaa102d
...@@ -121,7 +121,7 @@ Replicate_Do_Table ...@@ -121,7 +121,7 @@ Replicate_Do_Table
Replicate_Ignore_Table Replicate_Ignore_Table
Replicate_Wild_Do_Table Replicate_Wild_Do_Table
Replicate_Wild_Ignore_Table Replicate_Wild_Ignore_Table
Last_Errno 1364 Last_Errno 1105
Last_Error Error in Write_rows event: error during transaction execution on table test.t1_nodef Last_Error Error in Write_rows event: error during transaction execution on table test.t1_nodef
Skip_Counter 0 Skip_Counter 0
Exec_Master_Log_Pos # Exec_Master_Log_Pos #
......
...@@ -121,7 +121,7 @@ Replicate_Do_Table ...@@ -121,7 +121,7 @@ Replicate_Do_Table
Replicate_Ignore_Table Replicate_Ignore_Table
Replicate_Wild_Do_Table Replicate_Wild_Do_Table
Replicate_Wild_Ignore_Table Replicate_Wild_Ignore_Table
Last_Errno 1364 Last_Errno 1105
Last_Error Error in Write_rows event: error during transaction execution on table test.t1_nodef Last_Error Error in Write_rows event: error during transaction execution on table test.t1_nodef
Skip_Counter 0 Skip_Counter 0
Exec_Master_Log_Pos # Exec_Master_Log_Pos #
......
This diff is collapsed.
...@@ -36,7 +36,7 @@ st_relay_log_info::st_relay_log_info() ...@@ -36,7 +36,7 @@ st_relay_log_info::st_relay_log_info()
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), until_log_pos(0), retried_trans(0),
tables_to_lock(0), tables_to_lock_count(0), tables_to_lock(0), tables_to_lock_count(0),
unsafe_to_stop_at(0) last_event_start_time(0)
{ {
DBUG_ENTER("st_relay_log_info::st_relay_log_info"); DBUG_ENTER("st_relay_log_info::st_relay_log_info");
...@@ -1001,6 +1001,22 @@ bool st_relay_log_info::is_until_satisfied() ...@@ -1001,6 +1001,22 @@ bool st_relay_log_info::is_until_satisfied()
log_pos= group_relay_log_pos; log_pos= group_relay_log_pos;
} }
#ifndef DBUG_OFF
{
char buf[32];
DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
group_master_log_name, llstr(group_master_log_pos, buf)));
DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
group_relay_log_name, llstr(group_relay_log_pos, buf)));
DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
until_condition == UNTIL_MASTER_POS ? "master" : "relay",
log_name, llstr(log_pos, buf)));
DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
until_condition == UNTIL_MASTER_POS ? "master" : "relay",
until_log_name, llstr(until_log_pos, buf)));
}
#endif
if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN) if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
{ {
/* /*
...@@ -1095,7 +1111,7 @@ void st_relay_log_info::cleanup_context(THD *thd, bool error) ...@@ -1095,7 +1111,7 @@ void st_relay_log_info::cleanup_context(THD *thd, bool error)
m_table_map.clear_tables(); m_table_map.clear_tables();
close_thread_tables(thd); close_thread_tables(thd);
clear_tables_to_lock(); clear_tables_to_lock();
unsafe_to_stop_at= 0; last_event_start_time= 0;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
#endif #endif
...@@ -305,7 +305,14 @@ typedef struct st_relay_log_info ...@@ -305,7 +305,14 @@ typedef struct st_relay_log_info
DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0); DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
} }
time_t unsafe_to_stop_at; /*
Used by row-based replication to detect that it should not stop at
this event, but give it a chance to send more events. The time
where the last event inside a group started is stored here. If the
variable is zero, we are not in a group (but may be in a
transaction).
*/
time_t last_event_start_time;
} RELAY_LOG_INFO; } RELAY_LOG_INFO;
......
...@@ -517,11 +517,11 @@ static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli) ...@@ -517,11 +517,11 @@ static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
really one minute of idleness, we don't timeout if the slave SQL thread really one minute of idleness, we don't timeout if the slave SQL thread
is actively working. is actively working.
*/ */
if (!rli->unsafe_to_stop_at) if (rli->last_event_start_time == 0)
DBUG_RETURN(1); DBUG_RETURN(1);
DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving " DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving "
"it some grace period")); "it some grace period"));
if (difftime(time(0), rli->unsafe_to_stop_at) > 60) if (difftime(time(0), rli->last_event_start_time) > 60)
{ {
slave_print_msg(ERROR_LEVEL, rli, 0, slave_print_msg(ERROR_LEVEL, rli, 0,
"SQL thread had to stop in an unsafe situation, in " "SQL thread had to stop in an unsafe situation, in "
...@@ -1737,61 +1737,14 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) ...@@ -1737,61 +1737,14 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
now the relay log starts with its Format_desc, has a Rotate etc). now the relay log starts with its Format_desc, has a Rotate etc).
*/ */
DBUG_PRINT("info",("type_code=%d, server_id=%d",type_code,ev->server_id)); DBUG_PRINT("info",("type_code=%d (%s), server_id=%d",
type_code, ev->get_type_str(), ev->server_id));
if ((ev->server_id == (uint32) ::server_id &&
!replicate_same_server_id &&
type_code != FORMAT_DESCRIPTION_EVENT) ||
(rli->slave_skip_counter &&
type_code != ROTATE_EVENT && type_code != STOP_EVENT &&
type_code != START_EVENT_V3 && type_code!= FORMAT_DESCRIPTION_EVENT))
{
DBUG_PRINT("info", ("event skipped"));
/*
We only skip the event here and do not increase the group log
position. In the event that we have to restart, this means
that we might have to skip the event again, but that is a
minor issue.
If we were to increase the group log position when skipping an
event, it might be that we are restarting at the wrong
position and have events before that we should have executed,
so not increasing the group log position is a sure bet in this
case.
In this way, we just step the group log position when we
*know* that we are at the end of a group.
*/
rli->inc_event_relay_log_pos();
/* /*
Protect against common user error of setting the counter to 1 Execute the event, but first we set some data that is needed for
instead of 2 while recovering from an insert which used auto_increment, the thread.
rand or user var. */
*/
if (rli->slave_skip_counter &&
!((type_code == INTVAR_EVENT ||
type_code == RAND_EVENT ||
type_code == USER_VAR_EVENT) &&
rli->slave_skip_counter == 1) &&
/*
The events from ourselves which have something to do with the relay
log itself must be skipped, true, but they mustn't decrement
rli->slave_skip_counter, because the user is supposed to not see
these events (they are not in the master's binlog) and if we
decremented, START SLAVE would for example decrement when it sees
the Rotate, so the event which the user probably wanted to skip
would not be skipped.
*/
!(ev->server_id == (uint32) ::server_id &&
(type_code == ROTATE_EVENT || type_code == STOP_EVENT ||
type_code == START_EVENT_V3 || type_code == FORMAT_DESCRIPTION_EVENT)))
--rli->slave_skip_counter;
pthread_mutex_unlock(&rli->data_lock);
delete ev;
DBUG_RETURN(0); // avoid infinite update loops
}
pthread_mutex_unlock(&rli->data_lock);
thd->server_id = ev->server_id; // use the original server id for logging thd->server_id = ev->server_id; // use the original server id for logging
thd->set_time(); // time the query thd->set_time(); // time the query
...@@ -1799,7 +1752,8 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) ...@@ -1799,7 +1752,8 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
if (!ev->when) if (!ev->when)
ev->when = time(NULL); ev->when = time(NULL);
ev->thd = thd; // because up to this point, ev->thd == 0 ev->thd = thd; // because up to this point, ev->thd == 0
exec_res = ev->exec_event(rli);
exec_res= ev->exec_event(rli);
DBUG_PRINT("info", ("exec_event result = %d", exec_res)); DBUG_PRINT("info", ("exec_event result = %d", exec_res));
DBUG_ASSERT(rli->sql_thd==thd); DBUG_ASSERT(rli->sql_thd==thd);
/* /*
...@@ -2354,13 +2308,17 @@ Slave SQL thread aborted. Can't execute init_slave query"); ...@@ -2354,13 +2308,17 @@ Slave SQL thread aborted. Can't execute init_slave query");
THD_CHECK_SENTRY(thd); THD_CHECK_SENTRY(thd);
if (exec_relay_log_event(thd,rli)) if (exec_relay_log_event(thd,rli))
{ {
DBUG_PRINT("info", ("exec_relay_log_event() failed"));
// do not scare the user if SQL thread was simply killed or stopped // do not scare the user if SQL thread was simply killed or stopped
if (!sql_slave_killed(thd,rli)) if (!sql_slave_killed(thd,rli))
{ {
/* /*
retrieve as much info as possible from the thd and, error codes and warnings retrieve as much info as possible from the thd and, error
and print this to the error log as to allow the user to locate the error codes and warnings and print this to the error log as to
allow the user to locate the error
*/ */
DBUG_PRINT("info", ("thd->net.last_errno=%d; rli->last_slave_errno=%d",
thd->net.last_errno, rli->last_slave_errno));
if (thd->net.last_errno != 0) if (thd->net.last_errno != 0)
{ {
if (rli->last_slave_errno == 0) if (rli->last_slave_errno == 0)
...@@ -2682,6 +2640,7 @@ static int queue_binlog_ver_1_event(MASTER_INFO *mi, const char *buf, ...@@ -2682,6 +2640,7 @@ static int queue_binlog_ver_1_event(MASTER_INFO *mi, const char *buf,
my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR)); my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
DBUG_RETURN(1); DBUG_RETURN(1);
} }
pthread_mutex_lock(&mi->data_lock); pthread_mutex_lock(&mi->data_lock);
ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */ ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
switch (ev->get_type_code()) { switch (ev->get_type_code()) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment