Commit 91734431 authored by Sergey Vojtovich's avatar Sergey Vojtovich

Move all thread cache specific code to a new class

Part of
MDEV-18353 - Shutdown may miss to wait for connection thread
parent 8ad3c615
This diff is collapsed.
...@@ -79,8 +79,6 @@ void close_connection(THD *thd, uint sql_errno= 0); ...@@ -79,8 +79,6 @@ void close_connection(THD *thd, uint sql_errno= 0);
void handle_connection_in_main_thread(CONNECT *thd); void handle_connection_in_main_thread(CONNECT *thd);
void create_thread_to_handle_connection(CONNECT *connect); void create_thread_to_handle_connection(CONNECT *connect);
void unlink_thd(THD *thd); void unlink_thd(THD *thd);
CONNECT *cache_thread(THD *thd);
void flush_thread_cache();
void refresh_status(THD *thd); void refresh_status(THD *thd);
bool is_secure_file_path(char *path); bool is_secure_file_path(char *path);
extern void init_net_server_extension(THD *thd); extern void init_net_server_extension(THD *thd);
...@@ -237,7 +235,6 @@ extern ulong slave_trans_retries; ...@@ -237,7 +235,6 @@ extern ulong slave_trans_retries;
extern ulong slave_trans_retry_interval; extern ulong slave_trans_retry_interval;
extern uint slave_net_timeout; extern uint slave_net_timeout;
extern int max_user_connections; extern int max_user_connections;
extern volatile ulong cached_thread_count;
extern ulong what_to_log,flush_time; extern ulong what_to_log,flush_time;
extern uint max_prepared_stmt_count, prepared_stmt_count; extern uint max_prepared_stmt_count, prepared_stmt_count;
extern MYSQL_PLUGIN_IMPORT ulong open_files_limit; extern MYSQL_PLUGIN_IMPORT ulong open_files_limit;
...@@ -373,8 +370,7 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond, ...@@ -373,8 +370,7 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_relay_log_info_start_cond, key_relay_log_info_stop_cond, key_relay_log_info_start_cond, key_relay_log_info_stop_cond,
key_rpl_group_info_sleep_cond, key_rpl_group_info_sleep_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond, key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_start_thread, key_COND_start_thread;
key_COND_thread_cache, key_COND_flush_thread_cache;
extern PSI_cond_key key_RELAYLOG_COND_relay_log_updated, extern PSI_cond_key key_RELAYLOG_COND_relay_log_updated,
key_RELAYLOG_COND_bin_log_updated, key_COND_wakeup_ready, key_RELAYLOG_COND_bin_log_updated, key_COND_wakeup_ready,
key_COND_wait_commit; key_COND_wait_commit;
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#endif #endif
#include "sql_audit.h" #include "sql_audit.h"
#include "sql_connect.h" #include "sql_connect.h"
#include "thread_cache.h"
#include "probes_mysql.h" #include "probes_mysql.h"
#include "sql_parse.h" // sql_command_flags, #include "sql_parse.h" // sql_command_flags,
// execute_init_command, // execute_init_command,
...@@ -1420,7 +1421,7 @@ void do_handle_one_connection(CONNECT *connect, bool put_in_cache) ...@@ -1420,7 +1421,7 @@ void do_handle_one_connection(CONNECT *connect, bool put_in_cache)
unlink_thd(thd); unlink_thd(thd);
if (IF_WSREP(thd->wsrep_applier, false) || !put_in_cache || if (IF_WSREP(thd->wsrep_applier, false) || !put_in_cache ||
!(connect= cache_thread(thd))) !(connect= thread_cache.park()))
break; break;
/* Create new instrumentation for the new THD job */ /* Create new instrumentation for the new THD job */
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "sql_acl.h" // acl_reload #include "sql_acl.h" // acl_reload
#include "sql_servers.h" // servers_reload #include "sql_servers.h" // servers_reload
#include "sql_connect.h" // reset_mqh #include "sql_connect.h" // reset_mqh
#include "thread_cache.h"
#include "sql_base.h" // close_cached_tables #include "sql_base.h" // close_cached_tables
#include "sql_db.h" // my_dbopt_cleanup #include "sql_db.h" // my_dbopt_cleanup
#include "hostname.h" // hostname_cache_refresh #include "hostname.h" // hostname_cache_refresh
...@@ -351,7 +352,7 @@ bool reload_acl_and_cache(THD *thd, unsigned long long options, ...@@ -351,7 +352,7 @@ bool reload_acl_and_cache(THD *thd, unsigned long long options,
if (thd && (options & REFRESH_STATUS)) if (thd && (options & REFRESH_STATUS))
refresh_status(thd); refresh_status(thd);
if (options & REFRESH_THREADS) if (options & REFRESH_THREADS)
flush_thread_cache(); thread_cache.flush();
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
if (options & REFRESH_MASTER) if (options & REFRESH_MASTER)
{ {
......
...@@ -27,6 +27,8 @@ ...@@ -27,6 +27,8 @@
#include "my_json_writer.h" #include "my_json_writer.h"
#include <hash.h> #include <hash.h>
#include <thr_alarm.h> #include <thr_alarm.h>
#include "sql_connect.h"
#include "thread_cache.h"
#if defined(HAVE_MALLINFO) && defined(HAVE_MALLOC_H) #if defined(HAVE_MALLINFO) && defined(HAVE_MALLOC_H)
#include <malloc.h> #include <malloc.h>
#elif defined(HAVE_MALLINFO) && defined(HAVE_SYS_MALLOC_H) #elif defined(HAVE_MALLINFO) && defined(HAVE_SYS_MALLOC_H)
...@@ -568,7 +570,7 @@ void mysql_print_status() ...@@ -568,7 +570,7 @@ void mysql_print_status()
(void) my_getwd(current_dir, sizeof(current_dir),MYF(0)); (void) my_getwd(current_dir, sizeof(current_dir),MYF(0));
printf("Current dir: %s\n", current_dir); printf("Current dir: %s\n", current_dir);
printf("Running threads: %d Cached threads: %lu Stack size: %ld\n", printf("Running threads: %d Cached threads: %lu Stack size: %ld\n",
count, cached_thread_count, count, thread_cache.size(),
(long) my_thread_stack_size); (long) my_thread_stack_size);
#ifdef EXTRA_DEBUG #ifdef EXTRA_DEBUG
thr_print_locks(); // Write some debug info thr_print_locks(); // Write some debug info
...@@ -641,7 +643,8 @@ Memory allocated by threads: %s\n", ...@@ -641,7 +643,8 @@ Memory allocated by threads: %s\n",
llstr(info.uordblks, llbuff[4]), llstr(info.uordblks, llbuff[4]),
llstr(info.fordblks, llbuff[5]), llstr(info.fordblks, llbuff[5]),
llstr(info.keepcost, llbuff[6]), llstr(info.keepcost, llbuff[6]),
llstr((count + cached_thread_count)* my_thread_stack_size + info.hblkhd + info.arena, llbuff[7]), llstr((count + thread_cache.size()) * my_thread_stack_size +
info.hblkhd + info.arena, llbuff[7]),
llstr(tmp.global_memory_used, llbuff[8]), llstr(tmp.global_memory_used, llbuff[8]),
llstr(tmp.local_memory_used, llbuff[9])); llstr(tmp.local_memory_used, llbuff[9]));
......
/*
Copyright (C) 2020 MariaDB Foundation
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA
*/
/**
MariaDB thread cache for "one thread per connection" scheduler.
Thread cache allows to re-use threads (as well as THD objects) for
subsequent connections.
*/
class Thread_cache
{
mutable mysql_cond_t COND_thread_cache;
mutable mysql_cond_t COND_flush_thread_cache;
mutable mysql_mutex_t LOCK_thread_cache;
/** Queue of new connection requests. */
I_List<CONNECT> list;
/** Number of threads parked in the cache. */
ulong cached_thread_count;
/** Number of active flush requests. */
uint32_t kill_cached_threads;
/**
PFS stuff, only used during initialization.
Unfortunately needs to survive till destruction.
*/
PSI_cond_key key_COND_thread_cache, key_COND_flush_thread_cache;
PSI_mutex_key key_LOCK_thread_cache;
public:
void init()
{
#ifdef HAVE_PSI_INTERFACE
PSI_cond_info conds[]=
{
{ &key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_GLOBAL },
{ &key_COND_flush_thread_cache, "COND_flush_thread_cache",
PSI_FLAG_GLOBAL }
};
PSI_mutex_info mutexes[]=
{
{ &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL }
};
mysql_mutex_register("sql", mutexes, array_elements(mutexes));
mysql_cond_register("sql", conds, array_elements(conds));
#endif
mysql_mutex_init(key_LOCK_thread_cache, &LOCK_thread_cache,
MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_thread_cache, &COND_thread_cache, 0);
mysql_cond_init(key_COND_flush_thread_cache, &COND_flush_thread_cache, 0);
list.empty();
kill_cached_threads= 0;
cached_thread_count= 0;
}
void destroy()
{
DBUG_ASSERT(cached_thread_count == 0);
DBUG_ASSERT(list.is_empty());
mysql_cond_destroy(&COND_flush_thread_cache);
mysql_cond_destroy(&COND_thread_cache);
mysql_mutex_destroy(&LOCK_thread_cache);
}
/**
Flushes thread cache.
Awakes parked threads and requests them to shutdown.
Waits until last parked thread leaves the cache.
*/
void flush()
{
mysql_mutex_lock(&LOCK_thread_cache);
kill_cached_threads++;
while (cached_thread_count)
{
mysql_cond_broadcast(&COND_thread_cache);
mysql_cond_wait(&COND_flush_thread_cache, &LOCK_thread_cache);
}
kill_cached_threads--;
mysql_mutex_unlock(&LOCK_thread_cache);
}
/**
Flushes thread cache and forbids threads parking in the cache.
This is a pre-shutdown hook.
*/
void final_flush()
{
kill_cached_threads++;
flush();
}
/**
Requests parked thread to serve new connection.
@return
@retval true connection is enqueued and parked thread is about to serve it
@retval false thread cache is empty
*/
bool enqueue(CONNECT *connect)
{
mysql_mutex_lock(&LOCK_thread_cache);
if (cached_thread_count)
{
list.push_back(connect);
cached_thread_count--;
mysql_mutex_unlock(&LOCK_thread_cache);
mysql_cond_signal(&COND_thread_cache);
return true;
}
mysql_mutex_unlock(&LOCK_thread_cache);
return false;
}
/**
Parks thread in the cache.
Thread execution is suspended until either of the following occurs:
- thread is requested to serve new connection;
- thread cache is flushed;
- THREAD_CACHE_TIMEOUT elapsed.
@return
@retval pointer to CONNECT if requested to serve new connection
@retval 0 if thread cache is flushed or on timeout
*/
CONNECT *park()
{
struct timespec abstime;
CONNECT *connect;
bool flushed= false;
DBUG_ENTER("Thread_cache::park");
set_timespec(abstime, THREAD_CACHE_TIMEOUT);
/*
Delete the instrumentation for the job that just completed,
before parking this pthread in the cache (blocked on COND_thread_cache).
*/
PSI_CALL_delete_current_thread();
#ifndef DBUG_OFF
while (_db_is_pushed_())
_db_pop_();
#endif
mysql_mutex_lock(&LOCK_thread_cache);
if ((connect= list.get()))
cached_thread_count++;
else if (cached_thread_count < thread_cache_size && !kill_cached_threads)
{
/* Don't kill the thread, just put it in cache for reuse */
DBUG_PRINT("info", ("Adding thread to cache"));
cached_thread_count++;
for (;;)
{
int error= mysql_cond_timedwait(&COND_thread_cache, &LOCK_thread_cache,
&abstime);
flushed= kill_cached_threads;
if ((connect= list.get()))
break;
else if (flushed || error == ETIMEDOUT || error == ETIME)
{
/*
If timeout, end thread.
If a new thread is requested, we will handle the call, even if we
got a timeout (as we are already awake and free)
*/
cached_thread_count--;
break;
}
}
}
mysql_mutex_unlock(&LOCK_thread_cache);
if (flushed)
mysql_cond_signal(&COND_flush_thread_cache);
DBUG_RETURN(connect);
}
/** Returns the number of parked threads. */
ulong size() const
{
mysql_mutex_lock(&LOCK_thread_cache);
ulong r= cached_thread_count;
mysql_mutex_unlock(&LOCK_thread_cache);
return r;
}
};
extern Thread_cache thread_cache;
...@@ -46,6 +46,8 @@ ...@@ -46,6 +46,8 @@
#include <cstdlib> #include <cstdlib>
#include <string> #include <string>
#include "log_event.h" #include "log_event.h"
#include "sql_connect.h"
#include "thread_cache.h"
#include <sstream> #include <sstream>
...@@ -64,8 +66,6 @@ const char *wsrep_SR_store_types[]= { "none", "table", NullS }; ...@@ -64,8 +66,6 @@ const char *wsrep_SR_store_types[]= { "none", "table", NullS };
*/ */
extern my_bool plugins_are_initialized; extern my_bool plugins_are_initialized;
extern uint kill_cached_threads;
extern mysql_cond_t COND_thread_cache;
/* System variables. */ /* System variables. */
const char *wsrep_provider; const char *wsrep_provider;
...@@ -2593,8 +2593,7 @@ static my_bool kill_remaining_threads(THD *thd, THD *caller_thd) ...@@ -2593,8 +2593,7 @@ static my_bool kill_remaining_threads(THD *thd, THD *caller_thd)
void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd) void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd)
{ {
/* Clear thread cache */ /* Clear thread cache */
kill_cached_threads++; thread_cache.final_flush();
flush_thread_cache();
/* /*
First signal all threads that it's time to die First signal all threads that it's time to die
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment