Commit 31a5edb5 authored by unknown's avatar unknown

MDEV-4506: Parallel replication. Intermediate commit.

Hook in the wait-for-prior-commit logic (not really tested yet).
Clean up some resource maintenance around rpl_group_info (may still be some
smaller issues there though).
Add a ToDo list at the top of rpl_parallel.cc
parent 1b3dc66e
......@@ -1376,6 +1376,26 @@ class Log_event
}
}
static bool is_group_event(enum Log_event_type ev_type)
{
switch (ev_type)
{
case START_EVENT_V3:
case STOP_EVENT:
case ROTATE_EVENT:
case SLAVE_EVENT:
case FORMAT_DESCRIPTION_EVENT:
case INCIDENT_EVENT:
case HEARTBEAT_LOG_EVENT:
case BINLOG_CHECKPOINT_EVENT:
case GTID_LIST_EVENT:
return false;
default:
return true;
}
}
protected:
/**
......
......@@ -772,7 +772,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_PARTITION_LOCK_auto_inc;
PSI_mutex_key key_RELAYLOG_LOCK_index;
PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool;
key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry;
PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
......@@ -850,7 +850,8 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_slave_state, "LOCK_slave_state", 0},
{ &key_LOCK_binlog_state, "LOCK_binlog_state", 0},
{ &key_LOCK_rpl_thread, "LOCK_rpl_thread", 0},
{ &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0}
{ &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0},
{ &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0}
};
PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
......
......@@ -249,7 +249,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
extern PSI_mutex_key key_RELAYLOG_LOCK_index;
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool;
key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry;
extern PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
......
......@@ -4,6 +4,51 @@
#include "rpl_mi.h"
/*
Code for optional parallel execution of replicated events on the slave.
ToDo list:
- Review every field in Relay_log_info, and all code that accesses it.
Split out the necessary parts into rpl_group_info, to avoid conflicts
between parallel execution of events. (Such as deferred events ...)
- Error handling. If we fail in one of multiple parallel executions, we
need to make a best effort to complete prior transactions and roll back
following transactions, so slave binlog position will be correct.
- Stopping the slave needs to handle stopping all parallel executions. And
the logic in sql_slave_killed() that waits for current event group to
complete needs to be extended appropriately...
- We need some user-configurable limit on how far ahead the SQL thread will
fetch and queue events for parallel execution (otherwise if slave gets
behind we will fill up memory with pending malloc()'ed events).
- Fix update of relay-log.info and master.info. In non-GTID replication,
they must be serialised to preserve correctness. In GTID replication, we
should not update them at all except at slave thread stop.
- All the waits (eg. in struct wait_for_commit and in
rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill,
everything needs to be correctly rolled back and stopped in all threads,
to ensure a consistent slave replication state.
- We need some knob on the master to allow the user to deliberately delay
commits waiting for more transactions to join group commit, to increase
potential for parallel execution on the slave.
- Handle the case of a partial event group. This occurs when the master
crashes in the middle of writing the event group to the binlog. The
slave rolls back the transaction; parallel execution needs to be able
to deal with this wrt. commit_orderer and such.
- We should fail if we connect to the master with opt_slave_parallel_threads
greater than zero and master does not support GTID. Just to avoid a bunch
of potential problems, we won't be able to do any parallel replication
in this case anyway.
*/
struct rpl_parallel_thread_pool global_rpl_thread_pool;
......@@ -18,13 +63,14 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
thd->rli_slave= rli;
thd->rpl_filter = rli->mi->rpl_filter;
/* ToDo: Get rid of rli->group_info, it is not thread safe. */
rli->group_info= rgi;
/* ToDo: Access to thd, and what about rli, split out a parallel part? */
mysql_mutex_lock(&rli->data_lock);
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
/* ToDo: error handling. */
/* ToDo: also free qev->ev, or hold on to it for a bit if necessary. */
my_free(rgi);
rgi= NULL;
}
......@@ -90,31 +136,72 @@ handle_rpl_parallel_thread(void *arg)
{
struct rpl_parallel_thread::queued_event *next= events->next;
Log_event_type event_type= events->ev->get_type_code();
rpl_group_info *rgi= events->rgi;
rpl_parallel_entry *entry= rgi->parallel_entry;
uint64 wait_for_sub_id;
if (event_type == GTID_EVENT)
{
in_event_group= true;
group_standalone=
(0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
Gtid_log_event::FL_STANDALONE));
in_event_group= true;
}
else
{
if (group_standalone)
/*
Register ourself to wait for the previous commit, if we need to do
such registration _and_ that previous commit has not already
occured.
*/
if ((wait_for_sub_id= rgi->wait_commit_sub_id))
{
if (!Log_event::is_part_of_group(event_type))
in_event_group= false;
mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (wait_for_sub_id > entry->last_committed_sub_id)
{
wait_for_commit *waitee=
&rgi->wait_commit_group_info->commit_orderer;
rgi->commit_orderer.register_wait_for_prior_commit(waitee);
}
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
}
else if (event_type == XID_EVENT)
in_event_group= false;
else if (event_type == QUERY_EVENT)
DBUG_ASSERT(!thd->wait_for_commit_ptr);
thd->wait_for_commit_ptr= &rgi->commit_orderer;
}
rpt_handle_event(events, thd, rpt);
if (in_event_group)
{
if ((group_standalone && !Log_event::is_part_of_group(event_type)) ||
event_type == XID_EVENT ||
(event_type == QUERY_EVENT &&
(!strcmp("COMMIT", ((Query_log_event *)events->ev)->query) ||
!strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query))))
{
Query_log_event *query= static_cast<Query_log_event *>(events->ev);
if (!strcmp("COMMIT", query->query) ||
!strcmp("ROLLBACK", query->query))
in_event_group= false;
in_event_group= false;
rgi->commit_orderer.unregister_wait_for_prior_commit();
thd->wait_for_commit_ptr= NULL;
/*
Record that we have finished, so other event groups will no
longer attempt to wait for us to commit.
We can race here with the next transactions, but that is fine, as
long as we check that we do not decrease last_committed_sub_id. If
this commit is done, then any prior commits will also have been
done and also no longer need waiting for.
*/
mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (entry->last_committed_sub_id < rgi->gtid_sub_id)
entry->last_committed_sub_id= rgi->gtid_sub_id;
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
rgi->commit_orderer.wakeup_subsequent_commits();
delete rgi;
}
}
rpt_handle_event(events, thd, rpt);
my_free(events);
events= next;
}
......@@ -365,19 +452,17 @@ rpl_parallel::find(uint32 domain_id)
(const uchar *)&domain_id, 0)))
{
/* Allocate a new, empty one. */
if (!(e= (struct rpl_parallel_entry *)my_malloc(sizeof(*e), MYF(0))))
if (!(e= (struct rpl_parallel_entry *)my_malloc(sizeof(*e),
MYF(MY_ZEROFILL))))
return NULL;
e->domain_id= domain_id;
e->last_server_id= 0;
e->last_seq_no= 0;
e->last_commit_id= 0;
e->active= false;
e->rpl_thread= NULL;
if (my_hash_insert(&domain_hash, (uchar *)e))
{
my_free(e);
return NULL;
}
mysql_mutex_init(key_LOCK_parallel_entry, &e->LOCK_parallel_entry,
MY_MUTEX_INIT_FAST);
}
return e;
......@@ -385,11 +470,15 @@ rpl_parallel::find(uint32 domain_id)
bool
rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd)
rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev,
THD *parent_thd)
{
rpl_parallel_entry *e;
rpl_parallel_thread *cur_thread;
rpl_parallel_thread::queued_event *qev;
struct rpl_group_info *rgi;
Relay_log_info *rli= serial_rgi->rli;
enum Log_event_type typ;
/* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock);
......@@ -401,17 +490,17 @@ rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd)
return true;
}
qev->ev= ev;
qev->rgi= rli->group_info;
rli->group_info= NULL; /* Avoid conflict with groups applied in parallel */
qev->next= NULL;
if (ev->get_type_code() == GTID_EVENT)
if ((typ= ev->get_type_code()) == GTID_EVENT)
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
if (!(e= find(gtid_ev->domain_id)))
if (!(e= find(gtid_ev->domain_id)) ||
!(e->current_group_info= rgi= new rpl_group_info(rli)) ||
event_group_new_gtid(rgi, gtid_ev))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
return true;
}
......@@ -448,7 +537,7 @@ rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd)
e->last_commit_id= 0;
}
cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e);
e->rpl_thread->wait_for= NULL; /* ToDo */
rgi->wait_commit_sub_id= 0;
/* get_thread() returns with the LOCK_rpl_thread locked. */
}
else if ((gtid_ev->flags & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
......@@ -464,8 +553,8 @@ rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd)
servers in the replication hierarchy.
*/
rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
rpt->wait_for= cur_thread; /* ToDo */
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
rgi->wait_commit_sub_id= e->current_sub_id;
rgi->wait_commit_group_info= e->current_group_info;
e->rpl_thread= cur_thread= rpt;
/* get_thread() returns with the LOCK_rpl_thread locked. */
}
......@@ -476,18 +565,25 @@ rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd)
domain, and we have to wait for that to finish before we can start on
the next one. So just re-use the thread.
*/
rgi->wait_commit_sub_id= 0;
}
current= e;
e->current_sub_id= rgi->gtid_sub_id;
current= rgi->parallel_entry= e;
}
else if (!Log_event::is_group_event(typ) || !current)
{
/*
Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
Same for events not preceeded by GTID (we should not see those normally,
but they might be from an old master).
*/
qev->rgi= serial_rgi;
rpt_handle_event(qev, parent_thd, NULL);
return false;
}
else
{
if (!current)
{
/* We have no domain_id yet, just run non-parallel. */
rpt_handle_event(qev, parent_thd, NULL);
return false;
}
cur_thread= current->rpl_thread;
if (cur_thread)
{
......@@ -503,9 +599,10 @@ rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd)
{
cur_thread= current->rpl_thread=
global_rpl_thread_pool.get_thread(current);
cur_thread->wait_for= NULL; /* ToDo */
}
}
qev->rgi= current->current_group_info;
/*
Queue the event for processing.
*/
......
......@@ -25,7 +25,6 @@ struct rpl_parallel_thread {
Log_event *ev;
struct rpl_group_info *rgi;
} *event_queue, *last_in_queue;
rpl_parallel_thread *wait_for; /* ToDo: change this ... */
};
......@@ -52,6 +51,14 @@ struct rpl_parallel_entry {
uint64 last_commit_id;
bool active;
rpl_parallel_thread *rpl_thread;
/*
The sub_id of the last transaction to commit within this domain_id.
Must be accessed under LOCK_parallel_entry protection.
*/
uint64 last_committed_sub_id;
mysql_mutex_t LOCK_parallel_entry;
uint64 current_sub_id;
struct rpl_group_info *current_group_info;
};
struct rpl_parallel {
HASH domain_hash;
......@@ -60,7 +67,7 @@ struct rpl_parallel {
rpl_parallel();
~rpl_parallel();
rpl_parallel_entry *find(uint32 domain_id);
bool do_event(Relay_log_info *rli, Log_event *ev, THD *thd);
bool do_event(struct rpl_group_info *serial_rgi, Log_event *ev, THD *thd);
};
......
......@@ -113,8 +113,6 @@ Relay_log_info::~Relay_log_info()
mysql_cond_destroy(&sleep_cond);
relay_log.cleanup();
free_annotate_event();
if (group_info)
my_free(group_info);
DBUG_VOID_RETURN;
}
......@@ -1532,4 +1530,28 @@ rpl_load_gtid_slave_state(THD *thd)
DBUG_RETURN(err);
}
rpl_group_info::rpl_group_info(Relay_log_info *rli_)
: rli(rli_), gtid_sub_id(0), wait_commit_sub_id(0), wait_commit_group_info(0),
parallel_entry(0)
{
bzero(&current_gtid, sizeof(current_gtid));
}
int
event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
{
uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id);
if (!sub_id)
{
return 1;
}
rgi->gtid_sub_id= sub_id;
rgi->current_gtid.server_id= gev->server_id;
rgi->current_gtid.domain_id= gev->domain_id;
rgi->current_gtid.seq_no= gev->seq_no;
return 0;
}
#endif
......@@ -314,7 +314,7 @@ class Relay_log_info : public Slave_reporting_capability
char slave_patternload_file[FN_REFLEN];
size_t slave_patternload_file_size;
/* Various data related to the currently executing event group. */
/* ToDo: We need to remove this, always use the per-transaction one to work with parallel replication. */
struct rpl_group_info *group_info;
rpl_parallel parallel;
......@@ -610,6 +610,30 @@ struct rpl_group_info
*/
uint64 gtid_sub_id;
rpl_gtid current_gtid;
/*
This is used to keep transaction commit order.
We will signal this when we commit, and can register it to wait for the
commit_orderer of the previous commit to signal us.
*/
wait_for_commit commit_orderer;
/*
If non-zero, the sub_id of a prior event group whose commit we have to wait
for before committing ourselves. Then wait_commit_group_info points to the
event group to wait for.
Before using this, rpl_parallel_entry::last_committed_sub_id should be
compared against wait_commit_sub_id. Only if last_committed_sub_id is
smaller than wait_commit_sub_id must the wait be done (otherwise the
waited-for transaction is already committed, so we would otherwise wait
for the wrong commit).
*/
uint64 wait_commit_sub_id;
struct rpl_group_info *wait_commit_group_info;
struct rpl_parallel_entry *parallel_entry;
rpl_group_info(Relay_log_info *rli);
~rpl_group_info() { };
};
......@@ -620,5 +644,6 @@ int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
extern struct rpl_slave_state rpl_global_gtid_slave_state;
int rpl_load_gtid_slave_state(THD *thd);
int event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev);
#endif /* RPL_RLI_H */
......@@ -3177,7 +3177,8 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
@retval 1 The event was not applied.
*/
static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
rpl_group_info *serial_rgi)
{
DBUG_ENTER("exec_relay_log_event");
......@@ -3201,6 +3202,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
if (ev)
{
int exec_res;
Log_event_type typ= ev->get_type_code();
/*
This tests if the position of the beginning of the current event
......@@ -3230,8 +3232,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
read hanging if the realy log does not have any more events.
*/
DBUG_EXECUTE_IF("incomplete_group_in_relay_log",
if ((ev->get_type_code() == XID_EVENT) ||
((ev->get_type_code() == QUERY_EVENT) &&
if ((typ == XID_EVENT) ||
((typ == QUERY_EVENT) &&
strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0))
{
DBUG_ASSERT(thd->transaction.all.modified_non_trans_table);
......@@ -3244,11 +3246,25 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
}
if (opt_slave_parallel_threads > 0)
DBUG_RETURN(rli->parallel.do_event(rli, ev, thd));
DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev, thd));
/*
For GTID, allocate a new sub_id for the given domain_id.
The sub_id must be allocated in increasing order of binlog order.
*/
if (typ == GTID_EVENT &&
event_group_new_gtid(serial_rgi, static_cast<Gtid_log_event *>(ev)))
{
sql_print_error("Error reading relay log event: %s",
"slave SQL thread aborted because of out-of-memory error");
mysql_mutex_unlock(&rli->data_lock);
delete ev;
DBUG_RETURN(1);
}
exec_res= apply_event_and_update_pos(ev, thd, rli->group_info, NULL);
switch (ev->get_type_code()) {
switch (typ) {
case FORMAT_DESCRIPTION_EVENT:
/*
Format_description_log_event should not be deleted because it
......@@ -4001,6 +4017,7 @@ pthread_handler_t handle_slave_sql(void *arg)
Master_info *mi= ((Master_info*)arg);
Relay_log_info* rli = &mi->rli;
const char *errmsg;
rpl_group_info serial_rgi(rli);
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
......@@ -4205,6 +4222,13 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
}
mysql_mutex_unlock(&rli->data_lock);
/*
ToDo: Get rid of this, all accesses to rpl_group_info must be made
per-worker-thread to work with parallel replication.
*/
if (opt_slave_parallel_threads <= 0)
rli->group_info= &serial_rgi;
/* Read queries from the IO/THREAD until this thread is killed */
while (!sql_slave_killed(thd,rli))
......@@ -4227,7 +4251,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
saved_skip= 0;
}
if (exec_relay_log_event(thd,rli))
if (exec_relay_log_event(thd, rli, &serial_rgi))
{
DBUG_PRINT("info", ("exec_relay_log_event() failed"));
// do not scare the user if SQL thread was simply killed or stopped
......@@ -5736,7 +5760,6 @@ static Log_event* next_event(Relay_log_info* rli)
mysql_mutex_t *log_lock = rli->relay_log.get_log_lock();
const char* errmsg=0;
THD* thd = rli->sql_thd;
struct rpl_group_info *rgi;
DBUG_ENTER("next_event");
DBUG_ASSERT(thd != 0);
......@@ -5824,45 +5847,12 @@ static Log_event* next_event(Relay_log_info* rli)
opt_slave_sql_verify_checksum)))
{
if (!(rgi= rli->group_info))
{
if (!(rgi= rli->group_info= (struct rpl_group_info *)
my_malloc(sizeof(*rgi), MYF(0))))
{
errmsg = "slave SQL thread aborted because of out-of-memory error";
if (hot_log)
mysql_mutex_unlock(log_lock);
goto err;
}
bzero(rgi, sizeof(*rgi));
}
rgi->rli= rli;
DBUG_ASSERT(thd==rli->sql_thd);
/*
read it while we have a lock, to avoid a mutex lock in
inc_event_relay_log_pos()
*/
rli->future_event_relay_log_pos= my_b_tell(cur_log);
/*
For GTID, allocate a new sub_id for the given domain_id.
The sub_id must be allocated in increasing order of binlog order.
*/
if (ev->get_type_code() == GTID_EVENT)
{
Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev);
uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id);
if (!sub_id)
{
errmsg = "slave SQL thread aborted because of out-of-memory error";
if (hot_log)
mysql_mutex_unlock(log_lock);
goto err;
}
rgi->gtid_sub_id= sub_id;
rgi->current_gtid.server_id= gev->server_id;
rgi->current_gtid.domain_id= gev->domain_id;
rgi->current_gtid.seq_no= gev->seq_no;
}
if (hot_log)
mysql_mutex_unlock(log_lock);
......
......@@ -5602,6 +5602,13 @@ wait_for_commit::wait_for_commit()
}
wait_for_commit::~wait_for_commit()
{
mysql_mutex_destroy(&LOCK_wait_commit);
mysql_cond_destroy(&COND_wait_commit);
}
void
wait_for_commit::wakeup()
{
......
......@@ -1659,6 +1659,7 @@ struct wait_for_commit
void unregister_wait_for_prior_commit2();
wait_for_commit();
~wait_for_commit();
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment