Commit 447b19a3 authored by Seppo Jaakola's avatar Seppo Jaakola

Ported all remaining storage/innobase changes from lp:codership-mysql/5.6, up tp revision #4021

This is same level as wsrep_25.1 milestone
Note: stotage/xtaradb is not upgraded yet
parent 96423441
......@@ -141,7 +141,7 @@ extern bool wsrep_prepare_key_for_innodb(const uchar *cache_key,
size_t* key_len);
extern handlerton * wsrep_hton;
extern handlerton * binlog_hton;
extern TC_LOG* tc_log;
extern void wsrep_cleanup_transaction(THD *thd);
#endif /* WITH_WSREP */
/** to protect innobase_open_files */
......@@ -1175,6 +1175,10 @@ innobase_srv_conc_enter_innodb(
/*===========================*/
trx_t* trx) /*!< in: transaction handle */
{
#ifdef WITH_WSREP
if (wsrep_on(trx->mysql_thd) &&
wsrep_thd_is_brute_force(trx->mysql_thd)) return;
#endif /* WITH_WSREP */
if (srv_thread_concurrency) {
if (trx->n_tickets_to_enter_innodb > 0) {
......@@ -1209,6 +1213,10 @@ innobase_srv_conc_exit_innodb(
#ifdef UNIV_SYNC_DEBUG
ut_ad(!sync_thread_levels_nonempty_trx(trx->has_search_latch));
#endif /* UNIV_SYNC_DEBUG */
#ifdef WITH_WSREP
if (wsrep_on(trx->mysql_thd) &&
wsrep_thd_is_brute_force(trx->mysql_thd)) return;
#endif /* WITH_WSREP */
/* This is to avoid making an unnecessary function call. */
if (trx->declared_to_be_inside_innodb
......@@ -5484,6 +5492,10 @@ wsrep_innobase_mysql_sort(
tmp_length = charset->coll->strnxfrm(charset, str, str_length,
tmp_str, str_length);
/* Note: in MySQL 5.6:
tmp_length = charset->coll->strnxfrm(charset, str, str_length,
str_length, tmp_str, tmp_length, 0);
*/
DBUG_ASSERT(tmp_length == str_length);
break;
......@@ -9177,7 +9189,7 @@ wsrep_dict_foreign_find_index(
ulint check_null);
extern ulint
extern dberr_t
wsrep_append_foreign_key(
/*===========================*/
trx_t* trx, /*!< in: trx */
......@@ -9292,7 +9304,7 @@ wsrep_append_foreign_key(
(index && index->table_name) ? index->table_name :
"void table",
wsrep_thd_query(thd));
return rcode;
return DB_ERROR;
}
strncpy(cache_key,
(wsrep_protocol_version > 1) ?
......@@ -9400,11 +9412,14 @@ wsrep_append_key(
WSREP_WARN("Appending row key failed: %s, %d",
(wsrep_thd_query(thd)) ?
wsrep_thd_query(thd) : "void", rcode);
DBUG_RETURN(rcode);
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
}
DBUG_RETURN(0);
}
extern void compute_md5_hash(char *digest, const char *buf, int len);
#define MD5_HASH compute_md5_hash
int
ha_innobase::wsrep_append_keys(
/*==================*/
......@@ -9413,16 +9428,17 @@ ha_innobase::wsrep_append_keys(
const uchar* record0, /* in: row in MySQL format */
const uchar* record1) /* in: row in MySQL format */
{
int rcode;
DBUG_ENTER("wsrep_append_keys");
bool key_appended = false;
bool key_appended = false;
trx_t *trx = thd_to_trx(thd);
if (table_share && table_share->tmp_table != NO_TMP_TABLE) {
WSREP_DEBUG("skipping tmp table DML: THD: %lu tmp: %d SQL: %s",
WSREP_DEBUG("skipping tmp table DML: THD: %lu tmp: %d SQL: %s",
wsrep_thd_thread_id(thd),
table_share->tmp_table,
(wsrep_thd_query(thd)) ?
(wsrep_thd_query(thd)) ?
wsrep_thd_query(thd) : "void");
DBUG_RETURN(0);
}
......@@ -9438,20 +9454,19 @@ ha_innobase::wsrep_append_keys(
table, 0, key, key_info->key_length, record0, &is_null);
if (!is_null) {
int rcode = wsrep_append_key(
thd, trx, table_share, table, keyval,
rcode = wsrep_append_key(
thd, trx, table_share, table, keyval,
len, shared);
if (rcode) DBUG_RETURN(rcode);
}
else
{
WSREP_DEBUG("NULL key skipped (proto 0): %s",
WSREP_DEBUG("NULL key skipped (proto 0): %s",
wsrep_thd_query(thd));
}
} else {
ut_a(table->s->keys <= 256);
uint i;
for (i=0; i<table->s->keys; ++i) {
uint len;
char keyval0[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
......@@ -9481,27 +9496,27 @@ ha_innobase::wsrep_append_keys(
key_appended = true;
len = wsrep_store_key_val_for_row(
table, i, key0, key_info->key_length,
table, i, key0, key_info->key_length,
record0, &is_null);
if (!is_null) {
int rcode = wsrep_append_key(
thd, trx, table_share, table,
rcode = wsrep_append_key(
thd, trx, table_share, table,
keyval0, len+1, shared);
if (rcode) DBUG_RETURN(rcode);
}
else
{
WSREP_DEBUG("NULL key skipped: %s",
WSREP_DEBUG("NULL key skipped: %s",
wsrep_thd_query(thd));
}
if (record1) {
len = wsrep_store_key_val_for_row(
table, i, key1, key_info->key_length,
table, i, key1, key_info->key_length,
record1, &is_null);
if (!is_null && memcmp(key0, key1, len)) {
int rcode = wsrep_append_key(
thd, trx, table_share,
table,
rcode = wsrep_append_key(
thd, trx, table_share,
table,
keyval1, len+1, shared);
if (rcode) DBUG_RETURN(rcode);
}
......@@ -16658,15 +16673,20 @@ wsrep_abort_slave_trx(wsrep_seqno_t bf_seqno, wsrep_seqno_t victim_seqno)
abort();
}
int
wsrep_innobase_kill_one_trx(const trx_t *bf_trx, trx_t *victim_trx, ibool signal)
wsrep_innobase_kill_one_trx(void * const bf_thd_ptr,
const trx_t * const bf_trx,
trx_t *victim_trx, ibool signal)
{
ut_ad(lock_mutex_own());
ut_ad(trx_mutex_own(victim_trx));
ut_ad(bf_thd_ptr);
ut_ad(victim_trx);
DBUG_ENTER("wsrep_innobase_kill_one_trx");
THD *bf_thd = (THD *)((bf_trx) ? bf_trx->mysql_thd : NULL);
THD *bf_thd = bf_thd_ptr ? (THD*) bf_thd_ptr : NULL;
THD *thd = (THD *) victim_trx->mysql_thd;
int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0;
if (!bf_thd) bf_thd = (bf_trx) ? (THD *)bf_trx->mysql_thd : NULL;
if (!thd) {
DBUG_PRINT("wsrep", ("no thd for conflicting lock"));
WSREP_WARN("no THD for trx: %lu", victim_trx->id);
......@@ -16869,12 +16889,15 @@ wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,
if (victim_trx)
{
mutex_enter(&trx_sys->mutex);
int rcode = wsrep_innobase_kill_one_trx(bf_trx, victim_trx,
signal);
mutex_exit(&trx_sys->mutex);
lock_mutex_enter();
trx_mutex_enter(victim_trx);
int rcode = wsrep_innobase_kill_one_trx(bf_thd, bf_trx,
victim_trx, signal);
trx_mutex_exit(victim_trx);
lock_mutex_exit();
wsrep_srv_conc_cancel_wait(victim_trx);
DBUG_RETURN(rcode);
DBUG_RETURN(rcode);
} else {
WSREP_DEBUG("victim does not have transaction");
wsrep_thd_LOCK(victim_thd);
......
......@@ -289,7 +289,8 @@ innobase_casedn_str(
#ifdef WITH_WSREP
UNIV_INTERN
int
wsrep_innobase_kill_one_trx(const trx_t *bf_trx, trx_t *victim_trx, ibool signal);
wsrep_innobase_kill_one_trx(void *thd_ptr,
const trx_t *bf_trx, trx_t *victim_trx, ibool signal);
extern "C" int wsrep_thd_is_brute_force(void *thd_ptr);
int wsrep_trx_order_before(void *thd1, void *thd2);
void wsrep_innobase_mysql_sort(int mysql_type, uint charset_number,
......
......@@ -445,7 +445,10 @@ trx_id_t
trx_sys_get_new_trx_id(void)
/*========================*/
{
#ifndef WITH_WSREP
/* wsrep_fake_trx_id violates this assert */
ut_ad(mutex_own(&trx_sys->mutex));
#endif /* WITH_WSREP */
/* VERY important: after the database is started, max_trx_id value is
divisible by TRX_SYS_TRX_ID_WRITE_MARGIN, and the following if
......
......@@ -1583,7 +1583,9 @@ lock_rec_other_has_expl_req(
#ifdef WITH_WSREP
static void
wsrep_kill_victim(trx_t *trx, lock_t *lock) {
wsrep_kill_victim(const trx_t * const trx, const lock_t *lock) {
ut_ad(lock_mutex_own());
ut_ad(trx_mutex_own(lock->trx));
int bf_this = wsrep_thd_is_brute_force(trx->mysql_thd);
int bf_other =
wsrep_thd_is_brute_force(lock->trx->mysql_thd);
......@@ -1591,7 +1593,6 @@ wsrep_kill_victim(trx_t *trx, lock_t *lock) {
(bf_this && bf_other && wsrep_trx_order_before(
trx->mysql_thd, lock->trx->mysql_thd))) {
// if (lock->trx->que_state == TRX_QUE_LOCK_WAIT) {
if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
if (wsrep_debug)
fprintf(stderr, "WSREP: BF victim waiting\n");
......@@ -1599,13 +1600,14 @@ wsrep_kill_victim(trx_t *trx, lock_t *lock) {
is in the queue*/
} else if (lock->trx != trx) {
if (wsrep_log_conflicts) {
mutex_enter(&trx_sys->mutex);
if (bf_this)
fputs("\n*** Priority TRANSACTION:\n",
stderr);
else
fputs("\n*** Victim TRANSACTION:\n",
stderr);
trx_print(stderr, trx, 3000);
trx_print_latched(stderr, trx, 3000);
if (bf_other)
fputs("\n*** Priority TRANSACTION:\n",
......@@ -1613,8 +1615,9 @@ wsrep_kill_victim(trx_t *trx, lock_t *lock) {
else
fputs("\n*** Victim TRANSACTION:\n",
stderr);
trx_print(stderr, lock->trx, 3000);
trx_print_latched(stderr, lock->trx, 3000);
mutex_exit(&trx_sys->mutex);
fputs("*** WAITING FOR THIS LOCK TO BE GRANTED:\n",
stderr);
......@@ -1624,8 +1627,8 @@ wsrep_kill_victim(trx_t *trx, lock_t *lock) {
lock_table_print(stderr, lock);
}
}
wsrep_innobase_kill_one_trx(
trx, lock->trx, TRUE);
wsrep_innobase_kill_one_trx(trx->mysql_thd,
(const trx_t*) trx, lock->trx, TRUE);
}
}
}
......@@ -1800,7 +1803,8 @@ lock_t*
lock_rec_create(
/*============*/
#ifdef WITH_WSREP
lock_t* c_lock, /* conflicting lock */
lock_t* const c_lock, /* conflicting lock */
que_thr_t* thr,
#endif
ulint type_mode,/*!< in: lock mode and wait
flag, type is ignored and
......@@ -2048,12 +2052,14 @@ lock_rec_enqueue_waiting(
to be granted, note that we already own
the trx mutex. */
#ifdef WITH_WSREP
if (wsrep_on(trx->mysql_thd) && c_lock->trx->lock.was_chosen_as_deadlock_victim) {
return(DB_DEADLOCK);
if (wsrep_on(trx->mysql_thd) &&
trx->lock.was_chosen_as_deadlock_victim) {
return(DB_DEADLOCK);
}
/* Enqueue the lock request that will wait to be granted */
lock = lock_rec_create(c_lock, type_mode | LOCK_WAIT,
block, heap_no, index, trx, TRUE);
lock = lock_rec_create(
c_lock, thr,
type_mode | LOCK_WAIT, block, heap_no,
index, trx, TRUE);
#else
lock = lock_rec_create(
type_mode | LOCK_WAIT, block, heap_no,
......@@ -2229,9 +2235,9 @@ lock_rec_add_to_queue(
somebody_waits:
#ifdef WITH_WSREP
return(lock_rec_create(
NULL, type_mode, block, heap_no, index, trx,
caller_owns_trx_mutex));
return(lock_rec_create(NULL, NULL,
type_mode, block, heap_no, index, trx,
caller_owns_trx_mutex));
#else
return(lock_rec_create(
type_mode, block, heap_no, index, trx,
......@@ -2303,10 +2309,10 @@ lock_rec_lock_fast(
if (!impl) {
/* Note that we don't own the trx mutex. */
#ifdef WITH_WSREP
lock = lock_rec_create(
NULL, mode, block, heap_no, index, trx, FALSE);
lock = lock_rec_create(NULL, thr,
mode, block, heap_no, index, trx, FALSE);
#else
lock = lock_rec_create(
lock = lock_rec_create(
mode, block, heap_no, index, trx, FALSE);
#endif
......@@ -2361,10 +2367,10 @@ lock_rec_lock_slow(
dict_index_t* index, /*!< in: index of record */
que_thr_t* thr) /*!< in: query thread */
{
trx_t* trx;
#ifdef WITH_WSREP
lock_t *c_lock;
lock_t* c_lock(NULL);
#endif
trx_t* trx;
lock_t* lock;
dberr_t err = DB_SUCCESS;
......@@ -2430,6 +2436,9 @@ lock_rec_lock_slow(
ut_ad(lock == NULL);
enqueue_waiting:
#ifdef WITH_WSREP
/* c_lock is NULL here if jump to enqueue_waiting happened
but it's ok because lock is not NULL in that case and c_lock
is not used. */
err = lock_rec_enqueue_waiting(
c_lock, mode, block, heap_no,
lock, index, thr);
......
......@@ -919,7 +919,7 @@ row_ins_invalidate_query_cache(
mem_free(buf);
}
#ifdef WITH_WSREP
ulint wsrep_append_foreign_key(trx_t *trx,
dberr_t wsrep_append_foreign_key(trx_t *trx,
dict_foreign_t* foreign,
const rec_t* clust_rec,
dict_index_t* clust_index,
......@@ -1281,14 +1281,16 @@ row_ins_foreign_check_on_constraint(
foreign->foreign_table);
#ifdef WITH_WSREP
if (err == DB_SUCCESS) {
err = (dberr_t) wsrep_append_foreign_key(
thr_get_trx(thr),
foreign,
clust_rec,
clust_index,
FALSE, FALSE);
}
err = wsrep_append_foreign_key(
thr_get_trx(thr),
foreign,
clust_rec,
clust_index,
FALSE, FALSE);
if (err != DB_SUCCESS) {
fprintf(stderr,
"WSREP: foreign key append failed: %lu\n", err);
} else
#endif /* WITH_WSREP */
if (foreign->foreign_table->n_foreign_key_checks_running == 0) {
fprintf(stderr,
......@@ -1620,7 +1622,7 @@ run_again:
if (check_ref) {
err = DB_SUCCESS;
#ifdef WITH_WSREP
err = (dberr_t)wsrep_append_foreign_key(
err = wsrep_append_foreign_key(
thr_get_trx(thr),
foreign,
rec,
......
......@@ -174,13 +174,47 @@ func_exit:
}
#ifdef WITH_WSREP
ulint wsrep_append_foreign_key(trx_t *trx,
dict_foreign_t* foreign,
const rec_t* clust_rec,
dict_index_t* clust_index,
ibool referenced,
ibool shared);
static
ibool
wsrep_row_upd_index_is_foreign(
/*========================*/
dict_index_t* index, /*!< in: index */
trx_t* trx) /*!< in: transaction */
{
dict_table_t* table = index->table;
dict_foreign_t* foreign;
ibool froze_data_dict = FALSE;
ibool is_referenced = FALSE;
if (!UT_LIST_GET_FIRST(table->foreign_list)) {
return(FALSE);
}
if (trx->dict_operation_lock_mode == 0) {
row_mysql_freeze_data_dictionary(trx);
froze_data_dict = TRUE;
}
foreign = UT_LIST_GET_FIRST(table->foreign_list);
while (foreign) {
if (foreign->foreign_index == index) {
is_referenced = TRUE;
goto func_exit;
}
foreign = UT_LIST_GET_NEXT(foreign_list, foreign);
}
func_exit:
if (froze_data_dict) {
row_mysql_unfreeze_data_dictionary(trx);
}
return(is_referenced);
}
#endif /* WITH_WSREP */
/*********************************************************************//**
......@@ -311,7 +345,7 @@ func_exit:
}
#ifdef WITH_WSREP
static
ulint
dberr_t
wsrep_row_upd_check_foreign_constraints(
/*=================================*/
upd_node_t* node, /*!< in: row update node */
......@@ -329,7 +363,7 @@ wsrep_row_upd_check_foreign_constraints(
trx_t* trx;
const rec_t* rec;
ulint n_ext;
ulint err;
dberr_t err;
ibool got_s_lock = FALSE;
if (UT_LIST_GET_FIRST(table->foreign_list) == NULL) {
......@@ -1803,6 +1837,9 @@ row_upd_sec_index_entry(
index = node->index;
referenced = row_upd_index_is_referenced(index, trx);
#ifdef WITH_WSREP
ibool foreign = wsrep_row_upd_index_is_foreign(index, trx);
#endif /* WITH_WSREP */
heap = mem_heap_create(1024);
......@@ -1930,6 +1967,9 @@ row_upd_sec_index_entry(
row_ins_sec_index_entry() below */
if (!rec_get_deleted_flag(
rec, dict_table_is_comp(index->table))) {
#ifdef WITH_WSREP
que_node_t *parent = que_node_get_parent(node);
#endif /* WITH_WSREP */
err = btr_cur_del_mark_set_sec_rec(
0, btr_cur, TRUE, thr, &mtr);
......@@ -1948,12 +1988,17 @@ row_upd_sec_index_entry(
index, offsets, thr, &mtr);
}
#ifdef WITH_WSREP
if (err == DB_SUCCESS && !referenced) {
if (err == DB_SUCCESS && !referenced &&
!(parent && que_node_get_type(parent) ==
QUE_NODE_UPDATE &&
((upd_node_t*)parent)->cascade_node == node) &&
foreign
) {
ulint* offsets =
rec_get_offsets(
rec, index, NULL, ULINT_UNDEFINED,
&heap);
err = (dberr_t)wsrep_row_upd_check_foreign_constraints(
rec, index, NULL, ULINT_UNDEFINED,
&heap);
err = wsrep_row_upd_check_foreign_constraints(
node, &pcur, index->table,
index, offsets, thr, &mtr);
switch (err) {
......@@ -1962,14 +2007,13 @@ row_upd_sec_index_entry(
err = DB_SUCCESS;
break;
case DB_DEADLOCK:
if (wsrep_debug)
fprintf (stderr,
"WSREP: sec index FK check fail for deadlock");
if (wsrep_debug) fprintf (stderr,
"WSREP: sec index FK check fail for deadlock");
break;
default:
fprintf (stderr,
"WSREP: referenced FK check fail: %lu",
err);
"WSREP: referenced FK check fail: %d",
(int)err);
break;
}
}
......@@ -2130,6 +2174,9 @@ row_upd_clust_rec_by_insert(
que_thr_t* thr, /*!< in: query thread */
ibool referenced,/*!< in: TRUE if index may be referenced in
a foreign key constraint */
#ifdef WITH_WSREP
ibool foreign, /*!< in: TRUE if index is foreign key index */
#endif /* WITH_WSREP */
mtr_t* mtr) /*!< in/out: mtr; gets committed here */
{
mem_heap_t* heap;
......@@ -2143,6 +2190,9 @@ row_upd_clust_rec_by_insert(
rec_t* rec;
ulint* offsets = NULL;
#ifdef WITH_WSREP
que_node_t *parent = que_node_get_parent(node);
#endif /* WITH_WSREP */
ut_ad(node);
ut_ad(dict_index_is_clust(index));
......@@ -2220,8 +2270,12 @@ err_exit:
}
}
#ifdef WITH_WSREP
if (!referenced) {
err = (dberr_t)wsrep_row_upd_check_foreign_constraints(
if (!referenced &&
!(parent && que_node_get_type(parent) == QUE_NODE_UPDATE &&
((upd_node_t*)parent)->cascade_node == node) &&
foreign
) {
err = wsrep_row_upd_check_foreign_constraints(
node, pcur, table, index, offsets, thr, mtr);
switch (err) {
case DB_SUCCESS:
......@@ -2234,8 +2288,8 @@ err_exit:
break;
default:
fprintf (stderr,
"WSREP: referenced FK check fail: %lu",
err);
"WSREP: referenced FK check fail: %d",
(int)err);
break;
}
if (err != DB_SUCCESS) {
......@@ -2470,6 +2524,9 @@ row_upd_del_mark_clust_rec(
ibool referenced,
/*!< in: TRUE if index may be referenced in
a foreign key constraint */
#ifdef WITH_WSREP
ibool foreign,/*!< in: TRUE if index is foreign key index */
#endif /* WITH_WSREP */
mtr_t* mtr) /*!< in: mtr; gets committed here */
{
btr_pcur_t* pcur;
......@@ -2477,6 +2534,7 @@ row_upd_del_mark_clust_rec(
dberr_t err;
#ifdef WITH_WSREP
rec_t* rec;
que_node_t *parent = que_node_get_parent(node);
#endif /* WITH_WSREP */
ut_ad(node);
......@@ -2499,14 +2557,12 @@ row_upd_del_mark_clust_rec(
#endif /* WITH_WSREP */
err = btr_cur_del_mark_set_clust_rec(
btr_cur_get_block(btr_cur), btr_cur_get_rec(btr_cur),
#ifdef WITH_WSREP
index, offsets, thr, mtr);
btr_cur_get_block(btr_cur), rec,
#else
index, offsets, thr, mtr);
btr_cur_get_block(btr_cur), btr_cur_get_rec(btr_cur),
#endif /* WITH_WSREP */
index, offsets, thr, mtr);
if (err == DB_SUCCESS && referenced) {
/* NOTE that the following call loses the position of pcur ! */
......@@ -2514,8 +2570,13 @@ row_upd_del_mark_clust_rec(
node, pcur, index->table, index, offsets, thr, mtr);
}
#ifdef WITH_WSREP
if (err == DB_SUCCESS && !referenced) {
err = (dberr_t)wsrep_row_upd_check_foreign_constraints(
if (err == DB_SUCCESS && !referenced &&
!(parent && que_node_get_type(parent) == QUE_NODE_UPDATE &&
((upd_node_t*)parent)->cascade_node == node) &&
thr_get_trx(thr) &&
foreign
) {
err = wsrep_row_upd_check_foreign_constraints(
node, pcur, index->table, index, offsets, thr, mtr);
switch (err) {
case DB_SUCCESS:
......@@ -2528,8 +2589,8 @@ row_upd_del_mark_clust_rec(
break;
default:
fprintf (stderr,
"WSREP: clust rec referenced FK check fail: %lu",
err);
"WSREP: clust rec referenced FK check fail: %d",
(int)err);
break;
}
}
......@@ -2566,6 +2627,10 @@ row_upd_clust_step(
index = dict_table_get_first_index(node->table);
referenced = row_upd_index_is_referenced(index, thr_get_trx(thr));
#ifdef WITH_WSREP
ibool foreign = wsrep_row_upd_index_is_foreign(
index, thr_get_trx(thr));
#endif /* WITH_WSREP */
pcur = node->pcur;
......@@ -2656,7 +2721,11 @@ row_upd_clust_step(
if (node->is_delete) {
err = row_upd_del_mark_clust_rec(
#ifdef WITH_WSREP
node, index, offsets, thr, referenced, foreign, &mtr);
#else
node, index, offsets, thr, referenced, &mtr);
#endif /* WITH_WSREP */
if (err == DB_SUCCESS) {
node->state = UPD_NODE_UPDATE_ALL_SEC;
......@@ -2701,7 +2770,11 @@ row_upd_clust_step(
externally! */
err = row_upd_clust_rec_by_insert(
#ifdef WITH_WSREP
node, index, thr, referenced, foreign, &mtr);
#else
node, index, thr, referenced, &mtr);
#endif /* WITH_WSREP */
if (err != DB_SUCCESS) {
......
......@@ -212,6 +212,16 @@ srv_conc_enter_innodb_with_atomics(
for (;;) {
ulint sleep_in_us;
#ifdef WITH_WSREP
if (wsrep_on(trx->mysql_thd) &&
wsrep_trx_is_aborting(trx->mysql_thd)) {
if (wsrep_debug)
fprintf(stderr,
"srv_conc_enter due to MUST_ABORT");
srv_conc_force_enter_innodb(trx);
return;
}
#endif /* WITH_WSREP */
if (srv_conc.n_active < (lint) srv_thread_concurrency) {
ulint n_active;
......@@ -253,21 +263,6 @@ srv_conc_enter_innodb_with_atomics(
(void) os_atomic_decrement_lint(
&srv_conc.n_active, 1);
}
#ifdef WITH_WSREP
if (wsrep_on(trx->mysql_thd) &&
wsrep_thd_is_brute_force(trx->mysql_thd)) {
srv_conc_force_enter_innodb(trx);
return;
}
if (wsrep_on(trx->mysql_thd) &&
wsrep_trx_is_aborting(trx->mysql_thd)) {
if (wsrep_debug)
fprintf(stderr,
"srv_conc_enter due to MUST_ABORT");
srv_conc_force_enter_innodb(trx);
return;
}
#endif
if (!notified_mysql) {
(void) os_atomic_increment_lint(
......@@ -431,11 +426,6 @@ retry:
srv_conc_force_enter_innodb(trx);
return;
}
if (wsrep_on(trx->mysql_thd) &&
wsrep_trx_is_aborting(trx->mysql_thd)) {
srv_conc_force_enter_innodb(trx);
return;
}
#endif
/* If the transaction is not holding resources, let it sleep
......@@ -566,39 +556,6 @@ retry:
os_fast_mutex_unlock(&srv_conc_mutex);
}
#endif /* HAVE_ATOMIC_BUILTINS */
#ifdef WITH_WSREP
#ifdef HAVE_ATOMIC_BUILTINS
UNIV_INTERN
void
wsrep_srv_conc_cancel_wait(
/*==================*/
trx_t* trx) /*!< in: transaction object associated with the
thread */
{
if (trx->wsrep_event) {
if (wsrep_debug)
fprintf(stderr, "WSREP: conc slot cancel\n");
os_event_set(trx->wsrep_event);
}
}
#else
UNIV_INTERN
void
wsrep_srv_conc_cancel_wait(
/*==================*/
trx_t* trx) /*!< in: transaction object associated with the
thread */
{
os_fast_mutex_lock(&srv_conc_mutex);
if (trx->wsrep_event) {
if (wsrep_debug)
fprintf(stderr, "WSREP: conc slot cancel\n");
os_event_set(trx->wsrep_event);
}
os_fast_mutex_unlock(&srv_conc_mutex);
}
#endif /* HAVE_ATOMIC_BUILTINS */
#endif /* WITH_WSREP */
/*********************************************************************//**
Puts an OS thread to wait if there are too many concurrent threads
......@@ -700,5 +657,32 @@ srv_conc_get_active_threads(void)
/*==============================*/
{
return(srv_conc.n_active);
}
}
#ifdef WITH_WSREP
UNIV_INTERN
void
wsrep_srv_conc_cancel_wait(
/*==================*/
trx_t* trx) /*!< in: transaction object associated with the
thread */
{
#ifdef HAVE_ATOMIC_BUILTINS
/* aborting transactions will enter innodb by force in
srv_conc_enter_innodb_with_atomics(). No need to cancel here,
thr will wake up after os_sleep and let to enter innodb
*/
if (wsrep_debug)
fprintf(stderr, "WSREP: conc slot cancel, no atomics\n");
#else
os_fast_mutex_lock(&srv_conc_mutex);
if (trx->wsrep_event) {
if (wsrep_debug)
fprintf(stderr, "WSREP: conc slot cancel\n");
os_event_set(trx->wsrep_event);
}
os_fast_mutex_unlock(&srv_conc_mutex);
#endif
}
#endif /* WITH_WSREP */
......@@ -43,9 +43,6 @@ Created 3/26/1996 Heikki Tuuri
#include "row0mysql.h"
#include "lock0lock.h"
#include "pars0pars.h"
#ifdef WITH_WSREP
#include "ha_prototypes.h"
#endif /* WITH_WSREP */
#include "srv0mon.h"
#include "trx0sys.h"
......@@ -385,12 +382,6 @@ trx_rollback_to_savepoint_for_mysql_low(
trx->op_info = "";
#ifdef WITH_WSREP
if (wsrep_on(trx->mysql_thd) &&
trx->lock.was_chosen_as_deadlock_victim) {
trx->lock.was_chosen_as_deadlock_victim = FALSE;
}
#endif
return(err);
}
......@@ -1027,12 +1018,6 @@ trx_roll_try_truncate(
if (trx->update_undo) {
trx_undo_truncate_end(trx, trx->update_undo, limit);
}
#ifdef WITH_WSREP
if (wsrep_on(trx->mysql_thd) &&
trx->lock.was_chosen_as_deadlock_victim) {
trx->lock.was_chosen_as_deadlock_victim = FALSE;
}
#endif
}
/***********************************************************************//**
......@@ -1338,12 +1323,6 @@ trx_rollback_finish(
trx_commit(trx);
trx->lock.que_state = TRX_QUE_RUNNING;
#ifdef WITH_WSREP
if (wsrep_on(trx->mysql_thd) &&
trx->lock.was_chosen_as_deadlock_victim) {
trx->lock.was_chosen_as_deadlock_victim = FALSE;
}
#endif
}
/*********************************************************************//**
......
......@@ -206,9 +206,14 @@ trx_sys_update_mysql_binlog_offset(
ib_int64_t offset, /*!< in: position in that log file */
ulint field, /*!< in: offset of the MySQL log info field in
the trx sys header */
#ifdef WITH_WSREP
trx_sysf_t* sys_header, /*!< in: trx sys header */
#endif /* WITH_WSREP */
mtr_t* mtr) /*!< in: mtr */
{
#ifndef WITH_WSREP
trx_sysf_t* sys_header;
#endif /* !WITH_WSREP */
if (ut_strlen(file_name) >= TRX_SYS_MYSQL_LOG_NAME_LEN) {
......@@ -217,7 +222,9 @@ trx_sys_update_mysql_binlog_offset(
return;
}
#ifndef WITH_WSREP
sys_header = trx_sysf_get(mtr);
#endif /* !WITH_WSREP */
if (mach_read_from_4(sys_header + field
+ TRX_SYS_MYSQL_LOG_MAGIC_N_FLD)
......@@ -306,12 +313,48 @@ trx_sys_print_mysql_binlog_offset(void)
#ifdef WITH_WSREP
#ifdef UNIV_DEBUG
static long long trx_sys_cur_xid_seqno = -1;
static unsigned char trx_sys_cur_xid_uuid[16];
long long read_wsrep_xid_seqno(const XID* xid)
{
long long seqno;
memcpy(&seqno, xid->data + 24, sizeof(long long));
return seqno;
}
void read_wsrep_xid_uuid(const XID* xid, unsigned char* buf)
{
memcpy(buf, xid->data + 8, 16);
}
#endif /* UNIV_DEBUG */
void
trx_sys_update_wsrep_checkpoint(
const XID* xid, /*!< in: transaction XID */
mtr_t* mtr) /*!< in: mtr */
const XID* xid, /*!< in: transaction XID */
trx_sysf_t* sys_header, /*!< in: sys_header */
mtr_t* mtr) /*!< in: mtr */
{
trx_sysf_t* sys_header;
#ifdef UNIV_DEBUG
{
/* Check that seqno is monotonically increasing */
unsigned char xid_uuid[16];
long long xid_seqno = read_wsrep_xid_seqno(xid);
read_wsrep_xid_uuid(xid, xid_uuid);
if (!memcmp(xid_uuid, trx_sys_cur_xid_uuid, 8))
{
ut_ad(xid_seqno > trx_sys_cur_xid_seqno);
trx_sys_cur_xid_seqno = xid_seqno;
}
else
{
memcpy(trx_sys_cur_xid_uuid, xid_uuid, 16);
}
trx_sys_cur_xid_seqno = xid_seqno;
}
#endif /* UNIV_DEBUG */
ut_ad(xid && mtr);
ut_a(xid->formatID == -1 || wsrep_is_wsrep_xid(xid));
......@@ -364,7 +407,7 @@ trx_sys_read_wsrep_checkpoint(XID* xid)
!= TRX_SYS_WSREP_XID_MAGIC_N) {
memset(xid, 0, sizeof(*xid));
xid->formatID = -1;
trx_sys_update_wsrep_checkpoint(xid, &mtr);
trx_sys_update_wsrep_checkpoint(xid, sys_header, &mtr);
mtr_commit(&mtr);
return;
}
......
......@@ -893,12 +893,12 @@ trx_write_serialisation_history(
MONITOR_INC(MONITOR_TRX_COMMIT_UNDO);
#ifdef WITH_WSREP
sys_header = trx_sysf_get(&mtr);
/* Update latest MySQL wsrep XID in trx sys header. */
if (wsrep_is_wsrep_xid(&trx->xid))
{
trx_sys_update_wsrep_checkpoint(&trx->xid, &mtr);
}
sys_header = trx_sysf_get(&mtr);
/* Update latest MySQL wsrep XID in trx sys header. */
if (wsrep_is_wsrep_xid(&trx->xid))
{
trx_sys_update_wsrep_checkpoint(&trx->xid, sys_header, &mtr);
}
#endif /* WITH_WSREP */
/* Update the latest MySQL binlog name and offset info
......@@ -1258,14 +1258,13 @@ trx_commit(
ut_ad(!trx->in_ro_trx_list);
ut_ad(!trx->in_rw_trx_list);
trx->dict_operation = TRX_DICT_OP_NONE;
#ifdef WITH_WSREP
if (wsrep_on(trx->mysql_thd) &&
trx->lock.was_chosen_as_deadlock_victim) {
if (wsrep_on(trx->mysql_thd)) {
trx->lock.was_chosen_as_deadlock_victim = FALSE;
}
#endif
trx->dict_operation = TRX_DICT_OP_NONE;
trx->error_state = DB_SUCCESS;
/* trx->in_mysql_trx_list would hold between
......@@ -1358,6 +1357,10 @@ trx_commit_or_rollback_prepare(
switch (trx->state) {
case TRX_STATE_NOT_STARTED:
#ifdef WITH_WSREP
ut_d(trx->start_file = __FILE__);
ut_d(trx->start_line = __LINE__);
#endif /* WITH_WSREP */
trx_start_low(trx);
/* fall through */
case TRX_STATE_ACTIVE:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment