Commit 4d8b346e, authored by Jonas Oreland, committed by Kristian Nielsen

MDEV-7257: Dump Thread Enhancements

Make the binlog dump threads not need to take LOCK_log while sending
binlog events to slave. Instead, a new LOCK_binlog_end_pos is used
just to coordinate tracking the current end-of-log.

This is a pre-requisite for MDEV-162, "Enhanced semisync
replication". It should also help reduce the contention on LOCK_log on
a busy master.

Also does some much-needed refactoring/cleanup of the related code in
the binlog dump thread.
parent ea01fff5
...@@ -71,7 +71,7 @@ insert into t1 values (1) /* will not be applied on slave due to simulation */; ...@@ -71,7 +71,7 @@ insert into t1 values (1) /* will not be applied on slave due to simulation */;
set @@global.debug_dbug='d,simulate_slave_unaware_checksum'; set @@global.debug_dbug='d,simulate_slave_unaware_checksum';
start slave; start slave;
include/wait_for_slave_io_error.inc [errno=1236] include/wait_for_slave_io_error.inc [errno=1236]
Last_IO_Error = 'Got fatal error 1236 from master when reading data from binary log: 'Slave can not handle replication events with the checksum that master is configured to log; the first event 'master-bin.000009' at 367, the last event read from 'master-bin.000010' at 248, the last byte read from 'master-bin.000010' at 248.'' Last_IO_Error = 'Got fatal error 1236 from master when reading data from binary log: 'Slave can not handle replication events with the checksum that master is configured to log; the first event 'master-bin.000009' at 367, the last event read from 'master-bin.000010' at 4, the last byte read from 'master-bin.000010' at 248.''
select count(*) as zero from t1; select count(*) as zero from t1;
zero zero
0 0
......
...@@ -3133,6 +3133,7 @@ void MYSQL_BIN_LOG::cleanup() ...@@ -3133,6 +3133,7 @@ void MYSQL_BIN_LOG::cleanup()
mysql_mutex_destroy(&LOCK_index); mysql_mutex_destroy(&LOCK_index);
mysql_mutex_destroy(&LOCK_xid_list); mysql_mutex_destroy(&LOCK_xid_list);
mysql_mutex_destroy(&LOCK_binlog_background_thread); mysql_mutex_destroy(&LOCK_binlog_background_thread);
mysql_mutex_destroy(&LOCK_binlog_end_pos);
mysql_cond_destroy(&update_cond); mysql_cond_destroy(&update_cond);
mysql_cond_destroy(&COND_queue_busy); mysql_cond_destroy(&COND_queue_busy);
mysql_cond_destroy(&COND_xid_list); mysql_cond_destroy(&COND_xid_list);
...@@ -3178,6 +3179,9 @@ void MYSQL_BIN_LOG::init_pthread_objects() ...@@ -3178,6 +3179,9 @@ void MYSQL_BIN_LOG::init_pthread_objects()
&COND_binlog_background_thread, 0); &COND_binlog_background_thread, 0);
mysql_cond_init(key_BINLOG_COND_binlog_background_thread_end, mysql_cond_init(key_BINLOG_COND_binlog_background_thread_end,
&COND_binlog_background_thread_end, 0); &COND_binlog_background_thread_end, 0);
mysql_mutex_init(m_key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos,
MY_MUTEX_INIT_SLOW);
} }
...@@ -3524,10 +3528,19 @@ bool MYSQL_BIN_LOG::open(const char *log_name, ...@@ -3524,10 +3528,19 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
if (flush_io_cache(&log_file) || if (flush_io_cache(&log_file) ||
mysql_file_sync(log_file.file, MYF(MY_WME|MY_SYNC_FILESIZE))) mysql_file_sync(log_file.file, MYF(MY_WME|MY_SYNC_FILESIZE)))
goto err; goto err;
my_off_t offset= my_b_tell(&log_file);
if (!is_relay_log)
{
/* update binlog_end_pos so that it can be read by after sync hook */
reset_binlog_end_pos(log_file_name, offset);
mysql_mutex_lock(&LOCK_commit_ordered); mysql_mutex_lock(&LOCK_commit_ordered);
strmake_buf(last_commit_pos_file, log_file_name); strmake_buf(last_commit_pos_file, log_file_name);
last_commit_pos_offset= my_b_tell(&log_file); last_commit_pos_offset= offset;
mysql_mutex_unlock(&LOCK_commit_ordered); mysql_mutex_unlock(&LOCK_commit_ordered);
}
if (write_file_name_to_index_file) if (write_file_name_to_index_file)
{ {
...@@ -3632,6 +3645,7 @@ int MYSQL_BIN_LOG::get_current_log(LOG_INFO* linfo) ...@@ -3632,6 +3645,7 @@ int MYSQL_BIN_LOG::get_current_log(LOG_INFO* linfo)
int MYSQL_BIN_LOG::raw_get_current_log(LOG_INFO* linfo) int MYSQL_BIN_LOG::raw_get_current_log(LOG_INFO* linfo)
{ {
mysql_mutex_assert_owner(&LOCK_log);
strmake_buf(linfo->log_file_name, log_file_name); strmake_buf(linfo->log_file_name, log_file_name);
linfo->pos = my_b_tell(&log_file); linfo->pos = my_b_tell(&log_file);
return 0; return 0;
...@@ -4797,6 +4811,20 @@ void MYSQL_BIN_LOG::make_log_name(char* buf, const char* log_ident) ...@@ -4797,6 +4811,20 @@ void MYSQL_BIN_LOG::make_log_name(char* buf, const char* log_ident)
bool MYSQL_BIN_LOG::is_active(const char *log_file_name_arg) bool MYSQL_BIN_LOG::is_active(const char *log_file_name_arg)
{ {
/**
* There should (must) be a mysql_mutex_assert_owner(&LOCK_log) here...
* but existing code violates this! (scary monsters and super creeps!)
*
* example stacktrace:
* #8 MYSQL_BIN_LOG::is_active
* #9 MYSQL_BIN_LOG::can_purge_log
* #10 MYSQL_BIN_LOG::purge_logs
* #11 MYSQL_BIN_LOG::purge_first_log
* #12 next_event
* #13 exec_relay_log_event
*
* I didn't investigate whether this is legit (i.e. whether my comment is wrong).
*/
return !strcmp(log_file_name, log_file_name_arg); return !strcmp(log_file_name, log_file_name_arg);
} }
...@@ -5359,6 +5387,7 @@ binlog_start_consistent_snapshot(handlerton *hton, THD *thd) ...@@ -5359,6 +5387,7 @@ binlog_start_consistent_snapshot(handlerton *hton, THD *thd)
binlog_cache_mngr *const cache_mngr= thd->binlog_setup_trx_data(); binlog_cache_mngr *const cache_mngr= thd->binlog_setup_trx_data();
/* Server layer calls us with LOCK_commit_ordered locked, so this is safe. */ /* Server layer calls us with LOCK_commit_ordered locked, so this is safe. */
mysql_mutex_assert_owner(&LOCK_commit_ordered);
strmake_buf(cache_mngr->last_commit_pos_file, mysql_bin_log.last_commit_pos_file); strmake_buf(cache_mngr->last_commit_pos_file, mysql_bin_log.last_commit_pos_file);
cache_mngr->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset; cache_mngr->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset;
...@@ -6013,6 +6042,14 @@ err: ...@@ -6013,6 +6042,14 @@ err:
} }
else else
{ {
/* update binlog_end_pos so it can be read by the dump thread
*
* note: must be _after_ the RUN_HOOK(after_flush), or else the
* semi-sync plugin might not have put the transaction into
* its list before the dump thread tries to send it
*/
update_binlog_end_pos(offset);
signal_update(); signal_update();
if ((error= rotate(false, &check_purge))) if ((error= rotate(false, &check_purge)))
check_purge= false; check_purge= false;
...@@ -6664,6 +6701,9 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) ...@@ -6664,6 +6701,9 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd)
} }
offset= my_b_tell(&log_file); offset= my_b_tell(&log_file);
update_binlog_end_pos(offset);
/* /*
Take mutex to protect against a reader seeing partial writes of 64-bit Take mutex to protect against a reader seeing partial writes of 64-bit
offset on 32-bit CPUs. offset on 32-bit CPUs.
...@@ -6709,6 +6749,9 @@ MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name, ...@@ -6709,6 +6749,9 @@ MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name,
} }
offset= my_b_tell(&log_file); offset= my_b_tell(&log_file);
update_binlog_end_pos(offset);
/* /*
Take mutex to protect against a reader seeing partial writes of 64-bit Take mutex to protect against a reader seeing partial writes of 64-bit
offset on 32-bit CPUs. offset on 32-bit CPUs.
...@@ -7335,7 +7378,8 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) ...@@ -7335,7 +7378,8 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
{ {
if (!current->error && if (!current->error &&
RUN_HOOK(binlog_storage, after_flush, RUN_HOOK(binlog_storage, after_flush,
(current->thd, log_file_name, (current->thd,
current->cache_mngr->last_commit_pos_file,
current->cache_mngr->last_commit_pos_offset, synced))) current->cache_mngr->last_commit_pos_offset, synced)))
{ {
current->error= ER_ERROR_ON_WRITE; current->error= ER_ERROR_ON_WRITE;
...@@ -7347,6 +7391,14 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) ...@@ -7347,6 +7391,14 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
all_error= false; all_error= false;
} }
/* update binlog_end_pos so it can be read by the dump thread
*
* note: must be _after_ the RUN_HOOK(after_flush), or else the
* semi-sync plugin might not have put the transaction into
* its list before the dump thread tries to send it
*/
update_binlog_end_pos(commit_offset);
if (any_error) if (any_error)
sql_print_error("Failed to run 'after_flush' hooks"); sql_print_error("Failed to run 'after_flush' hooks");
if (!all_error) if (!all_error)
...@@ -7387,6 +7439,10 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) ...@@ -7387,6 +7439,10 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered"); DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered");
mysql_mutex_lock(&LOCK_commit_ordered); mysql_mutex_lock(&LOCK_commit_ordered);
/**
* TODO(jonaso): Check with Kristian:
* if we rotated above, this offset is "wrong"
*/
last_commit_pos_offset= commit_offset; last_commit_pos_offset= commit_offset;
/* /*
We cannot unlock LOCK_log until we have locked LOCK_commit_ordered; We cannot unlock LOCK_log until we have locked LOCK_commit_ordered;
...@@ -7625,6 +7681,7 @@ void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd) ...@@ -7625,6 +7681,7 @@ void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd)
PSI_stage_info old_stage; PSI_stage_info old_stage;
DBUG_ENTER("wait_for_update_relay_log"); DBUG_ENTER("wait_for_update_relay_log");
mysql_mutex_assert_owner(&LOCK_log);
thd->ENTER_COND(&update_cond, &LOCK_log, thd->ENTER_COND(&update_cond, &LOCK_log,
&stage_slave_has_read_all_relay_log, &stage_slave_has_read_all_relay_log,
&old_stage); &old_stage);
...@@ -7655,6 +7712,7 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd, ...@@ -7655,6 +7712,7 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd,
int ret= 0; int ret= 0;
DBUG_ENTER("wait_for_update_bin_log"); DBUG_ENTER("wait_for_update_bin_log");
mysql_mutex_assert_owner(&LOCK_log);
if (!timeout) if (!timeout)
mysql_cond_wait(&update_cond, &LOCK_log); mysql_cond_wait(&update_cond, &LOCK_log);
else else
...@@ -7663,6 +7721,21 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd, ...@@ -7663,6 +7721,21 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd,
DBUG_RETURN(ret); DBUG_RETURN(ret);
} }
/**
  Wait on update_cond using LOCK_binlog_end_pos as the associated mutex.

  The caller must already hold LOCK_binlog_end_pos (asserted below).

  @param thd      Not used by this function.
  @param timeout  When NULL, wait without a time limit; otherwise passed
                  to mysql_cond_timedwait() as the wait deadline.

  @return 0 after an untimed wait, otherwise the value returned by
          mysql_cond_timedwait().

  NOTE(review): wait_for_update_bin_log() waits on the same update_cond
  member with LOCK_log as the mutex. POSIX leaves concurrent waits on one
  condition variable with different mutexes undefined -- confirm the two
  wait functions are never used on the same object at the same time.
*/
int MYSQL_BIN_LOG::wait_for_update_binlog_end_pos(THD* thd,
                                                  struct timespec *timeout)
{
  int ret= 0;
  mysql_mutex_t *end_pos_lock= get_binlog_end_pos_lock();
  DBUG_ENTER("wait_for_update_binlog_end_pos");

  mysql_mutex_assert_owner(end_pos_lock);
  if (timeout)
    ret= mysql_cond_timedwait(&update_cond, end_pos_lock, timeout);
  else
    mysql_cond_wait(&update_cond, end_pos_lock);
  DBUG_RETURN(ret);
}
/** /**
Close the log file. Close the log file.
...@@ -9703,6 +9776,14 @@ TC_LOG_BINLOG::set_status_variables(THD *thd) ...@@ -9703,6 +9776,14 @@ TC_LOG_BINLOG::set_status_variables(THD *thd)
} }
} }
/**
  Assert the calling thread's ownership state of the binary log's LOCK_log.

  Intended as in-code documentation of which mutexes are held at a given
  point.

  @param owner  true: assert that the caller owns mysql_bin_log's log lock;
                false: assert that the caller does not own it.
*/
void assert_LOCK_log_owner(bool owner)
{
  if (!owner)
  {
    mysql_mutex_assert_not_owner(mysql_bin_log.get_log_lock());
    return;
  }
  mysql_mutex_assert_owner(mysql_bin_log.get_log_lock());
}
struct st_mysql_storage_engine binlog_storage_engine= struct st_mysql_storage_engine binlog_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION }; { MYSQL_HANDLERTON_INTERFACE_VERSION };
......
...@@ -341,6 +341,8 @@ public: ...@@ -341,6 +341,8 @@ public:
/** Instrumentation key to use for file io in @c log_file */ /** Instrumentation key to use for file io in @c log_file */
PSI_file_key m_log_file_key; PSI_file_key m_log_file_key;
#endif #endif
/* for documentation of mutexes held in various places in code */
friend void assert_LOCK_log_owner(bool owner);
}; };
class MYSQL_QUERY_LOG: public MYSQL_LOG class MYSQL_QUERY_LOG: public MYSQL_LOG
...@@ -425,6 +427,9 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG ...@@ -425,6 +427,9 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
PSI_file_key m_key_file_log_index; PSI_file_key m_key_file_log_index;
PSI_file_key m_key_COND_queue_busy; PSI_file_key m_key_COND_queue_busy;
/** The instrumentation key to use for @c LOCK_binlog_end_pos. */
PSI_mutex_key m_key_LOCK_binlog_end_pos;
#endif #endif
struct group_commit_entry struct group_commit_entry
...@@ -477,6 +482,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG ...@@ -477,6 +482,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
/* LOCK_log and LOCK_index are inited by init_pthread_objects() */ /* LOCK_log and LOCK_index are inited by init_pthread_objects() */
mysql_mutex_t LOCK_index; mysql_mutex_t LOCK_index;
mysql_mutex_t LOCK_binlog_end_pos;
mysql_mutex_t LOCK_xid_list; mysql_mutex_t LOCK_xid_list;
mysql_cond_t COND_xid_list; mysql_cond_t COND_xid_list;
mysql_cond_t update_cond; mysql_cond_t update_cond;
...@@ -811,6 +817,67 @@ public: ...@@ -811,6 +817,67 @@ public:
int bump_seq_no_counter_if_needed(uint32 domain_id, uint64 seq_no); int bump_seq_no_counter_if_needed(uint32 domain_id, uint64 seq_no);
bool check_strict_gtid_sequence(uint32 domain_id, uint32 server_id, bool check_strict_gtid_sequence(uint32 domain_id, uint32 server_id,
uint64 seq_no); uint64 seq_no);
/**
* Publish a new end-of-binlog offset and call signal_update().
*
* The caller must hold LOCK_log and must not hold LOCK_binlog_end_pos
* (both asserted); LOCK_binlog_end_pos is taken internally for the update.
* Only the offset is changed; binlog_end_pos_file is left untouched.
*
* @param pos  New end offset; asserted to be >= the current binlog_end_pos.
*/
void update_binlog_end_pos(my_off_t pos)
{
mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_assert_not_owner(&LOCK_binlog_end_pos);
lock_binlog_end_pos();
/**
* note: it would make more sense to assert(pos > binlog_end_pos),
* but there are two places triggered by mtr that have pos == binlog_end_pos.
* I didn't investigate, but accepted it as it should do no harm.
*/
DBUG_ASSERT(pos >= binlog_end_pos);
binlog_end_pos= pos;
/* signalled while LOCK_binlog_end_pos is still held */
signal_update();
unlock_binlog_end_pos();
}
/**
* Used when opening a new file, where binlog_end_pos moves backwards.
*
* Replaces both the published end-of-binlog file name and offset, then
* calls signal_update(). The caller must hold LOCK_log and must not hold
* LOCK_binlog_end_pos (both asserted); LOCK_binlog_end_pos is taken
* internally for the update.
*
* @param file_name  Name of the new binlog file; copied into
*                   binlog_end_pos_file, truncated to fit if necessary.
* @param pos        End offset within that file.
*/
void reset_binlog_end_pos(const char file_name[FN_REFLEN], my_off_t pos)
{
  mysql_mutex_assert_owner(&LOCK_log);
  mysql_mutex_assert_not_owner(&LOCK_binlog_end_pos);
  lock_binlog_end_pos();
  binlog_end_pos= pos;
  /*
    The array bound in the parameter declaration is not enforced by the
    language (file_name is just a pointer), so bound the copy by the
    destination size and always terminate it instead of trusting the caller.
  */
  strncpy(binlog_end_pos_file, file_name, sizeof(binlog_end_pos_file) - 1);
  binlog_end_pos_file[sizeof(binlog_end_pos_file) - 1]= '\0';
  signal_update();
  unlock_binlog_end_pos();
}
/*
  Fetch the published end-of-binlog position.

  Called by threads (e.g. the dump thread) that want to read the log
  without LOCK_log protection. The caller must hold LOCK_binlog_end_pos
  and must not hold LOCK_log (both asserted).

  file_name_buf receives a copy of binlog_end_pos_file; the return value
  is the matching end offset.
*/
my_off_t get_binlog_end_pos(char file_name_buf[FN_REFLEN]) const
{
  mysql_mutex_assert_not_owner(&LOCK_log);
  mysql_mutex_assert_owner(&LOCK_binlog_end_pos);

  const my_off_t end_offset= binlog_end_pos;
  strcpy(file_name_buf, binlog_end_pos_file);
  return end_offset;
}
/* Acquire LOCK_binlog_end_pos. */
void lock_binlog_end_pos() { mysql_mutex_lock(&LOCK_binlog_end_pos); }
/* Release LOCK_binlog_end_pos. */
void unlock_binlog_end_pos() { mysql_mutex_unlock(&LOCK_binlog_end_pos); }
/* Return a pointer to LOCK_binlog_end_pos (used as the mutex for condition waits). */
mysql_mutex_t* get_binlog_end_pos_lock() { return &LOCK_binlog_end_pos; }
/*
  Wait on update_cond with LOCK_binlog_end_pos, which the caller must hold.
  A NULL timeout means an untimed wait; otherwise the wait is timed and the
  timed-wait result is returned.
*/
int wait_for_update_binlog_end_pos(THD* thd, struct timespec * timeout);
/*
Binlog position of the end of the binlog.
Access to this is protected by LOCK_binlog_end_pos.
The difference between this and last_commit_pos_{file,offset} is that
the commit position is updated later. If the semi-sync wait point is set
to WAIT_AFTER_SYNC, the commit pos is updated after the semi-sync ack has
been received, whereas the end position is updated right after the write,
as it is needed for the dump threads to be able to semi-sync the event.
*/
my_off_t binlog_end_pos;
char binlog_end_pos_file[FN_REFLEN];
}; };
class Log_event_handler class Log_event_handler
...@@ -1088,4 +1155,6 @@ static inline TC_LOG *get_tc_log_implementation() ...@@ -1088,4 +1155,6 @@ static inline TC_LOG *get_tc_log_implementation()
return &tc_log_mmap; return &tc_log_mmap;
} }
void assert_LOCK_log_owner(bool owner);
#endif /* LOG_H */ #endif /* LOG_H */
...@@ -5167,9 +5167,18 @@ a file name for --log-bin-index option", opt_binlog_index_name); ...@@ -5167,9 +5167,18 @@ a file name for --log-bin-index option", opt_binlog_index_name);
unireg_abort(1); unireg_abort(1);
} }
if (opt_bin_log && mysql_bin_log.open(opt_bin_logname, LOG_BIN, 0, if (opt_bin_log)
{
/**
* mutex lock is not needed here.
* but to be able to have mysql_mutex_assert_owner() in code,
* we do it anyway */
mysql_mutex_lock(mysql_bin_log.get_log_lock());
if (mysql_bin_log.open(opt_bin_logname, LOG_BIN, 0,
WRITE_CACHE, max_binlog_size, 0, TRUE)) WRITE_CACHE, max_binlog_size, 0, TRUE))
unireg_abort(1); unireg_abort(1);
mysql_mutex_unlock(mysql_bin_log.get_log_lock());
}
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
if (opt_bin_log && expire_logs_days) if (opt_bin_log && expire_logs_days)
......
...@@ -125,8 +125,9 @@ struct binlog_send_info { ...@@ -125,8 +125,9 @@ struct binlog_send_info {
THD *thd; THD *thd;
NET *net; NET *net;
String *packet; String *packet;
char *log_file_name; char *const log_file_name; // ptr/alias to linfo.log_file_name
slave_connection_state *until_gtid_state; slave_connection_state *until_gtid_state;
slave_connection_state until_gtid_state_obj;
Format_description_log_event *fdev; Format_description_log_event *fdev;
int mariadb_slave_capability; int mariadb_slave_capability;
enum_gtid_skip_type gtid_skip_group; enum_gtid_skip_type gtid_skip_group;
...@@ -138,16 +139,57 @@ struct binlog_send_info { ...@@ -138,16 +139,57 @@ struct binlog_send_info {
bool slave_gtid_ignore_duplicates; bool slave_gtid_ignore_duplicates;
bool using_gtid_state; bool using_gtid_state;
binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, char *lfn) int error;
const char *errmsg;
char error_text[MAX_SLAVE_ERRMSG];
rpl_gtid error_gtid;
ulonglong heartbeat_period;
/** start file/pos as requested by slave, for error message */
char start_log_file_name[FN_REFLEN];
my_off_t start_pos;
/** last pos for error message */
my_off_t last_pos;
#ifndef DBUG_OFF
int left_events;
uint dbug_reconnect_counter;
ulong hb_info_counter;
#endif
bool clear_initial_log_pos;
bool should_stop;
binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg,
char *lfn)
: thd(thd_arg), net(&thd_arg->net), packet(packet_arg), : thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
log_file_name(lfn), until_gtid_state(NULL), fdev(NULL), log_file_name(lfn), until_gtid_state(NULL), fdev(NULL),
gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE), gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE),
flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
slave_gtid_strict_mode(false), send_fake_gtid_list(false), slave_gtid_strict_mode(false), send_fake_gtid_list(false),
slave_gtid_ignore_duplicates(false) slave_gtid_ignore_duplicates(false),
{ } error(0),
errmsg("Unknown error"),
heartbeat_period(0),
#ifndef DBUG_OFF
left_events(max_binlog_dump_events),
dbug_reconnect_counter(0),
hb_info_counter(0),
#endif
clear_initial_log_pos(false),
should_stop(false)
{
error_text[0] = 0;
bzero(&error_gtid, sizeof(error_gtid));
}
}; };
// prototype
static int reset_transmit_packet(struct binlog_send_info *info, ushort flags,
ulong *ev_offset, const char **errmsg);
/* /*
fake_rotate_event() builds a fake (=which does not exist physically in any fake_rotate_event() builds a fake (=which does not exist physically in any
binlog) Rotate event, which contains the name of the binlog we are going to binlog) Rotate event, which contains the name of the binlog we are going to
...@@ -170,6 +212,7 @@ static int fake_rotate_event(binlog_send_info *info, ulonglong position, ...@@ -170,6 +212,7 @@ static int fake_rotate_event(binlog_send_info *info, ulonglong position,
const char** errmsg, uint8 checksum_alg_arg) const char** errmsg, uint8 checksum_alg_arg)
{ {
DBUG_ENTER("fake_rotate_event"); DBUG_ENTER("fake_rotate_event");
ulong ev_offset;
char buf[ROTATE_HEADER_LEN+100]; char buf[ROTATE_HEADER_LEN+100];
my_bool do_checksum; my_bool do_checksum;
int err; int err;
...@@ -178,10 +221,18 @@ static int fake_rotate_event(binlog_send_info *info, ulonglong position, ...@@ -178,10 +221,18 @@ static int fake_rotate_event(binlog_send_info *info, ulonglong position,
String *packet= info->packet; String *packet= info->packet;
ha_checksum crc; ha_checksum crc;
/* reset transmit packet for the fake rotate event below */
if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
DBUG_RETURN(1);
if ((err= fake_event_header(packet, ROTATE_EVENT, if ((err= fake_event_header(packet, ROTATE_EVENT,
ident_len + ROTATE_HEADER_LEN, &do_checksum, &crc, ident_len + ROTATE_HEADER_LEN, &do_checksum,
&crc,
errmsg, checksum_alg_arg, 0))) errmsg, checksum_alg_arg, 0)))
{
info->error= ER_UNKNOWN_ERROR;
DBUG_RETURN(err); DBUG_RETURN(err);
}
int8store(buf+R_POS_OFFSET,position); int8store(buf+R_POS_OFFSET,position);
packet->append(buf, ROTATE_HEADER_LEN); packet->append(buf, ROTATE_HEADER_LEN);
...@@ -195,8 +246,10 @@ static int fake_rotate_event(binlog_send_info *info, ulonglong position, ...@@ -195,8 +246,10 @@ static int fake_rotate_event(binlog_send_info *info, ulonglong position,
if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) || if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
(err= fake_event_write(info->net, packet, errmsg))) (err= fake_event_write(info->net, packet, errmsg)))
{
info->error= ER_UNKNOWN_ERROR;
DBUG_RETURN(err); DBUG_RETURN(err);
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -215,13 +268,17 @@ static int fake_gtid_list_event(binlog_send_info *info, ...@@ -215,13 +268,17 @@ static int fake_gtid_list_event(binlog_send_info *info,
str.length(0); str.length(0);
if (glev->to_packet(&str)) if (glev->to_packet(&str))
{ {
info->error= ER_UNKNOWN_ERROR;
*errmsg= "Failed due to out-of-memory writing Gtid_list event"; *errmsg= "Failed due to out-of-memory writing Gtid_list event";
return -1; return -1;
} }
if ((err= fake_event_header(packet, GTID_LIST_EVENT, if ((err= fake_event_header(packet, GTID_LIST_EVENT,
str.length(), &do_checksum, &crc, str.length(), &do_checksum, &crc,
errmsg, info->current_checksum_alg, current_pos))) errmsg, info->current_checksum_alg, current_pos)))
{
info->error= ER_UNKNOWN_ERROR;
return err; return err;
}
packet->append(str); packet->append(str);
if (do_checksum) if (do_checksum)
...@@ -231,7 +288,10 @@ static int fake_gtid_list_event(binlog_send_info *info, ...@@ -231,7 +288,10 @@ static int fake_gtid_list_event(binlog_send_info *info,
if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) || if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
(err= fake_event_write(info->net, packet, errmsg))) (err= fake_event_write(info->net, packet, errmsg)))
{
info->error= ER_UNKNOWN_ERROR;
return err; return err;
}
return 0; return 0;
} }
...@@ -243,20 +303,20 @@ static int fake_gtid_list_event(binlog_send_info *info, ...@@ -243,20 +303,20 @@ static int fake_gtid_list_event(binlog_send_info *info,
This function allocates header bytes for event transmission, and This function allocates header bytes for event transmission, and
should be called before store the event data to the packet buffer. should be called before store the event data to the packet buffer.
*/ */
static int reset_transmit_packet(THD *thd, ushort flags, static int reset_transmit_packet(binlog_send_info *info, ushort flags,
ulong *ev_offset, const char **errmsg) ulong *ev_offset, const char **errmsg)
{ {
int ret= 0; int ret= 0;
String *packet= &thd->packet; String *packet= &info->thd->packet;
/* reserve and set default header */ /* reserve and set default header */
packet->length(0); packet->length(0);
packet->set("\0", 1, &my_charset_bin); packet->set("\0", 1, &my_charset_bin);
if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet))) if (RUN_HOOK(binlog_transmit, reserve_header, (info->thd, flags, packet)))
{ {
info->error= ER_UNKNOWN_ERROR;
*errmsg= "Failed to run hook 'reserve_header'"; *errmsg= "Failed to run hook 'reserve_header'";
my_errno= ER_UNKNOWN_ERROR;
ret= 1; ret= 1;
} }
*ev_offset= packet->length(); *ev_offset= packet->length();
...@@ -556,36 +616,38 @@ bool purge_master_logs_before_date(THD* thd, time_t purge_time) ...@@ -556,36 +616,38 @@ bool purge_master_logs_before_date(THD* thd, time_t purge_time)
mysql_bin_log.purge_logs_before_date(purge_time)); mysql_bin_log.purge_logs_before_date(purge_time));
} }
int test_for_non_eof_log_read_errors(int error, const char **errmsg) void set_read_error(binlog_send_info *info, int error)
{ {
if (error == LOG_READ_EOF) if (error == LOG_READ_EOF)
return 0; {
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; return;
}
info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
switch (error) { switch (error) {
case LOG_READ_BOGUS: case LOG_READ_BOGUS:
*errmsg = "bogus data in log event"; info->errmsg= "bogus data in log event";
break; break;
case LOG_READ_TOO_LARGE: case LOG_READ_TOO_LARGE:
*errmsg = "log event entry exceeded max_allowed_packet; \ info->errmsg= "log event entry exceeded max_allowed_packet; "
Increase max_allowed_packet on master"; "Increase max_allowed_packet on master";
break; break;
case LOG_READ_IO: case LOG_READ_IO:
*errmsg = "I/O error reading log event"; info->errmsg= "I/O error reading log event";
break; break;
case LOG_READ_MEM: case LOG_READ_MEM:
*errmsg = "memory allocation failed reading log event"; info->errmsg= "memory allocation failed reading log event";
break; break;
case LOG_READ_TRUNC: case LOG_READ_TRUNC:
*errmsg = "binlog truncated in the middle of event; consider out of disk space on master"; info->errmsg= "binlog truncated in the middle of event; "
"consider out of disk space on master";
break; break;
case LOG_READ_CHECKSUM_FAILURE: case LOG_READ_CHECKSUM_FAILURE:
*errmsg = "event read from binlog did not pass crc check"; info->errmsg= "event read from binlog did not pass crc check";
break; break;
default: default:
*errmsg = "unknown error reading log event on the master"; info->errmsg= "unknown error reading log event on the master";
break; break;
} }
return error;
} }
...@@ -710,11 +772,17 @@ get_slave_until_gtid(THD *thd, String *out_str) ...@@ -710,11 +772,17 @@ get_slave_until_gtid(THD *thd, String *out_str)
The error to send is serious and should force terminating The error to send is serious and should force terminating
the dump thread. the dump thread.
*/ */
static int send_heartbeat_event(NET* net, String* packet, static int send_heartbeat_event(binlog_send_info *info,
NET* net, String* packet,
const struct event_coordinates *coord, const struct event_coordinates *coord,
uint8 checksum_alg_arg) uint8 checksum_alg_arg)
{ {
DBUG_ENTER("send_heartbeat_event"); DBUG_ENTER("send_heartbeat_event");
ulong ev_offset;
if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
DBUG_RETURN(1);
char header[LOG_EVENT_HEADER_LEN]; char header[LOG_EVENT_HEADER_LEN];
my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF && my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF; checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
...@@ -753,8 +821,10 @@ static int send_heartbeat_event(NET* net, String* packet, ...@@ -753,8 +821,10 @@ static int send_heartbeat_event(NET* net, String* packet,
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) || if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
net_flush(net)) net_flush(net))
{ {
info->error= ER_UNKNOWN_ERROR;
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1552,7 +1622,7 @@ is_until_reached(binlog_send_info *info, ulong *ev_offset, ...@@ -1552,7 +1622,7 @@ is_until_reached(binlog_send_info *info, ulong *ev_offset,
Send a last fake Gtid_list_log_event with a flag set to mark that we Send a last fake Gtid_list_log_event with a flag set to mark that we
stop due to UNTIL condition. stop due to UNTIL condition.
*/ */
if (reset_transmit_packet(info->thd, info->flags, ev_offset, errmsg)) if (reset_transmit_packet(info, info->flags, ev_offset, errmsg))
return true; return true;
Gtid_list_log_event glev(&info->until_binlog_state, Gtid_list_log_event glev(&info->until_binlog_state,
Gtid_list_log_event::FLAG_UNTIL_REACHED); Gtid_list_log_event::FLAG_UNTIL_REACHED);
...@@ -1593,14 +1663,14 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, ...@@ -1593,14 +1663,14 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
current_checksum_alg, current_checksum_alg,
&gtid_list, &list_len, info->fdev)) &gtid_list, &list_len, info->fdev))
{ {
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to read Gtid_list_log_event: corrupt binlog"; return "Failed to read Gtid_list_log_event: corrupt binlog";
} }
err= info->until_binlog_state.load(gtid_list, list_len); err= info->until_binlog_state.load(gtid_list, list_len);
my_free(gtid_list); my_free(gtid_list);
if (err) if (err)
{ {
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed in internal GTID book-keeping: Out of memory"; return "Failed in internal GTID book-keeping: Out of memory";
} }
} }
...@@ -1622,7 +1692,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, ...@@ -1622,7 +1692,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
&event_gtid.domain_id, &event_gtid.server_id, &event_gtid.domain_id, &event_gtid.server_id,
&event_gtid.seq_no, &flags2, info->fdev)) &event_gtid.seq_no, &flags2, info->fdev))
{ {
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to read Gtid_log_event: corrupt binlog"; return "Failed to read Gtid_log_event: corrupt binlog";
} }
...@@ -1634,14 +1704,14 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, ...@@ -1634,14 +1704,14 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
{ {
DBUG_SET("-d,gtid_force_reconnect_at_10_1_100"); DBUG_SET("-d,gtid_force_reconnect_at_10_1_100");
DBUG_SET_INITIAL("-d,gtid_force_reconnect_at_10_1_100"); DBUG_SET_INITIAL("-d,gtid_force_reconnect_at_10_1_100");
my_errno= ER_UNKNOWN_ERROR; info->error= ER_UNKNOWN_ERROR;
return "DBUG-injected forced reconnect"; return "DBUG-injected forced reconnect";
} }
}); });
if (info->until_binlog_state.update_nolock(&event_gtid, false)) if (info->until_binlog_state.update_nolock(&event_gtid, false))
{ {
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed in internal GTID book-keeping: Out of memory"; return "Failed in internal GTID book-keeping: Out of memory";
} }
...@@ -1663,7 +1733,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, ...@@ -1663,7 +1733,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
const char *errormsg; const char *errormsg;
*error_gtid= *gtid; *error_gtid= *gtid;
give_error_start_pos_missing_in_binlog(&err, &errormsg, error_gtid); give_error_start_pos_missing_in_binlog(&err, &errormsg, error_gtid);
my_errno= err; info->error= err;
return errormsg; return errormsg;
} }
gtid_entry->flags&= ~(uint32)slave_connection_state::START_ON_EMPTY_DOMAIN; gtid_entry->flags&= ~(uint32)slave_connection_state::START_ON_EMPTY_DOMAIN;
...@@ -1687,7 +1757,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, ...@@ -1687,7 +1757,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
exist, even though both the prior and subsequent seq_no exists exist, even though both the prior and subsequent seq_no exists
for same domain_id and server_id. for same domain_id and server_id.
*/ */
my_errno= ER_GTID_START_FROM_BINLOG_HOLE; info->error= ER_GTID_START_FROM_BINLOG_HOLE;
*error_gtid= *gtid; *error_gtid= *gtid;
return "The binlog on the master is missing the GTID requested " return "The binlog on the master is missing the GTID requested "
"by the slave (even though both a prior and a subsequent " "by the slave (even though both a prior and a subsequent "
...@@ -1812,7 +1882,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, ...@@ -1812,7 +1882,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
*/ */
if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
{ {
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to replace row annotate event with dummy: too small event."; return "Failed to replace row annotate event with dummy: too small event.";
} }
} }
...@@ -1834,7 +1904,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, ...@@ -1834,7 +1904,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
current_checksum_alg); current_checksum_alg);
if (err) if (err)
{ {
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to replace GTID event with backwards-compatible event: " return "Failed to replace GTID event with backwards-compatible event: "
"currupt event."; "currupt event.";
} }
...@@ -1865,7 +1935,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, ...@@ -1865,7 +1935,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
*/ */
if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
{ {
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to replace binlog checkpoint or gtid list event with " return "Failed to replace binlog checkpoint or gtid list event with "
"dummy: too small event."; "dummy: too small event.";
} }
...@@ -1893,13 +1963,13 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, ...@@ -1893,13 +1963,13 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
if (RUN_HOOK(binlog_transmit, before_send_event, if (RUN_HOOK(binlog_transmit, before_send_event,
(info->thd, info->flags, packet, info->log_file_name, pos))) (info->thd, info->flags, packet, info->log_file_name, pos)))
{ {
my_errno= ER_UNKNOWN_ERROR; info->error= ER_UNKNOWN_ERROR;
return "run 'before_send_event' hook failed"; return "run 'before_send_event' hook failed";
} }
if (my_net_write(info->net, (uchar*) packet->ptr(), len)) if (my_net_write(info->net, (uchar*) packet->ptr(), len))
{ {
my_errno= ER_UNKNOWN_ERROR; info->error= ER_UNKNOWN_ERROR;
return "Failed on my_net_write()"; return "Failed on my_net_write()";
} }
...@@ -1908,7 +1978,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, ...@@ -1908,7 +1978,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
{ {
if (send_file(info->thd)) if (send_file(info->thd))
{ {
my_errno= ER_UNKNOWN_ERROR; info->error= ER_UNKNOWN_ERROR;
return "failed in send_file()"; return "failed in send_file()";
} }
} }
...@@ -1916,83 +1986,91 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, ...@@ -1916,83 +1986,91 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
if (RUN_HOOK(binlog_transmit, after_send_event, if (RUN_HOOK(binlog_transmit, after_send_event,
(info->thd, info->flags, packet))) (info->thd, info->flags, packet)))
{ {
my_errno= ER_UNKNOWN_ERROR; info->error= ER_UNKNOWN_ERROR;
return "Failed to run hook 'after_send_event'"; return "Failed to run hook 'after_send_event'";
} }
return NULL; /* Success */ return NULL; /* Success */
} }
static int check_start_offset(binlog_send_info *info,
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, const char *log_file_name,
ushort flags) my_off_t pos)
{ {
LOG_INFO linfo; IO_CACHE log;
char *log_file_name = linfo.log_file_name; File file= -1;
char search_file_name[FN_REFLEN], *name;
ulong ev_offset; /** check that requested position is inside of file */
if ((file=open_binlog(&log, log_file_name, &info->errmsg)) < 0)
{
info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return 1;
}
IO_CACHE log; if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
File file = -1; {
String* const packet= &thd->packet; const char* msg= "Client requested master to start replication from "
"impossible position";
info->errmsg= NULL; // don't do further modifications of error_text
snprintf(info->error_text, sizeof(info->error_text),
"%s; the first event '%s' at %lld, "
"the last event read from '%s' at %d, "
"the last byte read from '%s' at %d.",
msg,
my_basename(info->start_log_file_name), pos,
my_basename(info->start_log_file_name), BIN_LOG_HEADER_SIZE,
my_basename(info->start_log_file_name), BIN_LOG_HEADER_SIZE);
info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
err:
end_io_cache(&log);
mysql_file_close(file, MYF(MY_WME));
return info->error;
}
static int init_binlog_sender(binlog_send_info *info,
LOG_INFO *linfo,
const char *log_ident,
my_off_t *pos)
{
THD *thd= info->thd;
int error; int error;
const char *errmsg = "Unknown error", *tmp_msg;
char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message()
mysql_mutex_t *log_lock;
mysql_cond_t *log_cond;
char str_buf[128]; char str_buf[128];
String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info); String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
char str_buf2[128]; char str_buf2[128];
String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info); String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info);
slave_connection_state until_gtid_state_obj; connect_gtid_state.length(0);
rpl_gtid error_gtid;
binlog_send_info info(thd, packet, flags, log_file_name);
int old_max_allowed_packet= thd->variables.max_allowed_packet; /** save start file/pos that was requested by slave */
strmake(info->start_log_file_name, log_ident,
sizeof(info->start_log_file_name));
info->start_pos= *pos;
#ifndef DBUG_OFF /** init last pos */
int left_events = max_binlog_dump_events; info->last_pos= *pos;
uint dbug_reconnect_counter= 0;
#endif
DBUG_ENTER("mysql_binlog_send");
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
bzero((char*) &log,sizeof(log)); info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd);
bzero(&error_gtid, sizeof(error_gtid)); info->mariadb_slave_capability= get_mariadb_slave_capability(thd);
/* info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
heartbeat_period from @master_heartbeat_period user variable DBUG_EXECUTE_IF("simulate_non_gtid_aware_master",
*/ info->using_gtid_state= false;);
ulonglong heartbeat_period= get_heartbeat_period(thd);
struct timespec heartbeat_buf;
struct timespec *heartbeat_ts= NULL;
const LOG_POS_COORD start_coord= { log_ident, pos },
*p_start_coord= &start_coord;
LOG_POS_COORD coord_buf= { log_file_name, BIN_LOG_HEADER_SIZE },
*p_coord= &coord_buf;
if (heartbeat_period != 0)
{
heartbeat_ts= &heartbeat_buf;
set_timespec_nsec(*heartbeat_ts, 0);
}
info.mariadb_slave_capability= get_mariadb_slave_capability(thd);
connect_gtid_state.length(0); if (info->using_gtid_state)
info.using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", info.using_gtid_state= false;);
if (info.using_gtid_state)
{ {
info.slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd); info->slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
info.slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd); info->slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd);
if(get_slave_until_gtid(thd, &slave_until_gtid_str)) if (get_slave_until_gtid(thd, &slave_until_gtid_str))
info.until_gtid_state= &until_gtid_state_obj; info->until_gtid_state= &info->until_gtid_state_obj;
} }
DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events", DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events",
{ {
DBUG_SET("-d,binlog_force_reconnect_after_22_events"); DBUG_SET("-d,binlog_force_reconnect_after_22_events");
DBUG_SET_INITIAL("-d,binlog_force_reconnect_after_22_events"); DBUG_SET_INITIAL("-d,binlog_force_reconnect_after_22_events");
dbug_reconnect_counter= 22; info->dbug_reconnect_counter= 22;
}); });
/* /*
...@@ -2008,76 +2086,69 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ...@@ -2008,76 +2086,69 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
}); });
if (global_system_variables.log_warnings > 1) if (global_system_variables.log_warnings > 1)
sql_print_information("Start binlog_dump to slave_server(%lu), pos(%s, %lu)", sql_print_information(
thd->variables.server_id, log_ident, (ulong)pos); "Start binlog_dump to slave_server(%lu), pos(%s, %lu)",
if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) thd->variables.server_id, log_ident, (ulong)*pos);
{
errmsg= "Failed to run hook 'transmit_start'";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
#ifndef DBUG_OFF #ifndef DBUG_OFF
if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2)) if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
{ {
errmsg = "Master failed COM_BINLOG_DUMP to test if slave can recover"; info->errmsg= "Master failed COM_BINLOG_DUMP to test if slave can recover";
my_errno= ER_UNKNOWN_ERROR; info->error= ER_UNKNOWN_ERROR;
goto err; return 1;
} }
#endif #endif
if (!(info.fdev= new Format_description_log_event(3)))
{
errmsg= "Out of memory initializing format_description event";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
if (!mysql_bin_log.is_open()) if (!mysql_bin_log.is_open())
{ {
errmsg = "Binary log is not open"; info->errmsg= "Binary log is not open";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err; return 1;
} }
if (!server_id_supplied) if (!server_id_supplied)
{ {
errmsg = "Misconfigured master - server id was not set"; info->errmsg= "Misconfigured master - server id was not set";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err; return 1;
} }
name=search_file_name; char search_file_name[FN_REFLEN];
if (info.using_gtid_state) const char *name=search_file_name;
if (info->using_gtid_state)
{ {
if (info.gtid_state.load(connect_gtid_state.c_ptr_quick(), if (info->gtid_state.load(connect_gtid_state.c_ptr_quick(),
connect_gtid_state.length())) connect_gtid_state.length()))
{ {
errmsg= "Out of memory or malformed slave request when obtaining start " info->errmsg= "Out of memory or malformed slave request when obtaining "
"position from GTID state"; "start position from GTID state";
my_errno= ER_UNKNOWN_ERROR; info->error= ER_UNKNOWN_ERROR;
goto err; return 1;
} }
if (info.until_gtid_state && if (info->until_gtid_state &&
info.until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(), info->until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
slave_until_gtid_str.length())) slave_until_gtid_str.length()))
{ {
errmsg= "Out of memory or malformed slave request when obtaining UNTIL " info->errmsg= "Out of memory or malformed slave request when "
"position sent from slave"; "obtaining UNTIL position sent from slave";
my_errno= ER_UNKNOWN_ERROR; info->error= ER_UNKNOWN_ERROR;
goto err; return 1;
} }
if ((error= check_slave_start_position(&info, &errmsg, &error_gtid))) if ((error= check_slave_start_position(info, &info->errmsg,
&info->error_gtid)))
{ {
my_errno= error; info->error= error;
goto err; return 1;
} }
if ((errmsg= gtid_find_binlog_file(&info.gtid_state, search_file_name, if ((info->errmsg= gtid_find_binlog_file(&info->gtid_state,
info.until_gtid_state))) search_file_name,
info->until_gtid_state)))
{ {
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err; return 1;
} }
pos= 4;
/* start from beginning of binlog file */
*pos = 4;
} }
else else
{ {
...@@ -2086,153 +2157,157 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ...@@ -2086,153 +2157,157 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
else else
name=0; // Find first log name=0; // Find first log
} }
linfo->index_file_offset= 0;
linfo.index_file_offset = 0; if (mysql_bin_log.find_log_pos(linfo, name, 1))
if (mysql_bin_log.find_log_pos(&linfo, name, 1))
{ {
errmsg = "Could not find first log file name in binary log index file"; info->errmsg= "Could not find first log file name in binary "
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; "log index file";
goto err; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return 1;
} }
// set current pos too
linfo->pos= *pos;
// note: publish that we use file, before we open it
mysql_mutex_lock(&LOCK_thread_count); mysql_mutex_lock(&LOCK_thread_count);
thd->current_linfo = &linfo; thd->current_linfo= linfo;
mysql_mutex_unlock(&LOCK_thread_count); mysql_mutex_unlock(&LOCK_thread_count);
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0) if (check_start_offset(info, linfo->log_file_name, *pos))
{ return 1;
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
{
errmsg= "Client requested master to start replication from \
impossible position";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
/* reset transmit packet for the fake rotate event below */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
goto err;
/* if (*pos > BIN_LOG_HEADER_SIZE)
Tell the client about the log name with a fake Rotate event;
this is needed even if we also send a Format_description_log_event
just after, because that event does not contain the binlog's name.
Note that as this Rotate event is sent before
Format_description_log_event, the slave cannot have any info to
understand this event's format, so the header len of
Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
than other events except FORMAT_DESCRIPTION_EVENT).
Before 4.0.14 we called fake_rotate_event below only if (pos ==
BIN_LOG_HEADER_SIZE), because if this is false then the slave
already knows the binlog's name.
Since, we always call fake_rotate_event; if the slave already knew
the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
useless but does not harm much. It is nice for 3.23 (>=.58) slaves
which test Rotate events to see if the master is 4.0 (then they
choose to stop because they can't replicate 4.0); by always calling
fake_rotate_event we are sure that 3.23.58 and newer will detect the
problem as soon as replication starts (BUG#198).
Always calling fake_rotate_event makes sending of normal
(=from-binlog) Rotate events a priori unneeded, but it is not so
simple: the 2 Rotate events are not equivalent, the normal one is
before the Stop event, the fake one is after. If we don't send the
normal one, then the Stop event will be interpreted (by existing 4.0
slaves) as "the master stopped", which is wrong. So for safety,
given that we want minimum modification of 4.0, we send the normal
and fake Rotates.
*/
if (fake_rotate_event(&info, pos, &errmsg,
get_binlog_checksum_value_at_connect(thd)))
{ {
/* /*
This error code is not perfect, as fake_rotate_event() does not mark that first format descriptor with "log_pos=0", so the slave
read anything from the binlog; if it fails it's because of an should not increment master's binlog position
error in my_net_write(), fortunately it will say so in errmsg. (rli->group_master_log_pos)
*/ */
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; info->clear_initial_log_pos= true;
goto err;
} }
/* return 0;
Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become }
this larger than the corresponding packet (query) sent
from client to master. /**
* send format descriptor event for one binlog file
*/ */
thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET; static int send_format_descriptor_event(binlog_send_info *info,
IO_CACHE *log,
LOG_INFO *linfo,
my_off_t start_pos)
{
int error;
ulong ev_offset;
THD *thd= info->thd;
String *packet= info->packet;
Log_event_type event_type;
/* /**
We can set log_lock now, it does not move (it's a member of * 1) reset fdev before each log-file
mysql_bin_log, and it's already inited, and it will be destroyed * 2) read first event, should be the format descriptor
only at shutdown). * 3) read second event, *might* be start encryption event
* if it isn't, seek back to undo this read
*/ */
p_coord->pos= pos; // the first hb matches the slave's last seen value if (info->fdev != NULL)
log_lock= mysql_bin_log.get_log_lock(); delete info->fdev;
log_cond= mysql_bin_log.get_log_cond();
if (pos > BIN_LOG_HEADER_SIZE) if (!(info->fdev= new Format_description_log_event(3)))
{ {
/* reset transmit packet for the event read from binary log info->errmsg= "Out of memory initializing format_description event";
file */ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) return 1;
goto err; }
do
{
/* reset transmit packet for the event read from binary log file */
if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
break;
/* /*
Try to find a Format_description_log_event at the beginning of Try to find a Format_description_log_event at the beginning of
the binlog the binlog
*/ */
if (!(error = Log_event::read_log_event(&log, packet, log_lock, 0))) info->last_pos= my_b_tell(log);
error= Log_event::read_log_event(log, packet, /* LOCK_log */ NULL,
info->current_checksum_alg);
linfo->pos= my_b_tell(log);
if (error)
{ {
set_read_error(info, error);
break;
}
event_type= (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
/* /*
The packet has offsets equal to the normal offsets in a The packet has offsets equal to the normal offsets in a
binlog event + ev_offset (the first ev_offset characters are binlog event + ev_offset (the first ev_offset characters are
the header (default \0)). the header (default \0)).
*/ */
DBUG_PRINT("info", DBUG_PRINT("info",
("Looked for a Format_description_log_event, found event type %d", ("Looked for a Format_description_log_event, "
(*packet)[EVENT_TYPE_OFFSET+ev_offset])); "found event type %d", (int)event_type));
if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
if (event_type != FORMAT_DESCRIPTION_EVENT)
{ {
Format_description_log_event *tmp; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
info->errmsg= "Failed to find format descriptor event in start of binlog";
sql_print_warning("Failed to find format descriptor event in "
"start of binlog: %s",
info->log_file_name);
break;
}
info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, info->current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
packet->length() - ev_offset); packet->length() - ev_offset);
DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || DBUG_ASSERT(info->current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); info->current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
info->current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
if (!is_slave_checksum_aware(thd) && if (!is_slave_checksum_aware(thd) &&
info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && info->current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
{ {
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Slave can not handle replication events with the checksum " info->errmsg= "Slave can not handle replication events with the "
"that master is configured to log"; "checksum that master is configured to log";
sql_print_warning("Master is configured to log replication events " sql_print_warning("Master is configured to log replication events "
"with checksum, but will not send such events to " "with checksum, but will not send such events to "
"slaves that cannot process them"); "slaves that cannot process them");
goto err; break;
} }
Format_description_log_event *tmp;
if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset, if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
packet->length()-ev_offset, packet->length()-ev_offset,
info.fdev))) info->fdev)))
{ {
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Corrupt Format_description event found or out-of-memory"; info->errmsg= "Corrupt Format_description event found "
goto err; "or out-of-memory";
break;
} }
delete info.fdev; delete info->fdev;
info.fdev= tmp; info->fdev= tmp;
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
if (info->clear_initial_log_pos)
{
info->clear_initial_log_pos= false;
/* /*
mark that this event with "log_pos=0", so the slave mark that this event with "log_pos=0", so the slave
should not increment master's binlog position should not increment master's binlog position
(rli->group_master_log_pos) (rli->group_master_log_pos)
*/ */
int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0); int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, (ulong) 0);
/* /*
if reconnect master sends FD event with `created' as 0 if reconnect master sends FD event with `created' as 0
to avoid destroying temp tables. to avoid destroying temp tables.
...@@ -2241,193 +2316,314 @@ impossible position"; ...@@ -2241,193 +2316,314 @@ impossible position";
ST_CREATED_OFFSET+ev_offset, (ulong) 0); ST_CREATED_OFFSET+ev_offset, (ulong) 0);
/* fix the checksum due to latest changes in header */ /* fix the checksum due to latest changes in header */
if (info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && if (info->current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
fix_checksum(packet, ev_offset); fix_checksum(packet, ev_offset);
}
/* send it */ /* send it */
if (my_net_write(info.net, (uchar*) packet->ptr(), packet->length())) if (my_net_write(info->net, (uchar*) packet->ptr(), packet->length()))
{ {
errmsg = "Failed on my_net_write()"; info->errmsg= "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR; info->error= ER_UNKNOWN_ERROR;
goto err; break;
} }
/* /** all done */
No need to save this event. We are only doing simple reads return 0;
(no real parsing of the events) so we don't need it. And so
we don't need the artificial Format_description_log_event of
3.23&4.x.
*/
}
}
else
{
if (test_for_non_eof_log_read_errors(error, &errmsg))
goto err;
/*
It's EOF, nothing to do, go on reading next events, the
Format_description_log_event will be found naturally if it is written.
*/
}
} /* end of if (pos > BIN_LOG_HEADER_SIZE); */
else
{
/* The Format_description_log_event event will be found naturally. */
}
/* } while (false);
Handle the case of START SLAVE UNTIL with an UNTIL condition already
fulfilled at the start position.
We will send one event, the format_description, and then stop. return 1;
}
/**
 * Decide whether the binlog dump must stop sending.
 *
 * @param info  state of the running binlog dump
 * @return true if any of these holds, checked in this order:
 *         info->net->error is set, info->net->vio is NULL,
 *         info->thd->killed is set, info->error is non-zero,
 *         or info->should_stop is set; false otherwise
 */
static bool should_stop(binlog_send_info *info)
{
  /* connection-level conditions */
  if (info->net->error || info->net->vio == NULL)
    return true;

  /* thd->killed is set */
  if (info->thd->killed)
    return true;

  /* an error code was recorded, or the stop flag was raised */
  return info->error != 0 || info->should_stop;
}
/**
* wait for new events to enter binlog
* this function will send heartbeats while waiting if so configured
*/ */
if (info.until_gtid_state && info.until_gtid_state->count() == 0) static int wait_new_events(binlog_send_info *info, /* in */
info.gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE; LOG_INFO* linfo, /* in */
char binlog_end_pos_filename[], /* out */
my_off_t *end_pos_ptr) /* out */
{
int ret= 1;
PSI_stage_info old_stage;
/* seek to the requested position, to start the requested dump */ mysql_bin_log.lock_binlog_end_pos();
my_b_seek(&log, pos); // Seek will done on next read info->thd->ENTER_COND(mysql_bin_log.get_log_cond(),
mysql_bin_log.get_binlog_end_pos_lock(),
&stage_master_has_sent_all_binlog_to_slave,
&old_stage);
while (!info.net->error && info.net->vio != 0 && !thd->killed) while (!should_stop(info))
{
*end_pos_ptr= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename);
if (strcmp(linfo->log_file_name, binlog_end_pos_filename) != 0)
{ {
Log_event_type event_type= UNKNOWN_EVENT; /* there has been a log file switch, we don't need to wait */
killed_state killed; ret= 0;
break;
}
/* reset the transmit packet for the event read from binary log if (linfo->pos < *end_pos_ptr)
file */ {
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) /* there is data to read, we don't need to wait */
goto err; ret= 0;
break;
}
bool is_active_binlog= false; if (info->heartbeat_period)
while (!(killed= thd->killed) && {
!(error = Log_event::read_log_event(&log, packet, log_lock, struct timespec ts;
info.current_checksum_alg, set_timespec_nsec(ts, info->heartbeat_period);
log_file_name, ret= mysql_bin_log.wait_for_update_binlog_end_pos(info->thd, &ts);
&is_active_binlog))) if (ret == ETIMEDOUT || ret == ETIME)
{ {
struct event_coordinates coord = { linfo->log_file_name, linfo->pos };
#ifndef DBUG_OFF #ifndef DBUG_OFF
if (max_binlog_dump_events && !left_events--) const ulong hb_info_counter_limit = 3;
if (info->hb_info_counter < hb_info_counter_limit)
{ {
net_flush(info.net); sql_print_information("master sends heartbeat message %s:%llu",
errmsg = "Debugging binlog dump abort"; linfo->log_file_name, linfo->pos);
my_errno= ER_UNKNOWN_ERROR; info->hb_info_counter++;
goto err; if (info->hb_info_counter == hb_info_counter_limit)
sql_print_information("the rest of heartbeat info skipped ...");
} }
#endif #endif
/* mysql_bin_log.unlock_binlog_end_pos();
log's filename does not change while it's active ret= send_heartbeat_event(info,
*/ info->net, info->packet, &coord,
p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET); info->current_checksum_alg);
mysql_bin_log.lock_binlog_end_pos();
event_type= if (ret)
(Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
#ifdef ENABLED_DEBUG_SYNC
DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
{ {
if (event_type == XID_EVENT) ret= 1; // error
break;
}
/**
* re-read heartbeat period after each send
*/
info->heartbeat_period= get_heartbeat_period(info->thd);
}
else if (ret != 0)
{ {
net_flush(info.net); ret= 1; // error
const char act[]= break;
"now "
"wait_for signal.continue";
DBUG_ASSERT(debug_sync_service);
DBUG_ASSERT(!debug_sync_set_action(thd,
STRING_WITH_LEN(act)));
const char act2[]=
"now "
"signal signal.continued";
DBUG_ASSERT(!debug_sync_set_action(current_thd,
STRING_WITH_LEN(act2)));
} }
}); }
#endif else
if (event_type == FORMAT_DESCRIPTION_EVENT)
{ {
Format_description_log_event *tmp; ret= mysql_bin_log.wait_for_update_binlog_end_pos(info->thd, NULL);
if (ret != 0 && ret != ETIMEDOUT && ret != ETIME)
info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
packet->length() - ev_offset);
DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
if (!is_slave_checksum_aware(thd) &&
info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
{ {
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; ret= 1; // error
errmsg= "Slave can not handle replication events with the checksum " break;
"that master is configured to log"; }
sql_print_warning("Master is configured to log replication events " }
"with checksum, but will not send such events to "
"slaves that cannot process them");
goto err;
} }
if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset, /* it releases the lock set in ENTER_COND */
packet->length()-ev_offset, info->thd->EXIT_COND(&old_stage);
info.fdev))) return ret;
}
/**
* get end pos of current log file, this function
* will wait if there is nothing available
*/
static my_off_t get_binlog_end_pos(binlog_send_info *info,
IO_CACHE* log,
LOG_INFO* linfo)
{
my_off_t log_pos= my_b_tell(log);
/**
* get current binlog end pos
*/
mysql_bin_log.lock_binlog_end_pos();
char binlog_end_pos_filename[FN_REFLEN];
my_off_t end_pos= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename);
mysql_bin_log.unlock_binlog_end_pos();
do
{ {
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; if (strcmp(binlog_end_pos_filename, linfo->log_file_name) != 0)
errmsg= "Corrupt Format_description event found or out-of-memory"; {
goto err; /**
} * this file is not active, since it's not written to again,
delete info.fdev; * it safe to check file length and use that as end_pos
info.fdev= tmp; */
end_pos= my_b_filelength(log);
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; if (log_pos == end_pos)
return 0; // already at end of file inactive file
else
return end_pos; // return size of inactive file
} }
else
#ifndef DBUG_OFF
if (dbug_reconnect_counter > 0)
{ {
--dbug_reconnect_counter; /**
if (dbug_reconnect_counter == 0) * this is the active file
*/
if (log_pos < end_pos)
{ {
errmsg= "DBUG-injected forced reconnect"; /**
my_errno= ER_UNKNOWN_ERROR; * there is data available to read
goto err; */
return end_pos;
} }
/**
* check if we should wait for more data
*/
if ((info->flags & BINLOG_DUMP_NON_BLOCK) ||
(info->thd->variables.server_id == 0))
{
info->should_stop= true;
return 0;
} }
#endif
if ((tmp_msg= send_event_to_slave(&info, event_type, &log, /**
ev_offset, &error_gtid))) * flush data before waiting
*/
if (net_flush(info->net))
{ {
errmsg= tmp_msg; info->errmsg= "failed on net_flush()";
goto err; info->error= ER_UNKNOWN_ERROR;
return 1;
}
if (wait_new_events(info, linfo, binlog_end_pos_filename, &end_pos))
return 1;
} }
if (unlikely(info.send_fake_gtid_list) && } while (!should_stop(info));
info.gtid_skip_group == GTID_SKIP_NOT)
return 0;
}
/**
* This function sends events from one binlog file
* but only up until end_pos
*
* return 0 - OK
* else NOK
*/
static int send_events(binlog_send_info *info,
IO_CACHE* log,
LOG_INFO* linfo,
my_off_t end_pos)
{
int error;
ulong ev_offset;
String *packet= info->packet;
linfo->pos= my_b_tell(log);
info->last_pos= my_b_tell(log);
while (linfo->pos < end_pos)
{ {
Gtid_list_log_event glev(&info.until_binlog_state, 0); if (should_stop(info))
return 0;
/* reset the transmit packet for the event read from binary log
file */
if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
return 1;
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) || info->last_pos= linfo->pos;
fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log))) error = Log_event::read_log_event(log, packet, /* LOCK_log */ NULL,
info->current_checksum_alg,
NULL, NULL);
linfo->pos= my_b_tell(log);
if (error)
{ {
my_errno= ER_UNKNOWN_ERROR; goto read_err;
goto err;
}
info.send_fake_gtid_list= false;
} }
if (info.until_gtid_state &&
is_until_reached(&info, &ev_offset, event_type, &errmsg, Log_event_type event_type=
my_b_tell(&log))) (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
#ifndef DBUG_OFF
if (info->dbug_reconnect_counter > 0)
{ {
if (errmsg) --info->dbug_reconnect_counter;
if (info->dbug_reconnect_counter == 0)
{ {
my_errno= ER_UNKNOWN_ERROR; info->errmsg= "DBUG-injected forced reconnect";
goto err; info->error= ER_UNKNOWN_ERROR;
return 1;
} }
goto end;
} }
#endif
#ifdef ENABLED_DEBUG_SYNC
DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid", DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
{ {
if (event_type == XID_EVENT) if (event_type == XID_EVENT)
{ {
net_flush(info.net); net_flush(info->net);
const char act[]=
"now "
"wait_for signal.continue";
DBUG_ASSERT(debug_sync_service);
DBUG_ASSERT(!debug_sync_set_action(
info->thd,
STRING_WITH_LEN(act)));
const char act2[]=
"now "
"signal signal.continued";
DBUG_ASSERT(!debug_sync_set_action(
info->thd,
STRING_WITH_LEN(act2)));
} }
}); });
#endif
if ((info->errmsg= send_event_to_slave(info, event_type, log,
ev_offset, &info->error_gtid)))
return 1;
if (unlikely(info->send_fake_gtid_list) &&
info->gtid_skip_group == GTID_SKIP_NOT)
{
Gtid_list_log_event glev(&info->until_binlog_state, 0);
if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg) ||
fake_gtid_list_event(info, &glev, &info->errmsg, my_b_tell(log)))
{
info->error= ER_UNKNOWN_ERROR;
return 1;
}
info->send_fake_gtid_list= false;
}
if (info->until_gtid_state &&
is_until_reached(info, &ev_offset, event_type, &info->errmsg,
my_b_tell(log)))
{
if (info->errmsg)
{
info->error= ER_UNKNOWN_ERROR;
return 1;
}
info->should_stop= true;
return 0;
}
/* Abort server before it sends the XID_EVENT */ /* Abort server before it sends the XID_EVENT */
DBUG_EXECUTE_IF("crash_before_send_xid", DBUG_EXECUTE_IF("crash_before_send_xid",
...@@ -2438,344 +2634,295 @@ impossible position"; ...@@ -2438,344 +2634,295 @@ impossible position";
DBUG_SUICIDE(); DBUG_SUICIDE();
} }
}); });
/* reset transmit packet for next loop */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
goto err;
} }
if (killed)
goto end;
DBUG_EXECUTE_IF("wait_after_binlog_EOF", return 0;
{
const char act[]= "now wait_for signal.rotate_finished";
DBUG_ASSERT(!debug_sync_set_action(current_thd,
STRING_WITH_LEN(act)));
};);
/* read_err:
TODO: now that we are logging the offset, check to make sure set_read_error(info, error);
the recorded offset and the actual match.
Guilhem 2003-06: this is not true if this master is a slave
<4.0.15 running with --log-slave-updates, because then log_pos may
be the offset in the-master-of-this-master's binlog.
*/
if (test_for_non_eof_log_read_errors(error, &errmsg))
goto err;
/* return 1;
We should only move to the next binlog when the last read event }
came from a already deactivated binlog.
*/ /**
if (!(flags & BINLOG_DUMP_NON_BLOCK) && is_active_binlog) * This function sends one binlog file to slave
{ *
/* * return 0 - OK
Block until there is more data in the log * 1 - NOK
*/ */
if (net_flush(info.net)) static int send_one_binlog_file(binlog_send_info *info,
IO_CACHE* log,
LOG_INFO* linfo,
my_off_t start_pos)
{
assert_LOCK_log_owner(false); // we don't have LOCK_log
/* seek to the requested position, to start the requested dump */
if (start_pos != BIN_LOG_HEADER_SIZE)
{ {
errmsg = "failed on net_flush()"; my_b_seek(log, start_pos);
my_errno= ER_UNKNOWN_ERROR; linfo->pos= start_pos;
goto err;
} }
/* while (!should_stop(info))
We may have missed the update broadcast from the log {
that has just happened, let's try to catch it if it did. /**
If we did not miss anything, we just wait for other threads * get end pos of current log file, this function
to signal us. * will wait if there is nothing available
*/ */
my_off_t end_pos= get_binlog_end_pos(info, log, linfo);
if (end_pos <= 1)
{ {
log.error=0; /** end of file or error */
bool read_packet = 0; return end_pos;
}
#ifndef DBUG_OFF /**
if (max_binlog_dump_events && !left_events--) * send events from current position up to end_pos
{ */
errmsg = "Debugging binlog dump abort"; if (send_events(info, log, linfo, end_pos))
my_errno= ER_UNKNOWN_ERROR; return 1;
goto err;
} }
#endif
/* reset the transmit packet for the event read from binary log return 1;
file */ }
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
goto err;
/* void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
No one will update the log while we are reading ushort flags)
now, but we'll be quick and just read one record {
LOG_INFO linfo;
TODO: IO_CACHE log;
Add an counter that is incremented for each time we update the File file = -1;
binary log. We can avoid the following read if the counter String* const packet= &thd->packet;
has not been updated since last read.
*/
mysql_mutex_lock(log_lock); binlog_send_info infoobj(thd, packet, flags, linfo.log_file_name);
switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0, binlog_send_info *info= &infoobj;
info.current_checksum_alg)) {
case 0:
/* we read successfully, so we'll need to send it to the slave */
mysql_mutex_unlock(log_lock);
read_packet = 1;
p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
event_type=
(Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
break;
case LOG_READ_EOF: int old_max_allowed_packet= thd->variables.max_allowed_packet;
{ thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
int ret;
ulong signal_cnt;
DBUG_PRINT("wait",("waiting for data in binary log"));
/* For mysqlbinlog (mysqlbinlog.server_id==0). */
if (thd->variables.server_id==0)
{
mysql_mutex_unlock(log_lock);
goto end;
}
#ifndef DBUG_OFF DBUG_ENTER("mysql_binlog_send");
ulong hb_info_counter= 0; DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
#endif
PSI_stage_info old_stage; bzero((char*) &log,sizeof(log));
signal_cnt= mysql_bin_log.signal_cnt;
do if (init_binlog_sender(info, &linfo, log_ident, &pos))
{
if (heartbeat_period != 0)
{
DBUG_ASSERT(heartbeat_ts);
set_timespec_nsec(*heartbeat_ts, heartbeat_period);
}
thd->ENTER_COND(log_cond, log_lock,
&stage_master_has_sent_all_binlog_to_slave,
&old_stage);
if (thd->killed)
break;
ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
DBUG_ASSERT(ret == 0 || (heartbeat_period != 0));
if (ret == ETIMEDOUT || ret == ETIME)
{
#ifndef DBUG_OFF
if (hb_info_counter < 3)
{
sql_print_information("master sends heartbeat message");
hb_info_counter++;
if (hb_info_counter == 3)
sql_print_information("the rest of heartbeat info skipped ...");
}
#endif
/* reset transmit packet for the heartbeat event */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
{
thd->EXIT_COND(&old_stage);
goto err; goto err;
}
if (send_heartbeat_event(info.net, packet, p_coord, /*
info.current_checksum_alg)) run hook first when all check has been made that slave seems to
be requesting a reasonable position. i.e when transmit actually starts
*/
if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
{ {
errmsg = "Failed on my_net_write()"; info->errmsg= "Failed to run hook 'transmit_start'";
my_errno= ER_UNKNOWN_ERROR; info->error= ER_UNKNOWN_ERROR;
thd->EXIT_COND(&old_stage);
goto err; goto err;
} }
}
else
{
DBUG_PRINT("wait",("binary log received update or a broadcast signal caught"));
}
} while (signal_cnt == mysql_bin_log.signal_cnt);
thd->EXIT_COND(&old_stage);
}
break;
default: /*
mysql_mutex_unlock(log_lock); heartbeat_period from @master_heartbeat_period user variable
test_for_non_eof_log_read_errors(error, &errmsg); NOTE: this is initialized after transmit_start-hook so that
goto err; the hook can affect value of heartbeat period
} */
info->heartbeat_period= get_heartbeat_period(thd);
if (read_packet) while (!should_stop(info))
{ {
if ((tmp_msg= send_event_to_slave(&info, event_type, &log, /*
ev_offset, &error_gtid))) Tell the client about the log name with a fake Rotate event;
this is needed even if we also send a Format_description_log_event
just after, because that event does not contain the binlog's name.
Note that as this Rotate event is sent before
Format_description_log_event, the slave cannot have any info to
understand this event's format, so the header len of
Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
than other events except FORMAT_DESCRIPTION_EVENT).
Before 4.0.14 we called fake_rotate_event below only if (pos ==
BIN_LOG_HEADER_SIZE), because if this is false then the slave
already knows the binlog's name.
Since, we always call fake_rotate_event; if the slave already knew
the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
useless but does not harm much. It is nice for 3.23 (>=.58) slaves
which test Rotate events to see if the master is 4.0 (then they
choose to stop because they can't replicate 4.0); by always calling
fake_rotate_event we are sure that 3.23.58 and newer will detect the
problem as soon as replication starts (BUG#198).
Always calling fake_rotate_event makes sending of normal
(=from-binlog) Rotate events a priori unneeded, but it is not so
simple: the 2 Rotate events are not equivalent, the normal one is
before the Stop event, the fake one is after. If we don't send the
normal one, then the Stop event will be interpreted (by existing 4.0
slaves) as "the master stopped", which is wrong. So for safety,
given that we want minimum modification of 4.0, we send the normal
and fake Rotates.
*/
if (fake_rotate_event(info, pos, &info->errmsg, info->current_checksum_alg))
{ {
errmsg= tmp_msg; /*
This error code is not perfect, as fake_rotate_event() does not
read anything from the binlog; if it fails it's because of an
error in my_net_write(), fortunately it will say so in errmsg.
*/
info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err; goto err;
} }
if (unlikely(info.send_fake_gtid_list)
&& info.gtid_skip_group == GTID_SKIP_NOT)
{
Gtid_list_log_event glev(&info.until_binlog_state, 0);
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) || if ((file=open_binlog(&log, linfo.log_file_name, &info->errmsg)) < 0)
fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log)))
{ {
my_errno= ER_UNKNOWN_ERROR; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err; goto err;
} }
info.send_fake_gtid_list= false;
} if (send_format_descriptor_event(info, &log, &linfo, pos))
if (info.until_gtid_state &&
is_until_reached(&info, &ev_offset, event_type, &errmsg,
my_b_tell(&log)))
{
if (errmsg)
{ {
my_errno= ER_UNKNOWN_ERROR; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err; goto err;
} }
goto end;
}
}
log.error=0; /*
} We want to corrupt the first event that will be sent to the slave.
} But we do not want the corruption to happen early, eg. when client does
else BINLOG_GTID_POS(). So test case sets a DBUG trigger which causes us to
set the real DBUG injection here.
*/
DBUG_EXECUTE_IF("corrupt_read_log_event_to_slave_set",
{ {
bool loop_breaker = 0; DBUG_SET("-d,corrupt_read_log_event_to_slave_set");
/* need this to break out of the for loop from switch */ DBUG_SET("+d,corrupt_read_log_event2");
});
/*
Handle the case of START SLAVE UNTIL with an UNTIL condition already
fulfilled at the start position.
We will send one event, the format_description, and then stop.
*/
if (info->until_gtid_state && info->until_gtid_state->count() == 0)
info->gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave);
if (send_one_binlog_file(info, &log, &linfo, pos))
break;
THD_STAGE_INFO(thd, stage_finished_reading_one_binlog_switching_to_next_binlog); if (should_stop(info))
switch (mysql_bin_log.find_next_log(&linfo, 1)) {
case 0:
break; break;
case LOG_INFO_EOF:
if (mysql_bin_log.is_active(log_file_name)) DBUG_EXECUTE_IF("wait_after_binlog_EOF",
{
const char act[]= "now wait_for signal.rotate_finished";
DBUG_ASSERT(!debug_sync_set_action(current_thd,
STRING_WITH_LEN(act)));
};);
THD_STAGE_INFO(thd,
stage_finished_reading_one_binlog_switching_to_next_binlog);
if (mysql_bin_log.find_next_log(&linfo, 1))
{ {
loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK); info->errmsg= "could not find next log";
info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
break; break;
} }
default:
errmsg = "could not find next log";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
if (loop_breaker) /** start from start of next file */
break; pos= BIN_LOG_HEADER_SIZE;
/** close current cache/file */
end_io_cache(&log); end_io_cache(&log);
mysql_file_close(file, MYF(MY_WME)); mysql_file_close(file, MYF(MY_WME));
file= -1;
/* reset transmit packet for the possible fake rotate event */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
goto err;
/*
Call fake_rotate_event() in case the previous log (the one which
we have just finished reading) did not contain a Rotate event
(for example (I don't know any other example) the previous log
was the last one before the master was shutdown & restarted).
This way we tell the slave about the new log's name and
position. If the binlog is 5.0, the next event we are going to
read and send is Format_description_log_event.
*/
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
fake_rotate_event(&info, BIN_LOG_HEADER_SIZE, &errmsg,
info.current_checksum_alg))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
} }
p_coord->file_name= log_file_name; // reset to the next err:
} THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
} RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
end: const bool binlog_open = my_b_inited(&log);
if (file >= 0)
{
end_io_cache(&log); end_io_cache(&log);
mysql_file_close(file, MYF(MY_WME)); mysql_file_close(file, MYF(MY_WME));
}
RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
my_eof(thd);
THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
mysql_mutex_lock(&LOCK_thread_count); mysql_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0; thd->current_linfo = 0;
mysql_mutex_unlock(&LOCK_thread_count); mysql_mutex_unlock(&LOCK_thread_count);
thd->variables.max_allowed_packet= old_max_allowed_packet; thd->variables.max_allowed_packet= old_max_allowed_packet;
delete info.fdev; delete info->fdev;
DBUG_VOID_RETURN;
err: if (info->error == ER_MASTER_FATAL_ERROR_READING_BINLOG && binlog_open)
THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
if (my_errno == ER_MASTER_FATAL_ERROR_READING_BINLOG && my_b_inited(&log))
{ {
/* /*
detailing the fatal error message with coordinates detailing the fatal error message with coordinates
of the last position read. of the last position read.
*/ */
my_snprintf(error_text, sizeof(error_text), my_snprintf(info->error_text, sizeof(info->error_text),
"%s; the first event '%s' at %lld, " "%s; the first event '%s' at %lld, "
"the last event read from '%s' at %lld, " "the last event read from '%s' at %lld, "
"the last byte read from '%s' at %lld.", "the last byte read from '%s' at %lld.",
errmsg, info->errmsg,
my_basename(p_start_coord->file_name), p_start_coord->pos, my_basename(info->start_log_file_name), info->start_pos,
my_basename(p_coord->file_name), p_coord->pos, my_basename(info->log_file_name), info->last_pos,
my_basename(log_file_name), my_b_tell(&log)); my_basename(info->log_file_name), linfo.pos);
} }
else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG) else if (info->error == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG)
{ {
my_snprintf(error_text, sizeof(error_text), my_snprintf(info->error_text, sizeof(info->error_text),
"Error: connecting slave requested to start from GTID " "Error: connecting slave requested to start from GTID "
"%u-%u-%llu, which is not in the master's binlog", "%u-%u-%llu, which is not in the master's binlog",
error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no); info->error_gtid.domain_id,
info->error_gtid.server_id,
info->error_gtid.seq_no);
/* Use this error code so slave will know not to try reconnect. */ /* Use this error code so slave will know not to try reconnect. */
my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
} }
else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2) else if (info->error == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2)
{ {
my_snprintf(error_text, sizeof(error_text), my_snprintf(info->error_text, sizeof(info->error_text),
"Error: connecting slave requested to start from GTID " "Error: connecting slave requested to start from GTID "
"%u-%u-%llu, which is not in the master's binlog. Since the " "%u-%u-%llu, which is not in the master's binlog. Since the "
"master's binlog contains GTIDs with higher sequence numbers, " "master's binlog contains GTIDs with higher sequence numbers, "
"it probably means that the slave has diverged due to " "it probably means that the slave has diverged due to "
"executing extra errorneous transactions", "executing extra errorneous transactions",
error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no); info->error_gtid.domain_id,
info->error_gtid.server_id,
info->error_gtid.seq_no);
/* Use this error code so slave will know not to try reconnect. */ /* Use this error code so slave will know not to try reconnect. */
my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
} }
else if (my_errno == ER_GTID_START_FROM_BINLOG_HOLE) else if (info->error == ER_GTID_START_FROM_BINLOG_HOLE)
{ {
my_snprintf(error_text, sizeof(error_text), my_snprintf(info->error_text, sizeof(info->error_text),
"The binlog on the master is missing the GTID %u-%u-%llu " "The binlog on the master is missing the GTID %u-%u-%llu "
"requested by the slave (even though both a prior and a " "requested by the slave (even though both a prior and a "
"subsequent sequence number does exist), and GTID strict mode " "subsequent sequence number does exist), and GTID strict mode "
"is enabled", "is enabled",
error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no); info->error_gtid.domain_id,
info->error_gtid.server_id,
info->error_gtid.seq_no);
/* Use this error code so slave will know not to try reconnect. */ /* Use this error code so slave will know not to try reconnect. */
my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
} }
else if (my_errno == ER_CANNOT_LOAD_SLAVE_GTID_STATE) else if (info->error == ER_CANNOT_LOAD_SLAVE_GTID_STATE)
{ {
my_snprintf(error_text, sizeof(error_text), my_snprintf(info->error_text, sizeof(info->error_text),
"Failed to load replication slave GTID state from table %s.%s", "Failed to load replication slave GTID state from table %s.%s",
"mysql", rpl_gtid_slave_state_table_name.str); "mysql", rpl_gtid_slave_state_table_name.str);
my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
}
else if (info->error != 0 && info->errmsg != NULL)
strcpy(info->error_text, info->errmsg);
if (info->error == 0)
{
my_eof(thd);
} }
else else
strcpy(error_text, errmsg); {
end_io_cache(&log); my_message(info->error, info->error_text, MYF(0));
RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); }
/*
Exclude iteration through thread list
this is needed for purge_logs() - it will iterate through
thread list and update thd->current_linfo->index_file_offset
this mutex will make sure that it never tried to update our linfo
after we return from this stack frame
*/
mysql_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0;
mysql_mutex_unlock(&LOCK_thread_count);
if (file >= 0)
mysql_file_close(file, MYF(MY_WME));
thd->variables.max_allowed_packet= old_max_allowed_packet;
delete info.fdev;
my_message(my_errno, error_text, MYF(0));
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment