Commit cd347705 authored by Mats Kindahl's avatar Mats Kindahl

Merging with mysql-5.5-stage.

parents 5cb0cceb 71553c2c
......@@ -26,6 +26,8 @@ HEADERS_ABI = mysql.h mysql_com.h mysql_time.h \
pkginclude_HEADERS = $(HEADERS_ABI) my_dbug.h m_string.h my_sys.h \
my_xml.h mysql_embed.h mysql/services.h \
mysql/service_my_snprintf.h mysql/service_thd_alloc.h \
mysql/service_thread_scheduler.h \
mysql/service_thd_wait.h \
my_pthread.h my_no_pthread.h \
decimal.h errmsg.h my_global.h my_net.h \
my_getopt.h sslopt-longopts.h my_dir.h \
......
......@@ -71,7 +71,7 @@ typedef struct st_mysql_xid MYSQL_XID;
Plugin API. Common for all plugin types.
*/
#define MYSQL_PLUGIN_INTERFACE_VERSION 0x0101
#define MYSQL_PLUGIN_INTERFACE_VERSION 0x0102
/*
The allowable types of plugins
......
......@@ -30,6 +30,27 @@ void *thd_memdup(void* thd, const void* str, unsigned int size);
MYSQL_LEX_STRING *thd_make_lex_string(void* thd, MYSQL_LEX_STRING *lex_str,
const char *str, unsigned int size,
int allocate_lex_string);
#include <mysql/service_thd_wait.h>
typedef enum _thd_wait_type_e {
THD_WAIT_MUTEX= 1,
THD_WAIT_DISKIO= 2,
THD_WAIT_ROW_TABLE_LOCK= 3,
THD_WAIT_GLOBAL_LOCK= 4
} thd_wait_type;
extern struct thd_wait_service_st {
void (*thd_wait_begin_func)(void*, thd_wait_type);
void (*thd_wait_end_func)(void*);
} *thd_wait_service;
void thd_wait_begin(void* thd, thd_wait_type wait_type);
void thd_wait_end(void* thd);
#include <mysql/service_thread_scheduler.h>
struct scheduler_functions;
extern struct my_thread_scheduler_service {
int (*set)(struct scheduler_functions *scheduler);
int (*reset)();
} *my_thread_scheduler_service;
int my_thread_scheduler_set(struct scheduler_functions *scheduler);
int my_thread_scheduler_reset();
struct st_mysql_xid {
long formatID;
long gtrid_length;
......
/* Copyright (C) 2010, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef MYSQL_SERVICE_THD_WAIT_INCLUDED
#define MYSQL_SERVICE_THD_WAIT_INCLUDED
/**
@file include/mysql/service_thd_wait.h
This service provides functions for plugins and storage engines to report
when they are going to sleep/stall.
SYNOPSIS
thd_wait_begin() - call just before a wait begins
thd Thread object
Use NULL if the thd is NOT known.
wait_type Type of wait
1 -- short wait (e.g. for mutex)
2 -- medium wait (e.g. for disk io)
3 -- large wait (e.g. for locked row/table)
NOTES
This is used by the threadpool to have better knowledge of which
threads that currently are actively running on CPUs. When a thread
reports that it's going to sleep/stall, the threadpool scheduler is
free to start another thread in the pool most likely. The expected wait
time is simply an indication of how long the wait is expected to
become, the real wait time could be very different.
thd_wait_end() called immediately after the wait is complete
thd_wait_end() MUST be called if thd_wait_begin() was called.
Using thd_wait_...() service is optional but recommended. Using it will
improve performance as the thread pool will be more active at managing the
thread workload.
*/
#ifdef __cplusplus
extern "C" {
#endif
typedef enum _thd_wait_type_e {
THD_WAIT_MUTEX= 1,
THD_WAIT_DISKIO= 2,
THD_WAIT_ROW_TABLE_LOCK= 3,
THD_WAIT_GLOBAL_LOCK= 4
} thd_wait_type;
extern struct thd_wait_service_st {
void (*thd_wait_begin_func)(MYSQL_THD, thd_wait_type);
void (*thd_wait_end_func)(MYSQL_THD);
} *thd_wait_service;
#ifdef MYSQL_DYNAMIC_PLUGIN
#define thd_wait_begin(_THD, _WAIT_TYPE) \
thd_wait_service->thd_wait_begin_func(_THD, _WAIT_TYPE)
#define thd_wait_end(_THD) thd_wait_service->thd_wait_end_func(_THD)
#else
void thd_wait_begin(MYSQL_THD thd, thd_wait_type wait_type);
void thd_wait_end(MYSQL_THD thd);
#endif
#ifdef __cplusplus
}
#endif
#endif
/*
Copyright (C) 2010, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef SERVICE_THREAD_SCHEDULER_INCLUDED
#define SERVICE_THREAD_SCHEDULER_INCLUDED
#ifdef __cplusplus
extern "C" {
#endif
struct scheduler_functions;
extern struct my_thread_scheduler_service {
int (*set)(struct scheduler_functions *scheduler);
int (*reset)();
} *my_thread_scheduler_service;
#ifdef MYSQL_DYNAMIC_PLUGIN
#define my_thread_scheduler_set(F) my_thread_scheduler_service->set((F))
#define my_thread_scheduler_reset() my_thread_scheduler_service->reset()
#else
/**
Set the thread scheduler to use for the server.
@param scheduler Pointer to scheduler callbacks to use.
@retval 0 Scheduler installed correctly.
@retval 1 Invalid value (NULL) used for scheduler.
*/
int my_thread_scheduler_set(struct scheduler_functions *scheduler);
/**
Restore the previous thread scheduler.
@note If no thread scheduler was installed previously with
thd_set_thread_scheduler, this function will report an error.
@retval 0 Scheduler installed correctly.
@retval 1 No scheduler installed.
*/
int my_thread_scheduler_reset();
#endif
#ifdef __cplusplus
}
#endif
#endif /* SERVICE_THREAD_SCHEDULER_INCLUDED */
......@@ -20,6 +20,8 @@ extern "C" {
#include <mysql/service_my_snprintf.h>
#include <mysql/service_thd_alloc.h>
#include <mysql/service_thd_wait.h>
#include <mysql/service_thread_scheduler.h>
#ifdef __cplusplus
}
......
......@@ -21,4 +21,5 @@
#define VERSION_my_snprintf 0x0100
#define VERSION_thd_alloc 0x0100
#define VERSION_thd_wait 0x0100
#define VERSION_my_thread_scheduler 0x0100
......@@ -155,6 +155,8 @@ void thr_downgrade_write_lock(THR_LOCK_DATA *data,
enum thr_lock_type new_lock_type);
my_bool thr_reschedule_write_lock(THR_LOCK_DATA *data,
ulong lock_wait_timeout);
void thr_set_lock_wait_callback(void (*before_wait)(void),
void (*after_wait)(void));
#ifdef __cplusplus
}
#endif
......
......@@ -217,6 +217,7 @@ struct st_vio
void (*timeout)(Vio*, unsigned int which, unsigned int timeout);
my_bool (*poll_read)(Vio *vio, uint timeout);
my_bool (*is_connected)(Vio*);
my_bool (*has_data) (Vio*);
#ifdef HAVE_OPENSSL
void *ssl_arg;
#endif
......
......@@ -15,7 +15,11 @@
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/include)
SET(MYSQLSERVICES_SOURCES my_snprintf_service.c thd_alloc_service.c)
SET(MYSQLSERVICES_SOURCES
my_snprintf_service.c
thd_alloc_service.c
thd_wait_service.c
my_thread_scheduler_service.c)
ADD_LIBRARY(mysqlservices ${MYSQLSERVICES_SOURCES})
INSTALL(TARGETS mysqlservices DESTINATION ${INSTALL_LIBDIR})
......@@ -15,5 +15,7 @@
AM_CPPFLAGS = -I$(top_srcdir)/include
pkglib_LIBRARIES = libmysqlservices.a
libmysqlservices_a_SOURCES = my_snprintf_service.c thd_alloc_service.c
libmysqlservices_a_SOURCES = my_snprintf_service.c thd_alloc_service.c \
thd_wait_service.c \
my_thread_scheduler_service.c
EXTRA_DIST = CMakeLists.txt
/*
Copyright (C) 2010, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
USA
*/
#include <service_versions.h>
SERVICE_VERSION my_thread_scheduler_service=
(void*)VERSION_my_thread_scheduler;
/*
Copyright (C) 2010, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <service_versions.h>
SERVICE_VERSION *thd_wait_service= (void*)VERSION_thd_wait;
......@@ -477,7 +477,7 @@ static my_bool win32_init_tcp_ip()
{
if (win32_have_tcpip())
{
WORD wVersionRequested = MAKEWORD( 2, 0 );
WORD wVersionRequested = MAKEWORD( 2, 2 );
WSADATA wsaData;
/* Be a good citizen: maybe another lib has already initialised
sockets, so dont clobber them unless necessary */
......
......@@ -91,6 +91,16 @@ enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
LIST *thr_lock_thread_list; /* List of threads in use */
ulong max_write_lock_count= ~(ulong) 0L;
static void (*before_lock_wait)(void)= 0;
static void (*after_lock_wait)(void)= 0;
void thr_set_lock_wait_callback(void (*before_wait)(void),
void (*after_wait)(void))
{
before_lock_wait= before_wait;
after_lock_wait= after_wait;
}
static inline mysql_cond_t *get_cond(void)
{
return &my_thread_var->suspend;
......@@ -431,6 +441,19 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
old_proc_info= proc_info_hook(NULL, "Table lock",
__func__, __FILE__, __LINE__);
/*
Since before_lock_wait potentially can create more threads to
scheduler work for, we don't want to call the before_lock_wait
callback unless it will really start to wait.
For similar reasons, we do not want to call before_lock_wait and
after_lock_wait for each lap around the loop, so we restrict
ourselves to call it before_lock_wait once before starting to wait
and once after the thread has exited the wait loop.
*/
if ((!thread_var->abort || in_wait_list) && before_lock_wait)
(*before_lock_wait)();
set_timespec(wait_timeout, lock_wait_timeout);
while (!thread_var->abort || in_wait_list)
{
......@@ -462,6 +485,14 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
/* purecov: end */
}
}
/*
We call the after_lock_wait callback once the wait loop has
finished.
*/
if (after_lock_wait)
(*after_lock_wait)();
DBUG_PRINT("thr_lock", ("aborted: %d in_wait_list: %d",
thread_var->abort, in_wait_list));
......
......@@ -93,6 +93,7 @@ struct show_table_authors_st show_table_authors[]= {
{ "Arjen Lentz", "Brisbane, Australia",
"Documentation (2001-2004), Dutch error messages, LOG2()" },
{ "Marc Liyanage", "", "Created Mac OS X packages" },
{ "Kelly Long", "Denver, CO, USA", "Pool Of Threads" },
{ "Zarko Mocnik", "", "Sorting for Slovenian language" },
{ "Per-Erik Martin", "Uppsala, Sweden", "Stored Procedures (5.0)" },
{ "Alexis Mikhailov", "", "User-defined functions" },
......
......@@ -64,7 +64,9 @@
#include "events.h"
#include "sql_audit.h"
#include "probes_mysql.h"
#include "scheduler.h"
#include "debug_sync.h"
#include "sql_callback.h"
#ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
#include "../storage/perfschema/pfs_server.h"
......@@ -455,7 +457,7 @@ ulong slave_trans_retries;
uint slave_net_timeout;
uint slave_exec_mode_options;
ulonglong slave_type_conversions_options;
ulong thread_cache_size=0, thread_pool_size= 0;
ulong thread_cache_size=0;
ulong binlog_cache_size=0;
ulonglong max_binlog_cache_size=0;
ulong query_cache_size=0;
......@@ -897,8 +899,6 @@ my_bool opt_enable_shared_memory;
HANDLE smem_event_connect_request= 0;
#endif
scheduler_functions thread_scheduler;
my_bool opt_use_ssl = 0;
char *opt_ssl_ca= NULL, *opt_ssl_capath= NULL, *opt_ssl_cert= NULL,
*opt_ssl_cipher= NULL, *opt_ssl_key= NULL;
......@@ -1086,7 +1086,8 @@ static void close_connections(void)
continue;
tmp->killed= THD::KILL_CONNECTION;
thread_scheduler.post_kill_notification(tmp);
MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (tmp));
mysql_mutex_lock(&tmp->LOCK_thd_data);
if (tmp->mysys_var)
{
tmp->mysys_var->abort=1;
......@@ -1099,6 +1100,7 @@ static void close_connections(void)
}
mysql_mutex_unlock(&tmp->mysys_var->mutex);
}
mysql_mutex_unlock(&tmp->LOCK_thd_data);
}
mysql_mutex_unlock(&LOCK_thread_count); // For unlink from list
......@@ -1478,7 +1480,7 @@ void clean_up(bool print_message)
if (print_message && my_default_lc_messages && server_start_time)
sql_print_information(ER_DEFAULT(ER_SHUTDOWN_COMPLETE),my_progname);
cleanup_errmsgs();
thread_scheduler.end();
MYSQL_CALLBACK(thread_scheduler, end, ());
finish_client_errs();
DBUG_PRINT("quit", ("Error messages freed"));
/* Tell main we are ready */
......@@ -1753,7 +1755,7 @@ static void network_init(void)
DBUG_ENTER("network_init");
LINT_INIT(ret);
if (thread_scheduler.init())
if (MYSQL_CALLBACK_ELSE(thread_scheduler, init, (), 0))
unireg_abort(1); /* purecov: inspected */
set_ports();
......@@ -2001,7 +2003,7 @@ extern "C" sig_handler end_thread_signal(int sig __attribute__((unused)))
if (thd && ! thd->bootstrap)
{
statistic_increment(killed_threads, &LOCK_status);
thread_scheduler.end_thread(thd,0); /* purecov: inspected */
MYSQL_CALLBACK(thread_scheduler, end_thread, (thd,0)); /* purecov: inspected */
}
DBUG_VOID_RETURN; /* purecov: deadcode */
}
......@@ -2397,7 +2399,7 @@ and this may fail.\n\n");
(ulong) dflt_key_cache->key_cache_mem_size);
fprintf(stderr, "read_buffer_size=%ld\n", (long) global_system_variables.read_buff_size);
fprintf(stderr, "max_used_connections=%lu\n", max_used_connections);
fprintf(stderr, "max_threads=%u\n", thread_scheduler.max_threads);
fprintf(stderr, "max_threads=%u\n", thread_scheduler->max_threads);
fprintf(stderr, "thread_count=%u\n", thread_count);
fprintf(stderr, "connection_count=%u\n", connection_count);
fprintf(stderr, "It is possible that mysqld could use up to \n\
......@@ -2405,7 +2407,7 @@ key_buffer_size + (read_buffer_size + sort_buffer_size)*max_threads = %lu K\n\
bytes of memory\n", ((ulong) dflt_key_cache->key_cache_mem_size +
(global_system_variables.read_buff_size +
global_system_variables.sortbuff_size) *
thread_scheduler.max_threads +
thread_scheduler->max_threads +
max_connections * sizeof(THD)) / 1024);
fprintf(stderr, "Hope that's ok; if not, decrease some variables in the equation.\n\n");
......@@ -2652,7 +2654,7 @@ pthread_handler_t signal_hand(void *arg __attribute__((unused)))
This should actually be '+ max_number_of_slaves' instead of +10,
but the +10 should be quite safe.
*/
init_thr_alarm(thread_scheduler.max_threads +
init_thr_alarm(thread_scheduler->max_threads +
global_system_variables.max_insert_delayed_threads + 10);
if (thd_lib_detected != THD_LIB_LT && (test_flags & TEST_SIGINT))
{
......@@ -4356,23 +4358,6 @@ int mysqld_main(int argc, char **argv)
}
#endif
#ifdef __WIN__
/*
Before performing any socket operation (like retrieving hostname
in init_common_variables we have to call WSAStartup
*/
{
WSADATA WsaData;
if (SOCKET_ERROR == WSAStartup (0x0101, &WsaData))
{
/* errors are not read yet, so we use english text here */
my_message(ER_WSAS_FAILED, "WSAStartup Failed", MYF(0));
/* Not enough initializations for unireg_abort() */
return 1;
}
}
#endif /* __WIN__ */
if (init_common_variables())
unireg_abort(1); // Will do exit
......@@ -5029,7 +5014,7 @@ static void create_new_thread(THD *thd)
thread_count++;
thread_scheduler.add_connection(thd);
MYSQL_CALLBACK(thread_scheduler, add_connection, (thd));
DBUG_VOID_RETURN;
}
......@@ -7344,14 +7329,12 @@ static int get_options(int *argc_ptr, char ***argv_ptr)
return 1;
#ifdef EMBEDDED_LIBRARY
one_thread_scheduler(&thread_scheduler);
one_thread_scheduler();
#else
if (thread_handling <= SCHEDULER_ONE_THREAD_PER_CONNECTION)
one_thread_per_connection_scheduler(&thread_scheduler);
else if (thread_handling == SCHEDULER_NO_THREADS)
one_thread_scheduler(&thread_scheduler);
else
pool_of_threads_scheduler(&thread_scheduler); /* purecov: tested */
one_thread_per_connection_scheduler();
else /* thread_handling == SCHEDULER_NO_THREADS) */
one_thread_scheduler();
#endif
global_system_variables.engine_condition_pushdown=
......
......@@ -175,7 +175,7 @@ extern ulong binlog_cache_size, open_files_limit;
extern ulonglong max_binlog_cache_size;
extern ulong max_binlog_size, max_relay_log_size;
extern ulong opt_binlog_rows_event_max_size;
extern ulong rpl_recovery_rank, thread_cache_size, thread_pool_size;
extern ulong rpl_recovery_rank, thread_cache_size;
extern ulong back_log;
extern char language[FN_REFLEN];
extern ulong server_id, concurrency;
......@@ -207,7 +207,7 @@ extern my_bool old_mode;
extern LEX_STRING opt_init_connect, opt_init_slave;
extern int bootstrap_error;
extern I_List<THD> threads;
extern scheduler_functions thread_scheduler;
extern char err_shared_dir[];
extern TYPELIB thread_handling_typelib;
extern my_decimal decimal_zero;
......
......@@ -25,55 +25,103 @@
#include "unireg.h" // REQUIRED: for other includes
#include "scheduler.h"
#include "sql_connect.h" // init_new_connection_handler_thread
#include "scheduler.h"
#include "sql_callback.h"
/*
'Dummy' functions to be used when we don't need any handling for a scheduler
event
*/
End connection, in case when we are using 'no-threads'
*/
static bool init_dummy(void) {return 0;}
static void post_kill_dummy(THD* thd) {}
static void end_dummy(void) {}
static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; }
static bool no_threads_end(THD *thd, bool put_in_cache)
{
unlink_thd(thd);
mysql_mutex_unlock(&LOCK_thread_count);
return 1; // Abort handle_one_connection
}
/*
Initialize default scheduler with dummy functions so that setup functions
only need to declare those that are relvant for their usage
static scheduler_functions one_thread_scheduler_functions=
{
1, // max_threads
NULL, // init
init_new_connection_handler_thread, // init_new_connection_thread
#ifndef EMBEDDED_LIBRARY
handle_connection_in_main_thread, // add_connection
#else
NULL, // add_connection
#endif // EMBEDDED_LIBRARY
NULL, // thd_wait_begin
NULL, // thd_wait_end
NULL, // post_kill_notification
no_threads_end, // end_thread
NULL, // end
};
#ifndef EMBEDDED_LIBRARY
static scheduler_functions one_thread_per_connection_scheduler_functions=
{
0, // max_threads
NULL, // init
init_new_connection_handler_thread, // init_new_connection_thread
create_thread_to_handle_connection, // add_connection
NULL, // thd_wait_begin
NULL, // thd_wait_end
NULL, // post_kill_notification
one_thread_per_connection_end, // end_thread
NULL, // end
};
#endif // EMBEDDED_LIBRARY
scheduler_functions *thread_scheduler= NULL;
/** @internal
Helper functions to allow mysys to call the thread scheduler when
waiting for locks.
*/
scheduler_functions::scheduler_functions()
:init(init_dummy),
init_new_connection_thread(init_new_connection_handler_thread),
add_connection(0), // Must be defined
post_kill_notification(post_kill_dummy),
end_thread(end_thread_dummy), end(end_dummy)
{}
/**@{*/
static void scheduler_wait_begin(void) {
MYSQL_CALLBACK(thread_scheduler,
thd_wait_begin, (current_thd, THD_WAIT_ROW_TABLE_LOCK));
}
static void scheduler_wait_end(void) {
MYSQL_CALLBACK(thread_scheduler, thd_wait_end, (current_thd));
}
/**@}*/
/**
Common scheduler init function.
The scheduler is either initialized by calling
one_thread_scheduler() or one_thread_per_connection_scheduler() in
mysqld.cc, so this init function will always be called.
*/
static void scheduler_init() {
thr_set_lock_wait_callback(scheduler_wait_begin, scheduler_wait_end);
}
/*
End connection, in case when we are using 'no-threads'
Initialize scheduler for --thread-handling=one-thread-per-connection
*/
static bool no_threads_end(THD *thd, bool put_in_cache)
#ifndef EMBEDDED_LIBRARY
void one_thread_per_connection_scheduler()
{
unlink_thd(thd);
mysql_mutex_unlock(&LOCK_thread_count);
return 1; // Abort handle_one_connection
scheduler_init();
one_thread_per_connection_scheduler_functions.max_threads= max_connections;
thread_scheduler= &one_thread_per_connection_scheduler_functions;
}
#endif
/*
Initailize scheduler for --thread-handling=no-threads
*/
void one_thread_scheduler(scheduler_functions* func)
void one_thread_scheduler()
{
func->max_threads= 1;
#ifndef EMBEDDED_LIBRARY
func->add_connection= handle_connection_in_main_thread;
#endif
func->init_new_connection_thread= init_dummy;
func->end_thread= no_threads_end;
scheduler_init();
thread_scheduler= &one_thread_scheduler_functions;
}
......@@ -81,11 +129,58 @@ void one_thread_scheduler(scheduler_functions* func)
Initialize scheduler for --thread-handling=one-thread-per-connection
*/
#ifndef EMBEDDED_LIBRARY
void one_thread_per_connection_scheduler(scheduler_functions* func)
/*
thd_scheduler keeps the link between THD and events.
It's embedded in the THD class.
*/
thd_scheduler::thd_scheduler()
: m_psi(NULL), data(NULL)
{
#ifndef DBUG_OFF
dbug_explain[0]= '\0';
set_explain= FALSE;
#endif
}
thd_scheduler::~thd_scheduler()
{
}
static scheduler_functions *saved_thread_scheduler;
static uint saved_thread_handling;
extern "C"
int my_thread_scheduler_set(scheduler_functions *scheduler)
{
DBUG_ASSERT(scheduler != 0);
if (scheduler == NULL)
return 1;
saved_thread_scheduler= thread_scheduler;
saved_thread_handling= thread_handling;
thread_scheduler= scheduler;
// Scheduler loaded dynamically
thread_handling= SCHEDULER_TYPES_COUNT;
return 0;
}
extern "C"
int my_thread_scheduler_reset()
{
func->max_threads= max_connections;
func->add_connection= create_thread_to_handle_connection;
func->end_thread= one_thread_per_connection_end;
DBUG_ASSERT(saved_thread_scheduler != NULL);
if (saved_thread_scheduler == NULL)
return 1;
thread_scheduler= saved_thread_scheduler;
thread_handling= saved_thread_handling;
saved_thread_scheduler= 0;
return 0;
}
#endif /* EMBEDDED_LIBRARY */
......@@ -28,38 +28,77 @@ class THD;
/* Functions used when manipulating threads */
class scheduler_functions
struct scheduler_functions
{
public:
uint max_threads;
bool (*init)(void);
bool (*init_new_connection_thread)(void);
void (*add_connection)(THD *thd);
void (*thd_wait_begin)(THD *thd, int wait_type);
void (*thd_wait_end)(THD *thd);
void (*post_kill_notification)(THD *thd);
bool (*end_thread)(THD *thd, bool cache_thread);
void (*end)(void);
scheduler_functions();
};
/**
Scheduler types enumeration.
The default of --thread-handling is the first one in the
thread_handling_names array, this array has to be consistent with
the order in this array, so to change default one has to change the
first entry in this enum and the first entry in the
thread_handling_names array.
@note The last entry of the enumeration is also used to mark the
thread handling as dynamic. In this case the name of the thread
handling is fetched from the name of the plugin that implements it.
*/
enum scheduler_types
{
SCHEDULER_ONE_THREAD_PER_CONNECTION=0,
SCHEDULER_NO_THREADS,
SCHEDULER_POOL_OF_THREADS
SCHEDULER_TYPES_COUNT
};
void one_thread_per_connection_scheduler(scheduler_functions* func);
void one_thread_scheduler(scheduler_functions* func);
void one_thread_per_connection_scheduler();
void one_thread_scheduler();
enum pool_command_op
{
NOT_IN_USE_OP= 0, NORMAL_OP= 1, CONNECT_OP, KILL_OP, DIE_OP
};
#define HAVE_POOL_OF_THREADS 0 /* For easyer tests */
#define pool_of_threads_scheduler(A) one_thread_per_connection_scheduler(A)
/*
To be used for pool-of-threads (implemeneted differently on various OSs)
*/
class thd_scheduler
{};
{
public:
/*
Thread instrumentation for the user job.
This member holds the instrumentation while the user job is not run
by a thread.
Note that this member is not conditionally declared
(ifdef HAVE_PSI_INTERFACE), because doing so will change the binary
layout of THD, which is exposed to plugin code that may be compiled
differently.
*/
PSI_thread *m_psi;
void *data; /* scheduler-specific data structure */
# ifndef DBUG_OFF
char dbug_explain[512];
bool set_explain;
# endif
#endif /* SCHEDULER_INCLUDED */
thd_scheduler();
~thd_scheduler();
};
extern scheduler_functions *thread_scheduler;
#endif
/*
Copyright (C) 2010, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef SQL_CALLBACK_INCLUDED
#define SQL_CALLBACK_INCLUDED
/**
Macro used for an internal callback.
The macro will check that the object exists and that the function
is defined. If that is the case, it will call the function with the
given parameters.
If the object or the function is not defined, the callback will be
considered successful (nothing needed to be done) and will
therefore return no error.
*/
#define MYSQL_CALLBACK(OBJ, FUNC, PARAMS) \
do { \
if ((OBJ) && ((OBJ)->FUNC)) \
(OBJ)->FUNC PARAMS; \
} while (0)
#define MYSQL_CALLBACK_ELSE(OBJ, FUNC, PARAMS, ELSE) \
(((OBJ) && ((OBJ)->FUNC)) ? (OBJ)->FUNC PARAMS : (ELSE))
#endif /* SQL_CALLBACK_INCLUDED */
......@@ -57,6 +57,7 @@
#include "transaction.h"
#include "debug_sync.h"
#include "sql_parse.h" // is_update_query
#include "sql_callback.h"
/*
The following is used to initialise Table_ident with a internal
......@@ -1070,6 +1071,7 @@ THD::~THD()
DBUG_ENTER("~THD()");
/* Ensure that no one is using THD */
mysql_mutex_lock(&LOCK_thd_data);
mysys_var=0; // Safety (shouldn't be needed)
mysql_mutex_unlock(&LOCK_thd_data);
add_to_status(&global_status_var, &status_var);
......@@ -1095,7 +1097,6 @@ THD::~THD()
my_free(db);
db= NULL;
free_root(&transaction.mem_root,MYF(0));
mysys_var=0; // Safety (shouldn't be needed)
mysql_mutex_destroy(&LOCK_thd_data);
#ifndef DBUG_OFF
dbug_sentry= THD_SENTRY_GONE;
......@@ -1184,7 +1185,7 @@ void THD::awake(THD::killed_state state_to_set)
{
thr_alarm_kill(thread_id);
if (!slave_thread)
thread_scheduler.post_kill_notification(this);
MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (this));
#ifdef SIGNAL_WITH_VIO_CLOSE
if (this != current_thd)
{
......@@ -1253,6 +1254,15 @@ bool THD::store_globals()
if (my_pthread_setspecific_ptr(THR_THD, this) ||
my_pthread_setspecific_ptr(THR_MALLOC, &mem_root))
return 1;
/*
mysys_var is concurrently readable by a killer thread.
It is protected by LOCK_thd_data, it is not needed to lock while the
pointer is changing from NULL not non-NULL. If the kill thread reads
NULL it doesn't refer to anything, but if it is non-NULL we need to
ensure that the thread doesn't proceed to assign another thread to
have the mysys_var reference (which in fact refers to the worker
threads local storage with key THR_KEY_mysys.
*/
mysys_var=my_thread_var;
/*
Let mysqld define the thread id (not mysys)
......@@ -3182,6 +3192,60 @@ extern "C" bool thd_sqlcom_can_generate_row_events(const MYSQL_THD thd)
{
return sqlcom_can_generate_row_events(thd);
}
#ifndef EMBEDDED_LIBRARY
extern "C" void thd_pool_wait_begin(MYSQL_THD thd, int wait_type);
extern "C" void thd_pool_wait_end(MYSQL_THD thd);
/*
Interface for MySQL Server, plugins and storage engines to report
when they are going to sleep/stall.
SYNOPSIS
thd_wait_begin()
thd Thread object
wait_type Type of wait
1 -- short wait (e.g. for mutex)
2 -- medium wait (e.g. for disk io)
3 -- large wait (e.g. for locked row/table)
NOTES
This is used by the threadpool to have better knowledge of which
threads that currently are actively running on CPUs. When a thread
reports that it's going to sleep/stall, the threadpool scheduler is
free to start another thread in the pool most likely. The expected wait
time is simply an indication of how long the wait is expected to
become, the real wait time could be very different.
thd_wait_end MUST be called immediately after waking up again.
*/
extern "C" void thd_wait_begin(MYSQL_THD thd, thd_wait_type wait_type)
{
MYSQL_CALLBACK(thread_scheduler, thd_wait_begin, (thd, wait_type));
}
/**
Interface for MySQL Server, plugins and storage engines to report
when they waking up from a sleep/stall.
@param thd Thread handle
*/
extern "C" void thd_wait_end(MYSQL_THD thd)
{
MYSQL_CALLBACK(thread_scheduler, thd_wait_end, (thd));
}
#else
extern "C" void thd_wait_begin(MYSQL_THD thd, thd_wait_type wait_type)
{
/* do NOTHING for the embedded library */
return;
}
extern "C" void thd_wait_end(MYSQL_THD thd)
{
/* do NOTHING for the embedded library */
return;
}
#endif
#endif // INNODB_COMPATIBILITY_HOOKS */
/****************************************************************************
......@@ -3365,6 +3429,13 @@ void THD::set_query_id(query_id_t new_query_id)
mysql_mutex_unlock(&LOCK_thd_data);
}
/** Assign a new value to thd->mysys_var. */
void THD::set_mysys_var(struct st_my_thread_var *new_mysys_var)
{
mysql_mutex_lock(&LOCK_thd_data);
mysys_var= new_mysys_var;
mysql_mutex_unlock(&LOCK_thd_data);
}
/**
Leave explicit LOCK TABLES or prelocked mode and restore value of
......
......@@ -1634,6 +1634,10 @@ public:
xid_state.xid.null();
free_root(&mem_root,MYF(MY_KEEP_PREALLOC));
}
my_bool is_active()
{
return (all.ha_list != NULL);
}
st_transactions()
{
bzero((char*)this, sizeof(*this));
......@@ -2664,7 +2668,7 @@ public:
virtual void set_statement(Statement *stmt);
/**
Assign a new value to thd->query and thd->query_id.
Assign a new value to thd->query and thd->query_id and mysys_var.
Protected with LOCK_thd_data mutex.
*/
void set_query(char *query_arg, uint32 query_length_arg);
......@@ -2677,6 +2681,7 @@ public:
open_tables= open_tables_arg;
mysql_mutex_unlock(&LOCK_thd_data);
}
void set_mysys_var(struct st_my_thread_var *new_mysys_var);
void enter_locked_tables_mode(enum_locked_tables_mode mode_arg)
{
DBUG_ASSERT(locked_tables_mode == LTM_NONE);
......
......@@ -35,6 +35,7 @@
#include "hostname.h" // inc_host_errors, ip_to_hostname,
// reset_host_errors
#include "sql_acl.h" // acl_getroot, NO_ACCESS, SUPER_ACL
#include "sql_callback.h"
#if defined(HAVE_OPENSSL) && !defined(EMBEDDED_LIBRARY)
/*
......@@ -966,7 +967,7 @@ bool setup_connection_thread_globals(THD *thd)
{
close_connection(thd, ER_OUT_OF_RESOURCES, 1);
statistic_increment(aborted_connects,&LOCK_status);
thread_scheduler.end_thread(thd, 0);
MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
return 1; // Error
}
return 0;
......@@ -989,7 +990,7 @@ bool setup_connection_thread_globals(THD *thd)
*/
static bool login_connection(THD *thd)
bool login_connection(THD *thd)
{
NET *net= &thd->net;
int error;
......@@ -1027,7 +1028,7 @@ static bool login_connection(THD *thd)
This mainly updates status variables
*/
static void end_connection(THD *thd)
void end_connection(THD *thd)
{
NET *net= &thd->net;
plugin_thdvar_cleanup(thd);
......@@ -1068,7 +1069,7 @@ static void end_connection(THD *thd)
Initialize THD to handle queries
*/
static void prepare_new_connection_state(THD* thd)
void prepare_new_connection_state(THD* thd)
{
Security_context *sctx= thd->security_ctx;
......@@ -1137,11 +1138,11 @@ void do_handle_one_connection(THD *thd_arg)
thd->thr_create_utime= my_micro_time();
if (thread_scheduler.init_new_connection_thread())
if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
{
close_connection(thd, ER_OUT_OF_RESOURCES, 1);
statistic_increment(aborted_connects,&LOCK_status);
thread_scheduler.end_thread(thd,0);
MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
return;
}
......@@ -1195,7 +1196,7 @@ void do_handle_one_connection(THD *thd_arg)
end_thread:
close_connection(thd, 0, 1);
if (thread_scheduler.end_thread(thd,1))
if (MYSQL_CALLBACK_ELSE(thread_scheduler, end_thread, (thd, 1), 0))
return; // Probably no-threads
/*
......
......@@ -40,4 +40,8 @@ int check_user(THD *thd, enum enum_server_command command,
const char *passwd, uint passwd_len, const char *db,
bool check_count);
bool login_connection(THD *thd);
void prepare_new_connection_state(THD* thd);
void end_connection(THD *thd);
#endif /* SQL_CONNECT_INCLUDED */
......@@ -36,9 +36,23 @@ static struct thd_alloc_service_st thd_alloc_handler= {
thd_make_lex_string
};
static struct thd_wait_service_st thd_wait_handler= {
thd_wait_begin,
thd_wait_end
};
static struct my_thread_scheduler_service my_thread_scheduler_handler= {
my_thread_scheduler_set,
my_thread_scheduler_reset,
};
static struct st_service_ref list_of_services[]=
{
{ "my_snprintf_service", VERSION_my_snprintf, &my_snprintf_handler },
{ "thd_alloc_service", VERSION_thd_alloc, &thd_alloc_handler }
{ "thd_alloc_service", VERSION_thd_alloc, &thd_alloc_handler },
{ "thd_wait_service", VERSION_thd_wait, &thd_wait_handler },
{ "my_thread_scheduler_service",
VERSION_my_thread_scheduler, &my_thread_scheduler_handler },
};
......@@ -1815,6 +1815,7 @@ void mysqld_list_processes(THD *thd,const char *user, bool verbose)
if ((thd_info->db=tmp->db)) // Safe test
thd_info->db=thd->strdup(thd_info->db);
thd_info->command=(int) tmp->command;
mysql_mutex_lock(&tmp->LOCK_thd_data);
if ((mysys_var= tmp->mysys_var))
mysql_mutex_lock(&mysys_var->mutex);
thd_info->proc_info= (char*) (tmp->killed == THD::KILL_CONNECTION? "Killed" : 0);
......@@ -1822,16 +1823,15 @@ void mysqld_list_processes(THD *thd,const char *user, bool verbose)
if (mysys_var)
mysql_mutex_unlock(&mysys_var->mutex);
thd_info->start_time= tmp->start_time;
thd_info->query=0;
/* Lock THD mutex that protects its data when looking at it. */
mysql_mutex_lock(&tmp->LOCK_thd_data);
if (tmp->query())
{
uint length= min(max_query_length, tmp->query_length());
thd_info->query= (char*) thd->strmake(tmp->query(),length);
}
mysql_mutex_unlock(&tmp->LOCK_thd_data);
thd_info->start_time= tmp->start_time;
thread_infos.append(thd_info);
}
}
......@@ -1918,6 +1918,7 @@ int fill_schema_processlist(THD* thd, TABLE_LIST* tables, COND* cond)
table->field[3]->set_notnull();
}
mysql_mutex_lock(&tmp->LOCK_thd_data);
if ((mysys_var= tmp->mysys_var))
mysql_mutex_lock(&mysys_var->mutex);
/* COMMAND */
......@@ -1938,6 +1939,7 @@ int fill_schema_processlist(THD* thd, TABLE_LIST* tables, COND* cond)
if (mysys_var)
mysql_mutex_unlock(&mysys_var->mutex);
mysql_mutex_unlock(&tmp->LOCK_thd_data);
/* INFO */
/* Lock THD mutex that protects its data when looking at it. */
......
......@@ -1669,19 +1669,13 @@ static Sys_var_ulong Sys_trans_prealloc_size(
static const char *thread_handling_names[]=
{
"one-thread-per-connection", "no-threads",
#if HAVE_POOL_OF_THREADS == 1
"pool-of-threads",
#endif
"one-thread-per-connection", "no-threads", "loaded-dynamically",
0
};
static Sys_var_enum Sys_thread_handling(
"thread_handling",
"Define threads usage for handling queries, one of "
"one-thread-per-connection, no-threads"
#if HAVE_POOL_OF_THREADS == 1
", pool-of-threads"
#endif
"one-thread-per-connection, no-threads, loaded-dynamically"
, READ_ONLY GLOBAL_VAR(thread_handling), CMD_LINE(REQUIRED_ARG),
thread_handling_names, DEFAULT(0));
......
......@@ -43,6 +43,8 @@ Created 11/11/1995 Heikki Tuuri
#include "log0log.h"
#include "os0file.h"
#include "trx0sys.h"
#include "mysql/plugin.h"
#include "mysql/service_thd_wait.h"
/**********************************************************************
These statistics are generated for heuristics used in estimating the
......@@ -1719,10 +1721,14 @@ buf_flush_wait_batch_end(
buf_pool = buf_pool_from_array(i);
thd_wait_begin(NULL, THD_WAIT_DISKIO);
os_event_wait(buf_pool->no_flush[type]);
thd_wait_end(NULL);
}
} else {
thd_wait_begin(NULL, THD_WAIT_DISKIO);
os_event_wait(buf_pool->no_flush[type]);
thd_wait_end(NULL);
}
}
......
......@@ -37,6 +37,8 @@ Created 11/5/1995 Heikki Tuuri
#include "os0file.h"
#include "srv0start.h"
#include "srv0srv.h"
#include "mysql/plugin.h"
#include "mysql/service_thd_wait.h"
/** The linear read-ahead area size */
#define BUF_READ_AHEAD_LINEAR_AREA BUF_READ_AHEAD_AREA
......@@ -135,6 +137,7 @@ buf_read_page_low(
ut_ad(buf_page_in_file(bpage));
thd_wait_begin(NULL, THD_WAIT_DISKIO);
if (zip_size) {
*err = fil_io(OS_FILE_READ | wake_later,
sync, space, zip_size, offset, 0, zip_size,
......@@ -146,6 +149,7 @@ buf_read_page_low(
sync, space, 0, offset, 0, UNIV_PAGE_SIZE,
((buf_block_t*) bpage)->frame, bpage);
}
thd_wait_end(NULL);
ut_a(*err == DB_SUCCESS);
if (sync) {
......
......@@ -84,6 +84,8 @@ Created 10/8/1995 Heikki Tuuri
#include "ha_prototypes.h"
#include "trx0i_s.h"
#include "os0sync.h" /* for HAVE_ATOMIC_BUILTINS */
#include "mysql/plugin.h"
#include "mysql/service_thd_wait.h"
/* This is set to TRUE if the MySQL user has set it in MySQL; currently
affects only FOREIGN KEY definition parsing */
......@@ -1232,7 +1234,9 @@ retry:
trx->op_info = "waiting in InnoDB queue";
thd_wait_begin(trx->mysql_thd, THD_WAIT_ROW_TABLE_LOCK);
os_event_wait(slot->event);
thd_wait_end(trx->mysql_thd);
trx->op_info = "";
......@@ -1597,7 +1601,9 @@ srv_suspend_mysql_thread(
/* Suspend this thread and wait for the event. */
thd_wait_begin(trx->mysql_thd, THD_WAIT_ROW_TABLE_LOCK);
os_event_wait(event);
thd_wait_end(trx->mysql_thd);
/* After resuming, reacquire the data dictionary latch if
necessary. */
......
......@@ -44,6 +44,11 @@ static my_bool no_poll_read(Vio *vio __attribute__((unused)),
#endif
static my_bool has_no_data(Vio *vio __attribute__((unused)))
{
return FALSE;
}
/*
* Helper to fill most of the Vio* with defaults.
*/
......@@ -83,6 +88,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type,
vio->poll_read =no_poll_read;
vio->is_connected =vio_is_connected_pipe;
vio->has_data =has_no_data;
vio->timeout=vio_win32_timeout;
/* Set default timeout */
......@@ -110,6 +116,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type,
vio->poll_read =no_poll_read;
vio->is_connected =vio_is_connected_shared_memory;
vio->has_data =has_no_data;
/* Currently, shared memory is on Windows only, hence the below is ok*/
vio->timeout= vio_win32_timeout;
......@@ -137,6 +144,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type,
vio->timeout =vio_timeout;
vio->poll_read =vio_poll_read;
vio->is_connected =vio_is_connected;
vio->has_data =vio_ssl_has_data;
DBUG_VOID_RETURN;
}
#endif /* HAVE_OPENSSL */
......@@ -155,6 +163,8 @@ static void vio_init(Vio* vio, enum enum_vio_type type,
vio->timeout =vio_timeout;
vio->poll_read =vio_poll_read;
vio->is_connected =vio_is_connected;
vio->has_data= (flags & VIO_BUFFERED_READ) ?
vio_buff_has_data : has_no_data;
DBUG_VOID_RETURN;
}
......
......@@ -49,6 +49,7 @@ int vio_close_shared_memory(Vio * vio);
#endif
void vio_timeout(Vio *vio,uint which, uint timeout);
my_bool vio_buff_has_data(Vio *vio);
#ifdef HAVE_OPENSSL
#include "my_net.h" /* needed because of struct in_addr */
......@@ -62,5 +63,7 @@ void vio_ssl_delete(Vio *vio);
int vio_ssl_blocking(Vio *vio, my_bool set_blocking_mode, my_bool *old_mode);
my_bool vio_ssl_has_data(Vio *vio);
#endif /* HAVE_OPENSSL */
#endif /* VIO_PRIV_INCLUDED */
......@@ -98,6 +98,10 @@ size_t vio_read_buff(Vio *vio, uchar* buf, size_t size)
#undef VIO_UNBUFFERED_READ_MIN_SIZE
}
my_bool vio_buff_has_data(Vio *vio)
{
return (vio->read_pos != vio->read_end);
}
size_t vio_write(Vio * vio, const uchar* buf, size_t size)
{
......
......@@ -244,6 +244,9 @@ int vio_ssl_blocking(Vio *vio __attribute__((unused)),
return (set_blocking_mode ? 0 : 1);
}
my_bool vio_ssl_has_data(Vio *vio)
{
return SSL_pending(vio->ssl_arg) > 0 ? TRUE : FALSE;
}
#endif /* HAVE_OPENSSL */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment