fixed replication assert failure ( found by Jeremy Zawodny)

parent e497f1c2
...@@ -29,6 +29,8 @@ ...@@ -29,6 +29,8 @@
bool use_slave_mask = 0; bool use_slave_mask = 0;
MY_BITMAP slave_error_mask; MY_BITMAP slave_error_mask;
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
volatile bool slave_sql_running = 0, slave_io_running = 0; volatile bool slave_sql_running = 0, slave_io_running = 0;
char* slave_load_tmpdir = 0; char* slave_load_tmpdir = 0;
MASTER_INFO main_mi; MASTER_INFO main_mi;
...@@ -58,14 +60,15 @@ static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev); ...@@ -58,14 +60,15 @@ static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev); static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
static int queue_old_event(MASTER_INFO* mi, const char* buf, static int queue_old_event(MASTER_INFO* mi, const char* buf,
uint event_len); uint event_len);
static inline bool slave_killed(THD* thd,MASTER_INFO* mi); static inline bool io_slave_killed(THD* thd,MASTER_INFO* mi);
static inline bool slave_killed(THD* thd,RELAY_LOG_INFO* rli); static inline bool sql_slave_killed(THD* thd,RELAY_LOG_INFO* rli);
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type); static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
bool reconnect); bool reconnect);
static int safe_sleep(THD* thd, MASTER_INFO* mi, int sec); static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
void* thread_killed_arg);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table); static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db, static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name); const char* table_name);
...@@ -605,14 +608,14 @@ void end_slave() ...@@ -605,14 +608,14 @@ void end_slave()
free_string_array(&replicate_wild_ignore_table); free_string_array(&replicate_wild_ignore_table);
} }
static inline bool slave_killed(THD* thd, MASTER_INFO* mi) static bool io_slave_killed(THD* thd, MASTER_INFO* mi)
{ {
DBUG_ASSERT(mi->io_thd == thd); DBUG_ASSERT(mi->io_thd == thd);
DBUG_ASSERT(mi->slave_running == 1); // tracking buffer overrun DBUG_ASSERT(mi->slave_running == 1); // tracking buffer overrun
return mi->abort_slave || abort_loop || thd->killed; return mi->abort_slave || abort_loop || thd->killed;
} }
static inline bool slave_killed(THD* thd, RELAY_LOG_INFO* rli) static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
{ {
DBUG_ASSERT(rli->sql_thd == thd); DBUG_ASSERT(rli->sql_thd == thd);
DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
...@@ -1375,7 +1378,8 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type) ...@@ -1375,7 +1378,8 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
static int safe_sleep(THD* thd, MASTER_INFO* mi, int sec) static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
void* thread_killed_arg)
{ {
thr_alarm_t alarmed; thr_alarm_t alarmed;
thr_alarm_init(&alarmed); thr_alarm_init(&alarmed);
...@@ -1398,7 +1402,7 @@ static int safe_sleep(THD* thd, MASTER_INFO* mi, int sec) ...@@ -1398,7 +1402,7 @@ static int safe_sleep(THD* thd, MASTER_INFO* mi, int sec)
if (thr_alarm_in_use(&alarmed)) if (thr_alarm_in_use(&alarmed))
thr_end_alarm(&alarmed); thr_end_alarm(&alarmed);
if (slave_killed(thd,mi)) if ((*thread_killed)(thd,thread_killed_arg))
return 1; return 1;
start_time=time((time_t*) 0); start_time=time((time_t*) 0);
} }
...@@ -1528,7 +1532,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) ...@@ -1528,7 +1532,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
DBUG_ASSERT(rli->sql_thd==thd); DBUG_ASSERT(rli->sql_thd==thd);
Log_event * ev = next_event(rli); Log_event * ev = next_event(rli);
DBUG_ASSERT(rli->sql_thd==thd); DBUG_ASSERT(rli->sql_thd==thd);
if (slave_killed(thd,rli)) if (sql_slave_killed(thd,rli))
return 1; return 1;
if (ev) if (ev)
{ {
...@@ -1661,13 +1665,13 @@ pthread_handler_decl(handle_slave_io,arg) ...@@ -1661,13 +1665,13 @@ pthread_handler_decl(handle_slave_io,arg)
goto err; goto err;
} }
while (!slave_killed(thd,mi)) while (!io_slave_killed(thd,mi))
{ {
thd->proc_info = "Requesting binlog dump"; thd->proc_info = "Requesting binlog dump";
if (request_dump(mysql, mi)) if (request_dump(mysql, mi))
{ {
sql_print_error("Failed on request_dump()"); sql_print_error("Failed on request_dump()");
if(slave_killed(thd,mi)) if(io_slave_killed(thd,mi))
{ {
sql_print_error("Slave I/O thread killed while requesting master \ sql_print_error("Slave I/O thread killed while requesting master \
dump"); dump");
...@@ -1682,11 +1686,12 @@ dump"); ...@@ -1682,11 +1686,12 @@ dump");
hopefuly the admin can fix the problem sometime hopefuly the admin can fix the problem sometime
*/ */
if (retried_once) if (retried_once)
safe_sleep(thd, mi, mi->connect_retry); safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
(void*)mi);
else else
retried_once = 1; retried_once = 1;
if (slave_killed(thd,mi)) if (io_slave_killed(thd,mi))
{ {
sql_print_error("Slave I/O thread killed while retrying master \ sql_print_error("Slave I/O thread killed while retrying master \
dump"); dump");
...@@ -1697,7 +1702,7 @@ dump"); ...@@ -1697,7 +1702,7 @@ dump");
sql_print_error("Slave I/O thread: failed dump request, \ sql_print_error("Slave I/O thread: failed dump request, \
reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME, reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME,
llstr(mi->master_log_pos,llbuff)); llstr(mi->master_log_pos,llbuff));
if (safe_reconnect(thd, mysql, mi) || slave_killed(thd,mi)) if (safe_reconnect(thd, mysql, mi) || io_slave_killed(thd,mi))
{ {
sql_print_error("Slave I/O thread killed during or \ sql_print_error("Slave I/O thread killed during or \
after reconnect"); after reconnect");
...@@ -1707,11 +1712,11 @@ after reconnect"); ...@@ -1707,11 +1712,11 @@ after reconnect");
goto connected; goto connected;
} }
while (!slave_killed(thd,mi)) while (!io_slave_killed(thd,mi))
{ {
thd->proc_info = "Reading master update"; thd->proc_info = "Reading master update";
ulong event_len = read_event(mysql, mi); ulong event_len = read_event(mysql, mi);
if (slave_killed(thd,mi)) if (io_slave_killed(thd,mi))
{ {
sql_print_error("Slave I/O thread killed while reading event"); sql_print_error("Slave I/O thread killed while reading event");
goto err; goto err;
...@@ -1731,11 +1736,12 @@ is correct, restart the server with a higher value of max_allowed_packet", ...@@ -1731,11 +1736,12 @@ is correct, restart the server with a higher value of max_allowed_packet",
thd->proc_info = "Waiting to reconnect after a failed read"; thd->proc_info = "Waiting to reconnect after a failed read";
mc_end_server(mysql); mc_end_server(mysql);
if (retried_once) // punish repeat offender with sleep if (retried_once) // punish repeat offender with sleep
safe_sleep(thd,mi,mi->connect_retry); safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
(void*)mi);
else else
retried_once = 1; retried_once = 1;
if (slave_killed(thd,mi)) if (io_slave_killed(thd,mi))
{ {
sql_print_error("Slave I/O thread killed while waiting to \ sql_print_error("Slave I/O thread killed while waiting to \
reconnect after a failed read"); reconnect after a failed read");
...@@ -1745,7 +1751,7 @@ reconnect after a failed read"); ...@@ -1745,7 +1751,7 @@ reconnect after a failed read");
sql_print_error("Slave I/O thread: Failed reading log event, \ sql_print_error("Slave I/O thread: Failed reading log event, \
reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME, reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME,
llstr(mi->master_log_pos, llbuff)); llstr(mi->master_log_pos, llbuff));
if (safe_reconnect(thd, mysql, mi) || slave_killed(thd,mi)) if (safe_reconnect(thd, mysql, mi) || io_slave_killed(thd,mi))
{ {
sql_print_error("Slave I/O thread killed during or after a \ sql_print_error("Slave I/O thread killed during or after a \
reconnect done to recover from failed read"); reconnect done to recover from failed read");
...@@ -1771,8 +1777,8 @@ from master"); ...@@ -1771,8 +1777,8 @@ from master");
goto err; goto err;
} }
#endif #endif
} // while(!slave_killed(thd,mi)) - read/exec loop }
} // while(!slave_killed(thd,mi)) - slave loop }
// error = 0; // error = 0;
err: err:
...@@ -1874,14 +1880,14 @@ pthread_handler_decl(handle_slave_sql,arg) ...@@ -1874,14 +1880,14 @@ pthread_handler_decl(handle_slave_sql,arg)
log '%s' at position %s,relay log: name='%s',pos='%s'", RPL_LOG_NAME, log '%s' at position %s,relay log: name='%s',pos='%s'", RPL_LOG_NAME,
llstr(rli->master_log_pos,llbuff),rli->relay_log_name, llstr(rli->master_log_pos,llbuff),rli->relay_log_name,
llstr(rli->relay_log_pos,llbuff1)); llstr(rli->relay_log_pos,llbuff1));
while (!slave_killed(thd,rli)) while (!sql_slave_killed(thd,rli))
{ {
thd->proc_info = "Processing master log event"; thd->proc_info = "Processing master log event";
DBUG_ASSERT(rli->sql_thd == thd); DBUG_ASSERT(rli->sql_thd == thd);
if (exec_relay_log_event(thd,rli)) if (exec_relay_log_event(thd,rli))
{ {
// do not scare the user if SQL thread was simply killed or stopped // do not scare the user if SQL thread was simply killed or stopped
if (!slave_killed(thd,rli)) if (!sql_slave_killed(thd,rli))
sql_print_error("\ sql_print_error("\
Error running query, slave SQL thread aborted. Fix the problem, and restart \ Error running query, slave SQL thread aborted. Fix the problem, and restart \
the slave SQL thread with \"SLAVE START\". We stopped at log \ the slave SQL thread with \"SLAVE START\". We stopped at log \
...@@ -1889,7 +1895,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ ...@@ -1889,7 +1895,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff)); RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff));
goto err; goto err;
} }
} // while(!slave_killed(thd,rli)) - read/exec loop } // while(!sql_slave_killed(thd,rli)) - read/exec loop
// error = 0; // error = 0;
err: err:
...@@ -2224,7 +2230,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, ...@@ -2224,7 +2230,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
#ifndef DBUG_OFF #ifndef DBUG_OFF
events_till_disconnect = disconnect_slave_event_count; events_till_disconnect = disconnect_slave_event_count;
#endif #endif
while (!(slave_was_killed = slave_killed(thd,mi)) && while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
(reconnect ? mc_mysql_reconnect(mysql) != 0 : (reconnect ? mc_mysql_reconnect(mysql) != 0 :
!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0, !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0))) mi->port, 0, 0)))
...@@ -2238,7 +2244,8 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, ...@@ -2238,7 +2244,8 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql), mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql),
mi->connect_retry); mi->connect_retry);
} }
safe_sleep(thd,mi,mi->connect_retry); safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
(void*)mi);
/* /*
By default we try forever. The reason is that failure will trigger By default we try forever. The reason is that failure will trigger
master election, so if the user did not set master_retry_count we master election, so if the user did not set master_retry_count we
...@@ -2333,7 +2340,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli) ...@@ -2333,7 +2340,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
*/ */
pthread_mutex_lock(&rli->data_lock); pthread_mutex_lock(&rli->data_lock);
for (; !(was_killed=slave_killed(thd,rli)) ;) for (; !(was_killed=sql_slave_killed(thd,rli)) ;)
{ {
/* /*
We can have two kinds of log reading: We can have two kinds of log reading:
...@@ -2465,7 +2472,8 @@ event(errno=%d,cur_log->error=%d)", ...@@ -2465,7 +2472,8 @@ event(errno=%d,cur_log->error=%d)",
my_errno,cur_log->error); my_errno,cur_log->error);
// no need to hog the mutex while we sleep // no need to hog the mutex while we sleep
pthread_mutex_unlock(&rli->data_lock); pthread_mutex_unlock(&rli->data_lock);
safe_sleep(rli->sql_thd,rli->mi,1); safe_sleep(rli->sql_thd,1,(CHECK_KILLED_FUNC)sql_slave_killed,
(void*)rli);
pthread_mutex_lock(&rli->data_lock); pthread_mutex_lock(&rli->data_lock);
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment