Commit 3920f529 authored by mats@romeo.(none)'s avatar mats@romeo.(none)

BUG#23171 (Illegal slave restart position):

Third patch of the bug fix where the code for skipping events and for
executing events is factored out into three functions:
- shall_skip() to decide if the event shall be skipped and the
  reason for it;
- do_apply_event(), where the event is applied to the database; and
- do_update_pos(), which updates the actual relay log position and
  group positions.
parent 1fb9105e
......@@ -121,7 +121,7 @@ Replicate_Do_Table
Replicate_Ignore_Table
Replicate_Wild_Do_Table
Replicate_Wild_Ignore_Table
Last_Errno 1364
Last_Errno 1105
Last_Error Error in Write_rows event: error during transaction execution on table test.t1_nodef
Skip_Counter 0
Exec_Master_Log_Pos #
......
......@@ -121,7 +121,7 @@ Replicate_Do_Table
Replicate_Ignore_Table
Replicate_Wild_Do_Table
Replicate_Wild_Ignore_Table
Last_Errno 1364
Last_Errno 1105
Last_Error Error in Write_rows event: error during transaction execution on table test.t1_nodef
Skip_Counter 0
Exec_Master_Log_Pos #
......
......@@ -89,9 +89,10 @@ public:
operator&()
DESCRIPTION
Function to return a pointer to the internal, so that the object
can be treated as a IO_CACHE and used with the my_b_* IO_CACHE
functions
Function to return a pointer to the internal cache, so that the
object can be treated as a IO_CACHE and used with the my_b_*
IO_CACHE functions
RETURN VALUE
A pointer to the internal IO_CACHE.
......@@ -593,6 +594,19 @@ int Log_event::do_update_pos(RELAY_LOG_INFO *rli)
return 0; // Cannot fail currently
}
Log_event::enum_skip_reason
Log_event::shall_skip(RELAY_LOG_INFO *rli)
{
if (this->server_id == ::server_id && !replicate_same_server_id)
return EVENT_SKIP_SAME_SID;
else if (rli->slave_skip_counter > 0)
return EVENT_SKIP_COUNT;
else
return EVENT_NOT_SKIPPED;
}
/*
Log_event::pack_info()
*/
......@@ -736,7 +750,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
ulong data_len;
int result=0;
char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
DBUG_ENTER("read_log_event");
DBUG_ENTER("Log_event::read_log_event");
if (log_lock)
pthread_mutex_lock(log_lock);
......@@ -811,7 +825,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file,
const Format_description_log_event *description_event)
#endif
{
DBUG_ENTER("Log_event::read_log_event(IO_CACHE *, Format_description_log_event *");
DBUG_ENTER("Log_event::read_log_event");
DBUG_ASSERT(description_event != 0);
char head[LOG_EVENT_MINIMAL_HEADER_LEN];
/*
......@@ -2472,16 +2486,6 @@ bool Format_description_log_event::write(IO_CACHE* file)
}
#endif
/*
SYNOPSIS
Format_description_log_event::do_apply_event()
IMPLEMENTATION
Save the information which describes the binlog's format, to be
able to read all coming events. Call
Start_log_event_v3::do_apply_event().
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Format_description_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
......@@ -2561,6 +2565,12 @@ int Format_description_log_event::do_update_pos(RELAY_LOG_INFO *rli)
}
}
Log_event::enum_skip_reason
Format_description_log_event::shall_skip(RELAY_LOG_INFO *rli)
{
return Log_event::EVENT_NOT_SKIPPED;
}
#endif
/**************************************************************************
......@@ -3425,6 +3435,16 @@ bool Rotate_log_event::write(IO_CACHE* file)
}
#endif
/**
Helper function to detect if the event is inside a group.
*/
static bool is_in_group(THD *const thd, RELAY_LOG_INFO *const rli)
{
return (thd->options & OPTION_BEGIN) != 0 ||
(rli->last_event_start_time > 0);
}
/*
Rotate_log_event::do_apply_event()
......@@ -3446,30 +3466,40 @@ bool Rotate_log_event::write(IO_CACHE* file)
int Rotate_log_event::do_update_pos(RELAY_LOG_INFO *rli)
{
DBUG_ENTER("Rotate_log_event::do_update_pos");
#ifndef DBUG_OFF
char buf[32];
#endif
DBUG_PRINT("info", ("server_id=%lu; ::server_id=%lu", this->server_id, ::server_id));
DBUG_PRINT("info", ("new_log_ident: %s", this->new_log_ident));
DBUG_PRINT("info", ("pos: %s", llstr(this->pos, buf)));
pthread_mutex_lock(&rli->data_lock);
rli->event_relay_log_pos= my_b_tell(rli->cur_log);
/*
If we are in a transaction: the only normal case is when the I/O thread was
copying a big transaction, then it was stopped and restarted: we have this
in the relay log:
If we are in a transaction or in a group: the only normal case is
when the I/O thread was copying a big transaction, then it was
stopped and restarted: we have this in the relay log:
BEGIN
...
ROTATE (a fake one)
...
COMMIT or ROLLBACK
In that case, we don't want to touch the coordinates which correspond to
the beginning of the transaction.
Starting from 5.0.0, there also are some rotates from the slave itself, in
the relay log.
In that case, we don't want to touch the coordinates which
correspond to the beginning of the transaction. Starting from
5.0.0, there also are some rotates from the slave itself, in the
relay log.
*/
if (!(thd->options & OPTION_BEGIN))
if (!is_in_group(thd, rli))
{
memcpy(rli->group_master_log_name, new_log_ident, ident_len+1);
rli->notify_group_master_log_name_update();
rli->group_master_log_pos= pos;
rli->group_relay_log_pos= rli->event_relay_log_pos;
DBUG_PRINT("info", ("group_master_log_name: '%s' "
"group_master_log_pos: %lu",
DBUG_PRINT("info", ("new group_master_log_name: '%s' "
"new group_master_log_pos: %lu",
rli->group_master_log_name,
(ulong) rli->group_master_log_pos));
/*
......@@ -3492,6 +3522,24 @@ int Rotate_log_event::do_update_pos(RELAY_LOG_INFO *rli)
DBUG_RETURN(0);
}
Log_event::enum_skip_reason
Rotate_log_event::shall_skip(RELAY_LOG_INFO *rli)
{
enum_skip_reason reason= Log_event::shall_skip(rli);
switch (reason) {
case Log_event::EVENT_NOT_SKIPPED:
case Log_event::EVENT_SKIP_COUNT:
return Log_event::EVENT_NOT_SKIPPED;
case Log_event::EVENT_SKIP_SAME_SID:
return Log_event::EVENT_SKIP_SAME_SID;
}
DBUG_ASSERT(0);
}
#endif
......@@ -3620,6 +3668,26 @@ int Intvar_log_event::do_update_pos(RELAY_LOG_INFO *rli)
rli->inc_event_relay_log_pos();
return 0;
}
Log_event::enum_skip_reason
Intvar_log_event::shall_skip(RELAY_LOG_INFO *rli)
{
/*
It is a common error to set the slave skip counter to 1 instead
of 2 when recovering from an insert which used a auto increment,
rand, or user var. Therefore, if the slave skip counter is 1,
we just say that this event should be skipped because of the
slave skip count, but we do not change the value of the slave
skip counter since it will be decreased by the following insert
event.
*/
if (rli->slave_skip_counter == 1)
return Log_event::EVENT_SKIP_COUNT;
else
return Log_event::shall_skip(rli);
}
#endif
......@@ -3694,6 +3762,25 @@ int Rand_log_event::do_update_pos(RELAY_LOG_INFO *rli)
return 0;
}
Log_event::enum_skip_reason
Rand_log_event::shall_skip(RELAY_LOG_INFO *rli)
{
/*
It is a common error to set the slave skip counter to 1 instead
of 2 when recovering from an insert which used a auto increment,
rand, or user var. Therefore, if the slave skip counter is 1,
we just say that this event should be skipped because of the
slave skip count, but we do not change the value of the slave
skip counter since it will be decreased by the following insert
event.
*/
if (rli->slave_skip_counter == 1)
return Log_event::EVENT_SKIP_COUNT;
else
return Log_event::shall_skip(rli);
}
#endif /* !MYSQL_CLIENT */
......@@ -4116,6 +4203,23 @@ int User_var_log_event::do_update_pos(RELAY_LOG_INFO *rli)
return 0;
}
Log_event::enum_skip_reason
User_var_log_event::shall_skip(RELAY_LOG_INFO *rli)
{
/*
It is a common error to set the slave skip counter to 1 instead
of 2 when recovering from an insert which used a auto increment,
rand, or user var. Therefore, if the slave skip counter is 1,
we just say that this event should be skipped because of the
slave skip count, but we do not change the value of the slave
skip counter since it will be decreased by the following insert
event.
*/
if (rli->slave_skip_counter == 1)
return Log_event::EVENT_SKIP_COUNT;
else
return Log_event::shall_skip(rli);
}
#endif /* !MYSQL_CLIENT */
......@@ -5814,9 +5918,9 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
break;
default:
slave_print_msg(ERROR_LEVEL, rli, error,
slave_print_msg(ERROR_LEVEL, rli, thd->net.last_errno,
"Error in %s event: row application failed",
get_type_str());
get_type_str(), error);
thd->query_error= 1;
break;
}
......@@ -5835,11 +5939,12 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
if (error)
{ /* error has occured during the transaction */
slave_print_msg(ERROR_LEVEL, rli, error,
slave_print_msg(ERROR_LEVEL, rli, thd->net.last_errno,
"Error in %s event: error during transaction execution "
"on table %s.%s",
get_type_str(), table->s->db.str,
table->s->table_name.str);
/*
If one day we honour --skip-slave-errors in row-based replication, and
the error should be skipped, then we would clear mappings, rollback,
......@@ -5851,7 +5956,7 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
thread is certainly going to stop.
*/
thd->reset_current_stmt_binlog_row_based();
const_cast<RELAY_LOG_INFO*>(rli)->cleanup_context(thd, 1);
const_cast<RELAY_LOG_INFO*>(rli)->cleanup_context(thd, error);
thd->query_error= 1;
DBUG_RETURN(error);
}
......@@ -5934,9 +6039,9 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
wait (reached end of last relay log and nothing gets appended
there), we timeout after one minute, and notify DBA about the
problem. When WL#2975 is implemented, just remove the member
st_relay_log_info::unsafe_to_stop_at and all its occurences.
st_relay_log_info::last_event_start_time and all its occurences.
*/
const_cast<RELAY_LOG_INFO*>(rli)->unsafe_to_stop_at= time(0);
const_cast<RELAY_LOG_INFO*>(rli)->last_event_start_time= time(0);
}
DBUG_ASSERT(error == 0);
......@@ -6599,6 +6704,32 @@ copy_extra_record_fields(TABLE *table,
return 0; // All OK
}
/**
Check if an error is a duplicate key error.
This function is used to check if an error code is one of the
duplicate key error, i.e., and error code for which it is sensible
to do a <code>get_dup_key()</code> to retrieve the duplicate key.
@param errcode The error code to check.
@return <code>true</code> if the error code is such that
<code>get_dup_key()</code> will return true, <code>false</code>
otherwise.
*/
bool
is_duplicate_key_error(int errcode)
{
switch (errcode)
{
case HA_ERR_FOUND_DUPP_KEY:
case HA_ERR_FOUND_DUPP_UNIQUE:
return true;
}
return false;
}
/*
Replace the provided record in the database.
......@@ -6633,10 +6764,15 @@ replace_record(THD *thd, TABLE *table,
while ((error= table->file->ha_write_row(table->record[0])))
{
if (!is_duplicate_key_error(error))
{
table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}
if ((keynum= table->file->get_dup_key(error)) < 0)
{
/* We failed to retrieve the duplicate key */
DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
DBUG_RETURN(error);
}
/*
......@@ -6653,8 +6789,11 @@ replace_record(THD *thd, TABLE *table,
{
error= table->file->rnd_pos(table->record[1], table->file->dup_ref);
if (error)
{
table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}
}
else
{
if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
......@@ -6675,8 +6814,11 @@ replace_record(THD *thd, TABLE *table,
table->key_info[keynum].key_length,
HA_READ_KEY_EXACT);
if (error)
{
table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}
}
/*
Now, table->record[1] should contain the offending row. That
......@@ -6708,15 +6850,21 @@ replace_record(THD *thd, TABLE *table,
{
error=table->file->ha_update_row(table->record[1],
table->record[0]);
if (error)
table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}
else
{
if ((error= table->file->ha_delete_row(table->record[1])))
{
table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}
/* Will retry ha_write_row() with the offending row removed. */
}
}
DBUG_RETURN(error);
}
......
......@@ -36,7 +36,7 @@ st_relay_log_info::st_relay_log_info()
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0),
tables_to_lock(0), tables_to_lock_count(0),
unsafe_to_stop_at(0)
last_event_start_time(0)
{
DBUG_ENTER("st_relay_log_info::st_relay_log_info");
......@@ -1001,6 +1001,22 @@ bool st_relay_log_info::is_until_satisfied()
log_pos= group_relay_log_pos;
}
#ifndef DBUG_OFF
{
char buf[32];
DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
group_master_log_name, llstr(group_master_log_pos, buf)));
DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
group_relay_log_name, llstr(group_relay_log_pos, buf)));
DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
until_condition == UNTIL_MASTER_POS ? "master" : "relay",
log_name, llstr(log_pos, buf)));
DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
until_condition == UNTIL_MASTER_POS ? "master" : "relay",
until_log_name, llstr(until_log_pos, buf)));
}
#endif
if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
{
/*
......@@ -1095,7 +1111,7 @@ void st_relay_log_info::cleanup_context(THD *thd, bool error)
m_table_map.clear_tables();
close_thread_tables(thd);
clear_tables_to_lock();
unsafe_to_stop_at= 0;
last_event_start_time= 0;
DBUG_VOID_RETURN;
}
#endif
......@@ -305,7 +305,14 @@ typedef struct st_relay_log_info
DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
}
time_t unsafe_to_stop_at;
/*
Used by row-based replication to detect that it should not stop at
this event, but give it a chance to send more events. The time
where the last event inside a group started is stored here. If the
variable is zero, we are not in a group (but may be in a
transaction).
*/
time_t last_event_start_time;
} RELAY_LOG_INFO;
......
......@@ -517,11 +517,11 @@ static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
really one minute of idleness, we don't timeout if the slave SQL thread
is actively working.
*/
if (!rli->unsafe_to_stop_at)
if (rli->last_event_start_time == 0)
DBUG_RETURN(1);
DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving "
"it some grace period"));
if (difftime(time(0), rli->unsafe_to_stop_at) > 60)
if (difftime(time(0), rli->last_event_start_time) > 60)
{
slave_print_msg(ERROR_LEVEL, rli, 0,
"SQL thread had to stop in an unsafe situation, in "
......@@ -1737,61 +1737,14 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
now the relay log starts with its Format_desc, has a Rotate etc).
*/
DBUG_PRINT("info",("type_code=%d, server_id=%d",type_code,ev->server_id));
DBUG_PRINT("info",("type_code=%d (%s), server_id=%d",
type_code, ev->get_type_str(), ev->server_id));
if ((ev->server_id == (uint32) ::server_id &&
!replicate_same_server_id &&
type_code != FORMAT_DESCRIPTION_EVENT) ||
(rli->slave_skip_counter &&
type_code != ROTATE_EVENT && type_code != STOP_EVENT &&
type_code != START_EVENT_V3 && type_code!= FORMAT_DESCRIPTION_EVENT))
{
DBUG_PRINT("info", ("event skipped"));
/*
We only skip the event here and do not increase the group log
position. In the event that we have to restart, this means
that we might have to skip the event again, but that is a
minor issue.
If we were to increase the group log position when skipping an
event, it might be that we are restarting at the wrong
position and have events before that we should have executed,
so not increasing the group log position is a sure bet in this
case.
In this way, we just step the group log position when we
*know* that we are at the end of a group.
*/
rli->inc_event_relay_log_pos();
/*
Protect against common user error of setting the counter to 1
instead of 2 while recovering from an insert which used auto_increment,
rand or user var.
*/
if (rli->slave_skip_counter &&
!((type_code == INTVAR_EVENT ||
type_code == RAND_EVENT ||
type_code == USER_VAR_EVENT) &&
rli->slave_skip_counter == 1) &&
/*
The events from ourselves which have something to do with the relay
log itself must be skipped, true, but they mustn't decrement
rli->slave_skip_counter, because the user is supposed to not see
these events (they are not in the master's binlog) and if we
decremented, START SLAVE would for example decrement when it sees
the Rotate, so the event which the user probably wanted to skip
would not be skipped.
Execute the event, but first we set some data that is needed for
the thread.
*/
!(ev->server_id == (uint32) ::server_id &&
(type_code == ROTATE_EVENT || type_code == STOP_EVENT ||
type_code == START_EVENT_V3 || type_code == FORMAT_DESCRIPTION_EVENT)))
--rli->slave_skip_counter;
pthread_mutex_unlock(&rli->data_lock);
delete ev;
DBUG_RETURN(0); // avoid infinite update loops
}
pthread_mutex_unlock(&rli->data_lock);
thd->server_id = ev->server_id; // use the original server id for logging
thd->set_time(); // time the query
......@@ -1799,7 +1752,8 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
if (!ev->when)
ev->when = time(NULL);
ev->thd = thd; // because up to this point, ev->thd == 0
exec_res = ev->exec_event(rli);
exec_res= ev->exec_event(rli);
DBUG_PRINT("info", ("exec_event result = %d", exec_res));
DBUG_ASSERT(rli->sql_thd==thd);
/*
......@@ -2354,13 +2308,17 @@ Slave SQL thread aborted. Can't execute init_slave query");
THD_CHECK_SENTRY(thd);
if (exec_relay_log_event(thd,rli))
{
DBUG_PRINT("info", ("exec_relay_log_event() failed"));
// do not scare the user if SQL thread was simply killed or stopped
if (!sql_slave_killed(thd,rli))
{
/*
retrieve as much info as possible from the thd and, error codes and warnings
and print this to the error log as to allow the user to locate the error
retrieve as much info as possible from the thd and, error
codes and warnings and print this to the error log as to
allow the user to locate the error
*/
DBUG_PRINT("info", ("thd->net.last_errno=%d; rli->last_slave_errno=%d",
thd->net.last_errno, rli->last_slave_errno));
if (thd->net.last_errno != 0)
{
if (rli->last_slave_errno == 0)
......@@ -2682,6 +2640,7 @@ static int queue_binlog_ver_1_event(MASTER_INFO *mi, const char *buf,
my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
DBUG_RETURN(1);
}
pthread_mutex_lock(&mi->data_lock);
ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
switch (ev->get_type_code()) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment