Commit df48c9bf authored by Vladislav Vaintroub

allow changing thread_pool_size without server restart

parent 2e4bde4c
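With this change, thread_pool_size becomes a dynamic system variable: its ON_UPDATE hook (fix_threadpool_size, added below) calls tp_set_threadpool_size(), which creates poll descriptors for any newly enabled thread groups and publishes the new group_count; existing connections are rebalanced lazily. A minimal, hypothetical usage sketch, not part of the patch:

SET GLOBAL thread_pool_size = 8;    # previously required a server restart
SELECT @@global.thread_pool_size;   # reports 8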
@@ -358,6 +358,7 @@ let $wait_condition=
AND info = "INSERT INTO t2 SELECT a FROM t1";
--echo # Waiting until INSERT ... is blocked
--source include/wait_condition.inc
--sleep 0.1
DELETE FROM t1;
COMMIT;
...
@@ -2189,6 +2189,7 @@ static bool fix_tp_max_threads(sys_var *, THD *, enum_var_type)
return false;
}
#ifdef _WIN32
static bool fix_tp_min_threads(sys_var *, THD *, enum_var_type)
{
@@ -2198,6 +2199,14 @@ static bool fix_tp_min_threads(sys_var *, THD *, enum_var_type)
#endif
#ifndef _WIN32
static bool fix_threadpool_size(sys_var*, THD*, enum_var_type)
{
tp_set_threadpool_size(threadpool_size);
return false;
}
#endif
#ifdef _WIN32
static Sys_var_uint Sys_threadpool_min_threads(
"thread_pool_min_threads",
@@ -2220,7 +2229,9 @@ static Sys_var_uint Sys_threadpool_size(
"Number of concurrently executing threads in the pool. "
"Leaving value default (0) sets it to the number of processors.",
GLOBAL_VAR(threadpool_size), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(1, 128), DEFAULT(my_getncpus()), BLOCK_SIZE(1)
VALID_RANGE(1, MAX_THREAD_GROUPS), DEFAULT(my_getncpus()), BLOCK_SIZE(1),
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
ON_UPDATE(fix_threadpool_size)
);
static Sys_var_uint Sys_threadpool_stall_limit(
"thread_pool_stall_limit",
...
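The upper bound of the valid range is now the named constant MAX_THREAD_GROUPS (defined just below as 128) instead of a literal 128, and ON_UPDATE makes the new value take effect immediately. A hypothetical illustration, not part of this commit, assuming the usual clamping behaviour of numeric global variables:

SET GLOBAL thread_pool_size = 1000;   # above MAX_THREAD_GROUPS; typically clamped with a warning
SHOW WARNINGS;
SELECT @@global.thread_pool_size;     # 128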
#define MAX_THREAD_GROUPS 128
/* Threadpool parameters */
extern uint threadpool_min_threads; /* Minimum threads in pool */
extern uint threadpool_idle_timeout; /* Shutdown idle worker threads after this timeout */
@@ -38,6 +40,7 @@ extern TP_STATISTICS tp_stats;
/* Functions to set threadpool parameters */
extern void tp_set_min_threads(uint val);
extern void tp_set_max_threads(uint val);
extern int tp_set_threadpool_size(uint val);
/* Activate threadpool scheduler */
extern void tp_scheduler(void);
...
@@ -98,7 +98,8 @@ struct thread_group_t
ulonglong queue_event_count;
} MY_ALIGNED(512);
static thread_group_t all_groups[128];
static thread_group_t all_groups[MAX_THREAD_GROUPS];
static uint group_count;
/* Global timer for all groups */
struct pool_timer_t
@@ -213,10 +214,10 @@ int io_poll_start_read(int pollfd, int fd, void *data)
return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev);
}
void io_poll_disassociate_fd(int pollfd, int fd)
int io_poll_disassociate_fd(int pollfd, int fd)
{
struct epoll_event ev;
epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev);
return epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev);
}
@@ -258,11 +259,11 @@ int io_poll_associate_fd(int pollfd, int fd, void *data)
}
int io_poll_disassociate_fd(thread_group_t *thread_group, int fd)
int io_poll_disassociate_fd(int pollfd, int fd)
{
struct kevent ke;
EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
return kevent(thread_group->pollfd, &ke, 1, 0, 0, 0);
return kevent(pollfd, &ke, 1, 0, 0, 0);
}
@@ -315,6 +316,11 @@ static int io_poll_associate_fd(int pollfd, int fd, void *data)
return io_poll_start_read(pollfd, fd, data);
}
int io_poll_disassociate_fd(int pollfd, int fd)
{
return 0;
}
int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms)
{
struct timespec ts;
@@ -466,7 +472,7 @@ static void* timer_thread(void *param)
timer->current_microtime= microsecond_interval_timer();
/* Check stalls in thread groups */
for(i=0; i< threadpool_size;i++)
for(i=0; i< array_elements(all_groups);i++)
{
if(all_groups[i].connection_count)
check_stall(&all_groups[i]);
@@ -723,21 +729,9 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
SLIST_INIT(&thread_group->waiting_threads);
thread_group->pending_thread_start_count= 0;
thread_group->pollfd= io_poll_create();
thread_group->stalled= false;
if (thread_group->pollfd < 0)
{
thread_group->pollfd= -1;
DBUG_RETURN(-1);
}
if (pipe(thread_group->shutdown_pipe))
{
DBUG_RETURN(-1);
}
if (io_poll_associate_fd(thread_group->pollfd,
thread_group->shutdown_pipe[0], &POOL_SHUTDOWN_EVENT))
{
DBUG_RETURN(-1);
}
DBUG_RETURN(0);
}
@@ -772,9 +766,29 @@ static void thread_group_close(thread_group_t *thread_group)
char c= 0;
mysql_mutex_lock(&thread_group->mutex);
if (thread_group->thread_count == 0 &&
thread_group->pending_thread_start_count == 0)
{
if (thread_group->pollfd >= 0)
close(thread_group->pollfd);
mysql_mutex_unlock(&thread_group->mutex);
mysql_mutex_destroy(&thread_group->mutex);
DBUG_VOID_RETURN;
}
thread_group->shutdown= true;
thread_group->listener= NULL;
if (pipe(thread_group->shutdown_pipe))
{
DBUG_VOID_RETURN;
}
if (io_poll_associate_fd(thread_group->pollfd,
thread_group->shutdown_pipe[0], &POOL_SHUTDOWN_EVENT))
{
DBUG_VOID_RETURN;
}
/* Wake listener. */
if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
DBUG_VOID_RETURN;
@@ -1000,7 +1014,7 @@ void tp_add_connection(THD *thd)
connection_t *c= alloc_connection(thd);
if(c)
{
c->thread_group= &all_groups[c->thd->thread_id%threadpool_size];
c->thread_group= &all_groups[c->thd->thread_id%group_count];
mysql_mutex_lock(&c->thread_group->mutex);
c->thread_group->connection_count++;
mysql_mutex_unlock(&c->thread_group->mutex);
@@ -1101,6 +1115,87 @@ static void set_wait_timeout(connection_t *c)
DBUG_VOID_RETURN;
}
/*
Handle a (rare) special case, where a connection needs to
migrate to a different group because group_count has changed
as a result of a change to the thread_pool_size setting.
*/
static int change_group(connection_t *c,
thread_group_t *old_group,
thread_group_t *new_group)
{
int ret= 0;
int fd = c->thd->net.vio->sd;
DBUG_ASSERT(c->thread_group == old_group);
/* Remove connection from the old group. */
mysql_mutex_lock(&old_group->mutex);
if (c->logged_in)
io_poll_disassociate_fd(old_group->pollfd,fd);
c->thread_group->connection_count--;
mysql_mutex_unlock(&old_group->mutex);
/* Add connection to the new group. */
mysql_mutex_lock(&new_group->mutex);
c->thread_group= new_group;
new_group->connection_count++;
/* Ensure that there is a listener in the new group. */
if(!new_group->thread_count && !new_group->pending_thread_start_count)
ret= create_worker(new_group);
mysql_mutex_unlock(&new_group->mutex);
return ret;
}
static int start_io(connection_t *c)
{
int fd = c->thd->net.vio->sd;
/*
Usually, a connection stays in the same group for its entire
lifetime. However, we do allow group_count to change at
runtime, and in the rare case where it has changed, the
connection may need to migrate to another group, to keep
the load evenly distributed between groups.
So we recalculate which group the connection should belong to,
based on thread_id and the current group count, and migrate if necessary.
*/
thread_group_t *g = &all_groups[c->thd->thread_id%group_count];
if (g != c->thread_group)
{
if (!change_group(c, c->thread_group, g))
{
c->logged_in= true;
return io_poll_associate_fd(c->thread_group->pollfd, fd, c);
}
else
return -1;
}
/*
Handle the case where the connection is not yet logged in, i.e.
not yet associated with the poll fd.
*/
if(!c->logged_in)
{
c->logged_in= true;
return io_poll_associate_fd(c->thread_group->pollfd, fd, c);
}
return io_poll_start_read(c->thread_group->pollfd, fd, c);
}
static void handle_event(pool_event_t *ev)
{
@@ -1108,13 +1203,11 @@ static void handle_event(pool_event_t *ev)
/* Normal case, handle query on connection */
connection_t *c = (connection_t*)(void *)ev;
bool do_login = (!c->logged_in);
int ret;
if (do_login)
if (!c->logged_in)
{
ret= threadpool_add_connection(c->thd);
c->logged_in= true;
}
else
{
@@ -1124,13 +1217,7 @@ static void handle_event(pool_event_t *ev)
if(!ret)
{
set_wait_timeout(c);
int fd = c->thd->net.vio->sd;
ret= start_io(c);
if (do_login)
{
ret= io_poll_associate_fd(c->thread_group->pollfd, fd, c);
}
else
ret= io_poll_start_read(c->thread_group->pollfd, fd, c);
}
if (ret)
@@ -1159,8 +1246,8 @@ static void *worker_main(void *param)
this_thread.thread_group= thread_group;
this_thread.event_count=0;
my_atomic_add32(&tp_stats.num_worker_threads, 1);
mysql_mutex_lock(&thread_group->mutex);
tp_stats.num_worker_threads++;
thread_group->thread_count++;
thread_group->active_thread_count++;
thread_group->pending_thread_start_count--;
@@ -1187,8 +1274,8 @@ static void *worker_main(void *param)
mysql_mutex_lock(&thread_group->mutex);
thread_group->active_thread_count--;
thread_group->thread_count--;
tp_stats.num_worker_threads--;
mysql_mutex_unlock(&thread_group->mutex);
my_atomic_add32(&tp_stats.num_worker_threads, -1);
/* If it is the last thread in pool and pool is terminating, destroy pool.*/
if (thread_group->shutdown && (thread_group->thread_count == 0))
@@ -1209,15 +1296,12 @@ bool tp_init()
DBUG_ENTER("tp_init");
started = true;
scheduler_init();
if (threadpool_size == 0)
{
threadpool_size= my_getncpus();
}
for(uint i=0; i < threadpool_size; i++)
for(uint i=0; i < array_elements(all_groups); i++)
{
thread_group_init(&all_groups[i], get_connection_attrib());
}
tp_set_threadpool_size(threadpool_size);
#define PSI_register(X) \
if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list))
@@ -1239,9 +1323,33 @@ void tp_end()
DBUG_VOID_RETURN;
stop_timer(&pool_timer);
for(uint i=0; i< threadpool_size; i++)
for(uint i=0; i< array_elements(all_groups); i++)
{
thread_group_close(&all_groups[i]);
}
DBUG_VOID_RETURN;
}
/* Ensure that poll descriptors are created when threadpool_size changes */
int tp_set_threadpool_size(uint size)
{
bool success= true;
for(uint i=0; i< size; i++)
{
thread_group_t *group= &all_groups[i];
mysql_mutex_lock(&group->mutex);
if (group->pollfd == -1)
{
group->pollfd= io_poll_create();
success= (group->pollfd >= 0);
}
mysql_mutex_unlock(&all_groups[i].mutex);
if (!success)
{
group_count= i;
return -1;
}
}
group_count= size;
return 0;
}
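Taken together: growing the pool at runtime only creates poll descriptors for groups that never had one, while shrinking it only lowers group_count; descriptors of now-unused groups stay open, so growing again later is cheap. Existing connections move to their new group lazily, the next time start_io() runs for them. A hypothetical mysqltest-style round trip, not part of this commit:

SET @saved_size= @@global.thread_pool_size;
SET GLOBAL thread_pool_size = 2;             # shrink: only group_count changes
SET GLOBAL thread_pool_size = 16;            # grow: poll fds are created lazily for new groups
SELECT @@global.thread_pool_size;
SET GLOBAL thread_pool_size = @saved_size;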