Commit f9bd7f20 authored by Sujatha's avatar Sujatha

MDEV-20220: Merge 5.7 P_S replication table 'replication_applier_status_by_worker'

Step 3:
======

Preserve worker pool information on either STOP SLAVE or error. Once STOP
SLAVE is executed the worker threads are gone, so their state is no longer
available and querying the table at this stage returns no rows. To address
this, when the worker threads are about to stop, either due to an error or a
forced stop, create a backup pool and preserve the data needed to populate
the performance schema table. Clear the backup pool upon slave start.
parent 036ee612
...@@ -127,6 +127,7 @@ include/wait_for_slave_sql_error.inc [errno=1213] ...@@ -127,6 +127,7 @@ include/wait_for_slave_sql_error.inc [errno=1213]
SET GLOBAL debug_dbug=@old_dbug; SET GLOBAL debug_dbug=@old_dbug;
retries retries
10 10
include/assert.inc [Performance Schema retries should match with actual retries]
SELECT * FROM t1 ORDER BY a; SELECT * FROM t1 ORDER BY a;
a b a b
1 3 1 3
......
...@@ -38,6 +38,48 @@ include/wait_for_slave_to_stop.inc ...@@ -38,6 +38,48 @@ include/wait_for_slave_to_stop.inc
RESET SLAVE ALL; RESET SLAVE ALL;
SET default_master_connection=''; SET default_master_connection='';
CHANGE MASTER TO MASTER_USER='root', MASTER_HOST='127.0.0.1',MASTER_PORT=$MASTER_MYPORT; CHANGE MASTER TO MASTER_USER='root', MASTER_HOST='127.0.0.1',MASTER_PORT=$MASTER_MYPORT;
include/start_slave.inc
# Introduce an error in the worker thread and check for the correctness
# of error number, message and timestamp fields.
connection master;
use test;
create table t(a int primary key);
connection slave;
drop table t;
connection master;
insert into t values(1);
connection slave;
include/wait_for_slave_sql_error.inc [errno=1146]
# Extract the error related fields from SSS and PS table and compare
# them for correctness.
include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.]
Last_Error_Message
Error 'Table 'test.t' doesn't exist' on query. Default database: 'test'. Query: 'insert into t values(1)'
# Verify that the error fields are preserved after STOP SLAVE.
# 1. Verify that thread_id changes to NULL and service_state to "off" on
# STOP SLAVE.
include/assert.inc [After STOP SLAVE, thread_id should be NULL]
include/assert.inc [So, Service_State after STOP SLAVE should be "OFF".]
# 2. Extract the worker_id and the error related fields from SSS and PS
# table and compare them. These fields should preserve their values.
include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.]
Last_Error_Message
Error 'Table 'test.t' doesn't exist' on query. Default database: 'test'. Query: 'insert into t values(1)'
include/stop_slave.inc
RESET SLAVE;
connection master;
DROP TABLE t;
RESET MASTER;
# Verify that number of rows in 'replication_applier_status_by_worker' table match with # Verify that number of rows in 'replication_applier_status_by_worker' table match with
# number of slave_parallel_workers. # number of slave_parallel_workers.
......
...@@ -149,6 +149,12 @@ let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', V ...@@ -149,6 +149,12 @@ let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', V
--disable_query_log --disable_query_log
eval SELECT $new_retry - $old_retry AS retries; eval SELECT $new_retry - $old_retry AS retries;
--enable_query_log --enable_query_log
let $ps_value= query_get_value(select last_trans_retry_count from
performance_schema.replication_applier_status_by_worker where
last_trans_retry_count > 0, last_trans_retry_count, 1);
let $assert_text= Performance Schema retries should match with actual retries;
let $assert_cond= "$ps_value" = $new_retry - $old_retry;
source include/assert.inc;
SELECT * FROM t1 ORDER BY a; SELECT * FROM t1 ORDER BY a;
STOP SLAVE IO_THREAD; STOP SLAVE IO_THREAD;
......
...@@ -133,6 +133,102 @@ STOP SLAVE 'slave1'; ...@@ -133,6 +133,102 @@ STOP SLAVE 'slave1';
RESET SLAVE ALL; RESET SLAVE ALL;
SET default_master_connection=''; SET default_master_connection='';
evalp CHANGE MASTER TO MASTER_USER='root', MASTER_HOST='127.0.0.1',MASTER_PORT=$MASTER_MYPORT; evalp CHANGE MASTER TO MASTER_USER='root', MASTER_HOST='127.0.0.1',MASTER_PORT=$MASTER_MYPORT;
--source include/start_slave.inc
--echo
--echo # Introduce an error in the worker thread and check for the correctness
--echo # of error number, message and timestamp fields.
--echo
# Cause an error in Worker thread.
# 1) Create a table 't' at master, replicate at slave.
# 2) Drop table 't' at slave only.
# 3) Insert a value in table 't' on master and replicate on slave.
# Since slave doesn't have table 't' anymore, worker thread will report an error.
--connection master
use test;
create table t(a int primary key);
sync_slave_with_master;
drop table t;
--connection master
insert into t values(1);
--connection slave
let $slave_sql_errno=1146;
source include/wait_for_slave_sql_error.inc;
--echo
--echo # Extract the error related fields from SSS and PS table and compare
--echo # them for correctness.
--echo
let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
let $ps_value= query_get_value(select Last_Error_Number from performance_schema.replication_applier_status_by_worker, Last_Error_Number, 1);
let $assert_text= Value returned by SSS and PS table for Last_Error_Number should be same.;
let $assert_cond= "$sss_value" = "$ps_value";
source include/assert.inc;
--disable_query_log
--replace_regex /master-bin.[0-9]+/FILENAME/ /end_log_pos [0-9]+/end_log_pos POSITION/
select Last_Error_Message from performance_schema.replication_applier_status_by_worker;
--enable_query_log
--echo
--echo # Verify that the error fields are preserved after STOP SLAVE.
--echo
--echo
--echo # 1. Verify that thread_id changes to NULL and service_state to "off" on
--echo # STOP SLAVE.
--echo
let $ps_value= query_get_value(select thread_id from performance_schema.replication_applier_status_by_worker, thread_id, 1);
let $assert_text= After STOP SLAVE, thread_id should be NULL;
let $assert_cond= "$ps_value" = "NULL";
source include/assert.inc;
let $ps_value= query_get_value(select service_state from performance_schema.replication_applier_status_by_worker, service_state, 1);
let $assert_text= So, Service_State after STOP SLAVE should be "OFF".;
let $assert_cond= "$ps_value"= "OFF";
source include/assert.inc;
--echo
--echo # 2. Extract the worker_id and the error related fields from SSS and PS
--echo # table and compare them. These fields should preserve their values.
--echo
let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
let $ps_value= query_get_value(select Last_Error_Number from performance_schema.replication_applier_status_by_worker, Last_Error_Number, 1);
let $assert_text= Value returned by SSS and PS table for Last_Error_Number should be same.;
let $assert_cond= "$sss_value" = "$ps_value";
source include/assert.inc;
--disable_query_log
--replace_regex /master-bin.[0-9]+/FILENAME/ /end_log_pos [0-9]+/end_log_pos POSITION/
select Last_Error_Message from performance_schema.replication_applier_status_by_worker;
--enable_query_log
# The timestamp format is slightly different in SSS and PS.
# SSS => YYMMDD HH:MM:SS
# PS => YYYY-MM-DD HH:MM:SS
# To match the two, we get rid of hyphens from PS output and the first two digits
# of the year field so that it can be matched directly.
#--- TODO: Can we include Last_SQL_Error_Timestamp as part of SSS
#let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Error_Timestamp, 1);
#let $ps_value= query_get_value(select Last_Error_Timestamp from performance_schema.replication_applier_status_by_worker, Last_Error_Timestamp, 1);
#let $ps_value_without_hyphons= `SELECT REPLACE("$ps_value", '-', '')`;
#let $ps_value_in_sss_format= `select substring("$ps_value_without_hyphons", 3)`;
#let $assert_text= Value returned by SSS and PS table for Last_Error_Timestamp should be same.;
#let $assert_cond= "$sss_value" = "$ps_value_in_sss_format";
#source include/assert.inc;
--source include/stop_slave.inc
RESET SLAVE;
--connection master
DROP TABLE t;
RESET MASTER;
--echo --echo
--echo # Verify that number of rows in 'replication_applier_status_by_worker' table match with --echo # Verify that number of rows in 'replication_applier_status_by_worker' table match with
......
...@@ -1752,6 +1752,7 @@ int ...@@ -1752,6 +1752,7 @@ int
rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool) rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
{ {
int rc= 0; int rc= 0;
struct pool_bkp_for_pfs* bkp= &pool->pfs_bkp;
if ((rc= pool_mark_busy(pool, current_thd))) if ((rc= pool_mark_busy(pool, current_thd)))
return rc; // killed return rc; // killed
...@@ -1761,6 +1762,23 @@ rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool) ...@@ -1761,6 +1762,23 @@ rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
pool_mark_not_busy(pool); pool_mark_not_busy(pool);
rc= rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads, rc= rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads,
0); 0);
if (!rc)
{
if (pool->count)
{
if (bkp->inited)
{
if (bkp->count != pool->count)
{
bkp->destroy();
bkp->init(pool->count);
}
}
else
bkp->init(pool->count);
}
}
} }
else else
{ {
...@@ -2026,7 +2044,8 @@ rpl_parallel_thread::rpl_parallel_thread() ...@@ -2026,7 +2044,8 @@ rpl_parallel_thread::rpl_parallel_thread()
rpl_parallel_thread_pool::rpl_parallel_thread_pool() rpl_parallel_thread_pool::rpl_parallel_thread_pool()
: threads(0), free_list(0), count(0), inited(false), busy(false) : threads(0), free_list(0), count(0), inited(false), busy(false),
pfs_bkp{0, false, NULL}
{ {
} }
...@@ -2057,6 +2076,7 @@ void ...@@ -2057,6 +2076,7 @@ void
rpl_parallel_thread_pool::destroy() rpl_parallel_thread_pool::destroy()
{ {
deactivate(); deactivate();
pfs_bkp.destroy();
destroy_cond_mutex(); destroy_cond_mutex();
} }
...@@ -2125,6 +2145,37 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt) ...@@ -2125,6 +2145,37 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
mysql_mutex_unlock(&LOCK_rpl_thread_pool); mysql_mutex_unlock(&LOCK_rpl_thread_pool);
} }
/*
  Snapshot the per-worker state of the live pool into pfs_bkp.

  Invoked from rpl_parallel::wait_for_done() before the workers are waited
  for, so that the performance schema table
  replication_applier_status_by_worker can still be filled from the backup
  once the live pool is no longer active.

  @param rli  Relay log info of the stopping applier. Its err_thread_id and
              last_error() identify the worker that failed (if any) and
              supply the error number, message and timestamp to preserve.

  Does nothing when the backup pool has not been initialised.
*/
void
rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli)
{
  if (!pfs_bkp.inited)
    return;

  /*
    The backup array is allocated with pfs_bkp.count entries while the live
    array has 'count' entries. Only walk the part that exists in both, so a
    size mismatch can never index past the end of the backup array.
  */
  const uint32 n_workers= count < pfs_bkp.count ? count : pfs_bkp.count;

  for (uint32 i= 0; i < n_workers; i++)
  {
    rpl_parallel_thread *rpt= threads[i];
    rpl_parallel_thread *pfs_rpt= pfs_bkp.rpl_thread_arr[i];

    /* Keep the previously saved channel name if the worker has none. */
    if (rpt->channel_name_length)
    {
      pfs_rpt->channel_name_length= rpt->channel_name_length;
      strmake(pfs_rpt->channel_name, rpt->channel_name,
              rpt->channel_name_length);
    }

    /*
      NOTE(review): this stores the raw THD pointer of the live worker;
      presumably readers must not dereference it once 'running' is false --
      confirm against make_row().
    */
    pfs_rpt->thd= rpt->thd;
    pfs_rpt->last_seen_gtid= rpt->last_seen_gtid;

    /*
      Only the worker that actually reported the error gets the error
      fields. Guard against a worker without a THD before comparing ids.
    */
    if (rli->err_thread_id && rpt->thd &&
        rpt->thd->thread_id == rli->err_thread_id)
    {
      pfs_rpt->last_error_number= rli->last_error().number;
      /*
        strmake() appends a terminating NUL after the copied bytes, so the
        length passed must leave room for it: use buffer size minus one.
      */
      strmake(pfs_rpt->last_error_message, rli->last_error().message,
              sizeof(rli->last_error().message) - 1);
      /* Presumably seconds scaled to microseconds -- TODO confirm units. */
      pfs_rpt->last_error_timestamp= rli->last_error().skr*1000000;
    }

    /* The backup always describes a stopped worker. */
    pfs_rpt->running= false;
    pfs_rpt->worker_idle_time= rpt->get_worker_idle_time();
    pfs_rpt->last_trans_retry_count= rpt->last_trans_retry_count;
  }
}
/* /*
Obtain a worker thread that we can queue an event to. Obtain a worker thread that we can queue an event to.
...@@ -2393,6 +2444,7 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) ...@@ -2393,6 +2444,7 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
STRING_WITH_LEN("now SIGNAL wait_for_done_waiting")); STRING_WITH_LEN("now SIGNAL wait_for_done_waiting"));
};); };);
global_rpl_thread_pool.copy_pool_for_pfs(rli);
for (i= 0; i < domain_hash.records; ++i) for (i= 0; i < domain_hash.records; ++i)
{ {
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
struct rpl_parallel; struct rpl_parallel;
struct rpl_parallel_entry; struct rpl_parallel_entry;
struct rpl_parallel_thread_pool; struct rpl_parallel_thread_pool;
extern struct rpl_parallel_thread_pool pool_bkp_for_pfs;
class Relay_log_info; class Relay_log_info;
struct inuse_relaylog; struct inuse_relaylog;
...@@ -257,6 +258,38 @@ struct rpl_parallel_thread { ...@@ -257,6 +258,38 @@ struct rpl_parallel_thread {
}; };
struct pool_bkp_for_pfs{
uint32 count;
bool inited;
struct rpl_parallel_thread **rpl_thread_arr;
void init(uint32 thd_count)
{
DBUG_ASSERT(thd_count);
rpl_thread_arr= (rpl_parallel_thread **)
my_malloc(PSI_INSTRUMENT_ME,
thd_count * sizeof(rpl_parallel_thread*),
MYF(MY_WME | MY_ZEROFILL));
for (uint i=0; i<thd_count; i++)
rpl_thread_arr[i]= (rpl_parallel_thread *)
my_malloc(PSI_INSTRUMENT_ME, sizeof(rpl_parallel_thread),
MYF(MY_WME | MY_ZEROFILL));
count= thd_count;
inited= true;
}
void destroy()
{
if (inited)
{
for (uint i=0; i<count; i++)
my_free(rpl_thread_arr[i]);
my_free(rpl_thread_arr);
rpl_thread_arr= NULL;
}
}
};
struct rpl_parallel_thread_pool { struct rpl_parallel_thread_pool {
struct rpl_parallel_thread **threads; struct rpl_parallel_thread **threads;
struct rpl_parallel_thread *free_list; struct rpl_parallel_thread *free_list;
...@@ -270,8 +303,10 @@ struct rpl_parallel_thread_pool { ...@@ -270,8 +303,10 @@ struct rpl_parallel_thread_pool {
is in progress. is in progress.
*/ */
bool busy; bool busy;
struct pool_bkp_for_pfs pfs_bkp;
rpl_parallel_thread_pool(); rpl_parallel_thread_pool();
void copy_pool_for_pfs(Relay_log_info *rli);
int init(uint32 size); int init(uint32 size);
void destroy(); void destroy();
void deactivate(); void deactivate();
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include "sql_class.h" #include "sql_class.h"
Slave_reporting_capability::Slave_reporting_capability(char const *thread_name) Slave_reporting_capability::Slave_reporting_capability(char const *thread_name)
: m_thread_name(thread_name) : err_thread_id(0), m_thread_name(thread_name)
{ {
mysql_mutex_init(key_mutex_slave_reporting_capability_err_lock, mysql_mutex_init(key_mutex_slave_reporting_capability_err_lock,
&err_lock, MY_MUTEX_INIT_FAST); &err_lock, MY_MUTEX_INIT_FAST);
......
...@@ -109,6 +109,22 @@ int table_replication_applier_status_by_worker::rnd_next(void) ...@@ -109,6 +109,22 @@ int table_replication_applier_status_by_worker::rnd_next(void)
} }
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
} }
else
{
struct pool_bkp_for_pfs *bkp_pool= &global_rpl_thread_pool.pfs_bkp;
if (bkp_pool->inited && bkp_pool->count)
{
for (m_pos.set_at(&m_next_pos);
m_pos.has_more_workers(bkp_pool->count);
m_pos.next_worker())
{
rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index];
make_row(rpt);
m_next_pos.set_after(&m_pos);
return 0;
}
}
}
return HA_ERR_END_OF_FILE; return HA_ERR_END_OF_FILE;
} }
...@@ -130,6 +146,16 @@ int table_replication_applier_status_by_worker::rnd_pos(const void *pos) ...@@ -130,6 +146,16 @@ int table_replication_applier_status_by_worker::rnd_pos(const void *pos)
res= 0; res= 0;
} }
} }
else
{
struct pool_bkp_for_pfs *bkp_pool= &global_rpl_thread_pool.pfs_bkp;
if (bkp_pool->inited && bkp_pool->count && m_pos.m_index < bkp_pool->count)
{
rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index];
make_row(rpt);
res= 0;
}
}
return res; return res;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment