Commit edde1f6e authored by Sergey Vojtovich's avatar Sergey Vojtovich

MDEV-17441 - InnoDB transition to C++11 atomics

trx_sys_t::rseg_history_len transition to Atomic_counter.
parent de32e663
...@@ -1392,7 +1392,7 @@ btr_cur_search_to_nth_level_func( ...@@ -1392,7 +1392,7 @@ btr_cur_search_to_nth_level_func(
Free blocks and read IO bandwidth should be prior Free blocks and read IO bandwidth should be prior
for them, when the history list is glowing huge. */ for them, when the history list is glowing huge. */
if (lock_intention == BTR_INTENTION_DELETE if (lock_intention == BTR_INTENTION_DELETE
&& trx_sys.history_size() > BTR_CUR_FINE_HISTORY_LENGTH && trx_sys.rseg_history_len > BTR_CUR_FINE_HISTORY_LENGTH
&& buf_get_n_pending_read_ios()) { && buf_get_n_pending_read_ios()) {
mtr_x_lock(dict_index_get_lock(index), mtr); mtr_x_lock(dict_index_get_lock(index), mtr);
} else if (dict_index_is_spatial(index) } else if (dict_index_is_spatial(index)
...@@ -2528,7 +2528,7 @@ btr_cur_open_at_index_side_func( ...@@ -2528,7 +2528,7 @@ btr_cur_open_at_index_side_func(
Free blocks and read IO bandwidth should be prior Free blocks and read IO bandwidth should be prior
for them, when the history list is glowing huge. */ for them, when the history list is glowing huge. */
if (lock_intention == BTR_INTENTION_DELETE if (lock_intention == BTR_INTENTION_DELETE
&& trx_sys.history_size() > BTR_CUR_FINE_HISTORY_LENGTH && trx_sys.rseg_history_len > BTR_CUR_FINE_HISTORY_LENGTH
&& buf_get_n_pending_read_ios()) { && buf_get_n_pending_read_ios()) {
mtr_x_lock(dict_index_get_lock(index), mtr); mtr_x_lock(dict_index_get_lock(index), mtr);
} else { } else {
...@@ -2873,7 +2873,7 @@ btr_cur_open_at_rnd_pos_func( ...@@ -2873,7 +2873,7 @@ btr_cur_open_at_rnd_pos_func(
Free blocks and read IO bandwidth should be prior Free blocks and read IO bandwidth should be prior
for them, when the history list is glowing huge. */ for them, when the history list is glowing huge. */
if (lock_intention == BTR_INTENTION_DELETE if (lock_intention == BTR_INTENTION_DELETE
&& trx_sys.history_size() > BTR_CUR_FINE_HISTORY_LENGTH && trx_sys.rseg_history_len > BTR_CUR_FINE_HISTORY_LENGTH
&& buf_get_n_pending_read_ios()) { && buf_get_n_pending_read_ios()) {
mtr_x_lock(dict_index_get_lock(index), mtr); mtr_x_lock(dict_index_get_lock(index), mtr);
} else { } else {
......
...@@ -125,7 +125,7 @@ flst_remove( ...@@ -125,7 +125,7 @@ flst_remove(
@param[in] base base node @param[in] base base node
@return length */ @return length */
UNIV_INLINE UNIV_INLINE
ulint uint32_t
flst_get_len( flst_get_len(
const flst_base_node_t* base); const flst_base_node_t* base);
/********************************************************************//** /********************************************************************//**
......
...@@ -91,7 +91,7 @@ flst_init( ...@@ -91,7 +91,7 @@ flst_init(
@param[in] base base node @param[in] base base node
@return length */ @return length */
UNIV_INLINE UNIV_INLINE
ulint uint32_t
flst_get_len( flst_get_len(
const flst_base_node_t* base) const flst_base_node_t* base)
{ {
......
...@@ -804,14 +804,14 @@ class trx_sys_t ...@@ -804,14 +804,14 @@ class trx_sys_t
MY_ALIGNED(CACHE_LINE_SIZE) std::atomic<trx_id_t> m_rw_trx_hash_version; MY_ALIGNED(CACHE_LINE_SIZE) std::atomic<trx_id_t> m_rw_trx_hash_version;
bool m_initialised;
public:
/** /**
TRX_RSEG_HISTORY list length (number of committed transactions to purge) TRX_RSEG_HISTORY list length (number of committed transactions to purge)
*/ */
MY_ALIGNED(CACHE_LINE_SIZE) int32 rseg_history_len; MY_ALIGNED(CACHE_LINE_SIZE) Atomic_counter<uint32_t> rseg_history_len;
bool m_initialised;
public:
/** Mutex protecting trx_list. */ /** Mutex protecting trx_list. */
MY_ALIGNED(CACHE_LINE_SIZE) mutable TrxSysMutex mutex; MY_ALIGNED(CACHE_LINE_SIZE) mutable TrxSysMutex mutex;
...@@ -1103,22 +1103,6 @@ class trx_sys_t ...@@ -1103,22 +1103,6 @@ class trx_sys_t
return count; return count;
} }
/** @return number of committed transactions waiting for purge */
ulint history_size() const
{
return uint32(my_atomic_load32(&const_cast<trx_sys_t*>(this)
->rseg_history_len));
}
/** Add to the TRX_RSEG_HISTORY length (on database startup). */
void history_add(int32 len)
{
my_atomic_add32(&rseg_history_len, len);
}
/** Register a committed transaction. */
void history_insert() { history_add(1); }
/** Note that a committed transaction was purged. */
void history_remove() { history_add(-1); }
private: private:
static my_bool get_min_trx_id_callback(rw_trx_hash_element_t *element, static my_bool get_min_trx_id_callback(rw_trx_hash_element_t *element,
trx_id_t *id) trx_id_t *id)
......
...@@ -4607,14 +4607,14 @@ lock_print_info_summary( ...@@ -4607,14 +4607,14 @@ lock_print_info_summary(
fprintf(file, fprintf(file,
"Purge done for trx's n:o < " TRX_ID_FMT "Purge done for trx's n:o < " TRX_ID_FMT
" undo n:o < " TRX_ID_FMT " state: %s\n" " undo n:o < " TRX_ID_FMT " state: %s\n"
"History list length " ULINTPF "\n", "History list length %u\n",
purge_sys.tail.trx_no(), purge_sys.tail.trx_no(),
purge_sys.tail.undo_no, purge_sys.tail.undo_no,
purge_sys.enabled() purge_sys.enabled()
? (purge_sys.running() ? "running" ? (purge_sys.running() ? "running"
: purge_sys.paused() ? "stopped" : "running but idle") : purge_sys.paused() ? "stopped" : "running but idle")
: "disabled", : "disabled",
trx_sys.history_size()); uint32_t{trx_sys.rseg_history_len});
#ifdef PRINT_NUM_OF_LOCK_STRUCTS #ifdef PRINT_NUM_OF_LOCK_STRUCTS
fprintf(file, fprintf(file,
......
...@@ -1936,7 +1936,7 @@ srv_mon_process_existing_counter( ...@@ -1936,7 +1936,7 @@ srv_mon_process_existing_counter(
break; break;
case MONITOR_RSEG_HISTORY_LEN: case MONITOR_RSEG_HISTORY_LEN:
value = trx_sys.history_size(); value = trx_sys.rseg_history_len;
break; break;
case MONITOR_RSEG_CUR_SIZE: case MONITOR_RSEG_CUR_SIZE:
......
...@@ -1939,7 +1939,7 @@ srv_wake_purge_thread_if_not_active() ...@@ -1939,7 +1939,7 @@ srv_wake_purge_thread_if_not_active()
if (purge_sys.enabled() && !purge_sys.paused() if (purge_sys.enabled() && !purge_sys.paused()
&& !my_atomic_loadlint(&srv_sys.n_threads_active[SRV_PURGE]) && !my_atomic_loadlint(&srv_sys.n_threads_active[SRV_PURGE])
&& trx_sys.history_size()) { && trx_sys.rseg_history_len) {
srv_release_threads(SRV_PURGE, 1); srv_release_threads(SRV_PURGE, 1);
} }
...@@ -2412,9 +2412,7 @@ static bool srv_purge_should_exit() ...@@ -2412,9 +2412,7 @@ static bool srv_purge_should_exit()
return(true); return(true);
} }
/* Slow shutdown was requested. */ /* Slow shutdown was requested. */
ulint history_size = trx_sys.history_size(); if (uint32_t history_size = trx_sys.rseg_history_len) {
if (history_size) {
#if defined HAVE_SYSTEMD && !defined EMBEDDED_LIBRARY #if defined HAVE_SYSTEMD && !defined EMBEDDED_LIBRARY
static ib_time_t progress_time; static ib_time_t progress_time;
ib_time_t time = ut_time(); ib_time_t time = ut_time();
...@@ -2422,7 +2420,7 @@ static bool srv_purge_should_exit() ...@@ -2422,7 +2420,7 @@ static bool srv_purge_should_exit()
progress_time = time; progress_time = time;
service_manager_extend_timeout( service_manager_extend_timeout(
INNODB_EXTEND_TIMEOUT_INTERVAL, INNODB_EXTEND_TIMEOUT_INTERVAL,
"InnoDB: to purge " ULINTPF " transactions", "InnoDB: to purge %u transactions",
history_size); history_size);
} }
#endif #endif
...@@ -2525,14 +2523,14 @@ DECLARE_THREAD(srv_worker_thread)( ...@@ -2525,14 +2523,14 @@ DECLARE_THREAD(srv_worker_thread)(
@param[in,out] n_total_purged total number of purged pages @param[in,out] n_total_purged total number of purged pages
@return length of history list before the last purge batch. */ @return length of history list before the last purge batch. */
static static
ulint uint32_t
srv_do_purge(ulint* n_total_purged) srv_do_purge(ulint* n_total_purged)
{ {
ulint n_pages_purged; ulint n_pages_purged;
static ulint count = 0; static ulint count = 0;
static ulint n_use_threads = 0; static ulint n_use_threads = 0;
static ulint rseg_history_len = 0; static uint32_t rseg_history_len = 0;
ulint old_activity_count = srv_get_activity_count(); ulint old_activity_count = srv_get_activity_count();
const ulint n_threads = srv_n_purge_threads; const ulint n_threads = srv_n_purge_threads;
...@@ -2550,7 +2548,7 @@ srv_do_purge(ulint* n_total_purged) ...@@ -2550,7 +2548,7 @@ srv_do_purge(ulint* n_total_purged)
} }
do { do {
if (trx_sys.history_size() > rseg_history_len if (trx_sys.rseg_history_len > rseg_history_len
|| (srv_max_purge_lag > 0 || (srv_max_purge_lag > 0
&& rseg_history_len > srv_max_purge_lag)) { && rseg_history_len > srv_max_purge_lag)) {
...@@ -2579,7 +2577,7 @@ srv_do_purge(ulint* n_total_purged) ...@@ -2579,7 +2577,7 @@ srv_do_purge(ulint* n_total_purged)
ut_a(n_use_threads <= n_threads); ut_a(n_use_threads <= n_threads);
/* Take a snapshot of the history list before purge. */ /* Take a snapshot of the history list before purge. */
if (!(rseg_history_len = trx_sys.history_size())) { if (!(rseg_history_len = trx_sys.rseg_history_len)) {
break; break;
} }
...@@ -2603,7 +2601,7 @@ srv_purge_coordinator_suspend( ...@@ -2603,7 +2601,7 @@ srv_purge_coordinator_suspend(
/*==========================*/ /*==========================*/
srv_slot_t* slot, /*!< in/out: Purge coordinator srv_slot_t* slot, /*!< in/out: Purge coordinator
thread slot */ thread slot */
ulint rseg_history_len) /*!< in: history list length uint32_t rseg_history_len) /*!< in: history list length
before last purge */ before last purge */
{ {
ut_ad(!srv_read_only_mode); ut_ad(!srv_read_only_mode);
...@@ -2620,7 +2618,7 @@ srv_purge_coordinator_suspend( ...@@ -2620,7 +2618,7 @@ srv_purge_coordinator_suspend(
/* We don't wait right away on the the non-timed wait because /* We don't wait right away on the the non-timed wait because
we want to signal the thread that wants to suspend purge. */ we want to signal the thread that wants to suspend purge. */
const bool wait = stop const bool wait = stop
|| rseg_history_len <= trx_sys.history_size(); || rseg_history_len <= trx_sys.rseg_history_len;
const bool timeout = srv_resume_thread( const bool timeout = srv_resume_thread(
slot, sig_count, wait, slot, sig_count, wait,
stop ? 0 : SRV_PURGE_MAX_TIMEOUT); stop ? 0 : SRV_PURGE_MAX_TIMEOUT);
...@@ -2635,7 +2633,7 @@ srv_purge_coordinator_suspend( ...@@ -2635,7 +2633,7 @@ srv_purge_coordinator_suspend(
if (!stop) { if (!stop) {
if (timeout if (timeout
&& rseg_history_len < 5000 && rseg_history_len < 5000
&& rseg_history_len == trx_sys.history_size()) { && rseg_history_len == trx_sys.rseg_history_len) {
/* No new records were added since the /* No new records were added since the
wait started. Simply wait for new wait started. Simply wait for new
records. The magic number 5000 is an records. The magic number 5000 is an
...@@ -2688,7 +2686,7 @@ DECLARE_THREAD(srv_purge_coordinator_thread)( ...@@ -2688,7 +2686,7 @@ DECLARE_THREAD(srv_purge_coordinator_thread)(
slot = srv_reserve_slot(SRV_PURGE); slot = srv_reserve_slot(SRV_PURGE);
ulint rseg_history_len = trx_sys.history_size(); uint32_t rseg_history_len = trx_sys.rseg_history_len;
do { do {
/* If there are no records to purge or the last /* If there are no records to purge or the last
......
...@@ -315,7 +315,7 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr) ...@@ -315,7 +315,7 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr)
rseg->needs_purge = true; rseg->needs_purge = true;
} }
trx_sys.history_insert(); trx_sys.rseg_history_len++;
if (undo->state == TRX_UNDO_CACHED) { if (undo->state == TRX_UNDO_CACHED) {
UT_LIST_ADD_FIRST(rseg->undo_cached, undo); UT_LIST_ADD_FIRST(rseg->undo_cached, undo);
...@@ -341,7 +341,7 @@ trx_purge_remove_log_hdr( ...@@ -341,7 +341,7 @@ trx_purge_remove_log_hdr(
{ {
flst_remove(rseg_hdr + TRX_RSEG_HISTORY, flst_remove(rseg_hdr + TRX_RSEG_HISTORY,
log_hdr + TRX_UNDO_HISTORY_NODE, mtr); log_hdr + TRX_UNDO_HISTORY_NODE, mtr);
trx_sys.history_remove(); trx_sys.rseg_history_len--;
} }
/** Free an undo log segment, and remove the header from the history list. /** Free an undo log segment, and remove the header from the history list.
...@@ -1240,7 +1240,7 @@ trx_purge_dml_delay(void) ...@@ -1240,7 +1240,7 @@ trx_purge_dml_delay(void)
if (srv_max_purge_lag > 0) { if (srv_max_purge_lag > 0) {
float ratio; float ratio;
ratio = float(trx_sys.history_size()) / srv_max_purge_lag; ratio = float(trx_sys.rseg_history_len) / srv_max_purge_lag;
if (ratio > 1.0) { if (ratio > 1.0) {
/* If the history list length exceeds the /* If the history list length exceeds the
......
...@@ -468,8 +468,8 @@ trx_rseg_mem_restore(trx_rseg_t* rseg, trx_id_t& max_trx_id, mtr_t* mtr) ...@@ -468,8 +468,8 @@ trx_rseg_mem_restore(trx_rseg_t* rseg, trx_id_t& max_trx_id, mtr_t* mtr)
rseg->curr_size = mach_read_from_4(rseg_header + TRX_RSEG_HISTORY_SIZE) rseg->curr_size = mach_read_from_4(rseg_header + TRX_RSEG_HISTORY_SIZE)
+ 1 + trx_undo_lists_init(rseg, max_trx_id, rseg_header); + 1 + trx_undo_lists_init(rseg, max_trx_id, rseg_header);
if (ulint len = flst_get_len(rseg_header + TRX_RSEG_HISTORY)) { if (auto len = flst_get_len(rseg_header + TRX_RSEG_HISTORY)) {
trx_sys.history_add(int32(len)); trx_sys.rseg_history_len += len;
fil_addr_t node_addr = trx_purge_get_log_from_hist( fil_addr_t node_addr = trx_purge_get_log_from_hist(
flst_get_last(rseg_header + TRX_RSEG_HISTORY, mtr)); flst_get_last(rseg_header + TRX_RSEG_HISTORY, mtr));
......
...@@ -212,7 +212,7 @@ trx_sys_t::create() ...@@ -212,7 +212,7 @@ trx_sys_t::create()
m_initialised = true; m_initialised = true;
mutex_create(LATCH_ID_TRX_SYS, &mutex); mutex_create(LATCH_ID_TRX_SYS, &mutex);
UT_LIST_INIT(trx_list, &trx_t::trx_list); UT_LIST_INIT(trx_list, &trx_t::trx_list);
my_atomic_store32(&rseg_history_len, 0); rseg_history_len= 0;
rw_trx_hash.init(); rw_trx_hash.init();
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment