Commit 9591abbd authored by unknown's avatar unknown

Make killing of threads safer


Docs/manual.texi:
  Changelog
sql/sql_insert.cc:
  Fix problem with INSERT DELAYED during shutdown
parent 42959176
...@@ -46712,6 +46712,9 @@ not yet 100% confident in this code. ...@@ -46712,6 +46712,9 @@ not yet 100% confident in this code.
@appendixsubsec Changes in release 3.23.42 @appendixsubsec Changes in release 3.23.42
@itemize @bullet @itemize @bullet
@item @item
Fixed rare hang when doing @code{mysqladmin shutdown} when there was
a lot of activity in other threads.
@item
Fixed problem with @code{INSERT DELAYED} where delay thread could be Fixed problem with @code{INSERT DELAYED} where delay thread could be
hanging on @code{upgrading locks} without any apparent reasons. hanging on @code{upgrading locks} without any apparent reasons.
@item @item
...@@ -575,12 +575,14 @@ extern int pthread_dummy(int); ...@@ -575,12 +575,14 @@ extern int pthread_dummy(int);
struct st_my_thread_var struct st_my_thread_var
{ {
int thr_errno; int thr_errno;
pthread_cond_t suspend, *current_cond; pthread_cond_t suspend;
pthread_mutex_t mutex, *current_mutex; pthread_mutex_t mutex;
pthread_mutex_t * volatile current_mutex;
pthread_cond_t * volatile current_cond;
pthread_t pthread_self; pthread_t pthread_self;
long id; long id;
int cmp_length; int cmp_length;
volatile int abort; int volatile abort;
#ifndef DBUG_OFF #ifndef DBUG_OFF
gptr dbug; gptr dbug;
char name[THREAD_NAME_SIZE+1]; char name[THREAD_NAME_SIZE+1];
......
...@@ -374,16 +374,16 @@ static my_bool wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, ...@@ -374,16 +374,16 @@ static my_bool wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
} }
/* Set up control struct to allow others to abort locks */ /* Set up control struct to allow others to abort locks */
pthread_mutex_lock(&thread_var->mutex);
thread_var->current_mutex= &data->lock->mutex; thread_var->current_mutex= &data->lock->mutex;
thread_var->current_cond= cond; thread_var->current_cond= cond;
pthread_mutex_unlock(&thread_var->mutex);
data->cond=cond; data->cond=cond;
do while (!thread_var->abort || in_wait_list)
{ {
pthread_cond_wait(cond,&data->lock->mutex); pthread_cond_wait(cond,&data->lock->mutex);
} while (data->cond == cond && (!thread_var->abort || in_wait_list)); if (data->cond != cond)
break;
}
if (data->cond || data->type == TL_UNLOCK) if (data->cond || data->type == TL_UNLOCK)
{ {
......
...@@ -1483,11 +1483,9 @@ longlong Item_func_get_lock::val_int() ...@@ -1483,11 +1483,9 @@ longlong Item_func_get_lock::val_int()
/* structure is now initialized. Try to get the lock */ /* structure is now initialized. Try to get the lock */
/* Set up control struct to allow others to abort locks */ /* Set up control struct to allow others to abort locks */
pthread_mutex_lock(&thd->mysys_var->mutex);
thd->proc_info="User lock"; thd->proc_info="User lock";
thd->mysys_var->current_mutex= &LOCK_user_locks; thd->mysys_var->current_mutex= &LOCK_user_locks;
thd->mysys_var->current_cond= &ull->cond; thd->mysys_var->current_cond= &ull->cond;
pthread_mutex_unlock(&thd->mysys_var->mutex);
#ifdef HAVE_TIMESPEC_TS_SEC #ifdef HAVE_TIMESPEC_TS_SEC
abstime.ts_sec=time((time_t*) 0)+(time_t) timeout; abstime.ts_sec=time((time_t*) 0)+(time_t) timeout;
...@@ -1497,15 +1495,11 @@ longlong Item_func_get_lock::val_int() ...@@ -1497,15 +1495,11 @@ longlong Item_func_get_lock::val_int()
abstime.tv_nsec=0; abstime.tv_nsec=0;
#endif #endif
while ((error=pthread_cond_timedwait(&ull->cond,&LOCK_user_locks,&abstime)) while (!thd->killed &&
!= ETIME && error != ETIMEDOUT && ull->locked) (error=pthread_cond_timedwait(&ull->cond,&LOCK_user_locks,&abstime))
{ != ETIME && error != ETIMEDOUT && ull->locked) ;
if (thd->killed || abort_loop) if (thd->killed)
{
error=EINTR; // Return NULL error=EINTR; // Return NULL
break;
}
}
if (ull->locked) if (ull->locked)
{ {
if (!--ull->count) if (!--ull->count)
......
...@@ -65,11 +65,9 @@ MYSQL_LOCK *mysql_lock_tables(THD *thd,TABLE **tables,uint count) ...@@ -65,11 +65,9 @@ MYSQL_LOCK *mysql_lock_tables(THD *thd,TABLE **tables,uint count)
} }
pthread_mutex_lock(&LOCK_open); pthread_mutex_lock(&LOCK_open);
pthread_mutex_lock(&thd->mysys_var->mutex);
thd->mysys_var->current_mutex= &LOCK_open; thd->mysys_var->current_mutex= &LOCK_open;
thd->mysys_var->current_cond= &COND_refresh; thd->mysys_var->current_cond= &COND_refresh;
thd->proc_info="Waiting for table"; thd->proc_info="Waiting for table";
pthread_mutex_unlock(&thd->mysys_var->mutex);
while (global_read_lock && ! thd->killed && while (global_read_lock && ! thd->killed &&
thd->version == refresh_version) thd->version == refresh_version)
......
...@@ -358,11 +358,9 @@ bool close_cached_tables(THD *thd, bool if_wait_for_refresh, ...@@ -358,11 +358,9 @@ bool close_cached_tables(THD *thd, bool if_wait_for_refresh,
*/ */
if (!tables) if (!tables)
kill_delayed_threads(); kill_delayed_threads();
pthread_mutex_lock(&thd->mysys_var->mutex);
thd->mysys_var->current_mutex= &LOCK_open; thd->mysys_var->current_mutex= &LOCK_open;
thd->mysys_var->current_cond= &COND_refresh; thd->mysys_var->current_cond= &COND_refresh;
thd->proc_info="Flushing tables"; thd->proc_info="Flushing tables";
pthread_mutex_unlock(&thd->mysys_var->mutex);
close_old_data_files(thd,thd->open_tables,1,1); close_old_data_files(thd,thd->open_tables,1,1);
bool found=1; bool found=1;
...@@ -667,12 +665,11 @@ void wait_for_refresh(THD *thd) ...@@ -667,12 +665,11 @@ void wait_for_refresh(THD *thd)
{ {
/* Wait until the current table is up to date */ /* Wait until the current table is up to date */
const char *proc_info; const char *proc_info;
pthread_mutex_lock(&thd->mysys_var->mutex);
thd->mysys_var->current_mutex= &LOCK_open; thd->mysys_var->current_mutex= &LOCK_open;
thd->mysys_var->current_cond= &COND_refresh; thd->mysys_var->current_cond= &COND_refresh;
proc_info=thd->proc_info; proc_info=thd->proc_info;
thd->proc_info="Waiting for table"; thd->proc_info="Waiting for table";
pthread_mutex_unlock(&thd->mysys_var->mutex); if (!thd->killed)
(void) pthread_cond_wait(&COND_refresh,&LOCK_open); (void) pthread_cond_wait(&COND_refresh,&LOCK_open);
pthread_mutex_unlock(&LOCK_open); // Must be unlocked first pthread_mutex_unlock(&LOCK_open); // Must be unlocked first
...@@ -2182,7 +2179,7 @@ bool remove_table_from_cache(THD *thd, const char *db, const char *table_name, ...@@ -2182,7 +2179,7 @@ bool remove_table_from_cache(THD *thd, const char *db, const char *table_name,
{ {
in_use->killed=1; in_use->killed=1;
pthread_mutex_lock(&in_use->mysys_var->mutex); pthread_mutex_lock(&in_use->mysys_var->mutex);
if (in_use->mysys_var->current_mutex) if (in_use->mysys_var->current_cond)
{ {
pthread_mutex_lock(in_use->mysys_var->current_mutex); pthread_mutex_lock(in_use->mysys_var->current_mutex);
pthread_cond_broadcast(in_use->mysys_var->current_cond); pthread_cond_broadcast(in_use->mysys_var->current_cond);
......
...@@ -326,11 +326,9 @@ class THD :public ilink { ...@@ -326,11 +326,9 @@ class THD :public ilink {
const char* msg) const char* msg)
{ {
const char* old_msg = proc_info; const char* old_msg = proc_info;
pthread_mutex_lock(&mysys_var->mutex);
mysys_var->current_mutex = mutex; mysys_var->current_mutex = mutex;
mysys_var->current_cond = cond; mysys_var->current_cond = cond;
proc_info = msg; proc_info = msg;
pthread_mutex_unlock(&mysys_var->mutex);
return old_msg; return old_msg;
} }
inline void exit_cond(const char* old_msg) inline void exit_cond(const char* old_msg)
......
...@@ -482,7 +482,7 @@ class delayed_insert :public ilink { ...@@ -482,7 +482,7 @@ class delayed_insert :public ilink {
COPY_INFO info; COPY_INFO info;
I_List<delayed_row> rows; I_List<delayed_row> rows;
uint group_count; uint group_count;
TABLE_LIST *table_list; // Argument TABLE_LIST table_list; // Argument
delayed_insert() delayed_insert()
:locks_in_memory(0), :locks_in_memory(0),
...@@ -611,7 +611,9 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) ...@@ -611,7 +611,9 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
pthread_mutex_unlock(&LOCK_delayed_create); pthread_mutex_unlock(&LOCK_delayed_create);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
tmp->table_list=table_list; // Needed to open table tmp->table_list= *table_list; // Needed to open table
tmp->table_list.db= tmp->thd.db;
tmp->table_list.name= tmp->table_list.real_name=tmp->thd.query;
tmp->lock(); tmp->lock();
pthread_mutex_lock(&tmp->mutex); pthread_mutex_lock(&tmp->mutex);
if ((error=pthread_create(&tmp->thd.real_id,&connection_attrib, if ((error=pthread_create(&tmp->thd.real_id,&connection_attrib,
...@@ -846,10 +848,8 @@ void kill_delayed_threads(void) ...@@ -846,10 +848,8 @@ void kill_delayed_threads(void)
pthread_mutex_lock(&tmp->thd.mysys_var->mutex); pthread_mutex_lock(&tmp->thd.mysys_var->mutex);
if (tmp->thd.mysys_var->current_cond) if (tmp->thd.mysys_var->current_cond)
{ {
if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
pthread_mutex_lock(tmp->thd.mysys_var->current_mutex); pthread_mutex_lock(tmp->thd.mysys_var->current_mutex);
pthread_cond_broadcast(tmp->thd.mysys_var->current_cond); pthread_cond_broadcast(tmp->thd.mysys_var->current_cond);
if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
pthread_mutex_unlock(tmp->thd.mysys_var->current_mutex); pthread_mutex_unlock(tmp->thd.mysys_var->current_mutex);
} }
pthread_mutex_unlock(&tmp->thd.mysys_var->mutex); pthread_mutex_unlock(&tmp->thd.mysys_var->mutex);
...@@ -875,6 +875,7 @@ static pthread_handler_decl(handle_delayed_insert,arg) ...@@ -875,6 +875,7 @@ static pthread_handler_decl(handle_delayed_insert,arg)
thd->thread_id=thread_id++; thd->thread_id=thread_id++;
thd->end_time(); thd->end_time();
threads.append(thd); threads.append(thd);
thd->killed=abort_loop;
pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count);
pthread_mutex_lock(&di->mutex); pthread_mutex_lock(&di->mutex);
...@@ -905,7 +906,7 @@ static pthread_handler_decl(handle_delayed_insert,arg) ...@@ -905,7 +906,7 @@ static pthread_handler_decl(handle_delayed_insert,arg)
/* open table */ /* open table */
if (!(di->table=open_ltable(thd,di->table_list,TL_WRITE_DELAYED))) if (!(di->table=open_ltable(thd,&di->table_list,TL_WRITE_DELAYED)))
{ {
thd->fatal_error=1; // Abort waiting inserts thd->fatal_error=1; // Abort waiting inserts
goto end; goto end;
...@@ -913,7 +914,7 @@ static pthread_handler_decl(handle_delayed_insert,arg) ...@@ -913,7 +914,7 @@ static pthread_handler_decl(handle_delayed_insert,arg)
if (di->table->file->has_transactions()) if (di->table->file->has_transactions())
{ {
thd->fatal_error=1; thd->fatal_error=1;
my_error(ER_ILLEGAL_HA, MYF(0), di->table_list->real_name); my_error(ER_ILLEGAL_HA, MYF(0), di->table_list.real_name);
goto end; goto end;
} }
di->table->copy_blobs=1; di->table->copy_blobs=1;
...@@ -965,10 +966,8 @@ static pthread_handler_decl(handle_delayed_insert,arg) ...@@ -965,10 +966,8 @@ static pthread_handler_decl(handle_delayed_insert,arg)
#endif #endif
/* Information for pthread_kill */ /* Information for pthread_kill */
pthread_mutex_lock(&di->thd.mysys_var->mutex);
di->thd.mysys_var->current_mutex= &di->mutex; di->thd.mysys_var->current_mutex= &di->mutex;
di->thd.mysys_var->current_cond= &di->cond; di->thd.mysys_var->current_cond= &di->cond;
pthread_mutex_unlock(&di->thd.mysys_var->mutex);
di->thd.proc_info=0; di->thd.proc_info=0;
DBUG_PRINT("info",("Waiting for someone to insert rows")); DBUG_PRINT("info",("Waiting for someone to insert rows"));
...@@ -996,10 +995,13 @@ static pthread_handler_decl(handle_delayed_insert,arg) ...@@ -996,10 +995,13 @@ static pthread_handler_decl(handle_delayed_insert,arg)
break; break;
} }
} }
/* We can't lock di->mutex and mysys_var->mutex at the same time */
pthread_mutex_unlock(&di->mutex);
pthread_mutex_lock(&di->thd.mysys_var->mutex); pthread_mutex_lock(&di->thd.mysys_var->mutex);
di->thd.mysys_var->current_mutex= 0; di->thd.mysys_var->current_mutex= 0;
di->thd.mysys_var->current_cond= 0; di->thd.mysys_var->current_cond= 0;
pthread_mutex_unlock(&di->thd.mysys_var->mutex); pthread_mutex_unlock(&di->thd.mysys_var->mutex);
pthread_mutex_lock(&di->mutex);
} }
if (di->tables_in_use && ! thd->lock) if (di->tables_in_use && ! thd->lock)
......
...@@ -414,12 +414,10 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -414,12 +414,10 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
log.error=0; log.error=0;
// tell the kill thread how to wake us up // tell the kill thread how to wake us up
pthread_mutex_lock(&thd->mysys_var->mutex);
thd->mysys_var->current_mutex = log_lock; thd->mysys_var->current_mutex = log_lock;
thd->mysys_var->current_cond = &COND_binlog_update; thd->mysys_var->current_cond = &COND_binlog_update;
const char* proc_info = thd->proc_info; const char* proc_info = thd->proc_info;
thd->proc_info = "Slave connection: waiting for binlog update"; thd->proc_info = "Slave connection: waiting for binlog update";
pthread_mutex_unlock(&thd->mysys_var->mutex);
bool read_packet = 0, fatal_error = 0; bool read_packet = 0, fatal_error = 0;
...@@ -444,6 +442,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -444,6 +442,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
break; break;
case LOG_READ_EOF: case LOG_READ_EOF:
DBUG_PRINT("wait",("waiting for data on binary log")); DBUG_PRINT("wait",("waiting for data on binary log"));
if (!thd->killed)
pthread_cond_wait(&COND_binlog_update, log_lock); pthread_cond_wait(&COND_binlog_update, log_lock);
break; break;
...@@ -694,9 +693,9 @@ void kill_zombie_dump_threads(uint32 slave_server_id) ...@@ -694,9 +693,9 @@ void kill_zombie_dump_threads(uint32 slave_server_id)
thr_alarm_kill(tmp->real_id); thr_alarm_kill(tmp->real_id);
tmp->killed = 1; tmp->killed = 1;
pthread_mutex_lock(&tmp->mysys_var->mutex);
tmp->mysys_var->abort = 1; tmp->mysys_var->abort = 1;
if(tmp->mysys_var->current_mutex) pthread_mutex_lock(&tmp->mysys_var->mutex);
if(tmp->mysys_var->current_cond)
{ {
pthread_mutex_lock(tmp->mysys_var->current_mutex); pthread_mutex_lock(tmp->mysys_var->current_mutex);
pthread_cond_broadcast(tmp->mysys_var->current_cond); pthread_cond_broadcast(tmp->mysys_var->current_cond);
......
...@@ -53,11 +53,9 @@ int mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists) ...@@ -53,11 +53,9 @@ int mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists)
/* mark for close and remove all cached entries */ /* mark for close and remove all cached entries */
pthread_mutex_lock(&thd->mysys_var->mutex);
thd->mysys_var->current_mutex= &LOCK_open; thd->mysys_var->current_mutex= &LOCK_open;
thd->mysys_var->current_cond= &COND_refresh; thd->mysys_var->current_cond= &COND_refresh;
VOID(pthread_mutex_lock(&LOCK_open)); VOID(pthread_mutex_lock(&LOCK_open));
pthread_mutex_unlock(&thd->mysys_var->mutex);
if (global_read_lock) if (global_read_lock)
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment