Commit 5614b311 authored by Michael Widenius's avatar Michael Widenius

Moved the remaining variables, that depends on sql execution, from...

Moved the remaining variables, that depends on sql execution, from Relay_log_info to rpl_group_info:
-row_stmt_start_timestamp
-last_event_start_time
-long_find_row_note
-trans_retries

Added slave_executed_entries_lock to protect rli->executed_entries
Added primitives for thread safe 64 bit increment
Update rli->executed_entries when event has executed, not when event has been sent to sql execution thread


sql/log_event.cc:
  row_stmt_start and long_find_row_note is now in rpl_group_info
sql/mysqld.cc:
  Added slave_executed_entries_lock to protect rli->executed_entries
sql/mysqld.h:
  Added slave_executed_entries_lock to protect rli->executed_entries
  Added primitives for thread safe 64 bit increment
sql/rpl_parallel.cc:
  Update rli->executed_entries when event has executed, not when event has been sent to sql execution thread
sql/rpl_rli.cc:
  Moved row_stmt_start_timestamp, last_event_start_time and long_find_row_note from Relay_log_info to rpl_group_info
sql/rpl_rli.h:
  Moved trans_retries, row_stmt_start_timestamp, last_event_start_time and long_find_row_note from Relay_log_info to rpl_group_info
sql/slave.cc:
  Use rgi for trans_retries and last_event_start_time
  Update rli->executed_entries when event has executed, not when event has been sent to sql execution thread
  Reset trans_retries when object is created
parent 4bc9c093
......@@ -9293,7 +9293,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
set the initial time of this ROWS statement if it was not done
before in some other ROWS event.
*/
const_cast<Relay_log_info*>(rli)->set_row_stmt_start_timestamp();
rgi->set_row_stmt_start_timestamp();
while (error == 0 && m_curr_row < m_rows_end)
{
......@@ -11133,13 +11133,13 @@ static inline
void issue_long_find_row_warning(Log_event_type type,
const char *table_name,
bool is_index_scan,
const Relay_log_info *rli)
rpl_group_info *rgi)
{
if ((global_system_variables.log_warnings > 1 &&
!const_cast<Relay_log_info*>(rli)->is_long_find_row_note_printed()))
!rgi->is_long_find_row_note_printed()))
{
time_t now= my_time(0);
time_t stmt_ts= const_cast<Relay_log_info*>(rli)->get_row_stmt_start_timestamp();
time_t stmt_ts= rgi->get_row_stmt_start_timestamp();
DBUG_EXECUTE_IF("inject_long_find_row_note",
stmt_ts-=(LONG_FIND_ROW_THRESHOLD*2););
......@@ -11148,7 +11148,7 @@ void issue_long_find_row_warning(Log_event_type type,
if (delta > LONG_FIND_ROW_THRESHOLD)
{
const_cast<Relay_log_info*>(rli)->set_long_find_row_note_printed();
rgi->set_long_find_row_note_printed();
const char* evt_type= type == DELETE_ROWS_EVENT ? " DELETE" : "n UPDATE";
const char* scan_type= is_index_scan ? "scanning an index" : "scanning the table";
......@@ -11477,7 +11477,7 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
end:
if (is_table_scan || is_index_scan)
issue_long_find_row_warning(get_type_code(), m_table->alias.c_ptr(),
is_index_scan, rgi->rli);
is_index_scan, rgi);
table->default_column_bitmaps();
DBUG_RETURN(error);
}
......
......@@ -1740,7 +1740,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
problem. When WL#2975 is implemented, just remove the member
Relay_log_info::last_event_start_time and all its occurrences.
*/
const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
rgi->last_event_start_time= my_time(0);
}
if (get_flags(STMT_END_F))
......
......@@ -492,6 +492,7 @@ my_atomic_rwlock_t global_query_id_lock;
my_atomic_rwlock_t thread_running_lock;
my_atomic_rwlock_t thread_count_lock;
my_atomic_rwlock_t statistics_lock;
my_atomic_rwlock_t slave_executed_entries_lock;
ulong aborted_threads, aborted_connects;
ulong delayed_insert_timeout, delayed_insert_limit, delayed_queue_size;
ulong delayed_insert_threads, delayed_insert_writes, delayed_rows_in_use;
......@@ -1939,6 +1940,7 @@ void clean_up(bool print_message)
my_atomic_rwlock_destroy(&thread_running_lock);
my_atomic_rwlock_destroy(&thread_count_lock);
my_atomic_rwlock_destroy(&statistics_lock);
my_atomic_rwlock_destroy(&slave_executed_entries_lock);
free_charsets();
mysql_mutex_lock(&LOCK_thread_count);
DBUG_PRINT("quit", ("got thread count lock"));
......@@ -7550,6 +7552,7 @@ static int mysql_init_variables(void)
my_atomic_rwlock_init(&thread_running_lock);
my_atomic_rwlock_init(&thread_count_lock);
my_atomic_rwlock_init(&statistics_lock);
my_atomic_rwlock_init(slave_executed_entries_lock);
strmov(server_version, MYSQL_SERVER_VERSION);
threads.empty();
thread_cache.empty();
......
......@@ -367,6 +367,7 @@ extern mysql_cond_t COND_manager;
extern int32 thread_running;
extern int32 thread_count;
extern my_atomic_rwlock_t thread_running_lock, thread_count_lock;
extern my_atomic_rwlock_t slave_executed_entries_lock;
extern char *opt_ssl_ca, *opt_ssl_capath, *opt_ssl_cert, *opt_ssl_cipher,
*opt_ssl_key;
......@@ -507,6 +508,20 @@ inline void thread_safe_decrement32(int32 *value, my_atomic_rwlock_t *lock)
my_atomic_rwlock_wrunlock(lock);
}
inline void thread_safe_increment64(int64 *value, my_atomic_rwlock_t *lock)
{
my_atomic_rwlock_wrlock(lock);
(void) my_atomic_add64(value, 1);
my_atomic_rwlock_wrunlock(lock);
}
inline void thread_safe_decrement64(int64 *value, my_atomic_rwlock_t *lock)
{
my_atomic_rwlock_wrlock(lock);
(void) my_atomic_add64(value, -1);
my_atomic_rwlock_wrunlock(lock);
}
inline void
inc_thread_running()
{
......
......@@ -9,10 +9,6 @@
ToDo list:
- Review every field in Relay_log_info, and all code that accesses it.
Split out the necessary parts into rpl_group_info, to avoid conflicts
between parallel execution of events. (Such as deferred events ...)
- Error handling. If we fail in one of multiple parallel executions, we
need to make a best effort to complete prior transactions and roll back
following transactions, so slave binlog position will be correct.
......@@ -43,10 +39,11 @@
slave rolls back the transaction; parallel execution needs to be able
to deal with this wrt. commit_orderer and such.
- We should fail if we connect to the master with opt_slave_parallel_threads
greater than zero and master does not support GTID. Just to avoid a bunch
of potential problems, we won't be able to do any parallel replication
in this case anyway.
- We should notice if the master doesn't support GTID, and then run in
single threaded mode against that master. This is needed to be able to
support multi-master-replication with old and new masters.
- Retry of failed transactions is not yet implemented for the parallel case.
*/
struct rpl_parallel_thread_pool global_rpl_thread_pool;
......@@ -56,7 +53,7 @@ static int
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
struct rpl_parallel_thread *rpt)
{
int err;
int err __attribute__((unused));
rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
......@@ -69,6 +66,9 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
qev->ev->thd= thd;
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
thd->rgi_slave= NULL;
thread_safe_increment64(&rli->executed_entries,
&slave_executed_entries_lock);
/* ToDo: error handling. */
return err;
}
......@@ -617,7 +617,10 @@ rpl_parallel::wait_for_done()
/*
do_event() is executed by the sql_driver_thd thread.
It's main purpose is to find a thread that can exectue the query.
It's main purpose is to find a thread that can execute the query.
@retval false ok, event was accepted
@retval true error
*/
bool
......@@ -643,7 +646,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
rli->abort_slave)
sql_thread_stopping= true;
if (sql_thread_stopping)
{
/* QQ: Need a better comment why we return false here */
return false;
}
if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev),
MYF(0))))
......
......@@ -59,8 +59,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
last_event_start_time(0), m_flags(0),
row_stmt_start_timestamp(0), long_find_row_note_printed(false)
m_flags(0)
{
DBUG_ENTER("Relay_log_info::Relay_log_info");
......@@ -1420,7 +1419,8 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli_)
: rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0),
deferred_events(NULL), m_annotate_event(0), tables_to_lock(0),
tables_to_lock_count(0)
tables_to_lock_count(0), trans_retries(0), last_event_start_time(0),
row_stmt_start_timestamp(0), long_find_row_note_printed(false)
{
bzero(&current_gtid, sizeof(current_gtid));
mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
......@@ -1551,8 +1551,8 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
- timestamp
- flag that decides whether the slave prints or not
*/
rli->reset_row_stmt_start_timestamp();
rli->unset_long_find_row_note_printed();
reset_row_stmt_start_timestamp();
unset_long_find_row_note_printed();
DBUG_VOID_RETURN;
}
......
......@@ -298,14 +298,16 @@ class Relay_log_info : public Slave_reporting_capability
char cached_charset[6];
/*
trans_retries varies between 0 to slave_transaction_retries and counts how
many times the slave has retried the present transaction; gets reset to 0
when the transaction finally succeeds. retried_trans is a cumulative
counter: how many times the slave has retried a transaction (any) since
slave started.
retried_trans is a cumulative counter: how many times the slave
has retried a transaction (any) since slave started.
Protected by data_lock.
*/
ulong trans_retries, retried_trans;
ulong executed_entries; /* For SLAVE STATUS */
ulong retried_trans;
/*
Number of executed events for SLAVE STATUS.
Protected by slave_executed_entries_lock
*/
int64 executed_entries;
/*
If the end of the hot relay log is made of master's events ignored by the
......@@ -381,13 +383,6 @@ class Relay_log_info : public Slave_reporting_capability
void cached_charset_invalidate();
bool cached_charset_compare(char *charset) const;
/*
Used to defer stopping the SQL thread to give it a chance
to finish up the current group of events.
The timestamp is set and reset in @c sql_slave_killed().
*/
time_t last_event_start_time;
/**
Helper function to do after statement completion.
......@@ -462,39 +457,6 @@ class Relay_log_info : public Slave_reporting_capability
m_flags&= ~flag;
}
time_t get_row_stmt_start_timestamp()
{
return row_stmt_start_timestamp;
}
time_t set_row_stmt_start_timestamp()
{
if (row_stmt_start_timestamp == 0)
row_stmt_start_timestamp= my_time(0);
return row_stmt_start_timestamp;
}
void reset_row_stmt_start_timestamp()
{
row_stmt_start_timestamp= 0;
}
void set_long_find_row_note_printed()
{
long_find_row_note_printed= true;
}
void unset_long_find_row_note_printed()
{
long_find_row_note_printed= false;
}
bool is_long_find_row_note_printed()
{
return long_find_row_note_printed;
}
private:
/*
......@@ -504,13 +466,6 @@ class Relay_log_info : public Slave_reporting_capability
relay log.
*/
uint32 m_flags;
/*
Runtime state for printing a note when slave is taking
too long while processing a row event.
*/
time_t row_stmt_start_timestamp;
bool long_find_row_note_printed;
};
......@@ -592,6 +547,29 @@ struct rpl_group_info
mysql_mutex_t sleep_lock;
mysql_cond_t sleep_cond;
/*
trans_retries varies between 0 to slave_transaction_retries and counts how
many times the slave has retried the present transaction; gets reset to 0
when the transaction finally succeeds.
*/
ulong trans_retries;
/*
Used to defer stopping the SQL thread to give it a chance
to finish up the current group of events.
The timestamp is set and reset in @c sql_slave_killed().
*/
time_t last_event_start_time;
private:
/*
Runtime state for printing a note when slave is taking
too long while processing a row event.
*/
time_t row_stmt_start_timestamp;
bool long_find_row_note_printed;
public:
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
......@@ -673,6 +651,39 @@ struct rpl_group_info
void clear_tables_to_lock();
void cleanup_context(THD *, bool);
void slave_close_thread_tables(THD *);
time_t get_row_stmt_start_timestamp()
{
return row_stmt_start_timestamp;
}
time_t set_row_stmt_start_timestamp()
{
if (row_stmt_start_timestamp == 0)
row_stmt_start_timestamp= my_time(0);
return row_stmt_start_timestamp;
}
void reset_row_stmt_start_timestamp()
{
row_stmt_start_timestamp= 0;
}
void set_long_find_row_note_printed()
{
long_find_row_note_printed= true;
}
void unset_long_find_row_note_printed()
{
long_find_row_note_printed= false;
}
bool is_long_find_row_note_printed()
{
return long_find_row_note_printed;
}
};
......
......@@ -1034,9 +1034,9 @@ static bool sql_slave_killed(rpl_group_info *rgi)
@c last_event_start_time the timer.
*/
if (rli->last_event_start_time == 0)
rli->last_event_start_time= my_time(0);
ret= difftime(my_time(0), rli->last_event_start_time) <=
if (rgi->last_event_start_time == 0)
rgi->last_event_start_time= my_time(0);
ret= difftime(my_time(0), rgi->last_event_start_time) <=
SLAVE_WAIT_GROUP_DONE ? FALSE : TRUE;
DBUG_EXECUTE_IF("stop_slave_middle_group",
......@@ -1070,7 +1070,7 @@ static bool sql_slave_killed(rpl_group_info *rgi)
}
}
if (ret)
rli->last_event_start_time= 0;
rgi->last_event_start_time= 0;
DBUG_RETURN(ret);
}
......@@ -3047,10 +3047,10 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)",
ev->get_type_str(), ev->get_type_code(),
ev->server_id));
DBUG_PRINT("info", ("thd->options: %s%s; rli->last_event_start_time: %lu",
DBUG_PRINT("info", ("thd->options: %s%s; rgi->last_event_start_time: %lu",
FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT),
FLAGSTR(thd->variables.option_bits, OPTION_BEGIN),
(ulong) rli->last_event_start_time));
(ulong) rgi->last_event_start_time));
/*
Execute the event to change the database and update the binary
......@@ -3385,14 +3385,16 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
there is no rollback since 5.0.13 (ref: manual).
We have to not only seek but also
a) init_master_info(), to seek back to hot relay log's start for later
(for when we will come back to this hot log after re-processing the
possibly existing old logs where BEGIN is: check_binlog_magic() will
then need the cache to be at position 0 (see comments at beginning of
a) init_master_info(), to seek back to hot relay log's start
for later (for when we will come back to this hot log after
re-processing the possibly existing old logs where BEGIN is:
check_binlog_magic() will then need the cache to be at
position 0 (see comments at beginning of
init_master_info()).
b) init_relay_log_pos(), because the BEGIN may be an older relay log.
*/
if (rli->trans_retries < slave_trans_retries)
if (serial_rgi->trans_retries < slave_trans_retries)
{
if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
sql_print_error("Failed to initialize the master info structure");
......@@ -3407,15 +3409,17 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
exec_res= 0;
serial_rgi->cleanup_context(thd, 1);
/* chance for concurrent connection to get more locks */
slave_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
slave_sleep(thd, min(serial_rgi->trans_retries,
MAX_SLAVE_RETRY_PAUSE),
sql_slave_killed, serial_rgi);
serial_rgi->trans_retries++;
mysql_mutex_lock(&rli->data_lock); // because of SHOW STATUS
rli->trans_retries++;
rli->retried_trans++;
statistic_increment(slave_retried_transactions, LOCK_status);
mysql_mutex_unlock(&rli->data_lock);
DBUG_PRINT("info", ("Slave retries transaction "
"rli->trans_retries: %lu", rli->trans_retries));
"rgi->trans_retries: %lu",
serial_rgi->trans_retries));
}
}
else
......@@ -3434,11 +3438,13 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
event, the execution will proceed as usual; in the case of a
non-transient error, the slave will stop with an error.
*/
rli->trans_retries= 0; // restart from fresh
DBUG_PRINT("info", ("Resetting retry counter, rli->trans_retries: %lu",
rli->trans_retries));
serial_rgi->trans_retries= 0; // restart from fresh
DBUG_PRINT("info", ("Resetting retry counter, rgi->trans_retries: %lu",
serial_rgi->trans_retries));
}
}
thread_safe_increment64(&rli->executed_entries,
&slave_executed_entries_lock);
DBUG_RETURN(exec_res);
}
mysql_mutex_unlock(&rli->data_lock);
......@@ -4179,8 +4185,6 @@ pthread_handler_t handle_slave_sql(void *arg)
mysql_mutex_lock(&rli->log_space_lock);
rli->ignore_log_space_limit= 0;
mysql_mutex_unlock(&rli->log_space_lock);
rli->trans_retries= 0; // start from "no error"
DBUG_PRINT("info", ("rli->trans_retries: %lu", rli->trans_retries));
if (init_relay_log_pos(rli,
rli->group_relay_log_name,
......@@ -4406,7 +4410,6 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
}
goto err;
}
rli->executed_entries++;
}
if (opt_slave_parallel_threads > 0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment