Commit df44e75b authored by Marko Mäkelä

Minor clean-up of purge code

purge_sys_t::n_submitted: Document that it is only accessed by
srv_purge_coordinator_thread.

purge_sys_t::n_completed: Exclusively use my_atomic access.

srv_task_execute(): Simplify the code.

srv_purge_coordinator_thread(): Test the cheaper condition first.

trx_purge(): Atomically access purge_sys.n_completed.
Remove some code duplication.

trx_purge_wait_for_workers_to_complete(): Atomically access
purge_sys.n_completed. Remove an unnecessary local variable.

trx_purge_stop(): Remove a redundant assignment.
parent 0f6186c5
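
[Editor's note] The counter discipline described in the message (a plain counter owned by a single thread, plus a completion counter that every thread updates atomically) can be illustrated with a small stand-alone sketch. This is not MariaDB code: std::atomic and std::thread stand in for my_atomic_addlint()/my_atomic_loadlint() and the InnoDB purge threads, and the names PurgeCounters, run_task and coordinate are invented for illustration.

// Editor's sketch, not MariaDB code: the access pattern described in the
// commit message. n_submitted is read and written only by the coordinator,
// so it needs no synchronization; n_completed is updated by every thread
// that finishes a task and polled by the coordinator, so all accesses to
// it are atomic.
#include <atomic>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

struct PurgeCounters {
	std::size_t n_submitted = 0;               // coordinator-only
	std::atomic<std::size_t> n_completed{0};   // atomically accessed
};

// A worker finishing one task reports completion atomically.
static void run_task(PurgeCounters& c)
{
	c.n_completed.fetch_add(1, std::memory_order_relaxed);
}

// The coordinator submits n_threads tasks, executes one itself, and then
// waits until the completion count catches up with the submission count,
// mirroring trx_purge() and trx_purge_wait_for_workers_to_complete().
static void coordinate(PurgeCounters& c, unsigned n_threads)
{
	c.n_submitted += n_threads;                // single writer, plain store

	std::vector<std::thread> workers;
	for (unsigned i = 1; i < n_threads; ++i) {
		workers.emplace_back(run_task, std::ref(c));
	}
	run_task(c);                               // coordinator runs one task too

	while (c.n_completed.load(std::memory_order_relaxed) != c.n_submitted) {
		std::this_thread::yield();
	}
	for (std::thread& w : workers) {
		w.join();
	}
}

int main()
{
	PurgeCounters c;
	coordinate(c, 4);
	return c.n_completed.load() == c.n_submitted ? 0 : 1;
}
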
@@ -424,9 +424,12 @@ class purge_sys_t
 	MY_ALIGNED(CACHE_LINE_SIZE)
 	ReadView	view;	/*!< The purge will not remove undo logs
 				which are >= this view (purge view) */
-	ulint	n_submitted;	/*!< Count of total tasks submitted
-				to the task queue */
-	ulint	n_completed;	/*!< Count of total tasks completed */
+	/** Total number of tasks submitted by srv_purge_coordinator_thread.
+	Not accessed by other threads. */
+	ulint	n_submitted;
+	/** Number of completed tasks. Accessed by srv_purge_coordinator
+	and srv_worker_thread by my_atomic. */
+	ulint	n_completed;
 	/** Iterator to the undo log records of committed transactions */
 	struct iterator
......
@@ -2474,38 +2474,25 @@ srv_purge_should_exit(ulint n_purged)
 /*********************************************************************//**
 Fetch and execute a task from the work queue.
 @return true if a task was executed */
-static
-bool
-srv_task_execute(void)
-/*==================*/
+static bool srv_task_execute()
 {
-	que_thr_t*	thr = NULL;
 	ut_ad(!srv_read_only_mode);
-	ut_a(srv_force_recovery < SRV_FORCE_NO_BACKGROUND);
+	ut_ad(srv_force_recovery < SRV_FORCE_NO_BACKGROUND);
 	mutex_enter(&srv_sys.tasks_mutex);
-	if (UT_LIST_GET_LEN(srv_sys.tasks) > 0) {
-		thr = UT_LIST_GET_FIRST(srv_sys.tasks);
+	if (que_thr_t* thr = UT_LIST_GET_FIRST(srv_sys.tasks)) {
 		ut_a(que_node_get_type(thr->child) == QUE_NODE_PURGE);
 		UT_LIST_REMOVE(srv_sys.tasks, thr);
-	}
-	mutex_exit(&srv_sys.tasks_mutex);
-	if (thr != NULL) {
+		mutex_exit(&srv_sys.tasks_mutex);
		que_run_threads(thr);
-		my_atomic_addlint(
-			&purge_sys.n_completed, 1);
+		my_atomic_addlint(&purge_sys.n_completed, 1);
+		return true;
 	}
-	return(thr != NULL);
+	ut_ad(UT_LIST_GET_LEN(srv_sys.tasks) == 0);
+	mutex_exit(&srv_sys.tasks_mutex);
+	return false;
 }
 /*********************************************************************//**
@@ -2781,8 +2768,8 @@ DECLARE_THREAD(srv_purge_coordinator_thread)(
 		if (srv_shutdown_state == SRV_SHUTDOWN_NONE
 		    && srv_undo_sources
-		    && (purge_sys.state == PURGE_STATE_STOP
-			|| n_total_purged == 0)) {
+		    && (n_total_purged == 0
+			|| purge_sys.state == PURGE_STATE_STOP)) {
 			srv_purge_coordinator_suspend(slot, rseg_history_len);
 		}
......
@@ -1510,10 +1510,9 @@ static
 void
 trx_purge_wait_for_workers_to_complete()
 {
-	ulint	n_submitted = purge_sys.n_submitted;
 	/* Ensure that the work queue empties out. */
-	while ((ulint) my_atomic_loadlint(&purge_sys.n_completed) != n_submitted) {
+	while (my_atomic_loadlint(&purge_sys.n_completed)
+	       != purge_sys.n_submitted) {
 		if (srv_get_task_queue_length() > 0) {
 			srv_release_threads(SRV_WORKER, 1);
@@ -1522,9 +1521,6 @@ trx_purge_wait_for_workers_to_complete()
 		os_thread_yield();
 	}
-	/* None of the worker threads should be doing any work. */
-	ut_a(purge_sys.n_submitted == purge_sys.n_completed);
 	/* There should be no outstanding tasks as long
 	as the worker threads are active. */
 	ut_a(srv_get_task_queue_length() == 0);
@@ -1548,7 +1544,8 @@ trx_purge(
 	srv_dml_needed_delay = trx_purge_dml_delay();
 	/* The number of tasks submitted should be completed. */
-	ut_a(purge_sys.n_submitted == purge_sys.n_completed);
+	ut_a(purge_sys.n_submitted
+	     == my_atomic_loadlint(&purge_sys.n_completed));
 	rw_lock_x_lock(&purge_sys.latch);
 	trx_sys.clone_oldest_view();
@@ -1562,46 +1559,27 @@ trx_purge(
 	/* Fetch the UNDO recs that need to be purged. */
 	n_pages_handled = trx_purge_attach_undo_recs(n_purge_threads);
+	purge_sys.n_submitted += n_purge_threads;
-	/* Do we do an asynchronous purge or not ? */
-	if (n_purge_threads > 1) {
-		ulint	i = 0;
-		/* Submit the tasks to the work queue. */
-		for (i = 0; i < n_purge_threads - 1; ++i) {
-			thr = que_fork_scheduler_round_robin(
-				purge_sys.query, thr);
-			ut_a(thr != NULL);
-			srv_que_task_enqueue_low(thr);
-		}
+	/* Submit tasks to workers queue if using multi-threaded purge. */
+	for (ulint i = n_purge_threads; --i; ) {
+		thr = que_fork_scheduler_round_robin(purge_sys.query, thr);
-		ut_a(thr != NULL);
-		purge_sys.n_submitted += n_purge_threads - 1;
-		goto run_synchronously;
-	/* Do it synchronously. */
-	} else {
-		thr = que_fork_scheduler_round_robin(purge_sys.query, NULL);
-		ut_ad(thr);
+		ut_a(thr);
+		srv_que_task_enqueue_low(thr);
+	}
-run_synchronously:
-		++purge_sys.n_submitted;
+	thr = que_fork_scheduler_round_robin(purge_sys.query, thr);
-		que_run_threads(thr);
+	que_run_threads(thr);
-		my_atomic_addlint(&purge_sys.n_completed, 1);
+	my_atomic_addlint(&purge_sys.n_completed, 1);
-		if (n_purge_threads > 1) {
-			trx_purge_wait_for_workers_to_complete();
-		}
+	if (n_purge_threads > 1) {
+		trx_purge_wait_for_workers_to_complete();
+	}
-	ut_a(purge_sys.n_submitted == purge_sys.n_completed);
+	ut_a(purge_sys.n_submitted
+	     == my_atomic_loadlint(&purge_sys.n_completed));
 	if (truncate) {
 		trx_purge_truncate_history();
@@ -1653,7 +1631,6 @@ trx_purge_stop(void)
 	case PURGE_STATE_STOP:
 		ut_ad(srv_n_purge_threads > 0);
 		++purge_sys.n_stop;
-		purge_sys.state = PURGE_STATE_STOP;
 		if (!purge_sys.running) {
 			goto unlock;
 		}
......
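
[Editor's note] The rewritten srv_task_execute() above follows a common pattern: pop one task while holding the queue mutex, release the mutex before running the task, then report completion on an atomic counter. Below is a stand-alone approximation, not MariaDB code: std::mutex and std::deque stand in for srv_sys.tasks_mutex and the UT_LIST task queue, and TaskQueue/execute_one are invented names for illustration.

// Editor's sketch of the simplified task-execution shape.
#include <atomic>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>

struct TaskQueue {
	std::mutex mutex;
	std::deque<std::function<void()>> tasks;
	std::atomic<std::size_t> n_completed{0};   // analogue of purge_sys.n_completed
};

// Analogue of srv_task_execute(): returns true if a task was executed.
static bool execute_one(TaskQueue& q)
{
	std::function<void()> task;
	{
		std::lock_guard<std::mutex> lock(q.mutex);
		if (q.tasks.empty()) {
			return false;          // queue empty; lock released on return
		}
		task = std::move(q.tasks.front());
		q.tasks.pop_front();
	}                                      // lock released before running the task
	task();
	q.n_completed.fetch_add(1, std::memory_order_relaxed);
	return true;
}

int main()
{
	TaskQueue q;
	q.tasks.push_back([] {});
	q.tasks.push_back([] {});
	while (execute_one(q)) {
	}
	return q.n_completed.load() == 2 ? 0 : 1;
}
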