Commit 8f329e8d authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

MDEV-10384 Windows : Refactor threading in mysqld startup.

Remove threads that are doing nothing but wait
- main thread now handles the connections
(if threadpool is used, also threadpool threads would wait for connections)
- thread for socket and pipe connections are removed
- shutdown thread is now removed, we wait for shutdown
notification in main thread as well
- kill_server() is also called inside the main thread, after connection
loop finished.
parent 25ad38ab
......@@ -152,6 +152,7 @@ IF (CMAKE_SYSTEM_NAME MATCHES "Linux" OR
ADD_DEFINITIONS(-DHAVE_POOL_OF_THREADS)
IF(WIN32)
SET(SQL_SOURCE ${SQL_SOURCE} threadpool_win.cc)
SET(SQL_SOURCE ${SQL_SOURCE} handle_connections_win.cc)
ENDIF()
SET(SQL_SOURCE ${SQL_SOURCE} threadpool_generic.cc)
......
/* Copyright (c) 2018 MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
/* Accepting connections on Windows */
#include <my_global.h>
#include <sql_class.h>
#include <sql_connect.h>
#include <mysqld.h>
#include <mswsock.h>
#include <mysql/psi/mysql_socket.h>
#include <sddl.h>
#include <handle_connections_win.h>
/* From mysqld.cc */
extern HANDLE hEventShutdown;
extern MYSQL_SOCKET base_ip_sock, extra_ip_sock;
extern PTP_CALLBACK_ENVIRON get_threadpool_win_callback_environ();
extern void tp_win_callback_prolog();
static SECURITY_ATTRIBUTES pipe_security;
/**
Abstract base class for accepting new connection,
asynchronously (i.e the accept() operation can be posted,
and result is retrieved later) , and creating a new connection.
*/
struct Listener
{
/** Windows handle of the Listener.
Subclasses would use SOCKET or named pipe handle
*/
HANDLE m_handle;
/** Required for all async IO*/
OVERLAPPED m_overlapped;
/** Create new listener
@param handle - @see m_handle
@param wait_handle - usually, event handle or INVALID_HANDLE_VALUE
@see wait_handle
*/
Listener(HANDLE handle, HANDLE wait_handle):
m_handle(handle), m_overlapped()
{
m_overlapped.hEvent= wait_handle;
}
/**
if not NULL, this handle can be be used in WaitForSingle/MultipleObject(s).
This handle will be closed when object is destroyed.
If NULL, the completion notification happens in threadpool.
*/
HANDLE wait_handle()
{
return m_overlapped.hEvent;
}
/* Start waiting for new client connection. */
virtual void begin_accept()= 0;
/**
Completion callback,called whenever IO posted by begin_accept is finisjed
Listener needs to create a new THD then (or, call scheduler so it creates one)
@param success - whether IO completed successfull
*/
virtual void completion_callback(bool success)= 0;
/**
Completion callback for Listener, that uses events for waiting
to IO. Not suitable for threadpool etc. Retrieves the status of
completed IO from the OVERLAPPED structure
*/
void completion_callback()
{
DBUG_ASSERT(wait_handle() && (wait_handle() != INVALID_HANDLE_VALUE));
DWORD bytes;
return completion_callback(
GetOverlappedResult(wait_handle(), &m_overlapped, &bytes, FALSE));
}
/** Cancel an in-progress IO. Useful for threadpool-bound IO */
void cancel()
{
CancelIoEx(m_handle, &m_overlapped);
}
/* Destructor. Closes wait handle, if it was passed in constructor */
virtual ~Listener()
{
if (m_overlapped.hEvent)
CloseHandle(m_overlapped.hEvent);
};
};
/* Winsock extension finctions. */
static LPFN_ACCEPTEX my_AcceptEx;
static LPFN_GETACCEPTEXSOCKADDRS my_GetAcceptExSockaddrs;
/**
Listener that handles socket connections.
Can be threadpool-bound (i.e the completion is executed in threadpool thread),
or use events for waits.
Threadpool-bound listener should be used with theradpool scheduler, for better
performance.
*/
struct Socket_Listener: public Listener
{
/** Client socket passed to AcceptEx() call.*/
SOCKET m_client_socket;
/** Buffer for sockaddrs passed to AcceptEx()/GetAcceptExSockaddrs() */
char m_buffer[2 * sizeof(sockaddr_storage) + 32];
/* Threadpool IO struct.*/
PTP_IO m_tp_io;
/**
Callback for Windows threadpool's StartThreadpoolIo() function.
*/
static void CALLBACK tp_accept_completion_callback(
PTP_CALLBACK_INSTANCE, PVOID context, PVOID , ULONG io_result,
ULONG_PTR, PTP_IO io)
{
tp_win_callback_prolog();
Listener *listener= (Listener *)context;
if (io_result == ERROR_OPERATION_ABORTED)
{
/* ERROR_OPERATION_ABORTED caused by CancelIoEx()*/
CloseThreadpoolIo(io);
delete listener;
return;
}
listener->completion_callback(io_result == 0);
}
/**
Constructor
@param listen_socket - listening socket
@PTP_CALLBACK_ENVIRON callback_environ - threadpool environment, or NULL
if threadpool is not used for completion callbacks.
*/
Socket_Listener(MYSQL_SOCKET listen_socket, PTP_CALLBACK_ENVIRON callback_environ) :
Listener((HANDLE)listen_socket.fd,0),
m_client_socket(INVALID_SOCKET)
{
if (callback_environ)
{
/* Accept executed in threadpool. */
m_tp_io= CreateThreadpoolIo(m_handle,
tp_accept_completion_callback, this, callback_environ);
}
else
{
/* Completion signaled via event. */
m_tp_io= 0;
m_overlapped.hEvent= CreateEvent(0, FALSE , FALSE, 0);
}
}
/*
Use AcceptEx to asynchronously wait for new connection;
*/
void begin_accept()
{
retry :
m_client_socket= socket(server_socket_ai_family, SOCK_STREAM, IPPROTO_TCP);
if (m_client_socket == INVALID_SOCKET)
{
sql_perror("socket() call failed.");
unireg_abort(1);
}
DWORD bytes_received;
if (m_tp_io)
StartThreadpoolIo(m_tp_io);
BOOL ret= my_AcceptEx(
(SOCKET)m_handle,
m_client_socket,
m_buffer,
0,
sizeof(sockaddr_storage) + 16,
sizeof(sockaddr_storage) + 16,
&bytes_received,
&m_overlapped);
DWORD last_error= ret? 0: WSAGetLastError();
if (last_error == WSAECONNRESET)
{
if (m_tp_io)
CancelThreadpoolIo(m_tp_io);
goto retry;
}
if (ret || last_error == ERROR_IO_PENDING || abort_loop)
return;
sql_print_error("my_AcceptEx failed, last error %u", last_error);
abort();
}
/* Create new socket connection.*/
void completion_callback(bool success)
{
if (!success)
{
/* my_AcceptEx() returned error */
closesocket(m_client_socket);
begin_accept();
return;
}
MYSQL_SOCKET s_client{m_client_socket};
MYSQL_SOCKET s_listen{(SOCKET)m_handle};
#ifdef HAVE_PSI_SOCKET_INTERFACE
/* Parse socket addresses buffer filled by AcceptEx(),
only needed for PSI instrumentation. */
sockaddr *local_addr, *remote_addr;
int local_addr_len, remote_addr_len;
my_GetAcceptExSockaddrs(m_buffer,
0, sizeof(sockaddr_storage) + 16, sizeof(sockaddr_storage) + 16,
&local_addr, &local_addr_len, &remote_addr, &remote_addr_len);
s_client.m_psi= PSI_SOCKET_CALL(init_socket)
(key_socket_client_connection, (const my_socket*)&s_listen.fd, remote_addr, remote_addr_len);
#endif
/* Start accepting new connection. After this point, do not use
any member data, they could be used by a different (threadpool) thread. */
begin_accept();
/* Some chores post-AcceptEx() that we need to create a normal socket.*/
if (setsockopt(s_client.fd, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
(char *)&s_listen.fd, sizeof(s_listen.fd)))
{
if (!abort_loop)
{
sql_perror("setsockopt(SO_UPDATE_ACCEPT_CONTEXT) failed.");
abort();
}
}
/* Create a new connection.*/
handle_accepted_socket(s_client, s_listen);
}
~Socket_Listener()
{
if (m_client_socket != INVALID_SOCKET)
closesocket(m_client_socket);
}
/*
Retrieve the pointer to the Winsock extension functions
AcceptEx and GetAcceptExSockaddrs.
*/
static void init_winsock_extensions()
{
SOCKET s= mysql_socket_getfd(base_ip_sock);
if (s == INVALID_SOCKET)
s= mysql_socket_getfd(extra_ip_sock);
if (s == INVALID_SOCKET)
{
/* --skip-networking was used*/
return;
}
GUID guid_AcceptEx= WSAID_ACCEPTEX;
GUID guid_GetAcceptExSockaddrs= WSAID_GETACCEPTEXSOCKADDRS;
GUID *guids[]= { &guid_AcceptEx, &guid_GetAcceptExSockaddrs };
void *funcs[]= { &my_AcceptEx, &my_GetAcceptExSockaddrs };
DWORD bytes;
for (int i= 0; i < array_elements(guids); i++)
{
if (WSAIoctl(s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
guids[i], sizeof(GUID),
funcs[i], sizeof(void *),
&bytes, 0, 0) == -1)
{
sql_print_error("WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) failed");
unireg_abort(1);
}
}
}
};
/**
Pipe Listener.
Only event notification mode is implemented, no threadpool
*/
struct Pipe_Listener : public Listener
{
PTP_CALLBACK_ENVIRON m_tp_env;
Pipe_Listener():
Listener(INVALID_HANDLE_VALUE, CreateEvent(0, FALSE, FALSE, 0)),
m_tp_env(get_threadpool_win_callback_environ())
{
}
/*
Creates local named pipe instance \\.\pipe\$socket for named pipe connection.
*/
static HANDLE create_named_pipe()
{
static bool first_instance= true;
static char pipe_name[512];
DWORD open_mode= PIPE_ACCESS_DUPLEX |
FILE_FLAG_OVERLAPPED;
if (first_instance)
{
snprintf(pipe_name, sizeof(pipe_name), "\\\\.\\pipe\\%s", mysqld_unix_port);
open_mode |= FILE_FLAG_FIRST_PIPE_INSTANCE;
if (!ConvertStringSecurityDescriptorToSecurityDescriptorA(
"S:(ML;; NW;;; LW) D:(A;; FRFW;;; WD)",
1, &pipe_security.lpSecurityDescriptor, NULL))
{
sql_perror("Can't start server : Initialize security descriptor");
unireg_abort(1);
}
pipe_security.nLength= sizeof(SECURITY_ATTRIBUTES);
pipe_security.bInheritHandle= FALSE;
}
HANDLE pipe_handle= CreateNamedPipe(pipe_name,
open_mode,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
PIPE_UNLIMITED_INSTANCES,
(int)global_system_variables.net_buffer_length,
(int)global_system_variables.net_buffer_length,
NMPWAIT_USE_DEFAULT_WAIT,
&pipe_security);
if (pipe_handle == INVALID_HANDLE_VALUE)
{
sql_perror("Create named pipe failed");
sql_print_error("Aborting\n");
exit(1);
}
first_instance= false;
return pipe_handle;
}
static void create_pipe_connection(HANDLE pipe)
{
CONNECT *connect;
if (!(connect= new CONNECT) || !(connect->vio= vio_new_win32pipe(pipe)))
{
CloseHandle(pipe);
delete connect;
statistic_increment(aborted_connects, &LOCK_status);
statistic_increment(connection_errors_internal, &LOCK_status);
return;
}
connect->host= my_localhost;
create_new_thread(connect);
}
/* Threadpool callback.*/
static void CALLBACK tp_create_pipe_connection(
PTP_CALLBACK_INSTANCE,void *Context)
{
tp_win_callback_prolog();
create_pipe_connection(Context);
}
void begin_accept()
{
m_handle= create_named_pipe();
BOOL connected= ConnectNamedPipe(m_handle, &m_overlapped);
if (connected)
{
/* Overlapped ConnectNamedPipe should return zero. */
sql_perror("Overlapped ConnectNamedPipe() already connected.");
abort();
}
DWORD last_error= GetLastError();
switch (last_error)
{
case ERROR_PIPE_CONNECTED:
/* Client is already connected, so signal an event.*/
{
/*
Cleanup overlapped (so that subsequent GetOverlappedResult()
does not show results of previous IO
*/
HANDLE e= m_overlapped.hEvent;
memset(&m_overlapped, 0, sizeof(m_overlapped));
m_overlapped.hEvent = e;
}
if (!SetEvent(m_overlapped.hEvent))
{
sql_perror("SetEvent() failed for connected pipe.");
abort();
}
break;
case ERROR_IO_PENDING:
break;
default:
sql_perror("ConnectNamedPipe() failed.");
abort();
break;
}
}
void completion_callback(bool success)
{
if (!success)
{
#ifdef DBUG_OFF
sql_print_warning("ConnectNamedPipe completed with %u", GetLastError());
#endif
CloseHandle(m_handle);
m_handle= INVALID_HANDLE_VALUE;
begin_accept();
return;
}
HANDLE pipe= m_handle;
begin_accept();
// If threadpool is on, create connection in threadpool thread
if (!m_tp_env || !TrySubmitThreadpoolCallback(tp_create_pipe_connection, pipe, m_tp_env))
create_pipe_connection(pipe);
}
~Pipe_Listener()
{
if (m_handle != INVALID_HANDLE_VALUE)
{
CloseHandle(m_handle);
}
}
static void cleanup()
{
LocalFree(pipe_security.lpSecurityDescriptor);
}
};
/**
Accept new client connections on Windows.
Since we deal with pipe and sockets, they cannot be put into a select/loop.
But we can use asynchronous IO, and WaitForMultipleObject() loop.
In addition, for slightly better performance, if we're using threadpool,
socket connections are accepted directly in the threadpool.
The mode of operation is therefore
1. There is WaitForMultipleObject() loop that waits for shutdown notification
(hEventShutdown),and possibly pipes and sockets(e.g if threadpool is not used)
This loop ends when shutdown notification is detected.
2. If threadpool is used, new socket connections are accepted there.
*/
#define MAX_WAIT_HANDLES 32
#define NUM_PIPE_LISTENERS 24
#define SHUTDOWN_IDX 0
#define LISTENER_START_IDX 1
void handle_connections_win()
{
Listener* all_listeners[MAX_WAIT_HANDLES]= {};
HANDLE wait_events[MAX_WAIT_HANDLES]= {};
int n_listeners= 0;
int n_waits= 0;
Socket_Listener::init_winsock_extensions();
/* Listen for TCP connections on "extra-port" (no threadpool).*/
if (extra_ip_sock.fd != INVALID_SOCKET)
all_listeners[n_listeners++]= new Socket_Listener(extra_ip_sock, 0);
/* Listen for named pipe connections */
if (mysqld_unix_port[0] && !opt_bootstrap && opt_enable_named_pipe)
{
/*
Use several listeners for pipe, to reduce ERROR_PIPE_BUSY on client side.
*/
for (int i= 0; i < NUM_PIPE_LISTENERS; i++)
all_listeners[n_listeners++]= new Pipe_Listener();
}
if (base_ip_sock.fd != INVALID_SOCKET)
{
/* Wait for TCP connections.*/
SetFileCompletionNotificationModes((HANDLE)base_ip_sock.fd, FILE_SKIP_SET_EVENT_ON_HANDLE);
all_listeners[n_listeners++]= new Socket_Listener(base_ip_sock, get_threadpool_win_callback_environ());
}
if (!n_listeners && !opt_bootstrap)
{
sql_print_error("Either TCP connections or named pipe connections must be enabled.");
unireg_abort(1);
}
wait_events[SHUTDOWN_IDX]= hEventShutdown;
n_waits = 1;
for (int i= 0; i < n_listeners; i++)
{
HANDLE wait_handle= all_listeners[i]->wait_handle();
if(wait_handle)
{
DBUG_ASSERT((i == 0) || (all_listeners[i-1]->wait_handle() != 0));
wait_events[n_waits++]= wait_handle;
}
all_listeners[i]->begin_accept();
}
for (;;)
{
DWORD idx = WaitForMultipleObjects(n_waits ,wait_events, FALSE, INFINITE);
DBUG_ASSERT((int)idx >= 0 && (int)idx < n_waits);
if (idx == SHUTDOWN_IDX)
break;
all_listeners[idx - LISTENER_START_IDX]->completion_callback();
}
/* Cleanup */
for (int i= 0; i < n_listeners; i++)
{
Listener *listener= all_listeners[i];
if (listener->wait_handle())
delete listener;
else
// Threadpool-bound listener will be deleted in threadpool
// Do not call destructor, because callback maybe running.
listener->cancel();
}
Pipe_Listener::cleanup();
}
\ No newline at end of file
/* Copyright (c) 2018 MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
/**
Handles incoming socket and pipe connections, on Windows.
Creates new (THD) connections..
*/
extern void handle_connections_win();
......@@ -17,6 +17,6 @@
#define INIT_INCLUDED
void unireg_init(ulong options);
ATTRIBUTE_NORETURN void unireg_end(void);
void unireg_end(void);
#endif /* INIT_INCLUDED */
......@@ -117,6 +117,10 @@
#include <poll.h>
#endif
#ifdef _WIN32
#include <handle_connections_win.h>
#endif
#include <my_service_manager.h>
#define mysqld_charset &my_charset_latin1
......@@ -319,16 +323,6 @@ MY_TIMER_INFO sys_timer_info;
/* static variables */
#ifdef HAVE_PSI_INTERFACE
#if defined(_WIN32) && !defined(EMBEDDED_LIBRARY)
static PSI_thread_key key_thread_handle_con_namedpipes;
static PSI_cond_key key_COND_handler_count;
static PSI_thread_key key_thread_handle_con_sockets;
#endif /* _WIN32 |&& !EMBEDDED_LIBRARY */
#ifdef _WIN32
static PSI_thread_key key_thread_handle_shutdown;
#endif
#ifdef HAVE_OPENSSL10
static PSI_rwlock_key key_rwlock_openssl;
#endif
......@@ -364,6 +358,7 @@ static char *character_set_filesystem_name;
static char *lc_messages;
static char *lc_time_names_name;
char *my_bind_addr_str;
int server_socket_ai_family;
static char *default_collation_name;
char *default_storage_engine, *default_tmp_storage_engine;
char *enforced_storage_engine=NULL;
......@@ -737,7 +732,6 @@ mysql_mutex_t LOCK_thread_count;
other threads.
It also protects these variables:
handler_count
in_bootstrap
select_thread_in_use
slave_init_thread_running
......@@ -1091,9 +1085,6 @@ PSI_cond_key key_COND_ack_receiver;
static PSI_cond_info all_server_conds[]=
{
#if defined(_WIN32) && !defined(EMBEDDED_LIBRARY)
{ &key_COND_handler_count, "COND_handler_count", PSI_FLAG_GLOBAL},
#endif /* _WIN32 && !EMBEDDED_LIBRARY */
#ifdef HAVE_MMAP
{ &key_PAGE_cond, "PAGE::cond", 0},
{ &key_COND_active, "TC_LOG_MMAP::COND_active", 0},
......@@ -1154,12 +1145,6 @@ PSI_thread_key key_thread_ack_receiver;
static PSI_thread_info all_server_threads[]=
{
#if (defined (_WIN32) && !defined (EMBEDDED_LIBRARY))
{ &key_thread_handle_con_namedpipes, "con_named_pipes", PSI_FLAG_GLOBAL},
{ &key_thread_handle_con_sockets, "con_sockets", PSI_FLAG_GLOBAL},
{ &key_thread_handle_shutdown, "shutdown", PSI_FLAG_GLOBAL},
#endif
{ &key_thread_bootstrap, "bootstrap", PSI_FLAG_GLOBAL},
{ &key_thread_delayed_insert, "delayed_insert", 0},
{ &key_thread_handle_manager, "manager", PSI_FLAG_GLOBAL},
......@@ -1402,10 +1387,10 @@ void Buffered_logs::print()
/** Logs reported before a logger is available. */
static Buffered_logs buffered_logs;
static MYSQL_SOCKET unix_sock, base_ip_sock, extra_ip_sock;
struct my_rnd_struct sql_rand; ///< used by sql_class.cc:THD::THD()
#ifndef EMBEDDED_LIBRARY
MYSQL_SOCKET unix_sock, base_ip_sock, extra_ip_sock;
/**
Error reporter that buffer log messages.
@param level log message level
......@@ -1461,27 +1446,18 @@ static pthread_t select_thread;
#undef getpid
#include <process.h>
static mysql_cond_t COND_handler_count;
static uint handler_count;
static bool start_mode=0, use_opt_args;
static int opt_argc;
static char **opt_argv;
#if !defined(EMBEDDED_LIBRARY)
static HANDLE hEventShutdown;
HANDLE hEventShutdown;
static char shutdown_event_name[40];
#include "nt_servc.h"
static NTService Service; ///< Service object for WinNT
#endif /* EMBEDDED_LIBRARY */
#endif /* __WIN__ */
#ifdef _WIN32
#include <sddl.h> /* ConvertStringSecurityDescriptorToSecurityDescriptor */
static char pipe_name[512];
static SECURITY_ATTRIBUTES saPipeSecurity;
static HANDLE hPipe = INVALID_HANDLE_VALUE;
#endif
#ifndef EMBEDDED_LIBRARY
bool mysqld_embedded=0;
#else
......@@ -1554,16 +1530,13 @@ extern "C" my_bool mysqld_get_one_option(int, const struct my_option *, char *);
static int init_thread_environment();
static char *get_relative_path(const char *path);
static int fix_paths(void);
#ifndef _WIN32
void handle_connections_sockets();
#ifdef _WIN32
pthread_handler_t handle_connections_sockets_thread(void *arg);
#endif
pthread_handler_t kill_server_thread(void *arg);
static void bootstrap(MYSQL_FILE *file);
static bool read_init_file(char *file_name);
#ifdef _WIN32
pthread_handler_t handle_connections_namedpipes(void *arg);
#endif
pthread_handler_t handle_slave(void *arg);
static void clean_up(bool print_message);
static int test_if_case_insensitive(const char *dir_name);
......@@ -1598,6 +1571,7 @@ static void close_connections(void)
kill_cached_threads++;
flush_thread_cache();
/* kill connection thread */
#if !defined(__WIN__)
DBUG_PRINT("quit", ("waiting for select thread: %lu",
......@@ -1647,30 +1621,7 @@ static void close_connections(void)
extra_ip_sock= MYSQL_INVALID_SOCKET;
}
}
#ifdef _WIN32
if (hPipe != INVALID_HANDLE_VALUE && opt_enable_named_pipe)
{
HANDLE temp;
DBUG_PRINT("quit", ("Closing named pipes") );
/* Create connection to the handle named pipe handler to break the loop */
if ((temp = CreateFile(pipe_name,
GENERIC_READ | GENERIC_WRITE,
0,
NULL,
OPEN_EXISTING,
0,
NULL )) != INVALID_HANDLE_VALUE)
{
WaitNamedPipe(pipe_name, 1000);
DWORD dwMode = PIPE_READMODE_BYTE | PIPE_WAIT;
SetNamedPipeHandleState(temp, &dwMode, NULL, NULL);
CancelIo(temp);
DisconnectNamedPipe(temp);
CloseHandle(temp);
}
}
#endif
#ifdef HAVE_SYS_UN_H
if (mysql_socket_getfd(unix_sock) != INVALID_SOCKET)
{
......@@ -1910,12 +1861,6 @@ void kill_mysql(THD *thd)
{
DBUG_PRINT("error",("Got error: %ld from SetEvent",GetLastError()));
}
/*
or:
HANDLE hEvent=OpenEvent(0, FALSE, "MySqlShutdown");
SetEvent(hEventShutdown);
CloseHandle(hEvent);
*/
}
#endif
#elif defined(HAVE_PTHREAD_KILL)
......@@ -1947,7 +1892,7 @@ void kill_mysql(THD *thd)
/**
Force server down. Kill all connections and threads and exit.
@param sig_ptr Signal number that caused kill_server to be called.
@param sig Signal number that caused kill_server to be called.
@note
A signal number of 0 mean that the function was not called
......@@ -1955,22 +1900,14 @@ void kill_mysql(THD *thd)
or stop, we just want to kill the server.
*/
#if !defined(__WIN__)
static void *kill_server(void *sig_ptr)
#define RETURN_FROM_KILL_SERVER return 0
#else
static void __cdecl kill_server(int sig_ptr)
#define RETURN_FROM_KILL_SERVER return
#endif
static void kill_server(int sig)
{
DBUG_ENTER("kill_server");
#ifndef EMBEDDED_LIBRARY
int sig=(int) (long) sig_ptr; // This is passed a int
// if there is a signal during the kill in progress, ignore the other
if (kill_in_progress) // Safety
{
DBUG_LEAVE;
RETURN_FROM_KILL_SERVER;
DBUG_VOID_RETURN;
}
kill_in_progress=TRUE;
abort_loop=1; // This should be set
......@@ -2004,20 +1941,9 @@ static void __cdecl kill_server(int sig_ptr)
else
unireg_end();
/* purecov: begin deadcode */
DBUG_LEAVE; // Must match DBUG_ENTER()
my_thread_end();
pthread_exit(0);
/* purecov: end */
RETURN_FROM_KILL_SERVER; // Avoid compiler warnings
#else /* EMBEDDED_LIBRARY*/
#endif /* EMBEDDED_LIBRARY*/
DBUG_LEAVE;
RETURN_FROM_KILL_SERVER;
#endif /* EMBEDDED_LIBRARY */
DBUG_VOID_RETURN;
}
......@@ -2026,11 +1952,9 @@ pthread_handler_t kill_server_thread(void *arg __attribute__((unused)))
{
my_thread_init(); // Initialize new thread
kill_server(0);
/* purecov: begin deadcode */
my_thread_end();
pthread_exit(0);
return 0;
/* purecov: end */
}
#endif
......@@ -2076,13 +2000,7 @@ static void clean_up_error_log_mutex()
void unireg_end(void)
{
clean_up(1);
my_thread_end();
sd_notify(0, "STATUS=MariaDB server is down");
#if defined(SIGNALS_DONT_BREAK_READ)
exit(0);
#else
pthread_exit(0); // Exit is in main thread
#endif
}
......@@ -2577,6 +2495,7 @@ static MYSQL_SOCKET activate_tcp_port(uint port)
}
else
{
server_socket_ai_family= a->ai_family;
sql_print_information("Server socket created on IP: '%s'.",
(const char *) ip_addr);
break;
......@@ -2703,44 +2622,6 @@ static void network_init(void)
extra_ip_sock= activate_tcp_port(mysqld_extra_port);
}
#ifdef _WIN32
/* create named pipe */
if (mysqld_unix_port[0] && !opt_bootstrap &&
opt_enable_named_pipe)
{
strxnmov(pipe_name, sizeof(pipe_name)-1, "\\\\.\\pipe\\",
mysqld_unix_port, NullS);
/*
Create a security descriptor for pipe.
- Use low integrity level, so that it is possible to connect
from any process.
- Give Everyone read/write access to pipe.
*/
if (!ConvertStringSecurityDescriptorToSecurityDescriptor(
"S:(ML;; NW;;; LW) D:(A;; FRFW;;; WD)",
SDDL_REVISION_1, &saPipeSecurity.lpSecurityDescriptor, NULL))
{
sql_perror("Can't start server : Initialize security descriptor");
unireg_abort(1);
}
saPipeSecurity.nLength = sizeof(SECURITY_ATTRIBUTES);
saPipeSecurity.bInheritHandle = FALSE;
if ((hPipe= CreateNamedPipe(pipe_name,
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
PIPE_UNLIMITED_INSTANCES,
(int) global_system_variables.net_buffer_length,
(int) global_system_variables.net_buffer_length,
NMPWAIT_USE_DEFAULT_WAIT,
&saPipeSecurity)) == INVALID_HANDLE_VALUE)
{
sql_perror("Create named pipe failed");
unireg_abort(1);
}
}
#endif
#if defined(HAVE_SYS_UN_H)
/*
** Create the UNIX socket
......@@ -3559,7 +3440,7 @@ pthread_handler_t signal_hand(void *arg __attribute__((unused)))
sql_print_error("Can't create thread to kill server (errno= %d)",
error);
#else
kill_server((void*) sig); // MIT THREAD has a alarm thread
kill_server(sig); // MIT THREAD has a alarm thread
#endif
}
break;
......@@ -3675,23 +3556,6 @@ void *my_str_malloc_mysqld(size_t size)
}
#ifdef __WIN__
pthread_handler_t handle_shutdown(void *arg)
{
MSG msg;
my_thread_init();
/* this call should create the message queue for this thread */
PeekMessage(&msg, NULL, 1, 65534,PM_NOREMOVE);
#if !defined(EMBEDDED_LIBRARY)
if (WaitForSingleObject(hEventShutdown,INFINITE)==WAIT_OBJECT_0)
#endif /* EMBEDDED_LIBRARY */
kill_server(MYSQL_KILL_SIGNAL);
return 0;
}
#endif
#include <mysqld_default_groups.h>
#if defined(__WIN__) && !defined(EMBEDDED_LIBRARY)
......@@ -5590,79 +5454,14 @@ static int init_server_components()
#ifndef EMBEDDED_LIBRARY
#ifdef _WIN32
static void create_shutdown_thread()
static void create_shutdown_event()
{
#ifdef __WIN__
hEventShutdown=CreateEvent(0, FALSE, FALSE, shutdown_event_name);
pthread_t hThread;
int error;
if (unlikely((error= mysql_thread_create(key_thread_handle_shutdown,
&hThread, &connection_attrib,
handle_shutdown, 0))))
sql_print_warning("Can't create thread to handle shutdown requests"
" (errno= %d)", error);
// On "Stop Service" we have to do regular shutdown
Service.SetShutdownEvent(hEventShutdown);
#endif /* __WIN__ */
}
static void handle_connections_methods()
{
pthread_t hThread;
int error;
DBUG_ENTER("handle_connections_methods");
if (hPipe == INVALID_HANDLE_VALUE && opt_disable_networking)
{
sql_print_error("TCP/IP, or --named-pipe should be configured on Windows");
unireg_abort(1); // Will not return
}
mysql_mutex_lock(&LOCK_start_thread);
mysql_cond_init(key_COND_handler_count, &COND_handler_count, NULL);
handler_count=0;
if (hPipe != INVALID_HANDLE_VALUE)
{
handler_count++;
if ((error= mysql_thread_create(key_thread_handle_con_namedpipes,
&hThread, &connection_attrib,
handle_connections_namedpipes, 0)))
{
sql_print_warning("Can't create thread to handle named pipes"
" (errno= %d)", error);
handler_count--;
}
}
if (have_tcpip && !opt_disable_networking)
{
handler_count++;
if ((error= mysql_thread_create(key_thread_handle_con_sockets,
&hThread, &connection_attrib,
handle_connections_sockets_thread, 0)))
{
sql_print_warning("Can't create thread to handle TCP/IP",
" (errno= %d)", error);
handler_count--;
}
}
while (handler_count > 0)
mysql_cond_wait(&COND_handler_count, &LOCK_start_thread);
mysql_mutex_unlock(&LOCK_start_thread);
DBUG_VOID_RETURN;
}
void decrement_handler_count()
{
mysql_mutex_lock(&LOCK_start_thread);
if (--handler_count == 0)
mysql_cond_signal(&COND_handler_count);
mysql_mutex_unlock(&LOCK_start_thread);
my_thread_end();
}
#else /* WIN32*/
#define create_shutdown_thread()
#define decrement_handler_count()
#else /*_WIN32*/
#define create_shutdown_event()
#endif
#endif /* EMBEDDED_LIBRARY */
......@@ -6067,7 +5866,7 @@ int mysqld_main(int argc, char **argv)
}
}
create_shutdown_thread();
create_shutdown_event();
start_handle_manager();
/* Copy default global rpl_filter to global_rpl_filter */
......@@ -6137,7 +5936,8 @@ int mysqld_main(int argc, char **argv)
start_memory_used= global_status_var.global_memory_used;
#ifdef _WIN32
handle_connections_methods();
handle_connections_win();
kill_server(0);
#else
handle_connections_sockets();
#endif /* _WIN32 */
......@@ -6549,7 +6349,7 @@ void create_thread_to_handle_connection(CONNECT *connect)
@param[in,out] thd Thread handle of future thread.
*/
static void create_new_thread(CONNECT *connect)
void create_new_thread(CONNECT *connect)
{
DBUG_ENTER("create_new_thread");
......@@ -6617,18 +6417,107 @@ inline void kill_broken_server()
#ifndef EMBEDDED_LIBRARY
void handle_accepted_socket(MYSQL_SOCKET new_sock, MYSQL_SOCKET sock)
{
CONNECT *connect;
bool is_unix_sock;
#ifdef FD_CLOEXEC
(void) fcntl(mysql_socket_getfd(new_sock), F_SETFD, FD_CLOEXEC);
#endif
#ifdef HAVE_LIBWRAP
{
if (mysql_socket_getfd(sock) == mysql_socket_getfd(base_ip_sock) ||
mysql_socket_getfd(sock) == mysql_socket_getfd(extra_ip_sock))
{
struct request_info req;
signal(SIGCHLD, SIG_DFL);
request_init(&req, RQ_DAEMON, libwrapName, RQ_FILE,
mysql_socket_getfd(new_sock), NULL);
my_fromhost(&req);
if (!my_hosts_access(&req))
{
/*
This may be stupid but refuse() includes an exit(0)
which we surely don't want...
clean_exit() - same stupid thing ...
*/
syslog(deny_severity, "refused connect from %s",
my_eval_client(&req));
/*
C++ sucks (the gibberish in front just translates the supplied
sink function pointer in the req structure from a void (*sink)();
to a void(*sink)(int) if you omit the cast, the C++ compiler
will cry...
*/
if (req.sink)
((void(*)(int))req.sink)(req.fd);
(void)mysql_socket_shutdown(new_sock, SHUT_RDWR);
(void)mysql_socket_close(new_sock);
/*
The connection was refused by TCP wrappers.
There are no details (by client IP) available to update the
host_cache.
*/
statistic_increment(connection_errors_tcpwrap, &LOCK_status);
return;
}
}
}
#endif /* HAVE_LIBWRAP */
DBUG_PRINT("info", ("Creating CONNECT for new connection"));
if ((connect= new CONNECT()))
{
is_unix_sock= (mysql_socket_getfd(sock) ==
mysql_socket_getfd(unix_sock));
if (!(connect->vio=
mysql_socket_vio_new(new_sock,
is_unix_sock ? VIO_TYPE_SOCKET :
VIO_TYPE_TCPIP,
is_unix_sock ? VIO_LOCALHOST : 0)))
{
delete connect;
connect= 0; // Error handling below
}
}
if (!connect)
{
/* Connect failure */
(void)mysql_socket_close(new_sock);
statistic_increment(aborted_connects, &LOCK_status);
statistic_increment(connection_errors_internal, &LOCK_status);
return;
}
if (is_unix_sock)
connect->host= my_localhost;
if (mysql_socket_getfd(sock) == mysql_socket_getfd(extra_ip_sock))
{
connect->extra_port= 1;
connect->scheduler= extra_thread_scheduler;
}
create_new_thread(connect);
}
#ifndef _WIN32
void handle_connections_sockets()
{
MYSQL_SOCKET sock= mysql_socket_invalid();
MYSQL_SOCKET new_sock= mysql_socket_invalid();
uint error_count=0;
CONNECT *connect;
struct sockaddr_storage cAddr;
int ip_flags __attribute__((unused))=0;
int socket_flags __attribute__((unused))= 0;
int extra_ip_flags __attribute__((unused))=0;
int flags=0,retval;
bool is_unix_sock;
#ifdef HAVE_POLL
int socket_count= 0;
struct pollfd fds[3]; // for ip_sock, unix_sock and extra_ip_sock
......@@ -6760,10 +6649,7 @@ void handle_connections_sockets()
}
#endif
}
#if !defined(NO_FCNTL_NONBLOCK)
if (!(test_flags & TEST_BLOCKING))
fcntl(mysql_socket_getfd(sock), F_SETFL, flags);
#endif
if (mysql_socket_getfd(new_sock) == INVALID_SOCKET)
{
/*
......@@ -6779,199 +6665,18 @@ void handle_connections_sockets()
sleep(1); // Give other threads some time
continue;
}
#ifdef FD_CLOEXEC
(void) fcntl(mysql_socket_getfd(new_sock), F_SETFD, FD_CLOEXEC);
#if !defined(NO_FCNTL_NONBLOCK)
if (!(test_flags & TEST_BLOCKING))
fcntl(mysql_socket_getfd(sock), F_SETFL, flags);
#endif
#ifdef HAVE_LIBWRAP
{
if (mysql_socket_getfd(sock) == mysql_socket_getfd(base_ip_sock) ||
mysql_socket_getfd(sock) == mysql_socket_getfd(extra_ip_sock))
{
struct request_info req;
signal(SIGCHLD, SIG_DFL);
request_init(&req, RQ_DAEMON, libwrapName, RQ_FILE,
mysql_socket_getfd(new_sock), NULL);
my_fromhost(&req);
if (!my_hosts_access(&req))
{
/*
This may be stupid but refuse() includes an exit(0)
which we surely don't want...
clean_exit() - same stupid thing ...
*/
syslog(deny_severity, "refused connect from %s",
my_eval_client(&req));
/*
C++ sucks (the gibberish in front just translates the supplied
sink function pointer in the req structure from a void (*sink)();
to a void(*sink)(int) if you omit the cast, the C++ compiler
will cry...
*/
if (req.sink)
((void (*)(int))req.sink)(req.fd);
(void) mysql_socket_shutdown(new_sock, SHUT_RDWR);
(void) mysql_socket_close(new_sock);
/*
The connection was refused by TCP wrappers.
There are no details (by client IP) available to update the
host_cache.
*/
statistic_increment(connection_errors_tcpwrap, &LOCK_status);
continue;
}
}
}
#endif /* HAVE_LIBWRAP */
DBUG_PRINT("info", ("Creating CONNECT for new connection"));
if ((connect= new CONNECT()))
{
is_unix_sock= (mysql_socket_getfd(sock) ==
mysql_socket_getfd(unix_sock));
if (!(connect->vio=
mysql_socket_vio_new(new_sock,
is_unix_sock ? VIO_TYPE_SOCKET :
VIO_TYPE_TCPIP,
is_unix_sock ? VIO_LOCALHOST: 0)))
{
delete connect;
connect= 0; // Error handling below
}
}
if (!connect)
{
/* Connect failure */
(void) mysql_socket_shutdown(new_sock, SHUT_RDWR);
(void) mysql_socket_close(new_sock);
statistic_increment(aborted_connects,&LOCK_status);
statistic_increment(connection_errors_internal, &LOCK_status);
continue;
}
if (is_unix_sock)
connect->host= my_localhost;
if (mysql_socket_getfd(sock) == mysql_socket_getfd(extra_ip_sock))
{
connect->extra_port= 1;
connect->scheduler= extra_thread_scheduler;
}
create_new_thread(connect);
handle_accepted_socket(new_sock, sock);
}
sd_notify(0, "STOPPING=1\n"
"STATUS=Shutdown in progress\n");
DBUG_VOID_RETURN;
}
#ifdef _WIN32
pthread_handler_t handle_connections_sockets_thread(void *arg)
{
my_thread_init();
handle_connections_sockets();
decrement_handler_count();
return 0;
}
pthread_handler_t handle_connections_namedpipes(void *arg)
{
HANDLE hConnectedPipe;
OVERLAPPED connectOverlapped= {0};
my_thread_init();
DBUG_ENTER("handle_connections_namedpipes");
connectOverlapped.hEvent= CreateEvent(NULL, TRUE, FALSE, NULL);
if (!connectOverlapped.hEvent)
{
sql_print_error("Can't create event, last error=%u", GetLastError());
unireg_abort(1);
}
DBUG_PRINT("general",("Waiting for named pipe connections."));
while (!abort_loop)
{
/* wait for named pipe connection */
BOOL fConnected= ConnectNamedPipe(hPipe, &connectOverlapped);
if (!fConnected && (GetLastError() == ERROR_IO_PENDING))
{
/*
ERROR_IO_PENDING says async IO has started but not yet finished.
GetOverlappedResult will wait for completion.
*/
DWORD bytes;
fConnected= GetOverlappedResult(hPipe, &connectOverlapped,&bytes, TRUE);
}
if (abort_loop)
break;
if (!fConnected)
fConnected = GetLastError() == ERROR_PIPE_CONNECTED;
if (!fConnected)
{
CloseHandle(hPipe);
if ((hPipe= CreateNamedPipe(pipe_name,
PIPE_ACCESS_DUPLEX |
FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE |
PIPE_READMODE_BYTE |
PIPE_WAIT,
PIPE_UNLIMITED_INSTANCES,
(int) global_system_variables.
net_buffer_length,
(int) global_system_variables.
net_buffer_length,
NMPWAIT_USE_DEFAULT_WAIT,
&saPipeSecurity)) ==
INVALID_HANDLE_VALUE)
{
sql_perror("Can't create new named pipe!");
break; // Abort
}
}
hConnectedPipe = hPipe;
/* create new pipe for new connection */
if ((hPipe = CreateNamedPipe(pipe_name,
PIPE_ACCESS_DUPLEX |
FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE |
PIPE_READMODE_BYTE |
PIPE_WAIT,
PIPE_UNLIMITED_INSTANCES,
(int) global_system_variables.net_buffer_length,
(int) global_system_variables.net_buffer_length,
NMPWAIT_USE_DEFAULT_WAIT,
&saPipeSecurity)) ==
INVALID_HANDLE_VALUE)
{
sql_perror("Can't create new named pipe!");
hPipe=hConnectedPipe;
continue; // We have to try again
}
CONNECT *connect;
if (!(connect= new CONNECT) ||
!(connect->vio= vio_new_win32pipe(hConnectedPipe)))
{
DisconnectNamedPipe(hConnectedPipe);
CloseHandle(hConnectedPipe);
delete connect;
statistic_increment(aborted_connects,&LOCK_status);
statistic_increment(connection_errors_internal, &LOCK_status);
continue;
}
connect->host= my_localhost;
create_new_thread(connect);
}
LocalFree(saPipeSecurity.lpSecurityDescriptor);
CloseHandle(connectOverlapped.hEvent);
DBUG_LEAVE;
decrement_handler_count();
return 0;
}
#endif /* _WIN32 */
#endif /* _WIN32*/
#endif /* EMBEDDED_LIBRARY */
......@@ -8661,7 +8366,9 @@ static int mysql_init_variables(void)
character_set_filesystem= &my_charset_bin;
opt_specialflag= SPECIAL_ENGLISH;
#ifndef EMBEDDED_LIBRARY
unix_sock= base_ip_sock= extra_ip_sock= MYSQL_INVALID_SOCKET;
#endif
mysql_home_ptr= mysql_home;
log_error_file_ptr= log_error_file;
protocol_version= PROTOCOL_VERSION;
......
......@@ -24,6 +24,7 @@
#include "mysql_com.h" /* SERVER_VERSION_LENGTH */
#include "my_atomic.h"
#include "mysql/psi/mysql_file.h" /* MYSQL_FILE */
#include "mysql/psi/mysql_socket.h" /* MYSQL_SOCKET */
#include "sql_list.h" /* I_List */
#include "sql_cmd.h"
#include <my_rnd.h>
......@@ -92,6 +93,8 @@ void refresh_status(THD *thd);
bool is_secure_file_path(char *path);
void dec_connection_count(scheduler_functions *scheduler);
extern void init_net_server_extension(THD *thd);
extern void handle_accepted_socket(MYSQL_SOCKET new_sock, MYSQL_SOCKET sock);
extern void create_new_thread(CONNECT *connect);
extern "C" MYSQL_PLUGIN_IMPORT CHARSET_INFO *system_charset_info;
extern MYSQL_PLUGIN_IMPORT CHARSET_INFO *files_charset_info ;
......@@ -152,6 +155,7 @@ extern ulong opt_replicate_events_marked_for_skip;
extern char *default_tz_name;
extern Time_zone *default_tz;
extern char *my_bind_addr_str;
extern int server_socket_ai_family;
extern char *default_storage_engine, *default_tmp_storage_engine;
extern char *enforced_storage_engine;
extern char *gtid_pos_auto_engines;
......@@ -759,7 +763,7 @@ enum enum_query_type
/* query_id */
extern query_id_t global_query_id;
ATTRIBUTE_NORETURN void unireg_end(void);
void unireg_end(void);
/* increment query_id and return it. */
inline __attribute__((warn_unused_result)) query_id_t next_query_id()
......
......@@ -70,6 +70,11 @@ static DWORD fls;
static bool skip_completion_port_on_success = false;
PTP_CALLBACK_ENVIRON get_threadpool_win_callback_environ()
{
return pool? &callback_environ: 0;
}
/*
Threadpool callbacks.
......@@ -134,7 +139,15 @@ struct TP_connection *new_TP_connection(CONNECT *connect)
void TP_pool_win::add(TP_connection *c)
{
if(FlsGetValue(fls))
{
/* Inside threadpool(), execute callback directly. */
tp_callback(c);
}
else
{
SubmitThreadpoolWork(((TP_connection_win *)c)->work);
}
}
......@@ -288,14 +301,13 @@ TP_connection_win::~TP_connection_win()
void TP_connection_win::wait_begin(int type)
{
/*
Signal to the threadpool whenever callback can run long. Currently, binlog
waits are a good candidate, its waits are really long
*/
if (type == THD_WAIT_BINLOG)
{
if (!long_callback)
if (!long_callback && callback_instance)
{
CallbackMayRunLong(callback_instance);
long_callback= true;
......@@ -312,8 +324,7 @@ void TP_connection_win::wait_end()
This function should be called first whenever a callback is invoked in the
threadpool, does my_thread_init() if not yet done
*/
extern ulong thread_created;
static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance)
void tp_win_callback_prolog()
{
if (FlsGetValue(fls) == NULL)
{
......@@ -323,6 +334,12 @@ static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance)
InterlockedIncrement((volatile long *)&tp_stats.num_worker_threads);
my_thread_init();
}
}
extern ulong thread_created;
static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance)
{
tp_win_callback_prolog();
TP_connection_win *c = (TP_connection_win *)context;
c->callback_instance = instance;
c->long_callback = false;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment