Commit fb335b48 authored by Marko Mäkelä

Allocate purge_sys statically

There is only one purge_sys. Allocate it statically in order to avoid
dereferencing a pointer whenever accessing it. Also, align some
members to their own cache line in order to avoid false sharing.

purge_sys_t::create(): The deferred constructor.

purge_sys_t::close(): The early destructor.

undo::Truncate::create(): The deferred constructor.
Because purge_sys.undo_trunc is constructed before the start-up
parameters are parsed, the normal constructor would copy a
wrong value of srv_purge_rseg_truncate_frequency.

TrxUndoRsegsIterator: Do not forward-declare an inline constructor,
because the static construction of purge_sys.rseg_iter would not have
access to it.
parent a3a2b898
......@@ -142,7 +142,7 @@ typedef std::priority_queue<
/** Chooses the rollback segment with the oldest committed transaction */
struct TrxUndoRsegsIterator {
/** Constructor */
inline TrxUndoRsegsIterator();
TrxUndoRsegsIterator();
/** Sets the next rseg to purge in purge_sys.
Executed in the purge coordinator thread.
@return whether anything is to be purged */
......@@ -204,17 +204,12 @@ namespace undo {
/** Track UNDO tablespace mark for truncate. */
class Truncate {
public:
Truncate()
:
m_undo_for_trunc(ULINT_UNDEFINED),
m_rseg_for_trunc(),
m_scan_start(1),
m_purge_rseg_truncate_frequency(
static_cast<ulint>(
srv_purge_rseg_truncate_frequency))
void create()
{
/* Do Nothing. */
m_undo_for_trunc = ULINT_UNDEFINED;
m_scan_start = 1;
m_purge_rseg_truncate_frequency =
ulint(srv_purge_rseg_truncate_frequency);
}
/** Clear the cached rollback segment. Normally done
......@@ -401,12 +396,9 @@ namespace undo {
/** The control structure used in the purge operation */
class purge_sys_t
{
bool m_initialised;
public:
/** Construct the purge system. */
purge_sys_t();
/** Destruct the purge system. */
~purge_sys_t();
MY_ALIGNED(CACHE_LINE_SIZE)
rw_lock_t latch; /*!< The latch protecting the purge
view. A purge operation must acquire an
x-latch here for the instant at which
......@@ -414,11 +406,14 @@ class purge_sys_t
log operation can prevent this by
obtaining an s-latch here. It also
protects state and running */
MY_ALIGNED(CACHE_LINE_SIZE)
os_event_t event; /*!< State signal event;
os_event_set() and os_event_reset()
are protected by purge_sys_t::latch
X-lock */
MY_ALIGNED(CACHE_LINE_SIZE)
ulint n_stop; /*!< Counter to track number stops */
volatile bool running; /*!< true, if purge is active,
we check this without the latch too */
volatile purge_state_t state; /*!< Purge coordinator thread states,
......@@ -426,6 +421,7 @@ class purge_sys_t
without holding the latch. */
que_t* query; /*!< The query graph which will do the
parallelized purge operation */
MY_ALIGNED(CACHE_LINE_SIZE)
ReadView view; /*!< The purge will not remove undo logs
which are >= this view (purge view) */
ulint n_submitted; /*!< Count of total tasks submitted
......@@ -486,10 +482,30 @@ class purge_sys_t
undo::Truncate undo_trunc; /*!< Track UNDO tablespace marked
for truncate. */
/**
Constructor.
Some members may require late initialisation, thus we just mark object as
uninitialised. Real initialisation happens in create().
*/
purge_sys_t() : m_initialised(false) {}
bool is_initialised() const { return m_initialised; }
/** Create the instance */
void create();
/** Close the purge system on shutdown */
void close();
};
/** The global data structure coordinating a purge */
extern purge_sys_t* purge_sys;
extern purge_sys_t purge_sys;
/** Info required to purge a record */
struct trx_purge_rec_t {
......
......@@ -996,10 +996,10 @@ class trx_sys_t
bool is_initialised() { return m_initialised; }
/** Create the instance */
/** Initialise the purge subsystem. */
void create();
/** Close the transaction system on shutdown */
/** Close the purge subsystem on shutdown. */
void close();
/** @return total number of active (non-prepared) transactions */
......
......@@ -5258,14 +5258,14 @@ lock_print_info_summary(
fprintf(file,
"Purge done for trx's n:o < " TRX_ID_FMT
" undo n:o < " TRX_ID_FMT " state: ",
purge_sys->tail.trx_no(),
purge_sys->tail.undo_no);
purge_sys.tail.trx_no(),
purge_sys.tail.undo_no);
/* Note: We are reading the state without the latch. One because it
will violate the latching order and two because we are merely querying
the state of the variable for display. */
switch (purge_sys->state){
switch (purge_sys.state){
case PURGE_STATE_INIT:
/* Should never be in this state while the system is running. */
ut_error;
......@@ -5281,7 +5281,7 @@ lock_print_info_summary(
case PURGE_STATE_RUN:
fprintf(file, "running");
/* Check if it is waiting for more data to arrive. */
if (!purge_sys->running) {
if (!purge_sys.running) {
fprintf(file, " but idle");
}
break;
......
......@@ -319,7 +319,7 @@ void ReadView::close()
*/
void trx_sys_t::clone_oldest_view()
{
purge_sys->view.snapshot(0);
purge_sys.view.snapshot(0);
mutex_enter(&mutex);
/* Find oldest view. */
for (const ReadView *v= UT_LIST_GET_FIRST(m_views); v;
......@@ -331,7 +331,7 @@ void trx_sys_t::clone_oldest_view()
ut_delay(1);
if (state == READ_VIEW_STATE_OPEN)
purge_sys->view.copy(*v);
purge_sys.view.copy(*v);
}
mutex_exit(&mutex);
}
......@@ -436,11 +436,11 @@ row_vers_must_preserve_del_marked(
const table_name_t& name,
mtr_t* mtr)
{
ut_ad(!rw_lock_own(&(purge_sys->latch), RW_LOCK_S));
ut_ad(!rw_lock_own(&(purge_sys.latch), RW_LOCK_S));
mtr_s_lock(&purge_sys->latch, mtr);
mtr_s_lock(&purge_sys.latch, mtr);
return(!purge_sys->view.changes_visible(trx_id, name));
return(!purge_sys.view.changes_visible(trx_id, name));
}
/** build virtual column value from current cluster index record data
......@@ -866,7 +866,7 @@ row_vers_old_has_index_entry(
ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX
| MTR_MEMO_PAGE_S_FIX));
ut_ad(!rw_lock_own(&(purge_sys->latch), RW_LOCK_S));
ut_ad(!rw_lock_own(&(purge_sys.latch), RW_LOCK_S));
clust_index = dict_table_get_first_index(index->table);
......@@ -889,7 +889,7 @@ row_vers_old_has_index_entry(
/* The top of the stack of versions is locked by the
mtr holding a latch on the page containing the
clustered index record. The bottom of the stack is
locked by the fact that the purge_sys->view must
locked by the fact that the purge_sys.view must
'overtake' any read view of an active transaction.
Thus, it is safe to fetch the prefixes for
externally stored columns. */
......@@ -1121,7 +1121,7 @@ row_vers_build_for_consistent_read(
ut_ad(dict_index_is_clust(index));
ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX
| MTR_MEMO_PAGE_S_FIX));
ut_ad(!rw_lock_own(&(purge_sys->latch), RW_LOCK_S));
ut_ad(!rw_lock_own(&(purge_sys.latch), RW_LOCK_S));
ut_ad(rec_offs_validate(rec, index, *offsets));
......@@ -1234,7 +1234,7 @@ row_vers_build_for_semi_consistent_read(
ut_ad(dict_index_is_clust(index));
ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX
| MTR_MEMO_PAGE_S_FIX));
ut_ad(!rw_lock_own(&(purge_sys->latch), RW_LOCK_S));
ut_ad(!rw_lock_own(&(purge_sys.latch), RW_LOCK_S));
ut_ad(rec_offs_validate(rec, index, *offsets));
......
......@@ -1923,7 +1923,7 @@ srv_get_active_thread_type(void)
srv_sys_mutex_exit();
if (ret == SRV_NONE && srv_shutdown_state != SRV_SHUTDOWN_NONE
&& purge_sys != NULL) {
&& purge_sys.is_initialised()) {
/* Check only on shutdown. */
switch (trx_purge_state()) {
case PURGE_STATE_RUN:
......@@ -1973,7 +1973,7 @@ srv_wake_purge_thread_if_not_active()
{
ut_ad(!srv_sys_mutex_own());
if (purge_sys->state == PURGE_STATE_RUN
if (purge_sys.state == PURGE_STATE_RUN
&& !my_atomic_loadlint(&srv_sys.n_threads_active[SRV_PURGE])
&& trx_sys.history_size()) {
......@@ -2506,7 +2506,7 @@ srv_task_execute(void)
que_run_threads(thr);
my_atomic_addlint(
&purge_sys->n_completed, 1);
&purge_sys.n_completed, 1);
}
return(thr != NULL);
......@@ -2559,17 +2559,17 @@ DECLARE_THREAD(srv_worker_thread)(
}
/* Note: we are checking the state without holding the
purge_sys->latch here. */
} while (purge_sys->state != PURGE_STATE_EXIT);
purge_sys.latch here. */
} while (purge_sys.state != PURGE_STATE_EXIT);
srv_free_slot(slot);
rw_lock_x_lock(&purge_sys->latch);
rw_lock_x_lock(&purge_sys.latch);
ut_a(!purge_sys->running);
ut_a(purge_sys->state == PURGE_STATE_EXIT);
ut_a(!purge_sys.running);
ut_a(purge_sys.state == PURGE_STATE_EXIT);
rw_lock_x_unlock(&purge_sys->latch);
rw_lock_x_unlock(&purge_sys.latch);
#ifdef UNIV_DEBUG_THREAD_CREATION
ib::info() << "Purge worker thread exiting, id "
......@@ -2648,7 +2648,7 @@ srv_do_purge(ulint* n_total_purged)
}
ulint undo_trunc_freq =
purge_sys->undo_trunc.get_rseg_truncate_frequency();
purge_sys.undo_trunc.get_rseg_truncate_frequency();
ulint rseg_truncate_frequency = ut_min(
static_cast<ulint>(srv_purge_rseg_truncate_frequency),
......@@ -2662,7 +2662,7 @@ srv_do_purge(ulint* n_total_purged)
} while (!srv_purge_should_exit(n_pages_purged)
&& n_pages_purged > 0
&& purge_sys->state == PURGE_STATE_RUN);
&& purge_sys.state == PURGE_STATE_RUN);
return(rseg_history_len);
}
......@@ -2689,11 +2689,11 @@ srv_purge_coordinator_suspend(
int64_t sig_count = srv_suspend_thread(slot);
do {
rw_lock_x_lock(&purge_sys->latch);
rw_lock_x_lock(&purge_sys.latch);
purge_sys->running = false;
purge_sys.running = false;
rw_lock_x_unlock(&purge_sys->latch);
rw_lock_x_unlock(&purge_sys.latch);
/* We don't wait right away on the non-timed wait because
we want to signal the thread that wants to suspend purge. */
......@@ -2705,14 +2705,14 @@ srv_purge_coordinator_suspend(
sig_count = srv_suspend_thread(slot);
rw_lock_x_lock(&purge_sys->latch);
rw_lock_x_lock(&purge_sys.latch);
stop = (srv_shutdown_state == SRV_SHUTDOWN_NONE
&& purge_sys->state == PURGE_STATE_STOP);
&& purge_sys.state == PURGE_STATE_STOP);
if (!stop) {
ut_a(purge_sys->n_stop == 0);
purge_sys->running = true;
ut_a(purge_sys.n_stop == 0);
purge_sys.running = true;
if (timeout
&& rseg_history_len < 5000
......@@ -2727,13 +2727,13 @@ srv_purge_coordinator_suspend(
stop = true;
}
} else {
ut_a(purge_sys->n_stop > 0);
ut_a(purge_sys.n_stop > 0);
/* Signal that we are suspended. */
os_event_set(purge_sys->event);
os_event_set(purge_sys.event);
}
rw_lock_x_unlock(&purge_sys->latch);
rw_lock_x_unlock(&purge_sys.latch);
} while (stop && srv_undo_sources);
srv_resume_thread(slot, 0, false);
......@@ -2759,12 +2759,12 @@ DECLARE_THREAD(srv_purge_coordinator_thread)(
ut_a(trx_purge_state() == PURGE_STATE_INIT);
ut_a(srv_force_recovery < SRV_FORCE_NO_BACKGROUND);
rw_lock_x_lock(&purge_sys->latch);
rw_lock_x_lock(&purge_sys.latch);
purge_sys->running = true;
purge_sys->state = PURGE_STATE_RUN;
purge_sys.running = true;
purge_sys.state = PURGE_STATE_RUN;
rw_lock_x_unlock(&purge_sys->latch);
rw_lock_x_unlock(&purge_sys.latch);
#ifdef UNIV_PFS_THREAD
pfs_register_thread(srv_purge_thread_key);
......@@ -2785,7 +2785,7 @@ DECLARE_THREAD(srv_purge_coordinator_thread)(
if (srv_shutdown_state == SRV_SHUTDOWN_NONE
&& srv_undo_sources
&& (purge_sys->state == PURGE_STATE_STOP
&& (purge_sys.state == PURGE_STATE_STOP
|| n_total_purged == 0)) {
srv_purge_coordinator_suspend(slot, rseg_history_len);
......@@ -2809,20 +2809,20 @@ DECLARE_THREAD(srv_purge_coordinator_thread)(
srv_free_slot(slot);
/* Note that we are shutting down. */
rw_lock_x_lock(&purge_sys->latch);
rw_lock_x_lock(&purge_sys.latch);
purge_sys->state = PURGE_STATE_EXIT;
purge_sys.state = PURGE_STATE_EXIT;
/* If there are any pending undo-tablespace truncate then clear
it off as we plan to shutdown the purge thread. */
purge_sys->undo_trunc.clear();
purge_sys.undo_trunc.clear();
purge_sys->running = false;
purge_sys.running = false;
/* Ensure that the wait in trx_purge_stop() will terminate. */
os_event_set(purge_sys->event);
os_event_set(purge_sys.event);
rw_lock_x_unlock(&purge_sys->latch);
rw_lock_x_unlock(&purge_sys.latch);
#ifdef UNIV_DEBUG_THREAD_CREATION
ib::info() << "Purge coordinator exiting, id "
......
......@@ -2661,7 +2661,7 @@ innobase_start_or_create_for_mysql()
srv_start_state_set(SRV_START_STATE_PURGE);
} else {
purge_sys->state = PURGE_STATE_DISABLED;
purge_sys.state = PURGE_STATE_DISABLED;
}
srv_is_being_started = false;
......@@ -2871,8 +2871,7 @@ innodb_shutdown()
log_shutdown();
}
trx_sys.close();
UT_DELETE(purge_sys);
purge_sys = NULL;
purge_sys.close();
if (buf_dblwr) {
buf_dblwr_free();
}
......
......@@ -53,7 +53,7 @@ ulong srv_max_purge_lag = 0;
ulong srv_max_purge_lag_delay = 0;
/** The global data structure coordinating a purge */
purge_sys_t* purge_sys;
purge_sys_t purge_sys;
/** A dummy undo record used as a return value when we have a whole undo log
which needs no purge */
......@@ -67,7 +67,7 @@ my_bool srv_purge_view_update_only_debug;
static const TrxUndoRsegs NullElement;
/** Default constructor */
inline TrxUndoRsegsIterator::TrxUndoRsegsIterator()
TrxUndoRsegsIterator::TrxUndoRsegsIterator()
: m_rsegs(NullElement), m_iter(m_rsegs.begin())
{
}
......@@ -77,7 +77,7 @@ Executed in the purge coordinator thread.
@return whether anything is to be purged */
inline bool TrxUndoRsegsIterator::set_next()
{
mutex_enter(&purge_sys->pq_mutex);
mutex_enter(&purge_sys.pq_mutex);
/* Only purge consumes events from the priority queue, user
threads only produce the events. */
......@@ -90,41 +90,41 @@ inline bool TrxUndoRsegsIterator::set_next()
number shouldn't increase. Undo the increment of
expected commit done by caller assuming rollback
segments from given transaction are done. */
purge_sys->tail.commit = (*m_iter)->last_commit;
} else if (!purge_sys->purge_queue.empty()) {
m_rsegs = purge_sys->purge_queue.top();
purge_sys->purge_queue.pop();
ut_ad(purge_sys->purge_queue.empty()
|| purge_sys->purge_queue.top() != m_rsegs);
purge_sys.tail.commit = (*m_iter)->last_commit;
} else if (!purge_sys.purge_queue.empty()) {
m_rsegs = purge_sys.purge_queue.top();
purge_sys.purge_queue.pop();
ut_ad(purge_sys.purge_queue.empty()
|| purge_sys.purge_queue.top() != m_rsegs);
m_iter = m_rsegs.begin();
} else {
/* Queue is empty, reset iterator. */
purge_sys->rseg = NULL;
mutex_exit(&purge_sys->pq_mutex);
purge_sys.rseg = NULL;
mutex_exit(&purge_sys.pq_mutex);
m_rsegs = NullElement;
m_iter = m_rsegs.begin();
return false;
}
purge_sys->rseg = *m_iter++;
mutex_exit(&purge_sys->pq_mutex);
mutex_enter(&purge_sys->rseg->mutex);
purge_sys.rseg = *m_iter++;
mutex_exit(&purge_sys.pq_mutex);
mutex_enter(&purge_sys.rseg->mutex);
ut_a(purge_sys->rseg->last_page_no != FIL_NULL);
ut_ad(purge_sys->rseg->last_trx_no() == m_rsegs.trx_no());
ut_a(purge_sys.rseg->last_page_no != FIL_NULL);
ut_ad(purge_sys.rseg->last_trx_no() == m_rsegs.trx_no());
/* We assume in purge of externally stored fields that space id is
in the range of UNDO tablespace space ids */
ut_a(purge_sys->rseg->space == TRX_SYS_SPACE
|| srv_is_undo_tablespace(purge_sys->rseg->space));
ut_a(purge_sys.rseg->space == TRX_SYS_SPACE
|| srv_is_undo_tablespace(purge_sys.rseg->space));
ut_a(purge_sys->tail.commit <= purge_sys->rseg->last_commit);
ut_a(purge_sys.tail.commit <= purge_sys.rseg->last_commit);
purge_sys->tail.commit = purge_sys->rseg->last_commit;
purge_sys->hdr_offset = purge_sys->rseg->last_offset;
purge_sys->hdr_page_no = purge_sys->rseg->last_page_no;
purge_sys.tail.commit = purge_sys.rseg->last_commit;
purge_sys.hdr_offset = purge_sys.rseg->last_offset;
purge_sys.hdr_page_no = purge_sys.rseg->last_page_no;
mutex_exit(&purge_sys->rseg->mutex);
mutex_exit(&purge_sys.rseg->mutex);
return(true);
}
......@@ -157,27 +157,37 @@ purge_graph_build()
return(fork);
}
/** Construct the purge system. */
purge_sys_t::purge_sys_t()
: latch(), event(os_event_create(0)),
n_stop(0), running(false), state(PURGE_STATE_INIT),
query(purge_graph_build()),
view(), n_submitted(0), n_completed(0),
tail(), head(),
next_stored(false), rseg(NULL),
page_no(0), offset(0), hdr_page_no(0), hdr_offset(0),
rseg_iter(), purge_queue(), pq_mutex(), undo_trunc()
/** Initialise the purge system. */
void purge_sys_t::create()
{
ut_ad(!purge_sys);
rw_lock_create(trx_purge_latch_key, &latch, SYNC_PURGE_LATCH);
mutex_create(LATCH_ID_PURGE_SYS_PQ, &pq_mutex);
ut_ad(this == &purge_sys);
ut_ad(!is_initialised());
event= os_event_create(0);
n_stop= 0;
running= false;
state= PURGE_STATE_INIT;
query= purge_graph_build();
n_submitted= 0;
n_completed= 0;
next_stored= false;
rseg= NULL;
page_no= 0;
offset= 0;
hdr_page_no= 0;
hdr_offset= 0;
rw_lock_create(trx_purge_latch_key, &latch, SYNC_PURGE_LATCH);
mutex_create(LATCH_ID_PURGE_SYS_PQ, &pq_mutex);
undo_trunc.create();
m_initialised = true;
}
/** Destruct the purge system. */
purge_sys_t::~purge_sys_t()
/** Close the purge subsystem on shutdown. */
void purge_sys_t::close()
{
ut_ad(this == purge_sys);
ut_ad(this == &purge_sys);
if (!is_initialised()) return;
m_initialised = false;
trx_t* trx = query->trx;
que_graph_free(query);
ut_ad(!trx->id);
......@@ -268,9 +278,9 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr)
ut_ad(srv_undo_sources
|| ((srv_startup_is_before_trx_rollback_phase
|| trx_rollback_is_active)
&& purge_sys->state == PURGE_STATE_INIT)
&& purge_sys.state == PURGE_STATE_INIT)
|| (srv_force_recovery >= SRV_FORCE_NO_BACKGROUND
&& purge_sys->state == PURGE_STATE_DISABLED)
&& purge_sys.state == PURGE_STATE_DISABLED)
|| ((trx->undo_no == 0 || trx->in_mysql_trx_list
|| trx->internal)
&& srv_fast_shutdown));
......@@ -842,17 +852,17 @@ void
trx_purge_cleanse_purge_queue(
undo::Truncate* undo_trunc)
{
mutex_enter(&purge_sys->pq_mutex);
mutex_enter(&purge_sys.pq_mutex);
typedef std::vector<TrxUndoRsegs> purge_elem_list_t;
purge_elem_list_t purge_elem_list;
/* Remove rseg instances that are in the purge queue before we start
truncate of corresponding UNDO truncate. */
while (!purge_sys->purge_queue.empty()) {
purge_elem_list.push_back(purge_sys->purge_queue.top());
purge_sys->purge_queue.pop();
while (!purge_sys.purge_queue.empty()) {
purge_elem_list.push_back(purge_sys.purge_queue.top());
purge_sys.purge_queue.pop();
}
ut_ad(purge_sys->purge_queue.empty());
ut_ad(purge_sys.purge_queue.empty());
for (purge_elem_list_t::iterator it = purge_elem_list.begin();
it != purge_elem_list.end();
......@@ -870,10 +880,10 @@ trx_purge_cleanse_purge_queue(
}
if (!it->empty()) {
purge_sys->purge_queue.push(*it);
purge_sys.purge_queue.push(*it);
}
}
mutex_exit(&purge_sys->pq_mutex);
mutex_exit(&purge_sys.pq_mutex);
}
/** Iterate over selected UNDO tablespace and check if all the rsegs
......@@ -994,17 +1004,17 @@ trx_purge_initiate_truncate(
return;
}
if (purge_sys->rseg != NULL
&& purge_sys->rseg->last_page_no == FIL_NULL) {
/* If purge_sys->rseg is pointing to rseg that was recently
if (purge_sys.rseg != NULL
&& purge_sys.rseg->last_page_no == FIL_NULL) {
/* If purge_sys.rseg is pointing to rseg that was recently
truncated then move to next rseg element.
Note: Ideally purge_sys->rseg should be NULL because purge
Note: Ideally purge_sys.rseg should be NULL because purge
should complete processing of all the records but there is
purge_batch_size that can force the purge loop to exit before
all the records are purged and in this case purge_sys->rseg
all the records are purged and in this case purge_sys.rseg
could point to a valid rseg waiting for next purge cycle. */
purge_sys->next_stored = false;
purge_sys->rseg = NULL;
purge_sys.next_stored = false;
purge_sys.rseg = NULL;
}
DBUG_EXECUTE_IF("ib_undo_trunc_before_ddl_log_end",
......@@ -1037,13 +1047,13 @@ function is called, the caller must not have any latches on undo log pages!
*/
static void trx_purge_truncate_history()
{
ut_ad(purge_sys->head <= purge_sys->tail);
purge_sys_t::iterator& head = purge_sys->head.commit
? purge_sys->head : purge_sys->tail;
ut_ad(purge_sys.head <= purge_sys.tail);
purge_sys_t::iterator& head = purge_sys.head.commit
? purge_sys.head : purge_sys.tail;
if (head.trx_no() >= purge_sys->view.low_limit_no()) {
if (head.trx_no() >= purge_sys.view.low_limit_no()) {
/* This is sometimes necessary. TODO: find out why. */
head.reset_trx_no(purge_sys->view.low_limit_no());
head.reset_trx_no(purge_sys.view.low_limit_no());
head.undo_no = 0;
}
......@@ -1058,14 +1068,14 @@ static void trx_purge_truncate_history()
can (greedy approach). This will ensure when the server is idle we
try and truncate all the UNDO tablespaces. */
for (ulint i = srv_undo_tablespaces_active; i--; ) {
trx_purge_mark_undo_for_truncate(&purge_sys->undo_trunc);
trx_purge_initiate_truncate(head, &purge_sys->undo_trunc);
trx_purge_mark_undo_for_truncate(&purge_sys.undo_trunc);
trx_purge_initiate_truncate(head, &purge_sys.undo_trunc);
}
}
/***********************************************************************//**
Updates the last not yet purged history log info in rseg when we have purged
a whole undo log. Advances also purge_sys->purge_trx_no past the purged log. */
a whole undo log. Advances also purge_sys.purge_trx_no past the purged log. */
static
void
trx_purge_rseg_get_next_history_log(
......@@ -1084,9 +1094,9 @@ trx_purge_rseg_get_next_history_log(
ut_a(rseg->last_page_no != FIL_NULL);
purge_sys->tail.commit = rseg->last_commit + 1;
purge_sys->tail.undo_no = 0;
purge_sys->next_stored = false;
purge_sys.tail.commit = rseg->last_commit + 1;
purge_sys.tail.undo_no = 0;
purge_sys.next_stored = false;
mtr_start(&mtr);
......@@ -1142,11 +1152,11 @@ trx_purge_rseg_get_next_history_log(
than the events that Purge produces. ie. Purge can never produce
events from an empty rollback segment. */
mutex_enter(&purge_sys->pq_mutex);
mutex_enter(&purge_sys.pq_mutex);
purge_sys->purge_queue.push(*rseg);
purge_sys.purge_queue.push(*rseg);
mutex_exit(&purge_sys->pq_mutex);
mutex_exit(&purge_sys.pq_mutex);
mutex_exit(&rseg->mutex);
}
......@@ -1160,15 +1170,15 @@ trx_purge_read_undo_rec()
ulint page_no;
ib_uint64_t undo_no;
purge_sys->hdr_offset = purge_sys->rseg->last_offset;
page_no = purge_sys->hdr_page_no = purge_sys->rseg->last_page_no;
purge_sys.hdr_offset = purge_sys.rseg->last_offset;
page_no = purge_sys.hdr_page_no = purge_sys.rseg->last_page_no;
if (purge_sys->rseg->needs_purge) {
if (purge_sys.rseg->needs_purge) {
mtr_t mtr;
mtr.start();
if (trx_undo_rec_t* undo_rec = trx_undo_get_first_rec(
purge_sys->rseg->space, purge_sys->hdr_page_no,
purge_sys->hdr_offset, RW_S_LATCH, &mtr)) {
purge_sys.rseg->space, purge_sys.hdr_page_no,
purge_sys.hdr_offset, RW_S_LATCH, &mtr)) {
offset = page_offset(undo_rec);
undo_no = trx_undo_rec_get_undo_no(undo_rec);
......@@ -1184,11 +1194,11 @@ trx_purge_read_undo_rec()
undo_no = 0;
}
purge_sys->offset = offset;
purge_sys->page_no = page_no;
purge_sys->tail.undo_no = undo_no;
purge_sys.offset = offset;
purge_sys.page_no = page_no;
purge_sys.tail.undo_no = undo_no;
purge_sys->next_stored = true;
purge_sys.next_stored = true;
}
/***********************************************************************//**
......@@ -1201,9 +1211,9 @@ void
trx_purge_choose_next_log(void)
/*===========================*/
{
ut_ad(!purge_sys->next_stored);
ut_ad(!purge_sys.next_stored);
if (purge_sys->rseg_iter.set_next()) {
if (purge_sys.rseg_iter.set_next()) {
trx_purge_read_undo_rec();
} else {
/* There is nothing to do yet. */
......@@ -1232,19 +1242,19 @@ trx_purge_get_next_rec(
ulint space;
mtr_t mtr;
ut_ad(purge_sys->next_stored);
ut_ad(purge_sys->tail.trx_no() < purge_sys->view.low_limit_no());
ut_ad(purge_sys.next_stored);
ut_ad(purge_sys.tail.trx_no() < purge_sys.view.low_limit_no());
space = purge_sys->rseg->space;
page_no = purge_sys->page_no;
offset = purge_sys->offset;
space = purge_sys.rseg->space;
page_no = purge_sys.page_no;
offset = purge_sys.offset;
if (offset == 0) {
/* It is the dummy undo log record, which means that there is
no need to purge this undo log */
trx_purge_rseg_get_next_history_log(
purge_sys->rseg, n_pages_handled);
purge_sys.rseg, n_pages_handled);
/* Look for the next undo log and record to purge */
......@@ -1260,19 +1270,19 @@ trx_purge_get_next_rec(
rec = undo_page + offset;
rec2 = trx_undo_page_get_next_rec(rec, purge_sys->hdr_page_no,
purge_sys->hdr_offset);
rec2 = trx_undo_page_get_next_rec(rec, purge_sys.hdr_page_no,
purge_sys.hdr_offset);
if (rec2 == NULL) {
rec2 = trx_undo_get_next_rec(rec, purge_sys->hdr_page_no,
purge_sys->hdr_offset, &mtr);
rec2 = trx_undo_get_next_rec(rec, purge_sys.hdr_page_no,
purge_sys.hdr_offset, &mtr);
}
if (rec2 == NULL) {
mtr_commit(&mtr);
trx_purge_rseg_get_next_history_log(
purge_sys->rseg, n_pages_handled);
purge_sys.rseg, n_pages_handled);
/* Look for the next undo log and record to purge */
......@@ -1287,9 +1297,9 @@ trx_purge_get_next_rec(
} else {
page = page_align(rec2);
purge_sys->offset = rec2 - page;
purge_sys->page_no = page_get_page_no(page);
purge_sys->tail.undo_no = trx_undo_rec_get_undo_no(rec2);
purge_sys.offset = rec2 - page;
purge_sys.page_no = page_get_page_no(page);
purge_sys.tail.undo_no = trx_undo_rec_get_undo_no(rec2);
if (undo_page != page) {
/* We advance to a new page of the undo log: */
......@@ -1318,17 +1328,17 @@ trx_purge_fetch_next_rec(
handled */
mem_heap_t* heap) /*!< in: memory heap where copied */
{
if (!purge_sys->next_stored) {
if (!purge_sys.next_stored) {
trx_purge_choose_next_log();
if (!purge_sys->next_stored) {
if (!purge_sys.next_stored) {
DBUG_PRINT("ib_purge",
("no logs left in the history list"));
return(NULL);
}
}
if (purge_sys->tail.trx_no() >= purge_sys->view.low_limit_no()) {
if (purge_sys.tail.trx_no() >= purge_sys.view.low_limit_no()) {
return(NULL);
}
......@@ -1340,8 +1350,8 @@ trx_purge_fetch_next_rec(
/* row_purge_record_func() will later set
ROLL_PTR_INSERT_FLAG for TRX_UNDO_INSERT_REC */
false,
purge_sys->rseg->id,
purge_sys->page_no, purge_sys->offset);
purge_sys.rseg->id,
purge_sys.page_no, purge_sys.offset);
/* The following call will advance the stored values of the
purge iterator. */
......@@ -1359,14 +1369,14 @@ trx_purge_attach_undo_recs(ulint n_purge_threads)
que_thr_t* thr;
ulint i = 0;
ulint n_pages_handled = 0;
ulint n_thrs = UT_LIST_GET_LEN(purge_sys->query->thrs);
ulint n_thrs = UT_LIST_GET_LEN(purge_sys.query->thrs);
ut_a(n_purge_threads > 0);
purge_sys->head = purge_sys->tail;
purge_sys.head = purge_sys.tail;
/* Debug code to validate some pre-requisites and reset done flag. */
for (thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
for (thr = UT_LIST_GET_FIRST(purge_sys.query->thrs);
thr != NULL && i < n_purge_threads;
thr = UT_LIST_GET_NEXT(thrs, thr), ++i) {
......@@ -1388,10 +1398,10 @@ trx_purge_attach_undo_recs(ulint n_purge_threads)
/* Fetch and parse the UNDO records. The UNDO records are added
to a per purge node vector. */
thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
thr = UT_LIST_GET_FIRST(purge_sys.query->thrs);
ut_a(n_thrs > 0 && thr != NULL);
ut_ad(purge_sys->head <= purge_sys->tail);
ut_ad(purge_sys.head <= purge_sys.tail);
i = 0;
......@@ -1413,11 +1423,11 @@ trx_purge_attach_undo_recs(ulint n_purge_threads)
/* Track the max {trx_id, undo_no} for truncating the
UNDO logs once we have purged the records. */
if (purge_sys->head <= purge_sys->tail) {
purge_sys->head = purge_sys->tail;
if (purge_sys.head <= purge_sys.tail) {
purge_sys.head = purge_sys.tail;
}
/* Fetch the next record, and advance the purge_sys->tail. */
/* Fetch the next record, and advance the purge_sys.tail. */
purge_rec->undo_rec = trx_purge_fetch_next_rec(
&purge_rec->roll_ptr, &n_pages_handled, node->heap);
......@@ -1445,13 +1455,13 @@ trx_purge_attach_undo_recs(ulint n_purge_threads)
thr = UT_LIST_GET_NEXT(thrs, thr);
if (!(++i % n_purge_threads)) {
thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
thr = UT_LIST_GET_FIRST(purge_sys.query->thrs);
}
ut_a(thr != NULL);
}
ut_ad(purge_sys->head <= purge_sys->tail);
ut_ad(purge_sys.head <= purge_sys.tail);
return(n_pages_handled);
}
......@@ -1501,10 +1511,10 @@ static
void
trx_purge_wait_for_workers_to_complete()
{
ulint n_submitted = purge_sys->n_submitted;
ulint n_submitted = purge_sys.n_submitted;
/* Ensure that the work queue empties out. */
while ((ulint) my_atomic_loadlint(&purge_sys->n_completed) != n_submitted) {
while ((ulint) my_atomic_loadlint(&purge_sys.n_completed) != n_submitted) {
if (srv_get_task_queue_length() > 0) {
srv_release_threads(SRV_WORKER, 1);
......@@ -1514,7 +1524,7 @@ trx_purge_wait_for_workers_to_complete()
}
/* None of the worker threads should be doing any work. */
ut_a(purge_sys->n_submitted == purge_sys->n_completed);
ut_a(purge_sys.n_submitted == purge_sys.n_completed);
/* There should be no outstanding tasks as long
as the worker threads are active. */
......@@ -1539,11 +1549,11 @@ trx_purge(
srv_dml_needed_delay = trx_purge_dml_delay();
/* The number of tasks submitted should be completed. */
ut_a(purge_sys->n_submitted == purge_sys->n_completed);
ut_a(purge_sys.n_submitted == purge_sys.n_completed);
rw_lock_x_lock(&purge_sys->latch);
rw_lock_x_lock(&purge_sys.latch);
trx_sys.clone_oldest_view();
rw_lock_x_unlock(&purge_sys->latch);
rw_lock_x_unlock(&purge_sys.latch);
#ifdef UNIV_DEBUG
if (srv_purge_view_update_only_debug) {
......@@ -1561,39 +1571,38 @@ trx_purge(
/* Submit the tasks to the work queue. */
for (i = 0; i < n_purge_threads - 1; ++i) {
thr = que_fork_scheduler_round_robin(
purge_sys->query, thr);
purge_sys.query, thr);
ut_a(thr != NULL);
srv_que_task_enqueue_low(thr);
}
thr = que_fork_scheduler_round_robin(purge_sys->query, thr);
thr = que_fork_scheduler_round_robin(purge_sys.query, thr);
ut_a(thr != NULL);
purge_sys->n_submitted += n_purge_threads - 1;
purge_sys.n_submitted += n_purge_threads - 1;
goto run_synchronously;
/* Do it synchronously. */
} else {
thr = que_fork_scheduler_round_robin(purge_sys->query, NULL);
thr = que_fork_scheduler_round_robin(purge_sys.query, NULL);
ut_ad(thr);
run_synchronously:
++purge_sys->n_submitted;
++purge_sys.n_submitted;
que_run_threads(thr);
my_atomic_addlint(
&purge_sys->n_completed, 1);
my_atomic_addlint(&purge_sys.n_completed, 1);
if (n_purge_threads > 1) {
trx_purge_wait_for_workers_to_complete();
}
}
ut_a(purge_sys->n_submitted == purge_sys->n_completed);
ut_a(purge_sys.n_submitted == purge_sys.n_completed);
if (truncate) {
trx_purge_truncate_history();
......@@ -1614,11 +1623,11 @@ trx_purge_state(void)
{
purge_state_t state;
rw_lock_x_lock(&purge_sys->latch);
rw_lock_x_lock(&purge_sys.latch);
state = purge_sys->state;
state = purge_sys.state;
rw_lock_x_unlock(&purge_sys->latch);
rw_lock_x_unlock(&purge_sys.latch);
return(state);
}
......@@ -1629,9 +1638,9 @@ void
trx_purge_stop(void)
/*================*/
{
rw_lock_x_lock(&purge_sys->latch);
rw_lock_x_lock(&purge_sys.latch);
switch (purge_sys->state) {
switch (purge_sys.state) {
case PURGE_STATE_INIT:
case PURGE_STATE_DISABLED:
ut_error;
......@@ -1640,37 +1649,37 @@ trx_purge_stop(void)
FLUSH TABLES FOR EXPORT. */
ut_ad(!srv_undo_sources);
unlock:
rw_lock_x_unlock(&purge_sys->latch);
rw_lock_x_unlock(&purge_sys.latch);
break;
case PURGE_STATE_STOP:
ut_ad(srv_n_purge_threads > 0);
++purge_sys->n_stop;
purge_sys->state = PURGE_STATE_STOP;
if (!purge_sys->running) {
++purge_sys.n_stop;
purge_sys.state = PURGE_STATE_STOP;
if (!purge_sys.running) {
goto unlock;
}
ib::info() << "Waiting for purge to stop";
do {
rw_lock_x_unlock(&purge_sys->latch);
rw_lock_x_unlock(&purge_sys.latch);
os_thread_sleep(10000);
rw_lock_x_lock(&purge_sys->latch);
} while (purge_sys->running);
rw_lock_x_lock(&purge_sys.latch);
} while (purge_sys.running);
goto unlock;
case PURGE_STATE_RUN:
ut_ad(srv_n_purge_threads > 0);
++purge_sys->n_stop;
++purge_sys.n_stop;
ib::info() << "Stopping purge";
/* We need to wakeup the purge thread in case it is suspended,
so that it can acknowledge the state change. */
const int64_t sig_count = os_event_reset(purge_sys->event);
purge_sys->state = PURGE_STATE_STOP;
const int64_t sig_count = os_event_reset(purge_sys.event);
purge_sys.state = PURGE_STATE_STOP;
srv_purge_wakeup();
rw_lock_x_unlock(&purge_sys->latch);
rw_lock_x_unlock(&purge_sys.latch);
/* Wait for purge coordinator to signal that it
is suspended. */
os_event_wait_low(purge_sys->event, sig_count);
os_event_wait_low(purge_sys.event, sig_count);
}
MONITOR_INC_VALUE(MONITOR_PURGE_STOP_COUNT, 1);
......@@ -1682,9 +1691,9 @@ void
trx_purge_run(void)
/*===============*/
{
rw_lock_x_lock(&purge_sys->latch);
rw_lock_x_lock(&purge_sys.latch);
switch (purge_sys->state) {
switch (purge_sys.state) {
case PURGE_STATE_EXIT:
/* Shutdown must have been initiated during
FLUSH TABLES FOR EXPORT. */
......@@ -1695,21 +1704,21 @@ trx_purge_run(void)
ut_error;
case PURGE_STATE_RUN:
ut_a(!purge_sys->n_stop);
ut_a(!purge_sys.n_stop);
break;
case PURGE_STATE_STOP:
ut_a(purge_sys->n_stop);
if (--purge_sys->n_stop == 0) {
ut_a(purge_sys.n_stop);
if (--purge_sys.n_stop == 0) {
ib::info() << "Resuming purge";
purge_sys->state = PURGE_STATE_RUN;
purge_sys.state = PURGE_STATE_RUN;
}
MONITOR_INC_VALUE(MONITOR_PURGE_RESUME_COUNT, 1);
}
rw_lock_x_unlock(&purge_sys->latch);
rw_lock_x_unlock(&purge_sys.latch);
srv_purge_wakeup();
}
......@@ -2204,14 +2204,14 @@ trx_undo_get_undo_rec(
{
bool missing_history;
rw_lock_s_lock(&purge_sys->latch);
rw_lock_s_lock(&purge_sys.latch);
missing_history = purge_sys->view.changes_visible(trx_id, name);
missing_history = purge_sys.view.changes_visible(trx_id, name);
if (!missing_history) {
*undo_rec = trx_undo_get_undo_rec_low(roll_ptr, is_temp, heap);
}
rw_lock_s_unlock(&purge_sys->latch);
rw_lock_s_unlock(&purge_sys.latch);
return(missing_history);
}
......@@ -2273,7 +2273,7 @@ trx_undo_prev_version_build(
bool dummy_extern;
byte* buf;
ut_ad(!rw_lock_own(&purge_sys->latch, RW_LOCK_S));
ut_ad(!rw_lock_own(&purge_sys.latch, RW_LOCK_S));
ut_ad(mtr_memo_contains_page_flagged(index_mtr, index_rec,
MTR_MEMO_PAGE_S_FIX
| MTR_MEMO_PAGE_X_FIX));
......@@ -2323,12 +2323,12 @@ trx_undo_prev_version_build(
&info_bits);
/* (a) If a clustered index record version is such that the
trx id stamp in it is bigger than purge_sys->view, then the
trx id stamp in it is bigger than purge_sys.view, then the
BLOBs in that version are known to exist (the purge has not
progressed that far);
(b) if the version is the first version such that trx id in it
is less than purge_sys->view, and it is not delete-marked,
is less than purge_sys.view, and it is not delete-marked,
then the BLOBs in that version are known to exist (the purge
cannot have purged the BLOBs referenced by that version
yet).
......@@ -2367,19 +2367,19 @@ trx_undo_prev_version_build(
the BLOB. */
/* the row_upd_changes_disowned_external(update) call could be
omitted, but the synchronization on purge_sys->latch is likely
omitted, but the synchronization on purge_sys.latch is likely
more expensive. */
if ((update->info_bits & REC_INFO_DELETED_FLAG)
&& row_upd_changes_disowned_external(update)) {
bool missing_extern;
rw_lock_s_lock(&purge_sys->latch);
rw_lock_s_lock(&purge_sys.latch);
missing_extern = purge_sys->view.changes_visible(
missing_extern = purge_sys.view.changes_visible(
trx_id, index->table->name);
rw_lock_s_unlock(&purge_sys->latch);
rw_lock_s_unlock(&purge_sys.latch);
if (missing_extern) {
/* treat as a fresh insert, not to
......
......@@ -483,7 +483,7 @@ trx_rseg_mem_restore(
/* There is no need to cover this operation by the purge
mutex because we are still bootstrapping. */
purge_sys->purge_queue.push(*rseg);
purge_sys.purge_queue.push(*rseg);
}
}
}
......
......@@ -883,7 +883,7 @@ trx_lists_init_at_db_start()
{
ut_a(srv_is_being_started);
ut_ad(!srv_was_started);
ut_ad(!purge_sys);
ut_ad(!purge_sys.is_initialised());
if (srv_operation == SRV_OPERATION_RESTORE) {
/* mariabackup --prepare only deals with
......@@ -893,12 +893,11 @@ trx_lists_init_at_db_start()
return;
}
purge_sys = UT_NEW_NOKEY(purge_sys_t());
if (srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN) {
return;
}
purge_sys.create();
trx_rseg_array_init();
/* Look from the rollback segments if there exist undo logs for
......@@ -1219,7 +1218,7 @@ trx_serialise(trx_t* trx)
ut_ad(mutex_own(&rseg->mutex));
if (rseg->last_page_no == FIL_NULL) {
mutex_enter(&purge_sys->pq_mutex);
mutex_enter(&purge_sys.pq_mutex);
}
trx_sys.assign_new_trx_no(trx);
......@@ -1229,8 +1228,8 @@ trx_serialise(trx_t* trx)
already in the rollback segment. User threads only
produce events when a rollback segment is empty. */
if (rseg->last_page_no == FIL_NULL) {
purge_sys->purge_queue.push(TrxUndoRsegs(trx->no, *rseg));
mutex_exit(&purge_sys->pq_mutex);
purge_sys.purge_queue.push(TrxUndoRsegs(trx->no, *rseg));
mutex_exit(&purge_sys.pq_mutex);
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment