Improve ndb thread shutdown handling

parent 168c5f14
...@@ -134,7 +134,6 @@ static uint ndbcluster_alter_table_flags(uint flags) ...@@ -134,7 +134,6 @@ static uint ndbcluster_alter_table_flags(uint flags)
} }
static int ndbcluster_inited= 0; static int ndbcluster_inited= 0;
int ndbcluster_util_inited= 0;
static Ndb* g_ndb= NULL; static Ndb* g_ndb= NULL;
Ndb_cluster_connection* g_ndb_cluster_connection= NULL; Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
...@@ -158,6 +157,7 @@ static int ndb_get_table_statistics(ha_ndbcluster*, bool, Ndb*, const NDBTAB *, ...@@ -158,6 +157,7 @@ static int ndb_get_table_statistics(ha_ndbcluster*, bool, Ndb*, const NDBTAB *,
// Util thread variables // Util thread variables
pthread_t ndb_util_thread; pthread_t ndb_util_thread;
int ndb_util_thread_running= 0;
pthread_mutex_t LOCK_ndb_util_thread; pthread_mutex_t LOCK_ndb_util_thread;
pthread_cond_t COND_ndb_util_thread; pthread_cond_t COND_ndb_util_thread;
pthread_handler_t ndb_util_thread_func(void *arg); pthread_handler_t ndb_util_thread_func(void *arg);
...@@ -6720,6 +6720,12 @@ static int ndbcluster_init(void *p) ...@@ -6720,6 +6720,12 @@ static int ndbcluster_init(void *p)
goto ndbcluster_init_error; goto ndbcluster_init_error;
} }
/* Wait for the util thread to start */
pthread_mutex_lock(&LOCK_ndb_util_thread);
while (!ndb_util_thread_running)
pthread_cond_wait(&COND_ndb_util_thread, &LOCK_ndb_util_thread);
pthread_mutex_unlock(&LOCK_ndb_util_thread);
ndbcluster_inited= 1; ndbcluster_inited= 1;
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
...@@ -6742,6 +6748,27 @@ static int ndbcluster_end(handlerton *hton, ha_panic_function type) ...@@ -6742,6 +6748,27 @@ static int ndbcluster_end(handlerton *hton, ha_panic_function type)
if (!ndbcluster_inited) if (!ndbcluster_inited)
DBUG_RETURN(0); DBUG_RETURN(0);
ndbcluster_inited= 0;
/* wait for util thread to finish */
pthread_mutex_lock(&LOCK_ndb_util_thread);
if (ndb_util_thread_running > 0)
{
pthread_cond_signal(&COND_ndb_util_thread);
pthread_mutex_unlock(&LOCK_ndb_util_thread);
pthread_mutex_lock(&LOCK_ndb_util_thread);
while (ndb_util_thread_running > 0)
{
struct timespec abstime;
set_timespec(abstime, 1);
pthread_cond_timedwait(&COND_ndb_util_thread,
&LOCK_ndb_util_thread,
&abstime);
}
}
pthread_mutex_unlock(&LOCK_ndb_util_thread);
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
{ {
...@@ -6788,7 +6815,6 @@ static int ndbcluster_end(handlerton *hton, ha_panic_function type) ...@@ -6788,7 +6815,6 @@ static int ndbcluster_end(handlerton *hton, ha_panic_function type)
pthread_mutex_destroy(&ndbcluster_mutex); pthread_mutex_destroy(&ndbcluster_mutex);
pthread_mutex_destroy(&LOCK_ndb_util_thread); pthread_mutex_destroy(&LOCK_ndb_util_thread);
pthread_cond_destroy(&COND_ndb_util_thread); pthread_cond_destroy(&COND_ndb_util_thread);
ndbcluster_inited= 0;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -8334,6 +8360,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ...@@ -8334,6 +8360,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
{ {
thd->cleanup(); thd->cleanup();
delete thd; delete thd;
ndb_util_thread_running= 0;
DBUG_RETURN(NULL); DBUG_RETURN(NULL);
} }
thd->init_for_queries(); thd->init_for_queries();
...@@ -8346,6 +8373,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ...@@ -8346,6 +8373,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
thd->main_security_ctx.priv_user = 0; thd->main_security_ctx.priv_user = 0;
thd->current_stmt_binlog_row_based= TRUE; // If in mixed mode thd->current_stmt_binlog_row_based= TRUE; // If in mixed mode
ndb_util_thread_running= 1;
pthread_cond_signal(&COND_ndb_util_thread);
/* /*
wait for mysql server to start wait for mysql server to start
*/ */
...@@ -8354,8 +8384,6 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ...@@ -8354,8 +8384,6 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
pthread_cond_wait(&COND_server_started, &LOCK_server_started); pthread_cond_wait(&COND_server_started, &LOCK_server_started);
pthread_mutex_unlock(&LOCK_server_started); pthread_mutex_unlock(&LOCK_server_started);
ndbcluster_util_inited= 1;
/* /*
Wait for cluster to start Wait for cluster to start
*/ */
...@@ -8537,6 +8565,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ...@@ -8537,6 +8565,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
net_end(&thd->net); net_end(&thd->net);
thd->cleanup(); thd->cleanup();
delete thd; delete thd;
pthread_mutex_lock(&LOCK_ndb_util_thread);
ndb_util_thread_running= 0;
pthread_mutex_unlock(&LOCK_ndb_util_thread);
DBUG_PRINT("exit", ("ndb_util_thread")); DBUG_PRINT("exit", ("ndb_util_thread"));
my_thread_end(); my_thread_end();
pthread_exit(0); pthread_exit(0);
......
...@@ -81,6 +81,8 @@ THD *injector_thd= 0; ...@@ -81,6 +81,8 @@ THD *injector_thd= 0;
static Ndb *injector_ndb= 0; static Ndb *injector_ndb= 0;
static Ndb *schema_ndb= 0; static Ndb *schema_ndb= 0;
static int ndbcluster_binlog_inited= 0;
/* /*
Mutex and condition used for interacting between client sql thread Mutex and condition used for interacting between client sql thread
and injector thread and injector thread
...@@ -558,29 +560,28 @@ ndbcluster_binlog_log_query(handlerton *hton, THD *thd, enum_binlog_command binl ...@@ -558,29 +560,28 @@ ndbcluster_binlog_log_query(handlerton *hton, THD *thd, enum_binlog_command binl
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/* /*
End use of the NDB Cluster table handler End use of the NDB Cluster binlog
- free all global variables allocated by - wait for binlog thread to shutdown
ndbcluster_init()
*/ */
static int ndbcluster_binlog_end(THD *thd) static int ndbcluster_binlog_end(THD *thd)
{ {
DBUG_ENTER("ndb_binlog_end"); DBUG_ENTER("ndbcluster_binlog_end");
if (!ndbcluster_util_inited) if (!ndbcluster_binlog_inited)
DBUG_RETURN(0); DBUG_RETURN(0);
ndbcluster_binlog_inited= 0;
// Kill ndb utility thread
(void) pthread_mutex_lock(&LOCK_ndb_util_thread);
DBUG_PRINT("exit",("killing ndb util thread: %lx", ndb_util_thread));
(void) pthread_cond_signal(&COND_ndb_util_thread);
(void) pthread_mutex_unlock(&LOCK_ndb_util_thread);
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
/* wait for injector thread to finish */ /* wait for injector thread to finish */
pthread_mutex_lock(&injector_mutex);
if (ndb_binlog_thread_running > 0) if (ndb_binlog_thread_running > 0)
{ {
pthread_cond_signal(&injector_cond);
pthread_mutex_unlock(&injector_mutex);
pthread_mutex_lock(&injector_mutex); pthread_mutex_lock(&injector_mutex);
while (ndb_binlog_thread_running > 0) while (ndb_binlog_thread_running > 0)
{ {
...@@ -588,8 +589,9 @@ static int ndbcluster_binlog_end(THD *thd) ...@@ -588,8 +589,9 @@ static int ndbcluster_binlog_end(THD *thd)
set_timespec(abstime, 1); set_timespec(abstime, 1);
pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime); pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
} }
pthread_mutex_unlock(&injector_mutex);
} }
pthread_mutex_unlock(&injector_mutex);
/* remove all shares */ /* remove all shares */
{ {
...@@ -617,8 +619,10 @@ static int ndbcluster_binlog_end(THD *thd) ...@@ -617,8 +619,10 @@ static int ndbcluster_binlog_end(THD *thd)
} }
pthread_mutex_unlock(&ndbcluster_mutex); pthread_mutex_unlock(&ndbcluster_mutex);
} }
pthread_mutex_destroy(&injector_mutex);
pthread_cond_destroy(&injector_cond);
#endif #endif
ndbcluster_util_inited= 0;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -2286,31 +2290,21 @@ int ndbcluster_binlog_start() ...@@ -2286,31 +2290,21 @@ int ndbcluster_binlog_start()
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
/* ndbcluster_binlog_inited= 1;
Wait for the ndb injector thread to finish starting up.
*/ /* Wait for the injector thread to start */
pthread_mutex_lock(&injector_mutex); pthread_mutex_lock(&injector_mutex);
while (!ndb_binlog_thread_running) while (!ndb_binlog_thread_running)
pthread_cond_wait(&injector_cond, &injector_mutex); pthread_cond_wait(&injector_cond, &injector_mutex);
pthread_mutex_unlock(&injector_mutex); pthread_mutex_unlock(&injector_mutex);
if (ndb_binlog_thread_running < 0) if (ndb_binlog_thread_running < 0)
DBUG_RETURN(-1); DBUG_RETURN(-1);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
static void ndbcluster_binlog_close_connection(THD *thd)
{
DBUG_ENTER("ndbcluster_binlog_close_connection");
const char *save_info= thd->proc_info;
thd->proc_info= "ndbcluster_binlog_close_connection";
do_ndbcluster_binlog_close_connection= BCCC_exit;
while (ndb_binlog_thread_running > 0)
sleep(1);
thd->proc_info= save_info;
DBUG_VOID_RETURN;
}
/************************************************************** /**************************************************************
Internal helper functions for creating/dropping ndb events Internal helper functions for creating/dropping ndb events
...@@ -3953,6 +3947,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3953,6 +3947,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
goto restart; goto restart;
} }
err: err:
sql_print_information("Stopping Cluster Binlog");
DBUG_PRINT("info",("Shutting down cluster binlog thread")); DBUG_PRINT("info",("Shutting down cluster binlog thread"));
thd->proc_info= "Shutting down"; thd->proc_info= "Shutting down";
close_thread_tables(thd); close_thread_tables(thd);
...@@ -3965,8 +3960,6 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3965,8 +3960,6 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
pthread_mutex_unlock(&injector_mutex); pthread_mutex_unlock(&injector_mutex);
thd->db= 0; // as not to try to free memory thd->db= 0; // as not to try to free memory
sql_print_information("Stopping Cluster Binlog");
if (ndb_apply_status_share) if (ndb_apply_status_share)
{ {
free_share(&ndb_apply_status_share); free_share(&ndb_apply_status_share);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment