skip updates with the same server id as self

 kill zombie COM_BINLOG_DUMP with the same server id on connect
parent cb4749d6
...@@ -41,6 +41,7 @@ char *sql_strdup(const char *str); ...@@ -41,6 +41,7 @@ char *sql_strdup(const char *str);
char *sql_strmake(const char *str,uint len); char *sql_strmake(const char *str,uint len);
gptr sql_memdup(const void * ptr,unsigned size); gptr sql_memdup(const void * ptr,unsigned size);
void sql_element_free(void *ptr); void sql_element_free(void *ptr);
void kill_one_thread(THD *thd, ulong id);
#define x_free(A) { my_free((gptr) (A),MYF(MY_WME | MY_FAE | MY_ALLOW_ZERO_PTR)); } #define x_free(A) { my_free((gptr) (A),MYF(MY_WME | MY_FAE | MY_ALLOW_ZERO_PTR)); }
#define safeFree(x) { if(x) { my_free((gptr) x,MYF(0)); x = NULL; } } #define safeFree(x) { if(x) { my_free((gptr) x,MYF(0)); x = NULL; } }
......
...@@ -38,6 +38,7 @@ MASTER_INFO glob_mi; ...@@ -38,6 +38,7 @@ MASTER_INFO glob_mi;
extern bool opt_log_slave_updates ; extern bool opt_log_slave_updates ;
static inline void skip_load_data_infile(NET* net);
static inline bool slave_killed(THD* thd); static inline bool slave_killed(THD* thd);
static int init_slave_thread(THD* thd); static int init_slave_thread(THD* thd);
int init_master_info(MASTER_INFO* mi); int init_master_info(MASTER_INFO* mi);
...@@ -53,6 +54,15 @@ static inline bool slave_killed(THD* thd) ...@@ -53,6 +54,15 @@ static inline bool slave_killed(THD* thd)
return abort_slave || abort_loop || thd->killed; return abort_slave || abort_loop || thd->killed;
} }
static inline void skip_load_data_infile(NET* net)
{
(void)my_net_write(net, "\xfb/dev/null", 10);
(void)net_flush(net);
(void)my_net_read(net); // discard response
send_ok(net); // the master expects it
}
int db_ok(const char* db, I_List<i_string> &do_list, int db_ok(const char* db, I_List<i_string> &do_list,
I_List<i_string> &ignore_list ) I_List<i_string> &ignore_list )
{ {
...@@ -553,9 +563,26 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) ...@@ -553,9 +563,26 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
{ {
Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1, Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1,
event_len); event_len);
if (ev) if (ev)
{ {
switch(ev->get_type_code()) int type_code = ev->get_type_code();
if(ev->server_id == ::server_id)
{
if(type_code == LOAD_EVENT)
skip_load_data_infile(net);
mi->inc_pos(event_len);
flush_master_info(mi);
delete ev;
return 0; // avoid infinite update loops
}
thd->server_id = ev->server_id; // use the original server id for logging
thd->set_time(); // time the query
ev->when = time(NULL);
switch(type_code)
{ {
case QUERY_EVENT: case QUERY_EVENT:
{ {
...@@ -706,10 +733,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) ...@@ -706,10 +733,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
else // we will just ask the master to send us /dev/null if we do not want to else // we will just ask the master to send us /dev/null if we do not want to
// load the data :-) // load the data :-)
{ {
(void)my_net_write(net, "\xfb/dev/null", 10); skip_load_data_infile(net);
(void)net_flush(net);
(void)my_net_read(net); // discard response
send_ok(net); // the master expects it
} }
thd->net.vio = 0; thd->net.vio = 0;
...@@ -799,14 +823,14 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) ...@@ -799,14 +823,14 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
if(!server_id) if(!server_id)
{ {
sql_print_error("Server id not set, will not start slave"); sql_print_error("Server id not set, will not start slave");
pthread_exit(1); pthread_exit((void*)1);
} }
pthread_mutex_lock(&LOCK_slave); pthread_mutex_lock(&LOCK_slave);
if(slave_running) if(slave_running)
{ {
pthread_mutex_unlock(&LOCK_slave); pthread_mutex_unlock(&LOCK_slave);
pthread_exit(1); // safety just in case pthread_exit((void*)1); // safety just in case
} }
slave_running = 1; slave_running = 1;
abort_slave = 0; abort_slave = 0;
......
...@@ -39,7 +39,6 @@ static bool check_dup(THD *thd,const char *db,const char *name, ...@@ -39,7 +39,6 @@ static bool check_dup(THD *thd,const char *db,const char *name,
TABLE_LIST *tables); TABLE_LIST *tables);
static void mysql_init_query(THD *thd); static void mysql_init_query(THD *thd);
static void remove_escape(char *name); static void remove_escape(char *name);
static void kill_one_thread(THD *thd, ulong thread);
static void refresh_status(void); static void refresh_status(void);
const char *any_db="*any*"; // Special symbol for check_access const char *any_db="*any*"; // Special symbol for check_access
...@@ -712,6 +711,9 @@ bool do_command(THD *thd) ...@@ -712,6 +711,9 @@ bool do_command(THD *thd)
thd->server_id = slave_server_id; thd->server_id = slave_server_id;
pthread_mutex_unlock(&LOCK_server_id); pthread_mutex_unlock(&LOCK_server_id);
mysql_binlog_send(thd, strdup(packet + 11), pos, flags); mysql_binlog_send(thd, strdup(packet + 11), pos, flags);
// fake COM_QUIT -- if we get here, the thread needs to terminate
error = TRUE;
net->error = 0;
break; break;
} }
case COM_REFRESH: case COM_REFRESH:
...@@ -2451,7 +2453,7 @@ bool reload_acl_and_cache(THD *thd, uint options, TABLE_LIST *tables) ...@@ -2451,7 +2453,7 @@ bool reload_acl_and_cache(THD *thd, uint options, TABLE_LIST *tables)
} }
static void kill_one_thread(THD *thd, ulong id) void kill_one_thread(THD *thd, ulong id)
{ {
VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list
I_List_iterator<THD> it(threads); I_List_iterator<THD> it(threads);
......
...@@ -225,6 +225,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -225,6 +225,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
if(read_packet) if(read_packet)
{ {
thd->proc_info = "sending update to slave";
if(my_net_write(net, (char*)packet->ptr(), packet->length()) ) if(my_net_write(net, (char*)packet->ptr(), packet->length()) )
{ {
errmsg = "Failed on my_net_write()"; errmsg = "Failed on my_net_write()";
...@@ -257,7 +258,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -257,7 +258,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
else else
{ {
bool loop_breaker = 0; // need this to break out of the for loop from switch bool loop_breaker = 0; // need this to break out of the for loop from switch
thd->proc_info = "switching to next log";
switch(mysql_bin_log.find_next_log(&linfo)) switch(mysql_bin_log.find_next_log(&linfo))
{ {
case LOG_INFO_EOF: case LOG_INFO_EOF:
...@@ -307,10 +308,12 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -307,10 +308,12 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
} }
(void)my_fclose(log, MYF(MY_WME)); (void)my_fclose(log, MYF(MY_WME));
send_eof(&thd->net); send_eof(&thd->net);
thd->proc_info = "waiting to finalize termination";
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
err: err:
thd->proc_info = "waiting to finalize termination";
if(log) if(log)
(void) my_fclose(log, MYF(MY_WME)); (void) my_fclose(log, MYF(MY_WME));
send_error(&thd->net, 0, errmsg); send_error(&thd->net, 0, errmsg);
...@@ -408,6 +411,34 @@ void reset_slave() ...@@ -408,6 +411,34 @@ void reset_slave()
void kill_zombie_dump_threads(uint32 slave_server_id) void kill_zombie_dump_threads(uint32 slave_server_id)
{ {
pthread_mutex_lock(&LOCK_thread_count); pthread_mutex_lock(&LOCK_thread_count);
I_List_iterator<THD> it(threads);
THD *tmp;
while((tmp=it++))
{
if(tmp->command == COM_BINLOG_DUMP &&
tmp->server_id == slave_server_id)
{
// here we do not call kill_one_thread()
// it will be slow because it will iterate through the list
// again. Plus it double-locks LOCK_thread_count, which
// make safe_mutex complain and abort
// so we just to our own thread murder
thr_alarm_kill(tmp->real_id);
tmp->killed = 1;
pthread_mutex_lock(&tmp->mysys_var->mutex);
tmp->mysys_var->abort = 1;
if(tmp->mysys_var->current_mutex)
{
pthread_mutex_lock(tmp->mysys_var->current_mutex);
pthread_cond_broadcast(tmp->mysys_var->current_cond);
pthread_mutex_unlock(tmp->mysys_var->current_mutex);
}
pthread_mutex_unlock(&tmp->mysys_var->mutex);
}
}
pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment