Commit 13fddb32 authored by unknown's avatar unknown

MDEV-4506: Parallel replication: Intermediate commit.

Move annotate-event stuff from Relay_log_info to rpl_group_info,
to make it thread safe.
parent 47f8e0ef
...@@ -61,8 +61,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) ...@@ -61,8 +61,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
until_log_pos(0), retried_trans(0), executed_entries(0), until_log_pos(0), retried_trans(0), executed_entries(0),
tables_to_lock(0), tables_to_lock_count(0), tables_to_lock(0), tables_to_lock_count(0),
last_event_start_time(0), m_flags(0), last_event_start_time(0), m_flags(0),
row_stmt_start_timestamp(0), long_find_row_note_printed(false), row_stmt_start_timestamp(0), long_find_row_note_printed(false)
m_annotate_event(0)
{ {
DBUG_ENTER("Relay_log_info::Relay_log_info"); DBUG_ENTER("Relay_log_info::Relay_log_info");
...@@ -112,7 +111,6 @@ Relay_log_info::~Relay_log_info() ...@@ -112,7 +111,6 @@ Relay_log_info::~Relay_log_info()
mysql_cond_destroy(&log_space_cond); mysql_cond_destroy(&log_space_cond);
mysql_cond_destroy(&sleep_cond); mysql_cond_destroy(&sleep_cond);
relay_log.cleanup(); relay_log.cleanup();
free_annotate_event();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -1536,12 +1534,18 @@ rpl_load_gtid_slave_state(THD *thd) ...@@ -1536,12 +1534,18 @@ rpl_load_gtid_slave_state(THD *thd)
rpl_group_info::rpl_group_info(Relay_log_info *rli_) rpl_group_info::rpl_group_info(Relay_log_info *rli_)
: rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0), : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0), wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0),
deferred_events(NULL) deferred_events(NULL), m_annotate_event(0)
{ {
bzero(&current_gtid, sizeof(current_gtid)); bzero(&current_gtid, sizeof(current_gtid));
} }
rpl_group_info::~rpl_group_info()
{
free_annotate_event();
}
int int
event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev) event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
{ {
...@@ -1583,7 +1587,7 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi, ...@@ -1583,7 +1587,7 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi,
The thd->query will be used to generate new Annotate_rows event The thd->query will be used to generate new Annotate_rows event
during applying the subsequent Rows events. during applying the subsequent Rows events.
*/ */
rgi->rli->set_annotate_event((Annotate_rows_log_event*) ev); rgi->set_annotate_event((Annotate_rows_log_event*) ev);
break; break;
case DELETE_ROWS_EVENT: case DELETE_ROWS_EVENT:
case UPDATE_ROWS_EVENT: case UPDATE_ROWS_EVENT:
...@@ -1593,7 +1597,7 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi, ...@@ -1593,7 +1597,7 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi,
event (if any) is not needed anymore and can be deleted. event (if any) is not needed anymore and can be deleted.
*/ */
if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F)) if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
rgi->rli->free_annotate_event(); rgi->free_annotate_event();
/* fall through */ /* fall through */
default: default:
DBUG_PRINT("info", ("Deleting the event after it has been executed")); DBUG_PRINT("info", ("Deleting the event after it has been executed"));
......
...@@ -472,43 +472,6 @@ class Relay_log_info : public Slave_reporting_capability ...@@ -472,43 +472,6 @@ class Relay_log_info : public Slave_reporting_capability
(m_flags & (1UL << IN_STMT)); (m_flags & (1UL << IN_STMT));
} }
/**
Save pointer to Annotate_rows event and switch on the
binlog_annotate_row_events for this sql thread.
To be called when sql thread recieves an Annotate_rows event.
*/
inline void set_annotate_event(Annotate_rows_log_event *event)
{
free_annotate_event();
m_annotate_event= event;
sql_thd->variables.binlog_annotate_row_events= 1;
}
/**
Returns pointer to the saved Annotate_rows event or NULL if there is
no saved event.
*/
inline Annotate_rows_log_event* get_annotate_event()
{
return m_annotate_event;
}
/**
Delete saved Annotate_rows event (if any) and switch off the
binlog_annotate_row_events for this sql thread.
To be called when sql thread has applied the last (i.e. with
STMT_END_F flag) rbr event.
*/
inline void free_annotate_event()
{
if (m_annotate_event)
{
sql_thd->variables.binlog_annotate_row_events= 0;
delete m_annotate_event;
m_annotate_event= 0;
}
}
time_t get_row_stmt_start_timestamp() time_t get_row_stmt_start_timestamp()
{ {
return row_stmt_start_timestamp; return row_stmt_start_timestamp;
...@@ -553,8 +516,6 @@ class Relay_log_info : public Slave_reporting_capability ...@@ -553,8 +516,6 @@ class Relay_log_info : public Slave_reporting_capability
*/ */
time_t row_stmt_start_timestamp; time_t row_stmt_start_timestamp;
bool long_find_row_note_printed; bool long_find_row_note_printed;
Annotate_rows_log_event *m_annotate_event;
}; };
...@@ -607,9 +568,6 @@ struct rpl_group_info ...@@ -607,9 +568,6 @@ struct rpl_group_info
struct rpl_parallel_entry *parallel_entry; struct rpl_parallel_entry *parallel_entry;
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info() { };
/* /*
A container to hold on Intvar-, Rand-, Uservar- log-events in case A container to hold on Intvar-, Rand-, Uservar- log-events in case
the slave is configured with table filtering rules. the slave is configured with table filtering rules.
...@@ -624,6 +582,11 @@ struct rpl_group_info ...@@ -624,6 +582,11 @@ struct rpl_group_info
*/ */
bool deferred_events_collecting; bool deferred_events_collecting;
Annotate_rows_log_event *m_annotate_event;
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
/* /*
Returns true if the argument event resides in the containter; Returns true if the argument event resides in the containter;
more specifically, the checking is done against the last added event. more specifically, the checking is done against the last added event.
...@@ -644,6 +607,44 @@ struct rpl_group_info ...@@ -644,6 +607,44 @@ struct rpl_group_info
if (deferred_events) if (deferred_events)
delete deferred_events; delete deferred_events;
}; };
/**
Save pointer to Annotate_rows event and switch on the
binlog_annotate_row_events for this sql thread.
To be called when sql thread recieves an Annotate_rows event.
*/
inline void set_annotate_event(Annotate_rows_log_event *event)
{
free_annotate_event();
m_annotate_event= event;
this->thd->variables.binlog_annotate_row_events= 1;
}
/**
Returns pointer to the saved Annotate_rows event or NULL if there is
no saved event.
*/
inline Annotate_rows_log_event* get_annotate_event()
{
return m_annotate_event;
}
/**
Delete saved Annotate_rows event (if any) and switch off the
binlog_annotate_row_events for this sql thread.
To be called when sql thread has applied the last (i.e. with
STMT_END_F flag) rbr event.
*/
inline void free_annotate_event()
{
if (m_annotate_event)
{
this->thd->variables.binlog_annotate_row_events= 0;
delete m_annotate_event;
m_annotate_event= 0;
}
}
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment