Commit 5633dd82 authored by unknown's avatar unknown

MDEV-4506: parallel replication.

Add a simple test case.
Fix bugs found.
parent d107bdaa
include/rpl_init.inc [topology=1->2]
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=10;
ERROR HY000: This operation cannot be performed as you have a running slave ''; run STOP SLAVE '' first
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=slave_pos;
include/start_slave.inc
*** Test long-running query in domain 1 can run in parallel with short queries in domain 0 ***
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
INSERT INTO t2 VALUES (1);
LOCK TABLE t1 WRITE;
SET gtid_domain_id=1;
INSERT INTO t1 VALUES (2);
SET gtid_domain_id=0;
INSERT INTO t2 VALUES (2);
INSERT INTO t2 VALUES (3);
BEGIN;
INSERT INTO t2 VALUES (4);
INSERT INTO t2 VALUES (5);
COMMIT;
INSERT INTO t2 VALUES (6);
SELECT * FROM t2 ORDER by a;
a
1
2
3
4
5
6
SELECT * FROM t1;
a
1
UNLOCK TABLES;
SELECT * FROM t1 ORDER BY a;
a
1
2
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
DROP TABLE t1,t2;
include/rpl_end.inc
--source include/have_binlog_format_statement.inc
--source include/have_innodb.inc
--source include/have_debug.inc
--source include/have_debug_sync.inc
--let $rpl_topology=1->2
--source include/rpl_init.inc
connect (s1,127.0.0.1,root,,test,$MASTER_MYPORT,);
connect (s2,127.0.0.1,root,,test,$SLAVE_MYPORT,);
# Test various aspects of parallel replication.
--connection s1
SELECT @@server_id;
SET sql_log_bin=0;
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=MyISAM;
SET sql_log_bin=1;
--connection server_2
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
--error ER_SLAVE_MUST_STOP
SET GLOBAL slave_parallel_threads=10;
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=slave_pos;
--source include/start_slave.inc
--connection s2
SELECT @@server_id;
SET sql_log_bin=0;
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=MyISAM;
SET sql_log_bin=1;
--replace_result $MASTER_MYPORT MASTER_PORT
eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT,
master_user='root', master_use_gtid=current_pos;
--echo *** Test long-running query in domain 1 can run in parallel with short queries in domain 0 ***
--connection s1
SET gtid_domain_id=0;
--connection server_1
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
INSERT INTO t2 VALUES (1);
--save_master_pos
--connection server_2
--sync_with_master
# Block the table t1 to simulate a replicated query taking a long time.
--connect (con_temp,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
LOCK TABLE t1 WRITE;
--connection server_1
SET gtid_domain_id=1;
# This query will be blocked on the slave until UNLOCK TABLES.
INSERT INTO t1 VALUES (2);
SET gtid_domain_id=2;
INSERT INTO t1 VALUES (3);
SET gtid_domain_id=0;
INSERT INTO t1 VALUES (4);
SET gtid_domain_id=1;
INSERT INTO t1 VALUES (5);
SET gtid_domain_id=2;
INSERT INTO t1 VALUES (6);
SET gtid_domain_id=0;
INSERT INTO t1 VALUES (7);
SET gtid_domain_id=1;
INSERT INTO t1 VALUES (8);
SET gtid_domain_id=2;
INSERT INTO t1 VALUES (9);
# These t2 queries can be replicated in parallel with the prior t1 query, as
# they are in a separate replication domain.
INSERT INTO t2 VALUES (2);
INSERT INTO t2 VALUES (3);
BEGIN;
INSERT INTO t2 VALUES (4);
INSERT INTO t2 VALUES (5);
COMMIT;
INSERT INTO t2 VALUES (6);
--connection s2
query_vertical SHOW SLAVE STATUS;
--connection server_2
--let $wait_condition= SELECT COUNT(*) = 6 FROM t2
--source include/wait_condition.inc
--source include/start_slave.inc
SELECT * FROM t2 ORDER by a;
--connection con_temp
SELECT * FROM t1;
UNLOCK TABLES;
--connection server_2
--let $wait_condition= SELECT COUNT(*) = 2 FROM t1
--source include/wait_condition.inc
SELECT * FROM t1 ORDER BY a;
--connection server_2
--source include/stop_slave.inc
SELECT * FROM t1;
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
--source include/start_slave.inc
--connection s1
SET sql_log_bin=0;
DROP TABLE t1;
SET sql_log_bin=1;
--connection server_1
DROP TABLE t1,t2;
--connection s2
RESET SLAVE ALL;
SET sql_log_bin=0;
DROP TABLE t1;
SET sql_log_bin=1;
--source include/rpl_end.inc
--source include/have_binlog_format_statement.inc
--source include/have_xtradb.inc
connect (m1,127.0.0.1,root,,test,$MASTER_MYPORT,);
connect (m2,127.0.0.1,root,,test,$MASTER_MYPORT,);
connect (m3,127.0.0.1,root,,test,$MASTER_MYPORT,);
connect (m4,127.0.0.1,root,,test,$MASTER_MYPORT,);
connect (s1,127.0.0.1,root,,test,$SLAVE_MYPORT,);
connect (s2,127.0.0.1,root,,test,$SLAVE_MYPORT,);
connect (s3,127.0.0.1,root,,test,$SLAVE_MYPORT,);
connect (s4,127.0.0.1,root,,test,$SLAVE_MYPORT,);
--connection m1
SELECT @@server_id;
SET sql_log_bin=0;
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
SET sql_log_bin=1;
SET @old_count= @@GLOBAL.binlog_commit_wait_count;
SET @old_usec= @@GLOBAL.binlog_commit_wait_usec;
SET GLOBAL binlog_commit_wait_usec = 30*1000000;
--connection s1
SELECT @@server_id;
SET sql_log_bin=0;
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
SET sql_log_bin=1;
--replace_result $MASTER_MYPORT MASTER_PORT
eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT,
master_user='root', master_use_gtid=current_pos;
--connection m1
SET GLOBAL binlog_commit_wait_count = 4;
send INSERT INTO t1 VALUES (1);
--connection m2
send INSERT INTO t1 VALUES (2);
--connection m3
send INSERT INTO t1 VALUES (3);
--connection m4
INSERT INTO t1 VALUES (4);
--connection m1
reap;
--connection m2
reap;
--connection m3
reap;
--connection m1
SHOW BINLOG EVENTS;
--connection s1
--source include/start_slave.inc
SELECT * FROM t1;
--source include/stop_slave.inc
SELECT * FROM t1;
--connection m1
SET sql_log_bin=0;
DROP TABLE t1;
SET sql_log_bin=1;
SET GLOBAL binlog_commit_wait_count= @old_count;
SET GLOBAL binlog_commit_wait_usec= @old_usec;
--connection s1
RESET SLAVE ALL;
SET sql_log_bin=0;
DROP TABLE t1;
SET sql_log_bin=1;
......@@ -130,7 +130,7 @@ const ulong checksum_version_product_mariadb=
checksum_version_split_mariadb[2];
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD* thd);
static int rows_event_stmt_cleanup(rpl_group_info *rgi, THD* thd);
static const char *HA_ERR(int i)
{
......@@ -3854,7 +3854,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
if (strcmp("COMMIT", query) == 0 && rli->tables_to_lock)
if (strcmp("COMMIT", query) == 0 && rgi->tables_to_lock)
{
/*
Cleaning-up the last statement context:
......@@ -3863,7 +3863,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
*/
int error;
char llbuff[22];
if ((error= rows_event_stmt_cleanup(const_cast<Relay_log_info*>(rli), thd)))
if ((error= rows_event_stmt_cleanup(rgi, thd)))
{
const_cast<Relay_log_info*>(rli)->report(ERROR_LEVEL, error,
"Error in cleaning up after an event preceeding the commit; "
......@@ -3883,7 +3883,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
}
else
{
const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
rgi->slave_close_thread_tables(thd);
}
/*
......@@ -4835,7 +4835,7 @@ int Format_description_log_event::do_apply_event(rpl_group_info *rgi)
"or ROLLBACK in relay log). A probable cause is that "
"the master died while writing the transaction to "
"its binary log, thus rolled back too.");
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 1);
rgi->cleanup_context(thd, 1);
}
/*
......@@ -5533,7 +5533,7 @@ int Load_log_event::do_apply_event(NET* net, rpl_group_info *rgi,
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
/* see Query_log_event::do_apply_event() and BUG#13360 */
DBUG_ASSERT(!rli->m_table_map.count());
DBUG_ASSERT(!rgi->m_table_map.count());
/*
Usually lex_start() is called by mysql_parse(), but we need it here
as the present method does not call mysql_parse().
......@@ -9089,7 +9089,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
*/
DBUG_ASSERT(get_flags(STMT_END_F));
const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
rgi->slave_close_thread_tables(thd);
thd->clear_error();
DBUG_RETURN(0);
}
......@@ -9151,7 +9151,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
/* A small test to verify that objects have consistent types */
DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
if (open_and_lock_tables(thd, rgi->tables_to_lock, FALSE, 0))
{
uint actual_error= thd->stmt_da->sql_errno();
if (thd->is_slave_error || thd->is_fatal_error)
......@@ -9168,7 +9168,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
"unexpected success or fatal error"));
thd->is_slave_error= 1;
}
const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
rgi->slave_close_thread_tables(thd);
DBUG_RETURN(actual_error);
}
......@@ -9182,7 +9182,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
{
DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p",
rli->tables_to_lock));
rgi->tables_to_lock));
/**
When using RBR and MyISAM MERGE tables the base tables that make
......@@ -9196,8 +9196,8 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
NOTE: The base tables are added here are removed when
close_thread_tables is called.
*/
RPL_TABLE_LIST *ptr= rli->tables_to_lock;
for (uint i= 0 ; ptr && (i < rli->tables_to_lock_count);
RPL_TABLE_LIST *ptr= rgi->tables_to_lock;
for (uint i= 0 ; ptr && (i < rgi->tables_to_lock_count);
ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global), i++)
{
DBUG_ASSERT(ptr->m_tabledef_valid);
......@@ -9213,7 +9213,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
having severe errors which should not be skiped.
*/
thd->is_slave_error= 1;
const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
rgi->slave_close_thread_tables(thd);
DBUG_RETURN(ERR_BAD_TABLE_DEF);
}
DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
......@@ -9238,18 +9238,18 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
Rows_log_event, we can invalidate the query cache for the
associated table.
*/
TABLE_LIST *ptr= rli->tables_to_lock;
for (uint i=0 ; ptr && (i < rli->tables_to_lock_count); ptr= ptr->next_global, i++)
const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
TABLE_LIST *ptr= rgi->tables_to_lock;
for (uint i=0 ; ptr && (i < rgi->tables_to_lock_count); ptr= ptr->next_global, i++)
rgi->m_table_map.set_table(ptr->table_id, ptr->table);
#ifdef HAVE_QUERY_CACHE
query_cache.invalidate_locked_for_write(thd, rli->tables_to_lock);
query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock);
#endif
}
TABLE*
table=
m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
m_table= rgi->m_table_map.get_table(m_table_id);
DBUG_PRINT("debug", ("m_table: 0x%lx, m_table_id: %lu", (ulong) m_table, m_table_id));
......@@ -9331,7 +9331,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
if (!table->in_use)
table->in_use= thd;
error= do_exec_row(rli);
error= do_exec_row(rgi);
if (error)
DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
......@@ -9371,7 +9371,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
(ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
if (!m_curr_row_end && !error)
error= unpack_current_row(rli);
error= unpack_current_row(rgi);
// at this moment m_curr_row_end should be set
DBUG_ASSERT(error || m_curr_row_end != NULL);
......@@ -9432,7 +9432,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
DBUG_RETURN(error);
}
if (get_flags(STMT_END_F) && (error= rows_event_stmt_cleanup(rli, thd)))
if (get_flags(STMT_END_F) && (error= rows_event_stmt_cleanup(rgi, thd)))
slave_rows_error_report(ERROR_LEVEL,
thd->is_error() ? 0 : error,
rli, thd, table,
......@@ -9466,7 +9466,7 @@ Rows_log_event::do_shall_skip(Relay_log_info *rli)
@retval non-zero Error at the commit.
*/
static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
static int rows_event_stmt_cleanup(rpl_group_info *rgi, THD * thd)
{
int error;
{
......@@ -9520,7 +9520,7 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
*/
thd->reset_current_stmt_binlog_format_row();
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
rgi->cleanup_context(thd, 0);
}
return error;
}
......@@ -10259,10 +10259,11 @@ enum enum_tbl_map_status
rli->tables_to_lock.
*/
static enum_tbl_map_status
check_table_map(Relay_log_info const *rli, RPL_TABLE_LIST *table_list)
check_table_map(rpl_group_info *rgi, RPL_TABLE_LIST *table_list)
{
DBUG_ENTER("check_table_map");
enum_tbl_map_status res= OK_TO_PROCESS;
Relay_log_info *rli= rgi->rli;
if (rli->sql_thd->slave_thread /* filtering is for slave only */ &&
(!rli->mi->rpl_filter->db_ok(table_list->db) ||
......@@ -10270,8 +10271,8 @@ check_table_map(Relay_log_info const *rli, RPL_TABLE_LIST *table_list)
res= FILTERED_OUT;
else
{
RPL_TABLE_LIST *ptr= static_cast<RPL_TABLE_LIST*>(rli->tables_to_lock);
for(uint i=0 ; ptr && (i< rli->tables_to_lock_count);
RPL_TABLE_LIST *ptr= static_cast<RPL_TABLE_LIST*>(rgi->tables_to_lock);
for(uint i=0 ; ptr && (i< rgi->tables_to_lock_count);
ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_local), i++)
{
if (ptr->table_id == table_list->table_id)
......@@ -10303,7 +10304,6 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
Rpl_filter *filter;
Relay_log_info const *rli= rgi->rli;
DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)");
DBUG_ASSERT(rli->sql_thd == thd);
/* Step the query id to mark what columns that are actually used. */
thd->set_query_id(next_query_id());
......@@ -10328,7 +10328,7 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
table_list->updating= 1;
table_list->required_type= FRMTYPE_TABLE;
DBUG_PRINT("debug", ("table: %s is mapped to %u", table_list->table_name, table_list->table_id));
enum_tbl_map_status tblmap_status= check_table_map(rli, table_list);
enum_tbl_map_status tblmap_status= check_table_map(rgi, table_list);
if (tblmap_status == OK_TO_PROCESS)
{
DBUG_ASSERT(thd->lex->query_tables != table_list);
......@@ -10354,9 +10354,9 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
We record in the slave's information that the table should be
locked by linking the table into the list of tables to lock.
*/
table_list->next_global= table_list->next_local= rli->tables_to_lock;
const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
table_list->next_global= table_list->next_local= rgi->tables_to_lock;
rgi->tables_to_lock= table_list;
rgi->tables_to_lock_count++;
/* 'memory' is freed in clear_tables_to_lock */
}
else // FILTERED_OUT, SAME_ID_MAPPING_*
......@@ -10709,7 +10709,7 @@ is_duplicate_key_error(int errcode)
*/
int
Rows_log_event::write_row(const Relay_log_info *const rli,
Rows_log_event::write_row(rpl_group_info *rgi,
const bool overwrite)
{
DBUG_ENTER("write_row");
......@@ -10724,7 +10724,7 @@ Rows_log_event::write_row(const Relay_log_info *const rli,
table->file->ht->db_type != DB_TYPE_NDBCLUSTER);
/* unpack row into table->record[0] */
if ((error= unpack_current_row(rli)))
if ((error= unpack_current_row(rgi)))
DBUG_RETURN(error);
if (m_curr_row == m_rows_buf)
......@@ -10841,7 +10841,7 @@ Rows_log_event::write_row(const Relay_log_info *const rli,
if (!get_flags(COMPLETE_ROWS_F))
{
restore_record(table,record[1]);
error= unpack_current_row(rli);
error= unpack_current_row(rgi);
}
#ifndef DBUG_OFF
......@@ -10907,10 +10907,10 @@ Rows_log_event::write_row(const Relay_log_info *const rli,
#endif
int
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
Write_rows_log_event::do_exec_row(rpl_group_info *rgi)
{
DBUG_ASSERT(m_table != NULL);
int error= write_row(rli, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
int error= write_row(rgi, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
if (error && !thd->is_error())
{
......@@ -11214,7 +11214,7 @@ void issue_long_find_row_warning(Log_event_type type,
for any following update/delete command.
*/
int Rows_log_event::find_row(const Relay_log_info *rli)
int Rows_log_event::find_row(rpl_group_info *rgi)
{
DBUG_ENTER("Rows_log_event::find_row");
......@@ -11232,7 +11232,7 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
*/
prepare_record(table, m_width, FALSE);
error= unpack_current_row(rli);
error= unpack_current_row(rgi);
#ifndef DBUG_OFF
DBUG_PRINT("info",("looking for the following record"));
......@@ -11497,7 +11497,7 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
end:
if (is_table_scan || is_index_scan)
issue_long_find_row_warning(get_type_code(), m_table->alias.c_ptr(),
is_index_scan, rli);
is_index_scan, rgi->rli);
table->default_column_bitmaps();
DBUG_RETURN(error);
}
......@@ -11565,12 +11565,12 @@ Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability
return error;
}
int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
{
int error;
DBUG_ASSERT(m_table != NULL);
if (!(error= find_row(rli)))
if (!(error= find_row(rgi)))
{
/*
Delete the record found, located in record[0]
......@@ -11691,11 +11691,11 @@ Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability
}
int
Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
{
DBUG_ASSERT(m_table != NULL);
int error= find_row(rli);
int error= find_row(rgi);
if (error)
{
/*
......@@ -11703,7 +11703,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
able to skip to the next pair of updates
*/
m_curr_row= m_curr_row_end;
unpack_current_row(rli);
unpack_current_row(rgi);
return error;
}
......@@ -11722,7 +11722,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
m_curr_row= m_curr_row_end;
/* this also updates m_curr_row_end */
if ((error= unpack_current_row(rli)))
if ((error= unpack_current_row(rgi)))
goto err;
/*
......
......@@ -4256,16 +4256,16 @@ class Rows_log_event : public Log_event
uint m_key_nr; /* Key number */
int find_key(); // Find a best key to use in find_row()
int find_row(const Relay_log_info *const);
int write_row(const Relay_log_info *const, const bool);
int find_row(rpl_group_info *);
int write_row(rpl_group_info *, const bool);
// Unpack the current row into m_table->record[0]
int unpack_current_row(const Relay_log_info *const rli)
int unpack_current_row(rpl_group_info *rgi)
{
DBUG_ASSERT(m_table);
ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT);
int const result= ::unpack_row(rli, m_table, m_width, m_curr_row,
int const result= ::unpack_row(rgi, m_table, m_width, m_curr_row,
m_rows_end, &m_cols,
&m_curr_row_end, &m_master_reclength);
if (m_curr_row_end > m_rows_end)
......@@ -4331,7 +4331,7 @@ class Rows_log_event : public Log_event
0 if execution succeeded, 1 if execution failed.
*/
virtual int do_exec_row(const Relay_log_info *const rli) = 0;
virtual int do_exec_row(rpl_group_info *rli) = 0;
#endif /* defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) */
friend class Old_rows_log_event;
......@@ -4387,7 +4387,7 @@ class Write_rows_log_event : public Rows_log_event
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_before_row_operations(const Slave_reporting_capability *const);
virtual int do_after_row_operations(const Slave_reporting_capability *const,int);
virtual int do_exec_row(const Relay_log_info *const);
virtual int do_exec_row(rpl_group_info *);
#endif
};
......@@ -4461,7 +4461,7 @@ class Update_rows_log_event : public Rows_log_event
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_before_row_operations(const Slave_reporting_capability *const);
virtual int do_after_row_operations(const Slave_reporting_capability *const,int);
virtual int do_exec_row(const Relay_log_info *const);
virtual int do_exec_row(rpl_group_info *);
#endif /* defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) */
};
......@@ -4526,7 +4526,7 @@ class Delete_rows_log_event : public Rows_log_event
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_before_row_operations(const Slave_reporting_capability *const);
virtual int do_after_row_operations(const Slave_reporting_capability *const,int);
virtual int do_exec_row(const Relay_log_info *const);
virtual int do_exec_row(rpl_group_info *);
#endif
};
......
......@@ -58,7 +58,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
*/
DBUG_ASSERT(ev->get_flags(Old_rows_log_event::STMT_END_F));
const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd);
rgi->slave_close_thread_tables(ev_thd);
ev_thd->clear_error();
DBUG_RETURN(0);
}
......@@ -98,7 +98,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
*/
ev_thd->lex->set_stmt_row_injection();
if (open_and_lock_tables(ev_thd, rli->tables_to_lock, FALSE, 0))
if (open_and_lock_tables(ev_thd, rgi->tables_to_lock, FALSE, 0))
{
uint actual_error= ev_thd->stmt_da->sql_errno();
if (ev_thd->is_slave_error || ev_thd->is_fatal_error)
......@@ -113,7 +113,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
"unexpected success or fatal error"));
ev_thd->is_slave_error= 1;
}
const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
rgi->slave_close_thread_tables(thd);
DBUG_RETURN(actual_error);
}
......@@ -126,8 +126,8 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
*/
{
RPL_TABLE_LIST *ptr= rli->tables_to_lock;
for (uint i= 0 ; ptr&& (i< rli->tables_to_lock_count);
RPL_TABLE_LIST *ptr= rgi->tables_to_lock;
for (uint i= 0 ; ptr&& (i< rgi->tables_to_lock_count);
ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global), i++)
{
DBUG_ASSERT(ptr->m_tabledef_valid);
......@@ -136,7 +136,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
ptr->table, &conv_table))
{
ev_thd->is_slave_error= 1;
const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd);
rgi->slave_close_thread_tables(ev_thd);
DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF);
}
DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
......@@ -161,15 +161,15 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
Old_rows_log_event, we can invalidate the query cache for the
associated table.
*/
TABLE_LIST *ptr= rli->tables_to_lock;
for (uint i=0; ptr && (i < rli->tables_to_lock_count); ptr= ptr->next_global, i++)
const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
TABLE_LIST *ptr= rgi->tables_to_lock;
for (uint i=0; ptr && (i < rgi->tables_to_lock_count); ptr= ptr->next_global, i++)
rgi->m_table_map.set_table(ptr->table_id, ptr->table);
#ifdef HAVE_QUERY_CACHE
query_cache.invalidate_locked_for_write(thd, rli->tables_to_lock);
query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock);
#endif
}
TABLE* table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(ev->m_table_id);
TABLE* table= rgi->m_table_map.get_table(ev->m_table_id);
if (table)
{
......@@ -220,7 +220,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
while (error == 0 && row_start < ev->m_rows_end)
{
uchar const *row_end= NULL;
if ((error= do_prepare_row(ev_thd, rli, table, row_start, &row_end)))
if ((error= do_prepare_row(ev_thd, rgi, table, row_start, &row_end)))
break; // We should perform the after-row operation even in
// the case of error
......@@ -280,7 +280,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
rollback at the caller along with sbr.
*/
ev_thd->reset_current_stmt_binlog_format_row();
const_cast<Relay_log_info*>(rli)->cleanup_context(ev_thd, error);
rgi->cleanup_context(ev_thd, error);
ev_thd->is_slave_error= 1;
DBUG_RETURN(error);
}
......@@ -953,7 +953,7 @@ int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
int
Write_rows_log_event_old::do_prepare_row(THD *thd_arg,
Relay_log_info const *rli,
rpl_group_info *rgi,
TABLE *table,
uchar const *row_start,
uchar const **row_end)
......@@ -962,7 +962,7 @@ Write_rows_log_event_old::do_prepare_row(THD *thd_arg,
DBUG_ASSERT(row_start && row_end);
int error;
error= unpack_row_old(const_cast<Relay_log_info*>(rli),
error= unpack_row_old(rgi,
table, m_width, table->record[0],
row_start, m_rows_end,
&m_cols, row_end, &m_master_reclength,
......@@ -1037,7 +1037,7 @@ int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
int
Delete_rows_log_event_old::do_prepare_row(THD *thd_arg,
Relay_log_info const *rli,
rpl_group_info *rgi,
TABLE *table,
uchar const *row_start,
uchar const **row_end)
......@@ -1050,7 +1050,7 @@ Delete_rows_log_event_old::do_prepare_row(THD *thd_arg,
*/
DBUG_ASSERT(table->s->fields >= m_width);
error= unpack_row_old(const_cast<Relay_log_info*>(rli),
error= unpack_row_old(rgi,
table, m_width, table->record[0],
row_start, m_rows_end,
&m_cols, row_end, &m_master_reclength,
......@@ -1134,7 +1134,7 @@ int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
int Update_rows_log_event_old::do_prepare_row(THD *thd_arg,
Relay_log_info const *rli,
rpl_group_info *rgi,
TABLE *table,
uchar const *row_start,
uchar const **row_end)
......@@ -1148,14 +1148,14 @@ int Update_rows_log_event_old::do_prepare_row(THD *thd_arg,
DBUG_ASSERT(table->s->fields >= m_width);
/* record[0] is the before image for the update */
error= unpack_row_old(const_cast<Relay_log_info*>(rli),
error= unpack_row_old(rgi,
table, m_width, table->record[0],
row_start, m_rows_end,
&m_cols, row_end, &m_master_reclength,
table->read_set, PRE_GA_UPDATE_ROWS_EVENT);
row_start = *row_end;
/* m_after_image is the after image for the update */
error= unpack_row_old(const_cast<Relay_log_info*>(rli),
error= unpack_row_old(rgi,
table, m_width, m_after_image,
row_start, m_rows_end,
&m_cols, row_end, &m_master_reclength,
......@@ -1471,7 +1471,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
*/
DBUG_ASSERT(get_flags(STMT_END_F));
const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
rgi->slave_close_thread_tables(thd);
thd->clear_error();
DBUG_RETURN(0);
}
......@@ -1499,8 +1499,8 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
*/
lex_start(thd);
if ((error= lock_tables(thd, rli->tables_to_lock,
rli->tables_to_lock_count, 0)))
if ((error= lock_tables(thd, rgi->tables_to_lock,
rgi->tables_to_lock_count, 0)))
{
if (thd->is_slave_error || thd->is_fatal_error)
{
......@@ -1522,7 +1522,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
"Error in %s event: when locking tables",
get_type_str());
}
const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
rgi->slave_close_thread_tables(thd);
DBUG_RETURN(error);
}
......@@ -1535,8 +1535,8 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
*/
{
RPL_TABLE_LIST *ptr= rli->tables_to_lock;
for (uint i= 0 ; ptr&& (i< rli->tables_to_lock_count);
RPL_TABLE_LIST *ptr= rgi->tables_to_lock;
for (uint i= 0 ; ptr&& (i< rgi->tables_to_lock_count);
ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global), i++)
{
TABLE *conv_table;
......@@ -1544,7 +1544,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
ptr->table, &conv_table))
{
thd->is_slave_error= 1;
const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
rgi->slave_close_thread_tables(thd);
DBUG_RETURN(ERR_BAD_TABLE_DEF);
}
ptr->m_conv_table= conv_table;
......@@ -1566,18 +1566,18 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
Old_rows_log_event, we can invalidate the query cache for the
associated table.
*/
for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
for (TABLE_LIST *ptr= rgi->tables_to_lock ; ptr ; ptr= ptr->next_global)
{
const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
rgi->m_table_map.set_table(ptr->table_id, ptr->table);
}
#ifdef HAVE_QUERY_CACHE
query_cache.invalidate_locked_for_write(thd, rli->tables_to_lock);
query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock);
#endif
}
TABLE*
table=
m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
m_table= rgi->m_table_map.get_table(m_table_id);
if (table)
{
......@@ -1657,7 +1657,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
if (!table->in_use)
table->in_use= thd;
error= do_exec_row(rli);
error= do_exec_row(rgi);
DBUG_PRINT("info", ("error: %d", error));
DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
......@@ -1696,7 +1696,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
(ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
if (!m_curr_row_end && !error)
unpack_current_row(rli);
unpack_current_row(rgi);
// at this moment m_curr_row_end should be set
DBUG_ASSERT(error || m_curr_row_end != NULL);
......@@ -1733,7 +1733,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
rollback at the caller along with sbr.
*/
thd->reset_current_stmt_binlog_format_row();
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
rgi->cleanup_context(thd, error);
thd->is_slave_error= 1;
DBUG_RETURN(error);
}
......@@ -1812,7 +1812,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
*/
thd->reset_current_stmt_binlog_format_row();
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
rgi->cleanup_context(thd, 0);
}
DBUG_RETURN(error);
......@@ -1998,8 +1998,7 @@ void Old_rows_log_event::print_helper(FILE *file,
*/
int
Old_rows_log_event::write_row(const Relay_log_info *const rli,
const bool overwrite)
Old_rows_log_event::write_row(rpl_group_info *rgi, const bool overwrite)
{
DBUG_ENTER("write_row");
DBUG_ASSERT(m_table != NULL && thd != NULL);
......@@ -2016,7 +2015,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli,
DBUG_RETURN(error);
/* unpack row into table->record[0] */
error= unpack_current_row(rli); // TODO: how to handle errors?
error= unpack_current_row(rgi); // TODO: how to handle errors?
#ifndef DBUG_OFF
DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
......@@ -2123,7 +2122,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli,
if (!get_flags(COMPLETE_ROWS_F))
{
restore_record(table,record[1]);
error= unpack_current_row(rli);
error= unpack_current_row(rgi);
}
#ifndef DBUG_OFF
......@@ -2218,7 +2217,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli,
for any following update/delete command.
*/
int Old_rows_log_event::find_row(const Relay_log_info *rli)
int Old_rows_log_event::find_row(rpl_group_info *rgi)
{
DBUG_ENTER("find_row");
......@@ -2231,7 +2230,7 @@ int Old_rows_log_event::find_row(const Relay_log_info *rli)
// TODO: shall we check and report errors here?
prepare_record(table, m_width, FALSE /* don't check errors */);
error= unpack_current_row(rli);
error= unpack_current_row(rgi);
#ifndef DBUG_OFF
DBUG_PRINT("info",("looking for the following record"));
......@@ -2603,10 +2602,10 @@ Write_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabili
int
Write_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
Write_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
{
DBUG_ASSERT(m_table != NULL);
int error= write_row(rli, TRUE /* overwrite */);
int error= write_row(rgi, TRUE /* overwrite */);
if (error && !thd->net.last_errno)
thd->net.last_errno= error;
......@@ -2705,12 +2704,12 @@ Delete_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabil
}
int Delete_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
int Delete_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
{
int error;
DBUG_ASSERT(m_table != NULL);
if (!(error= find_row(rli)))
if (!(error= find_row(rgi)))
{
/*
Delete the record found, located in record[0]
......@@ -2804,11 +2803,11 @@ Update_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabil
int
Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
Update_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
{
DBUG_ASSERT(m_table != NULL);
int error= find_row(rli);
int error= find_row(rgi);
if (error)
{
/*
......@@ -2816,7 +2815,7 @@ Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
able to skip to the next pair of updates
*/
m_curr_row= m_curr_row_end;
unpack_current_row(rli);
unpack_current_row(rgi);
return error;
}
......@@ -2834,7 +2833,7 @@ Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
store_record(m_table,record[1]);
m_curr_row= m_curr_row_end;
error= unpack_current_row(rli); // this also updates m_curr_row_end
error= unpack_current_row(rgi); // this also updates m_curr_row_end
/*
Now we have the right row to update. The old row (the one we're
......
......@@ -195,15 +195,15 @@ class Old_rows_log_event : public Log_event
const uchar *m_curr_row_end; /* One-after the end of the current row */
uchar *m_key; /* Buffer to keep key value during searches */
int find_row(const Relay_log_info *const);
int write_row(const Relay_log_info *const, const bool);
int find_row(rpl_group_info *);
int write_row(rpl_group_info *, const bool);
// Unpack the current row into m_table->record[0]
int unpack_current_row(const Relay_log_info *const rli)
int unpack_current_row(rpl_group_info *rgi)
{
DBUG_ASSERT(m_table);
ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT);
int const result= ::unpack_row(rli, m_table, m_width, m_curr_row,
int const result= ::unpack_row(rgi, m_table, m_width, m_curr_row,
m_rows_end, &m_cols,
&m_curr_row_end, &m_master_reclength);
ASSERT_OR_RETURN_ERROR(m_curr_row_end <= m_rows_end, HA_ERR_CORRUPT_EVENT);
......@@ -267,7 +267,7 @@ class Old_rows_log_event : public Log_event
0 if execution succeeded, 1 if execution failed.
*/
virtual int do_exec_row(const Relay_log_info *const rli) = 0;
virtual int do_exec_row(rpl_group_info *rgi) = 0;
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
/********** END OF CUT & PASTE FROM Rows_log_event **********/
......@@ -324,7 +324,7 @@ class Old_rows_log_event : public Log_event
RETURN VALUE
Error code, if something went wrong, 0 otherwise.
*/
virtual int do_prepare_row(THD*, Relay_log_info const*, TABLE*,
virtual int do_prepare_row(THD*, rpl_group_info*, TABLE*,
uchar const *row_start,
uchar const **row_end) = 0;
......@@ -387,7 +387,7 @@ class Write_rows_log_event_old : public Old_rows_log_event
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_before_row_operations(const Slave_reporting_capability *const);
virtual int do_after_row_operations(const Slave_reporting_capability *const,int);
virtual int do_exec_row(const Relay_log_info *const);
virtual int do_exec_row(rpl_group_info *);
#endif
/********** END OF CUT & PASTE FROM Write_rows_log_event **********/
......@@ -409,7 +409,7 @@ class Write_rows_log_event_old : public Old_rows_log_event
// primitives for old version of do_apply_event()
virtual int do_before_row_operations(TABLE *table);
virtual int do_after_row_operations(TABLE *table, int error);
virtual int do_prepare_row(THD*, Relay_log_info const*, TABLE*,
virtual int do_prepare_row(THD*, rpl_group_info*, TABLE*,
uchar const *row_start, uchar const **row_end);
virtual int do_exec_row(TABLE *table);
......@@ -463,7 +463,7 @@ class Update_rows_log_event_old : public Old_rows_log_event
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_before_row_operations(const Slave_reporting_capability *const);
virtual int do_after_row_operations(const Slave_reporting_capability *const,int);
virtual int do_exec_row(const Relay_log_info *const);
virtual int do_exec_row(rpl_group_info *);
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
/********** END OF CUT & PASTE FROM Update_rows_log_event **********/
......@@ -487,7 +487,7 @@ class Update_rows_log_event_old : public Old_rows_log_event
// primitives for old version of do_apply_event()
virtual int do_before_row_operations(TABLE *table);
virtual int do_after_row_operations(TABLE *table, int error);
virtual int do_prepare_row(THD*, Relay_log_info const*, TABLE*,
virtual int do_prepare_row(THD*, rpl_group_info*, TABLE*,
uchar const *row_start, uchar const **row_end);
virtual int do_exec_row(TABLE *table);
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
......@@ -538,7 +538,7 @@ class Delete_rows_log_event_old : public Old_rows_log_event
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_before_row_operations(const Slave_reporting_capability *const);
virtual int do_after_row_operations(const Slave_reporting_capability *const,int);
virtual int do_exec_row(const Relay_log_info *const);
virtual int do_exec_row(rpl_group_info *);
#endif
/********** END CUT & PASTE FROM Delete_rows_log_event **********/
......@@ -562,7 +562,7 @@ class Delete_rows_log_event_old : public Old_rows_log_event
// primitives for old version of do_apply_event()
virtual int do_before_row_operations(TABLE *table);
virtual int do_after_row_operations(TABLE *table, int error);
virtual int do_prepare_row(THD*, Relay_log_info const*, TABLE*,
virtual int do_prepare_row(THD*, rpl_group_info*, TABLE*,
uchar const *row_start, uchar const **row_end);
virtual int do_exec_row(TABLE *table);
#endif
......
......@@ -72,6 +72,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
/* ToDo: Access to thd, and what about rli, split out a parallel part? */
mysql_mutex_lock(&rli->data_lock);
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
thd->rgi_slave= NULL;
/* ToDo: error handling. */
}
......@@ -487,12 +488,22 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry)
}
static void
free_rpl_parallel_entry(void *element)
{
rpl_parallel_entry *e= (rpl_parallel_entry *)element;
mysql_cond_destroy(&e->COND_parallel_entry);
mysql_mutex_destroy(&e->LOCK_parallel_entry);
my_free(e);
}
rpl_parallel::rpl_parallel() :
current(NULL)
{
my_hash_init(&domain_hash, &my_charset_bin, 32,
offsetof(rpl_parallel_entry, domain_id), sizeof(uint32),
NULL, NULL, HASH_UNIQUE);
NULL, free_rpl_parallel_entry, HASH_UNIQUE);
}
......@@ -667,6 +678,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
qev->rgi= serial_rgi;
rpt_handle_event(qev, NULL);
delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev);
my_free(qev);
return false;
}
......
......@@ -186,7 +186,7 @@ pack_row(TABLE *table, MY_BITMAP const* cols,
*/
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int
unpack_row(Relay_log_info const *rli,
unpack_row(rpl_group_info *rgi,
TABLE *table, uint const colcnt,
uchar const *const row_data, uchar const *const row_buffer_end,
MY_BITMAP const *cols,
......@@ -214,18 +214,18 @@ unpack_row(Relay_log_info const *rli,
uint i= 0;
table_def *tabledef= NULL;
TABLE *conv_table= NULL;
bool table_found= rli && rli->get_table_data(table, &tabledef, &conv_table);
bool table_found= rgi && rgi->get_table_data(table, &tabledef, &conv_table);
DBUG_PRINT("debug", ("Table data: table_found: %d, tabldef: %p, conv_table: %p",
table_found, tabledef, conv_table));
DBUG_ASSERT(table_found);
/*
If rli is NULL it means that there is no source table and that the
If rgi is NULL it means that there is no source table and that the
row shall just be unpacked without doing any checks. This feature
is used by MySQL Backup, but can be used for other purposes as
well.
*/
if (rli && !table_found)
if (rgi && !table_found)
DBUG_RETURN(HA_ERR_GENERIC);
for (field_ptr= begin_ptr ; field_ptr < end_ptr && *field_ptr ; ++field_ptr)
......@@ -313,7 +313,7 @@ unpack_row(Relay_log_info const *rli,
(int) (pack_ptr - old_pack_ptr)));
if (!pack_ptr)
{
rli->report(ERROR_LEVEL, ER_SLAVE_CORRUPT_EVENT,
rgi->rli->report(ERROR_LEVEL, ER_SLAVE_CORRUPT_EVENT,
"Could not read field '%s' of table '%s.%s'",
f->field_name, table->s->db.str,
table->s->table_name.str);
......
......@@ -21,7 +21,7 @@
#include <rpl_reporting.h>
#include "my_global.h" /* uchar */
class Relay_log_info;
class rpl_group_info;
struct TABLE;
typedef struct st_bitmap MY_BITMAP;
......@@ -31,7 +31,7 @@ size_t pack_row(TABLE* table, MY_BITMAP const* cols,
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int unpack_row(Relay_log_info const *rli,
int unpack_row(rpl_group_info *rgi,
TABLE *table, uint const colcnt,
uchar const *const row_data, uchar const *row_buffer_end,
MY_BITMAP const *cols,
......
......@@ -88,7 +88,7 @@ pack_row_old(TABLE *table, MY_BITMAP const* cols,
*/
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int
unpack_row_old(Relay_log_info *rli,
unpack_row_old(rpl_group_info *rgi,
TABLE *table, uint const colcnt, uchar *record,
uchar const *row, const uchar *row_buffer_end,
MY_BITMAP const *cols,
......@@ -141,7 +141,7 @@ unpack_row_old(Relay_log_info *rli,
f->move_field_offset(-offset);
if (!ptr)
{
rli->report(ERROR_LEVEL, ER_SLAVE_CORRUPT_EVENT,
rgi->rli->report(ERROR_LEVEL, ER_SLAVE_CORRUPT_EVENT,
"Could not read field `%s` of table `%s`.`%s`",
f->field_name, table->s->db.str,
table->s->table_name.str);
......@@ -183,7 +183,7 @@ unpack_row_old(Relay_log_info *rli,
if (event_type == WRITE_ROWS_EVENT &&
((*field_ptr)->flags & mask) == mask)
{
rli->report(ERROR_LEVEL, ER_NO_DEFAULT_FOR_FIELD,
rgi->rli->report(ERROR_LEVEL, ER_NO_DEFAULT_FOR_FIELD,
"Field `%s` of table `%s`.`%s` "
"has no default value and cannot be NULL",
(*field_ptr)->field_name, table->s->db.str,
......
......@@ -23,7 +23,7 @@ size_t pack_row_old(TABLE *table, MY_BITMAP const* cols,
uchar *row_data, const uchar *record);
#ifdef HAVE_REPLICATION
int unpack_row_old(Relay_log_info *rli,
int unpack_row_old(rpl_group_info *rgi,
TABLE *table, uint const colcnt, uchar *record,
uchar const *row, uchar const *row_buffer_end,
MY_BITMAP const *cols,
......
......@@ -59,7 +59,6 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
abort_pos_wait(0), slave_run_id(0), sql_thd(0),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
tables_to_lock(0), tables_to_lock_count(0),
last_event_start_time(0), m_flags(0),
row_stmt_start_timestamp(0), long_find_row_note_printed(false)
{
......@@ -135,8 +134,6 @@ int init_relay_log_info(Relay_log_info* rli,
rli->abort_pos_wait=0;
rli->log_space_limit= relay_log_space_limit;
rli->log_space_total= 0;
rli->tables_to_lock= 0;
rli->tables_to_lock_count= 0;
char pattern[FN_REFLEN];
(void) my_realpath(pattern, slave_load_tmpdir, 0);
......@@ -1261,129 +1258,6 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
}
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
void Relay_log_info::cleanup_context(THD *thd, bool error)
{
DBUG_ENTER("Relay_log_info::cleanup_context");
/*
In parallel replication, different THDs can be used from different
parallel threads. But in single-threaded mode, only the THD of the main
SQL thread is allowed.
*/
DBUG_ASSERT(opt_slave_parallel_threads > 0 || sql_thd == thd);
/*
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
may have opened tables, which we cannot be sure have been closed (because
maybe the Rows_log_event have not been found or will not be, because slave
SQL thread is stopping, or relay log has a missing tail etc). So we close
all thread's tables. And so the table mappings have to be cancelled.
2) Rows_log_event::do_apply_event() may even have started statements or
transactions on them, which we need to rollback in case of error.
3) If finding a Format_description_log_event after a BEGIN, we also need
to rollback before continuing with the next events.
4) so we need this "context cleanup" function.
*/
if (error)
{
trans_rollback_stmt(thd); // if a "statement transaction"
trans_rollback(thd); // if a "real transaction"
}
m_table_map.clear_tables();
slave_close_thread_tables(thd);
if (error)
thd->mdl_context.release_transactional_locks();
clear_flag(IN_STMT);
/*
Cleanup for the flags that have been set at do_apply_event.
*/
thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
/*
Reset state related to long_find_row notes in the error log:
- timestamp
- flag that decides whether the slave prints or not
*/
reset_row_stmt_start_timestamp();
unset_long_find_row_note_printed();
DBUG_VOID_RETURN;
}
void Relay_log_info::clear_tables_to_lock()
{
DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
#ifndef DBUG_OFF
/**
When replicating in RBR and MyISAM Merge tables are involved
open_and_lock_tables (called in do_apply_event) appends the
base tables to the list of tables_to_lock. Then these are
removed from the list in close_thread_tables (which is called
before we reach this point).
This assertion just confirms that we get no surprises at this
point.
*/
uint i=0;
for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
DBUG_ASSERT(i == tables_to_lock_count);
#endif
while (tables_to_lock)
{
uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
if (tables_to_lock->m_tabledef_valid)
{
tables_to_lock->m_tabledef.table_def::~table_def();
tables_to_lock->m_tabledef_valid= FALSE;
}
/*
If blob fields were used during conversion of field values
from the master table into the slave table, then we need to
free the memory used temporarily to store their values before
copying into the slave's table.
*/
if (tables_to_lock->m_conv_table)
free_blobs(tables_to_lock->m_conv_table);
tables_to_lock=
static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
tables_to_lock_count--;
my_free(to_free);
}
DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
DBUG_VOID_RETURN;
}
void Relay_log_info::slave_close_thread_tables(THD *thd)
{
DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
thd->stmt_da->can_overwrite_status= TRUE;
thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
thd->stmt_da->can_overwrite_status= FALSE;
close_thread_tables(thd);
/*
- If inside a multi-statement transaction,
defer the release of metadata locks until the current
transaction is either committed or rolled back. This prevents
other statements from modifying the table for the entire
duration of this transaction. This provides commit ordering
and guarantees serializability across multiple transactions.
- If in autocommit mode, or outside a transactional context,
automatically release metadata locks of the current statement.
*/
if (! thd->in_multi_stmt_transaction_mode())
thd->mdl_context.release_transactional_locks();
else
thd->mdl_context.release_statement_locks();
clear_tables_to_lock();
DBUG_VOID_RETURN;
}
int
rpl_load_gtid_slave_state(THD *thd)
{
......@@ -1539,7 +1413,8 @@ rpl_load_gtid_slave_state(THD *thd)
rpl_group_info::rpl_group_info(Relay_log_info *rli_)
: rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0),
deferred_events(NULL), m_annotate_event(0)
deferred_events(NULL), m_annotate_event(0), tables_to_lock(0),
tables_to_lock_count(0)
{
bzero(&current_gtid, sizeof(current_gtid));
}
......@@ -1613,4 +1488,126 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi,
}
}
void rpl_group_info::cleanup_context(THD *thd, bool error)
{
DBUG_ENTER("Relay_log_info::cleanup_context");
DBUG_ASSERT(this->thd == thd);
/*
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
may have opened tables, which we cannot be sure have been closed (because
maybe the Rows_log_event have not been found or will not be, because slave
SQL thread is stopping, or relay log has a missing tail etc). So we close
all thread's tables. And so the table mappings have to be cancelled.
2) Rows_log_event::do_apply_event() may even have started statements or
transactions on them, which we need to rollback in case of error.
3) If finding a Format_description_log_event after a BEGIN, we also need
to rollback before continuing with the next events.
4) so we need this "context cleanup" function.
*/
if (error)
{
trans_rollback_stmt(thd); // if a "statement transaction"
trans_rollback(thd); // if a "real transaction"
}
m_table_map.clear_tables();
slave_close_thread_tables(thd);
if (error)
thd->mdl_context.release_transactional_locks();
/* ToDo: This must clear the flag in rgi, not rli. */
rli->clear_flag(Relay_log_info::IN_STMT);
/*
Cleanup for the flags that have been set at do_apply_event.
*/
thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
/*
Reset state related to long_find_row notes in the error log:
- timestamp
- flag that decides whether the slave prints or not
*/
rli->reset_row_stmt_start_timestamp();
rli->unset_long_find_row_note_printed();
DBUG_VOID_RETURN;
}
void rpl_group_info::clear_tables_to_lock()
{
DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
#ifndef DBUG_OFF
/**
When replicating in RBR and MyISAM Merge tables are involved
open_and_lock_tables (called in do_apply_event) appends the
base tables to the list of tables_to_lock. Then these are
removed from the list in close_thread_tables (which is called
before we reach this point).
This assertion just confirms that we get no surprises at this
point.
*/
uint i=0;
for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
DBUG_ASSERT(i == tables_to_lock_count);
#endif
while (tables_to_lock)
{
uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
if (tables_to_lock->m_tabledef_valid)
{
tables_to_lock->m_tabledef.table_def::~table_def();
tables_to_lock->m_tabledef_valid= FALSE;
}
/*
If blob fields were used during conversion of field values
from the master table into the slave table, then we need to
free the memory used temporarily to store their values before
copying into the slave's table.
*/
if (tables_to_lock->m_conv_table)
free_blobs(tables_to_lock->m_conv_table);
tables_to_lock=
static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
tables_to_lock_count--;
my_free(to_free);
}
DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
DBUG_VOID_RETURN;
}
void rpl_group_info::slave_close_thread_tables(THD *thd)
{
DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
thd->stmt_da->can_overwrite_status= TRUE;
thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
thd->stmt_da->can_overwrite_status= FALSE;
close_thread_tables(thd);
/*
- If inside a multi-statement transaction,
defer the release of metadata locks until the current
transaction is either committed or rolled back. This prevents
other statements from modifying the table for the entire
duration of this transaction. This provides commit ordering
and guarantees serializability across multiple transactions.
- If in autocommit mode, or outside a transactional context,
automatically release metadata locks of the current statement.
*/
if (! thd->in_multi_stmt_transaction_mode())
thd->mdl_context.release_transactional_locks();
else
thd->mdl_context.release_statement_locks();
clear_tables_to_lock();
DBUG_VOID_RETURN;
}
#endif
......@@ -361,27 +361,6 @@ class Relay_log_info : public Slave_reporting_capability
group_relay_log_pos);
}
RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */
uint tables_to_lock_count; /* RBR: Count of tables to lock */
table_mapping m_table_map; /* RBR: Mapping table-id to table */
bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const
{
DBUG_ASSERT(tabledef_var && conv_table_var);
for (TABLE_LIST *ptr= tables_to_lock ; ptr != NULL ; ptr= ptr->next_global)
if (ptr->table == table_arg)
{
*tabledef_var= &static_cast<RPL_TABLE_LIST*>(ptr)->m_tabledef;
*conv_table_var= static_cast<RPL_TABLE_LIST*>(ptr)->m_conv_table;
DBUG_PRINT("debug", ("Fetching table data for table %s.%s:"
" tabledef: %p, conv_table: %p",
table_arg->s->db.str, table_arg->s->table_name.str,
*tabledef_var, *conv_table_var));
return true;
}
return false;
}
/*
Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
the thread save 3 get_charset() per Query_log_event if the charset is not
......@@ -391,10 +370,6 @@ class Relay_log_info : public Slave_reporting_capability
void cached_charset_invalidate();
bool cached_charset_compare(char *charset) const;
void cleanup_context(THD *, bool);
void slave_close_thread_tables(THD *);
void clear_tables_to_lock();
/*
Used to defer stopping the SQL thread to give it a chance
to finish up the current group of events.
......@@ -588,6 +563,10 @@ struct rpl_group_info
Annotate_rows_log_event *m_annotate_event;
RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */
uint tables_to_lock_count; /* RBR: Count of tables to lock */
table_mapping m_table_map; /* RBR: Mapping table-id to table */
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
......@@ -649,6 +628,26 @@ struct rpl_group_info
}
}
bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const
{
DBUG_ASSERT(tabledef_var && conv_table_var);
for (TABLE_LIST *ptr= tables_to_lock ; ptr != NULL ; ptr= ptr->next_global)
if (ptr->table == table_arg)
{
*tabledef_var= &static_cast<RPL_TABLE_LIST*>(ptr)->m_tabledef;
*conv_table_var= static_cast<RPL_TABLE_LIST*>(ptr)->m_conv_table;
DBUG_PRINT("debug", ("Fetching table data for table %s.%s:"
" tabledef: %p, conv_table: %p",
table_arg->s->db.str, table_arg->s->table_name.str,
*tabledef_var, *conv_table_var));
return true;
}
return false;
}
void clear_tables_to_lock();
void cleanup_context(THD *, bool);
void slave_close_thread_tables(THD *);
};
......
......@@ -3307,7 +3307,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
else
{
exec_res= 0;
rli->cleanup_context(thd, 1);
serial_rgi->cleanup_context(thd, 1);
/* chance for concurrent connection to get more locks */
slave_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
sql_slave_killed, rli);
......@@ -3983,7 +3983,7 @@ pthread_handler_t handle_slave_sql(void *arg)
Master_info *mi= ((Master_info*)arg);
Relay_log_info* rli = &mi->rli;
const char *errmsg;
rpl_group_info serial_rgi(rli);
rpl_group_info *serial_rgi;
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
......@@ -3992,10 +3992,11 @@ pthread_handler_t handle_slave_sql(void *arg)
LINT_INIT(saved_master_log_pos);
LINT_INIT(saved_log_pos);
serial_rgi= new rpl_group_info(rli);
thd = new THD; // note that contructor of THD uses DBUG_ !
thd->thread_stack = (char*)&thd; // remember where our stack is
thd->rpl_filter = mi->rpl_filter;
serial_rgi.thd= thd;
serial_rgi->thd= thd;
DBUG_ASSERT(rli->inited);
DBUG_ASSERT(rli->mi == mi);
......@@ -4025,10 +4026,10 @@ pthread_handler_t handle_slave_sql(void *arg)
goto err_during_init;
}
thd->init_for_queries();
thd->rgi_slave= &serial_rgi;
if ((serial_rgi.deferred_events_collecting= mi->rpl_filter->is_on()))
thd->rgi_slave= serial_rgi;
if ((serial_rgi->deferred_events_collecting= mi->rpl_filter->is_on()))
{
serial_rgi.deferred_events= new Deferred_log_events(rli);
serial_rgi->deferred_events= new Deferred_log_events(rli);
}
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
......@@ -4211,7 +4212,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
saved_skip= 0;
}
if (exec_relay_log_event(thd, rli, &serial_rgi))
if (exec_relay_log_event(thd, rli, serial_rgi))
{
DBUG_PRINT("info", ("exec_relay_log_event() failed"));
// do not scare the user if SQL thread was simply killed or stopped
......@@ -4338,7 +4339,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
must "proactively" clear playgrounds:
*/
thd->clear_error();
rli->cleanup_context(thd, 1);
serial_rgi->cleanup_context(thd, 1);
/*
Some extra safety, which should not been needed (normally, event deletion
should already have done these assignments (each event which sets these
......@@ -4379,6 +4380,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
mysql_mutex_lock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
delete thd;
delete serial_rgi;
mysql_mutex_unlock(&LOCK_thread_count);
/*
Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
......
......@@ -273,7 +273,7 @@ void mysql_client_binlog_statement(THD* thd)
end:
thd->variables.option_bits= thd_options;
rli->slave_close_thread_tables(thd);
rgi->slave_close_thread_tables(thd);
my_free(buf);
DBUG_VOID_RETURN;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment