Commit abe6eb10 authored by Sujatha's avatar Sujatha

MDEV-16146: MariaDB slave stops with following errors.

Problem:
========
180511 11:07:58 [ERROR] Slave I/O: Unexpected master's heartbeat data:
heartbeat is not compatible with local info;the event's data: log_file_name
mysql-bin.000009 log_pos 1054262041, Error_code: 1623

Analysis:
=========
In replication setup when master server doesn't have any events to send to
slave server it sends an 'Heartbeat_log_event'. This event carries the
current binary log filename and offset details. The offset values is stored
within 4 bytes of event header. When the size of binary log is higher than
UINT32_MAX the log_pos values will not fit in 4 bytes memory.  It overflows
and hence slave stops with an error.

Fix:
===
Since we cannot extend the common_header of Log_event class, a greater than
4GB value of Log_event::log_pos is made to be transported with a HeartBeat
event's sub-header.  Log_event::log_pos in such case is set to zero to
indicate that the 8 byte sub-header is allocated in the event.

In case of cross version replication following behaviour is expected

OLD - Server without fix
NEW - Server with fix

OLD<->NEW : works bidirectionally as long as the binlog offset is
            (normally) within 4GB.

When log_pos > UINT32_MAX
OLD->NEW  : The 'log_pos' is bound to overflow and NEW slave may report
            an invalid event/incompatible heart beat event error.
NEW->OLD  : Since patched server sets log_pos=0 on overflow, OLD slave will
            report invalid event error.
parent 13b9af50
include/master-slave.inc
[connection master]
connection master;
SET @saved_dbug = @@GLOBAL.debug_dbug;
SET @@global.debug_dbug= 'd,simulate_pos_4G';
connection slave;
include/stop_slave.inc
CHANGE MASTER TO MASTER_HEARTBEAT_PERIOD=0.001;
include/start_slave.inc
connection master;
SET @@GLOBAL.debug_dbug = @saved_dbug;
connection slave;
connection master;
CREATE TABLE t (f INT) ENGINE=INNODB;
INSERT INTO t VALUES (10);
DROP TABLE t;
include/rpl_end.inc
# ==== Purpose ====
#
# Test verifies that slave IO thread can process heartbeat events with log_pos
# values higher than UINT32_MAX.
#
# ==== Implementation ====
#
# Steps:
# 0 - Stop slave threads. Configure a small master_heartbeat_period.
# 1 - Using debug points, simulate a huge binlog offset higher than
# UINT32_MAX on master.
# 2 - Start the slave and observe that slave IO thread is able to process
# the offset received through heartbeat event.
#
# ==== References ====
#
# MDEV-16146: MariaDB slave stops with incompatible heartbeat
#
--source include/have_debug.inc
--source include/have_innodb.inc
--source include/have_binlog_format_mixed.inc
# Test simulates binarylog offsets higher than UINT32_MAX
--source include/have_64bit.inc
--source include/master-slave.inc
--connection master
SET @saved_dbug = @@GLOBAL.debug_dbug;
SET @@global.debug_dbug= 'd,simulate_pos_4G';
--connection slave
--source include/stop_slave.inc
CHANGE MASTER TO MASTER_HEARTBEAT_PERIOD=0.001;
--source include/start_slave.inc
--connection master
sleep 1;
SET @@GLOBAL.debug_dbug = @saved_dbug;
--sync_slave_with_master
--connection master
CREATE TABLE t (f INT) ENGINE=INNODB;
INSERT INTO t VALUES (10);
DROP TABLE t;
--source include/rpl_end.inc
...@@ -14324,14 +14324,23 @@ st_print_event_info::st_print_event_info() ...@@ -14324,14 +14324,23 @@ st_print_event_info::st_print_event_info()
#endif #endif
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint event_len, Heartbeat_log_event::Heartbeat_log_event(const char* buf, ulong event_len,
const Format_description_log_event* description_event) const Format_description_log_event* description_event)
:Log_event(buf, description_event) :Log_event(buf, description_event)
{ {
uint8 header_size= description_event->common_header_len; uint8 header_size= description_event->common_header_len;
ident_len = event_len - header_size; if (log_pos == 0)
set_if_smaller(ident_len,FN_REFLEN-1); {
log_pos= uint8korr(buf + header_size);
log_ident= buf + header_size + HB_SUB_HEADER_LEN;
ident_len= event_len - (header_size + HB_SUB_HEADER_LEN);
}
else
{
log_ident= buf + header_size; log_ident= buf + header_size;
ident_len = event_len - header_size;
}
} }
#endif #endif
......
...@@ -570,6 +570,14 @@ class String; ...@@ -570,6 +570,14 @@ class String;
#define MARIA_SLAVE_CAPABILITY_MINE MARIA_SLAVE_CAPABILITY_GTID #define MARIA_SLAVE_CAPABILITY_MINE MARIA_SLAVE_CAPABILITY_GTID
/*
When the size of 'log_pos' within Heartbeat_log_event exceeds UINT32_MAX it
cannot be accommodated in common_header, as 'log_pos' is of 4 bytes size. In
such cases, sub_header, of size 8 bytes will hold larger 'log_pos' value.
*/
#define HB_SUB_HEADER_LEN 8
/** /**
@enum Log_event_type @enum Log_event_type
...@@ -5160,12 +5168,13 @@ static inline bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache, ...@@ -5160,12 +5168,13 @@ static inline bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache,
class Heartbeat_log_event: public Log_event class Heartbeat_log_event: public Log_event
{ {
public: public:
Heartbeat_log_event(const char* buf, uint event_len, uint8 hb_flags;
Heartbeat_log_event(const char* buf, ulong event_len,
const Format_description_log_event* description_event); const Format_description_log_event* description_event);
Log_event_type get_type_code() { return HEARTBEAT_LOG_EVENT; } Log_event_type get_type_code() { return HEARTBEAT_LOG_EVENT; }
bool is_valid() const bool is_valid() const
{ {
return (log_ident != NULL && return (log_ident != NULL && ident_len <= FN_REFLEN-1 &&
log_pos >= BIN_LOG_HEADER_SIZE); log_pos >= BIN_LOG_HEADER_SIZE);
} }
const char * get_log_ident() { return log_ident; } const char * get_log_ident() { return log_ident; }
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include "debug_sync.h" #include "debug_sync.h"
#include "log.h" // get_gtid_list_event #include "log.h" // get_gtid_list_event
enum enum_gtid_until_state { enum enum_gtid_until_state {
GTID_UNTIL_NOT_DONE, GTID_UNTIL_NOT_DONE,
GTID_UNTIL_STOP_AFTER_STANDALONE, GTID_UNTIL_STOP_AFTER_STANDALONE,
...@@ -795,6 +796,8 @@ static int send_heartbeat_event(binlog_send_info *info, ...@@ -795,6 +796,8 @@ static int send_heartbeat_event(binlog_send_info *info,
DBUG_ENTER("send_heartbeat_event"); DBUG_ENTER("send_heartbeat_event");
ulong ev_offset; ulong ev_offset;
char sub_header_buf[HB_SUB_HEADER_LEN];
bool sub_header_in_use=false;
if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg)) if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -815,18 +818,38 @@ static int send_heartbeat_event(binlog_send_info *info, ...@@ -815,18 +818,38 @@ static int send_heartbeat_event(binlog_send_info *info,
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ulong event_len = ident_len + LOG_EVENT_HEADER_LEN +
(do_checksum ? BINLOG_CHECKSUM_LEN : 0); (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id); int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
DBUG_EXECUTE_IF("simulate_pos_4G",
{
const_cast<event_coordinates *>(coord)->pos= (UINT_MAX32 + (ulong)1);
DBUG_SET("-d, simulate_pos_4G");
};);
if (coord->pos <= UINT_MAX32)
{
int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos
}
else
{
// Set common_header.log_pos=0 to indicate its overflow
int4store(header + LOG_POS_OFFSET, 0);
sub_header_in_use= true;
int8store(sub_header_buf, coord->pos);
event_len+= HB_SUB_HEADER_LEN;
}
int4store(header + EVENT_LEN_OFFSET, event_len); int4store(header + EVENT_LEN_OFFSET, event_len);
int2store(header + FLAGS_OFFSET, 0); int2store(header + FLAGS_OFFSET, 0);
int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos
packet->append(header, sizeof(header)); packet->append(header, sizeof(header));
if (sub_header_in_use)
packet->append(sub_header_buf, sizeof(sub_header_buf));
packet->append(p, ident_len); // log_file_name packet->append(p, ident_len); // log_file_name
if (do_checksum) if (do_checksum)
{ {
char b[BINLOG_CHECKSUM_LEN]; char b[BINLOG_CHECKSUM_LEN];
ha_checksum crc= my_checksum(0, (uchar*) header, sizeof(header)); ha_checksum crc= my_checksum(0, (uchar*) header, sizeof(header));
if (sub_header_in_use)
crc= my_checksum(crc, (uchar*) sub_header_buf, sizeof(sub_header_buf));
crc= my_checksum(crc, (uchar*) p, ident_len); crc= my_checksum(crc, (uchar*) p, ident_len);
int4store(b, crc); int4store(b, crc);
packet->append(b, sizeof(b)); packet->append(b, sizeof(b));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment