Commit 76f6b6d8 authored by Marko Mäkelä

MDEV-34515: Reduce context switching in purge

Before this patch, the InnoDB purge coordinator task submitted
innodb_purge_threads-1 tasks even if there was not a sufficient amount
of work for all of them. For example, if there are undo log records
for only 1 table, only 1 task can be employed, and that task had better
be the purge coordinator.

srv_purge_worker_task_low(): Split from purge_worker_callback().

trx_purge_attach_undo_recs(): Remove the parameter n_purge_threads,
and add the parameter n_work_items, to keep track of the amount of
work.

trx_purge(): Launch purge worker tasks only if necessary. The work of
one thread will be executed by this purge coordinator thread.

que_fork_scheduler_round_robin(): Merged into trx_purge().

Thanks to Vladislav Vaintroub for supplying a prototype of this.

Reviewed by: Debarun Banerjee
parent b7b9f3ce
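
The scheduling pattern at the heart of this change can be sketched
independently of InnoDB: with n_work independent work items and a budget of
n_tasks threads, it never pays to wake more than min(n_work, n_tasks) - 1
helpers, because the submitting thread can process a share itself. A minimal
standalone sketch of that pattern, using plain std::thread rather than
InnoDB's tpool; every name below is illustrative:

```cpp
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Illustrative stand-ins, not InnoDB names: a shared counter models the
// purge work queue that srv_purge_worker_task_low() drains.
static std::atomic<std::size_t> next_item{0};

static void drain(std::size_t n_work)
{
  // Every participant (coordinator included) pops items until the queue
  // is empty, so no task is ever created just to find nothing to do.
  for (std::size_t i; (i= next_item.fetch_add(1)) < n_work; )
  { /* ... process work item i ... */ }
}

int main()
{
  const std::size_t n_tasks= 4; // cf. innodb_purge_threads
  const std::size_t n_work= 2;  // e.g. undo log records for only 2 tables

  if (n_work)
  {
    // Submit only min(n_work, n_tasks) - 1 helpers; the submitting
    // thread runs one share itself instead of sitting idle.
    const std::size_t helpers= std::min(n_work, n_tasks) - 1;
    std::vector<std::thread> pool;
    for (std::size_t i= 0; i < helpers; i++)
      pool.emplace_back(drain, n_work);
    drain(n_work);        // the coordinator does a worker's job ...
    for (auto &t : pool)
      t.join();           // ... and only then waits for the helpers
  }
}
```

With n_work = 2 and n_tasks = 4, only one helper thread is started instead
of three, which is exactly the reduction in context switching this commit
is after.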
@@ -209,17 +209,6 @@ que_eval_sql(
 	const char*	sql,	/*!< in: SQL string */
 	trx_t*		trx);	/*!< in: trx */
-/**********************************************************************//**
-Round robin scheduler.
-@return a query thread of the graph moved to QUE_THR_RUNNING state, or
-NULL; the query thread should be executed by que_run_threads by the
-caller */
-que_thr_t*
-que_fork_scheduler_round_robin(
-/*===========================*/
-	que_fork_t*	fork,	/*!< in: a query fork */
-	que_thr_t*	thr);	/*!< in: current pos */
 /** Query thread states */
 enum que_thr_state_t {
 	/** in selects this means that the thread is at the end of its
......
@@ -624,6 +624,15 @@ Complete the shutdown tasks such as background DROP TABLE,
 and optionally change buffer merge (on innodb_fast_shutdown=0). */
 void srv_shutdown(bool ibuf_merge);
+
+/**
+Fetches and executes tasks from the purge work queue,
+until this queue is empty.
+This is the main part of the purge worker task, but it is
+also executed by the coordinator.
+@note needs current_thd to be set beforehand.
+*/
+void srv_purge_worker_task_low();
 } /* extern "C" */
 #ifdef UNIV_DEBUG
......
@@ -166,40 +166,6 @@ que_thr_init_command(
 	thr->state = QUE_THR_RUNNING;
 }
-/**********************************************************************//**
-Round robin scheduler.
-@return a query thread of the graph moved to QUE_THR_RUNNING state, or
-NULL; the query thread should be executed by que_run_threads by the
-caller */
-que_thr_t*
-que_fork_scheduler_round_robin(
-/*===========================*/
-	que_fork_t*	fork,	/*!< in: a query fork */
-	que_thr_t*	thr)	/*!< in: current pos */
-{
-	fork->trx->mutex_lock();
-
-	/* If no current, start first available. */
-	if (thr == NULL) {
-		thr = UT_LIST_GET_FIRST(fork->thrs);
-	} else {
-		thr = UT_LIST_GET_NEXT(thrs, thr);
-	}
-
-	if (thr) {
-		fork->state = QUE_FORK_ACTIVE;
-		fork->last_sel_node = NULL;
-		ut_ad(thr->state == QUE_THR_COMPLETED);
-		que_thr_init_command(thr);
-	}
-
-	fork->trx->mutex_unlock();
-
-	return(thr);
-}
-
 /**********************************************************************//**
 Starts execution of a command in a query fork. Picks a query thread which
 is not in the QUE_THR_RUNNING state and moves it to that state. If none
......
@@ -1556,7 +1556,6 @@ static bool srv_purge_should_exit(size_t old_history_size)
 /*********************************************************************//**
 Fetch and execute a task from the work queue.
-@param [in,out]	slot	purge worker thread slot
 @return true if a task was executed */
 static bool srv_task_execute()
 {
@@ -1696,6 +1695,13 @@ static void release_thd(THD *thd, void *ctx)
   set_current_thd(0);
 }
 
+void srv_purge_worker_task_low()
+{
+  ut_ad(current_thd);
+  while (srv_task_execute())
+    ut_ad(purge_sys.running());
+}
+
 static void purge_worker_callback(void*)
 {
   ut_ad(!current_thd);
@@ -1703,8 +1709,7 @@ static void purge_worker_callback(void*)
   ut_ad(srv_force_recovery < SRV_FORCE_NO_BACKGROUND);
   void *ctx;
   THD *thd= acquire_thd(&ctx);
-  while (srv_task_execute())
-    ut_ad(purge_sys.running());
+  srv_purge_worker_task_low();
   release_thd(thd,ctx);
 }
......
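
The point of the split above is that purge_worker_callback() must acquire
and release a THD around the drain loop, while the coordinator already owns
one, so the loop itself moved into srv_purge_worker_task_low(), which merely
asserts the ambient context. A generic sketch of the same shape, with
hypothetical names standing in for the InnoDB ones:

```cpp
#include <cassert>

// Hypothetical model of the split; none of these are InnoDB names.
static int queued_tasks= 3;               // stand-in for the purge work queue
static thread_local bool has_thd= false;  // stand-in for current_thd

static bool task_execute()                // cf. srv_task_execute()
{
  if (!queued_tasks)
    return false;
  --queued_tasks;                         // "execute" one work item
  return true;
}

void worker_task_low()                    // cf. srv_purge_worker_task_low()
{
  assert(has_thd);                        // requires ambient context
  while (task_execute()) {}
}

static void worker_callback()             // cf. purge_worker_callback()
{
  assert(!has_thd);
  has_thd= true;                          // cf. acquire_thd()
  worker_task_low();                      // the loop the coordinator also runs
  has_thd= false;                         // cf. release_thd()
}

int main()
{
  // The coordinator already owns its context, so it calls the low-level
  // loop directly instead of going through the callback wrapper.
  has_thd= true;
  worker_task_low();
  has_thd= false;
  worker_callback();                      // a pool worker wraps the same loop
}
```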
@@ -1199,123 +1199,108 @@ dict_table_t *purge_sys_t::close_and_reopen(table_id_t id, THD *thd,
 /** Run a purge batch.
-@param n_purge_threads	number of purge threads
 @param thd	purge coordinator thread handle
+@param n_work_items	number of work items (currently tables) to process
 @return new purge_sys.head */
-static purge_sys_t::iterator
-trx_purge_attach_undo_recs(ulint n_purge_threads, THD *thd)
-{
-	que_thr_t*	thr;
-	ulint		i;
-
-	ut_a(n_purge_threads > 0);
-	ut_a(UT_LIST_GET_LEN(purge_sys.query->thrs) >= n_purge_threads);
-
-	purge_sys_t::iterator head = purge_sys.tail;
-
-#ifdef UNIV_DEBUG
-	i = 0;
-	/* Debug code to validate some pre-requisites and reset done flag. */
-	for (thr = UT_LIST_GET_FIRST(purge_sys.query->thrs);
-	     thr != NULL && i < n_purge_threads;
-	     thr = UT_LIST_GET_NEXT(thrs, thr), ++i) {
-
-		purge_node_t*	node;
-
-		/* Get the purge node. */
-		node = (purge_node_t*) thr->child;
-
-		ut_ad(que_node_get_type(node) == QUE_NODE_PURGE);
-		ut_ad(node->undo_recs.empty());
-		ut_ad(!node->in_progress);
-		ut_d(node->in_progress = true);
-	}
-
-	/* There should never be fewer nodes than threads, the inverse
-	however is allowed because we only use purge threads as needed. */
-	ut_ad(i == n_purge_threads);
-#endif
-
-	/* Fetch and parse the UNDO records. The UNDO records are added
-	to a per purge node vector. */
-	thr = UT_LIST_GET_FIRST(purge_sys.query->thrs);
-
-	ut_ad(head <= purge_sys.tail);
-
-	i = 0;
-
-	std::unordered_map<table_id_t, purge_node_t*>
-		table_id_map(TRX_PURGE_TABLE_BUCKETS);
-	purge_sys.m_active = true;
-
-	MDL_context* const mdl_context
-		= static_cast<MDL_context*>(thd_mdl_context(thd));
-	ut_ad(mdl_context);
-
-	const size_t max_pages = std::min(buf_pool.curr_size * 3 / 4,
-					  size_t{srv_purge_batch_size});
-
-	while (UNIV_LIKELY(srv_undo_sources) || !srv_fast_shutdown) {
-		/* Track the max {trx_id, undo_no} for truncating the
-		UNDO logs once we have purged the records. */
-
-		if (head <= purge_sys.tail) {
-			head = purge_sys.tail;
-		}
-
-		/* Fetch the next record, and advance the purge_sys.tail. */
-		trx_purge_rec_t purge_rec = purge_sys.fetch_next_rec();
-
-		if (!purge_rec.undo_rec) {
-			if (!purge_rec.roll_ptr) {
-				break;
-			}
-			ut_ad(purge_rec.roll_ptr == 1);
-			continue;
-		}
-
-		table_id_t table_id = trx_undo_rec_get_table_id(
-			purge_rec.undo_rec);
-
-		purge_node_t*& table_node = table_id_map[table_id];
-
-		if (!table_node) {
-			std::pair<dict_table_t*,MDL_ticket*> p;
-			p.first = trx_purge_table_open(table_id, mdl_context,
-						       &p.second);
-			if (p.first == reinterpret_cast<dict_table_t*>(-1)) {
-				p.first = purge_sys.close_and_reopen(
-					table_id, thd, &p.second);
-			}
-
-			thr = UT_LIST_GET_NEXT(thrs, thr);
-
-			if (!(++i % n_purge_threads)) {
-				thr = UT_LIST_GET_FIRST(
-					purge_sys.query->thrs);
-			}
-
-			table_node = static_cast<purge_node_t*>(thr->child);
-			ut_a(que_node_get_type(table_node) == QUE_NODE_PURGE);
-			ut_d(auto i=)
-			table_node->tables.emplace(table_id, p);
-			ut_ad(i.second);
-			if (p.first) {
-				goto enqueue;
-			}
-		} else if (table_node->tables[table_id].first) {
-enqueue:
-			table_node->undo_recs.push(purge_rec);
-		}
-
-		if (purge_sys.n_pages_handled() >= max_pages) {
-			break;
-		}
-	}
-
-	purge_sys.m_active = false;
+static purge_sys_t::iterator trx_purge_attach_undo_recs(THD *thd,
+                                                        ulint *n_work_items)
+{
+  que_thr_t *thr;
+  purge_sys_t::iterator head= purge_sys.tail;
+
+  /* Fetch and parse the UNDO records. The UNDO records are added
+  to a per purge node vector. */
+  thr= nullptr;
+
+  std::unordered_map<table_id_t, purge_node_t *>
+    table_id_map(TRX_PURGE_TABLE_BUCKETS);
+  purge_sys.m_active= true;
+
+  MDL_context *const mdl_context=
+    static_cast<MDL_context*>(thd_mdl_context(thd));
+  ut_ad(mdl_context);
+
+  const size_t max_pages=
+    std::min(buf_pool.curr_size * 3 / 4, size_t{srv_purge_batch_size});
+
+  while (UNIV_LIKELY(srv_undo_sources) || !srv_fast_shutdown)
+  {
+    /* Track the max {trx_id, undo_no} for truncating the
+    UNDO logs once we have purged the records. */
+    if (head <= purge_sys.tail)
+      head= purge_sys.tail;
+
+    /* Fetch the next record, and advance the purge_sys.tail. */
+    trx_purge_rec_t purge_rec= purge_sys.fetch_next_rec();
+
+    if (!purge_rec.undo_rec)
+    {
+      if (!purge_rec.roll_ptr)
+        break;
+      ut_ad(purge_rec.roll_ptr == 1);
+      continue;
+    }
+
+    table_id_t table_id= trx_undo_rec_get_table_id(purge_rec.undo_rec);
+
+    purge_node_t *&table_node= table_id_map[table_id];
+    if (table_node)
+      ut_ad(!table_node->in_progress);
+    if (!table_node)
+    {
+      std::pair<dict_table_t *, MDL_ticket *> p;
+      p.first= trx_purge_table_open(table_id, mdl_context, &p.second);
+      if (p.first == reinterpret_cast<dict_table_t *>(-1))
+        p.first= purge_sys.close_and_reopen(table_id, thd, &p.second);
+
+      if (!thr || !(thr= UT_LIST_GET_NEXT(thrs, thr)))
+        thr= UT_LIST_GET_FIRST(purge_sys.query->thrs);
+      ++*n_work_items;
+
+      table_node= static_cast<purge_node_t *>(thr->child);
+      ut_a(que_node_get_type(table_node) == QUE_NODE_PURGE);
+      ut_d(auto pair=) table_node->tables.emplace(table_id, p);
+      ut_ad(pair.second);
+      if (p.first)
+        goto enqueue;
+    }
+    else if (table_node->tables[table_id].first)
+    {
+enqueue:
+      table_node->undo_recs.push(purge_rec);
+      ut_ad(!table_node->in_progress);
+    }
+
+    if (purge_sys.n_pages_handled() >= max_pages)
+      break;
+  }
+
+  purge_sys.m_active= false;
+
+#ifdef UNIV_DEBUG
+  thr= UT_LIST_GET_FIRST(purge_sys.query->thrs);
+  for (ulint i= 0; thr && i < *n_work_items;
+       i++, thr= UT_LIST_GET_NEXT(thrs, thr))
+  {
+    purge_node_t *node= static_cast<purge_node_t*>(thr->child);
+    ut_ad(que_node_get_type(node) == QUE_NODE_PURGE);
+    ut_ad(!node->in_progress);
+    node->in_progress= true;
+  }
+
+  for (; thr; thr= UT_LIST_GET_NEXT(thrs, thr))
+  {
+    purge_node_t *node= static_cast<purge_node_t*>(thr->child);
+    ut_ad(que_node_get_type(node) == QUE_NODE_PURGE);
+    ut_ad(!node->in_progress);
+    ut_ad(node->undo_recs.empty());
+  }
+#endif
+
   ut_ad(head <= purge_sys.tail);
   return head;
 }
 
 extern tpool::waitable_task purge_worker_task;
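
The rewritten trx_purge_attach_undo_recs() assigns each previously unseen
table id to the next purge node in a circular list and counts it in
*n_work_items, so the caller learns how many threads can be usefully
employed. A toy model of that bookkeeping, with illustrative names that are
not the InnoDB structures:

```cpp
#include <cstdint>
#include <initializer_list>
#include <unordered_map>
#include <vector>

int main()
{
  std::vector<int> nodes(4);                 // cf. purge_sys.query->thrs
  std::unordered_map<uint64_t, int*> table_map;  // cf. table_id_map
  std::size_t next= 0, n_work_items= 0;

  for (uint64_t table_id : {7u, 7u, 3u, 9u, 3u})  // undo records by table
  {
    int *&node= table_map[table_id];
    if (!node)                               // first record for this table:
    {
      node= &nodes[next];                    // take the next node in the ring
      next= (next + 1) % nodes.size();
      ++n_work_items;                        // one work item per distinct table
    }
    // ... enqueue the undo record on *node ...
  }
  // n_work_items == 3 here: at most 3 threads can be usefully employed,
  // regardless of how large innodb_purge_threads is.
}
```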
@@ -1373,68 +1358,89 @@ Run a purge batch.
 @return number of undo log pages handled in the batch */
 TRANSACTIONAL_TARGET ulint trx_purge(ulint n_tasks, ulint history_size)
 {
   ut_ad(n_tasks > 0);
 
   purge_sys.clone_oldest_view();
 
-#ifdef UNIV_DEBUG
-	if (srv_purge_view_update_only_debug) {
-		return(0);
-	}
-#endif /* UNIV_DEBUG */
-
-	THD* const thd = current_thd;
-
-	/* Fetch the UNDO recs that need to be purged. */
-	const purge_sys_t::iterator head
-		= trx_purge_attach_undo_recs(n_tasks, thd);
-	const size_t n_pages = purge_sys.n_pages_handled();
-
-	{
-		ulint	delay = n_pages ? srv_max_purge_lag : 0;
-		if (UNIV_UNLIKELY(delay)) {
-			if (delay >= history_size) {
-no_throttle:
-				delay = 0;
-			} else if (const ulint max_delay =
-				   srv_max_purge_lag_delay) {
-				delay = std::min(max_delay,
-						 10000 * history_size / delay
-						 - 5000);
-			} else {
-				goto no_throttle;
-			}
-		}
-		srv_dml_needed_delay = delay;
-	}
-
-	que_thr_t*	thr = nullptr;
-
-	/* Submit tasks to workers queue if using multi-threaded purge. */
-	for (ulint i = n_tasks; --i; ) {
-		thr = que_fork_scheduler_round_robin(purge_sys.query, thr);
-		ut_a(thr);
-		srv_que_task_enqueue_low(thr);
-		srv_thread_pool->submit_task(&purge_worker_task);
-	}
-
-	thr = que_fork_scheduler_round_robin(purge_sys.query, thr);
-
-	que_run_threads(thr);
-
-	trx_purge_wait_for_workers_to_complete();
-
-	for (thr = UT_LIST_GET_FIRST(purge_sys.query->thrs); thr;
-	     thr = UT_LIST_GET_NEXT(thrs, thr)) {
-		purge_node_t* node = static_cast<purge_node_t*>(thr->child);
-		trx_purge_close_tables(node, thd);
-		node->tables.clear();
-	}
+  ut_d(if (srv_purge_view_update_only_debug) return 0);
+
+  THD *const thd= current_thd;
+
+  /* Fetch the UNDO recs that need to be purged. */
+  ulint n_work= 0;
+  const purge_sys_t::iterator head= trx_purge_attach_undo_recs(thd, &n_work);
+  const size_t n_pages= purge_sys.n_pages_handled();
+
+  {
+    ulint delay= n_pages ? srv_max_purge_lag : 0;
+    if (UNIV_UNLIKELY(delay))
+    {
+      if (delay >= history_size)
+      no_throttle:
+        delay= 0;
+      else if (const ulint max_delay= srv_max_purge_lag_delay)
+        delay= std::min(max_delay, 10000 * history_size / delay - 5000);
+      else
+        goto no_throttle;
+    }
+    srv_dml_needed_delay= delay;
+  }
+
+  ut_ad(n_tasks);
+  que_thr_t *thr= nullptr;
+
+  if (n_work)
+  {
+    for (auto i= n_work; i--; )
+    {
+      if (!thr)
+        thr= UT_LIST_GET_FIRST(purge_sys.query->thrs);
+      else
+        thr= UT_LIST_GET_NEXT(thrs, thr);
+
+      if (!thr)
+        break;
+
+      ut_ad(thr->state == QUE_THR_COMPLETED);
+      thr->state= QUE_THR_RUNNING;
+      thr->run_node= thr;
+      thr->prev_node= thr->common.parent;
+      purge_sys.query->state= QUE_FORK_ACTIVE;
+      purge_sys.query->last_sel_node= nullptr;
+      srv_que_task_enqueue_low(thr);
+    }
+
+    /*
+      To reduce context switches, we submit at most n_tasks-1 worker tasks
+      (we can use fewer tasks if there is not enough work).
+
+      The coordinator does a worker's job, instead of waiting and sitting
+      idle, and then waits for all the others to finish.
+
+      This also means that if innodb_purge_threads=1, the coordinator does
+      all the work alone.
+    */
+    const ulint workers{std::min(n_work, n_tasks) - 1};
+    for (ulint i= 0; i < workers; i++)
+      srv_thread_pool->submit_task(&purge_worker_task);
+    srv_purge_worker_task_low();
+    if (workers)
+      trx_purge_wait_for_workers_to_complete();
+
+    for (thr= UT_LIST_GET_FIRST(purge_sys.query->thrs); thr && n_work--;
+         thr= UT_LIST_GET_NEXT(thrs, thr))
+    {
+      purge_node_t *node= static_cast<purge_node_t*>(thr->child);
+      trx_purge_close_tables(node, thd);
+      node->tables.clear();
+    }
+  }
 
   purge_sys.batch_cleanup(head);
 
   MONITOR_INC_VALUE(MONITOR_PURGE_INVOKED, 1);
   MONITOR_INC_VALUE(MONITOR_PURGE_N_PAGE_HANDLED, n_pages);
 
   return n_pages;
 }
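
The throttling block above is unchanged by this commit: when the history
list exceeds innodb_max_purge_lag, DML threads are delayed by
10000 * history_size / srv_max_purge_lag - 5000, capped by
innodb_max_purge_lag_delay. A worked example of that arithmetic with
made-up values (the unit of srv_dml_needed_delay is assumed to be
microseconds here):

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>

int main()
{
  const uint64_t history_size= 1000000;  // purge lag: history list length
  const uint64_t max_purge_lag= 100000;  // cf. innodb_max_purge_lag
  const uint64_t max_delay= 300000;      // cf. innodb_max_purge_lag_delay

  // Condensed form of the conditions in trx_purge() above,
  // assuming n_pages != 0.
  uint64_t delay= max_purge_lag;
  if (delay && delay < history_size && max_delay)
    delay= std::min(max_delay, 10000 * history_size / delay - 5000);
  else
    delay= 0;

  // 10000 * 1000000 / 100000 - 5000 = 95000, below the 300000 cap,
  // so each DML statement would be delayed by about 95 ms.
  std::printf("srv_dml_needed_delay = %llu\n",
              static_cast<unsigned long long>(delay));
}
```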