Commit cd825a19 authored by unknown's avatar unknown

more work on IO_CACHE

portability fixes for systems with broken syscalls that do not interrupt on 
a signal
temporary commit - will not be pushed, need to sync up


include/my_sys.h:
  work on READ_APPEND cache
mysys/Makefile.am:
  change to test IO_CACHE
mysys/mf_iocache.c:
  work on READ_APPEND cache
BitKeeper/etc/ignore:
  Added mysys/#mf_iocache.c# mysys/test_io_cache to the ignore list
sql/mysqld.cc:
  make shutdown work on broken systems
sql/sql_repl.cc:
  make sure slave can be stopped on broken systems in all cases, clean-up
parent 368c6c1f
...@@ -424,3 +424,5 @@ vio/test-ssl ...@@ -424,3 +424,5 @@ vio/test-ssl
vio/test-sslclient vio/test-sslclient
vio/test-sslserver vio/test-sslserver
vio/viotest-ssl vio/viotest-ssl
mysys/#mf_iocache.c#
mysys/test_io_cache
...@@ -293,6 +293,16 @@ typedef struct st_dynamic_string { ...@@ -293,6 +293,16 @@ typedef struct st_dynamic_string {
struct st_io_cache; struct st_io_cache;
typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*); typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*);
#ifdef THREAD
#define lock_append_buffer(info) \
pthread_mutex_lock(&(info)->append_buffer_lock)
#define unlock_append_buffer(info) \
pthread_mutex_unlock(&(info)->append_buffer_lock)
#else
#define lock_append_buffer(info)
#define unlock_append_buffer(info)
#endif
typedef struct st_io_cache /* Used when cacheing files */ typedef struct st_io_cache /* Used when cacheing files */
{ {
my_off_t pos_in_file,end_of_file; my_off_t pos_in_file,end_of_file;
...@@ -301,7 +311,7 @@ typedef struct st_io_cache /* Used when cacheing files */ ...@@ -301,7 +311,7 @@ typedef struct st_io_cache /* Used when cacheing files */
that will use a buffer allocated somewhere that will use a buffer allocated somewhere
else else
*/ */
byte *append_buffer, *append_pos, *append_end; byte *append_buffer, *append_read_pos, *append_write_pos, *append_end;
/* for append buffer used in READ_APPEND cache */ /* for append buffer used in READ_APPEND cache */
#ifdef THREAD #ifdef THREAD
pthread_mutex_t append_buffer_lock; pthread_mutex_t append_buffer_lock;
...@@ -348,10 +358,15 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *); ...@@ -348,10 +358,15 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *);
_my_b_get(info)) _my_b_get(info))
#define my_b_write(info,Buffer,Count) \ #define my_b_write(info,Buffer,Count) \
((info)->type != SEQ_READ_APPEND) ? (\
((info)->rc_pos + (Count) <= (info)->rc_end ?\ ((info)->rc_pos + (Count) <= (info)->rc_end ?\
(memcpy((info)->rc_pos,Buffer,(size_t) (Count)), \ (memcpy((info)->rc_pos,Buffer,(size_t) (Count)), \
((info)->rc_pos+=(Count)),0) :\ ((info)->rc_pos+=(Count)),0) :\
_my_b_write(info,Buffer,Count)) _my_b_write(info,Buffer,Count))) : \
((info)->append_write_pos + (Count) <= (info)->append_end ?\
(memcpy((info)->append_write_pos,Buffer,(size_t)Count), \
((info)->append_write_pos+=(Count),0)) : \
_my_b_append(info,Buffer,Count))
/* my_b_write_byte dosn't have any err-check */ /* my_b_write_byte dosn't have any err-check */
#define my_b_write_byte(info,chr) \ #define my_b_write_byte(info,chr) \
...@@ -564,6 +579,7 @@ extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count); ...@@ -564,6 +579,7 @@ extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_get(IO_CACHE *info); extern int _my_b_get(IO_CACHE *info);
extern int _my_b_async_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_async_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_write(IO_CACHE *info,const byte *Buffer,uint Count); extern int _my_b_write(IO_CACHE *info,const byte *Buffer,uint Count);
extern int _my_b_append(IO_CACHE *info,const byte *Buffer,uint Count);
extern int my_block_write(IO_CACHE *info, const byte *Buffer, extern int my_block_write(IO_CACHE *info, const byte *Buffer,
uint Count, my_off_t pos); uint Count, my_off_t pos);
extern int flush_io_cache(IO_CACHE *info); extern int flush_io_cache(IO_CACHE *info);
......
...@@ -95,6 +95,10 @@ test_vsnprintf: my_vsnprintf.c $(LIBRARIES) ...@@ -95,6 +95,10 @@ test_vsnprintf: my_vsnprintf.c $(LIBRARIES)
$(CP) $(srcdir)/my_vsnprintf.c test_vsnprintf.c $(CP) $(srcdir)/my_vsnprintf.c test_vsnprintf.c
$(LINK) $(FLAGS) -DMAIN ./test_vsnprintf.c $(LDADD) $(LIBS) $(LINK) $(FLAGS) -DMAIN ./test_vsnprintf.c $(LDADD) $(LIBS)
$(RM) -f test_vsnprintf.* $(RM) -f test_vsnprintf.*
test_io_cache: mf_iocache.c $(LIBRARIES)
$(CP) $(srcdir)/mf_iocache.c test_io_cache.c
$(LINK) $(FLAGS) -DMAIN ./test_io_cache.c $(LDADD) $(LIBS)
$(RM) -f test_io_cache.*
test_dir: test_dir.c $(LIBRARIES) test_dir: test_dir.c $(LIBRARIES)
$(LINK) $(FLAGS) -DMAIN $(srcdir)/test_dir.c $(LDADD) $(LIBS) $(LINK) $(FLAGS) -DMAIN $(srcdir)/test_dir.c $(LDADD) $(LIBS)
......
...@@ -41,6 +41,10 @@ static void my_aiowait(my_aio_result *result); ...@@ -41,6 +41,10 @@ static void my_aiowait(my_aio_result *result);
#include <assert.h> #include <assert.h>
#include <errno.h> #include <errno.h>
#ifdef MAIN
#include <my_dir.h>
#endif
static void init_read_function(IO_CACHE* info, enum cache_type type); static void init_read_function(IO_CACHE* info, enum cache_type type);
static void init_read_function(IO_CACHE* info, enum cache_type type) static void init_read_function(IO_CACHE* info, enum cache_type type)
...@@ -152,7 +156,7 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, ...@@ -152,7 +156,7 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
info->rc_request_pos=info->rc_pos=info->buffer; info->rc_request_pos=info->rc_pos=info->buffer;
if (type == SEQ_READ_APPEND) if (type == SEQ_READ_APPEND)
{ {
info->append_pos = info->append_buffer; info->append_read_pos = info->append_write_pos = info->append_buffer;
info->append_end = info->append_buffer + info->buffer_length; info->append_end = info->append_buffer + info->buffer_length;
#ifdef THREAD #ifdef THREAD
pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST); pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
...@@ -277,6 +281,10 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, ...@@ -277,6 +281,10 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
~(my_off_t) 0); ~(my_off_t) 0);
} }
} }
if (info->type == SEQ_READ_APPEND)
{
info->append_read_pos = info->append_write_pos = info->append_buffer;
}
info->type=type; info->type=type;
info->error=0; info->error=0;
init_read_function(info,type); init_read_function(info,type);
...@@ -294,7 +302,7 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, ...@@ -294,7 +302,7 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
info->inited=0; info->inited=0;
#endif #endif
DBUG_RETURN(0); DBUG_RETURN(0);
} /* init_io_cache */ } /* reinit_io_cache */
...@@ -377,11 +385,19 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) ...@@ -377,11 +385,19 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
return 0; return 0;
} }
/* Do sequential read from the SEQ_READ_APPEND cache
we do this in three stages:
- first read from info->buffer
- then if there are still data to read, try the file descriptor
- afterwards, if there are still data to read, try append buffer
*/
int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
{ {
uint length,diff_length,left_length; uint length,diff_length,left_length,save_count;
my_off_t max_length, pos_in_file; my_off_t max_length, pos_in_file;
save_count=Count;
/* first, read the regular buffer */
if ((left_length=(uint) (info->rc_end-info->rc_pos))) if ((left_length=(uint) (info->rc_end-info->rc_pos)))
{ {
dbug_assert(Count >= left_length); /* User is not using my_b_read() */ dbug_assert(Count >= left_length); /* User is not using my_b_read() */
...@@ -390,30 +406,33 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) ...@@ -390,30 +406,33 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
Count-=left_length; Count-=left_length;
} }
/* pos_in_file always point on where info->buffer was read */ /* pos_in_file always point on where info->buffer was read */
pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer); if ((pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer)) >=
info->end_of_file)
{
info->pos_in_file=pos_in_file;
goto read_append_buffer;
}
/* no need to seek since the read is guaranteed to be sequential */ /* no need to seek since the read is guaranteed to be sequential */
diff_length=(uint) (pos_in_file & (IO_SIZE-1)); diff_length=(uint) (pos_in_file & (IO_SIZE-1));
#ifdef THREAD
pthread_mutex_lock(&info->append_buffer_lock); /* now the second stage begins - read from file descriptor */
#endif
#ifdef THREAD
pthread_mutex_unlock(&info->append_buffer_lock);
#endif
if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length))) if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length)))
{ /* Fill first intern buffer */ { /* Fill first intern buffer */
uint read_length; uint read_length;
if (info->end_of_file == pos_in_file) if (info->end_of_file == pos_in_file)
{ /* End of file */ { /* End of file */
info->error=(int) left_length; goto read_append_buffer;
return 1;
} }
length=(Count & (uint) ~(IO_SIZE-1))-diff_length; length=(Count & (uint) ~(IO_SIZE-1))-diff_length;
if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags)) if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags))
!= (uint) length) != (uint) length)
{ {
info->error= read_length == (uint) -1 ? -1 : if (read_length != (uint)-1)
(int) (read_length+left_length); {
return 1; Count -= read_length;
Buffer += read_length;
}
goto read_append_buffer;
} }
Count-=length; Count-=length;
Buffer+=length; Buffer+=length;
...@@ -422,15 +441,13 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) ...@@ -422,15 +441,13 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
diff_length=0; diff_length=0;
} }
max_length=info->read_length-diff_length; max_length=info->read_length-diff_length;
if (info->type != READ_FIFO && if ((info->end_of_file - pos_in_file) < max_length)
(info->end_of_file - pos_in_file) < max_length)
max_length = info->end_of_file - pos_in_file; max_length = info->end_of_file - pos_in_file;
if (!max_length) if (!max_length)
{ {
if (Count) if (Count)
{ {
info->error= left_length; /* We only got this many char */ goto read_append_buffer;
return 1;
} }
length=0; /* Didn't read any chars */ length=0; /* Didn't read any chars */
} }
...@@ -439,15 +456,36 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) ...@@ -439,15 +456,36 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
length == (uint) -1) length == (uint) -1)
{ {
if (length != (uint) -1) if (length != (uint) -1)
{
memcpy(Buffer,info->buffer,(size_t) length); memcpy(Buffer,info->buffer,(size_t) length);
info->error= length == (uint) -1 ? -1 : (int) (length+left_length); Count -= length;
return 1; Buffer += length;
}
goto read_append_buffer;
} }
info->rc_pos=info->buffer+Count; info->rc_pos=info->buffer+Count;
info->rc_end=info->buffer+length; info->rc_end=info->buffer+length;
info->pos_in_file=pos_in_file; info->pos_in_file=pos_in_file;
memcpy(Buffer,info->buffer,(size_t) Count); memcpy(Buffer,info->buffer,(size_t) Count);
return 0; return 0;
read_append_buffer:
lock_append_buffer(info);
if (!Count) return 0;
{
uint copy_len = (uint)(info->append_read_pos -
info->append_write_pos);
dbug_assert(info->append_read_pos <= info->append_write_pos);
if (copy_len > Count)
copy_len = Count;
memcpy(Buffer, info->append_read_pos,
copy_len);
info->append_read_pos += copy_len;
Count -= copy_len;
if (Count)
info->error = save_count - Count;
}
unlock_append_buffer(info);
return Count ? 1 : 0;
} }
#ifdef HAVE_AIOWAIT #ifdef HAVE_AIOWAIT
...@@ -672,6 +710,31 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count) ...@@ -672,6 +710,31 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
return 0; return 0;
} }
int _my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
{
uint rest_length,length;
rest_length=(uint) (info->append_end -
info->append_write_pos);
memcpy(info->append_write_pos,Buffer,(size_t) rest_length);
Buffer+=rest_length;
Count-=rest_length;
info->append_write_pos+=rest_length;
if (flush_io_cache(info))
return 1;
if (Count >= IO_SIZE)
{ /* Fill first intern buffer */
length=Count & (uint) ~(IO_SIZE-1);
if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP))
return info->error= -1;
Count-=length;
Buffer+=length;
}
memcpy(info->append_write_pos,Buffer,(size_t) Count);
info->append_write_pos+=Count;
return 0;
}
/* /*
Write a block to disk where part of the data may be inside the record Write a block to disk where part of the data may be inside the record
...@@ -756,6 +819,30 @@ int flush_io_cache(IO_CACHE *info) ...@@ -756,6 +819,30 @@ int flush_io_cache(IO_CACHE *info)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
} }
else if (info->type == SEQ_READ_APPEND)
{
if (info->file == -1)
{
if (real_open_cached_file(info))
DBUG_RETURN((info->error= -1));
}
lock_append_buffer(info);
if (info->append_write_pos != info->append_buffer)
{
length=(uint) (info->append_write_pos - info->append_buffer);
info->append_read_pos=info->append_write_pos=info->append_buffer;
info->append_end=(info->append_buffer+info->buffer_length-
(info->pos_in_file & (IO_SIZE-1)));
if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP))
{
unlock_append_buffer(info);
DBUG_RETURN((info->error= -1));
}
unlock_append_buffer(info);
DBUG_RETURN(0);
}
unlock_append_buffer(info);
}
#ifdef HAVE_AIOWAIT #ifdef HAVE_AIOWAIT
else if (info->type != READ_NET) else if (info->type != READ_NET)
{ {
...@@ -784,3 +871,89 @@ int end_io_cache(IO_CACHE *info) ...@@ -784,3 +871,89 @@ int end_io_cache(IO_CACHE *info)
DBUG_RETURN(error); DBUG_RETURN(error);
} /* end_io_cache */ } /* end_io_cache */
#ifdef MAIN
void die(const char* fmt, ...)
{
va_list va_args;
va_start(va_args,fmt);
fprintf(stderr,"Error:");
vfprintf(stderr, fmt,va_args);
fprintf(stderr,", errno=%d\n", errno);
exit(1);
}
int open_file(const char* fname, IO_CACHE* info, int cache_size)
{
int fd;
if ((fd=my_open(fname,O_CREAT|O_APPEND|O_RDWR,MYF(MY_WME))) < 0)
die("Could not open %s", fname);
if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
die("failed in init_io_cache()");
return fd;
}
void close_file(IO_CACHE* info)
{
end_io_cache(info);
my_close(info->file, MYF(MY_WME));
}
int main(int argc, char** argv)
{
IO_CACHE sra_cache; /* SEQ_READ_APPEND */
MY_STAT status;
const char* fname="/tmp/iocache.test";
int cache_size=16384;
char llstr_buf[22];
int max_block,total_bytes=0;
int i,num_loops=100,error=0;
char *p;
char* block, *block_end;
MY_INIT(argv[0]);
max_block = cache_size*3;
if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
die("Not enough memory to allocate test block");
block_end = block + max_block;
for (p = block,i=0; p < block_end;i++)
{
*p++ = (char)i;
}
if (my_stat(fname,&status, MYF(0)) &&
my_delete(fname,MYF(MY_WME)))
{
die("Delete of %s failed, aborting", fname);
}
open_file(fname,&sra_cache, cache_size);
for (i = 0; i < num_loops; i++)
{
char buf[4];
int block_size = abs(rand() % max_block);
int4store(buf, block_size);
if (my_b_write(&sra_cache,buf,4) ||
my_b_write(&sra_cache, block, block_size))
die("write failed");
total_bytes += 4+block_size;
}
close_file(&sra_cache);
my_free(block,MYF(MY_WME));
if (!my_stat(fname,&status,MYF(MY_WME)))
die("%s failed to stat, but I had just closed it,\
wonder how that happened");
printf("Final size of %s is %s, wrote %d bytes\n",fname,
llstr(status.st_size,llstr_buf),
total_bytes);
my_delete(fname, MYF(MY_WME));
/* check correctness of tests */
if (total_bytes != status.st_size)
{
fprintf(stderr,"Not the same number of bytes acutally in file as bytes \
supposedly written\n");
error=1;
}
exit(error);
return 0;
}
#endif
...@@ -45,6 +45,11 @@ ...@@ -45,6 +45,11 @@
char pstack_file_name[80]; char pstack_file_name[80];
#endif /* __linux__ */ #endif /* __linux__ */
#if defined(HAVE_DEC_3_2_THREADS) || defined(SIGNALS_DONT_BREAK_READ)
#define HAVE_CLOSE_SERVER_SOCK 1
void close_server_sock();
#endif
extern "C" { // Because of SCO 3.2V4.2 extern "C" { // Because of SCO 3.2V4.2
#include <errno.h> #include <errno.h>
#include <sys/stat.h> #include <sys/stat.h>
...@@ -453,16 +458,7 @@ static void close_connections(void) ...@@ -453,16 +458,7 @@ static void close_connections(void)
sql_print_error("Got error %d from pthread_cond_timedwait",error); sql_print_error("Got error %d from pthread_cond_timedwait",error);
#endif #endif
#if defined(HAVE_DEC_3_2_THREADS) || defined(SIGNALS_DONT_BREAK_READ) #if defined(HAVE_DEC_3_2_THREADS) || defined(SIGNALS_DONT_BREAK_READ)
if (ip_sock != INVALID_SOCKET) close_server_sock();
{
DBUG_PRINT("error",("closing TCP/IP and socket files"));
VOID(shutdown(ip_sock,2));
VOID(closesocket(ip_sock));
VOID(shutdown(unix_sock,2));
VOID(closesocket(unix_sock));
VOID(unlink(mysql_unix_port));
ip_sock=unix_sock= INVALID_SOCKET;
}
#endif #endif
} }
(void) pthread_mutex_unlock(&LOCK_thread_count); (void) pthread_mutex_unlock(&LOCK_thread_count);
...@@ -577,10 +573,37 @@ static void close_connections(void) ...@@ -577,10 +573,37 @@ static void close_connections(void)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
#ifdef HAVE_CLOSE_SERVER_SOCK
void close_server_sock()
{
DBUG_ENTER("close_server_sock");
if (ip_sock != INVALID_SOCKET)
{
DBUG_PRINT("info",("closing TCP/IP socket"));
VOID(shutdown(ip_sock,2));
VOID(closesocket(ip_sock));
ip_sock=INVALID_SOCKET;
}
if (unix_sock != INVALID_SOCKET)
{
DBUG_PRINT("info",("closing Unix socket"));
VOID(shutdown(unix_sock,2));
VOID(closesocket(unix_sock));
VOID(unlink(mysql_unix_port));
unix_sock=INVALID_SOCKET;
}
DBUG_VOID_RETURN;
}
#endif
void kill_mysql(void) void kill_mysql(void)
{ {
DBUG_ENTER("kill_mysql"); DBUG_ENTER("kill_mysql");
#ifdef SIGNALS_DONT_BREAK_READ
close_server_sock(); /* force accept to wake up */
#endif
#if defined(__WIN__) #if defined(__WIN__)
{ {
if (!SetEvent(hEventShutdown)) if (!SetEvent(hEventShutdown))
...@@ -604,6 +627,7 @@ void kill_mysql(void) ...@@ -604,6 +627,7 @@ void kill_mysql(void)
#endif #endif
DBUG_PRINT("quit",("After pthread_kill")); DBUG_PRINT("quit",("After pthread_kill"));
shutdown_in_progress=1; // Safety if kill didn't work shutdown_in_progress=1; // Safety if kill didn't work
abort_loop=1;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -2023,6 +2047,7 @@ The server will not act as a slave."); ...@@ -2023,6 +2047,7 @@ The server will not act as a slave.");
sql_print_error("Before Lock_thread_count"); sql_print_error("Before Lock_thread_count");
#endif #endif
(void) pthread_mutex_lock(&LOCK_thread_count); (void) pthread_mutex_lock(&LOCK_thread_count);
DBUG_PRINT("quit", ("Got thread_count mutex"));
select_thread_in_use=0; // For close_connections select_thread_in_use=0; // For close_connections
(void) pthread_cond_broadcast(&COND_thread_count); (void) pthread_cond_broadcast(&COND_thread_count);
(void) pthread_mutex_unlock(&LOCK_thread_count); (void) pthread_mutex_unlock(&LOCK_thread_count);
...@@ -2054,10 +2079,14 @@ The server will not act as a slave."); ...@@ -2054,10 +2079,14 @@ The server will not act as a slave.");
#endif /* HAVE_OPENSSL */ #endif /* HAVE_OPENSSL */
/* Wait until cleanup is done */ /* Wait until cleanup is done */
(void) pthread_mutex_lock(&LOCK_thread_count); (void) pthread_mutex_lock(&LOCK_thread_count);
DBUG_PRINT("quit", ("Got thread_count mutex for clean up wait"));
while (!ready_to_exit) while (!ready_to_exit)
{ {
DBUG_PRINT("quit", ("not yet ready to exit"));
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count); pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
} }
DBUG_PRINT("quit", ("ready to exit"));
(void) pthread_mutex_unlock(&LOCK_thread_count); (void) pthread_mutex_unlock(&LOCK_thread_count);
my_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0); my_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0);
exit(0); exit(0);
...@@ -2253,6 +2282,20 @@ static void create_new_thread(THD *thd) ...@@ -2253,6 +2282,20 @@ static void create_new_thread(THD *thd)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
#ifdef SIGNALS_DONT_BREAK_READ
inline void kill_broken_server()
{
/* hack to get around signals ignored in syscalls for problem OS's */
if (unix_sock == INVALID_SOCKET || ip_sock ==INVALID_SOCKET)
{
select_thread_in_use = 0;
kill_server((void*)MYSQL_KILL_SIGNAL); /* never returns */
}
}
#define MAYBE_BROKEN_SYSCALL kill_broken_server();
#else
#define MAYBE_BROKEN_SYSCALL
#endif
/* Handle new connections and spawn new process to handle them */ /* Handle new connections and spawn new process to handle them */
...@@ -2288,6 +2331,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) ...@@ -2288,6 +2331,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused)))
#endif #endif
DBUG_PRINT("general",("Waiting for connections.")); DBUG_PRINT("general",("Waiting for connections."));
MAYBE_BROKEN_SYSCALL;
while (!abort_loop) while (!abort_loop)
{ {
readFDs=clientFDs; readFDs=clientFDs;
...@@ -2302,12 +2346,15 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) ...@@ -2302,12 +2346,15 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused)))
if (!select_errors++ && !abort_loop) /* purecov: inspected */ if (!select_errors++ && !abort_loop) /* purecov: inspected */
sql_print_error("mysqld: Got error %d from select",socket_errno); /* purecov: inspected */ sql_print_error("mysqld: Got error %d from select",socket_errno); /* purecov: inspected */
} }
MAYBE_BROKEN_SYSCALL
continue; continue;
} }
#endif /* HPUX */ #endif /* HPUX */
if (abort_loop) if (abort_loop)
{
MAYBE_BROKEN_SYSCALL;
break; break;
}
/* /*
** Is this a new connection request ** Is this a new connection request
*/ */
...@@ -2343,6 +2390,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) ...@@ -2343,6 +2390,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused)))
if (new_sock != INVALID_SOCKET || if (new_sock != INVALID_SOCKET ||
(socket_errno != SOCKET_EINTR && socket_errno != SOCKET_EAGAIN)) (socket_errno != SOCKET_EINTR && socket_errno != SOCKET_EAGAIN))
break; break;
MAYBE_BROKEN_SYSCALL;
#if !defined(NO_FCNTL_NONBLOCK) #if !defined(NO_FCNTL_NONBLOCK)
if (!(test_flags & TEST_BLOCKING)) if (!(test_flags & TEST_BLOCKING))
{ {
...@@ -2359,6 +2407,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) ...@@ -2359,6 +2407,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused)))
{ {
if ((error_count++ & 255) == 0) // This can happen often if ((error_count++ & 255) == 0) // This can happen often
sql_perror("Error in accept"); sql_perror("Error in accept");
MAYBE_BROKEN_SYSCALL;
if (socket_errno == SOCKET_ENFILE || socket_errno == SOCKET_EMFILE) if (socket_errno == SOCKET_ENFILE || socket_errno == SOCKET_EMFILE)
sleep(1); // Give other threads some time sleep(1); // Give other threads some time
continue; continue;
......
...@@ -38,6 +38,13 @@ bool opt_sporadic_binlog_dump_fail = 0; ...@@ -38,6 +38,13 @@ bool opt_sporadic_binlog_dump_fail = 0;
static int binlog_dump_count = 0; static int binlog_dump_count = 0;
#endif #endif
#ifdef SIGNAL_WITH_VIO_CLOSE
#define KICK_SLAVE { slave_thd->close_active_vio(); \
thr_alarm_kill(slave_real_id); }
#else
#define KICK_SLAVE thr_alarm_kill(slave_real_id);
#endif
static Slave_log_event* find_slave_event(IO_CACHE* log, static Slave_log_event* find_slave_event(IO_CACHE* log,
const char* log_file_name, const char* log_file_name,
char* errmsg); char* errmsg);
...@@ -700,10 +707,7 @@ int stop_slave(THD* thd, bool net_report ) ...@@ -700,10 +707,7 @@ int stop_slave(THD* thd, bool net_report )
if (slave_running) if (slave_running)
{ {
abort_slave = 1; abort_slave = 1;
thr_alarm_kill(slave_real_id); KICK_SLAVE;
#ifdef SIGNAL_WITH_VIO_CLOSE
slave_thd->close_active_vio();
#endif
// do not abort the slave in the middle of a query, so we do not set // do not abort the slave in the middle of a query, so we do not set
// thd->killed for the slave thread // thd->killed for the slave thread
thd->proc_info = "waiting for slave to die"; thd->proc_info = "waiting for slave to die";
...@@ -728,7 +732,7 @@ int stop_slave(THD* thd, bool net_report ) ...@@ -728,7 +732,7 @@ int stop_slave(THD* thd, bool net_report )
#endif #endif
pthread_cond_timedwait(&COND_slave_stopped, &LOCK_slave, &abstime); pthread_cond_timedwait(&COND_slave_stopped, &LOCK_slave, &abstime);
if (slave_running) if (slave_running)
thr_alarm_kill(slave_real_id); KICK_SLAVE;
} }
} }
else else
...@@ -818,7 +822,7 @@ int change_master(THD* thd) ...@@ -818,7 +822,7 @@ int change_master(THD* thd)
if ((slave_was_running = slave_running)) if ((slave_was_running = slave_running))
{ {
abort_slave = 1; abort_slave = 1;
thr_alarm_kill(slave_real_id); KICK_SLAVE;
thd->proc_info = "waiting for slave to die"; thd->proc_info = "waiting for slave to die";
while (slave_running) while (slave_running)
pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
...@@ -1470,7 +1474,7 @@ int load_master_data(THD* thd) ...@@ -1470,7 +1474,7 @@ int load_master_data(THD* thd)
if ((slave_was_running = slave_running)) if ((slave_was_running = slave_running))
{ {
abort_slave = 1; abort_slave = 1;
thr_alarm_kill(slave_real_id); KICK_SLAVE;
thd->proc_info = "waiting for slave to die"; thd->proc_info = "waiting for slave to die";
while (slave_running) while (slave_running)
pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment