Commit 1fb9105e authored by mats@romeo.(none)'s avatar mats@romeo.(none)

BUG#23171 (Illegal slave restart group position):

Second patch to fix skipping code. Moving relay and binary log 
position changing code from do_apply_event [old exec_event()] into
do_update_pos() and doing other changes necessary to support that.

Fixing a bug that can cause deadlock if rotating binary log when committing
a changes to a transactional table that is not inside a transaction and
cause a rotate log.
parent edb70453
...@@ -1453,7 +1453,7 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all) ...@@ -1453,7 +1453,7 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all)
do nothing. do nothing.
just pretend we can do 2pc, so that MySQL won't just pretend we can do 2pc, so that MySQL won't
switch to 1pc. switch to 1pc.
real work will be done in MYSQL_BIN_LOG::log() real work will be done in TC_LOG_BINLOG::log()
*/ */
return 0; return 0;
} }
...@@ -1467,9 +1467,15 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) ...@@ -1467,9 +1467,15 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
IO_CACHE *trans_log= &trx_data->trans_log; IO_CACHE *trans_log= &trx_data->trans_log;
DBUG_ASSERT(mysql_bin_log.is_open()); DBUG_ASSERT(mysql_bin_log.is_open());
if (all && trx_data->empty()) /*
The condition here has to be identical to the one inside
binlog_end_trans(), guarding the write of the transaction cache to
the binary log.
*/
if ((all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) &&
trx_data->empty())
{ {
// we're here because trans_log was flushed in MYSQL_BIN_LOG::log() // we're here because trans_log was flushed in TC_LOG_BINLOG::log()
trx_data->reset(); trx_data->reset();
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -3117,8 +3123,10 @@ void MYSQL_BIN_LOG::new_file_impl(bool need_lock) ...@@ -3117,8 +3123,10 @@ void MYSQL_BIN_LOG::new_file_impl(bool need_lock)
{ {
tc_log_page_waits++; tc_log_page_waits++;
pthread_mutex_lock(&LOCK_prep_xids); pthread_mutex_lock(&LOCK_prep_xids);
while (prepared_xids) while (prepared_xids) {
DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
pthread_cond_wait(&COND_prep_xids, &LOCK_prep_xids); pthread_cond_wait(&COND_prep_xids, &LOCK_prep_xids);
}
pthread_mutex_unlock(&LOCK_prep_xids); pthread_mutex_unlock(&LOCK_prep_xids);
} }
...@@ -4905,8 +4913,10 @@ void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) ...@@ -4905,8 +4913,10 @@ void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
{ {
pthread_mutex_lock(&LOCK_prep_xids); pthread_mutex_lock(&LOCK_prep_xids);
DBUG_ASSERT(prepared_xids > 0); DBUG_ASSERT(prepared_xids > 0);
if (--prepared_xids == 0) if (--prepared_xids == 0) {
DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
pthread_cond_signal(&COND_prep_xids); pthread_cond_signal(&COND_prep_xids);
}
pthread_mutex_unlock(&LOCK_prep_xids); pthread_mutex_unlock(&LOCK_prep_xids);
rotate_and_purge(0); // as ::write() did not rotate rotate_and_purge(0); // as ::write() did not rotate
} }
......
This diff is collapsed.
...@@ -1056,30 +1056,19 @@ void st_relay_log_info::cached_charset_invalidate() ...@@ -1056,30 +1056,19 @@ void st_relay_log_info::cached_charset_invalidate()
} }
bool st_relay_log_info::cached_charset_compare(char *charset) bool st_relay_log_info::cached_charset_compare(char *charset) const
{ {
DBUG_ENTER("st_relay_log_info::cached_charset_compare"); DBUG_ENTER("st_relay_log_info::cached_charset_compare");
if (bcmp(cached_charset, charset, sizeof(cached_charset))) if (bcmp(cached_charset, charset, sizeof(cached_charset)))
{ {
memcpy(cached_charset, charset, sizeof(cached_charset)); memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
DBUG_RETURN(1); DBUG_RETURN(1);
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
void st_relay_log_info::transaction_end(THD* thd)
{
DBUG_ENTER("st_relay_log_info::transaction_end");
/*
Nothing to do here right now.
*/
DBUG_VOID_RETURN;
}
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
void st_relay_log_info::cleanup_context(THD *thd, bool error) void st_relay_log_info::cleanup_context(THD *thd, bool error)
{ {
......
...@@ -291,9 +291,7 @@ typedef struct st_relay_log_info ...@@ -291,9 +291,7 @@ typedef struct st_relay_log_info
When the 6 bytes are equal to 0 is used to mean "cache is invalidated". When the 6 bytes are equal to 0 is used to mean "cache is invalidated".
*/ */
void cached_charset_invalidate(); void cached_charset_invalidate();
bool cached_charset_compare(char *charset); bool cached_charset_compare(char *charset) const;
void transaction_end(THD*);
void cleanup_context(THD *, bool); void cleanup_context(THD *, bool);
void clear_tables_to_lock() { void clear_tables_to_lock() {
......
...@@ -109,7 +109,7 @@ field_length_from_packed(enum_field_types const field_type, ...@@ -109,7 +109,7 @@ field_length_from_packed(enum_field_types const field_type,
*/ */
int int
table_def::compatible_with(RELAY_LOG_INFO *rli, TABLE *table) table_def::compatible_with(RELAY_LOG_INFO const *rli_arg, TABLE *table)
const const
{ {
/* /*
...@@ -117,6 +117,7 @@ table_def::compatible_with(RELAY_LOG_INFO *rli, TABLE *table) ...@@ -117,6 +117,7 @@ table_def::compatible_with(RELAY_LOG_INFO *rli, TABLE *table)
*/ */
uint const cols_to_check= min(table->s->fields, size()); uint const cols_to_check= min(table->s->fields, size());
int error= 0; int error= 0;
RELAY_LOG_INFO const *rli= const_cast<RELAY_LOG_INFO*>(rli_arg);
TABLE_SHARE const *const tsh= table->s; TABLE_SHARE const *const tsh= table->s;
......
...@@ -116,7 +116,7 @@ public: ...@@ -116,7 +116,7 @@ public:
1 if the table definition is not compatible with 'table' 1 if the table definition is not compatible with 'table'
0 if the table definition is compatible with 'table' 0 if the table definition is compatible with 'table'
*/ */
int compatible_with(RELAY_LOG_INFO *rli, TABLE *table) const; int compatible_with(RELAY_LOG_INFO const *rli, TABLE *table) const;
private: private:
my_size_t m_size; // Number of elements in the types array my_size_t m_size; // Number of elements in the types array
......
...@@ -555,7 +555,7 @@ static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli) ...@@ -555,7 +555,7 @@ static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
void void
*/ */
void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli, void slave_print_msg(enum loglevel level, RELAY_LOG_INFO const *rli,
int err_code, const char* msg, ...) int err_code, const char* msg, ...)
{ {
void (*report_function)(const char *, ...); void (*report_function)(const char *, ...);
...@@ -577,9 +577,9 @@ void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli, ...@@ -577,9 +577,9 @@ void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli,
It's an error, it must be reported in Last_error and Last_errno in SHOW It's an error, it must be reported in Last_error and Last_errno in SHOW
SLAVE STATUS. SLAVE STATUS.
*/ */
pbuff= rli->last_slave_error; pbuff= const_cast<RELAY_LOG_INFO*>(rli)->last_slave_error;
pbuffsize= sizeof(rli->last_slave_error); pbuffsize= sizeof(rli->last_slave_error);
rli->last_slave_errno = err_code; const_cast<RELAY_LOG_INFO*>(rli)->last_slave_errno = err_code;
report_function= sql_print_error; report_function= sql_print_error;
break; break;
case WARNING_LEVEL: case WARNING_LEVEL:
...@@ -1376,7 +1376,7 @@ void set_slave_thread_options(THD* thd) ...@@ -1376,7 +1376,7 @@ void set_slave_thread_options(THD* thd)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli) void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO const *rli)
{ {
DBUG_ENTER("set_slave_thread_default_charset"); DBUG_ENTER("set_slave_thread_default_charset");
...@@ -1387,7 +1387,14 @@ void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli) ...@@ -1387,7 +1387,14 @@ void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli)
thd->variables.collation_server= thd->variables.collation_server=
global_system_variables.collation_server; global_system_variables.collation_server;
thd->update_charset(); thd->update_charset();
rli->cached_charset_invalidate();
/*
We use a const cast here since the conceptual (and externally
visible) behavior of the function is to set the default charset of
the thread. That the cache has to be invalidated is a secondary
effect.
*/
const_cast<RELAY_LOG_INFO*>(rli)->cached_charset_invalidate();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -1607,7 +1614,8 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings) ...@@ -1607,7 +1614,8 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
} }
int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error) int check_expected_error(THD* thd, RELAY_LOG_INFO const *rli,
int expected_error)
{ {
DBUG_ENTER("check_expected_error"); DBUG_ENTER("check_expected_error");
......
...@@ -164,9 +164,9 @@ bool show_master_info(THD* thd, MASTER_INFO* mi); ...@@ -164,9 +164,9 @@ bool show_master_info(THD* thd, MASTER_INFO* mi);
bool show_binlog_info(THD* thd); bool show_binlog_info(THD* thd);
const char *print_slave_db_safe(const char *db); const char *print_slave_db_safe(const char *db);
int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int error_code); int check_expected_error(THD* thd, RELAY_LOG_INFO const *rli, int error_code);
void skip_load_data_infile(NET* net); void skip_load_data_infile(NET* net);
void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli, void slave_print_msg(enum loglevel level, RELAY_LOG_INFO const *rli,
int err_code, const char* msg, ...) int err_code, const char* msg, ...)
ATTRIBUTE_FORMAT(printf, 4, 5); ATTRIBUTE_FORMAT(printf, 4, 5);
...@@ -184,7 +184,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,ulonglong pos, ...@@ -184,7 +184,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,ulonglong pos,
int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset, int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
const char** errmsg); const char** errmsg);
void set_slave_thread_options(THD* thd); void set_slave_thread_options(THD* thd);
void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli); void set_slave_thread_default_charset(THD *thd, RELAY_LOG_INFO const *rli);
void rotate_relay_log(MASTER_INFO* mi); void rotate_relay_log(MASTER_INFO* mi);
pthread_handler_t handle_slave_io(void *arg); pthread_handler_t handle_slave_io(void *arg);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment