slave.cc 160 KB
Newer Older
1
/* Copyright (C) 2000-2003 MySQL AB
2

unknown's avatar
unknown committed
3 4 5 6
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
7

unknown's avatar
unknown committed
8 9 10 11
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
12

unknown's avatar
unknown committed
13 14 15 16 17
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "mysql_priv.h"
unknown's avatar
SCRUM  
unknown committed
18 19 20

#ifdef HAVE_REPLICATION

unknown's avatar
unknown committed
21
#include <mysql.h>
22
#include <myisam.h>
23
#include "slave.h"
24
#include "sql_repl.h"
25
#include "repl_failsafe.h"
unknown's avatar
unknown committed
26
#include <thr_alarm.h>
unknown's avatar
unknown committed
27
#include <my_dir.h>
unknown's avatar
unknown committed
28
#include <sql_common.h>
unknown's avatar
SCRUM  
unknown committed
29

30
#define MAX_SLAVE_RETRY_PAUSE 5
31 32 33
bool use_slave_mask = 0;
MY_BITMAP slave_error_mask;

34 35
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);

36
volatile bool slave_sql_running = 0, slave_io_running = 0;
37
char* slave_load_tmpdir = 0;
unknown's avatar
unknown committed
38
MASTER_INFO *active_mi;
39
HASH replicate_do_table, replicate_ignore_table;
unknown's avatar
unknown committed
40
DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
41
bool do_table_inited = 0, ignore_table_inited = 0;
unknown's avatar
unknown committed
42
bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
43 44
bool table_rules_on= 0;
my_bool replicate_same_server_id;
45
ulonglong relay_log_space_limit = 0;
unknown's avatar
unknown committed
46 47 48 49 50 51 52

/*
  When slave thread exits, we need to remember the temporary tables so we
  can re-use them on slave start.

  TODO: move the vars below under MASTER_INFO
*/
53

54
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
55
int events_till_abort = -1;
56
static int events_till_disconnect = -1;
unknown's avatar
unknown committed
57

58
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
unknown's avatar
unknown committed
59

60
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
unknown's avatar
unknown committed
61
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
unknown's avatar
unknown committed
62
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli);
63 64
static inline bool io_slave_killed(THD* thd,MASTER_INFO* mi);
static inline bool sql_slave_killed(THD* thd,RELAY_LOG_INFO* rli);
unknown's avatar
unknown committed
65
static int count_relay_log_space(RELAY_LOG_INFO* rli);
66
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
67
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
unknown's avatar
unknown committed
68 69
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
			  bool suppress_warnings);
unknown's avatar
unknown committed
70
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
unknown's avatar
unknown committed
71
			     bool reconnect, bool suppress_warnings);
72 73
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
		      void* thread_killed_arg);
74
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
75
static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
76
				  const char* table_name, bool overwrite);
77
static int get_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi);
78 79

/*
unknown's avatar
unknown committed
80
  Find out which replications threads are running
81

unknown's avatar
unknown committed
82 83 84 85 86 87 88 89 90 91 92 93 94
  SYNOPSIS
    init_thread_mask()
    mask		Return value here
    mi			master_info for slave
    inverse		If set, returns which threads are not running

  IMPLEMENTATION
    Get a bit mask for which threads are running so that we can later restart
    these threads.

  RETURN
    mask	If inverse == 0, running threads
		If inverse == 1, stopped threads    
95 96
*/

97 98 99 100 101 102 103 104
void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse)
{
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
  register int tmp_mask=0;
  if (set_io)
    tmp_mask |= SLAVE_IO;
  if (set_sql)
    tmp_mask |= SLAVE_SQL;
105 106
  if (inverse)
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
107 108 109
  *mask = tmp_mask;
}

110

unknown's avatar
unknown committed
111
/*
112
  lock_slave_threads()
unknown's avatar
unknown committed
113
*/
114

115 116 117 118 119 120 121
void lock_slave_threads(MASTER_INFO* mi)
{
  //TODO: see if we can do this without dual mutex
  pthread_mutex_lock(&mi->run_lock);
  pthread_mutex_lock(&mi->rli.run_lock);
}

122

unknown's avatar
unknown committed
123
/*
124
  unlock_slave_threads()
unknown's avatar
unknown committed
125
*/
126

127 128 129 130 131 132 133
void unlock_slave_threads(MASTER_INFO* mi)
{
  //TODO: see if we can do this without dual mutex
  pthread_mutex_unlock(&mi->rli.run_lock);
  pthread_mutex_unlock(&mi->run_lock);
}

134

unknown's avatar
unknown committed
135
/* Initialize slave structures */
136

137 138
int init_slave()
{
139
  DBUG_ENTER("init_slave");
140

141 142 143 144 145 146
  /*
    This is called when mysqld starts. Before client connections are
    accepted. However bootstrap may conflict with us if it does START SLAVE.
    So it's safer to take the lock.
  */
  pthread_mutex_lock(&LOCK_active_mi);
147 148 149 150
  /*
    TODO: re-write this to interate through the list of files
    for multi-master
  */
unknown's avatar
unknown committed
151
  active_mi= new MASTER_INFO;
152 153

  /*
154 155 156
    If master_host is not specified, try to read it from the master_info file.
    If master_host is specified, create the master_info file if it doesn't
    exists.
157
  */
158 159 160 161 162
  if (!active_mi)
  {
    sql_print_error("Failed to allocate memory for the master info structure");
    goto err;
  }
163

unknown's avatar
unknown committed
164
  if (init_master_info(active_mi,master_info_file,relay_log_info_file,
165
		       !master_host, (SLAVE_IO | SLAVE_SQL)))
166
  {
167
    sql_print_error("Failed to initialize the master info structure");
unknown's avatar
unknown committed
168
    goto err;
169
  }
170 171 172 173

  if (server_id && !master_host && active_mi->host[0])
    master_host= active_mi->host;

174 175
  /* If server id is not set, start_slave_thread() will say it */

176
  if (master_host && !opt_skip_slave_start)
177
  {
178 179 180 181 182 183
    if (start_slave_threads(1 /* need mutex */,
			    0 /* no wait for start*/,
			    active_mi,
			    master_info_file,
			    relay_log_info_file,
			    SLAVE_IO | SLAVE_SQL))
unknown's avatar
unknown committed
184
    {
185
      sql_print_error("Failed to create slave threads");
unknown's avatar
unknown committed
186 187
      goto err;
    }
188
  }
189
  pthread_mutex_unlock(&LOCK_active_mi);
190
  DBUG_RETURN(0);
191

unknown's avatar
unknown committed
192
err:
193
  pthread_mutex_unlock(&LOCK_active_mi);
unknown's avatar
unknown committed
194
  DBUG_RETURN(1);
195
}
196

197

198 199
/* Hash free callback: release one table rule entry with my_free(). */
static void free_table_ent(TABLE_RULE_ENT* e)
{
  gptr entry_mem= (gptr) e;
  my_free(entry_mem, MYF(0));
}

203

204 205 206 207 208 209 210
/*
  Hash key callback for table rule entries: the key is the e->key_len bytes
  that start at e->db.
*/
static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
			   my_bool not_used __attribute__((unused)))
{
  byte *key_start= (byte*) e->db;
  *len= e->key_len;
  return key_start;
}

211 212 213 214 215 216 217 218 219 220 221

/*
  Open the given relay log

  SYNOPSIS
    init_relay_log_pos()
    rli			Relay information (will be initialized)
    log			Name of relay log file to read from. NULL = First log
    pos			Position in relay log file 
    need_data_lock	Set to 1 if this functions should do mutex locks
    errmsg		Store pointer to error message here
222 223 224
    look_for_description_event 
                        1 if we should look for such an event. We only need
                        this when the SQL thread starts and opens an existing
225 226 227 228
                        relay log and has to execute it (possibly from an
                        offset >4); then we need to read the first event of
                        the relay log to be able to parse the events we have
                        to execute.
229 230 231 232 233 234 235 236

  DESCRIPTION
  - Close old open relay log files.
  - If we are using the same relay log as the running IO-thread, then set
    rli->cur_log to point to the same IO_CACHE entry.
  - If not, open the 'log' binary file.

  TODO
237
    - check proper initialization of group_master_log_name/group_master_log_pos
238 239 240 241 242

  RETURN VALUES
    0	ok
    1	error.  errmsg is set to point to the error message
*/
unknown's avatar
unknown committed
243

244 245
int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
		       ulonglong pos, bool need_data_lock,
246 247
		       const char** errmsg,
                       bool look_for_description_event)
248
{
unknown's avatar
unknown committed
249
  DBUG_ENTER("init_relay_log_pos");
250
  DBUG_PRINT("info", ("pos=%lu", pos));
unknown's avatar
unknown committed
251

252
  *errmsg=0;
253
  pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
254
  
255 256
  if (need_data_lock)
    pthread_mutex_lock(&rli->data_lock);
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274

  /*
    Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
    is, too, and init_slave() too; these 2 functions allocate a description
    event in init_relay_log_pos, which is not freed by the terminating SQL slave
    thread as that thread is not started by these functions. So we have to free
    the description_event here, in case, so that there is no memory leak in
    running, say, CHANGE MASTER.
  */
  delete rli->relay_log.description_event_for_exec;
  /*
    By default the relay log is in binlog format 3 (4.0).
    Even if format is 4, this will work enough to read the first event
    (Format_desc) (remember that format 4 is just lenghtened compared to format
    3; format 3 is a prefix of format 4). 
  */
  rli->relay_log.description_event_for_exec= new
    Format_description_log_event(3);
275
  
276 277
  pthread_mutex_lock(log_lock);
  
278
  /* Close log file and free buffers if it's already open */
279 280 281 282 283 284 285
  if (rli->cur_log_fd >= 0)
  {
    end_io_cache(&rli->cache_buf);
    my_close(rli->cur_log_fd, MYF(MY_WME));
    rli->cur_log_fd = -1;
  }
  
286
  rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
unknown's avatar
unknown committed
287

unknown's avatar
unknown committed
288 289 290 291
  /*
    Test to see if the previous run was with the skip of purging
    If yes, we do not purge when we restart
  */
292
  if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
293 294 295 296
  {
    *errmsg="Could not find first log during relay log initialization";
    goto err;
  }
297

298
  if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
unknown's avatar
unknown committed
299
  {
300 301
    *errmsg="Could not find target log during relay log initialization";
    goto err;
unknown's avatar
unknown committed
302
  }
303 304 305 306
  strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
	  sizeof(rli->group_relay_log_name)-1);
  strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
	  sizeof(rli->event_relay_log_name)-1);
307 308
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
  {
309 310 311 312 313
    /*
      The IO thread is using this log file.
      In this case, we will use the same IO_CACHE pointer to
      read data as the IO thread is using to write data.
    */
314 315
    my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
    if (check_binlog_magic(rli->cur_log,errmsg))
316
      goto err;
unknown's avatar
unknown committed
317
    rli->cur_log_old_open_count=rli->relay_log.get_open_count();
318 319 320
  }
  else
  {
321 322 323
    /*
      Open the relay log and set rli->cur_log to point at this one
    */
324 325 326 327 328
    if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
				     rli->linfo.log_file_name,errmsg)) < 0)
      goto err;
    rli->cur_log = &rli->cache_buf;
  }
329 330 331 332 333 334 335 336 337 338
  /*
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
    sure.
  */
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
  {
    Log_event* ev;
    while (look_for_description_event) 
    {
      /*
339 340
        Read the possible Format_description_log_event; if position
        was 4, no need, it will be read naturally.
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
      */
      DBUG_PRINT("info",("looking for a Format_description_log_event"));

      if (my_b_tell(rli->cur_log) >= pos)
        break;

      /*
        Because of we have rli->data_lock and log_lock, we can safely read an
        event
      */
      if (!(ev=Log_event::read_log_event(rli->cur_log,0,
                                         rli->relay_log.description_event_for_exec)))
      {
        DBUG_PRINT("info",("could not read event, rli->cur_log->error=%d",
                           rli->cur_log->error));
        if (rli->cur_log->error) /* not EOF */
        {
          *errmsg= "I/O error reading event at position 4";
          goto err;
        }
        break;
      }
      else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
      {
        DBUG_PRINT("info",("found Format_description_log_event"));
        delete rli->relay_log.description_event_for_exec;
        rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
        /*
          As ev was returned by read_log_event, it has passed is_valid(), so
          my_malloc() in ctor worked, no need to check again.
        */
        /*
          Ok, we found a Format_description event. But it is not sure that this
          describes the whole relay log; indeed, one can have this sequence
          (starting from position 4):
          Format_desc (of slave)
          Rotate (of master)
378
          Format_desc (of master)
379 380 381
          So the Format_desc which really describes the rest of the relay log
          is the 3rd event (it can't be further than that, because we rotate
          the relay log when we queue a Rotate event from the master).
382 383 384 385 386 387 388 389 390 391 392 393 394 395 396
          But what describes the Rotate is the first Format_desc.
          So what we do is:
          go on searching for Format_description events, until you exceed the
          position (argument 'pos') or until you find another event than Rotate
          or Format_desc.
        */
      }
      else 
      {
        DBUG_PRINT("info",("found event of another type=%d",
                           ev->get_type_code()));
        look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
        delete ev;
      }
    }
unknown's avatar
unknown committed
397
    my_b_seek(rli->cur_log,(off_t)pos);
398 399 400 401 402 403 404 405 406 407
#ifndef DBUG_OFF
  {
    char llbuf1[22], llbuf2[22];
    DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
                        llstr(my_b_tell(rli->cur_log),llbuf1), 
                        llstr(rli->event_relay_log_pos,llbuf2)));
  }
#endif

  }
unknown's avatar
unknown committed
408

409
err:
410 411 412 413
  /*
    If we don't purge, we can't honour relay_log_space_limit ;
    silently discard it
  */
414
  if (!relay_log_purge)
415
    rli->log_space_limit= 0;
unknown's avatar
unknown committed
416
  pthread_cond_broadcast(&rli->data_cond);
417 418 419
  
  pthread_mutex_unlock(log_lock);

unknown's avatar
unknown committed
420 421
  if (need_data_lock)
    pthread_mutex_unlock(&rli->data_lock);
422 423
  if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
    *errmsg= "Invalid Format_description log event; could be out of memory";
424

unknown's avatar
unknown committed
425
  DBUG_RETURN ((*errmsg) ? 1 : 0);
426 427
}

428

unknown's avatar
unknown committed
429
/*
430
  Init function to set up array for errors that should be skipped for slave
unknown's avatar
unknown committed
431

unknown's avatar
unknown committed
432 433 434 435 436 437 438
  SYNOPSIS
    init_slave_skip_errors()
    arg		List of errors numbers to skip, separated with ','

  NOTES
    Called from get_options() in mysqld.cc on start-up
*/
439

unknown's avatar
unknown committed
440
void init_slave_skip_errors(const char* arg)
441
{
unknown's avatar
unknown committed
442
  const char *p;
443
  if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
444 445 446 447 448
  {
    fprintf(stderr, "Badly out of memory, please check your system status\n");
    exit(1);
  }
  use_slave_mask = 1;
449
  for (;my_isspace(system_charset_info,*arg);++arg)
450
    /* empty */;
451
  if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
452 453 454 455 456 457 458 459 460 461 462
  {
    bitmap_set_all(&slave_error_mask);
    return;
  }
  for (p= arg ; *p; )
  {
    long err_code;
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
      break;
    if (err_code < MAX_SLAVE_ERROR)
       bitmap_set_bit(&slave_error_mask,(uint)err_code);
463
    while (!my_isdigit(system_charset_info,*p) && *p)
464 465 466 467
      p++;
  }
}

468

unknown's avatar
unknown committed
469
void st_relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
470
						bool skip_lock)  
471 472 473
{
  if (!skip_lock)
    pthread_mutex_lock(&data_lock);
unknown's avatar
unknown committed
474
  inc_event_relay_log_pos();
unknown's avatar
unknown committed
475 476
  group_relay_log_pos= event_relay_log_pos;
  strmake(group_relay_log_name,event_relay_log_name,
unknown's avatar
unknown committed
477
	  sizeof(group_relay_log_name)-1);
unknown's avatar
unknown committed
478 479 480 481 482 483 484 485 486 487 488 489 490

  notify_group_relay_log_name_update();
        
  /*
    If the slave does not support transactions and replicates a transaction,
    users should not trust group_master_log_pos (which they can display with
    SHOW SLAVE STATUS or read from relay-log.info), because to compute
    group_master_log_pos the slave relies on log_pos stored in the master's
    binlog, but if we are in a master's transaction these positions are always
    the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
    not advance as it should on the non-transactional slave (it advances by
    big leaps, whereas it should advance by small leaps).
  */
unknown's avatar
unknown committed
491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510
  /*
    In 4.x we used the event's len to compute the positions here. This is
    wrong if the event was 3.23/4.0 and has been converted to 5.0, because
    then the event's len is not what is was in the master's binlog, so this
    will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
    replication: Exec_master_log_pos is wrong). Only way to solve this is to
    have the original offset of the end of the event the relay log. This is
    what we do in 5.0: log_pos has become "end_log_pos" (because the real use
    of log_pos in 4.0 was to compute the end_log_pos; so better to store
    end_log_pos instead of begin_log_pos.
    If we had not done this fix here, the problem would also have appeared
    when the slave and master are 5.0 but with different event length (for
    example the slave is more recent than the master and features the event
    UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
    SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
    value which would lead to badly broken replication.
    Even the relay_log_pos will be corrupted in this case, because the len is
    the relay log is not "val".
    With the end_log_pos solution, we avoid computations involving lengthes.
  */
511 512
  DBUG_PRINT("info", ("log_pos: %lu  group_master_log_pos: %lu",
		      (long) log_pos, (long) group_master_log_pos));
unknown's avatar
unknown committed
513 514
  if (log_pos) // 3.23 binlogs don't have log_posx
  {
unknown's avatar
unknown committed
515
    group_master_log_pos= log_pos;
unknown's avatar
unknown committed
516
  }
517 518 519 520 521 522
  pthread_cond_broadcast(&data_cond);
  if (!skip_lock)
    pthread_mutex_unlock(&data_lock);
}


unknown's avatar
unknown committed
523 524 525 526 527 528 529 530 531 532 533 534 535
void st_relay_log_info::close_temporary_tables()
{
  TABLE *table,*next;

  for (table=save_temporary_tables ; table ; table=next)
  {
    next=table->next;
    /*
      Don't ask for disk deletion. For now, anyway they will be deleted when
      slave restarts, but it is a better intention to not delete them.
    */
    close_temporary(table, 0);
  }
536 537
  save_temporary_tables= 0;
  slave_open_temp_tables= 0;
unknown's avatar
unknown committed
538
}
unknown's avatar
unknown committed
539

unknown's avatar
unknown committed
540
/*
541 542
  purge_relay_logs()

unknown's avatar
unknown committed
543 544
  NOTES
    Assumes to have a run lock on rli and that no slave thread are running.
unknown's avatar
unknown committed
545 546
*/

547 548
int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
		     const char** errmsg)
549
{
550
  int error=0;
unknown's avatar
unknown committed
551
  DBUG_ENTER("purge_relay_logs");
552 553 554 555

  /*
    Even if rli->inited==0, we still try to empty rli->master_log_* variables.
    Indeed, rli->inited==0 does not imply that they already are empty.
556
    It could be that slave's info initialization partly succeeded :
557
    for example if relay-log.info existed but *relay-bin*.*
558
    have been manually removed, init_relay_log_info reads the old
559 560 561 562 563 564 565 566
    relay-log.info and fills rli->master_log_*, then init_relay_log_info
    checks for the existence of the relay log, this fails and
    init_relay_log_info leaves rli->inited to 0.
    In that pathological case, rli->master_log_pos* will be properly reinited
    at the next START SLAVE (as RESET SLAVE or CHANGE
    MASTER, the callers of purge_relay_logs, will delete bogus *.info files
    or replace them with correct files), however if the user does SHOW SLAVE
    STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
567
    In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
568 569 570
    to display fine in any case.
  */

571 572
  rli->group_master_log_name[0]= 0;
  rli->group_master_log_pos= 0;
573

574
  if (!rli->inited)
575 576
  {
    DBUG_PRINT("info", ("rli->inited == 0"));
577
    DBUG_RETURN(0);
578
  }
unknown's avatar
unknown committed
579

580 581
  DBUG_ASSERT(rli->slave_running == 0);
  DBUG_ASSERT(rli->mi->slave_running == 0);
unknown's avatar
unknown committed
582

583 584
  rli->slave_skip_counter=0;
  pthread_mutex_lock(&rli->data_lock);
585
  if (rli->relay_log.reset_logs(thd))
586 587 588 589 590
  {
    *errmsg = "Failed during log reset";
    error=1;
    goto err;
  }
591
  /* Save name of used relay log file */
592 593 594 595 596
  strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
	  sizeof(rli->group_relay_log_name)-1);
  strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
 	  sizeof(rli->event_relay_log_name)-1);
  rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
597 598 599 600 601
  if (count_relay_log_space(rli))
  {
    *errmsg= "Error counting relay log space";
    goto err;
  }
602
  if (!just_reset)
603 604
    error= init_relay_log_pos(rli, rli->group_relay_log_name,
                              rli->group_relay_log_pos,
605
  			      0 /* do not need data lock */, errmsg, 0);
606
  
unknown's avatar
unknown committed
607 608 609 610
err:
#ifndef DBUG_OFF
  char buf[22];
#endif  
unknown's avatar
unknown committed
611
  DBUG_PRINT("info",("log_space_total: %s",llstr(rli->log_space_total,buf)));
612
  pthread_mutex_unlock(&rli->data_lock);
unknown's avatar
unknown committed
613
  DBUG_RETURN(error);
614 615
}

616

617 618 619 620 621 622 623
/*
  Stop the slave threads selected by thread_mask (SLAVE_IO, SLAVE_SQL,
  SLAVE_FORCE_ALL). If skip_lock is set the caller already holds the run
  locks. Without SLAVE_FORCE_ALL the first failure is returned at once;
  with it both threads are attempted.

  RETURN
    0 if mi is not initialized or everything went fine, otherwise the
    error from terminate_slave_thread().
*/
int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
{
  if (!mi->inited)
    return 0; /* successfully do nothing */

  bool force_all= (thread_mask & SLAVE_FORCE_ALL) != 0;
  /* Mutexes guarding the stop conditions: always needed for the waits. */
  pthread_mutex_t *io_cond_lock= &mi->run_lock;
  pthread_mutex_t *sql_cond_lock= &mi->rli.run_lock;
  /* Mutexes we take ourselves; 0 when the caller already holds them. */
  pthread_mutex_t *io_lock= skip_lock ? 0 : io_cond_lock;
  pthread_mutex_t *sql_lock= skip_lock ? 0 : sql_cond_lock;
  int error;
  DBUG_ENTER("terminate_slave_threads");

  bool want_io= (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) != 0;
  if (want_io && mi->slave_running)
  {
    DBUG_PRINT("info",("Terminating IO thread"));
    mi->abort_slave=1;
    error= terminate_slave_thread(mi->io_thd, io_lock, io_cond_lock,
                                  &mi->stop_cond, &mi->slave_running);
    if (error && !force_all)
      DBUG_RETURN(error);
  }

  bool want_sql= (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) != 0;
  if (want_sql && mi->rli.slave_running)
  {
    DBUG_PRINT("info",("Terminating SQL thread"));
    DBUG_ASSERT(mi->rli.sql_thd != 0) ;
    mi->rli.abort_slave=1;
    error= terminate_slave_thread(mi->rli.sql_thd, sql_lock, sql_cond_lock,
                                  &mi->rli.stop_cond, &mi->rli.slave_running);
    if (error && !force_all)
      DBUG_RETURN(error);
  }
  DBUG_RETURN(0);
}

659

660 661 662
int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
			   pthread_mutex_t *cond_lock,
			   pthread_cond_t* term_cond,
663
			   volatile uint *slave_running)
664
{
665
  DBUG_ENTER("terminate_slave_thread");
666 667 668 669 670 671
  if (term_lock)
  {
    pthread_mutex_lock(term_lock);
    if (!*slave_running)
    {
      pthread_mutex_unlock(term_lock);
672
      DBUG_RETURN(ER_SLAVE_NOT_RUNNING);
673 674 675
    }
  }
  DBUG_ASSERT(thd != 0);
676
  THD_CHECK_SENTRY(thd);
unknown's avatar
unknown committed
677
  /*
678
    Is is critical to test if the slave is running. Otherwise, we might
unknown's avatar
unknown committed
679
    be referening freed memory trying to kick it
680
  */
unknown's avatar
unknown committed
681 682

  while (*slave_running)			// Should always be true
683
  {
684
    DBUG_PRINT("loop", ("killing slave thread"));
685
    KICK_SLAVE(thd);
unknown's avatar
unknown committed
686 687 688
    /*
      There is a small chance that slave thread might miss the first
      alarm. To protect againts it, resend the signal until it reacts
689 690
    */
    struct timespec abstime;
unknown's avatar
unknown committed
691
    set_timespec(abstime,2);
692 693 694 695
    pthread_cond_timedwait(term_cond, cond_lock, &abstime);
  }
  if (term_lock)
    pthread_mutex_unlock(term_lock);
696
  DBUG_RETURN(0);
697 698
}

699

unknown's avatar
unknown committed
700
int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
701
		       pthread_mutex_t *cond_lock,
unknown's avatar
unknown committed
702
		       pthread_cond_t *start_cond,
703
		       volatile uint *slave_running,
unknown's avatar
unknown committed
704
		       volatile ulong *slave_run_id,
705 706
		       MASTER_INFO* mi,
                       bool high_priority)
707 708
{
  pthread_t th;
unknown's avatar
unknown committed
709
  ulong start_id;
710
  DBUG_ASSERT(mi->inited);
unknown's avatar
unknown committed
711 712
  DBUG_ENTER("start_slave_thread");

713 714 715 716 717 718 719 720 721
  if (start_lock)
    pthread_mutex_lock(start_lock);
  if (!server_id)
  {
    if (start_cond)
      pthread_cond_broadcast(start_cond);
    if (start_lock)
      pthread_mutex_unlock(start_lock);
    sql_print_error("Server id not set, will not start slave");
unknown's avatar
unknown committed
722
    DBUG_RETURN(ER_BAD_SLAVE);
723 724 725
  }
  
  if (*slave_running)
726 727 728 729 730
  {
    if (start_cond)
      pthread_cond_broadcast(start_cond);
    if (start_lock)
      pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
731
    DBUG_RETURN(ER_SLAVE_MUST_STOP);
732
  }
unknown's avatar
unknown committed
733 734
  start_id= *slave_run_id;
  DBUG_PRINT("info",("Creating new slave thread"));
735 736
  if (high_priority)
    my_pthread_attr_setprio(&connection_attrib,CONNECT_PRIOR);
737 738 739 740
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
  {
    if (start_lock)
      pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
741
    DBUG_RETURN(ER_SLAVE_THREAD);
742
  }
unknown's avatar
unknown committed
743
  if (start_cond && cond_lock) // caller has cond_lock
744 745
  {
    THD* thd = current_thd;
unknown's avatar
unknown committed
746
    while (start_id == *slave_run_id)
747
    {
unknown's avatar
unknown committed
748
      DBUG_PRINT("sleep",("Waiting for slave thread to start"));
749
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
750
					    "Waiting for slave thread to start");
751 752
      pthread_cond_wait(start_cond,cond_lock);
      thd->exit_cond(old_msg);
unknown's avatar
unknown committed
753
      pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
754
      if (thd->killed)
unknown's avatar
SCRUM  
unknown committed
755
	DBUG_RETURN(thd->killed_errno());
756 757 758 759
    }
  }
  if (start_lock)
    pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
760
  DBUG_RETURN(0);
761
}
unknown's avatar
unknown committed
762

763

unknown's avatar
unknown committed
764
/*
765
  start_slave_threads()
unknown's avatar
unknown committed
766

unknown's avatar
unknown committed
767 768 769 770
  NOTES
    SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
    sense to do that for starting a slave--we always care if it actually
    started the threads that were not previously running
771
*/
unknown's avatar
unknown committed
772

773 774 775 776 777 778 779
int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
			MASTER_INFO* mi, const char* master_info_fname,
			const char* slave_info_fname, int thread_mask)
{
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
  pthread_cond_t* cond_io=0,*cond_sql=0;
  int error=0;
780
  DBUG_ENTER("start_slave_threads");
781 782 783 784 785 786 787 788 789 790 791 792 793
  
  if (need_slave_mutex)
  {
    lock_io = &mi->run_lock;
    lock_sql = &mi->rli.run_lock;
  }
  if (wait_for_start)
  {
    cond_io = &mi->start_cond;
    cond_sql = &mi->rli.start_cond;
    lock_cond_io = &mi->run_lock;
    lock_cond_sql = &mi->rli.run_lock;
  }
794 795 796

  if (thread_mask & SLAVE_IO)
    error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
unknown's avatar
unknown committed
797 798
			     cond_io,
			     &mi->slave_running, &mi->slave_run_id,
799
			     mi, 1); //high priority, to read the most possible
800
  if (!error && (thread_mask & SLAVE_SQL))
801
  {
802 803
    error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
			     cond_sql,
unknown's avatar
unknown committed
804
			     &mi->rli.slave_running, &mi->rli.slave_run_id,
805
			     mi, 0);
806 807 808
    if (error)
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
  }
809
  DBUG_RETURN(error);
810
}
811

812

813 814
/*
  Initialise hash h for exact "db.table" replication rules (keys come from
  get_table_key(), entries are released with free_table_ent()) and record
  that it was initialised by setting *h_inited.
*/

void init_table_rule_hash(HASH* h, bool* h_inited)
{
  hash_get_key  key_fn=  (hash_get_key) get_table_key;
  hash_free_key free_fn= (hash_free_key) free_table_ent;

  hash_init(h, system_charset_info, TABLE_RULE_HASH_SIZE, 0, 0,
            key_fn, free_fn, 0);
  *h_inited= 1;
}
unknown's avatar
unknown committed
820

821

unknown's avatar
unknown committed
822 823
/*
  Initialise dynamic array a for wildcard "db.table" replication rules
  (each element is a TABLE_RULE_ENT pointer) and record that it was
  initialised by setting *a_inited.
*/

void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited)
{
  const uint elem_size= sizeof(TABLE_RULE_ENT*);

  my_init_dynamic_array(a, elem_size,
                        TABLE_RULE_ARR_SIZE,    /* initial allocation */
                        TABLE_RULE_ARR_SIZE);   /* allocation increment */
  *a_inited= 1;
}

829

unknown's avatar
unknown committed
830 831 832 833 834
/*
  Return the first wildcard rule in a whose pattern matches the len-byte
  key (a "db.table" string that need not be NUL-terminated), or 0 when no
  rule matches. Matching uses my_wildcmp() with wild_one/wild_many and
  '\\' as the escape character.
*/

static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
{
  const char *const key_end= key + len;
  uint idx= 0;

  while (idx < a->elements)
  {
    TABLE_RULE_ENT *rule;
    get_dynamic(a, (gptr) &rule, idx++);

    const char *pattern= (const char*) rule->db;
    const char *pattern_end= pattern + rule->key_len;
    if (my_wildcmp(system_charset_info, key, key_end, pattern, pattern_end,
                   '\\', wild_one, wild_many) == 0)
      return rule;
  }
  return 0;
}

849

850 851 852 853 854 855
/*
  Checks whether tables match some (wild_)do_table and (wild_)ignore_table
  rules (for replication)

  SYNOPSIS
    tables_ok()
856
    thd             thread (SQL slave thread normally). Mustn't be null.
857 858 859 860 861 862
    tables          list of tables to check

  NOTES
    Note that changing the order of the tables in the list can lead to
    different results. Note also the order of precedence of the do/ignore 
    rules (see code below). For that reason, users should not set conflicting 
863 864
    rules because they may get unpredicted results (precedence order is
    explained in the manual).
865

866 867
    Thought which arose from a question of a big customer "I want to include
    all tables like "abc.%" except the "%.EFG"". This can't be done now. If we
868 869
    supported Perl regexps we could do it with this pattern: /^abc\.(?!EFG)/
    (I could not find an equivalent in the regex library MySQL uses).
870 871 872 873 874

  RETURN VALUES
    0           should not be logged/replicated
    1           should be logged/replicated                  
*/
875

876
bool tables_ok(THD* thd, TABLE_LIST* tables)
877
{
878
  bool some_tables_updating= 0;
879 880
  DBUG_ENTER("tables_ok");

881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897
  /*
    In routine, can't reliably pick and choose substatements, so always
    replicate.
    We can't reliably know if one substatement should be executed or not:
    consider the case of this substatement: a SELECT on a non-replicated
    constant table; if we don't execute it maybe it was going to fill a
    variable which was going to be used by the next substatement to update
    a replicated table? If we execute it maybe the constant non-replicated
    table does not exist (and so we'll fail) while there was no need to
    execute this as this SELECT does not influence replicated tables in the
    rest of the routine? In other words: users are used to replicate-*-table
    specifying how to handle updates to tables, these options don't say
    anything about reads to tables; we can't guess.
  */
  if (thd->spcont)
    DBUG_RETURN(1);

unknown's avatar
VIEW  
unknown committed
898
  for (; tables; tables= tables->next_global)
unknown's avatar
unknown committed
899
  {
900 901 902 903
    char hash_key[2*NAME_LEN+2];
    char *end;
    uint len;

unknown's avatar
unknown committed
904 905
    if (!tables->updating) 
      continue;
906
    some_tables_updating= 1;
907 908
    end= strmov(hash_key, tables->db ? tables->db : thd->db);
    *end++= '.';
909
    len= (uint) (strmov(end, tables->table_name) - hash_key);
unknown's avatar
unknown committed
910
    if (do_table_inited) // if there are any do's
911
    {
unknown's avatar
unknown committed
912
      if (hash_search(&replicate_do_table, (byte*) hash_key, len))
913
	DBUG_RETURN(1);
unknown's avatar
unknown committed
914
    }
915
    if (ignore_table_inited) // if there are any ignores
unknown's avatar
unknown committed
916 917
    {
      if (hash_search(&replicate_ignore_table, (byte*) hash_key, len))
918
	DBUG_RETURN(0); 
919
    }
unknown's avatar
unknown committed
920 921
    if (wild_do_table_inited && find_wild(&replicate_wild_do_table,
					  hash_key, len))
922
      DBUG_RETURN(1);
unknown's avatar
unknown committed
923 924
    if (wild_ignore_table_inited && find_wild(&replicate_wild_ignore_table,
					      hash_key, len))
925
      DBUG_RETURN(0);
unknown's avatar
unknown committed
926
  }
927

unknown's avatar
unknown committed
928
  /*
929 930
    If no table was to be updated, ignore statement (no reason we play it on
    slave, slave is supposed to replicate _changes_ only).
unknown's avatar
unknown committed
931 932 933
    If no explicit rule found and there was a do list, do not replicate.
    If there was no do list, go ahead
  */
934 935
  DBUG_RETURN(some_tables_updating &&
              !do_table_inited && !wild_do_table_inited);
936 937
}

938

939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988
/*
  Checks whether a db matches wild_do_table and wild_ignore_table
  rules (for replication)

  SYNOPSIS
    db_ok_with_wild_table()
    db		name of the db to check.
		Is tested with check_db_name() before calling this function.

  NOTES
    Here is the reason for this function.
    We advise users who want to exclude a database 'db1' safely to do it
    with replicate_wild_ignore_table='db1.%' instead of binlog_ignore_db or
    replicate_ignore_db because the two lasts only check for the selected db,
    which won't work in that case:
    USE db2;
    UPDATE db1.t SET ... #this will be replicated and should not
    whereas replicate_wild_ignore_table will work in all cases.
    With replicate_wild_ignore_table, we only check tables. When
    one does 'DROP DATABASE db1', tables are not involved and the
    statement will be replicated, while users could expect it would not (as it
    rougly means 'DROP db1.first_table, DROP db1.second_table...').
    In other words, we want to interpret 'db1.%' as "everything touching db1".
    That is why we want to match 'db1' against 'db1.%' wild table rules.

  RETURN VALUES
    0           should not be logged/replicated
    1           should be logged/replicated
 */

int db_ok_with_wild_table(const char *db)
{
  char hash_key[NAME_LEN+2];
  char *end;
  int len;
  end= strmov(hash_key, db);
  *end++= '.';
  len= end - hash_key ;
  if (wild_do_table_inited && find_wild(&replicate_wild_do_table,
                                        hash_key, len))
    return 1;
  if (wild_ignore_table_inited && find_wild(&replicate_wild_ignore_table,
                                            hash_key, len))
    return 0;
  
  /*
    If no explicit rule found and there was a do list, do not replicate.
    If there was no do list, go ahead
  */
  return !wild_do_table_inited;
989 990 991 992 993
}


int add_table_rule(HASH* h, const char* table_spec)
{
unknown's avatar
unknown committed
994
  const char* dot = strchr(table_spec, '.');
unknown's avatar
unknown committed
995
  if (!dot) return 1;
unknown's avatar
unknown committed
996
  // len is always > 0 because we know the there exists a '.'
997 998 999
  uint len = (uint)strlen(table_spec);
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
						 + len, MYF(MY_WME));
unknown's avatar
unknown committed
1000
  if (!e) return 1;
1001 1002 1003 1004
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len);
unknown's avatar
SCRUM  
unknown committed
1005
  (void)my_hash_insert(h, (byte*)e);
1006 1007 1008
  return 0;
}

1009

unknown's avatar
unknown committed
1010 1011 1012
/*
  Add table expression with wildcards to dynamic array
*/
1013

unknown's avatar
unknown committed
1014 1015
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
{
unknown's avatar
unknown committed
1016
  const char* dot = strchr(table_spec, '.');
unknown's avatar
unknown committed
1017
  if (!dot) return 1;
unknown's avatar
unknown committed
1018 1019 1020
  uint len = (uint)strlen(table_spec);
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
						 + len, MYF(MY_WME));
unknown's avatar
unknown committed
1021
  if (!e) return 1;
unknown's avatar
unknown committed
1022 1023 1024 1025 1026 1027 1028 1029
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len);
  insert_dynamic(a, (gptr)&e);
  return 0;
}

1030

1031 1032 1033
/*
  Release every element of a with my_free() (so each element must be a
  pointer obtained from my_malloc()), then free the array storage itself.
*/

static void free_string_array(DYNAMIC_ARRAY *a)
{
  for (uint idx= 0; idx < a->elements; idx++)
  {
    char *elem;
    get_dynamic(a, (gptr) &elem, idx);
    my_free(elem, MYF(MY_WME));
  }
  delete_dynamic(a);
}

1043

1044
#ifdef NOT_USED_YET
/*
  Walk callback that releases one MASTER_INFO with end_master_info().
  The second argument is ignored; always returns 0.
*/
static int end_slave_on_walk(MASTER_INFO *mi, gptr /* unused */)
{
  end_master_info(mi);
  return 0;
}
#endif
1051

1052

unknown's avatar
unknown committed
1053 1054 1055 1056 1057 1058
/*
  Free all resources used by slave

  SYNOPSIS
    end_slave()
*/
1059

1060 1061
void end_slave()
{
1062 1063 1064 1065 1066 1067 1068 1069
  /*
    This is called when the server terminates, in close_connections().
    It terminates slave threads. However, some CHANGE MASTER etc may still be
    running presently. If a START SLAVE was in progress, the mutex lock below
    will make us wait until slave threads have started, and START SLAVE
    returns, then we terminate them here.
  */
  pthread_mutex_lock(&LOCK_active_mi);
unknown's avatar
unknown committed
1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089
  if (active_mi)
  {
    /*
      TODO: replace the line below with
      list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
      once multi-master code is ready.
    */
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
    end_master_info(active_mi);
    if (do_table_inited)
      hash_free(&replicate_do_table);
    if (ignore_table_inited)
      hash_free(&replicate_ignore_table);
    if (wild_do_table_inited)
      free_string_array(&replicate_wild_do_table);
    if (wild_ignore_table_inited)
      free_string_array(&replicate_wild_ignore_table);
    delete active_mi;
    active_mi= 0;
  }
1090
  pthread_mutex_unlock(&LOCK_active_mi);
1091
}
unknown's avatar
unknown committed
1092

1093

1094
/*
  Return true if the slave I/O thread thd (belonging to mi) should stop:
  mi->abort_slave is set, the global abort_loop flag is set (presumably
  server shutdown), or thd->killed is set.
*/

static bool io_slave_killed(THD* thd, MASTER_INFO* mi)
{
  DBUG_ASSERT(mi->io_thd == thd);
  DBUG_ASSERT(mi->slave_running); // tracking buffer overrun
  if (mi->abort_slave || abort_loop)
    return true;
  return thd->killed != 0;
}

1101

1102
/*
  Return true if the slave SQL thread thd (belonging to rli) should stop:
  rli->abort_slave is set, the global abort_loop flag is set (presumably
  server shutdown), or thd->killed is set.
*/

static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
{
  DBUG_ASSERT(rli->sql_thd == thd);
  DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
  if (rli->abort_slave || abort_loop)
    return true;
  return thd->killed != 0;
}

1109

1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123
/*
  Writes an error message to rli->last_slave_error and rli->last_slave_errno
  (which will be displayed by SHOW SLAVE STATUS), and prints it to stderr.

  SYNOPSIS
    slave_print_error()
    rli		
    err_code    The error code
    msg         The error message (usually related to the error code, but can
                contain more information).
    ...         (this is printf-like format, with % symbols in msg)

  RETURN VALUES
    void
1124
*/
1125

1126
void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...)
1127 1128 1129
{
  va_list args;
  va_start(args,msg);
1130 1131 1132
  my_vsnprintf(rli->last_slave_error,
	       sizeof(rli->last_slave_error), msg, args);
  rli->last_slave_errno = err_code;
1133 1134
  /* If the error string ends with '.', do not add a ',' it would be ugly */
  if (rli->last_slave_error[0] && 
1135 1136
      (*(strend(rli->last_slave_error)-1) == '.'))
    sql_print_error("Slave: %s Error_code: %d", rli->last_slave_error,
1137 1138
                    err_code);
  else
1139
    sql_print_error("Slave: %s, Error_code: %d", rli->last_slave_error,
1140 1141
                    err_code);

1142 1143
}

unknown's avatar
unknown committed
1144
/*
1145 1146
  skip_load_data_infile()

unknown's avatar
unknown committed
1147 1148 1149
  NOTES
    This is used to tell a 3.23 master to break send_file()
*/
1150 1151 1152 1153 1154 1155 1156 1157

void skip_load_data_infile(NET *net)
{
  (void)net_request_file(net, "/dev/null");
  (void)my_net_read(net);				// discard response
  (void)net_write_command(net, 0, "", 0, "", 0);	// Send ok
}

1158

1159
/*
  Write a packet with command byte 251 carrying the file name fname to net
  (presumably a file-transfer request; verify against the peer protocol).
  Returns the result of net_write_command().
*/

bool net_request_file(NET* net, const char* fname)
{
  size_t fname_len= strlen(fname);
  DBUG_ENTER("net_request_file");
  DBUG_RETURN(net_write_command(net, 251, fname, fname_len, "", 0));
}

1165

1166
const char *rewrite_db(const char* db, uint32 *new_len)
unknown's avatar
unknown committed
1167
{
unknown's avatar
unknown committed
1168 1169
  if (replicate_rewrite_db.is_empty() || !db)
    return db;
unknown's avatar
unknown committed
1170 1171 1172
  I_List_iterator<i_string_pair> it(replicate_rewrite_db);
  i_string_pair* tmp;

unknown's avatar
unknown committed
1173 1174 1175
  while ((tmp=it++))
  {
    if (!strcmp(tmp->key, db))
1176
    {
1177
      *new_len= (uint32)strlen(tmp->val);
unknown's avatar
unknown committed
1178
      return tmp->val;
1179
    }
unknown's avatar
unknown committed
1180
  }
unknown's avatar
unknown committed
1181 1182
  return db;
}
1183

1184 1185
/*
  Return db, or an empty string when db is null, so the result can always
  be used as a C string (e.g. for printing).
  From other comments and tests in code, it looks like Query_log_event and
  Load_log_event can sometimes have db == 0 (see rewrite_db() above for
  example); the cases where this happens are unclear, it may be when the
  master is 3.23.
*/

const char *print_slave_db_safe(const char* db)
{
  static const char empty_db[]= "";
  if (db)
    return db;
  return empty_db;
}
1195

1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209
/*
  Checks whether a db matches some do_db and ignore_db rules
  (for logging or replication)

  SYNOPSIS
    db_ok()
    db              name of the db to check
    do_list         either binlog_do_db or replicate_do_db
    ignore_list     either binlog_ignore_db or replicate_ignore_db

  RETURN VALUES
    0           should not be logged/replicated
    1           should be logged/replicated                  
*/
unknown's avatar
unknown committed
1210

unknown's avatar
unknown committed
1211 1212 1213
int db_ok(const char* db, I_List<i_string> &do_list,
	  I_List<i_string> &ignore_list )
{
unknown's avatar
unknown committed
1214
  if (do_list.is_empty() && ignore_list.is_empty())
unknown's avatar
unknown committed
1215 1216
    return 1; // ok to replicate if the user puts no constraints

unknown's avatar
unknown committed
1217 1218 1219 1220 1221
  /*
    If the user has specified restrictions on which databases to replicate
    and db was not selected, do not replicate.
  */
  if (!db)
unknown's avatar
unknown committed
1222
    return 0;
unknown's avatar
unknown committed
1223

unknown's avatar
unknown committed
1224 1225 1226 1227
  if (!do_list.is_empty()) // if the do's are not empty
  {
    I_List_iterator<i_string> it(do_list);
    i_string* tmp;
unknown's avatar
unknown committed
1228

unknown's avatar
unknown committed
1229 1230 1231 1232
    while ((tmp=it++))
    {
      if (!strcmp(tmp->ptr, db))
	return 1; // match
unknown's avatar
unknown committed
1233
    }
unknown's avatar
unknown committed
1234 1235
    return 0;
  }
unknown's avatar
unknown committed
1236
  else // there are some elements in the don't, otherwise we cannot get here
unknown's avatar
unknown committed
1237 1238 1239
  {
    I_List_iterator<i_string> it(ignore_list);
    i_string* tmp;
unknown's avatar
unknown committed
1240

unknown's avatar
unknown committed
1241 1242 1243 1244
    while ((tmp=it++))
    {
      if (!strcmp(tmp->ptr, db))
	return 0; // match
unknown's avatar
unknown committed
1245
    }
unknown's avatar
unknown committed
1246 1247
    return 1;
  }
unknown's avatar
unknown committed
1248 1249
}

1250

unknown's avatar
unknown committed
1251 1252
static int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
				 const char *default_val)
unknown's avatar
unknown committed
1253
{
unknown's avatar
unknown committed
1254 1255 1256 1257 1258 1259 1260
  uint length;
  if ((length=my_b_gets(f,var, max_size)))
  {
    char* last_p = var + length -1;
    if (*last_p == '\n')
      *last_p = 0; // if we stopped on newline, kill it
    else
unknown's avatar
unknown committed
1261
    {
unknown's avatar
unknown committed
1262 1263 1264 1265
      /*
	If we truncated a line or stopped on last char, remove all chars
	up to and including newline.
      */
unknown's avatar
unknown committed
1266
      int c;
unknown's avatar
unknown committed
1267
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF));
unknown's avatar
unknown committed
1268
    }
unknown's avatar
unknown committed
1269 1270 1271 1272
    return 0;
  }
  else if (default_val)
  {
unknown's avatar
unknown committed
1273
    strmake(var,  default_val, max_size-1);
unknown's avatar
unknown committed
1274 1275
    return 0;
  }
unknown's avatar
unknown committed
1276
  return 1;
unknown's avatar
unknown committed
1277 1278
}

1279

unknown's avatar
unknown committed
1280
static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
unknown's avatar
unknown committed
1281 1282 1283
{
  char buf[32];
  
unknown's avatar
unknown committed
1284 1285 1286 1287 1288
  if (my_b_gets(f, buf, sizeof(buf))) 
  {
    *var = atoi(buf);
    return 0;
  }
unknown's avatar
unknown committed
1289
  else if (default_val)
unknown's avatar
unknown committed
1290 1291 1292 1293
  {
    *var = default_val;
    return 0;
  }
unknown's avatar
unknown committed
1294
  return 1;
unknown's avatar
unknown committed
1295 1296
}

1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309
/*
  Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
  relying on the binlog's version. This is not perfect: imagine an upgrade
  of the master without waiting that all slaves are in sync with the master;
  then a slave could be fooled about the binlog's format. This is what happens
  when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
  slaves are fooled. So we do this only to distinguish between 3.23 and more
  recent masters (it's too late to change things for 3.23).
  
  RETURNS
  0       ok
  1       error
*/
1310

unknown's avatar
unknown committed
1311
static int get_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi)
1312
{
1313
  const char* errmsg= 0;
1314 1315 1316 1317 1318 1319 1320

  /*
    Free old description_event_for_queue (that is needed if we are in
    a reconnection).
  */
  delete mi->rli.relay_log.description_event_for_queue;
  mi->rli.relay_log.description_event_for_queue= 0;
1321
  
1322
  if (!my_isdigit(&my_charset_bin,*mysql->server_version))
unknown's avatar
unknown committed
1323
    errmsg = "Master reported unrecognized MySQL version";
1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356
  else
  {
    /*
      Note the following switch will bug when we have MySQL branch 30 ;)
    */
    switch (*mysql->server_version) 
    {
    case '0':
    case '1':
    case '2':
      errmsg = "Master reported unrecognized MySQL version";
      break;
    case '3':
      mi->rli.relay_log.description_event_for_queue= new
        Format_description_log_event(1, mysql->server_version); 
      break;
    case '4':
      mi->rli.relay_log.description_event_for_queue= new
        Format_description_log_event(3, mysql->server_version); 
      break;
    default: 
      /*
        Master is MySQL >=5.0. Give a default Format_desc event, so that we can
        take the early steps (like tests for "is this a 3.23 master") which we
        have to take before we receive the real master's Format_desc which will
        override this one. Note that the Format_desc we create below is garbage
        (it has the format of the *slave*); it's only good to help know if the
        master is 3.23, 4.0, etc.
      */
      mi->rli.relay_log.description_event_for_queue= new
        Format_description_log_event(4, mysql->server_version); 
      break;
    }
1357
  }
1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370
  
  /* 
     This does not mean that a 5.0 slave will be able to read a 6.0 master; but
     as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
     can't read a 6.0 master, this will show up when the slave can't read some
     events sent by the master, and there will be error messages.
  */
  
  if (errmsg)
  {
    sql_print_error(errmsg);
    return 1;
  }
1371 1372 1373 1374 1375 1376 1377 1378

  /* as we are here, we tried to allocate the event */
  if (!mi->rli.relay_log.description_event_for_queue)
  {
    sql_print_error("Slave I/O thread failed to create a default Format_description_log_event");
    return 1;
  }

1379 1380 1381 1382 1383 1384 1385 1386 1387 1388
  /*
    Compare the master and slave's clock. Do not die if master's clock is
    unavailable (very old master not supporting UNIX_TIMESTAMP()?).
  */
  MYSQL_RES *master_res= 0;
  MYSQL_ROW master_row;
  
  if (!mysql_real_query(mysql, "SELECT UNIX_TIMESTAMP()", 23) &&
      (master_res= mysql_store_result(mysql)) &&
      (master_row= mysql_fetch_row(master_res)))
unknown's avatar
unknown committed
1389
  {
1390 1391
    mi->clock_diff_with_master= 
      (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
unknown's avatar
unknown committed
1392
  }
1393
  else
unknown's avatar
unknown committed
1394
  {
1395
    mi->clock_diff_with_master= 0; /* The "most sensible" value */
1396
    sql_print_warning("\"SELECT UNIX_TIMESTAMP()\" failed on master, \
1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422
do not trust column Seconds_Behind_Master of SHOW SLAVE STATUS");
  }
  if (master_res)
    mysql_free_result(master_res);      
 
  /*
    Check that the master's server id and ours are different. Because if they
    are equal (which can result from a simple copy of master's datadir to slave,
    thus copying some my.cnf), replication will work but all events will be
    skipped.
    Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
    master?).
    Note: we could have put a @@SERVER_ID in the previous SELECT
    UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
  */
  if (!mysql_real_query(mysql, "SHOW VARIABLES LIKE 'SERVER_ID'", 31) &&
      (master_res= mysql_store_result(mysql)))
  {
    if ((master_row= mysql_fetch_row(master_res)) &&
        (::server_id == strtoul(master_row[1], 0, 10)) &&
        !replicate_same_server_id)
      errmsg= "The slave I/O thread stops because master and slave have equal \
MySQL server ids; these ids must be different for replication to work (or \
the --replicate-same-server-id option must be used on slave but this does \
not always make sense; please check the manual before using it).";
    mysql_free_result(master_res);
unknown's avatar
unknown committed
1423 1424
  }

1425 1426 1427
  /*
    Check that the master's global character_set_server and ours are the same.
    Not fatal if query fails (old master?).
1428 1429 1430 1431 1432 1433
    Note that we don't check for equality of global character_set_client and
    collation_connection (neither do we prevent their setting in
    set_var.cc). That's because from what I (Guilhem) have tested, the global
    values of these 2 are never used (new connections don't use them).
    We don't test equality of global collation_database either as it's is
    going to be deprecated (made read-only) in 4.1 very soon.
1434
    The test is only relevant if master < 5.0.3 (we'll test only if it's older
1435
    than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
1436 1437 1438 1439
    charset info in each binlog event.
    We don't do it for 3.23 because masters <3.23.50 hang on
    SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
    test only if master is 4.x.
1440
  */
1441 1442 1443

  /* redundant with rest of code but safer against later additions */
  if (*mysql->server_version == '3')
1444
    goto err;
1445 1446 1447

  if ((*mysql->server_version == '4') &&
      !mysql_real_query(mysql, "SELECT @@GLOBAL.COLLATION_SERVER", 32) &&
1448
      (master_res= mysql_store_result(mysql)))
unknown's avatar
unknown committed
1449
  {
1450 1451 1452 1453 1454 1455
    if ((master_row= mysql_fetch_row(master_res)) &&
        strcmp(master_row[0], global_system_variables.collation_server->name))
      errmsg= "The slave I/O thread stops because master and slave have \
different values for the COLLATION_SERVER global variable. The values must \
be equal for replication to work";
    mysql_free_result(master_res);
unknown's avatar
unknown committed
1456
  }
1457

1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469
  /*
    Perform analogous check for time zone. Theoretically we also should
    perform check here to verify that SYSTEM time zones are the same on
    slave and master, but we can't rely on value of @@system_time_zone
    variable (it is time zone abbreviation) since it determined at start
    time and so could differ for slave and master even if they are really
    in the same system time zone. So we are omiting this check and just
    relying on documentation. Also according to Monty there are many users
    who are using replication between servers in various time zones. Hence 
    such check will broke everything for them. (And now everything will 
    work for them because by default both their master and slave will have 
    'SYSTEM' time zone).
1470 1471
    This check is only necessary for 4.x masters (and < 5.0.4 masters but
    those were alpha).
1472
  */
1473
  if ((*mysql->server_version == '4') &&
1474
      !mysql_real_query(mysql, "SELECT @@GLOBAL.TIME_ZONE", 25) &&
1475
      (master_res= mysql_store_result(mysql)))
unknown's avatar
unknown committed
1476
  {
1477 1478 1479 1480 1481 1482 1483
    if ((master_row= mysql_fetch_row(master_res)) &&
        strcmp(master_row[0], 
               global_system_variables.time_zone->get_name()->ptr()))
      errmsg= "The slave I/O thread stops because master and slave have \
different values for the TIME_ZONE global variable. The values must \
be equal for replication to work";
    mysql_free_result(master_res);
unknown's avatar
unknown committed
1484 1485
  }

1486
err:
1487 1488 1489 1490 1491
  if (errmsg)
  {
    sql_print_error(errmsg);
    return 1;
  }
1492

1493 1494 1495
  return 0;
}

1496 1497 1498 1499
/*
  Used by fetch_master_table (used by LOAD TABLE tblname FROM MASTER and LOAD
  DATA FROM MASTER). Drops the table (if 'overwrite' is true) and recreates it
  from the dump. Honours replication inclusion/exclusion rules.
1500
  db must be non-zero (guarded by assertion).
1501 1502 1503 1504 1505

  RETURN VALUES
    0           success
    1           error
*/
1506

1507
static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
1508
				  const char* table_name, bool overwrite)
unknown's avatar
unknown committed
1509
{
1510
  ulong packet_len;
1511 1512
  char *query, *save_db;
  uint32 save_db_length;
1513 1514
  Vio* save_vio;
  HA_CHECK_OPT check_opt;
unknown's avatar
unknown committed
1515
  TABLE_LIST tables;
1516 1517
  int error= 1;
  handler *file;
1518
  ulong save_options;
1519
  NET *net= &mysql->net;
unknown's avatar
unknown committed
1520 1521
  DBUG_ENTER("create_table_from_dump");  

1522
  packet_len= my_net_read(net); // read create table statement
1523 1524
  if (packet_len == packet_error)
  {
unknown's avatar
unknown committed
1525
    my_message(ER_MASTER_NET_READ, ER(ER_MASTER_NET_READ), MYF(0));
unknown's avatar
unknown committed
1526
    DBUG_RETURN(1);
1527 1528 1529
  }
  if (net->read_pos[0] == 255) // error from master
  {
1530 1531 1532 1533
    char *err_msg; 
    err_msg= (char*) net->read_pos + ((mysql->server_capabilities &
				       CLIENT_PROTOCOL_41) ?
				      3+SQLSTATE_LENGTH+1 : 3);
1534
    my_error(ER_MASTER, MYF(0), err_msg);
unknown's avatar
unknown committed
1535
    DBUG_RETURN(1);
1536
  }
unknown's avatar
unknown committed
1537
  thd->command = COM_TABLE_DUMP;
1538
  thd->query_length= packet_len;
1539
  /* Note that we should not set thd->query until the area is initalized */
1540
  if (!(query = thd->strmake((char*) net->read_pos, packet_len)))
1541 1542
  {
    sql_print_error("create_table_from_dump: out of memory");
unknown's avatar
unknown committed
1543
    my_message(ER_GET_ERRNO, "Out of memory", MYF(0));
unknown's avatar
unknown committed
1544
    DBUG_RETURN(1);
1545
  }
1546
  thd->query= query;
unknown's avatar
unknown committed
1547 1548
  thd->query_error = 0;
  thd->net.no_send_ok = 1;
1549

unknown's avatar
unknown committed
1550 1551
  bzero((char*) &tables,sizeof(tables));
  tables.db = (char*)db;
1552
  tables.alias= tables.table_name= (char*)table_name;
unknown's avatar
unknown committed
1553

unknown's avatar
unknown committed
1554 1555 1556 1557 1558 1559 1560
  /* Drop the table if 'overwrite' is true */
  if (overwrite && mysql_rm_table(thd,&tables,1,0)) /* drop if exists */
  {
    sql_print_error("create_table_from_dump: failed to drop the table");
    goto err;
  }

1561
  /* Create the table. We do not want to log the "create table" statement */
1562
  save_options = thd->options;
1563
  thd->options &= ~(ulong) (OPTION_BIN_LOG);
unknown's avatar
unknown committed
1564
  thd->proc_info = "Creating table from master dump";
unknown's avatar
unknown committed
1565
  // save old db in case we are creating in a different database
1566
  save_db = thd->db;
1567
  save_db_length= thd->db_length;
1568
  thd->db = (char*)db;
1569
  DBUG_ASSERT(thd->db != 0);
1570
  thd->db_length= strlen(thd->db);
unknown's avatar
unknown committed
1571
  mysql_parse(thd, thd->query, packet_len); // run create table
1572
  thd->db = save_db;		// leave things the way the were before
1573
  thd->db_length= save_db_length;
1574
  thd->options = save_options;
unknown's avatar
unknown committed
1575
  
1576 1577
  if (thd->query_error)
    goto err;			// mysql_parse took care of the error send
unknown's avatar
unknown committed
1578 1579

  thd->proc_info = "Opening master dump table";
1580
  tables.lock_type = TL_WRITE;
unknown's avatar
unknown committed
1581 1582 1583
  if (!open_ltable(thd, &tables, TL_WRITE))
  {
    sql_print_error("create_table_from_dump: could not open created table");
1584
    goto err;
unknown's avatar
unknown committed
1585
  }
unknown's avatar
unknown committed
1586
  
1587
  file = tables.table->file;
unknown's avatar
unknown committed
1588
  thd->proc_info = "Reading master dump table data";
1589
  /* Copy the data file */
unknown's avatar
unknown committed
1590 1591
  if (file->net_read_dump(net))
  {
unknown's avatar
unknown committed
1592
    my_message(ER_MASTER_NET_READ, ER(ER_MASTER_NET_READ), MYF(0));
1593
    sql_print_error("create_table_from_dump: failed in\
unknown's avatar
unknown committed
1594
 handler::net_read_dump()");
1595
    goto err;
unknown's avatar
unknown committed
1596
  }
unknown's avatar
unknown committed
1597 1598

  check_opt.init();
unknown's avatar
unknown committed
1599
  check_opt.flags|= T_VERY_SILENT | T_CALC_CHECKSUM | T_QUICK;
unknown's avatar
unknown committed
1600
  thd->proc_info = "Rebuilding the index on master dump table";
unknown's avatar
unknown committed
1601 1602 1603 1604 1605
  /*
    We do not want repair() to spam us with messages
    just send them to the error log, and report the failure in case of
    problems.
  */
1606
  save_vio = thd->net.vio;
unknown's avatar
unknown committed
1607
  thd->net.vio = 0;
1608
  /* Rebuild the index file from the copied data file (with REPAIR) */
1609
  error=file->repair(thd,&check_opt) != 0;
unknown's avatar
unknown committed
1610
  thd->net.vio = save_vio;
1611
  if (error)
1612
    my_error(ER_INDEX_REBUILD, MYF(0), tables.table->s->table_name);
1613 1614

err:
unknown's avatar
unknown committed
1615 1616
  close_thread_tables(thd);
  thd->net.no_send_ok = 0;
unknown's avatar
unknown committed
1617
  DBUG_RETURN(error); 
unknown's avatar
unknown committed
1618 1619
}

1620

1621
int fetch_master_table(THD *thd, const char *db_name, const char *table_name,
1622
		       MASTER_INFO *mi, MYSQL *mysql, bool overwrite)
unknown's avatar
unknown committed
1623
{
1624 1625 1626 1627 1628 1629
  int error= 1;
  const char *errmsg=0;
  bool called_connected= (mysql != NULL);
  DBUG_ENTER("fetch_master_table");
  DBUG_PRINT("enter", ("db_name: '%s'  table_name: '%s'",
		       db_name,table_name));
unknown's avatar
unknown committed
1630

unknown's avatar
merge  
unknown committed
1631
  if (!called_connected)
1632
  { 
unknown's avatar
SCRUM  
unknown committed
1633
    if (!(mysql = mysql_init(NULL)))
1634 1635 1636
    {
      DBUG_RETURN(1);
    }
unknown's avatar
merge  
unknown committed
1637
    if (connect_to_master(thd, mysql, mi))
1638
    {
1639
      my_error(ER_CONNECT_TO_MASTER, MYF(0), mysql_error(mysql));
unknown's avatar
SCRUM  
unknown committed
1640
      mysql_close(mysql);
1641
      DBUG_RETURN(1);
1642
    }
1643 1644
    if (thd->killed)
      goto err;
1645
  }
unknown's avatar
unknown committed
1646

unknown's avatar
unknown committed
1647
  if (request_table_dump(mysql, db_name, table_name))
1648
  {
1649 1650
    error= ER_UNKNOWN_ERROR;
    errmsg= "Failed on table dump request";
1651 1652
    goto err;
  }
unknown's avatar
unknown committed
1653 1654 1655
  if (create_table_from_dump(thd, mysql, db_name,
			     table_name, overwrite))
    goto err;    // create_table_from_dump have sent the error already
unknown's avatar
unknown committed
1656
  error = 0;
1657

unknown's avatar
unknown committed
1658
 err:
1659
  thd->net.no_send_ok = 0; // Clear up garbage after create_table_from_dump
1660
  if (!called_connected)
unknown's avatar
SCRUM  
unknown committed
1661
    mysql_close(mysql);
1662
  if (errmsg && thd->vio_ok())
unknown's avatar
unknown committed
1663
    my_message(error, errmsg, MYF(0));
1664
  DBUG_RETURN(test(error));			// Return 1 on error
unknown's avatar
unknown committed
1665 1666
}

1667

1668 1669
/*
  Tear down an initialized MASTER_INFO: end its relay log info, release
  the master info file cache and descriptor, and clear mi->inited.
  Does nothing if mi was never initialized.
*/
void end_master_info(MASTER_INFO* mi)
{
  DBUG_ENTER("end_master_info");
  if (mi->inited)
  {
    end_relay_log_info(&mi->rli);
    if (mi->fd >= 0)
    {
      /* End the cache before closing the descriptor it is built on */
      end_io_cache(&mi->file);
      (void) my_close(mi->fd, MYF(MY_WME));
      mi->fd= -1;
    }
    mi->inited= 0;
  }
  DBUG_VOID_RETURN;
}

1686

1687 1688
/*
  Initialize a RELAY_LOG_INFO structure and open the relay log.

  SYNOPSIS
    init_relay_log_info()
    rli           Relay log info structure to initialize
    info_fname    Name of the relay log info file; resolved against
                  mysql_data_home by fn_format()

  DESCRIPTION
    Opens the relay log index and the relay log.

    If the info file does not exist, it is created and the relay log is
    positioned at the first entry of the index. Otherwise the fields
    group_relay_log_name/pos and group_master_log_name/pos are read from
    it, and the relay log is positioned at the recorded relay log
    position.

    Finally the info file cache is switched to WRITE_CACHE and flushed,
    and the relay log space is recounted.

    This is a no-op if rli->inited is already set. rli->data_lock is taken
    here and released on every return path.

  RETURN VALUES
    0     ok
    1     error (logged)
    #     non-zero result of a failing flush_relay_log_info(); rli is
          still marked as initialized in that case
*/

static int init_relay_log_info(RELAY_LOG_INFO* rli,
                               const char* info_fname)
{
  char fname[FN_REFLEN+128];
  int info_fd;
  const char* msg = 0;
  int error = 0;
  DBUG_ENTER("init_relay_log_info");

  if (rli->inited)                       // Set if this function called
    DBUG_RETURN(0);
  fn_format(fname, info_fname, mysql_data_home, "", 4+32);
  pthread_mutex_lock(&rli->data_lock);
  info_fd = rli->info_fd;
  rli->cur_log_fd = -1;
  rli->slave_skip_counter=0;
  rli->abort_pos_wait=0;
  rli->log_space_limit= relay_log_space_limit;
  rli->log_space_total= 0;

  /*
    The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
    Note that the I/O thread flushes it to disk after writing every
    event, in flush_master_info(mi, 1).
  */

  /*
    For the maximum log size, we choose max_relay_log_size if it is
    non-zero, max_binlog_size otherwise. If later the user does SET
    GLOBAL on one of these variables, fix_max_binlog_size and
    fix_max_relay_log_size will reconsider the choice (for example
    if the user changes max_relay_log_size to zero, we have to
    switch to using max_binlog_size for the relay log) and update
    rli->relay_log.max_size (and mysql_bin_log.max_size).
  */
  {
    char buf[FN_REFLEN];
    const char *ln;
    ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
                                     1, buf);

    /*
      note, that if open() fails, we'll still have index file open
      but a destructor will take care of that
    */
    if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln) ||
        rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0,
                            (max_relay_log_size ? max_relay_log_size :
                            max_binlog_size), 1))
    {
      pthread_mutex_unlock(&rli->data_lock);
      sql_print_error("Failed in open_log() called from init_relay_log_info()");
      DBUG_RETURN(1);
    }
  }

  /* if file does not exist */
  if (access(fname,F_OK))
  {
    /*
      If someone removed the file from underneath our feet, end the cache
      built on the old descriptor, close the descriptor and re-create the
      file. The cache is re-initialized below, so it has to be ended
      before it is overwritten.
    */
    if (info_fd >= 0)
    {
      end_io_cache(&rli->info_file);
      my_close(info_fd, MYF(MY_WME));
    }
    if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
    {
      sql_print_error("Failed to create a new relay log info file (\
file '%s', errno %d)", fname, my_errno);
      /* NOTE(review): current_thd may be unset during server startup */
      msg= current_thd ? current_thd->net.last_error : 0;
      goto err;
    }
    if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
		      MYF(MY_WME))) 
    {
      sql_print_error("Failed to create a cache on relay log info file '%s'",
		      fname);
      msg= current_thd ? current_thd->net.last_error : 0;
      goto err;
    }

    /* Init relay log with first entry in the relay index file */
    if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
			   &msg, 0))
    {
      sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
      goto err;
    }
    rli->group_master_log_name[0]= 0;
    rli->group_master_log_pos= 0;		
    rli->info_fd= info_fd;
  }
  else // file exists
  {
    if (info_fd >= 0)
      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
    else 
    {
      /* Separate name: must not shadow the function's return value */
      bool open_failed= 0;
      if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
      {
        sql_print_error("\
Failed to open the existing relay log info file '%s' (errno %d)",
			fname, my_errno);
        open_failed= 1;
      }
      else if (init_io_cache(&rli->info_file, info_fd,
                             IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
      {
        sql_print_error("Failed to create a cache on relay log info file '%s'",
			fname);
        open_failed= 1;
      }
      if (open_failed)
      {
        if (info_fd >= 0)
          my_close(info_fd, MYF(0));
        rli->info_fd= -1;
        rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
        pthread_mutex_unlock(&rli->data_lock);
        DBUG_RETURN(1);
      }
    }
         
    rli->info_fd = info_fd;
    int relay_log_pos, master_log_pos;
    if (init_strvar_from_file(rli->group_relay_log_name,
			      sizeof(rli->group_relay_log_name),
                              &rli->info_file, "") ||
       init_intvar_from_file(&relay_log_pos,
			     &rli->info_file, BIN_LOG_HEADER_SIZE) ||
       init_strvar_from_file(rli->group_master_log_name,
			     sizeof(rli->group_master_log_name),
                             &rli->info_file, "") ||
       init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
    {
      msg="Error reading slave log configuration";
      goto err;
    }
    strmake(rli->event_relay_log_name,rli->group_relay_log_name,
            sizeof(rli->event_relay_log_name)-1);
    rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
    rli->group_master_log_pos= master_log_pos;

    if (init_relay_log_pos(rli,
			   rli->group_relay_log_name,
			   rli->group_relay_log_pos,
			   0 /* no data lock*/,
			   &msg, 0))
    {
      char llbuf[22];
      sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s)",
		      rli->group_relay_log_name,
		      llstr(rli->group_relay_log_pos, llbuf));
      goto err;
    }
  }

#ifndef DBUG_OFF
  {
    char llbuf1[22], llbuf2[22];
    DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
                        llstr(my_b_tell(rli->cur_log),llbuf1), 
                        llstr(rli->event_relay_log_pos,llbuf2)));
    DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
    DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
  }
#endif

  /*
    Now change the cache from READ to WRITE - must do this
    before flush_relay_log_info
  */
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
  if ((error= flush_relay_log_info(rli)))
    sql_print_error("Failed to flush relay log info file");
  if (count_relay_log_space(rli))
  {
    msg="Error counting relay log space";
    goto err;
  }
  rli->inited= 1;
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_RETURN(error);

err:
  /* msg is an argument, never a format string: it may contain '%' */
  if (msg)
    sql_print_error("%s", msg);
  end_io_cache(&rli->info_file);
  if (info_fd >= 0)
    my_close(info_fd, MYF(0));
  rli->info_fd= -1;
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_RETURN(1);
}

1883

unknown's avatar
unknown committed
1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897
/*
  Add the on-disk size of one relay log to rli->log_space_total.

  RETURN VALUES
    0   ok
    1   the log named by linfo could not be stat()ed (error logged)
*/
static inline int add_relay_log(RELAY_LOG_INFO* rli,LOG_INFO* linfo)
{
  MY_STAT stat_info;
  const char *log_name= linfo->log_file_name;
  DBUG_ENTER("add_relay_log");

  if (my_stat(log_name, &stat_info, MYF(0)) == NULL)
  {
    sql_print_error("log %s listed in the index, but failed to stat",
                    log_name);
    DBUG_RETURN(1);
  }
  rli->log_space_total+= stat_info.st_size;
#ifndef DBUG_OFF
  {
    char llbuf[22];
    DBUG_PRINT("info", ("log_space_total: %s",
                        llstr(rli->log_space_total, llbuf)));
  }
#endif
  DBUG_RETURN(0);
}

1902

unknown's avatar
unknown committed
1903 1904
/*
  Block the I/O thread while the relay log space limit is exceeded.

  SYNOPSIS
    wait_for_relay_log_space()
    rli   Relay log info; rli->mi->io_thd is the thread that waits

  DESCRIPTION
    Waits on rli->log_space_cond until any one of these holds:
    - log_space_total no longer exceeds log_space_limit;
    - io_slave_killed() reports the I/O thread as killed;
    - ignore_log_space_limit is set.

    rli->log_space_lock is taken here. There is no explicit unlock, so
    THD::exit_cond() is presumably what releases it.

  RETURN VALUES
    1   io_slave_killed() returned true while waiting
    0   otherwise
*/
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli)
{
  MASTER_INFO *mi= rli->mi;
  THD *thd= mi->io_thd;
  const char *old_proc_info;
  bool killed= 0;
  DBUG_ENTER("wait_for_relay_log_space");

  pthread_mutex_lock(&rli->log_space_lock);
  old_proc_info= thd->enter_cond(&rli->log_space_cond, &rli->log_space_lock,
                                 "Waiting for the slave SQL thread to free "
                                 "enough relay log space");
  for (;;)
  {
    if (rli->log_space_limit >= rli->log_space_total)
      break;                                    // back under the limit
    if ((killed= io_slave_killed(thd, mi)))
      break;
    if (rli->ignore_log_space_limit)
      break;
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
  }
  thd->exit_cond(old_proc_info);
  DBUG_RETURN(killed);
}

unknown's avatar
unknown committed
1925

unknown's avatar
unknown committed
1926 1927 1928 1929
/*
  Recompute rli->log_space_total from the sizes of all relay logs listed
  in the relay log index.

  RETURN VALUES
    0   ok
    1   the first log could not be found, or a listed log could not be
        stat()ed (error logged)
*/
static int count_relay_log_space(RELAY_LOG_INFO* rli)
{
  LOG_INFO log_pos;
  DBUG_ENTER("count_relay_log_space");

  rli->log_space_total= 0;
  if (rli->relay_log.find_log_pos(&log_pos, NullS, 1) != 0)
  {
    sql_print_error("Could not find first log while counting relay log space");
    DBUG_RETURN(1);
  }
  for (;;)
  {
    if (add_relay_log(rli, &log_pos))
      DBUG_RETURN(1);
    if (rli->relay_log.find_next_log(&log_pos, 1))
      break;                                    // no more logs in the index
  }
  /*
    Everything has been counted, including whatever a preceding write
    added, so reset bytes_written; otherwise that space would be counted
    twice.
  */
  rli->relay_log.reset_bytes_written();
  DBUG_RETURN(0);
}
unknown's avatar
unknown committed
1949

1950

unknown's avatar
unknown committed
1951 1952 1953 1954 1955 1956 1957 1958 1959 1960
/*
  Fill a MASTER_INFO from the master_* server options.

  The log name is cleared and the position set past the binlog header.
  String options are copied only when set; numeric options always are.
*/
void init_master_info_with_options(MASTER_INFO* mi)
{
  /* Describes one optional SSL string option and its destination buffer */
  struct ssl_option
  {
    char *dst;
    const char *src;
    size_t dst_size;
  };

  mi->master_log_name[0]= 0;
  mi->master_log_pos= BIN_LOG_HEADER_SIZE;      // skip magic number

  if (master_host)
    strmake(mi->host, master_host, sizeof(mi->host) - 1);
  if (master_user)
    strmake(mi->user, master_user, sizeof(mi->user) - 1);
  if (master_password)
    strmake(mi->password, master_password, MAX_PASSWORD_LENGTH);
  mi->port= master_port;
  mi->connect_retry= master_connect_retry;

  mi->ssl= master_ssl;
  const ssl_option ssl_options[]=
  {
    { mi->ssl_ca,     master_ssl_ca,     sizeof(mi->ssl_ca) },
    { mi->ssl_capath, master_ssl_capath, sizeof(mi->ssl_capath) },
    { mi->ssl_cert,   master_ssl_cert,   sizeof(mi->ssl_cert) },
    { mi->ssl_cipher, master_ssl_cipher, sizeof(mi->ssl_cipher) },
    { mi->ssl_key,    master_ssl_key,    sizeof(mi->ssl_key) }
  };
  for (size_t i= 0; i < sizeof(ssl_options) / sizeof(ssl_options[0]); i++)
  {
    if (ssl_options[i].src)
      strmake(ssl_options[i].dst, ssl_options[i].src,
              ssl_options[i].dst_size - 1);
  }
}

1978
/*
  Forget the last error recorded in rli, i.e. the error that
  SHOW SLAVE STATUS displays.
*/
void clear_slave_error(RELAY_LOG_INFO* rli)
{
  *rli->last_slave_error= '\0';
  rli->last_slave_errno= 0;
}
1984

1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998
/*
  Reset the UNTIL condition of a RELAY_LOG_INFO.

  SYNOPSIS
    clear_until_condition()
    rli   RELAY_LOG_INFO structure whose UNTIL condition is reset
*/
void clear_until_condition(RELAY_LOG_INFO* rli)
{
  rli->until_condition= RELAY_LOG_INFO::UNTIL_NONE;
  *rli->until_log_name= '\0';
  rli->until_log_pos= 0;
}


unknown's avatar
unknown committed
1999
/*
  Minimum line count, stored on the first line of master.info, for the
  file to be parsed in the 4.1.x format that includes the SSL fields.
  init_master_info() treats a non-numeric or smaller first line as a
  pre-4.1.1 file.
*/
#define LINES_IN_MASTER_INFO_WITH_SSL 14
2000

unknown's avatar
unknown committed
2001

2002
int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
2003 2004 2005
                     const char* slave_info_fname,
                     bool abort_if_no_master_info_file,
                     int thread_mask)
unknown's avatar
unknown committed
2006
{
unknown's avatar
unknown committed
2007 2008 2009 2010
  int fd,error;
  char fname[FN_REFLEN+128];
  DBUG_ENTER("init_master_info");

unknown's avatar
unknown committed
2011
  if (mi->inited)
unknown's avatar
unknown committed
2012 2013 2014 2015 2016 2017 2018
  {
    /*
      We have to reset read position of relay-log-bin as we may have
      already been reading from 'hotlog' when the slave was stopped
      last time. If this case pos_in_file would be set and we would
      get a crash when trying to read the signature for the binary
      relay log.
2019

2020 2021 2022 2023
      We only rewind the read position if we are starting the SQL
      thread. The handle_slave_sql thread assumes that the read
      position is at the beginning of the file, and will read the
      "signature" and then fast-forward to the last position read.
unknown's avatar
unknown committed
2024
    */
unknown's avatar
unknown committed
2025 2026
    if (thread_mask & SLAVE_SQL)
    {
2027 2028
      my_b_seek(mi->rli.cur_log, (my_off_t) 0);
    }
unknown's avatar
unknown committed
2029
    DBUG_RETURN(0);
unknown's avatar
unknown committed
2030 2031
  }

unknown's avatar
unknown committed
2032 2033
  mi->mysql=0;
  mi->file_id=1;
2034
  fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
unknown's avatar
unknown committed
2035

unknown's avatar
unknown committed
2036 2037 2038 2039
  /*
    We need a mutex while we are changing master info parameters to
    keep other threads from reading bogus info
  */
unknown's avatar
unknown committed
2040

2041
  pthread_mutex_lock(&mi->data_lock);
unknown's avatar
unknown committed
2042
  fd = mi->fd;
2043 2044

  /* does master.info exist ? */
2045

2046
  if (access(fname,F_OK))
unknown's avatar
unknown committed
2047
  {
2048 2049 2050 2051 2052
    if (abort_if_no_master_info_file)
    {
      pthread_mutex_unlock(&mi->data_lock);
      DBUG_RETURN(0);
    }
unknown's avatar
unknown committed
2053 2054 2055 2056
    /*
      if someone removed the file from underneath our feet, just close
      the old descriptor and re-create the old file
    */
unknown's avatar
unknown committed
2057 2058
    if (fd >= 0)
      my_close(fd, MYF(MY_WME));
2059 2060 2061 2062 2063 2064 2065
    if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
    {
      sql_print_error("Failed to create a new master info file (\
file '%s', errno %d)", fname, my_errno);
      goto err;
    }
    if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
unknown's avatar
unknown committed
2066
		      MYF(MY_WME)))
2067 2068 2069
    {
      sql_print_error("Failed to create a cache on master info file (\
file '%s')", fname);
unknown's avatar
unknown committed
2070
      goto err;
2071
    }
unknown's avatar
unknown committed
2072

unknown's avatar
unknown committed
2073
    mi->fd = fd;
unknown's avatar
unknown committed
2074 2075
    init_master_info_with_options(mi);

unknown's avatar
unknown committed
2076
  }
2077
  else // file exists
unknown's avatar
unknown committed
2078
  {
unknown's avatar
unknown committed
2079
    if (fd >= 0)
unknown's avatar
unknown committed
2080
      reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
2081
    else
2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096
    {
      if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
      {
        sql_print_error("Failed to open the existing master info file (\
file '%s', errno %d)", fname, my_errno);
        goto err;
      }
      if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
                        0, MYF(MY_WME)))
      {
        sql_print_error("Failed to create a cache on master info file (\
file '%s')", fname);
        goto err;
      }
    }
unknown's avatar
unknown committed
2097

unknown's avatar
unknown committed
2098
    mi->fd = fd;
unknown's avatar
unknown committed
2099 2100
    int port, connect_retry, master_log_pos, ssl= 0, lines;
    char *first_non_digit;
2101

unknown's avatar
unknown committed
2102 2103
    /*
       Starting from 4.1.x master.info has new format. Now its
2104 2105 2106
       first line contains number of lines in file. By reading this
       number we will be always distinguish to which version our
       master.info corresponds to. We can't simply count lines in
unknown's avatar
unknown committed
2107 2108
       file since versions before 4.1.x could generate files with more
       lines than needed.
2109
       If first line doesn't contain a number or contain number less than
unknown's avatar
unknown committed
2110
       14 then such file is treated like file from pre 4.1.1 version.
2111
       There is no ambiguity when reading an old master.info, as before
unknown's avatar
unknown committed
2112
       4.1.1, the first line contained the binlog's name, which is either
2113
       empty or has an extension (contains a '.'), so can't be confused
unknown's avatar
unknown committed
2114 2115
       with an integer.

2116
       So we're just reading first line and trying to figure which version
unknown's avatar
unknown committed
2117 2118
       is this.
    */
2119 2120 2121 2122

    /*
       The first row is temporarily stored in mi->master_log_name,
       if it is line count and not binlog name (new format) it will be
unknown's avatar
unknown committed
2123 2124
       overwritten by the second row later.
    */
2125
    if (init_strvar_from_file(mi->master_log_name,
unknown's avatar
unknown committed
2126
			      sizeof(mi->master_log_name), &mi->file,
unknown's avatar
unknown committed
2127 2128
			      ""))
      goto errwithmsg;
2129

unknown's avatar
unknown committed
2130 2131
    lines= strtoul(mi->master_log_name, &first_non_digit, 10);

2132
    if (mi->master_log_name[0]!='\0' &&
unknown's avatar
unknown committed
2133 2134
        *first_non_digit=='\0' && lines >= LINES_IN_MASTER_INFO_WITH_SSL)
    {                                          // Seems to be new format
2135
      if (init_strvar_from_file(mi->master_log_name,
unknown's avatar
unknown committed
2136 2137 2138 2139 2140
            sizeof(mi->master_log_name), &mi->file, ""))
        goto errwithmsg;
    }
    else
      lines= 7;
2141

unknown's avatar
unknown committed
2142
    if (init_intvar_from_file(&master_log_pos, &mi->file, 4) ||
unknown's avatar
unknown committed
2143 2144 2145
	init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
			      master_host) ||
	init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
2146
			      master_user) ||
2147 2148
        init_strvar_from_file(mi->password, SCRAMBLED_PASSWORD_CHAR_LENGTH+1,
                              &mi->file, master_password) ||
2149 2150
	init_intvar_from_file(&port, &mi->file, master_port) ||
	init_intvar_from_file(&connect_retry, &mi->file,
unknown's avatar
unknown committed
2151
			      master_connect_retry))
unknown's avatar
unknown committed
2152 2153
      goto errwithmsg;

2154 2155 2156 2157
    /*
       If file has ssl part use it even if we have server without
       SSL support. But these option will be ignored later when
       slave will try connect to master, so in this case warning
unknown's avatar
unknown committed
2158 2159
       is printed.
     */
2160
    if (lines >= LINES_IN_MASTER_INFO_WITH_SSL &&
unknown's avatar
unknown committed
2161
        (init_intvar_from_file(&ssl, &mi->file, master_ssl) ||
2162
         init_strvar_from_file(mi->ssl_ca, sizeof(mi->ssl_ca),
unknown's avatar
unknown committed
2163
                               &mi->file, master_ssl_ca) ||
2164
         init_strvar_from_file(mi->ssl_capath, sizeof(mi->ssl_capath),
unknown's avatar
unknown committed
2165 2166 2167 2168 2169 2170 2171 2172 2173 2174
                               &mi->file, master_ssl_capath) ||
         init_strvar_from_file(mi->ssl_cert, sizeof(mi->ssl_cert),
                               &mi->file, master_ssl_cert) ||
         init_strvar_from_file(mi->ssl_cipher, sizeof(mi->ssl_cipher),
                               &mi->file, master_ssl_cipher) ||
         init_strvar_from_file(mi->ssl_key, sizeof(mi->ssl_key),
                              &mi->file, master_ssl_key)))
      goto errwithmsg;
#ifndef HAVE_OPENSSL
    if (ssl)
2175
      sql_print_warning("SSL information in the master info file "
unknown's avatar
unknown committed
2176 2177 2178
                      "('%s') are ignored because this MySQL slave was compiled "
                      "without SSL support.", fname);
#endif /* HAVE_OPENSSL */
2179

2180 2181 2182 2183 2184 2185 2186
    /*
      This has to be handled here as init_intvar_from_file can't handle
      my_off_t types
    */
    mi->master_log_pos= (my_off_t) master_log_pos;
    mi->port= (uint) port;
    mi->connect_retry= (uint) connect_retry;
unknown's avatar
unknown committed
2187
    mi->ssl= (my_bool) ssl;
unknown's avatar
unknown committed
2188
  }
2189 2190 2191
  DBUG_PRINT("master_info",("log_file_name: %s  position: %ld",
			    mi->master_log_name,
			    (ulong) mi->master_log_pos));
2192

2193
  mi->rli.mi = mi;
2194 2195 2196
  if (init_relay_log_info(&mi->rli, slave_info_fname))
    goto err;

unknown's avatar
unknown committed
2197
  mi->inited = 1;
unknown's avatar
unknown committed
2198
  // now change cache READ -> WRITE - must do this before flush_master_info
2199
  reinit_io_cache(&mi->file, WRITE_CACHE, 0L, 0, 1);
unknown's avatar
unknown committed
2200
  if ((error=test(flush_master_info(mi, 1))))
2201
    sql_print_error("Failed to flush master info file");
2202
  pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
2203
  DBUG_RETURN(error);
2204

unknown's avatar
unknown committed
2205 2206
errwithmsg:
  sql_print_error("Error reading master configuration");
2207

2208
err:
unknown's avatar
unknown committed
2209 2210 2211 2212 2213 2214
  if (fd >= 0)
  {
    my_close(fd, MYF(0));
    end_io_cache(&mi->file);
  }
  mi->fd= -1;
2215
  pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
2216
  DBUG_RETURN(1);
unknown's avatar
unknown committed
2217 2218
}

2219

2220 2221
/*
  Register this slave with the master by sending COM_REGISTER_SLAVE.

  SYNOPSIS
    register_slave_on_master()
    mysql   Connection to the master

  DESCRIPTION
    The packet carries, in order:
    - server_id
    - report_host, report_user, report_password
    - report_port
    - rpl_recovery_rank
    - a zero master_id, which the master fills in

    Nothing is sent when report_host is not set, or when the strings
    would not fit in the local buffer.

  RETURN VALUES
    0   ok, or registration skipped
    1   the command failed (error logged)
*/
int register_slave_on_master(MYSQL* mysql)
{
  char buf[1024], *pos= buf;
  uint report_host_len, report_user_len=0, report_password_len=0;
  /*
    Unset report_user/report_password are sent as empty strings. Use ""
    instead of a NULL pointer so net_store_data() is never given NULL as
    its source, even with a zero length.
  */
  const char *user= report_user ? report_user : "";
  const char *password= report_password ? report_password : "";

  if (!report_host)
    return 0;
  report_host_len= strlen(report_host);
  report_user_len= strlen(user);
  report_password_len= strlen(password);
  /* 30 is a good safety margin */
  if (report_host_len + report_user_len + report_password_len + 30 >
      sizeof(buf))
    return 0;					// safety

  int4store(pos, server_id); pos+= 4;
  pos= net_store_data(pos, report_host, report_host_len); 
  pos= net_store_data(pos, user, report_user_len);
  pos= net_store_data(pos, password, report_password_len);
  int2store(pos, (uint16) report_port); pos+= 2;
  int4store(pos, rpl_recovery_rank);	pos+= 4;
  /* The master will fill in master_id */
  int4store(pos, 0);			pos+= 4;

  if (simple_command(mysql, COM_REGISTER_SLAVE, (char*) buf,
			(uint) (pos- buf), 0))
  {
    sql_print_error("Error on COM_REGISTER_SLAVE: %d '%s'",
		    mysql_errno(mysql),
		    mysql_error(mysql));
    return 1;
  }
  return 0;
}

2257

2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299
/*
  Build a comma-separated String from a HASH of TABLE_RULE_ENT.

  Only usable for hashes whose elements are TABLE_RULE_ENT; each entry
  contributes the first key_len bytes starting at its db member.

  SYNOPSIS
    table_rule_ent_hash_to_str()
    s               String to fill (its previous contents are discarded)
    h               HASH to read

  RETURN VALUES
    none
*/

void table_rule_ent_hash_to_str(String* s, HASH* h)
{
  s->length(0);
  for (uint idx= 0; idx < h->records; ++idx)
  {
    const TABLE_RULE_ENT *rule= (const TABLE_RULE_ENT*) hash_element(h, idx);
    if (s->length() != 0)
      s->append(',');               // separator only once s is non-empty
    s->append(rule->db, rule->key_len);
  }
}

/*
  Build a comma-separated String from a DYNAMIC_ARRAY whose elements are
  TABLE_RULE_ENT pointers. Each entry contributes the first key_len bytes
  starting at its db member; previous contents of s are discarded.
*/

void table_rule_ent_dynamic_array_to_str(String* s, DYNAMIC_ARRAY* a)
{
  s->length(0);
  for (uint idx= 0; idx < a->elements; ++idx)
  {
    TABLE_RULE_ENT *rule;
    get_dynamic(a, (gptr) &rule, idx);  // copy the stored pointer out
    if (s->length() != 0)
      s->append(',');               // separator only once s is non-empty
    s->append(rule->db, rule->key_len);
  }
}

unknown's avatar
unknown committed
2300
bool show_master_info(THD* thd, MASTER_INFO* mi)
unknown's avatar
unknown committed
2301
{
2302
  // TODO: fix this for multi-master
unknown's avatar
unknown committed
2303
  List<Item> field_list;
2304 2305 2306
  Protocol *protocol= thd->protocol;
  DBUG_ENTER("show_master_info");

unknown's avatar
unknown committed
2307 2308
  field_list.push_back(new Item_empty_string("Slave_IO_State",
						     14));
unknown's avatar
unknown committed
2309
  field_list.push_back(new Item_empty_string("Master_Host",
2310
						     sizeof(mi->host)));
unknown's avatar
unknown committed
2311
  field_list.push_back(new Item_empty_string("Master_User",
2312
						     sizeof(mi->user)));
2313 2314
  field_list.push_back(new Item_return_int("Master_Port", 7,
					   MYSQL_TYPE_LONG));
2315
  field_list.push_back(new Item_return_int("Connect_Retry", 10,
2316
					   MYSQL_TYPE_LONG));
2317
  field_list.push_back(new Item_empty_string("Master_Log_File",
2318 2319 2320
					     FN_REFLEN));
  field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
					   MYSQL_TYPE_LONGLONG));
2321
  field_list.push_back(new Item_empty_string("Relay_Log_File",
2322 2323 2324
					     FN_REFLEN));
  field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
					   MYSQL_TYPE_LONGLONG));
2325
  field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
2326
					     FN_REFLEN));
2327 2328
  field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
  field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
2329 2330 2331 2332 2333 2334
  field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
  field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
  field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
  field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
  field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
  field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
2335
					     28));
2336 2337 2338
  field_list.push_back(new Item_return_int("Last_Errno", 4, MYSQL_TYPE_LONG));
  field_list.push_back(new Item_empty_string("Last_Error", 20));
  field_list.push_back(new Item_return_int("Skip_Counter", 10,
2339
					   MYSQL_TYPE_LONG));
2340
  field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
2341
					   MYSQL_TYPE_LONGLONG));
2342
  field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
2343
					   MYSQL_TYPE_LONGLONG));
2344
  field_list.push_back(new Item_empty_string("Until_Condition", 6));
2345
  field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
2346
  field_list.push_back(new Item_return_int("Until_Log_Pos", 10, 
2347
                                           MYSQL_TYPE_LONGLONG));
unknown's avatar
unknown committed
2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358
  field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
  field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
                                             sizeof(mi->ssl_ca)));
  field_list.push_back(new Item_empty_string("Master_SSL_CA_Path", 
                                             sizeof(mi->ssl_capath)));
  field_list.push_back(new Item_empty_string("Master_SSL_Cert", 
                                             sizeof(mi->ssl_cert)));
  field_list.push_back(new Item_empty_string("Master_SSL_Cipher", 
                                             sizeof(mi->ssl_cipher)));
  field_list.push_back(new Item_empty_string("Master_SSL_Key", 
                                             sizeof(mi->ssl_key)));
2359
  field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
unknown's avatar
unknown committed
2360
                                           MYSQL_TYPE_LONGLONG));
unknown's avatar
unknown committed
2361
  
2362 2363
  if (protocol->send_fields(&field_list,
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
unknown's avatar
unknown committed
2364
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
2365

2366 2367
  if (mi->host[0])
  {
2368
    DBUG_PRINT("info",("host is set: '%s'", mi->host));
2369
    String *packet= &thd->packet;
2370
    protocol->prepare_for_resend();
unknown's avatar
unknown committed
2371
  
2372 2373 2374 2375 2376
    /*
      TODO: we read slave_running without run_lock, whereas these variables
      are updated under run_lock and not data_lock. In 5.0 we should lock
      run_lock on top of data_lock (with good order).
    */
2377 2378
    pthread_mutex_lock(&mi->data_lock);
    pthread_mutex_lock(&mi->rli.data_lock);
unknown's avatar
unknown committed
2379 2380

    protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
2381 2382
    protocol->store(mi->host, &my_charset_bin);
    protocol->store(mi->user, &my_charset_bin);
2383 2384
    protocol->store((uint32) mi->port);
    protocol->store((uint32) mi->connect_retry);
2385
    protocol->store(mi->master_log_name, &my_charset_bin);
2386
    protocol->store((ulonglong) mi->master_log_pos);
2387
    protocol->store(mi->rli.group_relay_log_name +
2388 2389
		    dirname_length(mi->rli.group_relay_log_name),
		    &my_charset_bin);
2390 2391
    protocol->store((ulonglong) mi->rli.group_relay_log_pos);
    protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
unknown's avatar
unknown committed
2392 2393
    protocol->store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ?
                    "Yes" : "No", &my_charset_bin);
2394
    protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
2395 2396
    protocol->store(&replicate_do_db);
    protocol->store(&replicate_ignore_db);
2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413
    /*
      We can't directly use some protocol->store for 
      replicate_*_table,
      as Protocol doesn't know the TABLE_RULE_ENT struct.
      We first build Strings and then pass them to protocol->store.
    */
    char buf[256];
    String tmp(buf, sizeof(buf), &my_charset_bin);
    table_rule_ent_hash_to_str(&tmp, &replicate_do_table);
    protocol->store(&tmp);
    table_rule_ent_hash_to_str(&tmp, &replicate_ignore_table);
    protocol->store(&tmp);
    table_rule_ent_dynamic_array_to_str(&tmp, &replicate_wild_do_table);
    protocol->store(&tmp);
    table_rule_ent_dynamic_array_to_str(&tmp, &replicate_wild_ignore_table);
    protocol->store(&tmp);

2414
    protocol->store((uint32) mi->rli.last_slave_errno);
2415
    protocol->store(mi->rli.last_slave_error, &my_charset_bin);
2416
    protocol->store((uint32) mi->rli.slave_skip_counter);
2417
    protocol->store((ulonglong) mi->rli.group_master_log_pos);
2418
    protocol->store((ulonglong) mi->rli.log_space_total);
2419 2420 2421 2422 2423 2424 2425 2426

    protocol->store(
      mi->rli.until_condition==RELAY_LOG_INFO::UNTIL_NONE ? "None": 
        ( mi->rli.until_condition==RELAY_LOG_INFO::UNTIL_MASTER_POS? "Master":
          "Relay"), &my_charset_bin);
    protocol->store(mi->rli.until_log_name, &my_charset_bin);
    protocol->store((ulonglong) mi->rli.until_log_pos);
    
unknown's avatar
unknown committed
2427 2428 2429 2430 2431 2432 2433 2434 2435 2436
#ifdef HAVE_OPENSSL 
    protocol->store(mi->ssl? "Yes":"No", &my_charset_bin);
#else
    protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
#endif
    protocol->store(mi->ssl_ca, &my_charset_bin);
    protocol->store(mi->ssl_capath, &my_charset_bin);
    protocol->store(mi->ssl_cert, &my_charset_bin);
    protocol->store(mi->ssl_cipher, &my_charset_bin);
    protocol->store(mi->ssl_key, &my_charset_bin);
unknown's avatar
unknown committed
2437

2438 2439 2440 2441 2442 2443
    /*
      Seconds_Behind_Master: if SQL thread is running and I/O thread is
      connected, we can compute it otherwise show NULL (i.e. unknown).
    */
    if ((mi->slave_running == MYSQL_SLAVE_RUN_CONNECT) &&
        mi->rli.slave_running)
2444 2445 2446 2447 2448
    {
      long tmp= (long)((time_t)time((time_t*) 0)
                               - mi->rli.last_master_timestamp)
        - mi->clock_diff_with_master;
      /*
2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462
        Apparently on some systems tmp can be <0. Here are possible reasons
        related to MySQL:
        - the master is itself a slave of another master whose time is ahead.
        - somebody used an explicit SET TIMESTAMP on the master.
        Possible reason related to granularity-to-second of time functions
        (nothing to do with MySQL), which can explain a value of -1:
        assume the master's and slave's time are perfectly synchronized, and
        that at slave's connection time, when the master's timestamp is read,
        it is at the very end of second 1, and (a very short time later) when
        the slave's timestamp is read it is at the very beginning of second
        2. Then the recorded value for master is 1 and the recorded value for
        slave is 2. At SHOW SLAVE STATUS time, assume that the difference
        between timestamp of slave and rli->last_master_timestamp is 0
        (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
2463 2464 2465 2466
        This confuses users, so we don't go below 0: hence the max().

        last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
        special marker to say "consider we have caught up".
2467
      */
2468 2469
      protocol->store((longlong)(mi->rli.last_master_timestamp ? max(0, tmp)
                                 : 0));
2470
    }
unknown's avatar
unknown committed
2471 2472 2473
    else
      protocol->store_null();

2474 2475
    pthread_mutex_unlock(&mi->rli.data_lock);
    pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
2476
  
2477
    if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length()))
unknown's avatar
unknown committed
2478
      DBUG_RETURN(TRUE);
2479
  }
2480
  send_eof(thd);
unknown's avatar
unknown committed
2481
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
2482 2483
}

2484

unknown's avatar
unknown committed
2485
bool flush_master_info(MASTER_INFO* mi, bool flush_relay_log_cache)
unknown's avatar
unknown committed
2486
{
unknown's avatar
unknown committed
2487
  IO_CACHE* file = &mi->file;
unknown's avatar
unknown committed
2488
  char lbuf[22];
2489 2490 2491
  DBUG_ENTER("flush_master_info");
  DBUG_PRINT("enter",("master_pos: %ld", (long) mi->master_log_pos));

2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515
  /*
    Flush the relay log to disk. If we don't do it, then the relay log while
    have some part (its last kilobytes) in memory only, so if the slave server
    dies now, with, say, from master's position 100 to 150 in memory only (not
    on disk), and with position 150 in master.info, then when the slave
    restarts, the I/O thread will fetch binlogs from 150, so in the relay log
    we will have "[0, 100] U [150, infinity[" and nobody will notice it, so the
    SQL thread will jump from 100 to 150, and replication will silently break.

    When we come to this place in code, relay log may or not be initialized;
    the caller is responsible for setting 'flush_relay_log_cache' accordingly.
  */
  if (flush_relay_log_cache)
    flush_io_cache(mi->rli.relay_log.get_log_file());

  /*
    We flushed the relay log BEFORE the master.info file, because if we crash
    now, we will get a duplicate event in the relay log at restart. If we
    flushed in the other order, we would get a hole in the relay log.
    And duplicate is better than hole (with a duplicate, in later versions we
    can add detection and scrap one event; with a hole there's nothing we can
    do).
  */

unknown's avatar
unknown committed
2516 2517 2518 2519 2520 2521 2522 2523
  /*
     In certain cases this code may create master.info files that seems 
     corrupted, because of extra lines filled with garbage in the end 
     file (this happens if new contents take less space than previous 
     contents of file). But because of number of lines in the first line 
     of file we don't care about this garbage.
  */
  
unknown's avatar
unknown committed
2524
  my_b_seek(file, 0L);
unknown's avatar
unknown committed
2525 2526 2527
  my_b_printf(file, "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n",
	      LINES_IN_MASTER_INFO_WITH_SSL,
              mi->master_log_name, llstr(mi->master_log_pos, lbuf),
2528
	      mi->host, mi->user,
unknown's avatar
unknown committed
2529 2530 2531
	      mi->password, mi->port, mi->connect_retry,
              (int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert,
              mi->ssl_cipher, mi->ssl_key);
unknown's avatar
unknown committed
2532
  flush_io_cache(file);
2533
  DBUG_RETURN(0);
unknown's avatar
unknown committed
2534 2535
}

2536

unknown's avatar
unknown committed
2537
st_relay_log_info::st_relay_log_info()
2538 2539
  :info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
   cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0),
unknown's avatar
unknown committed
2540 2541 2542
   ignore_log_space_limit(0), last_master_timestamp(0), slave_skip_counter(0),
   abort_pos_wait(0), slave_run_id(0), sql_thd(0), last_slave_errno(0),
   inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
2543
   until_log_pos(0), retried_trans(0)
2544
{
unknown's avatar
unknown committed
2545 2546
  group_relay_log_name[0]= event_relay_log_name[0]=
    group_master_log_name[0]= 0;
2547 2548
  last_slave_error[0]=0; until_log_name[0]= 0;

unknown's avatar
unknown committed
2549 2550
  bzero((char*) &info_file, sizeof(info_file));
  bzero((char*) &cache_buf, sizeof(cache_buf));
2551
  cached_charset_invalidate();
unknown's avatar
unknown committed
2552 2553 2554 2555 2556 2557 2558
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
  pthread_cond_init(&data_cond, NULL);
  pthread_cond_init(&start_cond, NULL);
  pthread_cond_init(&stop_cond, NULL);
  pthread_cond_init(&log_space_cond, NULL);
unknown's avatar
unknown committed
2559
  relay_log.init_pthread_objects();
unknown's avatar
unknown committed
2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571
}


/*
  Release the synchronization objects created by the constructor, then
  let the relay log clean up after itself.
*/
st_relay_log_info::~st_relay_log_info()
{
  /* condition variables */
  pthread_cond_destroy(&data_cond);
  pthread_cond_destroy(&start_cond);
  pthread_cond_destroy(&stop_cond);
  pthread_cond_destroy(&log_space_cond);

  /* mutexes */
  pthread_mutex_destroy(&run_lock);
  pthread_mutex_destroy(&data_lock);
  pthread_mutex_destroy(&log_space_lock);

  relay_log.cleanup();
}

2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598
/*
  Waits until the SQL thread reaches (has executed up to) the
  log/position or timed out.

  SYNOPSIS
    wait_for_pos()
    thd             client thread that sent SELECT MASTER_POS_WAIT
    log_name        log name to wait for
    log_pos         position to wait for 
    timeout         timeout in seconds before giving up waiting

  NOTES
    timeout is longlong whereas it should be ulong ; but this is
    to catch if the user submitted a negative timeout.

  RETURN VALUES
    -2          improper arguments (log_pos<0)
                or slave not running, or master info changed
                during the function's execution,
                or client thread killed. -2 is translated to NULL by caller
    -1          timed out
    >=0         number of log events the function had to wait
                before reaching the desired log/position
 */
2599

2600
int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
2601 2602
                                    longlong log_pos,
                                    longlong timeout)
unknown's avatar
unknown committed
2603
{
2604 2605
  if (!inited)
    return -1;
unknown's avatar
unknown committed
2606
  int event_count = 0;
2607
  ulong init_abort_pos_wait;
2608 2609
  int error=0;
  struct timespec abstime; // for timeout checking
2610
  const char *msg;
2611
  DBUG_ENTER("wait_for_pos");
2612 2613
  DBUG_PRINT("enter",("log_name: '%s'  log_pos: %lu  timeout: %lu",
                      log_name->c_ptr(), (ulong) log_pos, (ulong) timeout));
2614

2615
  set_timespec(abstime,timeout);
2616
  pthread_mutex_lock(&data_lock);
2617 2618 2619
  msg= thd->enter_cond(&data_cond, &data_lock,
                       "Waiting for the slave SQL thread to "
                       "advance position");
2620
  /* 
unknown's avatar
unknown committed
2621 2622 2623 2624 2625 2626 2627 2628
     This function will abort when it notices that some CHANGE MASTER or
     RESET MASTER has changed the master info.
     To catch this, these commands modify abort_pos_wait ; We just monitor
     abort_pos_wait and see if it has changed.
     Why do we have this mechanism instead of simply monitoring slave_running
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
     the SQL thread be stopped?
     This is becasue if someones does:
2629
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
unknown's avatar
unknown committed
2630 2631
     the change may happen very quickly and we may not notice that
     slave_running briefly switches between 1/0/1.
2632
  */
2633
  init_abort_pos_wait= abort_pos_wait;
2634

2635
  /*
2636
    We'll need to
2637
    handle all possible log names comparisons (e.g. 999 vs 1000).
2638
    We use ulong for string->number conversion ; this is no
2639 2640 2641 2642
    stronger limitation than in find_uniq_filename in sql/log.cc
  */
  ulong log_name_extension;
  char log_name_tmp[FN_REFLEN]; //make a char[] from String
2643 2644 2645

  strmake(log_name_tmp, log_name->ptr(), min(log_name->length(), FN_REFLEN-1));

2646 2647
  char *p= fn_ext(log_name_tmp);
  char *p_end;
2648
  if (!*p || log_pos<0)
2649 2650 2651 2652
  {
    error= -2; //means improper arguments
    goto err;
  }
2653 2654
  // Convert 0-3 to 4
  log_pos= max(log_pos, BIN_LOG_HEADER_SIZE);
unknown's avatar
unknown committed
2655
  /* p points to '.' */
2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667
  log_name_extension= strtoul(++p, &p_end, 10);
  /*
    p_end points to the first invalid character.
    If it equals to p, no digits were found, error.
    If it contains '\0' it means conversion went ok.
  */
  if (p_end==p || *p_end)
  {
    error= -2;
    goto err;
  }    

unknown's avatar
unknown committed
2668
  /* The "compare and wait" main loop */
2669
  while (!thd->killed &&
2670
         init_abort_pos_wait == abort_pos_wait &&
2671
         slave_running)
unknown's avatar
unknown committed
2672
  {
unknown's avatar
unknown committed
2673 2674
    bool pos_reached;
    int cmp_result= 0;
unknown's avatar
unknown committed
2675

2676 2677 2678 2679 2680 2681
    DBUG_PRINT("info",
               ("init_abort_pos_wait: %ld  abort_pos_wait: %ld",
                init_abort_pos_wait, abort_pos_wait));
    DBUG_PRINT("info",("group_master_log_name: '%s'  pos: %lu",
                       group_master_log_name, (ulong) group_master_log_pos));

2682
    /*
unknown's avatar
unknown committed
2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693
      group_master_log_name can be "", if we are just after a fresh
      replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
      (before we have executed one Rotate event from the master) or
      (rare) if the user is doing a weird slave setup (see next
      paragraph).  If group_master_log_name is "", we assume we don't
      have enough info to do the comparison yet, so we just wait until
      more data. In this case master_log_pos is always 0 except if
      somebody (wrongly) sets this slave to be a slave of itself
      without using --replicate-same-server-id (an unsupported
      configuration which does nothing), then group_master_log_pos
      will grow and group_master_log_name will stay "".
2694
    */
2695
    if (*group_master_log_name)
unknown's avatar
unknown committed
2696
    {
unknown's avatar
unknown committed
2697 2698
      char *basename= (group_master_log_name +
                       dirname_length(group_master_log_name));
unknown's avatar
unknown committed
2699
      /*
2700 2701 2702 2703
        First compare the parts before the extension.
        Find the dot in the master's log basename,
        and protect against user's input error :
        if the names do not match up to '.' included, return error
2704
      */
2705 2706 2707 2708 2709 2710 2711 2712
      char *q= (char*)(fn_ext(basename)+1);
      if (strncmp(basename, log_name_tmp, (int)(q-basename)))
      {
        error= -2;
        break;
      }
      // Now compare extensions.
      char *q_end;
2713 2714
      ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
      if (group_master_log_name_extension < log_name_extension)
2715
        cmp_result= -1 ;
2716
      else
2717
        cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
2718

unknown's avatar
unknown committed
2719
      pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
2720 2721
                    cmp_result > 0);
      if (pos_reached || thd->killed)
2722
        break;
unknown's avatar
unknown committed
2723
    }
2724 2725

    //wait for master update, with optional timeout.
2726
    
unknown's avatar
unknown committed
2727
    DBUG_PRINT("info",("Waiting for master update"));
2728 2729 2730 2731
    /*
      We are going to pthread_cond_(timed)wait(); if the SQL thread stops it
      will wake us up.
    */
2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748
    if (timeout > 0)
    {
      /*
        Note that pthread_cond_timedwait checks for the timeout
        before for the condition ; i.e. it returns ETIMEDOUT 
        if the system time equals or exceeds the time specified by abstime
        before the condition variable is signaled or broadcast, _or_ if
        the absolute time specified by abstime has already passed at the time
        of the call.
        For that reason, pthread_cond_timedwait will do the "timeoutting" job
        even if its condition is always immediately signaled (case of a loaded
        master).
      */
      error=pthread_cond_timedwait(&data_cond, &data_lock, &abstime);
    }
    else
      pthread_cond_wait(&data_cond, &data_lock);
unknown's avatar
unknown committed
2749
    DBUG_PRINT("info",("Got signal of master update or timed out"));
2750
    if (error == ETIMEDOUT)
2751 2752 2753 2754
    {
      error= -1;
      break;
    }
unknown's avatar
unknown committed
2755
    error=0;
2756
    event_count++;
2757
    DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
2758
  }
2759 2760

err:
unknown's avatar
unknown committed
2761
  thd->exit_cond(msg);
2762
  DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
unknown's avatar
unknown committed
2763
improper_arguments: %d  timed_out: %d",
unknown's avatar
SCRUM  
unknown committed
2764
                     thd->killed_errno(),
2765
                     (int) (init_abort_pos_wait != abort_pos_wait),
2766
                     (int) slave_running,
2767 2768 2769
                     (int) (error == -2),
                     (int) (error == -1)));
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
2770
      !slave_running) 
2771 2772 2773 2774
  {
    error= -2;
  }
  DBUG_RETURN( error ? error : event_count );
unknown's avatar
unknown committed
2775 2776
}

2777 2778 2779 2780
void set_slave_thread_options(THD* thd)
{
  thd->options = ((opt_log_slave_updates) ? OPTION_BIN_LOG:0) |
    OPTION_AUTO_IS_NULL;
2781
  thd->variables.completion_type= 0;
2782
}
2783

2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795
void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli)
{
  thd->variables.character_set_client=
    global_system_variables.character_set_client;
  thd->variables.collation_connection=
    global_system_variables.collation_connection;
  thd->variables.collation_server=
    global_system_variables.collation_server;
  thd->update_charset();
  rli->cached_charset_invalidate();
}

unknown's avatar
unknown committed
2796
/*
2797
  init_slave_thread()
unknown's avatar
unknown committed
2798
*/
2799

2800
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
unknown's avatar
unknown committed
2801 2802
{
  DBUG_ENTER("init_slave_thread");
2803 2804
  thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO; 
2805
  thd->security_ctx->skip_grants();
unknown's avatar
unknown committed
2806 2807
  thd->client_capabilities = 0;
  my_net_init(&thd->net, 0);
unknown's avatar
unknown committed
2808
  thd->net.read_timeout = slave_net_timeout;
2809
  thd->slave_thread = 1;
2810
  set_slave_thread_options(thd);
2811
  /* 
2812
     It's nonsense to constrain the slave threads with max_join_size; if a
2813 2814 2815 2816 2817 2818
     query succeeded on master, we HAVE to execute it. So set
     OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
     (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
     SELECT examining more than 4 billion rows would still fail (yes, because
     when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
     only for client threads.
2819
  */
2820 2821
  thd->options = ((opt_log_slave_updates) ? OPTION_BIN_LOG:0) |
    OPTION_AUTO_IS_NULL | OPTION_BIG_SELECTS;
unknown's avatar
unknown committed
2822
  thd->client_capabilities = CLIENT_LOCAL_FILES;
2823
  thd->real_id=pthread_self();
unknown's avatar
unknown committed
2824 2825 2826 2827
  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id = thread_id++;
  pthread_mutex_unlock(&LOCK_thread_count);

2828
  if (init_thr_lock() || thd->store_globals())
unknown's avatar
unknown committed
2829
  {
2830 2831
    thd->cleanup();
    delete thd;
unknown's avatar
unknown committed
2832 2833 2834
    DBUG_RETURN(-1);
  }

unknown's avatar
unknown committed
2835
#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
unknown's avatar
unknown committed
2836 2837 2838 2839 2840
  sigset_t set;
  VOID(sigemptyset(&set));			// Get mask in use
  VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif

2841
  if (thd_type == SLAVE_THD_SQL)
2842
    thd->proc_info= "Waiting for the next event in relay log";
2843
  else
2844
    thd->proc_info= "Waiting for master update";
unknown's avatar
unknown committed
2845 2846 2847 2848 2849
  thd->version=refresh_version;
  thd->set_time();
  DBUG_RETURN(0);
}

2850

2851 2852
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
		      void* thread_killed_arg)
unknown's avatar
unknown committed
2853
{
2854
  int nap_time;
unknown's avatar
unknown committed
2855 2856 2857 2858 2859
  thr_alarm_t alarmed;
  thr_alarm_init(&alarmed);
  time_t start_time= time((time_t*) 0);
  time_t end_time= start_time+sec;

2860
  while ((nap_time= (int) (end_time - start_time)) > 0)
unknown's avatar
unknown committed
2861
  {
2862
    ALARM alarm_buff;
unknown's avatar
unknown committed
2863
    /*
2864
      The only reason we are asking for alarm is so that
unknown's avatar
unknown committed
2865 2866 2867
      we will be woken up in case of murder, so if we do not get killed,
      set the alarm so it goes off after we wake up naturally
    */
2868
    thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
unknown's avatar
unknown committed
2869
    sleep(nap_time);
2870
    thr_end_alarm(&alarmed);
unknown's avatar
unknown committed
2871
    
2872
    if ((*thread_killed)(thd,thread_killed_arg))
unknown's avatar
unknown committed
2873 2874 2875 2876 2877 2878
      return 1;
    start_time=time((time_t*) 0);
  }
  return 0;
}

2879

unknown's avatar
unknown committed
2880 2881
static int request_dump(MYSQL* mysql, MASTER_INFO* mi,
			bool *suppress_warnings)
unknown's avatar
unknown committed
2882
{
2883
  char buf[FN_REFLEN + 10];
unknown's avatar
unknown committed
2884 2885
  int len;
  int binlog_flags = 0; // for now
2886
  char* logname = mi->master_log_name;
2887 2888
  DBUG_ENTER("request_dump");

unknown's avatar
unknown committed
2889
  // TODO if big log files: Change next to int8store()
unknown's avatar
unknown committed
2890
  int4store(buf, (ulong) mi->master_log_pos);
unknown's avatar
unknown committed
2891
  int2store(buf + 4, binlog_flags);
2892
  int4store(buf + 6, server_id);
unknown's avatar
unknown committed
2893
  len = (uint) strlen(logname);
2894
  memcpy(buf + 10, logname,len);
unknown's avatar
SCRUM  
unknown committed
2895
  if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
unknown's avatar
unknown committed
2896
  {
unknown's avatar
unknown committed
2897 2898 2899 2900 2901
    /*
      Something went wrong, so we will just reconnect and retry later
      in the future, we should do a better error analysis, but for
      now we just fill up the error log :-)
    */
unknown's avatar
SCRUM  
unknown committed
2902
    if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
unknown's avatar
unknown committed
2903 2904
      *suppress_warnings= 1;			// Suppress reconnect warning
    else
2905
      sql_print_error("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs",
unknown's avatar
SCRUM  
unknown committed
2906
		      mysql_errno(mysql), mysql_error(mysql),
2907 2908
		      master_connect_retry);
    DBUG_RETURN(1);
unknown's avatar
unknown committed
2909
  }
unknown's avatar
unknown committed
2910

2911
  DBUG_RETURN(0);
unknown's avatar
unknown committed
2912 2913
}

2914

2915
static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
unknown's avatar
unknown committed
2916 2917 2918
{
  char buf[1024];
  char * p = buf;
unknown's avatar
unknown committed
2919 2920
  uint table_len = (uint) strlen(table);
  uint db_len = (uint) strlen(db);
unknown's avatar
unknown committed
2921
  if (table_len + db_len > sizeof(buf) - 2)
unknown's avatar
unknown committed
2922 2923 2924 2925
  {
    sql_print_error("request_table_dump: Buffer overrun");
    return 1;
  } 
unknown's avatar
unknown committed
2926 2927 2928 2929 2930 2931 2932
  
  *p++ = db_len;
  memcpy(p, db, db_len);
  p += db_len;
  *p++ = table_len;
  memcpy(p, table, table_len);
  
unknown's avatar
SCRUM  
unknown committed
2933
  if (simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1))
unknown's avatar
unknown committed
2934 2935
  {
    sql_print_error("request_table_dump: Error sending the table dump \
unknown's avatar
unknown committed
2936
command");
unknown's avatar
unknown committed
2937 2938
    return 1;
  }
unknown's avatar
unknown committed
2939 2940 2941 2942

  return 0;
}

2943

unknown's avatar
unknown committed
2944
/*
  Read one event from the master

  SYNOPSIS
    read_event()
    mysql		MySQL connection
    mi			Master connection information (currently not used by
			this function)
    suppress_warnings	Out parameter. Set to TRUE when the read failed with
			ER_NET_READ_INTERRUPTED, i.e. a normal net read timeout
			after which the caller tries a reconnect; nothing should
			then be printed to the error log, because this is a
			normal event on an idle server. Set to FALSE otherwise.

  RETURN VALUES
    'packet_error'	Error, or an end (EOF) packet received from the master
    number		Length of the packet minus its leading byte
*/

static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
{
  ulong len;

  *suppress_warnings= 0;
  /*
    my_real_read() will time us out
    We check if we were told to die, and if not, try reading again

    TODO:  Move 'events_till_disconnect' to the MASTER_INFO structure
  */
#ifndef DBUG_OFF
  /*
    Debug builds only: once events_till_disconnect has counted down to
    zero, report a failed read.
  */
  if (disconnect_slave_event_count && !(events_till_disconnect--))
    return packet_error;
#endif

  len = net_safe_read(mysql);
  if (len == packet_error || (long) len < 1)
  {
    if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
    {
      /*
        We are trying a normal reconnect after a read timeout;
        we suppress prints to .err file as long as the reconnect
        happens without problems
      */
      *suppress_warnings= TRUE;
    }
    else
      sql_print_error("Error reading packet from server: %s ( server_errno=%d)",
                      mysql_error(mysql), mysql_errno(mysql));
    return packet_error;
  }

  /* Check if eof packet: a short packet whose first byte is 254 */
  if (len < 8 && mysql->net.read_pos[0] == 254)
  {
    sql_print_information("Slave: received end packet from server, apparent "
                          "master shutdown: %s",
                          mysql_error(mysql));
    return packet_error;
  }

  /* len is an ulong, so it must be printed with %lu */
  DBUG_PRINT("info",( "len=%lu, net->read_pos[4] = %d\n",
                      len, mysql->net.read_pos[4]));
  /* The caller gets the length without the packet's leading byte */
  return len - 1;
}

unknown's avatar
unknown committed
3009

3010
/*
  Classify an error code.

  SYNOPSIS
    check_expected_error()
    thd			Not used
    rli			Not used
    expected_error	Error code to classify

  RETURN VALUES
    1	expected_error is ER_NET_READ_ERROR, ER_NET_ERROR_ON_WRITE,
	ER_SERVER_SHUTDOWN or ER_NEW_ABORTING_CONNECTION
    0	any other error code
*/
int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error)
{
  const bool is_network_or_shutdown_error=
    expected_error == ER_NET_READ_ERROR ||
    expected_error == ER_NET_ERROR_ON_WRITE ||
    expected_error == ER_SERVER_SHUTDOWN ||
    expected_error == ER_NEW_ABORTING_CONNECTION;

  return is_network_or_shutdown_error ? 1 : 0;
}
3022

3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070
/*
     Check if condition stated in UNTIL clause of START SLAVE is reached.
   SYNOPSYS
     st_relay_log_info::is_until_satisfied()
   DESCRIPTION
     Checks if UNTIL condition is reached. Uses caching result of last 
     comparison of current log file name and target log file name. So cached 
     value should be invalidated if current log file name changes 
     (see st_relay_log_info::notify_... functions).
     
     This caching is needed to avoid of expensive string comparisons and 
     strtol() conversions needed for log names comparison. We don't need to
     compare them each time this function is called, we only need to do this 
     when current log name changes. If we have UNTIL_MASTER_POS condition we 
     need to do this only after Rotate_log_event::exec_event() (which is 
     rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS 
     condition then we should invalidate cached comarison value after 
     inc_group_relay_log_pos() which called for each group of events (so we
     have some benefit if we have something like queries that use 
     autoincrement or if we have transactions).
     
     Should be called ONLY if until_condition != UNTIL_NONE !
   RETURN VALUE
     true - condition met or error happened (condition seems to have 
            bad log file name)
     false - condition not met
*/

bool st_relay_log_info::is_until_satisfied()
{
  const char *log_name;
  ulonglong log_pos;

  DBUG_ASSERT(until_condition != UNTIL_NONE);
  
  if (until_condition == UNTIL_MASTER_POS)
  {
    log_name= group_master_log_name;
    log_pos= group_master_log_pos;
  }
  else
  { /* until_condition == UNTIL_RELAY_POS */
    log_name= group_relay_log_name;
    log_pos= group_relay_log_pos;
  }
  
  if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
  {
3071 3072 3073 3074 3075 3076
    /*
      We have no cached comparison results so we should compare log names
      and cache result.
      If we are after RESET SLAVE, and the SQL slave thread has not processed
      any event yet, it could be that group_master_log_name is "". In that case,
      just wait for more events (as there is no sensible comparison to do).
3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100
    */

    if (*log_name)
    {
      const char *basename= log_name + dirname_length(log_name);
      
      const char *q= (const char*)(fn_ext(basename)+1);
      if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
      {
        /* Now compare extensions. */
        char *q_end;
        ulong log_name_extension= strtoul(q, &q_end, 10);
        if (log_name_extension < until_log_name_extension)
          until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
        else
          until_log_names_cmp_result= 
            (log_name_extension > until_log_name_extension) ? 
            UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
      }
      else  
      {
        /* Probably error so we aborting */
        sql_print_error("Slave SQL thread is stopped because UNTIL "
                        "condition is bad.");
unknown's avatar
unknown committed
3101
        return TRUE;
3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112
      }
    }
    else
      return until_log_pos == 0;
  }
    
  return ((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL && 
           log_pos >= until_log_pos) ||
          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER);
}

3113

3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131
/*
  Forget the cached charset. An all-zero cached_charset is the
  "uninitialized" state.
*/
void st_relay_log_info::cached_charset_invalidate()
{
  bzero(&cached_charset[0], sizeof(cached_charset));
}


/*
  Compare charset with the cached one, remembering it if it differs.

  RETURN VALUES
    0	charset is byte-identical to cached_charset; nothing is changed
    1	charset differs; it has been copied into cached_charset
*/
bool st_relay_log_info::cached_charset_compare(char *charset)
{
  const size_t cache_size= sizeof(cached_charset);

  if (!bcmp(cached_charset, charset, cache_size))
    return 0;

  memcpy(cached_charset, charset, cache_size);
  return 1;
}


3132
/*
  Read the next event from the relay log and execute it.

  SYNOPSIS
    exec_relay_log_event()
    thd		THD of the SQL thread (asserted to be rli->sql_thd)
    rli		Relay log info of the SQL thread

  DESCRIPTION
    Stops if the UNTIL condition of START SLAVE is reached or the thread
    was killed. Skips events that originate from this server (unless
    replicate_same_server_id is set) and events the user asked to skip via
    slave_skip_counter. Otherwise executes the event. If
    slave_trans_retries is set, it also retries a transaction that failed
    with ER_LOCK_DEADLOCK or ER_LOCK_WAIT_TIMEOUT.

  RETURN VALUES
    0		event executed (or retried) successfully, or skipped
    non-zero	the SQL thread should stop: UNTIL reached, killed, event
		could not be read, or the result of a failed exec_event()
*/
static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
{
  /*
     We acquire this mutex since we need it for all operations except
     event execution. But we will release it in places where we will
     wait for something for example inside of next_event().
   */
  pthread_mutex_lock(&rli->data_lock);

  if (rli->until_condition!=RELAY_LOG_INFO::UNTIL_NONE &&
      rli->is_until_satisfied())
  {
    char buf[22];
    /*
      Print the position with llstr(): a "%ld" with a (long) cast would
      truncate log positions that exceed the range of a 32-bit long.
    */
    sql_print_error("Slave SQL thread stopped because it reached its"
                    " UNTIL position %s", llstr(rli->until_pos(), buf));
    /*
      Setting abort_slave flag because we do not want additional message about
      error in query execution to be printed.
    */
    rli->abort_slave= 1;
    pthread_mutex_unlock(&rli->data_lock);
    return 1;
  }

  Log_event * ev = next_event(rli);

  DBUG_ASSERT(rli->sql_thd==thd);

  if (sql_slave_killed(thd,rli))
  {
    pthread_mutex_unlock(&rli->data_lock);
    delete ev;
    return 1;
  }
  if (ev)
  {
    int type_code = ev->get_type_code();
    int exec_res;

    /*
      Queries originating from this server must be skipped.
      Low-level events (Format_desc, Rotate, Stop) from this server
      must also be skipped. But for those we don't want to modify
      group_master_log_pos, because these events did not exist on the master.
      Format_desc is not completely skipped.
      Skip queries specified by the user in slave_skip_counter.
      We can't however skip events that has something to do with the
      log files themselves.
      Filtering on own server id is extremely important, to ignore execution of
      events created by the creation/rotation of the relay log (remember that
      now the relay log starts with its Format_desc, has a Rotate etc).
    */

    DBUG_PRINT("info",("type_code=%d, server_id=%d",type_code,ev->server_id));

    if ((ev->server_id == (uint32) ::server_id &&
         !replicate_same_server_id &&
         type_code != FORMAT_DESCRIPTION_EVENT) ||
        (rli->slave_skip_counter &&
         type_code != ROTATE_EVENT && type_code != STOP_EVENT &&
         type_code != START_EVENT_V3 && type_code!= FORMAT_DESCRIPTION_EVENT))
    {
      DBUG_PRINT("info", ("event skipped"));
      if (thd->options & OPTION_BEGIN)
        rli->inc_event_relay_log_pos();
      else
      {
        rli->inc_group_relay_log_pos((type_code == ROTATE_EVENT ||
                                      type_code == STOP_EVENT ||
                                      type_code == FORMAT_DESCRIPTION_EVENT) ?
                                     LL(0) : ev->log_pos,
                                     1/* skip lock*/);
        flush_relay_log_info(rli);
      }

      /*
        Protect against common user error of setting the counter to 1
        instead of 2 while recovering from an insert which used auto_increment,
        rand or user var.
      */
      if (rli->slave_skip_counter &&
          !((type_code == INTVAR_EVENT ||
             type_code == RAND_EVENT ||
             type_code == USER_VAR_EVENT) &&
            rli->slave_skip_counter == 1) &&
          /*
            The events from ourselves which have something to do with the relay
            log itself must be skipped, true, but they mustn't decrement
            rli->slave_skip_counter, because the user is supposed to not see
            these events (they are not in the master's binlog) and if we
            decremented, START SLAVE would for example decrement when it sees
            the Rotate, so the event which the user probably wanted to skip
            would not be skipped.
          */
          !(ev->server_id == (uint32) ::server_id &&
            (type_code == ROTATE_EVENT || type_code == STOP_EVENT ||
             type_code == START_EVENT_V3 || type_code == FORMAT_DESCRIPTION_EVENT)))
        --rli->slave_skip_counter;
      pthread_mutex_unlock(&rli->data_lock);
      delete ev;
      return 0;                                 // avoid infinite update loops
    }
    pthread_mutex_unlock(&rli->data_lock);

    thd->server_id = ev->server_id; // use the original server id for logging
    thd->set_time();                            // time the query
    thd->lex->current_select= 0;
    if (!ev->when)
      ev->when = time(NULL);
    ev->thd = thd;
    exec_res = ev->exec_event(rli);
    DBUG_ASSERT(rli->sql_thd==thd);
    /*
       Format_description_log_event should not be deleted because it will be
       used to read info about the relay log's format; it will be deleted when
       the SQL thread does not need it, i.e. when this thread terminates.
    */
    if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
    {
      DBUG_PRINT("info", ("Deleting the event after it has been executed"));
      delete ev;
    }
    if (slave_trans_retries)
    {
      if (exec_res &&
          (thd->net.last_errno == ER_LOCK_DEADLOCK ||
           thd->net.last_errno == ER_LOCK_WAIT_TIMEOUT) &&
          !thd->is_fatal_error)
      {
        const char *errmsg;
        /*
          We were in a transaction which has been rolled back because of a
          deadlock (currently, InnoDB deadlock detected by InnoDB) or lock
          wait timeout (innodb_lock_wait_timeout exceeded); let's seek back to
          BEGIN log event and retry it all again.
          We have to not only seek but also
          a) init_master_info(), to seek back to hot relay log's start for later
          (for when we will come back to this hot log after re-processing the
          possibly existing old logs where BEGIN is: check_binlog_magic() will
          then need the cache to be at position 0 (see comments at beginning of
          init_master_info()).
          b) init_relay_log_pos(), because the BEGIN may be an older relay log.
        */
        if (rli->trans_retries < slave_trans_retries)
        {
          if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
            sql_print_error("Failed to initialize the master info structure");
          else if (init_relay_log_pos(rli,
                                      rli->group_relay_log_name,
                                      rli->group_relay_log_pos,
                                      1, &errmsg, 1))
            sql_print_error("Error initializing relay log position: %s",
                            errmsg);
          else
          {
            exec_res= 0;
            /* chance for concurrent connection to get more locks */
            safe_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
                       (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
            pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
            rli->trans_retries++;
            rli->retried_trans++;
            pthread_mutex_unlock(&rli->data_lock);
            DBUG_PRINT("info", ("Slave retries transaction "
                                "rli->trans_retries: %lu", rli->trans_retries));
          }
        }
        else
          sql_print_error("Slave SQL thread retried transaction %lu time(s) "
                          "in vain, giving up. Consider raising the value of "
                          "the slave_transaction_retries variable.",
                          slave_trans_retries);
      }
      if (!((thd->options & OPTION_BEGIN) && opt_using_transactions))
        rli->trans_retries= 0; // restart from fresh
    }
    return exec_res;
  }
  else
  {
    pthread_mutex_unlock(&rli->data_lock);
    slave_print_error(rli, 0, "\
Could not parse relay log event entry. The possible reasons are: the master's \
binary log is corrupted (you can check this by running 'mysqlbinlog' on the \
binary log), the slave's relay log is corrupted (you can check this by running \
'mysqlbinlog' on the relay log), a network problem, or a bug in the master's \
or slave's MySQL code. If you want to check the master's binary log or slave's \
relay log, you will be able to know their names by issuing 'SHOW SLAVE STATUS' \
on this slave.\
");
    return 1;
  }
}

3325

unknown's avatar
unknown committed
3326
/* Slave I/O Thread entry point */
3327

3328
extern "C" pthread_handler_decl(handle_slave_io,arg)
unknown's avatar
unknown committed
3329
{
unknown's avatar
unknown committed
3330 3331
  THD *thd; // needs to be first for thread_stack
  MYSQL *mysql;
3332
  MASTER_INFO *mi = (MASTER_INFO*)arg;
unknown's avatar
unknown committed
3333 3334
  char llbuff[22];
  uint retry_count;
3335

unknown's avatar
unknown committed
3336 3337
  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  my_thread_init();
3338
  DBUG_ENTER("handle_slave_io");
unknown's avatar
unknown committed
3339

unknown's avatar
unknown committed
3340
#ifndef DBUG_OFF
unknown's avatar
unknown committed
3341
slave_begin:
3342
#endif
3343
  DBUG_ASSERT(mi->inited);
unknown's avatar
unknown committed
3344 3345 3346
  mysql= NULL ;
  retry_count= 0;

3347
  pthread_mutex_lock(&mi->run_lock);
unknown's avatar
unknown committed
3348 3349 3350
  /* Inform waiting threads that slave has started */
  mi->slave_run_id++;

3351
#ifndef DBUG_OFF
3352
  mi->events_till_abort = abort_slave_event_count;
3353 3354
#endif

3355
  thd= new THD; // note that contructor of THD uses DBUG_ !
3356
  THD_CHECK_SENTRY(thd);
unknown's avatar
unknown committed
3357 3358

  pthread_detach_this_thread();
3359
  if (init_slave_thread(thd, SLAVE_THD_IO))
unknown's avatar
unknown committed
3360 3361 3362 3363 3364 3365
  {
    pthread_cond_broadcast(&mi->start_cond);
    pthread_mutex_unlock(&mi->run_lock);
    sql_print_error("Failed during slave I/O thread initialization");
    goto err;
  }
3366
  mi->io_thd = thd;
unknown's avatar
unknown committed
3367
  thd->thread_stack = (char*)&thd; // remember where our stack is
3368
  pthread_mutex_lock(&LOCK_thread_count);
unknown's avatar
unknown committed
3369
  threads.append(thd);
3370
  pthread_mutex_unlock(&LOCK_thread_count);
3371 3372 3373
  mi->slave_running = 1;
  mi->abort_slave = 0;
  pthread_mutex_unlock(&mi->run_lock);
3374
  pthread_cond_broadcast(&mi->start_cond);
3375

3376 3377 3378
  DBUG_PRINT("master_info",("log_file_name: '%s'  position: %s",
			    mi->master_log_name,
			    llstr(mi->master_log_pos,llbuff)));
3379

unknown's avatar
SCRUM  
unknown committed
3380
  if (!(mi->mysql = mysql = mysql_init(NULL)))
unknown's avatar
unknown committed
3381
  {
unknown's avatar
unknown committed
3382
    sql_print_error("Slave I/O thread: error in mysql_init()");
unknown's avatar
unknown committed
3383 3384
    goto err;
  }
3385

3386
  thd->proc_info = "Connecting to master";
3387
  // we can get killed during safe_connect
3388
  if (!safe_connect(thd, mysql, mi))
3389
    sql_print_information("Slave I/O thread: connected to master '%s@%s:%d',\
3390
  replication started in log '%s' at position %s", mi->user,
unknown's avatar
unknown committed
3391 3392 3393
		    mi->host, mi->port,
		    IO_RPL_LOG_NAME,
		    llstr(mi->master_log_pos,llbuff));
3394
  else
unknown's avatar
unknown committed
3395
  {
3396
    sql_print_information("Slave I/O thread killed while connecting to master");
unknown's avatar
unknown committed
3397 3398
    goto err;
  }
3399

3400
connected:
3401

3402 3403
  // TODO: the assignment below should be under mutex (5.0)
  mi->slave_running= MYSQL_SLAVE_RUN_CONNECT;
3404
  thd->slave_net = &mysql->net;
3405
  thd->proc_info = "Checking master version";
unknown's avatar
unknown committed
3406
  if (get_master_version_and_clock(mysql, mi))
3407
    goto err;
3408 3409

  if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
3410
  {
unknown's avatar
unknown committed
3411 3412 3413 3414 3415
    /*
      Register ourselves with the master.
      If fails, this is not fatal - we just print the error message and go
      on with life.
    */
3416
    thd->proc_info = "Registering slave on master";
3417
    if (register_slave_on_master(mysql) ||  update_slave_list(mysql, mi))
3418 3419
      goto err;
  }
3420

3421
  DBUG_PRINT("info",("Starting reading binary log from master"));
3422
  while (!io_slave_killed(thd,mi))
unknown's avatar
unknown committed
3423
  {
3424
    bool suppress_warnings= 0;
unknown's avatar
unknown committed
3425
    thd->proc_info = "Requesting binlog dump";
unknown's avatar
unknown committed
3426
    if (request_dump(mysql, mi, &suppress_warnings))
unknown's avatar
unknown committed
3427 3428
    {
      sql_print_error("Failed on request_dump()");
unknown's avatar
unknown committed
3429
      if (io_slave_killed(thd,mi))
unknown's avatar
unknown committed
3430
      {
3431
	sql_print_information("Slave I/O thread killed while requesting master \
unknown's avatar
unknown committed
3432
dump");
unknown's avatar
unknown committed
3433 3434
	goto err;
      }
3435

3436
      mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
3437
      thd->proc_info= "Waiting to reconnect after a failed binlog dump request";
3438 3439 3440
#ifdef SIGNAL_WITH_VIO_CLOSE
      thd->clear_active_vio();
#endif
unknown's avatar
SCRUM  
unknown committed
3441
      end_server(mysql);
unknown's avatar
unknown committed
3442 3443 3444 3445 3446
      /*
	First time retry immediately, assuming that we can recover
	right away - if first time fails, sleep between re-tries
	hopefuly the admin can fix the problem sometime
      */
3447 3448 3449 3450
      if (retry_count++)
      {
	if (retry_count > master_retry_count)
	  goto err;				// Don't retry forever
3451 3452
	safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
		   (void*)mi);
3453
      }
3454
      if (io_slave_killed(thd,mi))
unknown's avatar
unknown committed
3455
      {
3456
	sql_print_information("Slave I/O thread killed while retrying master \
unknown's avatar
unknown committed
3457
dump");
unknown's avatar
unknown committed
3458 3459
	goto err;
      }
unknown's avatar
unknown committed
3460

3461
      thd->proc_info = "Reconnecting after a failed binlog dump request";
unknown's avatar
unknown committed
3462 3463
      if (!suppress_warnings)
	sql_print_error("Slave I/O thread: failed dump request, \
3464
reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME,
unknown's avatar
unknown committed
3465 3466 3467
			llstr(mi->master_log_pos,llbuff));
      if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
	  io_slave_killed(thd,mi))
unknown's avatar
unknown committed
3468
      {
3469
	sql_print_information("Slave I/O thread killed during or \
3470
after reconnect");
unknown's avatar
unknown committed
3471 3472
	goto err;
      }
unknown's avatar
unknown committed
3473

unknown's avatar
unknown committed
3474 3475
      goto connected;
    }
unknown's avatar
unknown committed
3476

3477
    while (!io_slave_killed(thd,mi))
unknown's avatar
unknown committed
3478
    {
3479 3480
      bool suppress_warnings= 0;
      /*
3481
         We say "waiting" because read_event() will wait if there's nothing to
3482 3483 3484
         read. But if there's something to read, it will not wait. The
         important thing is to not confuse users by saying "reading" whereas
         we're in fact receiving nothing.
3485 3486
      */
      thd->proc_info = "Waiting for master to send event";
unknown's avatar
unknown committed
3487
      ulong event_len = read_event(mysql, mi, &suppress_warnings);
3488
      if (io_slave_killed(thd,mi))
unknown's avatar
unknown committed
3489
      {
3490
	if (global_system_variables.log_warnings)
3491
	  sql_print_information("Slave I/O thread killed while reading event");
unknown's avatar
unknown committed
3492 3493
	goto err;
      }
3494

unknown's avatar
unknown committed
3495 3496
      if (event_len == packet_error)
      {
unknown's avatar
SCRUM  
unknown committed
3497
	uint mysql_error_number= mysql_errno(mysql);
3498
	if (mysql_error_number == ER_NET_PACKET_TOO_LARGE)
unknown's avatar
unknown committed
3499
	{
3500 3501 3502 3503
	  sql_print_error("\
Log entry on master is longer than max_allowed_packet (%ld) on \
slave. If the entry is correct, restart the server with a higher value of \
max_allowed_packet",
unknown's avatar
unknown committed
3504
			  thd->variables.max_allowed_packet);
unknown's avatar
unknown committed
3505 3506
	  goto err;
	}
3507 3508 3509
	if (mysql_error_number == ER_MASTER_FATAL_ERROR_READING_BINLOG)
	{
	  sql_print_error(ER(mysql_error_number), mysql_error_number,
unknown's avatar
SCRUM  
unknown committed
3510
			  mysql_error(mysql));
3511 3512
	  goto err;
	}
3513
        mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
unknown's avatar
unknown committed
3514
	thd->proc_info = "Waiting to reconnect after a failed master event read";
3515 3516 3517
#ifdef SIGNAL_WITH_VIO_CLOSE
        thd->clear_active_vio();
#endif
unknown's avatar
SCRUM  
unknown committed
3518
	end_server(mysql);
3519 3520 3521 3522
	if (retry_count++)
	{
	  if (retry_count > master_retry_count)
	    goto err;				// Don't retry forever
3523
	  safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3524
		     (void*) mi);
3525
	}
3526
	if (io_slave_killed(thd,mi))
unknown's avatar
unknown committed
3527
	{
3528
	  if (global_system_variables.log_warnings)
3529
	    sql_print_information("Slave I/O thread killed while waiting to \
unknown's avatar
unknown committed
3530
reconnect after a failed read");
unknown's avatar
unknown committed
3531 3532
	  goto err;
	}
3533
	thd->proc_info = "Reconnecting after a failed master event read";
unknown's avatar
unknown committed
3534
	if (!suppress_warnings)
3535
	  sql_print_information("Slave I/O thread: Failed reading log event, \
3536
reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME,
unknown's avatar
unknown committed
3537 3538 3539
			  llstr(mi->master_log_pos, llbuff));
	if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
	    io_slave_killed(thd,mi))
unknown's avatar
unknown committed
3540
	{
3541
	  if (global_system_variables.log_warnings)
3542
	    sql_print_information("Slave I/O thread killed during or after a \
unknown's avatar
unknown committed
3543
reconnect done to recover from failed read");
unknown's avatar
unknown committed
3544 3545 3546
	  goto err;
	}
	goto connected;
unknown's avatar
unknown committed
3547
      } // if (event_len == packet_error)
3548

3549
      retry_count=0;			// ok event, reset retry counter
3550
      thd->proc_info = "Queueing master event to the relay log";
unknown's avatar
unknown committed
3551 3552 3553
      if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
		      event_len))
      {
3554
	sql_print_error("Slave I/O thread could not queue event from master");
unknown's avatar
unknown committed
3555 3556
	goto err;
      }
unknown's avatar
unknown committed
3557
      flush_master_info(mi, 1); /* sure that we can flush the relay log */
3558 3559 3560 3561 3562 3563 3564 3565 3566 3567 3568 3569
      /*
        See if the relay logs take too much space.
        We don't lock mi->rli.log_space_lock here; this dirty read saves time
        and does not introduce any problem:
        - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
        the clean value is 0), then we are reading only one more event as we
        should, and we'll block only at the next event. No big deal.
        - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
        the clean value is 1), then we are going into wait_for_relay_log_space()
        for no reason, but this function will do a clean read, notice the clean
        value and exit immediately.
      */
3570 3571 3572 3573 3574 3575 3576 3577 3578 3579 3580
#ifndef DBUG_OFF
      {
        char llbuf1[22], llbuf2[22];
        DBUG_PRINT("info", ("log_space_limit=%s log_space_total=%s \
ignore_log_space_limit=%d",
                            llstr(mi->rli.log_space_limit,llbuf1),
                            llstr(mi->rli.log_space_total,llbuf2),
                            (int) mi->rli.ignore_log_space_limit)); 
      }
#endif

unknown's avatar
unknown committed
3581
      if (mi->rli.log_space_limit && mi->rli.log_space_limit <
3582 3583
	  mi->rli.log_space_total &&
          !mi->rli.ignore_log_space_limit)
unknown's avatar
unknown committed
3584 3585 3586 3587 3588 3589
	if (wait_for_relay_log_space(&mi->rli))
	{
	  sql_print_error("Slave I/O thread aborted while waiting for relay \
log space");
	  goto err;
	}
unknown's avatar
unknown committed
3590
      // TODO: check debugging abort code
3591
#ifndef DBUG_OFF
unknown's avatar
unknown committed
3592 3593 3594 3595 3596
      if (abort_slave_event_count && !--events_till_abort)
      {
	sql_print_error("Slave I/O thread: debugging abort");
	goto err;
      }
3597
#endif
3598
    } 
3599
  }
unknown's avatar
unknown committed
3600

unknown's avatar
unknown committed
3601
  // error = 0;
unknown's avatar
unknown committed
3602
err:
3603
  // print the current replication position
3604
  sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
3605
		  IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
3606
  VOID(pthread_mutex_lock(&LOCK_thread_count));
unknown's avatar
unknown committed
3607
  thd->query = thd->db = 0; // extra safety
3608
  thd->query_length= thd->db_length= 0;
3609
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
unknown's avatar
unknown committed
3610 3611
  if (mysql)
  {
unknown's avatar
SCRUM  
unknown committed
3612
    mysql_close(mysql);
unknown's avatar
unknown committed
3613 3614
    mi->mysql=0;
  }
unknown's avatar
unknown committed
3615
  thd->proc_info = "Waiting for slave mutex on exit";
3616 3617 3618
  pthread_mutex_lock(&mi->run_lock);
  mi->slave_running = 0;
  mi->io_thd = 0;
3619 3620 3621
  /* Forget the relay log's format */
  delete mi->rli.relay_log.description_event_for_queue;
  mi->rli.relay_log.description_event_for_queue= 0;
3622
  // TODO: make rpl_status part of MASTER_INFO
3623
  change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
3624 3625
  mi->abort_slave = 0; // TODO: check if this is needed
  DBUG_ASSERT(thd->net.buff != 0);
unknown's avatar
unknown committed
3626
  net_end(&thd->net); // destructor will not free it, because net.vio is 0
unknown's avatar
unknown committed
3627
  close_thread_tables(thd, 0);
3628
  pthread_mutex_lock(&LOCK_thread_count);
3629
  THD_CHECK_SENTRY(thd);
unknown's avatar
unknown committed
3630
  delete thd;
3631
  pthread_mutex_unlock(&LOCK_thread_count);
unknown's avatar
unknown committed
3632
  pthread_cond_broadcast(&mi->stop_cond);	// tell the world we are done
3633
  pthread_mutex_unlock(&mi->run_lock);
unknown's avatar
unknown committed
3634
#ifndef DBUG_OFF
unknown's avatar
unknown committed
3635
  if (abort_slave_event_count && !events_till_abort)
unknown's avatar
unknown committed
3636 3637
    goto slave_begin;
#endif  
unknown's avatar
unknown committed
3638
  my_thread_end();
unknown's avatar
unknown committed
3639 3640 3641 3642
  pthread_exit(0);
  DBUG_RETURN(0);				// Can't return anything here
}

unknown's avatar
unknown committed
3643

unknown's avatar
unknown committed
3644
/* Slave SQL Thread entry point */
unknown's avatar
unknown committed
3645

3646
extern "C" pthread_handler_decl(handle_slave_sql,arg)
3647
{
3648
  THD *thd;			/* needs to be first for thread_stack */
3649 3650
  char llbuff[22],llbuff1[22];
  RELAY_LOG_INFO* rli = &((MASTER_INFO*)arg)->rli; 
unknown's avatar
unknown committed
3651 3652 3653 3654
  const char *errmsg;

  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  my_thread_init();
3655
  DBUG_ENTER("handle_slave_sql");
unknown's avatar
unknown committed
3656 3657 3658 3659 3660

#ifndef DBUG_OFF
slave_begin:  
#endif  

3661 3662 3663
  DBUG_ASSERT(rli->inited);
  pthread_mutex_lock(&rli->run_lock);
  DBUG_ASSERT(!rli->slave_running);
unknown's avatar
unknown committed
3664
  errmsg= 0;
3665 3666 3667
#ifndef DBUG_OFF  
  rli->events_till_abort = abort_slave_event_count;
#endif  
3668

unknown's avatar
unknown committed
3669
  thd = new THD; // note that contructor of THD uses DBUG_ !
3670 3671
  thd->thread_stack = (char*)&thd; // remember where our stack is
  
unknown's avatar
unknown committed
3672 3673 3674
  /* Inform waiting threads that slave has started */
  rli->slave_run_id++;

3675 3676
  pthread_detach_this_thread();
  if (init_slave_thread(thd, SLAVE_THD_SQL))
unknown's avatar
unknown committed
3677 3678 3679 3680 3681 3682 3683 3684 3685 3686
  {
    /*
      TODO: this is currently broken - slave start and change master
      will be stuck if we fail here
    */
    pthread_cond_broadcast(&rli->start_cond);
    pthread_mutex_unlock(&rli->run_lock);
    sql_print_error("Failed during slave thread initialization");
    goto err;
  }
3687
  thd->init_for_queries();
3688
  rli->sql_thd= thd;
3689
  thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
3690
  pthread_mutex_lock(&LOCK_thread_count);
3691
  threads.append(thd);
3692
  pthread_mutex_unlock(&LOCK_thread_count);
3693 3694 3695 3696 3697 3698 3699 3700
  /*
    We are going to set slave_running to 1. Assuming slave I/O thread is
    alive and connected, this is going to make Seconds_Behind_Master be 0
    i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
    the moment we start we can think we are caught up, and the next second we
    start receiving data so we realize we are not caught up and
    Seconds_Behind_Master grows. No big deal.
  */
3701 3702 3703
  rli->slave_running = 1;
  rli->abort_slave = 0;
  pthread_mutex_unlock(&rli->run_lock);
3704
  pthread_cond_broadcast(&rli->start_cond);
3705

unknown's avatar
unknown committed
3706 3707 3708
  /*
    Reset errors for a clean start (otherwise, if the master is idle, the SQL
    thread may execute no Query_log_event, so the error will remain even
unknown's avatar
unknown committed
3709
    though there's no problem anymore). Do not reset the master timestamp
3710 3711 3712 3713
    (imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
    as we are not sure that we are going to receive a query, we want to
    remember the last master timestamp (to say how many seconds behind we are
    now.
unknown's avatar
unknown committed
3714
    But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
unknown's avatar
unknown committed
3715
  */
unknown's avatar
unknown committed
3716
  clear_slave_error(rli);
3717 3718

  //tell the I/O thread to take relay_log_space_limit into account from now on
3719
  pthread_mutex_lock(&rli->log_space_lock);
3720
  rli->ignore_log_space_limit= 0;
3721
  pthread_mutex_unlock(&rli->log_space_lock);
3722
  rli->trans_retries= 0; // start from "no error"
3723

3724
  if (init_relay_log_pos(rli,
3725 3726
			 rli->group_relay_log_name,
			 rli->group_relay_log_pos,
3727 3728
			 1 /*need data lock*/, &errmsg,
                         1 /*look for a description_event*/))
3729 3730 3731 3732 3733
  {
    sql_print_error("Error initializing relay log position: %s",
		    errmsg);
    goto err;
  }
3734
  THD_CHECK_SENTRY(thd);
3735 3736 3737 3738 3739 3740 3741 3742 3743 3744 3745 3746 3747 3748 3749 3750 3751 3752 3753 3754 3755 3756 3757 3758
#ifndef DBUG_OFF
  {
    char llbuf1[22], llbuf2[22];
    DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
                        llstr(my_b_tell(rli->cur_log),llbuf1), 
                        llstr(rli->event_relay_log_pos,llbuf2)));
    DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
    /*
      Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
      correct position when it's called just after my_b_seek() (the questionable
      stuff is those "seek is done on next read" comments in the my_b_seek()
      source code).
      The crude reality is that this assertion randomly fails whereas
      replication seems to work fine. And there is no easy explanation why it
      fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
      init_relay_log_pos() called above). Maybe the assertion would be
      meaningful if we held rli->data_lock between the my_b_seek() and the
      DBUG_ASSERT().
    */
#ifdef SHOULD_BE_CHECKED
    DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
#endif
  }
#endif
3759
  DBUG_ASSERT(rli->sql_thd == thd);
3760 3761

  DBUG_PRINT("master_info",("log_file_name: %s  position: %s",
3762 3763
			    rli->group_master_log_name,
			    llstr(rli->group_master_log_pos,llbuff)));
3764
  if (global_system_variables.log_warnings)
3765
    sql_print_information("Slave SQL thread initialized, starting replication in \
unknown's avatar
unknown committed
3766
log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
3767 3768
		    llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name,
		    llstr(rli->group_relay_log_pos,llbuff1));
3769

unknown's avatar
unknown committed
3770
  /* execute init_slave variable */
unknown's avatar
unknown committed
3771
  if (sys_init_slave.value_length)
unknown's avatar
unknown committed
3772
  {
3773
    execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
unknown's avatar
unknown committed
3774 3775 3776 3777 3778 3779 3780 3781
    if (thd->query_error)
    {
      sql_print_error("\
Slave SQL thread aborted. Can't execute init_slave query");
      goto err;
    }
  }

3782 3783
  /* Read queries from the IO/THREAD until this thread is killed */

3784
  while (!sql_slave_killed(thd,rli))
3785
  {
3786
    thd->proc_info = "Reading event from the relay log";
3787
    DBUG_ASSERT(rli->sql_thd == thd);
3788
    THD_CHECK_SENTRY(thd);
3789 3790 3791
    if (exec_relay_log_event(thd,rli))
    {
      // do not scare the user if SQL thread was simply killed or stopped
3792
      if (!sql_slave_killed(thd,rli))
3793 3794
        sql_print_error("\
Error running query, slave SQL thread aborted. Fix the problem, and restart \
unknown's avatar
unknown committed
3795
the slave SQL thread with \"SLAVE START\". We stopped at log \
3796
'%s' position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff));
3797 3798
      goto err;
    }
3799
  }
3800

3801
  /* Thread stopped. Print the current replication position to the log */
unknown's avatar
unknown committed
3802 3803 3804
  sql_print_information("Slave SQL thread exiting, replication stopped in log "
 			"'%s' at position %s",
		        RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff));
3805 3806

 err:
3807
  VOID(pthread_mutex_lock(&LOCK_thread_count));
3808 3809 3810 3811 3812 3813
  /*
    Some extra safety, which should not been needed (normally, event deletion
    should already have done these assignments (each event which sets these
    variables is supposed to set them to 0 before terminating)).
  */
  thd->query= thd->db= thd->catalog= 0; 
3814
  thd->query_length= thd->db_length= 0;
3815
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
3816 3817
  thd->proc_info = "Waiting for slave mutex on exit";
  pthread_mutex_lock(&rli->run_lock);
3818 3819
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
  pthread_mutex_lock(&rli->data_lock);
3820
  DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
3821 3822
  /* When master_pos_wait() wakes up it will check this and terminate */
  rli->slave_running= 0; 
3823 3824 3825
  /* Forget the relay log's format */
  delete rli->relay_log.description_event_for_exec;
  rli->relay_log.description_event_for_exec= 0;
3826 3827 3828 3829
  /* Wake up master_pos_wait() */
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
  pthread_cond_broadcast(&rli->data_cond);
unknown's avatar
unknown committed
3830
  rli->ignore_log_space_limit= 0; /* don't need any lock */
3831 3832
  /* we die so won't remember charset - re-update them on next thread start */
  rli->cached_charset_invalidate();
3833
  rli->save_temporary_tables = thd->temporary_tables;
unknown's avatar
unknown committed
3834 3835 3836 3837 3838

  /*
    TODO: see if we can do this conditionally in next_event() instead
    to avoid unneeded position re-init
  */
3839 3840 3841 3842
  thd->temporary_tables = 0; // remove tempation from destructor to close them
  DBUG_ASSERT(thd->net.buff != 0);
  net_end(&thd->net); // destructor will not free it, because we are weird
  DBUG_ASSERT(rli->sql_thd == thd);
3843
  THD_CHECK_SENTRY(thd);
3844
  rli->sql_thd= 0;
3845
  pthread_mutex_lock(&LOCK_thread_count);
3846
  THD_CHECK_SENTRY(thd);
3847 3848 3849 3850 3851 3852 3853 3854 3855
  delete thd;
  pthread_mutex_unlock(&LOCK_thread_count);
  pthread_cond_broadcast(&rli->stop_cond);
  // tell the world we are done
  pthread_mutex_unlock(&rli->run_lock);
#ifndef DBUG_OFF // TODO: reconsider the code below
  if (abort_slave_event_count && !rli->events_till_abort)
    goto slave_begin;
#endif  
unknown's avatar
unknown committed
3856
  my_thread_end();
3857 3858 3859
  pthread_exit(0);
  DBUG_RETURN(0);				// Can't return anything here
}
unknown's avatar
unknown committed
3860

3861

unknown's avatar
unknown committed
3862
/*
3863
  process_io_create_file()
unknown's avatar
unknown committed
3864
*/
3865

unknown's avatar
unknown committed
3866 3867 3868 3869 3870
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
{
  int error = 1;
  ulong num_bytes;
  bool cev_not_written;
3871 3872
  THD *thd = mi->io_thd;
  NET *net = &mi->mysql->net;
unknown's avatar
unknown committed
3873
  DBUG_ENTER("process_io_create_file");
unknown's avatar
unknown committed
3874 3875

  if (unlikely(!cev->is_valid()))
unknown's avatar
unknown committed
3876
    DBUG_RETURN(1);
unknown's avatar
unknown committed
3877 3878 3879 3880 3881 3882
  /*
    TODO: fix to honor table rules, not only db rules
  */
  if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db))
  {
    skip_load_data_infile(net);
unknown's avatar
unknown committed
3883
    DBUG_RETURN(0);
unknown's avatar
unknown committed
3884 3885 3886
  }
  DBUG_ASSERT(cev->inited_from_old);
  thd->file_id = cev->file_id = mi->file_id++;
3887
  thd->server_id = cev->server_id;
unknown's avatar
unknown committed
3888 3889 3890 3891 3892 3893 3894 3895 3896
  cev_not_written = 1;
  
  if (unlikely(net_request_file(net,cev->fname)))
  {
    sql_print_error("Slave I/O: failed requesting download of '%s'",
		    cev->fname);
    goto err;
  }

unknown's avatar
unknown committed
3897 3898 3899 3900
  /*
    This dummy block is so we could instantiate Append_block_log_event
    once and then modify it slightly instead of doing it multiple times
    in the loop
unknown's avatar
unknown committed
3901 3902
  */
  {
3903
    Append_block_log_event aev(thd,0,0,0,0);
unknown's avatar
unknown committed
3904 3905 3906 3907 3908 3909 3910 3911 3912 3913 3914
  
    for (;;)
    {
      if (unlikely((num_bytes=my_net_read(net)) == packet_error))
      {
	sql_print_error("Network read error downloading '%s' from master",
			cev->fname);
	goto err;
      }
      if (unlikely(!num_bytes)) /* eof */
      {
3915
	net_write_command(net, 0, "", 0, "", 0);/* 3.23 master wants it */
unknown's avatar
unknown committed
3916 3917 3918 3919 3920 3921 3922 3923
        /*
          If we wrote Create_file_log_event, then we need to write
          Execute_load_log_event. If we did not write Create_file_log_event,
          then this is an empty file and we can just do as if the LOAD DATA
          INFILE had not existed, i.e. write nothing.
        */
        if (unlikely(cev_not_written))
	  break;
unknown's avatar
unknown committed
3924
	Execute_load_log_event xev(thd,0,0);
3925
	xev.log_pos = cev->log_pos;
unknown's avatar
unknown committed
3926 3927 3928 3929 3930 3931
	if (unlikely(mi->rli.relay_log.append(&xev)))
	{
	  sql_print_error("Slave I/O: error writing Exec_load event to \
relay log");
	  goto err;
	}
unknown's avatar
unknown committed
3932
	mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
unknown's avatar
unknown committed
3933 3934 3935 3936 3937 3938 3939 3940 3941 3942 3943 3944 3945
	break;
      }
      if (unlikely(cev_not_written))
      {
	cev->block = (char*)net->read_pos;
	cev->block_len = num_bytes;
	if (unlikely(mi->rli.relay_log.append(cev)))
	{
	  sql_print_error("Slave I/O: error writing Create_file event to \
relay log");
	  goto err;
	}
	cev_not_written=0;
unknown's avatar
unknown committed
3946
	mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
unknown's avatar
unknown committed
3947 3948 3949 3950 3951
      }
      else
      {
	aev.block = (char*)net->read_pos;
	aev.block_len = num_bytes;
3952
	aev.log_pos = cev->log_pos;
unknown's avatar
unknown committed
3953 3954 3955 3956 3957 3958
	if (unlikely(mi->rli.relay_log.append(&aev)))
	{
	  sql_print_error("Slave I/O: error writing Append_block event to \
relay log");
	  goto err;
	}
unknown's avatar
unknown committed
3959
	mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
unknown's avatar
unknown committed
3960 3961 3962 3963 3964
      }
    }
  }
  error=0;
err:
unknown's avatar
unknown committed
3965
  DBUG_RETURN(error);
unknown's avatar
unknown committed
3966
}
unknown's avatar
unknown committed
3967

3968

unknown's avatar
unknown committed
3969
/*
unknown's avatar
unknown committed
3970 3971 3972 3973 3974 3975 3976 3977
  Start using a new binary log on the master

  SYNOPSIS
    process_io_rotate()
    mi			master_info for the slave
    rev			The rotate log event read from the binary log

  DESCRIPTION
3978
    Updates the master info with the place in the next binary
unknown's avatar
unknown committed
3979
    log where we should start reading.
3980
    Rotate the relay log to avoid mixed-format relay logs.
unknown's avatar
unknown committed
3981 3982 3983 3984 3985 3986 3987

  NOTES
    We assume we already locked mi->data_lock

  RETURN VALUES
    0		ok
    1	        Log event is illegal
unknown's avatar
unknown committed
3988 3989 3990

*/

unknown's avatar
unknown committed
3991
static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev)
3992
{
3993
  DBUG_ENTER("process_io_rotate");
unknown's avatar
unknown committed
3994
  safe_mutex_assert_owner(&mi->data_lock);
3995

unknown's avatar
unknown committed
3996
  if (unlikely(!rev->is_valid()))
3997
    DBUG_RETURN(1);
unknown's avatar
unknown committed
3998 3999 4000 4001 4002

  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
  mi->master_log_pos= rev->pos;
  DBUG_PRINT("info", ("master_log_pos: '%s' %d",
		      mi->master_log_name, (ulong) mi->master_log_pos));
4003
#ifndef DBUG_OFF
unknown's avatar
unknown committed
4004 4005 4006 4007 4008 4009
  /*
    If we do not do this, we will be getting the first
    rotate event forever, so we need to not disconnect after one.
  */
  if (disconnect_slave_event_count)
    events_till_disconnect++;
4010
#endif
4011

4012 4013 4014 4015 4016 4017 4018 4019 4020 4021 4022 4023 4024 4025
  /*
    If description_event_for_queue is format <4, there is conversion in the
    relay log to the slave's format (4). And Rotate can mean upgrade or
    nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
    no need to reset description_event_for_queue now. And if it's nothing (same
    master version as before), no need (still using the slave's format).
  */
  if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
  {
    delete mi->rli.relay_log.description_event_for_queue;
    /* start from format 3 (MySQL 4.0) again */
    mi->rli.relay_log.description_event_for_queue= new
      Format_description_log_event(3);
  }
4026 4027 4028 4029
  /*
    Rotate the relay log makes binlog format detection easier (at next slave
    start or mysqlbinlog)
  */
4030
  rotate_relay_log(mi); /* will take the right mutexes */
4031
  DBUG_RETURN(0);
4032 4033
}

unknown's avatar
unknown committed
4034
/*
4035 4036
  Reads a 3.23 event and converts it to the slave's format. This code was
  copied from MySQL 4.0.
unknown's avatar
unknown committed
4037
*/
4038
static int queue_binlog_ver_1_event(MASTER_INFO *mi, const char *buf,
unknown's avatar
unknown committed
4039
			   ulong event_len)
4040
{
unknown's avatar
unknown committed
4041
  const char *errmsg = 0;
unknown's avatar
unknown committed
4042 4043 4044 4045
  ulong inc_pos;
  bool ignore_event= 0;
  char *tmp_buf = 0;
  RELAY_LOG_INFO *rli= &mi->rli;
4046
  DBUG_ENTER("queue_binlog_ver_1_event");
unknown's avatar
unknown committed
4047

unknown's avatar
unknown committed
4048 4049 4050
  /*
    If we get Load event, we need to pass a non-reusable buffer
    to read_log_event, so we do a trick
unknown's avatar
unknown committed
4051 4052 4053 4054 4055 4056
  */
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
  {
    if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
    {
      sql_print_error("Slave I/O: out of memory for Load event");
unknown's avatar
unknown committed
4057
      DBUG_RETURN(1);
unknown's avatar
unknown committed
4058 4059
    }
    memcpy(tmp_buf,buf,event_len);
4060 4061 4062 4063 4064 4065 4066 4067
    /*
      Create_file constructor wants a 0 as last char of buffer, this 0 will
      serve as the string-termination char for the file's name (which is at the
      end of the buffer)
      We must increment event_len, otherwise the event constructor will not see
      this end 0, which leads to segfault.
    */
    tmp_buf[event_len++]=0;
unknown's avatar
unknown committed
4068
    int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
unknown's avatar
unknown committed
4069 4070
    buf = (const char*)tmp_buf;
  }
4071 4072 4073 4074 4075 4076
  /*
    This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
    send the loaded file, and write it to the relay log in the form of
    Append_block/Exec_load (the SQL thread needs the data, as that thread is not
    connected to the master).
  */
unknown's avatar
unknown committed
4077
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
4078
                                            mi->rli.relay_log.description_event_for_queue);
4079
  if (unlikely(!ev))
4080 4081
  {
    sql_print_error("Read invalid event from master: '%s',\
unknown's avatar
unknown committed
4082
 master could be corrupt but a more likely cause of this is a bug",
4083
		    errmsg);
unknown's avatar
unknown committed
4084 4085
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
    DBUG_RETURN(1);
4086
  }
4087
  pthread_mutex_lock(&mi->data_lock);
4088
  ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
unknown's avatar
unknown committed
4089
  switch (ev->get_type_code()) {
unknown's avatar
unknown committed
4090
  case STOP_EVENT:
4091
    ignore_event= 1;
unknown's avatar
unknown committed
4092 4093
    inc_pos= event_len;
    break;
4094
  case ROTATE_EVENT:
4095
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
4096 4097
    {
      delete ev;
4098
      pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
4099
      DBUG_RETURN(1);
4100
    }
unknown's avatar
unknown committed
4101
    inc_pos= 0;
4102
    break;
unknown's avatar
unknown committed
4103
  case CREATE_FILE_EVENT:
4104 4105 4106 4107 4108 4109
    /*
      Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
      queue_old_event() which is for 3.23 events which don't comprise
      CREATE_FILE_EVENT. This is because read_log_event() above has just
      transformed LOAD_EVENT into CREATE_FILE_EVENT.
    */
unknown's avatar
unknown committed
4110
  {
unknown's avatar
unknown committed
4111
    /* We come here when and only when tmp_buf != 0 */
4112
    DBUG_ASSERT(tmp_buf != 0);
4113 4114
    inc_pos=event_len;
    ev->log_pos+= inc_pos;
unknown's avatar
unknown committed
4115
    int error = process_io_create_file(mi,(Create_file_log_event*)ev);
4116
    delete ev;
4117
    mi->master_log_pos += inc_pos;
4118
    DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
4119
    pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
4120
    my_free((char*)tmp_buf, MYF(0));
unknown's avatar
unknown committed
4121
    DBUG_RETURN(error);
unknown's avatar
unknown committed
4122
  }
4123
  default:
unknown's avatar
unknown committed
4124
    inc_pos= event_len;
4125 4126
    break;
  }
unknown's avatar
unknown committed
4127
  if (likely(!ignore_event))
4128
  {
4129 4130 4131 4132 4133 4134
    if (ev->log_pos) 
      /* 
         Don't do it for fake Rotate events (see comment in
      Log_event::Log_event(const char* buf...) in log_event.cc).
      */
      ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
unknown's avatar
unknown committed
4135
    if (unlikely(rli->relay_log.append(ev)))
4136 4137 4138
    {
      delete ev;
      pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
4139
      DBUG_RETURN(1);
4140
    }
unknown's avatar
unknown committed
4141
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
4142 4143
  }
  delete ev;
unknown's avatar
unknown committed
4144
  mi->master_log_pos+= inc_pos;
4145
  DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
4146
  pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
4147
  DBUG_RETURN(0);
4148 4149
}

4150 4151 4152 4153 4154 4155 4156 4157 4158 4159 4160 4161 4162 4163 4164 4165 4166 4167 4168 4169 4170 4171 4172 4173 4174 4175 4176 4177 4178 4179 4180 4181 4182 4183 4184 4185 4186 4187 4188 4189 4190 4191 4192 4193 4194 4195 4196 4197 4198 4199 4200 4201 4202 4203 4204 4205 4206 4207 4208 4209 4210 4211 4212 4213 4214 4215 4216 4217 4218 4219 4220 4221 4222 4223 4224 4225 4226 4227 4228 4229 4230 4231 4232
/*
  Reads a 4.0 event and converts it to the slave's format. This code was copied
  from queue_binlog_ver_1_event(), with some affordable simplifications.
*/
static int queue_binlog_ver_3_event(MASTER_INFO *mi, const char *buf,
			   ulong event_len)
{
  const char *errmsg = 0;
  ulong inc_pos;
  char *tmp_buf = 0;
  RELAY_LOG_INFO *rli= &mi->rli;
  DBUG_ENTER("queue_binlog_ver_3_event");

  /* read_log_event() will adjust log_pos to be end_log_pos */
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
                                            mi->rli.relay_log.description_event_for_queue);
  if (unlikely(!ev))
  {
    sql_print_error("Read invalid event from master: '%s',\
 master could be corrupt but a more likely cause of this is a bug",
		    errmsg);
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
    DBUG_RETURN(1);
  }
  pthread_mutex_lock(&mi->data_lock);
  switch (ev->get_type_code()) {
  case STOP_EVENT:
    goto err;
  case ROTATE_EVENT:
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
    {
      delete ev;
      pthread_mutex_unlock(&mi->data_lock);
      DBUG_RETURN(1);
    }
    inc_pos= 0;
    break;
  default:
    inc_pos= event_len;
    break;
  }
  if (unlikely(rli->relay_log.append(ev)))
  {
    delete ev;
    pthread_mutex_unlock(&mi->data_lock);
    DBUG_RETURN(1);
  }
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  delete ev;
  mi->master_log_pos+= inc_pos;
err:
  DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
  pthread_mutex_unlock(&mi->data_lock);
  DBUG_RETURN(0);
}

/*
  queue_old_event()

  Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
  (exactly, slave's) format. To do the conversion, we create a 5.0 event from
  the 3.23/4.0 bytes, then write this event to the relay log.

  Dispatches on the binlog version of the current
  description_event_for_queue:
    1  ->  queue_binlog_ver_1_event()   (3.23 master)
    3  ->  queue_binlog_ver_3_event()   (4.0 master)
  Any other version (e.g. 2) is unsupported and returns 1.

  TODO: 
    Test this code before release - it has to be tested on a separate
    setup with 3.23 master or 4.0 master
*/

static int queue_old_event(MASTER_INFO *mi, const char *buf,
			   ulong event_len)
{
  Format_description_log_event *queue_fmt=
    mi->rli.relay_log.description_event_for_queue;

  if (queue_fmt->binlog_version == 1)
    return queue_binlog_ver_1_event(mi, buf, event_len);
  if (queue_fmt->binlog_version == 3)
    return queue_binlog_ver_3_event(mi, buf, event_len);

  /* unsupported format; eg version 2 */
  DBUG_PRINT("info",("unsupported binlog format %d in queue_old_event()",
                     queue_fmt->binlog_version));
  return 1;
}
4233

unknown's avatar
unknown committed
4234
/*
4235 4236
  queue_event()

4237 4238 4239 4240 4241
  If the event is 3.23/4.0, passes it to queue_old_event() which will convert
  it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
  no format conversion, it's pure read/write of bytes.
  So a 5.0.0 slave's relay log can contain events in the slave's format or in
  any >=5.0.0 format.
unknown's avatar
unknown committed
4242 4243 4244
*/

int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
4245
{
unknown's avatar
unknown committed
4246 4247 4248
  int error= 0;
  ulong inc_pos;
  RELAY_LOG_INFO *rli= &mi->rli;
unknown's avatar
unknown committed
4249 4250
  DBUG_ENTER("queue_event");

4251 4252
  if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
      buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
unknown's avatar
unknown committed
4253
    DBUG_RETURN(queue_old_event(mi,buf,event_len));
4254 4255

  pthread_mutex_lock(&mi->data_lock);
unknown's avatar
unknown committed
4256

unknown's avatar
unknown committed
4257 4258
  /*
    TODO: figure out if other events in addition to Rotate
4259 4260
    require special processing.
    Guilhem 2003-06 : I don't think so.
unknown's avatar
unknown committed
4261 4262
  */
  switch (buf[EVENT_TYPE_OFFSET]) {
4263
  case STOP_EVENT:
4264 4265
    /*
      We needn't write this event to the relay log. Indeed, it just indicates a
unknown's avatar
unknown committed
4266 4267 4268
      master server shutdown. The only thing this does is cleaning. But
      cleaning is already done on a per-master-thread basis (as the master
      server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
4269
      prepared statements' deletion are TODO only when we binlog prep stmts).
4270
      
unknown's avatar
unknown committed
4271 4272 4273 4274
      We don't even increment mi->master_log_pos, because we may be just after
      a Rotate event. Btw, in a few milliseconds we are going to have a Start
      event from the next binlog (unless the master is presently running
      without --log-bin).
4275 4276
    */
    goto err;
4277 4278
  case ROTATE_EVENT:
  {
4279
    Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue); 
4280
    if (unlikely(process_io_rotate(mi,&rev)))
unknown's avatar
unknown committed
4281
    {
4282 4283
      error= 1;
      goto err;
unknown's avatar
unknown committed
4284
    }
4285 4286 4287 4288
    /*
      Now the I/O thread has just changed its mi->master_log_name, so
      incrementing mi->master_log_pos is nonsense.
    */
unknown's avatar
unknown committed
4289
    inc_pos= 0;
4290 4291
    break;
  }
4292 4293 4294 4295 4296 4297 4298
  case FORMAT_DESCRIPTION_EVENT:
  {
    /*
      Create an event, and save it (when we rotate the relay log, we will have
      to write this event again).
    */
    /*
4299 4300 4301
      We are the only thread which reads/writes description_event_for_queue.
      The relay_log struct does not move (though some members of it can
      change), so we needn't any lock (no rli->data_lock, no log lock).
4302
    */
4303
    Format_description_log_event* tmp;
4304
    const char* errmsg;
4305
    if (!(tmp= (Format_description_log_event*)
4306
          Log_event::read_log_event(buf, event_len, &errmsg,
4307
                                    mi->rli.relay_log.description_event_for_queue)))
4308 4309 4310 4311
    {
      error= 2;
      goto err;
    }
4312 4313
    delete mi->rli.relay_log.description_event_for_queue;
    mi->rli.relay_log.description_event_for_queue= tmp;
4314 4315 4316 4317 4318 4319 4320 4321 4322 4323 4324 4325 4326 4327
    /* 
       Though this does some conversion to the slave's format, this will
       preserve the master's binlog format version, and number of event types. 
    */
    /* 
       If the event was not requested by the slave (the slave did not ask for
       it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos 
    */
    inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
    DBUG_PRINT("info",("binlog format is now %d",
                       mi->rli.relay_log.description_event_for_queue->binlog_version));  

  }
  break;
4328
  default:
unknown's avatar
unknown committed
4329
    inc_pos= event_len;
4330 4331
    break;
  }
4332 4333 4334 4335

  /* 
     If this event is originating from this server, don't queue it. 
     We don't check this for 3.23 events because it's simpler like this; 3.23
unknown's avatar
unknown committed
4336 4337
     will be filtered anyway by the SQL slave thread which also tests the
     server id (we must also keep this test in the SQL thread, in case somebody
4338 4339 4340 4341 4342 4343 4344 4345
     upgrades a 4.0 slave which has a not-filtered relay log).

     ANY event coming from ourselves can be ignored: it is obvious for queries;
     for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
     (--log-slave-updates would not log that) unless this slave is also its
     direct master (an unsupported, useless setup!).
  */

unknown's avatar
unknown committed
4346 4347
  if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
      !replicate_same_server_id)
4348
  {
4349 4350 4351 4352 4353
    /*
      Do not write it to the relay log.
      We still want to increment, so that we won't re-read this event from the
      master if the slave IO thread is now stopped/restarted (more efficient if
      the events we are ignoring are big LOAD DATA INFILE).
4354 4355 4356
      But events which were generated by this slave and which do not exist in
      the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
      mi->master_log_pos.
4357
    */
4358 4359 4360 4361
    if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
        buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
        buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
      mi->master_log_pos+= inc_pos;
4362 4363
    DBUG_PRINT("info", ("master_log_pos: %d, event originating from the same server, ignored", (ulong) mi->master_log_pos));
  }  
unknown's avatar
unknown committed
4364 4365 4366
  else
  {
    /* write the event to the relay log */
4367
    if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
4368 4369 4370 4371 4372
    {
      mi->master_log_pos+= inc_pos;
      DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
    }
4373 4374
    else
      error=3;
unknown's avatar
unknown committed
4375
  }
4376 4377

err:
4378
  pthread_mutex_unlock(&mi->data_lock);
4379
  DBUG_PRINT("info", ("error=%d", error));
unknown's avatar
unknown committed
4380
  DBUG_RETURN(error);
4381 4382
}

4383

4384 4385
/*
  Release the resources held by a relay log info structure.

  Does nothing when rli was never initialised.  Otherwise closes the info
  file cache/descriptor, closes the relay log we opened ourselves into
  rli->cache_buf (if any), marks rli as not inited, closes the relay log
  with LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT, harvests the bytes written
  into rli->log_space_total and drops the slave's temporary tables.
*/
void end_relay_log_info(RELAY_LOG_INFO* rli)
{
  DBUG_ENTER("end_relay_log_info");

  if (rli->inited)
  {
    /* The info file (rli->info_file / rli->info_fd) */
    if (rli->info_fd >= 0)
    {
      end_io_cache(&rli->info_file);
      (void) my_close(rli->info_fd, MYF(MY_WME));
      rli->info_fd= -1;
    }

    /* A relay log we opened ourselves into rli->cache_buf */
    if (rli->cur_log_fd >= 0)
    {
      end_io_cache(&rli->cache_buf);
      (void) my_close(rli->cur_log_fd, MYF(MY_WME));
      rli->cur_log_fd= -1;
    }

    rli->inited= 0;
    rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);

    /*
      Delete the slave's temporary tables from memory.
      In the future there will be other actions than this, to ensure
      persistence of the slave's temp tables after shutdown.
    */
    rli->close_temporary_tables();
  }
  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
4414 4415
/*
  Try to connect until successful or slave killed
4416

unknown's avatar
unknown committed
4417 4418 4419 4420 4421
  SYNPOSIS
    safe_connect()
    thd			Thread handler for slave
    mysql		MySQL connection handle
    mi			Replication handle
4422

unknown's avatar
unknown committed
4423 4424 4425 4426
  RETURN
    0	ok
    #	Error
*/
4427

4428
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
unknown's avatar
unknown committed
4429
{
unknown's avatar
unknown committed
4430
  return connect_to_master(thd, mysql, mi, 0, 0);
unknown's avatar
unknown committed
4431 4432
}

4433

4434
/*
unknown's avatar
unknown committed
4435 4436
  SYNPOSIS
    connect_to_master()
unknown's avatar
unknown committed
4437

unknown's avatar
unknown committed
4438 4439 4440
  IMPLEMENTATION
    Try to connect until successful or slave killed or we have retried
    master_retry_count times
4441
*/
unknown's avatar
unknown committed
4442

unknown's avatar
unknown committed
4443
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
unknown's avatar
unknown committed
4444
			     bool reconnect, bool suppress_warnings)
unknown's avatar
unknown committed
4445
{
4446
  int slave_was_killed;
4447 4448
  int last_errno= -2;				// impossible error
  ulong err_count=0;
unknown's avatar
unknown committed
4449
  char llbuff[22];
4450
  DBUG_ENTER("connect_to_master");
unknown's avatar
unknown committed
4451

unknown's avatar
unknown committed
4452 4453 4454
#ifndef DBUG_OFF
  events_till_disconnect = disconnect_slave_event_count;
#endif
4455
  ulong client_flag= CLIENT_REMEMBER_OPTIONS;
4456 4457 4458
  if (opt_slave_compressed_protocol)
    client_flag=CLIENT_COMPRESS;		/* We will use compression */

4459 4460
  mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
  mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
unknown's avatar
unknown committed
4461 4462 4463 4464 4465 4466 4467 4468 4469 4470 4471
 
#ifdef HAVE_OPENSSL
  if (mi->ssl)
    mysql_ssl_set(mysql, 
                  mi->ssl_key[0]?mi->ssl_key:0,
                  mi->ssl_cert[0]?mi->ssl_cert:0, 
                  mi->ssl_ca[0]?mi->ssl_ca:0,
                  mi->ssl_capath[0]?mi->ssl_capath:0,
                  mi->ssl_cipher[0]?mi->ssl_cipher:0);
#endif

4472 4473 4474 4475
  mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
  /* This one is not strictly needed but we have it here for completeness */
  mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);

4476
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
4477 4478 4479
	 (reconnect ? mysql_reconnect(mysql) != 0 :
	  mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
			     mi->port, 0, client_flag) == 0))
unknown's avatar
unknown committed
4480
  {
4481
    /* Don't repeat last error */
unknown's avatar
SCRUM  
unknown committed
4482
    if ((int)mysql_errno(mysql) != last_errno)
4483
    {
unknown's avatar
SCRUM  
unknown committed
4484
      last_errno=mysql_errno(mysql);
unknown's avatar
unknown committed
4485
      suppress_warnings= 0;
4486
      sql_print_error("Slave I/O thread: error %s to master \
4487
'%s@%s:%d': \
4488
Error: '%s'  errno: %d  retry-time: %d  retries: %d",
4489
		      (reconnect ? "reconnecting" : "connecting"),
4490
		      mi->user,mi->host,mi->port,
unknown's avatar
SCRUM  
unknown committed
4491
		      mysql_error(mysql), last_errno,
4492 4493
		      mi->connect_retry,
		      master_retry_count);
4494
    }
unknown's avatar
unknown committed
4495 4496 4497
    /*
      By default we try forever. The reason is that failure will trigger
      master election, so if the user did not set master_retry_count we
4498
      do not want to have election triggered on the first failure to
unknown's avatar
unknown committed
4499
      connect
4500
    */
4501
    if (++err_count == master_retry_count)
4502 4503
    {
      slave_was_killed=1;
unknown's avatar
unknown committed
4504 4505
      if (reconnect)
        change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
4506 4507
      break;
    }
4508 4509
    safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
	       (void*)mi);
unknown's avatar
unknown committed
4510
  }
4511

4512 4513
  if (!slave_was_killed)
  {
unknown's avatar
unknown committed
4514
    if (reconnect)
unknown's avatar
unknown committed
4515
    { 
4516
      if (!suppress_warnings && global_system_variables.log_warnings)
4517
	sql_print_information("Slave: connected to master '%s@%s:%d',\
4518
replication resumed in log '%s' at position %s", mi->user,
unknown's avatar
unknown committed
4519 4520 4521 4522
			mi->host, mi->port,
			IO_RPL_LOG_NAME,
			llstr(mi->master_log_pos,llbuff));
    }
unknown's avatar
unknown committed
4523 4524 4525 4526
    else
    {
      change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
      mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
unknown's avatar
unknown committed
4527
		      mi->user, mi->host, mi->port);
unknown's avatar
unknown committed
4528
    }
4529
#ifdef SIGNAL_WITH_VIO_CLOSE
4530
    thd->set_active_vio(mysql->net.vio);
4531
#endif      
4532
  }
4533
  mysql->reconnect= 1;
4534 4535
  DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed));
  DBUG_RETURN(slave_was_killed);
unknown's avatar
unknown committed
4536 4537
}

4538

unknown's avatar
unknown committed
4539
/*
4540
  safe_reconnect()
unknown's avatar
unknown committed
4541

unknown's avatar
unknown committed
4542 4543 4544
  IMPLEMENTATION
    Try to connect until successful or slave killed or we have retried
    master_retry_count times
unknown's avatar
unknown committed
4545 4546
*/

unknown's avatar
unknown committed
4547 4548
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
			  bool suppress_warnings)
unknown's avatar
unknown committed
4549
{
unknown's avatar
unknown committed
4550 4551
  DBUG_ENTER("safe_reconnect");
  DBUG_RETURN(connect_to_master(thd, mysql, mi, 1, suppress_warnings));
unknown's avatar
unknown committed
4552 4553
}

unknown's avatar
unknown committed
4554

4555 4556 4557 4558 4559 4560 4561 4562 4563 4564 4565 4566 4567 4568 4569 4570 4571 4572 4573 4574 4575 4576 4577 4578 4579 4580 4581 4582 4583 4584
/*
  Store the file and position where the slave SQL (execution) thread is in
  the relay log.

  SYNOPSIS
    flush_relay_log_info()
    rli			Relay log information

  DESCRIPTION
    Rewrites rli->info_file from offset 0 with four newline-terminated
    lines: group_relay_log_name, group_relay_log_pos,
    group_master_log_name and group_master_log_pos (positions in decimal),
    then flushes the IO_CACHE.  The flush is attempted even if the write
    failed.

  NOTES
    - As this is only called by the slave thread, we don't need to
      have a lock on this.
    - If there is an active transaction, then we don't update the position
      in the relay log.  This is to ensure that we re-execute statements
      if we die in the middle of a transaction that was rolled back.
    - As a transaction never spans binary logs, we don't have to handle the
      case where we do a relay-log-rotation in the middle of the transaction.
      If this would not be the case, we would have to ensure that we
      don't delete the relay log file where the transaction started when
      we switch to a new relay log file.

  TODO
    - Change the log file information to a binary format to avoid calling
      longlong2str.

  RETURN VALUES
    0	ok
    1	write error
*/

bool flush_relay_log_info(RELAY_LOG_INFO* rli)
{
  IO_CACHE *info_cache= &rli->info_file;
  /* Two file names, two numbers (22 chars each) and the newlines */
  char line_buf[FN_REFLEN*2 + 22*2 + 4];
  char *end= line_buf;

  my_b_seek(info_cache, 0L);

  end= strmov(end, rli->group_relay_log_name);
  *end++= '\n';
  end= longlong2str(rli->group_relay_log_pos, end, 10);
  *end++= '\n';
  end= strmov(end, rli->group_master_log_name);
  *end++= '\n';
  end= longlong2str(rli->group_master_log_pos, end, 10);
  *end++= '\n';

  bool write_failed= my_b_write(info_cache, (byte*) line_buf,
                                (ulong) (end - line_buf)) != 0;
  bool flush_failed= flush_io_cache(info_cache) != 0;

  /* Flushing the relay log is done by the slave I/O thread */
  return write_failed || flush_failed;
}

4607

unknown's avatar
unknown committed
4608
/*
  Reopen, as a cold log, the relay log the SQL thread was reading.

  Called when we notice that the current "hot" log got rotated under our
  feet.  rli->event_relay_log_name is opened into rli->cache_buf (which
  becomes rli->cur_log) and the read position is restored to
  rli->event_relay_log_pos, never before BIN_LOG_HEADER_SIZE.

  RETURN
    &rli->cache_buf	ok
    0			open_binlog() failed (errmsg was passed to it)
*/

static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
{
  DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
  DBUG_ASSERT(rli->cur_log_fd == -1);
  DBUG_ENTER("reopen_relay_log");

  IO_CACHE *cold_log= &rli->cache_buf;
  rli->cur_log= cold_log;

  rli->cur_log_fd= open_binlog(cold_log, rli->event_relay_log_name, errmsg);
  if (rli->cur_log_fd < 0)
    DBUG_RETURN(0);

  /* Resume exactly where we stopped, but never inside the magic header */
  rli->event_relay_log_pos= max(rli->event_relay_log_pos, BIN_LOG_HEADER_SIZE);
  my_b_seek(cold_log, rli->event_relay_log_pos);
  DBUG_RETURN(cold_log);
}

unknown's avatar
unknown committed
4632

4633 4634 4635 4636
/*
  Read the next event the slave SQL thread has to execute.

  SYNOPSIS
    next_event()
    rli			Relay log information (rli->sql_thd must be set)

  DESCRIPTION
    Reads from rli->cur_log, which is either the relay log currently being
    written by the I/O thread ("hot" log: a shared IO_CACHE, read while
    holding log_lock) or a log we opened ourselves into rli->cache_buf
    ("cold" log).  On EOF of a hot log we wait for the I/O thread to append
    more events.  On EOF of a cold log we close it, purge it
    (relay_log_purge) or locate the next log through the index, and carry
    on with the next relay log.

  NOTES
    The caller must hold rli->data_lock (asserted below).  It is released
    while waiting for updates and re-acquired before continuing, so it is
    held again whenever this function returns.

  RETURN
    #	the event returned by Log_event::read_log_event()
    0	the SQL thread was killed, or an error occurred.  An error message
	is logged when errmsg is set; the "killed" case is logged only
	when log_warnings is on.
*/
Log_event* next_event(RELAY_LOG_INFO* rli)
{
  Log_event* ev;
  IO_CACHE* cur_log = rli->cur_log;
4637
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock(); 
4638 4639
  const char* errmsg=0;
  THD* thd = rli->sql_thd;
4640
  
unknown's avatar
unknown committed
4641
  DBUG_ENTER("next_event");
4642 4643
  DBUG_ASSERT(thd != 0);

unknown's avatar
unknown committed
4644 4645
  /*
    For most operations we need to protect rli members with data_lock,
    so we assume calling function acquired this mutex for us and we will
    hold it for most of the loop below. However, we will release it
    whenever it is worth the hassle, and in the cases when we go into a
    pthread_cond_wait() with the non-data_lock mutex.
  */
4651
  safe_mutex_assert_owner(&rli->data_lock);
4652
  
4653
  while (!sql_slave_killed(thd,rli))
unknown's avatar
unknown committed
4654 4655 4656
  {
    /*
      We can have two kinds of log reading:
      hot_log:
        rli->cur_log points at the IO_CACHE of relay_log, which
        is actively being updated by the I/O thread. We need to be careful
        in this case and make sure that we are not looking at a stale log that
        has already been rotated. If it has been, we reopen the log.

      The other case is much simpler:
        We just have a read only log that nobody else will be updating.
    */
4666 4667 4668 4669 4670
    bool hot_log;
    if ((hot_log = (cur_log != &rli->cache_buf)))
    {
      DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor
      pthread_mutex_lock(log_lock);
unknown's avatar
unknown committed
4671 4672

      /*
	Reading the log's open count is safe because the log will only
	be rotated while relay_log.LOCK_log is held, and we hold it now
	(log_lock).
      */
unknown's avatar
unknown committed
4676
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
4677
      {
unknown's avatar
unknown committed
4678 4679 4680 4681
	// The relay log was rotated: reopen the file we were reading as a cold log
	cur_log=reopen_relay_log(rli, &errmsg);
	pthread_mutex_unlock(log_lock);
	if (!cur_log)				// Reopen failed
4682
	  goto err;
unknown's avatar
unknown committed
4683
	hot_log=0;				// Using old binary log
4684 4685
      }
    }
4686

4687 4688
#ifndef DBUG_OFF
    {
4689
      /* This is an assertion which sometimes fails, let's try to track it */
4690
      char llbuf1[22], llbuf2[22];
4691
      DBUG_PRINT("info", ("my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
4692
                          llstr(my_b_tell(cur_log),llbuf1),
4693 4694 4695
                          llstr(rli->event_relay_log_pos,llbuf2)));
      DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
      DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
4696 4697
    }
#endif
unknown's avatar
unknown committed
4698 4699
    /*
      Relay log is always in new format - if the master is 3.23, the
      I/O thread will convert the format for us.
      A problem: the description event may be in a previous relay log. So if
      the slave has been shutdown meanwhile, we would have to look in old relay
      logs, which may even have been deleted. So we need to write this
      description event at the beginning of the relay log.
      When the relay log is created when the I/O thread starts, easy: the
      master will send the description event and we will queue it.
      But if the relay log is created by new_file(): then the solution is:
      MYSQL_LOG::open() will write the buffered description event.
    */
4710 4711
    if ((ev=Log_event::read_log_event(cur_log,0,
                                      rli->relay_log.description_event_for_exec)))
4712

4713 4714
    {
      DBUG_ASSERT(thd==rli->sql_thd);
4715 4716 4717 4718 4719
      /*
        read it while we have a lock, to avoid a mutex lock in
        inc_event_relay_log_pos()
      */
      rli->future_event_relay_log_pos= my_b_tell(cur_log);
4720 4721
      if (hot_log)
	pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
4722
      DBUG_RETURN(ev);
4723 4724
    }
    DBUG_ASSERT(thd==rli->sql_thd);
unknown's avatar
unknown committed
4725
    if (opt_reckless_slave)			// For mysql-test
unknown's avatar
unknown committed
4726
      cur_log->error = 0;
unknown's avatar
unknown committed
4727
    if (cur_log->error < 0)
unknown's avatar
unknown committed
4728 4729
    {
      errmsg = "slave SQL thread aborted because of I/O error";
unknown's avatar
unknown committed
4730 4731
      if (hot_log)
	pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
4732 4733
      goto err;
    }
4734 4735
    if (!cur_log->error) /* EOF */
    {
unknown's avatar
unknown committed
4736 4737 4738 4739 4740
      /*
	On a hot log, EOF means that there are no more updates to
	process and we must block until I/O thread adds some and
	signals us to continue
      */
4741 4742
      if (hot_log)
      {
4743 4744 4745 4746 4747 4748 4749 4750 4751 4752 4753 4754 4755 4756 4757
        /*
          We say in Seconds_Behind_Master that we have "caught up". Note that
          for example if network link is broken but I/O slave thread hasn't
          noticed it (slave_net_timeout not elapsed), then we'll say "caught
          up" whereas we're not really caught up. Fixing that would require
          internally cutting timeout in smaller pieces in network read, no
          thanks. Another example: SQL has caught up on I/O, now I/O has read
          a new event and is queuing it; the false "0" will exist until SQL
          finishes executing the new event; it will be look abnormal only if
          the events have old timestamps (then you get "many", 0, "many").
          Transient phases like this can't really be fixed.
        */
        time_t save_timestamp= rli->last_master_timestamp;
        rli->last_master_timestamp= 0;

unknown's avatar
unknown committed
4758
	DBUG_ASSERT(rli->relay_log.get_open_count() == rli->cur_log_old_open_count);
unknown's avatar
unknown committed
4759 4760 4761 4762
	/*
	  We can, and should release data_lock while we are waiting for
	  update. If we do not, show slave status will block
	*/
4763
	pthread_mutex_unlock(&rli->data_lock);
4764 4765 4766 4767 4768 4769 4770 4771 4772 4773 4774 4775 4776 4777 4778 4779 4780 4781 4782 4783 4784 4785

        /*
          Possible deadlock : 
          - the I/O thread has reached log_space_limit
          - the SQL thread has read all relay logs, but cannot purge for some
          reason:
            * it has already purged all logs except the current one
            * there are other logs than the current one but they're involved in
            a transaction that finishes in the current one (or is not finished)
          Solution :
          Wake up the possibly waiting I/O thread, and set a boolean asking
          the I/O thread to temporarily ignore the log_space_limit
          constraint, because we do not want the I/O thread to block because of
          space (it's ok if it blocks for any other reason (e.g. because the
          master does not send anything). Then the I/O thread stops waiting 
          and reads more events.
          The SQL thread decides when the I/O thread should take log_space_limit
          into account again : ignore_log_space_limit is reset to 0 
          in purge_first_log (when the SQL thread purges the just-read relay
          log), and also when the SQL thread starts. We should also reset
          ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
          fact, no need as RESET SLAVE requires that the slave
          be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
          it stops.
        */
        pthread_mutex_lock(&rli->log_space_lock);
        // prevent the I/O thread from blocking next times
        rli->ignore_log_space_limit= 1; 
4792 4793 4794 4795 4796 4797
        /*
          If the I/O thread is blocked, unblock it.
          Ok to broadcast after unlock, because the mutex is only destroyed in
          ~st_relay_log_info(), i.e. when rli is destroyed, and rli will not be
          destroyed before we exit the present function.
        */
4798
        pthread_mutex_unlock(&rli->log_space_lock);
4799
        pthread_cond_broadcast(&rli->log_space_cond);
4800
        // Note that wait_for_update() unlocks log_lock (LOCK_log) !
4801
        rli->relay_log.wait_for_update(rli->sql_thd, 1);
4802 4803
        // re-acquire data lock since we released it earlier
        pthread_mutex_lock(&rli->data_lock);
4804
        rli->last_master_timestamp= save_timestamp;
4805 4806
	continue;
      }
unknown's avatar
unknown committed
4807 4808 4809 4810 4811 4812 4813 4814 4815 4816
      /*
	If the log was not hot, we need to move to the next log in
	sequence. The next log could be hot or cold, we deal with both
	cases separately after doing some common initialization
      */
      end_io_cache(cur_log);
      DBUG_ASSERT(rli->cur_log_fd >= 0);
      my_close(rli->cur_log_fd, MYF(MY_WME));
      rli->cur_log_fd = -1;
	
4817
      if (relay_log_purge)
unknown's avatar
unknown committed
4818
      {
4819 4820 4821 4822 4823 4824 4825 4826 4827 4828 4829 4830 4831 4832 4833
	/*
          purge_first_log will properly set up relay log coordinates in rli.
          If the group's coordinates are equal to the event's coordinates
          (i.e. the relay log was not rotated in the middle of a group),
          we can purge this relay log too.
          We do ulonglong and string comparisons, this may be slow but
          - purging the last relay log is nice (it can save 1GB of disk), so we
          like to detect the case where we can do it, and given this,
          - I see no better detection method
          - purge_first_log is not called that often
        */
	if (rli->relay_log.purge_first_log
            (rli,
             rli->group_relay_log_pos == rli->event_relay_log_pos
             && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
unknown's avatar
unknown committed
4834
	{
4835
	  errmsg = "Error purging processed logs";
unknown's avatar
unknown committed
4836 4837 4838
	  goto err;
	}
      }
4839 4840
      else
      {
unknown's avatar
unknown committed
4841
	/*
	  If hot_log is set, then we already have a lock on
	  LOCK_log.  If not, we have to get the lock.

	  According to Sasha, the only time this code will ever be executed
	  is if we are recovering from a bug.
	*/
4848
	if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
4849
	{
unknown's avatar
unknown committed
4850 4851
	  errmsg = "error switching to the next log";
	  goto err;
4852
	}
4853 4854 4855
	rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
	strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
		sizeof(rli->event_relay_log_name)-1);
unknown's avatar
unknown committed
4856 4857
	flush_relay_log_info(rli);
      }
4858 4859 4860 4861 4862 4863 4864 4865 4866 4867 4868 4869 4870 4871

      /*
        Now we want to open this next log. To know if it's a hot log (the one
        being written by the I/O thread now) or a cold log, we can use
        is_active(); if it is hot, we use the I/O cache; if it's cold we open
        the file normally. But if is_active() reports that the log is hot, this
        may change between the test and the consequence of the test. So we may
        open the I/O cache whereas the log is now cold, which is nonsense.
        To guard against this, we need to have LOCK_log.
      */

      DBUG_PRINT("info",("hot_log: %d",hot_log));
      if (!hot_log) /* if hot_log, we already have this mutex */
        pthread_mutex_lock(log_lock);
unknown's avatar
unknown committed
4872 4873
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
      {
4874
#ifdef EXTRA_DEBUG
unknown's avatar
unknown committed
4875
	if (global_system_variables.log_warnings)
4876 4877
	  sql_print_information("next log '%s' is currently active",
                                rli->linfo.log_file_name);
4878
#endif	  
unknown's avatar
unknown committed
4879 4880 4881
	rli->cur_log= cur_log= rli->relay_log.get_log_file();
	rli->cur_log_old_open_count= rli->relay_log.get_open_count();
	DBUG_ASSERT(rli->cur_log_fd == -1);
4882
	  
unknown's avatar
unknown committed
4883
	/*
	  Read pointer has to be at the start since we are the only
	  reader.
          We must keep the LOCK_log to read the 4 first bytes, as this is a hot
          log (same as when we call read_log_event() above: for a hot log we
          take the mutex).
	*/
unknown's avatar
unknown committed
4890
	if (check_binlog_magic(cur_log,&errmsg))
4891 4892
        {
          if (!hot_log) pthread_mutex_unlock(log_lock);
4893
	  goto err;
4894 4895
        }
        if (!hot_log) pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
4896
	continue;
4897
      }
4898
      if (!hot_log) pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
4899
      /*
	if we get here, the log was not hot, so we will have to open it
	ourselves. We are sure that the log is still not hot now (a log can get
	from hot to cold, but not from cold to hot). No need for LOCK_log.
      */
#ifdef EXTRA_DEBUG
unknown's avatar
unknown committed
4905
      if (global_system_variables.log_warnings)
4906 4907
	sql_print_information("next log '%s' is not active",
                              rli->linfo.log_file_name);
unknown's avatar
unknown committed
4908 4909 4910 4911 4912
#endif	  
      // open_binlog() will check the magic header
      if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
				       &errmsg)) <0)
	goto err;
4913
    }
unknown's avatar
unknown committed
4914
    else
4915
    {
unknown's avatar
unknown committed
4916 4917 4918 4919 4920 4921
      /*
	Read failed with a non-EOF error.
	TODO: come up with something better to handle this error
      */
      if (hot_log)
	pthread_mutex_unlock(log_lock);
4922
      sql_print_error("Slave SQL thread: I/O error reading \
unknown's avatar
unknown committed
4923
event(errno: %d  cur_log->error: %d)",
4924
		      my_errno,cur_log->error);
unknown's avatar
unknown committed
4925
      // set read position to the beginning of the event
4926
      my_b_seek(cur_log,rli->event_relay_log_pos);
unknown's avatar
unknown committed
4927 4928
      /* otherwise, we have had a partial read */
      errmsg = "Aborting slave SQL thread because of partial event read";
4929
      break;					// To end of function
4930 4931
    }
  }
4932
  /*
    We get here either because the SQL thread was killed (errmsg is still 0)
    or through the "break" above after a partial event read (errmsg set).
  */
  if (!errmsg && global_system_variables.log_warnings)
4933 4934 4935 4936 4937
  {
    sql_print_information("Error reading relay log event: %s", 
                          "slave SQL thread was killed");
    DBUG_RETURN(0);
  }
unknown's avatar
unknown committed
4938

4939
err:
4940 4941
  if (errmsg)
    sql_print_error("Error reading relay log event: %s", errmsg);
unknown's avatar
unknown committed
4942
  DBUG_RETURN(0);
4943 4944
}

4945 4946 4947 4948 4949 4950 4951 4952 4953 4954 4955
/*
  Rotate a relay log on behalf of FLUSH LOGS.

  Automatic rotation because of size is simpler: when it happens, all
  relevant locks are already held.  Here they are not, so this function is
  mainly about taking locks.  Nothing is returned because no error can be
  caught (MYSQL_LOG::new_file() is void).
*/

void rotate_relay_log(MASTER_INFO* mi)
{
  DBUG_ENTER("rotate_relay_log");
  RELAY_LOG_INFO* rli= &mi->rli;

  /* We don't lock rli->run_lock. This would lead to deadlocks. */
  pthread_mutex_lock(&mi->run_lock);

  if (!rli->inited)
  {
    /*
      new_file() would attempt to lock LOCK_log, which may not be inited
      (if we're not a slave).
    */
    DBUG_PRINT("info", ("rli->inited == 0"));
  }
  else
  {
    /* If the relay log is closed, new_file() will do nothing. */
    rli->relay_log.new_file(1);

    /*
      Harvest now, so that the BIN_LOG_HEADER_SIZE bytes of the new relay
      log are counted immediately.  Otherwise, after a succession of FLUSH
      LOGS with the slave threads running, relay_log_space decreases by the
      size of each deleted relay log but does not increase, and could even
      become negative.  The I/O thread's next harvest_bytes_written() call
      would fix that, but SHOW SLAVE STATUS would show strange output
      meanwhile.  If the log is closed, this just harvests the last writes
      (probably 0, as they have probably been harvested already).
    */
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  }

  pthread_mutex_unlock(&mi->run_lock);
  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
4992

4993
/*
  Explicit instantiation of the I_List_iterator templates for i_string and
  i_string_pair; only compiled when HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
  is defined.
*/
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
unknown's avatar
unknown committed
4994
template class I_List_iterator<i_string>;
unknown's avatar
unknown committed
4995
template class I_List_iterator<i_string_pair>;
unknown's avatar
unknown committed
4996
#endif
4997

4998

unknown's avatar
SCRUM  
unknown committed
4999
#endif /* HAVE_REPLICATION */