Commit 2b711d23 authored by Sergey Vojtovich's avatar Sergey Vojtovich

Adieu slave_list

slave_list was used to provide data for SHOW SLAVE HOSTS and
Slaves_connected status variable.

Introduced binlog_dump_thread_count which is exposed via Slaves_connected
(replaces slave_list.records).

Store Slave_info on THD and access it by iterating server_threads
(replaces slave_list).

Added:
THD::slave_info
binlog_dump_thread_count
show_slave_hosts_callback()

Removed:
slave_list
SLAVE_LIST_CHUNK
SLAVE_ERRMSG_SIZE
slave_list_key()
slave_info_free()
init_slave_list()
end_slave_list()
all_slave_list_mutexes
init_all_slave_list_mutexes()
key_LOCK_slave_list
LOCK_slave_list

Moved:
SLAVE_INFO -> Slave_info
register_slave() -> THD::register_slave()
unregister_slave() -> THD::unregister_slave()

Also removed redundant end_slave() from close_connections(): it is called
again soon afterwards by clean_up().

Pre-requisite for clean MDEV-18450 solution.
parent 68c765d3
......@@ -68,10 +68,6 @@ where name like "wait/synch/mutex/sql/LOCK_crypt";
count(name)
1
select count(name) from mutex_instances
where name like "wait/synch/mutex/sql/LOCK_slave_list";
count(name)
1
select count(name) from mutex_instances
where name like "wait/synch/mutex/sql/LOCK_active_mi";
count(name)
1
......
......@@ -70,9 +70,6 @@ select count(name) from mutex_instances
select count(name) from mutex_instances
where name like "wait/synch/mutex/sql/LOCK_crypt";
select count(name) from mutex_instances
where name like "wait/synch/mutex/sql/LOCK_slave_list";
select count(name) from mutex_instances
where name like "wait/synch/mutex/sql/LOCK_active_mi";
......
......@@ -705,7 +705,7 @@ mysql_mutex_t
LOCK_delayed_insert, LOCK_delayed_status, LOCK_delayed_create,
LOCK_crypt,
LOCK_global_system_variables,
LOCK_user_conn, LOCK_slave_list,
LOCK_user_conn,
LOCK_connection_count, LOCK_error_messages, LOCK_slave_background;
mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats,
LOCK_global_table_stats, LOCK_global_index_stats;
......@@ -1722,7 +1722,6 @@ static void close_connections(void)
if (global_system_variables.log_warnings)
server_threads.iterate(warn_threads_still_active);
end_slave();
#ifdef WITH_WSREP
if (wsrep_inited == 1)
{
......@@ -1947,9 +1946,6 @@ static void clean_up(bool print_message)
free_global_index_stats();
delete_dynamic(&all_options); // This should be empty
free_all_rpl_filters();
#ifdef HAVE_REPLICATION
end_slave_list();
#endif
wsrep_thr_deinit();
my_uuid_end();
delete type_handler_data;
......@@ -4820,9 +4816,6 @@ static int init_server_components()
#endif
my_uuid_init((ulong) (my_rnd(&sql_rand))*12345,12345);
#ifdef HAVE_REPLICATION
init_slave_list();
#endif
wt_init();
/* Setup logs */
......@@ -7110,11 +7103,7 @@ static int show_slaves_connected(THD *thd, SHOW_VAR *var, char *buff)
var->type= SHOW_LONGLONG;
var->value= buff;
mysql_mutex_lock(&LOCK_slave_list);
*((longlong *)buff)= slave_list.records;
mysql_mutex_unlock(&LOCK_slave_list);
*((longlong*) buff)= uint32_t(binlog_dump_thread_count);
return 0;
}
......
......@@ -624,7 +624,7 @@ extern mysql_mutex_t
LOCK_item_func_sleep, LOCK_status,
LOCK_error_log, LOCK_delayed_insert, LOCK_short_uuid_generator,
LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_timezone,
LOCK_slave_list, LOCK_active_mi, LOCK_manager,
LOCK_active_mi, LOCK_manager,
LOCK_global_system_variables, LOCK_user_conn,
LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count,
LOCK_slave_background;
......
......@@ -38,13 +38,21 @@
#include "log_event.h"
#include <mysql.h>
#define SLAVE_LIST_CHUNK 128
#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)
struct Slave_info
{
uint32 server_id;
uint32 master_id;
char host[HOSTNAME_LENGTH*SYSTEM_CHARSET_MBMAXLEN+1];
char user[USERNAME_LENGTH+1];
char password[MAX_PASSWORD_LENGTH*SYSTEM_CHARSET_MBMAXLEN+1];
uint16 port;
};
Atomic_counter<uint32_t> binlog_dump_thread_count;
ulong rpl_status=RPL_NULL;
mysql_mutex_t LOCK_rpl_status;
HASH slave_list;
const char *rpl_role_type[] = {"MASTER","SLAVE",NullS};
TYPELIB rpl_role_typelib = {array_elements(rpl_role_type)-1,"",
......@@ -81,33 +89,26 @@ void change_rpl_status(ulong from_status, ulong to_status)
errmsg= msg;\
goto err; \
}\
strmake(obj,(char*) p,len); \
::strmake(obj, (char*) p, len); \
p+= len; \
}\
void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
void THD::unregister_slave()
{
uint32 thd_server_id= thd->variables.server_id;
if (thd_server_id)
if (auto old_si= slave_info)
{
if (need_mutex)
mysql_mutex_lock(&LOCK_slave_list);
SLAVE_INFO* old_si;
if ((old_si = (SLAVE_INFO*)my_hash_search(&slave_list,
(uchar*)&thd_server_id, 4)) &&
(!only_mine || old_si->thd == thd))
my_hash_delete(&slave_list, (uchar*)old_si);
if (need_mutex)
mysql_mutex_unlock(&LOCK_slave_list);
mysql_mutex_lock(&LOCK_thd_data);
slave_info= 0;
mysql_mutex_unlock(&LOCK_thd_data);
delete old_si;
binlog_dump_thread_count--;
}
}
/**
Register slave in 'slave_list' hash table.
Register slave
@return
0 ok
......@@ -115,19 +116,18 @@ void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
1 Error. Error message sent to client
*/
int register_slave(THD* thd, uchar* packet, size_t packet_length)
int THD::register_slave(uchar *packet, size_t packet_length)
{
int res;
SLAVE_INFO *si;
Slave_info *si;
uchar *p= packet, *p_end= packet + packet_length;
const char *errmsg= "Wrong parameters to function register_slave";
if (check_access(thd, REPL_SLAVE_ACL, any_db, NULL, NULL, 0, 0))
if (check_access(this, REPL_SLAVE_ACL, any_db, NULL, NULL, 0, 0))
return 1;
if (!(si= new Slave_info))
return 1;
if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
goto err2;
thd->variables.server_id= si->server_id= uint4korr(p);
variables.server_id= si->server_id= uint4korr(p);
p+= 4;
get_object(p,si->host, "Failed to register slave: too long 'report-host'");
get_object(p,si->user, "Failed to register slave: too long 'report-user'");
......@@ -146,77 +146,44 @@ int register_slave(THD* thd, uchar* packet, size_t packet_length)
p += 4;
if (!(si->master_id= uint4korr(p)))
si->master_id= global_system_variables.server_id;
si->thd= thd;
mysql_mutex_lock(&LOCK_slave_list);
unregister_slave(thd,0,0);
res= my_hash_insert(&slave_list, (uchar*) si);
mysql_mutex_unlock(&LOCK_slave_list);
return res;
binlog_dump_thread_count++;
unregister_slave();
mysql_mutex_lock(&LOCK_thd_data);
slave_info= si;
mysql_mutex_unlock(&LOCK_thd_data);
return 0;
err:
my_free(si);
delete si;
my_message(ER_UNKNOWN_ERROR, errmsg, MYF(0)); /* purecov: inspected */
err2:
return 1;
}
extern "C" uint32
*slave_list_key(SLAVE_INFO* si, size_t *len,
my_bool not_used __attribute__((unused)))
{
*len = 4;
return &si->server_id;
}
extern "C" void slave_info_free(void *s)
{
my_free(s);
}
#ifdef HAVE_PSI_INTERFACE
static PSI_mutex_key key_LOCK_slave_list;
static PSI_mutex_info all_slave_list_mutexes[]=
{
{ &key_LOCK_slave_list, "LOCK_slave_list", PSI_FLAG_GLOBAL}
};
static void init_all_slave_list_mutexes(void)
{
const char* category= "sql";
int count;
if (PSI_server == NULL)
return;
count= array_elements(all_slave_list_mutexes);
PSI_server->register_mutex(category, all_slave_list_mutexes, count);
}
#endif /* HAVE_PSI_INTERFACE */
void init_slave_list()
{
#ifdef HAVE_PSI_INTERFACE
init_all_slave_list_mutexes();
#endif
my_hash_init(&slave_list, system_charset_info, SLAVE_LIST_CHUNK, 0, 0,
(my_hash_get_key) slave_list_key,
(my_hash_free_key) slave_info_free, 0);
mysql_mutex_init(key_LOCK_slave_list, &LOCK_slave_list, MY_MUTEX_INIT_FAST);
}
void end_slave_list()
static my_bool show_slave_hosts_callback(THD *thd, Protocol *protocol)
{
/* No protection by a mutex needed as we are only called at shutdown */
if (my_hash_inited(&slave_list))
my_bool res= FALSE;
mysql_mutex_lock(&thd->LOCK_thd_data);
if (auto si= thd->slave_info)
{
my_hash_free(&slave_list);
mysql_mutex_destroy(&LOCK_slave_list);
protocol->prepare_for_resend();
protocol->store(si->server_id);
protocol->store(si->host, &my_charset_bin);
if (opt_show_slave_auth_info)
{
protocol->store(si->user, &my_charset_bin);
protocol->store(si->password, &my_charset_bin);
}
protocol->store((uint32) si->port);
protocol->store(si->master_id);
res= protocol->write();
}
mysql_mutex_unlock(&thd->LOCK_thd_data);
return res;
}
/**
Execute a SHOW SLAVE HOSTS statement.
......@@ -258,28 +225,9 @@ bool show_slave_hosts(THD* thd)
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(TRUE);
mysql_mutex_lock(&LOCK_slave_list);
if (server_threads.iterate(show_slave_hosts_callback, protocol))
DBUG_RETURN(true);
for (uint i = 0; i < slave_list.records; ++i)
{
SLAVE_INFO* si = (SLAVE_INFO*) my_hash_element(&slave_list, i);
protocol->prepare_for_resend();
protocol->store((uint32) si->server_id);
protocol->store(si->host, &my_charset_bin);
if (opt_show_slave_auth_info)
{
protocol->store(si->user, &my_charset_bin);
protocol->store(si->password, &my_charset_bin);
}
protocol->store((uint32) si->port);
protocol->store((uint32) si->master_id);
if (protocol->write())
{
mysql_mutex_unlock(&LOCK_slave_list);
DBUG_RETURN(TRUE);
}
}
mysql_mutex_unlock(&LOCK_slave_list);
my_eof(thd);
DBUG_RETURN(FALSE);
}
......
......@@ -22,6 +22,7 @@
#include <my_sys.h>
#include "slave.h"
extern Atomic_counter<uint32_t> binlog_dump_thread_count;
typedef enum {RPL_AUTH_MASTER=0,RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE,
RPL_LOST_SOLDIER,RPL_TROOP_SOLDIER,
RPL_RECOVERY_CAPTAIN,RPL_NULL /* inactive */,
......@@ -36,13 +37,7 @@ extern const char* rpl_role_type[], *rpl_status_type[];
void change_rpl_status(ulong from_status, ulong to_status);
int find_recovery_captain(THD* thd, MYSQL* mysql);
extern HASH slave_list;
bool show_slave_hosts(THD* thd);
void init_slave_list();
void end_slave_list();
int register_slave(THD* thd, uchar* packet, size_t packet_length);
void unregister_slave(THD* thd, bool only_mine, bool need_mutex);
#endif /* HAVE_REPLICATION */
#endif /* REPL_FAILSAFE_INCLUDED */
......@@ -642,6 +642,10 @@ THD::THD(my_thread_id id, bool is_wsrep_applier, bool skip_global_sys_var_lock)
tdc_hash_pins(0),
xid_hash_pins(0),
m_tmp_tables_locked(false)
#ifdef HAVE_REPLICATION
,
slave_info(0)
#endif
#ifdef WITH_WSREP
,
wsrep_applier(is_wsrep_applier),
......@@ -1562,6 +1566,9 @@ void THD::cleanup(void)
DBUG_ASSERT(!mdl_context.has_locks());
apc_target.destroy();
#ifdef HAVE_REPLICATION
unregister_slave();
#endif
cleanup_done=1;
DBUG_VOID_RETURN;
}
......
......@@ -89,6 +89,9 @@ class user_var_entry;
struct Trans_binlog_info;
class rpl_io_thread_info;
class rpl_sql_thread_info;
#ifdef HAVE_REPLICATION
struct Slave_info;
#endif
enum enum_ha_read_modes { RFIRST, RNEXT, RPREV, RLAST, RKEY, RNEXT_SAME };
enum enum_duplicates { DUP_ERROR, DUP_REPLACE, DUP_UPDATE };
......@@ -4809,6 +4812,13 @@ class THD: public THD_count, /* this must be first */
}
public:
#ifdef HAVE_REPLICATION
Slave_info *slave_info;
int register_slave(uchar *packet, size_t packet_length);
void unregister_slave();
#endif
inline ulong wsrep_binlog_format() const
{
return WSREP_BINLOG_FORMAT(variables.binlog_format);
......
......@@ -1654,7 +1654,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
case COM_REGISTER_SLAVE:
{
status_var_increment(thd->status_var.com_register_slave);
if (!register_slave(thd, (uchar*)packet, packet_length))
if (!thd->register_slave((uchar*) packet, packet_length))
my_ok(thd);
break;
}
......@@ -2094,7 +2094,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
general_log_print(thd, command, "Log: '%s' Pos: %lu", name, pos);
if (nlen < FN_REFLEN)
mysql_binlog_send(thd, thd->strmake(name, nlen), (my_off_t)pos, flags);
unregister_slave(thd,1,1);
thd->unregister_slave();
/* fake COM_QUIT -- if we get here, the thread needs to terminate */
error = TRUE;
break;
......
......@@ -21,17 +21,6 @@
#ifdef HAVE_REPLICATION
#include "slave.h"
typedef struct st_slave_info
{
uint32 server_id;
uint32 master_id;
char host[HOSTNAME_LENGTH*SYSTEM_CHARSET_MBMAXLEN+1];
char user[USERNAME_LENGTH+1];
char password[MAX_PASSWORD_LENGTH*SYSTEM_CHARSET_MBMAXLEN+1];
uint16 port;
THD* thd;
} SLAVE_INFO;
struct slave_connection_state;
extern my_bool opt_show_slave_auth_info;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment