Commit 7e5dc4f0 authored by unknown's avatar unknown

MDEV-4506: Parallel replication. Intermediate commit.

Implement facility for the commit in one thread to wait for the commit of
another to complete first. The wait is done in a way that does not hinder
that a waiter and a waitee can group commit together with a single fsync()
in both binlog and InnoDB. The wait is done efficiently with respect to
locking.

The patch was originally made to support TaoBao parallel replication with
in-order commit; now it will be adapted to also be used for parallel
replication of group-committed transactions.

A waiter THD registers itself with a prior waitee THD. The waiter will then
complete its commit at the earliest in the same group commit of the waitee
(when using binlog). The wait can also be done explicitly by the waitee.
parent 535de717
...@@ -683,6 +683,41 @@ void *thd_get_ha_data(const MYSQL_THD thd, const struct handlerton *hton); ...@@ -683,6 +683,41 @@ void *thd_get_ha_data(const MYSQL_THD thd, const struct handlerton *hton);
*/ */
void thd_set_ha_data(MYSQL_THD thd, const struct handlerton *hton, void thd_set_ha_data(MYSQL_THD thd, const struct handlerton *hton,
const void *ha_data); const void *ha_data);
/**
Signal that the first part of handler commit is finished, and that the
committed transaction is now visible and has fixed commit ordering with
respect to other transactions. The commit need _not_ be durable yet, and
typically will not be when this call makes sense.
This call is optional, if the storage engine does not call it the upper
layer will after the handler commit() method is done. However, the storage
engine may choose to call it itself to increase the possibility for group
commit.
In-order parallel replication uses this to apply different transaction in
parallel, but delay the commits of later transactions until earlier
transactions have committed first, thus achieving increased performance on
multi-core systems while still preserving full transaction consistency.
The storage engine can call this from within the commit() method, typically
after the commit record has been written to the transaction log, but before
the log has been fsync()'ed. This will allow the next replicated transaction
to proceed to commit before the first one has done fsync() or similar. Thus,
it becomes possible for multiple sequential replicated transactions to share
a single fsync() inside the engine in group commit.
Note that this method should _not_ be called from within the commit_ordered()
method, or any other place in the storage engine. When commit_ordered() is
used (typically when binlog is enabled), the transaction coordinator takes
care of this and makes group commit in the storage engine possible without
any other action needed on the part of the storage engine. This function
thd_wakeup_subsequent_commits() is only needed when no transaction
coordinator is used, meaning a single storage engine and no binary log.
*/
void thd_wakeup_subsequent_commits(MYSQL_THD thd);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -236,6 +236,7 @@ void mysql_query_cache_invalidate4(void* thd, ...@@ -236,6 +236,7 @@ void mysql_query_cache_invalidate4(void* thd,
void *thd_get_ha_data(const void* thd, const struct handlerton *hton); void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton, void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data); const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd);
struct mysql_event_general struct mysql_event_general
{ {
unsigned int event_subclass; unsigned int event_subclass;
......
...@@ -236,6 +236,7 @@ void mysql_query_cache_invalidate4(void* thd, ...@@ -236,6 +236,7 @@ void mysql_query_cache_invalidate4(void* thd,
void *thd_get_ha_data(const void* thd, const struct handlerton *hton); void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton, void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data); const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd);
#include <mysql/plugin_auth_common.h> #include <mysql/plugin_auth_common.h>
typedef struct st_plugin_vio_info typedef struct st_plugin_vio_info
{ {
......
...@@ -189,6 +189,7 @@ void mysql_query_cache_invalidate4(void* thd, ...@@ -189,6 +189,7 @@ void mysql_query_cache_invalidate4(void* thd,
void *thd_get_ha_data(const void* thd, const struct handlerton *hton); void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton, void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data); const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd);
enum enum_ftparser_mode enum enum_ftparser_mode
{ {
MYSQL_FTPARSER_SIMPLE_MODE= 0, MYSQL_FTPARSER_SIMPLE_MODE= 0,
......
...@@ -1455,6 +1455,8 @@ int ha_commit_one_phase(THD *thd, bool all) ...@@ -1455,6 +1455,8 @@ int ha_commit_one_phase(THD *thd, bool all)
*/ */
bool is_real_trans=all || thd->transaction.all.ha_list == 0; bool is_real_trans=all || thd->transaction.all.ha_list == 0;
DBUG_ENTER("ha_commit_one_phase"); DBUG_ENTER("ha_commit_one_phase");
if (is_real_trans)
thd->wait_for_prior_commit();
int res= commit_one_phase_2(thd, all, trans, is_real_trans); int res= commit_one_phase_2(thd, all, trans, is_real_trans);
DBUG_RETURN(res); DBUG_RETURN(res);
} }
...@@ -1494,7 +1496,10 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) ...@@ -1494,7 +1496,10 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
} }
/* Free resources and perform other cleanup even for 'empty' transactions. */ /* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans) if (is_real_trans)
{
thd->wakeup_subsequent_commits();
thd->transaction.cleanup(); thd->transaction.cleanup();
}
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -1569,7 +1574,10 @@ int ha_rollback_trans(THD *thd, bool all) ...@@ -1569,7 +1574,10 @@ int ha_rollback_trans(THD *thd, bool all)
} }
/* Always cleanup. Even if nht==0. There may be savepoints. */ /* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans) if (is_real_trans)
{
thd->wakeup_subsequent_commits();
thd->transaction.cleanup(); thd->transaction.cleanup();
}
if (all) if (all)
thd->transaction_rollback_request= FALSE; thd->transaction_rollback_request= FALSE;
......
...@@ -6542,44 +6542,199 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, ...@@ -6542,44 +6542,199 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
} }
bool bool
MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry,
wait_for_commit *wfc)
{ {
group_commit_entry *orig_queue;
wait_for_commit *list, *cur, *last;
/* /*
To facilitate group commit for the binlog, we first queue up ourselves in To facilitate group commit for the binlog, we first queue up ourselves in
the group commit queue. Then the first thread to enter the queue waits for the group commit queue. Then the first thread to enter the queue waits for
the LOCK_log mutex, and commits for everyone in the queue once it gets the the LOCK_log mutex, and commits for everyone in the queue once it gets the
lock. Any other threads in the queue just wait for the first one to finish lock. Any other threads in the queue just wait for the first one to finish
the commit and wake them up. the commit and wake them up.
To support in-order parallel replication with group commit, after we add
some transaction to the queue, we check if there were other transactions
already prepared to commit but just waiting for the first one to commit.
If so, we add those to the queue as well, transitively for all waiters.
*/ */
entry->thd->clear_wakeup_ready(); entry->thd->clear_wakeup_ready();
mysql_mutex_lock(&LOCK_prepare_ordered); mysql_mutex_lock(&LOCK_prepare_ordered);
group_commit_entry *orig_queue= group_commit_queue; orig_queue= group_commit_queue;
entry->next= orig_queue;
group_commit_queue= entry; /*
Iteratively process everything added to the queue, looking for waiters,
and their waiters, and so on. If a waiter is ready to commit, we
immediately add it to the queue; if not we just wake it up.
This would be natural to do with recursion, but we want to avoid
potentially unbounded recursion blowing the C stack, so we use the list
approach instead.
*/
list= wfc;
cur= list;
last= list;
for (;;)
{
/* Add the entry to the group commit queue. */
entry->next= group_commit_queue;
group_commit_queue= entry;
if (entry->cache_mngr->using_xa)
{
DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered");
run_prepare_ordered(entry->thd, entry->all);
DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered");
}
if (!cur)
break; // Can happen if initial entry has no wait_for_commit
if (cur->subsequent_commits_list)
{
bool have_lock;
wait_for_commit *waiter;
mysql_mutex_lock(&cur->LOCK_wait_commit);
have_lock= true;
waiter= cur->subsequent_commits_list;
/* Check again, now safely under lock. */
if (waiter)
{
/* Grab the list of waiters and process it. */
cur->subsequent_commits_list= NULL;
do
{
wait_for_commit *next= waiter->next_subsequent_commit;
group_commit_entry *entry2=
(group_commit_entry *)waiter->opaque_pointer;
if (entry2)
{
/*
This is another transaction ready to be written to the binary
log. We can put it into the queue directly, without needing a
separate context switch to the other thread. We just set a flag
so that the other thread will know when it wakes up that it was
already processed.
So put it at the end of the list to be processed in a subsequent
iteration of the outer loop.
*/
entry2->queued_by_other= true;
last->next_subsequent_commit= waiter;
last= waiter;
/*
As a small optimisation, we do not actually need to set
waiter->next_subsequent_commit to NULL, as we can use the
pointer `last' to check for end-of-list.
*/
}
else
{
/*
Wake up the waiting transaction.
For this, we need to set the "wakeup running" flag and release
the waitee lock to avoid a deadlock, see comments on
THD::wakeup_subsequent_commits2() for details.
*/
if (have_lock)
{
cur->wakeup_subsequent_commits_running= true;
mysql_mutex_unlock(&cur->LOCK_wait_commit);
have_lock= false;
}
waiter->wakeup();
}
waiter= next;
} while (waiter);
}
if (have_lock)
mysql_mutex_unlock(&cur->LOCK_wait_commit);
}
if (cur == last)
break;
cur= cur->next_subsequent_commit;
entry= (group_commit_entry *)cur->opaque_pointer;
DBUG_ASSERT(entry != NULL);
}
if (entry->cache_mngr->using_xa) /* Now we need to clear the wakeup_subsequent_commits_running flags. */
if (list)
{ {
DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered"); for (;;)
run_prepare_ordered(entry->thd, entry->all); {
DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered"); if (list->wakeup_subsequent_commits_running)
{
mysql_mutex_lock(&list->LOCK_wait_commit);
list->wakeup_subsequent_commits_running= false;
mysql_mutex_unlock(&list->LOCK_wait_commit);
}
if (list == last)
break;
list= list->next_subsequent_commit;
}
} }
mysql_mutex_unlock(&LOCK_prepare_ordered); mysql_mutex_unlock(&LOCK_prepare_ordered);
DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered"); DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered");
return orig_queue == NULL;
}
bool
MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
{
wait_for_commit *wfc;
bool is_leader;
wfc= entry->thd->wait_for_commit_ptr;
entry->queued_by_other= false;
if (wfc && wfc->waiting_for_commit)
{
mysql_mutex_lock(&wfc->LOCK_wait_commit);
/* Do an extra check here, this time safely under lock. */
if (wfc->waiting_for_commit)
{
wfc->opaque_pointer= entry;
do
{
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
} while (wfc->waiting_for_commit);
wfc->opaque_pointer= NULL;
}
mysql_mutex_unlock(&wfc->LOCK_wait_commit);
}
if (entry->queued_by_other)
is_leader= false;
else
is_leader= queue_for_group_commit(entry, wfc);
/* /*
The first in the queue handle group commit for all; the others just wait The first in the queue handles group commit for all; the others just wait
to be signalled when group commit is done. to be signalled when group commit is done.
*/ */
if (orig_queue != NULL) if (is_leader)
trx_group_commit_leader(entry);
else if (!entry->queued_by_other)
entry->thd->wait_for_wakeup_ready(); entry->thd->wait_for_wakeup_ready();
else else
trx_group_commit_leader(entry); {
/*
If we were queued by another prior commit, then we are woken up
only when the leader has already completed the commit for us.
So nothing to do here then.
*/
}
if (!opt_optimize_thread_scheduling) if (!opt_optimize_thread_scheduling)
{ {
/* For the leader, trx_group_commit_leader() already took the lock. */ /* For the leader, trx_group_commit_leader() already took the lock. */
if (orig_queue != NULL) if (!is_leader)
mysql_mutex_lock(&LOCK_commit_ordered); mysql_mutex_lock(&LOCK_commit_ordered);
DEBUG_SYNC(entry->thd, "commit_loop_entry_commit_ordered"); DEBUG_SYNC(entry->thd, "commit_loop_entry_commit_ordered");
...@@ -6598,7 +6753,10 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) ...@@ -6598,7 +6753,10 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
if (next) if (next)
{ {
next->thd->signal_wakeup_ready(); if (next->queued_by_other)
next->thd->wait_for_commit_ptr->wakeup();
else
next->thd->signal_wakeup_ready();
} }
else else
{ {
...@@ -6884,7 +7042,12 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) ...@@ -6884,7 +7042,12 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
*/ */
next= current->next; next= current->next;
if (current != leader) // Don't wake up ourself if (current != leader) // Don't wake up ourself
current->thd->signal_wakeup_ready(); {
if (current->queued_by_other)
current->thd->wait_for_commit_ptr->wakeup();
else
current->thd->signal_wakeup_ready();
}
current= next; current= next;
} }
DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered"); DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered");
...@@ -7514,6 +7677,8 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, ...@@ -7514,6 +7677,8 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
mysql_mutex_unlock(&LOCK_prepare_ordered); mysql_mutex_unlock(&LOCK_prepare_ordered);
} }
thd->wait_for_prior_commit();
cookie= 0; cookie= 0;
if (xid) if (xid)
cookie= log_one_transaction(xid); cookie= log_one_transaction(xid);
......
...@@ -45,6 +45,15 @@ class TC_LOG ...@@ -45,6 +45,15 @@ class TC_LOG
virtual int open(const char *opt_name)=0; virtual int open(const char *opt_name)=0;
virtual void close()=0; virtual void close()=0;
/*
Transaction coordinator 2-phase commit.
Must invoke the run_prepare_ordered and run_commit_ordered methods, as
described below for these methods.
In addition, must invoke THD::wait_for_prior_commit(), or equivalent
wait, to ensure that one commit waits for another if registered to do so.
*/
virtual int log_and_order(THD *thd, my_xid xid, bool all, virtual int log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered, bool need_prepare_ordered,
bool need_commit_ordered) = 0; bool need_commit_ordered) = 0;
...@@ -397,6 +406,7 @@ private: ...@@ -397,6 +406,7 @@ private:
class binlog_cache_mngr; class binlog_cache_mngr;
struct rpl_gtid; struct rpl_gtid;
class wait_for_commit;
class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
{ {
private: private:
...@@ -445,6 +455,8 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG ...@@ -445,6 +455,8 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
group commit, only used when opt_optimize_thread_scheduling is not set. group commit, only used when opt_optimize_thread_scheduling is not set.
*/ */
bool check_purge; bool check_purge;
/* Flag used to optimise around wait_for_prior_commit. */
bool queued_by_other;
ulong binlog_id; ulong binlog_id;
}; };
...@@ -526,6 +538,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG ...@@ -526,6 +538,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
void do_checkpoint_request(ulong binlog_id); void do_checkpoint_request(ulong binlog_id);
void purge(); void purge();
int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id); int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id);
bool queue_for_group_commit(group_commit_entry *entry, wait_for_commit *wfc);
bool write_transaction_to_binlog_events(group_commit_entry *entry); bool write_transaction_to_binlog_events(group_commit_entry *entry);
void trx_group_commit_leader(group_commit_entry *leader); void trx_group_commit_leader(group_commit_entry *leader);
bool is_xidlist_idle_nolock(); bool is_xidlist_idle_nolock();
......
...@@ -777,7 +777,7 @@ PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state, ...@@ -777,7 +777,7 @@ PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
PSI_mutex_key key_LOCK_stats, PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats, key_LOCK_global_index_stats,
key_LOCK_wakeup_ready; key_LOCK_wakeup_ready, key_LOCK_wait_commit;
PSI_mutex_key key_LOCK_rpl_gtid_state; PSI_mutex_key key_LOCK_rpl_gtid_state;
...@@ -825,6 +825,7 @@ static PSI_mutex_info all_server_mutexes[]= ...@@ -825,6 +825,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL}, { &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0}, { &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
{ &key_LOCK_rpl_gtid_state, "LOCK_rpl_gtid_state", PSI_FLAG_GLOBAL}, { &key_LOCK_rpl_gtid_state, "LOCK_rpl_gtid_state", PSI_FLAG_GLOBAL},
{ &key_LOCK_wait_commit, "wait_for_commit::LOCK_wait_commit", 0},
{ &key_LOCK_thd_data, "THD::LOCK_thd_data", 0}, { &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
{ &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL}, { &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
{ &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL}, { &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
...@@ -888,7 +889,8 @@ PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond, ...@@ -888,7 +889,8 @@ PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond, key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache, key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache,
key_BINLOG_COND_queue_busy; key_BINLOG_COND_queue_busy;
PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready; PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
key_COND_wait_commit;
PSI_cond_key key_RELAYLOG_COND_queue_busy; PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool; PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool;
...@@ -912,6 +914,7 @@ static PSI_cond_info all_server_conds[]= ...@@ -912,6 +914,7 @@ static PSI_cond_info all_server_conds[]=
{ &key_RELAYLOG_update_cond, "MYSQL_RELAY_LOG::update_cond", 0}, { &key_RELAYLOG_update_cond, "MYSQL_RELAY_LOG::update_cond", 0},
{ &key_RELAYLOG_COND_queue_busy, "MYSQL_RELAY_LOG::COND_queue_busy", 0}, { &key_RELAYLOG_COND_queue_busy, "MYSQL_RELAY_LOG::COND_queue_busy", 0},
{ &key_COND_wakeup_ready, "THD::COND_wakeup_ready", 0}, { &key_COND_wakeup_ready, "THD::COND_wakeup_ready", 0},
{ &key_COND_wait_commit, "wait_for_commit::COND_wait_commit", 0},
{ &key_COND_cache_status_changed, "Query_cache::COND_cache_status_changed", 0}, { &key_COND_cache_status_changed, "Query_cache::COND_cache_status_changed", 0},
{ &key_COND_manager, "COND_manager", PSI_FLAG_GLOBAL}, { &key_COND_manager, "COND_manager", PSI_FLAG_GLOBAL},
{ &key_COND_rpl_status, "COND_rpl_status", PSI_FLAG_GLOBAL}, { &key_COND_rpl_status, "COND_rpl_status", PSI_FLAG_GLOBAL},
......
...@@ -253,7 +253,7 @@ extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state, ...@@ -253,7 +253,7 @@ extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
extern PSI_mutex_key key_LOCK_stats, extern PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats, key_LOCK_wakeup_ready; key_LOCK_global_index_stats, key_LOCK_wakeup_ready, key_LOCK_wait_commit;
extern PSI_mutex_key key_LOCK_rpl_gtid_state; extern PSI_mutex_key key_LOCK_rpl_gtid_state;
...@@ -279,7 +279,8 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond, ...@@ -279,7 +279,8 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_relay_log_info_sleep_cond, key_relay_log_info_sleep_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond, key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache; key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache;
extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready; extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
key_COND_wait_commit;
extern PSI_cond_key key_RELAYLOG_COND_queue_busy; extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool; extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool;
......
...@@ -605,6 +605,17 @@ void thd_set_ha_data(THD *thd, const struct handlerton *hton, ...@@ -605,6 +605,17 @@ void thd_set_ha_data(THD *thd, const struct handlerton *hton,
} }
/**
Allow storage engine to wakeup commits waiting in THD::wait_for_prior_commit.
@see thd_wakeup_subsequent_commits() definition in plugin.h
*/
extern "C"
void thd_wakeup_subsequent_commits(THD *thd)
{
thd->wakeup_subsequent_commits();
}
extern "C" extern "C"
long long thd_test_options(const THD *thd, long long test_options) long long thd_test_options(const THD *thd, long long test_options)
{ {
...@@ -788,6 +799,7 @@ THD::THD() ...@@ -788,6 +799,7 @@ THD::THD()
#if defined(ENABLED_DEBUG_SYNC) #if defined(ENABLED_DEBUG_SYNC)
debug_sync_control(0), debug_sync_control(0),
#endif /* defined(ENABLED_DEBUG_SYNC) */ #endif /* defined(ENABLED_DEBUG_SYNC) */
wait_for_commit_ptr(0),
main_warning_info(0, false, false) main_warning_info(0, false, false)
{ {
ulong tmp; ulong tmp;
...@@ -5580,6 +5592,202 @@ THD::signal_wakeup_ready() ...@@ -5580,6 +5592,202 @@ THD::signal_wakeup_ready()
} }
wait_for_commit::wait_for_commit()
: subsequent_commits_list(0), next_subsequent_commit(0), waitee(0),
opaque_pointer(0),
waiting_for_commit(false), wakeup_subsequent_commits_running(false)
{
mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0);
}
void
wait_for_commit::wakeup()
{
/*
We signal each waiter on their own condition and mutex (rather than using
pthread_cond_broadcast() or something like that).
Otherwise we would need to somehow ensure that they were done
waking up before we could allow this THD to be destroyed, which would
be annoying and unnecessary.
*/
mysql_mutex_lock(&LOCK_wait_commit);
waiting_for_commit= false;
mysql_cond_signal(&COND_wait_commit);
mysql_mutex_unlock(&LOCK_wait_commit);
}
/*
Register that the next commit of this THD should wait to complete until
commit in another THD (the waitee) has completed.
The wait may occur explicitly, with the waiter sitting in
wait_for_prior_commit() until the waitee calls wakeup_subsequent_commits().
Alternatively, the TC (eg. binlog) may do the commits of both waitee and
waiter at once during group commit, resolving both of them in the right
order.
Only one waitee can be registered for a waiter; it must be removed by
wait_for_prior_commit() or unregister_wait_for_prior_commit() before a new
one is registered. But it is ok for several waiters to register a wait for
the same waitee. It is also permissible for one THD to be both a waiter and
a waitee at the same time.
*/
void
wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
{
waiting_for_commit= true;
DBUG_ASSERT(!this->waitee /* No prior registration allowed */);
this->waitee= waitee;
mysql_mutex_lock(&waitee->LOCK_wait_commit);
/*
If waitee is in the middle of wakeup, then there is nothing to wait for,
so we need not register. This is necessary to avoid a race in unregister,
see comments on wakeup_subsequent_commits2() for details.
*/
if (waitee->wakeup_subsequent_commits_running)
waiting_for_commit= false;
else
{
this->next_subsequent_commit= waitee->subsequent_commits_list;
waitee->subsequent_commits_list= this;
}
mysql_mutex_unlock(&waitee->LOCK_wait_commit);
}
/*
Wait for commit of another transaction to complete, as already registered
with register_wait_for_prior_commit(). If the commit already completed,
returns immediately.
*/
void
wait_for_commit::wait_for_prior_commit2()
{
mysql_mutex_lock(&LOCK_wait_commit);
while (waiting_for_commit)
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
mysql_mutex_unlock(&LOCK_wait_commit);
waitee= NULL;
}
/*
Wakeup anyone waiting for us to have committed.
Note about locking:
We have a potential race or deadlock between wakeup_subsequent_commits() in
the waitee and unregister_wait_for_prior_commit() in the waiter.
Both waiter and waitee needs to take their own lock before it is safe to take
a lock on the other party - else the other party might disappear and invalid
memory data could be accessed. But if we take the two locks in different
order, we may end up in a deadlock.
The waiter needs to lock the waitee to delete itself from the list in
unregister_wait_for_prior_commit(). Thus wakeup_subsequent_commits() can not
hold its own lock while locking waiters, lest we deadlock.
So we need to prevent unregister_wait_for_prior_commit() running while wakeup
is in progress - otherwise the unregister could complete before the wakeup,
leading to incorrect spurious wakeup or accessing invalid memory.
However, if we are in the middle of running wakeup_subsequent_commits(), then
there is no need for unregister_wait_for_prior_commit() in the first place -
the waiter can just do a normal wait_for_prior_commit(), as it will be
immediately woken up.
So the solution to the potential race/deadlock is to set a flag in the waitee
that wakeup_subsequent_commits() is in progress. When this flag is set,
unregister_wait_for_prior_commit() becomes just wait_for_prior_commit().
Then also register_wait_for_prior_commit() needs to check if
wakeup_subsequent_commits() is running, and skip the registration if
so. This is needed in case a new waiter manages to register itself and
immediately try to unregister while wakeup_subsequent_commits() is
running. Else the new waiter would also wait rather than unregister, but it
would not be woken up until next wakeup, which could be potentially much
later than necessary.
*/
void
wait_for_commit::wakeup_subsequent_commits2()
{
wait_for_commit *waiter;
mysql_mutex_lock(&LOCK_wait_commit);
wakeup_subsequent_commits_running= true;
waiter= subsequent_commits_list;
subsequent_commits_list= NULL;
mysql_mutex_unlock(&LOCK_wait_commit);
while (waiter)
{
/*
Important: we must grab the next pointer before waking up the waiter;
once the wakeup is done, the field could be invalidated at any time.
*/
wait_for_commit *next= waiter->next_subsequent_commit;
waiter->wakeup();
waiter= next;
}
mysql_mutex_lock(&LOCK_wait_commit);
wakeup_subsequent_commits_running= false;
mysql_mutex_unlock(&LOCK_wait_commit);
}
/* Cancel a previously registered wait for another THD to commit before us. */
void
wait_for_commit::unregister_wait_for_prior_commit2()
{
mysql_mutex_lock(&LOCK_wait_commit);
if (waiting_for_commit)
{
wait_for_commit *loc_waitee= this->waitee;
wait_for_commit **next_ptr_ptr, *cur;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running)
{
/*
When a wakeup is running, we cannot safely remove ourselves from the
list without corrupting it. Instead we can just wait, as wakeup is
already in progress and will thus be immediate.
See comments on wakeup_subsequent_commits2() for more details.
*/
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
while (waiting_for_commit)
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
}
else
{
/* Remove ourselves from the list in the waitee. */
next_ptr_ptr= &loc_waitee->subsequent_commits_list;
while ((cur= *next_ptr_ptr) != NULL)
{
if (cur == this)
{
*next_ptr_ptr= this->next_subsequent_commit;
break;
}
next_ptr_ptr= &cur->next_subsequent_commit;
}
waiting_for_commit= false;
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
}
}
mysql_mutex_unlock(&LOCK_wait_commit);
this->waitee= NULL;
}
bool Discrete_intervals_list::append(ulonglong start, ulonglong val, bool Discrete_intervals_list::append(ulonglong start, ulonglong val,
ulonglong incr) ulonglong incr)
{ {
......
...@@ -1553,6 +1553,115 @@ private: ...@@ -1553,6 +1553,115 @@ private:
}; };
/*
Class to facilitate the commit of one transactions waiting for the commit of
another transaction to complete first.
This is used during (parallel) replication, to allow different transactions
to be applied in parallel, but still commit in order.
The transaction that wants to wait for a prior commit must first register
to wait with register_wait_for_prior_commit(waitee). Such registration
must be done holding the waitee->LOCK_wait_commit, to prevent the other
THD from disappearing during the registration.
Then during commit, if a THD is registered to wait, it will call
wait_for_prior_commit() as part of ha_commit_trans(). If no wait is
registered, or if the waitee for has already completed commit, then
wait_for_prior_commit() returns immediately.
And when a THD that may be waited for has completed commit (more precisely
commit_ordered()), then it must call wakeup_subsequent_commits() to wake
up any waiters. Note that this must be done at a point that is guaranteed
to be later than any waiters registering themselves. It is safe to call
wakeup_subsequent_commits() multiple times, as waiters are removed from
registration as part of the wakeup.
The reason for separate register and wait calls is that this allows to
register the wait early, at a point where the waited-for THD is known to
exist. And then the actual wait can be done much later, where the
waited-for THD may have been long gone. By registering early, the waitee
can signal before disappearing.
*/
struct wait_for_commit
{
/*
The LOCK_wait_commit protects the fields subsequent_commits_list and
wakeup_subsequent_commits_running (for a waitee), and the flag
waiting_for_commit and associated COND_wait_commit (for a waiter).
*/
mysql_mutex_t LOCK_wait_commit;
mysql_cond_t COND_wait_commit;
/* List of threads that did register_wait_for_prior_commit() on us. */
wait_for_commit *subsequent_commits_list;
/* Link field for entries in subsequent_commits_list. */
wait_for_commit *next_subsequent_commit;
/* Our waitee, if we did register_wait_for_prior_commit(), else NULL. */
wait_for_commit *waitee;
/*
Generic pointer for use by the transaction coordinator to optimise the
waiting for improved group commit.
Currently used by binlog TC to signal that a waiter is ready to commit, so
that the waitee can grab it and group commit it directly. It is free to be
used by another transaction coordinator for similar purposes.
*/
void *opaque_pointer;
/*
The waiting_for_commit flag is cleared when a waiter has been woken
up. The COND_wait_commit condition is signalled when this has been
cleared.
*/
bool waiting_for_commit;
/*
Flag set when wakeup_subsequent_commits_running() is active, see commonts
on that function for details.
*/
bool wakeup_subsequent_commits_running;
void register_wait_for_prior_commit(wait_for_commit *waitee);
void wait_for_prior_commit()
{
/*
Quick inline check, to avoid function call and locking in the common case
where no wakeup is registered, or a registered wait was already signalled.
*/
if (waiting_for_commit)
wait_for_prior_commit2();
}
void wakeup_subsequent_commits()
{
/*
Do the check inline, so only the wakeup case takes the cost of a function
call for every commmit.
Note that the check is done without locking. It is the responsibility of
the user of the wakeup facility to ensure that no waiters can register
themselves after the last call to wakeup_subsequent_commits().
This avoids having to take another lock for every commit, which would be
pointless anyway - even if we check under lock, there is nothing to
prevent a waiter from arriving just after releasing the lock.
*/
if (subsequent_commits_list)
wakeup_subsequent_commits2();
}
void unregister_wait_for_prior_commit()
{
if (waiting_for_commit)
unregister_wait_for_prior_commit2();
}
void wakeup();
void wait_for_prior_commit2();
void wakeup_subsequent_commits2();
void unregister_wait_for_prior_commit2();
wait_for_commit();
};
extern "C" void my_message_sql(uint error, const char *str, myf MyFlags); extern "C" void my_message_sql(uint error, const char *str, myf MyFlags);
class THD; class THD;
...@@ -3194,6 +3303,19 @@ public: ...@@ -3194,6 +3303,19 @@ public:
void wait_for_wakeup_ready(); void wait_for_wakeup_ready();
/* Wake this thread up from wait_for_wakeup_ready(). */ /* Wake this thread up from wait_for_wakeup_ready(). */
void signal_wakeup_ready(); void signal_wakeup_ready();
wait_for_commit *wait_for_commit_ptr;
void wait_for_prior_commit()
{
if (wait_for_commit_ptr)
wait_for_commit_ptr->wait_for_prior_commit();
}
void wakeup_subsequent_commits()
{
if (wait_for_commit_ptr)
wait_for_commit_ptr->wakeup_subsequent_commits();
}
private: private:
/** The current internal error handler for this thread, or NULL. */ /** The current internal error handler for this thread, or NULL. */
......
...@@ -2924,6 +2924,11 @@ innobase_commit( ...@@ -2924,6 +2924,11 @@ innobase_commit(
/* We were instructed to commit the whole transaction, or /* We were instructed to commit the whole transaction, or
this is an SQL statement end and autocommit is on */ this is an SQL statement end and autocommit is on */
/* At this point commit order is fixed and transaction is
visible to others. So we can wakeup other commits waiting for
this one, to allow then to group commit with us. */
thd_wakeup_subsequent_commits(thd);
/* We did the first part already in innobase_commit_ordered(), /* We did the first part already in innobase_commit_ordered(),
Now finish by doing a write + flush of logs. */ Now finish by doing a write + flush of logs. */
trx_commit_complete_for_mysql(trx); trx_commit_complete_for_mysql(trx);
......
...@@ -3585,6 +3585,11 @@ innobase_commit( ...@@ -3585,6 +3585,11 @@ innobase_commit(
/* We were instructed to commit the whole transaction, or /* We were instructed to commit the whole transaction, or
this is an SQL statement end and autocommit is on */ this is an SQL statement end and autocommit is on */
/* At this point commit order is fixed and transaction is
visible to others. So we can wakeup other commits waiting for
this one, to allow then to group commit with us. */
thd_wakeup_subsequent_commits(thd);
/* We did the first part already in innobase_commit_ordered(), /* We did the first part already in innobase_commit_ordered(),
Now finish by doing a write + flush of logs. */ Now finish by doing a write + flush of logs. */
trx_commit_complete_for_mysql(trx); trx_commit_complete_for_mysql(trx);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment