Commit b782971c authored by Sergey Vojtovich's avatar Sergey Vojtovich

MDEV-15246 - premature history data deletion

This is regression after bc7a1dc1 of
MDEV-15104 - Optimise MVCC snapshot.

Aforementioned revision removes mutex lock around ReadView creation,
which allows views to be created concurrently. Effectively it
invalidates "oldest view" approach: no single view can be considered
oldest anymore. Instead we have to iterate trx_sys.m_views to find
min(m_low_limit_no), min(m_low_limit_id) and all transaction ids below
min(m_low_limit_id), which would form oldest view.

Second regression comes from c0d5d7c0
of MDEV-15104 - Optimise MVCC snapshot.

It removes mutex protection around trx->no assignment, which opens up
a gap between m_max_trx_id increment and transaction serialisation
number becoming visible through rw_trx_hash. While we're in this gap
concurrent thread may come and do MVCC snapshot without seeing allocated
but not yet assigned serialisation number. Then at some point purge
thread may clone this view. As a result it won't see newly allocated
serialisation number and may remove "unnecessary" history data of this
transaction from rollback segments.
parent ba125eca
...@@ -89,15 +89,46 @@ class ReadView ...@@ -89,15 +89,46 @@ class ReadView
/** /**
Copy state from another view. Copy state from another view.
This method is used to find min(m_low_limit_no), min(m_low_limit_id) and
all transaction ids below min(m_low_limit_id). These values effectively
form oldest view.
@param other view to copy from @param other view to copy from
*/ */
void copy(const ReadView &other) void copy(const ReadView &other)
{ {
ut_ad(&other != this); ut_ad(&other != this);
m_ids= other.m_ids; if (m_low_limit_no > other.m_low_limit_no)
m_up_limit_id= other.m_up_limit_id;
m_low_limit_no= other.m_low_limit_no; m_low_limit_no= other.m_low_limit_no;
if (m_low_limit_id > other.m_low_limit_id)
m_low_limit_id= other.m_low_limit_id; m_low_limit_id= other.m_low_limit_id;
trx_ids_t::iterator dst= m_ids.begin();
for (trx_ids_t::const_iterator src= other.m_ids.begin();
src != other.m_ids.end(); src++)
{
if (*src >= m_low_limit_id)
break;
loop:
if (dst == m_ids.end())
{
m_ids.push_back(*src);
dst= m_ids.end();
continue;
}
if (*dst < *src)
{
dst++;
goto loop;
}
else if (*dst > *src)
dst= m_ids.insert(dst, *src) + 1;
}
m_ids.erase(std::lower_bound(dst, m_ids.end(), m_low_limit_id),
m_ids.end());
m_up_limit_id= m_ids.empty() ? m_low_limit_id : m_ids.front();
ut_ad(m_up_limit_id <= m_low_limit_id);
} }
......
...@@ -797,17 +797,23 @@ class rw_trx_hash_t ...@@ -797,17 +797,23 @@ class rw_trx_hash_t
/** The transaction system central memory data structure. */ /** The transaction system central memory data structure. */
struct trx_sys_t { class trx_sys_t
private: {
/** /**
The smallest number not yet assigned as a transaction id or transaction The smallest number not yet assigned as a transaction id or transaction
number. Accessed and updated with atomic operations. number. Accessed and updated with atomic operations.
*/ */
MY_ALIGNED(CACHE_LINE_SIZE) trx_id_t m_max_trx_id; MY_ALIGNED(CACHE_LINE_SIZE) trx_id_t m_max_trx_id;
/** Solves race condition between register_rw() and snapshot_ids(). */ /**
Solves race conditions between register_rw() and snapshot_ids() as well as
race condition between assign_new_trx_no() and snapshot_ids().
@sa register_rw()
@sa assign_new_trx_no()
@sa snapshot_ids()
*/
MY_ALIGNED(CACHE_LINE_SIZE) trx_id_t m_rw_trx_hash_version; MY_ALIGNED(CACHE_LINE_SIZE) trx_id_t m_rw_trx_hash_version;
...@@ -895,7 +901,7 @@ struct trx_sys_t { ...@@ -895,7 +901,7 @@ struct trx_sys_t {
next call to trx_sys.get_new_trx_id() next call to trx_sys.get_new_trx_id()
*/ */
trx_id_t get_max_trx_id(void) trx_id_t get_max_trx_id()
{ {
return static_cast<trx_id_t> return static_cast<trx_id_t>
(my_atomic_load64_explicit(reinterpret_cast<int64*>(&m_max_trx_id), (my_atomic_load64_explicit(reinterpret_cast<int64*>(&m_max_trx_id),
...@@ -916,6 +922,38 @@ struct trx_sys_t { ...@@ -916,6 +922,38 @@ struct trx_sys_t {
} }
/**
Allocates and assigns new transaction serialisation number.
There's a gap between m_max_trx_id increment and transaction serialisation
number becoming visible through rw_trx_hash. While we're in this gap
concurrent thread may come and do MVCC snapshot without seeing allocated
but not yet assigned serialisation number. Then at some point purge thread
may clone this view. As a result it won't see newly allocated serialisation
number and may remove "unnecessary" history data of this transaction from
rollback segments.
m_rw_trx_hash_version is intended to solve this problem. MVCC snapshot has
to wait until m_max_trx_id == m_rw_trx_hash_version, which effectively
means that all transaction serialisation numbers up to m_max_trx_id are
available through rw_trx_hash.
We rely on refresh_rw_trx_hash_version() to issue RELEASE memory barrier so
that m_rw_trx_hash_version increment happens after
trx->rw_trx_hash_element->no becomes visible through rw_trx_hash.
@param trx transaction
*/
void assign_new_trx_no(trx_t *trx)
{
trx->no= get_new_trx_id_no_refresh();
my_atomic_store64_explicit(reinterpret_cast<int64*>
(&trx->rw_trx_hash_element->no),
trx->no, MY_MEMORY_ORDER_RELAXED);
refresh_rw_trx_hash_version();
}
/** /**
Takes MVCC snapshot. Takes MVCC snapshot.
...@@ -923,7 +961,7 @@ struct trx_sys_t { ...@@ -923,7 +961,7 @@ struct trx_sys_t {
in ids. in ids.
For details about get_rw_trx_hash_version() != get_max_trx_id() spin For details about get_rw_trx_hash_version() != get_max_trx_id() spin
@sa register_rw(). @sa register_rw() and @sa assign_new_trx_no().
We rely on get_rw_trx_hash_version() to issue ACQUIRE memory barrier so We rely on get_rw_trx_hash_version() to issue ACQUIRE memory barrier so
that loading of m_rw_trx_hash_version happens before accessing rw_trx_hash. that loading of m_rw_trx_hash_version happens before accessing rw_trx_hash.
...@@ -941,6 +979,7 @@ struct trx_sys_t { ...@@ -941,6 +979,7 @@ struct trx_sys_t {
void snapshot_ids(trx_t *caller_trx, trx_ids_t *ids, trx_id_t *max_trx_id, void snapshot_ids(trx_t *caller_trx, trx_ids_t *ids, trx_id_t *max_trx_id,
trx_id_t *min_trx_no) trx_id_t *min_trx_no)
{ {
ut_ad(!mutex_own(&mutex));
snapshot_ids_arg arg(ids); snapshot_ids_arg arg(ids);
while ((arg.m_id= get_rw_trx_hash_version()) != get_max_trx_id()) while ((arg.m_id= get_rw_trx_hash_version()) != get_max_trx_id())
...@@ -952,7 +991,6 @@ struct trx_sys_t { ...@@ -952,7 +991,6 @@ struct trx_sys_t {
rw_trx_hash.iterate(caller_trx, rw_trx_hash.iterate(caller_trx,
reinterpret_cast<my_hash_walk_action>(copy_one_id), reinterpret_cast<my_hash_walk_action>(copy_one_id),
&arg); &arg);
std::sort(ids->begin(), ids->end());
*max_trx_id= arg.m_id; *max_trx_id= arg.m_id;
*min_trx_no= arg.m_no; *min_trx_no= arg.m_no;
...@@ -1146,11 +1184,12 @@ struct trx_sys_t { ...@@ -1146,11 +1184,12 @@ struct trx_sys_t {
/** /**
Allocates new transaction id without refreshing rw_trx_hash version. Allocates new transaction id without refreshing rw_trx_hash version.
This method is extracted for exclusive use by register_rw() where This method is extracted for exclusive use by register_rw() and
transaction must be inserted into rw_trx_hash between new transaction id assign_new_trx_no() where new id must be allocated atomically with
allocation and rw_trx_hash version refresh. payload of these methods from MVCC snapshot point of view.
@sa get_new_trx_id() @sa get_new_trx_id()
@sa assign_new_trx_no()
@return new transaction id @return new transaction id
*/ */
......
...@@ -112,8 +112,6 @@ enum trx_dict_op_t { ...@@ -112,8 +112,6 @@ enum trx_dict_op_t {
struct trx_t; struct trx_t;
/** The locks and state of an active transaction */ /** The locks and state of an active transaction */
struct trx_lock_t; struct trx_lock_t;
/** Transaction system */
struct trx_sys_t;
/** Signal */ /** Signal */
struct trx_sig_t; struct trx_sig_t;
/** Rollback segment */ /** Rollback segment */
......
...@@ -182,8 +182,8 @@ will mark their views as closed but not actually free their views. ...@@ -182,8 +182,8 @@ will mark their views as closed but not actually free their views.
*/ */
void ReadView::snapshot(trx_t *trx) void ReadView::snapshot(trx_t *trx)
{ {
ut_ad(!mutex_own(&trx_sys.mutex));
trx_sys.snapshot_ids(trx, &m_ids, &m_low_limit_id, &m_low_limit_no); trx_sys.snapshot_ids(trx, &m_ids, &m_low_limit_id, &m_low_limit_no);
std::sort(m_ids.begin(), m_ids.end());
m_up_limit_id= m_ids.empty() ? m_low_limit_id : m_ids.front(); m_up_limit_id= m_ids.empty() ? m_low_limit_id : m_ids.front();
ut_ad(m_up_limit_id <= m_low_limit_id); ut_ad(m_up_limit_id <= m_low_limit_id);
} }
...@@ -219,7 +219,7 @@ void ReadView::open(trx_t *trx) ...@@ -219,7 +219,7 @@ void ReadView::open(trx_t *trx)
protection. But we're cutting edges to achieve great scalability. protection. But we're cutting edges to achieve great scalability.
There're at least two types of concurrent threads interested in this There're at least two types of concurrent threads interested in this
value: purge coordinator thread (see MVCC::clone_oldest_view()) and value: purge coordinator thread (see trx_sys_t::clone_oldest_view()) and
InnoDB monitor thread (see lock_trx_print_wait_and_mvcc_state()). InnoDB monitor thread (see lock_trx_print_wait_and_mvcc_state()).
What bad things can happen because we allow this race? What bad things can happen because we allow this race?
...@@ -319,10 +319,7 @@ void ReadView::close() ...@@ -319,10 +319,7 @@ void ReadView::close()
*/ */
void trx_sys_t::clone_oldest_view() void trx_sys_t::clone_oldest_view()
{ {
const ReadView *oldest_view= &purge_sys->view;
purge_sys->view.snapshot(0); purge_sys->view.snapshot(0);
mutex_enter(&mutex); mutex_enter(&mutex);
/* Find oldest view. */ /* Find oldest view. */
for (const ReadView *v= UT_LIST_GET_FIRST(m_views); v; for (const ReadView *v= UT_LIST_GET_FIRST(m_views); v;
...@@ -333,11 +330,8 @@ void trx_sys_t::clone_oldest_view() ...@@ -333,11 +330,8 @@ void trx_sys_t::clone_oldest_view()
while ((state= v->get_state()) == READ_VIEW_STATE_SNAPSHOT) while ((state= v->get_state()) == READ_VIEW_STATE_SNAPSHOT)
ut_delay(1); ut_delay(1);
if (state == READ_VIEW_STATE_OPEN && if (state == READ_VIEW_STATE_OPEN)
v->low_limit_no() < oldest_view->low_limit_no()) purge_sys->view.copy(*v);
oldest_view= v;
} }
if (oldest_view != &purge_sys->view)
purge_sys->view.copy(*oldest_view);
mutex_exit(&mutex); mutex_exit(&mutex);
} }
...@@ -1222,10 +1222,7 @@ trx_serialise(trx_t* trx) ...@@ -1222,10 +1222,7 @@ trx_serialise(trx_t* trx)
mutex_enter(&purge_sys->pq_mutex); mutex_enter(&purge_sys->pq_mutex);
} }
trx->no = trx_sys.get_new_trx_id(); trx_sys.assign_new_trx_no(trx);
my_atomic_store64_explicit(reinterpret_cast<int64*>
(&trx->rw_trx_hash_element->no),
trx->no, MY_MEMORY_ORDER_RELAXED);
/* If the rollack segment is not empty then the /* If the rollack segment is not empty then the
new trx_t::no can't be less than any trx_t::no new trx_t::no can't be less than any trx_t::no
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment