Commit 787c470c authored by unknown's avatar unknown Committed by Kristian Nielsen

MDEV-5262: Missing retry after temp error in parallel replication

Handle retry of event groups that span multiple relay log files.

 - If retry reaches the end of one relay log file, move on to the next.

 - Handle refcounting of relay log files, and avoid purging relay log
   files until all event groups have completed that might have needed
   them for transaction retry.
parent d6091569
...@@ -141,9 +141,56 @@ a b ...@@ -141,9 +141,56 @@ a b
7 1 7 1
8 1 8 1
9 1 9 1
*** Test retry of event group that spans multiple relay log files. ***
CREATE TABLE t2 (a int PRIMARY KEY, b BLOB) ENGINE=InnoDB;
INSERT INTO t2 VALUES (1,"Hulubullu");
include/stop_slave.inc
SET @old_max= @@GLOBAL.max_relay_log_size;
SET GLOBAL max_relay_log_size=4096;
SET gtid_seq_no = 100;
SET @old_server_id = @@server_id;
SET server_id = 12;
BEGIN;
INSERT INTO t1 VALUES (10, 4);
COMMIT;
SET server_id = @old_server_id;
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
a b
10 4
SELECT a, LENGTH(b) FROM t2 ORDER BY a;
a LENGTH(b)
1 9
2 5006
3 5012
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100";
include/start_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
retries
1
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
a b
10 4
SELECT a, LENGTH(b) FROM t2 ORDER BY a;
a LENGTH(b)
1 9
2 5006
3 5012
INSERT INTO t1 VALUES (11,11);
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
a b
10 4
11 11
SELECT a, LENGTH(b) FROM t2 ORDER BY a;
a LENGTH(b)
1 9
2 5006
3 5012
4 5000
SET GLOBAL max_relay_log_size=@old_max;
include/stop_slave.inc include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads; SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc include/start_slave.inc
DROP TABLE t1; DROP TABLE t1, t2;
DROP function foo; DROP function foo;
include/rpl_end.inc include/rpl_end.inc
...@@ -149,6 +149,64 @@ STOP SLAVE IO_THREAD; ...@@ -149,6 +149,64 @@ STOP SLAVE IO_THREAD;
--sync_with_master --sync_with_master
SELECT * FROM t1 ORDER BY a; SELECT * FROM t1 ORDER BY a;
--echo *** Test retry of event group that spans multiple relay log files. ***
--connection server_1
CREATE TABLE t2 (a int PRIMARY KEY, b BLOB) ENGINE=InnoDB;
INSERT INTO t2 VALUES (1,"Hulubullu");
--save_master_pos
--connection server_2
--sync_with_master
--source include/stop_slave.inc
SET @old_max= @@GLOBAL.max_relay_log_size;
SET GLOBAL max_relay_log_size=4096;
--connection server_1
--let $big= `SELECT REPEAT("*", 5000)`
SET gtid_seq_no = 100;
SET @old_server_id = @@server_id;
SET server_id = 12;
BEGIN;
--disable_query_log
eval INSERT INTO t2 VALUES (2, CONCAT("Hello ", "$big"));
eval INSERT INTO t2 VALUES (3, CONCAT("Long data: ", "$big"));
--enable_query_log
INSERT INTO t1 VALUES (10, 4);
COMMIT;
SET server_id = @old_server_id;
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
SELECT a, LENGTH(b) FROM t2 ORDER BY a;
--save_master_pos
--connection server_2
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100";
let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
--source include/start_slave.inc
--sync_with_master
SET GLOBAL debug_dbug=@old_dbug;
let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
--disable_query_log
eval SELECT $new_retry - $old_retry AS retries;
--enable_query_log
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
SELECT a, LENGTH(b) FROM t2 ORDER BY a;
--connection server_1
INSERT INTO t1 VALUES (11,11);
--disable_query_log
eval INSERT INTO t2 VALUES (4, "$big");
--enable_query_log
--save_master_pos
--connection server_2
--sync_with_master
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
SELECT a, LENGTH(b) FROM t2 ORDER BY a;
SET GLOBAL max_relay_log_size=@old_max;
--connection server_2 --connection server_2
--source include/stop_slave.inc --source include/stop_slave.inc
...@@ -156,7 +214,7 @@ SET GLOBAL slave_parallel_threads=@old_parallel_threads; ...@@ -156,7 +214,7 @@ SET GLOBAL slave_parallel_threads=@old_parallel_threads;
--source include/start_slave.inc --source include/start_slave.inc
--connection server_1 --connection server_1
DROP TABLE t1; DROP TABLE t1, t2;
DROP function foo; DROP function foo;
--source include/rpl_end.inc --source include/rpl_end.inc
...@@ -4097,6 +4097,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) ...@@ -4097,6 +4097,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
{ {
int error; int error;
char *to_purge_if_included= NULL; char *to_purge_if_included= NULL;
inuse_relaylog *ir;
DBUG_ENTER("purge_first_log"); DBUG_ENTER("purge_first_log");
DBUG_ASSERT(is_open()); DBUG_ASSERT(is_open());
...@@ -4104,7 +4105,30 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) ...@@ -4104,7 +4105,30 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name)); DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name));
mysql_mutex_lock(&LOCK_index); mysql_mutex_lock(&LOCK_index);
to_purge_if_included= my_strdup(rli->group_relay_log_name, MYF(0));
ir= rli->inuse_relaylog_list;
while (ir)
{
inuse_relaylog *next= ir->next;
if (!ir->completed || ir->dequeued_count < ir->queued_count)
{
included= false;
break;
}
if (!included && 0 == strcmp(ir->name, rli->group_relay_log_name))
break;
if (!next)
{
rli->last_inuse_relaylog= NULL;
included= 1;
to_purge_if_included= my_strdup(ir->name, MYF(0));
}
my_free(ir);
ir= next;
}
rli->inuse_relaylog_list= ir;
if (ir)
to_purge_if_included= my_strdup(ir->name, MYF(0));
/* /*
Read the next log file name from the index file and pass it back to Read the next log file name from the index file and pass it back to
......
...@@ -204,20 +204,14 @@ dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) ...@@ -204,20 +204,14 @@ dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
} }
#endif #endif
static int
retry_handle_relay_log_rotate(Log_event *ev, IO_CACHE *rlog)
{
/* ToDo */
return 0;
}
static int static int
retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
rpl_parallel_thread::queued_event *orig_qev) rpl_parallel_thread::queued_event *orig_qev)
{ {
IO_CACHE rlog; IO_CACHE rlog;
File fd; LOG_INFO linfo;
File fd= (File)-1;
const char *errmsg= NULL; const char *errmsg= NULL;
inuse_relaylog *ir= rgi->relay_log; inuse_relaylog *ir= rgi->relay_log;
uint64 event_count; uint64 event_count;
...@@ -241,7 +235,10 @@ do_retry: ...@@ -241,7 +235,10 @@ do_retry:
strcpy(log_name, ir->name); strcpy(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0) if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
return 1; {
err= 1;
goto err;
}
cur_offset= rgi->retry_start_offset; cur_offset= rgi->retry_start_offset;
my_b_seek(&rlog, cur_offset); my_b_seek(&rlog, cur_offset);
...@@ -249,23 +246,67 @@ do_retry: ...@@ -249,23 +246,67 @@ do_retry:
{ {
Log_event_type event_type; Log_event_type event_type;
Log_event *ev; Log_event *ev;
rpl_parallel_thread::queued_event *qev;
/* The loop is here so we can try again the next relay log file on EOF. */
for (;;)
{
old_offset= cur_offset; old_offset= cur_offset;
ev= Log_event::read_log_event(&rlog, 0, ev= Log_event::read_log_event(&rlog, 0,
rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */, rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
opt_slave_sql_verify_checksum); opt_slave_sql_verify_checksum);
cur_offset= my_b_tell(&rlog); cur_offset= my_b_tell(&rlog);
if (!ev) if (ev)
break;
if (rlog.error < 0)
{ {
errmsg= "slave SQL thread aborted because of I/O error";
err= 1; err= 1;
goto err; goto err;
} }
ev->thd= thd; if (rlog.error > 0)
{
sql_print_error("Slave SQL thread: I/O error reading "
"event(errno: %d cur_log->error: %d)",
my_errno, rlog.error);
errmsg= "Aborting slave SQL thread because of partial event read";
err= 1;
goto err;
}
/* EOF. Move to the next relay log. */
end_io_cache(&rlog);
mysql_file_close(fd, MYF(MY_WME));
fd= (File)-1;
/* Find the next relay log file. */
if((err= rli->relay_log.find_log_pos(&linfo, log_name, 1)) ||
(err= rli->relay_log.find_next_log(&linfo, 1)))
{
char buff[22];
sql_print_error("next log error: %d offset: %s log: %s",
err,
llstr(linfo.index_file_offset, buff),
log_name);
goto err;
}
strmake_buf(log_name ,linfo.log_file_name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
{
err= 1;
goto err;
}
/* Loop to try again on the new log file. */
}
event_type= ev->get_type_code(); event_type= ev->get_type_code();
if (Log_event::is_group_event(event_type)) if (!Log_event::is_group_event(event_type))
{ {
rpl_parallel_thread::queued_event *qev; delete ev;
continue;
}
ev->thd= thd;
mysql_mutex_lock(&rpt->LOCK_rpl_thread); mysql_mutex_lock(&rpt->LOCK_rpl_thread);
qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset, qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
...@@ -283,9 +324,7 @@ do_retry: ...@@ -283,9 +324,7 @@ do_retry:
mysql_mutex_lock(&rpt->LOCK_rpl_thread); mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->free_qev(qev); rpt->free_qev(qev);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
}
else
err= retry_handle_relay_log_rotate(ev, &rlog);
delete_or_keep_event_post_apply(rgi, event_type, ev); delete_or_keep_event_post_apply(rgi, event_type, ev);
DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100", DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100",
if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd);); if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd););
...@@ -300,6 +339,7 @@ do_retry: ...@@ -300,6 +339,7 @@ do_retry:
{ {
end_io_cache(&rlog); end_io_cache(&rlog);
mysql_file_close(fd, MYF(MY_WME)); mysql_file_close(fd, MYF(MY_WME));
fd= (File)-1;
goto do_retry; goto do_retry;
} }
sql_print_error("Slave worker thread retried transaction %lu time(s) " sql_print_error("Slave worker thread retried transaction %lu time(s) "
...@@ -309,15 +349,17 @@ do_retry: ...@@ -309,15 +349,17 @@ do_retry:
} }
goto err; goto err;
} }
// ToDo: handle too many retries.
} while (event_count < events_to_execute); } while (event_count < events_to_execute);
err: err:
if (fd >= 0)
{
end_io_cache(&rlog); end_io_cache(&rlog);
mysql_file_close(fd, MYF(MY_WME)); mysql_file_close(fd, MYF(MY_WME));
}
if (errmsg)
sql_print_error("Error reading relay log event: %s", errmsg);
return err; return err;
} }
...@@ -340,6 +382,8 @@ handle_rpl_parallel_thread(void *arg) ...@@ -340,6 +382,8 @@ handle_rpl_parallel_thread(void *arg)
rpl_sql_thread_info sql_info(NULL); rpl_sql_thread_info sql_info(NULL);
size_t total_event_size; size_t total_event_size;
int err; int err;
inuse_relaylog *last_ir;
uint64 accumulated_ir_count;
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg; struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
...@@ -683,12 +727,34 @@ handle_rpl_parallel_thread(void *arg) ...@@ -683,12 +727,34 @@ handle_rpl_parallel_thread(void *arg)
rpt->free_rgi(rgis_to_free); rpt->free_rgi(rgis_to_free);
rgis_to_free= next; rgis_to_free= next;
} }
last_ir= NULL;
accumulated_ir_count= 0;
while (qevs_to_free) while (qevs_to_free)
{ {
rpl_parallel_thread::queued_event *next= qevs_to_free->next; rpl_parallel_thread::queued_event *next= qevs_to_free->next;
inuse_relaylog *ir= qevs_to_free->ir;
/* Batch up refcount update to reduce use of synchronised operations. */
if (last_ir != ir)
{
if (last_ir)
{
my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock);
my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock);
accumulated_ir_count= 0;
}
last_ir= ir;
}
++accumulated_ir_count;
rpt->free_qev(qevs_to_free); rpt->free_qev(qevs_to_free);
qevs_to_free= next; qevs_to_free= next;
} }
if (last_ir)
{
my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock);
my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock);
}
if ((events= rpt->event_queue) != NULL) if ((events= rpt->event_queue) != NULL)
{ {
...@@ -1711,6 +1777,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -1711,6 +1777,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Queue the event for processing. Queue the event for processing.
*/ */
rli->event_relay_log_pos= rli->future_event_relay_log_pos; rli->event_relay_log_pos= rli->future_event_relay_log_pos;
qev->ir= rli->last_inuse_relaylog;
++qev->ir->queued_count;
cur_thread->enqueue(qev); cur_thread->enqueue(qev);
unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread, unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread,
&did_enter_cond, &old_stage); &did_enter_cond, &old_stage);
......
...@@ -9,6 +9,7 @@ struct rpl_parallel_entry; ...@@ -9,6 +9,7 @@ struct rpl_parallel_entry;
struct rpl_parallel_thread_pool; struct rpl_parallel_thread_pool;
class Relay_log_info; class Relay_log_info;
struct inuse_relaylog;
/* /*
...@@ -73,6 +74,7 @@ struct rpl_parallel_thread { ...@@ -73,6 +74,7 @@ struct rpl_parallel_thread {
queued_event *next; queued_event *next;
Log_event *ev; Log_event *ev;
rpl_group_info *rgi; rpl_group_info *rgi;
inuse_relaylog *ir;
ulonglong future_event_relay_log_pos; ulonglong future_event_relay_log_pos;
char event_relay_log_name[FN_REFLEN]; char event_relay_log_name[FN_REFLEN];
char future_event_master_log_name[FN_REFLEN]; char future_event_master_log_name[FN_REFLEN];
......
...@@ -92,6 +92,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) ...@@ -92,6 +92,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL); mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL); mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL); mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
my_atomic_rwlock_init(&inuse_relaylog_atomic_lock);
relay_log.init_pthread_objects(); relay_log.init_pthread_objects();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -117,6 +118,7 @@ Relay_log_info::~Relay_log_info() ...@@ -117,6 +118,7 @@ Relay_log_info::~Relay_log_info()
mysql_cond_destroy(&start_cond); mysql_cond_destroy(&start_cond);
mysql_cond_destroy(&stop_cond); mysql_cond_destroy(&stop_cond);
mysql_cond_destroy(&log_space_cond); mysql_cond_destroy(&log_space_cond);
my_atomic_rwlock_destroy(&inuse_relaylog_atomic_lock);
relay_log.cleanup(); relay_log.cleanup();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -1365,7 +1367,10 @@ Relay_log_info::alloc_inuse_relaylog(const char *name) ...@@ -1365,7 +1367,10 @@ Relay_log_info::alloc_inuse_relaylog(const char *name)
if (!inuse_relaylog_list) if (!inuse_relaylog_list)
inuse_relaylog_list= ir; inuse_relaylog_list= ir;
else else
{
last_inuse_relaylog->completed= true;
last_inuse_relaylog->next= ir; last_inuse_relaylog->next= ir;
}
last_inuse_relaylog= ir; last_inuse_relaylog= ir;
return 0; return 0;
......
...@@ -170,6 +170,7 @@ public: ...@@ -170,6 +170,7 @@ public:
*/ */
inuse_relaylog *inuse_relaylog_list; inuse_relaylog *inuse_relaylog_list;
inuse_relaylog *last_inuse_relaylog; inuse_relaylog *last_inuse_relaylog;
my_atomic_rwlock_t inuse_relaylog_atomic_lock;
/* /*
Needed to deal properly with cur_log getting closed and re-opened with Needed to deal properly with cur_log getting closed and re-opened with
...@@ -481,12 +482,26 @@ private: ...@@ -481,12 +482,26 @@ private:
Each rpl_group_info has a pointer to one of those, corresponding to the Each rpl_group_info has a pointer to one of those, corresponding to the
first GTID event. first GTID event.
A reference count keeps track of how long a relay log is potentially in use. A pair of reference count keeps track of how long a relay log is potentially
in use. When the `completed' flag is set, all events have been read out of
the relay log, but the log might still be needed for retry in worker
threads. As worker threads complete an event group, they increment
atomically the `dequeued_count' with number of events queued. Thus, when
completed is set and dequeued_count equals queued_count, the relay log file
is finally done with and can be purged.
By separating the queued and dequeued count, only the dequeued_count needs
multi-thread synchronisation; the completed flag and queued_count fields
are only accessed by the SQL driver thread and need no synchronisation.
*/ */
struct inuse_relaylog { struct inuse_relaylog {
inuse_relaylog *next; inuse_relaylog *next;
uint64 queued_count; /* Number of events in this relay log queued for worker threads. */
uint64 dequeued_count; int64 queued_count;
/* Number of events completed by worker threads. */
volatile int64 dequeued_count;
/* Set when all events have been read from a relaylog. */
bool completed;
char name[FN_REFLEN]; char name[FN_REFLEN];
}; };
......
...@@ -6397,6 +6397,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) ...@@ -6397,6 +6397,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
DBUG_ASSERT(rli->cur_log_fd >= 0); DBUG_ASSERT(rli->cur_log_fd >= 0);
mysql_file_close(rli->cur_log_fd, MYF(MY_WME)); mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
rli->cur_log_fd = -1; rli->cur_log_fd = -1;
rli->last_inuse_relaylog->completed= true;
if (relay_log_purge) if (relay_log_purge)
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment