Commit 5614ebe7 authored by unknown's avatar unknown

MWL#116: after-architecture-review code refactoring and cleanup.

Remove the extra class hierarchy with classes TC_LOG_queued, TC_LOG_unordered,
and TC_LOG_group_commit, folding the code into the TC_LOG_MMAP and
TC_LOG_BINLOG classes. In particular TC_LOG_BINLOG is greatly simplified by
this, unifying the code path for transactional and non-transactional
commit.

Remove unnecessary locking of LOCK_log in MYSQL_BIN_LOG::write() (backport
of same fix from mysql-5.5).
parent b91ad17c
...@@ -3,11 +3,11 @@ SELECT variable_value INTO @commits FROM information_schema.global_status ...@@ -3,11 +3,11 @@ SELECT variable_value INTO @commits FROM information_schema.global_status
WHERE variable_name = 'binlog_commits'; WHERE variable_name = 'binlog_commits';
SELECT variable_value INTO @group_commits FROM information_schema.global_status SELECT variable_value INTO @group_commits FROM information_schema.global_status
WHERE variable_name = 'binlog_group_commits'; WHERE variable_name = 'binlog_group_commits';
SET DEBUG_SYNC= "commit_after_group_log_xid SIGNAL group1_running WAIT_FOR group2_queued"; SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group1_running WAIT_FOR group2_queued";
INSERT INTO t1 VALUES ("con1"); INSERT INTO t1 VALUES ("con1");
set DEBUG_SYNC= "now WAIT_FOR group1_running"; set DEBUG_SYNC= "now WAIT_FOR group1_running";
SET DEBUG_SYNC= "commit_after_prepare_ordered SIGNAL group2_con2"; SET DEBUG_SYNC= "commit_after_prepare_ordered SIGNAL group2_con2";
SET DEBUG_SYNC= "commit_after_release_LOCK_group_commit WAIT_FOR group3_committed"; SET DEBUG_SYNC= "commit_after_release_LOCK_log WAIT_FOR group3_committed";
SET DEBUG_SYNC= "commit_after_group_run_commit_ordered SIGNAL group2_visible WAIT_FOR group2_checked"; SET DEBUG_SYNC= "commit_after_group_run_commit_ordered SIGNAL group2_visible WAIT_FOR group2_checked";
INSERT INTO t1 VALUES ("con2"); INSERT INTO t1 VALUES ("con2");
SET DEBUG_SYNC= "now WAIT_FOR group2_con2"; SET DEBUG_SYNC= "now WAIT_FOR group2_con2";
...@@ -25,7 +25,7 @@ SELECT * FROM t1 ORDER BY a; ...@@ -25,7 +25,7 @@ SELECT * FROM t1 ORDER BY a;
a a
con1 con1
SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group3_con5"; SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group3_con5";
SET DEBUG_SYNC= "commit_after_get_LOCK_group_commit SIGNAL con5_leader WAIT_FOR con6_queued"; SET DEBUG_SYNC= "commit_after_get_LOCK_log SIGNAL con5_leader WAIT_FOR con6_queued";
INSERT INTO t1 VALUES ("con5"); INSERT INTO t1 VALUES ("con5");
SET DEBUG_SYNC= "now WAIT_FOR con5_leader"; SET DEBUG_SYNC= "now WAIT_FOR con5_leader";
SET DEBUG_SYNC= "commit_after_prepare_ordered SIGNAL con6_queued"; SET DEBUG_SYNC= "commit_after_prepare_ordered SIGNAL con6_queued";
......
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb; CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb;
INSERT INTO t1 VALUES (0); INSERT INTO t1 VALUES (0);
SET DEBUG_SYNC= "commit_after_get_LOCK_group_commit SIGNAL con1_waiting WAIT_FOR con3_queued"; SET DEBUG_SYNC= "commit_after_get_LOCK_log SIGNAL con1_waiting WAIT_FOR con3_queued";
SET DEBUG_SYNC= "commit_loop_entry_commit_ordered SIGNAL con1_loop WAIT_FOR con1_loop_cont EXECUTE 3"; SET DEBUG_SYNC= "commit_loop_entry_commit_ordered SIGNAL con1_loop WAIT_FOR con1_loop_cont EXECUTE 3";
INSERT INTO t1 VALUES (1); INSERT INTO t1 VALUES (1);
SET DEBUG_SYNC= "now WAIT_FOR con1_waiting"; SET DEBUG_SYNC= "now WAIT_FOR con1_waiting";
......
...@@ -27,7 +27,7 @@ connect(con6,localhost,root,,); ...@@ -27,7 +27,7 @@ connect(con6,localhost,root,,);
# group2 to queue up before finishing. # group2 to queue up before finishing.
connection con1; connection con1;
SET DEBUG_SYNC= "commit_after_group_log_xid SIGNAL group1_running WAIT_FOR group2_queued"; SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group1_running WAIT_FOR group2_queued";
send INSERT INTO t1 VALUES ("con1"); send INSERT INTO t1 VALUES ("con1");
# Make group2 (with three threads) queue up. # Make group2 (with three threads) queue up.
...@@ -37,7 +37,7 @@ send INSERT INTO t1 VALUES ("con1"); ...@@ -37,7 +37,7 @@ send INSERT INTO t1 VALUES ("con1");
connection con2; connection con2;
set DEBUG_SYNC= "now WAIT_FOR group1_running"; set DEBUG_SYNC= "now WAIT_FOR group1_running";
SET DEBUG_SYNC= "commit_after_prepare_ordered SIGNAL group2_con2"; SET DEBUG_SYNC= "commit_after_prepare_ordered SIGNAL group2_con2";
SET DEBUG_SYNC= "commit_after_release_LOCK_group_commit WAIT_FOR group3_committed"; SET DEBUG_SYNC= "commit_after_release_LOCK_log WAIT_FOR group3_committed";
SET DEBUG_SYNC= "commit_after_group_run_commit_ordered SIGNAL group2_visible WAIT_FOR group2_checked"; SET DEBUG_SYNC= "commit_after_group_run_commit_ordered SIGNAL group2_visible WAIT_FOR group2_checked";
send INSERT INTO t1 VALUES ("con2"); send INSERT INTO t1 VALUES ("con2");
connection con3; connection con3;
...@@ -69,7 +69,7 @@ SELECT * FROM t1 ORDER BY a; ...@@ -69,7 +69,7 @@ SELECT * FROM t1 ORDER BY a;
connection con5; connection con5;
SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group3_con5"; SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group3_con5";
SET DEBUG_SYNC= "commit_after_get_LOCK_group_commit SIGNAL con5_leader WAIT_FOR con6_queued"; SET DEBUG_SYNC= "commit_after_get_LOCK_log SIGNAL con5_leader WAIT_FOR con6_queued";
send INSERT INTO t1 VALUES ("con5"); send INSERT INTO t1 VALUES ("con5");
connection con6; connection con6;
......
...@@ -23,7 +23,7 @@ connect(con3,localhost,root,,); ...@@ -23,7 +23,7 @@ connect(con3,localhost,root,,);
# Queue up three commits for group commit. # Queue up three commits for group commit.
connection con1; connection con1;
SET DEBUG_SYNC= "commit_after_get_LOCK_group_commit SIGNAL con1_waiting WAIT_FOR con3_queued"; SET DEBUG_SYNC= "commit_after_get_LOCK_log SIGNAL con1_waiting WAIT_FOR con3_queued";
SET DEBUG_SYNC= "commit_loop_entry_commit_ordered SIGNAL con1_loop WAIT_FOR con1_loop_cont EXECUTE 3"; SET DEBUG_SYNC= "commit_loop_entry_commit_ordered SIGNAL con1_loop WAIT_FOR con1_loop_cont EXECUTE 3";
send INSERT INTO t1 VALUES (1); send INSERT INTO t1 VALUES (1);
......
...@@ -155,19 +155,14 @@ class binlog_trx_data { ...@@ -155,19 +155,14 @@ class binlog_trx_data {
public: public:
binlog_trx_data() binlog_trx_data()
: at_least_one_stmt_committed(0), incident(FALSE), m_pending(0), : at_least_one_stmt_committed(0), incident(FALSE), m_pending(0),
before_stmt_pos(MY_OFF_T_UNDEF), using_xa(0), commit_bin_log_file_pos(0) before_stmt_pos(MY_OFF_T_UNDEF), commit_bin_log_file_pos(0), using_xa(0)
{ {
trans_log.end_of_file= max_binlog_cache_size; trans_log.end_of_file= max_binlog_cache_size;
(void) my_pthread_mutex_init(&LOCK_binlog_participant, MY_MUTEX_INIT_SLOW,
"LOCK_binlog_participant", MYF(0));
(void) pthread_cond_init(&COND_binlog_participant, 0);
} }
~binlog_trx_data() ~binlog_trx_data()
{ {
DBUG_ASSERT(pending() == NULL); DBUG_ASSERT(pending() == NULL);
(void) pthread_cond_destroy(&COND_binlog_participant);
(void) pthread_mutex_destroy(&LOCK_binlog_participant);
close_cached_file(&trans_log); close_cached_file(&trans_log);
} }
...@@ -265,46 +260,17 @@ public: ...@@ -265,46 +260,17 @@ public:
Binlog position before the start of the current statement. Binlog position before the start of the current statement.
*/ */
my_off_t before_stmt_pos; my_off_t before_stmt_pos;
/* 0 or error when writing to binlog; set during group commit. */
int error;
/* If error != 0, value of errno (for my_error() reporting). */
int commit_errno;
/* Link for queueing transactions up for group commit to binlog. */
binlog_trx_data *next;
/* /*
Flag set true when group commit for this transaction is finished; used Binlog position after current commit, available to storage engines during
with pthread_cond_wait() to wait until commit is done. commit_ordered() and commit().
This flag is protected by LOCK_binlog_participant.
*/
bool done;
/*
Flag set if this transaction is the group commit leader that will handle
the actual writing to the binlog.
This flag is protected by LOCK_binlog_participant.
*/ */
bool group_commit_leader; ulonglong commit_bin_log_file_pos;
/* /*
Flag set true if this transaction is committed with log_xid() as part of Flag set true if this transaction is committed with log_xid() as part of
XA, false if not. XA, false if not.
*/ */
bool using_xa; bool using_xa;
/*
Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be
written during group commit. The incident_event is only valid if
has_incident() is true.
*/
Log_event *begin_event;
Log_event *end_event;
Log_event *incident_event;
/* Mutex and condition for wakeup after group commit. */
pthread_mutex_t LOCK_binlog_participant;
pthread_cond_t COND_binlog_participant;
/*
Binlog position after current commit, available to storage engines during
commit() and commit_ordered().
*/
ulonglong commit_bin_log_file_pos;
}; };
handlerton *binlog_hton; handlerton *binlog_hton;
...@@ -1441,30 +1407,6 @@ static int binlog_close_connection(handlerton *hton, THD *thd) ...@@ -1441,30 +1407,6 @@ static int binlog_close_connection(handlerton *hton, THD *thd)
return 0; return 0;
} }
/* Helper functions for binlog_flush_trx_cache(). */
static int
binlog_flush_trx_cache_prepare(THD *thd)
{
if (thd->binlog_flush_pending_rows_event(TRUE))
return 1;
return 0;
}
static void
binlog_flush_trx_cache_finish(THD *thd, binlog_trx_data *trx_data)
{
IO_CACHE *trans_log= &trx_data->trans_log;
trx_data->reset();
statistic_increment(binlog_cache_use, &LOCK_status);
if (trans_log->disk_writes != 0)
{
statistic_increment(binlog_cache_disk_use, &LOCK_status);
trans_log->disk_writes= 0;
}
}
/* /*
End a transaction, writing events to the binary log. End a transaction, writing events to the binary log.
...@@ -1487,14 +1429,15 @@ binlog_flush_trx_cache_finish(THD *thd, binlog_trx_data *trx_data) ...@@ -1487,14 +1429,15 @@ binlog_flush_trx_cache_finish(THD *thd, binlog_trx_data *trx_data)
*/ */
static int static int
binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data, binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data,
Log_event *end_ev) Log_event *end_ev, bool all)
{ {
DBUG_ENTER("binlog_flush_trx_cache"); DBUG_ENTER("binlog_flush_trx_cache");
IO_CACHE *trans_log= &trx_data->trans_log;
DBUG_PRINT("info", ("thd->options={ %s%s}", DBUG_PRINT("info", ("thd->options={ %s%s}",
FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
FLAGSTR(thd->options, OPTION_BEGIN))); FLAGSTR(thd->options, OPTION_BEGIN)));
if (binlog_flush_trx_cache_prepare(thd)) if (thd->binlog_flush_pending_rows_event(TRUE))
DBUG_RETURN(1); DBUG_RETURN(1);
/* /*
...@@ -1507,9 +1450,17 @@ binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data, ...@@ -1507,9 +1450,17 @@ binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data,
were, we would have to ensure that we're not ending a statement were, we would have to ensure that we're not ending a statement
inside a stored function. inside a stored function.
*/ */
int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data, end_ev); int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data,
end_ev, all);
binlog_flush_trx_cache_finish(thd, trx_data); trx_data->reset();
statistic_increment(binlog_cache_use, &LOCK_status);
if (trans_log->disk_writes != 0)
{
statistic_increment(binlog_cache_disk_use, &LOCK_status);
trans_log->disk_writes= 0;
}
DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL);
DBUG_RETURN(error); DBUG_RETURN(error);
...@@ -1578,51 +1529,11 @@ static LEX_STRING const write_error_msg= ...@@ -1578,51 +1529,11 @@ static LEX_STRING const write_error_msg=
static int binlog_prepare(handlerton *hton, THD *thd, bool all) static int binlog_prepare(handlerton *hton, THD *thd, bool all)
{ {
/* /*
If this prepare is for a single statement in the middle of a transactions, do nothing.
not the actual transaction commit, then we do nothing. The real work is just pretend we can do 2pc, so that MySQL won't
only done later, in the prepare for making persistent changes. switch to 1pc.
real work will be done in MYSQL_BIN_LOG::log_and_order()
*/ */
if (!all && (thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
return 0;
binlog_trx_data *trx_data=
(binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
trx_data->using_xa= TRUE;
if (binlog_flush_trx_cache_prepare(thd))
return 1;
my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
if (!xid)
{
/* Skip logging this transaction, marked by setting end_event to NULL. */
trx_data->end_event= NULL;
return 0;
}
/*
Allocate the extra events that will be logged to the binlog in binlog group
commit. Use placement new to allocate them on the THD memroot, as they need
to remain live until log_xid() returns.
*/
size_t needed_size= sizeof(Query_log_event) + sizeof(Xid_log_event);
if (trx_data->has_incident())
needed_size+= sizeof(Incident_log_event);
uchar *mem= (uchar *)thd->alloc(needed_size);
if (!mem)
return 1;
trx_data->begin_event= new ((void *)mem)
Query_log_event(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0);
mem+= sizeof(Query_log_event);
trx_data->end_event= new ((void *)mem) Xid_log_event(thd, xid);
if (trx_data->has_incident())
trx_data->incident_event= new ((void *)(mem + sizeof(Xid_log_event)))
Incident_log_event(thd, INCIDENT_LOST_EVENTS, write_error_msg);
return 0; return 0;
} }
...@@ -1646,11 +1557,11 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) ...@@ -1646,11 +1557,11 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
binlog_trx_data *const trx_data= binlog_trx_data *const trx_data=
(binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
if (trx_data->using_xa) if (trx_data->empty())
{ {
// we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid() // we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid()
binlog_flush_trx_cache_finish(thd, trx_data); trx_data->reset();
DBUG_RETURN(error); DBUG_RETURN(0);
} }
/* /*
...@@ -1673,7 +1584,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) ...@@ -1673,7 +1584,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
!stmt_has_updated_trans_table(thd) && stmt_has_updated_non_trans_table(thd))) !stmt_has_updated_trans_table(thd) && stmt_has_updated_non_trans_table(thd)))
{ {
Query_log_event end_ev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); Query_log_event end_ev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0);
error= binlog_flush_trx_cache(thd, trx_data, &end_ev); error= binlog_flush_trx_cache(thd, trx_data, &end_ev, all);
} }
trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0; trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0;
...@@ -1757,7 +1668,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) ...@@ -1757,7 +1668,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
thd->current_stmt_binlog_row_based)) thd->current_stmt_binlog_row_based))
{ {
Query_log_event end_ev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); Query_log_event end_ev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0);
error= binlog_flush_trx_cache(thd, trx_data, &end_ev); error= binlog_flush_trx_cache(thd, trx_data, &end_ev, all);
} }
/* /*
Otherwise, we simply truncate the cache as there is no change on Otherwise, we simply truncate the cache as there is no change on
...@@ -2599,6 +2510,7 @@ const char *MYSQL_LOG::generate_name(const char *log_name, ...@@ -2599,6 +2510,7 @@ const char *MYSQL_LOG::generate_name(const char *log_name,
MYSQL_BIN_LOG::MYSQL_BIN_LOG() MYSQL_BIN_LOG::MYSQL_BIN_LOG()
:bytes_written(0), prepared_xids(0), file_id(1), open_count(1), :bytes_written(0), prepared_xids(0), file_id(1), open_count(1),
need_start_event(TRUE), need_start_event(TRUE),
group_commit_queue(0), num_commits(0), num_group_commits(0),
is_relay_log(0), is_relay_log(0),
description_event_for_exec(0), description_event_for_queue(0) description_event_for_exec(0), description_event_for_queue(0)
{ {
...@@ -2626,7 +2538,6 @@ void MYSQL_BIN_LOG::cleanup() ...@@ -2626,7 +2538,6 @@ void MYSQL_BIN_LOG::cleanup()
delete description_event_for_exec; delete description_event_for_exec;
(void) pthread_mutex_destroy(&LOCK_log); (void) pthread_mutex_destroy(&LOCK_log);
(void) pthread_mutex_destroy(&LOCK_index); (void) pthread_mutex_destroy(&LOCK_index);
(void) pthread_mutex_destroy(&LOCK_queue);
(void) pthread_cond_destroy(&update_cond); (void) pthread_cond_destroy(&update_cond);
} }
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
...@@ -2655,8 +2566,6 @@ void MYSQL_BIN_LOG::init_pthread_objects() ...@@ -2655,8 +2566,6 @@ void MYSQL_BIN_LOG::init_pthread_objects()
*/ */
(void) my_pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW, "LOCK_index", (void) my_pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW, "LOCK_index",
MYF_NO_DEADLOCK_DETECTION); MYF_NO_DEADLOCK_DETECTION);
(void) my_pthread_mutex_init(&LOCK_queue, MY_MUTEX_INIT_FAST, "LOCK_queue",
MYF(0));
(void) pthread_cond_init(&update_cond, 0); (void) pthread_cond_init(&update_cond, 0);
} }
...@@ -4461,11 +4370,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) ...@@ -4461,11 +4370,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
} }
/* /*
Flush the pending rows event to the transaction cache or to the
log file. Since this function potentially aquire the LOCK_log
mutex, we do this before aquiring the LOCK_log mutex in this
function.
We only end the statement if we are in a top-level statement. If We only end the statement if we are in a top-level statement. If
we are inside a stored function, we do not end the statement since we are inside a stored function, we do not end the statement since
this will close all tables on the slave. this will close all tables on the slave.
...@@ -4475,8 +4379,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) ...@@ -4475,8 +4379,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
if (thd->binlog_flush_pending_rows_event(end_stmt)) if (thd->binlog_flush_pending_rows_event(end_stmt))
DBUG_RETURN(error); DBUG_RETURN(error);
pthread_mutex_lock(&LOCK_log);
/* /*
In most cases this is only called if 'is_open()' is true; in fact this is In most cases this is only called if 'is_open()' is true; in fact this is
mostly called if is_open() *was* true a few instructions before, but it mostly called if is_open() *was* true a few instructions before, but it
...@@ -4497,7 +4399,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) ...@@ -4497,7 +4399,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
thd->lex->sql_command != SQLCOM_SAVEPOINT && thd->lex->sql_command != SQLCOM_SAVEPOINT &&
!binlog_filter->db_ok(local_db))) !binlog_filter->db_ok(local_db)))
{ {
VOID(pthread_mutex_unlock(&LOCK_log));
DBUG_RETURN(0); DBUG_RETURN(0);
} }
#endif /* HAVE_REPLICATION */ #endif /* HAVE_REPLICATION */
...@@ -4539,15 +4440,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) ...@@ -4539,15 +4440,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
thd->binlog_start_trans_and_stmt(); thd->binlog_start_trans_and_stmt();
file= trans_log; file= trans_log;
} }
/*
TODO as Mats suggested, for all the cases above where we write to
trans_log, it sounds unnecessary to lock LOCK_log. We should rather
test first if we want to write to trans_log, and if not, lock
LOCK_log.
*/
} }
#endif /* USING_TRANSACTIONS */ #endif /* USING_TRANSACTIONS */
DBUG_PRINT("info",("event type: %d",event_info->get_type_code())); DBUG_PRINT("info",("event type: %d",event_info->get_type_code()));
if (file == &log_file)
pthread_mutex_lock(&LOCK_log);
/* /*
No check for auto events flag here - this write method should No check for auto events flag here - this write method should
...@@ -4572,7 +4469,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) ...@@ -4572,7 +4469,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT, Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT,
thd->first_successful_insert_id_in_prev_stmt_for_binlog); thd->first_successful_insert_id_in_prev_stmt_for_binlog);
if (e.write(file)) if (e.write(file))
goto err; goto err_unlock;
} }
if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0) if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0)
{ {
...@@ -4583,13 +4480,13 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) ...@@ -4583,13 +4480,13 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
thd->auto_inc_intervals_in_cur_stmt_for_binlog. thd->auto_inc_intervals_in_cur_stmt_for_binlog.
minimum()); minimum());
if (e.write(file)) if (e.write(file))
goto err; goto err_unlock;
} }
if (thd->rand_used) if (thd->rand_used)
{ {
Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2); Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2);
if (e.write(file)) if (e.write(file))
goto err; goto err_unlock;
} }
if (thd->user_var_events.elements) if (thd->user_var_events.elements)
{ {
...@@ -4604,7 +4501,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) ...@@ -4604,7 +4501,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
user_var_event->type, user_var_event->type,
user_var_event->charset_number); user_var_event->charset_number);
if (e.write(file)) if (e.write(file))
goto err; goto err_unlock;
} }
} }
} }
...@@ -4616,23 +4513,26 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) ...@@ -4616,23 +4513,26 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
if (event_info->write(file) || if (event_info->write(file) ||
DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0)) DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0))
goto err; goto err_unlock;
if (file == &log_file) // we are writing to the real log (disk) if (file == &log_file) // we are writing to the real log (disk)
{ {
if (flush_and_sync()) if (flush_and_sync())
goto err; goto err_unlock;
signal_update(); signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
} }
error=0; error=0;
err_unlock:
if (file == &log_file)
pthread_mutex_unlock(&LOCK_log);
err: err:
if (error) if (error)
set_write_error(thd); set_write_error(thd);
} }
pthread_mutex_unlock(&LOCK_log);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -4957,10 +4857,16 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) ...@@ -4957,10 +4857,16 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd)
bool bool
MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data,
Log_event *end_ev) Log_event *end_ev, bool all)
{ {
group_commit_entry entry;
DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog");
entry.thd= thd;
entry.trx_data= trx_data;
entry.error= 0;
entry.all= all;
/* /*
Create the necessary events here, where we have the correct THD (and Create the necessary events here, where we have the correct THD (and
thread context). thread context).
...@@ -4969,23 +4875,23 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, ...@@ -4969,23 +4875,23 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data,
thread. thread.
*/ */
Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0);
trx_data->begin_event= &qinfo; entry.begin_event= &qinfo;
trx_data->end_event= end_ev; entry.end_event= end_ev;
if (trx_data->has_incident()) if (trx_data->has_incident())
{ {
Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg); Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg);
trx_data->incident_event= &inc_ev; entry.incident_event= &inc_ev;
DBUG_RETURN(write_transaction_to_binlog_events(trx_data)); DBUG_RETURN(write_transaction_to_binlog_events(&entry));
} }
else else
{ {
trx_data->incident_event= NULL; entry.incident_event= NULL;
DBUG_RETURN(write_transaction_to_binlog_events(trx_data)); DBUG_RETURN(write_transaction_to_binlog_events(&entry));
} }
} }
bool bool
MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data) MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
{ {
/* /*
To facilitate group commit for the binlog, we first queue up ourselves in To facilitate group commit for the binlog, we first queue up ourselves in
...@@ -4995,67 +4901,40 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data) ...@@ -4995,67 +4901,40 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data)
the commit and wake them up. the commit and wake them up.
*/ */
pthread_mutex_lock(&trx_data->LOCK_binlog_participant); entry->thd->clear_wakeup_ready();
pthread_mutex_lock(&LOCK_prepare_ordered);
pthread_mutex_lock(&LOCK_queue); group_commit_entry *orig_queue= group_commit_queue;
binlog_trx_data *orig_queue= group_commit_queue; entry->next= orig_queue;
trx_data->next= orig_queue; group_commit_queue= entry;
group_commit_queue= trx_data;
pthread_mutex_unlock(&LOCK_queue);
if (orig_queue != NULL) if (entry->trx_data->using_xa)
{
trx_data->group_commit_leader= FALSE;
trx_data->done= FALSE;
trx_group_commit_participant(trx_data);
}
else
{ {
trx_data->group_commit_leader= TRUE; DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered");
pthread_mutex_unlock(&trx_data->LOCK_binlog_participant); run_prepare_ordered(entry->thd, entry->all);
trx_group_commit_leader(NULL); DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered");
} }
pthread_mutex_unlock(&LOCK_prepare_ordered);
return trx_group_commit_finish(trx_data); /*
} The first in the queue handle group commit for all; the others just wait
to be signalled when group commit is done.
/* */
Participate as secondary transaction in group commit. if (orig_queue != NULL)
entry->thd->wait_for_wakeup_ready();
Another thread is already waiting to obtain the LOCK_log, and should include else
this thread in the group commit once the log is obtained. So here we put trx_group_commit_leader(entry);
ourself in the queue and wait to be signalled that the group commit is done.
Note that this function must be called with trx_data->LOCK_binlog_participant
locked; the mutex will be released before return.
*/
void
MYSQL_BIN_LOG::trx_group_commit_participant(binlog_trx_data *trx_data)
{
safe_mutex_assert_owner(&trx_data->LOCK_binlog_participant);
/* Wait until trx_data.done == true and woken up by the leader. */ if (!entry->error)
while (!trx_data->done) return 0;
pthread_cond_wait(&trx_data->COND_binlog_participant,
&trx_data->LOCK_binlog_participant);
pthread_mutex_unlock(&trx_data->LOCK_binlog_participant);
}
bool switch (entry->error)
MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data)
{
DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_finish");
DBUG_PRINT("info", ("trx_data->error=%d\n", trx_data->error));
if (trx_data->error)
{
switch (trx_data->error)
{ {
case ER_ERROR_ON_WRITE: case ER_ERROR_ON_WRITE:
my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, trx_data->commit_errno); my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, entry->commit_errno);
break; break;
case ER_ERROR_ON_READ: case ER_ERROR_ON_READ:
my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH), my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH),
trx_data->trans_log.file_name, trx_data->commit_errno); entry->trx_data->trans_log.file_name, entry->commit_errno);
break; break;
default: default:
/* /*
...@@ -5063,9 +4942,9 @@ MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data) ...@@ -5063,9 +4942,9 @@ MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data)
But just in case one is added later without updating the above switch But just in case one is added later without updating the above switch
statement, include a catch-all. statement, include a catch-all.
*/ */
my_printf_error(trx_data->error, my_printf_error(entry->error,
"Error writing transaction to binary log: %d", "Error writing transaction to binary log: %d",
MYF(ME_NOREFRESH), trx_data->error); MYF(ME_NOREFRESH), entry->error);
} }
/* /*
...@@ -5073,13 +4952,10 @@ MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data) ...@@ -5073,13 +4952,10 @@ MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data)
we need to mark it as not needed for recovery (unlog() is not called we need to mark it as not needed for recovery (unlog() is not called
for a transaction if log_xid() fails). for a transaction if log_xid() fails).
*/ */
if (trx_data->end_event->get_type_code() == XID_EVENT) if (entry->trx_data->using_xa)
mark_xid_done(); mark_xid_done();
DBUG_RETURN(1); return 1;
}
DBUG_RETURN(0);
} }
/* /*
...@@ -5093,69 +4969,36 @@ MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data) ...@@ -5093,69 +4969,36 @@ MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data)
*/ */
void void
MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
{ {
DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader");
uint xid_count= 0; uint xid_count= 0;
uint write_count= 0; uint write_count= 0;
/* First, put anything from group_log_xid into the queue. */
binlog_trx_data *full_queue= NULL;
binlog_trx_data **next_ptr= &full_queue;
for (TC_group_commit_entry *entry= first; entry; entry= entry->next)
{
binlog_trx_data *const trx_data=
(binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton);
/* Skip log_xid for transactions without xid, marked by NULL end_event. */
if (!trx_data->end_event)
continue;
trx_data->error= 0;
*next_ptr= trx_data;
next_ptr= &(trx_data->next);
}
/* /*
Next, lock the LOCK_log(), and once we get it, add any additional writes Lock the LOCK_log(), and once we get it, collect any additional writes
that queued up while we were waiting. that queued up while we were waiting.
Note that if some writer not going through log_xid() comes in and gets the
LOCK_log before us, they will not be able to include us in their group
commit (and they are not able to handle ensuring same commit order between
us and participating transactional storage engines anyway).
On the other hand, when we get the LOCK_log, we will be able to include
any non-trasactional writes that queued up in our group commit. This
should hopefully not be too big of a problem, as group commit is most
important for the transactional case anyway when durability (fsync) is
enabled.
*/ */
VOID(pthread_mutex_lock(&LOCK_log)); VOID(pthread_mutex_lock(&LOCK_log));
DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log");
/* pthread_mutex_lock(&LOCK_prepare_ordered);
As the queue is in reverse order of entering, reverse the queue as we add group_commit_entry *current= group_commit_queue;
it to the existing one. Note that there is no ordering defined between
transactional and non-transactional commits.
*/
pthread_mutex_lock(&LOCK_queue);
binlog_trx_data *current= group_commit_queue;
group_commit_queue= NULL; group_commit_queue= NULL;
pthread_mutex_unlock(&LOCK_queue); pthread_mutex_unlock(&LOCK_prepare_ordered);
binlog_trx_data *xtra_queue= NULL;
/* As the queue is in reverse order of entering, reverse it. */
group_commit_entry *queue= NULL;
while (current) while (current)
{ {
current->error= 0; group_commit_entry *next= current->next;
binlog_trx_data *next= current->next; current->next= queue;
current->next= xtra_queue; queue= current;
xtra_queue= current;
current= next; current= next;
} }
*next_ptr= xtra_queue; DBUG_ASSERT(leader == queue /* the leader should be first in queue */);
/* /* Now we have in queue the list of transactions to be committed in order. */
Now we have in full_queue the list of transactions to be committed in
order.
*/
DBUG_ASSERT(is_open()); DBUG_ASSERT(is_open());
if (likely(is_open())) // Should always be true if (likely(is_open())) // Should always be true
{ {
...@@ -5169,9 +5012,14 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) ...@@ -5169,9 +5012,14 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first)
current->error and let the thread do the error reporting itself once current->error and let the thread do the error reporting itself once
we wake it up. we wake it up.
*/ */
for (current= full_queue; current != NULL; current= current->next) for (current= queue; current != NULL; current= current->next)
{ {
IO_CACHE *cache= &current->trans_log; binlog_trx_data *trx_data= current->trx_data;
IO_CACHE *cache= &trx_data->trans_log;
/* Skip log_xid for transactions without xid, marked by NULL end_event. */
if (!current->end_event)
continue;
/* /*
We only bother to write to the binary log if there is anything We only bother to write to the binary log if there is anything
...@@ -5186,9 +5034,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) ...@@ -5186,9 +5034,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first)
write_count++; write_count++;
} }
current->commit_bin_log_file_pos= trx_data->commit_bin_log_file_pos=
log_file.pos_in_file + (log_file.write_pos - log_file.write_buffer); log_file.pos_in_file + (log_file.write_pos - log_file.write_buffer);
if (current->end_event->get_type_code() == XID_EVENT) if (trx_data->using_xa)
xid_count++; xid_count++;
} }
...@@ -5196,7 +5044,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) ...@@ -5196,7 +5044,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first)
{ {
if (flush_and_sync()) if (flush_and_sync())
{ {
for (current= full_queue; current != NULL; current= current->next) for (current= queue; current != NULL; current= current->next)
{ {
if (!current->error) if (!current->error)
{ {
...@@ -5213,7 +5061,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) ...@@ -5213,7 +5061,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first)
/* /*
if any commit_events are Xid_log_event, increase the number of if any commit_events are Xid_log_event, increase the number of
prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated prepared_xids (it's decreased in ::unlog()). Binlog cannot be rotated
if there're prepared xids in it - see the comment in new_file() for if there're prepared xids in it - see the comment in new_file() for
an explanation. an explanation.
If no Xid_log_events (then it's all Query_log_event) rotate binlog, If no Xid_log_events (then it's all Query_log_event) rotate binlog,
...@@ -5227,37 +5075,49 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) ...@@ -5227,37 +5075,49 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first)
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
} }
VOID(pthread_mutex_unlock(&LOCK_log)); DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered");
pthread_mutex_lock(&LOCK_commit_ordered);
/* /*
Signal those that are not part of group_log_xid, and are not group leaders We cannot unlock LOCK_log until we have locked LOCK_commit_ordered;
running the queue. otherwise scheduling could allow the next group commit to run ahead of us,
messing up the order of commit_ordered() calls. But as soon as
LOCK_commit_ordered is obtained, we can let the next group commit start.
*/
pthread_mutex_unlock(&LOCK_log);
DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log");
++num_group_commits;
Since a group leader runs the queue itself if a group_log_xid does not get /*
to do it forst, such leader threads do not need wait or wakeup. Wakeup each participant waiting for our group commit, first calling the
commit_ordered() methods for any transactions doing 2-phase commit.
*/ */
for (current= xtra_queue; current != NULL; current= current->next) current= queue;
while (current != NULL)
{ {
/* DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered");
Note that we need to take LOCK_binlog_participant even in the case of a ++num_commits;
leader! if (current->trx_data->using_xa && !current->error)
run_commit_ordered(current->thd, current->all);
Otherwise there is a race between setting and testing the /*
group_commit_leader flag. Careful not to access current->next after waking up the other thread! As
it may change immediately after wakeup.
*/ */
pthread_mutex_lock(&current->LOCK_binlog_participant); group_commit_entry *next= current->next;
if (!current->group_commit_leader) if (current != leader) // Don't wake up ourself
{ current->thd->signal_wakeup_ready();
current->done= true; current= next;
pthread_cond_signal(&current->COND_binlog_participant);
}
pthread_mutex_unlock(&current->LOCK_binlog_participant);
} }
DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered");
pthread_mutex_unlock(&LOCK_commit_ordered);
DBUG_VOID_RETURN;
} }
int int
MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data) MYSQL_BIN_LOG::write_transaction(group_commit_entry *entry)
{ {
binlog_trx_data *trx_data= entry->trx_data;
IO_CACHE *cache= &trx_data->trans_log; IO_CACHE *cache= &trx_data->trans_log;
/* /*
Log "BEGIN" at the beginning of every transaction. Here, a transaction is Log "BEGIN" at the beginning of every transaction. Here, a transaction is
...@@ -5272,7 +5132,7 @@ MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data) ...@@ -5272,7 +5132,7 @@ MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data)
in wrong positions being shown to the user, MASTER_POS_WAIT in wrong positions being shown to the user, MASTER_POS_WAIT
undue waiting etc. undue waiting etc.
*/ */
if (trx_data->begin_event->write(&log_file)) if (entry->begin_event->write(&log_file))
return ER_ERROR_ON_WRITE; return ER_ERROR_ON_WRITE;
DBUG_EXECUTE_IF("crash_before_writing_xid", DBUG_EXECUTE_IF("crash_before_writing_xid",
...@@ -5289,10 +5149,10 @@ MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data) ...@@ -5289,10 +5149,10 @@ MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data)
if (write_cache(cache)) if (write_cache(cache))
return ER_ERROR_ON_WRITE; return ER_ERROR_ON_WRITE;
if (trx_data->end_event->write(&log_file)) if (entry->end_event->write(&log_file))
return ER_ERROR_ON_WRITE; return ER_ERROR_ON_WRITE;
if (trx_data->has_incident() && trx_data->incident_event->write(&log_file)) if (entry->incident_event && entry->incident_event->write(&log_file))
return ER_ERROR_ON_WRITE; return ER_ERROR_ON_WRITE;
if (cache->error) // Error on read if (cache->error) // Error on read
...@@ -5754,30 +5614,6 @@ TC_LOG::run_commit_ordered(THD *thd, bool all) ...@@ -5754,30 +5614,6 @@ TC_LOG::run_commit_ordered(THD *thd, bool all)
} }
} }
TC_LOG_queued::TC_LOG_queued() : group_commit_queue(NULL)
{
}
TC_LOG_queued::~TC_LOG_queued()
{
}
TC_LOG_queued::TC_group_commit_entry *
TC_LOG_queued::reverse_queue(TC_LOG_queued::TC_group_commit_entry *queue)
{
TC_group_commit_entry *entry= queue;
TC_group_commit_entry *prev= NULL;
while (entry)
{
TC_group_commit_entry *next= entry->next;
entry->next= prev;
prev= entry;
entry= next;
}
return prev;
}
int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered, bool need_prepare_ordered,
bool need_commit_ordered) bool need_commit_ordered)
...@@ -5886,142 +5722,6 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, ...@@ -5886,142 +5722,6 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
} }
TC_LOG_group_commit::TC_LOG_group_commit()
: num_commits(0), num_group_commits(0)
{
}
TC_LOG_group_commit::~TC_LOG_group_commit()
{
}
void
TC_LOG_group_commit::init()
{
my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW,
"LOCK_group_commit", MYF(0));
}
void
TC_LOG_group_commit::deinit()
{
pthread_mutex_destroy(&LOCK_group_commit);
}
int TC_LOG_group_commit::log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered,
bool need_commit_ordered)
{
IF_DBUG(int err;)
int cookie;
struct TC_group_commit_entry entry;
bool is_group_commit_leader;
thd->clear_wakeup_ready();
entry.thd= thd;
entry.all= all;
entry.xid_error= 0;
pthread_mutex_lock(&LOCK_prepare_ordered);
TC_group_commit_entry *previous_queue= group_commit_queue;
entry.next= previous_queue;
group_commit_queue= &entry;
DEBUG_SYNC(thd, "commit_before_prepare_ordered");
run_prepare_ordered(thd, all);
DEBUG_SYNC(thd, "commit_after_prepare_ordered");
pthread_mutex_unlock(&LOCK_prepare_ordered);
is_group_commit_leader= (previous_queue == NULL);
if (is_group_commit_leader)
{
TC_group_commit_entry *current;
pthread_mutex_lock(&LOCK_group_commit);
DEBUG_SYNC(thd, "commit_after_get_LOCK_group_commit");
pthread_mutex_lock(&LOCK_prepare_ordered);
TC_group_commit_entry *queue= group_commit_queue;
group_commit_queue= NULL;
pthread_mutex_unlock(&LOCK_prepare_ordered);
/*
Since we enqueue at the head, the queue is actually in reverse order.
So reverse it back into correct commit order before returning.
*/
queue= reverse_queue(queue);
/* The first in the queue is the leader. */
DBUG_ASSERT(queue == &entry && queue->thd == thd);
DEBUG_SYNC(thd, "commit_before_group_log_xid");
/* This will set individual error codes in each thd->xid_error. */
group_log_xid(queue);
DEBUG_SYNC(thd, "commit_after_group_log_xid");
/*
Call commit_ordered methods for all transactions in the queue
(that did not get an error in group_log_xid()).
We do this under an additional global LOCK_commit_ordered; this is
so that transactions that do not need 2-phase commit do not have
to wait for the potentially long duration of LOCK_group_commit.
*/
current= queue;
DEBUG_SYNC(thd, "commit_before_get_LOCK_commit_ordered");
pthread_mutex_lock(&LOCK_commit_ordered);
/*
We cannot unlock LOCK_group_commit until we have locked
LOCK_commit_ordered; otherwise scheduling could allow the next
group commit to run ahead of us, messing up the order of
commit_ordered() calls. But as soon as LOCK_commit_ordered is
obtained, we can let the next group commit start.
*/
pthread_mutex_unlock(&LOCK_group_commit);
DEBUG_SYNC(thd, "commit_after_release_LOCK_group_commit");
++num_group_commits;
do
{
DEBUG_SYNC(thd, "commit_loop_entry_commit_ordered");
++num_commits;
if (!current->xid_error)
run_commit_ordered(current->thd, current->all);
/*
Careful not to access current->next_commit_ordered after waking up
the other thread! As it may change immediately after wakeup.
*/
TC_group_commit_entry *next= current->next;
if (current != &entry) // Don't wake up ourself
current->thd->signal_wakeup_ready();
current= next;
} while (current != NULL);
DEBUG_SYNC(thd, "commit_after_group_run_commit_ordered");
pthread_mutex_unlock(&LOCK_commit_ordered);
}
else
{
/* If not leader, just wait until leader wakes us up. */
thd->wait_for_wakeup_ready();
}
/*
Now that we're back in our own thread context, do any delayed processing
and error reporting.
*/
IF_DBUG(err= entry.xid_error;)
cookie= xid_log_after(&entry);
/* The cookie must be non-zero in the non-error case. */
DBUG_ASSERT(err || cookie);
return cookie;
}
/********* transaction coordinator log for 2pc - mmap() based solution *******/ /********* transaction coordinator log for 2pc - mmap() based solution *******/
/* /*
...@@ -6567,7 +6267,6 @@ int TC_LOG_BINLOG::open(const char *opt_name) ...@@ -6567,7 +6267,6 @@ int TC_LOG_BINLOG::open(const char *opt_name)
DBUG_ASSERT(total_ha_2pc > 1); DBUG_ASSERT(total_ha_2pc > 1);
DBUG_ASSERT(opt_name && opt_name[0]); DBUG_ASSERT(opt_name && opt_name[0]);
TC_LOG_group_commit::init();
pthread_mutex_init(&LOCK_prep_xids, MY_MUTEX_INIT_FAST); pthread_mutex_init(&LOCK_prep_xids, MY_MUTEX_INIT_FAST);
pthread_cond_init (&COND_prep_xids, 0); pthread_cond_init (&COND_prep_xids, 0);
...@@ -6651,36 +6350,33 @@ void TC_LOG_BINLOG::close() ...@@ -6651,36 +6350,33 @@ void TC_LOG_BINLOG::close()
DBUG_ASSERT(prepared_xids==0); DBUG_ASSERT(prepared_xids==0);
pthread_mutex_destroy(&LOCK_prep_xids); pthread_mutex_destroy(&LOCK_prep_xids);
pthread_cond_destroy (&COND_prep_xids); pthread_cond_destroy (&COND_prep_xids);
TC_LOG_group_commit::deinit();
} }
/* /*
Do a binlog log_xid() for a group of transactions, linked through Do a binlog log_xid() for a group of transactions, linked through
thd->next_commit_ordered. thd->next_commit_ordered.
*/ */
void
TC_LOG_BINLOG::group_log_xid(TC_group_commit_entry *first)
{
DBUG_ENTER("TC_LOG_BINLOG::group_log_xid");
trx_group_commit_leader(first);
for (TC_group_commit_entry *entry= first; entry; entry= entry->next)
{
binlog_trx_data *const trx_data=
(binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton);
entry->xid_error= trx_data->error;
}
DBUG_VOID_RETURN;
}
int int
TC_LOG_BINLOG::xid_log_after(TC_group_commit_entry *entry) TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered __attribute__((unused)),
bool need_commit_ordered __attribute__((unused)))
{ {
int err;
DBUG_ENTER("TC_LOG_BINLOG::log_and_order");
binlog_trx_data *const trx_data= binlog_trx_data *const trx_data=
(binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton); (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
if (trx_group_commit_finish(trx_data))
return 0; // Returning zero cookie signals error trx_data->using_xa= TRUE;
if (xid)
{
Xid_log_event xid_event(thd, xid);
err= binlog_flush_trx_cache(thd, trx_data, &xid_event, all);
}
else else
return 1; err= binlog_flush_trx_cache(thd, trx_data, NULL, all);
DBUG_RETURN(!err);
} }
/* /*
......
...@@ -73,101 +73,6 @@ extern pthread_mutex_t LOCK_commit_ordered; ...@@ -73,101 +73,6 @@ extern pthread_mutex_t LOCK_commit_ordered;
extern void TC_init(); extern void TC_init();
extern void TC_destroy(); extern void TC_destroy();
/*
Base class for two TC implementations TC_LOG_unordered and
TC_LOG_group_commit that both use a queue of threads waiting for group
commit.
*/
class TC_LOG_queued: public TC_LOG
{
protected:
TC_LOG_queued();
~TC_LOG_queued();
/* Structure used to link list of THDs waiting for group commit. */
struct TC_group_commit_entry
{
struct TC_group_commit_entry *next;
THD *thd;
/* This is the `all' parameter for ha_commit_trans() etc. */
bool all;
/*
Set by TC_LOG_group_commit::group_log_xid(), to return per-thd error and
cookie.
*/
int xid_error;
};
TC_group_commit_entry * reverse_queue(TC_group_commit_entry *queue);
/*
This is a queue of threads waiting for being allowed to commit.
Access to the queue must be protected by LOCK_prepare_ordered.
*/
TC_group_commit_entry *group_commit_queue;
};
class TC_LOG_group_commit: public TC_LOG_queued
{
public:
TC_LOG_group_commit();
~TC_LOG_group_commit();
void init();
void deinit();
int log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered, bool need_commit_ordered);
protected:
/* Total number of committed transactions. */
ulonglong num_commits;
/* Number of group commits done. */
ulonglong num_group_commits;
/*
When using this class, this method is used instead of log_xid() to do
logging of a group of transactions all at once.
The transactions will be linked through THD::next_commit_ordered.
Additionally, when this method is used instead of log_xid(), the order in
which handler->prepare_ordered() and handler->commit_ordered() are called
is guaranteed to be the same as the order of calls and THD list elements
for group_log_xid().
This can be used to efficiently implement group commit that at the same
time preserves the order of commits among handlers and TC (eg. to get same
commit order in InnoDB and binary log).
For TCs that do not need this, it can be preferable to use plain log_xid()
with class TC_LOG_unordered instead, as it allows threads to run log_xid()
in parallel with each other. In contrast, group_log_xid() runs under a
global mutex, so it is guaranteed that only once call into it will be
active at once.
Since this call handles multiple threads/THDs at once, my_error() (and
other code that relies on thread local storage) cannot be used in this
method. Instead, the implementation must record any error and report it as
the return value from xid_log_after(), which will be invoked individually
for each thread.
In the success case, this method must set thd->xid_cookie for each thread
to the cookie that is normally returned from log_xid() (which must be
non-zero in the non-error case).
*/
virtual void group_log_xid(TC_group_commit_entry *first) = 0;
/*
Called for each transaction (in corrent thread context) after
group_log_xid() has finished, but with no guarantee on ordering among
threads.
Can be used to do error reporting etc. */
virtual int xid_log_after(TC_group_commit_entry *entry) = 0;
private:
/* Mutex used to serialise calls to group_log_xid(). */
pthread_mutex_t LOCK_group_commit;
};
class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging
{ {
public: public:
...@@ -398,17 +303,33 @@ private: ...@@ -398,17 +303,33 @@ private:
}; };
class binlog_trx_data; class binlog_trx_data;
class MYSQL_BIN_LOG: public TC_LOG_group_commit, private MYSQL_LOG class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
{ {
private: private:
struct group_commit_entry
{
struct group_commit_entry *next;
THD *thd;
binlog_trx_data *trx_data;
/*
Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be
written during group commit. The incident_event is only valid if
trx_data->has_incident() is true.
*/
Log_event *begin_event;
Log_event *end_event;
Log_event *incident_event;
/* Set during group commit to record any per-thread error. */
int error;
int commit_errno;
/* This is the `all' parameter for ha_commit_ordered(). */
bool all;
/* True if we come in through XA log_and_order(), false otherwise. */
};
/* LOCK_log and LOCK_index are inited by init_pthread_objects() */ /* LOCK_log and LOCK_index are inited by init_pthread_objects() */
pthread_mutex_t LOCK_index; pthread_mutex_t LOCK_index;
pthread_mutex_t LOCK_prep_xids; pthread_mutex_t LOCK_prep_xids;
/*
Mutex to protect the queue of non-transactional binlog writes waiting to
participate in group commit.
*/
pthread_mutex_t LOCK_queue;
pthread_cond_t COND_prep_xids; pthread_cond_t COND_prep_xids;
pthread_cond_t update_cond; pthread_cond_t update_cond;
...@@ -449,7 +370,11 @@ class MYSQL_BIN_LOG: public TC_LOG_group_commit, private MYSQL_LOG ...@@ -449,7 +370,11 @@ class MYSQL_BIN_LOG: public TC_LOG_group_commit, private MYSQL_LOG
*/ */
bool no_auto_events; bool no_auto_events;
/* Queue of transactions queued up to participate in group commit. */ /* Queue of transactions queued up to participate in group commit. */
binlog_trx_data *group_commit_queue; group_commit_entry *group_commit_queue;
/* Total number of committed transactions. */
ulonglong num_commits;
/* Number of group commits done. */
ulonglong num_group_commits;
int write_to_file(IO_CACHE *cache); int write_to_file(IO_CACHE *cache);
/* /*
...@@ -459,10 +384,9 @@ class MYSQL_BIN_LOG: public TC_LOG_group_commit, private MYSQL_LOG ...@@ -459,10 +384,9 @@ class MYSQL_BIN_LOG: public TC_LOG_group_commit, private MYSQL_LOG
*/ */
void new_file_without_locking(); void new_file_without_locking();
void new_file_impl(bool need_lock); void new_file_impl(bool need_lock);
int write_transaction(binlog_trx_data *trx_data); int write_transaction(group_commit_entry *entry);
bool write_transaction_to_binlog_events(binlog_trx_data *trx_data); bool write_transaction_to_binlog_events(group_commit_entry *entry);
void trx_group_commit_participant(binlog_trx_data *trx_data); void trx_group_commit_leader(group_commit_entry *leader);
void trx_group_commit_leader(TC_group_commit_entry *first);
void mark_xid_done(); void mark_xid_done();
void mark_xids_active(uint xid_count); void mark_xids_active(uint xid_count);
...@@ -493,8 +417,8 @@ public: ...@@ -493,8 +417,8 @@ public:
int open(const char *opt_name); int open(const char *opt_name);
void close(); void close();
void group_log_xid(TC_group_commit_entry *first); int log_and_order(THD *thd, my_xid xid, bool all,
int xid_log_after(TC_group_commit_entry *entry); bool need_prepare_ordered, bool need_commit_ordered);
void unlog(ulong cookie, my_xid xid); void unlog(ulong cookie, my_xid xid);
int recover(IO_CACHE *log, Format_description_log_event *fdle); int recover(IO_CACHE *log, Format_description_log_event *fdle);
#if !defined(MYSQL_CLIENT) #if !defined(MYSQL_CLIENT)
...@@ -540,8 +464,7 @@ public: ...@@ -540,8 +464,7 @@ public:
void reset_gathered_updates(THD *thd); void reset_gathered_updates(THD *thd);
bool write(Log_event* event_info); // binary log write bool write(Log_event* event_info); // binary log write
bool write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, bool write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data,
Log_event *end_ev); Log_event *end_ev, bool all);
bool trx_group_commit_finish(binlog_trx_data *trx_data);
bool write_incident(THD *thd); bool write_incident(THD *thd);
int write_cache(IO_CACHE *cache); int write_cache(IO_CACHE *cache);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment