Commit 81d7d92a authored by unknown

MDEV-4506: Parallel replication

MDEV-5217: SQL thread hangs during stop if error occurs in the middle of an event group

Normally, when we stop the slave SQL thread in parallel replication, we want
the worker threads to continue processing events until the end of the current
event group. But if we stop due to an error that prevents further events from
being queued, such as an error reading the relay log, no more events can be
queued for the workers, so they have to abort even if they are in the middle
of an event group. There was a bug where this would deadlock: the workers
waited for more events to be queued for the event group, while the stopped
SQL thread waited for the workers to complete their current event group
before exiting.

Fixed by now signalling from the SQL thread to all workers when it is about
to exit, and cleaning up in all workers when so signalled.

This patch fixes one of multiple problems reported in MDEV-5217.
parent ddc28b8c
...@@ -120,6 +120,46 @@ sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group) ...@@ -120,6 +120,46 @@ sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group)
} }
/*
  Finish processing of one event group for a parallel replication worker.

  Steps, in order:
   1. Drop any wait-for-prior-commit registration still held by wfc and
      clear thd->wait_for_commit_ptr.
   2. Under entry->LOCK_parallel_entry, raise entry->last_committed_sub_id
      to sub_id (it is never lowered) and broadcast COND_parallel_entry
      when the value changed.
   3. After releasing the mutex, call wfc->wakeup_subsequent_commits(err).

  Parameters:
    thd     Thread handle; its wait_for_commit_ptr is reset to NULL.
    err     Value handed unchanged to wakeup_subsequent_commits().
    sub_id  Sub-id of the event group that has just finished.
    entry   Entry whose last_committed_sub_id is updated.
    wfc     The wait_for_commit object of the finished event group.
*/
static void
finish_event_group(THD *thd, int err, uint64 sub_id,
rpl_parallel_entry *entry, wait_for_commit *wfc)
{
/*
Remove any left-over registration to wait for a prior commit to
complete. Normally, such wait would already have been removed at
this point by wait_for_prior_commit(), but eg. in error case we
might have skipped waiting, so we would need to remove it explicitly.
*/
wfc->unregister_wait_for_prior_commit();
thd->wait_for_commit_ptr= NULL;
/*
Record that this event group has finished (eg. transaction is
committed, if transactional), so other event groups will no longer
attempt to wait for us to commit. Once we have increased
entry->last_committed_sub_id, no other threads will execute
register_wait_for_prior_commit() against us. Thus, by doing one
extra (usually redundant) wakeup_subsequent_commits() we can ensure
that no register_wait_for_prior_commit() can ever happen without a
subsequent wakeup_subsequent_commits() to wake it up.
We can race here with the next transactions, but that is fine, as
long as we check that we do not decrease last_committed_sub_id. If
this commit is done, then any prior commits will also have been
done and also no longer need waiting for.
*/
mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (entry->last_committed_sub_id < sub_id)
{
entry->last_committed_sub_id= sub_id;
/* Notify anyone blocked on COND_parallel_entry that the value advanced. */
mysql_cond_broadcast(&entry->COND_parallel_entry);
}
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
/* Must come after the sub_id update above; see the comment before the lock. */
wfc->wakeup_subsequent_commits(err);
}
pthread_handler_t pthread_handler_t
handle_rpl_parallel_thread(void *arg) handle_rpl_parallel_thread(void *arg)
{ {
...@@ -128,6 +168,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -128,6 +168,7 @@ handle_rpl_parallel_thread(void *arg)
struct rpl_parallel_thread::queued_event *events; struct rpl_parallel_thread::queued_event *events;
bool group_standalone= true; bool group_standalone= true;
bool in_event_group= false; bool in_event_group= false;
rpl_group_info *group_rgi= NULL;
uint64 event_gtid_sub_id= 0; uint64 event_gtid_sub_id= 0;
int err; int err;
...@@ -174,7 +215,8 @@ handle_rpl_parallel_thread(void *arg) ...@@ -174,7 +215,8 @@ handle_rpl_parallel_thread(void *arg)
old_msg= thd->proc_info; old_msg= thd->proc_info;
thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread, thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
"Waiting for work from SQL thread"); "Waiting for work from SQL thread");
while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed) while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed &&
!(rpt->current_entry && rpt->current_entry->force_abort))
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
rpt->dequeue(events); rpt->dequeue(events);
thd->exit_cond(old_msg); thd->exit_cond(old_msg);
...@@ -200,6 +242,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -200,6 +242,7 @@ handle_rpl_parallel_thread(void *arg)
} }
err= 0; err= 0;
group_rgi= rgi;
/* Handle a new event group, which will be initiated by a GTID event. */ /* Handle a new event group, which will be initiated by a GTID event. */
if ((event_type= events->ev->get_type_code()) == GTID_EVENT) if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
{ {
...@@ -295,41 +338,10 @@ handle_rpl_parallel_thread(void *arg) ...@@ -295,41 +338,10 @@ handle_rpl_parallel_thread(void *arg)
if (end_of_group) if (end_of_group)
{ {
in_event_group= false; in_event_group= false;
finish_event_group(thd, err, event_gtid_sub_id, entry,
/* &rgi->commit_orderer);
Remove any left-over registration to wait for a prior commit to
complete. Normally, such wait would already have been removed at
this point by wait_for_prior_commit(), but eg. in error case we
might have skipped waiting, so we would need to remove it explicitly.
*/
rgi->commit_orderer.unregister_wait_for_prior_commit();
thd->wait_for_commit_ptr= NULL;
/*
Record that this event group has finished (eg. transaction is
committed, if transactional), so other event groups will no longer
attempt to wait for us to commit. Once we have increased
entry->last_committed_sub_id, no other threads will execute
register_wait_for_prior_commit() against us. Thus, by doing one
extra (usually redundant) wakeup_subsequent_commits() we can ensure
that no register_wait_for_prior_commit() can ever happen without a
subsequent wakeup_subsequent_commits() to wake it up.
We can race here with the next transactions, but that is fine, as
long as we check that we do not decrease last_committed_sub_id. If
this commit is done, then any prior commits will also have been
done and also no longer need waiting for.
*/
mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (entry->last_committed_sub_id < event_gtid_sub_id)
{
entry->last_committed_sub_id= event_gtid_sub_id;
mysql_cond_broadcast(&entry->COND_parallel_entry);
}
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
rgi->commit_orderer.wakeup_subsequent_commits(err);
delete rgi; delete rgi;
group_rgi= rgi= NULL;
} }
events= next; events= next;
...@@ -349,6 +361,27 @@ handle_rpl_parallel_thread(void *arg) ...@@ -349,6 +361,27 @@ handle_rpl_parallel_thread(void *arg)
goto more_events; goto more_events;
} }
if (in_event_group && group_rgi->parallel_entry->force_abort)
{
/*
We are asked to abort, without getting the remaining events in the
current event group.
We have to rollback the current transaction and update the last
sub_id value so that SQL thread will know we are done with the
half-processed event group.
*/
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
group_rgi->is_error= true;
finish_event_group(thd, 1, group_rgi->gtid_sub_id,
group_rgi->parallel_entry, &group_rgi->commit_orderer);
group_rgi->cleanup_context(thd, true);
group_rgi->rli->abort_slave= true;
in_event_group= false;
delete group_rgi;
group_rgi= NULL;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
}
if (!in_event_group) if (!in_event_group)
{ {
rpt->current_entry= NULL; rpt->current_entry= NULL;
...@@ -646,6 +679,8 @@ rpl_parallel::find(uint32 domain_id) ...@@ -646,6 +679,8 @@ rpl_parallel::find(uint32 domain_id)
MY_MUTEX_INIT_FAST); MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL); mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL);
} }
else
e->force_abort= false;
return e; return e;
} }
...@@ -657,6 +692,25 @@ rpl_parallel::wait_for_done() ...@@ -657,6 +692,25 @@ rpl_parallel::wait_for_done()
struct rpl_parallel_entry *e; struct rpl_parallel_entry *e;
uint32 i; uint32 i;
/*
First signal all workers that they must force quit; no more events will
be queued to complete any partial event groups executed.
*/
for (i= 0; i < domain_hash.records; ++i)
{
rpl_parallel_thread *rpt;
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
e->force_abort= true;
if ((rpt= e->rpl_thread))
{
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
if (rpt->current_entry == e)
mysql_cond_signal(&rpt->COND_rpl_thread);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
}
}
for (i= 0; i < domain_hash.records; ++i) for (i= 0; i < domain_hash.records; ++i)
{ {
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
......
...@@ -76,6 +76,13 @@ struct rpl_parallel_entry { ...@@ -76,6 +76,13 @@ struct rpl_parallel_entry {
uint64 last_seq_no; uint64 last_seq_no;
uint64 last_commit_id; uint64 last_commit_id;
bool active; bool active;
/*
Set when SQL thread is shutting down, and no more events can be processed,
so worker threads must force abort any current transactions without
waiting for event groups to complete.
*/
bool force_abort;
rpl_parallel_thread *rpl_thread; rpl_parallel_thread *rpl_thread;
/* /*
The sub_id of the last transaction to commit within this domain_id. The sub_id of the last transaction to commit within this domain_id.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment