/* Copyright (C) 2000-2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


#include "mysql_priv.h"
#include <mysql.h>
20
#include <myisam.h>
unknown's avatar
unknown committed
21
#include "mini_client.h"
22
#include "slave.h"
23
#include "sql_repl.h"
24
#include "repl_failsafe.h"
unknown's avatar
unknown committed
25
#include <thr_alarm.h>
unknown's avatar
unknown committed
26
#include <my_dir.h>
27
#include <assert.h>
unknown's avatar
unknown committed
28

29 30 31
bool use_slave_mask = 0;
MY_BITMAP slave_error_mask;

32 33
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);

34
volatile bool slave_sql_running = 0, slave_io_running = 0;
35
char* slave_load_tmpdir = 0;
unknown's avatar
unknown committed
36
MASTER_INFO *active_mi;
37
volatile int active_mi_in_use = 0;
38
HASH replicate_do_table, replicate_ignore_table;
unknown's avatar
unknown committed
39
DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
40
bool do_table_inited = 0, ignore_table_inited = 0;
unknown's avatar
unknown committed
41
bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
42
bool table_rules_on = 0;
43
ulonglong relay_log_space_limit = 0;
unknown's avatar
unknown committed
44 45 46 47 48 49 50

/*
  When slave thread exits, we need to remember the temporary tables so we
  can re-use them on slave start.

  TODO: move the vars below under MASTER_INFO
*/
51

52
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
53
int events_till_abort = -1;
54
static int events_till_disconnect = -1;
unknown's avatar
unknown committed
55

56
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
unknown's avatar
unknown committed
57

58
void skip_load_data_infile(NET* net);
59
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
unknown's avatar
unknown committed
60
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
unknown's avatar
unknown committed
61
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli);
62 63
static inline bool io_slave_killed(THD* thd,MASTER_INFO* mi);
static inline bool sql_slave_killed(THD* thd,RELAY_LOG_INFO* rli);
unknown's avatar
unknown committed
64
static int count_relay_log_space(RELAY_LOG_INFO* rli);
65
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
66
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
unknown's avatar
unknown committed
67 68
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
			  bool suppress_warnings);
unknown's avatar
unknown committed
69
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
unknown's avatar
unknown committed
70
			     bool reconnect, bool suppress_warnings);
71 72
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
		      void* thread_killed_arg);
73
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
unknown's avatar
unknown committed
74 75
static int create_table_from_dump(THD* thd, NET* net, const char* db,
				  const char* table_name);
76
static int check_master_version(MYSQL* mysql, MASTER_INFO* mi);
77
char* rewrite_db(char* db);
78

79 80 81 82 83 84

/*
  Get a bit mask for which threads are running so that we later can
  restart these threads
*/

85 86 87 88 89 90 91 92
void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse)
{
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
  register int tmp_mask=0;
  if (set_io)
    tmp_mask |= SLAVE_IO;
  if (set_sql)
    tmp_mask |= SLAVE_SQL;
93 94
  if (inverse)
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
95 96 97
  *mask = tmp_mask;
}

98

99 100 101 102 103 104 105 106 107 108 109 110 111 112
/*
  Take both slave run locks: the IO thread's (mi->run_lock) first, then the
  SQL thread's (mi->rli.run_lock).
*/
void lock_slave_threads(MASTER_INFO* mi)
{
  // TODO: see if we can do this without dual mutex
  pthread_mutex_t *io_run_lock= &mi->run_lock;
  pthread_mutex_t *sql_run_lock= &mi->rli.run_lock;
  pthread_mutex_lock(io_run_lock);
  pthread_mutex_lock(sql_run_lock);
}

/*
  Release both slave run locks, SQL thread's (mi->rli.run_lock) first and
  IO thread's (mi->run_lock) last.
*/
void unlock_slave_threads(MASTER_INFO* mi)
{
  // TODO: see if we can do this without dual mutex
  pthread_mutex_t *sql_run_lock= &mi->rli.run_lock;
  pthread_mutex_t *io_run_lock= &mi->run_lock;
  pthread_mutex_unlock(sql_run_lock);
  pthread_mutex_unlock(io_run_lock);
}
int init_slave()
{
116
  DBUG_ENTER("init_slave");
117 118 119 120 121

  /*
    TODO: re-write this to interate through the list of files
    for multi-master
  */
unknown's avatar
unknown committed
122
  active_mi= new MASTER_INFO;
123 124

  /*
125 126 127
    If master_host is not specified, try to read it from the master_info file.
    If master_host is specified, create the master_info file if it doesn't
    exists.
128
  */
unknown's avatar
unknown committed
129 130
  if (!active_mi ||
      init_master_info(active_mi,master_info_file,relay_log_info_file,
131
		       !master_host))
132
  {
unknown's avatar
unknown committed
133 134
    sql_print_error("Note: Failed to initialized master info");
    goto err;
135
  }
136 137 138 139 140 141 142 143 144

  /*
    make sure slave thread gets started if server_id is set,
    valid master.info is present, and master_host has not been specified
  */
  if (server_id && !master_host && active_mi->host[0])
    master_host= active_mi->host;

  if (master_host && !opt_skip_slave_start)
145
  {
146 147 148 149 150 151
    if (start_slave_threads(1 /* need mutex */,
			    0 /* no wait for start*/,
			    active_mi,
			    master_info_file,
			    relay_log_info_file,
			    SLAVE_IO | SLAVE_SQL))
unknown's avatar
unknown committed
152
    {
153
      sql_print_error("Warning: Can't create threads to handle slave");
unknown's avatar
unknown committed
154 155
      goto err;
    }
156
  }
157
  DBUG_RETURN(0);
unknown's avatar
unknown committed
158 159 160

err:
  DBUG_RETURN(1);
161 162
}

163

164 165
/*
  hash_free_key callback for the table rule hashes: a TABLE_RULE_ENT and its
  key text are one my_malloc() block, so a single my_free() releases it.
*/
static void free_table_ent(TABLE_RULE_ENT* e)
{
  gptr block= (gptr) e;
  my_free(block, MYF(0));
}

/*
  hash_get_key callback for the table rule hashes: the key is the
  "db.table" text starting at e->db, e->key_len bytes long.
*/
static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
                           my_bool not_used __attribute__((unused)))
{
  byte *key= (byte*) e->db;
  *len= e->key_len;
  return key;
}

/*
  Open the given relay log

  SYNOPSIS
    init_relay_log_pos()
    rli			Relay information (will be initialized)
    log			Name of relay log file to read from. NULL = First log
    pos			Position in relay log file 
    need_data_lock	Set to 1 if this functions should do mutex locks
    errmsg		Store pointer to error message here

  DESCRIPTION
  - Close old open relay log files.
  - If we are using the same relay log as the running IO-thread, then set
    rli->cur_log to point to the same IO_CACHE entry.
  - If not, open the 'log' binary file.

  TODO
    - check proper initialization of master_log_name/master_log_pos
    - We may always want to delete all logs before 'log'.
      Currently if we are not calling this with 'log' as NULL or the first
      log we will never delete relay logs.
      If we want this we should not set skip_log_purge to 1.

  RETURN VALUES
    0	ok
    1	error.  errmsg is set to point to the error message
*/
unknown's avatar
unknown committed
205

206 207 208 209
int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
		       ulonglong pos, bool need_data_lock,
		       const char** errmsg)
{
unknown's avatar
unknown committed
210 211
  DBUG_ENTER("init_relay_log_pos");

212
  *errmsg=0;
213 214 215 216 217
  pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
  pthread_mutex_lock(log_lock);
  if (need_data_lock)
    pthread_mutex_lock(&rli->data_lock);
  
218
  /* Close log file and free buffers if it's already open */
219 220 221 222 223 224 225
  if (rli->cur_log_fd >= 0)
  {
    end_io_cache(&rli->cache_buf);
    my_close(rli->cur_log_fd, MYF(MY_WME));
    rli->cur_log_fd = -1;
  }
  
226
  rli->relay_log_pos = pos;
unknown's avatar
unknown committed
227

unknown's avatar
unknown committed
228 229 230 231
  /*
    Test to see if the previous run was with the skip of purging
    If yes, we do not purge when we restart
  */
232
  if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
233 234 235 236
  {
    *errmsg="Could not find first log during relay log initialization";
    goto err;
  }
237 238

  if (log)					// If not first log
unknown's avatar
unknown committed
239
  {
240 241
    if (strcmp(log, rli->linfo.log_file_name))
      rli->skip_log_purge=1;			// Different name; Don't purge
242
    if (rli->relay_log.find_log_pos(&rli->linfo, log, 1))
243 244 245 246
    {
      *errmsg="Could not find target log during relay log initialization";
      goto err;
    }
unknown's avatar
unknown committed
247
  }
248 249
  strmake(rli->relay_log_name,rli->linfo.log_file_name,
	  sizeof(rli->relay_log_name)-1);
250 251
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
  {
252 253 254 255 256
    /*
      The IO thread is using this log file.
      In this case, we will use the same IO_CACHE pointer to
      read data as the IO thread is using to write data.
    */
257
    if (my_b_tell((rli->cur_log=rli->relay_log.get_log_file())) == 0 &&
unknown's avatar
unknown committed
258
	check_binlog_magic(rli->cur_log,errmsg))
259
      goto err;
unknown's avatar
unknown committed
260
    rli->cur_log_old_open_count=rli->relay_log.get_open_count();
261 262 263
  }
  else
  {
264 265 266
    /*
      Open the relay log and set rli->cur_log to point at this one
    */
267 268 269 270 271
    if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
				     rli->linfo.log_file_name,errmsg)) < 0)
      goto err;
    rli->cur_log = &rli->cache_buf;
  }
unknown's avatar
unknown committed
272 273 274
  if (pos > BIN_LOG_HEADER_SIZE)
    my_b_seek(rli->cur_log,(off_t)pos);

275
err:
unknown's avatar
unknown committed
276 277 278 279 280
  pthread_cond_broadcast(&rli->data_cond);
  if (need_data_lock)
    pthread_mutex_unlock(&rli->data_lock);
  pthread_mutex_unlock(log_lock);
  DBUG_RETURN ((*errmsg) ? 1 : 0);
281 282
}

283

284
/* called from get_options() in mysqld.cc on start-up */
unknown's avatar
unknown committed
285 286

void init_slave_skip_errors(const char* arg)
287
{
unknown's avatar
unknown committed
288
  const char *p;
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313
  if (bitmap_init(&slave_error_mask,MAX_SLAVE_ERROR,0))
  {
    fprintf(stderr, "Badly out of memory, please check your system status\n");
    exit(1);
  }
  use_slave_mask = 1;
  for (;isspace(*arg);++arg)
    /* empty */;
  if (!my_casecmp(arg,"all",3))
  {
    bitmap_set_all(&slave_error_mask);
    return;
  }
  for (p= arg ; *p; )
  {
    long err_code;
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
      break;
    if (err_code < MAX_SLAVE_ERROR)
       bitmap_set_bit(&slave_error_mask,(uint)err_code);
    while (!isdigit(*p) && *p)
      p++;
  }
}

unknown's avatar
unknown committed
314

unknown's avatar
unknown committed
315
/*
unknown's avatar
unknown committed
316
  We assume we have a run lock on rli and that both slave thread
unknown's avatar
unknown committed
317 318 319
  are not running
*/

320 321
int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
		     const char** errmsg)
322
{
323
  int error=0;
unknown's avatar
unknown committed
324
  DBUG_ENTER("purge_relay_logs");
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347

  /*
    Even if rli->inited==0, we still try to empty rli->master_log_* variables.
    Indeed, rli->inited==0 does not imply that they already are empty.
    It could be that slave's info initialization partly succeeded : 
    for example if relay-log.info existed but *relay-bin*.*
    have been manually removed, init_relay_log_info reads the old 
    relay-log.info and fills rli->master_log_*, then init_relay_log_info
    checks for the existence of the relay log, this fails and
    init_relay_log_info leaves rli->inited to 0.
    In that pathological case, rli->master_log_pos* will be properly reinited
    at the next START SLAVE (as RESET SLAVE or CHANGE
    MASTER, the callers of purge_relay_logs, will delete bogus *.info files
    or replace them with correct files), however if the user does SHOW SLAVE
    STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
    In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS 
    to display fine in any case.
  */

  rli->master_log_name[0]= 0;
  rli->master_log_pos= 0;
  rli->pending= 0;

348
  if (!rli->inited)
349
    DBUG_RETURN(0);
unknown's avatar
unknown committed
350

351 352
  DBUG_ASSERT(rli->slave_running == 0);
  DBUG_ASSERT(rli->mi->slave_running == 0);
unknown's avatar
unknown committed
353

354 355
  rli->slave_skip_counter=0;
  pthread_mutex_lock(&rli->data_lock);
356
  if (rli->relay_log.reset_logs(thd))
357 358 359 360 361
  {
    *errmsg = "Failed during log reset";
    error=1;
    goto err;
  }
362 363
  /* Save name of used relay log file */
  strmake(rli->relay_log_name, rli->relay_log.get_log_fname(),
unknown's avatar
unknown committed
364
	  sizeof(rli->relay_log_name)-1);
unknown's avatar
unknown committed
365 366 367
  // Just first log with magic number and nothing else
  rli->log_space_total= BIN_LOG_HEADER_SIZE;
  rli->relay_log_pos=   BIN_LOG_HEADER_SIZE;
unknown's avatar
unknown committed
368
  rli->relay_log.reset_bytes_written();
369
  if (!just_reset)
370 371
    error= init_relay_log_pos(rli, rli->relay_log_name, rli->relay_log_pos,
			      0 /* do not need data lock */, errmsg);
unknown's avatar
unknown committed
372

unknown's avatar
unknown committed
373 374 375 376
err:
#ifndef DBUG_OFF
  char buf[22];
#endif  
unknown's avatar
unknown committed
377
  DBUG_PRINT("info",("log_space_total: %s",llstr(rli->log_space_total,buf)));
378
  pthread_mutex_unlock(&rli->data_lock);
unknown's avatar
unknown committed
379
  DBUG_RETURN(error);
380 381
}

382

383 384 385 386 387 388 389
/*
  Stop the slave threads selected by thread_mask (SLAVE_IO, SLAVE_SQL,
  SLAVE_FORCE_ALL) and wait for them to exit.

  skip_lock  If set, the run locks are not taken here. They are still used
             as the condition mutexes while waiting, so the caller is
             expected to hold them.

  Without SLAVE_FORCE_ALL the first error from terminate_slave_thread() is
  returned at once; with it, errors are ignored and 0 is returned.
*/
int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
{
  if (!mi->inited)
    return 0;                                   /* successfully do nothing */

  const bool force_all= (thread_mask & SLAVE_FORCE_ALL) != 0;
  pthread_mutex_t *io_cond_lock= &mi->run_lock;
  pthread_mutex_t *sql_cond_lock= &mi->rli.run_lock;
  pthread_mutex_t *io_lock= skip_lock ? 0 : io_cond_lock;
  pthread_mutex_t *sql_lock= skip_lock ? 0 : sql_cond_lock;
  DBUG_ENTER("terminate_slave_threads");

  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) && mi->slave_running)
  {
    DBUG_PRINT("info",("Terminating IO thread"));
    mi->abort_slave= 1;
    int io_error= terminate_slave_thread(mi->io_thd, io_lock, io_cond_lock,
                                         &mi->stop_cond,
                                         &mi->slave_running);
    if (io_error && !force_all)
      DBUG_RETURN(io_error);
  }

  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) && mi->rli.slave_running)
  {
    DBUG_PRINT("info",("Terminating SQL thread"));
    DBUG_ASSERT(mi->rli.sql_thd != 0);
    mi->rli.abort_slave= 1;
    int sql_error= terminate_slave_thread(mi->rli.sql_thd, sql_lock,
                                          sql_cond_lock,
                                          &mi->rli.stop_cond,
                                          &mi->rli.slave_running);
    if (sql_error && !force_all)
      DBUG_RETURN(sql_error);
  }

  DBUG_RETURN(0);
}
/*
  Ask one slave thread to stop and wait until *slave_running is cleared.

  term_lock      If not NULL, locked for the whole call.
  cond_lock      Mutex used with term_cond while waiting.
  term_cond      Condition waited on, with a timeout, between kicks.
  slave_running  Flag that goes to 0 when the thread has stopped.

  RETURN
    0                     the thread is not running any more
    ER_SLAVE_NOT_RUNNING  term_lock was given and the thread was not running
*/
int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
			   pthread_mutex_t *cond_lock,
			   pthread_cond_t* term_cond,
			   volatile bool* slave_running)
{
  if (term_lock)
  {
    pthread_mutex_lock(term_lock);
    if (!*slave_running)
    {
      pthread_mutex_unlock(term_lock);
      return ER_SLAVE_NOT_RUNNING;
    }
  }
  DBUG_ASSERT(thd != 0);
  /*
    It is critical to test that the slave is running before kicking it;
    otherwise we might be referencing freed memory.
  */
  THD_CHECK_SENTRY(thd);
  if (*slave_running)
  {
    KICK_SLAVE(thd);
  }

  while (*slave_running)
  {
    /*
      The slave thread might miss the first signal, so keep re-sending it
      after each timed wait (set_timespec(..., 2)) until it reacts.
    */
    struct timespec deadline;
    set_timespec(deadline, 2);
    pthread_cond_timedwait(term_cond, cond_lock, &deadline);
    if (*slave_running)
    {
      KICK_SLAVE(thd);
    }
  }

  if (term_lock)
    pthread_mutex_unlock(term_lock);
  return 0;
}
int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
471
		       pthread_mutex_t *cond_lock,
unknown's avatar
unknown committed
472 473 474
		       pthread_cond_t *start_cond,
		       volatile bool *slave_running,
		       volatile ulong *slave_run_id,
475 476 477
		       MASTER_INFO* mi)
{
  pthread_t th;
unknown's avatar
unknown committed
478
  ulong start_id;
479
  DBUG_ASSERT(mi->inited);
unknown's avatar
unknown committed
480 481
  DBUG_ENTER("start_slave_thread");

482 483 484 485 486 487 488 489 490
  if (start_lock)
    pthread_mutex_lock(start_lock);
  if (!server_id)
  {
    if (start_cond)
      pthread_cond_broadcast(start_cond);
    if (start_lock)
      pthread_mutex_unlock(start_lock);
    sql_print_error("Server id not set, will not start slave");
unknown's avatar
unknown committed
491
    DBUG_RETURN(ER_BAD_SLAVE);
492 493 494
  }
  
  if (*slave_running)
495 496 497 498 499
  {
    if (start_cond)
      pthread_cond_broadcast(start_cond);
    if (start_lock)
      pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
500
    DBUG_RETURN(ER_SLAVE_MUST_STOP);
501
  }
unknown's avatar
unknown committed
502 503
  start_id= *slave_run_id;
  DBUG_PRINT("info",("Creating new slave thread"));
504 505 506 507
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
  {
    if (start_lock)
      pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
508
    DBUG_RETURN(ER_SLAVE_THREAD);
509 510 511 512
  }
  if (start_cond && cond_lock)
  {
    THD* thd = current_thd;
unknown's avatar
unknown committed
513
    while (start_id == *slave_run_id)
514
    {
unknown's avatar
unknown committed
515
      DBUG_PRINT("sleep",("Waiting for slave thread to start"));
516
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
517
					    "Waiting for slave thread to start");
518 519 520 521 522
      pthread_cond_wait(start_cond,cond_lock);
      thd->exit_cond(old_msg);
      if (thd->killed)
      {
	pthread_mutex_unlock(cond_lock);
unknown's avatar
unknown committed
523
	DBUG_RETURN(ER_SERVER_SHUTDOWN);
524 525 526 527 528
      }
    }
  }
  if (start_lock)
    pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
529
  DBUG_RETURN(0);
530
}
unknown's avatar
unknown committed
531 532 533 534 535 536


/*
  SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
  sense to do that for starting a slave - we always care if it actually
  started the threads that were not previously running
537
*/
unknown's avatar
unknown committed
538

539 540 541 542 543 544 545
int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
			MASTER_INFO* mi, const char* master_info_fname,
			const char* slave_info_fname, int thread_mask)
{
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
  pthread_cond_t* cond_io=0,*cond_sql=0;
  int error=0;
546
  DBUG_ENTER("start_slave_threads");
547 548 549 550 551 552 553 554 555 556 557 558 559
  
  if (need_slave_mutex)
  {
    lock_io = &mi->run_lock;
    lock_sql = &mi->rli.run_lock;
  }
  if (wait_for_start)
  {
    cond_io = &mi->start_cond;
    cond_sql = &mi->rli.start_cond;
    lock_cond_io = &mi->run_lock;
    lock_cond_sql = &mi->rli.run_lock;
  }
560 561 562

  if (thread_mask & SLAVE_IO)
    error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
unknown's avatar
unknown committed
563 564
			     cond_io,
			     &mi->slave_running, &mi->slave_run_id,
565 566
			     mi);
  if (!error && (thread_mask & SLAVE_SQL))
567
  {
568 569
    error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
			     cond_sql,
unknown's avatar
unknown committed
570 571
			     &mi->rli.slave_running, &mi->rli.slave_run_id,
			     mi);
572 573 574
    if (error)
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
  }
575
  DBUG_RETURN(error);
576
}
577

578

579 580 581 582
/*
  Initialise a table rule hash whose keys come from get_table_key() and
  whose entries are released through free_table_ent(); then mark it inited.
*/
void init_table_rule_hash(HASH* h, bool* h_inited)
{
  hash_get_key key_fn= (hash_get_key) get_table_key;
  hash_free_key free_fn= (hash_free_key) free_table_ent;
  hash_init(h, TABLE_RULE_HASH_SIZE, 0, 0, key_fn, free_fn, 0);
  *h_inited= 1;
}
/* Initialise a dynamic array of TABLE_RULE_ENT pointers and mark it inited. */
void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited)
{
  const uint element_size= sizeof(TABLE_RULE_ENT*);
  my_init_dynamic_array(a, element_size, TABLE_RULE_ARR_SIZE,
                        TABLE_RULE_ARR_SIZE);
  *a_inited= 1;
}

/*
  Return the first rule in 'a' whose wildcard pattern matches the 'len'-byte
  key, or 0 if none does. Matching uses wild_case_compare() with '\\' as
  the escape character.
*/
static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
{
  const char* const key_end= key + len;

  for (uint idx= 0; idx < a->elements; ++idx)
  {
    TABLE_RULE_ENT* rule;
    get_dynamic(a, (gptr) &rule, idx);
    const char* pattern= (const char*) rule->db;
    const char* pattern_end= (const char*) (rule->db + rule->key_len);
    if (!wild_case_compare(key, key_end, pattern, pattern_end, '\\'))
      return rule;
  }
  return 0;
}

/*
  Checks whether tables match some (wild_)do_table and (wild_)ignore_table
  rules (for replication)

  SYNOPSIS
    tables_ok()
    thd             thread (SQL slave thread normally)
    tables          list of tables to check

  NOTES
    Note that changing the order of the tables in the list can lead to
    different results. Note also the order of precedence of the do/ignore 
    rules (see code below). For that reason, users should not set conflicting 
    rules because they may get unpredicted results.

  RETURN VALUES
    0           should not be logged/replicated
    1           should be logged/replicated                  
*/

632 633
int tables_ok(THD* thd, TABLE_LIST* tables)
{
634 635
  DBUG_ENTER("tables_ok");

unknown's avatar
unknown committed
636 637
  for (; tables; tables = tables->next)
  {
638 639 640 641
    char hash_key[2*NAME_LEN+2];
    char *end;
    uint len;

unknown's avatar
unknown committed
642 643
    if (!tables->updating) 
      continue;
644 645 646
    end= strmov(hash_key, tables->db ? tables->db : thd->db);
    *end++= '.';
    len= (uint) (strmov(end, tables->real_name) - hash_key);
unknown's avatar
unknown committed
647
    if (do_table_inited) // if there are any do's
648
    {
unknown's avatar
unknown committed
649
      if (hash_search(&replicate_do_table, (byte*) hash_key, len))
650
	DBUG_RETURN(1);
unknown's avatar
unknown committed
651
    }
652
    if (ignore_table_inited) // if there are any ignores
unknown's avatar
unknown committed
653 654
    {
      if (hash_search(&replicate_ignore_table, (byte*) hash_key, len))
655
	DBUG_RETURN(0); 
656
    }
unknown's avatar
unknown committed
657 658
    if (wild_do_table_inited && find_wild(&replicate_wild_do_table,
					  hash_key, len))
659
      DBUG_RETURN(1);
unknown's avatar
unknown committed
660 661
    if (wild_ignore_table_inited && find_wild(&replicate_wild_ignore_table,
					      hash_key, len))
662
      DBUG_RETURN(0);
unknown's avatar
unknown committed
663
  }
664

unknown's avatar
unknown committed
665 666 667 668
  /*
    If no explicit rule found and there was a do list, do not replicate.
    If there was no do list, go ahead
  */
669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722
  DBUG_RETURN(!do_table_inited && !wild_do_table_inited);
}


/*
  Checks whether a db matches wild_do_table and wild_ignore_table
  rules (for replication)

  SYNOPSIS
    db_ok_with_wild_table()
    db		name of the db to check.
		Is tested with check_db_name() before calling this function.

  NOTES
    Here is the reason for this function.
    We advise users who want to exclude a database 'db1' safely to do it
    with replicate_wild_ignore_table='db1.%' instead of binlog_ignore_db or
    replicate_ignore_db, because the latter two only check the selected db,
    which won't work in this case:
    USE db2;
    UPDATE db1.t SET ... #this will be replicated and should not
    whereas replicate_wild_ignore_table works in all cases.
    With replicate_wild_ignore_table, we only check tables. When
    one does 'DROP DATABASE db1', tables are not involved and the
    statement will be replicated, while users could expect it would not (as it
    roughly means 'DROP db1.first_table, DROP db1.second_table...').
    In other words, we want to interpret 'db1.%' as "everything touching db1".
    That is why we match the key "db1." against the wild table rules.

  RETURN VALUES
    0           should not be logged/replicated
    1           should be logged/replicated
 */

int db_ok_with_wild_table(const char *db)
{
  char hash_key[NAME_LEN+2];
  char *key_end= strmov(hash_key, db);
  *key_end++= '.';
  int len= (int) (key_end - hash_key);

  if (wild_do_table_inited &&
      find_wild(&replicate_wild_do_table, hash_key, len))
    return 1;
  if (wild_ignore_table_inited &&
      find_wild(&replicate_wild_ignore_table, hash_key, len))
    return 0;

  /* No explicit rule matched: replicate unless there is a wild do list */
  return !wild_do_table_inited;
}


/*
  Add an exact "db.table" rule to the table rule hash 'h'.

  The entry and a copy of the key text (table_spec, without a terminating
  NUL) are allocated as one block. The hash owns the block once inserted
  and releases it through its free callback.

  RETURN
    0  ok
    1  table_spec contains no '.', out of memory, or the hash insert failed
*/
int add_table_rule(HASH* h, const char* table_spec)
{
  const char* dot = strchr(table_spec, '.');
  if (!dot) return 1;
  // len is always > 0 because we know that there exists a '.'
  uint len = (uint)strlen(table_spec);
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
						 + len, MYF(MY_WME));
  if (!e) return 1;
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len);
  if (hash_insert(h, (byte*)e))
  {
    /*
      The hash did not take ownership: free the entry here instead of
      leaking it, and report failure rather than silently dropping the rule.
    */
    my_free((gptr) e, MYF(0));
    return 1;
  }
  return 0;
}
/*
  Add a wildcard "db.table" rule (pattern text copied without a terminating
  NUL) to the dynamic array 'a'. The array stores a pointer to the entry;
  entries are later released one by one (see free_string_array()).

  RETURN
    0  ok
    1  table_spec contains no '.', out of memory, or the array insert failed
*/
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
{
  const char* dot = strchr(table_spec, '.');
  if (!dot) return 1;
  // len is always > 0 because we know that there exists a '.'
  uint len = (uint)strlen(table_spec);
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
						 + len, MYF(MY_WME));
  if (!e) return 1;
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len);
  if (insert_dynamic(a, (gptr)&e))
  {
    /* The array does not hold the pointer: free it instead of leaking it */
    my_free((gptr) e, MYF(0));
    return 1;
  }
  return 0;
}
/*
  my_free() every pointer stored in the dynamic array 'a', then release the
  array itself.
*/
static void free_string_array(DYNAMIC_ARRAY *a)
{
  for (uint idx= 0; idx < a->elements; ++idx)
  {
    char* element;
    get_dynamic(a, (gptr) &element, idx);
    my_free(element, MYF(MY_WME));
  }
  delete_dynamic(a);
}
#ifdef NOT_USED_YET

/* List-walk callback that shuts down one master info (multi-master, unused). */
static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/)
{
  end_master_info(mi);
  return 0;                                     // keep walking
}
#endif
void end_slave()
{
unknown's avatar
unknown committed
783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802
  if (active_mi)
  {
    /*
      TODO: replace the line below with
      list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
      once multi-master code is ready.
    */
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
    end_master_info(active_mi);
    if (do_table_inited)
      hash_free(&replicate_do_table);
    if (ignore_table_inited)
      hash_free(&replicate_ignore_table);
    if (wild_do_table_inited)
      free_string_array(&replicate_wild_do_table);
    if (wild_ignore_table_inited)
      free_string_array(&replicate_wild_ignore_table);
    delete active_mi;
    active_mi= 0;
  }
803
}
unknown's avatar
unknown committed
804

805

806
/*
  True when the IO thread should stop: mi->abort_slave is set, the server
  is shutting down (abort_loop), or the thread itself has been killed.
*/
static bool io_slave_killed(THD* thd, MASTER_INFO* mi)
{
  DBUG_ASSERT(mi->io_thd == thd);
  DBUG_ASSERT(mi->slave_running == 1); // tracking buffer overrun
  if (mi->abort_slave || abort_loop)
    return true;
  return thd->killed != 0;
}
/*
  True when the SQL thread should stop: rli->abort_slave is set, the server
  is shutting down (abort_loop), or the thread itself has been killed.
*/
static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
{
  DBUG_ASSERT(rli->sql_thd == thd);
  DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
  if (rli->abort_slave || abort_loop)
    return true;
  return thd->killed != 0;
}
/*
  Record a replication error in 'rli' (formatted message in
  rli->last_slave_error, code in rli->last_slave_errno) and write it to the
  error log. 'msg' is a printf-style format for the remaining arguments.
*/
void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...)
{
  va_list args;
  va_start(args,msg);
  my_vsnprintf(rli->last_slave_error,
	       sizeof(rli->last_slave_error), msg, args);
  va_end(args);			// every va_start() needs a matching va_end()
  sql_print_error("Slave: %s, error_code=%d", rli->last_slave_error,
		  err_code);
  rli->last_slave_errno = err_code;
}
/*
  Decline a LOAD DATA INFILE file transfer: send a 0xfb packet naming
  /dev/null (10 bytes), throw away whatever comes back, then send OK.
*/
void skip_load_data_infile(NET* net)
{
  (void) my_net_write(net, "\xfb/dev/null", 10);
  (void) net_flush(net);
  (void) my_net_read(net);                      // reply is not needed
  send_ok(net);                                 // the master expects it
}
char* rewrite_db(char* db)
unknown's avatar
unknown committed
844
{
unknown's avatar
unknown committed
845 846
  if (replicate_rewrite_db.is_empty() || !db)
    return db;
unknown's avatar
unknown committed
847 848 849
  I_List_iterator<i_string_pair> it(replicate_rewrite_db);
  i_string_pair* tmp;

unknown's avatar
unknown committed
850 851 852 853 854
  while ((tmp=it++))
  {
    if (!strcmp(tmp->key, db))
      return tmp->val;
  }
unknown's avatar
unknown committed
855 856
  return db;
}
857

unknown's avatar
unknown committed
858

859 860 861 862 863 864 865 866 867 868 869 870 871 872 873
/*
  Checks whether a db matches some do_db and ignore_db rules
  (for logging or replication)

  SYNOPSIS
    db_ok()
    db              name of the db to check
    do_list         either binlog_do_db or replicate_do_db
    ignore_list     either binlog_ignore_db or replicate_ignore_db

  RETURN VALUES
    0           should not be logged/replicated
    1           should be logged/replicated                  
*/

unknown's avatar
unknown committed
874 875 876
int db_ok(const char* db, I_List<i_string> &do_list,
	  I_List<i_string> &ignore_list )
{
unknown's avatar
unknown committed
877
  if (do_list.is_empty() && ignore_list.is_empty())
unknown's avatar
unknown committed
878 879
    return 1; // ok to replicate if the user puts no constraints

unknown's avatar
unknown committed
880 881 882 883 884
  /*
    If the user has specified restrictions on which databases to replicate
    and db was not selected, do not replicate.
  */
  if (!db)
unknown's avatar
unknown committed
885
    return 0;
unknown's avatar
unknown committed
886

unknown's avatar
unknown committed
887 888 889 890
  if (!do_list.is_empty()) // if the do's are not empty
  {
    I_List_iterator<i_string> it(do_list);
    i_string* tmp;
unknown's avatar
unknown committed
891

unknown's avatar
unknown committed
892 893 894 895
    while ((tmp=it++))
    {
      if (!strcmp(tmp->ptr, db))
	return 1; // match
unknown's avatar
unknown committed
896
    }
unknown's avatar
unknown committed
897 898
    return 0;
  }
unknown's avatar
unknown committed
899
  else // there are some elements in the don't, otherwise we cannot get here
unknown's avatar
unknown committed
900 901 902
  {
    I_List_iterator<i_string> it(ignore_list);
    i_string* tmp;
unknown's avatar
unknown committed
903

unknown's avatar
unknown committed
904 905 906 907
    while ((tmp=it++))
    {
      if (!strcmp(tmp->ptr, db))
	return 0; // match
unknown's avatar
unknown committed
908
    }
unknown's avatar
unknown committed
909 910
    return 1;
  }
unknown's avatar
unknown committed
911 912
}

913

unknown's avatar
unknown committed
914 915
static int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
				 const char *default_val)
unknown's avatar
unknown committed
916
{
unknown's avatar
unknown committed
917 918 919 920 921 922 923
  uint length;
  if ((length=my_b_gets(f,var, max_size)))
  {
    char* last_p = var + length -1;
    if (*last_p == '\n')
      *last_p = 0; // if we stopped on newline, kill it
    else
unknown's avatar
unknown committed
924
    {
unknown's avatar
unknown committed
925 926 927 928
      /*
	If we truncated a line or stopped on last char, remove all chars
	up to and including newline.
      */
unknown's avatar
unknown committed
929
      int c;
unknown's avatar
unknown committed
930
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF));
unknown's avatar
unknown committed
931
    }
unknown's avatar
unknown committed
932 933 934 935
    return 0;
  }
  else if (default_val)
  {
unknown's avatar
unknown committed
936
    strmake(var,  default_val, max_size-1);
unknown's avatar
unknown committed
937 938
    return 0;
  }
unknown's avatar
unknown committed
939
  return 1;
unknown's avatar
unknown committed
940 941
}

942

unknown's avatar
unknown committed
943
static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
unknown's avatar
unknown committed
944 945 946
{
  char buf[32];
  
unknown's avatar
unknown committed
947 948 949 950 951
  if (my_b_gets(f, buf, sizeof(buf))) 
  {
    *var = atoi(buf);
    return 0;
  }
unknown's avatar
unknown committed
952
  else if (default_val)
unknown's avatar
unknown committed
953 954 955 956
  {
    *var = default_val;
    return 0;
  }
unknown's avatar
unknown committed
957
  return 1;
unknown's avatar
unknown committed
958 959
}

960

961 962
/*
  Set mi->old_format from the first character of the master's version.

  RETURN VALUES
    0   version starts with '3' (old_format= 1) or '4'/'5' (old_format= 0)
    1   any other version; the problem is written to the error log
*/
static int check_master_version(MYSQL* mysql, MASTER_INFO* mi)
{
  switch (*mysql->server_version) {
  case '3':
    mi->old_format = 1;
    return 0;
  case '4':
  case '5':
    mi->old_format = 0;
    return 0;
  default:
    /* Pass the message as an argument, never as the format string */
    sql_print_error("%s", "Master reported unrecognized MySQL version");
    return 1;
  }
}

unknown's avatar
unknown committed
986 987 988 989

/*
  Create a table from a table dump sent by the master.

  SYNOPSIS
    create_table_from_dump()
    thd         thread that runs the statement; errors go to thd->net
    net         connection to the master that the dump is read from
    db          database the table is created in
    table_name  name of the table

  DESCRIPTION
    Reads one packet containing the CREATE TABLE statement and runs it
    with OPTION_BIN_LOG cleared and thd->db temporarily set to db. Then it
    opens the new table, loads its rows with handler::net_read_dump() and
    calls handler::repair() (with thd->net.vio detached) to rebuild the
    indexes. thd->net.no_send_ok is set during the work and cleared before
    returning, except on the early returns that precede it.

  RETURN VALUES
    0   ok
    1   error; the error has normally already been sent to thd->net
*/
static int create_table_from_dump(THD* thd, NET* net, const char* db,
				  const char* table_name)
{
  ulong packet_len= my_net_read(net);	// the CREATE TABLE statement
  char *query;
  Vio* saved_vio;
  HA_CHECK_OPT check_opt;
  TABLE_LIST tables;
  int error= 1;
  handler *file;
  ulong saved_options;

  if (packet_len == packet_error)
  {
    send_error(&thd->net, ER_MASTER_NET_READ);
    return 1;
  }
  if (net->read_pos[0] == 255)		// the master sent an error packet
  {
    net->read_pos[packet_len]= 0;
    net_printf(&thd->net, ER_MASTER, net->read_pos + 3);
    return 1;
  }
  thd->command= COM_TABLE_DUMP;

  /* thd->query must not be set before the buffer has been filled in */
  query= sql_alloc(packet_len + 1);
  if (!query)
  {
    sql_print_error("create_table_from_dump: out of memory");
    net_printf(&thd->net, ER_GET_ERRNO, "Out of memory");
    return 1;
  }
  memcpy(query, net->read_pos, packet_len);
  query[packet_len]= 0;
  thd->query_length= packet_len;
  /*
    Publish thd->query under LOCK_thread_count so the compiler cannot move
    the assignment ahead of the copy above.
  */
  VOID(pthread_mutex_lock(&LOCK_thread_count));
  thd->query= query;
  VOID(pthread_mutex_unlock(&LOCK_thread_count));

  thd->current_tablenr= 0;
  thd->query_error= 0;
  thd->net.no_send_ok= 1;

  /* Run the CREATE TABLE in db without writing it to the binary log */
  saved_options= thd->options;
  thd->options&= ~(ulong) (OPTION_BIN_LOG);
  thd->proc_info= "Creating table from master dump";
  {
    char* saved_db= thd->db;		// db may differ from the current one
    thd->db= (char*) db;
    mysql_parse(thd, thd->query, packet_len);
    thd->db= saved_db;
  }
  thd->options= saved_options;

  if (thd->query_error)
    goto err;				// mysql_parse() has sent the error

  bzero((char*) &tables, sizeof(tables));
  tables.db= (char*) db;
  tables.alias= tables.real_name= (char*) table_name;
  tables.lock_type= TL_WRITE;
  thd->proc_info= "Opening master dump table";
  if (!open_ltable(thd, &tables, TL_WRITE))
  {
    send_error(&thd->net, 0, 0);		// report open_ltable()'s error
    sql_print_error("create_table_from_dump: could not open created table");
    goto err;
  }

  file= tables.table->file;
  thd->proc_info= "Reading master dump table data";
  if (file->net_read_dump(net))
  {
    net_printf(&thd->net, ER_MASTER_NET_READ);
    sql_print_error("create_table_from_dump::failed in"
		    " handler::net_read_dump()");
    goto err;
  }

  check_opt.init();
  check_opt.flags|= T_VERY_SILENT | T_CALC_CHECKSUM | T_QUICK;
  thd->proc_info= "Rebuilding the index on master dump table";
  /*
    Detach the client connection while repair() runs so that it does not
    spam the client with messages; a failure is reported once below.
  */
  saved_vio= thd->net.vio;
  thd->net.vio= 0;
  error= (file->repair(thd, &check_opt) != 0);
  thd->net.vio= saved_vio;
  if (error)
    net_printf(&thd->net, ER_INDEX_REBUILD, tables.table->real_name);

err:
  close_thread_tables(thd);
  thd->net.no_send_ok= 0;
  return error;
}

1089 1090
/*
  Copy one table from the master with a table dump request.

  SYNOPSIS
    fetch_master_table()
    thd         thread doing the work; errors are sent to thd->net
    db_name     database of the table
    table_name  table to fetch
    mi          master info used to connect when mysql is NULL
    mysql       open connection to the master, or NULL to open (and later
                close) a private one

  RETURN VALUES
    0   ok
    1   error
*/
int fetch_master_table(THD *thd, const char *db_name, const char *table_name,
		       MASTER_INFO *mi, MYSQL *mysql)
{
  int error= 1;
  const char *errmsg= 0;
  bool called_connected= (mysql != NULL);
  DBUG_ENTER("fetch_master_table");
  DBUG_PRINT("enter", ("db_name: '%s'  table_name: '%s'",
		       db_name,table_name));

  if (!called_connected)
  {
    /* The caller gave us no connection: open our own */
    mysql= mc_mysql_init(NULL);
    if (!mysql)
    {
      send_error(&thd->net);			// EOM
      DBUG_RETURN(1);
    }
    if (connect_to_master(thd, mysql, mi))
    {
      net_printf(&thd->net, ER_CONNECT_TO_MASTER, mc_mysql_error(mysql));
      mc_mysql_close(mysql);
      DBUG_RETURN(1);
    }
    if (thd->killed)
      goto err;
  }

  if (request_table_dump(mysql, db_name, table_name))
  {
    error= ER_UNKNOWN_ERROR;
    errmsg= "Failed on table dump request";
  }
  else if (!create_table_from_dump(thd, &mysql->net, db_name, table_name))
    error= 0;
  /* otherwise create_table_from_dump() has already sent the error */

err:
  thd->net.no_send_ok= 0;	// clear what create_table_from_dump() may leave
  if (!called_connected)
    mc_mysql_close(mysql);
  if (errmsg && thd->net.vio)
    send_error(&thd->net, error, errmsg);
  DBUG_RETURN(test(error));	// 1 on error, 0 on success
}

1136

1137 1138
/*
  Release what init_master_info() set up: the relay log info and the
  master info file (cache and descriptor). Does nothing if mi is not
  initialized.
*/
void end_master_info(MASTER_INFO* mi)
{
  DBUG_ENTER("end_master_info");

  if (mi->inited)
  {
    end_relay_log_info(&mi->rli);
    if (mi->fd >= 0)
    {
      end_io_cache(&mi->file);
      (void) my_close(mi->fd, MYF(MY_WME));
      mi->fd= -1;
    }
    mi->inited= 0;
  }
  DBUG_VOID_RETURN;
}

1155

1156 1157 1158 1159 1160 1161
/*
  Initialize the relay log info from the relay log info file.

  SYNOPSIS
    init_relay_log_info()
    rli          relay log info to initialize
    info_fname   name of the relay log info file (formatted against
                 mysql_data_home)

  DESCRIPTION
    Opens the relay log. If the info file does not exist it is created and
    reading starts with the first log in the relay log index. Otherwise
    the relay log name/position and master log name/position are read
    from it. The info file cache is then switched to WRITE_CACHE and
    flushed, and the relay log space is counted.
    rli->data_lock is taken after the early "already inited" check and is
    released on every return path after that.

  RETURN VALUES
    0   ok (also when rli was already initialized)
    #   error
*/
int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
{
  char fname[FN_REFLEN+128];
  int info_fd;
  const char* msg = 0;
  int error = 0;
  DBUG_ENTER("init_relay_log_info");

  if (rli->inited)				// Set if this function called
    DBUG_RETURN(0);
  fn_format(fname, info_fname, mysql_data_home, "", 4+32);
  pthread_mutex_lock(&rli->data_lock);
  info_fd = rli->info_fd;
  rli->pending = 0;
  rli->cur_log_fd = -1;
  rli->slave_skip_counter=0;
  rli->abort_pos_wait=0;
  rli->skip_log_purge=0;
  rli->log_space_limit = relay_log_space_limit;
  rli->log_space_total = 0;

  // TODO: make this work with multi-master
  if (!opt_relay_logname)
  {
    char tmp[FN_REFLEN];
    /*
      TODO: The following should be using fn_format();  We just need to
      first change fn_format() to cut the file name if it's too long.
    */
    strmake(tmp,glob_hostname,FN_REFLEN-5);
    strmov(strcend(tmp,'.'),"-relay-bin");
    opt_relay_logname=my_strdup(tmp,MYF(MY_WME));
  }
  if (open_log(&rli->relay_log, glob_hostname, opt_relay_logname,
	       "-relay-bin", opt_relaylog_index_name,
	       LOG_BIN, 1 /* read_append cache */,
	       1 /* no auto events */))
  {
    /* data_lock is held here: it must be released before returning */
    pthread_mutex_unlock(&rli->data_lock);
    sql_print_error("Failed in open_log() called from init_relay_log_info()");
    DBUG_RETURN(1);
  }

  /* if file does not exist */
  if (access(fname,F_OK))
  {
    /*
      If someone removed the file from underneath our feet, just close
      the old descriptor and re-create the old file
    */
    if (info_fd >= 0)
      my_close(info_fd, MYF(MY_WME));
    if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 ||
	init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
		      MYF(MY_WME)))
    {
      msg= current_thd->net.last_error;
      goto err;
    }

    /* Init relay log with first entry in the relay index file */
    if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
			   &msg))
      goto err;
    rli->master_log_name[0]= 0;
    rli->master_log_pos= 0;
    rli->info_fd= info_fd;
  }
  else // file exists
  {
    if (info_fd >= 0)
      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
    else if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 ||
	     init_io_cache(&rli->info_file, info_fd,
			   IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
    {
      if (info_fd >= 0)
	my_close(info_fd, MYF(0));
      rli->info_fd= -1;
      rli->relay_log.close(1);
      pthread_mutex_unlock(&rli->data_lock);
      DBUG_RETURN(1);
    }

    rli->info_fd = info_fd;
    int relay_log_pos, master_log_pos;
    if (init_strvar_from_file(rli->relay_log_name,
			      sizeof(rli->relay_log_name), &rli->info_file,
			      "") ||
       init_intvar_from_file(&relay_log_pos,
			     &rli->info_file, BIN_LOG_HEADER_SIZE) ||
       init_strvar_from_file(rli->master_log_name,
			     sizeof(rli->master_log_name), &rli->info_file,
			     "") ||
       init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
    {
      msg="Error reading slave log configuration";
      goto err;
    }
    rli->relay_log_pos=  relay_log_pos;
    rli->master_log_pos= master_log_pos;

    if (init_relay_log_pos(rli,
			   rli->relay_log_name,
			   rli->relay_log_pos,
			   0 /* no data lock*/,
			   &msg))
      goto err;
  }
  DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE);
  DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
  /*
    Now change the cache from READ to WRITE - must do this
    before flush_relay_log_info
  */
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
  error= flush_relay_log_info(rli);
  if (count_relay_log_space(rli))
  {
    msg="Error counting relay log space";
    goto err;
  }
  rli->inited= 1;
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_RETURN(error);

err:
  /*
    msg may come from net.last_error (it can contain a file name with '%'),
    so it is never used as the format string; it may also be unset.
  */
  if (msg)
    sql_print_error("%s", msg);
  end_io_cache(&rli->info_file);
  if (info_fd >= 0)
    my_close(info_fd, MYF(0));
  rli->info_fd= -1;
  rli->relay_log.close(1);
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_RETURN(1);
}

1289

unknown's avatar
unknown committed
1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303
/*
  Add the on-disk size of linfo->log_file_name to rli->log_space_total.

  RETURN VALUES
    0   ok
    1   my_stat() failed (written to the error log)
*/
static inline int add_relay_log(RELAY_LOG_INFO* rli,LOG_INFO* linfo)
{
  MY_STAT stat_info;
  DBUG_ENTER("add_relay_log");

  if (my_stat(linfo->log_file_name, &stat_info, MYF(0)) == NULL)
  {
    sql_print_error("log %s listed in the index, but failed to stat",
		    linfo->log_file_name);
    DBUG_RETURN(1);
  }
  rli->log_space_total+= stat_info.st_size;
#ifndef DBUG_OFF
  {
    char llbuf[22];
    DBUG_PRINT("info",("log_space_total: %s",
		       llstr(rli->log_space_total, llbuf)));
  }
#endif
  DBUG_RETURN(0);
}

1308

unknown's avatar
unknown committed
1309 1310
/*
  Block the I/O thread while the relay logs use more space than allowed.

  Waits on rli->log_space_cond (under rli->log_space_lock) until
  log_space_total no longer exceeds log_space_limit or io_slave_killed()
  reports that the I/O thread must stop. thd->proc_info is set while
  waiting and then restored.

  RETURN VALUES
    the last io_slave_killed() result (0 if it was never called)
*/
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli)
{
  MASTER_INFO* mi= rli->mi;
  THD* thd= mi->io_thd;
  const char* saved_proc_info;
  bool slave_killed= 0;
  DBUG_ENTER("wait_for_relay_log_space");

  pthread_mutex_lock(&rli->log_space_lock);
  saved_proc_info= thd->proc_info;
  thd->proc_info= "Waiting for relay log space to free";
  for (;;)
  {
    if (rli->log_space_limit >= rli->log_space_total)
      break;				// enough room again
    if ((slave_killed= io_slave_killed(thd, mi)))
      break;
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
  }
  thd->proc_info= saved_proc_info;
  pthread_mutex_unlock(&rli->log_space_lock);
  DBUG_RETURN(slave_killed);
}

unknown's avatar
unknown committed
1330

unknown's avatar
unknown committed
1331 1332 1333 1334 1335
/*
  Recompute rli->log_space_total as the sum of the sizes of all logs
  listed in the relay log index.

  RETURN VALUES
    0   ok
    1   first log not found or a log could not be stat'ed
*/
static int count_relay_log_space(RELAY_LOG_INFO* rli)
{
  LOG_INFO linfo;
  DBUG_ENTER("count_relay_log_space");

  rli->log_space_total= 0;
  if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
  {
    sql_print_error("Could not find first log while counting relay log space");
    DBUG_RETURN(1);
  }
  for (;;)
  {
    if (add_relay_log(rli, &linfo))
      DBUG_RETURN(1);
    if (rli->relay_log.find_next_log(&linfo, 1))
      break;				// no more logs in the index
  }
  DBUG_RETURN(0);
}
unknown's avatar
unknown committed
1348

unknown's avatar
unknown committed
1349

1350
int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
1351 1352
		     const char* slave_info_fname,
		     bool abort_if_no_master_info_file)
unknown's avatar
unknown committed
1353
{
unknown's avatar
unknown committed
1354 1355 1356 1357
  int fd,error;
  char fname[FN_REFLEN+128];
  DBUG_ENTER("init_master_info");

unknown's avatar
unknown committed
1358
  if (mi->inited)
unknown's avatar
unknown committed
1359
    DBUG_RETURN(0);
unknown's avatar
unknown committed
1360 1361
  mi->mysql=0;
  mi->file_id=1;
1362
  mi->ignore_stop_event=0;
1363
  fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
unknown's avatar
unknown committed
1364

unknown's avatar
unknown committed
1365 1366 1367 1368
  /*
    We need a mutex while we are changing master info parameters to
    keep other threads from reading bogus info
  */
unknown's avatar
unknown committed
1369

1370
  pthread_mutex_lock(&mi->data_lock);
unknown's avatar
unknown committed
1371
  fd = mi->fd;
unknown's avatar
unknown committed
1372
  
1373
  if (access(fname,F_OK))
unknown's avatar
unknown committed
1374
  {
1375 1376 1377 1378 1379
    if (abort_if_no_master_info_file)
    {
      pthread_mutex_unlock(&mi->data_lock);
      DBUG_RETURN(0);
    }
unknown's avatar
unknown committed
1380 1381 1382 1383
    /*
      if someone removed the file from underneath our feet, just close
      the old descriptor and re-create the old file
    */
unknown's avatar
unknown committed
1384 1385
    if (fd >= 0)
      my_close(fd, MYF(MY_WME));
unknown's avatar
unknown committed
1386 1387 1388 1389 1390
    if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 ||
	init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
		      MYF(MY_WME)))
      goto err;

1391
    mi->master_log_name[0] = 0;
unknown's avatar
unknown committed
1392
    mi->master_log_pos = BIN_LOG_HEADER_SIZE;		// skip magic number
unknown's avatar
unknown committed
1393 1394 1395 1396 1397 1398 1399
    mi->fd = fd;
      
    if (master_host)
      strmake(mi->host, master_host, sizeof(mi->host) - 1);
    if (master_user)
      strmake(mi->user, master_user, sizeof(mi->user) - 1);
    if (master_password)
1400
      strmake(mi->password, master_password, HASH_PASSWORD_LENGTH);
unknown's avatar
unknown committed
1401 1402 1403
    mi->port = master_port;
    mi->connect_retry = master_connect_retry;
  }
1404
  else // file exists
unknown's avatar
unknown committed
1405
  {
unknown's avatar
unknown committed
1406
    if (fd >= 0)
unknown's avatar
unknown committed
1407
      reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
unknown's avatar
unknown committed
1408 1409 1410 1411
    else if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 ||
	     init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
			   0, MYF(MY_WME)))
      goto err;
unknown's avatar
unknown committed
1412

unknown's avatar
unknown committed
1413
    mi->fd = fd;
1414 1415
    int port, connect_retry, master_log_pos;

1416
    if (init_strvar_from_file(mi->master_log_name,
unknown's avatar
unknown committed
1417
			      sizeof(mi->master_log_name), &mi->file,
unknown's avatar
unknown committed
1418
			      "") ||
1419
	init_intvar_from_file(&master_log_pos, &mi->file, 4) ||
unknown's avatar
unknown committed
1420 1421 1422 1423 1424 1425
	init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
			      master_host) ||
	init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
			      master_user) || 
	init_strvar_from_file(mi->password, HASH_PASSWORD_LENGTH+1, &mi->file,
			      master_password) ||
1426 1427
	init_intvar_from_file(&port, &mi->file, master_port) ||
	init_intvar_from_file(&connect_retry, &mi->file,
unknown's avatar
unknown committed
1428
			      master_connect_retry))
unknown's avatar
unknown committed
1429
    {
unknown's avatar
unknown committed
1430
      sql_print_error("Error reading master configuration");
1431
      goto err;
unknown's avatar
unknown committed
1432
    }
1433 1434 1435 1436 1437 1438 1439
    /*
      This has to be handled here as init_intvar_from_file can't handle
      my_off_t types
    */
    mi->master_log_pos= (my_off_t) master_log_pos;
    mi->port= (uint) port;
    mi->connect_retry= (uint) connect_retry;
unknown's avatar
unknown committed
1440
  }
1441 1442 1443
  DBUG_PRINT("master_info",("log_file_name: %s  position: %ld",
			    mi->master_log_name,
			    (ulong) mi->master_log_pos));
1444 1445 1446 1447 1448

  if (init_relay_log_info(&mi->rli, slave_info_fname))
    goto err;
  mi->rli.mi = mi;

unknown's avatar
unknown committed
1449
  mi->inited = 1;
unknown's avatar
unknown committed
1450
  // now change cache READ -> WRITE - must do this before flush_master_info
1451
  reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1);
unknown's avatar
unknown committed
1452
  error=test(flush_master_info(mi));
1453
  pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
1454
  DBUG_RETURN(error);
unknown's avatar
unknown committed
1455

1456
err:
unknown's avatar
unknown committed
1457 1458 1459 1460 1461 1462
  if (fd >= 0)
  {
    my_close(fd, MYF(0));
    end_io_cache(&mi->file);
  }
  mi->fd= -1;
1463
  pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
1464
  DBUG_RETURN(1);
unknown's avatar
unknown committed
1465 1466
}

1467

1468 1469 1470 1471 1472
/*
  Register this slave with the master by sending COM_REGISTER_SLAVE.

  The packet contains, in order:
  - server_id (4 bytes);
  - report_host (stored with net_store_data());
  - report_user and report_password (each stored with net_store_data(),
    or as a single 0 byte when unset);
  - report_port (2 bytes);
  - rpl_recovery_rank (4 bytes);
  - a zero master_id (4 bytes) that the master fills in.

  RETURN VALUES
    0   ok, or nothing sent because report_host is not set
    1   the command failed (written to the error log)
*/
int register_slave_on_master(MYSQL* mysql)
{
  String packet;
  char buf[4];

  if (!report_host)
    return 0;

  int4store(buf, server_id);
  packet.append(buf, 4);

  net_store_data(&packet, report_host);
  if (report_user)
    net_store_data(&packet, report_user);
  else
    packet.append((char)0);

  /* The password slot must carry the password, not the user name */
  if (report_password)
    net_store_data(&packet, report_password);
  else
    packet.append((char)0);

  int2store(buf, (uint16)report_port);
  packet.append(buf, 2);
  int4store(buf, rpl_recovery_rank);
  packet.append(buf, 4);
  int4store(buf, 0); /* tell the master will fill in master_id */
  packet.append(buf, 4);

  if (mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(),
		       packet.length(), 0))
  {
    sql_print_error("Error on COM_REGISTER_SLAVE: %d '%s'",
		    mc_mysql_errno(mysql),
		    mc_mysql_error(mysql));
    return 1;
  }

  return 0;
}

1509
/*
  Send the SHOW SLAVE STATUS result set for mi to the client.

  Sends the column definitions and then, if mi->host is set, one row built
  while holding mi->data_lock and mi->rli.data_lock. Finally sends EOF.
  TODO: fix this for multi-master

  RETURN VALUES
    0   ok
    -1  sending the columns or the row failed
*/
int show_master_info(THD* thd, MASTER_INFO* mi)
{
  DBUG_ENTER("show_master_info");

  struct column_def
  {
    const char *name;
    uint length;
  };
  const column_def columns[]=
  {
    {"Master_Host", sizeof(mi->host)},
    {"Master_User", sizeof(mi->user)},
    {"Master_Port", 6},
    {"Connect_retry", 6},
    {"Master_Log_File", FN_REFLEN},
    {"Read_Master_Log_Pos", 12},
    {"Relay_Log_File", FN_REFLEN},
    {"Relay_Log_Pos", 12},
    {"Relay_Master_Log_File", FN_REFLEN},
    {"Slave_IO_Running", 3},
    {"Slave_SQL_Running", 3},
    {"Replicate_do_db", 20},
    {"Replicate_ignore_db", 20},
    {"Last_errno", 4},
    {"Last_error", 20},
    {"Skip_counter", 12},
    {"Exec_master_log_pos", 12},
    {"Relay_log_space", 12}
  };
  List<Item> field_list;
  for (uint i= 0; i < sizeof(columns) / sizeof(columns[0]); i++)
    field_list.push_back(new Item_empty_string(columns[i].name,
					       columns[i].length));
  if (send_fields(thd, field_list, 1))
    DBUG_RETURN(-1);

  if (mi->host[0])
  {
    String *row= &thd->packet;
    row->length(0);

    /* Hold both locks so the row is a consistent snapshot */
    pthread_mutex_lock(&mi->data_lock);
    pthread_mutex_lock(&mi->rli.data_lock);
    net_store_data(row, mi->host);
    net_store_data(row, mi->user);
    net_store_data(row, (uint32) mi->port);
    net_store_data(row, (uint32) mi->connect_retry);
    net_store_data(row, mi->master_log_name);
    net_store_data(row, (longlong) mi->master_log_pos);
    net_store_data(row, mi->rli.relay_log_name +
		   dirname_length(mi->rli.relay_log_name));
    net_store_data(row, (longlong) mi->rli.relay_log_pos);
    net_store_data(row, mi->rli.master_log_name);
    net_store_data(row, mi->slave_running ? "Yes":"No");
    net_store_data(row, mi->rli.slave_running ? "Yes":"No");
    net_store_data(row, &replicate_do_db);
    net_store_data(row, &replicate_ignore_db);
    net_store_data(row, (uint32)mi->rli.last_slave_errno);
    net_store_data(row, mi->rli.last_slave_error);
    net_store_data(row, mi->rli.slave_skip_counter);
    net_store_data(row, (longlong) mi->rli.master_log_pos);
    net_store_data(row, (longlong) mi->rli.log_space_total);
    pthread_mutex_unlock(&mi->rli.data_lock);
    pthread_mutex_unlock(&mi->data_lock);

    if (my_net_write(&thd->net, (char*) row->ptr(), row->length()))
      DBUG_RETURN(-1);
  }
  send_eof(&thd->net);
  DBUG_RETURN(0);
}

1576 1577

/*
  Rewrite the master info file from the start and flush it.

  Writes seven lines: master log name, master log position, host, user,
  password, port and connect retry. mi->file must already be a
  WRITE_CACHE.

  RETURN VALUES
    0   ok
    1   flush_io_cache() reported a write error
*/
bool flush_master_info(MASTER_INFO* mi)
{
  IO_CACHE* file = &mi->file;
  char lbuf[22];
  DBUG_ENTER("flush_master_info");
  DBUG_PRINT("enter",("master_pos: %ld", (long) mi->master_log_pos));

  my_b_seek(file, 0L);
  my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n",
	      mi->master_log_name, llstr(mi->master_log_pos, lbuf),
	      mi->host, mi->user,
	      mi->password, mi->port, mi->connect_retry
	      );
  /* Report a failed write instead of pretending the file was saved */
  DBUG_RETURN(flush_io_cache(file) != 0);
}

unknown's avatar
unknown committed
1594 1595 1596 1597 1598 1599

/*
  Put a relay log info in its "not initialized" state: no open files,
  zero counters and positions, empty names, and freshly initialized
  mutexes and condition variables.
*/
st_relay_log_info::st_relay_log_info()
  :info_fd(-1), cur_log_fd(-1), master_log_pos(0), save_temporary_tables(0),
   cur_log_old_open_count(0), log_space_total(0),
   slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0),
   sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0),
   slave_running(0), skip_log_purge(0),
   inside_transaction(0) /* the default is autocommit=1 */
{
  /* Empty log names and no last error message */
  relay_log_name[0]= 0;
  master_log_name[0]= 0;
  last_slave_error[0]= 0;

  bzero(&info_file, sizeof(info_file));
  bzero(&cache_buf, sizeof(cache_buf));

  /* Synchronization objects; destroyed in ~st_relay_log_info() */
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
  pthread_cond_init(&data_cond, NULL);
  pthread_cond_init(&start_cond, NULL);
  pthread_cond_init(&stop_cond, NULL);
  pthread_cond_init(&log_space_cond, NULL);
}


/*
  Destroy the condition variables and mutexes set up by the constructor.
*/
st_relay_log_info::~st_relay_log_info()
{
  pthread_cond_destroy(&data_cond);
  pthread_cond_destroy(&start_cond);
  pthread_cond_destroy(&stop_cond);
  pthread_cond_destroy(&log_space_cond);

  pthread_mutex_destroy(&run_lock);
  pthread_mutex_destroy(&data_lock);
  pthread_mutex_destroy(&log_space_lock);
}

1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653
/*
  Waits until the SQL thread reaches (has executed up to) the given
  log/position, or until the timeout expires.

  SYNOPSIS
    wait_for_pos()
    thd             client thread that sent SELECT MASTER_POS_WAIT
    log_name        log name to wait for
    log_pos         position to wait for 
    timeout         timeout in seconds before giving up waiting

  NOTES
    timeout is a longlong rather than a ulong so that a negative
    timeout submitted by the user can be detected.

  RETURN VALUES
    -2          improper arguments (log_pos<0)
                or slave not running, or master info changed
                during the function's execution,
                or client thread killed. -2 is translated to NULL by caller
    -1          timed out
    >=0         number of log events the function had to wait
                before reaching the desired log/position
 */
1654

1655
int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
1656 1657
                                    longlong log_pos,
                                    longlong timeout)
unknown's avatar
unknown committed
1658
{
1659 1660
  if (!inited)
    return -1;
unknown's avatar
unknown committed
1661
  int event_count = 0;
1662
  ulong init_abort_pos_wait;
1663 1664 1665 1666
  int error=0;
  struct timespec abstime; // for timeout checking
  set_timespec(abstime,timeout);

1667
  DBUG_ENTER("wait_for_pos");
1668 1669 1670
  DBUG_PRINT("enter",("master_log_name: '%s'  pos: %lu timeout: %ld",
                      master_log_name, (ulong) master_log_pos, 
                      (long) timeout));
1671

1672
  pthread_mutex_lock(&data_lock);
1673 1674 1675 1676 1677 1678 1679
  /* 
     This function will abort when it notices that
     some CHANGE MASTER or RESET MASTER has changed
     the master info. To catch this, these commands
     modify abort_pos_wait ; we just monitor abort_pos_wait
     and see if it has changed.
  */
1680
  init_abort_pos_wait= abort_pos_wait;
1681

1682 1683 1684 1685 1686 1687 1688 1689
  /*
    We'll need to 
    handle all possible log names comparisons (e.g. 999 vs 1000).
    We use ulong for string->number conversion ; this is no 
    stronger limitation than in find_uniq_filename in sql/log.cc
  */
  ulong log_name_extension;
  char log_name_tmp[FN_REFLEN]; //make a char[] from String
unknown's avatar
unknown committed
1690 1691
  char *end= strmake(log_name_tmp, log_name->ptr(), min(log_name->length(),
							FN_REFLEN-1));
1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712
  char *p= fn_ext(log_name_tmp);
  char *p_end;
  if (!*p || log_pos<0)   
  {
    error= -2; //means improper arguments
    goto err;
  }
  //p points to '.'
  log_name_extension= strtoul(++p, &p_end, 10);
  /*
    p_end points to the first invalid character.
    If it equals to p, no digits were found, error.
    If it contains '\0' it means conversion went ok.
  */
  if (p_end==p || *p_end)
  {
    error= -2;
    goto err;
  }    

  //"compare and wait" main loop
1713
  while (!thd->killed &&
1714 1715
         init_abort_pos_wait == abort_pos_wait &&
         mi->slave_running)
unknown's avatar
unknown committed
1716
  {
1717 1718
    bool pos_reached;
    int cmp_result= 0;
1719 1720
    DBUG_ASSERT(*master_log_name || master_log_pos == 0);
    if (*master_log_name)
unknown's avatar
unknown committed
1721
    {
1722
      char *basename= master_log_name + dirname_length(master_log_name);
unknown's avatar
unknown committed
1723
      /*
1724 1725 1726 1727
        First compare the parts before the extension.
        Find the dot in the master's log basename,
        and protect against user's input error :
        if the names do not match up to '.' included, return error
1728
      */
1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741
      char *q= (char*)(fn_ext(basename)+1);
      if (strncmp(basename, log_name_tmp, (int)(q-basename)))
      {
        error= -2;
        break;
      }
      // Now compare extensions.
      char *q_end;
      ulong master_log_name_extension= strtoul(q, &q_end, 10);
      if (master_log_name_extension < log_name_extension)
        cmp_result = -1 ;
      else
        cmp_result= (master_log_name_extension > log_name_extension) ? 1 : 0 ;
unknown's avatar
unknown committed
1742
    }
1743 1744
    pos_reached = ((!cmp_result && master_log_pos >= (ulonglong)log_pos) ||
                   cmp_result > 0);
unknown's avatar
unknown committed
1745 1746
    if (pos_reached || thd->killed)
      break;
1747 1748

    //wait for master update, with optional timeout.
1749
    
unknown's avatar
unknown committed
1750
    DBUG_PRINT("info",("Waiting for master update"));
1751
    const char* msg = thd->enter_cond(&data_cond, &data_lock,
1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769
                                      "Waiting for master update");
    if (timeout > 0)
    {
      /*
        Note that pthread_cond_timedwait checks for the timeout
        before for the condition ; i.e. it returns ETIMEDOUT 
        if the system time equals or exceeds the time specified by abstime
        before the condition variable is signaled or broadcast, _or_ if
        the absolute time specified by abstime has already passed at the time
        of the call.
        For that reason, pthread_cond_timedwait will do the "timeoutting" job
        even if its condition is always immediately signaled (case of a loaded
        master).
      */
      error=pthread_cond_timedwait(&data_cond, &data_lock, &abstime);
    }
    else
      pthread_cond_wait(&data_cond, &data_lock);
1770
    thd->exit_cond(msg);
1771 1772 1773 1774 1775
    if (error == ETIMEDOUT || error == ETIME)
    {
      error= -1;
      break;
    }
unknown's avatar
unknown committed
1776
    error=0;
1777 1778
    event_count++;
  }
1779 1780

err:
1781
  pthread_mutex_unlock(&data_lock);
1782
  DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
unknown's avatar
unknown committed
1783
improper_arguments: %d  timed_out: %d",
1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794
                     (int) thd->killed,
                     (int) (init_abort_pos_wait != abort_pos_wait),
                     (int) mi->slave_running,
                     (int) (error == -2),
                     (int) (error == -1)));
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
      !mi->slave_running) 
  {
    error= -2;
  }
  DBUG_RETURN( error ? error : event_count );
unknown's avatar
unknown committed
1795 1796
}

1797

1798
/*
  Prepare a freshly created THD for use by a slave (I/O or SQL) thread.

  SYNOPSIS
    init_slave_thread()
    thd        THD of the slave thread being started
    thd_type   SLAVE_THD_SQL or SLAVE_THD_IO; here it only selects the
               initial proc_info text

  NOTES
    The THD is marked as a system/bootstrap/slave thread with all
    privileges (master_access= ~0). thd->net is initialized with no vio
    and slave_net_timeout as its read timeout. OPTION_BIN_LOG is set when
    opt_log_slave_updates is on. A new thread id is taken under
    LOCK_thread_count.

  RETURN VALUES
    0    ok
    -1   init_thr_lock() or thd->store_globals() failed; end_thread(thd,0)
         was called first. NOTE(review): end_thread() is defined elsewhere;
         confirm whether it frees thd and/or exits the thread, because
         callers still 'goto err' and touch thd after a -1 return.
*/
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
{
  DBUG_ENTER("init_slave_thread");
  thd->system_thread = thd->bootstrap = 1;
  thd->host_or_ip= "";
  thd->client_capabilities = 0;		// NOTE(review): overwritten below
  my_net_init(&thd->net, 0);		// no vio: slave THD has no client socket
  thd->net.read_timeout = slave_net_timeout;
  thd->master_access= ~0;		// replicated statements run with all privileges
  thd->priv_user = 0;
  thd->slave_thread = 1;
  // Write replicated updates to our own binlog only with log-slave-updates
  thd->options = (((opt_log_slave_updates) ? OPTION_BIN_LOG:0) | OPTION_AUTO_IS_NULL) ;
  thd->client_capabilities = CLIENT_LOCAL_FILES;
  thd->real_id=pthread_self();
  /* thread_id is a shared counter; take the lock while incrementing it */
  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id = thread_id++;
  pthread_mutex_unlock(&LOCK_thread_count);

  if (init_thr_lock() || thd->store_globals())
  {
    end_thread(thd,0);
    DBUG_RETURN(-1);
  }

#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
  /*
    Unblocking an empty set changes nothing; the call is used only to save
    the current signal mask into thd->block_signals.
  */
  sigset_t set;
  VOID(sigemptyset(&set));			// Get mask in use
  VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif

  if (thd->variables.max_join_size == HA_POS_ERROR)
    thd->options |= OPTION_BIG_SELECTS;

  if (thd_type == SLAVE_THD_SQL)
    thd->proc_info= "Waiting for the next event in slave queue";
  else
    thd->proc_info= "Waiting for master update";
  thd->version=refresh_version;
  thd->set_time();
  DBUG_RETURN(0);
}

unknown's avatar
unknown committed
1840

1841 1842
/*
  Sleep for up to 'sec' seconds, giving up early if the thread is killed.

  Each nap arms a thr_alarm for twice the nap length. The alarm exists only
  so that killing the thread can wake it from sleep(); in the normal case
  the nap ends first and the alarm is cancelled. After every nap,
  thread_killed(thd, thread_killed_arg) is consulted.

  RETURN VALUES
    1   thread_killed reported the thread as killed
    0   the whole interval elapsed
*/
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
		      void* thread_killed_arg)
{
  thr_alarm_t alarmed;
  thr_alarm_init(&alarmed);

  time_t now= time((time_t*) 0);
  const time_t deadline= now + sec;

  for (int remaining; (remaining= (int) (deadline - now)) > 0;
       now= time((time_t*) 0))
  {
    ALARM alarm_buff;
    thr_alarm(&alarmed, 2 * remaining, &alarm_buff);
    sleep(remaining);
    thr_end_alarm(&alarmed);

    if ((*thread_killed)(thd, thread_killed_arg))
      return 1;
  }
  return 0;
}

1869

unknown's avatar
unknown committed
1870 1871
/*
  Ask the master to start streaming its binary log (COM_BINLOG_DUMP) from
  mi->master_log_name / mi->master_log_pos.

  Packet layout: 4-byte position, 2-byte flags, 4-byte server_id of this
  slave, then the log file name (not NUL-terminated).

  SYNOPSIS
    request_dump()
    mysql              connection to the master
    mi                 master info holding the requested log name/position
    suppress_warnings  set to 1 when the failure is ER_NET_READ_INTERRUPTED,
                       so the caller reconnects without logging a warning

  NOTES
    The error message reports mi->connect_retry, which is the interval the
    I/O thread passes to safe_sleep() between retries. Previously it
    reported the global master_connect_retry default.

  RETURN VALUES
    0   ok
    1   sending the command failed
*/
static int request_dump(MYSQL* mysql, MASTER_INFO* mi,
			bool *suppress_warnings)
{
  char buf[FN_REFLEN + 10];
  int len;
  int binlog_flags = 0; // for now
  char* logname = mi->master_log_name;
  DBUG_ENTER("request_dump");

  // TODO if big log files: Change next to int8store()
  int4store(buf, (longlong) mi->master_log_pos);
  int2store(buf + 4, binlog_flags);
  int4store(buf + 6, server_id);
  len = (uint) strlen(logname);
  memcpy(buf + 10, logname,len);
  if (mc_simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
  {
    /*
      Something went wrong, so we will just reconnect and retry later
      in the future, we should do a better error analysis, but for
      now we just fill up the error log :-)
    */
    if (mc_mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
      *suppress_warnings= 1;			// Suppress reconnect warning
    else
      sql_print_error("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs",
		      mc_mysql_errno(mysql), mc_mysql_error(mysql),
		      (int) mi->connect_retry);
    DBUG_RETURN(1);
  }

  DBUG_RETURN(0);
}

1904

1905
/*
  Ask the master for a dump of db.table (COM_TABLE_DUMP).

  Packet layout: 1-byte db name length, db name, 1-byte table name length,
  table name (neither name NUL-terminated).

  RETURN VALUES
    0   ok
    1   a name does not fit the packet format, or sending failed
*/
static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
{
  char buf[1024];
  char * p = buf;
  uint table_len = (uint) strlen(table);
  uint db_len = (uint) strlen(db);

  /*
    Each length is stored in one byte. A name longer than 255 bytes would
    pass the buffer check, but its length prefix would be silently
    truncated and the packet would be malformed.
  */
  if (table_len + db_len > sizeof(buf) - 2 || db_len > 255 || table_len > 255)
  {
    sql_print_error("request_table_dump: Buffer overrun");
    return 1;
  } 

  *p++ = (char) db_len;
  memcpy(p, db, db_len);
  p += db_len;
  *p++ = (char) table_len;
  memcpy(p, table, table_len);

  // p still points at the start of the table name, hence "+ table_len".
  if (mc_simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1))
  {
    sql_print_error("request_table_dump: Error sending the table dump command");
    return 1;
  }

  return 0;
}

1933

unknown's avatar
unknown committed
1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944
/*
  read one event from the master
  
  SYNOPSIS
    read_event()
    mysql		MySQL connection
    mi			Master connection information
    suppress_warnings	Set to TRUE when a normal net read timeout has caused
			us to try a reconnect. We do not want to print
			anything to the error log in this case because this is
			a normal event in an idle server.

  RETURN VALUES
    'packet_error'	Error (read failure, 0-length packet, or, in debug
			builds, a simulated disconnect)
    number		Length of the event, i.e. the packet length minus its
			leading byte; the event data starts at
			mysql->net.read_pos + 1
*/

static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
{
  ulong len;

  *suppress_warnings= 0;
  /*
    my_real_read() will time us out
    We check if we were told to die, and if not, try reading again

    TODO:  Move 'events_till_disconnect' to the MASTER_INFO structure
  */
#ifndef DBUG_OFF
  if (disconnect_slave_event_count && !(events_till_disconnect--))
    return packet_error;      
#endif
  
  len = mc_net_safe_read(mysql);
  if (len == packet_error || (long) len < 1)
  {
    if (mc_mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
    {
      /*
	We are trying a normal reconnect after a read timeout;
	we suppress prints to .err file as long as the reconnect
	happens without problems
      */
      *suppress_warnings= TRUE;
    }
    else
      sql_print_error("Error reading packet from server: %s (server_errno=%d)",
		      mc_mysql_error(mysql), mc_mysql_errno(mysql));
    return packet_error;
  }

  if (len == 1)
  {
     sql_print_error("Slave: received 0 length packet from server, apparent"
		     " master shutdown: %s",
		     mc_mysql_error(mysql));
     return packet_error;
  }
  
  /* len is ulong: %lu, not %u, to match the varargs type */
  DBUG_PRINT("info",( "len=%lu, net->read_pos[4] = %d\n",
		      len, mysql->net.read_pos[4]));
  return len - 1;   
}

unknown's avatar
unknown committed
2000

2001
/*
  Check whether an error recorded on the master for the current query means
  the query was only partially executed there.

  For ER_NET_READ_ERROR, ER_NET_ERROR_ON_WRITE, ER_SERVER_SHUTDOWN and
  ER_NEW_ABORTING_CONNECTION, an explanation is stored in
  rli->last_slave_error, rli->last_slave_errno is set, the message is
  written to the error log, and 1 is returned. Any other code leaves rli
  untouched and returns 0.
*/
int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error)
{
  const bool aborted_on_master= (expected_error == ER_NET_READ_ERROR ||
                                 expected_error == ER_NET_ERROR_ON_WRITE ||
                                 expected_error == ER_SERVER_SHUTDOWN ||
                                 expected_error == ER_NEW_ABORTING_CONNECTION);
  if (!aborted_on_master)
    return 0;

  my_snprintf(rli->last_slave_error, sizeof(rli->last_slave_error),
              "Slave: query '%s' partially completed on the master "
              "and was aborted. There is a chance that your master is "
              "inconsistent at this point. If you are sure that your master "
              "is ok, run this query manually on the slave and then restart "
              "the slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1; SLAVE START;",
              thd->query);
  rli->last_slave_errno= expected_error;
  sql_print_error("%s", rli->last_slave_error);
  return 1;
}
2021

2022

2023
/*
  Fetch the next event from the relay log and execute it in the SQL thread.

  Events that originated on this server (ev->server_id == ::server_id), and
  events the user asked to skip through rli->slave_skip_counter (except
  ROTATE_EVENT), are not executed. For those, the relay log position is
  advanced and flushed instead.

  RETURN VALUES
    0      event skipped
    1      SQL thread killed, or the event could not be read/parsed
    other  value returned by ev->exec_event(rli)
*/
static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
{
  DBUG_ASSERT(rli->sql_thd==thd);
  Log_event * ev = next_event(rli);
  DBUG_ASSERT(rli->sql_thd==thd);
  if (sql_slave_killed(thd,rli))
  {
    /* next_event() may have returned an event; don't leak it (NULL is ok) */
    delete ev;
    return 1;
  }
  if (ev)
  {
    int type_code = ev->get_type_code();
    int exec_res;
    pthread_mutex_lock(&rli->data_lock);

    /*
      Skip queries originating from this server or number of
      queries specified by the user in slave_skip_counter
      We can't however skip events that have something to do with the
      log files themselves.
    */

    if (ev->server_id == (uint32) ::server_id ||
	(rli->slave_skip_counter && type_code != ROTATE_EVENT))
    {
      /* TODO: I/O thread should not even log events with the same server id */
      rli->inc_pos(ev->get_event_len(),
		   type_code != STOP_EVENT ? ev->log_pos : LL(0),
		   1/* skip lock*/);
      flush_relay_log_info(rli);

      /*
	Protect against common user error of setting the counter to 1
	instead of 2 while recovering from a failed auto-increment insert
      */
      if (rli->slave_skip_counter && 
	  !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) &&
	    rli->slave_skip_counter == 1))
        --rli->slave_skip_counter;
      pthread_mutex_unlock(&rli->data_lock);
      delete ev;     
      return 0;					// avoid infinite update loops
    }
    pthread_mutex_unlock(&rli->data_lock);
  
    thd->server_id = ev->server_id; // use the original server id for logging
    thd->set_time();				// time the query
    if (!ev->when)
      ev->when = time(NULL);
    ev->thd = thd;
    thd->log_pos = ev->log_pos;
    exec_res = ev->exec_event(rli);
    DBUG_ASSERT(rli->sql_thd==thd);
    delete ev;
    return exec_res;
  }
  else
  {
    sql_print_error("\
Could not parse log event entry, check the master for binlog corruption\n\
This may also be a network problem, or just a bug in the master or slave code.\
");
    return 1;
  }
}

2087

2088
/*
  slave I/O thread

  Connects to the master described by the MASTER_INFO passed as 'arg'. It
  checks the master version and (unless mi->old_format) registers with the
  master. It then requests a binlog dump from mi->master_log_name /
  mi->master_log_pos and appends every received event to the relay log via
  queue_event().

  On a failed dump request or a failed read, it reconnects. The first retry
  is immediate; later retries sleep mi->connect_retry seconds each, and the
  thread gives up after master_retry_count consecutive failures.
  retry_count is reset after each successfully read event.

  All exits go through 'err:', which closes the connection, clears
  mi->slave_running / mi->io_thd, frees the THD and broadcasts
  mi->stop_cond. In debug builds an abort_slave_event_count abort restarts
  the thread from 'slave_begin'.
*/
extern "C" pthread_handler_decl(handle_slave_io,arg)
{
  THD *thd; // needs to be first for thread_stack
  MYSQL *mysql;
  MASTER_INFO *mi = (MASTER_INFO*)arg; 
  char llbuff[22];
  uint retry_count;
  
  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  my_thread_init();

#ifndef DBUG_OFF
slave_begin:  
#endif  
  DBUG_ASSERT(mi->inited);
  mysql= NULL ;
  retry_count= 0;

  pthread_mutex_lock(&mi->run_lock);
  /* Inform waiting threads that slave has started */
  mi->slave_run_id++;

#ifndef DBUG_OFF  
  mi->events_till_abort = abort_slave_event_count;
#endif  
  
  thd= new THD; // note that contructor of THD uses DBUG_ !
  DBUG_ENTER("handle_slave_io");
  THD_CHECK_SENTRY(thd);

  pthread_detach_this_thread();
  if (init_slave_thread(thd, SLAVE_THD_IO))
  {
    /* Wake the starter even on failure so it does not wait forever. */
    pthread_cond_broadcast(&mi->start_cond);
    pthread_mutex_unlock(&mi->run_lock);
    sql_print_error("Failed during slave I/O thread initialization");
    goto err;
  }
  mi->io_thd = thd;
  thd->thread_stack = (char*)&thd; // remember where our stack is
  pthread_mutex_lock(&LOCK_thread_count);
  threads.append(thd);
  pthread_mutex_unlock(&LOCK_thread_count);
  mi->slave_running = 1;
  mi->abort_slave = 0;
  pthread_mutex_unlock(&mi->run_lock);
  pthread_cond_broadcast(&mi->start_cond);
  
  DBUG_PRINT("master_info",("log_file_name: '%s'  position: %s",
			    mi->master_log_name,
			    llstr(mi->master_log_pos,llbuff)));
  
  if (!(mi->mysql = mysql = mc_mysql_init(NULL)))
  {
    sql_print_error("Slave I/O thread: error in mc_mysql_init()");
    goto err;
  }
  

  thd->proc_info = "connecting to master";
  // we can get killed during safe_connect
  // (safe_connect() returns 0 on success, non-zero if we were killed)
  if (!safe_connect(thd, mysql, mi))
    sql_print_error("Slave I/O thread: connected to master '%s@%s:%d',\
  replication started in log '%s' at position %s", mi->user,
		    mi->host, mi->port,
		    IO_RPL_LOG_NAME,
		    llstr(mi->master_log_pos,llbuff));
  else
  {
    sql_print_error("Slave I/O thread killed while connecting to master");
    goto err;
  }

  /* Re-entered via 'goto connected' after every successful reconnect. */
connected:

  thd->slave_net = &mysql->net;
  thd->proc_info = "Checking master version";
  if (check_master_version(mysql, mi))
    goto err;
  if (!mi->old_format)
  {
    /*
      Register ourselves with the master.
      NOTE: an older comment here said a failure is not fatal and we just
      go on with life. The code below does 'goto err', so a failure of
      register_slave_on_master() or update_slave_list() stops the I/O
      thread.
    */
    thd->proc_info = "Registering slave on master";
    if (register_slave_on_master(mysql) ||  update_slave_list(mysql, mi))
      goto err;
  }
  
  DBUG_PRINT("info",("Starting reading binary log from master"));
  while (!io_slave_killed(thd,mi))
  {
    bool suppress_warnings= 0;    
    thd->proc_info = "Requesting binlog dump";
    if (request_dump(mysql, mi, &suppress_warnings))
    {
      sql_print_error("Failed on request_dump()");
      if (io_slave_killed(thd,mi))
      {
	sql_print_error("Slave I/O thread killed while requesting master \
dump");
	goto err;
      }
	  
      thd->proc_info = "Waiiting to reconnect after a failed dump request";
      mc_end_server(mysql);
      /*
	First time retry immediately, assuming that we can recover
	right away - if first time fails, sleep between re-tries
	hopefuly the admin can fix the problem sometime
      */
      if (retry_count++)
      {
	if (retry_count > master_retry_count)
	  goto err;				// Don't retry forever
	safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
		   (void*)mi);
      }
      if (io_slave_killed(thd,mi))
      {
	sql_print_error("Slave I/O thread killed while retrying master \
dump");
	goto err;
      }

      thd->proc_info = "Reconnecting after a failed dump request";
      if (!suppress_warnings)
	sql_print_error("Slave I/O thread: failed dump request, \
reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME,
			llstr(mi->master_log_pos,llbuff));
      if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
	  io_slave_killed(thd,mi))
      {
	sql_print_error("Slave I/O thread killed during or \
after reconnect");
	goto err;
      }

      goto connected;
    }

    /* Event loop: read one event from the master, queue it, repeat. */
    while (!io_slave_killed(thd,mi))
    {
      bool suppress_warnings= 0;    
      thd->proc_info = "Reading master update";
      ulong event_len = read_event(mysql, mi, &suppress_warnings);
      if (io_slave_killed(thd,mi))
      {
	if (global_system_variables.log_warnings)
	  sql_print_error("Slave I/O thread killed while reading event");
	goto err;
      }
	  	  
      if (event_len == packet_error)
      {
	/* Two errors are treated as fatal (no reconnect): */
	uint mysql_error_number= mc_mysql_errno(mysql);
	if (mysql_error_number == ER_NET_PACKET_TOO_LARGE)
	{
	  sql_print_error("\
Log entry on master is longer than max_allowed_packet (%ld) on \
slave. If the entry is correct, restart the server with a higher value of \
max_allowed_packet",
			  thd->variables.max_allowed_packet);
	  goto err;
	}
	if (mysql_error_number == ER_MASTER_FATAL_ERROR_READING_BINLOG)
	{
	  sql_print_error(ER(mysql_error_number), mysql_error_number,
			  mc_mysql_error(mysql));
	  goto err;
	}
	/* Any other read error: same reconnect/retry policy as a failed dump */
	thd->proc_info = "Waiting to reconnect after a failed read";
	mc_end_server(mysql);
	if (retry_count++)
	{
	  if (retry_count > master_retry_count)
	    goto err;				// Don't retry forever
	  safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
		     (void*) mi);
	}	    
	if (io_slave_killed(thd,mi))
	{
	  if (global_system_variables.log_warnings)
	    sql_print_error("Slave I/O thread killed while waiting to \
reconnect after a failed read");
	  goto err;
	}
	thd->proc_info = "Reconnecting after a failed read";
	if (!suppress_warnings)
	  sql_print_error("Slave I/O thread: Failed reading log event, \
reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME,
			  llstr(mi->master_log_pos, llbuff));
	if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
	    io_slave_killed(thd,mi))
	{
	  if (global_system_variables.log_warnings)
	    sql_print_error("Slave I/O thread killed during or after a \
reconnect done to recover from failed read");
	  goto err;
	}
	goto connected;
      } // if (event_len == packet_error)
	  
      retry_count=0;			// ok event, reset retry counter
      thd->proc_info = "Queueing event from master";
      // read_pos[0] is the packet's leading byte; event data follows it
      if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
		      event_len))
      {
	sql_print_error("Slave I/O thread could not queue event from master");
	goto err;
      }
      flush_master_info(mi);
      /* Throttle: block while the relay logs exceed the configured limit */
      if (mi->rli.log_space_limit && mi->rli.log_space_limit <
	  mi->rli.log_space_total)
	if (wait_for_relay_log_space(&mi->rli))
	{
	  sql_print_error("Slave I/O thread aborted while waiting for relay \
log space");
	  goto err;
	}
      // TODO: check debugging abort code
      // NOTE(review): this tests the global events_till_abort, while the
      // thread start sets mi->events_till_abort -- confirm which is intended.
#ifndef DBUG_OFF
      if (abort_slave_event_count && !--events_till_abort)
      {
	sql_print_error("Slave I/O thread: debugging abort");
	goto err;
      }
#endif
    } 
  }

  // error = 0;
  /* Common exit path: release everything and tell waiters we stopped. */
err:
  // print the current replication position
  sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s",
		  IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
  VOID(pthread_mutex_lock(&LOCK_thread_count));
  thd->query = thd->db = 0; // extra safety
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
  if (mysql)
  {
    mc_mysql_close(mysql);
    mi->mysql=0;
  }
  thd->proc_info = "Waiting for slave mutex on exit";
  pthread_mutex_lock(&mi->run_lock);
  mi->slave_running = 0;
  mi->io_thd = 0;
  // TODO: make rpl_status part of MASTER_INFO
  change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
  mi->abort_slave = 0; // TODO: check if this is needed
  DBUG_ASSERT(thd->net.buff != 0);
  net_end(&thd->net); // destructor will not free it, because net.vio is 0
  pthread_mutex_lock(&LOCK_thread_count);
  THD_CHECK_SENTRY(thd);
  delete thd;
  pthread_mutex_unlock(&LOCK_thread_count);
  pthread_cond_broadcast(&mi->stop_cond);	// tell the world we are done
  pthread_mutex_unlock(&mi->run_lock);
#ifndef DBUG_OFF
  /* Debug-only: restart the thread after a simulated abort. */
  if (abort_slave_event_count && !events_till_abort)
    goto slave_begin;
#endif  
  my_thread_end();
#ifndef __NETWARE__
  pthread_exit(0);
#endif /* __NETWARE__ */
  DBUG_RETURN(0);				// Can't return anything here
}

unknown's avatar
unknown committed
2361

2362
/* slave SQL logic thread */
unknown's avatar
unknown committed
2363

2364
extern "C" pthread_handler_decl(handle_slave_sql,arg)
2365
{
2366
  THD *thd;			/* needs to be first for thread_stack */
2367 2368
  char llbuff[22],llbuff1[22];
  RELAY_LOG_INFO* rli = &((MASTER_INFO*)arg)->rli; 
unknown's avatar
unknown committed
2369 2370 2371 2372 2373 2374 2375 2376 2377
  const char *errmsg;

  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  my_thread_init();

#ifndef DBUG_OFF
slave_begin:  
#endif  

2378 2379 2380
  DBUG_ASSERT(rli->inited);
  pthread_mutex_lock(&rli->run_lock);
  DBUG_ASSERT(!rli->slave_running);
unknown's avatar
unknown committed
2381
  errmsg= 0;
2382 2383 2384 2385
#ifndef DBUG_OFF  
  rli->events_till_abort = abort_slave_event_count;
#endif  
  DBUG_ENTER("handle_slave_sql");
2386

unknown's avatar
unknown committed
2387
  thd = new THD; // note that contructor of THD uses DBUG_ !
2388
  THD_CHECK_SENTRY(thd);
unknown's avatar
unknown committed
2389 2390 2391
  /* Inform waiting threads that slave has started */
  rli->slave_run_id++;

2392 2393
  pthread_detach_this_thread();
  if (init_slave_thread(thd, SLAVE_THD_SQL))
unknown's avatar
unknown committed
2394 2395 2396 2397 2398 2399 2400 2401 2402 2403
  {
    /*
      TODO: this is currently broken - slave start and change master
      will be stuck if we fail here
    */
    pthread_cond_broadcast(&rli->start_cond);
    pthread_mutex_unlock(&rli->run_lock);
    sql_print_error("Failed during slave thread initialization");
    goto err;
  }
2404
  rli->sql_thd= thd;
2405
  thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
2406 2407
  thd->thread_stack = (char*)&thd; // remember where our stack is
  pthread_mutex_lock(&LOCK_thread_count);
2408
  threads.append(thd);
2409
  pthread_mutex_unlock(&LOCK_thread_count);
2410 2411 2412
  rli->slave_running = 1;
  rli->abort_slave = 0;
  pthread_mutex_unlock(&rli->run_lock);
2413
  pthread_cond_broadcast(&rli->start_cond);
unknown's avatar
unknown committed
2414 2415
  // This should always be set to 0 when the slave thread is started
  rli->pending = 0;
2416 2417 2418 2419
  if (init_relay_log_pos(rli,
			 rli->relay_log_name,
			 rli->relay_log_pos,
			 1 /*need data lock*/, &errmsg))
2420 2421 2422 2423 2424
  {
    sql_print_error("Error initializing relay log position: %s",
		    errmsg);
    goto err;
  }
2425
  THD_CHECK_SENTRY(thd);
unknown's avatar
unknown committed
2426
  DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE);
2427 2428
  DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
  DBUG_ASSERT(rli->sql_thd == thd);
2429 2430 2431 2432

  DBUG_PRINT("master_info",("log_file_name: %s  position: %s",
			    rli->master_log_name,
			    llstr(rli->master_log_pos,llbuff)));
2433 2434
  if (global_system_variables.log_warnings)
    sql_print_error("Slave SQL thread initialized, starting replication in \
unknown's avatar
unknown committed
2435
log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
2436 2437 2438 2439 2440
		    llstr(rli->master_log_pos,llbuff),rli->relay_log_name,
		    llstr(rli->relay_log_pos,llbuff1));

  /* Read queries from the IO/THREAD until this thread is killed */

2441
  while (!sql_slave_killed(thd,rli))
2442 2443 2444
  {
    thd->proc_info = "Processing master log event"; 
    DBUG_ASSERT(rli->sql_thd == thd);
2445
    THD_CHECK_SENTRY(thd);
2446 2447 2448
    if (exec_relay_log_event(thd,rli))
    {
      // do not scare the user if SQL thread was simply killed or stopped
2449
      if (!sql_slave_killed(thd,rli))
2450 2451
        sql_print_error("\
Error running query, slave SQL thread aborted. Fix the problem, and restart \
unknown's avatar
unknown committed
2452
the slave SQL thread with \"SLAVE START\". We stopped at log \
2453 2454 2455 2456
'%s' position %s",
		      RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff));
      goto err;
    }
2457
  }
2458

2459
  /* Thread stopped. Print the current replication position to the log */
2460 2461 2462
  sql_print_error("Slave SQL thread exiting, replication stopped in log \
 '%s' at position %s",
		  RPL_LOG_NAME, llstr(rli->master_log_pos,llbuff));
2463 2464

 err:
2465
  VOID(pthread_mutex_lock(&LOCK_thread_count));
2466
  thd->query = thd->db = 0; // extra safety
2467
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
2468 2469 2470 2471 2472
  thd->proc_info = "Waiting for slave mutex on exit";
  pthread_mutex_lock(&rli->run_lock);
  DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
  rli->slave_running = 0;
  rli->save_temporary_tables = thd->temporary_tables;
unknown's avatar
unknown committed
2473 2474 2475 2476 2477

  /*
    TODO: see if we can do this conditionally in next_event() instead
    to avoid unneeded position re-init
  */
2478 2479 2480 2481
  thd->temporary_tables = 0; // remove tempation from destructor to close them
  DBUG_ASSERT(thd->net.buff != 0);
  net_end(&thd->net); // destructor will not free it, because we are weird
  DBUG_ASSERT(rli->sql_thd == thd);
2482
  THD_CHECK_SENTRY(thd);
2483
  rli->sql_thd= 0;
2484
  pthread_mutex_lock(&LOCK_thread_count);
2485
  THD_CHECK_SENTRY(thd);
2486 2487 2488 2489 2490 2491 2492 2493 2494
  delete thd;
  pthread_mutex_unlock(&LOCK_thread_count);
  pthread_cond_broadcast(&rli->stop_cond);
  // tell the world we are done
  pthread_mutex_unlock(&rli->run_lock);
#ifndef DBUG_OFF // TODO: reconsider the code below
  if (abort_slave_event_count && !rli->events_till_abort)
    goto slave_begin;
#endif  
unknown's avatar
unknown committed
2495 2496
  my_thread_end(); // clean-up before broadcasting termination
#ifndef __NETWARE__
2497
  pthread_exit(0);
unknown's avatar
unknown committed
2498
#endif /* __NETWARE__ */
2499 2500
  DBUG_RETURN(0);				// Can't return anything here
}
unknown's avatar
unknown committed
2501

unknown's avatar
unknown committed
2502

unknown's avatar
unknown committed
2503 2504 2505 2506 2507 2508 2509
/*
  Download the data file of a 3.23 LOAD DATA INFILE into the relay log.

  SYNOPSIS
    process_io_create_file()
    mi		Master info; mi->mysql is the connection to the master
    cev		Create_file event built from the master's old-format Load event

  DESCRIPTION
    Requests the file from the master (net_request_file()) and reads it
    packet by packet.  The first packet becomes the block of the
    Create_file event, later packets are written as Append_block events,
    and at end of file an Execute_load event is written.  All events go to
    mi->rli.relay_log.  If the database is filtered out by the
    replicate-do-db / replicate-ignore-db rules, the file is skipped and
    nothing is written.

    An empty file (EOF before any data packet) writes nothing at all:
    no Create_file event exists for an Execute_load event to refer to, so
    writing one would make the SQL thread try to load a file that was
    never created.

  RETURN
    0	ok (or event skipped)
    1	error
*/

static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
{
  int error = 1;
  ulong num_bytes;
  bool cev_not_written;
  THD* thd;
  NET* net = &mi->mysql->net;
  DBUG_ENTER("process_io_create_file");

  if (unlikely(!cev->is_valid()))
    DBUG_RETURN(1);
  /*
    TODO: fix to honor table rules, not only db rules
  */
  if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db))
  {
    skip_load_data_infile(net);
    DBUG_RETURN(0);
  }
  DBUG_ASSERT(cev->inited_from_old);
  thd = mi->io_thd;
  thd->file_id = cev->file_id = mi->file_id++;
  thd->server_id = cev->server_id;
  cev_not_written = 1;

  if (unlikely(net_request_file(net,cev->fname)))
  {
    sql_print_error("Slave I/O: failed requesting download of '%s'",
		    cev->fname);
    goto err;
  }

  /*
    This dummy block is so we could instantiate Append_block_log_event
    once and then modify it slightly instead of doing it multiple times
    in the loop
  */
  {
    Append_block_log_event aev(thd,0,0,0);

    for (;;)
    {
      if (unlikely((num_bytes=my_net_read(net)) == packet_error))
      {
	sql_print_error("Network read error downloading '%s' from master",
			cev->fname);
	goto err;
      }
      if (unlikely(!num_bytes)) /* eof */
      {
	send_ok(net); /* 3.23 master wants it */
	/*
	  Empty file: no Create_file event was written, so there is nothing
	  to execute. Behave as if the LOAD DATA INFILE never happened.
	*/
	if (unlikely(cev_not_written))
	  break;
	Execute_load_log_event xev(thd,0);
	xev.log_pos = mi->master_log_pos;
	if (unlikely(mi->rli.relay_log.append(&xev)))
	{
	  sql_print_error("Slave I/O: error writing Exec_load event to \
relay log");
	  goto err;
	}
	mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
	break;
      }
      if (unlikely(cev_not_written))
      {
	/* First data packet travels inside the Create_file event itself */
	cev->block = (char*)net->read_pos;
	cev->block_len = num_bytes;
	cev->log_pos = mi->master_log_pos;
	if (unlikely(mi->rli.relay_log.append(cev)))
	{
	  sql_print_error("Slave I/O: error writing Create_file event to \
relay log");
	  goto err;
	}
	cev_not_written=0;
	mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
      }
      else
      {
	/* Every later packet becomes an Append_block event */
	aev.block = (char*)net->read_pos;
	aev.block_len = num_bytes;
	aev.log_pos = mi->master_log_pos;
	if (unlikely(mi->rli.relay_log.append(&aev)))
	{
	  sql_print_error("Slave I/O: error writing Append_block event to \
relay log");
	  goto err;
	}
	mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
      }
    }
  }
  error=0;
err:
  DBUG_RETURN(error);
}
unknown's avatar
unknown committed
2598

unknown's avatar
unknown committed
2599
/*
unknown's avatar
unknown committed
2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616
  Start using a new binary log on the master

  SYNOPSIS
    process_io_rotate()
    mi			master_info for the slave
    rev			The rotate log event read from the binary log

  DESCRIPTION
    Updates the master info and relay data with the place in the next binary
    log where we should start reading.

  NOTES
    We assume we already locked mi->data_lock

  RETURN VALUES
    0		ok
    1	        Log event is illegal
unknown's avatar
unknown committed
2617 2618
*/

unknown's avatar
unknown committed
2619
static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev)
2620
{
unknown's avatar
unknown committed
2621
  int return_val= 1;
2622
  DBUG_ENTER("process_io_rotate");
unknown's avatar
unknown committed
2623
  safe_mutex_assert_owner(&mi->data_lock);
2624

unknown's avatar
unknown committed
2625
  if (unlikely(!rev->is_valid()))
2626
    DBUG_RETURN(1);
unknown's avatar
unknown committed
2627 2628 2629 2630 2631

  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
  mi->master_log_pos= rev->pos;
  DBUG_PRINT("info", ("master_log_pos: '%s' %d",
		      mi->master_log_name, (ulong) mi->master_log_pos));
2632
#ifndef DBUG_OFF
unknown's avatar
unknown committed
2633 2634 2635 2636 2637 2638
  /*
    If we do not do this, we will be getting the first
    rotate event forever, so we need to not disconnect after one.
  */
  if (disconnect_slave_event_count)
    events_till_disconnect++;
2639
#endif
2640
  DBUG_RETURN(0);
2641 2642
}

unknown's avatar
unknown committed
2643
/*
  Queue one event received from a 3.23 (old binlog format) master.

  SYNOPSIS
    queue_old_event()
    mi		Master info
    buf		Raw event in the old format
    event_len	Length of buf

  DESCRIPTION
    The event is decoded with Log_event::read_log_event() in old-format
    mode and the resulting event object is written to the relay log, so
    the relay log always ends up in the new format.  A Load event is first
    copied into a private NUL-terminated buffer (tmp_buf); it decodes to a
    Create_file event whose data file is then downloaded by
    process_io_create_file().  A Rotate event updates the master position
    via process_io_rotate(); a Stop event directly following it is not
    written, although the position still advances past it.
    mi->data_lock is held while the relay log and master position change.

  NOTES
    tmp_buf is released on every exit path.

  RETURN
    0	ok
    1	error

  TODO: 
    Test this code before release - it has to be tested on a separate
    setup with 3.23 master 
*/

static int queue_old_event(MASTER_INFO *mi, const char *buf,
			   ulong event_len)
{
  const char *errmsg = 0;
  ulong inc_pos= 0;
  bool ignore_event= 0;
  char *tmp_buf = 0;
  int error= 0;
  RELAY_LOG_INFO *rli= &mi->rli;
  DBUG_ENTER("queue_old_event");

  /*
    If we get Load event, we need to pass a non-reusable buffer
    to read_log_event, so we do a trick
  */
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
  {
    if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
    {
      sql_print_error("Slave I/O: out of memory for Load event");
      DBUG_RETURN(1);
    }
    memcpy(tmp_buf,buf,event_len);
    tmp_buf[event_len]=0; // Create_file constructor wants null-term buffer
    buf = (const char*)tmp_buf;
  }
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
					    1 /*old format*/ );
  if (unlikely(!ev))
  {
    sql_print_error("Read invalid event from master: '%s',\
 master could be corrupt but a more likely cause of this is a bug",
		    errmsg);
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
    DBUG_RETURN(1);
  }
  pthread_mutex_lock(&mi->data_lock);
  ev->log_pos = mi->master_log_pos;
  switch (ev->get_type_code()) {
  case STOP_EVENT:
    ignore_event= mi->ignore_stop_event;
    mi->ignore_stop_event=0;
    inc_pos= event_len;
    break;
  case ROTATE_EVENT:
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
    {
      error= 1;
      goto end;
    }
    mi->ignore_stop_event=1;
    inc_pos= 0;
    break;
  case CREATE_FILE_EVENT:
    /* We come here when and only when tmp_buf != 0 */
    DBUG_ASSERT(tmp_buf);
    error= process_io_create_file(mi,(Create_file_log_event*)ev);
    mi->master_log_pos += event_len;
    DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
    goto end;
  default:
    mi->ignore_stop_event=0;
    inc_pos= event_len;
    break;
  }
  if (likely(!ignore_event))
  {
    if (unlikely(rli->relay_log.append(ev)))
    {
      error= 1;
      goto end;
    }
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  }
  mi->master_log_pos+= inc_pos;
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));

end:
  delete ev;
  pthread_mutex_unlock(&mi->data_lock);
  /* Frees the Load-event copy whatever type it decoded to (no-op if 0) */
  my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
  DBUG_RETURN(error);
}

unknown's avatar
unknown committed
2736 2737 2738 2739 2740 2741
/*
  Queue one event received from the master into the relay log.

  SYNOPSIS
    queue_event()
    mi		Master info
    buf		Raw event as received from the master
    event_len	Length of buf

  DESCRIPTION
    Events from an old-format (3.23) master are handed to
    queue_old_event().  Otherwise the raw bytes are appended unchanged to
    the relay log while holding mi->data_lock.  A Rotate event updates the
    master position through process_io_rotate() and causes a Stop event
    directly following it to be skipped.  mi->master_log_pos is advanced
    only when the event was actually written.

  RETURN
    0	ok
    !=0	error (invalid Rotate event or relay log write failure)

  TODO: verify the issue with stop events, see if we need them at all
  in the relay log
*/

int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
{
  int error= 0;
  ulong inc_pos;
  bool ignore_event= 0;
  RELAY_LOG_INFO *rli= &mi->rli;
  DBUG_ENTER("queue_event");

  if (mi->old_format)
    DBUG_RETURN(queue_old_event(mi,buf,event_len));

  pthread_mutex_lock(&mi->data_lock);

  /*
    TODO: figure out if other events in addition to Rotate
    require special processing
  */
  switch (buf[EVENT_TYPE_OFFSET]) {
  case STOP_EVENT:
    ignore_event= mi->ignore_stop_event;
    mi->ignore_stop_event= 0;
    inc_pos= event_len;
    break;
  case ROTATE_EVENT:
  {
    Rotate_log_event rev(buf,event_len,0);
    if (unlikely(process_io_rotate(mi,&rev)))
    {
      pthread_mutex_unlock(&mi->data_lock);
      DBUG_RETURN(1);
    }
    mi->ignore_stop_event= 1;
    inc_pos= 0;	/* process_io_rotate() already set the new position */
    break;
  }
  default:
    mi->ignore_stop_event= 0;
    inc_pos= event_len;
    break;
  }

  if (likely(!ignore_event &&
	     !(error= rli->relay_log.appendv(buf,event_len,0))))
  {
    mi->master_log_pos+= inc_pos;
    DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  }
  pthread_mutex_unlock(&mi->data_lock);
  DBUG_RETURN(error);
}

unknown's avatar
unknown committed
2793

2794 2795
/*
  Release what an initialized RELAY_LOG_INFO holds.

  Closes the relay-log-info file and the relay log file opened through
  rli->cache_buf (each only if its descriptor is >= 0), then clears
  rli->inited and closes rli->relay_log.  Does nothing when rli->inited
  is not set.
*/

void end_relay_log_info(RELAY_LOG_INFO* rli)
{
  DBUG_ENTER("end_relay_log_info");

  if (rli->inited)
  {
    if (rli->info_fd >= 0)			/* relay-log-info file */
    {
      end_io_cache(&rli->info_file);
      (void) my_close(rli->info_fd, MYF(MY_WME));
      rli->info_fd= -1;
    }
    if (rli->cur_log_fd >= 0)			/* relay log read via cache_buf */
    {
      end_io_cache(&rli->cache_buf);
      (void) my_close(rli->cur_log_fd, MYF(MY_WME));
      rli->cur_log_fd= -1;
    }
    rli->inited= 0;
    rli->relay_log.close(1);
  }
  DBUG_VOID_RETURN;
}

/* try to connect until successful or slave killed */
2818
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
unknown's avatar
unknown committed
2819
{
unknown's avatar
unknown committed
2820
  return connect_to_master(thd, mysql, mi, 0, 0);
unknown's avatar
unknown committed
2821 2822
}

unknown's avatar
unknown committed
2823

2824 2825 2826 2827
/*
  Try to connect until successful or slave killed or we have retried
  master_retry_count times
*/
unknown's avatar
unknown committed
2828

unknown's avatar
unknown committed
2829
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
unknown's avatar
unknown committed
2830
			     bool reconnect, bool suppress_warnings)
unknown's avatar
unknown committed
2831
{
2832
  int slave_was_killed;
2833 2834
  int last_errno= -2;				// impossible error
  ulong err_count=0;
unknown's avatar
unknown committed
2835
  char llbuff[22];
2836
  DBUG_ENTER("connect_to_master");
unknown's avatar
unknown committed
2837

unknown's avatar
unknown committed
2838 2839 2840
#ifndef DBUG_OFF
  events_till_disconnect = disconnect_slave_event_count;
#endif
2841 2842 2843 2844
  uint client_flag=0;
  if (opt_slave_compressed_protocol)
    client_flag=CLIENT_COMPRESS;		/* We will use compression */

2845
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
2846
	 (reconnect ? mc_mysql_reconnect(mysql) != 0:
unknown's avatar
unknown committed
2847
	  !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
2848
			    mi->port, 0, client_flag,
unknown's avatar
unknown committed
2849
			    thd->variables.net_read_timeout)))
unknown's avatar
unknown committed
2850
  {
2851 2852 2853
    /* Don't repeat last error */
    if (mc_mysql_errno(mysql) != last_errno)
    {
2854
      last_errno=mc_mysql_errno(mysql);
unknown's avatar
unknown committed
2855
      suppress_warnings= 0;
2856
      sql_print_error("Slave I/O thread: error %s to master \
2857
'%s@%s:%d': \
2858
Error: '%s'  errno: %d  retry-time: %d  retries: %d",
2859
		      (reconnect ? "reconnecting" : "connecting"),
2860
		      mi->user,mi->host,mi->port,
2861
		      mc_mysql_error(mysql), last_errno,
2862 2863
		      mi->connect_retry,
		      master_retry_count);
2864
    }
unknown's avatar
unknown committed
2865 2866 2867
    /*
      By default we try forever. The reason is that failure will trigger
      master election, so if the user did not set master_retry_count we
2868
      do not want to have election triggered on the first failure to
unknown's avatar
unknown committed
2869
      connect
2870
    */
2871
    if (++err_count == master_retry_count)
2872 2873
    {
      slave_was_killed=1;
unknown's avatar
unknown committed
2874 2875
      if (reconnect)
        change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
2876 2877
      break;
    }
2878 2879
    safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
	       (void*)mi);
unknown's avatar
unknown committed
2880
  }
2881

2882 2883
  if (!slave_was_killed)
  {
unknown's avatar
unknown committed
2884
    if (reconnect)
unknown's avatar
unknown committed
2885
    { 
2886
      if (!suppress_warnings && global_system_variables.log_warnings)
unknown's avatar
unknown committed
2887
	sql_print_error("Slave: connected to master '%s@%s:%d',\
2888
replication resumed in log '%s' at position %s", mi->user,
unknown's avatar
unknown committed
2889 2890 2891 2892
			mi->host, mi->port,
			IO_RPL_LOG_NAME,
			llstr(mi->master_log_pos,llbuff));
    }
unknown's avatar
unknown committed
2893 2894 2895 2896
    else
    {
      change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
      mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
unknown's avatar
unknown committed
2897
		      mi->user, mi->host, mi->port);
unknown's avatar
unknown committed
2898
    }
2899
#ifdef SIGNAL_WITH_VIO_CLOSE
2900
    thd->set_active_vio(mysql->net.vio);
2901
#endif      
2902
  }
2903 2904
  DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed));
  DBUG_RETURN(slave_was_killed);
unknown's avatar
unknown committed
2905 2906
}

unknown's avatar
unknown committed
2907

unknown's avatar
unknown committed
2908 2909 2910 2911 2912
/*
  Try to connect until successful or slave killed or we have retried
  master_retry_count times
*/

unknown's avatar
unknown committed
2913 2914
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
			  bool suppress_warnings)
unknown's avatar
unknown committed
2915
{
unknown's avatar
unknown committed
2916
  return connect_to_master(thd, mysql, mi, 1, suppress_warnings);
unknown's avatar
unknown committed
2917 2918
}

unknown's avatar
unknown committed
2919

2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938 2939 2940 2941 2942 2943 2944 2945 2946 2947 2948 2949
/*
  Persist the SQL thread's relay log and master log coordinates.

  SYNOPSIS
    flush_relay_log_info()
    rli			Relay log information

  DESCRIPTION
    Rewrites the start of rli->info_file with four newline-terminated
    lines: relay log name, relay log position, master log name, master
    log position.  Then flushes rli->info_file and rli->cur_log.

  NOTES
    - Only the slave thread calls this (or init_slave(), when sql_thd is
      not set yet), so no lock is taken here.
    - Nothing is written while a transaction is open (OPTION_BEGIN): if we
      die in the middle of a transaction that gets rolled back, the
      statements must be re-executed from its start.
    - A transaction never spans binary logs, so a relay-log rotation in
      the middle of one need not be handled; otherwise we would have to
      keep the relay log where the transaction began from being deleted
      when we switch to a new relay log file.

  TODO
    - Store the positions in binary form to avoid longlong2str().

  RETURN VALUES
    0	ok
    1	write error
*/

bool flush_relay_log_info(RELAY_LOG_INFO* rli)
{
  IO_CACHE *info_cache= &rli->info_file;
  char line_buf[FN_REFLEN*2+22*2+4];
  char *end;
  bool failed= 0;

  /* sql_thd is not set when calling from init_slave() */
  if (rli->sql_thd && (rli->sql_thd->options & OPTION_BEGIN))
    return 0;					// Wait for COMMIT

  my_b_seek(info_cache, 0L);

  end= strmov(line_buf, rli->relay_log_name);
  *end++= '\n';
  end= longlong2str(rli->relay_log_pos, end, 10);
  *end++= '\n';
  end= strmov(end, rli->master_log_name);
  *end++= '\n';
  end= longlong2str(rli->master_log_pos, end, 10);
  *end++= '\n';

  /* Every step runs even if an earlier one failed */
  if (my_b_write(info_cache, (byte*) line_buf, (ulong) (end - line_buf)))
    failed= 1;
  if (flush_io_cache(info_cache))
    failed= 1;
  if (flush_io_cache(rli->cur_log))		// QQ Why this call ?
    failed= 1;
  return failed;
}

unknown's avatar
unknown committed
2977 2978 2979 2980 2981 2982 2983

/*
  Reopen the current relay log through rli->cache_buf.

  Called when we notice that the current "hot" log got rotated under our
  feet.  Opens rli->relay_log_name with our own descriptor
  (rli->cur_log_fd) and seeks to where we were: relay_log_pos plus the
  bytes already processed from the current event (pending).

  RETURN
    The IO_CACHE now stored in rli->cur_log, or 0 if open_binlog() failed
    (errmsg is passed to open_binlog() for its error text).  rli->cur_log
    points at rli->cache_buf in both cases.
*/

static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
{
  DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
  DBUG_ASSERT(rli->cur_log_fd == -1);
  DBUG_ENTER("reopen_relay_log");

  IO_CACHE *cache= &rli->cache_buf;
  rli->cur_log= cache;
  rli->cur_log_fd= open_binlog(cache, rli->relay_log_name, errmsg);
  if (rli->cur_log_fd < 0)
    DBUG_RETURN(0);

  /* Resume exactly at the event (and the part of it) we were processing */
  my_b_seek(cache, rli->relay_log_pos + rli->pending);
  DBUG_RETURN(cache);
}

unknown's avatar
unknown committed
3002

3003 3004 3005 3006 3007 3008 3009
Log_event* next_event(RELAY_LOG_INFO* rli)
{
  Log_event* ev;
  IO_CACHE* cur_log = rli->cur_log;
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
  const char* errmsg=0;
  THD* thd = rli->sql_thd;
unknown's avatar
unknown committed
3010
  DBUG_ENTER("next_event");
3011 3012
  DBUG_ASSERT(thd != 0);

unknown's avatar
unknown committed
3013 3014 3015 3016 3017 3018 3019
  /*
    For most operations we need to protect rli members with data_lock,
    so we will hold it for the most of the loop below
    However, we will release it whenever it is worth the hassle, 
    and in the cases when we go into a pthread_cond_wait() with the
    non-data_lock mutex
  */
3020 3021
  pthread_mutex_lock(&rli->data_lock);
  
3022
  while (!sql_slave_killed(thd,rli))
unknown's avatar
unknown committed
3023 3024 3025
  {
    /*
      We can have two kinds of log reading:
unknown's avatar
unknown committed
3026 3027 3028 3029 3030 3031 3032 3033
      hot_log:
        rli->cur_log points at the IO_CACHE of relay_log, which
        is actively being updated by the I/O thread. We need to be careful
        in this case and make sure that we are not looking at a stale log that
        has already been rotated. If it has been, we reopen the log.

      The other case is much simpler:
        We just have a read only log that nobody else will be updating.
unknown's avatar
unknown committed
3034
    */
3035 3036 3037 3038 3039
    bool hot_log;
    if ((hot_log = (cur_log != &rli->cache_buf)))
    {
      DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor
      pthread_mutex_lock(log_lock);
unknown's avatar
unknown committed
3040 3041

      /*
unknown's avatar
unknown committed
3042
	Reading xxx_file_id is safe because the log will only
unknown's avatar
unknown committed
3043 3044
	be rotated when we hold relay_log.LOCK_log
      */
unknown's avatar
unknown committed
3045
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
3046
      {
unknown's avatar
unknown committed
3047 3048 3049 3050
	// The master has switched to a new log file; Reopen the old log file
	cur_log=reopen_relay_log(rli, &errmsg);
	pthread_mutex_unlock(log_lock);
	if (!cur_log)				// No more log files
3051
	  goto err;
unknown's avatar
unknown committed
3052
	hot_log=0;				// Using old binary log
3053 3054
      }
    }
unknown's avatar
unknown committed
3055
    DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
3056
    DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending);
unknown's avatar
unknown committed
3057 3058 3059
    /*
      Relay log is always in new format - if the master is 3.23, the
      I/O thread will convert the format for us
unknown's avatar
unknown committed
3060
    */
unknown's avatar
unknown committed
3061
    if ((ev=Log_event::read_log_event(cur_log,0,(bool)0 /* new format */)))
3062 3063 3064 3065 3066
    {
      DBUG_ASSERT(thd==rli->sql_thd);
      if (hot_log)
	pthread_mutex_unlock(log_lock);
      pthread_mutex_unlock(&rli->data_lock);
unknown's avatar
unknown committed
3067
      DBUG_RETURN(ev);
3068 3069
    }
    DBUG_ASSERT(thd==rli->sql_thd);
unknown's avatar
unknown committed
3070
    if (opt_reckless_slave)			// For mysql-test
unknown's avatar
unknown committed
3071
      cur_log->error = 0;
unknown's avatar
unknown committed
3072
    if (cur_log->error < 0)
unknown's avatar
unknown committed
3073 3074
    {
      errmsg = "slave SQL thread aborted because of I/O error";
unknown's avatar
unknown committed
3075 3076
      if (hot_log)
	pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
3077 3078
      goto err;
    }
3079 3080
    if (!cur_log->error) /* EOF */
    {
unknown's avatar
unknown committed
3081 3082 3083 3084 3085
      /*
	On a hot log, EOF means that there are no more updates to
	process and we must block until I/O thread adds some and
	signals us to continue
      */
3086 3087
      if (hot_log)
      {
unknown's avatar
unknown committed
3088
	DBUG_ASSERT(rli->relay_log.get_open_count() == rli->cur_log_old_open_count);
unknown's avatar
unknown committed
3089 3090 3091 3092
	/*
	  We can, and should release data_lock while we are waiting for
	  update. If we do not, show slave status will block
	*/
3093
	pthread_mutex_unlock(&rli->data_lock);
3094
 	/* Note that wait_for_update unlocks lock_log ! */
3095 3096 3097 3098 3099 3100
	rli->relay_log.wait_for_update(rli->sql_thd);
	
	// re-acquire data lock since we released it earlier
	pthread_mutex_lock(&rli->data_lock);
	continue;
      }
unknown's avatar
unknown committed
3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123
      /*
	If the log was not hot, we need to move to the next log in
	sequence. The next log could be hot or cold, we deal with both
	cases separately after doing some common initialization
      */
      end_io_cache(cur_log);
      DBUG_ASSERT(rli->cur_log_fd >= 0);
      my_close(rli->cur_log_fd, MYF(MY_WME));
      rli->cur_log_fd = -1;
	
      /*
	TODO: make skip_log_purge a start-up option. At this point this
	is not critical priority
      */
      if (!rli->skip_log_purge)
      {
	// purge_first_log will properly set up relay log coordinates in rli
	if (rli->relay_log.purge_first_log(rli))
	{
	  errmsg = "Error purging processed log";
	  goto err;
	}
      }
3124 3125
      else
      {
unknown's avatar
unknown committed
3126
	/*
3127 3128 3129 3130 3131
	  If hot_log is set, then we already have a lock on
	  LOCK_log.  If not, we have to get the lock.

	  According to Sasha, the only time this code will ever be executed
	  is if we are recovering from a bug.
unknown's avatar
unknown committed
3132
	*/
3133
	if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
3134
	{
unknown's avatar
unknown committed
3135 3136
	  errmsg = "error switching to the next log";
	  goto err;
3137
	}
unknown's avatar
unknown committed
3138 3139
	rli->relay_log_pos = BIN_LOG_HEADER_SIZE;
	rli->pending=0;
3140 3141
	strmake(rli->relay_log_name,rli->linfo.log_file_name,
		sizeof(rli->relay_log_name)-1);
unknown's avatar
unknown committed
3142 3143
	flush_relay_log_info(rli);
      }
3144
	
unknown's avatar
unknown committed
3145 3146 3147
      // next log is hot 
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
      {
3148
#ifdef EXTRA_DEBUG
unknown's avatar
unknown committed
3149 3150
	sql_print_error("next log '%s' is currently active",
			rli->linfo.log_file_name);
3151
#endif	  
unknown's avatar
unknown committed
3152 3153 3154
	rli->cur_log= cur_log= rli->relay_log.get_log_file();
	rli->cur_log_old_open_count= rli->relay_log.get_open_count();
	DBUG_ASSERT(rli->cur_log_fd == -1);
3155
	  
unknown's avatar
unknown committed
3156
	/*
unknown's avatar
unknown committed
3157 3158
	  Read pointer has to be at the start since we are the only
	  reader
unknown's avatar
unknown committed
3159
	*/
unknown's avatar
unknown committed
3160
	if (check_binlog_magic(cur_log,&errmsg))
3161
	  goto err;
unknown's avatar
unknown committed
3162
	continue;
3163
      }
unknown's avatar
unknown committed
3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175
      /*
	if we get here, the log was not hot, so we will have to
	open it ourselves
      */
#ifdef EXTRA_DEBUG
      sql_print_error("next log '%s' is not active",
		      rli->linfo.log_file_name);
#endif	  
      // open_binlog() will check the magic header
      if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
				       &errmsg)) <0)
	goto err;
3176
    }
unknown's avatar
unknown committed
3177
    else
3178
    {
unknown's avatar
unknown committed
3179 3180 3181 3182 3183 3184
      /*
	Read failed with a non-EOF error.
	TODO: come up with something better to handle this error
      */
      if (hot_log)
	pthread_mutex_unlock(log_lock);
3185
      sql_print_error("Slave SQL thread: I/O error reading \
unknown's avatar
unknown committed
3186
event(errno: %d  cur_log->error: %d)",
3187
		      my_errno,cur_log->error);
unknown's avatar
unknown committed
3188 3189
      // set read position to the beginning of the event
      my_b_seek(cur_log,rli->relay_log_pos+rli->pending);
unknown's avatar
unknown committed
3190 3191
      /* otherwise, we have had a partial read */
      errmsg = "Aborting slave SQL thread because of partial event read";
3192
      break;					// To end of function
3193 3194
    }
  }
3195
  if (!errmsg && global_system_variables.log_warnings)
3196
    errmsg = "slave SQL thread was killed";
unknown's avatar
unknown committed
3197

3198 3199
err:
  pthread_mutex_unlock(&rli->data_lock);
3200 3201
  if (errmsg)
    sql_print_error("Error reading relay log event: %s", errmsg);
unknown's avatar
unknown committed
3202
  DBUG_RETURN(0);
3203 3204
}

unknown's avatar
unknown committed
3205

unknown's avatar
unknown committed
3206 3207
#ifdef __GNUC__
template class I_List_iterator<i_string>;
unknown's avatar
unknown committed
3208
template class I_List_iterator<i_string_pair>;
unknown's avatar
unknown committed
3209
#endif