Commit b0b60f24 authored by unknown's avatar unknown Committed by Kristian Nielsen

MDEV-5262: Missing retry after temp error in parallel replication

Start implementing that an event group can be re-tried in parallel replication
if it fails with a temporary error (like deadlock).

Patch is very incomplete, just some very basic retry works.

Stuff still missing (not complete list):

 - Handle moving to the next relay log file, if event group to be retried
   spans multiple relay log files.

 - Handle refcounting of relay log files, to ensure that we do not purge a
   relay log file and then later attempt to re-execute events out of it.

 - Handle description_event_for_exec - we need to save this somehow for the
   possible retry - and use the correct one in case it differs between relay
   logs.

 - Do another retry attempt in case the first retry also fails.

 - Limit the max number of retries.

 - Lots of testing will be needed for the various edge cases.
parent 2b4b857d
include/rpl_init.inc [topology=1->2]
*** Test retry of transactions that fail to replicate due to deadlock or similar temporary error. ***
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1,1);
SET sql_log_bin=0;
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
RETURN x;
END
||
SET sql_log_bin=1;
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=5;
include/start_slave.inc
SET sql_log_bin=0;
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
IF d1 != '' THEN
SET debug_sync = d1;
END IF;
IF d2 != '' THEN
SET debug_sync = d2;
END IF;
RETURN x;
END
||
include/stop_slave.inc
SET @old_format= @@SESSION.binlog_format;
SET binlog_format='statement';
SET gtid_seq_no = 100;
BEGIN;
INSERT INTO t1 VALUES (2,1);
UPDATE t1 SET b=b+1 WHERE a=1;
INSERT INTO t1 VALUES (3,1);
COMMIT;
SET binlog_format=@old_format;
SELECT * FROM t1 ORDER BY a;
a b
1 2
2 1
3 1
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_1_100";
include/start_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
retries
1
SELECT * FROM t1 ORDER BY a;
a b
1 2
2 1
3 1
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
DROP TABLE t1;
DROP function foo;
include/rpl_end.inc
--source include/have_innodb.inc
--source include/have_debug.inc
--source include/have_debug_sync.inc
--let $rpl_topology=1->2
--source include/rpl_init.inc
--echo *** Test retry of transactions that fail to replicate due to deadlock or similar temporary error. ***
--connection server_1
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1,1);
--save_master_pos
# Use a stored function to inject a debug_sync into the appropriate THD.
# The function does nothing on the master, and on the slave it injects the
# desired debug_sync action(s).
SET sql_log_bin=0;
--delimiter ||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
RETURN x;
END
||
--delimiter ;
SET sql_log_bin=1;
--connection server_2
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=5;
--source include/start_slave.inc
--sync_with_master
SET sql_log_bin=0;
--delimiter ||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
IF d1 != '' THEN
SET debug_sync = d1;
END IF;
IF d2 != '' THEN
SET debug_sync = d2;
END IF;
RETURN x;
END
||
--delimiter ;
--source include/stop_slave.inc
--connection server_1
SET @old_format= @@SESSION.binlog_format;
SET binlog_format='statement';
SET gtid_seq_no = 100;
BEGIN;
INSERT INTO t1 VALUES (2,1);
UPDATE t1 SET b=b+1 WHERE a=1;
#INSERT INTO t1 VALUES (3,foo(1,
# "ha_write_row_end SIGNAL q1_ready WAIT_FOR q1_cont",
# ""));
INSERT INTO t1 VALUES (3,1);
COMMIT;
SET binlog_format=@old_format;
SELECT * FROM t1 ORDER BY a;
--save_master_pos
--connection server_2
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_1_100";
let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
--source include/start_slave.inc
--sync_with_master
SET GLOBAL debug_dbug=@old_dbug;
let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
--disable_query_log
eval SELECT $new_retry - $old_retry AS retries;
--enable_query_log
SELECT * FROM t1 ORDER BY a;
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
--source include/start_slave.inc
--connection server_1
DROP TABLE t1;
DROP function foo;
--source include/rpl_end.inc
...@@ -7,15 +7,6 @@ ...@@ -7,15 +7,6 @@
/* /*
Code for optional parallel execution of replicated events on the slave. Code for optional parallel execution of replicated events on the slave.
ToDo list:
- Retry of failed transactions is not yet implemented for the parallel case.
- All the waits (eg. in struct wait_for_commit and in
rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill,
everything needs to be correctly rolled back and stopped in all threads,
to ensure a consistent slave replication state.
*/ */
struct rpl_parallel_thread_pool global_rpl_thread_pool; struct rpl_parallel_thread_pool global_rpl_thread_pool;
...@@ -197,6 +188,105 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond, ...@@ -197,6 +188,105 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
} }
static int
retry_handle_relay_log_rotate(Log_event *ev, IO_CACHE *rlog)
{
/* ToDo */
return 0;
}
static int
retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
rpl_parallel_thread::queued_event *orig_qev)
{
IO_CACHE rlog;
File fd;
const char *errmsg= NULL;
inuse_relaylog *ir= rgi->relay_log;
uint64 event_count= 0;
uint64 events_to_execute= rgi->retry_event_count;
Relay_log_info *rli= rgi->rli;
int err= 0;
ulonglong cur_offset, old_offset;
char log_name[FN_REFLEN];
THD *thd= rgi->thd;
do_retry:
rgi->cleanup_context(thd, 1);
mysql_mutex_lock(&rli->data_lock);
++rli->retried_trans;
statistic_increment(slave_retried_transactions, LOCK_status);
mysql_mutex_unlock(&rli->data_lock);
strcpy(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
return 1;
cur_offset= rgi->retry_start_offset;
my_b_seek(&rlog, cur_offset);
do
{
Log_event_type event_type;
Log_event *ev;
old_offset= cur_offset;
ev= Log_event::read_log_event(&rlog, 0,
rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
opt_slave_sql_verify_checksum);
cur_offset= my_b_tell(&rlog);
if (!ev)
{
err= 1;
goto err;
}
ev->thd= thd;
event_type= ev->get_type_code();
if (Log_event::is_group_event(event_type))
{
rpl_parallel_thread::queued_event *qev;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
cur_offset - old_offset);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
if (!qev)
{
delete ev;
my_error(ER_OUT_OF_RESOURCES, MYF(0));
err= 1;
goto err;
}
err= rpt_handle_event(qev, rpt);
++event_count;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->free_qev(qev);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
}
else
err= retry_handle_relay_log_rotate(ev, &rlog);
delete_or_keep_event_post_apply(rgi, event_type, ev);
if (err)
{
/* ToDo: Need to here also handle second retry. */
goto err;
}
// ToDo: handle too many retries.
} while (event_count < events_to_execute);
err:
end_io_cache(&rlog);
mysql_file_close(fd, MYF(MY_WME));
return err;
}
pthread_handler_t pthread_handler_t
handle_rpl_parallel_thread(void *arg) handle_rpl_parallel_thread(void *arg)
{ {
...@@ -499,7 +589,23 @@ handle_rpl_parallel_thread(void *arg) ...@@ -499,7 +589,23 @@ handle_rpl_parallel_thread(void *arg)
everything is stopped and cleaned up correctly. everything is stopped and cleaned up correctly.
*/ */
if (likely(!rgi->worker_error) && !skip_event_group) if (likely(!rgi->worker_error) && !skip_event_group)
{
++rgi->retry_event_count;
err= rpt_handle_event(events, rpt); err= rpt_handle_event(events, rpt);
DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_1_100",
if (rgi->current_gtid.domain_id == 0 &&
rgi->current_gtid.server_id == 1 &&
rgi->current_gtid.seq_no == 100 &&
rgi->retry_event_count == 4)
{
thd->clear_error();
thd->get_stmt_da()->reset_diagnostics_area();
my_error(ER_LOCK_DEADLOCK, MYF(0));
err= 1;
};);
if (err && has_temporary_error(thd))
err= retry_event_group(rgi, rpt, events);
}
else else
err= thd->wait_for_prior_commit(); err= thd->wait_for_prior_commit();
...@@ -802,8 +908,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, ...@@ -802,8 +908,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
rpl_parallel_thread::queued_event * rpl_parallel_thread::queued_event *
rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
Relay_log_info *rli)
{ {
queued_event *qev; queued_event *qev;
mysql_mutex_assert_owner(&LOCK_rpl_thread); mysql_mutex_assert_owner(&LOCK_rpl_thread);
...@@ -817,6 +922,17 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, ...@@ -817,6 +922,17 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
qev->ev= ev; qev->ev= ev;
qev->event_size= event_size; qev->event_size= event_size;
qev->next= NULL; qev->next= NULL;
return qev;
}
rpl_parallel_thread::queued_event *
rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
Relay_log_info *rli)
{
queued_event *qev= get_qev_common(ev, event_size);
if (!qev)
return NULL;
strcpy(qev->event_relay_log_name, rli->event_relay_log_name); strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
qev->event_relay_log_pos= rli->event_relay_log_pos; qev->event_relay_log_pos= rli->event_relay_log_pos;
qev->future_event_relay_log_pos= rli->future_event_relay_log_pos; qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
...@@ -825,6 +941,24 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, ...@@ -825,6 +941,24 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
} }
rpl_parallel_thread::queued_event *
rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev,
const char *relay_log_name,
ulonglong event_pos, ulonglong event_size)
{
queued_event *qev= get_qev_common(ev, event_size);
if (!qev)
return NULL;
qev->rgi= orig_qev->rgi;
strcpy(qev->event_relay_log_name, relay_log_name);
qev->event_relay_log_pos= event_pos;
qev->future_event_relay_log_pos= event_pos+event_size;
strcpy(qev->future_event_master_log_name,
orig_qev->future_event_master_log_name);
return qev;
}
void void
rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev) rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
{ {
...@@ -836,7 +970,7 @@ rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev) ...@@ -836,7 +970,7 @@ rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
rpl_group_info* rpl_group_info*
rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rpl_parallel_entry *e) rpl_parallel_entry *e, ulonglong event_size)
{ {
rpl_group_info *rgi; rpl_group_info *rgi;
mysql_mutex_assert_owner(&LOCK_rpl_thread); mysql_mutex_assert_owner(&LOCK_rpl_thread);
...@@ -864,6 +998,9 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, ...@@ -864,6 +998,9 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
return NULL; return NULL;
} }
rgi->parallel_entry= e; rgi->parallel_entry= e;
rgi->relay_log= rli->last_inuse_relaylog;
rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
rgi->retry_event_count= 0;
return rgi; return rgi;
} }
...@@ -1439,7 +1576,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -1439,7 +1576,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{ {
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e))) if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size)))
{ {
cur_thread->free_qev(qev); cur_thread->free_qev(qev);
abandon_worker_thread(rli->sql_driver_thd, cur_thread, abandon_worker_thread(rli->sql_driver_thd, cur_thread,
......
...@@ -106,11 +106,15 @@ struct rpl_parallel_thread { ...@@ -106,11 +106,15 @@ struct rpl_parallel_thread {
queued_size-= dequeue_size; queued_size-= dequeue_size;
} }
queued_event *get_qev_common(Log_event *ev, ulonglong event_size);
queued_event *get_qev(Log_event *ev, ulonglong event_size, queued_event *get_qev(Log_event *ev, ulonglong event_size,
Relay_log_info *rli); Relay_log_info *rli);
queued_event *retry_get_qev(Log_event *ev, queued_event *orig_qev,
const char *relay_log_name,
ulonglong event_pos, ulonglong event_size);
void free_qev(queued_event *qev); void free_qev(queued_event *qev);
rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rpl_parallel_entry *e); rpl_parallel_entry *e, ulonglong event_size);
void free_rgi(rpl_group_info *rgi); void free_rgi(rpl_group_info *rgi);
group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev); group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev);
void free_gco(group_commit_orderer *gco); void free_gco(group_commit_orderer *gco);
......
...@@ -52,6 +52,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) ...@@ -52,6 +52,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period), info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period),
sync_counter(0), is_relay_log_recovery(is_slave_recovery), sync_counter(0), is_relay_log_recovery(is_slave_recovery),
save_temporary_tables(0), mi(0), save_temporary_tables(0), mi(0),
inuse_relaylog_list(0), last_inuse_relaylog(0),
cur_log_old_open_count(0), group_relay_log_pos(0), cur_log_old_open_count(0), group_relay_log_pos(0),
event_relay_log_pos(0), event_relay_log_pos(0),
#if HAVE_valgrind #if HAVE_valgrind
...@@ -98,8 +99,17 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) ...@@ -98,8 +99,17 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
Relay_log_info::~Relay_log_info() Relay_log_info::~Relay_log_info()
{ {
inuse_relaylog *cur;
DBUG_ENTER("Relay_log_info::~Relay_log_info"); DBUG_ENTER("Relay_log_info::~Relay_log_info");
cur= inuse_relaylog_list;
while (cur)
{
DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
inuse_relaylog *next= cur->next;
my_free(cur);
cur= next;
}
mysql_mutex_destroy(&run_lock); mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock); mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&log_space_lock); mysql_mutex_destroy(&log_space_lock);
...@@ -1339,6 +1349,29 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, ...@@ -1339,6 +1349,29 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
int
Relay_log_info::alloc_inuse_relaylog(const char *name)
{
inuse_relaylog *ir;
if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL))))
{
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
return 1;
}
strcpy(ir->name, name);
if (!inuse_relaylog_list)
inuse_relaylog_list= ir;
else
last_inuse_relaylog->next= ir;
last_inuse_relaylog= ir;
return 0;
}
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int int
rpl_load_gtid_slave_state(THD *thd) rpl_load_gtid_slave_state(THD *thd)
...@@ -1623,7 +1656,7 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi, ...@@ -1623,7 +1656,7 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi,
void rpl_group_info::cleanup_context(THD *thd, bool error) void rpl_group_info::cleanup_context(THD *thd, bool error)
{ {
DBUG_ENTER("Relay_log_info::cleanup_context"); DBUG_ENTER("rpl_group_info::cleanup_context");
DBUG_PRINT("enter", ("error: %d", (int) error)); DBUG_PRINT("enter", ("error: %d", (int) error));
DBUG_ASSERT(this->thd == thd); DBUG_ASSERT(this->thd == thd);
...@@ -1689,7 +1722,7 @@ void rpl_group_info::cleanup_context(THD *thd, bool error) ...@@ -1689,7 +1722,7 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
void rpl_group_info::clear_tables_to_lock() void rpl_group_info::clear_tables_to_lock()
{ {
DBUG_ENTER("Relay_log_info::clear_tables_to_lock()"); DBUG_ENTER("rpl_group_info::clear_tables_to_lock()");
#ifndef DBUG_OFF #ifndef DBUG_OFF
/** /**
When replicating in RBR and MyISAM Merge tables are involved When replicating in RBR and MyISAM Merge tables are involved
...@@ -1736,7 +1769,7 @@ void rpl_group_info::clear_tables_to_lock() ...@@ -1736,7 +1769,7 @@ void rpl_group_info::clear_tables_to_lock()
void rpl_group_info::slave_close_thread_tables(THD *thd) void rpl_group_info::slave_close_thread_tables(THD *thd)
{ {
DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)"); DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)");
thd->get_stmt_da()->set_overwrite_status(true); thd->get_stmt_da()->set_overwrite_status(true);
thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd); thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
thd->get_stmt_da()->set_overwrite_status(false); thd->get_stmt_da()->set_overwrite_status(false);
......
...@@ -61,6 +61,7 @@ enum { ...@@ -61,6 +61,7 @@ enum {
*****************************************************************************/ *****************************************************************************/
struct rpl_group_info; struct rpl_group_info;
struct inuse_relaylog;
class Relay_log_info : public Slave_reporting_capability class Relay_log_info : public Slave_reporting_capability
{ {
...@@ -163,6 +164,13 @@ class Relay_log_info : public Slave_reporting_capability ...@@ -163,6 +164,13 @@ class Relay_log_info : public Slave_reporting_capability
/* parent Master_info structure */ /* parent Master_info structure */
Master_info *mi; Master_info *mi;
/*
List of active relay log files.
(This can be more than one in case of parallel replication).
*/
inuse_relaylog *inuse_relaylog_list;
inuse_relaylog *last_inuse_relaylog;
/* /*
Needed to deal properly with cur_log getting closed and re-opened with Needed to deal properly with cur_log getting closed and re-opened with
a different log under our feet a different log under our feet
...@@ -398,6 +406,7 @@ class Relay_log_info : public Slave_reporting_capability ...@@ -398,6 +406,7 @@ class Relay_log_info : public Slave_reporting_capability
void stmt_done(my_off_t event_log_pos, void stmt_done(my_off_t event_log_pos,
time_t event_creation_time, THD *thd, time_t event_creation_time, THD *thd,
rpl_group_info *rgi); rpl_group_info *rgi);
int alloc_inuse_relaylog(const char *name);
/** /**
Is the replication inside a group? Is the replication inside a group?
...@@ -463,6 +472,25 @@ class Relay_log_info : public Slave_reporting_capability ...@@ -463,6 +472,25 @@ class Relay_log_info : public Slave_reporting_capability
}; };
/*
In parallel replication, if we need to re-try a transaction due to a
deadlock or other temporary error, we may need to go back and re-read events
out of an earlier relay log.
This structure keeps track of the relaylogs that are potentially in use.
Each rpl_group_info has a pointer to one of those, corresponding to the
first GTID event.
A reference count keeps track of how long a relay log is potentially in use.
*/
struct inuse_relaylog {
inuse_relaylog *next;
uint64 queued_count;
uint64 dequeued_count;
char name[FN_REFLEN];
};
/* /*
This is data for various state needed to be kept for the processing of This is data for various state needed to be kept for the processing of
one event group (transaction) during replication. one event group (transaction) during replication.
...@@ -596,6 +624,14 @@ struct rpl_group_info ...@@ -596,6 +624,14 @@ struct rpl_group_info
/* Needs room for "Gtid D-S-N\x00". */ /* Needs room for "Gtid D-S-N\x00". */
char gtid_info_buf[5+10+1+10+1+20+1]; char gtid_info_buf[5+10+1+10+1+20+1];
/*
Information to be able to re-try an event group in case of a deadlock or
other temporary error.
*/
inuse_relaylog *relay_log;
uint64 retry_start_offset;
uint64 retry_event_count;
rpl_group_info(Relay_log_info *rli_); rpl_group_info(Relay_log_info *rli_);
~rpl_group_info(); ~rpl_group_info();
void reinit(Relay_log_info *rli); void reinit(Relay_log_info *rli);
......
...@@ -3094,7 +3094,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings) ...@@ -3094,7 +3094,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings)
that the error is temporary by pushing a warning with the error code that the error is temporary by pushing a warning with the error code
ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary. ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
*/ */
static int has_temporary_error(THD *thd) int
has_temporary_error(THD *thd)
{ {
DBUG_ENTER("has_temporary_error"); DBUG_ENTER("has_temporary_error");
...@@ -4478,6 +4479,9 @@ pthread_handler_t handle_slave_sql(void *arg) ...@@ -4478,6 +4479,9 @@ pthread_handler_t handle_slave_sql(void *arg)
"Error initializing relay log position: %s", errmsg); "Error initializing relay log position: %s", errmsg);
goto err; goto err;
} }
if (rli->alloc_inuse_relaylog(rli->group_relay_log_name))
goto err;
strcpy(rli->future_event_master_log_name, rli->group_master_log_name); strcpy(rli->future_event_master_log_name, rli->group_master_log_name);
THD_CHECK_SENTRY(thd); THD_CHECK_SENTRY(thd);
#ifndef DBUG_OFF #ifndef DBUG_OFF
...@@ -6521,6 +6525,12 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) ...@@ -6521,6 +6525,12 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
mysql_mutex_unlock(log_lock); mysql_mutex_unlock(log_lock);
goto err; goto err;
} }
if (rli->alloc_inuse_relaylog(rli->linfo.log_file_name))
{
if (!hot_log)
mysql_mutex_unlock(log_lock);
goto err;
}
if (!hot_log) if (!hot_log)
mysql_mutex_unlock(log_lock); mysql_mutex_unlock(log_lock);
continue; continue;
...@@ -6536,6 +6546,8 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) ...@@ -6536,6 +6546,8 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name, if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
&errmsg)) <0) &errmsg)) <0)
goto err; goto err;
if (rli->alloc_inuse_relaylog(rli->linfo.log_file_name))
goto err;
} }
else else
{ {
......
...@@ -229,6 +229,7 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset, ...@@ -229,6 +229,7 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
void set_slave_thread_options(THD* thd); void set_slave_thread_options(THD* thd);
void set_slave_thread_default_charset(THD *thd, rpl_group_info *rgi); void set_slave_thread_default_charset(THD *thd, rpl_group_info *rgi);
int rotate_relay_log(Master_info* mi); int rotate_relay_log(Master_info* mi);
int has_temporary_error(THD *thd);
int apply_event_and_update_pos(Log_event* ev, THD* thd, int apply_event_and_update_pos(Log_event* ev, THD* thd,
struct rpl_group_info *rgi, struct rpl_group_info *rgi,
rpl_parallel_thread *rpt); rpl_parallel_thread *rpt);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment