Commit 14178398 authored by Marko Mäkelä's avatar Marko Mäkelä

InnoDB purge_sys cleanup.

TrxUndoRsegsIterator::m_purge_sys: Remove. There is only one purge_sys.

purge_sys_t: Renamed from trx_purge_t. Define a constructor and
destructor. Allocate rseg_iter, purge_queue inline.

purge_sys->trx: Remove. Use purge_sys->sess->trx instead.

purge_sys->view_active: Remove. Access to purge_sys->view is always
protected by purge_sys->latch.

trx_purge_sys_create(): Replaced by purge_sys_t::purge_sys_t().

trx_purge_sys_close(): Replaced by purge_sys_t::~purge_sys_t().
parent 9928dbe5
......@@ -37,9 +37,6 @@ Created 3/26/1996 Heikki Tuuri
#include "fil0fil.h"
#include "read0types.h"
/** The global data structure coordinating a purge */
extern trx_purge_t* purge_sys;
/** A dummy undo record used as a return value when we have a whole undo log
which needs no purge */
extern trx_undo_rec_t trx_purge_dummy_rec;
......@@ -54,12 +51,6 @@ trx_purge_get_log_from_hist(
/*========================*/
fil_addr_t node_addr); /*!< in: file address of the history
list node of the log */
/** Create the global purge system data structure. */
void
trx_purge_sys_create();
/** Free the global purge system data structure. */
void
trx_purge_sys_close();
/************************************************************************
Adds the update undo log as the first log in the history list. Removes the
update undo log segment from the rseg slot if it is too big for reuse. */
......@@ -216,9 +207,9 @@ Chooses the rollback segment with the smallest trx_no. */
struct TrxUndoRsegsIterator {
/** Constructor */
TrxUndoRsegsIterator(trx_purge_t* purge_sys);
TrxUndoRsegsIterator();
/** Sets the next rseg to purge in m_purge_sys.
/** Sets the next rseg to purge in purge_sys.
@return page size of the table for which the log is.
NOTE: if rseg is NULL when this function returns this means that
there are no rollback segments to purge and then the returned page
......@@ -230,9 +221,6 @@ struct TrxUndoRsegsIterator {
TrxUndoRsegsIterator(const TrxUndoRsegsIterator&);
TrxUndoRsegsIterator& operator=(const TrxUndoRsegsIterator&);
/** The purge system pointer */
trx_purge_t* m_purge_sys;
/** The current element to process */
TrxUndoRsegs m_trx_undo_rsegs;
......@@ -506,13 +494,16 @@ namespace undo {
}; /* namespace undo */
/** The control structure used in the purge operation */
struct trx_purge_t{
class purge_sys_t
{
public:
/** Construct the purge system. */
purge_sys_t();
/** Destruct the purge system. */
~purge_sys_t();
sess_t* sess; /*!< System session running the purge
query */
trx_t* trx; /*!< System transaction running the
purge query: this trx is not in the
trx list of the trx system and it
never ends */
rw_lock_t latch; /*!< The latch protecting the purge
view. A purge operation must acquire an
x-latch here for the instant at which
......@@ -522,7 +513,7 @@ struct trx_purge_t{
protects state and running */
os_event_t event; /*!< State signal event;
os_event_set() and os_event_reset()
are protected by trx_purge_t::latch
are protected by purge_sys_t::latch
X-lock */
ulint n_stop; /*!< Counter to track number stops */
volatile bool running; /*!< true, if purge is active,
......@@ -534,7 +525,6 @@ struct trx_purge_t{
parallelized purge operation */
ReadView view; /*!< The purge will not remove undo logs
which are >= this view (purge view) */
bool view_active; /*!< true if view is active */
volatile ulint n_submitted; /*!< Count of total tasks submitted
to the task queue */
volatile ulint n_completed; /*!< Count of total tasks completed */
......@@ -557,11 +547,8 @@ struct trx_purge_t{
purged already accurately. */
#endif /* UNIV_DEBUG */
/*-----------------------------*/
ibool next_stored; /*!< TRUE if the info of the next record
to purge is stored below: if yes, then
the transaction number and the undo
number of the record are stored in
purge_trx_no and purge_undo_no above */
bool next_stored; /*!< whether rseg holds the next record
to purge */
trx_rseg_t* rseg; /*!< Rollback segment for the next undo
record to purge */
ulint page_no; /*!< Page number for the next undo
......@@ -575,11 +562,11 @@ struct trx_purge_t{
ulint hdr_offset; /*!< Header byte offset on the page */
TrxUndoRsegsIterator*
TrxUndoRsegsIterator
rseg_iter; /*!< Iterator to get the next rseg
to process */
purge_pq_t* purge_queue; /*!< Binary min-heap, ordered on
purge_pq_t purge_queue; /*!< Binary min-heap, ordered on
TrxUndoRsegs::trx_no. It is protected
by the pq_mutex */
PQMutex pq_mutex; /*!< Mutex protecting purge_queue */
......@@ -588,6 +575,9 @@ struct trx_purge_t{
for truncate. */
};
/** The global data structure coordinating a purge */
extern purge_sys_t* purge_sys;
/** Info required to purge a record */
struct trx_purge_rec_t {
trx_undo_rec_t* undo_rec; /*!< Record to purge */
......
......@@ -123,8 +123,6 @@ struct trx_sig_t;
struct trx_rseg_t;
/** Transaction undo log */
struct trx_undo_t;
/** The control structure used in the purge operation */
struct trx_purge_t;
/** Rollback command node in a query graph */
struct roll_node_t;
/** Commit command node in a query graph */
......
......@@ -1661,12 +1661,8 @@ srv_export_innodb_status(void)
#ifdef UNIV_DEBUG
rw_lock_s_lock(&purge_sys->latch);
trx_id_t up_limit_id;
trx_id_t up_limit_id = purge_sys->view.up_limit_id();;
trx_id_t done_trx_no = purge_sys->done.trx_no;
up_limit_id = purge_sys->view_active
? purge_sys->view.up_limit_id() : 0;
rw_lock_s_unlock(&purge_sys->latch);
mutex_enter(&trx_sys->mutex);
......
......@@ -2857,9 +2857,8 @@ innodb_shutdown()
trx_sys_file_format_close();
trx_sys_close();
}
if (purge_sys) {
trx_purge_sys_close();
}
UT_DELETE(purge_sys);
purge_sys = NULL;
if (buf_dblwr) {
buf_dblwr_free();
}
......
......@@ -58,7 +58,7 @@ ulong srv_max_purge_lag = 0;
ulong srv_max_purge_lag_delay = 0;
/** The global data structure coordinating a purge */
trx_purge_t* purge_sys = NULL;
purge_sys_t* purge_sys;
/** A dummy undo record used as a return value when we have a whole undo log
which needs no purge */
......@@ -73,15 +73,14 @@ bool trx_commit_disallowed = false;
const TrxUndoRsegs TrxUndoRsegsIterator::NullElement(UINT64_UNDEFINED);
/** Constructor */
TrxUndoRsegsIterator::TrxUndoRsegsIterator(trx_purge_t* purge_sys)
TrxUndoRsegsIterator::TrxUndoRsegsIterator()
:
m_purge_sys(purge_sys),
m_trx_undo_rsegs(NullElement),
m_iter(m_trx_undo_rsegs.end())
{
}
/** Sets the next rseg to purge in m_purge_sys.
/** Sets the next rseg to purge in purge_sys.
@return page size of the table for which the log is.
NOTE: if rseg is NULL when this function returns this means that
there are no rollback segments to purge and then the returned page
......@@ -89,7 +88,7 @@ size object should not be used. */
const page_size_t
TrxUndoRsegsIterator::set_next()
{
mutex_enter(&m_purge_sys->pq_mutex);
mutex_enter(&purge_sys->pq_mutex);
/* Only purge consumes events from the priority queue, user
threads only produce the events. */
......@@ -103,9 +102,9 @@ TrxUndoRsegsIterator::set_next()
number shouldn't increase. Undo increment of
expected trx_no done by caller assuming rollback
segments from given transaction are done. */
m_purge_sys->iter.trx_no = (*m_iter)->last_trx_no;
purge_sys->iter.trx_no = (*m_iter)->last_trx_no;
} else if (!m_purge_sys->purge_queue->empty()) {
} else if (!purge_sys->purge_queue.empty()) {
/* Read the next element from the queue.
Combine elements if they have same transaction number.
......@@ -115,20 +114,21 @@ TrxUndoRsegsIterator::set_next()
rollback segment for purge. */
m_trx_undo_rsegs = NullElement;
while (!m_purge_sys->purge_queue->empty()) {
purge_pq_t& purge_queue = purge_sys->purge_queue;
while (!purge_queue.empty()) {
if (m_trx_undo_rsegs.get_trx_no() == UINT64_UNDEFINED) {
m_trx_undo_rsegs =
purge_sys->purge_queue->top();
} else if (purge_sys->purge_queue->top().get_trx_no() ==
m_trx_undo_rsegs = purge_queue.top();
} else if (purge_queue.top().get_trx_no() ==
m_trx_undo_rsegs.get_trx_no()) {
m_trx_undo_rsegs.append(
purge_sys->purge_queue->top());
purge_queue.top());
} else {
break;
}
m_purge_sys->purge_queue->pop();
purge_queue.pop();
}
m_iter = m_trx_undo_rsegs.begin();
......@@ -138,156 +138,115 @@ TrxUndoRsegsIterator::set_next()
m_trx_undo_rsegs = NullElement;
m_iter = m_trx_undo_rsegs.end();
mutex_exit(&m_purge_sys->pq_mutex);
mutex_exit(&purge_sys->pq_mutex);
m_purge_sys->rseg = NULL;
purge_sys->rseg = NULL;
/* return a dummy object, not going to be used by the caller */
return(univ_page_size);
}
m_purge_sys->rseg = *m_iter++;
purge_sys->rseg = *m_iter++;
mutex_exit(&m_purge_sys->pq_mutex);
mutex_exit(&purge_sys->pq_mutex);
ut_a(m_purge_sys->rseg != NULL);
ut_a(purge_sys->rseg != NULL);
mutex_enter(&m_purge_sys->rseg->mutex);
mutex_enter(&purge_sys->rseg->mutex);
ut_a(m_purge_sys->rseg->last_page_no != FIL_NULL);
ut_ad(m_purge_sys->rseg->last_trx_no == m_trx_undo_rsegs.get_trx_no());
ut_a(purge_sys->rseg->last_page_no != FIL_NULL);
ut_ad(purge_sys->rseg->last_trx_no == m_trx_undo_rsegs.get_trx_no());
/* We assume in purge of externally stored fields that
space id is in the range of UNDO tablespace space ids
unless space is system tablespace */
ut_a(m_purge_sys->rseg->space <= srv_undo_tablespaces_open
ut_a(purge_sys->rseg->space <= srv_undo_tablespaces_open
|| is_system_tablespace(
m_purge_sys->rseg->space));
purge_sys->rseg->space));
const page_size_t page_size(m_purge_sys->rseg->page_size);
const page_size_t page_size(purge_sys->rseg->page_size);
ut_a(purge_sys->iter.trx_no <= purge_sys->rseg->last_trx_no);
m_purge_sys->iter.trx_no = m_purge_sys->rseg->last_trx_no;
m_purge_sys->hdr_offset = m_purge_sys->rseg->last_offset;
m_purge_sys->hdr_page_no = m_purge_sys->rseg->last_page_no;
purge_sys->iter.trx_no = purge_sys->rseg->last_trx_no;
purge_sys->hdr_offset = purge_sys->rseg->last_offset;
purge_sys->hdr_page_no = purge_sys->rseg->last_page_no;
mutex_exit(&m_purge_sys->rseg->mutex);
mutex_exit(&purge_sys->rseg->mutex);
return(page_size);
}
/****************************************************************//**
Builds a purge 'query' graph. The actual purge is performed by executing
/** Build a purge 'query' graph. The actual purge is performed by executing
this query graph.
@param[in,out] sess the purge session
@return own: the query graph */
static
que_t*
trx_purge_graph_build(
/*==================*/
trx_t* trx, /*!< in: transaction */
ulint n_purge_threads) /*!< in: number of purge
threads */
trx_purge_graph_build(sess_t* sess)
{
ulint i;
mem_heap_t* heap;
que_fork_t* fork;
ut_a(srv_n_purge_threads > 0);
/* A purge transaction is not a real transaction, we use a transaction
here only because the query threads code requires it. It is otherwise
quite unnecessary. We should get rid of it eventually. */
trx_t* trx = sess->trx;
heap = mem_heap_create(512);
fork = que_fork_create(NULL, NULL, QUE_FORK_PURGE, heap);
fork->trx = trx;
ut_ad(trx->sess == sess);
for (i = 0; i < n_purge_threads; ++i) {
que_thr_t* thr;
trx->id = 0;
trx->start_time = ut_time();
trx->state = TRX_STATE_ACTIVE;
trx->op_info = "purge trx";
thr = que_thr_create(fork, heap, NULL);
mem_heap_t* heap = mem_heap_create(512);
que_fork_t* fork = que_fork_create(
NULL, NULL, QUE_FORK_PURGE, heap);
fork->trx = trx;
for (ulint i = 0; i < srv_n_purge_threads; ++i) {
que_thr_t* thr = que_thr_create(fork, heap, NULL);
thr->child = row_purge_node_create(thr, heap);
}
return(fork);
}
/** Create the global purge system data structure. */
void
trx_purge_sys_create()
{
purge_sys = static_cast<trx_purge_t*>(
ut_zalloc_nokey(sizeof(*purge_sys)));
purge_sys->state = PURGE_STATE_INIT;
purge_sys->event = os_event_create(0);
new (&purge_sys->iter) purge_iter_t;
new (&purge_sys->limit) purge_iter_t;
new (&purge_sys->undo_trunc) undo::Truncate;
/** Construct the purge system. */
purge_sys_t::purge_sys_t()
: sess(sess_open()), latch(), event(os_event_create(0)),
n_stop(0), running(false), state(PURGE_STATE_INIT),
query(trx_purge_graph_build(sess)),
view(), n_submitted(0), n_completed(0),
iter(), limit(),
#ifdef UNIV_DEBUG
new (&purge_sys->done) purge_iter_t;
done(),
#endif /* UNIV_DEBUG */
purge_sys->purge_queue = UT_NEW_NOKEY(purge_pq_t());
ut_a(purge_sys->purge_queue != NULL);
if (srv_force_recovery < SRV_FORCE_NO_UNDO_LOG_SCAN) {
trx_rseg_array_init();
}
rw_lock_create(trx_purge_latch_key,
&purge_sys->latch, SYNC_PURGE_LATCH);
mutex_create(LATCH_ID_PURGE_SYS_PQ, &purge_sys->pq_mutex);
ut_a(srv_n_purge_threads > 0);
purge_sys->sess = sess_open();
purge_sys->trx = purge_sys->sess->trx;
ut_a(purge_sys->trx->sess == purge_sys->sess);
/* A purge transaction is not a real transaction, we use a transaction
here only because the query threads code requires it. It is otherwise
quite unnecessary. We should get rid of it eventually. */
purge_sys->trx->id = 0;
purge_sys->trx->start_time = ut_time();
purge_sys->trx->state = TRX_STATE_ACTIVE;
purge_sys->trx->op_info = "purge trx";
purge_sys->query = trx_purge_graph_build(
purge_sys->trx, srv_n_purge_threads);
new(&purge_sys->view) ReadView();
purge_sys->rseg_iter = UT_NEW_NOKEY(TrxUndoRsegsIterator(purge_sys));
next_stored(false), rseg(NULL),
page_no(0), offset(0), hdr_page_no(0), hdr_offset(0),
rseg_iter(), purge_queue(), pq_mutex(), undo_trunc()
{
ut_ad(!purge_sys);
rw_lock_create(trx_purge_latch_key, &latch, SYNC_PURGE_LATCH);
mutex_create(LATCH_ID_PURGE_SYS_PQ, &pq_mutex);
}
/** Free the global purge system data structure. */
void
trx_purge_sys_close()
/** Destruct the purge system. */
purge_sys_t::~purge_sys_t()
{
que_graph_free(purge_sys->query);
ut_a(purge_sys->trx->id == 0);
ut_a(purge_sys->sess->trx == purge_sys->trx);
purge_sys->trx->state = TRX_STATE_NOT_STARTED;
sess_close(purge_sys->sess);
purge_sys->view.close();
purge_sys->view.~ReadView();
rw_lock_free(&purge_sys->latch);
mutex_free(&purge_sys->pq_mutex);
UT_DELETE(purge_sys->purge_queue);
os_event_destroy(purge_sys->event);
UT_DELETE(purge_sys->rseg_iter);
ut_free(purge_sys);
purge_sys = NULL;
ut_ad(this == purge_sys);
que_graph_free(query);
ut_a(sess->trx->id == 0);
sess->trx->state = TRX_STATE_NOT_STARTED;
sess_close(sess);
view.close();
rw_lock_free(&latch);
/* rw_lock_free() already called latch.~rw_lock_t(); tame the
debug assertions when the destructor will be called once more. */
ut_ad(latch.magic_n == 0);
ut_d(latch.magic_n = RW_LOCK_MAGIC_N);
mutex_free(&pq_mutex);
os_event_destroy(event);
}
/*================ UNDO LOG HISTORY LIST =============================*/
......@@ -950,11 +909,11 @@ trx_purge_cleanse_purge_queue(
/* Remove rseg instances that are in the purge queue before we start
truncate of corresponding UNDO truncate. */
while (!purge_sys->purge_queue->empty()) {
purge_elem_list.push_back(purge_sys->purge_queue->top());
purge_sys->purge_queue->pop();
while (!purge_sys->purge_queue.empty()) {
purge_elem_list.push_back(purge_sys->purge_queue.top());
purge_sys->purge_queue.pop();
}
ut_ad(purge_sys->purge_queue->empty());
ut_ad(purge_sys->purge_queue.empty());
for (purge_elem_list_t::iterator it = purge_elem_list.begin();
it != purge_elem_list.end();
......@@ -971,12 +930,11 @@ trx_purge_cleanse_purge_queue(
}
}
const ulint size = it->size();
if (size != 0) {
if (it->size()) {
/* size != 0 suggest that there exist other rsegs that
needs processing so add this element to purge queue.
Note: Other rseg could be non-redo rsegs. */
purge_sys->purge_queue->push(*it);
purge_sys->purge_queue.push(*it);
}
}
mutex_exit(&purge_sys->pq_mutex);
......@@ -1130,7 +1088,7 @@ trx_purge_initiate_truncate(
purge_batch_size that can force the purge loop to exit before
all the records are purged and in this case purge_sys->rseg
could point to a valid rseg waiting for next purge cycle. */
purge_sys->next_stored = FALSE;
purge_sys->next_stored = false;
purge_sys->rseg = NULL;
}
......@@ -1226,7 +1184,7 @@ trx_purge_rseg_get_next_history_log(
purge_sys->iter.trx_no = rseg->last_trx_no + 1;
purge_sys->iter.undo_no = 0;
purge_sys->iter.undo_rseg_space = ULINT_UNDEFINED;
purge_sys->next_stored = FALSE;
purge_sys->next_stored = false;
mtr_start(&mtr);
......@@ -1310,7 +1268,7 @@ trx_purge_rseg_get_next_history_log(
mutex_enter(&purge_sys->pq_mutex);
purge_sys->purge_queue->push(elem);
purge_sys->purge_queue.push(elem);
mutex_exit(&purge_sys->pq_mutex);
......@@ -1318,12 +1276,10 @@ trx_purge_rseg_get_next_history_log(
}
/** Position the purge sys "iterator" on the undo record to use for purging.
@param[in,out] purge_sys purge instance
@param[in] page_size page size */
static
void
trx_purge_read_undo_rec(
trx_purge_t* purge_sys,
const page_size_t& page_size)
{
ulint offset;
......@@ -1369,7 +1325,7 @@ trx_purge_read_undo_rec(
purge_sys->iter.undo_no = undo_no;
purge_sys->iter.undo_rseg_space = undo_rseg_space;
purge_sys->next_stored = TRUE;
purge_sys->next_stored = true;
}
/***********************************************************************//**
......@@ -1382,12 +1338,12 @@ void
trx_purge_choose_next_log(void)
/*===========================*/
{
ut_ad(purge_sys->next_stored == FALSE);
ut_ad(!purge_sys->next_stored);
const page_size_t& page_size = purge_sys->rseg_iter->set_next();
const page_size_t& page_size = purge_sys->rseg_iter.set_next();
if (purge_sys->rseg != NULL) {
trx_purge_read_undo_rec(purge_sys, page_size);
trx_purge_read_undo_rec(page_size);
} else {
/* There is nothing to do yet. */
os_thread_yield();
......@@ -1573,7 +1529,7 @@ ulint
trx_purge_attach_undo_recs(
/*=======================*/
ulint n_purge_threads,/*!< in: number of purge threads */
trx_purge_t* purge_sys, /*!< in/out: purge instance */
purge_sys_t* purge_sys, /*!< in/out: purge instance */
ulint batch_size) /*!< in: no. of pages to purge */
{
que_thr_t* thr;
......@@ -1720,7 +1676,7 @@ static
void
trx_purge_wait_for_workers_to_complete(
/*===================================*/
trx_purge_t* purge_sys) /*!< in: purge instance */
purge_sys_t* purge_sys) /*!< in: purge instance */
{
ulint n_submitted = purge_sys->n_submitted;
......@@ -1781,13 +1737,7 @@ trx_purge(
ut_a(purge_sys->n_submitted == purge_sys->n_completed);
rw_lock_x_lock(&purge_sys->latch);
purge_sys->view_active = false;
trx_sys->mvcc->clone_oldest_view(&purge_sys->view);
purge_sys->view_active = true;
rw_lock_x_unlock(&purge_sys->latch);
#ifdef UNIV_DEBUG
......
......@@ -167,7 +167,6 @@ array in the trx system object.
@param[in] space space where the segment is placed
@param[in] page_no page number of the segment header
@param[in] page_size page size
@param[in,out] purge_queue rseg queue
@param[out] rseg_array add rseg reference to this central array
@param[in,out] mtr mini-transaction
@return own: rollback segment object */
......@@ -178,7 +177,6 @@ trx_rseg_mem_create(
ulint space,
ulint page_no,
const page_size_t& page_size,
purge_pq_t* purge_queue,
trx_rseg_t** rseg_array,
mtr_t* mtr)
{
......@@ -253,7 +251,7 @@ trx_rseg_mem_create(
/* There is no need to cover this operation by the purge
mutex because we are still bootstrapping. */
purge_queue->push(elem);
purge_sys->purge_queue.push(elem);
}
} else {
rseg->last_page_no = FIL_NULL;
......@@ -296,7 +294,7 @@ trx_rseg_array_init()
rseg = trx_rseg_mem_create(
i, space, page_no, page_size,
purge_sys->purge_queue, rseg_array, &mtr);
rseg_array, &mtr);
ut_a(rseg->id == i);
} else {
......@@ -365,7 +363,7 @@ trx_rseg_create(
rseg = trx_rseg_mem_create(
slot_no, space_id, page_no, page_size,
purge_sys->purge_queue, rseg_array, &mtr);
rseg_array, &mtr);
}
mtr_commit(&mtr);
......
......@@ -647,8 +647,6 @@ trx_sys_init_at_db_start()
trx_sys_mutex_exit();
trx_sys->mvcc->clone_oldest_view(&purge_sys->view);
purge_sys->view_active = true;
}
/*****************************************************************//**
......
......@@ -1015,7 +1015,11 @@ trx_lists_init_at_db_start()
ut_ad(!srv_was_started);
ut_ad(!purge_sys);
trx_purge_sys_create();
purge_sys = UT_NEW_NOKEY(purge_sys_t());
if (srv_force_recovery < SRV_FORCE_NO_UNDO_LOG_SCAN) {
trx_rseg_array_init();
}
/* Look from the rollback segments if there exist undo logs for
transactions. */
......@@ -1553,7 +1557,7 @@ trx_serialisation_number_get(
trx_sys_mutex_exit();
purge_sys->purge_queue->push(elem);
purge_sys->purge_queue.push(elem);
mutex_exit(&purge_sys->pq_mutex);
} else {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment