Commit e1295554 authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

MDEV-20372 thread_pool_info fails randomly in 10.5

Rework stats a bit, so we're not missing any queue_get() now.

Don't do stats_reset_table(), if generic threadpool is off.
parent f991c416
......@@ -194,10 +194,10 @@ static int stats_fill_table(THD* thd, TABLE_LIST* tables, COND*)
table->field[4]->store(counters->wakes_due_to_stall, true);
table->field[5]->store(counters->throttles, true);
table->field[6]->store(counters->stalls, true);
table->field[7]->store(counters->polls_by_listener, true);
table->field[8]->store(counters->polls_by_worker, true);
table->field[9]->store(counters->dequeues_by_listener, true);
table->field[10]->store(counters->dequeues_by_worker, true);
table->field[7]->store(counters->polls[(int)operation_origin::LISTENER], true);
table->field[8]->store(counters->polls[(int)operation_origin::WORKER], true);
table->field[9]->store(counters->dequeues[(int)operation_origin::LISTENER], true);
table->field[10]->store(counters->dequeues[(int)operation_origin::WORKER], true);
mysql_mutex_unlock(&group->mutex);
if (schema_table_store_record(thd, table))
return 1;
......@@ -207,6 +207,9 @@ static int stats_fill_table(THD* thd, TABLE_LIST* tables, COND*)
static int stats_reset_table()
{
if (!all_groups)
return 0;
for (uint i = 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++)
{
thread_group_t* group = &all_groups[i];
......
......@@ -45,7 +45,6 @@ static void io_poll_close(TP_file_handle fd)
#endif
}
/** Maximum number of native events a listener can read in one go */
#define MAX_EVENTS 1024
......@@ -435,6 +434,16 @@ static TP_connection_generic *queue_get(thread_group_t *thread_group)
DBUG_RETURN(0);
}
static TP_connection_generic* queue_get(thread_group_t* group, operation_origin origin)
{
auto ret = queue_get(group);
if (ret)
{
TP_INCREMENT_GROUP_COUNTER(group, dequeues[(int)origin]);
}
return ret;
}
static bool is_queue_empty(thread_group_t *thread_group)
{
for (int i=0; i < NQUEUES; i++)
......@@ -684,7 +693,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread,
break;
cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
TP_INCREMENT_GROUP_COUNTER(thread_group, polls_by_listener);
TP_INCREMENT_GROUP_COUNTER(thread_group, polls[(int)operation_origin::LISTENER]);
if (cnt <=0)
{
DBUG_ASSERT(thread_group->shutdown);
......@@ -750,7 +759,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread,
if (listener_picks_event)
{
/* Handle the first event. */
retval= queue_get(thread_group);
retval= queue_get(thread_group, operation_origin::LISTENER);
mysql_mutex_unlock(&thread_group->mutex);
break;
}
......@@ -1130,10 +1139,9 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
/* Check if queue is not empty */
if (!oversubscribed)
{
connection = queue_get(thread_group);
connection = queue_get(thread_group, operation_origin::WORKER);
if(connection)
{
TP_INCREMENT_GROUP_COUNTER(thread_group,dequeues_by_worker);
break;
}
}
......@@ -1146,10 +1154,7 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
mysql_mutex_unlock(&thread_group->mutex);
connection = listener(current_thread, thread_group);
if (connection)
{
TP_INCREMENT_GROUP_COUNTER(thread_group, dequeues_by_listener);
}
mysql_mutex_lock(&thread_group->mutex);
thread_group->active_thread_count++;
/* There is no listener anymore, it just returned. */
......@@ -1167,11 +1172,11 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
{
native_event ev[MAX_EVENTS];
int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0);
TP_INCREMENT_GROUP_COUNTER(thread_group, polls_by_worker);
TP_INCREMENT_GROUP_COUNTER(thread_group, polls[(int)operation_origin::WORKER]);
if (cnt > 0)
{
queue_put(thread_group, ev, cnt);
connection= queue_get(thread_group);
connection= queue_get(thread_group,operation_origin::WORKER);
break;
}
}
......
......@@ -108,6 +108,12 @@ typedef I_P_List<TP_connection_generic,
const int NQUEUES = 2; /* We have high and low priority queues*/
enum class operation_origin
{
WORKER,
LISTENER
};
struct thread_group_counters_t
{
ulonglong thread_creations;
......@@ -116,10 +122,8 @@ struct thread_group_counters_t
ulonglong wakes_due_to_stall;
ulonglong throttles;
ulonglong stalls;
ulonglong dequeues_by_worker;
ulonglong dequeues_by_listener;
ulonglong polls_by_listener;
ulonglong polls_by_worker;
ulonglong dequeues[2];
ulonglong polls[2];
};
struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) thread_group_t
......@@ -143,7 +147,7 @@ struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) thread_group_t
thread_group_counters_t counters;
};
#define TP_INCREMENT_GROUP_COUNTER(group,var) group->counters.var++;
#define TP_INCREMENT_GROUP_COUNTER(group,var) do {group->counters.var++;}while(0)
extern thread_group_t* all_groups;
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment