Commit 487fbc2e authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-21452 fixup: Introduce trx_t::mutex_is_owner()

When we replaced trx_t::mutex with srw_mutex
in commit 38fd7b7d
we lost the SAFE_MUTEX instrumentation.
Let us introduce a replacement and restore the assertions.
parent 455514c8
......@@ -4478,10 +4478,10 @@ static void innobase_kill_query(handlerton*, THD *thd, enum thd_kill_levels)
mysql_mutex_lock(&lock_sys.wait_mutex);
if (lock_t *lock= trx->lock.wait_lock)
{
trx->mutex.wr_lock();
trx->mutex_lock();
trx->error_state= DB_INTERRUPTED;
lock_cancel_waiting_and_release(lock);
trx->mutex.wr_unlock();
trx->mutex_unlock();
}
lock_sys.mutex_unlock();
mysql_mutex_unlock(&lock_sys.wait_mutex);
......@@ -18007,6 +18007,7 @@ int wsrep_innobase_kill_one_trx(THD *bf_thd, trx_t *victim_trx, bool signal)
ut_ad(bf_thd);
ut_ad(victim_trx);
lock_sys.mutex_assert_locked();
ut_ad(victim_trx->mutex_is_owner());
DBUG_ENTER("wsrep_innobase_kill_one_trx");
......@@ -18110,11 +18111,11 @@ wsrep_abort_transaction(
if (victim_trx) {
lock_sys.mutex_lock();
victim_trx->mutex.wr_lock();
victim_trx->mutex_lock();
int rcode= wsrep_innobase_kill_one_trx(bf_thd,
victim_trx, signal);
lock_sys.mutex_unlock();
victim_trx->mutex.wr_unlock();
victim_trx->mutex_unlock();
DBUG_RETURN(rcode);
} else {
wsrep_thd_bf_abort(bf_thd, victim_thd, signal);
......
......@@ -517,12 +517,12 @@ class rw_trx_hash_t
ut_ad(!trx->read_only || !trx->rsegs.m_redo.rseg);
ut_ad(!trx_is_autocommit_non_locking(trx));
/* trx->state can be anything except TRX_STATE_NOT_STARTED */
ut_d(trx->mutex.wr_lock());
ut_d(trx->mutex_lock());
ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE) ||
trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY) ||
trx_state_eq(trx, TRX_STATE_PREPARED_RECOVERED) ||
trx_state_eq(trx, TRX_STATE_PREPARED));
ut_d(trx->mutex.wr_unlock());
ut_d(trx->mutex_unlock());
}
......
......@@ -667,9 +667,41 @@ struct trx_t : ilist_node<> {
trx_sys_t::deregister_rw(), release_locks(). */
trx_id_t id;
private:
/** mutex protecting state and some of lock
(some are protected by lock_sys.mutex) */
srw_mutex mutex;
#ifdef UNIV_DEBUG
/** The owner of mutex (0 if none); protected by mutex */
std::atomic<os_thread_id_t> mutex_owner{0};
#endif /* UNIV_DEBUG */
public:
void mutex_init() { mutex.init(); }
void mutex_destroy() { mutex.destroy(); }
/** Acquire the mutex */
void mutex_lock()
{
ut_ad(!mutex_is_owner());
mutex.wr_lock();
ut_ad(!mutex_owner.exchange(os_thread_get_curr_id(),
std::memory_order_relaxed));
}
/** Release the mutex */
void mutex_unlock()
{
ut_ad(mutex_owner.exchange(0, std::memory_order_relaxed)
== os_thread_get_curr_id());
mutex.wr_unlock();
}
#ifdef UNIV_DEBUG
/** @return whether the current thread holds the mutex */
bool mutex_is_owner() const
{
return mutex_owner.load(std::memory_order_relaxed) ==
os_thread_get_curr_id();
}
#endif /* UNIV_DEBUG */
/** State of the trx from the point of view of concurrency control
and the valid state transitions.
......@@ -1027,6 +1059,7 @@ struct trx_t : ilist_node<> {
{
ut_ad(state == TRX_STATE_NOT_STARTED);
ut_ad(!id);
ut_ad(!mutex_is_owner());
ut_ad(!has_logged());
ut_ad(!is_referenced());
ut_ad(!is_wsrep());
......
This diff is collapsed.
......@@ -436,6 +436,7 @@ lock_prdt_add_to_queue(
{
const page_id_t id{block->page.id()};
lock_sys.mutex_assert_locked();
ut_ad(caller_owns_trx_mutex == trx->mutex_is_owner());
ut_ad(index->is_spatial());
ut_ad(!dict_index_is_online_ddl(index));
ut_ad(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE));
......@@ -555,7 +556,7 @@ lock_prdt_insert_check_and_lock(
lock_init_prdt_from_mbr(prdt, mbr, 0, trx->lock.lock_heap);
/* Note that we may get DB_SUCCESS also here! */
trx->mutex.wr_lock();
trx->mutex_lock();
err = lock_rec_enqueue_waiting(
#ifdef WITH_WSREP
......@@ -564,7 +565,7 @@ lock_prdt_insert_check_and_lock(
LOCK_X | LOCK_PREDICATE | LOCK_INSERT_INTENTION,
block, PRDT_HEAPNO, index, thr, prdt);
trx->mutex.wr_unlock();
trx->mutex_unlock();
} else {
err = DB_SUCCESS;
}
......@@ -789,7 +790,7 @@ lock_prdt_lock(
|| ((type_mode & LOCK_PREDICATE)
&& (!lock_prdt_consistent(
lock_get_prdt_from_lock(lock), prdt, 0)))) {
trx->mutex.wr_lock();
trx->mutex_lock();
lock = lock_prdt_has_lock(
mode, type_mode, block, prdt, trx);
......@@ -821,7 +822,7 @@ lock_prdt_lock(
}
}
trx->mutex.wr_unlock();
trx->mutex_unlock();
} else {
if (!lock_rec_get_nth_bit(lock, PRDT_HEAPNO)) {
lock_rec_set_nth_bit(lock, PRDT_HEAPNO);
......
......@@ -177,7 +177,7 @@ que_fork_scheduler_round_robin(
que_fork_t* fork, /*!< in: a query fork */
que_thr_t* thr) /*!< in: current pos */
{
fork->trx->mutex.wr_lock();
fork->trx->mutex_lock();
/* If no current, start first available. */
if (thr == NULL) {
......@@ -195,7 +195,7 @@ que_fork_scheduler_round_robin(
que_thr_init_command(thr);
}
fork->trx->mutex.wr_unlock();
fork->trx->mutex_unlock();
return(thr);
}
......@@ -466,14 +466,14 @@ que_thr_node_step(
}
trx_t *trx= thr->graph->trx;
trx->mutex.wr_lock();
trx->mutex_lock();
if (!trx->lock.wait_thr && thr->graph->state == QUE_FORK_ACTIVE) {
thr->state = QUE_THR_COMPLETED;
thr = NULL;
}
trx->mutex.wr_unlock();
trx->mutex_unlock();
return(thr);
}
......@@ -688,18 +688,14 @@ que_run_threads_low(
/*================*/
que_thr_t* thr) /*!< in: query thread */
{
trx_t* trx;
que_thr_t* next_thr;
ut_ad(thr->state == QUE_THR_RUNNING);
ut_a(thr_get_trx(thr)->error_state == DB_SUCCESS);
/* cumul_resource counts how much resources the OS thread (NOT the
query thread) has spent in this function */
trx = thr_get_trx(thr);
do {
for (trx_t* trx = thr_get_trx(thr);;) {
ut_ad(!trx->mutex_is_owner());
ut_a(trx->error_state == DB_SUCCESS);
/* Check that there is enough space in the log to accommodate
possible log entries by this query step; if the operation can
touch more than about 4 pages, checks must be made also within
......@@ -710,17 +706,14 @@ que_run_threads_low(
/* Perform the actual query step: note that the query thread
may change if, e.g., a subprocedure call is made */
/*-------------------------*/
next_thr = que_thr_step(thr);
/*-------------------------*/
if (next_thr) {
ut_a(trx->error_state == DB_SUCCESS);
ut_a(next_thr == thr);
que_thr_t* next_thr = que_thr_step(thr);
ut_ad(trx == thr_get_trx(thr));
if (!next_thr) {
return;
}
ut_ad(trx == thr_get_trx(thr));
} while (next_thr != NULL);
ut_a(next_thr == thr);
}
}
/**********************************************************************//**
......
......@@ -193,10 +193,10 @@ row_vers_impl_x_locked_low(
heap, &prev_version, NULL,
dict_index_has_virtual(index) ? &vrow : NULL, 0);
ut_d(trx->mutex.wr_lock());
ut_d(trx->mutex_lock());
const bool committed = trx_state_eq(
trx, TRX_STATE_COMMITTED_IN_MEMORY);
ut_d(trx->mutex.wr_unlock());
ut_d(trx->mutex_unlock());
/* The oldest visible clustered index version must not be
delete-marked, because we never start a transaction by
......
......@@ -155,6 +155,7 @@ inline void trx_t::rollback_low(trx_savept_t *savept)
@return error code or DB_SUCCESS */
dberr_t trx_t::rollback(trx_savept_t *savept)
{
ut_ad(!mutex_is_owner());
if (state == TRX_STATE_NOT_STARTED)
{
error_state= DB_SUCCESS;
......@@ -724,10 +725,10 @@ static my_bool trx_rollback_recovered_callback(rw_trx_hash_element_t *element,
mysql_mutex_lock(&element->mutex);
if (trx_t *trx= element->trx)
{
trx->mutex.wr_lock();
trx->mutex_lock();
if (trx_state_eq(trx, TRX_STATE_ACTIVE) && trx->is_recovered)
trx_list->push_back(trx);
trx->mutex.wr_unlock();
trx->mutex_unlock();
}
mysql_mutex_unlock(&element->mutex);
return 0;
......@@ -769,10 +770,10 @@ void trx_rollback_recovered(bool all)
trx_list.pop_back();
ut_ad(trx);
ut_d(trx->mutex.wr_lock());
ut_d(trx->mutex_lock());
ut_ad(trx->is_recovered);
ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
ut_d(trx->mutex.wr_unlock());
ut_d(trx->mutex_unlock());
if (srv_shutdown_state != SRV_SHUTDOWN_NONE && !srv_undo_sources &&
srv_fast_shutdown)
......@@ -866,6 +867,7 @@ trx_roll_graph_build(
que_fork_t* fork;
que_thr_t* thr;
ut_ad(trx->mutex_is_owner());
heap = mem_heap_create(512);
fork = que_fork_create(heap);
fork->trx = trx;
......@@ -892,6 +894,7 @@ trx_rollback_start(
{
/* Initialize the rollback field in the transaction */
ut_ad(trx->mutex_is_owner());
ut_ad(!trx->roll_limit);
ut_ad(!trx->in_rollback);
......@@ -960,13 +963,13 @@ trx_rollback_step(
roll_limit = node->savept ? node->savept->least_undo_no : 0;
trx->mutex.wr_lock();
trx->mutex_lock();
trx_commit_or_rollback_prepare(trx);
node->undo_thr = trx_rollback_start(trx, roll_limit);
trx->mutex.wr_unlock();
trx->mutex_unlock();
} else {
ut_ad(node->state == ROLL_NODE_WAIT);
......
......@@ -195,7 +195,7 @@ struct TrxFactory {
trx->trx_savepoints,
&trx_named_savept_t::trx_savepoints);
trx->mutex.init();
trx->mutex_init();
}
/** Release resources held by the transaction object.
......@@ -234,7 +234,7 @@ struct TrxFactory {
UT_DELETE(trx->xid);
ut_free(trx->detailed_error);
trx->mutex.destroy();
trx->mutex_destroy();
trx->mod_tables.~trx_mod_tables_t();
......@@ -1946,9 +1946,9 @@ trx_prepare(
DBUG_EXECUTE_IF("ib_trx_crash_during_xa_prepare_step", DBUG_SUICIDE(););
ut_a(trx->state == TRX_STATE_ACTIVE);
trx->mutex.wr_lock();
trx->mutex_lock();
trx->state = TRX_STATE_PREPARED;
trx->mutex.wr_unlock();
trx->mutex_unlock();
if (lsn) {
/* Depending on the my.cnf options, we may now write the log
......@@ -2090,7 +2090,7 @@ static my_bool trx_get_trx_by_xid_callback(rw_trx_hash_element_t *element,
mysql_mutex_lock(&element->mutex);
if (trx_t *trx= element->trx)
{
trx->mutex.wr_lock();
trx->mutex_lock();
if (trx->is_recovered &&
(trx_state_eq(trx, TRX_STATE_PREPARED) ||
trx_state_eq(trx, TRX_STATE_PREPARED_RECOVERED)) &&
......@@ -2107,7 +2107,7 @@ static my_bool trx_get_trx_by_xid_callback(rw_trx_hash_element_t *element,
arg->trx= trx;
found= 1;
}
trx->mutex.wr_unlock();
trx->mutex_unlock();
}
mysql_mutex_unlock(&element->mutex);
return found;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment