slave.cc 165 KB
Newer Older
1
/* Copyright (C) 2000-2003 MySQL AB
2

unknown's avatar
unknown committed
3 4 5 6
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
7

unknown's avatar
unknown committed
8 9 10 11
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
12

unknown's avatar
unknown committed
13 14 15 16 17
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "mysql_priv.h"
unknown's avatar
SCRUM  
unknown committed
18 19 20

#ifdef HAVE_REPLICATION

unknown's avatar
unknown committed
21
#include <mysql.h>
22
#include <myisam.h>
23
#include "slave.h"
24
#include "sql_repl.h"
25
#include "repl_failsafe.h"
unknown's avatar
unknown committed
26
#include <thr_alarm.h>
unknown's avatar
unknown committed
27
#include <my_dir.h>
unknown's avatar
unknown committed
28
#include <sql_common.h>
unknown's avatar
SCRUM  
unknown committed
29

30
#define MAX_SLAVE_RETRY_PAUSE 5
31 32 33
bool use_slave_mask = 0;
MY_BITMAP slave_error_mask;

34 35
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);

36
volatile bool slave_sql_running = 0, slave_io_running = 0;
37
char* slave_load_tmpdir = 0;
unknown's avatar
unknown committed
38
MASTER_INFO *active_mi;
39
HASH replicate_do_table, replicate_ignore_table;
unknown's avatar
unknown committed
40
DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
41
bool do_table_inited = 0, ignore_table_inited = 0;
unknown's avatar
unknown committed
42
bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
43 44
bool table_rules_on= 0;
my_bool replicate_same_server_id;
45
ulonglong relay_log_space_limit = 0;
unknown's avatar
unknown committed
46 47 48 49 50 51 52

/*
  When slave thread exits, we need to remember the temporary tables so we
  can re-use them on slave start.

  TODO: move the vars below under MASTER_INFO
*/
53

54
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
55
int events_till_abort = -1;
56
static int events_till_disconnect = -1;
unknown's avatar
unknown committed
57

58
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
unknown's avatar
unknown committed
59

60
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
unknown's avatar
unknown committed
61
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
unknown's avatar
unknown committed
62
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli);
63 64
static inline bool io_slave_killed(THD* thd,MASTER_INFO* mi);
static inline bool sql_slave_killed(THD* thd,RELAY_LOG_INFO* rli);
unknown's avatar
unknown committed
65
static int count_relay_log_space(RELAY_LOG_INFO* rli);
66
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
67
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
unknown's avatar
unknown committed
68 69
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
			  bool suppress_warnings);
unknown's avatar
unknown committed
70
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
unknown's avatar
unknown committed
71
			     bool reconnect, bool suppress_warnings);
72 73
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
		      void* thread_killed_arg);
74
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
75
static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
76
				  const char* table_name, bool overwrite);
77
static int get_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi);
78 79

/*
unknown's avatar
unknown committed
80
  Find out which replications threads are running
81

unknown's avatar
unknown committed
82 83 84 85 86 87 88 89 90 91 92 93 94
  SYNOPSIS
    init_thread_mask()
    mask		Return value here
    mi			master_info for slave
    inverse		If set, returns which threads are not running

  IMPLEMENTATION
    Get a bit mask for which threads are running so that we can later restart
    these threads.

  RETURN
    mask	If inverse == 0, running threads
		If inverse == 1, stopped threads    
95 96
*/

97 98 99 100 101 102 103 104
void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse)
{
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
  register int tmp_mask=0;
  if (set_io)
    tmp_mask |= SLAVE_IO;
  if (set_sql)
    tmp_mask |= SLAVE_SQL;
105 106
  if (inverse)
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
107 108 109
  *mask = tmp_mask;
}

110

unknown's avatar
unknown committed
111
/*
112
  lock_slave_threads()
unknown's avatar
unknown committed
113
*/
114

115 116 117 118 119 120 121
void lock_slave_threads(MASTER_INFO* mi)
{
  //TODO: see if we can do this without dual mutex
  pthread_mutex_lock(&mi->run_lock);
  pthread_mutex_lock(&mi->rli.run_lock);
}

122

unknown's avatar
unknown committed
123
/*
124
  unlock_slave_threads()
unknown's avatar
unknown committed
125
*/
126

127 128 129 130 131 132 133
void unlock_slave_threads(MASTER_INFO* mi)
{
  //TODO: see if we can do this without dual mutex
  pthread_mutex_unlock(&mi->rli.run_lock);
  pthread_mutex_unlock(&mi->run_lock);
}

134

unknown's avatar
unknown committed
135
/* Initialize slave structures */
136

137 138
int init_slave()
{
139
  DBUG_ENTER("init_slave");
140

141 142 143 144 145 146
  /*
    This is called when mysqld starts. Before client connections are
    accepted. However bootstrap may conflict with us if it does START SLAVE.
    So it's safer to take the lock.
  */
  pthread_mutex_lock(&LOCK_active_mi);
147 148 149 150
  /*
    TODO: re-write this to interate through the list of files
    for multi-master
  */
unknown's avatar
unknown committed
151
  active_mi= new MASTER_INFO;
152 153

  /*
154 155 156
    If master_host is not specified, try to read it from the master_info file.
    If master_host is specified, create the master_info file if it doesn't
    exists.
157
  */
158 159 160 161 162
  if (!active_mi)
  {
    sql_print_error("Failed to allocate memory for the master info structure");
    goto err;
  }
163

unknown's avatar
unknown committed
164
  if (init_master_info(active_mi,master_info_file,relay_log_info_file,
165
		       !master_host, (SLAVE_IO | SLAVE_SQL)))
166
  {
167
    sql_print_error("Failed to initialize the master info structure");
unknown's avatar
unknown committed
168
    goto err;
169
  }
170 171 172 173

  if (server_id && !master_host && active_mi->host[0])
    master_host= active_mi->host;

174 175
  /* If server id is not set, start_slave_thread() will say it */

176
  if (master_host && !opt_skip_slave_start)
177
  {
178 179 180 181 182 183
    if (start_slave_threads(1 /* need mutex */,
			    0 /* no wait for start*/,
			    active_mi,
			    master_info_file,
			    relay_log_info_file,
			    SLAVE_IO | SLAVE_SQL))
unknown's avatar
unknown committed
184
    {
185
      sql_print_error("Failed to create slave threads");
unknown's avatar
unknown committed
186 187
      goto err;
    }
188
  }
189
  pthread_mutex_unlock(&LOCK_active_mi);
190
  DBUG_RETURN(0);
191

unknown's avatar
unknown committed
192
err:
193
  pthread_mutex_unlock(&LOCK_active_mi);
unknown's avatar
unknown committed
194
  DBUG_RETURN(1);
195
}
196

197

198 199
/*
  Free callback for table rule hashes (registered as hash_free_key in
  init_table_rule_hash()): releases one rule entry with my_free().
*/
static void free_table_ent(TABLE_RULE_ENT* e)
{
  gptr entry_mem= (gptr) e;
  my_free(entry_mem, MYF(0));
}

203

204 205 206 207 208 209 210
/*
  Key callback for table rule hashes (registered as hash_get_key in
  init_table_rule_hash()): the key is e->db, e->key_len bytes long.
*/
static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
			   my_bool not_used __attribute__((unused)))
{
  byte *key= (byte*) e->db;
  *len= e->key_len;
  return key;
}

211 212 213 214 215 216 217 218 219 220 221

/*
  Open the given relay log

  SYNOPSIS
    init_relay_log_pos()
    rli			Relay information (will be initialized)
    log			Name of relay log file to read from. NULL = First log
    pos			Position in relay log file 
    need_data_lock	Set to 1 if this functions should do mutex locks
    errmsg		Store pointer to error message here
222 223 224
    look_for_description_event 
                        1 if we should look for such an event. We only need
                        this when the SQL thread starts and opens an existing
225 226 227 228
                        relay log and has to execute it (possibly from an
                        offset >4); then we need to read the first event of
                        the relay log to be able to parse the events we have
                        to execute.
229 230 231 232 233 234 235 236

  DESCRIPTION
  - Close old open relay log files.
  - If we are using the same relay log as the running IO-thread, then set
    rli->cur_log to point to the same IO_CACHE entry.
  - If not, open the 'log' binary file.

  TODO
237
    - check proper initialization of group_master_log_name/group_master_log_pos
238 239 240 241 242

  RETURN VALUES
    0	ok
    1	error.  errmsg is set to point to the error message
*/
unknown's avatar
unknown committed
243

244 245
int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
		       ulonglong pos, bool need_data_lock,
246 247
		       const char** errmsg,
                       bool look_for_description_event)
248
{
unknown's avatar
unknown committed
249
  DBUG_ENTER("init_relay_log_pos");
250
  DBUG_PRINT("info", ("pos=%lu", pos));
unknown's avatar
unknown committed
251

252
  *errmsg=0;
253
  pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
254
  
255 256
  if (need_data_lock)
    pthread_mutex_lock(&rli->data_lock);
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274

  /*
    Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
    is, too, and init_slave() too; these 2 functions allocate a description
    event in init_relay_log_pos, which is not freed by the terminating SQL slave
    thread as that thread is not started by these functions. So we have to free
    the description_event here, in case, so that there is no memory leak in
    running, say, CHANGE MASTER.
  */
  delete rli->relay_log.description_event_for_exec;
  /*
    By default the relay log is in binlog format 3 (4.0).
    Even if format is 4, this will work enough to read the first event
    (Format_desc) (remember that format 4 is just lenghtened compared to format
    3; format 3 is a prefix of format 4). 
  */
  rli->relay_log.description_event_for_exec= new
    Format_description_log_event(3);
275
  
276 277
  pthread_mutex_lock(log_lock);
  
278
  /* Close log file and free buffers if it's already open */
279 280 281 282 283 284 285
  if (rli->cur_log_fd >= 0)
  {
    end_io_cache(&rli->cache_buf);
    my_close(rli->cur_log_fd, MYF(MY_WME));
    rli->cur_log_fd = -1;
  }
  
286
  rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
unknown's avatar
unknown committed
287

unknown's avatar
unknown committed
288 289 290 291
  /*
    Test to see if the previous run was with the skip of purging
    If yes, we do not purge when we restart
  */
292
  if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
293 294 295 296
  {
    *errmsg="Could not find first log during relay log initialization";
    goto err;
  }
297

298
  if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
unknown's avatar
unknown committed
299
  {
300 301
    *errmsg="Could not find target log during relay log initialization";
    goto err;
unknown's avatar
unknown committed
302
  }
303 304 305 306
  strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
	  sizeof(rli->group_relay_log_name)-1);
  strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
	  sizeof(rli->event_relay_log_name)-1);
307 308
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
  {
309 310 311 312 313
    /*
      The IO thread is using this log file.
      In this case, we will use the same IO_CACHE pointer to
      read data as the IO thread is using to write data.
    */
314 315
    my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
    if (check_binlog_magic(rli->cur_log,errmsg))
316
      goto err;
unknown's avatar
unknown committed
317
    rli->cur_log_old_open_count=rli->relay_log.get_open_count();
318 319 320
  }
  else
  {
321 322 323
    /*
      Open the relay log and set rli->cur_log to point at this one
    */
324 325 326 327 328
    if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
				     rli->linfo.log_file_name,errmsg)) < 0)
      goto err;
    rli->cur_log = &rli->cache_buf;
  }
329 330 331 332 333 334 335 336 337 338
  /*
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
    sure.
  */
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
  {
    Log_event* ev;
    while (look_for_description_event) 
    {
      /*
339 340
        Read the possible Format_description_log_event; if position
        was 4, no need, it will be read naturally.
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
      */
      DBUG_PRINT("info",("looking for a Format_description_log_event"));

      if (my_b_tell(rli->cur_log) >= pos)
        break;

      /*
        Because of we have rli->data_lock and log_lock, we can safely read an
        event
      */
      if (!(ev=Log_event::read_log_event(rli->cur_log,0,
                                         rli->relay_log.description_event_for_exec)))
      {
        DBUG_PRINT("info",("could not read event, rli->cur_log->error=%d",
                           rli->cur_log->error));
        if (rli->cur_log->error) /* not EOF */
        {
          *errmsg= "I/O error reading event at position 4";
          goto err;
        }
        break;
      }
      else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
      {
        DBUG_PRINT("info",("found Format_description_log_event"));
        delete rli->relay_log.description_event_for_exec;
        rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
        /*
          As ev was returned by read_log_event, it has passed is_valid(), so
          my_malloc() in ctor worked, no need to check again.
        */
        /*
          Ok, we found a Format_description event. But it is not sure that this
          describes the whole relay log; indeed, one can have this sequence
          (starting from position 4):
          Format_desc (of slave)
          Rotate (of master)
378
          Format_desc (of master)
379 380 381
          So the Format_desc which really describes the rest of the relay log
          is the 3rd event (it can't be further than that, because we rotate
          the relay log when we queue a Rotate event from the master).
382 383 384 385 386 387 388 389 390 391 392 393 394 395 396
          But what describes the Rotate is the first Format_desc.
          So what we do is:
          go on searching for Format_description events, until you exceed the
          position (argument 'pos') or until you find another event than Rotate
          or Format_desc.
        */
      }
      else 
      {
        DBUG_PRINT("info",("found event of another type=%d",
                           ev->get_type_code()));
        look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
        delete ev;
      }
    }
unknown's avatar
unknown committed
397
    my_b_seek(rli->cur_log,(off_t)pos);
398 399 400 401 402 403 404 405 406 407
#ifndef DBUG_OFF
  {
    char llbuf1[22], llbuf2[22];
    DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
                        llstr(my_b_tell(rli->cur_log),llbuf1), 
                        llstr(rli->event_relay_log_pos,llbuf2)));
  }
#endif

  }
unknown's avatar
unknown committed
408

409
err:
410 411 412 413
  /*
    If we don't purge, we can't honour relay_log_space_limit ;
    silently discard it
  */
414
  if (!relay_log_purge)
415
    rli->log_space_limit= 0;
unknown's avatar
unknown committed
416
  pthread_cond_broadcast(&rli->data_cond);
417 418 419
  
  pthread_mutex_unlock(log_lock);

unknown's avatar
unknown committed
420 421
  if (need_data_lock)
    pthread_mutex_unlock(&rli->data_lock);
422 423
  if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
    *errmsg= "Invalid Format_description log event; could be out of memory";
424

unknown's avatar
unknown committed
425
  DBUG_RETURN ((*errmsg) ? 1 : 0);
426 427
}

428

unknown's avatar
unknown committed
429
/*
430
  Init function to set up array for errors that should be skipped for slave
unknown's avatar
unknown committed
431

unknown's avatar
unknown committed
432 433 434 435 436 437 438
  SYNOPSIS
    init_slave_skip_errors()
    arg		List of errors numbers to skip, separated with ','

  NOTES
    Called from get_options() in mysqld.cc on start-up
*/
439

unknown's avatar
unknown committed
440
void init_slave_skip_errors(const char* arg)
441
{
unknown's avatar
unknown committed
442
  const char *p;
443
  if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
444 445 446 447 448
  {
    fprintf(stderr, "Badly out of memory, please check your system status\n");
    exit(1);
  }
  use_slave_mask = 1;
449
  for (;my_isspace(system_charset_info,*arg);++arg)
450
    /* empty */;
451
  if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
452 453 454 455 456 457 458 459 460 461 462
  {
    bitmap_set_all(&slave_error_mask);
    return;
  }
  for (p= arg ; *p; )
  {
    long err_code;
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
      break;
    if (err_code < MAX_SLAVE_ERROR)
       bitmap_set_bit(&slave_error_mask,(uint)err_code);
463
    while (!my_isdigit(system_charset_info,*p) && *p)
464 465 466 467
      p++;
  }
}

468

unknown's avatar
unknown committed
469
void st_relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
470
						bool skip_lock)  
471 472 473
{
  if (!skip_lock)
    pthread_mutex_lock(&data_lock);
unknown's avatar
unknown committed
474
  inc_event_relay_log_pos();
unknown's avatar
unknown committed
475 476
  group_relay_log_pos= event_relay_log_pos;
  strmake(group_relay_log_name,event_relay_log_name,
unknown's avatar
unknown committed
477
	  sizeof(group_relay_log_name)-1);
unknown's avatar
unknown committed
478 479 480 481 482 483 484 485 486 487 488 489 490

  notify_group_relay_log_name_update();
        
  /*
    If the slave does not support transactions and replicates a transaction,
    users should not trust group_master_log_pos (which they can display with
    SHOW SLAVE STATUS or read from relay-log.info), because to compute
    group_master_log_pos the slave relies on log_pos stored in the master's
    binlog, but if we are in a master's transaction these positions are always
    the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
    not advance as it should on the non-transactional slave (it advances by
    big leaps, whereas it should advance by small leaps).
  */
unknown's avatar
unknown committed
491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510
  /*
    In 4.x we used the event's len to compute the positions here. This is
    wrong if the event was 3.23/4.0 and has been converted to 5.0, because
    then the event's len is not what is was in the master's binlog, so this
    will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
    replication: Exec_master_log_pos is wrong). Only way to solve this is to
    have the original offset of the end of the event the relay log. This is
    what we do in 5.0: log_pos has become "end_log_pos" (because the real use
    of log_pos in 4.0 was to compute the end_log_pos; so better to store
    end_log_pos instead of begin_log_pos.
    If we had not done this fix here, the problem would also have appeared
    when the slave and master are 5.0 but with different event length (for
    example the slave is more recent than the master and features the event
    UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
    SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
    value which would lead to badly broken replication.
    Even the relay_log_pos will be corrupted in this case, because the len is
    the relay log is not "val".
    With the end_log_pos solution, we avoid computations involving lengthes.
  */
511 512
  DBUG_PRINT("info", ("log_pos: %lu  group_master_log_pos: %lu",
		      (long) log_pos, (long) group_master_log_pos));
unknown's avatar
unknown committed
513 514
  if (log_pos) // 3.23 binlogs don't have log_posx
  {
unknown's avatar
unknown committed
515
    group_master_log_pos= log_pos;
unknown's avatar
unknown committed
516
  }
517 518 519 520 521 522
  pthread_cond_broadcast(&data_cond);
  if (!skip_lock)
    pthread_mutex_unlock(&data_lock);
}


unknown's avatar
unknown committed
523 524 525 526 527 528 529 530 531 532 533 534 535
void st_relay_log_info::close_temporary_tables()
{
  TABLE *table,*next;

  for (table=save_temporary_tables ; table ; table=next)
  {
    next=table->next;
    /*
      Don't ask for disk deletion. For now, anyway they will be deleted when
      slave restarts, but it is a better intention to not delete them.
    */
    close_temporary(table, 0);
  }
536 537
  save_temporary_tables= 0;
  slave_open_temp_tables= 0;
unknown's avatar
unknown committed
538
}
unknown's avatar
unknown committed
539

unknown's avatar
unknown committed
540
/*
541 542
  purge_relay_logs()

unknown's avatar
unknown committed
543 544
  NOTES
    Assumes to have a run lock on rli and that no slave thread are running.
unknown's avatar
unknown committed
545 546
*/

547 548
int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
		     const char** errmsg)
549
{
550
  int error=0;
unknown's avatar
unknown committed
551
  DBUG_ENTER("purge_relay_logs");
552 553 554 555

  /*
    Even if rli->inited==0, we still try to empty rli->master_log_* variables.
    Indeed, rli->inited==0 does not imply that they already are empty.
556
    It could be that slave's info initialization partly succeeded :
557
    for example if relay-log.info existed but *relay-bin*.*
558
    have been manually removed, init_relay_log_info reads the old
559 560 561 562 563 564 565 566
    relay-log.info and fills rli->master_log_*, then init_relay_log_info
    checks for the existence of the relay log, this fails and
    init_relay_log_info leaves rli->inited to 0.
    In that pathological case, rli->master_log_pos* will be properly reinited
    at the next START SLAVE (as RESET SLAVE or CHANGE
    MASTER, the callers of purge_relay_logs, will delete bogus *.info files
    or replace them with correct files), however if the user does SHOW SLAVE
    STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
567
    In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
568 569 570
    to display fine in any case.
  */

571 572
  rli->group_master_log_name[0]= 0;
  rli->group_master_log_pos= 0;
573

574
  if (!rli->inited)
575 576
  {
    DBUG_PRINT("info", ("rli->inited == 0"));
577
    DBUG_RETURN(0);
578
  }
unknown's avatar
unknown committed
579

580 581
  DBUG_ASSERT(rli->slave_running == 0);
  DBUG_ASSERT(rli->mi->slave_running == 0);
unknown's avatar
unknown committed
582

583 584
  rli->slave_skip_counter=0;
  pthread_mutex_lock(&rli->data_lock);
585
  if (rli->relay_log.reset_logs(thd))
586 587 588 589 590
  {
    *errmsg = "Failed during log reset";
    error=1;
    goto err;
  }
591
  /* Save name of used relay log file */
592 593 594 595 596
  strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
	  sizeof(rli->group_relay_log_name)-1);
  strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
 	  sizeof(rli->event_relay_log_name)-1);
  rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
597 598 599 600 601
  if (count_relay_log_space(rli))
  {
    *errmsg= "Error counting relay log space";
    goto err;
  }
602
  if (!just_reset)
603 604
    error= init_relay_log_pos(rli, rli->group_relay_log_name,
                              rli->group_relay_log_pos,
605
  			      0 /* do not need data lock */, errmsg, 0);
606
  
unknown's avatar
unknown committed
607 608 609 610
err:
#ifndef DBUG_OFF
  char buf[22];
#endif  
unknown's avatar
unknown committed
611
  DBUG_PRINT("info",("log_space_total: %s",llstr(rli->log_space_total,buf)));
612
  pthread_mutex_unlock(&rli->data_lock);
unknown's avatar
unknown committed
613
  DBUG_RETURN(error);
614 615
}

616

617 618 619 620 621 622 623
/*
  Stop the slave threads selected by thread_mask.

  SYNOPSIS
    terminate_slave_threads()
    mi		Master info
    thread_mask	SLAVE_IO and/or SLAVE_SQL; SLAVE_FORCE_ALL selects both
		threads and keeps going after a failure
    skip_lock	If set, the run locks are not taken here (the caller is
		expected to hold them)

  RETURN
    0		ok (also when mi is not initialized)
    #		error code from terminate_slave_thread()
*/
int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
{
  if (!mi->inited)
    return 0; /* successfully do nothing */

  int error;
  bool force_all= (thread_mask & SLAVE_FORCE_ALL) != 0;
  /* The stop conditions are always waited on with the run locks */
  pthread_mutex_t *io_cond_lock= &mi->run_lock;
  pthread_mutex_t *sql_cond_lock= &mi->rli.run_lock;
  /* A null lock tells terminate_slave_thread() not to lock it itself */
  pthread_mutex_t *io_lock= skip_lock ? 0 : io_cond_lock;
  pthread_mutex_t *sql_lock= skip_lock ? 0 : sql_cond_lock;
  DBUG_ENTER("terminate_slave_threads");

  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) && mi->slave_running)
  {
    DBUG_PRINT("info",("Terminating IO thread"));
    mi->abort_slave=1;
    error= terminate_slave_thread(mi->io_thd, io_lock, io_cond_lock,
                                  &mi->stop_cond, &mi->slave_running);
    if (error && !force_all)
      DBUG_RETURN(error);
  }

  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) && mi->rli.slave_running)
  {
    DBUG_PRINT("info",("Terminating SQL thread"));
    DBUG_ASSERT(mi->rli.sql_thd != 0) ;
    mi->rli.abort_slave=1;
    error= terminate_slave_thread(mi->rli.sql_thd, sql_lock, sql_cond_lock,
                                  &mi->rli.stop_cond, &mi->rli.slave_running);
    if (error && !force_all)
      DBUG_RETURN(error);
  }

  DBUG_RETURN(0);
}

659

660 661 662
int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
			   pthread_mutex_t *cond_lock,
			   pthread_cond_t* term_cond,
663
			   volatile uint *slave_running)
664
{
665
  DBUG_ENTER("terminate_slave_thread");
666 667 668 669 670 671
  if (term_lock)
  {
    pthread_mutex_lock(term_lock);
    if (!*slave_running)
    {
      pthread_mutex_unlock(term_lock);
672
      DBUG_RETURN(ER_SLAVE_NOT_RUNNING);
673 674 675
    }
  }
  DBUG_ASSERT(thd != 0);
676
  THD_CHECK_SENTRY(thd);
unknown's avatar
unknown committed
677
  /*
678
    Is is critical to test if the slave is running. Otherwise, we might
unknown's avatar
unknown committed
679
    be referening freed memory trying to kick it
680
  */
unknown's avatar
unknown committed
681 682

  while (*slave_running)			// Should always be true
683
  {
684
    DBUG_PRINT("loop", ("killing slave thread"));
685
    KICK_SLAVE(thd);
unknown's avatar
unknown committed
686 687 688
    /*
      There is a small chance that slave thread might miss the first
      alarm. To protect againts it, resend the signal until it reacts
689 690
    */
    struct timespec abstime;
unknown's avatar
unknown committed
691
    set_timespec(abstime,2);
692 693 694 695
    pthread_cond_timedwait(term_cond, cond_lock, &abstime);
  }
  if (term_lock)
    pthread_mutex_unlock(term_lock);
696
  DBUG_RETURN(0);
697 698
}

699

unknown's avatar
unknown committed
700
int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
701
		       pthread_mutex_t *cond_lock,
unknown's avatar
unknown committed
702
		       pthread_cond_t *start_cond,
703
		       volatile uint *slave_running,
unknown's avatar
unknown committed
704
		       volatile ulong *slave_run_id,
705 706
		       MASTER_INFO* mi,
                       bool high_priority)
707 708
{
  pthread_t th;
unknown's avatar
unknown committed
709
  ulong start_id;
710
  DBUG_ASSERT(mi->inited);
unknown's avatar
unknown committed
711 712
  DBUG_ENTER("start_slave_thread");

713 714 715 716 717 718 719 720 721
  if (start_lock)
    pthread_mutex_lock(start_lock);
  if (!server_id)
  {
    if (start_cond)
      pthread_cond_broadcast(start_cond);
    if (start_lock)
      pthread_mutex_unlock(start_lock);
    sql_print_error("Server id not set, will not start slave");
unknown's avatar
unknown committed
722
    DBUG_RETURN(ER_BAD_SLAVE);
723 724 725
  }
  
  if (*slave_running)
726 727 728 729 730
  {
    if (start_cond)
      pthread_cond_broadcast(start_cond);
    if (start_lock)
      pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
731
    DBUG_RETURN(ER_SLAVE_MUST_STOP);
732
  }
unknown's avatar
unknown committed
733 734
  start_id= *slave_run_id;
  DBUG_PRINT("info",("Creating new slave thread"));
735 736
  if (high_priority)
    my_pthread_attr_setprio(&connection_attrib,CONNECT_PRIOR);
737 738 739 740
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
  {
    if (start_lock)
      pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
741
    DBUG_RETURN(ER_SLAVE_THREAD);
742
  }
unknown's avatar
unknown committed
743
  if (start_cond && cond_lock) // caller has cond_lock
744 745
  {
    THD* thd = current_thd;
unknown's avatar
unknown committed
746
    while (start_id == *slave_run_id)
747
    {
unknown's avatar
unknown committed
748
      DBUG_PRINT("sleep",("Waiting for slave thread to start"));
749
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
750
					    "Waiting for slave thread to start");
751 752
      pthread_cond_wait(start_cond,cond_lock);
      thd->exit_cond(old_msg);
unknown's avatar
unknown committed
753
      pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
754
      if (thd->killed)
unknown's avatar
SCRUM  
unknown committed
755
	DBUG_RETURN(thd->killed_errno());
756 757 758 759
    }
  }
  if (start_lock)
    pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
760
  DBUG_RETURN(0);
761
}
unknown's avatar
unknown committed
762

763

unknown's avatar
unknown committed
764
/*
765
  start_slave_threads()
unknown's avatar
unknown committed
766

unknown's avatar
unknown committed
767 768 769 770
  NOTES
    SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
    sense to do that for starting a slave--we always care if it actually
    started the threads that were not previously running
771
*/
unknown's avatar
unknown committed
772

773 774 775 776 777 778 779
int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
			MASTER_INFO* mi, const char* master_info_fname,
			const char* slave_info_fname, int thread_mask)
{
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
  pthread_cond_t* cond_io=0,*cond_sql=0;
  int error=0;
780
  DBUG_ENTER("start_slave_threads");
781 782 783 784 785 786 787 788 789 790 791 792 793
  
  if (need_slave_mutex)
  {
    lock_io = &mi->run_lock;
    lock_sql = &mi->rli.run_lock;
  }
  if (wait_for_start)
  {
    cond_io = &mi->start_cond;
    cond_sql = &mi->rli.start_cond;
    lock_cond_io = &mi->run_lock;
    lock_cond_sql = &mi->rli.run_lock;
  }
794 795 796

  if (thread_mask & SLAVE_IO)
    error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
unknown's avatar
unknown committed
797 798
			     cond_io,
			     &mi->slave_running, &mi->slave_run_id,
799
			     mi, 1); //high priority, to read the most possible
800
  if (!error && (thread_mask & SLAVE_SQL))
801
  {
802 803
    error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
			     cond_sql,
unknown's avatar
unknown committed
804
			     &mi->rli.slave_running, &mi->rli.slave_run_id,
805
			     mi, 0);
806 807 808
    if (error)
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
  }
809
  DBUG_RETURN(error);
810
}
811

812

813 814
/*
  Initialize a hash of exact "db.table" replication rules and flag it as
  initialized.

  SYNOPSIS
    init_table_rule_hash()
    h           hash to initialize
    h_inited    set to 1 after hash_init() has been called
*/

void init_table_rule_hash(HASH* h, bool* h_inited)
{
  /* Keys come from get_table_key(); entries are released by free_table_ent() */
  hash_init(h, system_charset_info, TABLE_RULE_HASH_SIZE, 0, 0,
            (hash_get_key) get_table_key,
            (hash_free_key) free_table_ent,
            0);
  *h_inited= 1;
}
unknown's avatar
unknown committed
820

821

unknown's avatar
unknown committed
822 823
/*
  Initialize a dynamic array of wildcard table rules (elements are
  TABLE_RULE_ENT pointers) and flag it as initialized.

  SYNOPSIS
    init_table_rule_array()
    a           array to initialize
    a_inited    set to 1 after the array has been initialized
*/

void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited)
{
  /* TABLE_RULE_ARR_SIZE is passed twice (presumably initial size and growth step) */
  my_init_dynamic_array(a, sizeof(TABLE_RULE_ENT*),
                        TABLE_RULE_ARR_SIZE, TABLE_RULE_ARR_SIZE);
  *a_inited= 1;
}

829

unknown's avatar
unknown committed
830 831 832 833 834
/*
  Find the first wildcard table rule that matches a key.

  SYNOPSIS
    find_wild()
    a       array of TABLE_RULE_ENT* wildcard rules
    key     "db.table" (or "db.") string to match; delimited by len, so it
            does not need a terminating NUL
    len     number of bytes of key to compare

  RETURN VALUES
    first matching rule, or 0 if no rule matches
*/

static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
{
  const char *const key_end= key + len;

  for (uint idx= 0; idx < a->elements; idx++)
  {
    TABLE_RULE_ENT *rule;
    get_dynamic(a, (gptr) &rule, idx);
    const char *pattern= (const char*) rule->db;
    /* my_wildcmp() returns 0 on a match; '\\' escapes wildcard chars */
    if (my_wildcmp(system_charset_info, key, key_end,
                   pattern, pattern + rule->key_len,
                   '\\', wild_one, wild_many) == 0)
      return rule;
  }
  return 0;
}

849

850 851 852 853 854 855
/*
  Checks whether tables match some (wild_)do_table and (wild_)ignore_table
  rules (for replication)

  SYNOPSIS
    tables_ok()
856
    thd             thread (SQL slave thread normally). Mustn't be null.
857 858 859 860 861 862
    tables          list of tables to check

  NOTES
    Note that changing the order of the tables in the list can lead to
    different results. Note also the order of precedence of the do/ignore 
    rules (see code below). For that reason, users should not set conflicting 
863 864
    rules because they may get unpredicted results (precedence order is
    explained in the manual).
865

866 867
    Thought which arose from a question of a big customer "I want to include
    all tables like "abc.%" except the "%.EFG"". This can't be done now. If we
868 869
    supported Perl regexps we could do it with this pattern: /^abc\.(?!EFG)/
    (I could not find an equivalent in the regex library MySQL uses).
870 871 872 873 874

  RETURN VALUES
    0           should not be logged/replicated
    1           should be logged/replicated                  
*/
875

876
bool tables_ok(THD* thd, TABLE_LIST* tables)
877
{
878
  bool some_tables_updating= 0;
879 880
  DBUG_ENTER("tables_ok");

881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897
  /*
    In routine, can't reliably pick and choose substatements, so always
    replicate.
    We can't reliably know if one substatement should be executed or not:
    consider the case of this substatement: a SELECT on a non-replicated
    constant table; if we don't execute it maybe it was going to fill a
    variable which was going to be used by the next substatement to update
    a replicated table? If we execute it maybe the constant non-replicated
    table does not exist (and so we'll fail) while there was no need to
    execute this as this SELECT does not influence replicated tables in the
    rest of the routine? In other words: users are used to replicate-*-table
    specifying how to handle updates to tables, these options don't say
    anything about reads to tables; we can't guess.
  */
  if (thd->spcont)
    DBUG_RETURN(1);

unknown's avatar
VIEW  
unknown committed
898
  for (; tables; tables= tables->next_global)
unknown's avatar
unknown committed
899
  {
900 901 902 903
    char hash_key[2*NAME_LEN+2];
    char *end;
    uint len;

unknown's avatar
unknown committed
904 905
    if (!tables->updating) 
      continue;
906
    some_tables_updating= 1;
907 908
    end= strmov(hash_key, tables->db ? tables->db : thd->db);
    *end++= '.';
909
    len= (uint) (strmov(end, tables->table_name) - hash_key);
unknown's avatar
unknown committed
910
    if (do_table_inited) // if there are any do's
911
    {
unknown's avatar
unknown committed
912
      if (hash_search(&replicate_do_table, (byte*) hash_key, len))
913
	DBUG_RETURN(1);
unknown's avatar
unknown committed
914
    }
915
    if (ignore_table_inited) // if there are any ignores
unknown's avatar
unknown committed
916 917
    {
      if (hash_search(&replicate_ignore_table, (byte*) hash_key, len))
918
	DBUG_RETURN(0); 
919
    }
unknown's avatar
unknown committed
920 921
    if (wild_do_table_inited && find_wild(&replicate_wild_do_table,
					  hash_key, len))
922
      DBUG_RETURN(1);
unknown's avatar
unknown committed
923 924
    if (wild_ignore_table_inited && find_wild(&replicate_wild_ignore_table,
					      hash_key, len))
925
      DBUG_RETURN(0);
unknown's avatar
unknown committed
926
  }
927

unknown's avatar
unknown committed
928
  /*
929 930
    If no table was to be updated, ignore statement (no reason we play it on
    slave, slave is supposed to replicate _changes_ only).
unknown's avatar
unknown committed
931 932 933
    If no explicit rule found and there was a do list, do not replicate.
    If there was no do list, go ahead
  */
934 935
  DBUG_RETURN(some_tables_updating &&
              !do_table_inited && !wild_do_table_inited);
936 937
}

938

939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988
/*
  Checks whether a db matches wild_do_table and wild_ignore_table
  rules (for replication)

  SYNOPSIS
    db_ok_with_wild_table()
    db		name of the db to check.
		Is tested with check_db_name() before calling this function.

  NOTES
    Here is the reason for this function.
    We advise users who want to exclude a database 'db1' safely to do it
    with replicate_wild_ignore_table='db1.%' instead of binlog_ignore_db or
    replicate_ignore_db because the two lasts only check for the selected db,
    which won't work in that case:
    USE db2;
    UPDATE db1.t SET ... #this will be replicated and should not
    whereas replicate_wild_ignore_table will work in all cases.
    With replicate_wild_ignore_table, we only check tables. When
    one does 'DROP DATABASE db1', tables are not involved and the
    statement will be replicated, while users could expect it would not (as it
    rougly means 'DROP db1.first_table, DROP db1.second_table...').
    In other words, we want to interpret 'db1.%' as "everything touching db1".
    That is why we want to match 'db1' against 'db1.%' wild table rules.

  RETURN VALUES
    0           should not be logged/replicated
    1           should be logged/replicated
 */

int db_ok_with_wild_table(const char *db)
{
  char hash_key[NAME_LEN+2];
  char *end;
  int len;
  end= strmov(hash_key, db);
  *end++= '.';
  len= end - hash_key ;
  if (wild_do_table_inited && find_wild(&replicate_wild_do_table,
                                        hash_key, len))
    return 1;
  if (wild_ignore_table_inited && find_wild(&replicate_wild_ignore_table,
                                            hash_key, len))
    return 0;
  
  /*
    If no explicit rule found and there was a do list, do not replicate.
    If there was no do list, go ahead
  */
  return !wild_do_table_inited;
989 990 991 992 993
}


/*
  Add an exact "db.table" replication rule to a table rule hash.

  SYNOPSIS
    add_table_rule()
    h           hash set up by init_table_rule_hash()
    table_spec  rule text of the form "db.table"

  NOTES
    The TABLE_RULE_ENT and a copy of table_spec are allocated as one block.
    e->db points at the copy, and e->tbl_name at the part after the first
    '.'. The copy is not NUL-terminated; e->key_len holds its length.
    Once inserted, the entry belongs to the hash (presumably released by
    the hash's free function). If the insert fails, the entry is freed here.

  RETURN VALUES
    0   ok
    1   table_spec contains no '.', or out of memory
*/

int add_table_rule(HASH* h, const char* table_spec)
{
  const char* dot = strchr(table_spec, '.');
  if (!dot) return 1;
  // len is always > 0 because we know that there exists a '.'
  uint len = (uint)strlen(table_spec);
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
						 + len, MYF(MY_WME));
  if (!e) return 1;
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len);
  if (my_hash_insert(h, (byte*)e))
  {
    /* Not in the hash, so nothing else will ever free it */
    my_free((gptr) e, MYF(0));
    return 1;
  }
  return 0;
}

1009

unknown's avatar
unknown committed
1010 1011 1012
/*
  Add table expression with wildcards to dynamic array
*/
1013

unknown's avatar
unknown committed
1014 1015
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
{
unknown's avatar
unknown committed
1016
  const char* dot = strchr(table_spec, '.');
unknown's avatar
unknown committed
1017
  if (!dot) return 1;
unknown's avatar
unknown committed
1018 1019 1020
  uint len = (uint)strlen(table_spec);
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
						 + len, MYF(MY_WME));
unknown's avatar
unknown committed
1021
  if (!e) return 1;
unknown's avatar
unknown committed
1022 1023 1024 1025 1026 1027 1028 1029
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len);
  insert_dynamic(a, (gptr)&e);
  return 0;
}

1030

1031 1032 1033
/*
  Free each pointer stored in a dynamic array with my_free(), then free
  the array itself with delete_dynamic().
*/

static void free_string_array(DYNAMIC_ARRAY *a)
{
  char *elem;

  for (uint idx= 0; idx < a->elements; ++idx)
  {
    get_dynamic(a, (gptr) &elem, idx);
    my_free(elem, MYF(MY_WME));
  }
  delete_dynamic(a);
}

1043

1044
#ifdef NOT_USED_YET
/*
  Per-master cleanup callback, intended for list_walk() once multi-master
  support exists: releases the given master's info. Always returns 0.
*/
static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/)
{
  end_master_info(mi);
  return 0;                                     /* keep walking */
}
#endif
1051

1052

unknown's avatar
unknown committed
1053 1054 1055 1056 1057 1058
/*
  Free all resources used by slave

  SYNOPSIS
    end_slave()
*/
1059

1060 1061
void end_slave()
{
1062 1063 1064 1065 1066 1067 1068 1069
  /*
    This is called when the server terminates, in close_connections().
    It terminates slave threads. However, some CHANGE MASTER etc may still be
    running presently. If a START SLAVE was in progress, the mutex lock below
    will make us wait until slave threads have started, and START SLAVE
    returns, then we terminate them here.
  */
  pthread_mutex_lock(&LOCK_active_mi);
unknown's avatar
unknown committed
1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089
  if (active_mi)
  {
    /*
      TODO: replace the line below with
      list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
      once multi-master code is ready.
    */
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
    end_master_info(active_mi);
    if (do_table_inited)
      hash_free(&replicate_do_table);
    if (ignore_table_inited)
      hash_free(&replicate_ignore_table);
    if (wild_do_table_inited)
      free_string_array(&replicate_wild_do_table);
    if (wild_ignore_table_inited)
      free_string_array(&replicate_wild_ignore_table);
    delete active_mi;
    active_mi= 0;
  }
1090
  pthread_mutex_unlock(&LOCK_active_mi);
1091
}
unknown's avatar
unknown committed
1092

1093

1094
/*
  Return 1 if the slave I/O thread should stop, i.e. if any of these holds:
  - mi->abort_slave is set,
  - the global abort_loop is set (presumably server shutdown),
  - thd->killed is non-zero.
*/

static bool io_slave_killed(THD* thd, MASTER_INFO* mi)
{
  DBUG_ASSERT(mi->io_thd == thd);
  DBUG_ASSERT(mi->slave_running); // tracking buffer overrun
  if (mi->abort_slave || abort_loop)
    return 1;
  return thd->killed != 0;
}

1101

1102
/*
  Return 1 if the slave SQL thread should stop, i.e. if any of these holds:
  - rli->abort_slave is set,
  - the global abort_loop is set (presumably server shutdown),
  - thd->killed is non-zero.
*/

static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
{
  DBUG_ASSERT(rli->sql_thd == thd);
  DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
  if (rli->abort_slave || abort_loop)
    return 1;
  return thd->killed != 0;
}

1109

1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123
/*
  Writes an error message to rli->last_slave_error and rli->last_slave_errno
  (which will be displayed by SHOW SLAVE STATUS), and prints it to stderr.

  SYNOPSIS
    slave_print_error()
    rli		
    err_code    The error code
    msg         The error message (usually related to the error code, but can
                contain more information).
    ...         (this is printf-like format, with % symbols in msg)

  RETURN VALUES
    void
1124
*/
1125

1126
void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...)
1127 1128 1129
{
  va_list args;
  va_start(args,msg);
1130 1131 1132
  my_vsnprintf(rli->last_slave_error,
	       sizeof(rli->last_slave_error), msg, args);
  rli->last_slave_errno = err_code;
1133 1134
  /* If the error string ends with '.', do not add a ',' it would be ugly */
  if (rli->last_slave_error[0] && 
1135 1136
      (*(strend(rli->last_slave_error)-1) == '.'))
    sql_print_error("Slave: %s Error_code: %d", rli->last_slave_error,
1137 1138
                    err_code);
  else
1139
    sql_print_error("Slave: %s, Error_code: %d", rli->last_slave_error,
1140 1141
                    err_code);

1142 1143
}

unknown's avatar
unknown committed
1144
/*
1145 1146
  skip_load_data_infile()

unknown's avatar
unknown committed
1147 1148 1149
  NOTES
    This is used to tell a 3.23 master to break send_file()
*/
1150 1151 1152 1153 1154 1155 1156 1157

void skip_load_data_infile(NET *net)
{
  (void)net_request_file(net, "/dev/null");
  (void)my_net_read(net);				// discard response
  (void)net_write_command(net, 0, "", 0, "", 0);	// Send ok
}

1158

1159
/*
  Send a file request for 'fname' on 'net'. The packet has command byte
  251 (presumably the LOCAL INFILE request marker), fname as header and an
  empty body. Returns the result of net_write_command().
*/

bool net_request_file(NET* net, const char* fname)
{
  size_t fname_len= strlen(fname);
  DBUG_ENTER("net_request_file");
  DBUG_RETURN(net_write_command(net, 251, fname, fname_len, "", 0));
}

1165

1166
const char *rewrite_db(const char* db, uint32 *new_len)
unknown's avatar
unknown committed
1167
{
unknown's avatar
unknown committed
1168 1169
  if (replicate_rewrite_db.is_empty() || !db)
    return db;
unknown's avatar
unknown committed
1170 1171 1172
  I_List_iterator<i_string_pair> it(replicate_rewrite_db);
  i_string_pair* tmp;

unknown's avatar
unknown committed
1173 1174 1175
  while ((tmp=it++))
  {
    if (!strcmp(tmp->key, db))
1176
    {
1177
      *new_len= (uint32)strlen(tmp->val);
unknown's avatar
unknown committed
1178
      return tmp->val;
1179
    }
unknown's avatar
unknown committed
1180
  }
unknown's avatar
unknown committed
1181 1182
  return db;
}
1183

1184 1185
/*
  Return db, or "" when db is 0, so that the result is always printable.

  From other comments and tests in code, it looks like sometimes
  Query_log_event and Load_log_event can have db == 0 (see rewrite_db()
  for example). The cases where this happens are unclear; it may be when
  the master is 3.23.
*/

const char *print_slave_db_safe(const char* db)
{
  if (db == 0)
    return "";
  return db;
}
1195

1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209
/*
  Checks whether a db matches some do_db and ignore_db rules
  (for logging or replication)

  SYNOPSIS
    db_ok()
    db              name of the db to check
    do_list         either binlog_do_db or replicate_do_db
    ignore_list     either binlog_ignore_db or replicate_ignore_db

  RETURN VALUES
    0           should not be logged/replicated
    1           should be logged/replicated                  
*/
unknown's avatar
unknown committed
1210

unknown's avatar
unknown committed
1211 1212 1213
int db_ok(const char* db, I_List<i_string> &do_list,
	  I_List<i_string> &ignore_list )
{
unknown's avatar
unknown committed
1214
  if (do_list.is_empty() && ignore_list.is_empty())
unknown's avatar
unknown committed
1215 1216
    return 1; // ok to replicate if the user puts no constraints

unknown's avatar
unknown committed
1217 1218 1219 1220 1221
  /*
    If the user has specified restrictions on which databases to replicate
    and db was not selected, do not replicate.
  */
  if (!db)
unknown's avatar
unknown committed
1222
    return 0;
unknown's avatar
unknown committed
1223

unknown's avatar
unknown committed
1224 1225 1226 1227
  if (!do_list.is_empty()) // if the do's are not empty
  {
    I_List_iterator<i_string> it(do_list);
    i_string* tmp;
unknown's avatar
unknown committed
1228

unknown's avatar
unknown committed
1229 1230 1231 1232
    while ((tmp=it++))
    {
      if (!strcmp(tmp->ptr, db))
	return 1; // match
unknown's avatar
unknown committed
1233
    }
unknown's avatar
unknown committed
1234 1235
    return 0;
  }
unknown's avatar
unknown committed
1236
  else // there are some elements in the don't, otherwise we cannot get here
unknown's avatar
unknown committed
1237 1238 1239
  {
    I_List_iterator<i_string> it(ignore_list);
    i_string* tmp;
unknown's avatar
unknown committed
1240

unknown's avatar
unknown committed
1241 1242 1243 1244
    while ((tmp=it++))
    {
      if (!strcmp(tmp->ptr, db))
	return 0; // match
unknown's avatar
unknown committed
1245
    }
unknown's avatar
unknown committed
1246 1247
    return 1;
  }
unknown's avatar
unknown committed
1248 1249
}

1250

unknown's avatar
unknown committed
1251 1252
static int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
				 const char *default_val)
unknown's avatar
unknown committed
1253
{
unknown's avatar
unknown committed
1254 1255 1256 1257 1258 1259 1260
  uint length;
  if ((length=my_b_gets(f,var, max_size)))
  {
    char* last_p = var + length -1;
    if (*last_p == '\n')
      *last_p = 0; // if we stopped on newline, kill it
    else
unknown's avatar
unknown committed
1261
    {
unknown's avatar
unknown committed
1262 1263 1264 1265
      /*
	If we truncated a line or stopped on last char, remove all chars
	up to and including newline.
      */
unknown's avatar
unknown committed
1266
      int c;
unknown's avatar
unknown committed
1267
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF));
unknown's avatar
unknown committed
1268
    }
unknown's avatar
unknown committed
1269 1270 1271 1272
    return 0;
  }
  else if (default_val)
  {
unknown's avatar
unknown committed
1273
    strmake(var,  default_val, max_size-1);
unknown's avatar
unknown committed
1274 1275
    return 0;
  }
unknown's avatar
unknown committed
1276
  return 1;
unknown's avatar
unknown committed
1277 1278
}

1279

unknown's avatar
unknown committed
1280
static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
unknown's avatar
unknown committed
1281 1282 1283
{
  char buf[32];
  
unknown's avatar
unknown committed
1284 1285 1286 1287 1288
  if (my_b_gets(f, buf, sizeof(buf))) 
  {
    *var = atoi(buf);
    return 0;
  }
unknown's avatar
unknown committed
1289
  else if (default_val)
unknown's avatar
unknown committed
1290 1291 1292 1293
  {
    *var = default_val;
    return 0;
  }
unknown's avatar
unknown committed
1294
  return 1;
unknown's avatar
unknown committed
1295 1296
}

1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309
/*
  Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
  relying on the binlog's version. This is not perfect: imagine an upgrade
  of the master without waiting that all slaves are in sync with the master;
  then a slave could be fooled about the binlog's format. This is what happens
  when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
  slaves are fooled. So we do this only to distinguish between 3.23 and more
  recent masters (it's too late to change things for 3.23).
  
  RETURNS
  0       ok
  1       error
*/
1310

unknown's avatar
unknown committed
1311
static int get_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi)
1312
{
1313
  const char* errmsg= 0;
1314 1315 1316 1317 1318 1319 1320

  /*
    Free old description_event_for_queue (that is needed if we are in
    a reconnection).
  */
  delete mi->rli.relay_log.description_event_for_queue;
  mi->rli.relay_log.description_event_for_queue= 0;
1321
  
1322
  if (!my_isdigit(&my_charset_bin,*mysql->server_version))
unknown's avatar
unknown committed
1323
    errmsg = "Master reported unrecognized MySQL version";
1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356
  else
  {
    /*
      Note the following switch will bug when we have MySQL branch 30 ;)
    */
    switch (*mysql->server_version) 
    {
    case '0':
    case '1':
    case '2':
      errmsg = "Master reported unrecognized MySQL version";
      break;
    case '3':
      mi->rli.relay_log.description_event_for_queue= new
        Format_description_log_event(1, mysql->server_version); 
      break;
    case '4':
      mi->rli.relay_log.description_event_for_queue= new
        Format_description_log_event(3, mysql->server_version); 
      break;
    default: 
      /*
        Master is MySQL >=5.0. Give a default Format_desc event, so that we can
        take the early steps (like tests for "is this a 3.23 master") which we
        have to take before we receive the real master's Format_desc which will
        override this one. Note that the Format_desc we create below is garbage
        (it has the format of the *slave*); it's only good to help know if the
        master is 3.23, 4.0, etc.
      */
      mi->rli.relay_log.description_event_for_queue= new
        Format_description_log_event(4, mysql->server_version); 
      break;
    }
1357
  }
1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370
  
  /* 
     This does not mean that a 5.0 slave will be able to read a 6.0 master; but
     as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
     can't read a 6.0 master, this will show up when the slave can't read some
     events sent by the master, and there will be error messages.
  */
  
  if (errmsg)
  {
    sql_print_error(errmsg);
    return 1;
  }
1371 1372 1373 1374 1375 1376 1377 1378

  /* as we are here, we tried to allocate the event */
  if (!mi->rli.relay_log.description_event_for_queue)
  {
    sql_print_error("Slave I/O thread failed to create a default Format_description_log_event");
    return 1;
  }

1379 1380 1381 1382 1383 1384 1385 1386 1387 1388
  /*
    Compare the master and slave's clock. Do not die if master's clock is
    unavailable (very old master not supporting UNIX_TIMESTAMP()?).
  */
  MYSQL_RES *master_res= 0;
  MYSQL_ROW master_row;
  
  if (!mysql_real_query(mysql, "SELECT UNIX_TIMESTAMP()", 23) &&
      (master_res= mysql_store_result(mysql)) &&
      (master_row= mysql_fetch_row(master_res)))
unknown's avatar
unknown committed
1389
  {
1390 1391
    mi->clock_diff_with_master= 
      (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
unknown's avatar
unknown committed
1392
  }
1393
  else
unknown's avatar
unknown committed
1394
  {
1395
    mi->clock_diff_with_master= 0; /* The "most sensible" value */
1396
    sql_print_warning("\"SELECT UNIX_TIMESTAMP()\" failed on master, \
1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422
do not trust column Seconds_Behind_Master of SHOW SLAVE STATUS");
  }
  if (master_res)
    mysql_free_result(master_res);      
 
  /*
    Check that the master's server id and ours are different. Because if they
    are equal (which can result from a simple copy of master's datadir to slave,
    thus copying some my.cnf), replication will work but all events will be
    skipped.
    Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
    master?).
    Note: we could have put a @@SERVER_ID in the previous SELECT
    UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
  */
  if (!mysql_real_query(mysql, "SHOW VARIABLES LIKE 'SERVER_ID'", 31) &&
      (master_res= mysql_store_result(mysql)))
  {
    if ((master_row= mysql_fetch_row(master_res)) &&
        (::server_id == strtoul(master_row[1], 0, 10)) &&
        !replicate_same_server_id)
      errmsg= "The slave I/O thread stops because master and slave have equal \
MySQL server ids; these ids must be different for replication to work (or \
the --replicate-same-server-id option must be used on slave but this does \
not always make sense; please check the manual before using it).";
    mysql_free_result(master_res);
unknown's avatar
unknown committed
1423 1424
  }

1425 1426 1427
  /*
    Check that the master's global character_set_server and ours are the same.
    Not fatal if query fails (old master?).
1428 1429 1430 1431 1432 1433
    Note that we don't check for equality of global character_set_client and
    collation_connection (neither do we prevent their setting in
    set_var.cc). That's because from what I (Guilhem) have tested, the global
    values of these 2 are never used (new connections don't use them).
    We don't test equality of global collation_database either as it's is
    going to be deprecated (made read-only) in 4.1 very soon.
1434
    The test is only relevant if master < 5.0.3 (we'll test only if it's older
1435
    than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
1436 1437 1438 1439
    charset info in each binlog event.
    We don't do it for 3.23 because masters <3.23.50 hang on
    SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
    test only if master is 4.x.
1440
  */
1441 1442 1443

  /* redundant with rest of code but safer against later additions */
  if (*mysql->server_version == '3')
1444
    goto err;
1445 1446 1447

  if ((*mysql->server_version == '4') &&
      !mysql_real_query(mysql, "SELECT @@GLOBAL.COLLATION_SERVER", 32) &&
1448
      (master_res= mysql_store_result(mysql)))
unknown's avatar
unknown committed
1449
  {
1450 1451 1452 1453 1454 1455
    if ((master_row= mysql_fetch_row(master_res)) &&
        strcmp(master_row[0], global_system_variables.collation_server->name))
      errmsg= "The slave I/O thread stops because master and slave have \
different values for the COLLATION_SERVER global variable. The values must \
be equal for replication to work";
    mysql_free_result(master_res);
unknown's avatar
unknown committed
1456
  }
1457

1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469
  /*
    Perform analogous check for time zone. Theoretically we also should
    perform check here to verify that SYSTEM time zones are the same on
    slave and master, but we can't rely on value of @@system_time_zone
    variable (it is time zone abbreviation) since it determined at start
    time and so could differ for slave and master even if they are really
    in the same system time zone. So we are omiting this check and just
    relying on documentation. Also according to Monty there are many users
    who are using replication between servers in various time zones. Hence 
    such check will broke everything for them. (And now everything will 
    work for them because by default both their master and slave will have 
    'SYSTEM' time zone).
1470 1471
    This check is only necessary for 4.x masters (and < 5.0.4 masters but
    those were alpha).
1472
  */
1473
  if ((*mysql->server_version == '4') &&
1474
      !mysql_real_query(mysql, "SELECT @@GLOBAL.TIME_ZONE", 25) &&
1475
      (master_res= mysql_store_result(mysql)))
unknown's avatar
unknown committed
1476
  {
1477 1478 1479 1480 1481 1482 1483
    if ((master_row= mysql_fetch_row(master_res)) &&
        strcmp(master_row[0], 
               global_system_variables.time_zone->get_name()->ptr()))
      errmsg= "The slave I/O thread stops because master and slave have \
different values for the TIME_ZONE global variable. The values must \
be equal for replication to work";
    mysql_free_result(master_res);
unknown's avatar
unknown committed
1484 1485
  }

1486
err:
1487 1488 1489 1490 1491
  if (errmsg)
  {
    sql_print_error(errmsg);
    return 1;
  }
1492

1493 1494 1495
  return 0;
}

1496 1497 1498 1499
/*
  Used by fetch_master_table (used by LOAD TABLE tblname FROM MASTER and LOAD
  DATA FROM MASTER). Drops the table (if 'overwrite' is true) and recreates it
  from the dump. Honours replication inclusion/exclusion rules.
1500
  db must be non-zero (guarded by assertion).
1501 1502 1503 1504 1505

  RETURN VALUES
    0           success
    1           error
*/
1506

1507
static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
1508
				  const char* table_name, bool overwrite)
unknown's avatar
unknown committed
1509
{
1510
  ulong packet_len;
1511 1512
  char *query, *save_db;
  uint32 save_db_length;
1513 1514
  Vio* save_vio;
  HA_CHECK_OPT check_opt;
unknown's avatar
unknown committed
1515
  TABLE_LIST tables;
1516 1517
  int error= 1;
  handler *file;
1518
  ulong save_options;
1519
  NET *net= &mysql->net;
unknown's avatar
unknown committed
1520 1521
  DBUG_ENTER("create_table_from_dump");  

1522
  packet_len= my_net_read(net); // read create table statement
1523 1524
  if (packet_len == packet_error)
  {
unknown's avatar
unknown committed
1525
    my_message(ER_MASTER_NET_READ, ER(ER_MASTER_NET_READ), MYF(0));
unknown's avatar
unknown committed
1526
    DBUG_RETURN(1);
1527 1528 1529
  }
  if (net->read_pos[0] == 255) // error from master
  {
1530 1531 1532 1533
    char *err_msg; 
    err_msg= (char*) net->read_pos + ((mysql->server_capabilities &
				       CLIENT_PROTOCOL_41) ?
				      3+SQLSTATE_LENGTH+1 : 3);
1534
    my_error(ER_MASTER, MYF(0), err_msg);
unknown's avatar
unknown committed
1535
    DBUG_RETURN(1);
1536
  }
unknown's avatar
unknown committed
1537
  thd->command = COM_TABLE_DUMP;
1538
  thd->query_length= packet_len;
1539
  /* Note that we should not set thd->query until the area is initalized */
1540
  if (!(query = thd->strmake((char*) net->read_pos, packet_len)))
1541 1542
  {
    sql_print_error("create_table_from_dump: out of memory");
unknown's avatar
unknown committed
1543
    my_message(ER_GET_ERRNO, "Out of memory", MYF(0));
unknown's avatar
unknown committed
1544
    DBUG_RETURN(1);
1545
  }
1546
  thd->query= query;
unknown's avatar
unknown committed
1547 1548
  thd->query_error = 0;
  thd->net.no_send_ok = 1;
1549

unknown's avatar
unknown committed
1550 1551
  bzero((char*) &tables,sizeof(tables));
  tables.db = (char*)db;
1552
  tables.alias= tables.table_name= (char*)table_name;
unknown's avatar
unknown committed
1553

unknown's avatar
unknown committed
1554 1555 1556 1557 1558 1559 1560
  /* Drop the table if 'overwrite' is true */
  if (overwrite && mysql_rm_table(thd,&tables,1,0)) /* drop if exists */
  {
    sql_print_error("create_table_from_dump: failed to drop the table");
    goto err;
  }

1561
  /* Create the table. We do not want to log the "create table" statement */
1562
  save_options = thd->options;
1563
  thd->options &= ~(ulong) (OPTION_BIN_LOG);
unknown's avatar
unknown committed
1564
  thd->proc_info = "Creating table from master dump";
unknown's avatar
unknown committed
1565
  // save old db in case we are creating in a different database
1566
  save_db = thd->db;
1567
  save_db_length= thd->db_length;
1568
  thd->db = (char*)db;
1569
  DBUG_ASSERT(thd->db != 0);
1570
  thd->db_length= strlen(thd->db);
unknown's avatar
unknown committed
1571
  mysql_parse(thd, thd->query, packet_len); // run create table
1572
  thd->db = save_db;		// leave things the way the were before
1573
  thd->db_length= save_db_length;
1574
  thd->options = save_options;
unknown's avatar
unknown committed
1575
  
1576 1577
  if (thd->query_error)
    goto err;			// mysql_parse took care of the error send
unknown's avatar
unknown committed
1578 1579

  thd->proc_info = "Opening master dump table";
1580
  tables.lock_type = TL_WRITE;
unknown's avatar
unknown committed
1581 1582 1583
  if (!open_ltable(thd, &tables, TL_WRITE))
  {
    sql_print_error("create_table_from_dump: could not open created table");
1584
    goto err;
unknown's avatar
unknown committed
1585
  }
unknown's avatar
unknown committed
1586
  
1587
  file = tables.table->file;
unknown's avatar
unknown committed
1588
  thd->proc_info = "Reading master dump table data";
1589
  /* Copy the data file */
unknown's avatar
unknown committed
1590 1591
  if (file->net_read_dump(net))
  {
unknown's avatar
unknown committed
1592
    my_message(ER_MASTER_NET_READ, ER(ER_MASTER_NET_READ), MYF(0));
1593
    sql_print_error("create_table_from_dump: failed in\
unknown's avatar
unknown committed
1594
 handler::net_read_dump()");
1595
    goto err;
unknown's avatar
unknown committed
1596
  }
unknown's avatar
unknown committed
1597 1598

  check_opt.init();
unknown's avatar
unknown committed
1599
  check_opt.flags|= T_VERY_SILENT | T_CALC_CHECKSUM | T_QUICK;
unknown's avatar
unknown committed
1600
  thd->proc_info = "Rebuilding the index on master dump table";
unknown's avatar
unknown committed
1601 1602 1603 1604 1605
  /*
    We do not want repair() to spam us with messages
    just send them to the error log, and report the failure in case of
    problems.
  */
1606
  save_vio = thd->net.vio;
unknown's avatar
unknown committed
1607
  thd->net.vio = 0;
1608
  /* Rebuild the index file from the copied data file (with REPAIR) */
1609
  error=file->repair(thd,&check_opt) != 0;
unknown's avatar
unknown committed
1610
  thd->net.vio = save_vio;
1611
  if (error)
1612
    my_error(ER_INDEX_REBUILD, MYF(0), tables.table->s->table_name);
1613 1614

err:
unknown's avatar
unknown committed
1615 1616
  close_thread_tables(thd);
  thd->net.no_send_ok = 0;
unknown's avatar
unknown committed
1617
  DBUG_RETURN(error); 
unknown's avatar
unknown committed
1618 1619
}

1620

1621
int fetch_master_table(THD *thd, const char *db_name, const char *table_name,
1622
		       MASTER_INFO *mi, MYSQL *mysql, bool overwrite)
unknown's avatar
unknown committed
1623
{
1624 1625 1626 1627 1628 1629
  int error= 1;
  const char *errmsg=0;
  bool called_connected= (mysql != NULL);
  DBUG_ENTER("fetch_master_table");
  DBUG_PRINT("enter", ("db_name: '%s'  table_name: '%s'",
		       db_name,table_name));
unknown's avatar
unknown committed
1630

unknown's avatar
merge  
unknown committed
1631
  if (!called_connected)
1632
  { 
unknown's avatar
SCRUM  
unknown committed
1633
    if (!(mysql = mysql_init(NULL)))
1634 1635 1636
    {
      DBUG_RETURN(1);
    }
unknown's avatar
merge  
unknown committed
1637
    if (connect_to_master(thd, mysql, mi))
1638
    {
1639
      my_error(ER_CONNECT_TO_MASTER, MYF(0), mysql_error(mysql));
unknown's avatar
SCRUM  
unknown committed
1640
      mysql_close(mysql);
1641
      DBUG_RETURN(1);
1642
    }
1643 1644
    if (thd->killed)
      goto err;
1645
  }
unknown's avatar
unknown committed
1646

unknown's avatar
unknown committed
1647
  if (request_table_dump(mysql, db_name, table_name))
1648
  {
1649 1650
    error= ER_UNKNOWN_ERROR;
    errmsg= "Failed on table dump request";
1651 1652
    goto err;
  }
unknown's avatar
unknown committed
1653 1654 1655
  if (create_table_from_dump(thd, mysql, db_name,
			     table_name, overwrite))
    goto err;    // create_table_from_dump have sent the error already
unknown's avatar
unknown committed
1656
  error = 0;
1657

unknown's avatar
unknown committed
1658
 err:
1659
  thd->net.no_send_ok = 0; // Clear up garbage after create_table_from_dump
1660
  if (!called_connected)
unknown's avatar
SCRUM  
unknown committed
1661
    mysql_close(mysql);
1662
  if (errmsg && thd->vio_ok())
unknown's avatar
unknown committed
1663
    my_message(error, errmsg, MYF(0));
1664
  DBUG_RETURN(test(error));			// Return 1 on error
unknown's avatar
unknown committed
1665 1666
}

1667

1668 1669
/*
  Release what init_master_info() set up for mi: end the relay log info,
  close the master info file together with its IO_CACHE, and clear
  mi->inited. Does nothing if mi is not initialized.
*/
void end_master_info(MASTER_INFO* mi)
{
  DBUG_ENTER("end_master_info");

  if (mi->inited)
  {
    end_relay_log_info(&mi->rli);
    if (mi->fd >= 0)
    {
      /* the cache must go before the descriptor it sits on */
      end_io_cache(&mi->file);
      (void) my_close(mi->fd, MYF(MY_WME));
      mi->fd= -1;
    }
    mi->inited= 0;
  }

  DBUG_VOID_RETURN;
}

1686

1687 1688
/*
  Set up the relay log side of the slave state.

  SYNOPSIS
    init_relay_log_info()
    rli          relay log info to initialize; nothing is done if
                 rli->inited is already set
    info_fname   relay log info file name, resolved relative to
                 mysql_data_home

  DESCRIPTION
    With rli->data_lock held:
    - opens the relay log index and the current relay log (as a
      SEQ_READ_APPEND IO_CACHE);
    - if the relay log info file does not exist, creates it and positions
      on the first relay log listed in the index;
    - otherwise reads the group relay log name/position and the group
      master log name/position from it and positions the relay log there;
    - switches the info file cache to WRITE_CACHE, flushes it and recounts
      the space used by the relay logs.

  RETURN VALUES
    0         ok
    non-zero  error. When only flush_relay_log_info() failed, its result is
              returned and rli is still marked inited.
*/
static int init_relay_log_info(RELAY_LOG_INFO* rli,
                               const char* info_fname)
{
  char fname[FN_REFLEN+128];
  int info_fd;
  const char* msg = 0;
  int error = 0;
  DBUG_ENTER("init_relay_log_info");

  if (rli->inited)                       // Set if this function called
    DBUG_RETURN(0);
  fn_format(fname, info_fname, mysql_data_home, "", 4+32);
  pthread_mutex_lock(&rli->data_lock);
  info_fd = rli->info_fd;
  rli->cur_log_fd = -1;
  rli->slave_skip_counter=0;
  rli->abort_pos_wait=0;
  rli->log_space_limit= relay_log_space_limit;
  rli->log_space_total= 0;

  /*
    The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
    Note that the I/O thread flushes it to disk after writing every
    event, in flush_master_info(mi, 1).
  */

  /*
    For the maximum log size, we choose max_relay_log_size if it is
    non-zero, max_binlog_size otherwise. If later the user does SET
    GLOBAL on one of these variables, fix_max_binlog_size and
    fix_max_relay_log_size will reconsider the choice (for example
    if the user changes max_relay_log_size to zero, we have to
    switch to using max_binlog_size for the relay log) and update
    rli->relay_log.max_size (and mysql_bin_log.max_size).
  */
  {
    char buf[FN_REFLEN];
    const char *ln;
    static bool name_warning_sent= 0;
    ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
                                     1, buf);
    /* We send the warning only at startup, not after every RESET SLAVE */
    if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
    {
      /*
        User didn't give us info to name the relay log index file.
        Picking `hostname`-relay-bin.index like we do, causes replication to
        fail if this slave's hostname is changed later. So, we would like to
        instead require a name. But as we don't want to break many existing
        setups, we only give warning, not error.
      */
      sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
                        " so replication "
                        "may break when this MySQL server acts as a "
                        "slave and has his hostname changed!! Please "
                        "use '--relay-log=%s' to avoid this problem.", ln);
      name_warning_sent= 1;
    }
    /*
      note, that if open() fails, we'll still have index file open
      but a destructor will take care of that
    */
    if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln) ||
        rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0,
                            (max_relay_log_size ? max_relay_log_size :
                             max_binlog_size), 1))
    {
      pthread_mutex_unlock(&rli->data_lock);
      sql_print_error("Failed in open_log() called from init_relay_log_info()");
      DBUG_RETURN(1);
    }
  }

  /* if file does not exist */
  if (access(fname,F_OK))
  {
    /*
      If someone removed the file from underneath our feet, just close
      the old descriptor and re-create the old file
    */
    if (info_fd >= 0)
      my_close(info_fd, MYF(MY_WME));
    if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
    {
      sql_print_error("Failed to create a new relay log info file (\
file '%s', errno %d)", fname, my_errno);
      /*
        current_thd may be NULL when no THD is attached to the calling
        thread (presumably possible during server startup -- confirm);
        never dereference it blindly.
      */
      msg= current_thd ? current_thd->net.last_error :
                         "Failed to create a new relay log info file";
      goto err;
    }
    if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
                      MYF(MY_WME)))
    {
      sql_print_error("Failed to create a cache on relay log info file '%s'",
                      fname);
      msg= current_thd ? current_thd->net.last_error :
                         "Failed to create a cache on relay log info file";
      goto err;
    }

    /* Init relay log with first entry in the relay index file */
    if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
                           &msg, 0))
    {
      sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
      goto err;
    }
    rli->group_master_log_name[0]= 0;
    rli->group_master_log_pos= 0;
    rli->info_fd= info_fd;
  }
  else // file exists
  {
    if (info_fd >= 0)
      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
    else
    {
      /* named so it does not shadow the function-level 'error' */
      bool open_failed= 0;
      if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
      {
        sql_print_error("\
Failed to open the existing relay log info file '%s' (errno %d)",
                        fname, my_errno);
        open_failed= 1;
      }
      else if (init_io_cache(&rli->info_file, info_fd,
                             IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
      {
        sql_print_error("Failed to create a cache on relay log info file '%s'",
                        fname);
        open_failed= 1;
      }
      if (open_failed)
      {
        if (info_fd >= 0)
          my_close(info_fd, MYF(0));
        rli->info_fd= -1;
        rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
        pthread_mutex_unlock(&rli->data_lock);
        DBUG_RETURN(1);
      }
    }

    rli->info_fd = info_fd;
    int relay_log_pos, master_log_pos;
    if (init_strvar_from_file(rli->group_relay_log_name,
                              sizeof(rli->group_relay_log_name),
                              &rli->info_file, "") ||
        init_intvar_from_file(&relay_log_pos,
                              &rli->info_file, BIN_LOG_HEADER_SIZE) ||
        init_strvar_from_file(rli->group_master_log_name,
                              sizeof(rli->group_master_log_name),
                              &rli->info_file, "") ||
        init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
    {
      msg="Error reading slave log configuration";
      goto err;
    }
    strmake(rli->event_relay_log_name,rli->group_relay_log_name,
            sizeof(rli->event_relay_log_name)-1);
    rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
    rli->group_master_log_pos= master_log_pos;

    if (init_relay_log_pos(rli,
                           rli->group_relay_log_name,
                           rli->group_relay_log_pos,
                           0 /* no data lock*/,
                           &msg, 0))
    {
      char llbuf[22];
      sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s)",
                      rli->group_relay_log_name,
                      llstr(rli->group_relay_log_pos, llbuf));
      goto err;
    }
  }

#ifndef DBUG_OFF
  {
    char llbuf1[22], llbuf2[22];
    DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
                        llstr(my_b_tell(rli->cur_log),llbuf1),
                        llstr(rli->event_relay_log_pos,llbuf2)));
    DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
    DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
  }
#endif

  /*
    Now change the cache from READ to WRITE - must do this
    before flush_relay_log_info
  */
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
  if ((error= flush_relay_log_info(rli)))
    sql_print_error("Failed to flush relay log info file");
  if (count_relay_log_space(rli))
  {
    msg="Error counting relay log space";
    goto err;
  }
  rli->inited= 1;
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_RETURN(error);

err:
  /*
    msg may contain file names or server error text: never pass it as
    the format string, and never pass a NULL pointer to %s.
  */
  if (msg)
    sql_print_error("%s", msg);
  end_io_cache(&rli->info_file);
  if (info_fd >= 0)
    my_close(info_fd, MYF(0));
  rli->info_fd= -1;
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_RETURN(1);
}

1900

unknown's avatar
unknown committed
1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914
/*
  Add the on-disk size of the relay log named in linfo to
  rli->log_space_total.

  RETURN VALUES
    0   ok
    1   the file could not be stat'ed (error logged)
*/
static inline int add_relay_log(RELAY_LOG_INFO* rli,LOG_INFO* linfo)
{
  MY_STAT stat_info;
  DBUG_ENTER("add_relay_log");

  if (my_stat(linfo->log_file_name, &stat_info, MYF(0)) == NULL)
  {
    sql_print_error("log %s listed in the index, but failed to stat",
                    linfo->log_file_name);
    DBUG_RETURN(1);
  }

  rli->log_space_total+= stat_info.st_size;
#ifndef DBUG_OFF
  {
    char llbuf[22];
    DBUG_PRINT("info", ("log_space_total: %s",
                        llstr(rli->log_space_total, llbuf)));
  }
#endif
  DBUG_RETURN(0);
}

1919

unknown's avatar
unknown committed
1920 1921
/*
  Block the slave I/O thread while the relay logs use more than
  rli->log_space_limit bytes.

  The wait is on rli->log_space_cond under rli->log_space_lock, entered
  through enter_cond() with a "waiting for space" state. It ends when
  log_space_total drops to the limit or below, when the I/O thread is
  killed, or when rli->ignore_log_space_limit is set.

  RETURN VALUES
    true if io_slave_killed() reported the I/O thread as killed
    (only checked while over the limit), false otherwise
*/
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli)
{
  MASTER_INFO *mi= rli->mi;
  THD *io_thd= mi->io_thd;
  bool killed= 0;
  const char *old_proc_info;
  DBUG_ENTER("wait_for_relay_log_space");

  pthread_mutex_lock(&rli->log_space_lock);
  old_proc_info= io_thd->enter_cond(&rli->log_space_cond,
                                    &rli->log_space_lock,
                                    "Waiting for the slave SQL thread to free enough relay log space");
  for (;;)
  {
    if (rli->log_space_limit >= rli->log_space_total)
      break;                                    // enough room now
    if ((killed= io_slave_killed(io_thd, mi)))
      break;
    if (rli->ignore_log_space_limit)
      break;
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
  }
  io_thd->exit_cond(old_proc_info);
  DBUG_RETURN(killed);
}

unknown's avatar
unknown committed
1942

unknown's avatar
unknown committed
1943 1944 1945 1946
/*
  Recompute rli->log_space_total as the sum of the sizes of every relay
  log listed in the relay log index.

  RETURN VALUES
    0   ok
    1   the first log could not be found, or a log could not be stat'ed
*/
static int count_relay_log_space(RELAY_LOG_INFO* rli)
{
  LOG_INFO linfo;
  DBUG_ENTER("count_relay_log_space");

  rli->log_space_total= 0;
  if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
  {
    sql_print_error("Could not find first log while counting relay log space");
    DBUG_RETURN(1);
  }

  for (;;)
  {
    if (add_relay_log(rli, &linfo))
    {
      DBUG_RETURN(1);
    }
    if (rli->relay_log.find_next_log(&linfo, 1))
      break;                                    // no more logs in the index
  }

  /*
    Everything on disk has been counted, including what a preceding write
    may have added, so reset bytes_written to avoid counting it twice.
  */
  rli->relay_log.reset_bytes_written();
  DBUG_RETURN(0);
}
unknown's avatar
unknown committed
1966

1967

1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016
/*
  Record the end of the events ignored by the I/O thread as a Rotate
  event in the relay log.

  SYNOPSIS
    write_ignored_events_info_to_relay_log()
    thd             the slave I/O thread's THD (asserted equal to mi->io_thd)
    mi              master info whose rli holds ign_master_log_name_end and
                    ign_master_log_pos_end

  DESCRIPTION
    Meant for the slave I/O thread on its way out, so that the ignored
    events' end position survives for the slave SQL thread. If
    ign_master_log_name_end is non-empty, a Rotate event (server_id 0, so
    the SQL thread does not skip it) is built from it and the pending name
    is cleared, both under the relay log lock. The event is then appended
    to the relay log outside the lock, the written bytes are added to
    log_space_total, and master info is flushed. Failures are only logged.
*/
static void write_ignored_events_info_to_relay_log(THD *thd, MASTER_INFO *mi)
{
  RELAY_LOG_INFO *rli= &mi->rli;
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
  Rotate_log_event *rotate;
  DBUG_ASSERT(thd == mi->io_thd);

  pthread_mutex_lock(log_lock);
  if (!rli->ign_master_log_name_end[0])
  {
    /* nothing ignored is pending */
    pthread_mutex_unlock(log_lock);
    return;
  }

  DBUG_PRINT("info",("writing a Rotate event to track down ignored events"));
  rotate= new Rotate_log_event(thd, rli->ign_master_log_name_end,
                               0, rli->ign_master_log_pos_end,
                               Rotate_log_event::DUP_NAME);
  rli->ign_master_log_name_end[0]= 0;
  /* can unlock before writing as slave SQL thd will soon see our Rotate */
  pthread_mutex_unlock(log_lock);

  if (unlikely(!rotate))
  {
    sql_print_error("Slave I/O thread failed to create a Rotate event"
                    " (out of memory?), "
                    "SHOW SLAVE STATUS may be inaccurate");
    return;
  }

  rotate->server_id= 0;                 // don't be ignored by slave SQL thread
  if (unlikely(rli->relay_log.append(rotate)))
    sql_print_error("Slave I/O thread failed to write a Rotate event"
                    " to the relay log, "
                    "SHOW SLAVE STATUS may be inaccurate");
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  flush_master_info(mi, 1);
  delete rotate;
}


unknown's avatar
unknown committed
2017 2018 2019 2020 2021 2022 2023 2024 2025 2026
/*
  Fill master info from the --master-* startup options.

  SYNOPSIS
    init_master_info_with_options()
    mi    master info to fill

  DESCRIPTION
    Resets the master log coordinates (empty log name, position
    BIN_LOG_HEADER_SIZE, i.e. just past the magic number). Copies host,
    user, password and the SSL file/cipher settings only when the
    corresponding option is set. Port, connect_retry and the ssl flag are
    copied unconditionally. Every string field is truncated to fit its own
    buffer.
*/
void init_master_info_with_options(MASTER_INFO* mi)
{
  mi->master_log_name[0] = 0;
  mi->master_log_pos = BIN_LOG_HEADER_SIZE;             // skip magic number

  if (master_host)
    strmake(mi->host, master_host, sizeof(mi->host) - 1);
  if (master_user)
    strmake(mi->user, master_user, sizeof(mi->user) - 1);
  if (master_password)
    /* bound by the buffer itself, like every other string field here */
    strmake(mi->password, master_password, sizeof(mi->password) - 1);
  mi->port = master_port;
  mi->connect_retry = master_connect_retry;

  mi->ssl= master_ssl;
  if (master_ssl_ca)
    strmake(mi->ssl_ca, master_ssl_ca, sizeof(mi->ssl_ca)-1);
  if (master_ssl_capath)
    strmake(mi->ssl_capath, master_ssl_capath, sizeof(mi->ssl_capath)-1);
  if (master_ssl_cert)
    strmake(mi->ssl_cert, master_ssl_cert, sizeof(mi->ssl_cert)-1);
  if (master_ssl_cipher)
    strmake(mi->ssl_cipher, master_ssl_cipher, sizeof(mi->ssl_cipher)-1);
  if (master_ssl_key)
    strmake(mi->ssl_key, master_ssl_key, sizeof(mi->ssl_key)-1);
}

2044
/*
  Forget the last slave error (number and message) stored in rli, i.e.
  the error displayed by SHOW SLAVE STATUS.
*/
void clear_slave_error(RELAY_LOG_INFO* rli)
{
  rli->last_slave_errno= 0;
  rli->last_slave_error[0]= '\0';
}
2050

2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064
/*
  Reset the UNTIL condition kept in a RELAY_LOG_INFO.

  SYNOPSIS
    clear_until_condition()
    rli    relay log info whose UNTIL condition is reset: the condition
           becomes UNTIL_NONE, and the until log name and position are
           cleared
*/
void clear_until_condition(RELAY_LOG_INFO* rli)
{
  rli->until_log_pos= 0;
  rli->until_log_name[0]= '\0';
  rli->until_condition= RELAY_LOG_INFO::UNTIL_NONE;
}


unknown's avatar
unknown committed
2065
/*
  Line count of a master.info file in the 4.1.1+ format, which starts with
  a line holding the number of lines and includes the SSL fields.
  init_master_info() treats a numeric first line that is >= this value as
  that line count, and reads the SSL fields only when lines >= this value.
*/
#define LINES_IN_MASTER_INFO_WITH_SSL 14
2066

unknown's avatar
unknown committed
2067

2068
int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
2069 2070 2071
                     const char* slave_info_fname,
                     bool abort_if_no_master_info_file,
                     int thread_mask)
unknown's avatar
unknown committed
2072
{
unknown's avatar
unknown committed
2073 2074 2075 2076
  int fd,error;
  char fname[FN_REFLEN+128];
  DBUG_ENTER("init_master_info");

unknown's avatar
unknown committed
2077
  if (mi->inited)
unknown's avatar
unknown committed
2078 2079 2080 2081 2082 2083 2084
  {
    /*
      We have to reset read position of relay-log-bin as we may have
      already been reading from 'hotlog' when the slave was stopped
      last time. If this case pos_in_file would be set and we would
      get a crash when trying to read the signature for the binary
      relay log.
2085

2086 2087 2088 2089
      We only rewind the read position if we are starting the SQL
      thread. The handle_slave_sql thread assumes that the read
      position is at the beginning of the file, and will read the
      "signature" and then fast-forward to the last position read.
unknown's avatar
unknown committed
2090
    */
unknown's avatar
unknown committed
2091 2092
    if (thread_mask & SLAVE_SQL)
    {
2093 2094
      my_b_seek(mi->rli.cur_log, (my_off_t) 0);
    }
unknown's avatar
unknown committed
2095
    DBUG_RETURN(0);
unknown's avatar
unknown committed
2096 2097
  }

unknown's avatar
unknown committed
2098 2099
  mi->mysql=0;
  mi->file_id=1;
2100
  fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
unknown's avatar
unknown committed
2101

unknown's avatar
unknown committed
2102 2103 2104 2105
  /*
    We need a mutex while we are changing master info parameters to
    keep other threads from reading bogus info
  */
unknown's avatar
unknown committed
2106

2107
  pthread_mutex_lock(&mi->data_lock);
unknown's avatar
unknown committed
2108
  fd = mi->fd;
2109 2110

  /* does master.info exist ? */
2111

2112
  if (access(fname,F_OK))
unknown's avatar
unknown committed
2113
  {
2114 2115 2116 2117 2118
    if (abort_if_no_master_info_file)
    {
      pthread_mutex_unlock(&mi->data_lock);
      DBUG_RETURN(0);
    }
unknown's avatar
unknown committed
2119 2120 2121 2122
    /*
      if someone removed the file from underneath our feet, just close
      the old descriptor and re-create the old file
    */
unknown's avatar
unknown committed
2123 2124
    if (fd >= 0)
      my_close(fd, MYF(MY_WME));
2125 2126 2127 2128 2129 2130 2131
    if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
    {
      sql_print_error("Failed to create a new master info file (\
file '%s', errno %d)", fname, my_errno);
      goto err;
    }
    if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
unknown's avatar
unknown committed
2132
		      MYF(MY_WME)))
2133 2134 2135
    {
      sql_print_error("Failed to create a cache on master info file (\
file '%s')", fname);
unknown's avatar
unknown committed
2136
      goto err;
2137
    }
unknown's avatar
unknown committed
2138

unknown's avatar
unknown committed
2139
    mi->fd = fd;
unknown's avatar
unknown committed
2140 2141
    init_master_info_with_options(mi);

unknown's avatar
unknown committed
2142
  }
2143
  else // file exists
unknown's avatar
unknown committed
2144
  {
unknown's avatar
unknown committed
2145
    if (fd >= 0)
unknown's avatar
unknown committed
2146
      reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
2147
    else
2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162
    {
      if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
      {
        sql_print_error("Failed to open the existing master info file (\
file '%s', errno %d)", fname, my_errno);
        goto err;
      }
      if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
                        0, MYF(MY_WME)))
      {
        sql_print_error("Failed to create a cache on master info file (\
file '%s')", fname);
        goto err;
      }
    }
unknown's avatar
unknown committed
2163

unknown's avatar
unknown committed
2164
    mi->fd = fd;
unknown's avatar
unknown committed
2165 2166
    int port, connect_retry, master_log_pos, ssl= 0, lines;
    char *first_non_digit;
2167

unknown's avatar
unknown committed
2168 2169
    /*
       Starting from 4.1.x master.info has new format. Now its
2170 2171 2172
       first line contains number of lines in file. By reading this
       number we will be always distinguish to which version our
       master.info corresponds to. We can't simply count lines in
unknown's avatar
unknown committed
2173 2174
       file since versions before 4.1.x could generate files with more
       lines than needed.
2175
       If first line doesn't contain a number or contain number less than
unknown's avatar
unknown committed
2176
       14 then such file is treated like file from pre 4.1.1 version.
2177
       There is no ambiguity when reading an old master.info, as before
unknown's avatar
unknown committed
2178
       4.1.1, the first line contained the binlog's name, which is either
2179
       empty or has an extension (contains a '.'), so can't be confused
unknown's avatar
unknown committed
2180 2181
       with an integer.

2182
       So we're just reading first line and trying to figure which version
unknown's avatar
unknown committed
2183 2184
       is this.
    */
2185 2186 2187 2188

    /*
       The first row is temporarily stored in mi->master_log_name,
       if it is line count and not binlog name (new format) it will be
unknown's avatar
unknown committed
2189 2190
       overwritten by the second row later.
    */
2191
    if (init_strvar_from_file(mi->master_log_name,
unknown's avatar
unknown committed
2192
			      sizeof(mi->master_log_name), &mi->file,
unknown's avatar
unknown committed
2193 2194
			      ""))
      goto errwithmsg;
2195

unknown's avatar
unknown committed
2196 2197
    lines= strtoul(mi->master_log_name, &first_non_digit, 10);

2198
    if (mi->master_log_name[0]!='\0' &&
unknown's avatar
unknown committed
2199 2200
        *first_non_digit=='\0' && lines >= LINES_IN_MASTER_INFO_WITH_SSL)
    {                                          // Seems to be new format
2201
      if (init_strvar_from_file(mi->master_log_name,
unknown's avatar
unknown committed
2202 2203 2204 2205 2206
            sizeof(mi->master_log_name), &mi->file, ""))
        goto errwithmsg;
    }
    else
      lines= 7;
2207

unknown's avatar
unknown committed
2208
    if (init_intvar_from_file(&master_log_pos, &mi->file, 4) ||
unknown's avatar
unknown committed
2209 2210 2211
	init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
			      master_host) ||
	init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
2212
			      master_user) ||
2213 2214
        init_strvar_from_file(mi->password, SCRAMBLED_PASSWORD_CHAR_LENGTH+1,
                              &mi->file, master_password) ||
2215 2216
	init_intvar_from_file(&port, &mi->file, master_port) ||
	init_intvar_from_file(&connect_retry, &mi->file,
unknown's avatar
unknown committed
2217
			      master_connect_retry))
unknown's avatar
unknown committed
2218 2219
      goto errwithmsg;

2220 2221 2222 2223
    /*
       If file has ssl part use it even if we have server without
       SSL support. But these option will be ignored later when
       slave will try connect to master, so in this case warning
unknown's avatar
unknown committed
2224 2225
       is printed.
     */
2226
    if (lines >= LINES_IN_MASTER_INFO_WITH_SSL &&
unknown's avatar
unknown committed
2227
        (init_intvar_from_file(&ssl, &mi->file, master_ssl) ||
2228
         init_strvar_from_file(mi->ssl_ca, sizeof(mi->ssl_ca),
unknown's avatar
unknown committed
2229
                               &mi->file, master_ssl_ca) ||
2230
         init_strvar_from_file(mi->ssl_capath, sizeof(mi->ssl_capath),
unknown's avatar
unknown committed
2231 2232 2233 2234 2235 2236 2237 2238 2239 2240
                               &mi->file, master_ssl_capath) ||
         init_strvar_from_file(mi->ssl_cert, sizeof(mi->ssl_cert),
                               &mi->file, master_ssl_cert) ||
         init_strvar_from_file(mi->ssl_cipher, sizeof(mi->ssl_cipher),
                               &mi->file, master_ssl_cipher) ||
         init_strvar_from_file(mi->ssl_key, sizeof(mi->ssl_key),
                              &mi->file, master_ssl_key)))
      goto errwithmsg;
#ifndef HAVE_OPENSSL
    if (ssl)
2241
      sql_print_warning("SSL information in the master info file "
unknown's avatar
unknown committed
2242 2243 2244
                      "('%s') are ignored because this MySQL slave was compiled "
                      "without SSL support.", fname);
#endif /* HAVE_OPENSSL */
2245

2246 2247 2248 2249 2250 2251 2252
    /*
      This has to be handled here as init_intvar_from_file can't handle
      my_off_t types
    */
    mi->master_log_pos= (my_off_t) master_log_pos;
    mi->port= (uint) port;
    mi->connect_retry= (uint) connect_retry;
unknown's avatar
unknown committed
2253
    mi->ssl= (my_bool) ssl;
unknown's avatar
unknown committed
2254
  }
2255 2256 2257
  DBUG_PRINT("master_info",("log_file_name: %s  position: %ld",
			    mi->master_log_name,
			    (ulong) mi->master_log_pos));
2258

2259
  mi->rli.mi = mi;
2260 2261 2262
  if (init_relay_log_info(&mi->rli, slave_info_fname))
    goto err;

unknown's avatar
unknown committed
2263
  mi->inited = 1;
unknown's avatar
unknown committed
2264
  // now change cache READ -> WRITE - must do this before flush_master_info
2265
  reinit_io_cache(&mi->file, WRITE_CACHE, 0L, 0, 1);
unknown's avatar
unknown committed
2266
  if ((error=test(flush_master_info(mi, 1))))
2267
    sql_print_error("Failed to flush master info file");
2268
  pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
2269
  DBUG_RETURN(error);
2270

unknown's avatar
unknown committed
2271 2272
errwithmsg:
  sql_print_error("Error reading master configuration");
2273

2274
err:
unknown's avatar
unknown committed
2275 2276 2277 2278 2279 2280
  if (fd >= 0)
  {
    my_close(fd, MYF(0));
    end_io_cache(&mi->file);
  }
  mi->fd= -1;
2281
  pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
2282
  DBUG_RETURN(1);
unknown's avatar
unknown committed
2283 2284
}

2285

2286 2287
/*
  Announce this slave to the master with COM_REGISTER_SLAVE.

  SYNOPSIS
    register_slave_on_master()
    mysql   connection to the master

  DESCRIPTION
    Does nothing unless report_host is set. The packet holds server_id,
    report_host, report_user and report_password (each stored via
    net_store_data()), report_port, rpl_recovery_rank and a zero master_id,
    which the master fills in. If the strings would not fit in the local
    1024-byte buffer (with a 30-byte margin), registration is silently
    skipped.

  RETURN VALUES
    0   registered, or registration skipped
    1   the command failed (error logged)
*/
int register_slave_on_master(MYSQL* mysql)
{
  char packet[1024];
  char *p= packet;
  uint host_len, user_len, password_len;

  if (!report_host)
    return 0;

  host_len= strlen(report_host);
  user_len= report_user ? strlen(report_user) : 0;
  password_len= report_password ? strlen(report_password) : 0;

  /* 30 is a good safety margin */
  if (host_len + user_len + password_len + 30 > sizeof(packet))
    return 0;                                   // safety

  int4store(p, server_id);
  p+= 4;
  p= net_store_data(p, report_host, host_len);
  p= net_store_data(p, report_user, user_len);
  p= net_store_data(p, report_password, password_len);
  int2store(p, (uint16) report_port);
  p+= 2;
  int4store(p, rpl_recovery_rank);
  p+= 4;
  /* The master will fill in master_id */
  int4store(p, 0);
  p+= 4;

  if (!simple_command(mysql, COM_REGISTER_SLAVE, (char*) packet,
                      (uint) (p - packet), 0))
    return 0;

  sql_print_error("Error on COM_REGISTER_SLAVE: %d '%s'",
                  mysql_errno(mysql),
                  mysql_error(mysql));
  return 1;
}

2323

2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365
/*
  Render a HASH of TABLE_RULE_ENT entries as a comma-separated String.
  Only valid for hashes whose elements are TABLE_RULE_ENT.

  SYNOPSIS
    table_rule_ent_hash_to_str()
    s               String to fill (emptied first)
    h               HASH to read

  DESCRIPTION
    For each element, appends the first key_len bytes of its db buffer.
    A ',' is inserted only once s is non-empty.

  RETURN VALUES
    none
*/

void table_rule_ent_hash_to_str(String* s, HASH* h)
{
  uint idx= 0;
  s->length(0);
  while (idx < h->records)
  {
    TABLE_RULE_ENT *rule= (TABLE_RULE_ENT*) hash_element(h, idx++);
    if (s->length())
      s->append(',');
    s->append(rule->db, rule->key_len);
  }
}

/*
  Same as table_rule_ent_hash_to_str(), but for a DYNAMIC_ARRAY whose
  elements are TABLE_RULE_ENT pointers: s is emptied and receives each
  entry's first key_len bytes of db, separated by ',' once s is non-empty.
*/

void table_rule_ent_dynamic_array_to_str(String* s, DYNAMIC_ARRAY* a)
{
  uint idx= 0;
  s->length(0);
  while (idx < a->elements)
  {
    TABLE_RULE_ENT *rule;
    get_dynamic(a, (gptr) &rule, idx++);
    if (s->length())
      s->append(',');
    s->append(rule->db, rule->key_len);
  }
}

unknown's avatar
unknown committed
2366
bool show_master_info(THD* thd, MASTER_INFO* mi)
{
  // TODO: fix this for multi-master
  /*
    Implements SHOW SLAVE STATUS: sends the column metadata, then (only if
    a master host is configured) one row describing 'mi', then EOF.
    Returns TRUE if sending the metadata or the row failed.
  */
  List<Item> columns;
  Protocol *protocol= thd->protocol;
  DBUG_ENTER("show_master_info");

  /* Connection / I/O thread side */
  columns.push_back(new Item_empty_string("Slave_IO_State", 14));
  columns.push_back(new Item_empty_string("Master_Host", sizeof(mi->host)));
  columns.push_back(new Item_empty_string("Master_User", sizeof(mi->user)));
  columns.push_back(new Item_return_int("Master_Port", 7, MYSQL_TYPE_LONG));
  columns.push_back(new Item_return_int("Connect_Retry", 10, MYSQL_TYPE_LONG));
  columns.push_back(new Item_empty_string("Master_Log_File", FN_REFLEN));
  columns.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
                                        MYSQL_TYPE_LONGLONG));
  /* Relay log / SQL thread side */
  columns.push_back(new Item_empty_string("Relay_Log_File", FN_REFLEN));
  columns.push_back(new Item_return_int("Relay_Log_Pos", 10,
                                        MYSQL_TYPE_LONGLONG));
  columns.push_back(new Item_empty_string("Relay_Master_Log_File", FN_REFLEN));
  columns.push_back(new Item_empty_string("Slave_IO_Running", 3));
  columns.push_back(new Item_empty_string("Slave_SQL_Running", 3));
  /* Replication filters */
  columns.push_back(new Item_empty_string("Replicate_Do_DB", 20));
  columns.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
  columns.push_back(new Item_empty_string("Replicate_Do_Table", 20));
  columns.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
  columns.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
  columns.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table", 28));
  /* Errors and execution position */
  columns.push_back(new Item_return_int("Last_Errno", 4, MYSQL_TYPE_LONG));
  columns.push_back(new Item_empty_string("Last_Error", 20));
  columns.push_back(new Item_return_int("Skip_Counter", 10, MYSQL_TYPE_LONG));
  columns.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
                                        MYSQL_TYPE_LONGLONG));
  columns.push_back(new Item_return_int("Relay_Log_Space", 10,
                                        MYSQL_TYPE_LONGLONG));
  /* START SLAVE UNTIL state */
  columns.push_back(new Item_empty_string("Until_Condition", 6));
  columns.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
  columns.push_back(new Item_return_int("Until_Log_Pos", 10,
                                        MYSQL_TYPE_LONGLONG));
  /* SSL settings */
  columns.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
  columns.push_back(new Item_empty_string("Master_SSL_CA_File",
                                          sizeof(mi->ssl_ca)));
  columns.push_back(new Item_empty_string("Master_SSL_CA_Path",
                                          sizeof(mi->ssl_capath)));
  columns.push_back(new Item_empty_string("Master_SSL_Cert",
                                          sizeof(mi->ssl_cert)));
  columns.push_back(new Item_empty_string("Master_SSL_Cipher",
                                          sizeof(mi->ssl_cipher)));
  columns.push_back(new Item_empty_string("Master_SSL_Key",
                                          sizeof(mi->ssl_key)));
  /* Lag */
  columns.push_back(new Item_return_int("Seconds_Behind_Master", 10,
                                        MYSQL_TYPE_LONGLONG));

  if (protocol->send_fields(&columns,
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
    DBUG_RETURN(TRUE);

  /* No master configured: the result set is empty. */
  if (!mi->host[0])
  {
    send_eof(thd);
    DBUG_RETURN(FALSE);
  }

  DBUG_PRINT("info",("host is set: '%s'", mi->host));
  String *packet= &thd->packet;
  protocol->prepare_for_resend();

  /*
    TODO: we read slave_running without run_lock, whereas these variables
    are updated under run_lock and not data_lock. In 5.0 we should lock
    run_lock on top of data_lock (with good order).
  */
  pthread_mutex_lock(&mi->data_lock);
  pthread_mutex_lock(&mi->rli.data_lock);

  /* Connection / I/O thread side */
  protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
  protocol->store(mi->host, &my_charset_bin);
  protocol->store(mi->user, &my_charset_bin);
  protocol->store((uint32) mi->port);
  protocol->store((uint32) mi->connect_retry);
  protocol->store(mi->master_log_name, &my_charset_bin);
  protocol->store((ulonglong) mi->master_log_pos);

  /* Relay log / SQL thread side; only the base name of the relay log */
  const char *relay_name= mi->rli.group_relay_log_name;
  protocol->store(relay_name + dirname_length(relay_name), &my_charset_bin);
  protocol->store((ulonglong) mi->rli.group_relay_log_pos);
  protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
  protocol->store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ?
                  "Yes" : "No", &my_charset_bin);
  protocol->store(mi->rli.slave_running ? "Yes" : "No", &my_charset_bin);

  /* Replication filters */
  protocol->store(&replicate_do_db);
  protocol->store(&replicate_ignore_db);
  /*
    Protocol cannot store TABLE_RULE_ENT containers directly, so each
    table rule set is first rendered into a String.
  */
  char rule_buf[256];
  String rule_str(rule_buf, sizeof(rule_buf), &my_charset_bin);
  table_rule_ent_hash_to_str(&rule_str, &replicate_do_table);
  protocol->store(&rule_str);
  table_rule_ent_hash_to_str(&rule_str, &replicate_ignore_table);
  protocol->store(&rule_str);
  table_rule_ent_dynamic_array_to_str(&rule_str, &replicate_wild_do_table);
  protocol->store(&rule_str);
  table_rule_ent_dynamic_array_to_str(&rule_str, &replicate_wild_ignore_table);
  protocol->store(&rule_str);

  /* Errors and execution position */
  protocol->store((uint32) mi->rli.last_slave_errno);
  protocol->store(mi->rli.last_slave_error, &my_charset_bin);
  protocol->store((uint32) mi->rli.slave_skip_counter);
  protocol->store((ulonglong) mi->rli.group_master_log_pos);
  protocol->store((ulonglong) mi->rli.log_space_total);

  /* START SLAVE UNTIL state */
  const char *until_str;
  switch (mi->rli.until_condition) {
  case RELAY_LOG_INFO::UNTIL_NONE:
    until_str= "None";
    break;
  case RELAY_LOG_INFO::UNTIL_MASTER_POS:
    until_str= "Master";
    break;
  default:
    until_str= "Relay";
    break;
  }
  protocol->store(until_str, &my_charset_bin);
  protocol->store(mi->rli.until_log_name, &my_charset_bin);
  protocol->store((ulonglong) mi->rli.until_log_pos);

  /* SSL settings */
#ifdef HAVE_OPENSSL
  protocol->store(mi->ssl ? "Yes" : "No", &my_charset_bin);
#else
  protocol->store(mi->ssl ? "Ignored" : "No", &my_charset_bin);
#endif
  protocol->store(mi->ssl_ca, &my_charset_bin);
  protocol->store(mi->ssl_capath, &my_charset_bin);
  protocol->store(mi->ssl_cert, &my_charset_bin);
  protocol->store(mi->ssl_cipher, &my_charset_bin);
  protocol->store(mi->ssl_key, &my_charset_bin);

  /*
    Seconds_Behind_Master: only computable when the SQL thread is running
    and the I/O thread is connected; otherwise NULL (unknown).

    The raw difference can be negative (the master is itself a slave of a
    master whose clock is ahead, someone used SET TIMESTAMP on the master,
    or simply the one-second granularity of time() on both sides), which
    confuses users, so it is clamped at 0.
    last_master_timestamp == 0 (an "impossible" 1970 timestamp) is a
    special marker meaning "consider we have caught up".
  */
  if (mi->slave_running != MYSQL_SLAVE_RUN_CONNECT || !mi->rli.slave_running)
    protocol->store_null();
  else
  {
    long secs_behind= (long)((time_t)time((time_t*) 0)
                             - mi->rli.last_master_timestamp)
      - mi->clock_diff_with_master;
    if (secs_behind < 0)
      secs_behind= 0;
    if (!mi->rli.last_master_timestamp)
      secs_behind= 0;
    protocol->store((longlong) secs_behind);
  }

  pthread_mutex_unlock(&mi->rli.data_lock);
  pthread_mutex_unlock(&mi->data_lock);

  if (my_net_write(&thd->net, (char*) packet->ptr(), packet->length()))
    DBUG_RETURN(TRUE);

  send_eof(thd);
  DBUG_RETURN(FALSE);
}

2550

unknown's avatar
unknown committed
2551
/*
  Rewrite master.info from the in-memory MASTER_INFO and flush it.

  SYNOPSIS
    flush_master_info()
    mi                      master connection info to persist
    flush_relay_log_cache   if TRUE, first flush the relay log's IO_CACHE;
                            the caller must only pass TRUE when the relay
                            log is initialized

  RETURN VALUES
    0   ok
    1   flushing the relay log or master.info failed. If the relay log
        flush failed, master.info is NOT rewritten, so the recorded master
        position never gets ahead of what is on disk in the relay log.
*/
bool flush_master_info(MASTER_INFO* mi, bool flush_relay_log_cache)
{
  IO_CACHE* file = &mi->file;
  char lbuf[22];
  DBUG_ENTER("flush_master_info");
  DBUG_PRINT("enter",("master_pos: %ld", (long) mi->master_log_pos));

  /*
    Flush the relay log to disk. If we don't do it, then the relay log will
    have some part (its last kilobytes) in memory only, so if the slave server
    dies now, with, say, from master's position 100 to 150 in memory only (not
    on disk), and with position 150 in master.info, then when the slave
    restarts, the I/O thread will fetch binlogs from 150, so in the relay log
    we will have "[0, 100] U [150, infinity[" and nobody will notice it, so the
    SQL thread will jump from 100 to 150, and replication will silently break.

    When we come to this place in code, relay log may or not be initialized;
    the caller is responsible for setting 'flush_relay_log_cache' accordingly.

    If this flush fails we must not write the new position to master.info:
    that would create exactly the hole described above.
  */
  if (flush_relay_log_cache &&
      flush_io_cache(mi->rli.relay_log.get_log_file()))
    DBUG_RETURN(1);

  /*
    We flushed the relay log BEFORE the master.info file, because if we crash
    now, we will get a duplicate event in the relay log at restart. If we
    flushed in the other order, we would get a hole in the relay log.
    And duplicate is better than hole (with a duplicate, in later versions we
    can add detection and scrap one event; with a hole there's nothing we can
    do).
  */

  /*
     In certain cases this code may create master.info files that seem
     corrupted, because of extra lines filled with garbage at the end of
     the file (this happens if new contents take less space than previous
     contents of file). But because the number of lines is stored in the
     first line of the file, this garbage is ignored.
  */

  my_b_seek(file, 0L);
  my_b_printf(file, "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n",
              LINES_IN_MASTER_INFO_WITH_SSL,
              mi->master_log_name, llstr(mi->master_log_pos, lbuf),
              mi->host, mi->user,
              mi->password, mi->port, mi->connect_retry,
              (int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert,
              mi->ssl_cipher, mi->ssl_key);
  /* Report a failed write-out instead of silently claiming success. */
  DBUG_RETURN(flush_io_cache(file) != 0);
}

2602

unknown's avatar
unknown committed
2603
st_relay_log_info::st_relay_log_info()
  :info_fd(-1),
   cur_log_fd(-1),
   save_temporary_tables(0),
   cur_log_old_open_count(0),
   group_master_log_pos(0),
   log_space_total(0),
   ignore_log_space_limit(0),
   last_master_timestamp(0),
   slave_skip_counter(0),
   abort_pos_wait(0),
   slave_run_id(0),
   sql_thd(0),
   last_slave_errno(0),
   inited(0),
   abort_slave(0),
   slave_running(0),
   until_condition(UNTIL_NONE),
   until_log_pos(0),
   retried_trans(0)
{
  /*
    Put the object into an "empty" state: no file descriptors (-1), zero
    positions/counters, empty name buffers, zeroed IO_CACHEs, and an
    invalidated cached charset. Then create the mutexes and condition
    variables released by the destructor.
  */
  group_relay_log_name[0]= 0;
  event_relay_log_name[0]= 0;
  group_master_log_name[0]= 0;
  last_slave_error[0]= 0;
  until_log_name[0]= 0;
  ign_master_log_name_end[0]= 0;

  bzero((char*) &info_file, sizeof(info_file));
  bzero((char*) &cache_buf, sizeof(cache_buf));
  cached_charset_invalidate();

  /* Condition variables */
  pthread_cond_init(&data_cond, NULL);
  pthread_cond_init(&start_cond, NULL);
  pthread_cond_init(&stop_cond, NULL);
  pthread_cond_init(&log_space_cond, NULL);
  /* Mutexes */
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);

  relay_log.init_pthread_objects();
}


st_relay_log_info::~st_relay_log_info()
{
  /* Release the synchronization objects created by the constructor. */
  pthread_cond_destroy(&data_cond);
  pthread_cond_destroy(&start_cond);
  pthread_cond_destroy(&stop_cond);
  pthread_cond_destroy(&log_space_cond);

  pthread_mutex_destroy(&run_lock);
  pthread_mutex_destroy(&data_lock);
  pthread_mutex_destroy(&log_space_lock);

  relay_log.cleanup();
}

2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664
/*
  Waits until the SQL thread reaches (has executed up to) the
  log/position or timed out.

  SYNOPSIS
    wait_for_pos()
    thd             client thread that sent SELECT MASTER_POS_WAIT
    log_name        log name to wait for
    log_pos         position to wait for 
    timeout         timeout in seconds before giving up waiting

  NOTES
    timeout is longlong whereas it should be ulong ; but this is
    to catch if the user submitted a negative timeout.

  RETURN VALUES
    -2          improper arguments (log_pos<0)
                or slave not running, or master info changed
                during the function's execution,
                or client thread killed. -2 is translated to NULL by caller
    -1          timed out
    >=0         number of log events the function had to wait
                before reaching the desired log/position
 */
2665

2666
int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
2667 2668
                                    longlong log_pos,
                                    longlong timeout)
unknown's avatar
unknown committed
2669
{
2670 2671
  if (!inited)
    return -1;
unknown's avatar
unknown committed
2672
  int event_count = 0;
2673
  ulong init_abort_pos_wait;
2674 2675
  int error=0;
  struct timespec abstime; // for timeout checking
2676
  const char *msg;
2677
  DBUG_ENTER("wait_for_pos");
2678 2679
  DBUG_PRINT("enter",("log_name: '%s'  log_pos: %lu  timeout: %lu",
                      log_name->c_ptr(), (ulong) log_pos, (ulong) timeout));
2680

2681
  set_timespec(abstime,timeout);
2682
  pthread_mutex_lock(&data_lock);
2683 2684 2685
  msg= thd->enter_cond(&data_cond, &data_lock,
                       "Waiting for the slave SQL thread to "
                       "advance position");
2686
  /* 
unknown's avatar
unknown committed
2687 2688 2689 2690 2691 2692 2693 2694
     This function will abort when it notices that some CHANGE MASTER or
     RESET MASTER has changed the master info.
     To catch this, these commands modify abort_pos_wait ; We just monitor
     abort_pos_wait and see if it has changed.
     Why do we have this mechanism instead of simply monitoring slave_running
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
     the SQL thread be stopped?
     This is becasue if someones does:
2695
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
unknown's avatar
unknown committed
2696 2697
     the change may happen very quickly and we may not notice that
     slave_running briefly switches between 1/0/1.
2698
  */
2699
  init_abort_pos_wait= abort_pos_wait;
2700

2701
  /*
2702
    We'll need to
2703
    handle all possible log names comparisons (e.g. 999 vs 1000).
2704
    We use ulong for string->number conversion ; this is no
2705 2706 2707 2708
    stronger limitation than in find_uniq_filename in sql/log.cc
  */
  ulong log_name_extension;
  char log_name_tmp[FN_REFLEN]; //make a char[] from String
2709 2710 2711

  strmake(log_name_tmp, log_name->ptr(), min(log_name->length(), FN_REFLEN-1));

2712 2713
  char *p= fn_ext(log_name_tmp);
  char *p_end;
2714
  if (!*p || log_pos<0)
2715 2716 2717 2718
  {
    error= -2; //means improper arguments
    goto err;
  }
2719 2720
  // Convert 0-3 to 4
  log_pos= max(log_pos, BIN_LOG_HEADER_SIZE);
unknown's avatar
unknown committed
2721
  /* p points to '.' */
2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733
  log_name_extension= strtoul(++p, &p_end, 10);
  /*
    p_end points to the first invalid character.
    If it equals to p, no digits were found, error.
    If it contains '\0' it means conversion went ok.
  */
  if (p_end==p || *p_end)
  {
    error= -2;
    goto err;
  }    

unknown's avatar
unknown committed
2734
  /* The "compare and wait" main loop */
2735
  while (!thd->killed &&
2736
         init_abort_pos_wait == abort_pos_wait &&
2737
         slave_running)
unknown's avatar
unknown committed
2738
  {
unknown's avatar
unknown committed
2739 2740
    bool pos_reached;
    int cmp_result= 0;
unknown's avatar
unknown committed
2741

2742 2743 2744 2745 2746 2747
    DBUG_PRINT("info",
               ("init_abort_pos_wait: %ld  abort_pos_wait: %ld",
                init_abort_pos_wait, abort_pos_wait));
    DBUG_PRINT("info",("group_master_log_name: '%s'  pos: %lu",
                       group_master_log_name, (ulong) group_master_log_pos));

2748
    /*
unknown's avatar
unknown committed
2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759
      group_master_log_name can be "", if we are just after a fresh
      replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
      (before we have executed one Rotate event from the master) or
      (rare) if the user is doing a weird slave setup (see next
      paragraph).  If group_master_log_name is "", we assume we don't
      have enough info to do the comparison yet, so we just wait until
      more data. In this case master_log_pos is always 0 except if
      somebody (wrongly) sets this slave to be a slave of itself
      without using --replicate-same-server-id (an unsupported
      configuration which does nothing), then group_master_log_pos
      will grow and group_master_log_name will stay "".
2760
    */
2761
    if (*group_master_log_name)
unknown's avatar
unknown committed
2762
    {
unknown's avatar
unknown committed
2763 2764
      char *basename= (group_master_log_name +
                       dirname_length(group_master_log_name));
unknown's avatar
unknown committed
2765
      /*
2766 2767 2768 2769
        First compare the parts before the extension.
        Find the dot in the master's log basename,
        and protect against user's input error :
        if the names do not match up to '.' included, return error
2770
      */
2771 2772 2773 2774 2775 2776 2777 2778
      char *q= (char*)(fn_ext(basename)+1);
      if (strncmp(basename, log_name_tmp, (int)(q-basename)))
      {
        error= -2;
        break;
      }
      // Now compare extensions.
      char *q_end;
2779 2780
      ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
      if (group_master_log_name_extension < log_name_extension)
2781
        cmp_result= -1 ;
2782
      else
2783
        cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
2784

unknown's avatar
unknown committed
2785
      pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
2786 2787
                    cmp_result > 0);
      if (pos_reached || thd->killed)
2788
        break;
unknown's avatar
unknown committed
2789
    }
2790 2791

    //wait for master update, with optional timeout.
2792
    
unknown's avatar
unknown committed
2793
    DBUG_PRINT("info",("Waiting for master update"));
2794 2795 2796 2797
    /*
      We are going to pthread_cond_(timed)wait(); if the SQL thread stops it
      will wake us up.
    */
2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 2812 2813 2814
    if (timeout > 0)
    {
      /*
        Note that pthread_cond_timedwait checks for the timeout
        before for the condition ; i.e. it returns ETIMEDOUT 
        if the system time equals or exceeds the time specified by abstime
        before the condition variable is signaled or broadcast, _or_ if
        the absolute time specified by abstime has already passed at the time
        of the call.
        For that reason, pthread_cond_timedwait will do the "timeoutting" job
        even if its condition is always immediately signaled (case of a loaded
        master).
      */
      error=pthread_cond_timedwait(&data_cond, &data_lock, &abstime);
    }
    else
      pthread_cond_wait(&data_cond, &data_lock);
unknown's avatar
unknown committed
2815
    DBUG_PRINT("info",("Got signal of master update or timed out"));
unknown's avatar
unknown committed
2816
    if (error == ETIMEDOUT || error == ETIME)
2817 2818 2819 2820
    {
      error= -1;
      break;
    }
unknown's avatar
unknown committed
2821
    error=0;
2822
    event_count++;
2823
    DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
2824
  }
2825 2826

err:
unknown's avatar
unknown committed
2827
  thd->exit_cond(msg);
2828
  DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
unknown's avatar
unknown committed
2829
improper_arguments: %d  timed_out: %d",
unknown's avatar
SCRUM  
unknown committed
2830
                     thd->killed_errno(),
2831
                     (int) (init_abort_pos_wait != abort_pos_wait),
2832
                     (int) slave_running,
2833 2834 2835
                     (int) (error == -2),
                     (int) (error == -1)));
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
2836
      !slave_running) 
2837 2838 2839 2840
  {
    error= -2;
  }
  DBUG_RETURN( error ? error : event_count );
unknown's avatar
unknown committed
2841 2842
}

2843 2844 2845 2846
void set_slave_thread_options(THD* thd)
{
  /*
    Set the session options a slave thread runs with: binary logging of
    replicated statements only with --log-slave-updates, AUTO_IS_NULL, and
    BIG_SELECTS.

    It's nonsense to constrain the slave threads with max_join_size; if a
    query succeeded on master, we HAVE to execute it. OPTION_BIG_SELECTS is
    set here, and not only in init_slave_thread(), so that any later call of
    this function does not silently drop it.
  */
  thd->options= ((opt_log_slave_updates) ? OPTION_BIN_LOG : 0) |
    OPTION_AUTO_IS_NULL | OPTION_BIG_SELECTS;
  thd->variables.completion_type= 0;
}
2849

2850 2851 2852 2853 2854 2855 2856 2857 2858 2859 2860 2861
void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli)
{
  /*
    Reset the slave thread's session charset/collation variables to the
    global defaults, recompute the derived charset state with
    update_charset(), and invalidate the charset cached in 'rli'.
  */
  thd->variables.collation_server=
    global_system_variables.collation_server;
  thd->variables.collation_connection=
    global_system_variables.collation_connection;
  thd->variables.character_set_client=
    global_system_variables.character_set_client;

  thd->update_charset();
  rli->cached_charset_invalidate();
}

unknown's avatar
unknown committed
2862
/*
2863
  init_slave_thread()
unknown's avatar
unknown committed
2864
*/
2865

2866
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
unknown's avatar
unknown committed
2867 2868
{
  DBUG_ENTER("init_slave_thread");
2869 2870
  thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO; 
2871
  thd->security_ctx->skip_grants();
unknown's avatar
unknown committed
2872 2873
  thd->client_capabilities = 0;
  my_net_init(&thd->net, 0);
unknown's avatar
unknown committed
2874
  thd->net.read_timeout = slave_net_timeout;
2875
  thd->slave_thread = 1;
2876
  set_slave_thread_options(thd);
2877
  /* 
2878
     It's nonsense to constrain the slave threads with max_join_size; if a
2879 2880 2881 2882 2883 2884
     query succeeded on master, we HAVE to execute it. So set
     OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
     (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
     SELECT examining more than 4 billion rows would still fail (yes, because
     when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
     only for client threads.
2885
  */
2886 2887
  thd->options = ((opt_log_slave_updates) ? OPTION_BIN_LOG:0) |
    OPTION_AUTO_IS_NULL | OPTION_BIG_SELECTS;
unknown's avatar
unknown committed
2888
  thd->client_capabilities = CLIENT_LOCAL_FILES;
2889
  thd->real_id=pthread_self();
unknown's avatar
unknown committed
2890 2891 2892 2893
  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id = thread_id++;
  pthread_mutex_unlock(&LOCK_thread_count);

2894
  if (init_thr_lock() || thd->store_globals())
unknown's avatar
unknown committed
2895
  {
2896 2897
    thd->cleanup();
    delete thd;
unknown's avatar
unknown committed
2898 2899 2900
    DBUG_RETURN(-1);
  }

unknown's avatar
unknown committed
2901
#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
unknown's avatar
unknown committed
2902 2903 2904 2905 2906
  sigset_t set;
  VOID(sigemptyset(&set));			// Get mask in use
  VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif

2907
  if (thd_type == SLAVE_THD_SQL)
2908
    thd->proc_info= "Waiting for the next event in relay log";
2909
  else
2910
    thd->proc_info= "Waiting for master update";
unknown's avatar
unknown committed
2911 2912 2913 2914 2915
  thd->version=refresh_version;
  thd->set_time();
  DBUG_RETURN(0);
}

2916

2917 2918
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
		      void* thread_killed_arg)
{
  /*
    Sleep for 'sec' seconds, re-checking thread_killed(thd, arg) after each
    nap. Returns 1 as soon as the check reports the thread killed, 0 once
    the whole period has elapsed.
  */
  thr_alarm_t alarmed;
  thr_alarm_init(&alarmed);
  time_t now= time((time_t*) 0);
  const time_t deadline= now + sec;

  for (int remaining; (remaining= (int) (deadline - now)) > 0; )
  {
    ALARM alarm_buff;
    /*
      The alarm only exists so that we are woken up if we get killed; it
      is set to twice the nap so that it normally fires after we have
      already woken up on our own.
    */
    thr_alarm(&alarmed, 2 * remaining, &alarm_buff);
    sleep(remaining);
    thr_end_alarm(&alarmed);

    if ((*thread_killed)(thd, thread_killed_arg))
      return 1;
    now= time((time_t*) 0);
  }
  return 0;
}

2945

unknown's avatar
unknown committed
2946 2947
static int request_dump(MYSQL* mysql, MASTER_INFO* mi,
			bool *suppress_warnings)
{
  /*
    Send COM_BINLOG_DUMP asking the master to stream its binlog starting
    at mi->master_log_name / mi->master_log_pos.
    Payload: 4 bytes position, 2 bytes flags, 4 bytes server_id, log name.
    Returns 1 on failure; *suppress_warnings is set when the failure was
    ER_NET_READ_INTERRUPTED (otherwise the error is logged).
  */
  char buf[FN_REFLEN + 10];
  const char *logname= mi->master_log_name;
  const int binlog_flags= 0; // for now
  int len;
  DBUG_ENTER("request_dump");

  // TODO if big log files: Change next to int8store()
  int4store(buf, (ulong) mi->master_log_pos);
  int2store(buf + 4, binlog_flags);
  int4store(buf + 6, server_id);
  len= (uint) strlen(logname);
  memcpy(buf + 10, logname, len);

  if (!simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
    DBUG_RETURN(0);

  /*
    Something went wrong, so we will just reconnect and retry later
    in the future, we should do a better error analysis, but for
    now we just fill up the error log :-)
  */
  if (mysql_errno(mysql) != ER_NET_READ_INTERRUPTED)
    sql_print_error("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs",
                    mysql_errno(mysql), mysql_error(mysql),
                    master_connect_retry);
  else
    *suppress_warnings= 1;			// Suppress reconnect warning
  DBUG_RETURN(1);
}

2980

2981
/*
  Send a COM_TABLE_DUMP request for db.table to the master.

  Payload: <db_len:1 byte><db><table_len:1 byte><table>.
  Each name length is sent in a single byte, so names longer than 255
  bytes cannot be encoded; they are rejected instead of being sent with a
  truncated length byte (which would produce a malformed request).

  RETURN VALUES
    0   request sent
    1   error (already written to the error log)
*/
static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
{
  char buf[1024];
  char *p= buf;
  uint table_len= (uint) strlen(table);
  uint db_len= (uint) strlen(db);

  if (table_len + db_len > sizeof(buf) - 2)
  {
    sql_print_error("request_table_dump: Buffer overrun");
    return 1;
  }
  if (db_len > 255 || table_len > 255)
  {
    sql_print_error("request_table_dump: Database or table name too long");
    return 1;
  }

  *p++= (char) db_len;
  memcpy(p, db, db_len);
  p+= db_len;
  *p++= (char) table_len;
  memcpy(p, table, table_len);
  p+= table_len;

  if (simple_command(mysql, COM_TABLE_DUMP, buf, (uint) (p - buf), 1))
  {
    sql_print_error("request_table_dump: Error sending the table dump command");
    return 1;
  }

  return 0;
}

3009

unknown's avatar
unknown committed
3010
/*
3011
  Read one event from the master
unknown's avatar
unknown committed
3012 3013 3014 3015 3016 3017 3018 3019 3020
  
  SYNOPSIS
    read_event()
    mysql		MySQL connection
    mi			Master connection information
    suppress_warnings	TRUE when a normal net read timeout has caused us to
			try a reconnect.  We do not want to print anything to
			the error log in this case because this is a normal
			event in an idle server.
3021

unknown's avatar
unknown committed
3022 3023 3024 3025 3026 3027
    RETURN VALUES
    'packet_error'	Error
    number		Length of packet
*/

static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
unknown's avatar
unknown committed
3028
{
3029
  ulong len;
unknown's avatar
unknown committed
3030

3031
  *suppress_warnings= 0;
unknown's avatar
unknown committed
3032 3033 3034
  /*
    my_real_read() will time us out
    We check if we were told to die, and if not, try reading again
3035 3036

    TODO:  Move 'events_till_disconnect' to the MASTER_INFO structure
unknown's avatar
unknown committed
3037
  */
unknown's avatar
unknown committed
3038
#ifndef DBUG_OFF
unknown's avatar
unknown committed
3039
  if (disconnect_slave_event_count && !(events_till_disconnect--))
unknown's avatar
unknown committed
3040 3041 3042
    return packet_error;      
#endif
  
unknown's avatar
SCRUM  
unknown committed
3043
  len = net_safe_read(mysql);
3044
  if (len == packet_error || (long) len < 1)
unknown's avatar
unknown committed
3045
  {
unknown's avatar
SCRUM  
unknown committed
3046
    if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
unknown's avatar
unknown committed
3047 3048 3049 3050 3051 3052 3053 3054 3055
    {
      /*
	We are trying a normal reconnect after a read timeout;
	we suppress prints to .err file as long as the reconnect
	happens without problems
      */
      *suppress_warnings= TRUE;
    }
    else
3056
      sql_print_error("Error reading packet from server: %s ( server_errno=%d)",
unknown's avatar
SCRUM  
unknown committed
3057
		      mysql_error(mysql), mysql_errno(mysql));
unknown's avatar
unknown committed
3058 3059 3060
    return packet_error;
  }

3061 3062
  /* Check if eof packet */
  if (len < 8 && mysql->net.read_pos[0] == 254)
unknown's avatar
unknown committed
3063
  {
3064 3065
    sql_print_information("Slave: received end packet from server, apparent "
                          "master shutdown: %s",
unknown's avatar
SCRUM  
unknown committed
3066
		     mysql_error(mysql));
unknown's avatar
unknown committed
3067
     return packet_error;
unknown's avatar
unknown committed
3068
  }
unknown's avatar
unknown committed
3069 3070
  
  DBUG_PRINT("info",( "len=%u, net->read_pos[4] = %d\n",
3071
		      len, mysql->net.read_pos[4]));
unknown's avatar
unknown committed
3072 3073 3074
  return len - 1;   
}

unknown's avatar
unknown committed
3075

3076
/*
  Classify an error code.

  SYNOPSIS
    check_expected_error()
    thd			unused
    rli			unused
    expected_error	error code to classify

  RETURN VALUES
    1	expected_error is ER_NET_READ_ERROR, ER_NET_ERROR_ON_WRITE,
	ER_SERVER_SHUTDOWN or ER_NEW_ABORTING_CONNECTION
    0	any other code

  NOTE(review): judging by their names these are network / shutdown
  conditions rather than statement failures; confirm against the callers
  how a result of 1 is acted upon.
*/
int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error)
{
  const bool is_net_or_shutdown_error=
    expected_error == ER_NET_READ_ERROR ||
    expected_error == ER_NET_ERROR_ON_WRITE ||
    expected_error == ER_SERVER_SHUTDOWN ||
    expected_error == ER_NEW_ABORTING_CONNECTION;

  return is_net_or_shutdown_error ? 1 : 0;
}
3088

3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136
/*
     Check if condition stated in UNTIL clause of START SLAVE is reached.
   SYNOPSYS
     st_relay_log_info::is_until_satisfied()
   DESCRIPTION
     Checks if UNTIL condition is reached. Uses caching result of last 
     comparison of current log file name and target log file name. So cached 
     value should be invalidated if current log file name changes 
     (see st_relay_log_info::notify_... functions).
     
     This caching is needed to avoid of expensive string comparisons and 
     strtol() conversions needed for log names comparison. We don't need to
     compare them each time this function is called, we only need to do this 
     when current log name changes. If we have UNTIL_MASTER_POS condition we 
     need to do this only after Rotate_log_event::exec_event() (which is 
     rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS 
     condition then we should invalidate cached comarison value after 
     inc_group_relay_log_pos() which called for each group of events (so we
     have some benefit if we have something like queries that use 
     autoincrement or if we have transactions).
     
     Should be called ONLY if until_condition != UNTIL_NONE !
   RETURN VALUE
     true - condition met or error happened (condition seems to have 
            bad log file name)
     false - condition not met
*/

bool st_relay_log_info::is_until_satisfied()
{
  const char *log_name;
  ulonglong log_pos;

  DBUG_ASSERT(until_condition != UNTIL_NONE);
  
  if (until_condition == UNTIL_MASTER_POS)
  {
    log_name= group_master_log_name;
    log_pos= group_master_log_pos;
  }
  else
  { /* until_condition == UNTIL_RELAY_POS */
    log_name= group_relay_log_name;
    log_pos= group_relay_log_pos;
  }
  
  if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
  {
3137 3138 3139 3140 3141 3142
    /*
      We have no cached comparison results so we should compare log names
      and cache result.
      If we are after RESET SLAVE, and the SQL slave thread has not processed
      any event yet, it could be that group_master_log_name is "". In that case,
      just wait for more events (as there is no sensible comparison to do).
3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166
    */

    if (*log_name)
    {
      const char *basename= log_name + dirname_length(log_name);
      
      const char *q= (const char*)(fn_ext(basename)+1);
      if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
      {
        /* Now compare extensions. */
        char *q_end;
        ulong log_name_extension= strtoul(q, &q_end, 10);
        if (log_name_extension < until_log_name_extension)
          until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
        else
          until_log_names_cmp_result= 
            (log_name_extension > until_log_name_extension) ? 
            UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
      }
      else  
      {
        /* Probably error so we aborting */
        sql_print_error("Slave SQL thread is stopped because UNTIL "
                        "condition is bad.");
unknown's avatar
unknown committed
3167
        return TRUE;
3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178
      }
    }
    else
      return until_log_pos == 0;
  }
    
  return ((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL && 
           log_pos >= until_log_pos) ||
          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER);
}

3179

3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195 3196 3197
/*
  Forget the cached charset information.
  An all-zero cached_charset is the "nothing cached yet" state, so the next
  cached_charset_compare() will report a change.
*/
void st_relay_log_info::cached_charset_invalidate()
{
  memset(cached_charset, 0, sizeof(cached_charset));
}


/*
  Compare charset with the cached charset information and, if it differs,
  remember it as the new cached value.
  charset must provide at least sizeof(cached_charset) bytes.

  RETURN VALUES
    0	charset equals the cached value; nothing changed
    1	charset differed; cached_charset now holds a copy of it
*/
bool st_relay_log_info::cached_charset_compare(char *charset)
{
  if (!bcmp(cached_charset, charset, sizeof(cached_charset)))
    return 0;

  memcpy(cached_charset, charset, sizeof(cached_charset));
  return 1;
}


3198
/*
  Fetch the next event from the relay log and execute (or skip) it in the
  slave SQL thread.

  SYNOPSIS
    exec_relay_log_event()
    thd		the SQL thread's THD; must be rli->sql_thd (asserted)
    rli		relay log info of this slave

  DESCRIPTION
    Takes rli->data_lock, stops if the UNTIL condition is satisfied, reads
    the next event with next_event(), then either skips it (events carrying
    our own server id, or while rli->slave_skip_counter is non-zero) or runs
    ev->exec_event(). data_lock is released on every return path, and before
    exec_event() is called.
    Events are deleted after use, except Format_description events (see the
    comment after exec_event()).
    When slave_trans_retries is non-zero, a failure with ER_LOCK_DEADLOCK or
    ER_LOCK_WAIT_TIMEOUT (and no fatal error) rewinds to the start of the
    current group (group_relay_log_name/pos) so that the transaction is
    re-executed, up to slave_trans_retries times.

  RETURN VALUES
    0		event executed or skipped (or a retry was successfully set up)
    1		UNTIL position reached, thread killed, or event unparsable
    otherwise	the non-zero result of exec_event()
*/
static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
{
  /*
     We acquire this mutex since we need it for all operations except
     event execution. But we will release it in places where we will
     wait for something for example inside of next_event().
   */
  pthread_mutex_lock(&rli->data_lock);
  /*
    This tests if the position of the end of the last previous executed event
    hits the UNTIL barrier.
    We would prefer to test if the position of the start (or possibly) end of
    the to-be-read event hits the UNTIL barrier, this is different if there
    was an event ignored by the I/O thread just before (BUG#13861 to be
    fixed).
  */
  if (rli->until_condition!=RELAY_LOG_INFO::UNTIL_NONE &&
      rli->is_until_satisfied())
  {
    char buf[22];
    sql_print_error("Slave SQL thread stopped because it reached its"
                    " UNTIL position %s", llstr(rli->until_pos(), buf));
    /*
      Setting abort_slave flag because we do not want additional message about
      error in query execution to be printed.
    */
    rli->abort_slave= 1;
    pthread_mutex_unlock(&rli->data_lock);
    return 1;
  }

  /* NULL means no event could be obtained (reported below as unparsable). */
  Log_event * ev = next_event(rli);

  DBUG_ASSERT(rli->sql_thd==thd);

  /* ev may be NULL here; deleting a NULL pointer is a no-op. */
  if (sql_slave_killed(thd,rli))
  {
    pthread_mutex_unlock(&rli->data_lock);
    delete ev;
    return 1;
  }
  if (ev)
  {
    int type_code = ev->get_type_code();
    int exec_res;

    /*
      Queries originating from this server must be skipped.
      Low-level events (Format_desc, Rotate, Stop) from this server
      must also be skipped. But for those we don't want to modify
      group_master_log_pos, because these events did not exist on the master.
      Format_desc is not completely skipped.
      Skip queries specified by the user in slave_skip_counter.
      We can't however skip events that has something to do with the
      log files themselves.
      Filtering on own server id is extremely important, to ignore execution of
      events created by the creation/rotation of the relay log (remember that
      now the relay log starts with its Format_desc, has a Rotate etc).
    */

    DBUG_PRINT("info",("type_code=%d, server_id=%d",type_code,ev->server_id));

    if ((ev->server_id == (uint32) ::server_id &&
         !replicate_same_server_id &&
         type_code != FORMAT_DESCRIPTION_EVENT) ||
        (rli->slave_skip_counter &&
         type_code != ROTATE_EVENT && type_code != STOP_EVENT &&
         type_code != START_EVENT_V3 && type_code!= FORMAT_DESCRIPTION_EVENT))
    {
      DBUG_PRINT("info", ("event skipped"));
      /*
        Inside a transaction only the event position advances. Otherwise the
        group position advances and the relay log info is flushed; Rotate,
        Stop and Format_desc events pass 0 instead of ev->log_pos
        (presumably meaning "leave group_master_log_pos unchanged", per the
        comment above). The final argument is commented "skip lock" -
        presumably because data_lock is already held here.
      */
      if (thd->options & OPTION_BEGIN)
        rli->inc_event_relay_log_pos();
      else
      {
        rli->inc_group_relay_log_pos((type_code == ROTATE_EVENT ||
                                      type_code == STOP_EVENT ||
                                      type_code == FORMAT_DESCRIPTION_EVENT) ?
                                     LL(0) : ev->log_pos,
                                     1/* skip lock*/);
        flush_relay_log_info(rli);
      }

      /*
        Protect against common user error of setting the counter to 1
        instead of 2 while recovering from an insert which used auto_increment,
        rand or user var.
      */
      if (rli->slave_skip_counter &&
          !((type_code == INTVAR_EVENT ||
             type_code == RAND_EVENT ||
             type_code == USER_VAR_EVENT) &&
            rli->slave_skip_counter == 1) &&
          /*
            The events from ourselves which have something to do with the relay
            log itself must be skipped, true, but they mustn't decrement
            rli->slave_skip_counter, because the user is supposed to not see
            these events (they are not in the master's binlog) and if we
            decremented, START SLAVE would for example decrement when it sees
            the Rotate, so the event which the user probably wanted to skip
            would not be skipped.
          */
          !(ev->server_id == (uint32) ::server_id &&
            (type_code == ROTATE_EVENT || type_code == STOP_EVENT ||
             type_code == START_EVENT_V3 || type_code == FORMAT_DESCRIPTION_EVENT)))
        --rli->slave_skip_counter;
      pthread_mutex_unlock(&rli->data_lock);
      delete ev;
      return 0;                                 // avoid infinite update loops
    }
    /* The event itself is executed without holding data_lock. */
    pthread_mutex_unlock(&rli->data_lock);

    thd->server_id = ev->server_id; // use the original server id for logging
    thd->set_time();				// time the query
    thd->lex->current_select= 0;
    if (!ev->when)
      ev->when = time(NULL);
    ev->thd = thd;
    exec_res = ev->exec_event(rli);
    DBUG_ASSERT(rli->sql_thd==thd);
    /*
       Format_description_log_event should not be deleted because it will be
       used to read info about the relay log's format; it will be deleted when
       the SQL thread does not need it, i.e. when this thread terminates.
    */
    if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
    {
      DBUG_PRINT("info", ("Deleting the event after it has been executed"));
      delete ev;
    }
    if (slave_trans_retries)
    {
      if (exec_res &&
          (thd->net.last_errno == ER_LOCK_DEADLOCK ||
           thd->net.last_errno == ER_LOCK_WAIT_TIMEOUT) &&
          !thd->is_fatal_error)
      {
        const char *errmsg;
        /*
          We were in a transaction which has been rolled back because of a
          deadlock (currently, InnoDB deadlock detected by InnoDB) or lock
          wait timeout (innodb_lock_wait_timeout exceeded); let's seek back to
          BEGIN log event and retry it all again.
          We have to not only seek but also
          a) init_master_info(), to seek back to hot relay log's start for later
          (for when we will come back to this hot log after re-processing the
          possibly existing old logs where BEGIN is: check_binlog_magic() will
          then need the cache to be at position 0 (see comments at beginning of
          init_master_info()).
          b) init_relay_log_pos(), because the BEGIN may be an older relay log.
        */
        if (rli->trans_retries < slave_trans_retries)
        {
          /* On either init failure exec_res stays non-zero (no retry). */
          if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
            sql_print_error("Failed to initialize the master info structure");
          else if (init_relay_log_pos(rli,
                                      rli->group_relay_log_name,
                                      rli->group_relay_log_pos,
                                      1, &errmsg, 1))
            sql_print_error("Error initializing relay log position: %s",
                            errmsg);
          else
          {
            exec_res= 0;
	    /* chance for concurrent connection to get more locks */
            /* The pause grows with each retry, capped at MAX_SLAVE_RETRY_PAUSE. */
            safe_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
		       (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
            pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
	    rli->trans_retries++;
            rli->retried_trans++;
            pthread_mutex_unlock(&rli->data_lock);
            DBUG_PRINT("info", ("Slave retries transaction "
                                "rli->trans_retries: %lu", rli->trans_retries));
	  }
        }
        else
          sql_print_error("Slave SQL thread retried transaction %lu time(s) "
                          "in vain, giving up. Consider raising the value of "
                          "the slave_transaction_retries variable.",
                          slave_trans_retries);
      }
      /* Outside a transaction the retry budget starts over. */
      if (!((thd->options & OPTION_BEGIN) && opt_using_transactions))
         rli->trans_retries= 0; // restart from fresh
     }
    return exec_res;
  }
  else
  {
    pthread_mutex_unlock(&rli->data_lock);
    slave_print_error(rli, 0, "\
Could not parse relay log event entry. The possible reasons are: the master's \
binary log is corrupted (you can check this by running 'mysqlbinlog' on the \
binary log), the slave's relay log is corrupted (you can check this by running \
'mysqlbinlog' on the relay log), a network problem, or a bug in the master's \
or slave's MySQL code. If you want to check the master's binary log or slave's \
relay log, you will be able to know their names by issuing 'SHOW SLAVE STATUS' \
on this slave.\
");
    return 1;
  }
}

3399

unknown's avatar
unknown committed
3400
/*
  Slave I/O Thread entry point

  SYNOPSIS
    handle_slave_io()
    arg		the MASTER_INFO* describing the master to replicate from

  DESCRIPTION
    Creates and registers this thread's THD and signals mi->start_cond.
    It then connects to the master, checks its version, and (binlog format
    version > 1 only) registers with it. Next it requests a binlog dump and
    loops: read an event from the master, queue it into the relay log with
    queue_event(), flush the master info, and block in
    wait_for_relay_log_space() while the relay logs exceed log_space_limit.

    When a dump request or an event read fails (apart from the fatal cases
    handled explicitly), the connection is closed and re-established, and
    execution resumes at the 'connected' label. The first retry is
    immediate; later ones first safe_sleep() for mi->connect_retry. The
    thread gives up once the number of consecutive failures exceeds
    master_retry_count; the counter is reset after every successfully read
    event.

    Every exit goes through the 'err' label. It closes the connection,
    resets mi->slave_running / mi->io_thd, frees the THD and broadcasts
    mi->stop_cond.

  RETURN VALUE
    Does not return normally; the thread ends with pthread_exit(0).
*/

pthread_handler_t handle_slave_io(void *arg)
{
  THD *thd; // needs to be first for thread_stack
  MYSQL *mysql;
  MASTER_INFO *mi = (MASTER_INFO*)arg;
  RELAY_LOG_INFO *rli= &mi->rli;
  char llbuff[22];
  uint retry_count;

  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  my_thread_init();
  DBUG_ENTER("handle_slave_io");

#ifndef DBUG_OFF
/* Debug builds jump back here from 'err' to restart the thread (see end). */
slave_begin:
#endif
  DBUG_ASSERT(mi->inited);
  mysql= NULL ;
  retry_count= 0;

  pthread_mutex_lock(&mi->run_lock);
  /* Inform waiting threads that slave has started */
  mi->slave_run_id++;

#ifndef DBUG_OFF
  /*
    NOTE(review): this sets mi->events_till_abort, but the debug abort check
    in the read loop decrements a plain 'events_till_abort' (apparently a
    different variable) - confirm which one is intended.
  */
  mi->events_till_abort = abort_slave_event_count;
#endif

  thd= new THD; // note that contructor of THD uses DBUG_ !
  THD_CHECK_SENTRY(thd);

  pthread_detach_this_thread();
  if (init_slave_thread(thd, SLAVE_THD_IO))
  {
    /* Wake the starter even on failure, then release run_lock before err. */
    pthread_cond_broadcast(&mi->start_cond);
    pthread_mutex_unlock(&mi->run_lock);
    sql_print_error("Failed during slave I/O thread initialization");
    goto err;
  }
  mi->io_thd = thd;
  thd->thread_stack = (char*)&thd; // remember where our stack is
  pthread_mutex_lock(&LOCK_thread_count);
  threads.append(thd);
  pthread_mutex_unlock(&LOCK_thread_count);
  mi->slave_running = 1;
  mi->abort_slave = 0;
  pthread_mutex_unlock(&mi->run_lock);
  pthread_cond_broadcast(&mi->start_cond);

  DBUG_PRINT("master_info",("log_file_name: '%s'  position: %s",
			    mi->master_log_name,
			    llstr(mi->master_log_pos,llbuff)));

  if (!(mi->mysql = mysql = mysql_init(NULL)))
  {
    sql_print_error("Slave I/O thread: error in mysql_init()");
    goto err;
  }

  thd->proc_info = "Connecting to master";
  // we can get killed during safe_connect
  if (!safe_connect(thd, mysql, mi))
    sql_print_information("Slave I/O thread: connected to master '%s@%s:%d',\
  replication started in log '%s' at position %s", mi->user,
		    mi->host, mi->port,
		    IO_RPL_LOG_NAME,
		    llstr(mi->master_log_pos,llbuff));
  else
  {
    sql_print_information("Slave I/O thread killed while connecting to master");
    goto err;
  }

/* Re-entered via 'goto connected' after every successful reconnect. */
connected:

  // TODO: the assignment below should be under mutex (5.0)
  mi->slave_running= MYSQL_SLAVE_RUN_CONNECT;
  thd->slave_net = &mysql->net;
  thd->proc_info = "Checking master version";
  if (get_master_version_and_clock(mysql, mi))
    goto err;

  if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
  {
    /*
      Register ourselves with the master.
      NOTE(review): an older comment here said a failure is not fatal and
      that we just print the error message and go on; the code below,
      however, jumps to 'err' (ending the I/O thread) if either
      register_slave_on_master() or update_slave_list() reports failure.
    */
    thd->proc_info = "Registering slave on master";
    if (register_slave_on_master(mysql) ||  update_slave_list(mysql, mi))
      goto err;
  }

  DBUG_PRINT("info",("Starting reading binary log from master"));
  while (!io_slave_killed(thd,mi))
  {
    bool suppress_warnings= 0;
    thd->proc_info = "Requesting binlog dump";
    if (request_dump(mysql, mi, &suppress_warnings))
    {
      sql_print_error("Failed on request_dump()");
      if (io_slave_killed(thd,mi))
      {
	sql_print_information("Slave I/O thread killed while requesting master \
dump");
	goto err;
      }

      mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
      thd->proc_info= "Waiting to reconnect after a failed binlog dump request";
#ifdef SIGNAL_WITH_VIO_CLOSE
      thd->clear_active_vio();
#endif
      end_server(mysql);
      /*
	First time retry immediately, assuming that we can recover
	right away - if first time fails, sleep between re-tries
	hopefuly the admin can fix the problem sometime
      */
      if (retry_count++)
      {
	if (retry_count > master_retry_count)
	  goto err;				// Don't retry forever
	safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
		   (void*)mi);
      }
      if (io_slave_killed(thd,mi))
      {
	sql_print_information("Slave I/O thread killed while retrying master \
dump");
	goto err;
      }

      thd->proc_info = "Reconnecting after a failed binlog dump request";
      if (!suppress_warnings)
	sql_print_error("Slave I/O thread: failed dump request, \
reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME,
			llstr(mi->master_log_pos,llbuff));
      if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
	  io_slave_killed(thd,mi))
      {
	sql_print_information("Slave I/O thread killed during or \
after reconnect");
	goto err;
      }

      goto connected;
    }

    /* Main event loop: read from the master, queue into the relay log. */
    while (!io_slave_killed(thd,mi))
    {
      /*
        NOTE(review): this shadows the outer suppress_warnings; read_event()
        sets this inner copy, which is what the reconnect code below uses.
      */
      bool suppress_warnings= 0;
      /*
         We say "waiting" because read_event() will wait if there's nothing to
         read. But if there's something to read, it will not wait. The
         important thing is to not confuse users by saying "reading" whereas
         we're in fact receiving nothing.
      */
      thd->proc_info = "Waiting for master to send event";
      ulong event_len = read_event(mysql, mi, &suppress_warnings);
      if (io_slave_killed(thd,mi))
      {
	if (global_system_variables.log_warnings)
	  sql_print_information("Slave I/O thread killed while reading event");
	goto err;
      }

      if (event_len == packet_error)
      {
	/* These two errors end the thread; anything else triggers a reconnect. */
	uint mysql_error_number= mysql_errno(mysql);
	if (mysql_error_number == ER_NET_PACKET_TOO_LARGE)
	{
	  sql_print_error("\
Log entry on master is longer than max_allowed_packet (%ld) on \
slave. If the entry is correct, restart the server with a higher value of \
max_allowed_packet",
			  thd->variables.max_allowed_packet);
	  goto err;
	}
	if (mysql_error_number == ER_MASTER_FATAL_ERROR_READING_BINLOG)
	{
	  sql_print_error(ER(mysql_error_number), mysql_error_number,
			  mysql_error(mysql));
	  goto err;
	}
        mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
	thd->proc_info = "Waiting to reconnect after a failed master event read";
#ifdef SIGNAL_WITH_VIO_CLOSE
        thd->clear_active_vio();
#endif
	end_server(mysql);
	/* Same retry policy as for a failed dump request above. */
	if (retry_count++)
	{
	  if (retry_count > master_retry_count)
	    goto err;				// Don't retry forever
	  safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
		     (void*) mi);
	}
	if (io_slave_killed(thd,mi))
	{
	  if (global_system_variables.log_warnings)
	    sql_print_information("Slave I/O thread killed while waiting to \
reconnect after a failed read");
	  goto err;
	}
	thd->proc_info = "Reconnecting after a failed master event read";
	if (!suppress_warnings)
	  sql_print_information("Slave I/O thread: Failed reading log event, \
reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME,
			  llstr(mi->master_log_pos, llbuff));
	if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
	    io_slave_killed(thd,mi))
	{
	  if (global_system_variables.log_warnings)
	    sql_print_information("Slave I/O thread killed during or after a \
reconnect done to recover from failed read");
	  goto err;
	}
	goto connected;
      } // if (event_len == packet_error)

      retry_count=0;			// ok event, reset retry counter
      thd->proc_info = "Queueing master event to the relay log";
      /* The event starts after the first byte of the packet (see read_event). */
      if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
		      event_len))
      {
	sql_print_error("Slave I/O thread could not queue event from master");
	goto err;
      }
      flush_master_info(mi, 1); /* sure that we can flush the relay log */
      /*
        See if the relay logs take too much space.
        We don't lock mi->rli.log_space_lock here; this dirty read saves time
        and does not introduce any problem:
        - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
        the clean value is 0), then we are reading only one more event as we
        should, and we'll block only at the next event. No big deal.
        - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
        the clean value is 1), then we are going into wait_for_relay_log_space()
        for no reason, but this function will do a clean read, notice the clean
        value and exit immediately.
      */
#ifndef DBUG_OFF
      {
        char llbuf1[22], llbuf2[22];
        DBUG_PRINT("info", ("log_space_limit=%s log_space_total=%s \
ignore_log_space_limit=%d",
                            llstr(rli->log_space_limit,llbuf1),
                            llstr(rli->log_space_total,llbuf2),
                            (int) rli->ignore_log_space_limit)); 
      }
#endif

      if (rli->log_space_limit && rli->log_space_limit <
	  rli->log_space_total &&
          !rli->ignore_log_space_limit)
	if (wait_for_relay_log_space(rli))
	{
	  sql_print_error("Slave I/O thread aborted while waiting for relay \
log space");
	  goto err;
	}
      // TODO: check debugging abort code
#ifndef DBUG_OFF
      if (abort_slave_event_count && !--events_till_abort)
      {
	sql_print_error("Slave I/O thread: debugging abort");
	goto err;
      }
#endif
    } 
  }

  // error = 0;
/* Single exit path: release the connection and THD, then signal stop_cond. */
err:
  // print the current replication position
  sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
		  IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
  VOID(pthread_mutex_lock(&LOCK_thread_count));
  thd->query = thd->db = 0; // extra safety
  thd->query_length= thd->db_length= 0;
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
  if (mysql)
  {
    mysql_close(mysql);
    mi->mysql=0;
  }
  write_ignored_events_info_to_relay_log(thd, mi);
  thd->proc_info = "Waiting for slave mutex on exit";
  pthread_mutex_lock(&mi->run_lock);
  mi->slave_running = 0;
  mi->io_thd = 0;
  /* Forget the relay log's format */
  delete mi->rli.relay_log.description_event_for_queue;
  mi->rli.relay_log.description_event_for_queue= 0;
  // TODO: make rpl_status part of MASTER_INFO
  change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
  mi->abort_slave = 0; // TODO: check if this is needed
  DBUG_ASSERT(thd->net.buff != 0);
  net_end(&thd->net); // destructor will not free it, because net.vio is 0
  close_thread_tables(thd, 0);
  pthread_mutex_lock(&LOCK_thread_count);
  THD_CHECK_SENTRY(thd);
  delete thd;
  pthread_mutex_unlock(&LOCK_thread_count);
  pthread_cond_broadcast(&mi->stop_cond);	// tell the world we are done
  pthread_mutex_unlock(&mi->run_lock);
#ifndef DBUG_OFF
  /* Debug builds: after a forced "debugging abort", restart the thread. */
  if (abort_slave_event_count && !events_till_abort)
    goto slave_begin;
#endif
  my_thread_end();
  pthread_exit(0);
  DBUG_RETURN(0);				// Can't return anything here
}

unknown's avatar
unknown committed
3719

unknown's avatar
unknown committed
3720
/*
  Slave SQL Thread entry point

  SYNOPSIS
    handle_slave_sql()
    arg			MASTER_INFO* whose embedded RELAY_LOG_INFO (rli) this
			thread executes

  DESCRIPTION
    Startup (under rli->run_lock):
      creates the thread's THD and bumps rli->slave_run_id. It then
      initializes the THD, publishes it as rli->sql_thd and in the global
      'threads' list, and restores the temporary tables saved by the
      previous SQL thread. Finally it sets rli->slave_running and
      broadcasts rli->start_cond.
    Setup:
      - clears the slave error;
      - lifts rli->ignore_log_space_limit;
      - positions in the relay log via init_relay_log_pos();
      - executes the init_slave command, if any.
    Main loop:
      calls exec_relay_log_event() until sql_slave_killed() is true or an
      event fails.
    Exit (label 'err'):
      - clears rli->slave_running and wakes master_pos_wait() waiters
        through rli->data_cond;
      - saves the thread's temporary tables into
        rli->save_temporary_tables;
      - destroys the THD and broadcasts rli->stop_cond.

    In debug builds, if abort_slave_event_count is set and
    rli->events_till_abort reached 0, the thread jumps back to
    'slave_begin' instead of exiting (debug-only restart path).

  RETURN
    Does not return normally: the thread ends in pthread_exit(0).
*/
unknown's avatar
unknown committed
3721

3722
pthread_handler_t handle_slave_sql(void *arg)
3723
{
3724
  THD *thd;			/* needs to be first for thread_stack */
3725
  char llbuff[22],llbuff1[22];
3726
  RELAY_LOG_INFO* rli = &((MASTER_INFO*)arg)->rli;
unknown's avatar
unknown committed
3727 3728 3729 3730
  const char *errmsg;

  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  my_thread_init();
3731
  DBUG_ENTER("handle_slave_sql");
unknown's avatar
unknown committed
3732 3733

  /* Debug-only restart target, see the end of this function */
#ifndef DBUG_OFF
3734
slave_begin:
unknown's avatar
unknown committed
3735 3736
#endif  

3737 3738 3739
  DBUG_ASSERT(rli->inited);
  /* run_lock is held until the thread is fully started (or init fails) */
  pthread_mutex_lock(&rli->run_lock);
  DBUG_ASSERT(!rli->slave_running);
unknown's avatar
unknown committed
3740
  errmsg= 0;
3741 3742 3743
#ifndef DBUG_OFF  
  rli->events_till_abort = abort_slave_event_count;
#endif  
3744

unknown's avatar
unknown committed
3745
  thd = new THD; // note that constructor of THD uses DBUG_ !
3746 3747
  thd->thread_stack = (char*)&thd; // remember where our stack is
  
unknown's avatar
unknown committed
3748 3749 3750
  /* Inform waiting threads that slave has started */
  rli->slave_run_id++;

3751 3752
  pthread_detach_this_thread();
  if (init_slave_thread(thd, SLAVE_THD_SQL))
unknown's avatar
unknown committed
3753 3754 3755 3756 3757 3758 3759 3760 3761 3762
  {
    /*
      TODO: this is currently broken - slave start and change master
      will be stuck if we fail here
      NOTE(review): the 'err' path below assumes a fully started thread.
      It asserts rli->slave_running == 1 and rli->sql_thd == thd, and
      neither has been set yet. It also copies thd->temporary_tables
      (presumably still empty, since the restore happens after this point)
      into rli->save_temporary_tables, which looks like it would drop the
      saved temporary tables. Confirm before relying on this path.
    */
    pthread_cond_broadcast(&rli->start_cond);
    pthread_mutex_unlock(&rli->run_lock);
    sql_print_error("Failed during slave thread initialization");
    goto err;
  }
3763
  thd->init_for_queries();
3764
  rli->sql_thd= thd;
3765
  thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
3766
  /* Make the thread visible in the global thread list */
  pthread_mutex_lock(&LOCK_thread_count);
3767
  threads.append(thd);
3768
  pthread_mutex_unlock(&LOCK_thread_count);
3769 3770 3771 3772 3773 3774 3775 3776
  /*
    We are going to set slave_running to 1. Assuming slave I/O thread is
    alive and connected, this is going to make Seconds_Behind_Master be 0
    i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
    the moment we start we can think we are caught up, and the next second we
    start receiving data so we realize we are not caught up and
    Seconds_Behind_Master grows. No big deal.
  */
3777 3778 3779
  rli->slave_running = 1;
  rli->abort_slave = 0;
  pthread_mutex_unlock(&rli->run_lock);
3780
  /* Wake up the thread waiting for us to start */
  pthread_cond_broadcast(&rli->start_cond);
3781

unknown's avatar
unknown committed
3782 3783 3784
  /*
    Reset errors for a clean start (otherwise, if the master is idle, the SQL
    thread may execute no Query_log_event, so the error will remain even
unknown's avatar
unknown committed
3785
    though there's no problem anymore). Do not reset the master timestamp
3786 3787 3788 3789
    (imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
    as we are not sure that we are going to receive a query, we want to
    remember the last master timestamp (to say how many seconds behind we are
    now.
unknown's avatar
unknown committed
3790
    But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
unknown's avatar
unknown committed
3791
  */
unknown's avatar
unknown committed
3792
  clear_slave_error(rli);
3793 3794

  //tell the I/O thread to take relay_log_space_limit into account from now on
3795
  pthread_mutex_lock(&rli->log_space_lock);
3796
  rli->ignore_log_space_limit= 0;
3797
  pthread_mutex_unlock(&rli->log_space_lock);
3798
  rli->trans_retries= 0; // start from "no error"
3799

3800
  /* Position the SQL thread at the start of the last incompletely executed group */
  if (init_relay_log_pos(rli,
3801 3802
			 rli->group_relay_log_name,
			 rli->group_relay_log_pos,
3803 3804
			 1 /*need data lock*/, &errmsg,
                         1 /*look for a description_event*/))
3805 3806 3807 3808 3809
  {
    sql_print_error("Error initializing relay log position: %s",
		    errmsg);
    goto err;
  }
3810
  THD_CHECK_SENTRY(thd);
3811 3812 3813 3814 3815 3816 3817 3818 3819 3820 3821 3822 3823 3824 3825 3826 3827 3828 3829 3830 3831 3832 3833 3834
#ifndef DBUG_OFF
  {
    char llbuf1[22], llbuf2[22];
    DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
                        llstr(my_b_tell(rli->cur_log),llbuf1), 
                        llstr(rli->event_relay_log_pos,llbuf2)));
    DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
    /*
      Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
      correct position when it's called just after my_b_seek() (the questionable
      stuff is those "seek is done on next read" comments in the my_b_seek()
      source code).
      The crude reality is that this assertion randomly fails whereas
      replication seems to work fine. And there is no easy explanation why it
      fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
      init_relay_log_pos() called above). Maybe the assertion would be
      meaningful if we held rli->data_lock between the my_b_seek() and the
      DBUG_ASSERT().
    */
#ifdef SHOULD_BE_CHECKED
    DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
#endif
  }
#endif
3835
  DBUG_ASSERT(rli->sql_thd == thd);
3836 3837

  DBUG_PRINT("master_info",("log_file_name: %s  position: %s",
3838 3839
			    rli->group_master_log_name,
			    llstr(rli->group_master_log_pos,llbuff)));
3840
  if (global_system_variables.log_warnings)
3841
    sql_print_information("Slave SQL thread initialized, starting replication in \
unknown's avatar
unknown committed
3842
log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
3843 3844
		    llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name,
		    llstr(rli->group_relay_log_pos,llbuff1));
3845

unknown's avatar
unknown committed
3846
  /* execute init_slave variable */
unknown's avatar
unknown committed
3847
  if (sys_init_slave.value_length)
unknown's avatar
unknown committed
3848
  {
3849
    execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
unknown's avatar
unknown committed
3850 3851 3852 3853 3854 3855 3856 3857
    if (thd->query_error)
    {
      sql_print_error("\
Slave SQL thread aborted. Can't execute init_slave query");
      goto err;
    }
  }

3858 3859
  /* Read queries from the IO/THREAD until this thread is killed */

3860
  /* Each iteration executes one event; any failure ends the thread */
  while (!sql_slave_killed(thd,rli))
3861
  {
3862
    thd->proc_info = "Reading event from the relay log";
3863
    DBUG_ASSERT(rli->sql_thd == thd);
3864
    THD_CHECK_SENTRY(thd);
3865 3866 3867
    if (exec_relay_log_event(thd,rli))
    {
      // do not scare the user if SQL thread was simply killed or stopped
3868
      if (!sql_slave_killed(thd,rli))
3869 3870
        sql_print_error("\
Error running query, slave SQL thread aborted. Fix the problem, and restart \
unknown's avatar
unknown committed
3871
the slave SQL thread with \"SLAVE START\". We stopped at log \
3872
'%s' position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff));
3873 3874
      goto err;
    }
3875
  }
3876

3877
  /* Thread stopped. Print the current replication position to the log */
unknown's avatar
unknown committed
3878 3879 3880
  sql_print_information("Slave SQL thread exiting, replication stopped in log "
 			"'%s' at position %s",
		        RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff));
3881 3882

  /*
    Common exit path. It is reached by falling out of the loop, or by a goto
    after init_slave_thread(), init_relay_log_pos() or init_slave failed.
    rli->run_lock is not held on entry.
  */
 err:
3883
  VOID(pthread_mutex_lock(&LOCK_thread_count));
3884 3885 3886 3887 3888 3889
  /*
    Some extra safety, which should not been needed (normally, event deletion
    should already have done these assignments (each event which sets these
    variables is supposed to set them to 0 before terminating)).
  */
  thd->query= thd->db= thd->catalog= 0; 
3890
  thd->query_length= thd->db_length= 0;
3891
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
3892 3893
  thd->proc_info = "Waiting for slave mutex on exit";
  /* Lock order on exit: run_lock, then data_lock */
  pthread_mutex_lock(&rli->run_lock);
3894 3895
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
  pthread_mutex_lock(&rli->data_lock);
3896
  DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
3897 3898
  /* When master_pos_wait() wakes up it will check this and terminate */
  rli->slave_running= 0; 
3899 3900 3901
  /* Forget the relay log's format */
  delete rli->relay_log.description_event_for_exec;
  rli->relay_log.description_event_for_exec= 0;
3902 3903 3904 3905
  /* Wake up master_pos_wait() */
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
  pthread_cond_broadcast(&rli->data_cond);
unknown's avatar
unknown committed
3906
  rli->ignore_log_space_limit= 0; /* don't need any lock */
3907 3908
  /* we die so won't remember charset - re-update them on next thread start */
  rli->cached_charset_invalidate();
3909
  /* Hand the temporary tables back to rli so the next SQL thread can reuse them */
  rli->save_temporary_tables = thd->temporary_tables;
unknown's avatar
unknown committed
3910 3911 3912 3913 3914

  /*
    TODO: see if we can do this conditionally in next_event() instead
    to avoid unneeded position re-init
  */
3915 3916 3917 3918
  thd->temporary_tables = 0; // remove tempation from destructor to close them
  DBUG_ASSERT(thd->net.buff != 0);
  net_end(&thd->net); // destructor will not free it, because we are weird
  DBUG_ASSERT(rli->sql_thd == thd);
3919
  THD_CHECK_SENTRY(thd);
3920
  rli->sql_thd= 0;
3921
  pthread_mutex_lock(&LOCK_thread_count);
3922
  THD_CHECK_SENTRY(thd);
3923 3924 3925 3926 3927 3928 3929 3930 3931
  delete thd;
  pthread_mutex_unlock(&LOCK_thread_count);
  pthread_cond_broadcast(&rli->stop_cond);
  // tell the world we are done
  pthread_mutex_unlock(&rli->run_lock);
#ifndef DBUG_OFF // TODO: reconsider the code below
  if (abort_slave_event_count && !rli->events_till_abort)
    goto slave_begin;
#endif  
unknown's avatar
unknown committed
3932
  my_thread_end();
3933 3934 3935
  pthread_exit(0);
  DBUG_RETURN(0);				// Can't return anything here
}
unknown's avatar
unknown committed
3936

3937

unknown's avatar
unknown committed
3938
/*
3939
  process_io_create_file()
unknown's avatar
unknown committed
3940
*/
3941

unknown's avatar
unknown committed
3942 3943 3944 3945 3946
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
{
  int error = 1;
  ulong num_bytes;
  bool cev_not_written;
3947 3948
  THD *thd = mi->io_thd;
  NET *net = &mi->mysql->net;
unknown's avatar
unknown committed
3949
  DBUG_ENTER("process_io_create_file");
unknown's avatar
unknown committed
3950 3951

  if (unlikely(!cev->is_valid()))
unknown's avatar
unknown committed
3952
    DBUG_RETURN(1);
unknown's avatar
unknown committed
3953 3954 3955 3956 3957 3958
  /*
    TODO: fix to honor table rules, not only db rules
  */
  if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db))
  {
    skip_load_data_infile(net);
unknown's avatar
unknown committed
3959
    DBUG_RETURN(0);
unknown's avatar
unknown committed
3960 3961 3962
  }
  DBUG_ASSERT(cev->inited_from_old);
  thd->file_id = cev->file_id = mi->file_id++;
3963
  thd->server_id = cev->server_id;
unknown's avatar
unknown committed
3964 3965 3966 3967 3968 3969 3970 3971 3972
  cev_not_written = 1;
  
  if (unlikely(net_request_file(net,cev->fname)))
  {
    sql_print_error("Slave I/O: failed requesting download of '%s'",
		    cev->fname);
    goto err;
  }

unknown's avatar
unknown committed
3973 3974 3975 3976
  /*
    This dummy block is so we could instantiate Append_block_log_event
    once and then modify it slightly instead of doing it multiple times
    in the loop
unknown's avatar
unknown committed
3977 3978
  */
  {
3979
    Append_block_log_event aev(thd,0,0,0,0);
unknown's avatar
unknown committed
3980 3981 3982 3983 3984 3985 3986 3987 3988 3989 3990
  
    for (;;)
    {
      if (unlikely((num_bytes=my_net_read(net)) == packet_error))
      {
	sql_print_error("Network read error downloading '%s' from master",
			cev->fname);
	goto err;
      }
      if (unlikely(!num_bytes)) /* eof */
      {
3991
	net_write_command(net, 0, "", 0, "", 0);/* 3.23 master wants it */
unknown's avatar
unknown committed
3992 3993 3994 3995 3996 3997 3998 3999
        /*
          If we wrote Create_file_log_event, then we need to write
          Execute_load_log_event. If we did not write Create_file_log_event,
          then this is an empty file and we can just do as if the LOAD DATA
          INFILE had not existed, i.e. write nothing.
        */
        if (unlikely(cev_not_written))
	  break;
unknown's avatar
unknown committed
4000
	Execute_load_log_event xev(thd,0,0);
4001
	xev.log_pos = cev->log_pos;
unknown's avatar
unknown committed
4002 4003 4004 4005 4006 4007
	if (unlikely(mi->rli.relay_log.append(&xev)))
	{
	  sql_print_error("Slave I/O: error writing Exec_load event to \
relay log");
	  goto err;
	}
unknown's avatar
unknown committed
4008
	mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
unknown's avatar
unknown committed
4009 4010 4011 4012 4013 4014 4015 4016 4017 4018 4019 4020 4021
	break;
      }
      if (unlikely(cev_not_written))
      {
	cev->block = (char*)net->read_pos;
	cev->block_len = num_bytes;
	if (unlikely(mi->rli.relay_log.append(cev)))
	{
	  sql_print_error("Slave I/O: error writing Create_file event to \
relay log");
	  goto err;
	}
	cev_not_written=0;
unknown's avatar
unknown committed
4022
	mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
unknown's avatar
unknown committed
4023 4024 4025 4026 4027
      }
      else
      {
	aev.block = (char*)net->read_pos;
	aev.block_len = num_bytes;
4028
	aev.log_pos = cev->log_pos;
unknown's avatar
unknown committed
4029 4030 4031 4032 4033 4034
	if (unlikely(mi->rli.relay_log.append(&aev)))
	{
	  sql_print_error("Slave I/O: error writing Append_block event to \
relay log");
	  goto err;
	}
unknown's avatar
unknown committed
4035
	mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
unknown's avatar
unknown committed
4036 4037 4038 4039 4040
      }
    }
  }
  error=0;
err:
unknown's avatar
unknown committed
4041
  DBUG_RETURN(error);
unknown's avatar
unknown committed
4042
}
unknown's avatar
unknown committed
4043

4044

unknown's avatar
unknown committed
4045
/*
unknown's avatar
unknown committed
4046 4047 4048 4049 4050 4051 4052 4053
  Start using a new binary log on the master

  SYNOPSIS
    process_io_rotate()
    mi			master_info for the slave
    rev			The rotate log event read from the binary log

  DESCRIPTION
4054
    Updates the master info with the place in the next binary
unknown's avatar
unknown committed
4055
    log where we should start reading.
4056
    Rotate the relay log to avoid mixed-format relay logs.
unknown's avatar
unknown committed
4057 4058 4059 4060 4061 4062 4063

  NOTES
    We assume we already locked mi->data_lock

  RETURN VALUES
    0		ok
    1	        Log event is illegal
unknown's avatar
unknown committed
4064 4065 4066

*/

unknown's avatar
unknown committed
4067
static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev)
4068
{
4069
  DBUG_ENTER("process_io_rotate");
unknown's avatar
unknown committed
4070
  safe_mutex_assert_owner(&mi->data_lock);
4071

unknown's avatar
unknown committed
4072
  if (unlikely(!rev->is_valid()))
4073
    DBUG_RETURN(1);
unknown's avatar
unknown committed
4074

4075
  /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
unknown's avatar
unknown committed
4076 4077 4078 4079
  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
  mi->master_log_pos= rev->pos;
  DBUG_PRINT("info", ("master_log_pos: '%s' %d",
		      mi->master_log_name, (ulong) mi->master_log_pos));
4080
#ifndef DBUG_OFF
unknown's avatar
unknown committed
4081 4082 4083 4084 4085 4086
  /*
    If we do not do this, we will be getting the first
    rotate event forever, so we need to not disconnect after one.
  */
  if (disconnect_slave_event_count)
    events_till_disconnect++;
4087
#endif
4088

4089 4090 4091 4092 4093 4094 4095 4096 4097 4098 4099 4100 4101 4102
  /*
    If description_event_for_queue is format <4, there is conversion in the
    relay log to the slave's format (4). And Rotate can mean upgrade or
    nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
    no need to reset description_event_for_queue now. And if it's nothing (same
    master version as before), no need (still using the slave's format).
  */
  if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
  {
    delete mi->rli.relay_log.description_event_for_queue;
    /* start from format 3 (MySQL 4.0) again */
    mi->rli.relay_log.description_event_for_queue= new
      Format_description_log_event(3);
  }
4103 4104 4105 4106
  /*
    Rotate the relay log makes binlog format detection easier (at next slave
    start or mysqlbinlog)
  */
4107
  rotate_relay_log(mi); /* will take the right mutexes */
4108
  DBUG_RETURN(0);
4109 4110
}

unknown's avatar
unknown committed
4111
/*
4112 4113
  Reads a 3.23 event and converts it to the slave's format. This code was
  copied from MySQL 4.0.
unknown's avatar
unknown committed
4114
*/
4115
static int queue_binlog_ver_1_event(MASTER_INFO *mi, const char *buf,
unknown's avatar
unknown committed
4116
			   ulong event_len)
4117
{
unknown's avatar
unknown committed
4118
  const char *errmsg = 0;
unknown's avatar
unknown committed
4119 4120 4121 4122
  ulong inc_pos;
  bool ignore_event= 0;
  char *tmp_buf = 0;
  RELAY_LOG_INFO *rli= &mi->rli;
4123
  DBUG_ENTER("queue_binlog_ver_1_event");
unknown's avatar
unknown committed
4124

unknown's avatar
unknown committed
4125 4126 4127
  /*
    If we get Load event, we need to pass a non-reusable buffer
    to read_log_event, so we do a trick
unknown's avatar
unknown committed
4128 4129 4130 4131 4132 4133
  */
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
  {
    if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
    {
      sql_print_error("Slave I/O: out of memory for Load event");
unknown's avatar
unknown committed
4134
      DBUG_RETURN(1);
unknown's avatar
unknown committed
4135 4136
    }
    memcpy(tmp_buf,buf,event_len);
4137 4138 4139 4140 4141 4142 4143 4144
    /*
      Create_file constructor wants a 0 as last char of buffer, this 0 will
      serve as the string-termination char for the file's name (which is at the
      end of the buffer)
      We must increment event_len, otherwise the event constructor will not see
      this end 0, which leads to segfault.
    */
    tmp_buf[event_len++]=0;
unknown's avatar
unknown committed
4145
    int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
unknown's avatar
unknown committed
4146 4147
    buf = (const char*)tmp_buf;
  }
4148 4149 4150 4151 4152 4153
  /*
    This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
    send the loaded file, and write it to the relay log in the form of
    Append_block/Exec_load (the SQL thread needs the data, as that thread is not
    connected to the master).
  */
unknown's avatar
unknown committed
4154
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
4155
                                            mi->rli.relay_log.description_event_for_queue);
4156
  if (unlikely(!ev))
4157 4158
  {
    sql_print_error("Read invalid event from master: '%s',\
unknown's avatar
unknown committed
4159
 master could be corrupt but a more likely cause of this is a bug",
4160
		    errmsg);
unknown's avatar
unknown committed
4161 4162
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
    DBUG_RETURN(1);
4163
  }
4164
  pthread_mutex_lock(&mi->data_lock);
4165
  ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
unknown's avatar
unknown committed
4166
  switch (ev->get_type_code()) {
unknown's avatar
unknown committed
4167
  case STOP_EVENT:
4168
    ignore_event= 1;
unknown's avatar
unknown committed
4169 4170
    inc_pos= event_len;
    break;
4171
  case ROTATE_EVENT:
4172
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
4173 4174
    {
      delete ev;
4175
      pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
4176
      DBUG_RETURN(1);
4177
    }
unknown's avatar
unknown committed
4178
    inc_pos= 0;
4179
    break;
unknown's avatar
unknown committed
4180
  case CREATE_FILE_EVENT:
4181 4182 4183 4184 4185 4186
    /*
      Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
      queue_old_event() which is for 3.23 events which don't comprise
      CREATE_FILE_EVENT. This is because read_log_event() above has just
      transformed LOAD_EVENT into CREATE_FILE_EVENT.
    */
unknown's avatar
unknown committed
4187
  {
unknown's avatar
unknown committed
4188
    /* We come here when and only when tmp_buf != 0 */
4189
    DBUG_ASSERT(tmp_buf != 0);
4190 4191
    inc_pos=event_len;
    ev->log_pos+= inc_pos;
unknown's avatar
unknown committed
4192
    int error = process_io_create_file(mi,(Create_file_log_event*)ev);
4193
    delete ev;
4194
    mi->master_log_pos += inc_pos;
4195
    DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
4196
    pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
4197
    my_free((char*)tmp_buf, MYF(0));
unknown's avatar
unknown committed
4198
    DBUG_RETURN(error);
unknown's avatar
unknown committed
4199
  }
4200
  default:
unknown's avatar
unknown committed
4201
    inc_pos= event_len;
4202 4203
    break;
  }
unknown's avatar
unknown committed
4204
  if (likely(!ignore_event))
4205
  {
4206 4207 4208 4209 4210 4211
    if (ev->log_pos) 
      /* 
         Don't do it for fake Rotate events (see comment in
      Log_event::Log_event(const char* buf...) in log_event.cc).
      */
      ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
unknown's avatar
unknown committed
4212
    if (unlikely(rli->relay_log.append(ev)))
4213 4214 4215
    {
      delete ev;
      pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
4216
      DBUG_RETURN(1);
4217
    }
unknown's avatar
unknown committed
4218
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
4219 4220
  }
  delete ev;
unknown's avatar
unknown committed
4221
  mi->master_log_pos+= inc_pos;
4222
  DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
4223
  pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
4224
  DBUG_RETURN(0);
4225 4226
}

4227 4228 4229 4230 4231 4232 4233 4234 4235 4236 4237 4238 4239 4240 4241 4242 4243 4244 4245 4246 4247 4248 4249 4250 4251 4252 4253 4254 4255 4256 4257 4258 4259 4260 4261 4262 4263 4264 4265 4266 4267 4268 4269 4270 4271 4272 4273 4274 4275 4276 4277 4278 4279 4280 4281 4282 4283 4284 4285 4286 4287 4288 4289 4290 4291 4292 4293 4294 4295 4296 4297 4298 4299 4300 4301 4302 4303 4304 4305 4306 4307 4308 4309
/*
  Reads a 4.0 event and converts it to the slave's format. This code was copied
  from queue_binlog_ver_1_event(), with some affordable simplifications.

  SYNOPSIS
    queue_binlog_ver_3_event()
    mi			master_info for the slave
    buf			raw event bytes as received from the master
    event_len		length of buf

  NOTES
    Takes and releases mi->data_lock.
    Stop events are neither written nor counted in mi->master_log_pos.
    Rotate events reposition mi->master_log_pos through process_io_rotate().

  RETURN VALUES
    0	ok (event written, or a Stop event deliberately skipped)
    1	unparsable event, invalid Rotate or relay log write failure
*/
static int queue_binlog_ver_3_event(MASTER_INFO *mi, const char *buf,
			   ulong event_len)
{
  const char *errmsg = 0;
  ulong inc_pos;
  RELAY_LOG_INFO *rli= &mi->rli;
  DBUG_ENTER("queue_binlog_ver_3_event");

  /* read_log_event() will adjust log_pos to be end_log_pos */
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
                                            mi->rli.relay_log.description_event_for_queue);
  if (unlikely(!ev))
  {
    sql_print_error("Read invalid event from master: '%s',\
 master could be corrupt but a more likely cause of this is a bug",
		    errmsg);
    DBUG_RETURN(1);
  }
  pthread_mutex_lock(&mi->data_lock);
  switch (ev->get_type_code()) {
  case STOP_EVENT:
    /*
      Not written to the relay log. The event must be freed here, because
      jumping to 'err' skips the normal 'delete ev' below.
    */
    delete ev;
    goto err;
  case ROTATE_EVENT:
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
    {
      delete ev;
      pthread_mutex_unlock(&mi->data_lock);
      DBUG_RETURN(1);
    }
    inc_pos= 0;
    break;
  default:
    inc_pos= event_len;
    break;
  }
  if (unlikely(rli->relay_log.append(ev)))
  {
    delete ev;
    pthread_mutex_unlock(&mi->data_lock);
    DBUG_RETURN(1);
  }
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  delete ev;
  mi->master_log_pos+= inc_pos;
err:
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
  pthread_mutex_unlock(&mi->data_lock);
  DBUG_RETURN(0);
}

/*
  queue_old_event()

  Converts an event from a pre-5.0 master into the slave's (5.0) format and
  writes it to the relay log. The binlog version of the format currently
  assumed for the master selects the converter:
    1  (MySQL 3.23)  queue_binlog_ver_1_event()
    3  (MySQL 4.0)   queue_binlog_ver_3_event()
  Any other version (e.g. 2) is rejected.

  RETURN VALUES
    the converter's result, or 1 for an unsupported binlog version

  TODO: 
    Test this code before release - it has to be tested on a separate
    setup with 3.23 master or 4.0 master
*/

static int queue_old_event(MASTER_INFO *mi, const char *buf,
			   ulong event_len)
{
  if (mi->rli.relay_log.description_event_for_queue->binlog_version == 1)
    return queue_binlog_ver_1_event(mi, buf, event_len);

  if (mi->rli.relay_log.description_event_for_queue->binlog_version == 3)
    return queue_binlog_ver_3_event(mi, buf, event_len);

  /* No converter for this format */
  DBUG_PRINT("info",("unsupported binlog format %d in queue_old_event()",
                     mi->rli.relay_log.description_event_for_queue->binlog_version));
  return 1;
}
4310

unknown's avatar
unknown committed
4311
/*
  queue_event()

  If the event is 3.23/4.0, passes it to queue_old_event() which will convert
  it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
  no format conversion, it's pure read/write of bytes.
  So a 5.0.0 slave's relay log can contain events in the slave's format or in
  any >=5.0.0 format.

  NOTES
    Takes mi->data_lock. Events that reach the write/skip stage also take the
    relay log's log lock, after data_lock.

  RETURN VALUES
    0	ok (event written, or deliberately not written)
    1	invalid Rotate event
    2	Format_description event could not be parsed
    3	write to the relay log failed
    #	for old-format events, the result of queue_old_event()
*/

int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
{
  int error= 0;
  ulong inc_pos;
  RELAY_LOG_INFO *rli= &mi->rli;
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
  DBUG_ENTER("queue_event");

  if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
      buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
    DBUG_RETURN(queue_old_event(mi,buf,event_len));

  pthread_mutex_lock(&mi->data_lock);

  switch (buf[EVENT_TYPE_OFFSET]) {
  case STOP_EVENT:
    /*
      We needn't write this event to the relay log. Indeed, it just indicates a
      master server shutdown. The only thing this does is cleaning. But
      cleaning is already done on a per-master-thread basis (as the master
      server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
      statements; deletion of prepared statements is a TODO, relevant only
      once prepared statements are binlogged).

      We don't even increment mi->master_log_pos, because we may be just after
      a Rotate event. Btw, in a few milliseconds we are going to have a Start
      event from the next binlog (unless the master is presently running
      without --log-bin).
    */
    goto err;
  case ROTATE_EVENT:
  {
    Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue); 
    if (unlikely(process_io_rotate(mi,&rev)))
    {
      error= 1;
      goto err;
    }
    /*
      Now the I/O thread has just changed its mi->master_log_name, so
      incrementing mi->master_log_pos is nonsense.
    */
    inc_pos= 0;
    break;
  }
  case FORMAT_DESCRIPTION_EVENT:
  {
    /*
      Create an event, and save it (when we rotate the relay log, we will have
      to write this event again).
    */
    /*
      We are the only thread which reads/writes description_event_for_queue.
      The relay_log struct does not move (though some members of it can
      change), so we needn't any lock (no rli->data_lock, no log lock).
    */
    Format_description_log_event* tmp;
    const char* errmsg;
    if (!(tmp= (Format_description_log_event*)
          Log_event::read_log_event(buf, event_len, &errmsg,
                                    mi->rli.relay_log.description_event_for_queue)))
    {
      error= 2;
      goto err;
    }
    delete mi->rli.relay_log.description_event_for_queue;
    mi->rli.relay_log.description_event_for_queue= tmp;
    /* 
       Though this does some conversion to the slave's format, this will
       preserve the master's binlog format version, and number of event types. 
    */
    /* 
       If the event was not requested by the slave (the slave did not ask for
       it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos 
    */
    inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
    DBUG_PRINT("info",("binlog format is now %d",
                       mi->rli.relay_log.description_event_for_queue->binlog_version));  

  }
  break;
  default:
    inc_pos= event_len;
    break;
  }

  /* 
     If this event is originating from this server, don't queue it. 
     We don't check this for 3.23 events because it's simpler like this; 3.23
     will be filtered anyway by the SQL slave thread which also tests the
     server id (we must also keep this test in the SQL thread, in case somebody
     upgrades a 4.0 slave which has a not-filtered relay log).

     ANY event coming from ourselves can be ignored: it is obvious for queries;
     for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
     (--log-slave-updates would not log that) unless this slave is also its
     direct master (an unsupported, useless setup!).
  */

  pthread_mutex_lock(log_lock);

  if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
      !replicate_same_server_id)
  {
    /*
      Do not write it to the relay log.
      a) We still want to increment mi->master_log_pos, so that we won't
      re-read this event from the master if the slave IO thread is now
      stopped/restarted (more efficient if the events we are ignoring are big
      LOAD DATA INFILE).
      b) We want to record that we are skipping events, for the information of
      the slave SQL thread, otherwise that thread may let
      rli->group_relay_log_pos stay too small if the last binlog's event is
      ignored.
      But events which were generated by this slave and which do not exist in
      the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
      mi->master_log_pos.
    */
    if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
        buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
        buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
    {
      mi->master_log_pos+= inc_pos;
      memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
      DBUG_ASSERT(rli->ign_master_log_name_end[0]);
      rli->ign_master_log_pos_end= mi->master_log_pos;
    }
    rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
    DBUG_PRINT("info", ("master_log_pos: %lu, event originating from the same server, ignored", (ulong) mi->master_log_pos));
  }  
  else
  {
    /* write the event to the relay log */
    if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
    {
      mi->master_log_pos+= inc_pos;
      DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
    }
    else
      error= 3;
    rli->ign_master_log_name_end[0]= 0; // last event is not ignored
  }
  pthread_mutex_unlock(log_lock);


err:
  pthread_mutex_unlock(&mi->data_lock);
  DBUG_PRINT("info", ("error=%d", error));
  DBUG_RETURN(error);
}

4472

4473 4474
/*
  Tear down a RELAY_LOG_INFO: close the relay-log.info file, close the relay
  log file the SQL thread opened through its own cache, close the relay log
  (with flags LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT), harvest the bytes
  written into rli->log_space_total and drop the slave's temporary tables.

  SYNOPSIS
    end_relay_log_info()
    rli			Relay log information; nothing is done unless
                        rli->inited is set. rli->inited is cleared.
*/

void end_relay_log_info(RELAY_LOG_INFO* rli)
{
  DBUG_ENTER("end_relay_log_info");

  if (rli->inited)
  {
    /* relay-log.info */
    if (rli->info_fd >= 0)
    {
      end_io_cache(&rli->info_file);
      (void) my_close(rli->info_fd, MYF(MY_WME));
      rli->info_fd= -1;
    }

    /* Relay log file read through the SQL thread's private IO_CACHE */
    if (rli->cur_log_fd >= 0)
    {
      end_io_cache(&rli->cache_buf);
      (void) my_close(rli->cur_log_fd, MYF(MY_WME));
      rli->cur_log_fd= -1;
    }

    rli->inited= 0;
    rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
    /* Fold any not-yet-harvested written bytes into the relay log space */
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);

    /*
      Drop the slave's temporary tables from memory. Keeping them across a
      shutdown would require more work here in the future.
    */
    rli->close_temporary_tables();
  }
  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
4503 4504
/*
  Try to connect until successful or slave killed
4505

unknown's avatar
unknown committed
4506 4507 4508 4509 4510
  SYNPOSIS
    safe_connect()
    thd			Thread handler for slave
    mysql		MySQL connection handle
    mi			Replication handle
4511

unknown's avatar
unknown committed
4512 4513 4514 4515
  RETURN
    0	ok
    #	Error
*/
4516

4517
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
unknown's avatar
unknown committed
4518
{
unknown's avatar
unknown committed
4519
  return connect_to_master(thd, mysql, mi, 0, 0);
unknown's avatar
unknown committed
4520 4521
}

4522

4523
/*
unknown's avatar
unknown committed
4524 4525
  SYNPOSIS
    connect_to_master()
unknown's avatar
unknown committed
4526

unknown's avatar
unknown committed
4527 4528 4529
  IMPLEMENTATION
    Try to connect until successful or slave killed or we have retried
    master_retry_count times
4530
*/
unknown's avatar
unknown committed
4531

unknown's avatar
unknown committed
4532
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
unknown's avatar
unknown committed
4533
			     bool reconnect, bool suppress_warnings)
unknown's avatar
unknown committed
4534
{
4535
  int slave_was_killed;
4536 4537
  int last_errno= -2;				// impossible error
  ulong err_count=0;
unknown's avatar
unknown committed
4538
  char llbuff[22];
4539
  DBUG_ENTER("connect_to_master");
unknown's avatar
unknown committed
4540

unknown's avatar
unknown committed
4541 4542 4543
#ifndef DBUG_OFF
  events_till_disconnect = disconnect_slave_event_count;
#endif
4544
  ulong client_flag= CLIENT_REMEMBER_OPTIONS;
4545 4546 4547
  if (opt_slave_compressed_protocol)
    client_flag=CLIENT_COMPRESS;		/* We will use compression */

4548 4549
  mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
  mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
unknown's avatar
unknown committed
4550 4551 4552 4553 4554 4555 4556 4557 4558 4559 4560
 
#ifdef HAVE_OPENSSL
  if (mi->ssl)
    mysql_ssl_set(mysql, 
                  mi->ssl_key[0]?mi->ssl_key:0,
                  mi->ssl_cert[0]?mi->ssl_cert:0, 
                  mi->ssl_ca[0]?mi->ssl_ca:0,
                  mi->ssl_capath[0]?mi->ssl_capath:0,
                  mi->ssl_cipher[0]?mi->ssl_cipher:0);
#endif

4561 4562 4563 4564
  mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
  /* This one is not strictly needed but we have it here for completeness */
  mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);

4565
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
4566 4567 4568
	 (reconnect ? mysql_reconnect(mysql) != 0 :
	  mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
			     mi->port, 0, client_flag) == 0))
unknown's avatar
unknown committed
4569
  {
4570
    /* Don't repeat last error */
unknown's avatar
SCRUM  
unknown committed
4571
    if ((int)mysql_errno(mysql) != last_errno)
4572
    {
unknown's avatar
SCRUM  
unknown committed
4573
      last_errno=mysql_errno(mysql);
unknown's avatar
unknown committed
4574
      suppress_warnings= 0;
4575
      sql_print_error("Slave I/O thread: error %s to master \
4576
'%s@%s:%d': \
4577
Error: '%s'  errno: %d  retry-time: %d  retries: %d",
4578
		      (reconnect ? "reconnecting" : "connecting"),
4579
		      mi->user,mi->host,mi->port,
unknown's avatar
SCRUM  
unknown committed
4580
		      mysql_error(mysql), last_errno,
4581 4582
		      mi->connect_retry,
		      master_retry_count);
4583
    }
unknown's avatar
unknown committed
4584 4585 4586
    /*
      By default we try forever. The reason is that failure will trigger
      master election, so if the user did not set master_retry_count we
4587
      do not want to have election triggered on the first failure to
unknown's avatar
unknown committed
4588
      connect
4589
    */
4590
    if (++err_count == master_retry_count)
4591 4592
    {
      slave_was_killed=1;
unknown's avatar
unknown committed
4593 4594
      if (reconnect)
        change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
4595 4596
      break;
    }
4597 4598
    safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
	       (void*)mi);
unknown's avatar
unknown committed
4599
  }
4600

4601 4602
  if (!slave_was_killed)
  {
unknown's avatar
unknown committed
4603
    if (reconnect)
unknown's avatar
unknown committed
4604
    { 
4605
      if (!suppress_warnings && global_system_variables.log_warnings)
4606
	sql_print_information("Slave: connected to master '%s@%s:%d',\
4607
replication resumed in log '%s' at position %s", mi->user,
unknown's avatar
unknown committed
4608 4609 4610 4611
			mi->host, mi->port,
			IO_RPL_LOG_NAME,
			llstr(mi->master_log_pos,llbuff));
    }
unknown's avatar
unknown committed
4612 4613 4614 4615
    else
    {
      change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
      mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
unknown's avatar
unknown committed
4616
		      mi->user, mi->host, mi->port);
unknown's avatar
unknown committed
4617
    }
4618
#ifdef SIGNAL_WITH_VIO_CLOSE
4619
    thd->set_active_vio(mysql->net.vio);
4620
#endif      
4621
  }
4622
  mysql->reconnect= 1;
4623 4624
  DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed));
  DBUG_RETURN(slave_was_killed);
unknown's avatar
unknown committed
4625 4626
}

4627

unknown's avatar
unknown committed
4628
/*
4629
  safe_reconnect()
unknown's avatar
unknown committed
4630

unknown's avatar
unknown committed
4631 4632 4633
  IMPLEMENTATION
    Try to connect until successful or slave killed or we have retried
    master_retry_count times
unknown's avatar
unknown committed
4634 4635
*/

unknown's avatar
unknown committed
4636 4637
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
			  bool suppress_warnings)
unknown's avatar
unknown committed
4638
{
unknown's avatar
unknown committed
4639 4640
  DBUG_ENTER("safe_reconnect");
  DBUG_RETURN(connect_to_master(thd, mysql, mi, 1, suppress_warnings));
unknown's avatar
unknown committed
4641 4642
}

unknown's avatar
unknown committed
4643

4644 4645 4646 4647 4648 4649 4650 4651 4652 4653 4654 4655 4656 4657 4658 4659 4660 4661 4662 4663 4664 4665 4666 4667 4668 4669 4670 4671 4672 4673
/*
  Save where the SQL thread stands in the relay log to relay-log.info.

  SYNOPSIS
    flush_relay_log_info()
    rli			Relay log information

  DESCRIPTION
    Writes, from offset 0 of rli->info_file, four newline-terminated lines:
      group_relay_log_name, group_relay_log_pos,
      group_master_log_name, group_master_log_pos
    and then flushes the IO_CACHE (the flush is attempted even if the write
    failed).

  NOTES
    - As this is only called by the slave thread, we don't need to
      have a lock on this.
    - Only the group_* coordinates are saved, so a restart resumes at the
      start of the current group: statements are re-executed if we die in
      the middle of a transaction that was rolled back.
    - As a transaction never spans binary logs, we don't have to handle the
      case where we do a relay-log-rotation in the middle of the transaction.
      If this would not be the case, we would have to ensure that we
      don't delete the relay log file where the transaction started when
      we switch to a new relay log file.
    - Flushing the relay log itself is done by the slave I/O thread.

  TODO
    - Change the log file information to a binary format to avoid calling
      longlong2str.

  RETURN VALUES
    0	ok
    1	write error
*/

bool flush_relay_log_info(RELAY_LOG_INFO* rli)
{
  IO_CACHE *file= &rli->info_file;
  /* Two file names, two decimal 64-bit numbers and four '\n' */
  char buff[FN_REFLEN*2+22*2+4];
  char *end;
  bool write_failed, flush_failed;

  end= strmov(buff, rli->group_relay_log_name);
  *end++= '\n';
  end= longlong2str(rli->group_relay_log_pos, end, 10);
  *end++= '\n';
  end= strmov(end, rli->group_master_log_name);
  *end++= '\n';
  end= longlong2str(rli->group_master_log_pos, end, 10);
  *end++= '\n';

  my_b_seek(file, 0L);
  write_failed= my_b_write(file, (byte*) buff, (ulong) (end - buff)) != 0;
  flush_failed= flush_io_cache(file) != 0;
  return write_failed || flush_failed;
}

4696

unknown's avatar
unknown committed
4697
/*
  Called when we notice that the current "hot" log got rotated under our
  feet: reopen rli->event_relay_log_name through the SQL thread's own cache
  (rli->cache_buf) and seek back to where we were.

  SYNOPSIS
    reopen_relay_log()
    rli			Relay log information; rli->cur_log becomes
                        &rli->cache_buf and rli->cur_log_fd the new descriptor
    errmsg		Receives open_binlog()'s error message on failure

  RETURN
    0	  the log could not be opened
    #	  &rli->cache_buf, positioned at event_relay_log_pos (raised to at
          least BIN_LOG_HEADER_SIZE)
*/

static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
{
  IO_CACHE *cache;
  DBUG_ENTER("reopen_relay_log");
  /* We come from the hot log, which owns no descriptor of ours */
  DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
  DBUG_ASSERT(rli->cur_log_fd == -1);

  cache= rli->cur_log= &rli->cache_buf;
  rli->cur_log_fd= open_binlog(cache, rli->event_relay_log_name, errmsg);
  if (rli->cur_log_fd < 0)
    DBUG_RETURN(0);

  /* Resume exactly where we were, but never inside the binlog header */
  if (rli->event_relay_log_pos < BIN_LOG_HEADER_SIZE)
    rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
  my_b_seek(cache, rli->event_relay_log_pos);
  DBUG_RETURN(cache);
}

unknown's avatar
unknown committed
4721

4722 4723 4724 4725
Log_event* next_event(RELAY_LOG_INFO* rli)
{
  Log_event* ev;
  IO_CACHE* cur_log = rli->cur_log;
4726
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock(); 
4727 4728
  const char* errmsg=0;
  THD* thd = rli->sql_thd;
4729
  
unknown's avatar
unknown committed
4730
  DBUG_ENTER("next_event");
4731 4732
  DBUG_ASSERT(thd != 0);

unknown's avatar
unknown committed
4733 4734
  /*
    For most operations we need to protect rli members with data_lock,
4735 4736 4737 4738
    so we assume calling function acquired this mutex for us and we will
    hold it for the most of the loop below However, we will release it
    whenever it is worth the hassle,  and in the cases when we go into a
    pthread_cond_wait() with the non-data_lock mutex
unknown's avatar
unknown committed
4739
  */
4740
  safe_mutex_assert_owner(&rli->data_lock);
4741
  
4742
  while (!sql_slave_killed(thd,rli))
unknown's avatar
unknown committed
4743 4744 4745
  {
    /*
      We can have two kinds of log reading:
unknown's avatar
unknown committed
4746 4747 4748 4749 4750 4751 4752 4753
      hot_log:
        rli->cur_log points at the IO_CACHE of relay_log, which
        is actively being updated by the I/O thread. We need to be careful
        in this case and make sure that we are not looking at a stale log that
        has already been rotated. If it has been, we reopen the log.

      The other case is much simpler:
        We just have a read only log that nobody else will be updating.
unknown's avatar
unknown committed
4754
    */
4755 4756 4757 4758 4759
    bool hot_log;
    if ((hot_log = (cur_log != &rli->cache_buf)))
    {
      DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor
      pthread_mutex_lock(log_lock);
unknown's avatar
unknown committed
4760 4761

      /*
unknown's avatar
unknown committed
4762
	Reading xxx_file_id is safe because the log will only
unknown's avatar
unknown committed
4763 4764
	be rotated when we hold relay_log.LOCK_log
      */
unknown's avatar
unknown committed
4765
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
4766
      {
unknown's avatar
unknown committed
4767 4768 4769 4770
	// The master has switched to a new log file; Reopen the old log file
	cur_log=reopen_relay_log(rli, &errmsg);
	pthread_mutex_unlock(log_lock);
	if (!cur_log)				// No more log files
4771
	  goto err;
unknown's avatar
unknown committed
4772
	hot_log=0;				// Using old binary log
4773 4774
      }
    }
4775

4776 4777
#ifndef DBUG_OFF
    {
4778
      /* This is an assertion which sometimes fails, let's try to track it */
4779
      char llbuf1[22], llbuf2[22];
4780
      DBUG_PRINT("info", ("my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
4781
                          llstr(my_b_tell(cur_log),llbuf1),
4782 4783 4784
                          llstr(rli->event_relay_log_pos,llbuf2)));
      DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
      DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
4785 4786
    }
#endif
unknown's avatar
unknown committed
4787 4788
    /*
      Relay log is always in new format - if the master is 3.23, the
4789
      I/O thread will convert the format for us.
4790 4791
      A problem: the description event may be in a previous relay log. So if
      the slave has been shutdown meanwhile, we would have to look in old relay
4792 4793
      logs, which may even have been deleted. So we need to write this
      description event at the beginning of the relay log.
4794 4795
      When the relay log is created when the I/O thread starts, easy: the
      master will send the description event and we will queue it.
4796 4797
      But if the relay log is created by new_file(): then the solution is:
      MYSQL_LOG::open() will write the buffered description event.
unknown's avatar
unknown committed
4798
    */
4799 4800
    if ((ev=Log_event::read_log_event(cur_log,0,
                                      rli->relay_log.description_event_for_exec)))
4801

4802 4803
    {
      DBUG_ASSERT(thd==rli->sql_thd);
4804 4805 4806 4807 4808
      /*
        read it while we have a lock, to avoid a mutex lock in
        inc_event_relay_log_pos()
      */
      rli->future_event_relay_log_pos= my_b_tell(cur_log);
4809 4810
      if (hot_log)
	pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
4811
      DBUG_RETURN(ev);
4812 4813
    }
    DBUG_ASSERT(thd==rli->sql_thd);
unknown's avatar
unknown committed
4814
    if (opt_reckless_slave)			// For mysql-test
unknown's avatar
unknown committed
4815
      cur_log->error = 0;
unknown's avatar
unknown committed
4816
    if (cur_log->error < 0)
unknown's avatar
unknown committed
4817 4818
    {
      errmsg = "slave SQL thread aborted because of I/O error";
unknown's avatar
unknown committed
4819 4820
      if (hot_log)
	pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
4821 4822
      goto err;
    }
4823 4824
    if (!cur_log->error) /* EOF */
    {
unknown's avatar
unknown committed
4825 4826 4827 4828 4829
      /*
	On a hot log, EOF means that there are no more updates to
	process and we must block until I/O thread adds some and
	signals us to continue
      */
4830 4831
      if (hot_log)
      {
4832 4833 4834 4835 4836 4837 4838 4839 4840 4841 4842 4843 4844 4845 4846
        /*
          We say in Seconds_Behind_Master that we have "caught up". Note that
          for example if network link is broken but I/O slave thread hasn't
          noticed it (slave_net_timeout not elapsed), then we'll say "caught
          up" whereas we're not really caught up. Fixing that would require
          internally cutting timeout in smaller pieces in network read, no
          thanks. Another example: SQL has caught up on I/O, now I/O has read
          a new event and is queuing it; the false "0" will exist until SQL
          finishes executing the new event; it will be look abnormal only if
          the events have old timestamps (then you get "many", 0, "many").
          Transient phases like this can't really be fixed.
        */
        time_t save_timestamp= rli->last_master_timestamp;
        rli->last_master_timestamp= 0;

unknown's avatar
unknown committed
4847
	DBUG_ASSERT(rli->relay_log.get_open_count() == rli->cur_log_old_open_count);
4848 4849 4850 4851 4852 4853 4854

        if (rli->ign_master_log_name_end[0])
        {
          /* We generate and return a Rotate, to make our positions advance */
          DBUG_PRINT("info",("seeing an ignored end segment"));
          ev= new Rotate_log_event(thd, rli->ign_master_log_name_end,
                                   0, rli->ign_master_log_pos_end,
unknown's avatar
unknown committed
4855
                                   Rotate_log_event::DUP_NAME);
4856 4857 4858 4859 4860 4861 4862 4863 4864 4865 4866 4867
          rli->ign_master_log_name_end[0]= 0;
          if (unlikely(!ev))
          {
            errmsg= "Slave SQL thread failed to create a Rotate event "
              "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
            goto err;
          }
          pthread_mutex_unlock(log_lock);
          ev->server_id= 0; // don't be ignored by slave SQL thread
          DBUG_RETURN(ev);
        }

unknown's avatar
unknown committed
4868 4869 4870 4871
	/*
	  We can, and should release data_lock while we are waiting for
	  update. If we do not, show slave status will block
	*/
4872
	pthread_mutex_unlock(&rli->data_lock);
4873 4874 4875 4876 4877 4878 4879 4880 4881 4882 4883 4884 4885 4886 4887 4888 4889 4890 4891 4892 4893 4894

        /*
          Possible deadlock : 
          - the I/O thread has reached log_space_limit
          - the SQL thread has read all relay logs, but cannot purge for some
          reason:
            * it has already purged all logs except the current one
            * there are other logs than the current one but they're involved in
            a transaction that finishes in the current one (or is not finished)
          Solution :
          Wake up the possibly waiting I/O thread, and set a boolean asking
          the I/O thread to temporarily ignore the log_space_limit
          constraint, because we do not want the I/O thread to block because of
          space (it's ok if it blocks for any other reason (e.g. because the
          master does not send anything). Then the I/O thread stops waiting 
          and reads more events.
          The SQL thread decides when the I/O thread should take log_space_limit
          into account again : ignore_log_space_limit is reset to 0 
          in purge_first_log (when the SQL thread purges the just-read relay
          log), and also when the SQL thread starts. We should also reset
          ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
          fact, no need as RESET SLAVE requires that the slave
unknown's avatar
unknown committed
4895 4896
          be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
          it stops.
4897 4898 4899 4900
        */
        pthread_mutex_lock(&rli->log_space_lock);
        // prevent the I/O thread from blocking next times
        rli->ignore_log_space_limit= 1; 
4901 4902 4903 4904 4905 4906
        /*
          If the I/O thread is blocked, unblock it.
          Ok to broadcast after unlock, because the mutex is only destroyed in
          ~st_relay_log_info(), i.e. when rli is destroyed, and rli will not be
          destroyed before we exit the present function.
        */
4907
        pthread_mutex_unlock(&rli->log_space_lock);
4908
        pthread_cond_broadcast(&rli->log_space_cond);
4909
        // Note that wait_for_update unlocks lock_log !
4910
        rli->relay_log.wait_for_update(rli->sql_thd, 1);
4911 4912
        // re-acquire data lock since we released it earlier
        pthread_mutex_lock(&rli->data_lock);
4913
        rli->last_master_timestamp= save_timestamp;
4914 4915
	continue;
      }
unknown's avatar
unknown committed
4916 4917 4918 4919 4920 4921 4922 4923 4924 4925
      /*
	If the log was not hot, we need to move to the next log in
	sequence. The next log could be hot or cold, we deal with both
	cases separately after doing some common initialization
      */
      end_io_cache(cur_log);
      DBUG_ASSERT(rli->cur_log_fd >= 0);
      my_close(rli->cur_log_fd, MYF(MY_WME));
      rli->cur_log_fd = -1;
	
4926
      if (relay_log_purge)
unknown's avatar
unknown committed
4927
      {
4928 4929 4930 4931 4932 4933 4934 4935 4936 4937 4938 4939 4940 4941 4942
	/*
          purge_first_log will properly set up relay log coordinates in rli.
          If the group's coordinates are equal to the event's coordinates
          (i.e. the relay log was not rotated in the middle of a group),
          we can purge this relay log too.
          We do ulonglong and string comparisons, this may be slow but
          - purging the last relay log is nice (it can save 1GB of disk), so we
          like to detect the case where we can do it, and given this,
          - I see no better detection method
          - purge_first_log is not called that often
        */
	if (rli->relay_log.purge_first_log
            (rli,
             rli->group_relay_log_pos == rli->event_relay_log_pos
             && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
unknown's avatar
unknown committed
4943
	{
4944
	  errmsg = "Error purging processed logs";
unknown's avatar
unknown committed
4945 4946 4947
	  goto err;
	}
      }
4948 4949
      else
      {
unknown's avatar
unknown committed
4950
	/*
4951 4952 4953 4954 4955
	  If hot_log is set, then we already have a lock on
	  LOCK_log.  If not, we have to get the lock.

	  According to Sasha, the only time this code will ever be executed
	  is if we are recovering from a bug.
unknown's avatar
unknown committed
4956
	*/
4957
	if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
4958
	{
unknown's avatar
unknown committed
4959 4960
	  errmsg = "error switching to the next log";
	  goto err;
4961
	}
4962 4963 4964
	rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
	strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
		sizeof(rli->event_relay_log_name)-1);
unknown's avatar
unknown committed
4965 4966
	flush_relay_log_info(rli);
      }
4967 4968 4969 4970 4971 4972 4973 4974 4975 4976 4977 4978 4979 4980

      /*
        Now we want to open this next log. To know if it's a hot log (the one
        being written by the I/O thread now) or a cold log, we can use
        is_active(); if it is hot, we use the I/O cache; if it's cold we open
        the file normally. But if is_active() reports that the log is hot, this
        may change between the test and the consequence of the test. So we may
        open the I/O cache whereas the log is now cold, which is nonsense.
        To guard against this, we need to have LOCK_log.
      */

      DBUG_PRINT("info",("hot_log: %d",hot_log));
      if (!hot_log) /* if hot_log, we already have this mutex */
        pthread_mutex_lock(log_lock);
unknown's avatar
unknown committed
4981 4982
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
      {
4983
#ifdef EXTRA_DEBUG
unknown's avatar
unknown committed
4984
	if (global_system_variables.log_warnings)
4985 4986
	  sql_print_information("next log '%s' is currently active",
                                rli->linfo.log_file_name);
4987
#endif	  
unknown's avatar
unknown committed
4988 4989 4990
	rli->cur_log= cur_log= rli->relay_log.get_log_file();
	rli->cur_log_old_open_count= rli->relay_log.get_open_count();
	DBUG_ASSERT(rli->cur_log_fd == -1);
4991
	  
unknown's avatar
unknown committed
4992
	/*
unknown's avatar
unknown committed
4993
	  Read pointer has to be at the start since we are the only
4994 4995 4996 4997
	  reader.
          We must keep the LOCK_log to read the 4 first bytes, as this is a hot
          log (same as when we call read_log_event() above: for a hot log we
          take the mutex).
unknown's avatar
unknown committed
4998
	*/
unknown's avatar
unknown committed
4999
	if (check_binlog_magic(cur_log,&errmsg))
5000 5001
        {
          if (!hot_log) pthread_mutex_unlock(log_lock);
5002
	  goto err;
5003 5004
        }
        if (!hot_log) pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
5005
	continue;
5006
      }
5007
      if (!hot_log) pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
5008
      /*
5009 5010 5011
	if we get here, the log was not hot, so we will have to open it
	ourselves. We are sure that the log is still not hot now (a log can get
	from hot to cold, but not from cold to hot). No need for LOCK_log.
unknown's avatar
unknown committed
5012 5013
      */
#ifdef EXTRA_DEBUG
unknown's avatar
unknown committed
5014
      if (global_system_variables.log_warnings)
5015 5016
	sql_print_information("next log '%s' is not active",
                              rli->linfo.log_file_name);
unknown's avatar
unknown committed
5017 5018 5019 5020 5021
#endif	  
      // open_binlog() will check the magic header
      if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
				       &errmsg)) <0)
	goto err;
5022
    }
unknown's avatar
unknown committed
5023
    else
5024
    {
unknown's avatar
unknown committed
5025 5026 5027 5028 5029 5030
      /*
	Read failed with a non-EOF error.
	TODO: come up with something better to handle this error
      */
      if (hot_log)
	pthread_mutex_unlock(log_lock);
5031
      sql_print_error("Slave SQL thread: I/O error reading \
unknown's avatar
unknown committed
5032
event(errno: %d  cur_log->error: %d)",
5033
		      my_errno,cur_log->error);
unknown's avatar
unknown committed
5034
      // set read position to the beginning of the event
5035
      my_b_seek(cur_log,rli->event_relay_log_pos);
unknown's avatar
unknown committed
5036 5037
      /* otherwise, we have had a partial read */
      errmsg = "Aborting slave SQL thread because of partial event read";
5038
      break;					// To end of function
5039 5040
    }
  }
5041
  if (!errmsg && global_system_variables.log_warnings)
5042 5043 5044 5045 5046
  {
    sql_print_information("Error reading relay log event: %s", 
                          "slave SQL thread was killed");
    DBUG_RETURN(0);
  }
unknown's avatar
unknown committed
5047

5048
err:
5049 5050
  if (errmsg)
    sql_print_error("Error reading relay log event: %s", errmsg);
unknown's avatar
unknown committed
5051
  DBUG_RETURN(0);
5052 5053
}

5054 5055 5056 5057 5058 5059 5060 5061 5062 5063 5064
/*
  Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
  because of size is simpler because when we do it we already have all relevant
  locks; here we don't, so this function is mainly taking locks).
  Returns nothing as we cannot catch any error (MYSQL_LOG::new_file() is void).

  SYNOPSIS
    rotate_relay_log()
    mi			Replication handle; mi->run_lock is taken for the
                        duration of the call
*/

void rotate_relay_log(MASTER_INFO* mi)
{
  RELAY_LOG_INFO* rli= &mi->rli;
  DBUG_ENTER("rotate_relay_log");

  /* We don't lock rli->run_lock. This would lead to deadlocks. */
  pthread_mutex_lock(&mi->run_lock);

  /*
    Only proceed when rli->inited: otherwise new_file() would try to lock
    LOCK_log, which may not be initialized (e.g. if we're not a slave).
  */
  if (rli->inited)
  {
    /* If the relay log is closed, new_file() will do nothing. */
    rli->relay_log.new_file(1);

    /*
      Harvest right away. Otherwise the BIN_LOG_HEADER_SIZE bytes of each new
      relay log are not counted, and after a series of FLUSH LOGS (with the
      slave threads running) relay_log_space would only shrink by the sizes
      of the purged logs and could become negative. The I/O thread would fix
      it at its next harvest_bytes_written(), but SHOW SLAVE STATUS would show
      strange values meanwhile.
      If the log is closed, this just harvests the last writes, probably 0.
    */
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  }
  else
  {
    DBUG_PRINT("info", ("rli->inited == 0"));
  }

  pthread_mutex_unlock(&mi->run_lock);
  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
5101

5102
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
/* Explicit instantiations, for builds configured to need them */
template class I_List_iterator<i_string_pair>;
template class I_List_iterator<i_string>;
#endif
5106

5107

unknown's avatar
SCRUM  
unknown committed
5108
#endif /* HAVE_REPLICATION */