Commit 1b3dc66e authored by unknown's avatar unknown

MDEV-4506: Parallel replication: Intermediate commit.

First step of splitting out part of Relay_log_info, so that different
event groups being applied in parallel can each use their own copy.
parent 7e5dc4f0
This diff is collapsed.
...@@ -1317,9 +1317,9 @@ public: ...@@ -1317,9 +1317,9 @@ public:
@see do_apply_event @see do_apply_event
*/ */
int apply_event(Relay_log_info const *rli) int apply_event(struct rpl_group_info *rgi)
{ {
return do_apply_event(rli); return do_apply_event(rgi);
} }
...@@ -1412,7 +1412,7 @@ protected: ...@@ -1412,7 +1412,7 @@ protected:
@retval 0 Event applied successfully @retval 0 Event applied successfully
@retval errno Error code if event application failed @retval errno Error code if event application failed
*/ */
virtual int do_apply_event(Relay_log_info const *rli) virtual int do_apply_event(struct rpl_group_info *rgi)
{ {
return 0; /* Default implementation does nothing */ return 0; /* Default implementation does nothing */
} }
...@@ -1966,10 +1966,10 @@ public: ...@@ -1966,10 +1966,10 @@ public:
public: /* !!! Public in this patch to allow old usage */ public: /* !!! Public in this patch to allow old usage */
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info *rli); virtual int do_update_pos(Relay_log_info *rli);
int do_apply_event(Relay_log_info const *rli, int do_apply_event(struct rpl_group_info *rgi,
const char *query_arg, const char *query_arg,
uint32 q_len_arg); uint32 q_len_arg);
static bool peek_is_commit_rollback(const char *event_start, static bool peek_is_commit_rollback(const char *event_start,
...@@ -2083,7 +2083,7 @@ public: ...@@ -2083,7 +2083,7 @@ public:
private: private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const* rli); virtual int do_apply_event(struct rpl_group_info *rgi);
#endif #endif
}; };
...@@ -2396,12 +2396,12 @@ public: ...@@ -2396,12 +2396,12 @@ public:
public: /* !!! Public in this patch to allow old usage */ public: /* !!! Public in this patch to allow old usage */
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const* rli) virtual int do_apply_event(struct rpl_group_info *rgi)
{ {
return do_apply_event(thd->slave_net,rli,0); return do_apply_event(thd->slave_net,rgi,0);
} }
int do_apply_event(NET *net, Relay_log_info const *rli, int do_apply_event(NET *net, struct rpl_group_info *rgi,
bool use_rli_only_for_errors); bool use_rli_only_for_errors);
#endif #endif
}; };
...@@ -2480,7 +2480,7 @@ public: ...@@ -2480,7 +2480,7 @@ public:
protected: protected:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info*) virtual enum_skip_reason do_shall_skip(Relay_log_info*)
{ {
/* /*
...@@ -2576,7 +2576,7 @@ public: ...@@ -2576,7 +2576,7 @@ public:
static bool is_version_before_checksum(const master_version_split *version_split); static bool is_version_before_checksum(const master_version_split *version_split);
protected: protected:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info *rli); virtual int do_update_pos(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif #endif
...@@ -2655,7 +2655,7 @@ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg, ...@@ -2655,7 +2655,7 @@ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg,
private: private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info *rli); virtual int do_update_pos(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif #endif
...@@ -2734,7 +2734,7 @@ class Rand_log_event: public Log_event ...@@ -2734,7 +2734,7 @@ class Rand_log_event: public Log_event
private: private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info *rli); virtual int do_update_pos(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif #endif
...@@ -2783,7 +2783,7 @@ class Xid_log_event: public Log_event ...@@ -2783,7 +2783,7 @@ class Xid_log_event: public Log_event
private: private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
enum_skip_reason do_shall_skip(Relay_log_info *rli); enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif #endif
}; };
...@@ -2850,7 +2850,7 @@ public: ...@@ -2850,7 +2850,7 @@ public:
private: private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info *rli); virtual int do_update_pos(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif #endif
...@@ -3099,7 +3099,7 @@ public: ...@@ -3099,7 +3099,7 @@ public:
uint16 flags, bool is_transactional, uint64 commit_id); uint16 flags, bool is_transactional, uint64 commit_id);
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
void pack_info(THD *thd, Protocol *protocol); void pack_info(THD *thd, Protocol *protocol);
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info *rli); virtual int do_update_pos(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif #endif
...@@ -3229,7 +3229,7 @@ public: ...@@ -3229,7 +3229,7 @@ public:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
bool to_packet(String *packet); bool to_packet(String *packet);
bool write(IO_CACHE *file); bool write(IO_CACHE *file);
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
#endif #endif
static bool peek(const char *event_start, uint32 event_len, static bool peek(const char *event_start, uint32 event_len,
uint8 checksum_alg, uint8 checksum_alg,
...@@ -3308,7 +3308,7 @@ public: ...@@ -3308,7 +3308,7 @@ public:
private: private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
#endif #endif
}; };
...@@ -3363,7 +3363,7 @@ public: ...@@ -3363,7 +3363,7 @@ public:
private: private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
#endif #endif
}; };
...@@ -3404,7 +3404,7 @@ public: ...@@ -3404,7 +3404,7 @@ public:
private: private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
#endif #endif
}; };
...@@ -3444,7 +3444,7 @@ public: ...@@ -3444,7 +3444,7 @@ public:
private: private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
#endif #endif
}; };
...@@ -3543,7 +3543,7 @@ public: ...@@ -3543,7 +3543,7 @@ public:
private: private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
#endif #endif
}; };
...@@ -3615,7 +3615,7 @@ public: ...@@ -3615,7 +3615,7 @@ public:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
private: private:
virtual int do_apply_event(Relay_log_info const*); virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info*); virtual int do_update_pos(Relay_log_info*);
virtual enum_skip_reason do_shall_skip(Relay_log_info*); virtual enum_skip_reason do_shall_skip(Relay_log_info*);
#endif #endif
...@@ -4030,7 +4030,7 @@ public: ...@@ -4030,7 +4030,7 @@ public:
private: private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info *rli); virtual int do_update_pos(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif #endif
...@@ -4258,7 +4258,7 @@ protected: ...@@ -4258,7 +4258,7 @@ protected:
private: private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info *rli); virtual int do_update_pos(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
...@@ -4592,7 +4592,7 @@ public: ...@@ -4592,7 +4592,7 @@ public:
#endif #endif
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli); virtual int do_apply_event(struct rpl_group_info *rgi);
#endif #endif
virtual bool write_data_header(IO_CACHE *file); virtual bool write_data_header(IO_CACHE *file);
......
...@@ -65,17 +65,18 @@ int ...@@ -65,17 +65,18 @@ int
rpl_slave_state::record_and_update_gtid(THD *thd, Relay_log_info *rli) rpl_slave_state::record_and_update_gtid(THD *thd, Relay_log_info *rli)
{ {
uint64 sub_id; uint64 sub_id;
struct rpl_group_info *rgi;
/* /*
Update the GTID position, if we have it and did not already update Update the GTID position, if we have it and did not already update
it in a GTID transaction. it in a GTID transaction.
*/ */
if ((sub_id= rli->gtid_sub_id)) if ((rgi= rli->group_info) && (sub_id= rgi->gtid_sub_id))
{ {
rli->gtid_sub_id= 0; rgi->gtid_sub_id= 0;
if (record_gtid(thd, &rli->current_gtid, sub_id, false, false)) if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
return 1; return 1;
update_state_hash(sub_id, &rli->current_gtid); update_state_hash(sub_id, &rgi->current_gtid);
} }
return 0; return 0;
} }
......
...@@ -13,15 +13,18 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, ...@@ -13,15 +13,18 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
struct rpl_parallel_thread *rpt) struct rpl_parallel_thread *rpt)
{ {
int err; int err;
Relay_log_info *rli= qev->rli; struct rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli;
thd->rli_slave= rli; thd->rli_slave= rli;
thd->rpl_filter = rli->mi->rpl_filter; thd->rpl_filter = rli->mi->rpl_filter;
/* ToDo: Access to thd, and what about rli, split out a parallel part? */ /* ToDo: Access to thd, and what about rli, split out a parallel part? */
mysql_mutex_lock(&rli->data_lock); mysql_mutex_lock(&rli->data_lock);
err= apply_event_and_update_pos(qev->ev, thd, rli, rpt); err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
/* ToDo: error handling. */ /* ToDo: error handling. */
/* ToDo: also free qev->ev, or hold on to it for a bit if necessary. */ /* ToDo: also free qev->ev, or hold on to it for a bit if necessary. */
my_free(rgi);
rgi= NULL;
} }
...@@ -398,7 +401,8 @@ rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd) ...@@ -398,7 +401,8 @@ rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd)
return true; return true;
} }
qev->ev= ev; qev->ev= ev;
qev->rli= rli; qev->rgi= rli->group_info;
rli->group_info= NULL; /* Avoid conflict with groups applied in parallel */
qev->next= NULL; qev->next= NULL;
if (ev->get_type_code() == GTID_EVENT) if (ev->get_type_code() == GTID_EVENT)
......
...@@ -23,7 +23,7 @@ struct rpl_parallel_thread { ...@@ -23,7 +23,7 @@ struct rpl_parallel_thread {
struct queued_event { struct queued_event {
queued_event *next; queued_event *next;
Log_event *ev; Log_event *ev;
Relay_log_info *rli; struct rpl_group_info *rgi;
} *event_queue, *last_in_queue; } *event_queue, *last_in_queue;
rpl_parallel_thread *wait_for; /* ToDo: change this ... */ rpl_parallel_thread *wait_for; /* ToDo: change this ... */
}; };
......
...@@ -59,7 +59,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) ...@@ -59,7 +59,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
abort_pos_wait(0), slave_run_id(0), sql_thd(0), abort_pos_wait(0), slave_run_id(0), sql_thd(0),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0), until_log_pos(0), retried_trans(0), executed_entries(0),
gtid_sub_id(0), tables_to_lock(0), tables_to_lock_count(0), group_info(0), tables_to_lock(0), tables_to_lock_count(0),
last_event_start_time(0), deferred_events(NULL),m_flags(0), last_event_start_time(0), deferred_events(NULL),m_flags(0),
row_stmt_start_timestamp(0), long_find_row_note_printed(false), row_stmt_start_timestamp(0), long_find_row_note_printed(false),
m_annotate_event(0) m_annotate_event(0)
...@@ -113,6 +113,8 @@ Relay_log_info::~Relay_log_info() ...@@ -113,6 +113,8 @@ Relay_log_info::~Relay_log_info()
mysql_cond_destroy(&sleep_cond); mysql_cond_destroy(&sleep_cond);
relay_log.cleanup(); relay_log.cleanup();
free_annotate_event(); free_annotate_event();
if (group_info)
my_free(group_info);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
......
...@@ -53,6 +53,8 @@ class Master_info; ...@@ -53,6 +53,8 @@ class Master_info;
*****************************************************************************/ *****************************************************************************/
struct rpl_group_info;
class Relay_log_info : public Slave_reporting_capability class Relay_log_info : public Slave_reporting_capability
{ {
public: public:
...@@ -312,13 +314,8 @@ public: ...@@ -312,13 +314,8 @@ public:
char slave_patternload_file[FN_REFLEN]; char slave_patternload_file[FN_REFLEN];
size_t slave_patternload_file_size; size_t slave_patternload_file_size;
/* /* Various data related to the currently executing event group. */
Current GTID being processed. struct rpl_group_info *group_info;
The sub_id gives the binlog order within one domain_id. A zero sub_id
means that there is no active GTID.
*/
uint64 gtid_sub_id;
rpl_gtid current_gtid;
rpl_parallel parallel; rpl_parallel parallel;
Relay_log_info(bool is_slave_recovery); Relay_log_info(bool is_slave_recovery);
...@@ -596,6 +593,26 @@ private: ...@@ -596,6 +593,26 @@ private:
}; };
/*
This is data for various state needed to be kept for the processing of
one event group in the SQL thread.
For single-threaded replication it is linked from the RLI, for parallel
replication it is linked into each event group being executed in parallel.
*/
struct rpl_group_info
{
Relay_log_info *rli;
/*
Current GTID being processed.
The sub_id gives the binlog order within one domain_id. A zero sub_id
means that there is no active GTID.
*/
uint64 gtid_sub_id;
rpl_gtid current_gtid;
};
// Defined in rpl_rli.cc // Defined in rpl_rli.cc
int init_relay_log_info(Relay_log_info* rli, const char* info_fname); int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
......
...@@ -1143,9 +1143,10 @@ bool Deferred_log_events::is_empty() ...@@ -1143,9 +1143,10 @@ bool Deferred_log_events::is_empty()
return array.elements == 0; return array.elements == 0;
} }
bool Deferred_log_events::execute(Relay_log_info *rli) bool Deferred_log_events::execute(struct rpl_group_info *rgi)
{ {
bool res= false; bool res= false;
Relay_log_info *rli= rgi->rli;
DBUG_ASSERT(rli->deferred_events_collecting); DBUG_ASSERT(rli->deferred_events_collecting);
...@@ -1154,7 +1155,7 @@ bool Deferred_log_events::execute(Relay_log_info *rli) ...@@ -1154,7 +1155,7 @@ bool Deferred_log_events::execute(Relay_log_info *rli)
{ {
Log_event *ev= (* (Log_event **) Log_event *ev= (* (Log_event **)
dynamic_array_ptr(&array, i)); dynamic_array_ptr(&array, i));
res= ev->apply_event(rli); res= ev->apply_event(rgi);
} }
rli->deferred_events_collecting= true; rli->deferred_events_collecting= true;
return res; return res;
......
...@@ -275,7 +275,7 @@ public: ...@@ -275,7 +275,7 @@ public:
/* queue for exection at Query-log-event time prior the Query */ /* queue for exection at Query-log-event time prior the Query */
int add(Log_event *ev); int add(Log_event *ev);
bool is_empty(); bool is_empty();
bool execute(Relay_log_info *rli); bool execute(struct rpl_group_info *rgi);
void rewind(); void rewind();
bool is_last(Log_event *ev) { return ev == last_added; }; bool is_last(Log_event *ev) { return ev == last_added; };
}; };
......
...@@ -3018,10 +3018,12 @@ static int has_temporary_error(THD *thd) ...@@ -3018,10 +3018,12 @@ static int has_temporary_error(THD *thd)
@retval 2 No error calling ev->apply_event(), but error calling @retval 2 No error calling ev->apply_event(), but error calling
ev->update_pos(). ev->update_pos().
*/ */
int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli, int apply_event_and_update_pos(Log_event* ev, THD* thd,
struct rpl_group_info *rgi,
rpl_parallel_thread *rpt) rpl_parallel_thread *rpt)
{ {
int exec_res= 0; int exec_res= 0;
Relay_log_info* rli= rgi->rli;
DBUG_ENTER("apply_event_and_update_pos"); DBUG_ENTER("apply_event_and_update_pos");
...@@ -3080,7 +3082,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli, ...@@ -3080,7 +3082,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
} }
mysql_mutex_unlock(&rli->data_lock); mysql_mutex_unlock(&rli->data_lock);
if (reason == Log_event::EVENT_SKIP_NOT) if (reason == Log_event::EVENT_SKIP_NOT)
exec_res= ev->apply_event(rli); exec_res= ev->apply_event(rgi);
#ifndef DBUG_OFF #ifndef DBUG_OFF
/* /*
...@@ -3244,7 +3246,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) ...@@ -3244,7 +3246,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
if (opt_slave_parallel_threads > 0) if (opt_slave_parallel_threads > 0)
DBUG_RETURN(rli->parallel.do_event(rli, ev, thd)); DBUG_RETURN(rli->parallel.do_event(rli, ev, thd));
exec_res= apply_event_and_update_pos(ev, thd, rli, NULL); exec_res= apply_event_and_update_pos(ev, thd, rli->group_info, NULL);
switch (ev->get_type_code()) { switch (ev->get_type_code()) {
case FORMAT_DESCRIPTION_EVENT: case FORMAT_DESCRIPTION_EVENT:
...@@ -5734,6 +5736,7 @@ static Log_event* next_event(Relay_log_info* rli) ...@@ -5734,6 +5736,7 @@ static Log_event* next_event(Relay_log_info* rli)
mysql_mutex_t *log_lock = rli->relay_log.get_log_lock(); mysql_mutex_t *log_lock = rli->relay_log.get_log_lock();
const char* errmsg=0; const char* errmsg=0;
THD* thd = rli->sql_thd; THD* thd = rli->sql_thd;
struct rpl_group_info *rgi;
DBUG_ENTER("next_event"); DBUG_ENTER("next_event");
DBUG_ASSERT(thd != 0); DBUG_ASSERT(thd != 0);
...@@ -5821,6 +5824,19 @@ static Log_event* next_event(Relay_log_info* rli) ...@@ -5821,6 +5824,19 @@ static Log_event* next_event(Relay_log_info* rli)
opt_slave_sql_verify_checksum))) opt_slave_sql_verify_checksum)))
{ {
if (!(rgi= rli->group_info))
{
if (!(rgi= rli->group_info= (struct rpl_group_info *)
my_malloc(sizeof(*rgi), MYF(0))))
{
errmsg = "slave SQL thread aborted because of out-of-memory error";
if (hot_log)
mysql_mutex_unlock(log_lock);
goto err;
}
bzero(rgi, sizeof(*rgi));
}
rgi->rli= rli;
DBUG_ASSERT(thd==rli->sql_thd); DBUG_ASSERT(thd==rli->sql_thd);
/* /*
read it while we have a lock, to avoid a mutex lock in read it while we have a lock, to avoid a mutex lock in
...@@ -5842,10 +5858,10 @@ static Log_event* next_event(Relay_log_info* rli) ...@@ -5842,10 +5858,10 @@ static Log_event* next_event(Relay_log_info* rli)
mysql_mutex_unlock(log_lock); mysql_mutex_unlock(log_lock);
goto err; goto err;
} }
rli->gtid_sub_id= sub_id; rgi->gtid_sub_id= sub_id;
rli->current_gtid.server_id= gev->server_id; rgi->current_gtid.server_id= gev->server_id;
rli->current_gtid.domain_id= gev->domain_id; rgi->current_gtid.domain_id= gev->domain_id;
rli->current_gtid.seq_no= gev->seq_no; rgi->current_gtid.seq_no= gev->seq_no;
} }
if (hot_log) if (hot_log)
......
...@@ -228,7 +228,8 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset, ...@@ -228,7 +228,8 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
void set_slave_thread_options(THD* thd); void set_slave_thread_options(THD* thd);
void set_slave_thread_default_charset(THD *thd, Relay_log_info const *rli); void set_slave_thread_default_charset(THD *thd, Relay_log_info const *rli);
int rotate_relay_log(Master_info* mi); int rotate_relay_log(Master_info* mi);
int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli, int apply_event_and_update_pos(Log_event* ev, THD* thd,
struct rpl_group_info *rgi,
rpl_parallel_thread *rpt); rpl_parallel_thread *rpt);
pthread_handler_t handle_slave_io(void *arg); pthread_handler_t handle_slave_io(void *arg);
......
...@@ -44,6 +44,7 @@ ...@@ -44,6 +44,7 @@
void mysql_client_binlog_statement(THD* thd) void mysql_client_binlog_statement(THD* thd)
{ {
struct rpl_group_info *rgi;
DBUG_ENTER("mysql_client_binlog_statement"); DBUG_ENTER("mysql_client_binlog_statement");
DBUG_PRINT("info",("binlog base64: '%*s'", DBUG_PRINT("info",("binlog base64: '%*s'",
(int) (thd->lex->comment.length < 2048 ? (int) (thd->lex->comment.length < 2048 ?
...@@ -196,6 +197,17 @@ void mysql_client_binlog_statement(THD* thd) ...@@ -196,6 +197,17 @@ void mysql_client_binlog_statement(THD* thd)
} }
} }
if (!(rgi= rli->group_info))
{
if (!(rgi= rli->group_info= (struct rpl_group_info *)
my_malloc(sizeof(*rgi), MYF(0))))
{
my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*rgi));
goto end;
}
bzero(rgi, sizeof(*rgi));
}
rgi->rli= rli;
ev= Log_event::read_log_event(bufptr, event_len, &error, ev= Log_event::read_log_event(bufptr, event_len, &error,
rli->relay_log.description_event_for_exec, rli->relay_log.description_event_for_exec,
0); 0);
...@@ -232,7 +244,7 @@ void mysql_client_binlog_statement(THD* thd) ...@@ -232,7 +244,7 @@ void mysql_client_binlog_statement(THD* thd)
(ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ?
OPTION_SKIP_REPLICATION : 0); OPTION_SKIP_REPLICATION : 0);
err= ev->apply_event(rli); err= ev->apply_event(rgi);
thd->variables.option_bits= thd->variables.option_bits=
(thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) | (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment