Commit c5e25c8b authored by Monty's avatar Monty Committed by Sergei Golubchik

Added a separate lock for start/stop/reset slave.

This solves some possible dead locks when one calls stop slave while slave
is starting.
parent e65f667b
...@@ -115,14 +115,19 @@ my_hash_init2(HASH *hash, uint growth_size, CHARSET_INFO *charset, ...@@ -115,14 +115,19 @@ my_hash_init2(HASH *hash, uint growth_size, CHARSET_INFO *charset,
static inline void my_hash_free_elements(HASH *hash) static inline void my_hash_free_elements(HASH *hash)
{ {
uint records= hash->records;
/*
Set records to 0 early to guard against anyone looking at the structure
during the free process
*/
hash->records= 0;
if (hash->free) if (hash->free)
{ {
HASH_LINK *data=dynamic_element(&hash->array,0,HASH_LINK*); HASH_LINK *data=dynamic_element(&hash->array,0,HASH_LINK*);
HASH_LINK *end= data + hash->records; HASH_LINK *end= data + records;
while (data < end) while (data < end)
(*hash->free)((data++)->data); (*hash->free)((data++)->data);
} }
hash->records=0;
} }
...@@ -519,6 +524,9 @@ my_bool my_hash_insert(HASH *info, const uchar *record) ...@@ -519,6 +524,9 @@ my_bool my_hash_insert(HASH *info, const uchar *record)
The record with the same record ptr is removed. The record with the same record ptr is removed.
If there is a free-function it's called if record was found. If there is a free-function it's called if record was found.
hash->free() is guarantee to be called only after the row has been
deleted from the hash and the hash can be reused by other threads.
@return @return
@retval 0 ok @retval 0 ok
@retval 1 Record not found @retval 1 Record not found
......
...@@ -863,7 +863,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, ...@@ -863,7 +863,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_system_variables_hash, key_LOCK_thd_data, key_LOCK_system_variables_hash, key_LOCK_thd_data,
key_LOCK_user_conn, key_LOCK_uuid_short_generator, key_LOG_LOCK_log, key_LOCK_user_conn, key_LOCK_uuid_short_generator, key_LOG_LOCK_log,
key_master_info_data_lock, key_master_info_run_lock, key_master_info_data_lock, key_master_info_run_lock,
key_master_info_sleep_lock, key_master_info_sleep_lock, key_master_info_start_stop_lock,
key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock, key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
key_rpl_group_info_sleep_lock, key_rpl_group_info_sleep_lock,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock, key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
...@@ -933,6 +933,7 @@ static PSI_mutex_info all_server_mutexes[]= ...@@ -933,6 +933,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL}, { &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
{ &key_LOG_LOCK_log, "LOG::LOCK_log", 0}, { &key_LOG_LOCK_log, "LOG::LOCK_log", 0},
{ &key_master_info_data_lock, "Master_info::data_lock", 0}, { &key_master_info_data_lock, "Master_info::data_lock", 0},
{ &key_master_info_start_stop_lock, "Master_info::start_stop_lock", 0},
{ &key_master_info_run_lock, "Master_info::run_lock", 0}, { &key_master_info_run_lock, "Master_info::run_lock", 0},
{ &key_master_info_sleep_lock, "Master_info::sleep_lock", 0}, { &key_master_info_sleep_lock, "Master_info::sleep_lock", 0},
{ &key_mutex_slave_reporting_capability_err_lock, "Slave_reporting_capability::err_lock", 0}, { &key_mutex_slave_reporting_capability_err_lock, "Slave_reporting_capability::err_lock", 0},
......
...@@ -266,7 +266,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, ...@@ -266,7 +266,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_thd_data, key_LOCK_thd_data,
key_LOCK_user_conn, key_LOG_LOCK_log, key_LOCK_user_conn, key_LOG_LOCK_log,
key_master_info_data_lock, key_master_info_run_lock, key_master_info_data_lock, key_master_info_run_lock,
key_master_info_sleep_lock, key_master_info_sleep_lock, key_master_info_start_stop_lock,
key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock, key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock, key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
key_rpl_group_info_sleep_lock, key_rpl_group_info_sleep_lock,
......
...@@ -80,6 +80,8 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, ...@@ -80,6 +80,8 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
bzero((char*) &file, sizeof(file)); bzero((char*) &file, sizeof(file));
mysql_mutex_init(key_master_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_master_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_master_info_data_lock, &data_lock, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_master_info_data_lock, &data_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_master_info_start_stop_lock, &start_stop_lock,
MY_MUTEX_INIT_SLOW);
mysql_mutex_setflags(&run_lock, MYF_NO_DEADLOCK_DETECTION); mysql_mutex_setflags(&run_lock, MYF_NO_DEADLOCK_DETECTION);
mysql_mutex_setflags(&data_lock, MYF_NO_DEADLOCK_DETECTION); mysql_mutex_setflags(&data_lock, MYF_NO_DEADLOCK_DETECTION);
mysql_mutex_init(key_master_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_master_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST);
...@@ -117,6 +119,7 @@ Master_info::~Master_info() ...@@ -117,6 +119,7 @@ Master_info::~Master_info()
mysql_mutex_destroy(&run_lock); mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock); mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&sleep_lock); mysql_mutex_destroy(&sleep_lock);
mysql_mutex_destroy(&start_stop_lock);
mysql_cond_destroy(&data_cond); mysql_cond_destroy(&data_cond);
mysql_cond_destroy(&start_cond); mysql_cond_destroy(&start_cond);
mysql_cond_destroy(&stop_cond); mysql_cond_destroy(&stop_cond);
...@@ -727,17 +730,28 @@ uchar *get_key_master_info(Master_info *mi, size_t *length, ...@@ -727,17 +730,28 @@ uchar *get_key_master_info(Master_info *mi, size_t *length,
return (uchar*) mi->cmp_connection_name.str; return (uchar*) mi->cmp_connection_name.str;
} }
/*
Delete a master info
Called from my_hash_delete(&master_info_hash)
Stops associated slave threads and frees master_info
*/
void free_key_master_info(Master_info *mi) void free_key_master_info(Master_info *mi)
{ {
DBUG_ENTER("free_key_master_info"); DBUG_ENTER("free_key_master_info");
mysql_mutex_unlock(&LOCK_active_mi);
/* Ensure that we are not in reset_slave while this is done */ /* Ensure that we are not in reset_slave while this is done */
lock_slave_threads(mi); mi->lock_slave_threads();
terminate_slave_threads(mi,SLAVE_FORCE_ALL); terminate_slave_threads(mi,SLAVE_FORCE_ALL);
/* We use 2 here instead of 1 just to make it easier when debugging */ /* We use 2 here instead of 1 just to make it easier when debugging */
mi->killed= 2; mi->killed= 2;
end_master_info(mi); end_master_info(mi);
unlock_slave_threads(mi); mi->unlock_slave_threads();
delete mi; delete mi;
mysql_mutex_lock(&LOCK_active_mi);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -904,6 +918,7 @@ Master_info_index::Master_info_index() ...@@ -904,6 +918,7 @@ Master_info_index::Master_info_index()
void Master_info_index::free_connections() void Master_info_index::free_connections()
{ {
mysql_mutex_assert_owner(&LOCK_active_mi);
my_hash_reset(&master_info_hash); my_hash_reset(&master_info_hash);
} }
...@@ -936,7 +951,6 @@ bool Master_info_index::init_all_master_info() ...@@ -936,7 +951,6 @@ bool Master_info_index::init_all_master_info()
File index_file_nr; File index_file_nr;
DBUG_ENTER("init_all_master_info"); DBUG_ENTER("init_all_master_info");
mysql_mutex_assert_owner(&LOCK_active_mi);
DBUG_ASSERT(master_info_index); DBUG_ASSERT(master_info_index);
if ((index_file_nr= my_open(index_file_name, if ((index_file_nr= my_open(index_file_name,
...@@ -984,7 +998,6 @@ bool Master_info_index::init_all_master_info() ...@@ -984,7 +998,6 @@ bool Master_info_index::init_all_master_info()
DBUG_RETURN(1); DBUG_RETURN(1);
} }
lock_slave_threads(mi);
init_thread_mask(&thread_mask,mi,0 /*not inverse*/); init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
create_logfile_name_with_suffix(buf_master_info_file, create_logfile_name_with_suffix(buf_master_info_file,
...@@ -999,6 +1012,7 @@ bool Master_info_index::init_all_master_info() ...@@ -999,6 +1012,7 @@ bool Master_info_index::init_all_master_info()
sql_print_information("Reading Master_info: '%s' Relay_info:'%s'", sql_print_information("Reading Master_info: '%s' Relay_info:'%s'",
buf_master_info_file, buf_relay_log_info_file); buf_master_info_file, buf_relay_log_info_file);
mi->lock_slave_threads();
if (init_master_info(mi, buf_master_info_file, buf_relay_log_info_file, if (init_master_info(mi, buf_master_info_file, buf_relay_log_info_file,
0, thread_mask)) 0, thread_mask))
{ {
...@@ -1012,14 +1026,14 @@ bool Master_info_index::init_all_master_info() ...@@ -1012,14 +1026,14 @@ bool Master_info_index::init_all_master_info()
if (master_info_index->add_master_info(mi, FALSE)) if (master_info_index->add_master_info(mi, FALSE))
DBUG_RETURN(1); DBUG_RETURN(1);
succ_num++; succ_num++;
unlock_slave_threads(mi); mi->unlock_slave_threads();
} }
else else
{ {
/* Master_info already in HASH */ /* Master_info already in HASH */
sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS), sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS),
(int) connection_name.length, connection_name.str); (int) connection_name.length, connection_name.str);
unlock_slave_threads(mi); mi->unlock_slave_threads();
delete mi; delete mi;
} }
continue; continue;
...@@ -1036,7 +1050,7 @@ bool Master_info_index::init_all_master_info() ...@@ -1036,7 +1050,7 @@ bool Master_info_index::init_all_master_info()
/* Master_info was already registered */ /* Master_info was already registered */
sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS), sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS),
(int) connection_name.length, connection_name.str); (int) connection_name.length, connection_name.str);
unlock_slave_threads(mi); mi->unlock_slave_threads();
delete mi; delete mi;
continue; continue;
} }
...@@ -1065,7 +1079,7 @@ bool Master_info_index::init_all_master_info() ...@@ -1065,7 +1079,7 @@ bool Master_info_index::init_all_master_info()
(int) connection_name.length, (int) connection_name.length,
connection_name.str); connection_name.str);
} }
unlock_slave_threads(mi); mi->unlock_slave_threads();
} }
} }
...@@ -1268,7 +1282,12 @@ bool Master_info_index::check_duplicate_master_info(LEX_STRING *name_arg, ...@@ -1268,7 +1282,12 @@ bool Master_info_index::check_duplicate_master_info(LEX_STRING *name_arg,
/* Add a Master_info class to Hash Table */ /* Add a Master_info class to Hash Table */
bool Master_info_index::add_master_info(Master_info *mi, bool write_to_file) bool Master_info_index::add_master_info(Master_info *mi, bool write_to_file)
{ {
if (!my_hash_insert(&master_info_hash, (uchar*) mi)) /*
We have to protect against shutdown to ensure we are not calling
my_hash_insert() while my_hash_free() is in progress
*/
if (unlikely(shutdown_in_progress) ||
!my_hash_insert(&master_info_hash, (uchar*) mi))
{ {
if (global_system_variables.log_warnings > 1) if (global_system_variables.log_warnings > 1)
sql_print_information("Added new Master_info '%.*s' to hash table", sql_print_information("Added new Master_info '%.*s' to hash table",
...@@ -1392,23 +1411,31 @@ bool give_error_if_slave_running(bool already_locked) ...@@ -1392,23 +1411,31 @@ bool give_error_if_slave_running(bool already_locked)
@return @return
0 No Slave SQL thread is running 0 No Slave SQL thread is running
# Number of slave SQL thread running # Number of slave SQL thread running
Note that during shutdown we return 1. This is needed to ensure we
don't try to resize thread pool during shutdown as during shutdown
master_info_hash may be freeing the hash and during that time
hash entries can't be accessed.
*/ */
uint any_slave_sql_running() uint any_slave_sql_running()
{ {
uint count= 0; uint count= 0;
HASH *hash;
DBUG_ENTER("any_slave_sql_running"); DBUG_ENTER("any_slave_sql_running");
mysql_mutex_lock(&LOCK_active_mi); mysql_mutex_lock(&LOCK_active_mi);
if (likely(master_info_index)) // Not shutdown if (unlikely(shutdown_in_progress || !master_info_index))
{ {
HASH *hash= &master_info_index->master_info_hash; mysql_mutex_unlock(&LOCK_active_mi);
for (uint i= 0; i< hash->records; ++i) return 1;
{ }
Master_info *mi= (Master_info *)my_hash_element(hash, i); hash= &master_info_index->master_info_hash;
if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN) for (uint i= 0; i< hash->records; ++i)
count++; {
} Master_info *mi= (Master_info *)my_hash_element(hash, i);
if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN)
count++;
} }
mysql_mutex_unlock(&LOCK_active_mi); mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(count); DBUG_RETURN(count);
......
...@@ -81,6 +81,8 @@ class Master_info : public Slave_reporting_capability ...@@ -81,6 +81,8 @@ class Master_info : public Slave_reporting_capability
} }
void release(); void release();
void wait_until_free(); void wait_until_free();
void lock_slave_threads();
void unlock_slave_threads();
/* the variables below are needed because we can change masters on the fly */ /* the variables below are needed because we can change masters on the fly */
char master_log_name[FN_REFLEN+6]; /* Room for multi-*/ char master_log_name[FN_REFLEN+6]; /* Room for multi-*/
...@@ -99,7 +101,7 @@ class Master_info : public Slave_reporting_capability ...@@ -99,7 +101,7 @@ class Master_info : public Slave_reporting_capability
File fd; // we keep the file open, so we need to remember the file pointer File fd; // we keep the file open, so we need to remember the file pointer
IO_CACHE file; IO_CACHE file;
mysql_mutex_t data_lock, run_lock, sleep_lock; mysql_mutex_t data_lock, run_lock, sleep_lock, start_stop_lock;
mysql_cond_t data_cond, start_cond, stop_cond, sleep_cond; mysql_cond_t data_cond, start_cond, stop_cond, sleep_cond;
THD *io_thd; THD *io_thd;
MYSQL* mysql; MYSQL* mysql;
......
...@@ -1378,10 +1378,24 @@ dealloc_gco(group_commit_orderer *gco) ...@@ -1378,10 +1378,24 @@ dealloc_gco(group_commit_orderer *gco)
my_free(gco); my_free(gco);
} }
/**
Change thread count for global parallel worker threads
@param pool parallel thread pool
@param new_count Number of threads to be in pool. 0 in shutdown
@param force Force thread count to new_count even if slave
threads are running
By default we don't resize pool of there are running threads.
However during shutdown we will always do it.
This is needed as any_slave_sql_running() returns 1 during shutdown
as we don't want to access master_info while
Master_info_index::free_connections are running.
*/
static int static int
rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
uint32 new_count) uint32 new_count, bool force)
{ {
uint32 i; uint32 i;
rpl_parallel_thread **new_list= NULL; rpl_parallel_thread **new_list= NULL;
...@@ -1403,7 +1417,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, ...@@ -1403,7 +1417,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
If we are about to delete pool, do an extra check that there are no new If we are about to delete pool, do an extra check that there are no new
slave threads running since we marked pool busy slave threads running since we marked pool busy
*/ */
if (!new_count) if (!new_count && !force)
{ {
if (any_slave_sql_running()) if (any_slave_sql_running())
{ {
...@@ -1556,8 +1570,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, ...@@ -1556,8 +1570,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
int rpl_parallel_resize_pool_if_no_slaves(void) int rpl_parallel_resize_pool_if_no_slaves(void)
{ {
/* master_info_index is set to NULL on shutdown */ /* master_info_index is set to NULL on shutdown */
if (opt_slave_parallel_threads > 0 && !any_slave_sql_running() && if (opt_slave_parallel_threads > 0 && !any_slave_sql_running())
master_info_index)
return rpl_parallel_inactivate_pool(&global_rpl_thread_pool); return rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
return 0; return 0;
} }
...@@ -1567,7 +1580,8 @@ int ...@@ -1567,7 +1580,8 @@ int
rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool) rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
{ {
if (!pool->count) if (!pool->count)
return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads); return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads,
0);
return 0; return 0;
} }
...@@ -1575,7 +1589,7 @@ rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool) ...@@ -1575,7 +1589,7 @@ rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
int int
rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool) rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool)
{ {
return rpl_parallel_change_thread_count(pool, 0); return rpl_parallel_change_thread_count(pool, 0, 0);
} }
...@@ -1854,7 +1868,7 @@ rpl_parallel_thread_pool::destroy() ...@@ -1854,7 +1868,7 @@ rpl_parallel_thread_pool::destroy()
{ {
if (!inited) if (!inited)
return; return;
rpl_parallel_change_thread_count(this, 0); rpl_parallel_change_thread_count(this, 0, 1);
mysql_mutex_destroy(&LOCK_rpl_thread_pool); mysql_mutex_destroy(&LOCK_rpl_thread_pool);
mysql_cond_destroy(&COND_rpl_thread_pool); mysql_cond_destroy(&COND_rpl_thread_pool);
inited= false; inited= false;
......
...@@ -228,16 +228,14 @@ void init_thread_mask(int* mask,Master_info* mi,bool inverse) ...@@ -228,16 +228,14 @@ void init_thread_mask(int* mask,Master_info* mi,bool inverse)
/* /*
lock_slave_threads() lock_slave_threads() against other threads doing STOP, START or RESET SLAVE
*/ */
void lock_slave_threads(Master_info* mi) void Master_info::lock_slave_threads()
{ {
DBUG_ENTER("lock_slave_threads"); DBUG_ENTER("lock_slave_threads");
mysql_mutex_lock(&start_stop_lock);
//TODO: see if we can do this without dual mutex
mysql_mutex_lock(&mi->run_lock);
mysql_mutex_lock(&mi->rli.run_lock);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -246,13 +244,10 @@ void lock_slave_threads(Master_info* mi) ...@@ -246,13 +244,10 @@ void lock_slave_threads(Master_info* mi)
unlock_slave_threads() unlock_slave_threads()
*/ */
void unlock_slave_threads(Master_info* mi) void Master_info::unlock_slave_threads()
{ {
DBUG_ENTER("unlock_slave_threads"); DBUG_ENTER("unlock_slave_threads");
mysql_mutex_unlock(&start_stop_lock);
//TODO: see if we can do this without dual mutex
mysql_mutex_unlock(&mi->rli.run_lock);
mysql_mutex_unlock(&mi->run_lock);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -374,7 +369,6 @@ int init_slave() ...@@ -374,7 +369,6 @@ int init_slave()
accepted. However bootstrap may conflict with us if it does START SLAVE. accepted. However bootstrap may conflict with us if it does START SLAVE.
So it's safer to take the lock. So it's safer to take the lock.
*/ */
mysql_mutex_lock(&LOCK_active_mi);
if (pthread_key_create(&RPL_MASTER_INFO, NULL)) if (pthread_key_create(&RPL_MASTER_INFO, NULL))
goto err; goto err;
...@@ -383,7 +377,6 @@ int init_slave() ...@@ -383,7 +377,6 @@ int init_slave()
if (!master_info_index || master_info_index->init_all_master_info()) if (!master_info_index || master_info_index->init_all_master_info())
{ {
sql_print_error("Failed to initialize multi master structures"); sql_print_error("Failed to initialize multi master structures");
mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(1); DBUG_RETURN(1);
} }
if (!(active_mi= new Master_info(&default_master_connection_name, if (!(active_mi= new Master_info(&default_master_connection_name,
...@@ -441,7 +434,6 @@ int init_slave() ...@@ -441,7 +434,6 @@ int init_slave()
} }
end: end:
mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(error); DBUG_RETURN(error);
err: err:
...@@ -6159,7 +6151,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, ...@@ -6159,7 +6151,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
suppress_warnings= 0; suppress_warnings= 0;
mi->report(ERROR_LEVEL, last_errno, NULL, mi->report(ERROR_LEVEL, last_errno, NULL,
"error %s to master '%s@%s:%d'" "error %s to master '%s@%s:%d'"
" - retry-time: %d retries: %lu message: %s", " - retry-time: %d maximum-retries: %lu message: %s",
(reconnect ? "reconnecting" : "connecting"), (reconnect ? "reconnecting" : "connecting"),
mi->user, mi->host, mi->port, mi->user, mi->host, mi->port,
mi->connect_retry, master_retry_count, mi->connect_retry, master_retry_count,
......
...@@ -219,8 +219,6 @@ void close_active_mi(); /* clean up slave threads data */ ...@@ -219,8 +219,6 @@ void close_active_mi(); /* clean up slave threads data */
void clear_until_condition(Relay_log_info* rli); void clear_until_condition(Relay_log_info* rli);
void clear_slave_error(Relay_log_info* rli); void clear_slave_error(Relay_log_info* rli);
void end_relay_log_info(Relay_log_info* rli); void end_relay_log_info(Relay_log_info* rli);
void lock_slave_threads(Master_info* mi);
void unlock_slave_threads(Master_info* mi);
void init_thread_mask(int* mask,Master_info* mi,bool inverse); void init_thread_mask(int* mask,Master_info* mi,bool inverse);
Format_description_log_event * Format_description_log_event *
read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos, read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos,
......
...@@ -2833,7 +2833,16 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) ...@@ -2833,7 +2833,16 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
relay_log_info_file, 0, relay_log_info_file, 0,
&mi->cmp_connection_name); &mi->cmp_connection_name);
lock_slave_threads(mi); // this allows us to cleanly read slave_running mi->lock_slave_threads();
if (mi->killed)
{
/* connection was deleted while we waited for lock_slave_threads */
mi->unlock_slave_threads();
my_error(WARN_NO_MASTER_INFO, mi->connection_name.length,
mi->connection_name.str);
DBUG_RETURN(-1);
}
// Get a mask of _stopped_ threads // Get a mask of _stopped_ threads
init_thread_mask(&thread_mask,mi,1 /* inverse */); init_thread_mask(&thread_mask,mi,1 /* inverse */);
...@@ -2968,7 +2977,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) ...@@ -2968,7 +2977,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
ER(ER_UNTIL_COND_IGNORED)); ER(ER_UNTIL_COND_IGNORED));
if (!slave_errno) if (!slave_errno)
slave_errno = start_slave_threads(0 /*no mutex */, slave_errno = start_slave_threads(1,
1 /* wait for start */, 1 /* wait for start */,
mi, mi,
master_info_file_tmp, master_info_file_tmp,
...@@ -2984,7 +2993,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) ...@@ -2984,7 +2993,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
} }
err: err:
unlock_slave_threads(mi); mi->unlock_slave_threads();
if (slave_errno) if (slave_errno)
{ {
...@@ -3024,8 +3033,12 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report ) ...@@ -3024,8 +3033,12 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report )
DBUG_RETURN(-1); DBUG_RETURN(-1);
THD_STAGE_INFO(thd, stage_killing_slave); THD_STAGE_INFO(thd, stage_killing_slave);
int thread_mask; int thread_mask;
lock_slave_threads(mi); mi->lock_slave_threads();
// Get a mask of _running_ threads /*
Get a mask of _running_ threads.
We don't have to test for mi->killed as the thread_mask will take care
of checking if threads exists
*/
init_thread_mask(&thread_mask,mi,0 /* not inverse*/); init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
/* /*
Below we will stop all running threads. Below we will stop all running threads.
...@@ -3038,8 +3051,7 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report ) ...@@ -3038,8 +3051,7 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report )
if (thread_mask) if (thread_mask)
{ {
slave_errno= terminate_slave_threads(mi,thread_mask, slave_errno= terminate_slave_threads(mi,thread_mask, 0 /* get lock */);
1 /*skip lock */);
} }
else else
{ {
...@@ -3048,7 +3060,8 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report ) ...@@ -3048,7 +3060,8 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report )
push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING, push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
ER(ER_SLAVE_WAS_NOT_RUNNING)); ER(ER_SLAVE_WAS_NOT_RUNNING));
} }
unlock_slave_threads(mi);
mi->unlock_slave_threads();
if (slave_errno) if (slave_errno)
{ {
...@@ -3083,11 +3096,20 @@ int reset_slave(THD *thd, Master_info* mi) ...@@ -3083,11 +3096,20 @@ int reset_slave(THD *thd, Master_info* mi)
char relay_log_info_file_tmp[FN_REFLEN]; char relay_log_info_file_tmp[FN_REFLEN];
DBUG_ENTER("reset_slave"); DBUG_ENTER("reset_slave");
lock_slave_threads(mi); mi->lock_slave_threads();
if (mi->killed)
{
/* connection was deleted while we waited for lock_slave_threads */
mi->unlock_slave_threads();
my_error(WARN_NO_MASTER_INFO, mi->connection_name.length,
mi->connection_name.str);
DBUG_RETURN(-1);
}
init_thread_mask(&thread_mask,mi,0 /* not inverse */); init_thread_mask(&thread_mask,mi,0 /* not inverse */);
if (thread_mask) // We refuse if any slave thread is running if (thread_mask) // We refuse if any slave thread is running
{ {
unlock_slave_threads(mi); mi->unlock_slave_threads();
my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length, my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length,
mi->connection_name.str); mi->connection_name.str);
DBUG_RETURN(ER_SLAVE_MUST_STOP); DBUG_RETURN(ER_SLAVE_MUST_STOP);
...@@ -3152,7 +3174,7 @@ int reset_slave(THD *thd, Master_info* mi) ...@@ -3152,7 +3174,7 @@ int reset_slave(THD *thd, Master_info* mi)
RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi)); RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
err: err:
unlock_slave_threads(mi); mi->unlock_slave_threads();
if (error) if (error)
my_error(sql_errno, MYF(0), errmsg); my_error(sql_errno, MYF(0), errmsg);
DBUG_RETURN(error); DBUG_RETURN(error);
...@@ -3286,7 +3308,16 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) ...@@ -3286,7 +3308,16 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
lex_mi->port)) lex_mi->port))
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
lock_slave_threads(mi); mi->lock_slave_threads();
if (mi->killed)
{
/* connection was deleted while we waited for lock_slave_threads */
mi->unlock_slave_threads();
my_error(WARN_NO_MASTER_INFO, mi->connection_name.length,
mi->connection_name.str);
DBUG_RETURN(TRUE);
}
init_thread_mask(&thread_mask,mi,0 /*not inverse*/); init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
if (thread_mask) // We refuse if any slave thread is running if (thread_mask) // We refuse if any slave thread is running
{ {
...@@ -3593,7 +3624,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) ...@@ -3593,7 +3624,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
mysql_mutex_unlock(&mi->rli.data_lock); mysql_mutex_unlock(&mi->rli.data_lock);
err: err:
unlock_slave_threads(mi); mi->unlock_slave_threads();
if (ret == FALSE) if (ret == FALSE)
my_ok(thd); my_ok(thd);
DBUG_RETURN(ret); DBUG_RETURN(ret);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment