Commit 2c4b774c authored by unknown's avatar unknown

replication updates. This changeset seems to be working fine on test systems.

If no problems are discovered in the next week, this will make the replication
code ready for 4.0.2 release.


dbug/dbug.c:
  cleanup of my previous fix
sql/mysqld.cc:
  fixed a REALLY NASTY BUG - slave threads were being launched before 
  initialization of global thread keys. Thus if the slave thread was slow
  to start everything worked fine, but if it started quickly, we would get
  into trouble using the unitinialized keys
sql/net_pkg.cc:
  make net_printf() work with 0 error code taking the third argument as 
  format string in that case
sql/slave.cc:
  misc fix-ups and debugging instrumentations
sql/slave.h:
  added skip_log_purge member
sql/sql_class.cc:
  debugging instrumentation to track down random memory corruption
sql/sql_class.h:
  added debugging sentry to THD to track down memory corruption
sql/sql_repl.cc:
  fixed bugs in CHANGE MASTER
parent 5d8b3250
......@@ -706,8 +706,6 @@ char ***_sframep_ __attribute__((unused)))
if (!_no_db_)
{
int save_errno=errno;
if (!init_done)
_db_push_ (_DBUG_START_CONDITION_);
/* Sasha: the test below is so we could call functions with DBUG_ENTER
before my_thread_init(). I needed this because I suspected corruption
of a block allocated by my_thread_init() itself, so I wanted to use
......@@ -715,6 +713,8 @@ char ***_sframep_ __attribute__((unused)))
*/
if (!(state=code_state()))
return;
if (!init_done)
_db_push_ (_DBUG_START_CONDITION_);
*_sfunc_ = state->func;
*_sfile_ = state->file;
......@@ -794,10 +794,10 @@ uint *_slevel_)
if (!_no_db_)
{
int save_errno=errno;
if (!(state=code_state()))
return;
if (!init_done)
_db_push_ ("");
if (!(state=code_state()))
return; /* Only happens at end of program */
if (stack->flags & (TRACE_ON | DEBUG_ON | PROFILE_ON))
{
if (!state->locked)
......
......@@ -1911,45 +1911,6 @@ int main(int argc, char **argv)
using_update_log=1;
}
init_slave();
if (opt_bin_log && !server_id)
{
server_id= !master_host ? 1 : 2;
switch (server_id) {
#ifdef EXTRA_DEBUG
case 1:
sql_print_error("\
Warning: You have enabled the binary log, but you haven't set server-id:\n\
Updates will be logged to the binary log, but connections to slaves will\n\
not be accepted.");
break;
#endif
case 2:
sql_print_error("\
Warning: You should set server-id to a non-0 value if master_host is set.\n\
The server will not act as a slave.");
break;
}
}
if (opt_bin_log)
{
if (!opt_bin_logname)
{
char tmp[FN_REFLEN];
/* TODO: The following should be using fn_format(); We just need to
first change fn_format() to cut the file name if it's too long.
*/
strmake(tmp,glob_hostname,FN_REFLEN-5);
strmov(strcend(tmp,'.'),"-bin");
opt_bin_logname=my_strdup(tmp,MYF(MY_WME));
}
mysql_bin_log.set_index_file_name(opt_binlog_index_name);
open_log(&mysql_bin_log, glob_hostname, opt_bin_logname, "-bin",
LOG_BIN);
using_update_log=1;
}
if (opt_slow_log)
open_log(&mysql_slow_log, glob_hostname, opt_slow_logname, "-slow.log",
LOG_NORMAL);
......@@ -2020,6 +1981,46 @@ The server will not act as a slave.");
if (!opt_noacl)
udf_init();
#endif
/* init_slave() must be called after the thread keys are created */
init_slave();
if (opt_bin_log && !server_id)
{
server_id= !master_host ? 1 : 2;
switch (server_id) {
#ifdef EXTRA_DEBUG
case 1:
sql_print_error("\
Warning: You have enabled the binary log, but you haven't set server-id:\n\
Updates will be logged to the binary log, but connections to slaves will\n\
not be accepted.");
break;
#endif
case 2:
sql_print_error("\
Warning: You should set server-id to a non-0 value if master_host is set.\n\
The server will not act as a slave.");
break;
}
}
if (opt_bin_log)
{
if (!opt_bin_logname)
{
char tmp[FN_REFLEN];
/* TODO: The following should be using fn_format(); We just need to
first change fn_format() to cut the file name if it's too long.
*/
strmake(tmp,glob_hostname,FN_REFLEN-5);
strmov(strcend(tmp,'.'),"-bin");
opt_bin_logname=my_strdup(tmp,MYF(MY_WME));
}
mysql_bin_log.set_index_file_name(opt_binlog_index_name);
open_log(&mysql_bin_log, glob_hostname, opt_bin_logname, "-bin",
LOG_BIN);
using_update_log=1;
}
if (opt_bootstrap)
{
......
......@@ -108,7 +108,11 @@ net_printf(NET *net, uint errcode, ...)
thd->query_error = 1; // if we are here, something is wrong :-)
query_cache_abort(net); // Safety
va_start(args,errcode);
format=ER(errcode);
// Sasha: this is needed to make net_printf() work with 0 argument for
// errorcode and use the argument after that as the format string. This
// is usefull for rare errors that are not worth the hassle to put in
// errmsg.sys, but at the same time, the message is not fixed text
format=errcode ? ER(errcode) : va_arg(args,char*);
offset= net->return_errno ? 2 : 0;
text_pos=(char*) net->buff+head_length+offset+1;
(void) vsprintf(my_const_cast(char*) (text_pos),format,args);
......
......@@ -166,6 +166,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
ulonglong pos, bool need_data_lock,
const char** errmsg)
{
*errmsg=0;
if (rli->log_pos_current)
return 0;
pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
......@@ -348,6 +349,7 @@ int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
/* is is criticate to test if the slave is running. Otherwise, we might
be referening freed memory trying to kick it
*/
THD_CHECK_SENTRY(thd);
if (*slave_running)
{
KICK_SLAVE(thd);
......@@ -966,6 +968,8 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
rli->cur_log_fd = -1;
rli->slave_skip_counter=0;
rli->log_pos_current=0;
rli->abort_pos_wait=0;
rli->skip_log_purge=0;
// TODO: make this work with multi-master
if (!opt_relay_logname)
{
......@@ -1296,9 +1300,16 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
bool pos_reached = 0;
int event_count = 0;
pthread_mutex_lock(&data_lock);
while (!thd->killed)
abort_pos_wait=0; // abort only if master info changes during wait
while (!thd->killed || !abort_pos_wait)
{
int cmp_result;
if (abort_pos_wait)
{
abort_pos_wait=0;
pthread_mutex_unlock(&data_lock);
return -1;
}
DBUG_ASSERT(*master_log_name || master_log_pos == 0);
if (*master_log_name)
{
......@@ -1350,10 +1361,7 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
thd->thread_id = thread_id++;
pthread_mutex_unlock(&LOCK_thread_count);
if (init_thr_lock() ||
my_pthread_setspecific_ptr(THR_THD, thd) ||
my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
my_pthread_setspecific_ptr(THR_NET, &thd->net))
if (init_thr_lock() || thd->store_globals())
{
end_thread(thd,0);
DBUG_RETURN(-1);
......@@ -1367,7 +1375,6 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif
thd->mem_root.free=thd->mem_root.used=0; // Probably not needed
if (thd->max_join_size == (ulong) ~0L)
thd->options |= OPTION_BIG_SELECTS;
......@@ -1381,7 +1388,6 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
}
thd->version=refresh_version;
thd->set_time();
DBUG_RETURN(0);
}
......@@ -1611,6 +1617,7 @@ pthread_handler_decl(handle_slave_io,arg)
my_thread_init();
thd = new THD; // note that contructor of THD uses DBUG_ !
DBUG_ENTER("handle_slave_io");
THD_CHECK_SENTRY(thd);
pthread_detach_this_thread();
if (init_slave_thread(thd, SLAVE_THD_IO))
......@@ -1808,11 +1815,12 @@ from master");
DBUG_ASSERT(thd->net.buff != 0);
net_end(&thd->net); // destructor will not free it, because net.vio is 0
pthread_mutex_lock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
delete thd;
pthread_mutex_unlock(&LOCK_thread_count);
my_thread_end(); // clean-up before broadcast
pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
pthread_mutex_unlock(&mi->run_lock);
my_thread_end();
#ifndef DBUG_OFF
if(abort_slave_event_count && !events_till_abort)
goto slave_begin;
......@@ -1848,6 +1856,7 @@ pthread_handler_decl(handle_slave_sql,arg)
my_thread_init();
thd = new THD; // note that contructor of THD uses DBUG_ !
DBUG_ENTER("handle_slave_sql");
THD_CHECK_SENTRY(thd);
pthread_detach_this_thread();
if (init_slave_thread(thd, SLAVE_THD_SQL))
......@@ -1861,6 +1870,7 @@ pthread_handler_decl(handle_slave_sql,arg)
sql_print_error("Failed during slave thread initialization");
goto err;
}
THD_CHECK_SENTRY(thd);
thd->thread_stack = (char*)&thd; // remember where our stack is
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
threads.append(thd);
......@@ -1891,6 +1901,7 @@ log '%s' at position %s,relay log: name='%s',pos='%s'", RPL_LOG_NAME,
{
thd->proc_info = "Processing master log event";
DBUG_ASSERT(rli->sql_thd == thd);
THD_CHECK_SENTRY(thd);
if (exec_relay_log_event(thd,rli))
{
// do not scare the user if SQL thread was simply killed or stopped
......@@ -1926,14 +1937,16 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
DBUG_ASSERT(thd->net.buff != 0);
net_end(&thd->net); // destructor will not free it, because we are weird
DBUG_ASSERT(rli->sql_thd == thd);
THD_CHECK_SENTRY(thd);
rli->sql_thd = 0;
pthread_mutex_lock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
delete thd;
pthread_mutex_unlock(&LOCK_thread_count);
my_thread_end(); // clean-up before broadcasting termination
pthread_cond_broadcast(&rli->stop_cond);
// tell the world we are done
pthread_mutex_unlock(&rli->run_lock);
my_thread_end();
#ifndef DBUG_OFF // TODO: reconsider the code below
if (abort_slave_event_count && !rli->events_till_abort)
goto slave_begin;
......@@ -2431,12 +2444,34 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
my_close(rli->cur_log_fd, MYF(MY_WME));
rli->cur_log_fd = -1;
// TODO: make skip_log_purge a start-up option. At this point this
// is not critical priority
if (!rli->skip_log_purge)
{
// purge_first_log will properly set up relay log coordinates in rli
if (rli->relay_log.purge_first_log(rli))
{
errmsg = "Error purging processed log";
goto err;
}
}
else
{
// TODO: verify that no lock is ok here. At this point, if we
// get this wrong, this is actually no big deal - the only time
// this code will ever be executed is if we are recovering from
// a bug when a full reload of the slave is not feasible or
// desirable.
if (rli->relay_log.find_next_log(&rli->linfo,0/*no lock*/))
{
errmsg = "error switching to the next log";
goto err;
}
rli->relay_log_pos = 4;
strnmov(rli->relay_log_name,rli->linfo.log_file_name,
sizeof(rli->relay_log_name));
flush_relay_log_info(rli);
}
// next log is hot
if (rli->relay_log.is_active(rli->linfo.log_file_name))
......
......@@ -151,10 +151,13 @@ typedef struct st_relay_log_info
char last_slave_error[MAX_SLAVE_ERRMSG];
THD* sql_thd;
bool log_pos_current;
bool abort_pos_wait;
bool skip_log_purge;
st_relay_log_info():info_fd(-1),cur_log_fd(-1),inited(0),
cur_log_init_count(0),
log_pos_current(0)
log_pos_current(0),abort_pos_wait(0),
skip_log_purge(0)
{
relay_log_name[0] = master_log_name[0] = 0;
bzero(&info_file,sizeof(info_file));
......
......@@ -104,6 +104,9 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0),
cond_count=0;
convert_set=0;
mysys_var=0;
#ifndef DBUG_OFF
dbug_sentry=THD_SENTRY_MAGIC;
#endif
net.vio=0;
ull=0;
system_thread=cleanup_done=0;
......@@ -191,6 +194,7 @@ void THD::cleanup(void)
THD::~THD()
{
THD_CHECK_SENTRY(this);
DBUG_ENTER("~THD()");
/* Close connection */
if (net.vio)
......@@ -223,12 +227,16 @@ THD::~THD()
mysys_var=0; // Safety (shouldn't be needed)
#ifdef SIGNAL_WITH_VIO_CLOSE
pthread_mutex_destroy(&active_vio_lock);
#endif
#ifndef DBUG_OFF
dbug_sentry = THD_SENTRY_GONE;
#endif
DBUG_VOID_RETURN;
}
void THD::awake(bool prepare_to_die)
{
THD_CHECK_SENTRY(this);
if (prepare_to_die)
killed = 1;
thr_alarm_kill(real_id);
......
......@@ -251,6 +251,11 @@ class i_string_pair: public ilink
class delayed_insert;
#define THD_SENTRY_MAGIC 0xfeedd1ff
#define THD_SENTRY_GONE 0xdeadbeef
#define THD_CHECK_SENTRY(thd) DBUG_ASSERT(thd->dbug_sentry == THD_SENTRY_MAGIC)
/* For each client connection we create a separate thread with THD serving as
a thread/connection descriptor */
......@@ -312,6 +317,9 @@ class THD :public ilink {
// TODO: document the variables below
MYSQL_LOCK *lock,*locked_tables;
ULL *ull;
#ifndef DBUG_OFF
uint dbug_sentry; // watch out for memory corruption
#endif
struct st_my_thread_var *mysys_var;
enum enum_server_command command;
uint32 server_id;
......
......@@ -714,7 +714,10 @@ int change_master(THD* thd, MASTER_INFO* mi)
return 1;
}
pthread_mutex_lock(&mi->data_lock);
/* data lock not needed since we have already stopped the running threads,
and we have the hold on the run locks which will keep all threads that
could possibly modify the data structures from running
*/
if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
{
// if we change host or port, we must reset the postion
......@@ -746,6 +749,7 @@ int change_master(THD* thd, MASTER_INFO* mi)
if (lex_mi->relay_log_name)
{
need_relay_log_purge = 0;
mi->rli.skip_log_purge=1;
strnmov(mi->rli.relay_log_name,lex_mi->relay_log_name,
sizeof(mi->rli.relay_log_name));
}
......@@ -759,16 +763,14 @@ int change_master(THD* thd, MASTER_INFO* mi)
flush_master_info(mi);
if (need_relay_log_purge)
{
pthread_mutex_unlock(&mi->data_lock);
mi->rli.skip_log_purge=0;
thd->proc_info="purging old relay logs";
if (purge_relay_logs(&mi->rli,0 /* not only reset, but also reinit*/,
&errmsg))
{
send_error(&thd->net, 0, "Failed purging old relay logs");
unlock_slave_threads(mi);
net_printf(&thd->net, 0, "Failed purging old relay logs: %s",errmsg);
return 1;
}
pthread_mutex_lock(&mi->rli.data_lock);
}
else
{
......@@ -778,6 +780,7 @@ int change_master(THD* thd, MASTER_INFO* mi)
0 /*no data lock*/,
&msg))
{
//Sasha: note that I had to change net_printf() to make this work
net_printf(&thd->net,0,"Failed initializing relay log position: %s",msg);
unlock_slave_threads(mi);
return 1;
......@@ -789,7 +792,10 @@ int change_master(THD* thd, MASTER_INFO* mi)
sizeof(mi->rli.master_log_name));
if (!mi->rli.master_log_name[0]) // uninitialized case
mi->rli.master_log_pos=0;
pthread_cond_broadcast(&mi->rli.data_cond);
pthread_mutex_lock(&mi->rli.data_lock);
mi->rli.abort_pos_wait = 1;
pthread_cond_broadcast(&mi->data_cond);
pthread_mutex_unlock(&mi->rli.data_lock);
thd->proc_info = "starting slave";
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment