Commit 53cc9aa5 authored by Sergey Vojtovich's avatar Sergey Vojtovich

MDEV-15104 - Remove trx_sys_t::rw_trx_ids

Take snapshot of registered read-write transaction identifiers directly
from rw_trx_hash. It immediately saves one trx_sys.mutex lock, reduces
size of another critical section protected by this mutex, and makes
further optimisations like removing trx_sys_t::serialisation_list
possible.

Downside of this approach is bigger overhead for view opening, because
iterating LF_HASH is more expensive compared to taking snapshot of an
array. However for low concurrency overhead difference is negligible,
while for high concurrency mutex is much bigger evil.

Currently we still take trx_sys.mutex to serialise ReadView creation.
This is required to keep serialisation_list ordered by trx->no as well
as not to let purge thread to create more recent snapshot while another
thread gets suspended during creation of older snapshot. This will
become completely mutex free along with serialisation_list removal.

Compared to previous implementation removing element from rw_trx_hash
and serialisation_list is not atomic. We disregard all possible bad
consequences (if there're any) since it will be solved along with
serialisation_list removal.
parent af566d8a
......@@ -3636,11 +3636,7 @@ static ulonglong innodb_prepare_commit_versioned(THD* thd, ulonglong *trx_id)
DBUG_ASSERT(t->first->versioned_by_id());
DBUG_ASSERT(trx->rsegs.m_redo.rseg);
mutex_enter(&trx_sys.mutex);
trx_id_t commit_id = trx_sys.get_new_trx_id();
mutex_exit(&trx_sys.mutex);
return commit_id;
return trx_sys.get_new_trx_id();
}
}
......@@ -19742,9 +19738,7 @@ wsrep_fake_trx_id(
handlerton *hton,
THD *thd) /*!< in: user thread handle */
{
mutex_enter(&trx_sys.mutex);
trx_id_t trx_id = trx_sys.get_new_trx_id();
mutex_exit(&trx_sys.mutex);
WSREP_DEBUG("innodb fake trx id: " TRX_ID_FMT " thd: %s",
trx_id, wsrep_thd_query(thd));
wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd), trx_id);
......
......@@ -165,8 +165,10 @@ class ReadView {
private:
/**
Opens a read view where exactly the transactions serialized before this
point in time are seen in the view. */
inline void clone();
point in time are seen in the view.
@param[in,out] trx transaction */
void open(trx_t *trx);
/**
Copy state from another view.
......
......@@ -804,6 +804,10 @@ struct trx_sys_t {
MY_ALIGNED(CACHE_LINE_SIZE) trx_id_t m_max_trx_id;
/** Solves race condition between register_rw() and snapshot_ids(). */
MY_ALIGNED(CACHE_LINE_SIZE) trx_id_t m_rw_trx_hash_version;
bool m_initialised;
public:
......@@ -830,16 +834,6 @@ struct trx_sys_t {
transactions that have not yet been
started in InnoDB. */
MY_ALIGNED(CACHE_LINE_SIZE)
trx_ids_t rw_trx_ids; /*!< Array of Read write transaction IDs
for MVCC snapshot. A ReadView would take
a snapshot of these transactions whose
changes are not visible to it. We should
remove transactions from the list before
committing in memory and releasing locks
to ensure right order of removal and
consistent snapshot. */
MY_ALIGNED(CACHE_LINE_SIZE)
/** Temporary rollback segments */
trx_rseg_t* temp_rsegs[TRX_SYS_N_RSEGS];
......@@ -870,13 +864,11 @@ struct trx_sys_t {
/**
Constructor.
We only initialise rw_trx_ids here as it is impossible to postpone it's
initialisation to create().
Some members may require late initialisation, thus we just mark object as
uninitialised. Real initialisation happens in create().
*/
trx_sys_t(): m_initialised(false),
rw_trx_ids(ut_allocator<trx_id_t>(mem_key_trx_sys_t_rw_trx_ids))
{}
trx_sys_t(): m_initialised(false) {}
/**
......@@ -920,15 +912,54 @@ struct trx_sys_t {
trx_id_t get_new_trx_id()
{
ut_ad(mutex_own(&mutex));
return static_cast<trx_id_t>(my_atomic_add64_explicit(
reinterpret_cast<int64*>(&m_max_trx_id), 1, MY_MEMORY_ORDER_RELAXED));
trx_id_t id= get_new_trx_id_no_refresh();
refresh_rw_trx_hash_version();
return id;
}
/**
Takes MVCC snapshot.
To reduce malloc probablility we reserver rw_trx_hash.size() + 32 elements
in ids.
For details about get_rw_trx_hash_version() != get_max_trx_id() spin
@sa register_rw().
We rely on get_rw_trx_hash_version() to issue ACQUIRE memory barrier so
that loading of m_rw_trx_hash_version happens before accessing rw_trx_hash.
To optimise snapshot creation rw_trx_hash.iterate() is being used instead
of rw_trx_hash.iterate_no_dups(). It means that some transaction
identifiers may appear multiple times in ids.
@param[in,out] caller_trx used to get access to rw_trx_hash_pins
@param[out] ids array to store registered transaction identifiers
@param[out] max_trx_id variable to store m_max_trx_id value
*/
void snapshot_ids(trx_t *caller_trx, trx_ids_t *ids, trx_id_t *max_trx_id)
{
snapshot_ids_arg arg(ids);
while ((arg.m_id= get_rw_trx_hash_version()) != get_max_trx_id())
ut_delay(1);
ids->clear();
ids->reserve(rw_trx_hash.size() + 32);
*max_trx_id= arg.m_id;
rw_trx_hash.iterate(caller_trx,
reinterpret_cast<my_hash_walk_action>(copy_one_id),
&arg);
std::sort(ids->begin(), ids->end());
}
/** Initialiser for m_max_trx_id and m_rw_trx_hash_version. */
void init_max_trx_id(trx_id_t value)
{
m_max_trx_id= value;
m_max_trx_id= m_rw_trx_hash_version= value;
}
......@@ -945,14 +976,44 @@ struct trx_sys_t {
ulint any_active_transactions();
/** Registers read-write transaction. */
/**
Registers read-write transaction.
Transaction becomes visible to MVCC.
There's a gap between m_max_trx_id increment and transaction becoming
visible through rw_trx_hash. While we're in this gap concurrent thread may
come and do MVCC snapshot. As a result concurrent read view will be able to
observe records owned by this transaction even before it was committed.
m_rw_trx_hash_version is intended to solve this problem. MVCC snapshot has
to wait until m_max_trx_id == m_rw_trx_hash_version, which effectively
means that all transactions up to m_max_trx_id are available through
rw_trx_hash.
We rely on refresh_rw_trx_hash_version() to issue RELEASE memory barrier so
that m_rw_trx_hash_version increment happens after transaction becomes
visible through rw_trx_hash.
*/
void register_rw(trx_t *trx)
{
mutex_enter(&mutex);
trx->id= get_new_trx_id();
rw_trx_ids.push_back(trx->id);
mutex_exit(&mutex);
trx->id= get_new_trx_id_no_refresh();
rw_trx_hash.insert(trx);
refresh_rw_trx_hash_version();
}
/**
Deregisters read-write transaction.
Transaction is removed from rw_trx_hash, which releases all implicit locks.
MVCC snapshot won't see this transaction anymore.
*/
void deregister_rw(trx_t *trx)
{
rw_trx_hash.erase(trx);
}
......@@ -982,6 +1043,60 @@ struct trx_sys_t {
}
return 0;
}
struct snapshot_ids_arg
{
snapshot_ids_arg(trx_ids_t *ids): m_ids(ids) {}
trx_ids_t *m_ids;
trx_id_t m_id;
};
static my_bool copy_one_id(rw_trx_hash_element_t *element,
snapshot_ids_arg *arg)
{
if (element->id < arg->m_id)
arg->m_ids->push_back(element->id);
return 0;
}
/** Getter for m_rw_trx_hash_version, must issue ACQUIRE memory barrier. */
trx_id_t get_rw_trx_hash_version()
{
return static_cast<trx_id_t>
(my_atomic_load64_explicit(reinterpret_cast<int64*>
(&m_rw_trx_hash_version),
MY_MEMORY_ORDER_ACQUIRE));
}
/** Increments m_rw_trx_hash_version, must issue RELEASE memory barrier. */
void refresh_rw_trx_hash_version()
{
my_atomic_add64_explicit(reinterpret_cast<int64*>(&m_rw_trx_hash_version),
1, MY_MEMORY_ORDER_RELEASE);
}
/**
Allocates new transaction id without refreshing rw_trx_hash version.
This method is extracted for exclusive use by register_rw() where
transaction must be inserted into rw_trx_hash between new transaction id
allocation and rw_trx_hash version refresh.
@sa get_new_trx_id()
@return new transaction id
*/
trx_id_t get_new_trx_id_no_refresh()
{
return static_cast<trx_id_t>(my_atomic_add64_explicit(
reinterpret_cast<int64*>(&m_max_trx_id), 1, MY_MEMORY_ORDER_RELAXED));
}
};
......
......@@ -172,7 +172,6 @@ extern PSI_memory_key mem_key_other;
extern PSI_memory_key mem_key_row_log_buf;
extern PSI_memory_key mem_key_row_merge_sort;
extern PSI_memory_key mem_key_std;
extern PSI_memory_key mem_key_trx_sys_t_rw_trx_ids;
extern PSI_memory_key mem_key_partitioning;
/** Setup the internal objects needed for UT_NEW() to operate.
......
......@@ -207,49 +207,25 @@ MVCC::validate() const
}
#endif /* UNIV_DEBUG */
/**
Opens a read view where exactly the transactions serialized before this
point in time are seen in the view. */
void ReadView::clone()
{
ut_ad(mutex_own(&trx_sys.mutex));
m_low_limit_no = m_low_limit_id = trx_sys.get_max_trx_id();
m_ids= trx_sys.rw_trx_ids;
#ifdef UNIV_DEBUG
/* Original assertion was here to make sure that rw_trx_ids and
rw_trx_hash are in sync and they hold either ACTIVE or PREPARED
transaction.
Now rw_trx_hash_t::find() does
ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE) ||
trx_state_eq(trx, TRX_STATE_PREPARED)).
No need to repeat it here. We even can't repeat it here: it'll be race
condition because we need trx->element->mutex locked to perform this
check (see how it is done in find()).
Now rw_trx_ids and rw_trx_hash may get out of sync for a short while:
when transaction is registered it first gets added into rw_trx_ids
under trx_sys.mutex protection and then to rw_trx_hash without mutex
protection. Thus we need repeat this lookup. */
for (trx_ids_t::const_iterator it = trx_sys.rw_trx_ids.begin();
it != trx_sys.rw_trx_ids.end(); ++it) {
while (!trx_sys.is_registered(current_trx(), *it));
}
#endif /* UNIV_DEBUG */
m_up_limit_id = m_ids.empty() ? m_low_limit_id : m_ids.front();
ut_ad(m_up_limit_id <= m_low_limit_id);
if (UT_LIST_GET_LEN(trx_sys.serialisation_list) > 0) {
const trx_t* trx;
/**
Opens a read view where exactly the transactions serialized before this
point in time are seen in the view.
trx = UT_LIST_GET_FIRST(trx_sys.serialisation_list);
@param[in,out] trx transaction
*/
if (trx->no < m_low_limit_no) {
m_low_limit_no = trx->no;
}
}
void ReadView::open(trx_t *trx)
{
ut_ad(mutex_own(&trx_sys.mutex));
trx_sys.snapshot_ids(trx, &m_ids, &m_low_limit_id);
m_low_limit_no= m_low_limit_id;
m_up_limit_id= m_ids.empty() ? m_low_limit_id : m_ids.front();
ut_ad(m_up_limit_id <= m_low_limit_id);
if (const trx_t *trx= UT_LIST_GET_FIRST(trx_sys.serialisation_list))
if (trx->no < m_low_limit_no)
m_low_limit_no= trx->no;
}
......@@ -324,7 +300,7 @@ void MVCC::view_open(trx_t* trx)
}
mutex_enter(&trx_sys.mutex);
trx->read_view.clone();
trx->read_view.open(trx);
if (trx->read_view.is_registered())
UT_LIST_REMOVE(m_views, &trx->read_view);
else
......@@ -375,7 +351,7 @@ MVCC::clone_oldest_view(ReadView* view)
{
mutex_enter(&trx_sys.mutex);
/* Find oldest view. */
for (ReadView *oldest_view = UT_LIST_GET_LAST(m_views);
for (const ReadView *oldest_view = UT_LIST_GET_LAST(m_views);
oldest_view != NULL;
oldest_view = UT_LIST_GET_PREV(m_view_list, oldest_view))
{
......@@ -387,7 +363,7 @@ MVCC::clone_oldest_view(ReadView* view)
}
}
/* No views in the list: snapshot current state. */
view->clone();
view->open(0);
mutex_exit(&trx_sys.mutex);
}
......
......@@ -859,7 +859,7 @@ void trx_rollback_recovered(bool all)
ut_ad(!srv_undo_sources);
ut_ad(srv_fast_shutdown);
discard:
trx_sys.rw_trx_hash.erase(trx);
trx_sys.deregister_rw(trx);
trx_free_at_shutdown(trx);
}
else
......
......@@ -878,7 +878,6 @@ static void trx_resurrect(trx_undo_t *undo, trx_rseg_t *rseg,
trx_sys.rw_trx_hash.insert(trx);
trx_sys.rw_trx_hash.put_pins(trx);
trx_sys.rw_trx_ids.push_back(trx->id);
trx_resurrect_table_locks(trx, undo);
if (trx_state_eq(trx, TRX_STATE_ACTIVE))
*rows_to_undo+= trx->undo_no;
......@@ -973,8 +972,6 @@ trx_lists_init_at_db_start()
ib::info() << "Trx id counter is " << trx_sys.get_max_trx_id();
}
std::sort(trx_sys.rw_trx_ids.begin(), trx_sys.rw_trx_ids.end());
trx_sys.mvcc.clone_oldest_view(&purge_sys->view);
}
......@@ -1519,9 +1516,7 @@ trx_update_mod_tables_timestamp(
/**
Erase the transaction from running transaction lists and serialization
list. Active RW transaction list of a MVCC snapshot(ReadView::prepare)
won't include this transaction after this call. All implicit locks are
also released by this call as trx is removed from rw_trx_hash.
list.
@param[in] trx Transaction to erase, must have an ID > 0
@param[in] serialised true if serialisation log was written */
static
......@@ -1538,15 +1533,8 @@ trx_erase_lists(
} else {
mutex_enter(&trx_sys.mutex);
}
trx_ids_t::iterator it = std::lower_bound(
trx_sys.rw_trx_ids.begin(),
trx_sys.rw_trx_ids.end(),
trx->id);
ut_ad(*it == trx->id);
trx_sys.rw_trx_ids.erase(it);
mutex_exit(&trx_sys.mutex);
trx_sys.rw_trx_hash.erase(trx);
trx_sys.deregister_rw(trx);
}
/****************************************************************//**
......
......@@ -43,7 +43,6 @@ PSI_memory_key mem_key_other;
PSI_memory_key mem_key_row_log_buf;
PSI_memory_key mem_key_row_merge_sort;
PSI_memory_key mem_key_std;
PSI_memory_key mem_key_trx_sys_t_rw_trx_ids;
PSI_memory_key mem_key_partitioning;
#ifdef UNIV_PFS_MEMORY
......@@ -72,7 +71,6 @@ static PSI_memory_info pfs_info[] = {
{&mem_key_row_log_buf, "row_log_buf", 0},
{&mem_key_row_merge_sort, "row_merge_sort", 0},
{&mem_key_std, "std", 0},
{&mem_key_trx_sys_t_rw_trx_ids, "trx_sys_t::rw_trx_ids", 0},
{&mem_key_partitioning, "partitioning", 0},
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment