Commit 2027c482 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-32050: Hold exclusive purge_sys.rseg->latch longer

Let the purge_coordinator_task acquire purge_sys.rseg->latch
less frequently and hold it longer at a time. This may throttle
concurrent DML and prevent purge lag a little.

Remove an unnecessary std::this_thread::yield(), because
trx_purge_attach_undo_recs() is supposed to terminate the scan
when running out of undo log records. Ultimately, this will
result in purge_coordinator_state::do_purge() and
purge_coordinator_callback() returning control to the thread pool.

Reviewed by: Vladislav Lesin and Vladislav Vaintroub
parent 44689eb7
...@@ -109,7 +109,8 @@ struct TrxUndoRsegsIterator { ...@@ -109,7 +109,8 @@ struct TrxUndoRsegsIterator {
TrxUndoRsegsIterator(); TrxUndoRsegsIterator();
/** Sets the next rseg to purge in purge_sys. /** Sets the next rseg to purge in purge_sys.
Executed in the purge coordinator thread. Executed in the purge coordinator thread.
@return whether anything is to be purged */ @retval false when nothing is to be purged
@retval true when purge_sys.rseg->latch was locked */
inline bool set_next(); inline bool set_next();
private: private:
......
...@@ -66,9 +66,11 @@ TrxUndoRsegsIterator::TrxUndoRsegsIterator() ...@@ -66,9 +66,11 @@ TrxUndoRsegsIterator::TrxUndoRsegsIterator()
/** Sets the next rseg to purge in purge_sys. /** Sets the next rseg to purge in purge_sys.
Executed in the purge coordinator thread. Executed in the purge coordinator thread.
@return whether anything is to be purged */ @retval false when nothing is to be purged
TRANSACTIONAL_INLINE inline bool TrxUndoRsegsIterator::set_next() @retval true when purge_sys.rseg->latch was locked */
inline bool TrxUndoRsegsIterator::set_next()
{ {
ut_ad(!purge_sys.next_stored);
mysql_mutex_lock(&purge_sys.pq_mutex); mysql_mutex_lock(&purge_sys.pq_mutex);
/* Only purge consumes events from the priority queue, user /* Only purge consumes events from the priority queue, user
...@@ -106,23 +108,10 @@ TRANSACTIONAL_INLINE inline bool TrxUndoRsegsIterator::set_next() ...@@ -106,23 +108,10 @@ TRANSACTIONAL_INLINE inline bool TrxUndoRsegsIterator::set_next()
ut_ad(purge_sys.rseg->space->id == TRX_SYS_SPACE ut_ad(purge_sys.rseg->space->id == TRX_SYS_SPACE
|| srv_is_undo_tablespace(purge_sys.rseg->space->id)); || srv_is_undo_tablespace(purge_sys.rseg->space->id));
trx_id_t last_trx_no; purge_sys.rseg->latch.wr_lock(SRW_LOCK_CALL);
{ trx_id_t last_trx_no = purge_sys.rseg->last_trx_no();
#ifdef SUX_LOCK_GENERIC purge_sys.hdr_offset = purge_sys.rseg->last_offset();
purge_sys.rseg->latch.rd_lock(SRW_LOCK_CALL); purge_sys.hdr_page_no = purge_sys.rseg->last_page_no;
#else
transactional_shared_lock_guard<srw_spin_lock> rg
{purge_sys.rseg->latch};
#endif
last_trx_no = purge_sys.rseg->last_trx_no();
purge_sys.hdr_offset = purge_sys.rseg->last_offset();
purge_sys.hdr_page_no = purge_sys.rseg->last_page_no;
#ifdef SUX_LOCK_GENERIC
purge_sys.rseg->latch.rd_unlock();
#endif
}
/* Only the purge_coordinator_task will access this object /* Only the purge_coordinator_task will access this object
purge_sys.rseg_iter, or any of purge_sys.hdr_page_no, purge_sys.rseg_iter, or any of purge_sys.hdr_page_no,
...@@ -839,18 +828,17 @@ TRANSACTIONAL_TARGET void trx_purge_truncate_history() ...@@ -839,18 +828,17 @@ TRANSACTIONAL_TARGET void trx_purge_truncate_history()
/***********************************************************************//** /***********************************************************************//**
Updates the last not yet purged history log info in rseg when we have purged Updates the last not yet purged history log info in rseg when we have purged
a whole undo log. Advances also purge_sys.purge_trx_no past the purged log. */ a whole undo log. Advances also purge_sys.purge_trx_no past the purged log.
static void trx_purge_rseg_get_next_history_log(
ulint* n_pages_handled)/*!< in/out: number of UNDO pages @param n_pages_handled number of UNDO pages handled */
handled */ static void trx_purge_rseg_get_next_history_log(ulint *n_pages_handled)
{ {
fil_addr_t prev_log_addr; fil_addr_t prev_log_addr;
mtr_t mtr; mtr_t mtr;
mtr.start(); mtr.start();
purge_sys.rseg->latch.wr_lock(SRW_LOCK_CALL); ut_ad(purge_sys.rseg->latch.is_write_locked());
ut_a(purge_sys.rseg->last_page_no != FIL_NULL); ut_a(purge_sys.rseg->last_page_no != FIL_NULL);
purge_sys.tail.trx_no= purge_sys.rseg->last_trx_no() + 1; purge_sys.tail.trx_no= purge_sys.rseg->last_trx_no() + 1;
...@@ -874,50 +862,44 @@ static void trx_purge_rseg_get_next_history_log( ...@@ -874,50 +862,44 @@ static void trx_purge_rseg_get_next_history_log(
else else
prev_log_addr.page= FIL_NULL; prev_log_addr.page= FIL_NULL;
const bool empty= prev_log_addr.page == FIL_NULL;
if (empty)
/* No logs left in the history list */
purge_sys.rseg->last_page_no= FIL_NULL;
purge_sys.rseg->latch.wr_unlock();
mtr.commit(); mtr.commit();
if (empty) if (prev_log_addr.page == FIL_NULL)
return; purge_sys.rseg->last_page_no= FIL_NULL;
else
/* Read the previous log header. */
mtr.start();
trx_id_t trx_no= 0;
if (const buf_block_t* undo_page=
buf_page_get_gen(page_id_t(purge_sys.rseg->space->id, prev_log_addr.page),
0, RW_S_LATCH, nullptr, BUF_GET_POSSIBLY_FREED, &mtr))
{ {
const byte *log_hdr= undo_page->page.frame + prev_log_addr.boffset; /* Read the previous log header. */
mtr.start();
trx_no= mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO); trx_id_t trx_no= 0;
ut_ad(mach_read_from_2(log_hdr + TRX_UNDO_NEEDS_PURGE) <= 1); if (const buf_block_t* undo_page=
} buf_page_get_gen(page_id_t(purge_sys.rseg->space->id,
prev_log_addr.page),
0, RW_S_LATCH, nullptr, BUF_GET_POSSIBLY_FREED, &mtr))
{
const byte *log_hdr= undo_page->page.frame + prev_log_addr.boffset;
trx_no= mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
ut_ad(mach_read_from_2(log_hdr + TRX_UNDO_NEEDS_PURGE) <= 1);
}
mtr.commit(); mtr.commit();
if (UNIV_UNLIKELY(!trx_no)) if (UNIV_LIKELY(trx_no != 0))
return; {
purge_sys.rseg->last_page_no= prev_log_addr.page;
purge_sys.rseg->set_last_commit(prev_log_addr.boffset, trx_no);
purge_sys.rseg->latch.wr_lock(SRW_LOCK_CALL); /* Purge can also produce events, however these are already
purge_sys.rseg->last_page_no= prev_log_addr.page; ordered in the rollback segment and any user generated event
purge_sys.rseg->set_last_commit(prev_log_addr.boffset, trx_no); will be greater than the events that Purge produces. ie. Purge
can never produce events from an empty rollback segment. */
/* Purge can also produce events, however these are already ordered mysql_mutex_lock(&purge_sys.pq_mutex);
in the rollback segment and any user generated event will be greater purge_sys.purge_queue.push(*purge_sys.rseg);
than the events that Purge produces. ie. Purge can never produce mysql_mutex_unlock(&purge_sys.pq_mutex);
events from an empty rollback segment. */ }
}
mysql_mutex_lock(&purge_sys.pq_mutex);
purge_sys.purge_queue.push(*purge_sys.rseg);
mysql_mutex_unlock(&purge_sys.pq_mutex);
purge_sys.rseg->latch.wr_unlock(); purge_sys.rseg->latch.wr_unlock();
} }
...@@ -965,16 +947,16 @@ static void trx_purge_read_undo_rec() ...@@ -965,16 +947,16 @@ static void trx_purge_read_undo_rec()
Chooses the next undo log to purge and updates the info in purge_sys. This Chooses the next undo log to purge and updates the info in purge_sys. This
function is used to initialize purge_sys when the next record to purge is function is used to initialize purge_sys when the next record to purge is
not known, and also to update the purge system info on the next record when not known, and also to update the purge system info on the next record when
purge has handled the whole undo log for a transaction. */ purge has handled the whole undo log for a transaction.
TRANSACTIONAL_TARGET static void trx_purge_choose_next_log() @retval false when nothing is to be purged
@retval true when purge_sys.rseg->latch was locked */
static bool trx_purge_choose_next_log()
{ {
ut_ad(!purge_sys.next_stored);
if (purge_sys.rseg_iter.set_next()) { if (purge_sys.rseg_iter.set_next()) {
trx_purge_read_undo_rec(); trx_purge_read_undo_rec();
return true;
} else { } else {
/* There is nothing to do yet. */ return false;
std::this_thread::yield();
} }
} }
...@@ -995,9 +977,11 @@ trx_purge_get_next_rec( ...@@ -995,9 +977,11 @@ trx_purge_get_next_rec(
ut_ad(purge_sys.next_stored); ut_ad(purge_sys.next_stored);
ut_ad(purge_sys.tail.trx_no < purge_sys.low_limit_no()); ut_ad(purge_sys.tail.trx_no < purge_sys.low_limit_no());
ut_ad(purge_sys.rseg->latch.is_write_locked());
const page_id_t page_id{purge_sys.rseg->space->id, purge_sys.page_no}; const page_id_t page_id{purge_sys.rseg->space->id, purge_sys.page_no};
const uint16_t offset = purge_sys.offset; const uint16_t offset = purge_sys.offset;
bool locked = true;
if (offset == 0) { if (offset == 0) {
/* It is the dummy undo log record, which means that there is /* It is the dummy undo log record, which means that there is
...@@ -1007,19 +991,24 @@ trx_purge_get_next_rec( ...@@ -1007,19 +991,24 @@ trx_purge_get_next_rec(
/* Look for the next undo log and record to purge */ /* Look for the next undo log and record to purge */
trx_purge_choose_next_log(); if (trx_purge_choose_next_log()) {
purge_sys.rseg->latch.wr_unlock();
}
return reinterpret_cast<trx_undo_rec_t*>(-1); return reinterpret_cast<trx_undo_rec_t*>(-1);
} }
mtr.start(); mtr.start();
trx_undo_rec_t *rec_copy = nullptr;
const buf_block_t* undo_page const buf_block_t* undo_page
= buf_page_get_gen(page_id, 0, RW_S_LATCH, nullptr, = buf_page_get_gen(page_id, 0, RW_S_LATCH, nullptr,
BUF_GET_POSSIBLY_FREED, &mtr); BUF_GET_POSSIBLY_FREED, &mtr);
if (UNIV_UNLIKELY(!undo_page)) { if (UNIV_UNLIKELY(!undo_page)) {
corrupted: func_exit:
if (locked) {
purge_sys.rseg->latch.wr_unlock();
}
mtr.commit(); mtr.commit();
return nullptr; return rec_copy;
} }
const buf_block_t* rec2_page = undo_page; const buf_block_t* rec2_page = undo_page;
...@@ -1040,7 +1029,7 @@ trx_purge_get_next_rec( ...@@ -1040,7 +1029,7 @@ trx_purge_get_next_rec(
/* Look for the next undo log and record to purge */ /* Look for the next undo log and record to purge */
trx_purge_choose_next_log(); locked = trx_purge_choose_next_log();
mtr_start(&mtr); mtr_start(&mtr);
...@@ -1048,7 +1037,7 @@ trx_purge_get_next_rec( ...@@ -1048,7 +1037,7 @@ trx_purge_get_next_rec(
nullptr, BUF_GET_POSSIBLY_FREED, nullptr, BUF_GET_POSSIBLY_FREED,
&mtr); &mtr);
if (UNIV_UNLIKELY(!undo_page)) { if (UNIV_UNLIKELY(!undo_page)) {
goto corrupted; goto func_exit;
} }
} else { } else {
purge_sys.offset = page_offset(rec2); purge_sys.offset = page_offset(rec2);
...@@ -1061,11 +1050,8 @@ trx_purge_get_next_rec( ...@@ -1061,11 +1050,8 @@ trx_purge_get_next_rec(
} }
} }
trx_undo_rec_t* rec_copy = trx_undo_rec_copy(undo_page->page.frame rec_copy = trx_undo_rec_copy(undo_page->page.frame + offset, heap);
+ offset, heap); goto func_exit;
mtr.commit();
return rec_copy;
} }
/********************************************************************//** /********************************************************************//**
...@@ -1083,34 +1069,35 @@ trx_purge_fetch_next_rec( ...@@ -1083,34 +1069,35 @@ trx_purge_fetch_next_rec(
handled */ handled */
mem_heap_t* heap) /*!< in: memory heap where copied */ mem_heap_t* heap) /*!< in: memory heap where copied */
{ {
if (!purge_sys.next_stored) { if (!purge_sys.next_stored)
trx_purge_choose_next_log(); {
bool locked= trx_purge_choose_next_log();
if (!purge_sys.next_stored) { ut_ad(locked == purge_sys.next_stored);
DBUG_PRINT("ib_purge", if (!locked)
("no logs left in the history list")); return nullptr;
return nullptr; if (purge_sys.tail.trx_no >= purge_sys.low_limit_no())
} {
} purge_sys.rseg->latch.wr_unlock();
return nullptr;
if (purge_sys.tail.trx_no >= purge_sys.low_limit_no()) { }
return nullptr; /* row_purge_record_func() will later set ROLL_PTR_INSERT_FLAG for
} TRX_UNDO_INSERT_REC */
*roll_ptr= trx_undo_build_roll_ptr(false,
/* fprintf(stderr, "Thread %lu purging trx %llu undo record %llu\n", trx_sys.rseg_id(purge_sys.rseg, true),
pthread_self(), iter->trx_no, iter->undo_no); */ purge_sys.page_no, purge_sys.offset);
}
*roll_ptr = trx_undo_build_roll_ptr( else if (purge_sys.tail.trx_no >= purge_sys.low_limit_no())
/* row_purge_record_func() will later set return nullptr;
ROLL_PTR_INSERT_FLAG for TRX_UNDO_INSERT_REC */ else
false, {
trx_sys.rseg_id(purge_sys.rseg, true), *roll_ptr= trx_undo_build_roll_ptr(false,
purge_sys.page_no, purge_sys.offset); trx_sys.rseg_id(purge_sys.rseg, true),
purge_sys.page_no, purge_sys.offset);
/* The following call will advance the stored values of the purge_sys.rseg->latch.wr_lock(SRW_LOCK_CALL);
purge iterator. */ }
return trx_purge_get_next_rec(n_pages_handled, heap); /* The following will advance the purge iterator. */
return trx_purge_get_next_rec(n_pages_handled, heap);
} }
/** Run a purge batch. /** Run a purge batch.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment