Commit b7ae65ef authored by unknown's avatar unknown

MDEV-5363: Make parallel replication waits killable

A couple of more parallel replication waits made killable.
parent 4d6ee2d1
...@@ -285,20 +285,41 @@ handle_rpl_parallel_thread(void *arg) ...@@ -285,20 +285,41 @@ handle_rpl_parallel_thread(void *arg)
wait_start_sub_id= rgi->wait_start_sub_id; wait_start_sub_id= rgi->wait_start_sub_id;
if (wait_for_sub_id || wait_start_sub_id) if (wait_for_sub_id || wait_start_sub_id)
{ {
bool did_enter_cond= false;
const char *old_msg= NULL;
mysql_mutex_lock(&entry->LOCK_parallel_entry); mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (wait_start_sub_id) if (wait_start_sub_id)
{ {
while (wait_start_sub_id > entry->last_committed_sub_id) old_msg= thd->enter_cond(&entry->COND_parallel_entry,
&entry->LOCK_parallel_entry,
"Waiting for prior transaction to commit "
"before starting next transaction");
did_enter_cond= true;
while (wait_start_sub_id > entry->last_committed_sub_id &&
!thd->check_killed())
mysql_cond_wait(&entry->COND_parallel_entry, mysql_cond_wait(&entry->COND_parallel_entry,
&entry->LOCK_parallel_entry); &entry->LOCK_parallel_entry);
if (wait_start_sub_id > entry->last_committed_sub_id)
{
/* The thread got a kill signal. */
thd->send_kill_message();
rgi->is_error= true;
slave_output_error_info(rgi->rli, thd);
rgi->cleanup_context(thd, true);
rgi->rli->abort_slave= true;
} }
rgi->wait_start_sub_id= 0; /* No need to check again. */ rgi->wait_start_sub_id= 0; /* No need to check again. */
}
if (wait_for_sub_id > entry->last_committed_sub_id) if (wait_for_sub_id > entry->last_committed_sub_id)
{ {
wait_for_commit *waitee= wait_for_commit *waitee=
&rgi->wait_commit_group_info->commit_orderer; &rgi->wait_commit_group_info->commit_orderer;
rgi->commit_orderer.register_wait_for_prior_commit(waitee); rgi->commit_orderer.register_wait_for_prior_commit(waitee);
} }
if (did_enter_cond)
thd->exit_cond(old_msg);
else
mysql_mutex_unlock(&entry->LOCK_parallel_entry); mysql_mutex_unlock(&entry->LOCK_parallel_entry);
} }
...@@ -753,6 +774,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -753,6 +774,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Relay_log_info *rli= serial_rgi->rli; Relay_log_info *rli= serial_rgi->rli;
enum Log_event_type typ; enum Log_event_type typ;
bool is_group_event; bool is_group_event;
bool did_enter_cond= false;
const char *old_msg= NULL;
/* ToDo: what to do with this lock?!? */ /* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock); mysql_mutex_unlock(&rli->data_lock);
...@@ -860,6 +883,13 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -860,6 +883,13 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
} }
else if (cur_thread->queued_size <= opt_slave_parallel_max_queued) else if (cur_thread->queued_size <= opt_slave_parallel_max_queued)
break; // The thread is ready to queue into break; // The thread is ready to queue into
else if (rli->sql_driver_thd->check_killed())
{
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
my_error(ER_CONNECTION_KILLED, MYF(0));
delete rgi;
return true;
}
else else
{ {
/* /*
...@@ -867,6 +897,13 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -867,6 +897,13 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
use for queuing events, so wait for the thread to consume some use for queuing events, so wait for the thread to consume some
of its queue. of its queue.
*/ */
if (!did_enter_cond)
{
old_msg= rli->sql_driver_thd->enter_cond
(&cur_thread->COND_rpl_thread, &cur_thread->LOCK_rpl_thread,
"Waiting for room in worker thread event queue");
did_enter_cond= true;
}
mysql_cond_wait(&cur_thread->COND_rpl_thread, mysql_cond_wait(&cur_thread->COND_rpl_thread,
&cur_thread->LOCK_rpl_thread); &cur_thread->LOCK_rpl_thread);
} }
...@@ -1016,6 +1053,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -1016,6 +1053,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
*/ */
rli->event_relay_log_pos= rli->future_event_relay_log_pos; rli->event_relay_log_pos= rli->future_event_relay_log_pos;
cur_thread->enqueue(qev); cur_thread->enqueue(qev);
if (did_enter_cond)
rli->sql_driver_thd->exit_cond(old_msg);
else
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
mysql_cond_signal(&cur_thread->COND_rpl_thread); mysql_cond_signal(&cur_thread->COND_rpl_thread);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment