Commit 6db773a5 authored by Andrei Elkin's avatar Andrei Elkin

MDEV-17437 Semisync master fires invalid fd value assert

The semisync ack collector hits fd's out-of-bound value assert through
a stack of

/usr/sbin/mysqld(_ZN12Ack_receiver17get_slave_socketsEP6fd_setPj+0x70)[0x7fa3bbe27400]

/usr/sbin/mysqld(_ZN12Ack_receiver3runEv+0x540)[0x7fa3bbe27980]

/usr/sbin/mysqld(ack_receive_handler+0x19)[0x7fa3bbe27a79]

The reason of the failure must be the same as in https://bugs.mysql.com/bug.php?id=79865
whose fixes are applied with minor changes.

Specifically, the semisync ack thread is changed to use poll()
instead of select() on platforms where the former is defined.
On the systems that still use select(), Ack receive thread will generate
an error and semi sync will be switched off. Windows systems is exception
case because on windows this limitation does not exists.

The sustain manual testing with `mysqlslap --concurrency > 1024' in "background" while the slave io thread is restarting multiple times.
parent c29c39a7
...@@ -173,25 +173,6 @@ inline void Ack_receiver::wait_for_slave_connection() ...@@ -173,25 +173,6 @@ inline void Ack_receiver::wait_for_slave_connection()
mysql_cond_wait(&m_cond, &m_mutex); mysql_cond_wait(&m_cond, &m_mutex);
} }
my_socket Ack_receiver::get_slave_sockets(fd_set *fds, uint *count)
{
my_socket max_fd= INVALID_SOCKET;
Slave *slave;
I_List_iterator<Slave> it(m_slaves);
*count= 0;
FD_ZERO(fds);
while ((slave= it++))
{
(*count)++;
my_socket fd= slave->sock_fd();
max_fd= (fd > max_fd ? fd : max_fd);
FD_SET(fd, fds);
}
return max_fd;
}
/* Auxilary function to initialize a NET object with given net buffer. */ /* Auxilary function to initialize a NET object with given net buffer. */
static void init_net(NET *net, unsigned char *buff, unsigned int buff_len) static void init_net(NET *net, unsigned char *buff, unsigned int buff_len)
{ {
...@@ -208,14 +189,17 @@ void Ack_receiver::run() ...@@ -208,14 +189,17 @@ void Ack_receiver::run()
THD *thd= new THD(next_thread_id(), false, true); THD *thd= new THD(next_thread_id(), false, true);
NET net; NET net;
unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH]; unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH];
fd_set read_fds;
my_socket max_fd= INVALID_SOCKET;
Slave *slave;
my_thread_init(); my_thread_init();
DBUG_ENTER("Ack_receiver::run"); DBUG_ENTER("Ack_receiver::run");
#ifdef HAVE_POLL
Poll_socket_listener listener(m_slaves);
#else
Select_socket_listener listener(m_slaves);
#endif //HAVE_POLL
sql_print_information("Starting ack receiver thread"); sql_print_information("Starting ack receiver thread");
thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND; thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND;
thd->thread_stack= (char*) &thd; thd->thread_stack= (char*) &thd;
...@@ -231,9 +215,9 @@ void Ack_receiver::run() ...@@ -231,9 +215,9 @@ void Ack_receiver::run()
while (1) while (1)
{ {
fd_set fds;
int ret; int ret;
uint slave_count; uint slave_count __attribute__((unused))= 0;
Slave *slave;
mysql_mutex_lock(&m_mutex); mysql_mutex_lock(&m_mutex);
if (unlikely(m_status == ST_STOPPING)) if (unlikely(m_status == ST_STOPPING))
...@@ -249,23 +233,25 @@ void Ack_receiver::run() ...@@ -249,23 +233,25 @@ void Ack_receiver::run()
continue; continue;
} }
max_fd= get_slave_sockets(&read_fds, &slave_count); if ((slave_count= listener.init_slave_sockets()) == 0)
goto end;
m_slaves_changed= false; m_slaves_changed= false;
#ifdef HAVE_POLL
DBUG_PRINT("info", ("fd count %u", slave_count));
#else
DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count,(int) max_fd)); DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count,(int) max_fd));
#endif
} }
struct timeval tv= {1, 0}; ret= listener.listen_on_sockets();
fds= read_fds;
/* select requires max fd + 1 for the first argument */
ret= select((int)(max_fd+1), &fds, NULL, NULL, &tv);
if (ret <= 0) if (ret <= 0)
{ {
mysql_mutex_unlock(&m_mutex); mysql_mutex_unlock(&m_mutex);
ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret); ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret);
if (ret == -1) if (ret == -1 && errno != EINTR)
sql_print_information("Failed to select() on semi-sync dump sockets, " sql_print_information("Failed to wait on semi-sync sockets, "
"error: errno=%d", socket_errno); "error: errno=%d", socket_errno);
/* Sleep 1us, so other threads can catch the m_mutex easily. */ /* Sleep 1us, so other threads can catch the m_mutex easily. */
my_sleep(1); my_sleep(1);
...@@ -273,11 +259,10 @@ void Ack_receiver::run() ...@@ -273,11 +259,10 @@ void Ack_receiver::run()
} }
set_stage_info(stage_reading_semi_sync_ack); set_stage_info(stage_reading_semi_sync_ack);
I_List_iterator<Slave> it(m_slaves); Slave_ilist_iterator it(m_slaves);
while ((slave= it++)) while ((slave= it++))
{ {
if (FD_ISSET(slave->sock_fd(), &fds)) if (listener.is_socket_active(slave))
{ {
ulong len; ulong len;
...@@ -289,7 +274,7 @@ void Ack_receiver::run() ...@@ -289,7 +274,7 @@ void Ack_receiver::run()
repl_semisync_master.report_reply_packet(slave->server_id(), repl_semisync_master.report_reply_packet(slave->server_id(),
net.read_pos, len); net.read_pos, len);
else if (net.last_errno == ER_NET_READ_ERROR) else if (net.last_errno == ER_NET_READ_ERROR)
FD_CLR(slave->sock_fd(), &read_fds); listener.clear_socket_info(slave);
} }
} }
mysql_mutex_unlock(&m_mutex); mysql_mutex_unlock(&m_mutex);
......
...@@ -20,6 +20,22 @@ ...@@ -20,6 +20,22 @@
#include "my_pthread.h" #include "my_pthread.h"
#include "sql_class.h" #include "sql_class.h"
#include "semisync.h" #include "semisync.h"
#include <vector>
struct Slave :public ilink
{
THD *thd;
Vio vio;
#ifdef HAVE_POLL
uint m_fds_index;
#endif
my_socket sock_fd() const { return vio.mysql_socket.fd; }
uint server_id() const { return thd->variables.server_id; }
};
typedef I_List<Slave> Slave_ilist;
typedef I_List_iterator<Slave> Slave_ilist_iterator;
/** /**
Ack_receiver is responsible to control ack receive thread and maintain Ack_receiver is responsible to control ack receive thread and maintain
slave information used by ack receive thread. slave information used by ack receive thread.
...@@ -92,18 +108,7 @@ class Ack_receiver : public Repl_semi_sync_base ...@@ -92,18 +108,7 @@ class Ack_receiver : public Repl_semi_sync_base
/* If slave list is updated(add or remove). */ /* If slave list is updated(add or remove). */
bool m_slaves_changed; bool m_slaves_changed;
class Slave :public ilink Slave_ilist m_slaves;
{
public:
THD *thd;
Vio vio;
my_socket sock_fd() { return vio.mysql_socket.fd; }
uint server_id() { return thd->variables.server_id; }
};
I_List<Slave> m_slaves;
pthread_t m_pid; pthread_t m_pid;
/* Declare them private, so no one can copy the object. */ /* Declare them private, so no one can copy the object. */
...@@ -112,8 +117,124 @@ class Ack_receiver : public Repl_semi_sync_base ...@@ -112,8 +117,124 @@ class Ack_receiver : public Repl_semi_sync_base
void set_stage_info(const PSI_stage_info &stage); void set_stage_info(const PSI_stage_info &stage);
void wait_for_slave_connection(); void wait_for_slave_connection();
my_socket get_slave_sockets(fd_set *fds, uint *count);
}; };
#ifdef HAVE_POLL
#include <sys/poll.h>
#include <vector>
class Poll_socket_listener
{
public:
Poll_socket_listener(const Slave_ilist &slaves)
:m_slaves(slaves)
{
}
bool listen_on_sockets()
{
return poll(m_fds.data(), m_fds.size(), 1000 /*1 Second timeout*/);
}
bool is_socket_active(const Slave *slave)
{
return m_fds[slave->m_fds_index].revents & POLLIN;
}
void clear_socket_info(const Slave *slave)
{
m_fds[slave->m_fds_index].fd= -1;
m_fds[slave->m_fds_index].events= 0;
}
uint init_slave_sockets()
{
Slave_ilist_iterator it(const_cast<Slave_ilist&>(m_slaves));
Slave *slave;
uint fds_index= 0;
m_fds.clear();
while ((slave= it++))
{
pollfd poll_fd;
poll_fd.fd= slave->sock_fd();
poll_fd.events= POLLIN;
m_fds.push_back(poll_fd);
slave->m_fds_index= fds_index++;
}
return fds_index;
}
private:
const Slave_ilist &m_slaves;
std::vector<pollfd> m_fds;
};
#else //NO POLL
class Select_socket_listener
{
public:
Select_socket_listener(const Slave_ilist &slaves)
:m_slaves(slaves), m_max_fd(INVALID_SOCKET)
{
}
bool listen_on_sockets()
{
/* Reinitialze the fds with active fds before calling select */
m_fds= m_init_fds;
struct timeval tv= {1,0};
/* select requires max fd + 1 for the first argument */
return select(m_max_fd+1, &m_fds, NULL, NULL, &tv);
}
bool is_socket_active(const Slave *slave)
{
return FD_ISSET(slave->sock_fd(), &m_fds);
}
void clear_socket_info(const Slave *slave)
{
FD_CLR(slave->sock_fd(), &m_init_fds);
}
uint init_slave_sockets()
{
Slave_ilist_iterator it(m_slaves);
Slave *slave;
uint fds_index= 0;
FD_ZERO(&m_init_fds);
while ((slave= it++))
{
my_socket socket_id= slave->sock_fd();
m_max_fd= (socket_id > m_max_fd ? socket_id : m_max_fd);
#ifndef WINDOWS
if (socket_id > FD_SETSIZE)
{
sql_print_error("Semisync slave socket fd is %u. "
"select() cannot handle if the socket fd is "
"greater than %u (FD_SETSIZE).", socket_id, FD_SETSIZE);
return 0;
}
#endif //WINDOWS
FD_SET(socket_id, &m_init_fds);
fds_index++;
}
return fds_index;
}
my_socket get_max_fd() { return m_max_fd; }
private:
const Slave_ilist &m_slaves;
my_socket m_max_fd;
fd_set m_init_fds;
fd_set m_fds;
};
#endif //HAVE_POLL
extern Ack_receiver ack_receiver; extern Ack_receiver ack_receiver;
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment