Commit 26432e49 authored by sysprg's avatar sysprg Committed by Jan Lindström

MDEV-17262: mysql crashed on galera while node rejoined cluster (#895)

This patch contains a fix for the MDEV-17262/17243 issues and
new mtr test.

These issues (MDEV-17262/17243) have two reasons:

1) After an intermediate commit, a transaction loses its status
of "transaction that registered in the MySQL for 2pc coordinator"
(in the InnoDB) due to the fact that since version 10.2 the
write_row() function (which located in the ha_innodb.cc) does
not call trx_register_for_2pc(m_prebuilt->trx) during the processing
of split transactions. It is necessary to restore this call inside
the write_row() when an intermediate commit was made (for a split
transaction).

Similarly, we need to set the flag of the started transaction
(m_prebuilt->sql_stat_start) after intermediate commit.

The table->file->extra(HA_EXTRA_FAKE_START_STMT) called from the
wsrep_load_data_split() function (which located in sql_load.cc)
will also do this, but it will be too late. As a result, the call
to the wsrep_append_keys() function from the InnoDB engine may be
lost or function may be called with invalid transaction identifier.

2) If a transaction with the LOAD DATA statement is divided into
logical mini-transactions (of the 10K rows) and binlog is rotated,
then in rare cases due to the wsrep handler re-registration at the
boundary of the split, the last portion of data may be lost. Since
splitting of the LOAD DATA into mini-transactions is technical,
I believe that we should not allow these mini-transactions to fall
into separate binlogs. Therefore, it is necessary to prohibit the
rotation of binlog in the middle of processing LOAD DATA statement.

https://jira.mariadb.org/browse/MDEV-17262 and
https://jira.mariadb.org/browse/MDEV-17243
parent 5e044f78
...@@ -108,6 +108,8 @@ extern struct wsrep_service_st { ...@@ -108,6 +108,8 @@ extern struct wsrep_service_st {
long long (*wsrep_thd_trx_seqno_func)(THD *thd); long long (*wsrep_thd_trx_seqno_func)(THD *thd);
struct wsrep_ws_handle * (*wsrep_thd_ws_handle_func)(THD *thd); struct wsrep_ws_handle * (*wsrep_thd_ws_handle_func)(THD *thd);
void (*wsrep_thd_auto_increment_variables_func)(THD *thd, unsigned long long *offset, unsigned long long *increment); void (*wsrep_thd_auto_increment_variables_func)(THD *thd, unsigned long long *offset, unsigned long long *increment);
void (*wsrep_set_load_multi_commit_func)(THD *thd, bool split);
bool (*wsrep_is_load_multi_commit_func)(THD *thd);
int (*wsrep_trx_is_aborting_func)(MYSQL_THD thd); int (*wsrep_trx_is_aborting_func)(MYSQL_THD thd);
int (*wsrep_trx_order_before_func)(MYSQL_THD, MYSQL_THD); int (*wsrep_trx_order_before_func)(MYSQL_THD, MYSQL_THD);
void (*wsrep_unlock_rollback_func)(); void (*wsrep_unlock_rollback_func)();
...@@ -152,6 +154,8 @@ extern struct wsrep_service_st { ...@@ -152,6 +154,8 @@ extern struct wsrep_service_st {
#define wsrep_thd_trx_seqno(T) wsrep_service->wsrep_thd_trx_seqno_func(T) #define wsrep_thd_trx_seqno(T) wsrep_service->wsrep_thd_trx_seqno_func(T)
#define wsrep_thd_ws_handle(T) wsrep_service->wsrep_thd_ws_handle_func(T) #define wsrep_thd_ws_handle(T) wsrep_service->wsrep_thd_ws_handle_func(T)
#define wsrep_thd_auto_increment_variables(T,O,I) wsrep_service->wsrep_thd_auto_increment_variables_func(T,O,I) #define wsrep_thd_auto_increment_variables(T,O,I) wsrep_service->wsrep_thd_auto_increment_variables_func(T,O,I)
#define wsrep_set_load_multi_commit(T,S) wsrep_service->wsrep_set_load_multi_commit_func(T,S)
#define wsrep_is_load_multi_commit(T) wsrep_service->wsrep_is_load_multi_commit_func(T)
#define wsrep_trx_is_aborting(T) wsrep_service->wsrep_trx_is_aborting_func(T) #define wsrep_trx_is_aborting(T) wsrep_service->wsrep_trx_is_aborting_func(T)
#define wsrep_trx_order_before(T1,T2) wsrep_service->wsrep_trx_order_before_func(T1,T2) #define wsrep_trx_order_before(T1,T2) wsrep_service->wsrep_trx_order_before_func(T1,T2)
#define wsrep_unlock_rollback() wsrep_service->wsrep_unlock_rollback_func() #define wsrep_unlock_rollback() wsrep_service->wsrep_unlock_rollback_func()
...@@ -206,6 +210,8 @@ my_bool wsrep_thd_is_wsrep(MYSQL_THD thd); ...@@ -206,6 +210,8 @@ my_bool wsrep_thd_is_wsrep(MYSQL_THD thd);
struct wsrep *get_wsrep(); struct wsrep *get_wsrep();
struct wsrep_ws_handle *wsrep_thd_ws_handle(THD *thd); struct wsrep_ws_handle *wsrep_thd_ws_handle(THD *thd);
void wsrep_thd_auto_increment_variables(THD *thd, unsigned long long *offset, unsigned long long *increment); void wsrep_thd_auto_increment_variables(THD *thd, unsigned long long *offset, unsigned long long *increment);
void wsrep_set_load_multi_commit(THD *thd, bool split);
bool wsrep_is_load_multi_commit(THD *thd);
void wsrep_aborting_thd_enqueue(THD *thd); void wsrep_aborting_thd_enqueue(THD *thd);
void wsrep_lock_rollback(); void wsrep_lock_rollback();
void wsrep_post_commit(THD* thd, bool all); void wsrep_post_commit(THD* thd, bool all);
......
connection node_1;
connection node_2;
connection node_3;
connection node_1;
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB;
connection node_2;
connection node_3;
SET GLOBAL wsrep_provider_options = 'gmcast.isolate = 1';
SET SESSION wsrep_on = OFF;
SET SESSION wsrep_on = ON;
SET SESSION wsrep_sync_wait = 0;
connection node_2a;
SET SESSION wsrep_sync_wait = 0;
connection node_2;
SET GLOBAL wsrep_load_data_splitting = TRUE;
SET DEBUG_SYNC='intermediate_transaction_commit SIGNAL commited WAIT_FOR ist';
connection node_2a;
SET DEBUG_SYNC='now WAIT_FOR commited';
connection node_3;
SET GLOBAL wsrep_provider_options = 'gmcast.isolate = 0';
connection node_2a;
SET DEBUG_SYNC='now SIGNAL ist';
connection node_1;
connection node_2;
SET DEBUG_SYNC='RESET';
SELECT COUNT(*) = 95000 FROM t1;
COUNT(*) = 95000
1
wsrep_last_committed_diff
1
connection node_1;
SET GLOBAL wsrep_load_data_splitting = 1;;
DROP TABLE t1;
disconnect node_3;
disconnect node_2;
disconnect node_1;
!include ../galera_3nodes.cnf
[mysqld]
wsrep-causal-reads=OFF
--source include/have_debug_sync.inc
--source include/galera_cluster.inc
--source include/have_innodb.inc
--source include/big_test.inc
# Establish connection to the third node:
--let $galera_connection_name = node_3
--let $galera_server_number = 3
--source include/galera_connect.inc
# Establish additional connection to the second node
# (which is used in the test for synchronization control):
--let $galera_connection_name = node_2a
--let $galera_server_number = 2
--source include/galera_connect.inc
# Save original auto_increment_offset values:
--let $node_1=node_1
--let $node_2=node_2
--let $node_3=node_3
--source ../galera/include/auto_increment_offset_save.inc
# Create a file for LOAD DATA with 95K entries
--connection node_1
--perl
open(FILE, ">", "$ENV{'MYSQLTEST_VARDIR'}/tmp/galera_var_load_data_splitting.csv") or die;
foreach my $i (1..95000) {
print FILE "$i\n";
}
EOF
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB;
# Let's wait for the completion of the formation of a cluster
# of three nodes:
--let $wait_condition = SELECT VARIABLE_VALUE = 3 FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_cluster_size';
--source include/wait_condition.inc
--connection node_2
--source include/wait_until_ready.inc
--connection node_3
--source include/wait_until_ready.inc
# Disconnect the third node from the cluster:
SET GLOBAL wsrep_provider_options = 'gmcast.isolate = 1';
SET SESSION wsrep_on = OFF;
--let $wait_condition = SELECT VARIABLE_VALUE = 'non-Primary' FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_cluster_status';
--source include/wait_condition.inc
SET SESSION wsrep_on = ON;
SET SESSION wsrep_sync_wait = 0;
# Disable sync wait for control connection:
--connection node_2a
SET SESSION wsrep_sync_wait = 0;
# Let's wait until the other nodes stop seeing the third
# node in the cluster:
--let $wait_condition = SELECT VARIABLE_VALUE = 2 FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_cluster_size';
--source include/wait_condition.inc
# Record wsrep_last_committed as it was before LOAD DATA:
--connection node_2
--let $wsrep_last_committed_before = `SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.SESSION_STATUS WHERE VARIABLE_NAME = 'wsrep_last_committed'`
# Enable splitting for LOAD DATA:
--let $wsrep_load_data_splitting_orig = `SELECT @@wsrep_load_data_splitting`
SET GLOBAL wsrep_load_data_splitting = TRUE;
# Stop after the first commit and wait for the IST signal:
SET DEBUG_SYNC='intermediate_transaction_commit SIGNAL commited WAIT_FOR ist';
# Perform the LOAD DATA statement:
--disable_query_log
let v1='$MYSQLTEST_VARDIR/tmp/galera_var_load_data_splitting.csv';
--send_eval LOAD DATA INFILE $v1 INTO TABLE t1;
--enable_query_log
# Wait for the first commit:
--connection node_2a
SET DEBUG_SYNC='now WAIT_FOR commited';
# Initiate the IST:
--connection node_3
SET GLOBAL wsrep_provider_options = 'gmcast.isolate = 0';
# Continue the execution of LOAD DATA:
--connection node_2a
SET DEBUG_SYNC='now SIGNAL ist';
# Let's wait for the recovery of the cluster
# of three nodes:
--connection node_1
--let $wait_condition = SELECT VARIABLE_VALUE = 3 FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_cluster_size';
--source include/wait_condition.inc
# Save the LOAD DATA results:
--connection node_2
--reap
# Reset all synchronization points and signals:
SET DEBUG_SYNC='RESET';
# Read the wsrep_last_commited after LOAD DATA:
--let $wsrep_last_committed_after = `SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.SESSION_STATUS WHERE VARIABLE_NAME = 'wsrep_last_committed'`
# Check the records:
SELECT COUNT(*) = 95000 FROM t1;
# LOAD-ing 95K rows should causes 10 commits to be registered:
--disable_query_log
--eval SELECT $wsrep_last_committed_after = $wsrep_last_committed_before + 10 AS wsrep_last_committed_diff;
--enable_query_log
# Restore the original splitting:
--connection node_1
--eval SET GLOBAL wsrep_load_data_splitting = $wsrep_load_data_splitting_orig;
# Drop test table:
DROP TABLE t1;
# Restore original auto_increment_offset values:
--source ../galera/include/auto_increment_offset_restore.inc
--let $galera_cluster_size=3
--source include/galera_end.inc
...@@ -6413,8 +6413,25 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) ...@@ -6413,8 +6413,25 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
update_binlog_end_pos(offset); update_binlog_end_pos(offset);
signal_update(); signal_update();
/*
If a transaction with the LOAD DATA statement is divided
into logical mini-transactions (of the 10K rows) and binlog
is rotated, then the last portion of data may be lost due to
wsrep handler re-registration at the boundary of the split.
Since splitting of the LOAD DATA into mini-transactions is
logical, we should not allow these mini-transactions to fall
into separate binlogs. Therefore, it is necessary to prohibit
the rotation of binlog in the middle of processing LOAD DATA:
*/
#ifdef WITH_WSREP
if (!thd->wsrep_split_flag)
{
#endif /* WITH_WSREP */
if ((error= rotate(false, &check_purge))) if ((error= rotate(false, &check_purge)))
check_purge= false; check_purge= false;
#ifdef WITH_WSREP
}
#endif /* WITH_WSREP */
} }
} }
} }
...@@ -7139,8 +7156,25 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) ...@@ -7139,8 +7156,25 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd)
!(error= flush_and_sync(0))) !(error= flush_and_sync(0)))
{ {
signal_update(); signal_update();
/*
If a transaction with the LOAD DATA statement is divided
into logical mini-transactions (of the 10K rows) and binlog
is rotated, then the last portion of data may be lost due to
wsrep handler re-registration at the boundary of the split.
Since splitting of the LOAD DATA into mini-transactions is
logical, we should not allow these mini-transactions to fall
into separate binlogs. Therefore, it is necessary to prohibit
the rotation of binlog in the middle of processing LOAD DATA:
*/
#ifdef WITH_WSREP
if (!thd->wsrep_split_flag)
{
#endif /* WITH_WSREP */
if ((error= rotate(false, &check_purge))) if ((error= rotate(false, &check_purge)))
check_purge= false; check_purge= false;
#ifdef WITH_WSREP
}
#endif /* WITH_WSREP */
} }
offset= my_b_tell(&log_file); offset= my_b_tell(&log_file);
...@@ -7906,6 +7940,20 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) ...@@ -7906,6 +7940,20 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
mark_xids_active(binlog_id, xid_count); mark_xids_active(binlog_id, xid_count);
} }
/*
If a transaction with the LOAD DATA statement is divided
into logical mini-transactions (of the 10K rows) and binlog
is rotated, then the last portion of data may be lost due to
wsrep handler re-registration at the boundary of the split.
Since splitting of the LOAD DATA into mini-transactions is
logical, we should not allow these mini-transactions to fall
into separate binlogs. Therefore, it is necessary to prohibit
the rotation of binlog in the middle of processing LOAD DATA:
*/
#ifdef WITH_WSREP
if (!leader->thd->wsrep_split_flag)
{
#endif /* WITH_WSREP */
if (rotate(false, &check_purge)) if (rotate(false, &check_purge))
{ {
/* /*
...@@ -7925,6 +7973,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) ...@@ -7925,6 +7973,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, errno); my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, errno);
check_purge= false; check_purge= false;
} }
#ifdef WITH_WSREP
}
#endif /* WITH_WSREP */
/* In case of binlog rotate, update the correct current binlog offset. */ /* In case of binlog rotate, update the correct current binlog offset. */
commit_offset= my_b_write_tell(&log_file); commit_offset= my_b_write_tell(&log_file);
} }
......
...@@ -784,6 +784,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) ...@@ -784,6 +784,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
wsrep_affected_rows = 0; wsrep_affected_rows = 0;
wsrep_replicate_GTID = false; wsrep_replicate_GTID = false;
wsrep_skip_wsrep_GTID = false; wsrep_skip_wsrep_GTID = false;
wsrep_split_flag = false;
#endif #endif
/* Call to init() below requires fully initialized Open_tables_state. */ /* Call to init() below requires fully initialized Open_tables_state. */
reset_open_tables_state(this); reset_open_tables_state(this);
...@@ -1218,6 +1219,7 @@ void THD::init(void) ...@@ -1218,6 +1219,7 @@ void THD::init(void)
wsrep_affected_rows = 0; wsrep_affected_rows = 0;
wsrep_replicate_GTID = false; wsrep_replicate_GTID = false;
wsrep_skip_wsrep_GTID = false; wsrep_skip_wsrep_GTID = false;
wsrep_split_flag = false;
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
if (variables.sql_log_bin) if (variables.sql_log_bin)
......
...@@ -4431,6 +4431,14 @@ class THD :public Statement, ...@@ -4431,6 +4431,14 @@ class THD :public Statement,
ulong wsrep_affected_rows; ulong wsrep_affected_rows;
bool wsrep_replicate_GTID; bool wsrep_replicate_GTID;
bool wsrep_skip_wsrep_GTID; bool wsrep_skip_wsrep_GTID;
/* This flag is set when innodb do an intermediate commit to
processing the LOAD DATA INFILE statement by splitting it into 10K
rows chunks. If flag is set, then binlog rotation is not performed
while intermediate transaction try to commit, because in this case
rotation causes unregistration of innodb handler. Later innodb handler
registered again, but replication of last chunk of rows is skipped
by the innodb engine: */
bool wsrep_split_flag;
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
/* Handling of timeouts for commands */ /* Handling of timeouts for commands */
......
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
#include "sql_trigger.h" #include "sql_trigger.h"
#include "sql_derived.h" #include "sql_derived.h"
#include "sql_show.h" #include "sql_show.h"
#include "debug_sync.h"
extern "C" int _my_b_net_read(IO_CACHE *info, uchar *Buffer, size_t Count); extern "C" int _my_b_net_read(IO_CACHE *info, uchar *Buffer, size_t Count);
...@@ -119,21 +120,43 @@ static bool wsrep_load_data_split(THD *thd, const TABLE *table, ...@@ -119,21 +120,43 @@ static bool wsrep_load_data_split(THD *thd, const TABLE *table,
if (hton->db_type != DB_TYPE_INNODB) if (hton->db_type != DB_TYPE_INNODB)
DBUG_RETURN(false); DBUG_RETURN(false);
WSREP_DEBUG("intermediate transaction commit in LOAD DATA"); WSREP_DEBUG("intermediate transaction commit in LOAD DATA");
wsrep_set_load_multi_commit(thd, true);
if (wsrep_run_wsrep_commit(thd, true) != WSREP_TRX_OK) DBUG_RETURN(true); if (wsrep_run_wsrep_commit(thd, true) != WSREP_TRX_OK) DBUG_RETURN(true);
if (binlog_hton->commit(binlog_hton, thd, true)) DBUG_RETURN(true); if (binlog_hton->commit(binlog_hton, thd, true)) DBUG_RETURN(true);
wsrep_post_commit(thd, true); wsrep_post_commit(thd, true);
hton->commit(hton, thd, true); hton->commit(hton, thd, true);
wsrep_set_load_multi_commit(thd, false);
DEBUG_SYNC(thd, "intermediate_transaction_commit");
table->file->extra(HA_EXTRA_FAKE_START_STMT); table->file->extra(HA_EXTRA_FAKE_START_STMT);
} }
DBUG_RETURN(false); DBUG_RETURN(false);
} }
# define WSREP_LOAD_DATA_SPLIT(thd,table,info) \ /*
if (wsrep_load_data_split(thd,table,info)) DBUG_RETURN(1) If the commit fails, then an early return from
the function occurs there and therefore we need
to reset the table->auto_increment_field_not_null
flag, which is usually reset after calling
the write_record():
*/
#define WSREP_LOAD_DATA_SPLIT(thd,table,info) \
if (wsrep_load_data_split(thd,table,info)) \
{ \
table->auto_increment_field_not_null= FALSE; \
DBUG_RETURN(1); \
}
#else /* WITH_WSREP */ #else /* WITH_WSREP */
#define WSREP_LOAD_DATA_SPLIT(thd,table,info) /* empty */ #define WSREP_LOAD_DATA_SPLIT(thd,table,info) /* empty */
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
#define WRITE_RECORD(thd,table,info) \
do { \
int err_= write_record(thd, table, &info); \
table->auto_increment_field_not_null= FALSE; \
if (err_) \
DBUG_RETURN(1); \
} while (0)
class READ_INFO: public Load_data_param class READ_INFO: public Load_data_param
{ {
File file; File file;
...@@ -911,7 +934,7 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, ...@@ -911,7 +934,7 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
List_iterator_fast<Item> it(fields_vars); List_iterator_fast<Item> it(fields_vars);
Item *item; Item *item;
TABLE *table= table_list->table; TABLE *table= table_list->table;
bool err, progress_reports; bool progress_reports;
ulonglong counter, time_to_report_progress; ulonglong counter, time_to_report_progress;
DBUG_ENTER("read_fixed_length"); DBUG_ENTER("read_fixed_length");
...@@ -1003,11 +1026,8 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, ...@@ -1003,11 +1026,8 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
} }
WSREP_LOAD_DATA_SPLIT(thd, table, info); WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info); WRITE_RECORD(thd, table, info);
table->auto_increment_field_not_null= FALSE;
if (err)
DBUG_RETURN(1);
/* /*
We don't need to reset auto-increment field since we are restoring We don't need to reset auto-increment field since we are restoring
its default value at the beginning of each loop iteration. its default value at the beginning of each loop iteration.
...@@ -1040,7 +1060,7 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, ...@@ -1040,7 +1060,7 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
Item *item; Item *item;
TABLE *table= table_list->table; TABLE *table= table_list->table;
uint enclosed_length; uint enclosed_length;
bool err, progress_reports; bool progress_reports;
ulonglong counter, time_to_report_progress; ulonglong counter, time_to_report_progress;
DBUG_ENTER("read_sep_field"); DBUG_ENTER("read_sep_field");
...@@ -1124,7 +1144,7 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, ...@@ -1124,7 +1144,7 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
{ {
Load_data_outvar *dst= item->get_load_data_outvar_or_error(); Load_data_outvar *dst= item->get_load_data_outvar_or_error();
DBUG_ASSERT(dst); DBUG_ASSERT(dst);
if (dst->load_data_set_no_data(thd, &read_info)) if (unlikely(dst->load_data_set_no_data(thd, &read_info)))
DBUG_RETURN(1); DBUG_RETURN(1);
} }
} }
...@@ -1146,10 +1166,8 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, ...@@ -1146,10 +1166,8 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
} }
WSREP_LOAD_DATA_SPLIT(thd, table, info); WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info); WRITE_RECORD(thd, table, info);
table->auto_increment_field_not_null= FALSE;
if (err)
DBUG_RETURN(1);
/* /*
We don't need to reset auto-increment field since we are restoring We don't need to reset auto-increment field since we are restoring
its default value at the beginning of each loop iteration. its default value at the beginning of each loop iteration.
...@@ -1267,13 +1285,10 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, ...@@ -1267,13 +1285,10 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
case VIEW_CHECK_ERROR: case VIEW_CHECK_ERROR:
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
WSREP_LOAD_DATA_SPLIT(thd, table, info); WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info); WRITE_RECORD(thd, table, info);
table->auto_increment_field_not_null= false;
if (err)
DBUG_RETURN(1);
/* /*
We don't need to reset auto-increment field since we are restoring We don't need to reset auto-increment field since we are restoring
its default value at the beginning of each loop iteration. its default value at the beginning of each loop iteration.
......
...@@ -178,6 +178,8 @@ static struct wsrep_service_st wsrep_handler = { ...@@ -178,6 +178,8 @@ static struct wsrep_service_st wsrep_handler = {
wsrep_thd_trx_seqno, wsrep_thd_trx_seqno,
wsrep_thd_ws_handle, wsrep_thd_ws_handle,
wsrep_thd_auto_increment_variables, wsrep_thd_auto_increment_variables,
wsrep_set_load_multi_commit,
wsrep_is_load_multi_commit,
wsrep_trx_is_aborting, wsrep_trx_is_aborting,
wsrep_trx_order_before, wsrep_trx_order_before,
wsrep_unlock_rollback, wsrep_unlock_rollback,
......
...@@ -133,6 +133,12 @@ void wsrep_thd_auto_increment_variables(THD *thd, ...@@ -133,6 +133,12 @@ void wsrep_thd_auto_increment_variables(THD *thd,
*increment= thd->variables.auto_increment_increment; *increment= thd->variables.auto_increment_increment;
} }
void wsrep_set_load_multi_commit(THD *thd, bool split)
{ }
bool wsrep_is_load_multi_commit(THD *thd)
{ return false; }
int wsrep_trx_is_aborting(THD *) int wsrep_trx_is_aborting(THD *)
{ return 0; } { return 0; }
......
...@@ -45,6 +45,7 @@ void wsrep_cleanup_transaction(THD *thd) ...@@ -45,6 +45,7 @@ void wsrep_cleanup_transaction(THD *thd)
thd->wsrep_exec_mode= LOCAL_STATE; thd->wsrep_exec_mode= LOCAL_STATE;
thd->wsrep_affected_rows= 0; thd->wsrep_affected_rows= 0;
thd->wsrep_skip_wsrep_GTID= false; thd->wsrep_skip_wsrep_GTID= false;
thd->wsrep_split_flag= false;
return; return;
} }
......
...@@ -708,3 +708,13 @@ my_bool wsrep_thd_is_applier(MYSQL_THD thd) ...@@ -708,3 +708,13 @@ my_bool wsrep_thd_is_applier(MYSQL_THD thd)
return (is_applier); return (is_applier);
} }
void wsrep_set_load_multi_commit(THD *thd, bool split)
{
thd->wsrep_split_flag= split;
}
bool wsrep_is_load_multi_commit(THD *thd)
{
return thd->wsrep_split_flag;
}
...@@ -8199,6 +8199,16 @@ ha_innobase::write_row( ...@@ -8199,6 +8199,16 @@ ha_innobase::write_row(
++trx->will_lock; ++trx->will_lock;
} }
#ifdef WITH_WSREP
if (wsrep_is_load_multi_commit(m_user_thd))
{
/* Note that this transaction is still active. */
trx_register_for_2pc(m_prebuilt->trx);
/* We will need an IX lock on the destination table. */
m_prebuilt->sql_stat_start = TRUE;
}
#endif /* WITH_WSREP */
/* Handling of Auto-Increment Columns. */ /* Handling of Auto-Increment Columns. */
if (table->next_number_field && record == table->record[0]) { if (table->next_number_field && record == table->record[0]) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment