Commit a09d2b10 authored by unknown's avatar unknown

MDEV-4506: Parallel replication.

Fix some more parts of old-style position updates.
Now we save in rgi some coordinates for master log and relay log, so
that in do_update_pos() we can use the right set of coordinates with
the right events.

The Rotate_log_event::do_update_pos() is fixed in the parallel case
to not directly update relay-log.info (as Rotate event runs directly
in the driver SQL thread, ahead of actual event execution). Instead,
group_master_log_file is updated as part of do_update_pos() in each
event execution.

In the parallel case, position updates happen in parallel without
any ordering, but taking care that position is not updated backwards.
Since position update happens only after event execution this leads
to the right result.

Also fix an access-after-free introduced in an earlier commit.
parent 7681c6aa
......@@ -3843,17 +3843,6 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
thd->variables.auto_increment_increment= auto_increment_increment;
thd->variables.auto_increment_offset= auto_increment_offset;
/*
InnoDB internally stores the master log position it has executed so far,
i.e. the position just after the COMMIT event.
When InnoDB will want to store, the positions in rli won't have
been updated yet, so group_master_log_* will point to old BEGIN
and event_master_log* will point to the beginning of current COMMIT.
But log_pos of the COMMIT Query event is what we want, i.e. the pos of the
END of the current log event (COMMIT). We save it in rli so that InnoDB can
access it.
*/
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
......@@ -3882,7 +3871,6 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
invariants like IN_STMT flag must be off at committing the transaction.
*/
rgi->inc_event_relay_log_pos();
const_cast<Relay_log_info*>(rli)->clear_flag(Relay_log_info::IN_STMT);
}
else
{
......@@ -5535,16 +5523,6 @@ int Load_log_event::do_apply_event(NET* net, rpl_group_info *rgi,
thd->lex->local_file= local_fname;
mysql_reset_thd_for_next_command(thd, 0);
if (!use_rli_only_for_errors)
{
/*
Saved for InnoDB, see comment in
Query_log_event::do_apply_event()
*/
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
}
/*
We test replicate_*_db rules. Note that we have already prepared
the file to load, even if we are going to ignore and delete it
......@@ -5940,11 +5918,16 @@ int Rotate_log_event::do_update_pos(rpl_group_info *rgi)
correspond to the beginning of the transaction. Starting from
5.0.0, there also are some rotates from the slave itself, in the
relay log, which shall not change the group positions.
In parallel replication, rotate event is executed out-of-band with normal
events, so we cannot update group_master_log_name or _pos here, it will
be updated with the next normal event instead.
*/
if ((server_id != global_system_variables.server_id ||
rli->replicate_same_server_id) &&
!is_relay_log_event() &&
!rli->is_in_group())
!rli->is_in_group() &&
!rgi->is_parallel_exec)
{
mysql_mutex_lock(&rli->data_lock);
DBUG_PRINT("info", ("old group_master_log_name: '%s' "
......@@ -7712,7 +7695,7 @@ int Stop_log_event::do_update_pos(rpl_group_info *rgi)
*/
if (rli->get_flag(Relay_log_info::IN_TRANSACTION))
rgi->inc_event_relay_log_pos();
else
else if (!rgi->is_parallel_exec)
{
rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi);
rli->inc_group_relay_log_pos(0, rgi);
......@@ -8408,7 +8391,6 @@ int Execute_load_log_event::do_apply_event(rpl_group_info *rgi)
calls mysql_load()).
*/
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
if (lev->do_apply_event(0,rgi,1))
{
/*
......
......@@ -64,7 +64,11 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
/* ToDo: Access to thd, and what about rli, split out a parallel part? */
mysql_mutex_lock(&rli->data_lock);
qev->ev->thd= thd;
strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name);
rgi->event_relay_log_name= rgi->event_relay_log_name_buf;
rgi->event_relay_log_pos= qev->event_relay_log_pos;
rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name);
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
thd->rgi_slave= NULL;
......@@ -660,7 +664,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
}
qev->ev= ev;
qev->next= NULL;
strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
qev->event_relay_log_pos= rli->event_relay_log_pos;
qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name);
if (typ == GTID_EVENT)
{
......@@ -674,6 +681,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
delete rgi;
return true;
}
rgi->is_parallel_exec = true;
if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
rgi->deferred_events= new Deferred_log_events(rli);
......@@ -783,6 +791,14 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
have GTID, like a MariaDB 5.5 or MySQL master.
*/
qev->rgi= serial_rgi;
/* Handle master log name change, seen in Rotate_log_event. */
if (typ == ROTATE_EVENT)
{
Rotate_log_event *rev= static_cast<Rotate_log_event *>(qev->ev);
memcpy(rli->future_event_master_log_name,
rev->new_log_ident, rev->ident_len+1);
}
rpt_handle_event(qev, NULL);
delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev);
my_free(qev);
......
......@@ -24,6 +24,9 @@ struct rpl_parallel_thread {
Log_event *ev;
rpl_group_info *rgi;
ulonglong future_event_relay_log_pos;
char event_relay_log_name[FN_REFLEN];
char future_event_master_log_name[FN_REFLEN];
ulonglong event_relay_log_pos;
} *event_queue, *last_in_queue;
};
......
......@@ -877,7 +877,9 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
if (!skip_lock)
mysql_mutex_lock(&data_lock);
rgi->inc_event_relay_log_pos();
if (opt_slave_parallel_threads > 0)
DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
(long) log_pos, (long) group_master_log_pos));
if (rgi->is_parallel_exec)
{
/* In case of parallel replication, do not update the position backwards. */
int cmp= strcmp(group_relay_log_name, event_relay_log_name);
......@@ -888,6 +890,18 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
notify_group_relay_log_name_update();
} else if (cmp == 0 && group_relay_log_pos < event_relay_log_pos)
group_relay_log_pos= event_relay_log_pos;
cmp= strcmp(group_master_log_name, rgi->future_event_master_log_name);
if (cmp <= 0)
{
if (cmp < 0)
{
strcpy(group_master_log_name, rgi->future_event_master_log_name);
notify_group_master_log_name_update();
}
if (group_master_log_pos < log_pos)
group_master_log_pos= log_pos;
}
}
else
{
......@@ -895,6 +909,8 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
group_relay_log_pos= event_relay_log_pos;
strmake_buf(group_relay_log_name, event_relay_log_name);
notify_group_relay_log_name_update();
if (log_pos) // 3.23 binlogs don't have log_posx
group_master_log_pos= log_pos;
}
/*
......@@ -927,12 +943,6 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
the relay log is not "val".
With the end_log_pos solution, we avoid computations involving lengthes.
*/
DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
(long) log_pos, (long) group_master_log_pos));
if (log_pos) // 3.23 binlogs don't have log_posx
{
group_master_log_pos= log_pos;
}
mysql_cond_broadcast(&data_cond);
if (!skip_lock)
mysql_mutex_unlock(&data_lock);
......@@ -1436,6 +1446,7 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli_)
wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0),
deferred_events(NULL), m_annotate_event(0), tables_to_lock(0),
tables_to_lock_count(0), trans_retries(0), last_event_start_time(0),
is_parallel_exec(false),
row_stmt_start_timestamp(0), long_find_row_note_printed(false)
{
bzero(&current_gtid, sizeof(current_gtid));
......
......@@ -185,6 +185,10 @@ class Relay_log_info : public Slave_reporting_capability
char event_relay_log_name[FN_REFLEN];
ulonglong event_relay_log_pos;
ulonglong future_event_relay_log_pos;
/*
The master log name for current event. Only used in parallel replication.
*/
char future_event_master_log_name[FN_REFLEN];
#ifdef HAVE_valgrind
bool is_fake; /* Mark that this is a fake relay log info structure */
......@@ -216,18 +220,6 @@ class Relay_log_info : public Slave_reporting_capability
*/
bool sql_force_rotate_relay;
/*
When it commits, InnoDB internally stores the master log position it has
processed so far; the position to store is the one of the end of the
committing event (the COMMIT query event, or the event if in autocommit
mode).
*/
#if MYSQL_VERSION_ID < 40100
ulonglong future_master_log_pos;
#else
ulonglong future_group_master_log_pos;
#endif
time_t last_master_timestamp;
void clear_until_condition();
......@@ -557,7 +549,15 @@ struct rpl_group_info
*/
time_t last_event_start_time;
char *event_relay_log_name;
char event_relay_log_name_buf[FN_REFLEN];
ulonglong event_relay_log_pos;
ulonglong future_event_relay_log_pos;
/*
The master log name for current event. Only used in parallel replication.
*/
char future_event_master_log_name[FN_REFLEN];
bool is_parallel_exec;
private:
/*
......@@ -685,7 +685,7 @@ struct rpl_group_info
inline void inc_event_relay_log_pos()
{
if (opt_slave_parallel_threads == 0 ||
if (!is_parallel_exec ||
rli->event_relay_log_pos < future_event_relay_log_pos)
rli->event_relay_log_pos= future_event_relay_log_pos;
}
......
......@@ -3361,6 +3361,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
}
serial_rgi->future_event_relay_log_pos= rli->future_event_relay_log_pos;
serial_rgi->event_relay_log_name= rli->event_relay_log_name;
serial_rgi->event_relay_log_pos= rli->event_relay_log_pos;
exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL);
delete_or_keep_event_post_apply(serial_rgi, typ, ev);
......
......@@ -4228,6 +4228,8 @@ static bool check_pseudo_slave_mode(sys_var *self, THD *thd, set_var *var)
#ifndef EMBEDDED_LIBRARY
delete thd->rli_fake;
thd->rli_fake= NULL;
delete thd->rgi_fake;
thd->rgi_fake= NULL;
#endif
}
else if (previous_val && val)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment