Commit c31e1e26 authored by unknown's avatar unknown

MDEV-4506: Parallel replication: intermediate commit.

Fix a bunch of issues found with locking, ordering, and non-thread-safe stuff
in Relay_log_info.

Now able to do a simple benchmark, showing 4.5 times speedup for applying a
binlog with 10000 REPLACE statements.
parent 1fca3487
...@@ -408,7 +408,7 @@ class MYSQL_QUERY_LOG: public MYSQL_LOG ...@@ -408,7 +408,7 @@ class MYSQL_QUERY_LOG: public MYSQL_LOG
class binlog_cache_mngr; class binlog_cache_mngr;
struct rpl_gtid; struct rpl_gtid;
class wait_for_commit; struct wait_for_commit;
class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
{ {
private: private:
......
...@@ -9101,7 +9101,7 @@ int Rows_log_event::do_apply_event(struct rpl_group_info *rgi) ...@@ -9101,7 +9101,7 @@ int Rows_log_event::do_apply_event(struct rpl_group_info *rgi)
do_apply_event(). We still check here to prevent future coding do_apply_event(). We still check here to prevent future coding
errors. errors.
*/ */
DBUG_ASSERT(rli->sql_thd == thd); DBUG_ASSERT(rgi->thd == thd);
/* /*
If there is no locks taken, this is the first binrow event seen If there is no locks taken, this is the first binrow event seen
......
...@@ -68,7 +68,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, struct rpl_group_info ...@@ -68,7 +68,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, struct rpl_group_info
do_apply_event(). We still check here to prevent future coding do_apply_event(). We still check here to prevent future coding
errors. errors.
*/ */
DBUG_ASSERT(rli->sql_thd == ev_thd); DBUG_ASSERT(rgi->thd == ev_thd);
/* /*
If there is no locks taken, this is the first binrow event seen If there is no locks taken, this is the first binrow event seen
...@@ -1481,7 +1481,7 @@ int Old_rows_log_event::do_apply_event(struct rpl_group_info *rgi) ...@@ -1481,7 +1481,7 @@ int Old_rows_log_event::do_apply_event(struct rpl_group_info *rgi)
do_apply_event(). We still check here to prevent future coding do_apply_event(). We still check here to prevent future coding
errors. errors.
*/ */
DBUG_ASSERT(rli->sql_thd == thd); DBUG_ASSERT(rgi->thd == thd);
/* /*
If there is no locks taken, this is the first binrow event seen If there is no locks taken, this is the first binrow event seen
......
...@@ -38,15 +38,16 @@ ...@@ -38,15 +38,16 @@
everything needs to be correctly rolled back and stopped in all threads, everything needs to be correctly rolled back and stopped in all threads,
to ensure a consistent slave replication state. to ensure a consistent slave replication state.
- We need some knob on the master to allow the user to deliberately delay
commits waiting for more transactions to join group commit, to increase
potential for parallel execution on the slave.
- Handle the case of a partial event group. This occurs when the master - Handle the case of a partial event group. This occurs when the master
crashes in the middle of writing the event group to the binlog. The crashes in the middle of writing the event group to the binlog. The
slave rolls back the transaction; parallel execution needs to be able slave rolls back the transaction; parallel execution needs to be able
to deal with this wrt. commit_orderer and such. to deal with this wrt. commit_orderer and such.
- Relay_log_info::is_in_group(). This needs to be handled correctly in all
callers. I think it needs to be split into two, one version in
Relay_log_info to be used from next_event() in slave.cc, one to be used in
per-transaction stuff.
- We should fail if we connect to the master with opt_slave_parallel_threads - We should fail if we connect to the master with opt_slave_parallel_threads
greater than zero and master does not support GTID. Just to avoid a bunch greater than zero and master does not support GTID. Just to avoid a bunch
of potential problems, we won't be able to do any parallel replication of potential problems, we won't be able to do any parallel replication
...@@ -58,12 +59,12 @@ struct rpl_parallel_thread_pool global_rpl_thread_pool; ...@@ -58,12 +59,12 @@ struct rpl_parallel_thread_pool global_rpl_thread_pool;
static void static void
rpt_handle_event(rpl_parallel_thread::queued_event *qev, rpt_handle_event(rpl_parallel_thread::queued_event *qev,
THD *thd,
struct rpl_parallel_thread *rpt) struct rpl_parallel_thread *rpt)
{ {
int err; int err;
struct rpl_group_info *rgi= qev->rgi; struct rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli; Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
thd->rli_slave= rli; thd->rli_slave= rli;
thd->rpl_filter = rli->mi->rpl_filter; thd->rpl_filter = rli->mi->rpl_filter;
...@@ -143,6 +144,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -143,6 +144,7 @@ handle_rpl_parallel_thread(void *arg)
rpl_group_info *rgi= events->rgi; rpl_group_info *rgi= events->rgi;
rpl_parallel_entry *entry= rgi->parallel_entry; rpl_parallel_entry *entry= rgi->parallel_entry;
uint64 wait_for_sub_id; uint64 wait_for_sub_id;
uint64 wait_start_sub_id;
bool end_of_group; bool end_of_group;
if (event_type == GTID_EVENT) if (event_type == GTID_EVENT)
...@@ -155,14 +157,28 @@ handle_rpl_parallel_thread(void *arg) ...@@ -155,14 +157,28 @@ handle_rpl_parallel_thread(void *arg)
/* Save this, as it gets cleared once event group commits. */ /* Save this, as it gets cleared once event group commits. */
event_gtid_sub_id= rgi->gtid_sub_id; event_gtid_sub_id= rgi->gtid_sub_id;
rgi->thd= thd;
/* /*
Register ourself to wait for the previous commit, if we need to do Register ourself to wait for the previous commit, if we need to do
such registration _and_ that previous commit has not already such registration _and_ that previous commit has not already
occured. occured.
Also do not start parallel execution of this event group until all
prior groups have committed that are not safe to run in parallel with.
*/ */
if ((wait_for_sub_id= rgi->wait_commit_sub_id)) wait_for_sub_id= rgi->wait_commit_sub_id;
wait_start_sub_id= rgi->wait_start_sub_id;
if (wait_for_sub_id || wait_start_sub_id)
{ {
mysql_mutex_lock(&entry->LOCK_parallel_entry); mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (wait_start_sub_id)
{
while (wait_start_sub_id > entry->last_committed_sub_id)
mysql_cond_wait(&entry->COND_parallel_entry,
&entry->LOCK_parallel_entry);
}
rgi->wait_start_sub_id= 0; /* No need to check again. */
if (wait_for_sub_id > entry->last_committed_sub_id) if (wait_for_sub_id > entry->last_committed_sub_id)
{ {
wait_for_commit *waitee= wait_for_commit *waitee=
...@@ -176,7 +192,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -176,7 +192,7 @@ handle_rpl_parallel_thread(void *arg)
thd->wait_for_commit_ptr= &rgi->commit_orderer; thd->wait_for_commit_ptr= &rgi->commit_orderer;
} }
rpt_handle_event(events, thd, rpt); rpt_handle_event(events, rpt);
end_of_group= end_of_group=
in_event_group && in_event_group &&
...@@ -376,6 +392,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, ...@@ -376,6 +392,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
while (new_free_list->running) while (new_free_list->running)
mysql_cond_wait(&new_free_list->COND_rpl_thread, mysql_cond_wait(&new_free_list->COND_rpl_thread,
&new_free_list->LOCK_rpl_thread); &new_free_list->LOCK_rpl_thread);
mysql_mutex_unlock(&new_free_list->LOCK_rpl_thread);
my_free(new_free_list); my_free(new_free_list);
new_free_list= next; new_free_list= next;
} }
...@@ -503,8 +520,7 @@ rpl_parallel::wait_for_done() ...@@ -503,8 +520,7 @@ rpl_parallel::wait_for_done()
bool bool
rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev, rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev)
THD *parent_thd)
{ {
rpl_parallel_entry *e; rpl_parallel_entry *e;
rpl_parallel_thread *cur_thread; rpl_parallel_thread *cur_thread;
...@@ -530,51 +546,15 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev, ...@@ -530,51 +546,15 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev,
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
if (!(e= find(gtid_ev->domain_id)) || if (!(e= find(gtid_ev->domain_id)) ||
!(e->current_group_info= rgi= new rpl_group_info(rli)) || !(rgi= new rpl_group_info(rli)) ||
event_group_new_gtid(rgi, gtid_ev)) event_group_new_gtid(rgi, gtid_ev))
{ {
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
return true; return true;
} }
/* Check if we already have a worker thread for this entry. */ if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
cur_thread= e->rpl_thread; e->last_commit_id == gtid_ev->commit_id)
if (cur_thread)
{
mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
if (cur_thread->current_entry != e)
{
/* Not ours anymore, we need to grab a new one. */
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
e->rpl_thread= cur_thread= NULL;
}
}
if (!cur_thread)
{
/*
Nothing else is currently running in this domain. We can spawn a new
thread to do this event group in parallel with anything else that might
be running in other domains.
*/
if (gtid_ev->flags & Gtid_log_event::FL_GROUP_COMMIT_ID)
{
e->last_server_id= gtid_ev->server_id;
e->last_seq_no= gtid_ev->seq_no;
e->last_commit_id= gtid_ev->commit_id;
}
else
{
e->last_server_id= 0;
e->last_seq_no= 0;
e->last_commit_id= 0;
}
cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e);
rgi->wait_commit_sub_id= 0;
/* get_thread() returns with the LOCK_rpl_thread locked. */
}
else if ((gtid_ev->flags & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
e->last_commit_id == gtid_ev->commit_id)
{ {
/* /*
We are already executing something else in this domain. But the two We are already executing something else in this domain. But the two
...@@ -588,19 +568,63 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev, ...@@ -588,19 +568,63 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev,
rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e); rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
rgi->wait_commit_sub_id= e->current_sub_id; rgi->wait_commit_sub_id= e->current_sub_id;
rgi->wait_commit_group_info= e->current_group_info; rgi->wait_commit_group_info= e->current_group_info;
rgi->wait_start_sub_id= e->prev_groupcommit_sub_id;
e->rpl_thread= cur_thread= rpt; e->rpl_thread= cur_thread= rpt;
/* get_thread() returns with the LOCK_rpl_thread locked. */ /* get_thread() returns with the LOCK_rpl_thread locked. */
} }
else else
{ {
/* /* Check if we already have a worker thread for this entry. */
We are still executing the previous event group for this replication cur_thread= e->rpl_thread;
domain, and we have to wait for that to finish before we can start on if (cur_thread)
the next one. So just re-use the thread. {
*/ mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
if (cur_thread->current_entry != e)
{
/* Not ours anymore, we need to grab a new one. */
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
e->rpl_thread= cur_thread= NULL;
}
}
if (!cur_thread)
{
/*
Nothing else is currently running in this domain. We can spawn a new
thread to do this event group in parallel with anything else that might
be running in other domains.
*/
cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e);
/* get_thread() returns with the LOCK_rpl_thread locked. */
}
else
{
/*
We are still executing the previous event group for this replication
domain, and we have to wait for that to finish before we can start on
the next one. So just re-use the thread.
*/
}
rgi->wait_commit_sub_id= 0; rgi->wait_commit_sub_id= 0;
rgi->wait_start_sub_id= 0;
e->prev_groupcommit_sub_id= e->current_sub_id;
}
if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID)
{
e->last_server_id= gtid_ev->server_id;
e->last_seq_no= gtid_ev->seq_no;
e->last_commit_id= gtid_ev->commit_id;
}
else
{
e->last_server_id= 0;
e->last_seq_no= 0;
e->last_commit_id= 0;
} }
e->current_group_info= rgi;
e->current_sub_id= rgi->gtid_sub_id; e->current_sub_id= rgi->gtid_sub_id;
current= rgi->parallel_entry= e; current= rgi->parallel_entry= e;
} }
...@@ -612,7 +636,7 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev, ...@@ -612,7 +636,7 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev,
but they might be from an old master). but they might be from an old master).
*/ */
qev->rgi= serial_rgi; qev->rgi= serial_rgi;
rpt_handle_event(qev, parent_thd, NULL); rpt_handle_event(qev, NULL);
delete_or_keep_event_post_apply(rli, typ, qev->ev); delete_or_keep_event_post_apply(rli, typ, qev->ev);
return false; return false;
......
...@@ -60,6 +60,15 @@ struct rpl_parallel_entry { ...@@ -60,6 +60,15 @@ struct rpl_parallel_entry {
mysql_cond_t COND_parallel_entry; mysql_cond_t COND_parallel_entry;
uint64 current_sub_id; uint64 current_sub_id;
struct rpl_group_info *current_group_info; struct rpl_group_info *current_group_info;
/*
The sub_id of the last event group in the previous batch of group-committed
transactions.
When we spawn parallel worker threads for the next group-committed batch,
they first need to wait for this sub_id to be committed before it is safe
to start executing them.
*/
uint64 prev_groupcommit_sub_id;
}; };
struct rpl_parallel { struct rpl_parallel {
HASH domain_hash; HASH domain_hash;
...@@ -69,7 +78,7 @@ struct rpl_parallel { ...@@ -69,7 +78,7 @@ struct rpl_parallel {
~rpl_parallel(); ~rpl_parallel();
rpl_parallel_entry *find(uint32 domain_id); rpl_parallel_entry *find(uint32 domain_id);
void wait_for_done(); void wait_for_done();
bool do_event(struct rpl_group_info *serial_rgi, Log_event *ev, THD *thd); bool do_event(struct rpl_group_info *serial_rgi, Log_event *ev);
}; };
......
...@@ -1226,7 +1226,7 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, ...@@ -1226,7 +1226,7 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
middle of the "transaction". START SLAVE will resume at BEGIN middle of the "transaction". START SLAVE will resume at BEGIN
while the MyISAM table has already been updated. while the MyISAM table has already been updated.
*/ */
if ((sql_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions) if ((rgi->thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
inc_event_relay_log_pos(); inc_event_relay_log_pos();
else else
{ {
...@@ -1267,7 +1267,7 @@ void Relay_log_info::cleanup_context(THD *thd, bool error) ...@@ -1267,7 +1267,7 @@ void Relay_log_info::cleanup_context(THD *thd, bool error)
{ {
DBUG_ENTER("Relay_log_info::cleanup_context"); DBUG_ENTER("Relay_log_info::cleanup_context");
DBUG_ASSERT(sql_thd == thd); DBUG_ASSERT(opt_slave_parallel_threads > 0 || sql_thd == thd);
/* /*
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them, 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
may have opened tables, which we cannot be sure have been closed (because may have opened tables, which we cannot be sure have been closed (because
...@@ -1534,8 +1534,8 @@ rpl_load_gtid_slave_state(THD *thd) ...@@ -1534,8 +1534,8 @@ rpl_load_gtid_slave_state(THD *thd)
rpl_group_info::rpl_group_info(Relay_log_info *rli_) rpl_group_info::rpl_group_info(Relay_log_info *rli_)
: rli(rli_), gtid_sub_id(0), wait_commit_sub_id(0), wait_commit_group_info(0), : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
parallel_entry(0) wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0)
{ {
bzero(&current_gtid, sizeof(current_gtid)); bzero(&current_gtid, sizeof(current_gtid));
} }
......
...@@ -604,6 +604,7 @@ class Relay_log_info : public Slave_reporting_capability ...@@ -604,6 +604,7 @@ class Relay_log_info : public Slave_reporting_capability
struct rpl_group_info struct rpl_group_info
{ {
Relay_log_info *rli; Relay_log_info *rli;
THD *thd;
/* /*
Current GTID being processed. Current GTID being processed.
The sub_id gives the binlog order within one domain_id. A zero sub_id The sub_id gives the binlog order within one domain_id. A zero sub_id
...@@ -630,10 +631,19 @@ struct rpl_group_info ...@@ -630,10 +631,19 @@ struct rpl_group_info
*/ */
uint64 wait_commit_sub_id; uint64 wait_commit_sub_id;
struct rpl_group_info *wait_commit_group_info; struct rpl_group_info *wait_commit_group_info;
/*
If non-zero, the event group must wait for this sub_id to be committed
before the execution of the event group is allowed to start.
(When we execute in parallel the transactions that group committed
together on the master, we still need to wait for any prior transactions
to have commtted).
*/
uint64 wait_start_sub_id;
struct rpl_parallel_entry *parallel_entry; struct rpl_parallel_entry *parallel_entry;
rpl_group_info(Relay_log_info *rli); rpl_group_info(Relay_log_info *rli_);
~rpl_group_info() { }; ~rpl_group_info() { };
}; };
......
...@@ -3246,7 +3246,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, ...@@ -3246,7 +3246,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
} }
if (opt_slave_parallel_threads > 0) if (opt_slave_parallel_threads > 0)
DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev, thd)); DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev));
/* /*
For GTID, allocate a new sub_id for the given domain_id. For GTID, allocate a new sub_id for the given domain_id.
...@@ -3995,6 +3995,7 @@ pthread_handler_t handle_slave_sql(void *arg) ...@@ -3995,6 +3995,7 @@ pthread_handler_t handle_slave_sql(void *arg)
thd = new THD; // note that contructor of THD uses DBUG_ ! thd = new THD; // note that contructor of THD uses DBUG_ !
thd->thread_stack = (char*)&thd; // remember where our stack is thd->thread_stack = (char*)&thd; // remember where our stack is
thd->rpl_filter = mi->rpl_filter; thd->rpl_filter = mi->rpl_filter;
serial_rgi.thd= thd;
DBUG_ASSERT(rli->inited); DBUG_ASSERT(rli->inited);
DBUG_ASSERT(rli->mi == mi); DBUG_ASSERT(rli->mi == mi);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment