Commit a163ae30 authored by unknown's avatar unknown

Replace the approach using Foo_thread_args + Foo_thread and manually

spawned threads with a reusable class Thread.

This is the second idea implemented in the Alik's patch for
BUG#22306: STOP INSTANCE can not be applied for instances in Crashed,
Failed and Abandoned.
Commiting separately to ease review process. 


server-tools/instance-manager/commands.cc:
  Remove an unused header.
server-tools/instance-manager/guardian.cc:
  Use Thread framework instead of manually spawning the Guardian thread.
  Tidy up.
server-tools/instance-manager/guardian.h:
  Use Thread framework instead of manually spawning the Guardian thread.
server-tools/instance-manager/instance.cc:
  Use Thread framework instead of manually spawning the instance
  monitoring thread.
server-tools/instance-manager/listener.cc:
  Use Thread framework instead of manually spawning the 
  mysql connection thread.
server-tools/instance-manager/listener.h:
  Use Thread framework instead of manually spawning the 
  mysql connection thread.
  Rename Listener_thread to Listener for brevity.
server-tools/instance-manager/manager.cc:
  Change references to pointers, as per the coding style.
  Use Thread framework instead of manually spawning threads.
server-tools/instance-manager/mysql_connection.cc:
  Get rid of Mysql_connection_thread_args. Use class Thread framework
  instead. Rename Mysql_connection_thread to Mysql_connection for brevity.
server-tools/instance-manager/mysql_connection.h:
  Get rid of Mysql_connection_thread_args. Use class Thread framework
  instead. Rename Mysql_connection_thread to Mysql_connection for brevity.
server-tools/instance-manager/priv.cc:
  Move set_stacksize_and_create_thread to thread_registry.cc and make it
  static: it is not used anywhere else now.
server-tools/instance-manager/priv.h:
  No public set_stacksize_n_create_thread
server-tools/instance-manager/thread_registry.cc:
  Implement a base Thread class to be used for all Instance Manager
  threads.
server-tools/instance-manager/thread_registry.h:
  Implement a base Thread class to be used for all Instance Manager
  threads.
parent 211b2bc9
...@@ -29,7 +29,6 @@ ...@@ -29,7 +29,6 @@
#include "guardian.h" #include "guardian.h"
#include "instance_map.h" #include "instance_map.h"
#include "log.h" #include "log.h"
#include "manager.h"
#include "messages.h" #include "messages.h"
#include "mysqld_error.h" #include "mysqld_error.h"
#include "mysql_manager_error.h" #include "mysql_manager_error.h"
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
#endif #endif
#include "guardian.h" #include "guardian.h"
#include <string.h> #include <string.h>
#include <sys/types.h> #include <sys/types.h>
#include <signal.h> #include <signal.h>
...@@ -30,15 +29,6 @@ ...@@ -30,15 +29,6 @@
#include "log.h" #include "log.h"
#include "mysql_manager_error.h" #include "mysql_manager_error.h"
pthread_handler_t guardian_thread_func(void *arg)
{
Guardian *guardian= (Guardian *) arg;
guardian->run();
return 0;
}
const char * const char *
Guardian::get_instance_state_name(enum_instance_state state) Guardian::get_instance_state_name(enum_instance_state state)
{ {
...@@ -68,18 +58,19 @@ Guardian::get_instance_state_name(enum_instance_state state) ...@@ -68,18 +58,19 @@ Guardian::get_instance_state_name(enum_instance_state state)
return NULL; /* just to ignore compiler warning. */ return NULL; /* just to ignore compiler warning. */
} }
/* {{{ Constructor & destructor. */
Guardian::Guardian(Thread_registry &thread_registry_arg, Guardian::Guardian(Thread_registry *thread_registry_arg,
Instance_map *instance_map_arg, Instance_map *instance_map_arg,
uint monitoring_interval_arg) : uint monitoring_interval_arg)
Guardian_args(thread_registry_arg, instance_map_arg, :monitoring_interval(monitoring_interval_arg),
monitoring_interval_arg), shutdown_requested(FALSE),
thread_info(pthread_self(), TRUE), guarded_instances(0) stopped(FALSE),
thread_registry(thread_registry_arg),
instance_map(instance_map_arg)
{ {
pthread_mutex_init(&LOCK_guardian, 0); pthread_mutex_init(&LOCK_guardian, 0);
pthread_cond_init(&COND_guardian, 0); pthread_cond_init(&COND_guardian, 0);
shutdown_requested= FALSE;
stopped= FALSE;
init_alloc_root(&alloc, MEM_ROOT_BLOCK_SIZE, 0); init_alloc_root(&alloc, MEM_ROOT_BLOCK_SIZE, 0);
} }
...@@ -94,6 +85,8 @@ Guardian::~Guardian() ...@@ -94,6 +85,8 @@ Guardian::~Guardian()
pthread_cond_destroy(&COND_guardian); pthread_cond_destroy(&COND_guardian);
} }
/* }}} */
void Guardian::request_shutdown() void Guardian::request_shutdown()
{ {
...@@ -117,7 +110,7 @@ void Guardian::process_instance(Instance *instance, ...@@ -117,7 +110,7 @@ void Guardian::process_instance(Instance *instance,
if (current_node->state == STOPPING) if (current_node->state == STOPPING)
{ {
/* this brach is executed during shutdown */ /* this branch is executed during shutdown */
if (instance->options.shutdown_delay) if (instance->options.shutdown_delay)
{ {
/* /*
...@@ -235,7 +228,7 @@ void Guardian::process_instance(Instance *instance, ...@@ -235,7 +228,7 @@ void Guardian::process_instance(Instance *instance,
/* /*
Run guardian thread Run guardian thread
SYNOPSYS SYNOPSIS
run() run()
DESCRIPTION DESCRIPTION
...@@ -252,9 +245,8 @@ void Guardian::run() ...@@ -252,9 +245,8 @@ void Guardian::run()
log_info("Guardian: started."); log_info("Guardian: started.");
thread_registry.register_thread(&thread_info); thread_registry->register_thread(&thread_info);
my_thread_init();
pthread_mutex_lock(&LOCK_guardian); pthread_mutex_lock(&LOCK_guardian);
/* loop, until all instances were shut down at the end */ /* loop, until all instances were shut down at the end */
...@@ -275,7 +267,7 @@ void Guardian::run() ...@@ -275,7 +267,7 @@ void Guardian::run()
/* check the loop predicate before sleeping */ /* check the loop predicate before sleeping */
if (!(shutdown_requested && (!(guarded_instances)))) if (!(shutdown_requested && (!(guarded_instances))))
thread_registry.cond_timedwait(&thread_info, &COND_guardian, thread_registry->cond_timedwait(&thread_info, &COND_guardian,
&LOCK_guardian, &timeout); &LOCK_guardian, &timeout);
} }
...@@ -284,9 +276,8 @@ void Guardian::run() ...@@ -284,9 +276,8 @@ void Guardian::run()
stopped= TRUE; stopped= TRUE;
pthread_mutex_unlock(&LOCK_guardian); pthread_mutex_unlock(&LOCK_guardian);
/* now, when the Guardian is stopped we can stop the IM */ /* now, when the Guardian is stopped we can stop the IM */
thread_registry.unregister_thread(&thread_info); thread_registry->unregister_thread(&thread_info);
thread_registry.request_shutdown(); thread_registry->request_shutdown();
my_thread_end();
log_info("Guardian: finished."); log_info("Guardian: finished.");
} }
...@@ -306,7 +297,7 @@ int Guardian::is_stopped() ...@@ -306,7 +297,7 @@ int Guardian::is_stopped()
Initialize the list of guarded instances: loop through the Instance_map and Initialize the list of guarded instances: loop through the Instance_map and
add all of the instances, which don't have 'nonguarded' option specified. add all of the instances, which don't have 'nonguarded' option specified.
SYNOPSYS SYNOPSIS
Guardian::init() Guardian::init()
NOTE: The operation should be invoked with the following locks acquired: NOTE: The operation should be invoked with the following locks acquired:
...@@ -315,7 +306,7 @@ int Guardian::is_stopped() ...@@ -315,7 +306,7 @@ int Guardian::is_stopped()
RETURN RETURN
0 - ok 0 - ok
1 - error occured 1 - error occurred
*/ */
int Guardian::init() int Guardian::init()
...@@ -344,7 +335,7 @@ int Guardian::init() ...@@ -344,7 +335,7 @@ int Guardian::init()
/* /*
Add instance to the Guardian list Add instance to the Guardian list
SYNOPSYS SYNOPSIS
guard() guard()
instance the instance to be guarded instance the instance to be guarded
nolock whether we prefer do not lock Guardian here, nolock whether we prefer do not lock Guardian here,
...@@ -357,7 +348,7 @@ int Guardian::init() ...@@ -357,7 +348,7 @@ int Guardian::init()
RETURN RETURN
0 - ok 0 - ok
1 - error occured 1 - error occurred
*/ */
int Guardian::guard(Instance *instance, bool nolock) int Guardian::guard(Instance *instance, bool nolock)
...@@ -418,7 +409,7 @@ int Guardian::stop_guard(Instance *instance) ...@@ -418,7 +409,7 @@ int Guardian::stop_guard(Instance *instance)
An internal method which is called at shutdown to unregister instances and An internal method which is called at shutdown to unregister instances and
attempt to stop them if requested. attempt to stop them if requested.
SYNOPSYS SYNOPSIS
stop_instances() stop_instances()
DESCRIPTION DESCRIPTION
...@@ -431,7 +422,7 @@ int Guardian::stop_guard(Instance *instance) ...@@ -431,7 +422,7 @@ int Guardian::stop_guard(Instance *instance)
RETURN RETURN
0 - ok 0 - ok
1 - error occured 1 - error occurred
*/ */
int Guardian::stop_instances() int Guardian::stop_instances()
......
...@@ -16,11 +16,10 @@ ...@@ -16,11 +16,10 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <my_global.h>
#include <my_sys.h>
#include <my_list.h>
#include "thread_registry.h" #include "thread_registry.h"
#include <my_sys.h>
#include <my_list.h>
#if defined(__GNUC__) && defined(USE_PRAGMA_INTERFACE) #if defined(__GNUC__) && defined(USE_PRAGMA_INTERFACE)
#pragma interface #pragma interface
...@@ -31,30 +30,12 @@ class Instance_map; ...@@ -31,30 +30,12 @@ class Instance_map;
class Thread_registry; class Thread_registry;
struct GUARD_NODE; struct GUARD_NODE;
pthread_handler_t guardian_thread_func(void *arg); /**
struct Guardian_args
{
Thread_registry &thread_registry;
Instance_map *instance_map;
int monitoring_interval;
Guardian_args(Thread_registry &thread_registry_arg,
Instance_map *instance_map_arg,
uint monitoring_interval_arg) :
thread_registry(thread_registry_arg),
instance_map(instance_map_arg),
monitoring_interval(monitoring_interval_arg)
{}
};
/*
The guardian thread is responsible for monitoring and restarting of guarded The guardian thread is responsible for monitoring and restarting of guarded
instances. instances.
*/ */
class Guardian: public Guardian_args class Guardian: public Thread
{ {
public: public:
/* states of an instance */ /* states of an instance */
...@@ -82,12 +63,10 @@ class Guardian: public Guardian_args ...@@ -82,12 +63,10 @@ class Guardian: public Guardian_args
/* Return client state name. */ /* Return client state name. */
static const char *get_instance_state_name(enum_instance_state state); static const char *get_instance_state_name(enum_instance_state state);
Guardian(Thread_registry &thread_registry_arg, Guardian(Thread_registry *thread_registry_arg,
Instance_map *instance_map_arg, Instance_map *instance_map_arg,
uint monitoring_interval_arg); uint monitoring_interval_arg);
~Guardian(); virtual ~Guardian();
/* Main funtion of the thread */
void run();
/* Initialize or refresh the list of guarded instances */ /* Initialize or refresh the list of guarded instances */
int init(); int init();
/* Request guardian shutdown. Stop instances if needed */ /* Request guardian shutdown. Stop instances if needed */
...@@ -117,6 +96,9 @@ class Guardian: public Guardian_args ...@@ -117,6 +96,9 @@ class Guardian: public Guardian_args
a valid list node. a valid list node.
*/ */
inline enum_instance_state get_instance_state(LIST *instance_node); inline enum_instance_state get_instance_state(LIST *instance_node);
protected:
/* Main funtion of the thread */
virtual void run();
public: public:
pthread_cond_t COND_guardian; pthread_cond_t COND_guardian;
...@@ -133,6 +115,9 @@ class Guardian: public Guardian_args ...@@ -133,6 +115,9 @@ class Guardian: public Guardian_args
private: private:
pthread_mutex_t LOCK_guardian; pthread_mutex_t LOCK_guardian;
Thread_info thread_info; Thread_info thread_info;
int monitoring_interval;
Thread_registry *thread_registry;
Instance_map *instance_map;
LIST *guarded_instances; LIST *guarded_instances;
MEM_ROOT alloc; MEM_ROOT alloc;
/* this variable is set to TRUE when we want to stop Guardian thread */ /* this variable is set to TRUE when we want to stop Guardian thread */
......
...@@ -44,9 +44,6 @@ static const char * const INSTANCE_NAME_PREFIX= Instance::DFLT_INSTANCE_NAME.str ...@@ -44,9 +44,6 @@ static const char * const INSTANCE_NAME_PREFIX= Instance::DFLT_INSTANCE_NAME.str
static const int INSTANCE_NAME_PREFIX_LEN= Instance::DFLT_INSTANCE_NAME.length; static const int INSTANCE_NAME_PREFIX_LEN= Instance::DFLT_INSTANCE_NAME.length;
static void start_and_monitor_instance(Instance_options *old_instance_options,
Instance_map *instance_map,
Thread_registry *thread_registry);
#ifndef __WIN__ #ifndef __WIN__
typedef pid_t My_process_info; typedef pid_t My_process_info;
...@@ -61,13 +58,24 @@ typedef PROCESS_INFORMATION My_process_info; ...@@ -61,13 +58,24 @@ typedef PROCESS_INFORMATION My_process_info;
to do it in a portable way. to do it in a portable way.
*/ */
pthread_handler_t proxy(void *arg) class Instance_monitor: public Thread
{
public:
Instance_monitor(Instance *instance_arg) :instance(instance_arg) {}
protected:
virtual void run();
void start_and_monitor_instance(Instance_options *old_instance_options,
Instance_map *instance_map,
Thread_registry *thread_registry);
private:
Instance *instance;
};
void Instance_monitor::run()
{ {
Instance *instance= (Instance *) arg; start_and_monitor_instance(&instance->options, instance->get_map(),
start_and_monitor_instance(&instance->options,
instance->get_map(),
&instance->thread_registry); &instance->thread_registry);
return 0; delete this;
} }
/* /*
...@@ -242,14 +250,16 @@ static int start_process(Instance_options *instance_options, ...@@ -242,14 +250,16 @@ static int start_process(Instance_options *instance_options,
Function returns no value Function returns no value
*/ */
static void start_and_monitor_instance(Instance_options *old_instance_options, void
Instance_monitor::
start_and_monitor_instance(Instance_options *old_instance_options,
Instance_map *instance_map, Instance_map *instance_map,
Thread_registry *thread_registry) Thread_registry *thread_registry)
{ {
Instance_name instance_name(&old_instance_options->instance_name); Instance_name instance_name(&old_instance_options->instance_name);
Instance *current_instance; Instance *current_instance;
My_process_info process_info; My_process_info process_info;
Thread_info thread_info(pthread_self(), FALSE); Thread_info thread_info;
log_info("Monitoring thread (instance: '%s'): started.", log_info("Monitoring thread (instance: '%s'): started.",
(const char *) instance_name.get_c_str()); (const char *) instance_name.get_c_str());
...@@ -258,12 +268,10 @@ static void start_and_monitor_instance(Instance_options *old_instance_options, ...@@ -258,12 +268,10 @@ static void start_and_monitor_instance(Instance_options *old_instance_options,
{ {
/* /*
Register thread in Thread_registry to wait for it to stop on shutdown Register thread in Thread_registry to wait for it to stop on shutdown
only if instance is nuarded. If instance is guarded, the thread will not only if instance is guarded. If instance is guarded, the thread will not
finish, because nonguarded instances are not stopped on shutdown. finish, because nonguarded instances are not stopped on shutdown.
*/ */
thread_registry->register_thread(&thread_info, FALSE);
thread_registry->register_thread(&thread_info);
my_thread_init();
} }
/* /*
...@@ -302,10 +310,7 @@ static void start_and_monitor_instance(Instance_options *old_instance_options, ...@@ -302,10 +310,7 @@ static void start_and_monitor_instance(Instance_options *old_instance_options,
instance_map->unlock(); instance_map->unlock();
if (!old_instance_options->nonguarded) if (!old_instance_options->nonguarded)
{
thread_registry->unregister_thread(&thread_info); thread_registry->unregister_thread(&thread_info);
my_thread_end();
}
log_info("Monitoring thread (instance: '%s'): finished.", log_info("Monitoring thread (instance: '%s'): finished.",
(const char *) instance_name.get_c_str()); (const char *) instance_name.get_c_str());
...@@ -369,22 +374,19 @@ int Instance::start() ...@@ -369,22 +374,19 @@ int Instance::start()
if (configured && !is_running()) if (configured && !is_running())
{ {
Instance_monitor *instance_monitor;
remove_pid(); remove_pid();
pthread_t proxy_thd_id; instance_monitor= new Instance_monitor(this);
pthread_attr_t proxy_thd_attr;
int rc;
pthread_attr_init(&proxy_thd_attr); if (instance_monitor == NULL || instance_monitor->start_detached())
pthread_attr_setdetachstate(&proxy_thd_attr, PTHREAD_CREATE_DETACHED);
rc= pthread_create(&proxy_thd_id, &proxy_thd_attr, proxy,
this);
pthread_attr_destroy(&proxy_thd_attr);
if (rc)
{ {
log_error("Instance::start(): pthread_create(proxy) failed"); delete instance_monitor;
log_error("Instance::start(): failed to create the monitoring thread"
" to start an instance");
return ER_CANNOT_START_INSTANCE; return ER_CANNOT_START_INSTANCE;
} }
/* The monitoring thread will delete itself when it's finished. */
return 0; return 0;
} }
......
...@@ -29,7 +29,6 @@ ...@@ -29,7 +29,6 @@
#include <sys/un.h> #include <sys/un.h>
#endif #endif
#include "instance_map.h"
#include "log.h" #include "log.h"
#include "mysql_connection.h" #include "mysql_connection.h"
#include "options.h" #include "options.h"
...@@ -59,47 +58,18 @@ static void set_no_inherit(int socket) ...@@ -59,47 +58,18 @@ static void set_no_inherit(int socket)
} }
/* Listener::Listener(Thread_registry *thread_registry_arg,
Listener_thread - incapsulates listening functionality User_map *user_map_arg)
*/ :thread_registry(thread_registry_arg),
user_map(user_map_arg),
class Listener_thread: public Listener_thread_args total_connection_count(0),
{ num_sockets(0)
public:
Listener_thread(const Listener_thread_args &args);
~Listener_thread();
void run();
private:
static const int LISTEN_BACK_LOG_SIZE= 5; /* standard backlog size */
ulong total_connection_count;
Thread_info thread_info;
int sockets[2];
int num_sockets;
fd_set read_fds;
private:
void handle_new_mysql_connection(Vio *vio);
int create_tcp_socket();
int create_unix_socket(struct sockaddr_un &unix_socket_address);
};
Listener_thread::Listener_thread(const Listener_thread_args &args) :
Listener_thread_args(args.thread_registry, args.user_map, args.instance_map)
,total_connection_count(0)
,thread_info(pthread_self(), TRUE)
,num_sockets(0)
{
}
Listener_thread::~Listener_thread()
{ {
} }
/* /*
Listener_thread::run() - listen all supported sockets and spawn a thread Listener::run() - listen all supported sockets and spawn a thread
to handle incoming connection. to handle incoming connection.
Using 'die' in case of syscall failure is OK now - we don't hold any Using 'die' in case of syscall failure is OK now - we don't hold any
resources and 'die' kills the signal thread automatically. To be rewritten resources and 'die' kills the signal thread automatically. To be rewritten
...@@ -108,11 +78,11 @@ Listener_thread::~Listener_thread() ...@@ -108,11 +78,11 @@ Listener_thread::~Listener_thread()
architecture. architecture.
*/ */
void Listener_thread::run() void Listener::run()
{ {
int i, n= 0; int i, n= 0;
log_info("Listener_thread: started."); log_info("Listener: started.");
#ifndef __WIN__ #ifndef __WIN__
/* we use this var to check whether we are running on LinuxThreads */ /* we use this var to check whether we are running on LinuxThreads */
...@@ -125,9 +95,7 @@ void Listener_thread::run() ...@@ -125,9 +95,7 @@ void Listener_thread::run()
linuxthreads= (thread_pid != manager_pid); linuxthreads= (thread_pid != manager_pid);
#endif #endif
thread_registry.register_thread(&thread_info); thread_registry->register_thread(&thread_info);
my_thread_init();
FD_ZERO(&read_fds); FD_ZERO(&read_fds);
...@@ -146,7 +114,7 @@ void Listener_thread::run() ...@@ -146,7 +114,7 @@ void Listener_thread::run()
n++; n++;
timeval tv; timeval tv;
while (!thread_registry.is_shutdown()) while (!thread_registry->is_shutdown())
{ {
fd_set read_fds_arg= read_fds; fd_set read_fds_arg= read_fds;
/* /*
...@@ -166,7 +134,7 @@ void Listener_thread::run() ...@@ -166,7 +134,7 @@ void Listener_thread::run()
if (rc == 0 || rc == -1) if (rc == 0 || rc == -1)
{ {
if (rc == -1 && errno != EINTR) if (rc == -1 && errno != EINTR)
log_error("Listener_thread: select() failed, %s", log_error("Listener: select() failed, %s",
strerror(errno)); strerror(errno));
continue; continue;
} }
...@@ -200,7 +168,7 @@ void Listener_thread::run() ...@@ -200,7 +168,7 @@ void Listener_thread::run()
/* III. Release all resources and exit */ /* III. Release all resources and exit */
log_info("Listener_thread: shutdown requested, exiting..."); log_info("Listener: shutdown requested, exiting...");
for (i= 0; i < num_sockets; i++) for (i= 0; i < num_sockets; i++)
close(sockets[i]); close(sockets[i]);
...@@ -209,10 +177,9 @@ void Listener_thread::run() ...@@ -209,10 +177,9 @@ void Listener_thread::run()
unlink(unix_socket_address.sun_path); unlink(unix_socket_address.sun_path);
#endif #endif
thread_registry.unregister_thread(&thread_info); thread_registry->unregister_thread(&thread_info);
my_thread_end();
log_info("Listener_thread: finished."); log_info("Listener: finished.");
return; return;
err: err:
...@@ -220,13 +187,12 @@ void Listener_thread::run() ...@@ -220,13 +187,12 @@ void Listener_thread::run()
for (i= 0; i < num_sockets; i++) for (i= 0; i < num_sockets; i++)
close(sockets[i]); close(sockets[i]);
thread_registry.unregister_thread(&thread_info); thread_registry->unregister_thread(&thread_info);
thread_registry.request_shutdown(); thread_registry->request_shutdown();
my_thread_end();
return; return;
} }
int Listener_thread::create_tcp_socket() int Listener::create_tcp_socket()
{ {
/* value to be set by setsockopt */ /* value to be set by setsockopt */
int arg= 1; int arg= 1;
...@@ -265,7 +231,7 @@ int Listener_thread::create_tcp_socket() ...@@ -265,7 +231,7 @@ int Listener_thread::create_tcp_socket()
if (bind(ip_socket, (struct sockaddr *) &ip_socket_address, if (bind(ip_socket, (struct sockaddr *) &ip_socket_address,
sizeof(ip_socket_address))) sizeof(ip_socket_address)))
{ {
log_error("Listener_thread: bind(ip socket) failed, '%s'", log_error("Listener: bind(ip socket) failed, '%s'",
strerror(errno)); strerror(errno));
close(ip_socket); close(ip_socket);
return -1; return -1;
...@@ -273,7 +239,7 @@ int Listener_thread::create_tcp_socket() ...@@ -273,7 +239,7 @@ int Listener_thread::create_tcp_socket()
if (listen(ip_socket, LISTEN_BACK_LOG_SIZE)) if (listen(ip_socket, LISTEN_BACK_LOG_SIZE))
{ {
log_error("Listener_thread: listen(ip socket) failed, %s", log_error("Listener: listen(ip socket) failed, %s",
strerror(errno)); strerror(errno));
close(ip_socket); close(ip_socket);
return -1; return -1;
...@@ -292,7 +258,7 @@ int Listener_thread::create_tcp_socket() ...@@ -292,7 +258,7 @@ int Listener_thread::create_tcp_socket()
} }
#ifndef __WIN__ #ifndef __WIN__
int Listener_thread:: int Listener::
create_unix_socket(struct sockaddr_un &unix_socket_address) create_unix_socket(struct sockaddr_un &unix_socket_address)
{ {
int unix_socket= socket(AF_UNIX, SOCK_STREAM, 0); int unix_socket= socket(AF_UNIX, SOCK_STREAM, 0);
...@@ -318,7 +284,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address) ...@@ -318,7 +284,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
if (bind(unix_socket, (struct sockaddr *) &unix_socket_address, if (bind(unix_socket, (struct sockaddr *) &unix_socket_address,
sizeof(unix_socket_address))) sizeof(unix_socket_address)))
{ {
log_error("Listener_thread: bind(unix socket) failed, " log_error("Listener: bind(unix socket) failed, "
"socket file name is '%s', error '%s'", "socket file name is '%s', error '%s'",
unix_socket_address.sun_path, strerror(errno)); unix_socket_address.sun_path, strerror(errno));
close(unix_socket); close(unix_socket);
...@@ -329,7 +295,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address) ...@@ -329,7 +295,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
if (listen(unix_socket, LISTEN_BACK_LOG_SIZE)) if (listen(unix_socket, LISTEN_BACK_LOG_SIZE))
{ {
log_error("Listener_thread: listen(unix socket) failed, %s", log_error("Listener: listen(unix socket) failed, %s",
strerror(errno)); strerror(errno));
close(unix_socket); close(unix_socket);
return -1; return -1;
...@@ -357,46 +323,16 @@ create_unix_socket(struct sockaddr_un &unix_socket_address) ...@@ -357,46 +323,16 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
handle_new_mysql_connection() handle_new_mysql_connection()
*/ */
void Listener_thread::handle_new_mysql_connection(Vio *vio) void Listener::handle_new_mysql_connection(Vio *vio)
{ {
if (Mysql_connection_thread_args *mysql_thread_args= Mysql_connection *mysql_connection=
new Mysql_connection_thread_args(vio, thread_registry, user_map, new Mysql_connection(thread_registry, user_map,
++total_connection_count, vio, ++total_connection_count);
instance_map) if (mysql_connection == NULL || mysql_connection->start_detached())
)
{
/*
Initialize thread attributes to create detached thread; it seems
easier to do it ad-hoc than have a global variable for attributes.
*/
pthread_t mysql_thd_id;
pthread_attr_t mysql_thd_attr;
pthread_attr_init(&mysql_thd_attr);
pthread_attr_setdetachstate(&mysql_thd_attr, PTHREAD_CREATE_DETACHED);
if (set_stacksize_n_create_thread(&mysql_thd_id, &mysql_thd_attr,
mysql_connection, mysql_thread_args))
{ {
delete mysql_thread_args; log_error("handle_one_mysql_connection() failed");
delete mysql_connection;
vio_delete(vio); vio_delete(vio);
log_error("handle_one_mysql_connection():"
"set_stacksize_n_create_thread(mysql) failed");
} }
pthread_attr_destroy(&mysql_thd_attr); /* The connection will delete itself when the thread is finished */
}
else
vio_delete(vio);
} }
pthread_handler_t listener(void *arg)
{
Listener_thread_args *args= (Listener_thread_args *) arg;
Listener_thread listener(*args);
listener.run();
/*
args is a stack variable because listener thread lives as long as the
manager process itself
*/
return 0;
}
...@@ -16,33 +16,39 @@ ...@@ -16,33 +16,39 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <my_global.h> #include "thread_registry.h"
#include <my_pthread.h>
#if defined(__GNUC__) && defined(USE_PRAGMA_INTERFACE) #if defined(__GNUC__) && defined(USE_PRAGMA_INTERFACE)
#pragma interface #pragma interface
#endif #endif
pthread_handler_t listener(void *arg);
class Thread_registry; class Thread_registry;
class User_map; class User_map;
class Instance_map;
struct Listener_thread_args /**
Listener - a thread listening on sockets and spawning
connection threads.
*/
class Listener: public Thread
{ {
Thread_registry &thread_registry; public:
const User_map &user_map; Listener(Thread_registry *thread_registry_arg, User_map *user_map_arg);
Instance_map &instance_map; protected:
virtual void run();
Listener_thread_args(Thread_registry &thread_registry_arg, private:
const User_map &user_map_arg, Thread_info thread_info;
Instance_map &instance_map_arg) : Thread_registry *thread_registry;
thread_registry(thread_registry_arg) User_map *user_map;
,user_map(user_map_arg) static const int LISTEN_BACK_LOG_SIZE= 5; /* standard backlog size */
,instance_map(instance_map_arg) ulong total_connection_count;
{}
int sockets[2];
int num_sockets;
fd_set read_fds;
void handle_new_mysql_connection(struct st_vio *vio);
int create_tcp_socket();
int create_unix_socket(struct sockaddr_un &unix_socket_address);
}; };
#endif // INCLUDES_MYSQL_INSTANCE_MANAGER_LISTENER_H #endif // INCLUDES_MYSQL_INSTANCE_MANAGER_LISTENER_H
...@@ -139,10 +139,10 @@ int Manager::main() ...@@ -139,10 +139,10 @@ int Manager::main()
User_map user_map; User_map user_map;
Instance_map instance_map(Options::Main::default_mysqld_path, Instance_map instance_map(Options::Main::default_mysqld_path,
thread_registry); thread_registry);
Guardian guardian(thread_registry, &instance_map, Guardian guardian(&thread_registry, &instance_map,
Options::Main::monitoring_interval); Options::Main::monitoring_interval);
Listener_thread_args listener_args(thread_registry, user_map, instance_map); Listener listener(&thread_registry, &user_map);
manager_pid= getpid(); manager_pid= getpid();
p_instance_map= &instance_map; p_instance_map= &instance_map;
...@@ -212,40 +212,29 @@ int Manager::main() ...@@ -212,40 +212,29 @@ int Manager::main()
sigset_t mask; sigset_t mask;
set_signals(&mask); set_signals(&mask);
/* create guardian thread */
{
pthread_t guardian_thd_id;
pthread_attr_t guardian_thd_attr;
int rc;
/* /*
Create the guardian thread. The newly started thread will block until
we actually load instances.
NOTE: Guardian should be shutdown first. Only then all other threads NOTE: Guardian should be shutdown first. Only then all other threads
need to be stopped. This should be done, as guardian is responsible can be stopped. This should be done in this order because the guardian
for shutting down the instances, and this is a long operation. is responsible for shutting down all the guarded instances, and this
is a long operation.
NOTE: Guardian uses thr_alarm() when detects current state of
instances (is_running()), but it is not interfere with NOTE: Guardian uses thr_alarm() when detects the current state of an
flush_instances() later in the code, because until flush_instances() instance (is_running()), but this does not interfere with
complete in the main thread, Guardian thread is not permitted to flush_instances() call later in the code, because until
process instances. And before flush_instances() there is no instances flush_instances() completes in the main thread, Guardian thread is not
to proceed. permitted to process instances. And before flush_instances() has
completed, there are no instances to guard.
*/ */
if (guardian.start_detached())
pthread_attr_init(&guardian_thd_attr);
pthread_attr_setdetachstate(&guardian_thd_attr, PTHREAD_CREATE_DETACHED);
rc= set_stacksize_n_create_thread(&guardian_thd_id, &guardian_thd_attr,
guardian_thread_func, &guardian);
pthread_attr_destroy(&guardian_thd_attr);
if (rc)
{ {
log_error("manager(): set_stacksize_n_create_thread(guardian) failed"); log_error("manager(): Failed to create the guardian thread");
goto err; goto err;
} }
}
/* Load instances. */ /* Load instances. */
{ {
instance_map.guardian->lock(); instance_map.guardian->lock();
instance_map.lock(); instance_map.lock();
...@@ -265,24 +254,13 @@ int Manager::main() ...@@ -265,24 +254,13 @@ int Manager::main()
} }
} }
/* create the listener */ /* start the listener */
{ if (listener.start_detached())
pthread_t listener_thd_id;
pthread_attr_t listener_thd_attr;
int rc;
pthread_attr_init(&listener_thd_attr);
pthread_attr_setdetachstate(&listener_thd_attr, PTHREAD_CREATE_DETACHED);
rc= set_stacksize_n_create_thread(&listener_thd_id, &listener_thd_attr,
listener, &listener_args);
pthread_attr_destroy(&listener_thd_attr);
if (rc)
{ {
log_error("manager(): set_stacksize_n_create_thread(listener) failed"); log_error("manager(): set_stacksize_n_create_thread(listener) failed");
stop_all(&guardian, &thread_registry); stop_all(&guardian, &thread_registry);
goto err; goto err;
} }
}
/* /*
After the list of guarded instances have been initialized, After the list of guarded instances have been initialized,
......
...@@ -23,7 +23,6 @@ ...@@ -23,7 +23,6 @@
#include <m_string.h> #include <m_string.h>
#include <m_string.h> #include <m_string.h>
#include <my_global.h> #include <my_global.h>
#include <mysql_com.h>
#include <mysql.h> #include <mysql.h>
#include <my_sys.h> #include <my_sys.h>
#include <violite.h> #include <violite.h>
...@@ -40,66 +39,15 @@ ...@@ -40,66 +39,15 @@
#include "user_map.h" #include "user_map.h"
Mysql_connection_thread_args::Mysql_connection_thread_args( Mysql_connection::Mysql_connection(Thread_registry *thread_registry_arg,
struct st_vio *vio_arg, User_map *user_map_arg,
Thread_registry &thread_registry_arg, struct st_vio *vio_arg, ulong
const User_map &user_map_arg, connection_id_arg)
ulong connection_id_arg, :vio(vio_arg),
Instance_map &instance_map_arg) : connection_id(connection_id_arg),
vio(vio_arg) thread_registry(thread_registry_arg),
,thread_registry(thread_registry_arg) user_map(user_map_arg)
,user_map(user_map_arg)
,connection_id(connection_id_arg)
,instance_map(instance_map_arg)
{}
/*
MySQL connection - handle one connection with mysql command line client
See also comments in mysqlmanager.cc to picture general Instance Manager
architecture.
We use conventional technique to work with classes without exceptions:
class acquires all vital resource in init(); Thus if init() succeed,
a user must call cleanup(). All other methods are valid only between
init() and cleanup().
*/
class Mysql_connection_thread: public Mysql_connection_thread_args
{
public:
Mysql_connection_thread(const Mysql_connection_thread_args &args);
int init();
void cleanup();
void run();
~Mysql_connection_thread();
private:
Thread_info thread_info;
NET net;
struct rand_struct rand_st;
char scramble[SCRAMBLE_LENGTH + 1];
uint status;
ulong client_capabilities;
private:
/* Names are conventionally the same as in mysqld */
int check_connection();
int do_command();
int dispatch_command(enum enum_server_command command,
const char *text, uint len);
};
Mysql_connection_thread::Mysql_connection_thread(
const Mysql_connection_thread_args &args) :
Mysql_connection_thread_args(args.vio,
args.thread_registry,
args.user_map,
args.connection_id,
args.instance_map)
,thread_info(pthread_self(), TRUE)
{ {
thread_registry.register_thread(&thread_info);
} }
...@@ -129,7 +77,7 @@ C_MODE_END ...@@ -129,7 +77,7 @@ C_MODE_END
This function is complementary to cleanup(). This function is complementary to cleanup().
*/ */
int Mysql_connection_thread::init() int Mysql_connection::init()
{ {
/* Allocate buffers for network I/O */ /* Allocate buffers for network I/O */
if (my_net_init(&net, vio)) if (my_net_init(&net, vio))
...@@ -145,52 +93,46 @@ int Mysql_connection_thread::init() ...@@ -145,52 +93,46 @@ int Mysql_connection_thread::init()
create_random_string(scramble, SCRAMBLE_LENGTH, &rand_st); create_random_string(scramble, SCRAMBLE_LENGTH, &rand_st);
/* We don't support transactions, every query is atomic */ /* We don't support transactions, every query is atomic */
status= SERVER_STATUS_AUTOCOMMIT; status= SERVER_STATUS_AUTOCOMMIT;
thread_registry->register_thread(&thread_info);
return 0; return 0;
} }
void Mysql_connection_thread::cleanup() void Mysql_connection::cleanup()
{ {
net_end(&net); net_end(&net);
thread_registry->unregister_thread(&thread_info);
} }
Mysql_connection_thread::~Mysql_connection_thread() Mysql_connection::~Mysql_connection()
{ {
/* vio_delete closes the socket if necessary */ /* vio_delete closes the socket if necessary */
vio_delete(vio); vio_delete(vio);
thread_registry.unregister_thread(&thread_info);
} }
void Mysql_connection_thread::run() void Mysql_connection::main()
{ {
log_info("accepted mysql connection %lu", (unsigned long) connection_id); log_info("accepted mysql connection %lu", (unsigned long) connection_id);
my_thread_init();
if (check_connection()) if (check_connection())
{
my_thread_end();
return; return;
}
log_info("connection %lu is checked successfully", log_info("connection %lu is checked successfully",
(unsigned long) connection_id); (unsigned long) connection_id);
vio_keepalive(vio, TRUE); vio_keepalive(vio, TRUE);
while (!net.error && net.vio && !thread_registry.is_shutdown()) while (!net.error && net.vio && !thread_registry->is_shutdown())
{ {
if (do_command()) if (do_command())
break; break;
} }
my_thread_end();
} }
int Mysql_connection_thread::check_connection() int Mysql_connection::check_connection()
{ {
ulong pkt_len=0; // to hold client reply length ulong pkt_len=0; // to hold client reply length
...@@ -279,7 +221,7 @@ int Mysql_connection_thread::check_connection() ...@@ -279,7 +221,7 @@ int Mysql_connection_thread::check_connection()
net_send_error(&net, ER_ACCESS_DENIED_ERROR); net_send_error(&net, ER_ACCESS_DENIED_ERROR);
return 1; return 1;
} }
if (user_map.authenticate(&user_name, password, scramble)) if (user_map->authenticate(&user_name, password, scramble))
{ {
net_send_error(&net, ER_ACCESS_DENIED_ERROR); net_send_error(&net, ER_ACCESS_DENIED_ERROR);
return 1; return 1;
...@@ -289,7 +231,7 @@ int Mysql_connection_thread::check_connection() ...@@ -289,7 +231,7 @@ int Mysql_connection_thread::check_connection()
} }
int Mysql_connection_thread::do_command() int Mysql_connection::do_command()
{ {
char *packet; char *packet;
ulong packet_length; ulong packet_length;
...@@ -302,7 +244,7 @@ int Mysql_connection_thread::do_command() ...@@ -302,7 +244,7 @@ int Mysql_connection_thread::do_command()
/* Check if we can continue without closing the connection */ /* Check if we can continue without closing the connection */
if (net.error != 3) // what is 3 - find out if (net.error != 3) // what is 3 - find out
return 1; return 1;
if (thread_registry.is_shutdown()) if (thread_registry->is_shutdown())
return 1; return 1;
net_send_error(&net, net.last_errno); net_send_error(&net, net.last_errno);
net.error= 0; net.error= 0;
...@@ -310,7 +252,7 @@ int Mysql_connection_thread::do_command() ...@@ -310,7 +252,7 @@ int Mysql_connection_thread::do_command()
} }
else else
{ {
if (thread_registry.is_shutdown()) if (thread_registry->is_shutdown())
return 1; return 1;
packet= (char*) net.read_pos; packet= (char*) net.read_pos;
enum enum_server_command command= (enum enum_server_command) enum enum_server_command command= (enum enum_server_command)
...@@ -321,7 +263,7 @@ int Mysql_connection_thread::do_command() ...@@ -321,7 +263,7 @@ int Mysql_connection_thread::do_command()
} }
} }
int Mysql_connection_thread::dispatch_command(enum enum_server_command command, int Mysql_connection::dispatch_command(enum enum_server_command command,
const char *packet, uint len) const char *packet, uint len)
{ {
switch (command) { switch (command) {
...@@ -374,19 +316,16 @@ int Mysql_connection_thread::dispatch_command(enum enum_server_command command, ...@@ -374,19 +316,16 @@ int Mysql_connection_thread::dispatch_command(enum enum_server_command command,
} }
pthread_handler_t mysql_connection(void *arg) void Mysql_connection::run()
{ {
Mysql_connection_thread_args *args= (Mysql_connection_thread_args *) arg; if (init())
Mysql_connection_thread mysql_connection_thread(*args); log_info("Mysql_connection::run(): error initializing thread");
delete args;
if (mysql_connection_thread.init())
log_info("mysql_connection(): error initializing thread");
else else
{ {
mysql_connection_thread.run(); main();
mysql_connection_thread.cleanup(); cleanup();
} }
return 0; delete this;
} }
/* /*
......
...@@ -16,33 +16,60 @@ ...@@ -16,33 +16,60 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <my_global.h> #include "thread_registry.h"
#include <my_pthread.h> #include <mysql_com.h>
#if defined(__GNUC__) && defined(USE_PRAGMA_INTERFACE) #if defined(__GNUC__) && defined(USE_PRAGMA_INTERFACE)
#pragma interface #pragma interface
#endif #endif
pthread_handler_t mysql_connection(void *arg);
class Thread_registry;
class User_map;
class Instance_map;
struct st_vio; struct st_vio;
class User_map;
struct Mysql_connection_thread_args /*
MySQL connection - handle one connection with mysql command line client
See also comments in mysqlmanager.cc to picture general Instance Manager
architecture.
We use conventional technique to work with classes without exceptions:
class acquires all vital resource in init(); Thus if init() succeed,
a user must call cleanup(). All other methods are valid only between
init() and cleanup().
*/
class Mysql_connection: public Thread
{ {
public:
Mysql_connection(Thread_registry *thread_registry_arg,
User_map *user_map_arg,
struct st_vio *vio_arg,
ulong connection_id_arg);
virtual ~Mysql_connection();
protected:
virtual void run();
private:
struct st_vio *vio; struct st_vio *vio;
Thread_registry &thread_registry;
const User_map &user_map;
ulong connection_id; ulong connection_id;
Instance_map &instance_map; Thread_info thread_info;
Thread_registry *thread_registry;
User_map *user_map;
NET net;
struct rand_struct rand_st;
char scramble[SCRAMBLE_LENGTH + 1];
uint status;
ulong client_capabilities;
private:
/* The main loop implementation triad */
int init();
void main();
void cleanup();
Mysql_connection_thread_args(struct st_vio *vio_arg, /* Names are conventionally the same as in mysqld */
Thread_registry &thread_registry_arg, int check_connection();
const User_map &user_map_arg, int do_command();
ulong connection_id_arg, int dispatch_command(enum enum_server_command command,
Instance_map &instance_map_arg); const char *text, uint len);
}; };
#endif // INCLUDES_MYSQL_INSTANCE_MANAGER_MYSQL_CONNECTION_H #endif // INCLUDES_MYSQL_INSTANCE_MANAGER_MYSQL_CONNECTION_H
...@@ -22,17 +22,6 @@ ...@@ -22,17 +22,6 @@
#include "log.h" #include "log.h"
#if defined(__ia64__) || defined(__ia64)
/*
We can live with 32K, but reserve 64K. Just to be safe.
On ia64 we need to reserve double of the size.
*/
#define IM_THREAD_STACK_SIZE (128*1024L)
#else
#define IM_THREAD_STACK_SIZE (64*1024)
#endif
/* the pid of the manager process (of the signal thread on the LinuxThreads) */ /* the pid of the manager process (of the signal thread on the LinuxThreads) */
pid_t manager_pid; pid_t manager_pid;
...@@ -66,33 +55,6 @@ unsigned long bytes_sent = 0L, bytes_received = 0L; ...@@ -66,33 +55,6 @@ unsigned long bytes_sent = 0L, bytes_received = 0L;
unsigned long mysqld_net_retry_count = 10L; unsigned long mysqld_net_retry_count = 10L;
unsigned long open_files_limit; unsigned long open_files_limit;
/*
Change the stack size and start a thread. Return an error if either
pthread_attr_setstacksize or pthread_create fails.
Arguments are the same as for pthread_create().
*/
int set_stacksize_n_create_thread(pthread_t *thread, pthread_attr_t *attr,
void *(*start_routine)(void *), void *arg)
{
int rc= 0;
#ifndef __WIN__
#ifndef PTHREAD_STACK_MIN
#define PTHREAD_STACK_MIN 32768
#endif
/*
Set stack size to be safe on the platforms with too small
default thread stack.
*/
rc= pthread_attr_setstacksize(attr,
(size_t) (PTHREAD_STACK_MIN +
IM_THREAD_STACK_SIZE));
#endif
if (!rc)
rc= pthread_create(thread, attr, start_routine, arg);
return rc;
}
int create_pid_file(const char *pid_file_name, int pid) int create_pid_file(const char *pid_file_name, int pid)
......
...@@ -105,10 +105,6 @@ extern unsigned long bytes_sent, bytes_received; ...@@ -105,10 +105,6 @@ extern unsigned long bytes_sent, bytes_received;
extern unsigned long mysqld_net_retry_count; extern unsigned long mysqld_net_retry_count;
extern unsigned long open_files_limit; extern unsigned long open_files_limit;
int set_stacksize_n_create_thread(pthread_t *thread, pthread_attr_t *attr,
void *(*start_routine)(void *), void *arg);
int create_pid_file(const char *pid_file_name, int pid); int create_pid_file(const char *pid_file_name, int pid);
#endif // INCLUDES_MYSQL_INSTANCE_MANAGER_PRIV_H #endif // INCLUDES_MYSQL_INSTANCE_MANAGER_PRIV_H
...@@ -38,15 +38,13 @@ static void handle_signal(int __attribute__((unused)) sig_no) ...@@ -38,15 +38,13 @@ static void handle_signal(int __attribute__((unused)) sig_no)
} }
#endif #endif
/* /* Thread_info initializer methods */
Thread_info initializer methods
*/
Thread_info::Thread_info() {} void Thread_info::init(bool send_signal_on_shutdown_arg)
Thread_info::Thread_info(pthread_t thread_id_arg, {
bool send_signal_on_shutdown_arg) : thread_id= pthread_self();
thread_id(thread_id_arg), send_signal_on_shutdown= send_signal_on_shutdown_arg;
send_signal_on_shutdown(send_signal_on_shutdown_arg) {} }
/* /*
TODO: think about moving signal information (now it's shutdown_in_progress) TODO: think about moving signal information (now it's shutdown_in_progress)
...@@ -86,11 +84,14 @@ Thread_registry::~Thread_registry() ...@@ -86,11 +84,14 @@ Thread_registry::~Thread_registry()
points to the last node. points to the last node.
*/ */
void Thread_registry::register_thread(Thread_info *info) void Thread_registry::register_thread(Thread_info *info,
bool send_signal_on_shutdown)
{ {
log_info("Thread_registry: registering thread %d...", log_info("Thread_registry: registering thread %d...",
(int) info->thread_id); (int) info->thread_id);
info->init(send_signal_on_shutdown);
#ifndef __WIN__ #ifndef __WIN__
struct sigaction sa; struct sigaction sa;
sa.sa_handler= handle_signal; sa.sa_handler= handle_signal;
...@@ -298,3 +299,80 @@ void Thread_registry::wait_for_threads_to_unregister() ...@@ -298,3 +299,80 @@ void Thread_registry::wait_for_threads_to_unregister()
} }
} }
} }
/*********************************************************************
class Thread
*********************************************************************/
#if defined(__ia64__) || defined(__ia64)
/*
We can live with 32K, but reserve 64K. Just to be safe.
On ia64 we need to reserve double of the size.
*/
#define IM_THREAD_STACK_SIZE (128*1024L)
#else
#define IM_THREAD_STACK_SIZE (64*1024)
#endif
/*
Change the stack size and start a thread. Return an error if either
pthread_attr_setstacksize or pthread_create fails.
Arguments are the same as for pthread_create().
*/
static
int set_stacksize_and_create_thread(pthread_t *thread, pthread_attr_t *attr,
void *(*start_routine)(void *), void *arg)
{
int rc= 0;
#ifndef __WIN__
#ifndef PTHREAD_STACK_MIN
#define PTHREAD_STACK_MIN 32768
#endif
/*
Set stack size to be safe on the platforms with too small
default thread stack.
*/
rc= pthread_attr_setstacksize(attr,
(size_t) (PTHREAD_STACK_MIN +
IM_THREAD_STACK_SIZE));
#endif
if (!rc)
rc= pthread_create(thread, attr, start_routine, arg);
return rc;
}
Thread::~Thread()
{
}
void *Thread::thread_func(void *arg)
{
Thread *thread= (Thread *) arg;
my_thread_init();
thread->run();
my_thread_end();
return NULL;
}
bool Thread::start_detached()
{
pthread_t thd_id;
pthread_attr_t attr;
int rc;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
rc= set_stacksize_and_create_thread(&thd_id, &attr,
Thread::thread_func, this);
pthread_attr_destroy(&attr);
return rc != 0;
}
...@@ -57,7 +57,7 @@ ...@@ -57,7 +57,7 @@
#pragma interface #pragma interface
#endif #endif
/* /**
Thread_info - repository entry for each worker thread Thread_info - repository entry for each worker thread
All entries comprise double-linked list like: All entries comprise double-linked list like:
0 -- entry -- entry -- entry - 0 0 -- entry -- entry -- entry - 0
...@@ -67,12 +67,10 @@ ...@@ -67,12 +67,10 @@
class Thread_info class Thread_info
{ {
public: public:
Thread_info(pthread_t thread_id_arg, bool send_signal_on_shutdown_arg); Thread_info() {}
friend class Thread_registry; friend class Thread_registry;
private: private:
Thread_info(); void init(bool send_signal_on_shutdown);
private: private:
pthread_cond_t *current_cond; pthread_cond_t *current_cond;
Thread_info *prev, *next; Thread_info *prev, *next;
...@@ -81,7 +79,26 @@ class Thread_info ...@@ -81,7 +79,26 @@ class Thread_info
}; };
/* /**
A base class for a detached thread.
*/
class Thread
{
public:
Thread() {}
bool start_detached();
protected:
virtual void run()= 0;
virtual ~Thread();
private:
static void *thread_func(void *arg);
Thread(const Thread & /* rhs */); /* not implemented */
Thread &operator=(const Thread & /* rhs */); /* not implemented */
};
/**
Thread_registry - contains handles for each worker thread to deliver Thread_registry - contains handles for each worker thread to deliver
signal information to workers. signal information to workers.
*/ */
...@@ -92,7 +109,7 @@ class Thread_registry ...@@ -92,7 +109,7 @@ class Thread_registry
Thread_registry(); Thread_registry();
~Thread_registry(); ~Thread_registry();
void register_thread(Thread_info *info); void register_thread(Thread_info *info, bool send_signal_on_shutdown= TRUE);
void unregister_thread(Thread_info *info); void unregister_thread(Thread_info *info);
void deliver_shutdown(); void deliver_shutdown();
void request_shutdown(); void request_shutdown();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment