Commit 3dc3ab1a authored by Andrei Elkin's avatar Andrei Elkin Committed by Monty

Added a check that a group of row events ends with a proper end block

Problems
--------

The slave IO thread did not conduct an integrity check
for a group of row-based events. Specifically, it tolerated a missing
terminal block event, which must be flagged with STMT_END. Failure to
react to its loss can confuse the applier thread in various ways.
Another potential issue was that there was no check for an impossible
second-in-a-row Gtid_log_event while the slave IO thread is receiving
to-be-skipped events after a reconnect.

Fixes
-----
This patch makes the slave IO thread track the STMT_END status of
Rows events.
Whenever, on reading the next event, the IO thread finds that the preceding
Rows event did not actually have the flag, an
explicit error is issued.

Replication can be resumed after the source of failure is eliminated,
see a provided test.

Note that currently the row-based group integrity check excludes
the compressed version 2 Rows events (which are not generated by a MariaDB
master).
The uncompressed counterpart is tested manually.

The second issue is addressed by producing an error when the IO thread
receives a successive Gtid_log_event while it is still skipping events
after a reconnect.
parent 5fce14da
include/master-slave.inc
[connection master]
connection slave;
call mtr.add_suppression("Slave IO thread did not receive an expected Rows-log end-of-statement");
call mtr.add_suppression("Relay log write failure: could not queue event from master");
SET @save_debug= @@global.debug;
SET GLOBAL debug_dbug="+d,simulate_stmt_end_rows_event_loss";
include/stop_slave.inc
CHANGE MASTER TO master_host = '127.0.0.1', master_port = MASTER_PORT, MASTER_USE_GTID=SLAVE_POS;
connection master;
CREATE TABLE t (a INT, b text(8192));;
INSERT INTO t values (1, repeat('b', 8192)), (1, repeat('b', 8192));
connection slave;
START SLAVE IO_THREAD;
include/wait_for_slave_io_error.inc [errno=1595]
SET GLOBAL debug_dbug="-d,simulate_stmt_end_rows_event_loss";
include/start_slave.inc
connection master;
connection slave;
connection slave;
include/stop_slave.inc
connection master;
SET @save_log_bin_compress= @@GLOBAL.log_bin_compress;
SET @save_log_bin_compress_min_len= @@GLOBAL.log_bin_compress_min_len;
SET @@GLOBAL.log_bin_compress=ON;
SET @@GLOBAL.log_bin_compress_min_len=10;
INSERT INTO t values (2, repeat('b', 8192)), (2, repeat('b', 8192));
connection slave;
SET GLOBAL debug_dbug="+d,simulate_stmt_end_rows_event_loss";
START SLAVE IO_THREAD;
include/wait_for_slave_io_error.inc [errno=1595]
SET GLOBAL debug_dbug="-d,simulate_stmt_end_rows_event_loss";
include/start_slave.inc
connection master;
connection slave;
connection master;
SET @@GLOBAL.log_bin_compress= @save_log_bin_compress;
SET @@GLOBAL.log_bin_compress_min_len= @save_log_bin_compress_min_len;
DROP TABLE t;
connection slave;
SET GLOBAL debug_dbug= @save_debug;
include/rpl_end.inc
# Purpose: verify that the slave IO thread stops with an error when a group of
# Rows events is not terminated by a STMT_END-flagged event, and that
# replication resumes once the simulated fault is switched off.
# The loss is injected with the debug keyword
# simulate_stmt_end_rows_event_loss, hence the debug-build requirement.
--source include/have_debug.inc
--source include/have_binlog_format_row.inc
--source include/master-slave.inc
# Loss of the STMT_END-flagged event must error out the IO thread.
--connection slave
# Both messages are expected in the slave error log while the fault is active.
call mtr.add_suppression("Slave IO thread did not receive an expected Rows-log end-of-statement");
call mtr.add_suppression("Relay log write failure: could not queue event from master");
# NOTE(review): the value is saved from @@global.debug but set and restored
# through debug_dbug - presumably the two are aliases; confirm.
SET @save_debug= @@global.debug;
SET GLOBAL debug_dbug="+d,simulate_stmt_end_rows_event_loss";
--source include/stop_slave.inc
# Switch the slave to GTID mode; the port is masked in the result file.
--replace_result $MASTER_MYPORT MASTER_PORT
--eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT, MASTER_USE_GTID=SLAVE_POS
--connection master
# Two rows of this size are inserted per statement below.
--let $max_row_size=8192
--eval CREATE TABLE t (a INT, b text($max_row_size));
--eval INSERT INTO t values (1, repeat('b', $max_row_size)), (1, repeat('b', $max_row_size))
# Prove that a missing STMT_END-marked Rows event makes the IO thread stop.
--connection slave
START SLAVE IO_THREAD;
# 1595 is the IO error expected here (presumably
# ER_SLAVE_RELAY_LOG_WRITE_FAILURE - confirm against the error list).
--let $slave_io_errno=1595
--source include/wait_for_slave_io_error.inc
# With the fault removed, replication must resume and catch up.
SET GLOBAL debug_dbug="-d,simulate_stmt_end_rows_event_loss";
--source include/start_slave.inc
--connection master
sync_slave_with_master;
# Compressed version of the above: the same scenario with binlog compression
# enabled on the master.
--connection slave
--source include/stop_slave.inc
--connection master
SET @save_log_bin_compress= @@GLOBAL.log_bin_compress;
SET @save_log_bin_compress_min_len= @@GLOBAL.log_bin_compress_min_len;
SET @@GLOBAL.log_bin_compress=ON;
SET @@GLOBAL.log_bin_compress_min_len=10;
--eval INSERT INTO t values (2, repeat('b', $max_row_size)), (2, repeat('b', $max_row_size))
# Prove that a missing STMT_END-marked Rows event makes the IO thread stop.
--connection slave
SET GLOBAL debug_dbug="+d,simulate_stmt_end_rows_event_loss";
START SLAVE IO_THREAD;
--let $slave_io_errno=1595
--source include/wait_for_slave_io_error.inc
# With the fault removed, replication must resume and catch up.
SET GLOBAL debug_dbug="-d,simulate_stmt_end_rows_event_loss";
--source include/start_slave.inc
--connection master
sync_slave_with_master;
# Cleanup: restore the master's compression settings and drop the table.
--connection master
SET @@GLOBAL.log_bin_compress= @save_log_bin_compress;
SET @@GLOBAL.log_bin_compress_min_len= @save_log_bin_compress_min_len;
DROP TABLE t;
# sync_slave_with_master leaves the session on the slave connection, so the
# debug setting restored next is the slave's.
sync_slave_with_master;
SET GLOBAL debug_dbug= @save_debug;
--source include/rpl_end.inc
...@@ -133,6 +133,19 @@ class Domain_id_filter ...@@ -133,6 +133,19 @@ class Domain_id_filter
extern TYPELIB slave_parallel_mode_typelib; extern TYPELIB slave_parallel_mode_typelib;
/*
  Bookkeeping for a run of Rows- log events: where the run started, where
  its latest event was seen, and whether that latest event carried the
  end-of-statement flag.
*/
typedef struct st_rows_event_tracker
{
/* File name handed to update() when the first event of the run was recorded. */
char binlog_file_name[FN_REFLEN];
/* Position of the first recorded event; 0 while nothing is recorded. */
my_off_t first_seen;
/* Position of the most recently recorded event; 0 while nothing is recorded. */
my_off_t last_seen;
/* Whether the most recently recorded event had the STMT_END flag set. */
bool stmt_end_seen;
/* Record one more Rows- event, found in buf, read at file_name/pos. */
void update(const char* file_name, size_t pos,
const char* buf,
const Format_description_log_event *fdle);
/* Forget everything recorded so far. */
void reset();
/*
  Return true, after logging an error, when events were recorded but the
  last one lacked the end-of-statement flag; file_name/pos are only used
  in the error message. Otherwise return false.
*/
bool check_and_report(const char* file_name, size_t pos);
} Rows_event_tracker;
/***************************************************************************** /*****************************************************************************
Replication IO Thread Replication IO Thread
...@@ -301,6 +314,14 @@ class Master_info : public Slave_reporting_capability ...@@ -301,6 +314,14 @@ class Master_info : public Slave_reporting_capability
uint64 gtid_reconnect_event_skip_count; uint64 gtid_reconnect_event_skip_count;
/* gtid_event_seen is false until we receive first GTID event from master. */ /* gtid_event_seen is false until we receive first GTID event from master. */
bool gtid_event_seen; bool gtid_event_seen;
/**
The struct holds some history of Rows- log-event reading/queuing
by the receiver thread. Its fields are updated for each such event
at the time of queue_event(), and they are checked to detect
a Rows- event group integrity violation at the time the first non-Rows-
event gets handled.
*/
Rows_event_tracker rows_event_tracker;
bool in_start_all_slaves, in_stop_all_slaves; bool in_start_all_slaves, in_stop_all_slaves;
bool in_flush_all_relay_logs; bool in_flush_all_relay_logs;
uint users; /* Active user for object */ uint users; /* Active user for object */
......
...@@ -3349,7 +3349,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, ...@@ -3349,7 +3349,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings,
we suppress prints to .err file as long as the reconnect we suppress prints to .err file as long as the reconnect
happens without problems happens without problems
*/ */
*suppress_warnings= TRUE; *suppress_warnings=
global_system_variables.log_warnings < 2 ? TRUE : FALSE;
} }
else else
{ {
...@@ -4274,6 +4275,7 @@ pthread_handler_t handle_slave_io(void *arg) ...@@ -4274,6 +4275,7 @@ pthread_handler_t handle_slave_io(void *arg)
mi->abort_slave = 0; mi->abort_slave = 0;
mysql_mutex_unlock(&mi->run_lock); mysql_mutex_unlock(&mi->run_lock);
mysql_cond_broadcast(&mi->start_cond); mysql_cond_broadcast(&mi->start_cond);
mi->rows_event_tracker.reset();
DBUG_PRINT("master_info",("log_file_name: '%s' position: %llu", DBUG_PRINT("master_info",("log_file_name: '%s' position: %llu",
mi->master_log_name, mi->master_log_pos)); mi->master_log_name, mi->master_log_pos));
...@@ -4356,6 +4358,10 @@ pthread_handler_t handle_slave_io(void *arg) ...@@ -4356,6 +4358,10 @@ pthread_handler_t handle_slave_io(void *arg)
*/ */
mi->gtid_reconnect_event_skip_count= mi->events_queued_since_last_gtid; mi->gtid_reconnect_event_skip_count= mi->events_queued_since_last_gtid;
mi->gtid_event_seen= false; mi->gtid_event_seen= false;
/*
Reset stale state of the rows-event group tracker at reconnect.
*/
mi->rows_event_tracker.reset();
} }
#ifdef ENABLED_DEBUG_SYNC #ifdef ENABLED_DEBUG_SYNC
...@@ -5752,7 +5758,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -5752,7 +5758,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
char* new_buf = NULL; char* new_buf = NULL;
char new_buf_arr[4096]; char new_buf_arr[4096];
bool is_malloc = false; bool is_malloc = false;
bool is_rows_event= false;
/* /*
FD_q must have been prepared for the first R_a event FD_q must have been prepared for the first R_a event
inside get_master_version_and_clock() inside get_master_version_and_clock()
...@@ -6186,11 +6192,11 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -6186,11 +6192,11 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
got_gtid_event= true; got_gtid_event= true;
if (mi->using_gtid == Master_info::USE_GTID_NO) if (mi->using_gtid == Master_info::USE_GTID_NO)
goto default_action; goto default_action;
if (unlikely(!mi->gtid_event_seen)) if (unlikely(mi->gtid_reconnect_event_skip_count))
{ {
mi->gtid_event_seen= true; if (likely(!mi->gtid_event_seen))
if (mi->gtid_reconnect_event_skip_count)
{ {
mi->gtid_event_seen= true;
/* /*
If we are reconnecting, and we need to skip a partial event group If we are reconnecting, and we need to skip a partial event group
already queued to the relay log before the reconnect, then we check already queued to the relay log before the reconnect, then we check
...@@ -6219,13 +6225,45 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -6219,13 +6225,45 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
rpl_slave_state_tostring_helper(&error_msg, &event_gtid, &first); rpl_slave_state_tostring_helper(&error_msg, &event_gtid, &first);
goto err; goto err;
} }
if (global_system_variables.log_warnings > 1)
{
bool first= true;
StringBuffer<1024> gtid_text;
rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid,
&first);
sql_print_information("Slave IO thread is reconnected to "
"receive Gtid_log_event %s. It is to skip %llu "
"already received events including the gtid one",
gtid_text.ptr(),
mi->events_queued_since_last_gtid);
} }
goto default_action;
} }
else
if (unlikely(mi->gtid_reconnect_event_skip_count))
{ {
goto default_action; bool first;
StringBuffer<1024> gtid_text;
gtid_text.append(STRING_WITH_LEN("Last received gtid: "));
first= true;
rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid,
&first);
gtid_text.append(STRING_WITH_LEN(", currently received: "));
first= true;
rpl_slave_state_tostring_helper(&gtid_text, &event_gtid, &first);
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
sql_print_error("Slave IO thread has received a new Gtid_log_event "
"while skipping already logged events "
"after reconnect. %s. %llu remains to be skipped. "
"The number of originally read events was %llu",
gtid_text.ptr(),
mi->gtid_reconnect_event_skip_count,
mi->events_queued_since_last_gtid);
goto err;
}
} }
mi->gtid_event_seen= true;
/* /*
We have successfully queued to relay log everything before this GTID, so We have successfully queued to relay log everything before this GTID, so
...@@ -6292,8 +6330,34 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -6292,8 +6330,34 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
goto err; goto err;
} }
} }
buf = new_buf;
is_compress_event = true; is_compress_event = true;
buf = new_buf;
/*
As we are uncertain about compressed V2 rows events, we don't track
them
*/
if (LOG_EVENT_IS_ROW_V2((Log_event_type) buf[EVENT_TYPE_OFFSET]))
goto default_action;
/* fall through */
case WRITE_ROWS_EVENT_V1:
case UPDATE_ROWS_EVENT_V1:
case DELETE_ROWS_EVENT_V1:
case WRITE_ROWS_EVENT:
case UPDATE_ROWS_EVENT:
case DELETE_ROWS_EVENT:
{
is_rows_event= true;
mi->rows_event_tracker.update(mi->master_log_name,
mi->master_log_pos,
buf,
mi->rli.relay_log.
description_event_for_queue);
DBUG_EXECUTE_IF("simulate_stmt_end_rows_event_loss",
{
mi->rows_event_tracker.stmt_end_seen= false;
});
}
goto default_action; goto default_action;
#ifndef DBUG_OFF #ifndef DBUG_OFF
...@@ -6351,6 +6415,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -6351,6 +6415,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
break; break;
} }
/*
Integrity of Rows- event group check.
A sequence of Rows- events must end with STMT_END_F flagged one.
Even when Heartbeat event interrupts Rows- events flow this must indicate a
malfunction e.g logging on the master.
*/
if (((uchar) buf[EVENT_TYPE_OFFSET] != HEARTBEAT_LOG_EVENT) &&
!is_rows_event &&
mi->rows_event_tracker.check_and_report(mi->master_log_name,
mi->master_log_pos))
{
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
goto err;
}
/* /*
If we filter events master-side (eg. @@skip_replication), we will see holes If we filter events master-side (eg. @@skip_replication), we will see holes
in the event positions from the master. If we see such a hole, adjust in the event positions from the master. If we see such a hole, adjust
...@@ -6519,6 +6598,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -6519,6 +6598,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
The whole of the current event group is queued. So in case of The whole of the current event group is queued. So in case of
reconnect we can start from after the current GTID. reconnect we can start from after the current GTID.
*/ */
if (mi->gtid_reconnect_event_skip_count)
{
bool first= true;
StringBuffer<1024> gtid_text;
rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid,
&first);
sql_print_error("Slave IO thread received a terminal event from "
"group %s whose retrieval was interrupted "
"with reconnect. We still had %llu events to read. "
"The number of originally read events was %llu",
gtid_text.ptr(),
mi->gtid_reconnect_event_skip_count,
mi->events_queued_since_last_gtid);
}
mi->gtid_current_pos.update(&mi->last_queued_gtid); mi->gtid_current_pos.update(&mi->last_queued_gtid);
mi->events_queued_since_last_gtid= 0; mi->events_queued_since_last_gtid= 0;
...@@ -7518,6 +7612,92 @@ bool rpl_master_erroneous_autoinc(THD *thd) ...@@ -7518,6 +7612,92 @@ bool rpl_master_erroneous_autoinc(THD *thd)
return FALSE; return FALSE;
} }
/*
  Tell whether a Rows- log event carries the end-of-statement flag.

  @param buf   start of the raw event, beginning with its common header
  @param fdle  format description providing the common header length and
               the per-event-type post header lengths

  @return true when STMT_END_F is set in the event's 2-byte flags field,
          false otherwise.
*/
static bool get_row_event_stmt_end(const char* buf,
                                   const Format_description_log_event *fdle)
{
  uint8 const common_header_len= fdle->common_header_len;
  Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET];
  uint8 const post_header_len= fdle->post_header_len[event_type-1];
  const char *flag_start= buf + common_header_len;
  /*
    The term 4 below signifies that master is of 'an intermediate source', see
    Rows_log_event::Rows_log_event.

    The conditional has to be parenthesized: '+' binds tighter than '?:',
    so without the parentheses RW_MAPID_OFFSET would become part of the
    condition instead of being added to the flags offset.
  */
  flag_start += RW_MAPID_OFFSET +
                ((post_header_len == 6) ? 4 : RW_FLAGS_OFFSET);
  return (uint2korr(flag_start) & Rows_log_event::STMT_END_F) != 0;
}
/*
  Bring the tracker back to its empty state: no file name, no positions
  remembered and no end-of-statement seen.
*/
void Rows_event_tracker::reset()
{
  stmt_end_seen= false;
  last_seen= 0;
  first_seen= 0;
  binlog_file_name[0]= '\0';
}
/*
  Record one more Rows- event in the tracker.

  The file name and position are memorized as the start of the run only
  when nothing has been recorded yet (first_seen is zero). The position of
  the latest event and its end-of-statement status are refreshed on every
  call.
*/
void Rows_event_tracker::update(const char* file_name, size_t pos,
                                const char* buf,
                                const Format_description_log_event *fdle)
{
  const bool starts_new_run= (first_seen == 0);
  if (starts_new_run)
  {
    strmake(binlog_file_name, file_name, sizeof(binlog_file_name) - 1);
    first_seen= pos;
  }
  last_seen= pos;
  DBUG_ASSERT(!stmt_end_seen);            // We can only have one
  stmt_end_seen= get_row_event_stmt_end(buf, fdle);
}
/**
  The function is called at next event reading
  after a sequence of Rows- log-events. It checks the end-of-statement status
  of the past sequence to report on any issue.
  In the positive case the tracker gets reset.

  @param file_name  log file name of the event currently being read; used
                    only in the error message
  @param pos        position of the event currently being read; used only
                    in the error message

  @return true  when the Rows- event group integrity found compromised,
          false otherwise.
*/
bool Rows_event_tracker::check_and_report(const char* file_name,
                                          size_t pos)
{
  // Nothing recorded since the last reset: nothing to verify.
  if (!last_seen)
    return false;

  // There was at least one "block" event previously and it was terminal.
  if (stmt_end_seen)
  {
    reset();
    return false;
  }

  /*
    The positions are passed through varargs to %llu, so they are cast
    explicitly: neither size_t nor my_off_t is guaranteed to be
    unsigned long long on every platform.
  */
  sql_print_error("Slave IO thread did not receive an expected "
                  "Rows-log end-of-statement for event starting "
                  "at log '%s' position %llu "
                  "whose last block was seen at log '%s' position %llu. "
                  "The end-of-statement should have been delivered "
                  "before the current one at log '%s' position %llu",
                  binlog_file_name,
                  static_cast<unsigned long long>(first_seen),
                  binlog_file_name,
                  static_cast<unsigned long long>(last_seen),
                  file_name,
                  static_cast<unsigned long long>(pos));
  return true;
}
/** /**
@} (end of group Replication) @} (end of group Replication)
*/ */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment