Commit 92dfbd42 authored by unknown's avatar unknown

Merge MWL#116 after-review fixes.

parents 4d964073 bc9f6021
...@@ -561,6 +561,8 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *); ...@@ -561,6 +561,8 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *);
#define my_b_tell(info) ((info)->pos_in_file + \ #define my_b_tell(info) ((info)->pos_in_file + \
(size_t) (*(info)->current_pos - (info)->request_pos)) (size_t) (*(info)->current_pos - (info)->request_pos))
#define my_b_write_tell(info) ((info)->pos_in_file + \
((info)->write_pos - (info)->write_buffer))
#define my_b_get_buffer_start(info) (info)->request_pos #define my_b_get_buffer_start(info) (info)->request_pos
#define my_b_get_bytes_in_buffer(info) (char*) (info)->read_end - \ #define my_b_get_bytes_in_buffer(info) (char*) (info)->read_end - \
......
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
SET binlog_format= mixed;
RESET MASTER;
XA START 'xatest';
INSERT INTO t1 VALUES (1);
XA END 'xatest';
XA PREPARE 'xatest';
XA COMMIT 'xatest';
XA START 'xatest';
INSERT INTO t1 VALUES (2);
XA END 'xatest';
XA COMMIT 'xatest' ONE PHASE;
BEGIN;
INSERT INTO t1 VALUES (3);
COMMIT;
SELECT * FROM t1 ORDER BY a;
a
1
2
3
SHOW BINLOG EVENTS LIMIT 1,9;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Query 1 # BEGIN
master-bin.000001 # Query 1 # use `test`; INSERT INTO t1 VALUES (1)
master-bin.000001 # Query 1 # COMMIT
master-bin.000001 # Query 1 # BEGIN
master-bin.000001 # Query 1 # use `test`; INSERT INTO t1 VALUES (2)
master-bin.000001 # Query 1 # COMMIT
master-bin.000001 # Query 1 # BEGIN
master-bin.000001 # Query 1 # use `test`; INSERT INTO t1 VALUES (3)
master-bin.000001 # Xid 1 # COMMIT /* xid=XX */
DROP TABLE t1;
...@@ -21,7 +21,8 @@ SELECT * FROM t1; ...@@ -21,7 +21,8 @@ SELECT * FROM t1;
# Actually the output from this currently shows a bug. # Actually the output from this currently shows a bug.
# The injected IO error leaves partially written transactions in the binlog in # The injected IO error leaves partially written transactions in the binlog in
# the form of stray "BEGIN" events. # the form of stray "BEGIN" events.
# These should disappear from the output if binlog error handling is improved. # These should disappear from the output if binlog error handling is improved
# (see MySQL Bug#37148 and WL#1790).
--replace_regex /\/\* xid=.* \*\//\/* XID *\// /Server ver: .*, Binlog ver: .*/Server ver: #, Binlog ver: #/ /table_id: [0-9]+/table_id: #/ --replace_regex /\/\* xid=.* \*\//\/* XID *\// /Server ver: .*, Binlog ver: .*/Server ver: #, Binlog ver: #/ /table_id: [0-9]+/table_id: #/
--replace_column 1 BINLOG 2 POS 5 ENDPOS --replace_column 1 BINLOG 2 POS 5 ENDPOS
SHOW BINLOG EVENTS; SHOW BINLOG EVENTS;
......
...@@ -16,6 +16,7 @@ select * from t2; ...@@ -16,6 +16,7 @@ select * from t2;
b b
2 2
SET sql_log_bin = 0; SET sql_log_bin = 0;
BEGIN;
INSERT INTO t1 VALUES (3); INSERT INTO t1 VALUES (3);
INSERT INTO t2 VALUES (4); INSERT INTO t2 VALUES (4);
COMMIT; COMMIT;
......
...@@ -20,6 +20,7 @@ select * from t2; ...@@ -20,6 +20,7 @@ select * from t2;
# Test 2-phase commit when we disable binlogging. # Test 2-phase commit when we disable binlogging.
SET sql_log_bin = 0; SET sql_log_bin = 0;
BEGIN;
INSERT INTO t1 VALUES (3); INSERT INTO t1 VALUES (3);
INSERT INTO t2 VALUES (4); INSERT INTO t2 VALUES (4);
COMMIT; COMMIT;
......
...@@ -8,6 +8,10 @@ ...@@ -8,6 +8,10 @@
# Don't test this under valgrind, memory leaks will occur as we crash # Don't test this under valgrind, memory leaks will occur as we crash
--source include/not_valgrind.inc --source include/not_valgrind.inc
# The test case currently uses grep and tail, which may be unavailable on
# some windows systems. But see MWL#191 for how to remove the need for grep.
--source include/not_windows.inc
# XtraDB stores the binlog position corresponding to the last commit, and # XtraDB stores the binlog position corresponding to the last commit, and
# prints it during crash recovery. # prints it during crash recovery.
# Test that we get the correct position when we group commit several # Test that we get the correct position when we group commit several
......
--source include/have_innodb.inc
--source include/have_log_bin.inc
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
# Fix binlog format (otherwise SHOW BINLOG EVENTS will fluctuate).
SET binlog_format= mixed;
RESET MASTER;
XA START 'xatest';
INSERT INTO t1 VALUES (1);
XA END 'xatest';
XA PREPARE 'xatest';
XA COMMIT 'xatest';
XA START 'xatest';
INSERT INTO t1 VALUES (2);
XA END 'xatest';
XA COMMIT 'xatest' ONE PHASE;
BEGIN;
INSERT INTO t1 VALUES (3);
COMMIT;
SELECT * FROM t1 ORDER BY a;
--replace_column 2 # 5 #
--replace_regex /xid=[0-9]+/xid=XX/
SHOW BINLOG EVENTS LIMIT 1,9;
DROP TABLE t1;
...@@ -1193,17 +1193,16 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1193,17 +1193,16 @@ int ha_commit_trans(THD *thd, bool all)
Sic: we know that prepare() is not NULL since otherwise Sic: we know that prepare() is not NULL since otherwise
trans->no_2pc would have been set. trans->no_2pc would have been set.
*/ */
if ((err= ht->prepare(ht, thd, all))) err= ht->prepare(ht, thd, all);
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
status_var_increment(thd->status_var.ha_prepare_count); status_var_increment(thd->status_var.ha_prepare_count);
if (err)
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
if (err) if (err)
goto err; goto err;
if (ht->prepare_ordered) need_prepare_ordered|= (ht->prepare_ordered != NULL);
need_prepare_ordered= TRUE; need_commit_ordered|= (ht->commit_ordered != NULL);
if (ht->commit_ordered)
need_commit_ordered= TRUE;
} }
DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE();); DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE(););
...@@ -1232,8 +1231,7 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1232,8 +1231,7 @@ int ha_commit_trans(THD *thd, bool all)
/* Come here if error and we need to rollback. */ /* Come here if error and we need to rollback. */
err: err:
if (!error) error= 1; /* Transaction was rolled back */
error= 1;
ha_rollback_trans(thd, all); ha_rollback_trans(thd, all);
end: end:
...@@ -1885,8 +1883,11 @@ int ha_start_consistent_snapshot(THD *thd) ...@@ -1885,8 +1883,11 @@ int ha_start_consistent_snapshot(THD *thd)
bool warn= true; bool warn= true;
/* /*
Holding the LOCK_commit_ordered mutex ensures that for any transaction Holding the LOCK_commit_ordered mutex ensures that we get the same
we either see it committed in all engines, or in none. snapshot for all engines (including the binary log). This allows us
among other things to do backups with
START TRANSACTION WITH CONSISTENT SNAPSHOT and
have a consistent binlog position.
*/ */
pthread_mutex_lock(&LOCK_commit_ordered); pthread_mutex_lock(&LOCK_commit_ordered);
plugin_foreach(thd, snapshot_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, &warn); plugin_foreach(thd, snapshot_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, &warn);
......
...@@ -776,7 +776,7 @@ struct handlerton ...@@ -776,7 +776,7 @@ struct handlerton
Not that like prepare(), commit_ordered() is only called when 2-phase Not that like prepare(), commit_ordered() is only called when 2-phase
commit takes place. Ie. when no binary log and only a single engine commit takes place. Ie. when no binary log and only a single engine
participates in a transaction, one commit() is called, no participates in a transaction, one commit() is called, no
commit_orderd(). So engines must be prepared for this. commit_ordered(). So engines must be prepared for this.
The calls to commit_ordered() in multiple parallel transactions is The calls to commit_ordered() in multiple parallel transactions is
guaranteed to happen in the same order in every participating guaranteed to happen in the same order in every participating
...@@ -789,7 +789,7 @@ struct handlerton ...@@ -789,7 +789,7 @@ struct handlerton
transaction visible to other transactions, thereby making the order of transaction visible to other transactions, thereby making the order of
transaction commits be defined by the order of commit_ordered() calls. transaction commits be defined by the order of commit_ordered() calls.
The intension is that commit_ordered() should do the minimal amount of The intention is that commit_ordered() should do the minimal amount of
work that needs to happen in consistent commit order among handlers. To work that needs to happen in consistent commit order among handlers. To
preserve ordering, calls need to be serialised on a global mutex, so preserve ordering, calls need to be serialised on a global mutex, so
doing any time-consuming or blocking operations in commit_ordered() will doing any time-consuming or blocking operations in commit_ordered() will
...@@ -833,7 +833,7 @@ struct handlerton ...@@ -833,7 +833,7 @@ struct handlerton
order transactions will be eventually committed. order transactions will be eventually committed.
Like commit_ordered(), prepare_ordered() calls are serialised to maintain Like commit_ordered(), prepare_ordered() calls are serialised to maintain
ordering, so the intension is that they should execute fast, with only ordering, so the intention is that they should execute fast, with only
the minimal amount of work needed to define commit order. Handlers can the minimal amount of work needed to define commit order. Handlers can
rely on this serialisation, and do not need to do any extra locking to rely on this serialisation, and do not need to do any extra locking to
avoid two prepare_ordered() calls running in parallel. avoid two prepare_ordered() calls running in parallel.
...@@ -853,8 +853,7 @@ struct handlerton ...@@ -853,8 +853,7 @@ struct handlerton
require blocking all other commits for an indefinite time). require blocking all other commits for an indefinite time).
When 2-phase commit is not used (eg. only one engine (and no binlog) in When 2-phase commit is not used (eg. only one engine (and no binlog) in
transaction), prepare() is not called and in such cases prepare_ordered() transaction), neither prepare() nor prepare_ordered() is called.
also is not called.
*/ */
void (*prepare_ordered)(handlerton *hton, THD *thd, bool all); void (*prepare_ordered)(handlerton *hton, THD *thd, bool all);
int (*recover)(handlerton *hton, XID *xid_list, uint len); int (*recover)(handlerton *hton, XID *xid_list, uint len);
......
...@@ -64,6 +64,36 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all); ...@@ -64,6 +64,36 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all);
static int binlog_prepare(handlerton *hton, THD *thd, bool all); static int binlog_prepare(handlerton *hton, THD *thd, bool all);
static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd); static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd);
static LEX_STRING const write_error_msg=
{ C_STRING_WITH_LEN("error writing to the binary log") };
#ifndef DBUG_OFF
static ulong opt_binlog_dbug_fsync_sleep= 0;
#endif
static my_bool mutexes_inited;
pthread_mutex_t LOCK_prepare_ordered;
pthread_mutex_t LOCK_commit_ordered;
static ulonglong binlog_status_var_num_commits;
static ulonglong binlog_status_var_num_group_commits;
static char binlog_snapshot_file[FN_REFLEN];
static ulonglong binlog_snapshot_position;
static SHOW_VAR binlog_status_vars_detail[]=
{
{"commits",
(char *)&binlog_status_var_num_commits, SHOW_LONGLONG},
{"group_commits",
(char *)&binlog_status_var_num_group_commits, SHOW_LONGLONG},
{"snapshot_file",
(char *)&binlog_snapshot_file, SHOW_CHAR},
{"snapshot_position",
(char *)&binlog_snapshot_position, SHOW_LONGLONG},
{NullS, NullS, SHOW_LONG}
};
/** /**
Silence all errors and warnings reported when performing a write Silence all errors and warnings reported when performing a write
to a log table. to a log table.
...@@ -114,41 +144,6 @@ char *make_default_log_name(char *buff,const char* log_ext) ...@@ -114,41 +144,6 @@ char *make_default_log_name(char *buff,const char* log_ext)
MYF(MY_UNPACK_FILENAME|MY_REPLACE_EXT)); MYF(MY_UNPACK_FILENAME|MY_REPLACE_EXT));
} }
/*
Helper class to hold a mutex for the duration of the
block.
Eliminates the need for explicit unlocking of mutexes on, e.g.,
error returns. On passing a null pointer, the sentry will not do
anything.
*/
class Mutex_sentry
{
public:
Mutex_sentry(pthread_mutex_t *mutex)
: m_mutex(mutex)
{
if (m_mutex)
pthread_mutex_lock(mutex);
}
~Mutex_sentry()
{
if (m_mutex)
pthread_mutex_unlock(m_mutex);
#ifndef DBUG_OFF
m_mutex= 0;
#endif
}
private:
pthread_mutex_t *m_mutex;
// It's not allowed to copy this object in any way
Mutex_sentry(Mutex_sentry const&);
void operator=(Mutex_sentry const&);
};
/* /*
Helper class to store binary log transaction data. Helper class to store binary log transaction data.
*/ */
...@@ -156,7 +151,8 @@ class binlog_trx_data { ...@@ -156,7 +151,8 @@ class binlog_trx_data {
public: public:
binlog_trx_data() binlog_trx_data()
: at_least_one_stmt_committed(0), incident(FALSE), m_pending(0), : at_least_one_stmt_committed(0), incident(FALSE), m_pending(0),
before_stmt_pos(MY_OFF_T_UNDEF), last_commit_pos_offset(0), using_xa(0) before_stmt_pos(MY_OFF_T_UNDEF), last_commit_pos_offset(0),
using_xa(FALSE), xa_xid(0)
{ {
trans_log.end_of_file= max_binlog_cache_size; trans_log.end_of_file= max_binlog_cache_size;
strcpy(last_commit_pos_file, ""); strcpy(last_commit_pos_file, "");
...@@ -278,6 +274,7 @@ public: ...@@ -278,6 +274,7 @@ public:
XA, false if not. XA, false if not.
*/ */
bool using_xa; bool using_xa;
my_xid xa_xid;
}; };
handlerton *binlog_hton; handlerton *binlog_hton;
...@@ -1445,6 +1442,8 @@ static int binlog_close_connection(handlerton *hton, THD *thd) ...@@ -1445,6 +1442,8 @@ static int binlog_close_connection(handlerton *hton, THD *thd)
thd The thread whose transaction should be ended thd The thread whose transaction should be ended
trx_data Pointer to the transaction data to use trx_data Pointer to the transaction data to use
all True if the entire transaction should be ended, false if
only the statement transaction should be ended.
end_ev The end event to use (COMMIT, ROLLBACK, or commit XID) end_ev The end event to use (COMMIT, ROLLBACK, or commit XID)
DESCRIPTION DESCRIPTION
...@@ -1553,9 +1552,6 @@ binlog_truncate_trx_cache(THD *thd, binlog_trx_data *trx_data, bool all) ...@@ -1553,9 +1552,6 @@ binlog_truncate_trx_cache(THD *thd, binlog_trx_data *trx_data, bool all)
DBUG_RETURN(error); DBUG_RETURN(error);
} }
static LEX_STRING const write_error_msg=
{ C_STRING_WITH_LEN("error writing to the binary log") };
static int binlog_prepare(handlerton *hton, THD *thd, bool all) static int binlog_prepare(handlerton *hton, THD *thd, bool all)
{ {
/* /*
...@@ -4028,10 +4024,6 @@ err: ...@@ -4028,10 +4024,6 @@ err:
} }
#ifndef DBUG_OFF
static ulong opt_binlog_dbug_fsync_sleep= 0;
#endif
bool MYSQL_BIN_LOG::flush_and_sync() bool MYSQL_BIN_LOG::flush_and_sync()
{ {
int err=0, fd=log_file.file; int err=0, fd=log_file.file;
...@@ -4043,9 +4035,8 @@ bool MYSQL_BIN_LOG::flush_and_sync() ...@@ -4043,9 +4035,8 @@ bool MYSQL_BIN_LOG::flush_and_sync()
sync_binlog_counter= 0; sync_binlog_counter= 0;
err=my_sync(fd, MYF(MY_WME)); err=my_sync(fd, MYF(MY_WME));
#ifndef DBUG_OFF #ifndef DBUG_OFF
ulong usec_sleep= opt_binlog_dbug_fsync_sleep; if (opt_binlog_dbug_fsync_sleep > 0)
if (usec_sleep > 0) my_sleep(opt_binlog_dbug_fsync_sleep);
my_sleep(usec_sleep);
#endif #endif
} }
return err; return err;
...@@ -4512,7 +4503,19 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) ...@@ -4512,7 +4503,19 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
#endif /* USING_TRANSACTIONS */ #endif /* USING_TRANSACTIONS */
DBUG_PRINT("info",("event type: %d",event_info->get_type_code())); DBUG_PRINT("info",("event type: %d",event_info->get_type_code()));
if (file == &log_file) if (file == &log_file)
{
pthread_mutex_lock(&LOCK_log); pthread_mutex_lock(&LOCK_log);
/*
We did not want to take LOCK_log unless really necessary.
However, now that we hold LOCK_log, we must check is_open() again, lest
the log was closed just before.
*/
if (unlikely(!is_open()))
{
pthread_mutex_unlock(&LOCK_log);
DBUG_RETURN(error);
}
}
/* /*
No check for auto events flag here - this write method should No check for auto events flag here - this write method should
...@@ -4745,6 +4748,7 @@ uint MYSQL_BIN_LOG::next_file_id() ...@@ -4745,6 +4748,7 @@ uint MYSQL_BIN_LOG::next_file_id()
int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
{ {
safe_mutex_assert_owner(&LOCK_log);
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
return ER_ERROR_ON_WRITE; return ER_ERROR_ON_WRITE;
uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs; uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
...@@ -4907,12 +4911,15 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) ...@@ -4907,12 +4911,15 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd)
Incident_log_event ev(thd, incident, write_error_msg); Incident_log_event ev(thd, incident, write_error_msg);
pthread_mutex_lock(&LOCK_log); pthread_mutex_lock(&LOCK_log);
error= ev.write(&log_file); if (likely(is_open()))
status_var_add(thd->status_var.binlog_bytes_written, ev.data_written);
if (!error && !(error= flush_and_sync()))
{ {
signal_update(); error= ev.write(&log_file);
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); status_var_add(thd->status_var.binlog_bytes_written, ev.data_written);
if (!error && !(error= flush_and_sync()))
{
signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
} }
pthread_mutex_lock(&LOCK_commit_ordered); pthread_mutex_lock(&LOCK_commit_ordered);
last_commit_pos_offset= my_b_tell(&log_file); last_commit_pos_offset= my_b_tell(&log_file);
...@@ -4959,6 +4966,9 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, ...@@ -4959,6 +4966,9 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data,
entry.all= all; entry.all= all;
/* /*
Log "BEGIN" at the beginning of every transaction. Here, a transaction is
either a BEGIN..COMMIT block or a single statement in autocommit mode.
Create the necessary events here, where we have the correct THD (and Create the necessary events here, where we have the correct THD (and
thread context). thread context).
...@@ -5016,7 +5026,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) ...@@ -5016,7 +5026,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
else else
trx_group_commit_leader(entry); trx_group_commit_leader(entry);
if (!entry->error) if (likely(!entry->error))
return 0; return 0;
switch (entry->error) switch (entry->error)
...@@ -5044,7 +5054,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) ...@@ -5044,7 +5054,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
we need to mark it as not needed for recovery (unlog() is not called we need to mark it as not needed for recovery (unlog() is not called
for a transaction if log_xid() fails). for a transaction if log_xid() fails).
*/ */
if (entry->trx_data->using_xa) if (entry->trx_data->using_xa && entry->trx_data->xa_xid)
mark_xid_done(); mark_xid_done();
return 1; return 1;
...@@ -5111,18 +5121,13 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) ...@@ -5111,18 +5121,13 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
binlog_trx_data *trx_data= current->trx_data; binlog_trx_data *trx_data= current->trx_data;
IO_CACHE *cache= &trx_data->trans_log; IO_CACHE *cache= &trx_data->trans_log;
/* Skip log_xid for transactions without xid, marked by NULL end_event. */
if (!current->end_event)
continue;
/* /*
We only bother to write to the binary log if there is anything We only bother to write to the binary log if there is anything
to write. to write.
*/ */
if (my_b_tell(cache) > 0) if (my_b_tell(cache) > 0)
{ {
current->error= write_transaction(current); if ((current->error= write_transaction(current)))
if (current->error)
current->commit_errno= errno; current->commit_errno= errno;
write_count++; write_count++;
...@@ -5130,10 +5135,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) ...@@ -5130,10 +5135,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
strmake(trx_data->last_commit_pos_file, log_file_name, strmake(trx_data->last_commit_pos_file, log_file_name,
sizeof(trx_data->last_commit_pos_file)-1); sizeof(trx_data->last_commit_pos_file)-1);
commit_offset= commit_offset= my_b_write_tell(&log_file);
log_file.pos_in_file + (log_file.write_pos - log_file.write_buffer);
trx_data->last_commit_pos_offset= commit_offset; trx_data->last_commit_pos_offset= commit_offset;
if (trx_data->using_xa) if (trx_data->using_xa && trx_data->xa_xid)
xid_count++; xid_count++;
} }
...@@ -5192,6 +5196,8 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) ...@@ -5192,6 +5196,8 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
current= queue; current= queue;
while (current != NULL) while (current != NULL)
{ {
group_commit_entry *next;
DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered"); DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered");
++num_commits; ++num_commits;
if (current->trx_data->using_xa && !current->error) if (current->trx_data->using_xa && !current->error)
...@@ -5201,7 +5207,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) ...@@ -5201,7 +5207,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
Careful not to access current->next after waking up the other thread! As Careful not to access current->next after waking up the other thread! As
it may change immediately after wakeup. it may change immediately after wakeup.
*/ */
group_commit_entry *next= current->next; next= current->next;
if (current != leader) // Don't wake up ourself if (current != leader) // Don't wake up ourself
current->thd->signal_wakeup_ready(); current->thd->signal_wakeup_ready();
current= next; current= next;
...@@ -5217,19 +5223,7 @@ MYSQL_BIN_LOG::write_transaction(group_commit_entry *entry) ...@@ -5217,19 +5223,7 @@ MYSQL_BIN_LOG::write_transaction(group_commit_entry *entry)
{ {
binlog_trx_data *trx_data= entry->trx_data; binlog_trx_data *trx_data= entry->trx_data;
IO_CACHE *cache= &trx_data->trans_log; IO_CACHE *cache= &trx_data->trans_log;
/*
Log "BEGIN" at the beginning of every transaction. Here, a transaction is
either a BEGIN..COMMIT block or a single statement in autocommit mode. The
event was constructed in write_transaction_to_binlog(), in the thread
running the transaction.
Now this Query_log_event has artificial log_pos 0. It must be
adjusted to reflect the real position in the log. Not doing it
would confuse the slave: it would prevent this one from
knowing where he is in the master's binlog, which would result
in wrong positions being shown to the user, MASTER_POS_WAIT
undue waiting etc.
*/
if (entry->begin_event->write(&log_file)) if (entry->begin_event->write(&log_file))
return ER_ERROR_ON_WRITE; return ER_ERROR_ON_WRITE;
status_var_add(entry->thd->status_var.binlog_bytes_written, status_var_add(entry->thd->status_var.binlog_bytes_written,
...@@ -5694,10 +5688,6 @@ void sql_print_information(const char *format, ...) ...@@ -5694,10 +5688,6 @@ void sql_print_information(const char *format, ...)
} }
static my_bool mutexes_inited;
pthread_mutex_t LOCK_prepare_ordered;
pthread_mutex_t LOCK_commit_ordered;
void void
TC_init() TC_init()
{ {
...@@ -5708,6 +5698,7 @@ TC_init() ...@@ -5708,6 +5698,7 @@ TC_init()
mutexes_inited= TRUE; mutexes_inited= TRUE;
} }
void void
TC_destroy() TC_destroy()
{ {
...@@ -5719,39 +5710,42 @@ TC_destroy() ...@@ -5719,39 +5710,42 @@ TC_destroy()
} }
} }
void void
TC_LOG::run_prepare_ordered(THD *thd, bool all) TC_LOG::run_prepare_ordered(THD *thd, bool all)
{ {
Ha_trx_info *ha_info= Ha_trx_info *ha_info=
all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list;
safe_mutex_assert_owner(&LOCK_prepare_ordered);
for (; ha_info; ha_info= ha_info->next()) for (; ha_info; ha_info= ha_info->next())
{ {
handlerton *ht= ha_info->ht(); handlerton *ht= ha_info->ht();
if (!ht->prepare_ordered) if (!ht->prepare_ordered)
continue; continue;
safe_mutex_assert_owner(&LOCK_prepare_ordered);
ht->prepare_ordered(ht, thd, all); ht->prepare_ordered(ht, thd, all);
} }
} }
void void
TC_LOG::run_commit_ordered(THD *thd, bool all) TC_LOG::run_commit_ordered(THD *thd, bool all)
{ {
Ha_trx_info *ha_info= Ha_trx_info *ha_info=
all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list;
safe_mutex_assert_owner(&LOCK_commit_ordered);
for (; ha_info; ha_info= ha_info->next()) for (; ha_info; ha_info= ha_info->next())
{ {
handlerton *ht= ha_info->ht(); handlerton *ht= ha_info->ht();
if (!ht->commit_ordered) if (!ht->commit_ordered)
continue; continue;
safe_mutex_assert_owner(&LOCK_commit_ordered);
ht->commit_ordered(ht, thd, all); ht->commit_ordered(ht, thd, all);
DEBUG_SYNC(thd, "commit_after_run_commit_ordered"); DEBUG_SYNC(thd, "commit_after_run_commit_ordered");
} }
} }
int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered, bool need_prepare_ordered,
bool need_commit_ordered) bool need_commit_ordered)
...@@ -5781,10 +5775,9 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, ...@@ -5781,10 +5775,9 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
pthread_mutex_unlock(&LOCK_prepare_ordered); pthread_mutex_unlock(&LOCK_prepare_ordered);
} }
cookie= 0;
if (xid) if (xid)
cookie= log_one_transaction(xid); cookie= log_one_transaction(xid);
else
cookie= 0;
if (need_commit_ordered) if (need_commit_ordered)
{ {
...@@ -6506,13 +6499,23 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all, ...@@ -6506,13 +6499,23 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
(binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
trx_data->using_xa= TRUE; trx_data->using_xa= TRUE;
trx_data->xa_xid= xid;
if (xid) if (xid)
{ {
Xid_log_event xid_event(thd, xid); Xid_log_event xid_event(thd, xid);
err= binlog_flush_trx_cache(thd, trx_data, &xid_event, all); err= binlog_flush_trx_cache(thd, trx_data, &xid_event, all);
} }
else else
err= binlog_flush_trx_cache(thd, trx_data, NULL, all); {
/*
Empty xid occurs in XA COMMIT ... ONE PHASE.
In this case, we do not have a MySQL xid for the transaction, and the
external XA transaction coordinator will have to handle recovery if
needed. So we end the transaction with a plain COMMIT query event.
*/
Query_log_event end_event(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0);
err= binlog_flush_trx_cache(thd, trx_data, &end_event, all);
}
DEBUG_SYNC(thd, "binlog_after_log_and_order"); DEBUG_SYNC(thd, "binlog_after_log_and_order");
...@@ -6551,14 +6554,17 @@ TC_LOG_BINLOG::mark_xids_active(uint xid_count) ...@@ -6551,14 +6554,17 @@ TC_LOG_BINLOG::mark_xids_active(uint xid_count)
void void
TC_LOG_BINLOG::mark_xid_done() TC_LOG_BINLOG::mark_xid_done()
{ {
my_bool send_signal;
DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done"); DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done");
pthread_mutex_lock(&LOCK_prep_xids); pthread_mutex_lock(&LOCK_prep_xids);
DBUG_ASSERT(prepared_xids > 0); DBUG_ASSERT(prepared_xids > 0);
if (--prepared_xids == 0) { send_signal= !--prepared_xids;
pthread_mutex_unlock(&LOCK_prep_xids);
if (send_signal) {
DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids)); DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
pthread_cond_signal(&COND_prep_xids); pthread_cond_signal(&COND_prep_xids);
} }
pthread_mutex_unlock(&LOCK_prep_xids);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -6666,24 +6672,6 @@ mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file) ...@@ -6666,24 +6672,6 @@ mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file)
#endif /* INNODB_COMPATIBILITY_HOOKS */ #endif /* INNODB_COMPATIBILITY_HOOKS */
static ulonglong binlog_status_var_num_commits;
static ulonglong binlog_status_var_num_group_commits;
static char binlog_snapshot_file[FN_REFLEN];
static ulonglong binlog_snapshot_position;
static SHOW_VAR binlog_status_vars_detail[]=
{
{"commits",
(char *)&binlog_status_var_num_commits, SHOW_LONGLONG},
{"group_commits",
(char *)&binlog_status_var_num_group_commits, SHOW_LONGLONG},
{"snapshot_file",
(char *)&binlog_snapshot_file, SHOW_CHAR},
{"snapshot_position",
(char *)&binlog_snapshot_position, SHOW_LONGLONG},
{NullS, NullS, SHOW_LONG}
};
static int show_binlog_vars(THD *thd, SHOW_VAR *var, char *buff) static int show_binlog_vars(THD *thd, SHOW_VAR *var, char *buff)
{ {
mysql_bin_log.set_status_variables(thd); mysql_bin_log.set_status_variables(thd);
......
...@@ -4168,8 +4168,8 @@ THD::signal_wakeup_ready() ...@@ -4168,8 +4168,8 @@ THD::signal_wakeup_ready()
{ {
pthread_mutex_lock(&LOCK_wakeup_ready); pthread_mutex_lock(&LOCK_wakeup_ready);
wakeup_ready= true; wakeup_ready= true;
pthread_cond_signal(&COND_wakeup_ready);
pthread_mutex_unlock(&LOCK_wakeup_ready); pthread_mutex_unlock(&LOCK_wakeup_ready);
pthread_cond_signal(&COND_wakeup_ready);
} }
......
...@@ -11725,6 +11725,12 @@ static MYSQL_SYSVAR_ENUM(adaptive_checkpoint, srv_adaptive_checkpoint, ...@@ -11725,6 +11725,12 @@ static MYSQL_SYSVAR_ENUM(adaptive_checkpoint, srv_adaptive_checkpoint,
"Enable/Disable flushing along modified age. (none, reflex, [estimate])", "Enable/Disable flushing along modified age. (none, reflex, [estimate])",
NULL, innodb_adaptive_checkpoint_update, 2, &adaptive_checkpoint_typelib); NULL, innodb_adaptive_checkpoint_update, 2, &adaptive_checkpoint_typelib);
static MYSQL_SYSVAR_ULONG(enable_unsafe_group_commit, srv_deprecated_enable_unsafe_group_commit,
PLUGIN_VAR_RQCMDARG,
"Enable/Disable unsafe group commit when support_xa=OFF and use with binlog or other XA storage engine. "
"(Deprecated, and does nothing, group commit is always enabled in a safe way)",
NULL, NULL, 0, 0, 1, 0);
static MYSQL_SYSVAR_ULONG(expand_import, srv_expand_import, static MYSQL_SYSVAR_ULONG(expand_import, srv_expand_import,
PLUGIN_VAR_RQCMDARG, PLUGIN_VAR_RQCMDARG,
"Enable/Disable converting automatically *.ibd files when import tablespace.", "Enable/Disable converting automatically *.ibd files when import tablespace.",
...@@ -11833,6 +11839,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= { ...@@ -11833,6 +11839,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(read_ahead), MYSQL_SYSVAR(read_ahead),
MYSQL_SYSVAR(adaptive_checkpoint), MYSQL_SYSVAR(adaptive_checkpoint),
MYSQL_SYSVAR(flush_log_at_trx_commit_session), MYSQL_SYSVAR(flush_log_at_trx_commit_session),
MYSQL_SYSVAR(enable_unsafe_group_commit),
MYSQL_SYSVAR(expand_import), MYSQL_SYSVAR(expand_import),
MYSQL_SYSVAR(extra_rsegments), MYSQL_SYSVAR(extra_rsegments),
MYSQL_SYSVAR(dict_size_limit), MYSQL_SYSVAR(dict_size_limit),
......
...@@ -229,7 +229,7 @@ extern ulong srv_ibuf_active_contract; ...@@ -229,7 +229,7 @@ extern ulong srv_ibuf_active_contract;
extern ulong srv_ibuf_accel_rate; extern ulong srv_ibuf_accel_rate;
extern ulint srv_checkpoint_age_target; extern ulint srv_checkpoint_age_target;
extern ulong srv_flush_neighbor_pages; extern ulong srv_flush_neighbor_pages;
extern ulong srv_enable_unsafe_group_commit; extern ulong srv_deprecated_enable_unsafe_group_commit;
extern ulong srv_read_ahead; extern ulong srv_read_ahead;
extern ulong srv_adaptive_checkpoint; extern ulong srv_adaptive_checkpoint;
......
...@@ -402,7 +402,7 @@ UNIV_INTERN ulong srv_ibuf_accel_rate = 100; ...@@ -402,7 +402,7 @@ UNIV_INTERN ulong srv_ibuf_accel_rate = 100;
UNIV_INTERN ulint srv_checkpoint_age_target = 0; UNIV_INTERN ulint srv_checkpoint_age_target = 0;
UNIV_INTERN ulong srv_flush_neighbor_pages = 1; /* 0:disable 1:enable */ UNIV_INTERN ulong srv_flush_neighbor_pages = 1; /* 0:disable 1:enable */
UNIV_INTERN ulong srv_enable_unsafe_group_commit = 0; /* 0:disable 1:enable */ UNIV_INTERN ulong srv_deprecated_enable_unsafe_group_commit = 0;
UNIV_INTERN ulong srv_read_ahead = 3; /* 1: random 2: linear 3: Both */ UNIV_INTERN ulong srv_read_ahead = 3; /* 1: random 2: linear 3: Both */
UNIV_INTERN ulong srv_adaptive_checkpoint = 0; /* 0: none 1: reflex 2: estimate */ UNIV_INTERN ulong srv_adaptive_checkpoint = 0; /* 0: none 1: reflex 2: estimate */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment