Commit 170e9e59 authored by unknown's avatar unknown

MDEV-5306: Missing locking around rpl_global_gtid_binlog_state

There were some places where insufficient locking between
parallel threads could cause invalid memory accesses and
possibly other grief.

This patch adds the missing locking, and moves the locking
into the struct rpl_binlog_state methods to make it easier
to see that proper locking is in place everywhere.
parent def3c98a
...@@ -59,7 +59,6 @@ wait/synch/mutex/sql/LOCK_open ...@@ -59,7 +59,6 @@ wait/synch/mutex/sql/LOCK_open
wait/synch/mutex/sql/LOCK_plugin wait/synch/mutex/sql/LOCK_plugin
wait/synch/mutex/sql/LOCK_prepared_stmt_count wait/synch/mutex/sql/LOCK_prepared_stmt_count
wait/synch/mutex/sql/LOCK_prepare_ordered wait/synch/mutex/sql/LOCK_prepare_ordered
wait/synch/mutex/sql/LOCK_rpl_gtid_state
wait/synch/mutex/sql/LOCK_rpl_status wait/synch/mutex/sql/LOCK_rpl_status
wait/synch/mutex/sql/LOCK_rpl_thread_pool wait/synch/mutex/sql/LOCK_rpl_thread_pool
wait/synch/mutex/sql/LOCK_server_started wait/synch/mutex/sql/LOCK_server_started
......
...@@ -5406,19 +5406,15 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, ...@@ -5406,19 +5406,15 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
gtid.domain_id= domain_id; gtid.domain_id= domain_id;
gtid.server_id= server_id; gtid.server_id= server_id;
gtid.seq_no= seq_no; gtid.seq_no= seq_no;
mysql_mutex_lock(&LOCK_rpl_gtid_state);
err= rpl_global_gtid_binlog_state.update(&gtid, opt_gtid_strict_mode); err= rpl_global_gtid_binlog_state.update(&gtid, opt_gtid_strict_mode);
mysql_mutex_unlock(&LOCK_rpl_gtid_state);
if (err && thd->stmt_da->sql_errno()==ER_GTID_STRICT_OUT_OF_ORDER) if (err && thd->stmt_da->sql_errno()==ER_GTID_STRICT_OUT_OF_ORDER)
errno= ER_GTID_STRICT_OUT_OF_ORDER; errno= ER_GTID_STRICT_OUT_OF_ORDER;
} }
else else
{ {
/* Allocate the next sequence number for the GTID. */ /* Allocate the next sequence number for the GTID. */
mysql_mutex_lock(&LOCK_rpl_gtid_state);
err= rpl_global_gtid_binlog_state.update_with_next_gtid(domain_id, err= rpl_global_gtid_binlog_state.update_with_next_gtid(domain_id,
server_id, &gtid); server_id, &gtid);
mysql_mutex_unlock(&LOCK_rpl_gtid_state);
seq_no= gtid.seq_no; seq_no= gtid.seq_no;
} }
if (err) if (err)
...@@ -5548,36 +5544,21 @@ MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size) ...@@ -5548,36 +5544,21 @@ MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
bool bool
MYSQL_BIN_LOG::append_state_pos(String *str) MYSQL_BIN_LOG::append_state_pos(String *str)
{ {
bool err; return rpl_global_gtid_binlog_state.append_pos(str);
mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
err= rpl_global_gtid_binlog_state.append_pos(str);
mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
return err;
} }
bool bool
MYSQL_BIN_LOG::append_state(String *str) MYSQL_BIN_LOG::append_state(String *str)
{ {
bool err; return rpl_global_gtid_binlog_state.append_state(str);
mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
err= rpl_global_gtid_binlog_state.append_state(str);
mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
return err;
} }
bool bool
MYSQL_BIN_LOG::is_empty_state() MYSQL_BIN_LOG::is_empty_state()
{ {
bool res; return (rpl_global_gtid_binlog_state.count() == 0);
mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
res= (rpl_global_gtid_binlog_state.count() == 0);
mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
return res;
} }
...@@ -5586,10 +5567,8 @@ MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id, ...@@ -5586,10 +5567,8 @@ MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id,
rpl_gtid *out_gtid) rpl_gtid *out_gtid)
{ {
rpl_gtid *gtid; rpl_gtid *gtid;
mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
if ((gtid= rpl_global_gtid_binlog_state.find(domain_id, server_id))) if ((gtid= rpl_global_gtid_binlog_state.find(domain_id, server_id)))
*out_gtid= *gtid; *out_gtid= *gtid;
mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
return gtid != NULL; return gtid != NULL;
} }
...@@ -5599,29 +5578,21 @@ MYSQL_BIN_LOG::lookup_domain_in_binlog_state(uint32 domain_id, ...@@ -5599,29 +5578,21 @@ MYSQL_BIN_LOG::lookup_domain_in_binlog_state(uint32 domain_id,
rpl_gtid *out_gtid) rpl_gtid *out_gtid)
{ {
rpl_gtid *found_gtid; rpl_gtid *found_gtid;
bool res= false;
mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
if ((found_gtid= rpl_global_gtid_binlog_state.find_most_recent(domain_id))) if ((found_gtid= rpl_global_gtid_binlog_state.find_most_recent(domain_id)))
{ {
*out_gtid= *found_gtid; *out_gtid= *found_gtid;
res= true; return true;
} }
mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
return res; return false;
} }
int int
MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint32 domain_id, uint64 seq_no) MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint32 domain_id, uint64 seq_no)
{ {
int err; return rpl_global_gtid_binlog_state.bump_seq_no_if_needed(domain_id, seq_no);
mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
err= rpl_global_gtid_binlog_state.bump_seq_no_if_needed(domain_id, seq_no);
mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
return err;
} }
...@@ -5629,13 +5600,8 @@ bool ...@@ -5629,13 +5600,8 @@ bool
MYSQL_BIN_LOG::check_strict_gtid_sequence(uint32 domain_id, uint32 server_id, MYSQL_BIN_LOG::check_strict_gtid_sequence(uint32 domain_id, uint32 server_id,
uint64 seq_no) uint64 seq_no)
{ {
bool err; return rpl_global_gtid_binlog_state.check_strict_sequence(domain_id,
server_id, seq_no);
mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
err= rpl_global_gtid_binlog_state.check_strict_sequence(domain_id, server_id,
seq_no);
mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
return err;
} }
...@@ -9141,7 +9107,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, ...@@ -9141,7 +9107,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
gtid.domain_id= gev->domain_id; gtid.domain_id= gev->domain_id;
gtid.server_id= gev->server_id; gtid.server_id= gev->server_id;
gtid.seq_no= gev->seq_no; gtid.seq_no= gev->seq_no;
if (rpl_global_gtid_binlog_state.update(&gtid, false)) if (rpl_global_gtid_binlog_state.update_nolock(&gtid, false))
goto err2; goto err2;
} }
break; break;
......
...@@ -683,8 +683,6 @@ mysql_mutex_t ...@@ -683,8 +683,6 @@ mysql_mutex_t
mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats, mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats,
LOCK_global_table_stats, LOCK_global_index_stats; LOCK_global_table_stats, LOCK_global_index_stats;
mysql_mutex_t LOCK_rpl_gtid_state;
/** /**
The below lock protects access to two global server variables: The below lock protects access to two global server variables:
max_prepared_stmt_count and prepared_stmt_count. These variables max_prepared_stmt_count and prepared_stmt_count. These variables
...@@ -783,8 +781,6 @@ PSI_mutex_key key_LOCK_stats, ...@@ -783,8 +781,6 @@ PSI_mutex_key key_LOCK_stats,
key_LOCK_global_index_stats, key_LOCK_global_index_stats,
key_LOCK_wakeup_ready, key_LOCK_wait_commit; key_LOCK_wakeup_ready, key_LOCK_wait_commit;
PSI_mutex_key key_LOCK_rpl_gtid_state;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered; PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
static PSI_mutex_info all_server_mutexes[]= static PSI_mutex_info all_server_mutexes[]=
...@@ -828,7 +824,6 @@ static PSI_mutex_info all_server_mutexes[]= ...@@ -828,7 +824,6 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_global_table_stats, "LOCK_global_table_stats", PSI_FLAG_GLOBAL}, { &key_LOCK_global_table_stats, "LOCK_global_table_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL}, { &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0}, { &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
{ &key_LOCK_rpl_gtid_state, "LOCK_rpl_gtid_state", PSI_FLAG_GLOBAL},
{ &key_LOCK_wait_commit, "wait_for_commit::LOCK_wait_commit", 0}, { &key_LOCK_wait_commit, "wait_for_commit::LOCK_wait_commit", 0},
{ &key_LOCK_thd_data, "THD::LOCK_thd_data", 0}, { &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
{ &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL}, { &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
...@@ -1997,7 +1992,6 @@ static void clean_up_mutexes() ...@@ -1997,7 +1992,6 @@ static void clean_up_mutexes()
mysql_mutex_destroy(&LOCK_global_user_client_stats); mysql_mutex_destroy(&LOCK_global_user_client_stats);
mysql_mutex_destroy(&LOCK_global_table_stats); mysql_mutex_destroy(&LOCK_global_table_stats);
mysql_mutex_destroy(&LOCK_global_index_stats); mysql_mutex_destroy(&LOCK_global_index_stats);
mysql_mutex_destroy(&LOCK_rpl_gtid_state);
#ifdef HAVE_OPENSSL #ifdef HAVE_OPENSSL
mysql_mutex_destroy(&LOCK_des_key_file); mysql_mutex_destroy(&LOCK_des_key_file);
#ifndef HAVE_YASSL #ifndef HAVE_YASSL
...@@ -4155,8 +4149,6 @@ static int init_thread_environment() ...@@ -4155,8 +4149,6 @@ static int init_thread_environment()
&LOCK_global_table_stats, MY_MUTEX_INIT_FAST); &LOCK_global_table_stats, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_global_index_stats, mysql_mutex_init(key_LOCK_global_index_stats,
&LOCK_global_index_stats, MY_MUTEX_INIT_FAST); &LOCK_global_index_stats, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_rpl_gtid_state,
&LOCK_rpl_gtid_state, MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_prepare_ordered, &LOCK_prepare_ordered, mysql_mutex_init(key_LOCK_prepare_ordered, &LOCK_prepare_ordered,
MY_MUTEX_INIT_SLOW); MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_prepare_ordered, &COND_prepare_ordered, NULL); mysql_cond_init(key_COND_prepare_ordered, &COND_prepare_ordered, NULL);
......
...@@ -258,8 +258,6 @@ extern PSI_mutex_key key_LOCK_stats, ...@@ -258,8 +258,6 @@ extern PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats, key_LOCK_wakeup_ready, key_LOCK_wait_commit; key_LOCK_global_index_stats, key_LOCK_wakeup_ready, key_LOCK_wait_commit;
extern PSI_mutex_key key_LOCK_rpl_gtid_state;
extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger, extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave, key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave,
key_rwlock_LOCK_system_variables_hash, key_rwlock_query_cache_query_lock; key_rwlock_LOCK_system_variables_hash, key_rwlock_query_cache_query_lock;
...@@ -353,7 +351,6 @@ extern mysql_mutex_t ...@@ -353,7 +351,6 @@ extern mysql_mutex_t
LOCK_slave_list, LOCK_active_mi, LOCK_manager, LOCK_slave_list, LOCK_active_mi, LOCK_manager,
LOCK_global_system_variables, LOCK_user_conn, LOCK_global_system_variables, LOCK_user_conn,
LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count; LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count;
extern mysql_mutex_t LOCK_rpl_gtid_state;
extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count; extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count;
#ifdef HAVE_OPENSSL #ifdef HAVE_OPENSSL
extern mysql_mutex_t LOCK_des_key_file; extern mysql_mutex_t LOCK_des_key_file;
......
...@@ -838,7 +838,7 @@ rpl_binlog_state::rpl_binlog_state() ...@@ -838,7 +838,7 @@ rpl_binlog_state::rpl_binlog_state()
void void
rpl_binlog_state::reset() rpl_binlog_state::reset_nolock()
{ {
uint32 i; uint32 i;
...@@ -847,12 +847,22 @@ rpl_binlog_state::reset() ...@@ -847,12 +847,22 @@ rpl_binlog_state::reset()
my_hash_reset(&hash); my_hash_reset(&hash);
} }
void
rpl_binlog_state::reset()
{
mysql_mutex_lock(&LOCK_binlog_state);
reset_nolock();
mysql_mutex_unlock(&LOCK_binlog_state);
}
void rpl_binlog_state::free() void rpl_binlog_state::free()
{ {
if (initialized) if (initialized)
{ {
initialized= 0; initialized= 0;
reset(); reset_nolock();
my_hash_free(&hash); my_hash_free(&hash);
mysql_mutex_destroy(&LOCK_binlog_state); mysql_mutex_destroy(&LOCK_binlog_state);
} }
...@@ -863,14 +873,20 @@ bool ...@@ -863,14 +873,20 @@ bool
rpl_binlog_state::load(struct rpl_gtid *list, uint32 count) rpl_binlog_state::load(struct rpl_gtid *list, uint32 count)
{ {
uint32 i; uint32 i;
bool res= false;
reset(); mysql_mutex_lock(&LOCK_binlog_state);
reset_nolock();
for (i= 0; i < count; ++i) for (i= 0; i < count; ++i)
{ {
if (update(&(list[i]), false)) if (update_nolock(&(list[i]), false))
return true; {
res= true;
break;
}
} }
return false; mysql_mutex_unlock(&LOCK_binlog_state);
return res;
} }
...@@ -889,7 +905,7 @@ rpl_binlog_state::~rpl_binlog_state() ...@@ -889,7 +905,7 @@ rpl_binlog_state::~rpl_binlog_state()
Returns 0 for ok, 1 for error. Returns 0 for ok, 1 for error.
*/ */
int int
rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict) rpl_binlog_state::update_nolock(const struct rpl_gtid *gtid, bool strict)
{ {
element *elem; element *elem;
...@@ -908,7 +924,7 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict) ...@@ -908,7 +924,7 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict)
if (!elem->update_element(gtid)) if (!elem->update_element(gtid))
return 0; return 0;
} }
else if (!alloc_element(gtid)) else if (!alloc_element_nolock(gtid))
return 0; return 0;
my_error(ER_OUT_OF_RESOURCES, MYF(0)); my_error(ER_OUT_OF_RESOURCES, MYF(0));
...@@ -916,6 +932,17 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict) ...@@ -916,6 +932,17 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict)
} }
int
rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict)
{
int res;
mysql_mutex_lock(&LOCK_binlog_state);
res= update_nolock(gtid, strict);
mysql_mutex_unlock(&LOCK_binlog_state);
return res;
}
/* /*
Fill in a new GTID, allocating next sequence number, and update state Fill in a new GTID, allocating next sequence number, and update state
accordingly. accordingly.
...@@ -925,25 +952,30 @@ rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id, ...@@ -925,25 +952,30 @@ rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id,
rpl_gtid *gtid) rpl_gtid *gtid)
{ {
element *elem; element *elem;
int res= 0;
gtid->domain_id= domain_id; gtid->domain_id= domain_id;
gtid->server_id= server_id; gtid->server_id= server_id;
mysql_mutex_lock(&LOCK_binlog_state);
if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0))) if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
{ {
gtid->seq_no= ++elem->seq_no_counter; gtid->seq_no= ++elem->seq_no_counter;
if (!elem->update_element(gtid)) if (!elem->update_element(gtid))
return 0; goto end;
} }
else else
{ {
gtid->seq_no= 1; gtid->seq_no= 1;
if (!alloc_element(gtid)) if (!alloc_element_nolock(gtid))
return 0; goto end;
} }
my_error(ER_OUT_OF_RESOURCES, MYF(0)); my_error(ER_OUT_OF_RESOURCES, MYF(0));
return 1; res= 1;
end:
mysql_mutex_unlock(&LOCK_binlog_state);
return res;
} }
...@@ -989,7 +1021,7 @@ rpl_binlog_state::element::update_element(const rpl_gtid *gtid) ...@@ -989,7 +1021,7 @@ rpl_binlog_state::element::update_element(const rpl_gtid *gtid)
int int
rpl_binlog_state::alloc_element(const rpl_gtid *gtid) rpl_binlog_state::alloc_element_nolock(const rpl_gtid *gtid)
{ {
element *elem; element *elem;
rpl_gtid *lookup_gtid; rpl_gtid *lookup_gtid;
...@@ -1033,7 +1065,9 @@ rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id, ...@@ -1033,7 +1065,9 @@ rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id,
uint64 seq_no) uint64 seq_no)
{ {
element *elem; element *elem;
bool res= 0;
mysql_mutex_lock(&LOCK_binlog_state);
if ((elem= (element *)my_hash_search(&hash, if ((elem= (element *)my_hash_search(&hash,
(const uchar *)(&domain_id), 0)) && (const uchar *)(&domain_id), 0)) &&
elem->last_gtid && elem->last_gtid->seq_no >= seq_no) elem->last_gtid && elem->last_gtid->seq_no >= seq_no)
...@@ -1041,9 +1075,10 @@ rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id, ...@@ -1041,9 +1075,10 @@ rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id,
my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), domain_id, server_id, seq_no, my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), domain_id, server_id, seq_no,
elem->last_gtid->domain_id, elem->last_gtid->server_id, elem->last_gtid->domain_id, elem->last_gtid->server_id,
elem->last_gtid->seq_no); elem->last_gtid->seq_no);
return 1; res= 1;
} }
return 0; mysql_mutex_unlock(&LOCK_binlog_state);
return res;
} }
...@@ -1058,17 +1093,23 @@ int ...@@ -1058,17 +1093,23 @@ int
rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no) rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no)
{ {
element *elem; element *elem;
int res;
mysql_mutex_lock(&LOCK_binlog_state);
if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0))) if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
{ {
if (elem->seq_no_counter < seq_no) if (elem->seq_no_counter < seq_no)
elem->seq_no_counter= seq_no; elem->seq_no_counter= seq_no;
return 0; res= 0;
goto end;
} }
/* We need to allocate a new, empty element to remember the next seq_no. */ /* We need to allocate a new, empty element to remember the next seq_no. */
if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME)))) if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
return 1; {
res= 1;
goto end;
}
elem->domain_id= domain_id; elem->domain_id= domain_id;
my_hash_init(&elem->hash, &my_charset_bin, 32, my_hash_init(&elem->hash, &my_charset_bin, 32,
...@@ -1077,11 +1118,18 @@ rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no) ...@@ -1077,11 +1118,18 @@ rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no)
elem->last_gtid= NULL; elem->last_gtid= NULL;
elem->seq_no_counter= seq_no; elem->seq_no_counter= seq_no;
if (0 == my_hash_insert(&hash, (const uchar *)elem)) if (0 == my_hash_insert(&hash, (const uchar *)elem))
return 0; {
res= 0;
goto end;
}
my_hash_free(&elem->hash); my_hash_free(&elem->hash);
my_free(elem); my_free(elem);
return 1; res= 1;
end:
mysql_mutex_unlock(&LOCK_binlog_state);
return res;
} }
...@@ -1097,7 +1145,9 @@ rpl_binlog_state::write_to_iocache(IO_CACHE *dest) ...@@ -1097,7 +1145,9 @@ rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
{ {
ulong i, j; ulong i, j;
char buf[21]; char buf[21];
int res= 0;
mysql_mutex_lock(&LOCK_binlog_state);
for (i= 0; i < hash.records; ++i) for (i= 0; i < hash.records; ++i)
{ {
size_t res; size_t res;
...@@ -1122,11 +1172,16 @@ rpl_binlog_state::write_to_iocache(IO_CACHE *dest) ...@@ -1122,11 +1172,16 @@ rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
longlong10_to_str(gtid->seq_no, buf, 10); longlong10_to_str(gtid->seq_no, buf, 10);
res= my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf); res= my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf);
if (res == (size_t) -1) if (res == (size_t) -1)
return 1; {
res= 1;
goto end;
}
} }
} }
return 0; end:
mysql_mutex_unlock(&LOCK_binlog_state);
return res;
} }
...@@ -1137,26 +1192,31 @@ rpl_binlog_state::read_from_iocache(IO_CACHE *src) ...@@ -1137,26 +1192,31 @@ rpl_binlog_state::read_from_iocache(IO_CACHE *src)
char buf[10+1+10+1+20+1+1]; char buf[10+1+10+1+20+1+1];
char *p, *end; char *p, *end;
rpl_gtid gtid; rpl_gtid gtid;
int res= 0;
reset(); mysql_mutex_lock(&LOCK_binlog_state);
reset_nolock();
for (;;) for (;;)
{ {
size_t res= my_b_gets(src, buf, sizeof(buf)); size_t len= my_b_gets(src, buf, sizeof(buf));
if (!res) if (!len)
break; break;
p= buf; p= buf;
end= buf + res; end= buf + len;
if (gtid_parser_helper(&p, end, &gtid)) if (gtid_parser_helper(&p, end, &gtid) ||
return 1; update_nolock(&gtid, false))
if (update(&gtid, false)) {
return 1; res= 1;
break;
}
} }
return 0; mysql_mutex_unlock(&LOCK_binlog_state);
return res;
} }
rpl_gtid * rpl_gtid *
rpl_binlog_state::find(uint32 domain_id, uint32 server_id) rpl_binlog_state::find_nolock(uint32 domain_id, uint32 server_id)
{ {
element *elem; element *elem;
if (!(elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0))) if (!(elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
...@@ -1164,15 +1224,29 @@ rpl_binlog_state::find(uint32 domain_id, uint32 server_id) ...@@ -1164,15 +1224,29 @@ rpl_binlog_state::find(uint32 domain_id, uint32 server_id)
return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0); return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0);
} }
rpl_gtid *
rpl_binlog_state::find(uint32 domain_id, uint32 server_id)
{
rpl_gtid *p;
mysql_mutex_lock(&LOCK_binlog_state);
p= find_nolock(domain_id, server_id);
mysql_mutex_unlock(&LOCK_binlog_state);
return p;
}
rpl_gtid * rpl_gtid *
rpl_binlog_state::find_most_recent(uint32 domain_id) rpl_binlog_state::find_most_recent(uint32 domain_id)
{ {
element *elem; element *elem;
rpl_gtid *gtid= NULL;
mysql_mutex_lock(&LOCK_binlog_state);
elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0); elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
if (elem && elem->last_gtid) if (elem && elem->last_gtid)
return elem->last_gtid; gtid= elem->last_gtid;
return NULL; mysql_mutex_unlock(&LOCK_binlog_state);
return gtid;
} }
...@@ -1182,8 +1256,10 @@ rpl_binlog_state::count() ...@@ -1182,8 +1256,10 @@ rpl_binlog_state::count()
uint32 c= 0; uint32 c= 0;
uint32 i; uint32 i;
mysql_mutex_lock(&LOCK_binlog_state);
for (i= 0; i < hash.records; ++i) for (i= 0; i < hash.records; ++i)
c+= ((element *)my_hash_element(&hash, i))->hash.records; c+= ((element *)my_hash_element(&hash, i))->hash.records;
mysql_mutex_unlock(&LOCK_binlog_state);
return c; return c;
} }
...@@ -1193,7 +1269,9 @@ int ...@@ -1193,7 +1269,9 @@ int
rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size) rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
{ {
uint32 i, j, pos; uint32 i, j, pos;
int res= 0;
mysql_mutex_lock(&LOCK_binlog_state);
pos= 0; pos= 0;
for (i= 0; i < hash.records; ++i) for (i= 0; i < hash.records; ++i)
{ {
...@@ -1216,12 +1294,17 @@ rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size) ...@@ -1216,12 +1294,17 @@ rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
gtid= e->last_gtid; gtid= e->last_gtid;
if (pos >= list_size) if (pos >= list_size)
return 1; {
res= 1;
goto end;
}
memcpy(&gtid_list[pos++], gtid, sizeof(*gtid)); memcpy(&gtid_list[pos++], gtid, sizeof(*gtid));
} }
} }
return 0; end:
mysql_mutex_unlock(&LOCK_binlog_state);
return res;
} }
...@@ -1240,12 +1323,17 @@ rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size) ...@@ -1240,12 +1323,17 @@ rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
{ {
uint32 i; uint32 i;
uint32 alloc_size, out_size; uint32 alloc_size, out_size;
int res= 0;
out_size= 0;
mysql_mutex_lock(&LOCK_binlog_state);
alloc_size= hash.records; alloc_size= hash.records;
if (!(*list= (rpl_gtid *)my_malloc(alloc_size * sizeof(rpl_gtid), if (!(*list= (rpl_gtid *)my_malloc(alloc_size * sizeof(rpl_gtid),
MYF(MY_WME)))) MYF(MY_WME))))
return 1; {
out_size= 0; res= 1;
goto end;
}
for (i= 0; i < alloc_size; ++i) for (i= 0; i < alloc_size; ++i)
{ {
element *e= (element *)my_hash_element(&hash, i); element *e= (element *)my_hash_element(&hash, i);
...@@ -1254,8 +1342,10 @@ rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size) ...@@ -1254,8 +1342,10 @@ rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
memcpy(&((*list)[out_size++]), e->last_gtid, sizeof(rpl_gtid)); memcpy(&((*list)[out_size++]), e->last_gtid, sizeof(rpl_gtid));
} }
end:
mysql_mutex_unlock(&LOCK_binlog_state);
*size= out_size; *size= out_size;
return 0; return res;
} }
...@@ -1265,6 +1355,7 @@ rpl_binlog_state::append_pos(String *str) ...@@ -1265,6 +1355,7 @@ rpl_binlog_state::append_pos(String *str)
uint32 i; uint32 i;
bool first= true; bool first= true;
mysql_mutex_lock(&LOCK_binlog_state);
for (i= 0; i < hash.records; ++i) for (i= 0; i < hash.records; ++i)
{ {
element *e= (element *)my_hash_element(&hash, i); element *e= (element *)my_hash_element(&hash, i);
...@@ -1272,6 +1363,7 @@ rpl_binlog_state::append_pos(String *str) ...@@ -1272,6 +1363,7 @@ rpl_binlog_state::append_pos(String *str)
rpl_slave_state_tostring_helper(str, e->last_gtid, &first)) rpl_slave_state_tostring_helper(str, e->last_gtid, &first))
return true; return true;
} }
mysql_mutex_unlock(&LOCK_binlog_state);
return false; return false;
} }
...@@ -1282,7 +1374,9 @@ rpl_binlog_state::append_state(String *str) ...@@ -1282,7 +1374,9 @@ rpl_binlog_state::append_state(String *str)
{ {
uint32 i, j; uint32 i, j;
bool first= true; bool first= true;
bool res= false;
mysql_mutex_lock(&LOCK_binlog_state);
for (i= 0; i < hash.records; ++i) for (i= 0; i < hash.records; ++i)
{ {
element *e= (element *)my_hash_element(&hash, i); element *e= (element *)my_hash_element(&hash, i);
...@@ -1304,11 +1398,16 @@ rpl_binlog_state::append_state(String *str) ...@@ -1304,11 +1398,16 @@ rpl_binlog_state::append_state(String *str)
gtid= e->last_gtid; gtid= e->last_gtid;
if (rpl_slave_state_tostring_helper(str, gtid, &first)) if (rpl_slave_state_tostring_helper(str, gtid, &first))
return true; {
res= true;
goto end;
}
} }
} }
return false; end:
mysql_mutex_unlock(&LOCK_binlog_state);
return res;
} }
......
...@@ -146,13 +146,15 @@ struct rpl_binlog_state ...@@ -146,13 +146,15 @@ struct rpl_binlog_state
rpl_binlog_state(); rpl_binlog_state();
~rpl_binlog_state(); ~rpl_binlog_state();
void reset_nolock();
void reset(); void reset();
void free(); void free();
bool load(struct rpl_gtid *list, uint32 count); bool load(struct rpl_gtid *list, uint32 count);
int update_nolock(const struct rpl_gtid *gtid, bool strict);
int update(const struct rpl_gtid *gtid, bool strict); int update(const struct rpl_gtid *gtid, bool strict);
int update_with_next_gtid(uint32 domain_id, uint32 server_id, int update_with_next_gtid(uint32 domain_id, uint32 server_id,
rpl_gtid *gtid); rpl_gtid *gtid);
int alloc_element(const rpl_gtid *gtid); int alloc_element_nolock(const rpl_gtid *gtid);
bool check_strict_sequence(uint32 domain_id, uint32 server_id, uint64 seq_no); bool check_strict_sequence(uint32 domain_id, uint32 server_id, uint64 seq_no);
int bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no); int bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no);
int write_to_iocache(IO_CACHE *dest); int write_to_iocache(IO_CACHE *dest);
...@@ -162,6 +164,7 @@ struct rpl_binlog_state ...@@ -162,6 +164,7 @@ struct rpl_binlog_state
int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size); int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
bool append_pos(String *str); bool append_pos(String *str);
bool append_state(String *str); bool append_state(String *str);
rpl_gtid *find_nolock(uint32 domain_id, uint32 server_id);
rpl_gtid *find(uint32 domain_id, uint32 server_id); rpl_gtid *find(uint32 domain_id, uint32 server_id);
rpl_gtid *find_most_recent(uint32 domain_id); rpl_gtid *find_most_recent(uint32 domain_id);
}; };
......
...@@ -1575,7 +1575,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, ...@@ -1575,7 +1575,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
DBUG_EXECUTE_IF("gtid_force_reconnect_at_10_1_100", DBUG_EXECUTE_IF("gtid_force_reconnect_at_10_1_100",
{ {
rpl_gtid *dbug_gtid; rpl_gtid *dbug_gtid;
if ((dbug_gtid= until_binlog_state->find(10,1)) && if ((dbug_gtid= until_binlog_state->find_nolock(10,1)) &&
dbug_gtid->seq_no == 100) dbug_gtid->seq_no == 100)
{ {
DBUG_SET("-d,gtid_force_reconnect_at_10_1_100"); DBUG_SET("-d,gtid_force_reconnect_at_10_1_100");
...@@ -1585,7 +1585,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, ...@@ -1585,7 +1585,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
} }
}); });
if (until_binlog_state->update(&event_gtid, false)) if (until_binlog_state->update_nolock(&event_gtid, false))
{ {
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed in internal GTID book-keeping: Out of memory"; return "Failed in internal GTID book-keeping: Out of memory";
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment