Commit c2c637c5 authored by Jan Lindström's avatar Jan Lindström Committed by GitHub

Merge pull request #1180 from codership/10.4-load-data-splitting

10.4 make wsrep_load_data_splitting use streaming replication
parents 6476126c f20dfeec
......@@ -9,5 +9,5 @@ SELECT COUNT(*) = 20000 FROM t1;
COUNT(*) = 20000
1
wsrep_last_committed_diff
0
1
DROP TABLE t1;
connection node_2;
connection node_1;
SET SESSION wsrep_trx_fragment_size = 512;
SET GLOBAL wsrep_load_data_splitting = TRUE;
Warnings:
Warning 1287 '@@wsrep_load_data_splitting' is deprecated and will be removed in a future release
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB;
connection node_2;
connection node_1;
connection node_2;
SELECT COUNT(*) = 95000 FROM t1;
COUNT(*) = 95000
1
wsrep_last_committed_diff
1
connection node_1;
Warnings:
Warning 1287 '@@wsrep_load_data_splitting' is deprecated and will be removed in a future release
DROP TABLE t1;
......@@ -33,7 +33,7 @@ CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB;
SELECT COUNT(*) = 20000 FROM t1;
# LOAD-ing 20K rows causes 3 commits to be registered
--disable_query_log
--eval SELECT $wsrep_last_committed_after - $wsrep_last_committed_before = 3 AS wsrep_last_committed_diff;
--eval SELECT $wsrep_last_committed_after - $wsrep_last_committed_before = 3 AS wsrep_last_committed_diff
--enable_query_log
DROP TABLE t1;
......@@ -100,41 +100,39 @@ class Term_string
#define PUSH(A) *(stack_pos++)=(A)
#ifdef WITH_WSREP
/** If requested by wsrep_load_data_splitting, commit and restart
the transaction after every 10,000 inserted rows. */
static bool wsrep_load_data_split(THD *thd, const TABLE *table,
const COPY_INFO &info)
/** If requested by wsrep_load_data_splitting and streaming replication is
not enabled, replicate a streaming fragment every 10,000 rows.*/
class Wsrep_load_data_split
{
DBUG_ENTER("wsrep_load_data_split");
if (!wsrep_load_data_splitting || !WSREP(thd)
|| !info.records || (info.records % 10000)
|| !thd->transaction.stmt.ha_list
|| thd->transaction.stmt.ha_list->ht() != binlog_hton
|| !thd->transaction.stmt.ha_list->next()
|| thd->transaction.stmt.ha_list->next()->next())
DBUG_RETURN(false);
if (handlerton* hton= thd->transaction.stmt.ha_list->next()->ht())
public:
Wsrep_load_data_split(THD *thd)
: m_thd(thd)
, m_load_data_splitting(wsrep_load_data_splitting)
, m_fragment_unit(thd->wsrep_trx().streaming_context().fragment_unit())
, m_fragment_size(thd->wsrep_trx().streaming_context().fragment_size())
{
if (!(hton->flags & HTON_WSREP_REPLICATION))
DBUG_RETURN(false);
WSREP_DEBUG("intermediate transaction commit in LOAD DATA");
wsrep_tc_log_commit(thd);
table->file->extra(HA_EXTRA_FAKE_START_STMT);
if (WSREP(m_thd) && m_load_data_splitting)
{
/* Override streaming settings with backward compatible values for
load data splitting */
m_thd->wsrep_cs().streaming_params(wsrep::streaming_context::row, 10000);
}
}
DBUG_RETURN(false);
}
# define WSREP_LOAD_DATA_SPLIT(thd,table,info) \
if (wsrep_load_data_split(thd,table,info)) \
{ \
table->auto_increment_field_not_null= FALSE; \
DBUG_RETURN(1); \
}
#else /* WITH_WSREP */
#define WSREP_LOAD_DATA_SPLIT(thd,table,info) /* empty */
~Wsrep_load_data_split()
{
if (WSREP(m_thd) && m_load_data_splitting)
{
/* Restore original settings */
m_thd->wsrep_cs().streaming_params(m_fragment_unit, m_fragment_size);
}
}
private:
THD *m_thd;
my_bool m_load_data_splitting;
enum wsrep::streaming_context::fragment_unit m_fragment_unit;
size_t m_fragment_size;
};
#endif /* WITH_WSREP */
class READ_INFO: public Load_data_param
......@@ -354,6 +352,9 @@ int mysql_load(THD *thd, const sql_exchange *ex, TABLE_LIST *table_list,
bool transactional_table __attribute__((unused));
DBUG_ENTER("mysql_load");
#ifdef WITH_WSREP
Wsrep_load_data_split wsrep_load_data_split(thd);
#endif /* WITH_WSREP */
/*
Bug #34283
mysqlbinlog leaves tmpfile after termination if binlog contains
......@@ -1005,7 +1006,6 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= FALSE;
if (err)
......@@ -1148,7 +1148,6 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= FALSE;
if (err)
......@@ -1271,7 +1270,6 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= false;
if (err)
......
......@@ -2499,57 +2499,6 @@ int wsrep_ordered_commit_if_no_binlog(THD* thd, bool all)
return 0;
}
wsrep_status_t wsrep_tc_log_commit(THD* thd)
{
int cookie;
my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_LOAD);
if (wsrep_before_commit(thd, true))
{
WSREP_DEBUG("wsrep_tc_log_commit: wsrep_before_commit failed %llu",
thd->thread_id);
return WSREP_TRX_FAIL;
}
cookie= tc_log->log_and_order(thd, xid, 1, false, true);
if (wsrep_after_commit(thd, true))
{
WSREP_DEBUG("wsrep_tc_log_commit: wsrep_after_commit failed %llu",
thd->thread_id);
return WSREP_TRX_FAIL;
}
if (!cookie)
{
WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie);
return WSREP_TRX_FAIL;
}
if (tc_log->unlog(cookie, xid))
{
WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie);
return WSREP_TRX_FAIL;
}
if (wsrep_after_statement(thd))
{
return WSREP_TRX_FAIL;
}
/* Set wsrep transaction id if not set. */
if (thd->wsrep_trx_id() == WSREP_UNDEFINED_TRX_ID)
{
if (thd->wsrep_next_trx_id() == WSREP_UNDEFINED_TRX_ID)
{
thd->set_wsrep_next_trx_id(thd->query_id);
}
DBUG_ASSERT(thd->wsrep_next_trx_id() != WSREP_UNDEFINED_TRX_ID);
}
if (wsrep_start_transaction(thd, thd->wsrep_next_trx_id()))
{
return WSREP_TRX_FAIL;
}
DBUG_ASSERT(thd->wsrep_trx_id() != WSREP_UNDEFINED_TRX_ID);
return WSREP_OK;
}
int wsrep_thd_retry_counter(const THD *thd)
{
return thd->wsrep_retry_counter;
......
......@@ -444,15 +444,6 @@ bool wsrep_provider_is_SR_capable();
*/
int wsrep_ordered_commit_if_no_binlog(THD*, bool);
/**
* Commit the current transaction with the
* MySQL "Transaction Coordinator Log" (see `class TC_LOG` in sql/log.h).
* Calling this function will generate and assign a new wsrep transaction id
* for `thd`.
* @return WSREP_OK on success or other WSREP_* error code on failure
*/
wsrep_status_t wsrep_tc_log_commit(THD* thd);
/**
* Initialize WSREP server instance.
*
......
Subproject commit e7d72ae7f6a6995a21d743389426a963429a1fff
Subproject commit 20b52ff1ddc3b2f547b7081471f46dcfa5efabc7
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment