Commit 0e3981a8 authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

Threadpool : address some of the monty's review points

Also, print message when pool blocks.
parent 7dd541cc
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include <time.h> #include <time.h>
#include <sql_plist.h> #include <sql_plist.h>
#include <threadpool.h> #include <threadpool.h>
#include <time.h>
#ifdef __linux__ #ifdef __linux__
#include <sys/epoll.h> #include <sys/epoll.h>
typedef struct epoll_event native_event; typedef struct epoll_event native_event;
...@@ -91,6 +92,7 @@ struct connection_t ...@@ -91,6 +92,7 @@ struct connection_t
connection_t **prev_in_queue; connection_t **prev_in_queue;
ulonglong abs_wait_timeout; ulonglong abs_wait_timeout;
bool logged_in; bool logged_in;
bool bound_to_poll_descriptor;
bool waiting; bool waiting;
}; };
...@@ -127,6 +129,12 @@ struct thread_group_t ...@@ -127,6 +129,12 @@ struct thread_group_t
static thread_group_t all_groups[MAX_THREAD_GROUPS]; static thread_group_t all_groups[MAX_THREAD_GROUPS];
static uint group_count; static uint group_count;
/**
Used for printing "pool blocked" message, see
print_pool_blocked_message();
*/
static time_t pool_block_start;
/* Global timer for all groups */ /* Global timer for all groups */
struct pool_timer_t struct pool_timer_t
{ {
...@@ -140,6 +148,7 @@ struct pool_timer_t ...@@ -140,6 +148,7 @@ struct pool_timer_t
static pool_timer_t pool_timer; static pool_timer_t pool_timer;
/* Externals functions and variables we use */ /* Externals functions and variables we use */
extern void scheduler_init(); extern void scheduler_init();
extern pthread_attr_t *get_connection_attrib(void); extern pthread_attr_t *get_connection_attrib(void);
...@@ -155,7 +164,7 @@ static void connection_abort(connection_t *connection); ...@@ -155,7 +164,7 @@ static void connection_abort(connection_t *connection);
void tp_post_kill_notification(THD *thd); void tp_post_kill_notification(THD *thd);
static void set_wait_timeout(connection_t *connection); static void set_wait_timeout(connection_t *connection);
static void set_next_timeout_check(ulonglong abstime); static void set_next_timeout_check(ulonglong abstime);
static void print_pool_blocked_message(bool);
/** /**
Asynchronous network IO. Asynchronous network IO.
...@@ -641,9 +650,9 @@ static connection_t * listener(worker_thread_t *current_thread, ...@@ -641,9 +650,9 @@ static connection_t * listener(worker_thread_t *current_thread,
and wake a worker. and wake a worker.
NOTE: Currently nothing is done to detect or prevent long queuing times. NOTE: Currently nothing is done to detect or prevent long queuing times.
A solution (for the future) would be to give up "one active thread per group" A solutionc for the future would be to give up "one active thread per
principle, if events stay in the queue for too long, and wake more workers. group" principle, if events stay in the queue for too long, and just wake
more workers.
*/ */
bool listener_picks_event= thread_group->queue.is_empty(); bool listener_picks_event= thread_group->queue.is_empty();
...@@ -714,14 +723,16 @@ static connection_t * listener(worker_thread_t *current_thread, ...@@ -714,14 +723,16 @@ static connection_t * listener(worker_thread_t *current_thread,
static int create_worker(thread_group_t *thread_group) static int create_worker(thread_group_t *thread_group)
{ {
pthread_t thread_id; pthread_t thread_id;
bool max_threads_reached= false;
int err; int err;
DBUG_ENTER("create_worker"); DBUG_ENTER("create_worker");
if (tp_stats.num_worker_threads >= (int)threadpool_max_threads if (tp_stats.num_worker_threads >= (int)threadpool_max_threads
&& thread_group->thread_count >= 2) && thread_group->thread_count >= 2)
{ {
DBUG_PRINT("info", err= 1;
("Cannot create new thread (maximum allowed threads reached)")); max_threads_reached= true;
DBUG_RETURN(-1); goto end;
} }
err= mysql_thread_create(key_worker_thread, &thread_id, err= mysql_thread_create(key_worker_thread, &thread_id,
...@@ -731,6 +742,18 @@ static int create_worker(thread_group_t *thread_group) ...@@ -731,6 +742,18 @@ static int create_worker(thread_group_t *thread_group)
thread_group->pending_thread_start_count++; thread_group->pending_thread_start_count++;
thread_group->last_thread_creation_time=microsecond_interval_timer(); thread_group->last_thread_creation_time=microsecond_interval_timer();
} }
else
{
my_errno= errno;
}
end:
if (err)
print_pool_blocked_message(max_threads_reached);
else
pool_block_start= 0; /* Reset pool blocked timer, if it was set */
DBUG_RETURN(err); DBUG_RETURN(err);
} }
...@@ -816,7 +839,7 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr) ...@@ -816,7 +839,7 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
DBUG_ENTER("thread_group_init"); DBUG_ENTER("thread_group_init");
thread_group->pthread_attr = thread_attr; thread_group->pthread_attr = thread_attr;
mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL); mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL);
thread_group->pollfd=-1; thread_group->pollfd= -1;
thread_group->shutdown_pipe[0]= -1; thread_group->shutdown_pipe[0]= -1;
thread_group->shutdown_pipe[1]= -1; thread_group->shutdown_pipe[1]= -1;
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -1113,6 +1136,7 @@ connection_t *alloc_connection(THD *thd) ...@@ -1113,6 +1136,7 @@ connection_t *alloc_connection(THD *thd)
connection->thd = thd; connection->thd = thd;
connection->waiting= false; connection->waiting= false;
connection->logged_in= false; connection->logged_in= false;
connection->bound_to_poll_descriptor= false;
connection->abs_wait_timeout= ULONGLONG_MAX; connection->abs_wait_timeout= ULONGLONG_MAX;
} }
DBUG_RETURN(connection); DBUG_RETURN(connection);
...@@ -1289,8 +1313,11 @@ static int change_group(connection_t *c, ...@@ -1289,8 +1313,11 @@ static int change_group(connection_t *c,
/* Remove connection from the old group. */ /* Remove connection from the old group. */
mysql_mutex_lock(&old_group->mutex); mysql_mutex_lock(&old_group->mutex);
if (c->logged_in) if (c->bound_to_poll_descriptor)
{
io_poll_disassociate_fd(old_group->pollfd,fd); io_poll_disassociate_fd(old_group->pollfd,fd);
c->bound_to_poll_descriptor= false;
}
c->thread_group->connection_count--; c->thread_group->connection_count--;
mysql_mutex_unlock(&old_group->mutex); mysql_mutex_unlock(&old_group->mutex);
...@@ -1325,22 +1352,18 @@ static int start_io(connection_t *connection) ...@@ -1325,22 +1352,18 @@ static int start_io(connection_t *connection)
if (group != connection->thread_group) if (group != connection->thread_group)
{ {
if (!change_group(connection, connection->thread_group, group)) if (change_group(connection, connection->thread_group, group))
{ {
connection->logged_in= true;
return io_poll_associate_fd(group->pollfd, fd, connection);
}
else
return -1; return -1;
} }
}
/* /*
In case binding to a poll descriptor was not yet done, Bind to poll descriptor if not yet done.
(start_io called first time), do it now.
*/ */
if(!connection->logged_in) if(!connection->bound_to_poll_descriptor)
{ {
connection->logged_in= true; connection->bound_to_poll_descriptor= true;
return io_poll_associate_fd(group->pollfd, fd, connection); return io_poll_associate_fd(group->pollfd, fd, connection);
} }
...@@ -1353,36 +1376,34 @@ static void handle_event(connection_t *connection) ...@@ -1353,36 +1376,34 @@ static void handle_event(connection_t *connection)
{ {
DBUG_ENTER("handle_event"); DBUG_ENTER("handle_event");
int ret; int err;
if (!connection->logged_in) if (!connection->logged_in)
{ {
ret= threadpool_add_connection(connection->thd); err = threadpool_add_connection(connection->thd);
connection->logged_in= true;
} }
else else
{ {
ret= threadpool_process_request(connection->thd); err = threadpool_process_request(connection->thd);
} }
if(!ret) if(!err)
{ {
set_wait_timeout(connection); set_wait_timeout(connection);
ret= start_io(connection); err= start_io(connection);
} }
if (ret) if (err)
{ {
connection_abort(connection); connection_abort(connection);
} }
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/** /**
Worker thread's main Worker thread's main
*/ */
static void *worker_main(void *param) static void *worker_main(void *param)
{ {
...@@ -1432,7 +1453,8 @@ static void *worker_main(void *param) ...@@ -1432,7 +1453,8 @@ static void *worker_main(void *param)
my_atomic_add32(&tp_stats.num_worker_threads, -1); my_atomic_add32(&tp_stats.num_worker_threads, -1);
/* If it is the last thread in group and pool is terminating, destroy group.*/ /* If it is the last thread in group and pool is terminating, destroy group.*/
if (thread_group->shutdown && thread_group->thread_count == 0 if (thread_group->shutdown
&& thread_group->thread_count == 0
&& thread_group->pending_thread_start_count == 0) && thread_group->pending_thread_start_count == 0)
{ {
thread_group_destroy(thread_group); thread_group_destroy(thread_group);
...@@ -1480,7 +1502,8 @@ void tp_end() ...@@ -1480,7 +1502,8 @@ void tp_end()
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/* Ensure that poll descriptors are created when threadpool_size changes */
/** Ensure that poll descriptors are created when threadpool_size changes */
int tp_set_threadpool_size(uint size) int tp_set_threadpool_size(uint size)
{ {
bool success= true; bool success= true;
...@@ -1513,8 +1536,8 @@ void tp_set_threadpool_stall_limit(uint limit) ...@@ -1513,8 +1536,8 @@ void tp_set_threadpool_stall_limit(uint limit)
return; return;
mysql_mutex_lock(&(pool_timer.mutex)); mysql_mutex_lock(&(pool_timer.mutex));
pool_timer.tick_interval= limit; pool_timer.tick_interval= limit;
mysql_cond_signal(&(pool_timer.cond));
mysql_mutex_unlock(&(pool_timer.mutex)); mysql_mutex_unlock(&(pool_timer.mutex));
mysql_cond_signal(&(pool_timer.cond));
} }
...@@ -1527,9 +1550,61 @@ void tp_set_threadpool_stall_limit(uint limit) ...@@ -1527,9 +1550,61 @@ void tp_set_threadpool_stall_limit(uint limit)
int tp_get_idle_thread_count() int tp_get_idle_thread_count()
{ {
int sum=0; int sum=0;
for(uint i= 0; i< array_elements(all_groups) && (all_groups[i].pollfd >= 0); i++) for(uint i= 0;
i< array_elements(all_groups) && (all_groups[i].pollfd >= 0);
i++)
{ {
sum+= (all_groups[i].thread_count - all_groups[i].active_thread_count); sum+= (all_groups[i].thread_count - all_groups[i].active_thread_count);
} }
return sum; return sum;
} }
/* Report threadpool problems */
#define BLOCK_MSG_DELAY 30
static const char *max_threads_reached_msg=
"Threadpool could not create additional thread to handle queries, because the "
"number of allowed threads was reached. Increasing 'thread_pool_max_threads' "
"parameter can help in this situation.\n"
"If 'extra_port' parameter is set, you can still connect to the database with "
"superuser account (it must be TCP connection using extra_port as TCP port) "
"and troubleshoot the situation. "
"A likely cause of pool blocks are clients that lock resources for long time. "
"'show processlist' or 'show engine innodb status' can give additional hints.";
static const char *create_thread_error_msg=
"Can't create threads in threadpool (errno=%d).";
/**
Write a message when blocking situation in threadpool occurs.
The message is written only when pool blocks for BLOCK_MSG_DELAY (30) seconds.
It will be just a single message for each blocking situation (to prevent
log flood).
*/
static void print_pool_blocked_message(bool max_threads_reached)
{
time_t now;
static bool msg_written;
now= time(NULL);
if (pool_block_start == 0)
{
pool_block_start= now;
msg_written = false;
return;
}
if(now > pool_block_start + BLOCK_MSG_DELAY && !msg_written)
{
if(max_threads_reached)
sql_print_error(max_threads_reached_msg);
else
sql_print_error(create_thread_error_msg, my_errno);
sql_print_information("Threadpool has been blocked for %u seconds\n",(uint)(now- pool_block_start));
/* avoid reperated messages for the same blocking situation */
msg_written= true;
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment