Commit b1dcf448 authored by Mats Kindahl's avatar Mats Kindahl

WL#5363: Thread Pool Service Interface

In order to allow thread schedulers to be dynamically loaded,
it is necessary to make the following changes to the server:

- Two new service interfaces

- Modifications to InnoDB to inform the thread scheduler of state changes.

- Changes to the VIO subsystem for checking if data is available on a socket.

- Elimination of remains of the old thread pool implementation.

The two new service interfaces introduces are:

my_thread_scheduler
  A service interface to register a thread
  scheduler.

thd_wait
  A service interface to inform thread scheduler
  that the thread is about to start waiting.

In addition, the patch adds code that:

- Add a call to thd_wait for table locks in mysys
  thd_lock.c by introducing a set function that
  can be used to set a callback to be used when
  waiting on a lock and resuming from waiting.

- Calling the mysys set function from the server
  to set the callbacks correctly.
parent 0c8c4a71
[MYSQL] [MYSQL]
post_commit_to = "commits@lists.mysql.com" # post_commit_to = "commits@lists.mysql.com"
post_push_to = "commits@lists.mysql.com" # post_push_to = "commits@lists.mysql.com"
tree_name = "mysql-trunk" tree_name = "mysql-trunk"
...@@ -24,6 +24,8 @@ HEADERS_ABI = mysql.h mysql_com.h mysql_time.h \ ...@@ -24,6 +24,8 @@ HEADERS_ABI = mysql.h mysql_com.h mysql_time.h \
pkginclude_HEADERS = $(HEADERS_ABI) my_dbug.h m_string.h my_sys.h \ pkginclude_HEADERS = $(HEADERS_ABI) my_dbug.h m_string.h my_sys.h \
my_xml.h mysql_embed.h mysql/services.h \ my_xml.h mysql_embed.h mysql/services.h \
mysql/service_my_snprintf.h mysql/service_thd_alloc.h \ mysql/service_my_snprintf.h mysql/service_thd_alloc.h \
mysql/service_thread_scheduler.h \
mysql/service_thd_wait.h \
my_pthread.h my_no_pthread.h \ my_pthread.h my_no_pthread.h \
mysql/psi/psi.h mysql/psi/mysql_thread.h \ mysql/psi/psi.h mysql/psi/mysql_thread.h \
mysql/psi/mysql_file.h \ mysql/psi/mysql_file.h \
......
...@@ -20,13 +20,6 @@ ...@@ -20,13 +20,6 @@
#define BIG_TABLES #define BIG_TABLES
/*
Minimal version of Windows we should be able to run on.
Currently Windows XP.
*/
#define _WIN32_WINNT 0x0501
#if defined(_MSC_VER) && _MSC_VER >= 1400 #if defined(_MSC_VER) && _MSC_VER >= 1400
/* Avoid endless warnings about sprintf() etc. being unsafe. */ /* Avoid endless warnings about sprintf() etc. being unsafe. */
#define _CRT_SECURE_NO_DEPRECATE 1 #define _CRT_SECURE_NO_DEPRECATE 1
......
...@@ -71,7 +71,7 @@ typedef struct st_mysql_xid MYSQL_XID; ...@@ -71,7 +71,7 @@ typedef struct st_mysql_xid MYSQL_XID;
Plugin API. Common for all plugin types. Plugin API. Common for all plugin types.
*/ */
#define MYSQL_PLUGIN_INTERFACE_VERSION 0x0101 #define MYSQL_PLUGIN_INTERFACE_VERSION 0x0102
/* /*
The allowable types of plugins The allowable types of plugins
......
...@@ -33,6 +33,27 @@ void *thd_memdup(void* thd, const void* str, unsigned int size); ...@@ -33,6 +33,27 @@ void *thd_memdup(void* thd, const void* str, unsigned int size);
MYSQL_LEX_STRING *thd_make_lex_string(void* thd, MYSQL_LEX_STRING *lex_str, MYSQL_LEX_STRING *thd_make_lex_string(void* thd, MYSQL_LEX_STRING *lex_str,
const char *str, unsigned int size, const char *str, unsigned int size,
int allocate_lex_string); int allocate_lex_string);
#include <mysql/service_thd_wait.h>
typedef enum _thd_wait_type_e {
THD_WAIT_MUTEX= 1,
THD_WAIT_DISKIO= 2,
THD_WAIT_ROW_TABLE_LOCK= 3,
THD_WAIT_GLOBAL_LOCK= 4
} thd_wait_type;
extern struct thd_wait_service_st {
void (*thd_wait_begin_func)(void*, thd_wait_type);
void (*thd_wait_end_func)(void*);
} *thd_wait_service;
void thd_wait_begin(void* thd, thd_wait_type wait_type);
void thd_wait_end(void* thd);
#include <mysql/service_thread_scheduler.h>
struct scheduler_functions;
extern struct my_thread_scheduler_service {
int (*set)(struct scheduler_functions *scheduler);
int (*reset)();
} *my_thread_scheduler_service;
int my_thread_scheduler_set(struct scheduler_functions *scheduler);
int my_thread_scheduler_reset();
struct st_mysql_xid { struct st_mysql_xid {
long formatID; long formatID;
long gtrid_length; long gtrid_length;
......
/* Copyright (C) 2010, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef MYSQL_SERVICE_THD_WAIT_INCLUDED
#define MYSQL_SERVICE_THD_WAIT_INCLUDED
/**
@file include/mysql/service_thd_wait.h
This service provides functions for plugins and storage engines to report
when they are going to sleep/stall.
SYNOPSIS
thd_wait_begin() - call just before a wait begins
thd Thread object
Use NULL if the thd is NOT known.
wait_type Type of wait
1 -- short wait (e.g. for mutex)
2 -- medium wait (e.g. for disk io)
3 -- large wait (e.g. for locked row/table)
NOTES
This is used by the threadpool to have better knowledge of which
threads that currently are actively running on CPUs. When a thread
reports that it's going to sleep/stall, the threadpool scheduler is
free to start another thread in the pool most likely. The expected wait
time is simply an indication of how long the wait is expected to
become, the real wait time could be very different.
thd_wait_end() called immediately after the wait is complete
thd_wait_end() MUST be called if thd_wait_begin() was called.
Using thd_wait_...() service is optional but recommended. Using it will
improve performance as the thread pool will be more active at managing the
thread workload.
*/
#ifdef __cplusplus
extern "C" {
#endif
typedef enum _thd_wait_type_e {
THD_WAIT_MUTEX= 1,
THD_WAIT_DISKIO= 2,
THD_WAIT_ROW_TABLE_LOCK= 3,
THD_WAIT_GLOBAL_LOCK= 4
} thd_wait_type;
extern struct thd_wait_service_st {
void (*thd_wait_begin_func)(MYSQL_THD, thd_wait_type);
void (*thd_wait_end_func)(MYSQL_THD);
} *thd_wait_service;
#ifdef MYSQL_DYNAMIC_PLUGIN
#define thd_wait_begin(_THD, _WAIT_TYPE) \
thd_wait_service->thd_wait_begin_func(_THD, _WAIT_TYPE)
#define thd_wait_end(_THD) thd_wait_service->thd_wait_end_func(_THD)
#else
void thd_wait_begin(MYSQL_THD thd, thd_wait_type wait_type);
void thd_wait_end(MYSQL_THD thd);
#endif
#ifdef __cplusplus
}
#endif
#endif
/*
Copyright (C) 2010, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef SERVICE_THREAD_SCHEDULER_INCLUDED
#define SERVICE_THREAD_SCHEDULER_INCLUDED
#ifdef __cplusplus
extern "C" {
#endif
struct scheduler_functions;
extern struct my_thread_scheduler_service {
int (*set)(struct scheduler_functions *scheduler);
int (*reset)();
} *my_thread_scheduler_service;
#ifdef MYSQL_DYNAMIC_PLUGIN
#define my_thread_scheduler_set(F) my_thread_scheduler_service->set((F))
#define my_thread_scheduler_reset() my_thread_scheduler_service->reset()
#else
/**
Set the thread scheduler to use for the server.
@param scheduler Pointer to scheduler callbacks to use.
@retval 0 Scheduler installed correctly.
@retval 1 Invalid value (NULL) used for scheduler.
*/
int my_thread_scheduler_set(struct scheduler_functions *scheduler);
/**
Restore the previous thread scheduler.
@note If no thread scheduler was installed previously with
thd_set_thread_scheduler, this function will report an error.
@retval 0 Scheduler installed correctly.
@retval 1 No scheduler installed.
*/
int my_thread_scheduler_reset();
#endif
#ifdef __cplusplus
}
#endif
#endif /* SERVICE_THREAD_SCHEDULER_INCLUDED */
...@@ -20,6 +20,8 @@ extern "C" { ...@@ -20,6 +20,8 @@ extern "C" {
#include <mysql/service_my_snprintf.h> #include <mysql/service_my_snprintf.h>
#include <mysql/service_thd_alloc.h> #include <mysql/service_thd_alloc.h>
#include <mysql/service_thd_wait.h>
#include <mysql/service_thread_scheduler.h>
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -21,4 +21,5 @@ ...@@ -21,4 +21,5 @@
#define VERSION_my_snprintf 0x0100 #define VERSION_my_snprintf 0x0100
#define VERSION_thd_alloc 0x0100 #define VERSION_thd_alloc 0x0100
#define VERSION_thd_wait 0x0100
#define VERSION_my_thread_scheduler 0x0100
...@@ -174,6 +174,8 @@ void thr_downgrade_write_lock(THR_LOCK_DATA *data, ...@@ -174,6 +174,8 @@ void thr_downgrade_write_lock(THR_LOCK_DATA *data,
enum thr_lock_type new_lock_type); enum thr_lock_type new_lock_type);
my_bool thr_reschedule_write_lock(THR_LOCK_DATA *data, my_bool thr_reschedule_write_lock(THR_LOCK_DATA *data,
ulong lock_wait_timeout); ulong lock_wait_timeout);
void thr_set_lock_wait_callback(void (*before_wait)(void),
void (*after_wait)(void));
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -217,6 +217,7 @@ struct st_vio ...@@ -217,6 +217,7 @@ struct st_vio
void (*timeout)(Vio*, unsigned int which, unsigned int timeout); void (*timeout)(Vio*, unsigned int which, unsigned int timeout);
my_bool (*poll_read)(Vio *vio, uint timeout); my_bool (*poll_read)(Vio *vio, uint timeout);
my_bool (*is_connected)(Vio*); my_bool (*is_connected)(Vio*);
my_bool (*has_data) (Vio*);
#ifdef HAVE_OPENSSL #ifdef HAVE_OPENSSL
void *ssl_arg; void *ssl_arg;
#endif #endif
......
...@@ -15,7 +15,11 @@ ...@@ -15,7 +15,11 @@
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/include) INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/include)
SET(MYSQLSERVICES_SOURCES my_snprintf_service.c thd_alloc_service.c) SET(MYSQLSERVICES_SOURCES
my_snprintf_service.c
thd_alloc_service.c
thd_wait_service.c
my_thread_scheduler_service.c)
ADD_LIBRARY(mysqlservices ${MYSQLSERVICES_SOURCES}) ADD_LIBRARY(mysqlservices ${MYSQLSERVICES_SOURCES})
INSTALL(TARGETS mysqlservices DESTINATION ${INSTALL_LIBDIR}) INSTALL(TARGETS mysqlservices DESTINATION ${INSTALL_LIBDIR})
...@@ -15,5 +15,7 @@ ...@@ -15,5 +15,7 @@
AM_CPPFLAGS = -I$(top_srcdir)/include AM_CPPFLAGS = -I$(top_srcdir)/include
pkglib_LIBRARIES = libmysqlservices.a pkglib_LIBRARIES = libmysqlservices.a
libmysqlservices_a_SOURCES = my_snprintf_service.c thd_alloc_service.c libmysqlservices_a_SOURCES = my_snprintf_service.c thd_alloc_service.c \
thd_wait_service.c \
my_thread_scheduler_service.c
EXTRA_DIST = CMakeLists.txt EXTRA_DIST = CMakeLists.txt
/*
Copyright (C) 2010, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
USA
*/
#include <service_versions.h>
SERVICE_VERSION my_thread_scheduler_service=
(void*)VERSION_my_thread_scheduler;
/*
Copyright (C) 2010, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <service_versions.h>
SERVICE_VERSION *thd_wait_service= (void*)VERSION_thd_wait;
...@@ -499,7 +499,7 @@ static my_bool win32_init_tcp_ip() ...@@ -499,7 +499,7 @@ static my_bool win32_init_tcp_ip()
{ {
if (win32_have_tcpip()) if (win32_have_tcpip())
{ {
WORD wVersionRequested = MAKEWORD( 2, 0 ); WORD wVersionRequested = MAKEWORD( 2, 2 );
WSADATA wsaData; WSADATA wsaData;
/* Be a good citizen: maybe another lib has already initialised /* Be a good citizen: maybe another lib has already initialised
sockets, so dont clobber them unless necessary */ sockets, so dont clobber them unless necessary */
......
...@@ -93,6 +93,16 @@ enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE; ...@@ -93,6 +93,16 @@ enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
LIST *thr_lock_thread_list; /* List of threads in use */ LIST *thr_lock_thread_list; /* List of threads in use */
ulong max_write_lock_count= ~(ulong) 0L; ulong max_write_lock_count= ~(ulong) 0L;
static void (*before_lock_wait)(void)= 0;
static void (*after_lock_wait)(void)= 0;
void thr_set_lock_wait_callback(void (*before_wait)(void),
void (*after_wait)(void))
{
before_lock_wait= before_wait;
after_lock_wait= after_wait;
}
static inline mysql_cond_t *get_cond(void) static inline mysql_cond_t *get_cond(void)
{ {
return &my_thread_var->suspend; return &my_thread_var->suspend;
...@@ -436,6 +446,19 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, ...@@ -436,6 +446,19 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
old_proc_info= proc_info_hook(NULL, "Table lock", old_proc_info= proc_info_hook(NULL, "Table lock",
__func__, __FILE__, __LINE__); __func__, __FILE__, __LINE__);
/*
Since before_lock_wait potentially can create more threads to
scheduler work for, we don't want to call the before_lock_wait
callback unless it will really start to wait.
For similar reasons, we do not want to call before_lock_wait and
after_lock_wait for each lap around the loop, so we restrict
ourselves to call it before_lock_wait once before starting to wait
and once after the thread has exited the wait loop.
*/
if ((!thread_var->abort || in_wait_list) && before_lock_wait)
(*before_lock_wait)();
set_timespec(wait_timeout, lock_wait_timeout); set_timespec(wait_timeout, lock_wait_timeout);
while (!thread_var->abort || in_wait_list) while (!thread_var->abort || in_wait_list)
{ {
...@@ -467,6 +490,14 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, ...@@ -467,6 +490,14 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
/* purecov: end */ /* purecov: end */
} }
} }
/*
We call the after_lock_wait callback once the wait loop has
finished.
*/
if (after_lock_wait)
(*after_lock_wait)();
DBUG_PRINT("thr_lock", ("aborted: %d in_wait_list: %d", DBUG_PRINT("thr_lock", ("aborted: %d in_wait_list: %d",
thread_var->abort, in_wait_list)); thread_var->abort, in_wait_list));
......
...@@ -92,6 +92,7 @@ struct show_table_authors_st show_table_authors[]= { ...@@ -92,6 +92,7 @@ struct show_table_authors_st show_table_authors[]= {
{ "Arjen Lentz", "Brisbane, Australia", { "Arjen Lentz", "Brisbane, Australia",
"Documentation (2001-2004), Dutch error messages, LOG2()" }, "Documentation (2001-2004), Dutch error messages, LOG2()" },
{ "Marc Liyanage", "", "Created Mac OS X packages" }, { "Marc Liyanage", "", "Created Mac OS X packages" },
{ "Kelly Long", "Denver, CO, USA", "Pool Of Threads" },
{ "Zarko Mocnik", "", "Sorting for Slovenian language" }, { "Zarko Mocnik", "", "Sorting for Slovenian language" },
{ "Per-Erik Martin", "Uppsala, Sweden", "Stored Procedures (5.0)" }, { "Per-Erik Martin", "Uppsala, Sweden", "Stored Procedures (5.0)" },
{ "Alexis Mikhailov", "", "User-defined functions" }, { "Alexis Mikhailov", "", "User-defined functions" },
......
...@@ -64,7 +64,9 @@ ...@@ -64,7 +64,9 @@
#include "events.h" #include "events.h"
#include "sql_audit.h" #include "sql_audit.h"
#include "probes_mysql.h" #include "probes_mysql.h"
#include "scheduler.h"
#include "debug_sync.h" #include "debug_sync.h"
#include "sql_callback.h"
#ifdef WITH_PERFSCHEMA_STORAGE_ENGINE #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
#include "../storage/perfschema/pfs_server.h" #include "../storage/perfschema/pfs_server.h"
...@@ -504,7 +506,7 @@ ulong slave_trans_retries; ...@@ -504,7 +506,7 @@ ulong slave_trans_retries;
uint slave_net_timeout; uint slave_net_timeout;
uint slave_exec_mode_options; uint slave_exec_mode_options;
ulonglong slave_type_conversions_options; ulonglong slave_type_conversions_options;
ulong thread_cache_size=0, thread_pool_size= 0; ulong thread_cache_size=0;
ulong binlog_cache_size=0; ulong binlog_cache_size=0;
ulonglong max_binlog_cache_size=0; ulonglong max_binlog_cache_size=0;
ulong query_cache_size=0; ulong query_cache_size=0;
...@@ -935,8 +937,6 @@ my_bool opt_enable_shared_memory; ...@@ -935,8 +937,6 @@ my_bool opt_enable_shared_memory;
HANDLE smem_event_connect_request= 0; HANDLE smem_event_connect_request= 0;
#endif #endif
scheduler_functions thread_scheduler;
my_bool opt_use_ssl = 0; my_bool opt_use_ssl = 0;
char *opt_ssl_ca= NULL, *opt_ssl_capath= NULL, *opt_ssl_cert= NULL, char *opt_ssl_ca= NULL, *opt_ssl_capath= NULL, *opt_ssl_cert= NULL,
*opt_ssl_cipher= NULL, *opt_ssl_key= NULL; *opt_ssl_cipher= NULL, *opt_ssl_key= NULL;
...@@ -1125,7 +1125,8 @@ static void close_connections(void) ...@@ -1125,7 +1125,8 @@ static void close_connections(void)
continue; continue;
tmp->killed= THD::KILL_CONNECTION; tmp->killed= THD::KILL_CONNECTION;
thread_scheduler.post_kill_notification(tmp); MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (tmp));
mysql_mutex_lock(&tmp->LOCK_thd_data);
if (tmp->mysys_var) if (tmp->mysys_var)
{ {
tmp->mysys_var->abort=1; tmp->mysys_var->abort=1;
...@@ -1138,6 +1139,7 @@ static void close_connections(void) ...@@ -1138,6 +1139,7 @@ static void close_connections(void)
} }
mysql_mutex_unlock(&tmp->mysys_var->mutex); mysql_mutex_unlock(&tmp->mysys_var->mutex);
} }
mysql_mutex_unlock(&tmp->LOCK_thd_data);
} }
mysql_mutex_unlock(&LOCK_thread_count); // For unlink from list mysql_mutex_unlock(&LOCK_thread_count); // For unlink from list
...@@ -1543,7 +1545,7 @@ void clean_up(bool print_message) ...@@ -1543,7 +1545,7 @@ void clean_up(bool print_message)
if (print_message && my_default_lc_messages && server_start_time) if (print_message && my_default_lc_messages && server_start_time)
sql_print_information(ER_DEFAULT(ER_SHUTDOWN_COMPLETE),my_progname); sql_print_information(ER_DEFAULT(ER_SHUTDOWN_COMPLETE),my_progname);
cleanup_errmsgs(); cleanup_errmsgs();
thread_scheduler.end(); MYSQL_CALLBACK(thread_scheduler, end, ());
finish_client_errs(); finish_client_errs();
DBUG_PRINT("quit", ("Error messages freed")); DBUG_PRINT("quit", ("Error messages freed"));
/* Tell main we are ready */ /* Tell main we are ready */
...@@ -1823,7 +1825,7 @@ static void network_init(void) ...@@ -1823,7 +1825,7 @@ static void network_init(void)
DBUG_ENTER("network_init"); DBUG_ENTER("network_init");
LINT_INIT(ret); LINT_INIT(ret);
if (thread_scheduler.init()) if (MYSQL_CALLBACK_ELSE(thread_scheduler, init, (), 0))
unireg_abort(1); /* purecov: inspected */ unireg_abort(1); /* purecov: inspected */
set_ports(); set_ports();
...@@ -2071,7 +2073,7 @@ extern "C" sig_handler end_thread_signal(int sig __attribute__((unused))) ...@@ -2071,7 +2073,7 @@ extern "C" sig_handler end_thread_signal(int sig __attribute__((unused)))
if (thd && ! thd->bootstrap) if (thd && ! thd->bootstrap)
{ {
statistic_increment(killed_threads, &LOCK_status); statistic_increment(killed_threads, &LOCK_status);
thread_scheduler.end_thread(thd,0); /* purecov: inspected */ MYSQL_CALLBACK(thread_scheduler, end_thread, (thd,0)); /* purecov: inspected */
} }
DBUG_VOID_RETURN; /* purecov: deadcode */ DBUG_VOID_RETURN; /* purecov: deadcode */
} }
...@@ -2679,7 +2681,7 @@ and this may fail.\n\n"); ...@@ -2679,7 +2681,7 @@ and this may fail.\n\n");
(ulong) dflt_key_cache->key_cache_mem_size); (ulong) dflt_key_cache->key_cache_mem_size);
fprintf(stderr, "read_buffer_size=%ld\n", (long) global_system_variables.read_buff_size); fprintf(stderr, "read_buffer_size=%ld\n", (long) global_system_variables.read_buff_size);
fprintf(stderr, "max_used_connections=%lu\n", max_used_connections); fprintf(stderr, "max_used_connections=%lu\n", max_used_connections);
fprintf(stderr, "max_threads=%u\n", thread_scheduler.max_threads); fprintf(stderr, "max_threads=%u\n", thread_scheduler->max_threads);
fprintf(stderr, "thread_count=%u\n", thread_count); fprintf(stderr, "thread_count=%u\n", thread_count);
fprintf(stderr, "connection_count=%u\n", connection_count); fprintf(stderr, "connection_count=%u\n", connection_count);
fprintf(stderr, "It is possible that mysqld could use up to \n\ fprintf(stderr, "It is possible that mysqld could use up to \n\
...@@ -2687,7 +2689,7 @@ key_buffer_size + (read_buffer_size + sort_buffer_size)*max_threads = %lu K\n\ ...@@ -2687,7 +2689,7 @@ key_buffer_size + (read_buffer_size + sort_buffer_size)*max_threads = %lu K\n\
bytes of memory\n", ((ulong) dflt_key_cache->key_cache_mem_size + bytes of memory\n", ((ulong) dflt_key_cache->key_cache_mem_size +
(global_system_variables.read_buff_size + (global_system_variables.read_buff_size +
global_system_variables.sortbuff_size) * global_system_variables.sortbuff_size) *
thread_scheduler.max_threads + thread_scheduler->max_threads +
max_connections * sizeof(THD)) / 1024); max_connections * sizeof(THD)) / 1024);
fprintf(stderr, "Hope that's ok; if not, decrease some variables in the equation.\n\n"); fprintf(stderr, "Hope that's ok; if not, decrease some variables in the equation.\n\n");
...@@ -2932,7 +2934,7 @@ pthread_handler_t signal_hand(void *arg __attribute__((unused))) ...@@ -2932,7 +2934,7 @@ pthread_handler_t signal_hand(void *arg __attribute__((unused)))
This should actually be '+ max_number_of_slaves' instead of +10, This should actually be '+ max_number_of_slaves' instead of +10,
but the +10 should be quite safe. but the +10 should be quite safe.
*/ */
init_thr_alarm(thread_scheduler.max_threads + init_thr_alarm(thread_scheduler->max_threads +
global_system_variables.max_insert_delayed_threads + 10); global_system_variables.max_insert_delayed_threads + 10);
if (thd_lib_detected != THD_LIB_LT && (test_flags & TEST_SIGINT)) if (thd_lib_detected != THD_LIB_LT && (test_flags & TEST_SIGINT))
{ {
...@@ -4640,23 +4642,6 @@ int mysqld_main(int argc, char **argv) ...@@ -4640,23 +4642,6 @@ int mysqld_main(int argc, char **argv)
} }
#endif #endif
#ifdef __WIN__
/*
Before performing any socket operation (like retrieving hostname
in init_common_variables we have to call WSAStartup
*/
{
WSADATA WsaData;
if (SOCKET_ERROR == WSAStartup (0x0101, &WsaData))
{
/* errors are not read yet, so we use english text here */
my_message(ER_WSAS_FAILED, "WSAStartup Failed", MYF(0));
/* Not enough initializations for unireg_abort() */
return 1;
}
}
#endif /* __WIN__ */
if (init_common_variables()) if (init_common_variables())
unireg_abort(1); // Will do exit unireg_abort(1); // Will do exit
...@@ -5310,7 +5295,7 @@ static void create_new_thread(THD *thd) ...@@ -5310,7 +5295,7 @@ static void create_new_thread(THD *thd)
thread_count++; thread_count++;
thread_scheduler.add_connection(thd); MYSQL_CALLBACK(thread_scheduler, add_connection, (thd));
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -7633,14 +7618,12 @@ static int get_options(int *argc_ptr, char ***argv_ptr) ...@@ -7633,14 +7618,12 @@ static int get_options(int *argc_ptr, char ***argv_ptr)
return 1; return 1;
#ifdef EMBEDDED_LIBRARY #ifdef EMBEDDED_LIBRARY
one_thread_scheduler(&thread_scheduler); one_thread_scheduler();
#else #else
if (thread_handling <= SCHEDULER_ONE_THREAD_PER_CONNECTION) if (thread_handling <= SCHEDULER_ONE_THREAD_PER_CONNECTION)
one_thread_per_connection_scheduler(&thread_scheduler); one_thread_per_connection_scheduler();
else if (thread_handling == SCHEDULER_NO_THREADS) else /* thread_handling == SCHEDULER_NO_THREADS) */
one_thread_scheduler(&thread_scheduler); one_thread_scheduler();
else
pool_of_threads_scheduler(&thread_scheduler); /* purecov: tested */
#endif #endif
global_system_variables.engine_condition_pushdown= global_system_variables.engine_condition_pushdown=
......
...@@ -177,7 +177,7 @@ extern ulong binlog_cache_size, open_files_limit; ...@@ -177,7 +177,7 @@ extern ulong binlog_cache_size, open_files_limit;
extern ulonglong max_binlog_cache_size; extern ulonglong max_binlog_cache_size;
extern ulong max_binlog_size, max_relay_log_size; extern ulong max_binlog_size, max_relay_log_size;
extern ulong opt_binlog_rows_event_max_size; extern ulong opt_binlog_rows_event_max_size;
extern ulong rpl_recovery_rank, thread_cache_size, thread_pool_size; extern ulong rpl_recovery_rank, thread_cache_size;
extern ulong back_log; extern ulong back_log;
extern char language[FN_REFLEN]; extern char language[FN_REFLEN];
extern ulong server_id, concurrency; extern ulong server_id, concurrency;
...@@ -211,7 +211,6 @@ extern int bootstrap_error; ...@@ -211,7 +211,6 @@ extern int bootstrap_error;
extern FILE *stderror_file; extern FILE *stderror_file;
extern I_List<THD> threads; extern I_List<THD> threads;
extern char err_shared_dir[]; extern char err_shared_dir[];
extern scheduler_functions thread_scheduler;
extern TYPELIB thread_handling_typelib; extern TYPELIB thread_handling_typelib;
extern my_decimal decimal_zero; extern my_decimal decimal_zero;
......
...@@ -25,55 +25,98 @@ ...@@ -25,55 +25,98 @@
#include "unireg.h" // REQUIRED: for other includes #include "unireg.h" // REQUIRED: for other includes
#include "scheduler.h" #include "scheduler.h"
#include "sql_connect.h" // init_new_connection_handler_thread #include "sql_connect.h" // init_new_connection_handler_thread
#include "scheduler.h"
#include "sql_callback.h"
/* /*
'Dummy' functions to be used when we don't need any handling for a scheduler End connection, in case when we are using 'no-threads'
event */
*/
static bool init_dummy(void) {return 0;} static bool no_threads_end(THD *thd, bool put_in_cache)
static void post_kill_dummy(THD* thd) {} {
static void end_dummy(void) {} unlink_thd(thd);
static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; } mysql_mutex_unlock(&LOCK_thread_count);
return 1; // Abort handle_one_connection
}
/* static scheduler_functions one_thread_scheduler_functions=
Initialize default scheduler with dummy functions so that setup functions {
only need to declare those that are relvant for their usage 1, // max_threads
NULL, // init
init_new_connection_handler_thread, // init_new_connection_thread
handle_connection_in_main_thread, // add_connection
NULL, // thd_wait_begin
NULL, // thd_wait_end
NULL, // post_kill_notification
no_threads_end, // end_thread
NULL, // end
};
static scheduler_functions one_thread_per_connection_scheduler_functions=
{
0, // max_threads
NULL, // init
init_new_connection_handler_thread, // init_new_connection_thread
create_thread_to_handle_connection, // add_connection
NULL, // thd_wait_begin
NULL, // thd_wait_end
NULL, // post_kill_notification
one_thread_per_connection_end, // end_thread
NULL, // end
};
scheduler_functions *thread_scheduler=
&one_thread_per_connection_scheduler_functions;
/** @internal
Helper functions to allow mysys to call the thread scheduler when
waiting for locks.
*/ */
scheduler_functions::scheduler_functions() /**@{*/
:init(init_dummy), static void scheduler_wait_begin(void) {
init_new_connection_thread(init_new_connection_handler_thread), MYSQL_CALLBACK(thread_scheduler,
add_connection(0), // Must be defined thd_wait_begin, (current_thd, THD_WAIT_ROW_TABLE_LOCK));
post_kill_notification(post_kill_dummy), }
end_thread(end_thread_dummy), end(end_dummy)
{}
static void scheduler_wait_end(void) {
MYSQL_CALLBACK(thread_scheduler, thd_wait_end, (current_thd));
}
/**@}*/
/**
Common scheduler init function.
The scheduler is either initialized by calling
one_thread_scheduler() or one_thread_per_connection_scheduler() in
mysqld.cc, so this init function will always be called.
*/
static void scheduler_init() {
thr_set_lock_wait_callback(scheduler_wait_begin, scheduler_wait_end);
}
/* /*
End connection, in case when we are using 'no-threads' Initialize scheduler for --thread-handling=one-thread-per-connection
*/ */
static bool no_threads_end(THD *thd, bool put_in_cache) #ifndef EMBEDDED_LIBRARY
void one_thread_per_connection_scheduler()
{ {
unlink_thd(thd); scheduler_init();
mysql_mutex_unlock(&LOCK_thread_count); one_thread_per_connection_scheduler_functions.max_threads= max_connections;
return 1; // Abort handle_one_connection thread_scheduler= &one_thread_per_connection_scheduler_functions;
} }
#endif
/* /*
Initailize scheduler for --thread-handling=no-threads Initailize scheduler for --thread-handling=no-threads
*/ */
void one_thread_scheduler(scheduler_functions* func) void one_thread_scheduler()
{ {
func->max_threads= 1; scheduler_init();
#ifndef EMBEDDED_LIBRARY thread_scheduler= &one_thread_scheduler_functions;
func->add_connection= handle_connection_in_main_thread;
#endif
func->init_new_connection_thread= init_dummy;
func->end_thread= no_threads_end;
} }
...@@ -81,11 +124,58 @@ void one_thread_scheduler(scheduler_functions* func) ...@@ -81,11 +124,58 @@ void one_thread_scheduler(scheduler_functions* func)
Initialize scheduler for --thread-handling=one-thread-per-connection Initialize scheduler for --thread-handling=one-thread-per-connection
*/ */
#ifndef EMBEDDED_LIBRARY /*
void one_thread_per_connection_scheduler(scheduler_functions* func) thd_scheduler keeps the link between THD and events.
It's embedded in the THD class.
*/
thd_scheduler::thd_scheduler()
: m_psi(NULL), data(NULL)
{ {
func->max_threads= max_connections; #ifndef DBUG_OFF
func->add_connection= create_thread_to_handle_connection; dbug_explain[0]= '\0';
func->end_thread= one_thread_per_connection_end; set_explain= FALSE;
#endif
} }
#endif /* EMBEDDED_LIBRARY */
thd_scheduler::~thd_scheduler()
{
}
static scheduler_functions *saved_thread_scheduler;
static uint saved_thread_handling;
extern "C"
int my_thread_scheduler_set(scheduler_functions *scheduler)
{
DBUG_ASSERT(scheduler != 0);
if (scheduler == NULL)
return 1;
saved_thread_scheduler= thread_scheduler;
saved_thread_handling= thread_handling;
thread_scheduler= scheduler;
// Scheduler loaded dynamically
thread_handling= SCHEDULER_TYPES_COUNT;
return 0;
}
extern "C"
int my_thread_scheduler_reset()
{
DBUG_ASSERT(saved_thread_scheduler != NULL);
if (saved_thread_scheduler == NULL)
return 1;
thread_scheduler= saved_thread_scheduler;
thread_handling= saved_thread_handling;
saved_thread_scheduler= 0;
return 0;
}
...@@ -28,38 +28,77 @@ class THD; ...@@ -28,38 +28,77 @@ class THD;
/* Functions used when manipulating threads */ /* Functions used when manipulating threads */
class scheduler_functions struct scheduler_functions
{ {
public:
uint max_threads; uint max_threads;
bool (*init)(void); bool (*init)(void);
bool (*init_new_connection_thread)(void); bool (*init_new_connection_thread)(void);
void (*add_connection)(THD *thd); void (*add_connection)(THD *thd);
void (*thd_wait_begin)(THD *thd, int wait_type);
void (*thd_wait_end)(THD *thd);
void (*post_kill_notification)(THD *thd); void (*post_kill_notification)(THD *thd);
bool (*end_thread)(THD *thd, bool cache_thread); bool (*end_thread)(THD *thd, bool cache_thread);
void (*end)(void); void (*end)(void);
scheduler_functions();
}; };
/**
Scheduler types enumeration.
The default of --thread-handling is the first one in the
thread_handling_names array, this array has to be consistent with
the order in this array, so to change default one has to change the
first entry in this enum and the first entry in the
thread_handling_names array.
@note The last entry of the enumeration is also used to mark the
thread handling as dynamic. In this case the name of the thread
handling is fetched from the name of the plugin that implements it.
*/
enum scheduler_types enum scheduler_types
{ {
SCHEDULER_ONE_THREAD_PER_CONNECTION=0, SCHEDULER_ONE_THREAD_PER_CONNECTION=0,
SCHEDULER_NO_THREADS, SCHEDULER_NO_THREADS,
SCHEDULER_POOL_OF_THREADS SCHEDULER_TYPES_COUNT
}; };
void one_thread_per_connection_scheduler(scheduler_functions* func); void one_thread_per_connection_scheduler();
void one_thread_scheduler(scheduler_functions* func); void one_thread_scheduler();
enum pool_command_op enum pool_command_op
{ {
NOT_IN_USE_OP= 0, NORMAL_OP= 1, CONNECT_OP, KILL_OP, DIE_OP NOT_IN_USE_OP= 0, NORMAL_OP= 1, CONNECT_OP, KILL_OP, DIE_OP
}; };
#define HAVE_POOL_OF_THREADS 0 /* For easyer tests */ /*
#define pool_of_threads_scheduler(A) one_thread_per_connection_scheduler(A) To be used for pool-of-threads (implemeneted differently on various OSs)
*/
class thd_scheduler class thd_scheduler
{}; {
public:
/*
Thread instrumentation for the user job.
This member holds the instrumentation while the user job is not run
by a thread.
Note that this member is not conditionally declared
(ifdef HAVE_PSI_INTERFACE), because doing so will change the binary
layout of THD, which is exposed to plugin code that may be compiled
differently.
*/
PSI_thread *m_psi;
void *data; /* scheduler-specific data structure */
# ifndef DBUG_OFF
char dbug_explain[512];
bool set_explain;
# endif
#endif /* SCHEDULER_INCLUDED */ thd_scheduler();
~thd_scheduler();
};
extern scheduler_functions *thread_scheduler;
#endif
/*
Copyright (C) 2010, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef SQL_CALLBACK_INCLUDED
#define SQL_CALLBACK_INCLUDED
/**
Macro used for an internal callback.
The macro will check that the object exists and that the function
is defined. If that is the case, it will call the function with the
given parameters.
If the object or the function is not defined, the callback will be
considered successful (nothing needed to be done) and will
therefore return no error.
*/
#define MYSQL_CALLBACK(OBJ, FUNC, PARAMS) \
do { \
if ((OBJ) && ((OBJ)->FUNC)) \
(OBJ)->FUNC PARAMS; \
} while (0)
#define MYSQL_CALLBACK_ELSE(OBJ, FUNC, PARAMS, ELSE) \
(((OBJ) && ((OBJ)->FUNC)) ? (OBJ)->FUNC PARAMS : (ELSE))
#endif /* SQL_CALLBACK_INCLUDED */
...@@ -58,6 +58,7 @@ ...@@ -58,6 +58,7 @@
#include "transaction.h" #include "transaction.h"
#include "debug_sync.h" #include "debug_sync.h"
#include "sql_parse.h" // is_update_query #include "sql_parse.h" // is_update_query
#include "sql_callback.h"
/* /*
The following is used to initialise Table_ident with a internal The following is used to initialise Table_ident with a internal
...@@ -1055,6 +1056,7 @@ THD::~THD() ...@@ -1055,6 +1056,7 @@ THD::~THD()
DBUG_ENTER("~THD()"); DBUG_ENTER("~THD()");
/* Ensure that no one is using THD */ /* Ensure that no one is using THD */
mysql_mutex_lock(&LOCK_thd_data); mysql_mutex_lock(&LOCK_thd_data);
mysys_var=0; // Safety (shouldn't be needed)
mysql_mutex_unlock(&LOCK_thd_data); mysql_mutex_unlock(&LOCK_thd_data);
add_to_status(&global_status_var, &status_var); add_to_status(&global_status_var, &status_var);
...@@ -1080,7 +1082,6 @@ THD::~THD() ...@@ -1080,7 +1082,6 @@ THD::~THD()
main_security_ctx.destroy(); main_security_ctx.destroy();
safeFree(db); safeFree(db);
free_root(&transaction.mem_root,MYF(0)); free_root(&transaction.mem_root,MYF(0));
mysys_var=0; // Safety (shouldn't be needed)
mysql_mutex_destroy(&LOCK_thd_data); mysql_mutex_destroy(&LOCK_thd_data);
#ifndef DBUG_OFF #ifndef DBUG_OFF
dbug_sentry= THD_SENTRY_GONE; dbug_sentry= THD_SENTRY_GONE;
...@@ -1163,7 +1164,7 @@ void THD::awake(THD::killed_state state_to_set) ...@@ -1163,7 +1164,7 @@ void THD::awake(THD::killed_state state_to_set)
{ {
thr_alarm_kill(thread_id); thr_alarm_kill(thread_id);
if (!slave_thread) if (!slave_thread)
thread_scheduler.post_kill_notification(this); MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (this));
#ifdef SIGNAL_WITH_VIO_CLOSE #ifdef SIGNAL_WITH_VIO_CLOSE
if (this != current_thd) if (this != current_thd)
{ {
...@@ -1232,6 +1233,15 @@ bool THD::store_globals() ...@@ -1232,6 +1233,15 @@ bool THD::store_globals()
if (my_pthread_setspecific_ptr(THR_THD, this) || if (my_pthread_setspecific_ptr(THR_THD, this) ||
my_pthread_setspecific_ptr(THR_MALLOC, &mem_root)) my_pthread_setspecific_ptr(THR_MALLOC, &mem_root))
return 1; return 1;
/*
mysys_var is concurrently readable by a killer thread.
It is protected by LOCK_thd_data, it is not needed to lock while the
pointer is changing from NULL not non-NULL. If the kill thread reads
NULL it doesn't refer to anything, but if it is non-NULL we need to
ensure that the thread doesn't proceed to assign another thread to
have the mysys_var reference (which in fact refers to the worker
threads local storage with key THR_KEY_mysys.
*/
mysys_var=my_thread_var; mysys_var=my_thread_var;
/* /*
Let mysqld define the thread id (not mysys) Let mysqld define the thread id (not mysys)
...@@ -3145,6 +3155,60 @@ extern "C" bool thd_binlog_filter_ok(const MYSQL_THD thd) ...@@ -3145,6 +3155,60 @@ extern "C" bool thd_binlog_filter_ok(const MYSQL_THD thd)
{ {
return binlog_filter->db_ok(thd->db); return binlog_filter->db_ok(thd->db);
} }
#ifndef EMBEDDED_LIBRARY
extern "C" void thd_pool_wait_begin(MYSQL_THD thd, int wait_type);
extern "C" void thd_pool_wait_end(MYSQL_THD thd);
/*
Interface for MySQL Server, plugins and storage engines to report
when they are going to sleep/stall.
SYNOPSIS
thd_wait_begin()
thd Thread object
wait_type Type of wait
1 -- short wait (e.g. for mutex)
2 -- medium wait (e.g. for disk io)
3 -- large wait (e.g. for locked row/table)
NOTES
This is used by the threadpool to have better knowledge of which
threads that currently are actively running on CPUs. When a thread
reports that it's going to sleep/stall, the threadpool scheduler is
free to start another thread in the pool most likely. The expected wait
time is simply an indication of how long the wait is expected to
become, the real wait time could be very different.
thd_wait_end MUST be called immediately after waking up again.
*/
extern "C" void thd_wait_begin(MYSQL_THD thd, thd_wait_type wait_type)
{
MYSQL_CALLBACK(thread_scheduler, thd_wait_begin, (thd, wait_type));
}
/**
Interface for MySQL Server, plugins and storage engines to report
when they waking up from a sleep/stall.
@param thd Thread handle
*/
extern "C" void thd_wait_end(MYSQL_THD thd)
{
MYSQL_CALLBACK(thread_scheduler, thd_wait_end, (thd));
}
#else
extern "C" void thd_wait_begin(MYSQL_THD thd, thd_wait_type wait_type)
{
/* do NOTHING for the embedded library */
return;
}
extern "C" void thd_wait_end(MYSQL_THD thd)
{
/* do NOTHING for the embedded library */
return;
}
#endif
#endif // INNODB_COMPATIBILITY_HOOKS */ #endif // INNODB_COMPATIBILITY_HOOKS */
/**************************************************************************** /****************************************************************************
...@@ -3324,6 +3388,13 @@ void THD::set_query_id(query_id_t new_query_id) ...@@ -3324,6 +3388,13 @@ void THD::set_query_id(query_id_t new_query_id)
mysql_mutex_unlock(&LOCK_thd_data); mysql_mutex_unlock(&LOCK_thd_data);
} }
/** Assign a new value to thd->mysys_var. */
void THD::set_mysys_var(struct st_my_thread_var *new_mysys_var)
{
mysql_mutex_lock(&LOCK_thd_data);
mysys_var= new_mysys_var;
mysql_mutex_unlock(&LOCK_thd_data);
}
/** /**
Leave explicit LOCK TABLES or prelocked mode and restore value of Leave explicit LOCK TABLES or prelocked mode and restore value of
......
...@@ -1812,6 +1812,10 @@ public: ...@@ -1812,6 +1812,10 @@ public:
xid_state.xid.null(); xid_state.xid.null();
free_root(&mem_root,MYF(MY_KEEP_PREALLOC)); free_root(&mem_root,MYF(MY_KEEP_PREALLOC));
} }
my_bool is_active()
{
return (all.ha_list != NULL);
}
st_transactions() st_transactions()
{ {
bzero((char*)this, sizeof(*this)); bzero((char*)this, sizeof(*this));
...@@ -2734,13 +2738,14 @@ public: ...@@ -2734,13 +2738,14 @@ public:
virtual void set_statement(Statement *stmt); virtual void set_statement(Statement *stmt);
/** /**
Assign a new value to thd->query and thd->query_id. Assign a new value to thd->query and thd->query_id and mysys_var.
Protected with LOCK_thd_data mutex. Protected with LOCK_thd_data mutex.
*/ */
void set_query(char *query_arg, uint32 query_length_arg); void set_query(char *query_arg, uint32 query_length_arg);
void set_query_and_id(char *query_arg, uint32 query_length_arg, void set_query_and_id(char *query_arg, uint32 query_length_arg,
query_id_t new_query_id); query_id_t new_query_id);
void set_query_id(query_id_t new_query_id); void set_query_id(query_id_t new_query_id);
void set_mysys_var(struct st_my_thread_var *new_mysys_var);
void enter_locked_tables_mode(enum_locked_tables_mode mode_arg) void enter_locked_tables_mode(enum_locked_tables_mode mode_arg)
{ {
DBUG_ASSERT(locked_tables_mode == LTM_NONE); DBUG_ASSERT(locked_tables_mode == LTM_NONE);
......
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
#include "hostname.h" // inc_host_errors, ip_to_hostname, #include "hostname.h" // inc_host_errors, ip_to_hostname,
// reset_host_errors // reset_host_errors
#include "sql_acl.h" // acl_getroot, NO_ACCESS, SUPER_ACL #include "sql_acl.h" // acl_getroot, NO_ACCESS, SUPER_ACL
#include "sql_callback.h"
#if defined(HAVE_OPENSSL) && !defined(EMBEDDED_LIBRARY) #if defined(HAVE_OPENSSL) && !defined(EMBEDDED_LIBRARY)
/* /*
...@@ -958,7 +959,7 @@ bool setup_connection_thread_globals(THD *thd) ...@@ -958,7 +959,7 @@ bool setup_connection_thread_globals(THD *thd)
{ {
close_connection(thd, ER_OUT_OF_RESOURCES, 1); close_connection(thd, ER_OUT_OF_RESOURCES, 1);
statistic_increment(aborted_connects,&LOCK_status); statistic_increment(aborted_connects,&LOCK_status);
thread_scheduler.end_thread(thd, 0); MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
return 1; // Error return 1; // Error
} }
return 0; return 0;
...@@ -981,7 +982,7 @@ bool setup_connection_thread_globals(THD *thd) ...@@ -981,7 +982,7 @@ bool setup_connection_thread_globals(THD *thd)
*/ */
static bool login_connection(THD *thd) bool login_connection(THD *thd)
{ {
NET *net= &thd->net; NET *net= &thd->net;
int error; int error;
...@@ -1019,7 +1020,7 @@ static bool login_connection(THD *thd) ...@@ -1019,7 +1020,7 @@ static bool login_connection(THD *thd)
This mainly updates status variables This mainly updates status variables
*/ */
static void end_connection(THD *thd) void end_connection(THD *thd)
{ {
NET *net= &thd->net; NET *net= &thd->net;
plugin_thdvar_cleanup(thd); plugin_thdvar_cleanup(thd);
...@@ -1060,7 +1061,7 @@ static void end_connection(THD *thd) ...@@ -1060,7 +1061,7 @@ static void end_connection(THD *thd)
Initialize THD to handle queries Initialize THD to handle queries
*/ */
static void prepare_new_connection_state(THD* thd) void prepare_new_connection_state(THD* thd)
{ {
Security_context *sctx= thd->security_ctx; Security_context *sctx= thd->security_ctx;
...@@ -1134,11 +1135,11 @@ void do_handle_one_connection(THD *thd_arg) ...@@ -1134,11 +1135,11 @@ void do_handle_one_connection(THD *thd_arg)
thd->thr_create_utime= my_micro_time(); thd->thr_create_utime= my_micro_time();
if (thread_scheduler.init_new_connection_thread()) if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
{ {
close_connection(thd, ER_OUT_OF_RESOURCES, 1); close_connection(thd, ER_OUT_OF_RESOURCES, 1);
statistic_increment(aborted_connects,&LOCK_status); statistic_increment(aborted_connects,&LOCK_status);
thread_scheduler.end_thread(thd,0); MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
return; return;
} }
...@@ -1192,7 +1193,7 @@ void do_handle_one_connection(THD *thd_arg) ...@@ -1192,7 +1193,7 @@ void do_handle_one_connection(THD *thd_arg)
end_thread: end_thread:
close_connection(thd, 0, 1); close_connection(thd, 0, 1);
if (thread_scheduler.end_thread(thd,1)) if (MYSQL_CALLBACK_ELSE(thread_scheduler, end_thread, (thd, 1), 0))
return; // Probably no-threads return; // Probably no-threads
/* /*
......
...@@ -40,4 +40,8 @@ int check_user(THD *thd, enum enum_server_command command, ...@@ -40,4 +40,8 @@ int check_user(THD *thd, enum enum_server_command command,
const char *passwd, uint passwd_len, const char *db, const char *passwd, uint passwd_len, const char *db,
bool check_count); bool check_count);
bool login_connection(THD *thd);
void prepare_new_connection_state(THD* thd);
void end_connection(THD *thd);
#endif /* SQL_CONNECT_INCLUDED */ #endif /* SQL_CONNECT_INCLUDED */
...@@ -36,9 +36,23 @@ static struct thd_alloc_service_st thd_alloc_handler= { ...@@ -36,9 +36,23 @@ static struct thd_alloc_service_st thd_alloc_handler= {
thd_make_lex_string thd_make_lex_string
}; };
static struct thd_wait_service_st thd_wait_handler= {
thd_wait_begin,
thd_wait_end
};
static struct my_thread_scheduler_service my_thread_scheduler_handler= {
my_thread_scheduler_set,
my_thread_scheduler_reset,
};
static struct st_service_ref list_of_services[]= static struct st_service_ref list_of_services[]=
{ {
{ "my_snprintf_service", VERSION_my_snprintf, &my_snprintf_handler }, { "my_snprintf_service", VERSION_my_snprintf, &my_snprintf_handler },
{ "thd_alloc_service", VERSION_thd_alloc, &thd_alloc_handler } { "thd_alloc_service", VERSION_thd_alloc, &thd_alloc_handler },
{ "thd_wait_service", VERSION_thd_wait, &thd_wait_handler },
{ "my_thread_scheduler_service",
VERSION_my_thread_scheduler, &my_thread_scheduler_handler },
}; };
...@@ -1789,6 +1789,7 @@ void mysqld_list_processes(THD *thd,const char *user, bool verbose) ...@@ -1789,6 +1789,7 @@ void mysqld_list_processes(THD *thd,const char *user, bool verbose)
if ((thd_info->db=tmp->db)) // Safe test if ((thd_info->db=tmp->db)) // Safe test
thd_info->db=thd->strdup(thd_info->db); thd_info->db=thd->strdup(thd_info->db);
thd_info->command=(int) tmp->command; thd_info->command=(int) tmp->command;
mysql_mutex_lock(&tmp->LOCK_thd_data);
if ((mysys_var= tmp->mysys_var)) if ((mysys_var= tmp->mysys_var))
mysql_mutex_lock(&mysys_var->mutex); mysql_mutex_lock(&mysys_var->mutex);
thd_info->proc_info= (char*) (tmp->killed == THD::KILL_CONNECTION? "Killed" : 0); thd_info->proc_info= (char*) (tmp->killed == THD::KILL_CONNECTION? "Killed" : 0);
...@@ -1796,16 +1797,15 @@ void mysqld_list_processes(THD *thd,const char *user, bool verbose) ...@@ -1796,16 +1797,15 @@ void mysqld_list_processes(THD *thd,const char *user, bool verbose)
if (mysys_var) if (mysys_var)
mysql_mutex_unlock(&mysys_var->mutex); mysql_mutex_unlock(&mysys_var->mutex);
thd_info->start_time= tmp->start_time;
thd_info->query=0; thd_info->query=0;
/* Lock THD mutex that protects its data when looking at it. */ /* Lock THD mutex that protects its data when looking at it. */
mysql_mutex_lock(&tmp->LOCK_thd_data);
if (tmp->query()) if (tmp->query())
{ {
uint length= min(max_query_length, tmp->query_length()); uint length= min(max_query_length, tmp->query_length());
thd_info->query= (char*) thd->strmake(tmp->query(),length); thd_info->query= (char*) thd->strmake(tmp->query(),length);
} }
mysql_mutex_unlock(&tmp->LOCK_thd_data); mysql_mutex_unlock(&tmp->LOCK_thd_data);
thd_info->start_time= tmp->start_time;
thread_infos.append(thd_info); thread_infos.append(thd_info);
} }
} }
...@@ -1892,6 +1892,7 @@ int fill_schema_processlist(THD* thd, TABLE_LIST* tables, COND* cond) ...@@ -1892,6 +1892,7 @@ int fill_schema_processlist(THD* thd, TABLE_LIST* tables, COND* cond)
table->field[3]->set_notnull(); table->field[3]->set_notnull();
} }
mysql_mutex_lock(&tmp->LOCK_thd_data);
if ((mysys_var= tmp->mysys_var)) if ((mysys_var= tmp->mysys_var))
mysql_mutex_lock(&mysys_var->mutex); mysql_mutex_lock(&mysys_var->mutex);
/* COMMAND */ /* COMMAND */
...@@ -1912,6 +1913,7 @@ int fill_schema_processlist(THD* thd, TABLE_LIST* tables, COND* cond) ...@@ -1912,6 +1913,7 @@ int fill_schema_processlist(THD* thd, TABLE_LIST* tables, COND* cond)
if (mysys_var) if (mysys_var)
mysql_mutex_unlock(&mysys_var->mutex); mysql_mutex_unlock(&mysys_var->mutex);
mysql_mutex_unlock(&tmp->LOCK_thd_data);
/* INFO */ /* INFO */
if (tmp->query()) if (tmp->query())
......
...@@ -1658,19 +1658,13 @@ static Sys_var_ulong Sys_trans_prealloc_size( ...@@ -1658,19 +1658,13 @@ static Sys_var_ulong Sys_trans_prealloc_size(
static const char *thread_handling_names[]= static const char *thread_handling_names[]=
{ {
"one-thread-per-connection", "no-threads", "one-thread-per-connection", "no-threads", "loaded-dynamically",
#if HAVE_POOL_OF_THREADS == 1
"pool-of-threads",
#endif
0 0
}; };
static Sys_var_enum Sys_thread_handling( static Sys_var_enum Sys_thread_handling(
"thread_handling", "thread_handling",
"Define threads usage for handling queries, one of " "Define threads usage for handling queries, one of "
"one-thread-per-connection, no-threads" "one-thread-per-connection, no-threads, loaded-dynamically"
#if HAVE_POOL_OF_THREADS == 1
", pool-of-threads"
#endif
, READ_ONLY GLOBAL_VAR(thread_handling), CMD_LINE(REQUIRED_ARG), , READ_ONLY GLOBAL_VAR(thread_handling), CMD_LINE(REQUIRED_ARG),
thread_handling_names, DEFAULT(0)); thread_handling_names, DEFAULT(0));
...@@ -1997,15 +1991,6 @@ static Sys_var_ulong Sys_thread_cache_size( ...@@ -1997,15 +1991,6 @@ static Sys_var_ulong Sys_thread_cache_size(
GLOBAL_VAR(thread_cache_size), CMD_LINE(REQUIRED_ARG), GLOBAL_VAR(thread_cache_size), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, 16384), DEFAULT(0), BLOCK_SIZE(1)); VALID_RANGE(0, 16384), DEFAULT(0), BLOCK_SIZE(1));
#if HAVE_POOL_OF_THREADS == 1
static Sys_var_ulong Sys_thread_pool_size(
"thread_pool_size",
"How many threads we should create to handle query requests in "
"case of 'thread_handling=pool-of-threads'",
GLOBAL_VAR(thread_pool_size), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(1, 16384), DEFAULT(20), BLOCK_SIZE(0));
#endif
// Can't change the 'next' tx_isolation if we are already in a transaction // Can't change the 'next' tx_isolation if we are already in a transaction
static bool check_tx_isolation(sys_var *self, THD *thd, set_var *var) static bool check_tx_isolation(sys_var *self, THD *thd, set_var *var)
{ {
......
...@@ -43,6 +43,8 @@ Created 11/11/1995 Heikki Tuuri ...@@ -43,6 +43,8 @@ Created 11/11/1995 Heikki Tuuri
#include "log0log.h" #include "log0log.h"
#include "os0file.h" #include "os0file.h"
#include "trx0sys.h" #include "trx0sys.h"
#include "mysql/plugin.h"
#include "mysql/service_thd_wait.h"
/********************************************************************** /**********************************************************************
These statistics are generated for heuristics used in estimating the These statistics are generated for heuristics used in estimating the
...@@ -1165,7 +1167,9 @@ buf_flush_wait_batch_end( ...@@ -1165,7 +1167,9 @@ buf_flush_wait_batch_end(
{ {
ut_ad((type == BUF_FLUSH_LRU) || (type == BUF_FLUSH_LIST)); ut_ad((type == BUF_FLUSH_LRU) || (type == BUF_FLUSH_LIST));
thd_wait_begin(NULL, THD_WAIT_DISKIO);
os_event_wait(buf_pool->no_flush[type]); os_event_wait(buf_pool->no_flush[type]);
thd_wait_end(NULL);
} }
/******************************************************************//** /******************************************************************//**
......
...@@ -37,6 +37,8 @@ Created 11/5/1995 Heikki Tuuri ...@@ -37,6 +37,8 @@ Created 11/5/1995 Heikki Tuuri
#include "os0file.h" #include "os0file.h"
#include "srv0start.h" #include "srv0start.h"
#include "srv0srv.h" #include "srv0srv.h"
#include "mysql/plugin.h"
#include "mysql/service_thd_wait.h"
/** The linear read-ahead area size */ /** The linear read-ahead area size */
#define BUF_READ_AHEAD_LINEAR_AREA BUF_READ_AHEAD_AREA #define BUF_READ_AHEAD_LINEAR_AREA BUF_READ_AHEAD_AREA
...@@ -135,6 +137,7 @@ buf_read_page_low( ...@@ -135,6 +137,7 @@ buf_read_page_low(
ut_ad(buf_page_in_file(bpage)); ut_ad(buf_page_in_file(bpage));
thd_wait_begin(NULL, THD_WAIT_DISKIO);
if (zip_size) { if (zip_size) {
*err = fil_io(OS_FILE_READ | wake_later, *err = fil_io(OS_FILE_READ | wake_later,
sync, space, zip_size, offset, 0, zip_size, sync, space, zip_size, offset, 0, zip_size,
...@@ -146,6 +149,7 @@ buf_read_page_low( ...@@ -146,6 +149,7 @@ buf_read_page_low(
sync, space, 0, offset, 0, UNIV_PAGE_SIZE, sync, space, 0, offset, 0, UNIV_PAGE_SIZE,
((buf_block_t*) bpage)->frame, bpage); ((buf_block_t*) bpage)->frame, bpage);
} }
thd_wait_end(NULL);
ut_a(*err == DB_SUCCESS); ut_a(*err == DB_SUCCESS);
if (sync) { if (sync) {
......
...@@ -103,6 +103,8 @@ Created 10/8/1995 Heikki Tuuri ...@@ -103,6 +103,8 @@ Created 10/8/1995 Heikki Tuuri
#include "ha_prototypes.h" #include "ha_prototypes.h"
#include "trx0i_s.h" #include "trx0i_s.h"
#include "os0sync.h" /* for HAVE_ATOMIC_BUILTINS */ #include "os0sync.h" /* for HAVE_ATOMIC_BUILTINS */
#include "mysql/plugin.h"
#include "mysql/service_thd_wait.h"
/* This is set to TRUE if the MySQL user has set it in MySQL; currently /* This is set to TRUE if the MySQL user has set it in MySQL; currently
affects only FOREIGN KEY definition parsing */ affects only FOREIGN KEY definition parsing */
...@@ -1186,7 +1188,9 @@ retry: ...@@ -1186,7 +1188,9 @@ retry:
trx->op_info = "waiting in InnoDB queue"; trx->op_info = "waiting in InnoDB queue";
thd_wait_begin(trx->mysql_thd, THD_WAIT_ROW_TABLE_LOCK);
os_event_wait(slot->event); os_event_wait(slot->event);
thd_wait_end(trx->mysql_thd);
trx->op_info = ""; trx->op_info = "";
...@@ -1551,7 +1555,9 @@ srv_suspend_mysql_thread( ...@@ -1551,7 +1555,9 @@ srv_suspend_mysql_thread(
/* Suspend this thread and wait for the event. */ /* Suspend this thread and wait for the event. */
thd_wait_begin(trx->mysql_thd, THD_WAIT_ROW_TABLE_LOCK);
os_event_wait(event); os_event_wait(event);
thd_wait_end(trx->mysql_thd);
/* After resuming, reacquire the data dictionary latch if /* After resuming, reacquire the data dictionary latch if
necessary. */ necessary. */
......
...@@ -44,6 +44,11 @@ static my_bool no_poll_read(Vio *vio __attribute__((unused)), ...@@ -44,6 +44,11 @@ static my_bool no_poll_read(Vio *vio __attribute__((unused)),
#endif #endif
static my_bool has_no_data(Vio *vio __attribute__((unused)))
{
return FALSE;
}
/* /*
* Helper to fill most of the Vio* with defaults. * Helper to fill most of the Vio* with defaults.
*/ */
...@@ -83,6 +88,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type, ...@@ -83,6 +88,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type,
vio->poll_read =no_poll_read; vio->poll_read =no_poll_read;
vio->is_connected =vio_is_connected_pipe; vio->is_connected =vio_is_connected_pipe;
vio->has_data =has_no_data;
vio->timeout=vio_win32_timeout; vio->timeout=vio_win32_timeout;
/* Set default timeout */ /* Set default timeout */
...@@ -110,6 +116,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type, ...@@ -110,6 +116,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type,
vio->poll_read =no_poll_read; vio->poll_read =no_poll_read;
vio->is_connected =vio_is_connected_shared_memory; vio->is_connected =vio_is_connected_shared_memory;
vio->has_data =has_no_data;
/* Currently, shared memory is on Windows only, hence the below is ok*/ /* Currently, shared memory is on Windows only, hence the below is ok*/
vio->timeout= vio_win32_timeout; vio->timeout= vio_win32_timeout;
...@@ -137,6 +144,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type, ...@@ -137,6 +144,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type,
vio->timeout =vio_timeout; vio->timeout =vio_timeout;
vio->poll_read =vio_poll_read; vio->poll_read =vio_poll_read;
vio->is_connected =vio_is_connected; vio->is_connected =vio_is_connected;
vio->has_data =vio_ssl_has_data;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
#endif /* HAVE_OPENSSL */ #endif /* HAVE_OPENSSL */
...@@ -155,6 +163,8 @@ static void vio_init(Vio* vio, enum enum_vio_type type, ...@@ -155,6 +163,8 @@ static void vio_init(Vio* vio, enum enum_vio_type type,
vio->timeout =vio_timeout; vio->timeout =vio_timeout;
vio->poll_read =vio_poll_read; vio->poll_read =vio_poll_read;
vio->is_connected =vio_is_connected; vio->is_connected =vio_is_connected;
vio->has_data= (flags & VIO_BUFFERED_READ) ?
vio_buff_has_data : has_no_data;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
......
...@@ -49,6 +49,7 @@ int vio_close_shared_memory(Vio * vio); ...@@ -49,6 +49,7 @@ int vio_close_shared_memory(Vio * vio);
#endif #endif
void vio_timeout(Vio *vio,uint which, uint timeout); void vio_timeout(Vio *vio,uint which, uint timeout);
my_bool vio_buff_has_data(Vio *vio);
#ifdef HAVE_OPENSSL #ifdef HAVE_OPENSSL
#include "my_net.h" /* needed because of struct in_addr */ #include "my_net.h" /* needed because of struct in_addr */
...@@ -62,5 +63,7 @@ void vio_ssl_delete(Vio *vio); ...@@ -62,5 +63,7 @@ void vio_ssl_delete(Vio *vio);
int vio_ssl_blocking(Vio *vio, my_bool set_blocking_mode, my_bool *old_mode); int vio_ssl_blocking(Vio *vio, my_bool set_blocking_mode, my_bool *old_mode);
my_bool vio_ssl_has_data(Vio *vio);
#endif /* HAVE_OPENSSL */ #endif /* HAVE_OPENSSL */
#endif /* VIO_PRIV_INCLUDED */ #endif /* VIO_PRIV_INCLUDED */
...@@ -98,6 +98,10 @@ size_t vio_read_buff(Vio *vio, uchar* buf, size_t size) ...@@ -98,6 +98,10 @@ size_t vio_read_buff(Vio *vio, uchar* buf, size_t size)
#undef VIO_UNBUFFERED_READ_MIN_SIZE #undef VIO_UNBUFFERED_READ_MIN_SIZE
} }
my_bool vio_buff_has_data(Vio *vio)
{
return (vio->read_pos != vio->read_end);
}
size_t vio_write(Vio * vio, const uchar* buf, size_t size) size_t vio_write(Vio * vio, const uchar* buf, size_t size)
{ {
......
...@@ -274,6 +274,9 @@ int vio_ssl_blocking(Vio *vio __attribute__((unused)), ...@@ -274,6 +274,9 @@ int vio_ssl_blocking(Vio *vio __attribute__((unused)),
return (set_blocking_mode ? 0 : 1); return (set_blocking_mode ? 0 : 1);
} }
my_bool vio_ssl_has_data(Vio *vio)
{
return SSL_pending(vio->ssl_arg) > 0 ? TRUE : FALSE;
}
#endif /* HAVE_OPENSSL */ #endif /* HAVE_OPENSSL */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment