slave.cc 128 KB
Newer Older
1
/* Copyright (C) 2000-2003 MySQL AB
2

unknown's avatar
unknown committed
3 4
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
unknown's avatar
unknown committed
5
   the Free Software Foundation; version 2 of the License.
6

unknown's avatar
unknown committed
7 8 9 10
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
11

unknown's avatar
unknown committed
12 13 14 15 16
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "mysql_priv.h"
unknown's avatar
SCRUM  
unknown committed
17

unknown's avatar
unknown committed
18
#include <mysql.h>
19
#include <myisam.h>
20
#include "rpl_rli.h"
21
#include "slave.h"
22
#include "sql_repl.h"
unknown's avatar
unknown committed
23
#include "rpl_filter.h"
24
#include "repl_failsafe.h"
unknown's avatar
unknown committed
25
#include <thr_alarm.h>
unknown's avatar
unknown committed
26
#include <my_dir.h>
unknown's avatar
unknown committed
27
#include <sql_common.h>
28
#include <errmsg.h>
unknown's avatar
SCRUM  
unknown committed
29

30 31 32 33
#ifdef HAVE_REPLICATION

#include "rpl_tblmap.h"

34
int queue_event(MASTER_INFO* mi,const char* buf,ulong event_len);
35 36
static Log_event* next_event(RELAY_LOG_INFO* rli);

37

38
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
39

40
#define MAX_SLAVE_RETRY_PAUSE 5
41 42 43
bool use_slave_mask = 0;
MY_BITMAP slave_error_mask;

44 45
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);

46
char* slave_load_tmpdir = 0;
unknown's avatar
unknown committed
47
MASTER_INFO *active_mi= 0;
48
my_bool replicate_same_server_id;
49
ulonglong relay_log_space_limit = 0;
unknown's avatar
unknown committed
50 51 52 53 54 55 56

/*
  When slave thread exits, we need to remember the temporary tables so we
  can re-use them on slave start.

  TODO: move the vars below under MASTER_INFO
*/
57

58
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
59
int events_till_abort = -1;
unknown's avatar
unknown committed
60

61
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
unknown's avatar
unknown committed
62

63
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
unknown's avatar
unknown committed
64
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
unknown's avatar
unknown committed
65
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli);
66 67
static inline bool io_slave_killed(THD* thd,MASTER_INFO* mi);
static inline bool sql_slave_killed(THD* thd,RELAY_LOG_INFO* rli);
68
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
69
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
unknown's avatar
unknown committed
70
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
unknown's avatar
unknown committed
71
                          bool suppress_warnings);
unknown's avatar
unknown committed
72
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
unknown's avatar
unknown committed
73
                             bool reconnect, bool suppress_warnings);
74
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
unknown's avatar
unknown committed
75
                      void* thread_killed_arg);
76
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
77
static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
unknown's avatar
unknown committed
78
                                  const char* table_name, bool overwrite);
79
static int get_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi);
unknown's avatar
unknown committed
80
static Log_event* next_event(RELAY_LOG_INFO* rli);
81 82

/*
unknown's avatar
unknown committed
83
  Find out which replications threads are running
84

unknown's avatar
unknown committed
85 86
  SYNOPSIS
    init_thread_mask()
unknown's avatar
unknown committed
87 88 89
    mask                Return value here
    mi                  master_info for slave
    inverse             If set, returns which threads are not running
unknown's avatar
unknown committed
90 91 92 93 94 95

  IMPLEMENTATION
    Get a bit mask for which threads are running so that we can later restart
    these threads.

  RETURN
unknown's avatar
unknown committed
96 97
    mask        If inverse == 0, running threads
                If inverse == 1, stopped threads
98 99
*/

100 101 102 103
void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse)
{
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
  register int tmp_mask=0;
104 105
  DBUG_ENTER("init_thread_mask");

106 107 108 109
  if (set_io)
    tmp_mask |= SLAVE_IO;
  if (set_sql)
    tmp_mask |= SLAVE_SQL;
110 111
  if (inverse)
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
112
  *mask = tmp_mask;
113
  DBUG_VOID_RETURN;
114 115
}

116

unknown's avatar
unknown committed
117
/*
118
  lock_slave_threads()
unknown's avatar
unknown committed
119
*/
120

121 122
void lock_slave_threads(MASTER_INFO* mi)
{
123 124
  DBUG_ENTER("lock_slave_threads");

125 126 127
  //TODO: see if we can do this without dual mutex
  pthread_mutex_lock(&mi->run_lock);
  pthread_mutex_lock(&mi->rli.run_lock);
128
  DBUG_VOID_RETURN;
129 130
}

131

unknown's avatar
unknown committed
132
/*
133
  unlock_slave_threads()
unknown's avatar
unknown committed
134
*/
135

136 137
void unlock_slave_threads(MASTER_INFO* mi)
{
138 139
  DBUG_ENTER("unlock_slave_threads");

140 141 142
  //TODO: see if we can do this without dual mutex
  pthread_mutex_unlock(&mi->rli.run_lock);
  pthread_mutex_unlock(&mi->run_lock);
143
  DBUG_VOID_RETURN;
144 145
}

146

unknown's avatar
unknown committed
147
/* Initialize slave structures */
148

149 150
int init_slave()
{
151
  DBUG_ENTER("init_slave");
152

153 154 155 156 157 158
  /*
    This is called when mysqld starts. Before client connections are
    accepted. However bootstrap may conflict with us if it does START SLAVE.
    So it's safer to take the lock.
  */
  pthread_mutex_lock(&LOCK_active_mi);
159 160 161 162
  /*
    TODO: re-write this to interate through the list of files
    for multi-master
  */
unknown's avatar
unknown committed
163
  active_mi= new MASTER_INFO;
164 165

  /*
166 167 168
    If master_host is not specified, try to read it from the master_info file.
    If master_host is specified, create the master_info file if it doesn't
    exists.
169
  */
170 171 172 173 174
  if (!active_mi)
  {
    sql_print_error("Failed to allocate memory for the master info structure");
    goto err;
  }
175

unknown's avatar
unknown committed
176
  if (init_master_info(active_mi,master_info_file,relay_log_info_file,
unknown's avatar
unknown committed
177
                       !master_host, (SLAVE_IO | SLAVE_SQL)))
178
  {
179
    sql_print_error("Failed to initialize the master info structure");
unknown's avatar
unknown committed
180
    goto err;
181
  }
182 183 184 185

  if (server_id && !master_host && active_mi->host[0])
    master_host= active_mi->host;

186 187
  /* If server id is not set, start_slave_thread() will say it */

188
  if (master_host && !opt_skip_slave_start)
189
  {
190
    if (start_slave_threads(1 /* need mutex */,
unknown's avatar
unknown committed
191 192 193 194 195
                            0 /* no wait for start*/,
                            active_mi,
                            master_info_file,
                            relay_log_info_file,
                            SLAVE_IO | SLAVE_SQL))
unknown's avatar
unknown committed
196
    {
197
      sql_print_error("Failed to create slave threads");
unknown's avatar
unknown committed
198 199
      goto err;
    }
200
  }
201
  pthread_mutex_unlock(&LOCK_active_mi);
202
  DBUG_RETURN(0);
203

unknown's avatar
unknown committed
204
err:
205
  pthread_mutex_unlock(&LOCK_active_mi);
unknown's avatar
unknown committed
206
  DBUG_RETURN(1);
207
}
208

209

unknown's avatar
unknown committed
210
/*
211
  Init function to set up array for errors that should be skipped for slave
unknown's avatar
unknown committed
212

unknown's avatar
unknown committed
213 214
  SYNOPSIS
    init_slave_skip_errors()
unknown's avatar
unknown committed
215
    arg         List of errors numbers to skip, separated with ','
unknown's avatar
unknown committed
216 217 218 219

  NOTES
    Called from get_options() in mysqld.cc on start-up
*/
220

unknown's avatar
unknown committed
221
void init_slave_skip_errors(const char* arg)
222
{
unknown's avatar
unknown committed
223
  const char *p;
224 225
  DBUG_ENTER("init_slave_skip_errors");

226
  if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
227 228 229 230 231
  {
    fprintf(stderr, "Badly out of memory, please check your system status\n");
    exit(1);
  }
  use_slave_mask = 1;
232
  for (;my_isspace(system_charset_info,*arg);++arg)
233
    /* empty */;
234
  if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
235 236
  {
    bitmap_set_all(&slave_error_mask);
237
    DBUG_VOID_RETURN;
238 239 240 241 242 243 244 245
  }
  for (p= arg ; *p; )
  {
    long err_code;
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
      break;
    if (err_code < MAX_SLAVE_ERROR)
       bitmap_set_bit(&slave_error_mask,(uint)err_code);
246
    while (!my_isdigit(system_charset_info,*p) && *p)
247 248
      p++;
  }
249
  DBUG_VOID_RETURN;
250 251
}

252

253 254
/*
  Stop the slave threads selected by thread_mask

  SYNOPSIS
    terminate_slave_threads()
    mi                  master_info for slave
    thread_mask         Combination of SLAVE_IO, SLAVE_SQL, SLAVE_FORCE_ALL.
                        With SLAVE_FORCE_ALL both threads are handled and an
                        error from terminate_slave_thread() does not stop
                        the function.
    skip_lock           If set, the run locks are not taken by
                        terminate_slave_thread() (they are still passed as
                        the mutexes for the condition wait)

  RETURN
    0           ok (also when mi is not inited: nothing to do)
    other       error returned by terminate_slave_thread()
*/

int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
{
  DBUG_ENTER("terminate_slave_threads");

  if (!mi->inited)
    DBUG_RETURN(0); /* successfully do nothing */

  const bool force_all= (thread_mask & SLAVE_FORCE_ALL) != 0;
  /* The mutexes used for the condition wait are always the run locks */
  pthread_mutex_t *io_cond_lock= &mi->run_lock;
  pthread_mutex_t *sql_cond_lock= &mi->rli.run_lock;
  /* The mutexes to acquire; none if the caller asked us to skip locking */
  pthread_mutex_t *io_lock= skip_lock ? 0 : io_cond_lock;
  pthread_mutex_t *sql_lock= skip_lock ? 0 : sql_cond_lock;
  int error;

  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) && mi->slave_running)
  {
    DBUG_PRINT("info",("Terminating IO thread"));
    mi->abort_slave=1;
    error= terminate_slave_thread(mi->io_thd, io_lock, io_cond_lock,
                                  &mi->stop_cond, &mi->slave_running);
    if (error && !force_all)
      DBUG_RETURN(error);
  }

  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) && mi->rli.slave_running)
  {
    DBUG_PRINT("info",("Terminating SQL thread"));
    DBUG_ASSERT(mi->rli.sql_thd != 0) ;
    mi->rli.abort_slave=1;
    error= terminate_slave_thread(mi->rli.sql_thd, sql_lock, sql_cond_lock,
                                  &mi->rli.stop_cond,
                                  &mi->rli.slave_running);
    if (error && !force_all)
      DBUG_RETURN(error);
  }
  DBUG_RETURN(0);
}

296

297
int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
unknown's avatar
unknown committed
298 299 300
                           pthread_mutex_t *cond_lock,
                           pthread_cond_t* term_cond,
                           volatile uint *slave_running)
301
{
302
  DBUG_ENTER("terminate_slave_thread");
303 304 305 306 307 308
  if (term_lock)
  {
    pthread_mutex_lock(term_lock);
    if (!*slave_running)
    {
      pthread_mutex_unlock(term_lock);
309
      DBUG_RETURN(ER_SLAVE_NOT_RUNNING);
310 311 312
    }
  }
  DBUG_ASSERT(thd != 0);
313
  THD_CHECK_SENTRY(thd);
unknown's avatar
unknown committed
314
  /*
315
    Is is critical to test if the slave is running. Otherwise, we might
unknown's avatar
unknown committed
316
    be referening freed memory trying to kick it
317
  */
unknown's avatar
unknown committed
318

unknown's avatar
unknown committed
319
  while (*slave_running)                        // Should always be true
320
  {
321
    DBUG_PRINT("loop", ("killing slave thread"));
322
    KICK_SLAVE(thd);
unknown's avatar
unknown committed
323 324 325
    /*
      There is a small chance that slave thread might miss the first
      alarm. To protect againts it, resend the signal until it reacts
326 327
    */
    struct timespec abstime;
unknown's avatar
unknown committed
328
    set_timespec(abstime,2);
329 330 331 332
    pthread_cond_timedwait(term_cond, cond_lock, &abstime);
  }
  if (term_lock)
    pthread_mutex_unlock(term_lock);
333
  DBUG_RETURN(0);
334 335
}

336

unknown's avatar
unknown committed
337
int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
unknown's avatar
unknown committed
338 339 340 341 342
                       pthread_mutex_t *cond_lock,
                       pthread_cond_t *start_cond,
                       volatile uint *slave_running,
                       volatile ulong *slave_run_id,
                       MASTER_INFO* mi,
343
                       bool high_priority)
344 345
{
  pthread_t th;
unknown's avatar
unknown committed
346 347 348
  ulong start_id;
  DBUG_ENTER("start_slave_thread");

349 350
  DBUG_ASSERT(mi->inited);

351 352 353 354 355 356 357 358 359
  if (start_lock)
    pthread_mutex_lock(start_lock);
  if (!server_id)
  {
    if (start_cond)
      pthread_cond_broadcast(start_cond);
    if (start_lock)
      pthread_mutex_unlock(start_lock);
    sql_print_error("Server id not set, will not start slave");
unknown's avatar
unknown committed
360
    DBUG_RETURN(ER_BAD_SLAVE);
361
  }
unknown's avatar
unknown committed
362

363
  if (*slave_running)
364 365 366 367 368
  {
    if (start_cond)
      pthread_cond_broadcast(start_cond);
    if (start_lock)
      pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
369
    DBUG_RETURN(ER_SLAVE_MUST_STOP);
370
  }
unknown's avatar
unknown committed
371 372
  start_id= *slave_run_id;
  DBUG_PRINT("info",("Creating new slave thread"));
373 374
  if (high_priority)
    my_pthread_attr_setprio(&connection_attrib,CONNECT_PRIOR);
375 376 377 378
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
  {
    if (start_lock)
      pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
379
    DBUG_RETURN(ER_SLAVE_THREAD);
380
  }
unknown's avatar
unknown committed
381
  if (start_cond && cond_lock) // caller has cond_lock
382 383
  {
    THD* thd = current_thd;
unknown's avatar
unknown committed
384
    while (start_id == *slave_run_id)
385
    {
unknown's avatar
unknown committed
386
      DBUG_PRINT("sleep",("Waiting for slave thread to start"));
387
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
unknown's avatar
unknown committed
388
                                            "Waiting for slave thread to start");
389 390
      pthread_cond_wait(start_cond,cond_lock);
      thd->exit_cond(old_msg);
unknown's avatar
unknown committed
391
      pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
392
      if (thd->killed)
unknown's avatar
unknown committed
393
        DBUG_RETURN(thd->killed_errno());
394 395 396 397
    }
  }
  if (start_lock)
    pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
398
  DBUG_RETURN(0);
399
}
unknown's avatar
unknown committed
400

401

unknown's avatar
unknown committed
402
/*
403
  start_slave_threads()
unknown's avatar
unknown committed
404

unknown's avatar
unknown committed
405 406 407 408
  NOTES
    SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
    sense to do that for starting a slave--we always care if it actually
    started the threads that were not previously running
409
*/
unknown's avatar
unknown committed
410

411
int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
unknown's avatar
unknown committed
412 413
                        MASTER_INFO* mi, const char* master_info_fname,
                        const char* slave_info_fname, int thread_mask)
414 415 416 417
{
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
  pthread_cond_t* cond_io=0,*cond_sql=0;
  int error=0;
418
  DBUG_ENTER("start_slave_threads");
unknown's avatar
unknown committed
419

420 421 422 423 424 425 426 427 428 429 430 431
  if (need_slave_mutex)
  {
    lock_io = &mi->run_lock;
    lock_sql = &mi->rli.run_lock;
  }
  if (wait_for_start)
  {
    cond_io = &mi->start_cond;
    cond_sql = &mi->rli.start_cond;
    lock_cond_io = &mi->run_lock;
    lock_cond_sql = &mi->rli.run_lock;
  }
432 433 434

  if (thread_mask & SLAVE_IO)
    error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
unknown's avatar
unknown committed
435 436 437
                             cond_io,
                             &mi->slave_running, &mi->slave_run_id,
                             mi, 1); //high priority, to read the most possible
438
  if (!error && (thread_mask & SLAVE_SQL))
439
  {
440
    error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
unknown's avatar
unknown committed
441 442 443
                             cond_sql,
                             &mi->rli.slave_running, &mi->rli.slave_run_id,
                             mi, 0);
444 445 446
    if (error)
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
  }
447
  DBUG_RETURN(error);
448
}
449

450

451
#ifdef NOT_USED_YET
452 453
/*
  List walk action: release one master info structure.
  The second argument is ignored. Always returns 0.
*/
static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/)
{
  DBUG_ENTER("end_slave_on_walk");

  end_master_info(mi);

  DBUG_RETURN(0);
}
459
#endif
460

461

unknown's avatar
unknown committed
462 463 464 465 466 467
/*
  Free all resources used by slave

  SYNOPSIS
    end_slave()
*/
468

469 470
void end_slave()
{
471 472
  DBUG_ENTER("end_slave");

473 474 475 476 477 478 479 480
  /*
    This is called when the server terminates, in close_connections().
    It terminates slave threads. However, some CHANGE MASTER etc may still be
    running presently. If a START SLAVE was in progress, the mutex lock below
    will make us wait until slave threads have started, and START SLAVE
    returns, then we terminate them here.
  */
  pthread_mutex_lock(&LOCK_active_mi);
unknown's avatar
unknown committed
481 482 483 484 485 486 487 488 489 490 491 492
  if (active_mi)
  {
    /*
      TODO: replace the line below with
      list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
      once multi-master code is ready.
    */
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
    end_master_info(active_mi);
    delete active_mi;
    active_mi= 0;
  }
493
  pthread_mutex_unlock(&LOCK_active_mi);
494
  DBUG_VOID_RETURN;
495
}
unknown's avatar
unknown committed
496

497

498
/*
  Tell whether the slave I/O thread has to stop: true if the master info
  asks for an abort, the server is shutting down (abort_loop) or the
  thread itself was killed.
*/
static bool io_slave_killed(THD* thd, MASTER_INFO* mi)
{
  DBUG_ENTER("io_slave_killed");

  DBUG_ASSERT(mi->io_thd == thd);
  DBUG_ASSERT(mi->slave_running); // tracking buffer overrun

  if (mi->abort_slave || abort_loop)
    DBUG_RETURN(1);
  DBUG_RETURN(thd->killed ? 1 : 0);
}

507

508
/*
  Tell whether the slave SQL thread has to stop now

  RETURN
    0   no stop was requested, or a stop was requested but the thread is
        still within its grace period
    1   stop now
*/
static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
{
  DBUG_ENTER("sql_slave_killed");

  DBUG_ASSERT(rli->sql_thd == thd);
  DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun

  /* Nobody asked us to stop */
  if (!(abort_loop || thd->killed || rli->abort_slave))
    DBUG_RETURN(0);

  /*
    If we are in an unsafe situation (stopping could corrupt replication),
    we give one minute to the slave SQL thread of grace before really
    terminating, in the hope that it will be able to read more events and
    the unsafe situation will soon be left. Note that this one minute starts
    from the last time anything happened in the slave SQL thread. So it's
    really one minute of idleness, we don't timeout if the slave SQL thread
    is actively working.
  */
  if (rli->last_event_start_time == 0)
    DBUG_RETURN(1);                             // safe to stop at once

  DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving "
                      "it some grace period"));

  if (difftime(time(0), rli->last_event_start_time) <= 60)
    DBUG_RETURN(0);                             // still within the minute

  slave_print_msg(ERROR_LEVEL, rli, 0,
                  "SQL thread had to stop in an unsafe situation, in "
                  "the middle of applying updates to a "
                  "non-transactional table without any primary key. "
                  "There is a risk of duplicate updates when the slave "
                  "SQL thread is restarted. Please check your tables' "
                  "contents after restart.");
  DBUG_RETURN(1);
}

544

545
/*
546 547 548
  Writes a message to stderr, and if it's an error message, to
  rli->last_slave_error and rli->last_slave_errno (which will be displayed by
  SHOW SLAVE STATUS).
549 550

  SYNOPSIS
551 552 553
    slave_print_msg()
    level       The severity level
    rli
554
    err_code    The error code
555
    msg         The message (usually related to the error code, but can
556 557 558 559 560
                contain more information).
    ...         (this is printf-like format, with % symbols in msg)

  RETURN VALUES
    void
561
*/
562

563
void slave_print_msg(enum loglevel level, RELAY_LOG_INFO const *rli,
564
                     int err_code, const char* msg, ...)
565
{
566 567 568
  void (*report_function)(const char *, ...);
  char buff[MAX_SLAVE_ERRMSG], *pbuff= buff;
  uint pbuffsize= sizeof(buff);
569
  va_list args;
570 571
  DBUG_ENTER("slave_print_msg");

572
  va_start(args,msg);
573 574 575 576 577 578 579 580 581 582 583 584
  switch (level)
  {
  case ERROR_LEVEL:
    /*
      This my_error call only has effect in client threads.
      Slave threads do nothing in my_error().
    */
    my_error(ER_UNKNOWN_ERROR, MYF(0), msg);
    /*
      It's an error, it must be reported in Last_error and Last_errno in SHOW
      SLAVE STATUS.
    */
585
    pbuff= const_cast<RELAY_LOG_INFO*>(rli)->last_slave_error;
586
    pbuffsize= sizeof(rli->last_slave_error);
587
    const_cast<RELAY_LOG_INFO*>(rli)->last_slave_errno = err_code;
588 589 590 591 592 593 594 595 596 597
    report_function= sql_print_error;
    break;
  case WARNING_LEVEL:
    report_function= sql_print_warning;
    break;
  case INFORMATION_LEVEL:
    report_function= sql_print_information;
    break;
  default:
    DBUG_ASSERT(0); // should not come here
598
    DBUG_VOID_RETURN; // don't crash production builds, just do nothing
599 600 601 602 603
  }
  my_vsnprintf(pbuff, pbuffsize, msg, args);
  /* If the msg string ends with '.', do not add a ',' it would be ugly */
  if (pbuff[0] && (*(strend(pbuff)-1) == '.'))
    (*report_function)("Slave: %s Error_code: %d", pbuff, err_code);
604
  else
605
    (*report_function)("Slave: %s, Error_code: %d", pbuff, err_code);
606
  DBUG_VOID_RETURN;
607 608
}

unknown's avatar
unknown committed
609
/*
610 611
  skip_load_data_infile()

unknown's avatar
unknown committed
612 613 614
  NOTES
    This is used to tell a 3.23 master to break send_file()
*/
615 616 617

void skip_load_data_infile(NET *net)
{
618 619
  DBUG_ENTER("skip_load_data_infile");

620
  (void)net_request_file(net, "/dev/null");
unknown's avatar
unknown committed
621 622
  (void)my_net_read(net);                               // discard response
  (void)net_write_command(net, 0, "", 0, "", 0);        // Send ok
623
  DBUG_VOID_RETURN;
624 625
}

626

627
/*
  Send a request for the file 'fname' over 'net'

  IMPLEMENTATION
    Writes command byte 251 with the file name as header and an empty
    packet body. NOTE(review): 251 is presumably the protocol marker for
    a file request - confirm against the protocol definition.

  RETURN
    The result of net_write_command() converted to bool
*/
bool net_request_file(NET* net, const char* fname)
{
  DBUG_ENTER("net_request_file");

  const size_t fname_length= strlen(fname);
  bool res= net_write_command(net, 251, fname, fname_length, "", 0);

  DBUG_RETURN(res);
}

633 634
/*
  From other comments and tests in code, it looks like
635
  sometimes Query_log_event and Load_log_event can have db == 0
636 637 638
  (see rewrite_db() above for example)
  (cases where this happens are unclear; it may be when the master is 3.23).
*/
639 640

const char *print_slave_db_safe(const char* db)
641
{
642 643 644
  DBUG_ENTER("*print_slave_db_safe");

  DBUG_RETURN((db ? db : ""));
645
}
646

647
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
unknown's avatar
unknown committed
648
                                 const char *default_val)
unknown's avatar
unknown committed
649
{
unknown's avatar
unknown committed
650
  uint length;
651 652
  DBUG_ENTER("init_strvar_from_file");

unknown's avatar
unknown committed
653 654 655 656 657 658
  if ((length=my_b_gets(f,var, max_size)))
  {
    char* last_p = var + length -1;
    if (*last_p == '\n')
      *last_p = 0; // if we stopped on newline, kill it
    else
unknown's avatar
unknown committed
659
    {
unknown's avatar
unknown committed
660
      /*
unknown's avatar
unknown committed
661 662
        If we truncated a line or stopped on last char, remove all chars
        up to and including newline.
unknown's avatar
unknown committed
663
      */
unknown's avatar
unknown committed
664
      int c;
unknown's avatar
unknown committed
665
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF));
unknown's avatar
unknown committed
666
    }
667
    DBUG_RETURN(0);
unknown's avatar
unknown committed
668 669 670
  }
  else if (default_val)
  {
unknown's avatar
unknown committed
671
    strmake(var,  default_val, max_size-1);
672
    DBUG_RETURN(0);
unknown's avatar
unknown committed
673
  }
674
  DBUG_RETURN(1);
unknown's avatar
unknown committed
675 676
}

677

678
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
unknown's avatar
unknown committed
679 680
{
  char buf[32];
681 682
  DBUG_ENTER("init_intvar_from_file");

unknown's avatar
unknown committed
683 684

  if (my_b_gets(f, buf, sizeof(buf)))
unknown's avatar
unknown committed
685 686
  {
    *var = atoi(buf);
687
    DBUG_RETURN(0);
unknown's avatar
unknown committed
688
  }
unknown's avatar
unknown committed
689
  else if (default_val)
unknown's avatar
unknown committed
690 691
  {
    *var = default_val;
692
    DBUG_RETURN(0);
unknown's avatar
unknown committed
693
  }
694
  DBUG_RETURN(1);
unknown's avatar
unknown committed
695 696
}

697 698 699 700 701 702 703 704
/*
  Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
  relying on the binlog's version. This is not perfect: imagine an upgrade
  of the master without waiting that all slaves are in sync with the master;
  then a slave could be fooled about the binlog's format. This is what happens
  when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
  slaves are fooled. So we do this only to distinguish between 3.23 and more
  recent masters (it's too late to change things for 3.23).
unknown's avatar
unknown committed
705

706 707 708 709
  RETURNS
  0       ok
  1       error
*/
710

unknown's avatar
unknown committed
711
static int get_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi)
712
{
713
  const char* errmsg= 0;
714
  DBUG_ENTER("get_master_version_and_clock");
715 716 717 718 719 720 721

  /*
    Free old description_event_for_queue (that is needed if we are in
    a reconnection).
  */
  delete mi->rli.relay_log.description_event_for_queue;
  mi->rli.relay_log.description_event_for_queue= 0;
unknown's avatar
unknown committed
722

723
  if (!my_isdigit(&my_charset_bin,*mysql->server_version))
unknown's avatar
unknown committed
724
    errmsg = "Master reported unrecognized MySQL version";
725 726 727 728 729
  else
  {
    /*
      Note the following switch will bug when we have MySQL branch 30 ;)
    */
unknown's avatar
unknown committed
730
    switch (*mysql->server_version)
731 732 733 734 735 736 737 738
    {
    case '0':
    case '1':
    case '2':
      errmsg = "Master reported unrecognized MySQL version";
      break;
    case '3':
      mi->rli.relay_log.description_event_for_queue= new
unknown's avatar
unknown committed
739
        Format_description_log_event(1, mysql->server_version);
740 741 742
      break;
    case '4':
      mi->rli.relay_log.description_event_for_queue= new
unknown's avatar
unknown committed
743
        Format_description_log_event(3, mysql->server_version);
744
      break;
unknown's avatar
unknown committed
745
    default:
746 747 748 749 750 751 752 753 754
      /*
        Master is MySQL >=5.0. Give a default Format_desc event, so that we can
        take the early steps (like tests for "is this a 3.23 master") which we
        have to take before we receive the real master's Format_desc which will
        override this one. Note that the Format_desc we create below is garbage
        (it has the format of the *slave*); it's only good to help know if the
        master is 3.23, 4.0, etc.
      */
      mi->rli.relay_log.description_event_for_queue= new
unknown's avatar
unknown committed
755
        Format_description_log_event(4, mysql->server_version);
756 757
      break;
    }
758
  }
unknown's avatar
unknown committed
759 760

  /*
761 762 763 764 765
     This does not mean that a 5.0 slave will be able to read a 6.0 master; but
     as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
     can't read a 6.0 master, this will show up when the slave can't read some
     events sent by the master, and there will be error messages.
  */
unknown's avatar
unknown committed
766

767 768 769
  if (errmsg)
  {
    sql_print_error(errmsg);
770
    DBUG_RETURN(1);
771
  }
772 773 774 775 776

  /* as we are here, we tried to allocate the event */
  if (!mi->rli.relay_log.description_event_for_queue)
  {
    sql_print_error("Slave I/O thread failed to create a default Format_description_log_event");
777
    DBUG_RETURN(1);
778 779
  }

780 781 782 783 784 785
  /*
    Compare the master and slave's clock. Do not die if master's clock is
    unavailable (very old master not supporting UNIX_TIMESTAMP()?).
  */
  MYSQL_RES *master_res= 0;
  MYSQL_ROW master_row;
unknown's avatar
unknown committed
786

787
  if (!mysql_real_query(mysql, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
788 789
      (master_res= mysql_store_result(mysql)) &&
      (master_row= mysql_fetch_row(master_res)))
unknown's avatar
unknown committed
790
  {
unknown's avatar
unknown committed
791
    mi->clock_diff_with_master=
792
      (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
unknown's avatar
unknown committed
793
  }
794
  else
unknown's avatar
unknown committed
795
  {
796
    mi->clock_diff_with_master= 0; /* The "most sensible" value */
797 798 799 800
    sql_print_warning("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
                      "do not trust column Seconds_Behind_Master of SHOW "
                      "SLAVE STATUS. Error: %s (%d)",
                      mysql_error(mysql), mysql_errno(mysql));
801 802
  }
  if (master_res)
unknown's avatar
unknown committed
803 804
    mysql_free_result(master_res);

805 806 807 808 809 810 811 812 813 814
  /*
    Check that the master's server id and ours are different. Because if they
    are equal (which can result from a simple copy of master's datadir to slave,
    thus copying some my.cnf), replication will work but all events will be
    skipped.
    Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
    master?).
    Note: we could have put a @@SERVER_ID in the previous SELECT
    UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
  */
815 816
  if (!mysql_real_query(mysql,
                        STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
817 818 819 820
      (master_res= mysql_store_result(mysql)))
  {
    if ((master_row= mysql_fetch_row(master_res)) &&
        (::server_id == strtoul(master_row[1], 0, 10)) &&
821
        !mi->rli.replicate_same_server_id)
822 823 824 825 826
      errmsg= "The slave I/O thread stops because master and slave have equal \
MySQL server ids; these ids must be different for replication to work (or \
the --replicate-same-server-id option must be used on slave but this does \
not always make sense; please check the manual before using it).";
    mysql_free_result(master_res);
unknown's avatar
unknown committed
827 828
  }

829 830 831
  /*
    Check that the master's global character_set_server and ours are the same.
    Not fatal if query fails (old master?).
832 833 834 835 836 837
    Note that we don't check for equality of global character_set_client and
    collation_connection (neither do we prevent their setting in
    set_var.cc). That's because from what I (Guilhem) have tested, the global
    values of these 2 are never used (new connections don't use them).
    We don't test equality of global collation_database either as it is
    going to be deprecated (made read-only) in 4.1 very soon.
838 839 840 841 842 843
    The test is only relevant if master < 5.0.3 (we'll test only if it's older
    than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
    charset info in each binlog event.
    We don't do it for 3.23 because masters <3.23.50 hang on
    SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
    test only if master is 4.x.
844
  */
845 846 847

  /* redundant with rest of code but safer against later additions */
  if (*mysql->server_version == '3')
848
    goto err;
849 850

  if ((*mysql->server_version == '4') &&
851 852
      !mysql_real_query(mysql,
                        STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
853
      (master_res= mysql_store_result(mysql)))
unknown's avatar
unknown committed
854
  {
855 856 857 858 859 860
    if ((master_row= mysql_fetch_row(master_res)) &&
        strcmp(master_row[0], global_system_variables.collation_server->name))
      errmsg= "The slave I/O thread stops because master and slave have \
different values for the COLLATION_SERVER global variable. The values must \
be equal for replication to work";
    mysql_free_result(master_res);
unknown's avatar
unknown committed
861
  }
862

863 864 865 866 867 868 869 870
  /*
    Perform analogous check for time zone. Theoretically we also should
    perform check here to verify that SYSTEM time zones are the same on
    slave and master, but we can't rely on value of @@system_time_zone
    variable (it is a time zone abbreviation) since it is determined at start
    time and so could differ for slave and master even if they are really
    in the same system time zone. So we are omitting this check and just
    relying on documentation. Also according to Monty there are many users
unknown's avatar
unknown committed
871 872 873
    who are using replication between servers in various time zones. Hence
    such a check would break everything for them. (And now everything will
    work for them because by default both their master and slave will have
874
    'SYSTEM' time zone).
875 876
    This check is only necessary for 4.x masters (and < 5.0.4 masters but
    those were alpha).
877
  */
878
  if ((*mysql->server_version == '4') &&
879
      !mysql_real_query(mysql, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
880
      (master_res= mysql_store_result(mysql)))
unknown's avatar
unknown committed
881
  {
882
    if ((master_row= mysql_fetch_row(master_res)) &&
unknown's avatar
unknown committed
883
        strcmp(master_row[0],
884 885 886 887 888
               global_system_variables.time_zone->get_name()->ptr()))
      errmsg= "The slave I/O thread stops because master and slave have \
different values for the TIME_ZONE global variable. The values must \
be equal for replication to work";
    mysql_free_result(master_res);
unknown's avatar
unknown committed
889 890
  }

891
err:
892 893 894
  if (errmsg)
  {
    sql_print_error(errmsg);
895
    DBUG_RETURN(1);
896
  }
897

898
  DBUG_RETURN(0);
899 900
}

901 902 903 904
/*
  Used by fetch_master_table (used by LOAD TABLE tblname FROM MASTER and LOAD
  DATA FROM MASTER). Drops the table (if 'overwrite' is true) and recreates it
  from the dump. Honours replication inclusion/exclusion rules.
905
  db must be non-zero (guarded by assertion).
906 907 908 909 910

  RETURN VALUES
    0           success
    1           error
*/
911

912
static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
unknown's avatar
unknown committed
913
                                  const char* table_name, bool overwrite)
unknown's avatar
unknown committed
914
{
915
  ulong packet_len;
916 917
  char *query, *save_db;
  uint32 save_db_length;
918 919
  Vio* save_vio;
  HA_CHECK_OPT check_opt;
unknown's avatar
unknown committed
920
  TABLE_LIST tables;
921 922
  int error= 1;
  handler *file;
923
  ulonglong save_options;
924
  NET *net= &mysql->net;
925
  DBUG_ENTER("create_table_from_dump");
unknown's avatar
unknown committed
926

927
  packet_len= my_net_read(net); // read create table statement
928 929
  if (packet_len == packet_error)
  {
unknown's avatar
unknown committed
930
    my_message(ER_MASTER_NET_READ, ER(ER_MASTER_NET_READ), MYF(0));
unknown's avatar
unknown committed
931
    DBUG_RETURN(1);
932 933 934
  }
  if (net->read_pos[0] == 255) // error from master
  {
unknown's avatar
unknown committed
935
    char *err_msg;
936
    err_msg= (char*) net->read_pos + ((mysql->server_capabilities &
unknown's avatar
unknown committed
937 938
                                       CLIENT_PROTOCOL_41) ?
                                      3+SQLSTATE_LENGTH+1 : 3);
939
    my_error(ER_MASTER, MYF(0), err_msg);
unknown's avatar
unknown committed
940
    DBUG_RETURN(1);
941
  }
unknown's avatar
unknown committed
942
  thd->command = COM_TABLE_DUMP;
943
  thd->query_length= packet_len;
944
  /* Note that we should not set thd->query until the area is initalized */
945
  if (!(query = thd->strmake((char*) net->read_pos, packet_len)))
946 947
  {
    sql_print_error("create_table_from_dump: out of memory");
unknown's avatar
unknown committed
948
    my_message(ER_GET_ERRNO, "Out of memory", MYF(0));
unknown's avatar
unknown committed
949
    DBUG_RETURN(1);
950
  }
951
  thd->query= query;
unknown's avatar
unknown committed
952 953
  thd->query_error = 0;
  thd->net.no_send_ok = 1;
954

unknown's avatar
unknown committed
955 956
  bzero((char*) &tables,sizeof(tables));
  tables.db = (char*)db;
957
  tables.alias= tables.table_name= (char*)table_name;
unknown's avatar
unknown committed
958

unknown's avatar
unknown committed
959 960 961 962 963 964 965
  /* Drop the table if 'overwrite' is true */
  if (overwrite && mysql_rm_table(thd,&tables,1,0)) /* drop if exists */
  {
    sql_print_error("create_table_from_dump: failed to drop the table");
    goto err;
  }

966
  /* Create the table. We do not want to log the "create table" statement */
967
  save_options = thd->options;
968
  thd->options &= ~(ulong) (OPTION_BIN_LOG);
unknown's avatar
unknown committed
969
  thd->proc_info = "Creating table from master dump";
unknown's avatar
unknown committed
970
  // save old db in case we are creating in a different database
971
  save_db = thd->db;
972
  save_db_length= thd->db_length;
973
  thd->db = (char*)db;
974
  DBUG_ASSERT(thd->db != 0);
975
  thd->db_length= strlen(thd->db);
unknown's avatar
unknown committed
976
  mysql_parse(thd, thd->query, packet_len); // run create table
unknown's avatar
unknown committed
977
  thd->db = save_db;            // leave things the way the were before
978
  thd->db_length= save_db_length;
979
  thd->options = save_options;
unknown's avatar
unknown committed
980

981
  if (thd->query_error)
unknown's avatar
unknown committed
982
    goto err;                   // mysql_parse took care of the error send
unknown's avatar
unknown committed
983 984

  thd->proc_info = "Opening master dump table";
985
  tables.lock_type = TL_WRITE;
unknown's avatar
unknown committed
986 987 988
  if (!open_ltable(thd, &tables, TL_WRITE))
  {
    sql_print_error("create_table_from_dump: could not open created table");
989
    goto err;
unknown's avatar
unknown committed
990
  }
unknown's avatar
unknown committed
991

992
  file = tables.table->file;
unknown's avatar
unknown committed
993
  thd->proc_info = "Reading master dump table data";
994
  /* Copy the data file */
unknown's avatar
unknown committed
995 996
  if (file->net_read_dump(net))
  {
unknown's avatar
unknown committed
997
    my_message(ER_MASTER_NET_READ, ER(ER_MASTER_NET_READ), MYF(0));
998
    sql_print_error("create_table_from_dump: failed in\
unknown's avatar
unknown committed
999
 handler::net_read_dump()");
1000
    goto err;
unknown's avatar
unknown committed
1001
  }
unknown's avatar
unknown committed
1002 1003

  check_opt.init();
unknown's avatar
unknown committed
1004
  check_opt.flags|= T_VERY_SILENT | T_CALC_CHECKSUM | T_QUICK;
unknown's avatar
unknown committed
1005
  thd->proc_info = "Rebuilding the index on master dump table";
unknown's avatar
unknown committed
1006 1007 1008 1009 1010
  /*
    We do not want repair() to spam us with messages
    just send them to the error log, and report the failure in case of
    problems.
  */
1011
  save_vio = thd->net.vio;
unknown's avatar
unknown committed
1012
  thd->net.vio = 0;
1013
  /* Rebuild the index file from the copied data file (with REPAIR) */
unknown's avatar
unknown committed
1014
  error=file->ha_repair(thd,&check_opt) != 0;
unknown's avatar
unknown committed
1015
  thd->net.vio = save_vio;
1016
  if (error)
unknown's avatar
unknown committed
1017
    my_error(ER_INDEX_REBUILD, MYF(0), tables.table->s->table_name.str);
1018 1019

err:
unknown's avatar
unknown committed
1020 1021
  close_thread_tables(thd);
  thd->net.no_send_ok = 0;
unknown's avatar
unknown committed
1022
  DBUG_RETURN(error);
unknown's avatar
unknown committed
1023 1024
}

1025

1026
int fetch_master_table(THD *thd, const char *db_name, const char *table_name,
unknown's avatar
unknown committed
1027
                       MASTER_INFO *mi, MYSQL *mysql, bool overwrite)
unknown's avatar
unknown committed
1028
{
1029 1030 1031 1032 1033
  int error= 1;
  const char *errmsg=0;
  bool called_connected= (mysql != NULL);
  DBUG_ENTER("fetch_master_table");
  DBUG_PRINT("enter", ("db_name: '%s'  table_name: '%s'",
unknown's avatar
unknown committed
1034
                       db_name,table_name));
unknown's avatar
unknown committed
1035

unknown's avatar
merge  
unknown committed
1036
  if (!called_connected)
unknown's avatar
unknown committed
1037
  {
unknown's avatar
SCRUM  
unknown committed
1038
    if (!(mysql = mysql_init(NULL)))
1039 1040 1041
    {
      DBUG_RETURN(1);
    }
unknown's avatar
merge  
unknown committed
1042
    if (connect_to_master(thd, mysql, mi))
1043
    {
1044
      my_error(ER_CONNECT_TO_MASTER, MYF(0), mysql_error(mysql));
1045 1046 1047 1048 1049 1050 1051 1052 1053
      /*
        We need to clear the active VIO since, theoretically, somebody
        might issue an awake() on this thread.  If we are then in the
        middle of closing and destroying the VIO inside the
        mysql_close(), we will have a problem.
       */
#ifdef SIGNAL_WITH_VIO_CLOSE
      thd->clear_active_vio();
#endif
unknown's avatar
SCRUM  
unknown committed
1054
      mysql_close(mysql);
1055
      DBUG_RETURN(1);
1056
    }
1057 1058
    if (thd->killed)
      goto err;
1059
  }
unknown's avatar
unknown committed
1060

unknown's avatar
unknown committed
1061
  if (request_table_dump(mysql, db_name, table_name))
1062
  {
1063 1064
    error= ER_UNKNOWN_ERROR;
    errmsg= "Failed on table dump request";
1065 1066
    goto err;
  }
unknown's avatar
unknown committed
1067
  if (create_table_from_dump(thd, mysql, db_name,
unknown's avatar
unknown committed
1068
                             table_name, overwrite))
unknown's avatar
unknown committed
1069
    goto err;    // create_table_from_dump have sent the error already
unknown's avatar
unknown committed
1070
  error = 0;
1071

unknown's avatar
unknown committed
1072
 err:
1073
  thd->net.no_send_ok = 0; // Clear up garbage after create_table_from_dump
1074
  if (!called_connected)
unknown's avatar
SCRUM  
unknown committed
1075
    mysql_close(mysql);
1076
  if (errmsg && thd->vio_ok())
unknown's avatar
unknown committed
1077
    my_message(error, errmsg, MYF(0));
unknown's avatar
unknown committed
1078
  DBUG_RETURN(test(error));                     // Return 1 on error
unknown's avatar
unknown committed
1079
}
unknown's avatar
unknown committed
1080

1081

unknown's avatar
unknown committed
1082 1083
/*
  Block the slave I/O thread while the relay logs use more space than
  allowed.

  Waits on rli->log_space_cond until log_space_total no longer exceeds
  log_space_limit, the I/O thread is killed, or ignore_log_space_limit
  is set.

  Returns the last result of io_slave_killed(), or false if the kill check
  was never reached.
*/
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli)
{
  MASTER_INFO *master= rli->mi;
  THD *io_thread= master->io_thd;
  const char *old_proc_info;
  bool killed= 0;
  DBUG_ENTER("wait_for_relay_log_space");

  pthread_mutex_lock(&rli->log_space_lock);
  old_proc_info=
    io_thread->enter_cond(&rli->log_space_cond, &rli->log_space_lock,
                          "Waiting for the slave SQL thread to free enough "
                          "relay log space");
  for (;;)
  {
    /* The checks are made in this order; the first one that hits ends it */
    if (rli->log_space_total <= rli->log_space_limit)
      break;
    if ((killed= io_slave_killed(io_thread, master)))
      break;
    if (rli->ignore_log_space_limit)
      break;
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
  }
  /*
    NOTE(review): there is no explicit unlock of log_space_lock here;
    presumably exit_cond() releases it - confirm against THD::exit_cond().
  */
  io_thread->exit_cond(old_proc_info);
  DBUG_RETURN(killed);
}
1102

1103

1104 1105
/*
  Builds a Rotate from the ignored events' info and writes it to relay log.

  SYNOPSIS
  write_ignored_events_info_to_relay_log()
    thd             pointer to I/O thread's thd
    mi              master info whose relay log info (mi->rli) is used

  DESCRIPTION
    Slave I/O thread, going to die, must leave a durable trace of the
    ignored events' end position for the use of the slave SQL thread, by
    calling this function. Only that thread can call it (see assertion).
    Nothing is written when no ignored-events position is recorded
    (rli->ign_master_log_name_end is empty).
 */
static void write_ignored_events_info_to_relay_log(THD *thd, MASTER_INFO *mi)
{
  RELAY_LOG_INFO *rli= &mi->rli;
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
  DBUG_ENTER("write_ignored_events_info_to_relay_log");

  DBUG_ASSERT(thd == mi->io_thd);
  pthread_mutex_lock(log_lock);
  if (!rli->ign_master_log_name_end[0])
  {
    /* No ignored events pending: nothing to record */
    pthread_mutex_unlock(log_lock);
    DBUG_VOID_RETURN;
  }

  DBUG_PRINT("info",("writing a Rotate event to track down ignored events"));
  Rotate_log_event *rotate_ev=
    new Rotate_log_event(rli->ign_master_log_name_end,
                         0, rli->ign_master_log_pos_end,
                         Rotate_log_event::DUP_NAME);
  /* Mark the ignored-events info as consumed while still holding the lock */
  rli->ign_master_log_name_end[0]= 0;
  /* can unlock before writing as slave SQL thd will soon see our Rotate */
  pthread_mutex_unlock(log_lock);

  if (unlikely(!rotate_ev))
  {
    sql_print_error("Slave I/O thread failed to create a Rotate event"
                    " (out of memory?), "
                    "SHOW SLAVE STATUS may be inaccurate");
    DBUG_VOID_RETURN;
  }

  rotate_ev->server_id= 0; // don't be ignored by slave SQL thread
  if (unlikely(rli->relay_log.append(rotate_ev)))
    sql_print_error("Slave I/O thread failed to write a Rotate event"
                    " to the relay log, "
                    "SHOW SLAVE STATUS may be inaccurate");
  /* Done even if the append failed, exactly as the space accounting needs */
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  if (flush_master_info(mi, 1))
    sql_print_error("Failed to flush master info file");
  delete rotate_ev;
  DBUG_VOID_RETURN;
}

1156

1157 1158
/*
  Announce this slave to the master with COM_REGISTER_SLAVE.

  The packet holds, in this order: our server_id (4 bytes), report_host,
  report_user and report_password (each written with net_store_data()),
  report_port (2 bytes), rpl_recovery_rank (4 bytes) and 4 zero bytes for
  the master id.

  Returns 0 on success, and also when nothing is sent because report_host
  is not set or the values would not fit the packet buffer; returns 1 if
  the command fails.
*/
int register_slave_on_master(MYSQL* mysql)
{
  char packet[1024];
  char *cursor= packet;
  DBUG_ENTER("register_slave_on_master");

  if (!report_host)
    DBUG_RETURN(0);

  const uint host_len= strlen(report_host);
  const uint user_len= report_user ? strlen(report_user) : 0;
  const uint password_len= report_password ? strlen(report_password) : 0;

  /* 30 is a good safety margin */
  if (host_len + user_len + password_len + 30 > sizeof(packet))
    DBUG_RETURN(0);                                     // safety

  int4store(cursor, server_id);
  cursor+= 4;
  cursor= net_store_data(cursor, report_host, host_len);
  cursor= net_store_data(cursor, report_user, user_len);
  cursor= net_store_data(cursor, report_password, password_len);
  int2store(cursor, (uint16) report_port);
  cursor+= 2;
  int4store(cursor, rpl_recovery_rank);
  cursor+= 4;
  /* The master will fill in master_id */
  int4store(cursor, 0);
  cursor+= 4;

  const uint packet_length= (uint) (cursor - packet);
  if (simple_command(mysql, COM_REGISTER_SLAVE, (char*) packet,
                     packet_length, 0))
  {
    sql_print_error("Error on COM_REGISTER_SLAVE: %d '%s'",
                    mysql_errno(mysql),
                    mysql_error(mysql));
    DBUG_RETURN(1);
  }
  DBUG_RETURN(0);
}

1195

unknown's avatar
unknown committed
1196
bool show_master_info(THD* thd, MASTER_INFO* mi)
unknown's avatar
unknown committed
1197
{
1198
  // TODO: fix this for multi-master
unknown's avatar
unknown committed
1199
  List<Item> field_list;
1200 1201 1202
  Protocol *protocol= thd->protocol;
  DBUG_ENTER("show_master_info");

unknown's avatar
unknown committed
1203
  field_list.push_back(new Item_empty_string("Slave_IO_State",
unknown's avatar
unknown committed
1204
                                                     14));
unknown's avatar
unknown committed
1205
  field_list.push_back(new Item_empty_string("Master_Host",
unknown's avatar
unknown committed
1206
                                                     sizeof(mi->host)));
unknown's avatar
unknown committed
1207
  field_list.push_back(new Item_empty_string("Master_User",
unknown's avatar
unknown committed
1208
                                                     sizeof(mi->user)));
1209
  field_list.push_back(new Item_return_int("Master_Port", 7,
unknown's avatar
unknown committed
1210
                                           MYSQL_TYPE_LONG));
1211
  field_list.push_back(new Item_return_int("Connect_Retry", 10,
unknown's avatar
unknown committed
1212
                                           MYSQL_TYPE_LONG));
1213
  field_list.push_back(new Item_empty_string("Master_Log_File",
unknown's avatar
unknown committed
1214
                                             FN_REFLEN));
1215
  field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
unknown's avatar
unknown committed
1216
                                           MYSQL_TYPE_LONGLONG));
1217
  field_list.push_back(new Item_empty_string("Relay_Log_File",
unknown's avatar
unknown committed
1218
                                             FN_REFLEN));
1219
  field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
unknown's avatar
unknown committed
1220
                                           MYSQL_TYPE_LONGLONG));
1221
  field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
unknown's avatar
unknown committed
1222
                                             FN_REFLEN));
1223 1224
  field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
  field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
1225 1226 1227 1228 1229 1230
  field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
  field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
  field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
  field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
  field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
  field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
unknown's avatar
unknown committed
1231
                                             28));
1232 1233 1234
  field_list.push_back(new Item_return_int("Last_Errno", 4, MYSQL_TYPE_LONG));
  field_list.push_back(new Item_empty_string("Last_Error", 20));
  field_list.push_back(new Item_return_int("Skip_Counter", 10,
unknown's avatar
unknown committed
1235
                                           MYSQL_TYPE_LONG));
1236
  field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
unknown's avatar
unknown committed
1237
                                           MYSQL_TYPE_LONGLONG));
1238
  field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
unknown's avatar
unknown committed
1239
                                           MYSQL_TYPE_LONGLONG));
1240
  field_list.push_back(new Item_empty_string("Until_Condition", 6));
1241
  field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
unknown's avatar
unknown committed
1242
  field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
1243
                                           MYSQL_TYPE_LONGLONG));
unknown's avatar
unknown committed
1244 1245 1246
  field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
  field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
                                             sizeof(mi->ssl_ca)));
unknown's avatar
unknown committed
1247
  field_list.push_back(new Item_empty_string("Master_SSL_CA_Path",
unknown's avatar
unknown committed
1248
                                             sizeof(mi->ssl_capath)));
unknown's avatar
unknown committed
1249
  field_list.push_back(new Item_empty_string("Master_SSL_Cert",
unknown's avatar
unknown committed
1250
                                             sizeof(mi->ssl_cert)));
unknown's avatar
unknown committed
1251
  field_list.push_back(new Item_empty_string("Master_SSL_Cipher",
unknown's avatar
unknown committed
1252
                                             sizeof(mi->ssl_cipher)));
unknown's avatar
unknown committed
1253
  field_list.push_back(new Item_empty_string("Master_SSL_Key",
unknown's avatar
unknown committed
1254
                                             sizeof(mi->ssl_key)));
1255
  field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
unknown's avatar
unknown committed
1256
                                           MYSQL_TYPE_LONGLONG));
1257 1258
  field_list.push_back(new Item_empty_string("Master_SSL_Verify_Server_Cert",
                                             3));
unknown's avatar
unknown committed
1259

1260 1261
  if (protocol->send_fields(&field_list,
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
unknown's avatar
unknown committed
1262
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
1263

1264 1265
  if (mi->host[0])
  {
1266
    DBUG_PRINT("info",("host is set: '%s'", mi->host));
1267
    String *packet= &thd->packet;
1268
    protocol->prepare_for_resend();
unknown's avatar
unknown committed
1269

1270 1271 1272 1273 1274
    /*
      TODO: we read slave_running without run_lock, whereas these variables
      are updated under run_lock and not data_lock. In 5.0 we should lock
      run_lock on top of data_lock (with good order).
    */
1275 1276
    pthread_mutex_lock(&mi->data_lock);
    pthread_mutex_lock(&mi->rli.data_lock);
unknown's avatar
unknown committed
1277 1278

    protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
1279 1280
    protocol->store(mi->host, &my_charset_bin);
    protocol->store(mi->user, &my_charset_bin);
1281 1282
    protocol->store((uint32) mi->port);
    protocol->store((uint32) mi->connect_retry);
1283
    protocol->store(mi->master_log_name, &my_charset_bin);
1284
    protocol->store((ulonglong) mi->master_log_pos);
1285
    protocol->store(mi->rli.group_relay_log_name +
unknown's avatar
unknown committed
1286 1287
                    dirname_length(mi->rli.group_relay_log_name),
                    &my_charset_bin);
1288 1289
    protocol->store((ulonglong) mi->rli.group_relay_log_pos);
    protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
unknown's avatar
unknown committed
1290 1291
    protocol->store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ?
                    "Yes" : "No", &my_charset_bin);
1292
    protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1293 1294 1295
    protocol->store(rpl_filter->get_do_db());
    protocol->store(rpl_filter->get_ignore_db());

1296 1297
    char buf[256];
    String tmp(buf, sizeof(buf), &my_charset_bin);
1298
    rpl_filter->get_do_table(&tmp);
1299
    protocol->store(&tmp);
1300
    rpl_filter->get_ignore_table(&tmp);
1301
    protocol->store(&tmp);
1302
    rpl_filter->get_wild_do_table(&tmp);
1303
    protocol->store(&tmp);
1304
    rpl_filter->get_wild_ignore_table(&tmp);
1305 1306
    protocol->store(&tmp);

1307
    protocol->store(mi->rli.last_slave_errno);
1308
    protocol->store(mi->rli.last_slave_error, &my_charset_bin);
1309
    protocol->store((uint32) mi->rli.slave_skip_counter);
1310
    protocol->store((ulonglong) mi->rli.group_master_log_pos);
1311
    protocol->store((ulonglong) mi->rli.log_space_total);
1312 1313

    protocol->store(
unknown's avatar
unknown committed
1314
      mi->rli.until_condition==RELAY_LOG_INFO::UNTIL_NONE ? "None":
1315 1316 1317 1318
        ( mi->rli.until_condition==RELAY_LOG_INFO::UNTIL_MASTER_POS? "Master":
          "Relay"), &my_charset_bin);
    protocol->store(mi->rli.until_log_name, &my_charset_bin);
    protocol->store((ulonglong) mi->rli.until_log_pos);
unknown's avatar
unknown committed
1319 1320

#ifdef HAVE_OPENSSL
unknown's avatar
unknown committed
1321 1322 1323 1324 1325 1326 1327 1328 1329
    protocol->store(mi->ssl? "Yes":"No", &my_charset_bin);
#else
    protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
#endif
    protocol->store(mi->ssl_ca, &my_charset_bin);
    protocol->store(mi->ssl_capath, &my_charset_bin);
    protocol->store(mi->ssl_cert, &my_charset_bin);
    protocol->store(mi->ssl_cipher, &my_charset_bin);
    protocol->store(mi->ssl_key, &my_charset_bin);
unknown's avatar
unknown committed
1330

1331 1332 1333 1334 1335 1336
    /*
      Seconds_Behind_Master: if SQL thread is running and I/O thread is
      connected, we can compute it otherwise show NULL (i.e. unknown).
    */
    if ((mi->slave_running == MYSQL_SLAVE_RUN_CONNECT) &&
        mi->rli.slave_running)
1337
    {
unknown's avatar
unknown committed
1338 1339 1340
      long time_diff= ((long)((time_t)time((time_t*) 0)
                              - mi->rli.last_master_timestamp)
                       - mi->clock_diff_with_master);
1341
      /*
unknown's avatar
unknown committed
1342 1343
        Apparently on some systems time_diff can be <0. Here are possible
        reasons related to MySQL:
1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355
        - the master is itself a slave of another master whose time is ahead.
        - somebody used an explicit SET TIMESTAMP on the master.
        Possible reason related to granularity-to-second of time functions
        (nothing to do with MySQL), which can explain a value of -1:
        assume the master's and slave's time are perfectly synchronized, and
        that at slave's connection time, when the master's timestamp is read,
        it is at the very end of second 1, and (a very short time later) when
        the slave's timestamp is read it is at the very beginning of second
        2. Then the recorded value for master is 1 and the recorded value for
        slave is 2. At SHOW SLAVE STATUS time, assume that the difference
        between timestamp of slave and rli->last_master_timestamp is 0
        (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1356 1357 1358 1359
        This confuses users, so we don't go below 0: hence the max().

        last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
        special marker to say "consider we have caught up".
1360
      */
unknown's avatar
unknown committed
1361 1362
      protocol->store((longlong)(mi->rli.last_master_timestamp ?
                                 max(0, time_diff) : 0));
1363
    }
unknown's avatar
unknown committed
1364
    else
1365
    {
unknown's avatar
unknown committed
1366
      protocol->store_null();
1367 1368
    }
    protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin);
unknown's avatar
unknown committed
1369

1370 1371
    pthread_mutex_unlock(&mi->rli.data_lock);
    pthread_mutex_unlock(&mi->data_lock);
1372

1373
    if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length()))
unknown's avatar
unknown committed
1374
      DBUG_RETURN(TRUE);
1375
  }
1376
  send_eof(thd);
unknown's avatar
unknown committed
1377
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
1378 1379
}

unknown's avatar
unknown committed
1380

1381 1382
/*
  Set the option bits every slave thread runs with.

  OPTION_BIG_SELECTS is always switched on, OPTION_BIN_LOG follows
  opt_log_slave_updates, and completion_type is reset to 0.
*/
void set_slave_thread_options(THD* thd)
{
  DBUG_ENTER("set_slave_thread_options");
  /*
     A slave thread must not be limited by max_join_size: a query that
     succeeded on the master HAS to be executed here too, so
     OPTION_BIG_SELECTS is forced on. Raising max_join_size to HA_POS_ERROR
     instead would not be enough (and is unnecessary once OPTION_BIG_SELECTS
     is set): an INSERT SELECT examining more than 4 billion rows would
     still fail, because OPTION_BIG_SELECTS is set automatically for a 4G
     max_join_size only for client threads.
  */
  const ulonglong with_big_selects= thd->options | OPTION_BIG_SELECTS;
  /* thd->options is written once, with the final value */
  thd->options= opt_log_slave_updates ?
                (with_big_selects | OPTION_BIN_LOG) :
                (with_big_selects & ~OPTION_BIN_LOG);
  thd->variables.completion_type= 0;
  DBUG_VOID_RETURN;
}
1402

1403
/*
  Reset the thread's client character set, connection collation and server
  collation to the global defaults, refresh the thread's charset state and
  invalidate the charset cached in 'rli'.
*/
void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO const *rli)
{
  DBUG_ENTER("set_slave_thread_default_charset");

  /* Copy the three session values from the global system variables */
  thd->variables.collation_server=
    global_system_variables.collation_server;
  thd->variables.collation_connection=
    global_system_variables.collation_connection;
  thd->variables.character_set_client=
    global_system_variables.character_set_client;
  thd->update_charset();

  /*
    The const is cast away on purpose: what this function does, as seen
    from outside, is to set the default charset of the thread. Having to
    invalidate the cache is only a secondary effect.
   */
  RELAY_LOG_INFO *writable_rli= const_cast<RELAY_LOG_INFO*>(rli);
  writable_rli->cached_charset_invalidate();
  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
1425
/*
1426
  init_slave_thread()
unknown's avatar
unknown committed
1427
*/
1428

1429
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
unknown's avatar
unknown committed
1430 1431
{
  DBUG_ENTER("init_slave_thread");
1432
  thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
unknown's avatar
unknown committed
1433
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1434
  thd->security_ctx->skip_grants();
unknown's avatar
unknown committed
1435
  my_net_init(&thd->net, 0);
1436 1437 1438 1439 1440
/*
  Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
  slave threads, since a replication event can become this much larger
  than the corresponding packet (query) sent from client to master.
*/
1441
  thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1442
    + MAX_LOG_EVENT_HEADER;  /* note, incr over the global not session var */
1443
  thd->slave_thread = 1;
1444
  set_slave_thread_options(thd);
unknown's avatar
unknown committed
1445 1446
  thd->client_capabilities = CLIENT_LOCAL_FILES;
  pthread_mutex_lock(&LOCK_thread_count);
unknown's avatar
unknown committed
1447
  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
unknown's avatar
unknown committed
1448 1449
  pthread_mutex_unlock(&LOCK_thread_count);

1450
  if (init_thr_lock() || thd->store_globals())
unknown's avatar
unknown committed
1451
  {
1452 1453
    thd->cleanup();
    delete thd;
unknown's avatar
unknown committed
1454 1455 1456
    DBUG_RETURN(-1);
  }

1457
  if (thd_type == SLAVE_THD_SQL)
1458
    thd->proc_info= "Waiting for the next event in relay log";
1459
  else
1460
    thd->proc_info= "Waiting for master update";
unknown's avatar
unknown committed
1461 1462 1463 1464 1465
  thd->version=refresh_version;
  thd->set_time();
  DBUG_RETURN(0);
}

1466

1467
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
unknown's avatar
unknown committed
1468
                      void* thread_killed_arg)
unknown's avatar
unknown committed
1469
{
1470
  int nap_time;
unknown's avatar
unknown committed
1471
  thr_alarm_t alarmed;
1472 1473
  DBUG_ENTER("safe_sleep");

unknown's avatar
unknown committed
1474 1475 1476 1477
  thr_alarm_init(&alarmed);
  time_t start_time= time((time_t*) 0);
  time_t end_time= start_time+sec;

1478
  while ((nap_time= (int) (end_time - start_time)) > 0)
unknown's avatar
unknown committed
1479
  {
1480
    ALARM alarm_buff;
unknown's avatar
unknown committed
1481
    /*
1482
      The only reason we are asking for alarm is so that
unknown's avatar
unknown committed
1483 1484 1485
      we will be woken up in case of murder, so if we do not get killed,
      set the alarm so it goes off after we wake up naturally
    */
1486
    thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
unknown's avatar
unknown committed
1487
    sleep(nap_time);
1488
    thr_end_alarm(&alarmed);
unknown's avatar
unknown committed
1489

1490
    if ((*thread_killed)(thd,thread_killed_arg))
1491
      DBUG_RETURN(1);
unknown's avatar
unknown committed
1492 1493
    start_time=time((time_t*) 0);
  }
1494
  DBUG_RETURN(0);
unknown's avatar
unknown committed
1495 1496
}

1497

unknown's avatar
unknown committed
1498
static int request_dump(MYSQL* mysql, MASTER_INFO* mi,
unknown's avatar
unknown committed
1499
                        bool *suppress_warnings)
unknown's avatar
unknown committed
1500
{
1501
  char buf[FN_REFLEN + 10];
unknown's avatar
unknown committed
1502 1503
  int len;
  int binlog_flags = 0; // for now
1504
  char* logname = mi->master_log_name;
1505 1506
  DBUG_ENTER("request_dump");

unknown's avatar
unknown committed
1507
  // TODO if big log files: Change next to int8store()
unknown's avatar
unknown committed
1508
  int4store(buf, (ulong) mi->master_log_pos);
unknown's avatar
unknown committed
1509
  int2store(buf + 4, binlog_flags);
1510
  int4store(buf + 6, server_id);
unknown's avatar
unknown committed
1511
  len = (uint) strlen(logname);
1512
  memcpy(buf + 10, logname,len);
unknown's avatar
SCRUM  
unknown committed
1513
  if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
unknown's avatar
unknown committed
1514
  {
unknown's avatar
unknown committed
1515 1516 1517 1518 1519
    /*
      Something went wrong, so we will just reconnect and retry later
      in the future, we should do a better error analysis, but for
      now we just fill up the error log :-)
    */
unknown's avatar
SCRUM  
unknown committed
1520
    if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
unknown's avatar
unknown committed
1521
      *suppress_warnings= 1;                    // Suppress reconnect warning
unknown's avatar
unknown committed
1522
    else
1523
      sql_print_error("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs",
unknown's avatar
unknown committed
1524 1525
                      mysql_errno(mysql), mysql_error(mysql),
                      master_connect_retry);
1526
    DBUG_RETURN(1);
unknown's avatar
unknown committed
1527
  }
unknown's avatar
unknown committed
1528

1529
  DBUG_RETURN(0);
unknown's avatar
unknown committed
1530 1531
}

1532

1533
/*
  Ask the master to send a dump of one table (COM_TABLE_DUMP).

  SYNOPSIS
    request_table_dump()
    mysql   Connection to the master
    db      Database name
    table   Table name

  NOTES
    The command payload is
      <1 byte db length><db name><1 byte table length><table name>
    Since each length is stored in a single byte, a name longer than
    255 bytes cannot be represented and is rejected.

  RETURN VALUES
    0   Command sent
    1   A name was too long, or sending the command failed
*/
static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
{
  char buf[1024];
  DBUG_ENTER("request_table_dump");

  char * p = buf;
  uint table_len = (uint) strlen(table);
  uint db_len = (uint) strlen(db);

  /*
    Without this check a longer name would get a truncated length byte
    and the master would receive a malformed packet.
  */
  if (db_len > 255 || table_len > 255)
  {
    sql_print_error("request_table_dump: Database or table name too long");
    DBUG_RETURN(1);
  }
  /* Two length bytes plus both names must fit into buf */
  if (table_len + db_len > sizeof(buf) - 2)
  {
    sql_print_error("request_table_dump: Buffer overrun");
    DBUG_RETURN(1);
  }

  *p++ = (char) db_len;
  memcpy(p, db, db_len);
  p += db_len;
  *p++ = (char) table_len;
  memcpy(p, table, table_len);

  /* p points at the start of the table name, hence the "+ table_len" */
  if (simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1))
  {
    sql_print_error("request_table_dump: Error sending the table dump "
                    "command");
    DBUG_RETURN(1);
  }

  DBUG_RETURN(0);
}

1563

unknown's avatar
unknown committed
1564
/*
  Read one event from the master

  SYNOPSIS
    read_event()
    mysql               MySQL connection
    mi                  Master connection information (only used in debug
                        builds, to simulate a disconnect)
    suppress_warnings   Reset to 0 on entry.  Set to TRUE when the read
                        failed with ER_NET_READ_INTERRUPTED: the caller
                        is then about to do a normal reconnect after a
                        read timeout, which is a normal event on an idle
                        server and should not be reported in the error
                        log.

  RETURN VALUES
    'packet_error'      Read error, end packet received from the master,
                        or simulated disconnect (debug builds)
    number              Length returned by cli_safe_read() minus one
*/

static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
{
  ulong packet_len;
  DBUG_ENTER("read_event");

  *suppress_warnings= 0;
  /*
    my_real_read() will time us out
    We check if we were told to die, and if not, try reading again
  */
#ifndef DBUG_OFF
  if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
    DBUG_RETURN(packet_error);
#endif

  packet_len= cli_safe_read(mysql);

  const bool read_failed= (packet_len == packet_error ||
                           (long) packet_len < 1);
  if (read_failed)
  {
    if (mysql_errno(mysql) != ER_NET_READ_INTERRUPTED)
      sql_print_error("Error reading packet from server: %s ( server_errno=%d)",
                      mysql_error(mysql), mysql_errno(mysql));
    else
    {
      /*
        We are trying a normal reconnect after a read timeout;
        we suppress prints to .err file as long as the reconnect
        happens without problems
      */
      *suppress_warnings= TRUE;
    }
    DBUG_RETURN(packet_error);
  }

  /* A short packet whose first byte is 254 is an end (EOF) packet */
  const bool is_end_packet= (packet_len < 8 &&
                             mysql->net.read_pos[0] == 254);
  if (is_end_packet)
  {
    sql_print_information("Slave: received end packet from server, apparent "
                          "master shutdown: %s",
                          mysql_error(mysql));
    DBUG_RETURN(packet_error);
  }

  DBUG_PRINT("exit", ("len: %lu  net->read_pos[4]: %d",
                      packet_len, mysql->net.read_pos[4]));
  DBUG_RETURN(packet_len - 1);
}

unknown's avatar
unknown committed
1628

1629 1630
/*
  Tell whether expected_error is one of ER_NET_READ_ERROR,
  ER_NET_ERROR_ON_WRITE, ER_SERVER_SHUTDOWN or ER_NEW_ABORTING_CONNECTION.

  thd and rli are not used.

  RETURN VALUES
    1   expected_error is one of the codes listed above
    0   any other value
*/
int check_expected_error(THD* thd, RELAY_LOG_INFO const *rli,
                         int expected_error)
{
  DBUG_ENTER("check_expected_error");

  const bool is_listed_error=
    (expected_error == ER_NET_READ_ERROR ||
     expected_error == ER_NET_ERROR_ON_WRITE ||
     expected_error == ER_SERVER_SHUTDOWN ||
     expected_error == ER_NEW_ABORTING_CONNECTION);

  if (is_listed_error)
    DBUG_RETURN(1);
  DBUG_RETURN(0);
}
1644

1645

unknown's avatar
unknown committed
1646 1647 1648 1649 1650 1651 1652 1653 1654
/*
  Check if the current error is of temporary nature or not.
  Some errors are temporary in nature, such as
  ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT.  Ndb also signals
  that the error is temporary by pushing a warning with the error code
  ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.

  RETURN VALUES
    1   The error is considered temporary
    0   The error is fatal (thd->is_fatal_error) or not known to be
        temporary
*/
static int has_temporary_error(THD *thd)
{
  DBUG_ENTER("has_temporary_error");

  /* A fatal error is never treated as temporary */
  if (thd->is_fatal_error)
    DBUG_RETURN(0);

  /*
    Temporary error codes:
    currently, InnoDB deadlock detected by InnoDB or lock
    wait timeout (innodb_lock_wait_timeout exceeded)
  */
  const bool lock_conflict=
    (thd->net.last_errno == ER_LOCK_DEADLOCK ||
     thd->net.last_errno == ER_LOCK_WAIT_TIMEOUT);
  if (lock_conflict)
    DBUG_RETURN(1);

#ifdef HAVE_NDB_BINLOG
  /*
    currently temporary error set in ndbcluster: look through the
    warning list for ER_GET_TEMPORARY_ERRMSG
  */
  List_iterator_fast<MYSQL_ERROR> warning_it(thd->warn_list);
  for (MYSQL_ERROR *warning= warning_it++; warning; warning= warning_it++)
  {
    DBUG_PRINT("info", ("has warning %d %s", warning->code, warning->msg));
    if (warning->code == ER_GET_TEMPORARY_ERRMSG)
      DBUG_RETURN(1);
  }
#endif
  DBUG_RETURN(0);
}
1689

1690
/*
  Read the next event from the relay log and execute it.

  SYNOPSIS
    exec_relay_log_event()
    thd   The SQL thread's descriptor (asserted to be rli->sql_thd)
    rli   Relay log information

  DESCRIPTION
    Takes rli->data_lock, checks the UNTIL condition, fetches the next
    event, then either applies or skips it and updates the relay log
    positions.  If applying failed with a temporary error and
    slave_trans_retries allows it, the relay log is repositioned at the
    start of the current group so that the transaction is executed again.

  RETURN VALUES
    0       Event handled, or a transaction retry has been set up
    1       UNTIL position reached, thread killed, positions could not
            be updated, or the event could not be read
    other   Error code returned by Log_event::apply_event()
*/
static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
{
  DBUG_ENTER("exec_relay_log_event");

  /*
     We acquire this mutex since we need it for all operations except
     event execution. But we will release it in places where we will
     wait for something for example inside of next_event().
   */
  pthread_mutex_lock(&rli->data_lock);
  /*
    This tests if the position of the end of the last previous executed event
    hits the UNTIL barrier.
    We would prefer to test if the position of the start (or possibly) end of
    the to-be-read event hits the UNTIL barrier, this is different if there
    was an event ignored by the I/O thread just before (BUG#13861 to be
    fixed).
  */
  if (rli->until_condition!=RELAY_LOG_INFO::UNTIL_NONE &&
      rli->is_until_satisfied())
  {
    char buf[22];
    sql_print_information("Slave SQL thread stopped because it reached its"
                    " UNTIL position %s", llstr(rli->until_pos(), buf));
    /*
      Setting abort_slave flag because we do not want additional message about
      error in query execution to be printed.
    */
    rli->abort_slave= 1;
    pthread_mutex_unlock(&rli->data_lock);
    DBUG_RETURN(1);
  }

  Log_event * ev = next_event(rli);

  DBUG_ASSERT(rli->sql_thd==thd);

  if (sql_slave_killed(thd,rli))
  {
    pthread_mutex_unlock(&rli->data_lock);
    delete ev;
    DBUG_RETURN(1);
  }
  if (ev)
  {
    int const type_code= ev->get_type_code();
    int exec_res= 0;

    DBUG_PRINT("info",("type_code=%d (%s), server_id=%d",
                       type_code, ev->get_type_str(), ev->server_id));
    DBUG_PRINT("info", ("thd->options={ %s%s}",
                        FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
                        FLAGSTR(thd->options, OPTION_BEGIN)));

    /*
      Execute the event to change the database and update the binary
      log coordinates, but first we set some data that is needed for
      the thread.

      The event will be executed unless it is supposed to be skipped.

      Queries originating from this server must be skipped.  Low-level
      events (Format_description_log_event, Rotate_log_event,
      Stop_log_event) from this server must also be skipped. But for
      those we don't want to modify 'group_master_log_pos', because
      these events did not exist on the master.
      Format_description_log_event is not completely skipped.

      Skip queries specified by the user in 'slave_skip_counter'.  We
      can't however skip events that have something to do with the log
      files themselves.

      Filtering on own server id is extremely important, to ignore
      execution of events created by the creation/rotation of the relay
      log (remember that now the relay log starts with its Format_desc,
      has a Rotate etc).
    */

    thd->server_id = ev->server_id; // use the original server id for logging
    thd->set_time();                            // time the query
    thd->lex->current_select= 0;
    if (!ev->when)
      ev->when = time(NULL);
    ev->thd = thd; // because up to this point, ev->thd == 0

    int reason= ev->shall_skip(rli);
    if (reason == Log_event::EVENT_SKIP_COUNT)
      --rli->slave_skip_counter;
    /* From here on the event is processed without holding data_lock */
    pthread_mutex_unlock(&rli->data_lock);
    if (reason == Log_event::EVENT_SKIP_NOT)
      exec_res= ev->apply_event(rli);
#ifndef DBUG_OFF
    else
    {
      /*
        This only prints information to the debug trace.

        TODO: Print an informational message to the error log?
       */
      static const char *const explain[] = {
        "event was not skipped",                  // EVENT_SKIP_NOT,
        "event originated from this server",      // EVENT_SKIP_IGNORE,
        "event skip counter was non-zero"         // EVENT_SKIP_COUNT
      };
      DBUG_PRINT("info", ("%s was skipped because %s",
                          ev->get_type_str(), explain[reason]));
    }
#endif

    DBUG_PRINT("info", ("apply_event error = %d", exec_res));
    if (exec_res == 0)
    {
      int error= ev->update_pos(rli);
      char buf[22];
      DBUG_PRINT("info", ("update_pos error = %d", error));
      DBUG_PRINT("info", ("group %s %s",
                          llstr(rli->group_relay_log_pos, buf),
                          rli->group_relay_log_name));
      DBUG_PRINT("info", ("event %s %s",
                          llstr(rli->event_relay_log_pos, buf),
                          rli->event_relay_log_name));
      /*
        The update should not fail, so print an error message and
        return an error code.

        TODO: Replace this with a decent error message when merged
        with BUG#24954 (which adds several new error message).
      */
      if (error)
      {
        slave_print_msg(ERROR_LEVEL, rli, ER_UNKNOWN_ERROR,
                        "It was not possible to update the positions"
                        " of the relay log information: the slave may"
                        " be in an inconsistent state."
                        " Stopped in %s position %s",
                        rli->group_relay_log_name,
                        llstr(rli->group_relay_log_pos, buf));
        /*
          We return without reaching the normal deletion below, so the
          event has to be released here or it would be leaked.  The same
          exception for Format_description_log_event applies.  This
          failure is deliberately not retried.
        */
        if (type_code != FORMAT_DESCRIPTION_EVENT)
          delete ev;
        DBUG_RETURN(1);
      }
    }

    /*
       Format_description_log_event should not be deleted because it will be
       used to read info about the relay log's format; it will be deleted when
       the SQL thread does not need it, i.e. when this thread terminates.
    */
    if (type_code != FORMAT_DESCRIPTION_EVENT)
    {
      DBUG_PRINT("info", ("Deleting the event after it has been executed"));
      delete ev;
    }
    if (slave_trans_retries)
    {
      if (exec_res && has_temporary_error(thd))
      {
        const char *errmsg;
        /*
          We were in a transaction which has been rolled back because of a
          temporary error;
          let's seek back to BEGIN log event and retry it all again.
          Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
          there is no rollback since 5.0.13 (ref: manual).
          We have to not only seek but also
          a) init_master_info(), to seek back to hot relay log's start for later
          (for when we will come back to this hot log after re-processing the
          possibly existing old logs where BEGIN is: check_binlog_magic() will
          then need the cache to be at position 0 (see comments at beginning of
          init_master_info()).
          b) init_relay_log_pos(), because the BEGIN may be an older relay log.
        */
        if (rli->trans_retries < slave_trans_retries)
        {
          if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
            sql_print_error("Failed to initialize the master info structure");
          else if (init_relay_log_pos(rli,
                                      rli->group_relay_log_name,
                                      rli->group_relay_log_pos,
                                      1, &errmsg, 1))
            sql_print_error("Error initializing relay log position: %s",
                            errmsg);
          else
          {
            exec_res= 0;
            end_trans(thd, ROLLBACK);
            /* chance for concurrent connection to get more locks */
            safe_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
                       (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
            pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
            rli->trans_retries++;
            rli->retried_trans++;
            pthread_mutex_unlock(&rli->data_lock);
            DBUG_PRINT("info", ("Slave retries transaction "
                                "rli->trans_retries: %lu", rli->trans_retries));
          }
        }
        else
          sql_print_error("Slave SQL thread retried transaction %lu time(s) "
                          "in vain, giving up. Consider raising the value of "
                          "the slave_transaction_retries variable.",
                          slave_trans_retries);
      }
      else if (!((thd->options & OPTION_BEGIN) && opt_using_transactions))
      {
        /*
          Only reset the retry counter if the event succeeded or
          failed with a non-transient error.  On a successful event,
          the execution will proceed as usual; in the case of a
          non-transient error, the slave will stop with an error.
         */
        rli->trans_retries= 0; // restart from fresh
      }
    }
    DBUG_RETURN(exec_res);
  }
  /* next_event() gave us nothing although we were not killed */
  pthread_mutex_unlock(&rli->data_lock);
  slave_print_msg(ERROR_LEVEL, rli, 0, "\
Could not parse relay log event entry. The possible reasons are: the master's \
binary log is corrupted (you can check this by running 'mysqlbinlog' on the \
binary log), the slave's relay log is corrupted (you can check this by running \
'mysqlbinlog' on the relay log), a network problem, or a bug in the master's \
or slave's MySQL code. If you want to check the master's binary log or slave's \
relay log, you will be able to know their names by issuing 'SHOW SLAVE STATUS' \
on this slave.\
");
  DBUG_RETURN(1);
}

1922

unknown's avatar
unknown committed
1923
/* Slave I/O Thread entry point */
1924

1925
pthread_handler_t handle_slave_io(void *arg)
unknown's avatar
unknown committed
1926
{
unknown's avatar
unknown committed
1927 1928
  THD *thd; // needs to be first for thread_stack
  MYSQL *mysql;
1929
  MASTER_INFO *mi = (MASTER_INFO*)arg;
1930
  RELAY_LOG_INFO *rli= &mi->rli;
unknown's avatar
unknown committed
1931 1932
  char llbuff[22];
  uint retry_count;
1933

unknown's avatar
unknown committed
1934 1935
  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  my_thread_init();
1936
  DBUG_ENTER("handle_slave_io");
unknown's avatar
unknown committed
1937

1938
  DBUG_ASSERT(mi->inited);
unknown's avatar
unknown committed
1939 1940 1941
  mysql= NULL ;
  retry_count= 0;

1942
  pthread_mutex_lock(&mi->run_lock);
unknown's avatar
unknown committed
1943 1944 1945
  /* Inform waiting threads that slave has started */
  mi->slave_run_id++;

1946
#ifndef DBUG_OFF
1947
  mi->events_till_disconnect = disconnect_slave_event_count;
1948 1949
#endif

1950
  thd= new THD; // note that contructor of THD uses DBUG_ !
1951
  THD_CHECK_SENTRY(thd);
unknown's avatar
unknown committed
1952 1953

  pthread_detach_this_thread();
1954
  thd->thread_stack= (char*) &thd; // remember where our stack is
1955
  if (init_slave_thread(thd, SLAVE_THD_IO))
unknown's avatar
unknown committed
1956 1957 1958 1959 1960 1961
  {
    pthread_cond_broadcast(&mi->start_cond);
    pthread_mutex_unlock(&mi->run_lock);
    sql_print_error("Failed during slave I/O thread initialization");
    goto err;
  }
1962
  mi->io_thd = thd;
1963
  pthread_mutex_lock(&LOCK_thread_count);
unknown's avatar
unknown committed
1964
  threads.append(thd);
1965
  pthread_mutex_unlock(&LOCK_thread_count);
1966 1967 1968
  mi->slave_running = 1;
  mi->abort_slave = 0;
  pthread_mutex_unlock(&mi->run_lock);
1969
  pthread_cond_broadcast(&mi->start_cond);
1970

1971
  DBUG_PRINT("master_info",("log_file_name: '%s'  position: %s",
unknown's avatar
unknown committed
1972 1973
                            mi->master_log_name,
                            llstr(mi->master_log_pos,llbuff)));
1974

unknown's avatar
SCRUM  
unknown committed
1975
  if (!(mi->mysql = mysql = mysql_init(NULL)))
unknown's avatar
unknown committed
1976
  {
unknown's avatar
unknown committed
1977
    sql_print_error("Slave I/O thread: error in mysql_init()");
unknown's avatar
unknown committed
1978 1979
    goto err;
  }
1980

1981
  thd->proc_info = "Connecting to master";
1982
  // we can get killed during safe_connect
1983
  if (!safe_connect(thd, mysql, mi))
1984
  {
unknown's avatar
unknown committed
1985 1986 1987
    sql_print_information("Slave I/O thread: connected to master '%s@%s:%d',"
                          "replication started in log '%s' at position %s",
                          mi->user, mi->host, mi->port,
1988 1989
			  IO_RPL_LOG_NAME,
			  llstr(mi->master_log_pos,llbuff));
1990 1991 1992 1993 1994
  /*
    Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
    thread, since a replication event can become this much larger than
    the corresponding packet (query) sent from client to master.
  */
1995 1996
    mysql->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
  }
1997
  else
unknown's avatar
unknown committed
1998
  {
1999
    sql_print_information("Slave I/O thread killed while connecting to master");
unknown's avatar
unknown committed
2000 2001
    goto err;
  }
2002

2003
connected:
2004

2005 2006
  // TODO: the assignment below should be under mutex (5.0)
  mi->slave_running= MYSQL_SLAVE_RUN_CONNECT;
2007
  thd->slave_net = &mysql->net;
2008
  thd->proc_info = "Checking master version";
unknown's avatar
unknown committed
2009
  if (get_master_version_and_clock(mysql, mi))
2010
    goto err;
2011 2012

  if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
2013
  {
unknown's avatar
unknown committed
2014 2015 2016 2017 2018
    /*
      Register ourselves with the master.
      If fails, this is not fatal - we just print the error message and go
      on with life.
    */
2019
    thd->proc_info = "Registering slave on master";
2020
    if (register_slave_on_master(mysql) ||  update_slave_list(mysql, mi))
2021 2022
      goto err;
  }
2023

2024
  DBUG_PRINT("info",("Starting reading binary log from master"));
2025
  while (!io_slave_killed(thd,mi))
unknown's avatar
unknown committed
2026
  {
2027
    bool suppress_warnings= 0;
unknown's avatar
unknown committed
2028
    thd->proc_info = "Requesting binlog dump";
unknown's avatar
unknown committed
2029
    if (request_dump(mysql, mi, &suppress_warnings))
unknown's avatar
unknown committed
2030 2031
    {
      sql_print_error("Failed on request_dump()");
unknown's avatar
unknown committed
2032
      if (io_slave_killed(thd,mi))
unknown's avatar
unknown committed
2033
      {
unknown's avatar
unknown committed
2034
        sql_print_information("Slave I/O thread killed while requesting master \
unknown's avatar
unknown committed
2035
dump");
unknown's avatar
unknown committed
2036
        goto err;
unknown's avatar
unknown committed
2037
      }
2038

2039
      mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
2040
      thd->proc_info= "Waiting to reconnect after a failed binlog dump request";
2041 2042 2043
#ifdef SIGNAL_WITH_VIO_CLOSE
      thd->clear_active_vio();
#endif
unknown's avatar
SCRUM  
unknown committed
2044
      end_server(mysql);
unknown's avatar
unknown committed
2045
      /*
unknown's avatar
unknown committed
2046 2047 2048
        First time retry immediately, assuming that we can recover
        right away - if first time fails, sleep between re-tries
        hopefuly the admin can fix the problem sometime
unknown's avatar
unknown committed
2049
      */
2050 2051
      if (retry_count++)
      {
unknown's avatar
unknown committed
2052 2053 2054 2055
        if (retry_count > master_retry_count)
          goto err;                             // Don't retry forever
        safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
                   (void*)mi);
2056
      }
2057
      if (io_slave_killed(thd,mi))
unknown's avatar
unknown committed
2058
      {
unknown's avatar
unknown committed
2059
        sql_print_information("Slave I/O thread killed while retrying master \
unknown's avatar
unknown committed
2060
dump");
unknown's avatar
unknown committed
2061
        goto err;
unknown's avatar
unknown committed
2062
      }
unknown's avatar
unknown committed
2063

2064
      thd->proc_info = "Reconnecting after a failed binlog dump request";
unknown's avatar
unknown committed
2065
      if (!suppress_warnings)
unknown's avatar
unknown committed
2066
        sql_print_error("Slave I/O thread: failed dump request, \
2067
reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME,
unknown's avatar
unknown committed
2068
                        llstr(mi->master_log_pos,llbuff));
unknown's avatar
unknown committed
2069
      if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
unknown's avatar
unknown committed
2070
          io_slave_killed(thd,mi))
unknown's avatar
unknown committed
2071
      {
unknown's avatar
unknown committed
2072
        sql_print_information("Slave I/O thread killed during or \
2073
after reconnect");
unknown's avatar
unknown committed
2074
        goto err;
unknown's avatar
unknown committed
2075
      }
unknown's avatar
unknown committed
2076

unknown's avatar
unknown committed
2077 2078
      goto connected;
    }
unknown's avatar
unknown committed
2079

2080
    while (!io_slave_killed(thd,mi))
unknown's avatar
unknown committed
2081
    {
2082 2083
      ulong event_len;
      suppress_warnings= 0;
2084
      /*
2085
         We say "waiting" because read_event() will wait if there's nothing to
2086 2087 2088
         read. But if there's something to read, it will not wait. The
         important thing is to not confuse users by saying "reading" whereas
         we're in fact receiving nothing.
2089
      */
2090 2091
      thd->proc_info= "Waiting for master to send event";
      event_len= read_event(mysql, mi, &suppress_warnings);
2092
      if (io_slave_killed(thd,mi))
unknown's avatar
unknown committed
2093
      {
unknown's avatar
unknown committed
2094 2095 2096
        if (global_system_variables.log_warnings)
          sql_print_information("Slave I/O thread killed while reading event");
        goto err;
unknown's avatar
unknown committed
2097
      }
2098

unknown's avatar
unknown committed
2099 2100
      if (event_len == packet_error)
      {
unknown's avatar
unknown committed
2101
        uint mysql_error_number= mysql_errno(mysql);
2102
        if (mysql_error_number == CR_NET_PACKET_TOO_LARGE)
unknown's avatar
unknown committed
2103 2104
        {
          sql_print_error("\
2105 2106 2107
Log entry on master is longer than max_allowed_packet (%ld) on \
slave. If the entry is correct, restart the server with a higher value of \
max_allowed_packet",
unknown's avatar
unknown committed
2108 2109 2110 2111 2112 2113 2114 2115 2116
                          thd->variables.max_allowed_packet);
          goto err;
        }
        if (mysql_error_number == ER_MASTER_FATAL_ERROR_READING_BINLOG)
        {
          sql_print_error(ER(mysql_error_number), mysql_error_number,
                          mysql_error(mysql));
          goto err;
        }
2117
        mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
unknown's avatar
unknown committed
2118
        thd->proc_info = "Waiting to reconnect after a failed master event read";
2119 2120 2121
#ifdef SIGNAL_WITH_VIO_CLOSE
        thd->clear_active_vio();
#endif
unknown's avatar
unknown committed
2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133
        end_server(mysql);
        if (retry_count++)
        {
          if (retry_count > master_retry_count)
            goto err;                           // Don't retry forever
          safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
                     (void*) mi);
        }
        if (io_slave_killed(thd,mi))
        {
          if (global_system_variables.log_warnings)
            sql_print_information("Slave I/O thread killed while waiting to \
unknown's avatar
unknown committed
2134
reconnect after a failed read");
unknown's avatar
unknown committed
2135 2136 2137 2138 2139
          goto err;
        }
        thd->proc_info = "Reconnecting after a failed master event read";
        if (!suppress_warnings)
          sql_print_information("Slave I/O thread: Failed reading log event, \
2140
reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME,
unknown's avatar
unknown committed
2141 2142 2143 2144 2145 2146
                          llstr(mi->master_log_pos, llbuff));
        if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
            io_slave_killed(thd,mi))
        {
          if (global_system_variables.log_warnings)
            sql_print_information("Slave I/O thread killed during or after a \
unknown's avatar
unknown committed
2147
reconnect done to recover from failed read");
unknown's avatar
unknown committed
2148 2149 2150
          goto err;
        }
        goto connected;
unknown's avatar
unknown committed
2151
      } // if (event_len == packet_error)
2152

unknown's avatar
unknown committed
2153
      retry_count=0;                    // ok event, reset retry counter
2154
      thd->proc_info = "Queueing master event to the relay log";
unknown's avatar
unknown committed
2155
      if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
unknown's avatar
unknown committed
2156
                      event_len))
unknown's avatar
unknown committed
2157
      {
unknown's avatar
unknown committed
2158 2159
        sql_print_error("Slave I/O thread could not queue event from master");
        goto err;
unknown's avatar
unknown committed
2160
      }
2161 2162 2163 2164 2165
      if (flush_master_info(mi, 1))
      {
        sql_print_error("Failed to flush master info file");
        goto err;
      }
2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177
      /*
        See if the relay logs take too much space.
        We don't lock mi->rli.log_space_lock here; this dirty read saves time
        and does not introduce any problem:
        - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
        the clean value is 0), then we are reading only one more event as we
        should, and we'll block only at the next event. No big deal.
        - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
        the clean value is 1), then we are going into wait_for_relay_log_space()
        for no reason, but this function will do a clean read, notice the clean
        value and exit immediately.
      */
2178 2179 2180 2181 2182
#ifndef DBUG_OFF
      {
        char llbuf1[22], llbuf2[22];
        DBUG_PRINT("info", ("log_space_limit=%s log_space_total=%s \
ignore_log_space_limit=%d",
2183 2184
                            llstr(rli->log_space_limit,llbuf1),
                            llstr(rli->log_space_total,llbuf2),
unknown's avatar
unknown committed
2185
                            (int) rli->ignore_log_space_limit));
2186 2187 2188
      }
#endif

2189
      if (rli->log_space_limit && rli->log_space_limit <
unknown's avatar
unknown committed
2190
          rli->log_space_total &&
2191
          !rli->ignore_log_space_limit)
unknown's avatar
unknown committed
2192 2193 2194
        if (wait_for_relay_log_space(rli))
        {
          sql_print_error("Slave I/O thread aborted while waiting for relay \
unknown's avatar
unknown committed
2195
log space");
unknown's avatar
unknown committed
2196 2197 2198
          goto err;
        }
    }
2199
  }
unknown's avatar
unknown committed
2200

unknown's avatar
unknown committed
2201
  // error = 0;
unknown's avatar
unknown committed
2202
err:
2203
  // print the current replication position
2204
  sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
unknown's avatar
unknown committed
2205
                  IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
2206
  VOID(pthread_mutex_lock(&LOCK_thread_count));
unknown's avatar
unknown committed
2207
  thd->query = thd->db = 0; // extra safety
2208
  thd->query_length= thd->db_length= 0;
2209
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
unknown's avatar
unknown committed
2210 2211
  if (mysql)
  {
2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222
    /*
      Here we need to clear the active VIO before closing the
      connection with the master.  The reason is that THD::awake()
      might be called from terminate_slave_thread() because somebody
      issued a STOP SLAVE.  If that happends, the close_active_vio()
      can be called in the middle of closing the VIO associated with
      the 'mysql' object, causing a crash.
    */
#ifdef SIGNAL_WITH_VIO_CLOSE
    thd->clear_active_vio();
#endif
unknown's avatar
SCRUM  
unknown committed
2223
    mysql_close(mysql);
unknown's avatar
unknown committed
2224 2225
    mi->mysql=0;
  }
2226
  write_ignored_events_info_to_relay_log(thd, mi);
unknown's avatar
unknown committed
2227
  thd->proc_info = "Waiting for slave mutex on exit";
2228
  pthread_mutex_lock(&mi->run_lock);
2229

2230 2231 2232
  /* Forget the relay log's format */
  delete mi->rli.relay_log.description_event_for_queue;
  mi->rli.relay_log.description_event_for_queue= 0;
2233
  // TODO: make rpl_status part of MASTER_INFO
2234
  change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
2235
  DBUG_ASSERT(thd->net.buff != 0);
unknown's avatar
unknown committed
2236
  net_end(&thd->net); // destructor will not free it, because net.vio is 0
unknown's avatar
unknown committed
2237
  close_thread_tables(thd, 0);
2238
  pthread_mutex_lock(&LOCK_thread_count);
2239
  THD_CHECK_SENTRY(thd);
unknown's avatar
unknown committed
2240
  delete thd;
2241
  pthread_mutex_unlock(&LOCK_thread_count);
unknown's avatar
unknown committed
2242 2243 2244
  mi->abort_slave= 0;
  mi->slave_running= 0;
  mi->io_thd= 0;
2245 2246 2247 2248 2249
  /*
    Note: the order of the two following calls (first broadcast, then unlock)
    is important. Otherwise a killer_thread can execute between the calls and
    delete the mi structure leading to a crash! (see BUG#25306 for details)
   */ 
2250
  pthread_cond_broadcast(&mi->stop_cond);       // tell the world we are done
2251
  pthread_mutex_unlock(&mi->run_lock);
2252
  my_thread_end();
unknown's avatar
unknown committed
2253
  pthread_exit(0);
unknown's avatar
unknown committed
2254
  DBUG_RETURN(0);                               // Can't return anything here
unknown's avatar
unknown committed
2255 2256
}

unknown's avatar
unknown committed
2257

unknown's avatar
unknown committed
2258
/* Slave SQL Thread entry point */
unknown's avatar
unknown committed
2259

2260
pthread_handler_t handle_slave_sql(void *arg)
2261
{
unknown's avatar
unknown committed
2262
  THD *thd;                     /* needs to be first for thread_stack */
2263
  char llbuff[22],llbuff1[22];
2264

2265
  RELAY_LOG_INFO* rli = &((MASTER_INFO*)arg)->rli;
unknown's avatar
unknown committed
2266 2267 2268 2269
  const char *errmsg;

  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  my_thread_init();
2270
  DBUG_ENTER("handle_slave_sql");
unknown's avatar
unknown committed
2271

2272 2273 2274
  DBUG_ASSERT(rli->inited);
  pthread_mutex_lock(&rli->run_lock);
  DBUG_ASSERT(!rli->slave_running);
unknown's avatar
unknown committed
2275
  errmsg= 0;
unknown's avatar
unknown committed
2276
#ifndef DBUG_OFF
2277
  rli->events_till_abort = abort_slave_event_count;
unknown's avatar
unknown committed
2278
#endif
2279

unknown's avatar
unknown committed
2280
  thd = new THD; // note that contructor of THD uses DBUG_ !
2281
  thd->thread_stack = (char*)&thd; // remember where our stack is
unknown's avatar
unknown committed
2282

unknown's avatar
unknown committed
2283 2284 2285
  /* Inform waiting threads that slave has started */
  rli->slave_run_id++;

2286 2287
  pthread_detach_this_thread();
  if (init_slave_thread(thd, SLAVE_THD_SQL))
unknown's avatar
unknown committed
2288 2289 2290 2291 2292 2293 2294 2295 2296 2297
  {
    /*
      TODO: this is currently broken - slave start and change master
      will be stuck if we fail here
    */
    pthread_cond_broadcast(&rli->start_cond);
    pthread_mutex_unlock(&rli->run_lock);
    sql_print_error("Failed during slave thread initialization");
    goto err;
  }
2298
  thd->init_for_queries();
2299
  rli->sql_thd= thd;
2300
  thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
2301
  pthread_mutex_lock(&LOCK_thread_count);
2302
  threads.append(thd);
2303
  pthread_mutex_unlock(&LOCK_thread_count);
2304 2305 2306 2307 2308 2309 2310 2311
  /*
    We are going to set slave_running to 1. Assuming slave I/O thread is
    alive and connected, this is going to make Seconds_Behind_Master be 0
    i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
    the moment we start we can think we are caught up, and the next second we
    start receiving data so we realize we are not caught up and
    Seconds_Behind_Master grows. No big deal.
  */
2312 2313 2314
  rli->slave_running = 1;
  rli->abort_slave = 0;
  pthread_mutex_unlock(&rli->run_lock);
2315
  pthread_cond_broadcast(&rli->start_cond);
2316

unknown's avatar
unknown committed
2317 2318 2319
  /*
    Reset errors for a clean start (otherwise, if the master is idle, the SQL
    thread may execute no Query_log_event, so the error will remain even
unknown's avatar
unknown committed
2320
    though there's no problem anymore). Do not reset the master timestamp
2321 2322 2323 2324
    (imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
    as we are not sure that we are going to receive a query, we want to
    remember the last master timestamp (to say how many seconds behind we are
    now.
unknown's avatar
unknown committed
2325
    But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
unknown's avatar
unknown committed
2326
  */
2327
  rli->clear_slave_error();
2328 2329

  //tell the I/O thread to take relay_log_space_limit into account from now on
2330
  pthread_mutex_lock(&rli->log_space_lock);
2331
  rli->ignore_log_space_limit= 0;
2332
  pthread_mutex_unlock(&rli->log_space_lock);
2333
  rli->trans_retries= 0; // start from "no error"
2334

2335
  if (init_relay_log_pos(rli,
unknown's avatar
unknown committed
2336 2337 2338
                         rli->group_relay_log_name,
                         rli->group_relay_log_pos,
                         1 /*need data lock*/, &errmsg,
2339
                         1 /*look for a description_event*/))
2340 2341
  {
    sql_print_error("Error initializing relay log position: %s",
unknown's avatar
unknown committed
2342
                    errmsg);
2343 2344
    goto err;
  }
2345
  THD_CHECK_SENTRY(thd);
2346 2347 2348 2349
#ifndef DBUG_OFF
  {
    char llbuf1[22], llbuf2[22];
    DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
unknown's avatar
unknown committed
2350
                        llstr(my_b_tell(rli->cur_log),llbuf1),
2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369
                        llstr(rli->event_relay_log_pos,llbuf2)));
    DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
    /*
      Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
      correct position when it's called just after my_b_seek() (the questionable
      stuff is those "seek is done on next read" comments in the my_b_seek()
      source code).
      The crude reality is that this assertion randomly fails whereas
      replication seems to work fine. And there is no easy explanation why it
      fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
      init_relay_log_pos() called above). Maybe the assertion would be
      meaningful if we held rli->data_lock between the my_b_seek() and the
      DBUG_ASSERT().
    */
#ifdef SHOULD_BE_CHECKED
    DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
#endif
  }
#endif
2370
  DBUG_ASSERT(rli->sql_thd == thd);
2371 2372

  DBUG_PRINT("master_info",("log_file_name: %s  position: %s",
unknown's avatar
unknown committed
2373 2374
                            rli->group_master_log_name,
                            llstr(rli->group_master_log_pos,llbuff)));
2375
  if (global_system_variables.log_warnings)
2376
    sql_print_information("Slave SQL thread initialized, starting replication in \
unknown's avatar
unknown committed
2377
log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
unknown's avatar
unknown committed
2378 2379
                    llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name,
                    llstr(rli->group_relay_log_pos,llbuff1));
2380

unknown's avatar
unknown committed
2381
  /* execute init_slave variable */
unknown's avatar
unknown committed
2382
  if (sys_init_slave.value_length)
unknown's avatar
unknown committed
2383
  {
2384
    execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
unknown's avatar
unknown committed
2385 2386 2387 2388 2389 2390 2391 2392
    if (thd->query_error)
    {
      sql_print_error("\
Slave SQL thread aborted. Can't execute init_slave query");
      goto err;
    }
  }

2393 2394
  /* Read queries from the IO/THREAD until this thread is killed */

2395
  while (!sql_slave_killed(thd,rli))
2396
  {
2397
    thd->proc_info = "Reading event from the relay log";
2398
    DBUG_ASSERT(rli->sql_thd == thd);
2399
    THD_CHECK_SENTRY(thd);
2400 2401
    if (exec_relay_log_event(thd,rli))
    {
2402
      DBUG_PRINT("info", ("exec_relay_log_event() failed"));
2403
      // do not scare the user if SQL thread was simply killed or stopped
2404
      if (!sql_slave_killed(thd,rli))
unknown's avatar
unknown committed
2405 2406
      {
        /*
2407 2408 2409
          retrieve as much info as possible from the thd and, error
          codes and warnings and print this to the error log as to
          allow the user to locate the error
unknown's avatar
unknown committed
2410
        */
2411 2412
        DBUG_PRINT("info", ("thd->net.last_errno=%d; rli->last_slave_errno=%d",
                            thd->net.last_errno, rli->last_slave_errno));
unknown's avatar
unknown committed
2413 2414 2415 2416 2417 2418 2419 2420
        if (thd->net.last_errno != 0)
        {
          if (rli->last_slave_errno == 0)
          {
            slave_print_msg(ERROR_LEVEL, rli, thd->net.last_errno,
                            thd->net.last_error ?
                            thd->net.last_error : "<no message>");
          }
2421
          else if (rli->last_slave_errno != (int) thd->net.last_errno)
unknown's avatar
unknown committed
2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432
          {
            sql_print_error("Slave (additional info): %s Error_code: %d",
                            thd->net.last_error ?
                            thd->net.last_error : "<no message>",
                            thd->net.last_errno);
          }
        }

        /* Print any warnings issued */
        List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
        MYSQL_ERROR *err;
2433 2434 2435 2436 2437
        /*
          Added controlled slave thread cancel for replication
          of user-defined variables.
        */
        bool udf_error = false;
unknown's avatar
unknown committed
2438
        while ((err= it++))
2439 2440 2441
        {
          if (err->code == ER_CANT_OPEN_LIBRARY)
            udf_error = true;
unknown's avatar
unknown committed
2442
          sql_print_warning("Slave: %s Error_code: %d",err->msg, err->code);
2443 2444 2445 2446 2447 2448 2449 2450 2451
        }
        if (udf_error)
          sql_print_error("Error loading user-defined library, slave SQL "
            "thread aborted. Install the missing library, and restart the "
            "slave SQL thread with \"SLAVE START\". We stopped at log '%s' "
            "position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, 
            llbuff));
        else
          sql_print_error("\
2452
Error running query, slave SQL thread aborted. Fix the problem, and restart \
unknown's avatar
unknown committed
2453
the slave SQL thread with \"SLAVE START\". We stopped at log \
2454
'%s' position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff));
unknown's avatar
unknown committed
2455
      }
2456 2457
      goto err;
    }
2458
  }
2459

2460
  /* Thread stopped. Print the current replication position to the log */
unknown's avatar
unknown committed
2461
  sql_print_information("Slave SQL thread exiting, replication stopped in log "
unknown's avatar
unknown committed
2462 2463
                        "'%s' at position %s",
                        RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff));
2464 2465

 err:
2466 2467 2468 2469 2470 2471 2472 2473

  /*
    Some events set some playgrounds, which won't be cleared because thread
    stops. Stopping of this thread may not be known to these events ("stop"
    request is detected only by the present function, not by events), so we
    must "proactively" clear playgrounds:
  */
  rli->cleanup_context(thd, 1);
2474
  VOID(pthread_mutex_lock(&LOCK_thread_count));
2475 2476 2477 2478 2479
  /*
    Some extra safety, which should not been needed (normally, event deletion
    should already have done these assignments (each event which sets these
    variables is supposed to set them to 0 before terminating)).
  */
unknown's avatar
unknown committed
2480
  thd->query= thd->db= thd->catalog= 0;
2481
  thd->query_length= thd->db_length= 0;
2482
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
2483 2484
  thd->proc_info = "Waiting for slave mutex on exit";
  pthread_mutex_lock(&rli->run_lock);
2485 2486
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
  pthread_mutex_lock(&rli->data_lock);
2487
  DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
2488
  /* When master_pos_wait() wakes up it will check this and terminate */
unknown's avatar
unknown committed
2489
  rli->slave_running= 0;
2490 2491 2492
  /* Forget the relay log's format */
  delete rli->relay_log.description_event_for_exec;
  rli->relay_log.description_event_for_exec= 0;
2493 2494 2495 2496
  /* Wake up master_pos_wait() */
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
  pthread_cond_broadcast(&rli->data_cond);
unknown's avatar
unknown committed
2497
  rli->ignore_log_space_limit= 0; /* don't need any lock */
2498 2499
  /* we die so won't remember charset - re-update them on next thread start */
  rli->cached_charset_invalidate();
2500
  rli->save_temporary_tables = thd->temporary_tables;
unknown's avatar
unknown committed
2501 2502 2503 2504 2505

  /*
    TODO: see if we can do this conditionally in next_event() instead
    to avoid unneeded position re-init
  */
2506 2507 2508 2509
  thd->temporary_tables = 0; // remove tempation from destructor to close them
  DBUG_ASSERT(thd->net.buff != 0);
  net_end(&thd->net); // destructor will not free it, because we are weird
  DBUG_ASSERT(rli->sql_thd == thd);
2510
  THD_CHECK_SENTRY(thd);
2511
  rli->sql_thd= 0;
2512
  pthread_mutex_lock(&LOCK_thread_count);
2513
  THD_CHECK_SENTRY(thd);
2514 2515
  delete thd;
  pthread_mutex_unlock(&LOCK_thread_count);
2516 2517 2518 2519 2520
 /*
  Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
  is important. Otherwise a killer_thread can execute between the calls and
  delete the mi structure leading to a crash! (see BUG#25306 for details)
 */ 
2521
  pthread_cond_broadcast(&rli->stop_cond);
2522 2523
  pthread_mutex_unlock(&rli->run_lock);  // tell the world we are done
  
unknown's avatar
unknown committed
2524
  my_thread_end();
2525
  pthread_exit(0);
unknown's avatar
unknown committed
2526
  DBUG_RETURN(0);                               // Can't return anything here
2527
}
unknown's avatar
unknown committed
2528

2529

unknown's avatar
unknown committed
2530
/*
2531
  process_io_create_file()
unknown's avatar
unknown committed
2532
*/
2533

unknown's avatar
unknown committed
2534 2535 2536 2537 2538
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
{
  int error = 1;
  ulong num_bytes;
  bool cev_not_written;
2539 2540
  THD *thd = mi->io_thd;
  NET *net = &mi->mysql->net;
unknown's avatar
unknown committed
2541
  DBUG_ENTER("process_io_create_file");
unknown's avatar
unknown committed
2542 2543

  if (unlikely(!cev->is_valid()))
unknown's avatar
unknown committed
2544
    DBUG_RETURN(1);
2545 2546

  if (!rpl_filter->db_ok(cev->db))
unknown's avatar
unknown committed
2547 2548
  {
    skip_load_data_infile(net);
unknown's avatar
unknown committed
2549
    DBUG_RETURN(0);
unknown's avatar
unknown committed
2550 2551 2552
  }
  DBUG_ASSERT(cev->inited_from_old);
  thd->file_id = cev->file_id = mi->file_id++;
2553
  thd->server_id = cev->server_id;
unknown's avatar
unknown committed
2554
  cev_not_written = 1;
unknown's avatar
unknown committed
2555

unknown's avatar
unknown committed
2556 2557 2558
  if (unlikely(net_request_file(net,cev->fname)))
  {
    sql_print_error("Slave I/O: failed requesting download of '%s'",
unknown's avatar
unknown committed
2559
                    cev->fname);
unknown's avatar
unknown committed
2560 2561 2562
    goto err;
  }

unknown's avatar
unknown committed
2563 2564 2565 2566
  /*
    This dummy block is so we could instantiate Append_block_log_event
    once and then modify it slightly instead of doing it multiple times
    in the loop
unknown's avatar
unknown committed
2567 2568
  */
  {
2569
    Append_block_log_event aev(thd,0,0,0,0);
unknown's avatar
unknown committed
2570

unknown's avatar
unknown committed
2571 2572 2573 2574
    for (;;)
    {
      if (unlikely((num_bytes=my_net_read(net)) == packet_error))
      {
unknown's avatar
unknown committed
2575 2576 2577
        sql_print_error("Network read error downloading '%s' from master",
                        cev->fname);
        goto err;
unknown's avatar
unknown committed
2578 2579 2580
      }
      if (unlikely(!num_bytes)) /* eof */
      {
unknown's avatar
unknown committed
2581
        net_write_command(net, 0, "", 0, "", 0);/* 3.23 master wants it */
unknown's avatar
unknown committed
2582 2583 2584 2585 2586 2587 2588
        /*
          If we wrote Create_file_log_event, then we need to write
          Execute_load_log_event. If we did not write Create_file_log_event,
          then this is an empty file and we can just do as if the LOAD DATA
          INFILE had not existed, i.e. write nothing.
        */
        if (unlikely(cev_not_written))
unknown's avatar
unknown committed
2589 2590 2591 2592 2593 2594
          break;
        Execute_load_log_event xev(thd,0,0);
        xev.log_pos = cev->log_pos;
        if (unlikely(mi->rli.relay_log.append(&xev)))
        {
          sql_print_error("Slave I/O: error writing Exec_load event to \
unknown's avatar
unknown committed
2595
relay log");
unknown's avatar
unknown committed
2596 2597 2598 2599
          goto err;
        }
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
        break;
unknown's avatar
unknown committed
2600 2601 2602
      }
      if (unlikely(cev_not_written))
      {
unknown's avatar
unknown committed
2603 2604 2605 2606 2607
        cev->block = (char*)net->read_pos;
        cev->block_len = num_bytes;
        if (unlikely(mi->rli.relay_log.append(cev)))
        {
          sql_print_error("Slave I/O: error writing Create_file event to \
unknown's avatar
unknown committed
2608
relay log");
unknown's avatar
unknown committed
2609 2610 2611 2612
          goto err;
        }
        cev_not_written=0;
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
unknown's avatar
unknown committed
2613 2614 2615
      }
      else
      {
unknown's avatar
unknown committed
2616 2617 2618 2619 2620 2621
        aev.block = (char*)net->read_pos;
        aev.block_len = num_bytes;
        aev.log_pos = cev->log_pos;
        if (unlikely(mi->rli.relay_log.append(&aev)))
        {
          sql_print_error("Slave I/O: error writing Append_block event to \
unknown's avatar
unknown committed
2622
relay log");
unknown's avatar
unknown committed
2623 2624 2625
          goto err;
        }
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
unknown's avatar
unknown committed
2626 2627 2628 2629 2630
      }
    }
  }
  error=0;
err:
unknown's avatar
unknown committed
2631
  DBUG_RETURN(error);
unknown's avatar
unknown committed
2632
}
unknown's avatar
unknown committed
2633

2634

unknown's avatar
unknown committed
2635
/*
unknown's avatar
unknown committed
2636 2637 2638 2639
  Start using a new binary log on the master

  SYNOPSIS
    process_io_rotate()
unknown's avatar
unknown committed
2640 2641
    mi                  master_info for the slave
    rev                 The rotate log event read from the binary log
unknown's avatar
unknown committed
2642 2643

  DESCRIPTION
2644
    Updates the master info with the place in the next binary
unknown's avatar
unknown committed
2645
    log where we should start reading.
2646
    Rotate the relay log to avoid mixed-format relay logs.
unknown's avatar
unknown committed
2647 2648 2649 2650 2651

  NOTES
    We assume we already locked mi->data_lock

  RETURN VALUES
unknown's avatar
unknown committed
2652 2653
    0           ok
    1           Log event is illegal
unknown's avatar
unknown committed
2654 2655 2656

*/

unknown's avatar
unknown committed
2657
static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev)
2658
{
2659
  DBUG_ENTER("process_io_rotate");
unknown's avatar
unknown committed
2660
  safe_mutex_assert_owner(&mi->data_lock);
2661

unknown's avatar
unknown committed
2662
  if (unlikely(!rev->is_valid()))
2663
    DBUG_RETURN(1);
unknown's avatar
unknown committed
2664

2665
  /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
unknown's avatar
unknown committed
2666 2667
  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
  mi->master_log_pos= rev->pos;
unknown's avatar
unknown committed
2668
  DBUG_PRINT("info", ("master_log_pos: '%s' %lu",
unknown's avatar
unknown committed
2669
                      mi->master_log_name, (ulong) mi->master_log_pos));
2670
#ifndef DBUG_OFF
unknown's avatar
unknown committed
2671 2672 2673 2674 2675
  /*
    If we do not do this, we will be getting the first
    rotate event forever, so we need to not disconnect after one.
  */
  if (disconnect_slave_event_count)
2676
    mi->events_till_disconnect++;
2677
#endif
2678

2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692
  /*
    If description_event_for_queue is format <4, there is conversion in the
    relay log to the slave's format (4). And Rotate can mean upgrade or
    nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
    no need to reset description_event_for_queue now. And if it's nothing (same
    master version as before), no need (still using the slave's format).
  */
  if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
  {
    delete mi->rli.relay_log.description_event_for_queue;
    /* start from format 3 (MySQL 4.0) again */
    mi->rli.relay_log.description_event_for_queue= new
      Format_description_log_event(3);
  }
2693 2694 2695 2696
  /*
    Rotate the relay log makes binlog format detection easier (at next slave
    start or mysqlbinlog)
  */
2697
  rotate_relay_log(mi); /* will take the right mutexes */
2698
  DBUG_RETURN(0);
2699 2700
}

unknown's avatar
unknown committed
2701
/*
2702 2703
  Reads a 3.23 event and converts it to the slave's format. This code was
  copied from MySQL 4.0.
unknown's avatar
unknown committed
2704
*/
2705
static int queue_binlog_ver_1_event(MASTER_INFO *mi, const char *buf,
unknown's avatar
unknown committed
2706
                           ulong event_len)
2707
{
unknown's avatar
unknown committed
2708
  const char *errmsg = 0;
unknown's avatar
unknown committed
2709 2710 2711 2712
  ulong inc_pos;
  bool ignore_event= 0;
  char *tmp_buf = 0;
  RELAY_LOG_INFO *rli= &mi->rli;
2713
  DBUG_ENTER("queue_binlog_ver_1_event");
unknown's avatar
unknown committed
2714

unknown's avatar
unknown committed
2715 2716 2717
  /*
    If we get Load event, we need to pass a non-reusable buffer
    to read_log_event, so we do a trick
unknown's avatar
unknown committed
2718 2719 2720 2721 2722 2723
  */
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
  {
    if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
    {
      sql_print_error("Slave I/O: out of memory for Load event");
unknown's avatar
unknown committed
2724
      DBUG_RETURN(1);
unknown's avatar
unknown committed
2725 2726
    }
    memcpy(tmp_buf,buf,event_len);
2727 2728 2729 2730 2731 2732 2733 2734
    /*
      Create_file constructor wants a 0 as last char of buffer, this 0 will
      serve as the string-termination char for the file's name (which is at the
      end of the buffer)
      We must increment event_len, otherwise the event constructor will not see
      this end 0, which leads to segfault.
    */
    tmp_buf[event_len++]=0;
unknown's avatar
unknown committed
2735
    int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
unknown's avatar
unknown committed
2736 2737
    buf = (const char*)tmp_buf;
  }
2738 2739 2740 2741 2742 2743
  /*
    This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
    send the loaded file, and write it to the relay log in the form of
    Append_block/Exec_load (the SQL thread needs the data, as that thread is not
    connected to the master).
  */
unknown's avatar
unknown committed
2744
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2745
                                            mi->rli.relay_log.description_event_for_queue);
2746
  if (unlikely(!ev))
2747 2748
  {
    sql_print_error("Read invalid event from master: '%s',\
unknown's avatar
unknown committed
2749
 master could be corrupt but a more likely cause of this is a bug",
unknown's avatar
unknown committed
2750
                    errmsg);
unknown's avatar
unknown committed
2751 2752
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
    DBUG_RETURN(1);
2753
  }
2754

2755
  pthread_mutex_lock(&mi->data_lock);
2756
  ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
unknown's avatar
unknown committed
2757
  switch (ev->get_type_code()) {
unknown's avatar
unknown committed
2758
  case STOP_EVENT:
2759
    ignore_event= 1;
unknown's avatar
unknown committed
2760 2761
    inc_pos= event_len;
    break;
2762
  case ROTATE_EVENT:
2763
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2764 2765
    {
      delete ev;
2766
      pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
2767
      DBUG_RETURN(1);
2768
    }
unknown's avatar
unknown committed
2769
    inc_pos= 0;
2770
    break;
unknown's avatar
unknown committed
2771
  case CREATE_FILE_EVENT:
2772 2773 2774 2775 2776 2777
    /*
      Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
      queue_old_event() which is for 3.23 events which don't comprise
      CREATE_FILE_EVENT. This is because read_log_event() above has just
      transformed LOAD_EVENT into CREATE_FILE_EVENT.
    */
unknown's avatar
unknown committed
2778
  {
unknown's avatar
unknown committed
2779
    /* We come here when and only when tmp_buf != 0 */
2780
    DBUG_ASSERT(tmp_buf != 0);
2781 2782
    inc_pos=event_len;
    ev->log_pos+= inc_pos;
unknown's avatar
unknown committed
2783
    int error = process_io_create_file(mi,(Create_file_log_event*)ev);
2784
    delete ev;
2785
    mi->master_log_pos += inc_pos;
unknown's avatar
unknown committed
2786
    DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
2787
    pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
2788
    my_free((char*)tmp_buf, MYF(0));
unknown's avatar
unknown committed
2789
    DBUG_RETURN(error);
unknown's avatar
unknown committed
2790
  }
2791
  default:
unknown's avatar
unknown committed
2792
    inc_pos= event_len;
2793 2794
    break;
  }
unknown's avatar
unknown committed
2795
  if (likely(!ignore_event))
2796
  {
unknown's avatar
unknown committed
2797 2798
    if (ev->log_pos)
      /*
2799 2800 2801 2802
         Don't do it for fake Rotate events (see comment in
      Log_event::Log_event(const char* buf...) in log_event.cc).
      */
      ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
unknown's avatar
unknown committed
2803
    if (unlikely(rli->relay_log.append(ev)))
2804 2805 2806
    {
      delete ev;
      pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
2807
      DBUG_RETURN(1);
2808
    }
unknown's avatar
unknown committed
2809
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2810 2811
  }
  delete ev;
unknown's avatar
unknown committed
2812
  mi->master_log_pos+= inc_pos;
unknown's avatar
unknown committed
2813
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
2814
  pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
2815
  DBUG_RETURN(0);
2816 2817
}

2818 2819 2820 2821 2822
/*
  Reads a 4.0 event and converts it to the slave's format. This code was copied
  from queue_binlog_ver_1_event(), with some affordable simplifications.
*/
static int queue_binlog_ver_3_event(MASTER_INFO *mi, const char *buf,
unknown's avatar
unknown committed
2823
                           ulong event_len)
2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837
{
  const char *errmsg = 0;
  ulong inc_pos;
  char *tmp_buf = 0;
  RELAY_LOG_INFO *rli= &mi->rli;
  DBUG_ENTER("queue_binlog_ver_3_event");

  /* read_log_event() will adjust log_pos to be end_log_pos */
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
                                            mi->rli.relay_log.description_event_for_queue);
  if (unlikely(!ev))
  {
    sql_print_error("Read invalid event from master: '%s',\
 master could be corrupt but a more likely cause of this is a bug",
unknown's avatar
unknown committed
2838
                    errmsg);
2839 2840 2841 2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
    DBUG_RETURN(1);
  }
  pthread_mutex_lock(&mi->data_lock);
  switch (ev->get_type_code()) {
  case STOP_EVENT:
    goto err;
  case ROTATE_EVENT:
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
    {
      delete ev;
      pthread_mutex_unlock(&mi->data_lock);
      DBUG_RETURN(1);
    }
    inc_pos= 0;
    break;
  default:
    inc_pos= event_len;
    break;
  }
  if (unlikely(rli->relay_log.append(ev)))
  {
    delete ev;
    pthread_mutex_unlock(&mi->data_lock);
    DBUG_RETURN(1);
  }
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  delete ev;
  mi->master_log_pos+= inc_pos;
err:
unknown's avatar
unknown committed
2869
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880
  pthread_mutex_unlock(&mi->data_lock);
  DBUG_RETURN(0);
}

/*
  queue_old_event()

  Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
  (exactly, slave's) format. To do the conversion, we create a 5.0 event from
  the 3.23/4.0 bytes, then write this event to the relay log.

unknown's avatar
unknown committed
2881
  TODO:
2882 2883 2884 2885 2886
    Test this code before release - it has to be tested on a separate
    setup with 3.23 master or 4.0 master
*/

static int queue_old_event(MASTER_INFO *mi, const char *buf,
unknown's avatar
unknown committed
2887
                           ulong event_len)
2888
{
2889 2890
  DBUG_ENTER("queue_old_event");

2891 2892 2893
  switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
  {
  case 1:
2894
      DBUG_RETURN(queue_binlog_ver_1_event(mi,buf,event_len));
2895
  case 3:
2896
      DBUG_RETURN(queue_binlog_ver_3_event(mi,buf,event_len));
2897 2898
  default: /* unsupported format; eg version 2 */
    DBUG_PRINT("info",("unsupported binlog format %d in queue_old_event()",
unknown's avatar
unknown committed
2899
                       mi->rli.relay_log.description_event_for_queue->binlog_version));
2900
    DBUG_RETURN(1);
2901 2902
  }
}
2903

unknown's avatar
unknown committed
2904
/*
  queue_event()

  If the event is 3.23/4.0, passes it to queue_old_event() which will convert
  it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
  no format conversion, it's pure read/write of bytes.
  So a 5.0.0 slave's relay log can contain events in the slave's format or in
  any >=5.0.0 format.

  RETURN
    0   ok
    1   process_io_rotate() failed for a Rotate event
    2   the Format_description event could not be read
    3   writing the event to the relay log failed
    For 3.23/4.0 events, whatever queue_old_event() returns.
*/

int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
{
  int error= 0;
  ulong inc_pos;
  RELAY_LOG_INFO *rli= &mi->rli;
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
  const char type= buf[EVENT_TYPE_OFFSET];
  DBUG_ENTER("queue_event");

  LINT_INIT(inc_pos);

  /*
    Pre-5.0 master: convert the event. A Format_description event is the way
    to escape from the old-format path.
  */
  if (rli->relay_log.description_event_for_queue->binlog_version < 4 &&
      type != FORMAT_DESCRIPTION_EVENT)
    DBUG_RETURN(queue_old_event(mi,buf,event_len));

  pthread_mutex_lock(&mi->data_lock);

  switch (type) {
  case STOP_EVENT:
    /*
      This event needn't go to the relay log: it only says that the master
      server shut down. All it would trigger is cleaning, and cleaning is
      already done on a per-master-thread basis (a master shutting down
      cleanly has written all DROP TEMPORARY TABLE; deletion of prepared
      statements is a TODO only for when prepared statements are binlogged).

      mi->master_log_pos is not even incremented, because we may be just
      after a Rotate event. Besides, in a few milliseconds a Start event from
      the next binlog will arrive (unless the master is presently running
      without --log-bin).
    */
    goto err;
  case ROTATE_EVENT:
  {
    Rotate_log_event rev(buf,event_len,
                         rli->relay_log.description_event_for_queue);
    if (unlikely(process_io_rotate(mi,&rev)))
    {
      error= 1;
      goto err;
    }
    /*
      The I/O thread has just changed its mi->master_log_name, so
      incrementing mi->master_log_pos would be nonsense.
    */
    inc_pos= 0;
    break;
  }
  case FORMAT_DESCRIPTION_EVENT:
  {
    /*
      Build an event object and keep it: it has to be written again each
      time the relay log is rotated.

      This is the only thread which reads/writes description_event_for_queue.
      The relay_log struct does not move (though some members of it can
      change), so no lock is needed (no rli->data_lock, no log lock).
    */
    const char *errmsg;
    Format_description_log_event *new_desc_event=
      (Format_description_log_event*)
      Log_event::read_log_event(buf, event_len, &errmsg,
                                rli->relay_log.description_event_for_queue);
    if (!new_desc_event)
    {
      error= 2;
      goto err;
    }
    delete rli->relay_log.description_event_for_queue;
    rli->relay_log.description_event_for_queue= new_desc_event;
    /*
       Though this does some conversion to the slave's format, this will
       preserve the master's binlog format version, and number of event types.

       An event which the slave did not ask for has end_log_pos=0; in that
       case mi->master_log_pos must not be incremented.
    */
    if (uint4korr(buf+LOG_POS_OFFSET))
      inc_pos= event_len;
    else
      inc_pos= 0;
    DBUG_PRINT("info",("binlog format is now %d",
                       rli->relay_log.description_event_for_queue->binlog_version));
    break;
  }
  default:
    inc_pos= event_len;
    break;
  }

  /*
     An event which originates from this server is not queued.
     This is not checked for 3.23 events because it's simpler like this; 3.23
     will be filtered anyway by the SQL slave thread which also tests the
     server id (that test must also stay in the SQL thread, in case somebody
     upgrades a 4.0 slave which has a not-filtered relay log).

     ANY event coming from ourselves can be ignored: it is obvious for queries;
     for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
     (--log-slave-updates would not log that) unless this slave is also its
     direct master (an unsupported, useless setup!).
  */

  pthread_mutex_lock(log_lock);

  if (uint4korr(buf + SERVER_ID_OFFSET) != ::server_id ||
      rli->replicate_same_server_id)
  {
    /* write the event to the relay log */
    if (unlikely(rli->relay_log.appendv(buf,event_len,0)))
      error= 3;
    else
    {
      mi->master_log_pos+= inc_pos;
      DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
    }
    rli->ign_master_log_name_end[0]= 0; // last event is not ignored
  }
  else
  {
    /*
      The event is not written to the relay log.
      a) mi->master_log_pos is still incremented, so that the event is not
      re-read from the master if the slave I/O thread is now
      stopped/restarted (more efficient if the ignored events are big
      LOAD DATA INFILE).
      b) The fact that events are being skipped is recorded for the slave SQL
      thread; otherwise that thread may let rli->group_relay_log_pos stay too
      small if the last binlog's event is ignored.
      But events which were generated by this slave and which do not exist in
      the master's binlog (i.e. Format_desc, Rotate & Stop) should not
      increment mi->master_log_pos.
    */
    if (type != FORMAT_DESCRIPTION_EVENT &&
        type != ROTATE_EVENT &&
        type != STOP_EVENT)
    {
      mi->master_log_pos+= inc_pos;
      memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
      DBUG_ASSERT(rli->ign_master_log_name_end[0]);
      rli->ign_master_log_pos_end= mi->master_log_pos;
    }
    rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
    DBUG_PRINT("info", ("master_log_pos: %lu, event originating from the same server, ignored",
                        (ulong) mi->master_log_pos));
  }

  pthread_mutex_unlock(log_lock);

err:
  pthread_mutex_unlock(&mi->data_lock);
  DBUG_PRINT("info", ("error: %d", error));
  DBUG_RETURN(error);
}

3069

3070 3071
/*
  end_relay_log_info()

  Release what an initialized RELAY_LOG_INFO holds: the info file and its
  IO_CACHE, the currently open relay log file and its IO_CACHE, the relay
  log itself and the slave's temporary tables. Does nothing if rli was not
  initialized.
*/
void end_relay_log_info(RELAY_LOG_INFO* rli)
{
  DBUG_ENTER("end_relay_log_info");

  if (rli->inited)
  {
    /* relay-log.info file */
    if (rli->info_fd >= 0)
    {
      end_io_cache(&rli->info_file);
      (void) my_close(rli->info_fd, MYF(MY_WME));
      rli->info_fd= -1;
    }

    /* relay log file currently opened for reading */
    if (rli->cur_log_fd >= 0)
    {
      end_io_cache(&rli->cache_buf);
      (void) my_close(rli->cur_log_fd, MYF(MY_WME));
      rli->cur_log_fd= -1;
    }

    rli->inited= 0;
    rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);

    /*
      Drop the slave's temporary tables from memory.
      In the future there will be other actions than this, to ensure
      persistence of slave's temp tables after shutdown.
    */
    rli->close_temporary_tables();
  }

  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
3100 3101
/*
  Try to connect until successful or slave killed
3102

unknown's avatar
unknown committed
3103 3104
  SYNPOSIS
    safe_connect()
unknown's avatar
unknown committed
3105 3106 3107
    thd                 Thread handler for slave
    mysql               MySQL connection handle
    mi                  Replication handle
3108

unknown's avatar
unknown committed
3109
  RETURN
unknown's avatar
unknown committed
3110 3111
    0   ok
    #   Error
unknown's avatar
unknown committed
3112
*/
3113

3114
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
unknown's avatar
unknown committed
3115
{
3116 3117 3118
  DBUG_ENTER("safe_connect");

  DBUG_RETURN(connect_to_master(thd, mysql, mi, 0, 0));
unknown's avatar
unknown committed
3119 3120
}

3121

3122
/*
unknown's avatar
unknown committed
3123 3124
  SYNPOSIS
    connect_to_master()
unknown's avatar
unknown committed
3125

unknown's avatar
unknown committed
3126 3127 3128
  IMPLEMENTATION
    Try to connect until successful or slave killed or we have retried
    master_retry_count times
3129
*/
unknown's avatar
unknown committed
3130

unknown's avatar
unknown committed
3131
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
unknown's avatar
unknown committed
3132
                             bool reconnect, bool suppress_warnings)
unknown's avatar
unknown committed
3133
{
3134
  int slave_was_killed;
unknown's avatar
unknown committed
3135
  int last_errno= -2;                           // impossible error
3136
  ulong err_count=0;
unknown's avatar
unknown committed
3137
  char llbuff[22];
3138
  DBUG_ENTER("connect_to_master");
unknown's avatar
unknown committed
3139

unknown's avatar
unknown committed
3140
#ifndef DBUG_OFF
3141
  mi->events_till_disconnect = disconnect_slave_event_count;
unknown's avatar
unknown committed
3142
#endif
3143
  ulong client_flag= CLIENT_REMEMBER_OPTIONS;
3144
  if (opt_slave_compressed_protocol)
unknown's avatar
unknown committed
3145
    client_flag=CLIENT_COMPRESS;                /* We will use compression */
3146

3147 3148
  mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
  mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
unknown's avatar
unknown committed
3149

unknown's avatar
unknown committed
3150 3151
#ifdef HAVE_OPENSSL
  if (mi->ssl)
3152
  {
unknown's avatar
unknown committed
3153
    mysql_ssl_set(mysql,
unknown's avatar
unknown committed
3154
                  mi->ssl_key[0]?mi->ssl_key:0,
unknown's avatar
unknown committed
3155
                  mi->ssl_cert[0]?mi->ssl_cert:0,
unknown's avatar
unknown committed
3156 3157 3158
                  mi->ssl_ca[0]?mi->ssl_ca:0,
                  mi->ssl_capath[0]?mi->ssl_capath:0,
                  mi->ssl_cipher[0]?mi->ssl_cipher:0);
3159 3160 3161
    mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
                  &mi->ssl_verify_server_cert);
  }
unknown's avatar
unknown committed
3162 3163
#endif

3164 3165 3166 3167
  mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
  /* This one is not strictly needed but we have it here for completeness */
  mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);

3168
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
unknown's avatar
unknown committed
3169 3170 3171
         (reconnect ? mysql_reconnect(mysql) != 0 :
          mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
                             mi->port, 0, client_flag) == 0))
unknown's avatar
unknown committed
3172
  {
3173
    /* Don't repeat last error */
unknown's avatar
SCRUM  
unknown committed
3174
    if ((int)mysql_errno(mysql) != last_errno)
3175
    {
unknown's avatar
SCRUM  
unknown committed
3176
      last_errno=mysql_errno(mysql);
unknown's avatar
unknown committed
3177
      suppress_warnings= 0;
unknown's avatar
unknown committed
3178 3179
      sql_print_error("Slave I/O thread: error %s to master "
                      "'%s@%s:%d':                       \
3180
Error: '%s'  errno: %d  retry-time: %d  retries: %lu",
unknown's avatar
unknown committed
3181
                      (reconnect ? "reconnecting" : "connecting"),
3182
                      mi->user, mi->host, mi->port,
unknown's avatar
unknown committed
3183 3184 3185
                      mysql_error(mysql), last_errno,
                      mi->connect_retry,
                      master_retry_count);
3186
    }
unknown's avatar
unknown committed
3187 3188 3189
    /*
      By default we try forever. The reason is that failure will trigger
      master election, so if the user did not set master_retry_count we
3190
      do not want to have election triggered on the first failure to
unknown's avatar
unknown committed
3191
      connect
3192
    */
3193
    if (++err_count == master_retry_count)
3194 3195
    {
      slave_was_killed=1;
unknown's avatar
unknown committed
3196 3197
      if (reconnect)
        change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
3198 3199
      break;
    }
3200
    safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
unknown's avatar
unknown committed
3201
               (void*)mi);
unknown's avatar
unknown committed
3202
  }
3203

3204 3205
  if (!slave_was_killed)
  {
unknown's avatar
unknown committed
3206
    if (reconnect)
unknown's avatar
unknown committed
3207
    {
3208
      if (!suppress_warnings && global_system_variables.log_warnings)
unknown's avatar
unknown committed
3209
        sql_print_information("Slave: connected to master '%s@%s:%d',\
3210
replication resumed in log '%s' at position %s", mi->user,
unknown's avatar
unknown committed
3211 3212 3213
                        mi->host, mi->port,
                        IO_RPL_LOG_NAME,
                        llstr(mi->master_log_pos,llbuff));
unknown's avatar
unknown committed
3214
    }
unknown's avatar
unknown committed
3215 3216 3217
    else
    {
      change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
3218 3219
      general_log_print(thd, COM_CONNECT_OUT, "%s@%s:%d",
                        mi->user, mi->host, mi->port);
unknown's avatar
unknown committed
3220
    }
3221
#ifdef SIGNAL_WITH_VIO_CLOSE
3222
    thd->set_active_vio(mysql->net.vio);
unknown's avatar
unknown committed
3223
#endif
3224
  }
3225
  mysql->reconnect= 1;
3226 3227
  DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed));
  DBUG_RETURN(slave_was_killed);
unknown's avatar
unknown committed
3228 3229
}

3230

unknown's avatar
unknown committed
3231
/*
3232
  safe_reconnect()
unknown's avatar
unknown committed
3233

unknown's avatar
unknown committed
3234 3235 3236
  IMPLEMENTATION
    Try to connect until successful or slave killed or we have retried
    master_retry_count times
unknown's avatar
unknown committed
3237 3238
*/

unknown's avatar
unknown committed
3239
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
unknown's avatar
unknown committed
3240
                          bool suppress_warnings)
unknown's avatar
unknown committed
3241
{
unknown's avatar
unknown committed
3242 3243
  DBUG_ENTER("safe_reconnect");
  DBUG_RETURN(connect_to_master(thd, mysql, mi, 1, suppress_warnings));
unknown's avatar
unknown committed
3244 3245
}

unknown's avatar
unknown committed
3246

3247 3248 3249 3250 3251 3252
/*
  Store the file and position where the execute-slave thread are in the
  relay log.

  SYNOPSIS
    flush_relay_log_info()
    rli                 Relay log information

  NOTES
    - As this is only called by the slave thread, we don't need to
      have a lock on this.
    - If there is an active transaction, then we don't update the position
      in the relay log.  This is to ensure that we re-execute statements
      if we die in the middle of an transaction that was rolled back.
    - As a transaction never spans binary logs, we don't have to handle the
      case where we do a relay-log-rotation in the middle of the transaction.
      If this would not be the case, we would have to ensure that we
      don't delete the relay log file where the transaction started when
      we switch to a new relay log file.
    - The record written at the start of the info file is four
      newline-terminated lines: group relay log name, group relay log
      position, group master log name, group master log position.
    - Nothing is written (and 0 is returned) when rli->no_storage is set.

  TODO
    - Change the log file information to a binary format to avoid calling
      longlong2str.

  RETURN VALUES
    0   ok
    1   write error
*/

bool flush_relay_log_info(RELAY_LOG_INFO* rli)
{
  DBUG_ENTER("flush_relay_log_info");

  if (unlikely(rli->no_storage))
    DBUG_RETURN(0);

  IO_CACHE *info_cache= &rli->info_file;
  /* Room for two file names, two decimal numbers and four newlines */
  char record[FN_REFLEN*2+22*2+4];
  char *end;

  my_b_seek(info_cache, 0L);

  end= strmov(record, rli->group_relay_log_name);
  *end++= '\n';
  end= longlong2str(rli->group_relay_log_pos, end, 10);
  *end++= '\n';
  end= strmov(end, rli->group_master_log_name);
  *end++= '\n';
  end= longlong2str(rli->group_master_log_pos, end, 10);
  *end++= '\n';

  const ulong record_len= (ulong) (end - record);

  /* The flush is attempted even when the write reported a failure */
  bool error= 0;
  if (my_b_write(info_cache, (byte*) record, record_len))
    error= 1;
  if (flush_io_cache(info_cache))
    error= 1;

  /* Flushing the relay log is done by the slave I/O thread */
  DBUG_RETURN(error);
}

3305

unknown's avatar
unknown committed
3306
/*
  Called when we notice that the current "hot" log got rotated under our feet.

  SYNOPSIS
    reopen_relay_log()
    rli                 Relay log information; cur_log and cur_log_fd are
                        updated to refer to rli->cache_buf
    errmsg              Passed to open_binlog()

  RETURN
    The IO_CACHE now used for reading (rli->cache_buf), positioned at
    rli->event_relay_log_pos, or 0 if the file could not be opened.
*/

static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
{
  DBUG_ENTER("reopen_relay_log");
  DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
  DBUG_ASSERT(rli->cur_log_fd == -1);

  IO_CACHE *cache= &rli->cache_buf;
  rli->cur_log= cache;
  rli->cur_log_fd= open_binlog(cache, rli->event_relay_log_name, errmsg);
  if (rli->cur_log_fd < 0)
    DBUG_RETURN(0);

  /*
    Resume exactly where we were before:
    relay_log_pos       Current log pos
    pending             Number of bytes already processed from the event
    The position is never allowed to be below BIN_LOG_HEADER_SIZE.
  */
  if (rli->event_relay_log_pos < BIN_LOG_HEADER_SIZE)
    rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
  my_b_seek(cache, rli->event_relay_log_pos);
  DBUG_RETURN(cache);
}

unknown's avatar
unknown committed
3330

3331
static Log_event* next_event(RELAY_LOG_INFO* rli)
3332 3333 3334
{
  Log_event* ev;
  IO_CACHE* cur_log = rli->cur_log;
3335
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
3336 3337
  const char* errmsg=0;
  THD* thd = rli->sql_thd;
unknown's avatar
unknown committed
3338
  DBUG_ENTER("next_event");
3339

3340 3341
  DBUG_ASSERT(thd != 0);

3342 3343 3344 3345 3346
#ifndef DBUG_OFF
  if (abort_slave_event_count && !rli->events_till_abort--)
    DBUG_RETURN(0);
#endif

unknown's avatar
unknown committed
3347 3348
  /*
    For most operations we need to protect rli members with data_lock,
3349 3350 3351 3352
    so we assume calling function acquired this mutex for us and we will
    hold it for most of the loop below. However, we will release it
    whenever it is worth the hassle,  and in the cases when we go into a
    pthread_cond_wait() with the non-data_lock mutex
unknown's avatar
unknown committed
3353
  */
3354
  safe_mutex_assert_owner(&rli->data_lock);
unknown's avatar
unknown committed
3355

3356
  while (!sql_slave_killed(thd,rli))
unknown's avatar
unknown committed
3357 3358 3359
  {
    /*
      We can have two kinds of log reading:
unknown's avatar
unknown committed
3360 3361 3362 3363 3364 3365 3366 3367
      hot_log:
        rli->cur_log points at the IO_CACHE of relay_log, which
        is actively being updated by the I/O thread. We need to be careful
        in this case and make sure that we are not looking at a stale log that
        has already been rotated. If it has been, we reopen the log.

      The other case is much simpler:
        We just have a read only log that nobody else will be updating.
unknown's avatar
unknown committed
3368
    */
3369 3370 3371 3372 3373
    bool hot_log;
    if ((hot_log = (cur_log != &rli->cache_buf)))
    {
      DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor
      pthread_mutex_lock(log_lock);
unknown's avatar
unknown committed
3374 3375

      /*
unknown's avatar
unknown committed
3376 3377
        Reading xxx_file_id is safe because the log will only
        be rotated when we hold relay_log.LOCK_log
unknown's avatar
unknown committed
3378
      */
unknown's avatar
unknown committed
3379
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
3380
      {
unknown's avatar
unknown committed
3381 3382 3383 3384 3385 3386
        // The master has switched to a new log file; Reopen the old log file
        cur_log=reopen_relay_log(rli, &errmsg);
        pthread_mutex_unlock(log_lock);
        if (!cur_log)                           // No more log files
          goto err;
        hot_log=0;                              // Using old binary log
3387 3388
      }
    }
3389 3390 3391 3392 3393 3394 3395
    /* 
      As there is no guarantee that the relay is open (for example, an I/O
      error during a write by the slave I/O thread may have closed it), we
      have to test it.
    */
    if (!my_b_inited(cur_log))
      goto err;
3396 3397
#ifndef DBUG_OFF
    {
3398
      /* This is an assertion which sometimes fails, let's try to track it */
3399
      char llbuf1[22], llbuf2[22];
3400
      DBUG_PRINT("info", ("my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
3401
                          llstr(my_b_tell(cur_log),llbuf1),
3402 3403 3404
                          llstr(rli->event_relay_log_pos,llbuf2)));
      DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
      DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
3405 3406
    }
#endif
unknown's avatar
unknown committed
3407 3408
    /*
      Relay log is always in new format - if the master is 3.23, the
3409
      I/O thread will convert the format for us.
3410 3411
      A problem: the description event may be in a previous relay log. So if
      the slave has been shutdown meanwhile, we would have to look in old relay
3412 3413
      logs, which may even have been deleted. So we need to write this
      description event at the beginning of the relay log.
3414 3415
      When the relay log is created when the I/O thread starts, easy: the
      master will send the description event and we will queue it.
3416
      But if the relay log is created by new_file(): then the solution is:
3417
      MYSQL_BIN_LOG::open() will write the buffered description event.
unknown's avatar
unknown committed
3418
    */
3419 3420
    if ((ev=Log_event::read_log_event(cur_log,0,
                                      rli->relay_log.description_event_for_exec)))
3421

3422 3423
    {
      DBUG_ASSERT(thd==rli->sql_thd);
3424 3425 3426 3427 3428
      /*
        read it while we have a lock, to avoid a mutex lock in
        inc_event_relay_log_pos()
      */
      rli->future_event_relay_log_pos= my_b_tell(cur_log);
3429
      if (hot_log)
unknown's avatar
unknown committed
3430
        pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
3431
      DBUG_RETURN(ev);
3432 3433
    }
    DBUG_ASSERT(thd==rli->sql_thd);
unknown's avatar
unknown committed
3434
    if (opt_reckless_slave)                     // For mysql-test
unknown's avatar
unknown committed
3435
      cur_log->error = 0;
unknown's avatar
unknown committed
3436
    if (cur_log->error < 0)
unknown's avatar
unknown committed
3437 3438
    {
      errmsg = "slave SQL thread aborted because of I/O error";
unknown's avatar
unknown committed
3439
      if (hot_log)
unknown's avatar
unknown committed
3440
        pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
3441 3442
      goto err;
    }
3443 3444
    if (!cur_log->error) /* EOF */
    {
unknown's avatar
unknown committed
3445
      /*
unknown's avatar
unknown committed
3446 3447 3448
        On a hot log, EOF means that there are no more updates to
        process and we must block until I/O thread adds some and
        signals us to continue
unknown's avatar
unknown committed
3449
      */
3450 3451
      if (hot_log)
      {
3452 3453 3454 3455 3456 3457 3458 3459 3460 3461 3462 3463 3464 3465 3466
        /*
          We say in Seconds_Behind_Master that we have "caught up". Note that
          for example if network link is broken but I/O slave thread hasn't
          noticed it (slave_net_timeout not elapsed), then we'll say "caught
          up" whereas we're not really caught up. Fixing that would require
          internally cutting timeout in smaller pieces in network read, no
          thanks. Another example: SQL has caught up on I/O, now I/O has read
          a new event and is queuing it; the false "0" will exist until SQL
          finishes executing the new event; it will look abnormal only if
          the events have old timestamps (then you get "many", 0, "many").
          Transient phases like this can't really be fixed.
        */
        time_t save_timestamp= rli->last_master_timestamp;
        rli->last_master_timestamp= 0;

unknown's avatar
unknown committed
3467
        DBUG_ASSERT(rli->relay_log.get_open_count() ==
unknown's avatar
unknown committed
3468
                    rli->cur_log_old_open_count);
3469 3470 3471 3472 3473

        if (rli->ign_master_log_name_end[0])
        {
          /* We generate and return a Rotate, to make our positions advance */
          DBUG_PRINT("info",("seeing an ignored end segment"));
3474
          ev= new Rotate_log_event(rli->ign_master_log_name_end,
3475
                                   0, rli->ign_master_log_pos_end,
unknown's avatar
unknown committed
3476
                                   Rotate_log_event::DUP_NAME);
3477
          rli->ign_master_log_name_end[0]= 0;
unknown's avatar
unknown committed
3478
          pthread_mutex_unlock(log_lock);
3479 3480 3481 3482 3483 3484 3485 3486 3487 3488
          if (unlikely(!ev))
          {
            errmsg= "Slave SQL thread failed to create a Rotate event "
              "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
            goto err;
          }
          ev->server_id= 0; // don't be ignored by slave SQL thread
          DBUG_RETURN(ev);
        }

unknown's avatar
unknown committed
3489 3490 3491 3492 3493
        /*
          We can, and should release data_lock while we are waiting for
          update. If we do not, show slave status will block
        */
        pthread_mutex_unlock(&rli->data_lock);
3494 3495

        /*
unknown's avatar
unknown committed
3496
          Possible deadlock :
3497 3498 3499 3500 3501 3502 3503 3504 3505 3506 3507
          - the I/O thread has reached log_space_limit
          - the SQL thread has read all relay logs, but cannot purge for some
          reason:
            * it has already purged all logs except the current one
            * there are other logs than the current one but they're involved in
            a transaction that finishes in the current one (or is not finished)
          Solution :
          Wake up the possibly waiting I/O thread, and set a boolean asking
          the I/O thread to temporarily ignore the log_space_limit
          constraint, because we do not want the I/O thread to block because of
          space (it's ok if it blocks for any other reason (e.g. because the
unknown's avatar
unknown committed
3508
          master does not send anything). Then the I/O thread stops waiting
3509 3510
          and reads more events.
          The SQL thread decides when the I/O thread should take log_space_limit
unknown's avatar
unknown committed
3511
          into account again : ignore_log_space_limit is reset to 0
3512 3513 3514 3515
          in purge_first_log (when the SQL thread purges the just-read relay
          log), and also when the SQL thread starts. We should also reset
          ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
          fact, no need as RESET SLAVE requires that the slave
unknown's avatar
unknown committed
3516 3517
          be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
          it stops.
3518 3519 3520
        */
        pthread_mutex_lock(&rli->log_space_lock);
        // prevent the I/O thread from blocking next times
unknown's avatar
unknown committed
3521
        rli->ignore_log_space_limit= 1;
3522 3523 3524 3525 3526 3527
        /*
          If the I/O thread is blocked, unblock it.
          Ok to broadcast after unlock, because the mutex is only destroyed in
          ~st_relay_log_info(), i.e. when rli is destroyed, and rli will not be
          destroyed before we exit the present function.
        */
3528
        pthread_mutex_unlock(&rli->log_space_lock);
3529
        pthread_cond_broadcast(&rli->log_space_cond);
3530
        // Note that wait_for_update unlocks lock_log !
3531
        rli->relay_log.wait_for_update(rli->sql_thd, 1);
3532 3533
        // re-acquire data lock since we released it earlier
        pthread_mutex_lock(&rli->data_lock);
3534
        rli->last_master_timestamp= save_timestamp;
unknown's avatar
unknown committed
3535
        continue;
3536
      }
unknown's avatar
unknown committed
3537
      /*
unknown's avatar
unknown committed
3538 3539 3540
        If the log was not hot, we need to move to the next log in
        sequence. The next log could be hot or cold, we deal with both
        cases separately after doing some common initialization
unknown's avatar
unknown committed
3541 3542 3543 3544 3545
      */
      end_io_cache(cur_log);
      DBUG_ASSERT(rli->cur_log_fd >= 0);
      my_close(rli->cur_log_fd, MYF(MY_WME));
      rli->cur_log_fd = -1;
unknown's avatar
unknown committed
3546

3547
      if (relay_log_purge)
unknown's avatar
unknown committed
3548
      {
unknown's avatar
unknown committed
3549
        /*
3550 3551 3552 3553 3554 3555 3556 3557 3558 3559
          purge_first_log will properly set up relay log coordinates in rli.
          If the group's coordinates are equal to the event's coordinates
          (i.e. the relay log was not rotated in the middle of a group),
          we can purge this relay log too.
          We do ulonglong and string comparisons, this may be slow but
          - purging the last relay log is nice (it can save 1GB of disk), so we
          like to detect the case where we can do it, and given this,
          - I see no better detection method
          - purge_first_log is not called that often
        */
unknown's avatar
unknown committed
3560
        if (rli->relay_log.purge_first_log
3561 3562 3563
            (rli,
             rli->group_relay_log_pos == rli->event_relay_log_pos
             && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
unknown's avatar
unknown committed
3564 3565 3566 3567
        {
          errmsg = "Error purging processed logs";
          goto err;
        }
unknown's avatar
unknown committed
3568
      }
3569 3570
      else
      {
unknown's avatar
unknown committed
3571 3572 3573 3574 3575 3576 3577 3578 3579 3580 3581 3582 3583 3584 3585 3586
        /*
          If hot_log is set, then we already have a lock on
          LOCK_log.  If not, we have to get the lock.

          According to Sasha, the only time this code will ever be executed
          is if we are recovering from a bug.
        */
        if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
        {
          errmsg = "error switching to the next log";
          goto err;
        }
        rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
        strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
                sizeof(rli->event_relay_log_name)-1);
        flush_relay_log_info(rli);
unknown's avatar
unknown committed
3587
      }
3588 3589 3590 3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601

      /*
        Now we want to open this next log. To know if it's a hot log (the one
        being written by the I/O thread now) or a cold log, we can use
        is_active(); if it is hot, we use the I/O cache; if it's cold we open
        the file normally. But if is_active() reports that the log is hot, this
        may change between the test and the consequence of the test. So we may
        open the I/O cache whereas the log is now cold, which is nonsense.
        To guard against this, we need to have LOCK_log.
      */

      DBUG_PRINT("info",("hot_log: %d",hot_log));
      if (!hot_log) /* if hot_log, we already have this mutex */
        pthread_mutex_lock(log_lock);
unknown's avatar
unknown committed
3602 3603
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
      {
3604
#ifdef EXTRA_DEBUG
unknown's avatar
unknown committed
3605 3606
        if (global_system_variables.log_warnings)
          sql_print_information("next log '%s' is currently active",
3607
                                rli->linfo.log_file_name);
unknown's avatar
unknown committed
3608 3609 3610 3611 3612 3613 3614 3615
#endif
        rli->cur_log= cur_log= rli->relay_log.get_log_file();
        rli->cur_log_old_open_count= rli->relay_log.get_open_count();
        DBUG_ASSERT(rli->cur_log_fd == -1);

        /*
          Read pointer has to be at the start since we are the only
          reader.
3616 3617 3618
          We must keep the LOCK_log to read the 4 first bytes, as this is a hot
          log (same as when we call read_log_event() above: for a hot log we
          take the mutex).
unknown's avatar
unknown committed
3619 3620
        */
        if (check_binlog_magic(cur_log,&errmsg))
3621 3622
        {
          if (!hot_log) pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
3623
          goto err;
3624 3625
        }
        if (!hot_log) pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
3626
        continue;
3627
      }
3628
      if (!hot_log) pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
3629
      /*
unknown's avatar
unknown committed
3630 3631 3632
        if we get here, the log was not hot, so we will have to open it
        ourselves. We are sure that the log is still not hot now (a log can get
        from hot to cold, but not from cold to hot). No need for LOCK_log.
unknown's avatar
unknown committed
3633 3634
      */
#ifdef EXTRA_DEBUG
unknown's avatar
unknown committed
3635
      if (global_system_variables.log_warnings)
unknown's avatar
unknown committed
3636
        sql_print_information("next log '%s' is not active",
3637
                              rli->linfo.log_file_name);
unknown's avatar
unknown committed
3638
#endif
unknown's avatar
unknown committed
3639 3640
      // open_binlog() will check the magic header
      if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
unknown's avatar
unknown committed
3641 3642
                                       &errmsg)) <0)
        goto err;
3643
    }
unknown's avatar
unknown committed
3644
    else
3645
    {
unknown's avatar
unknown committed
3646
      /*
unknown's avatar
unknown committed
3647 3648
        Read failed with a non-EOF error.
        TODO: come up with something better to handle this error
unknown's avatar
unknown committed
3649 3650
      */
      if (hot_log)
unknown's avatar
unknown committed
3651
        pthread_mutex_unlock(log_lock);
3652
      sql_print_error("Slave SQL thread: I/O error reading \
unknown's avatar
unknown committed
3653
event(errno: %d  cur_log->error: %d)",
unknown's avatar
unknown committed
3654
                      my_errno,cur_log->error);
unknown's avatar
unknown committed
3655
      // set read position to the beginning of the event
3656
      my_b_seek(cur_log,rli->event_relay_log_pos);
unknown's avatar
unknown committed
3657 3658
      /* otherwise, we have had a partial read */
      errmsg = "Aborting slave SQL thread because of partial event read";
unknown's avatar
unknown committed
3659
      break;                                    // To end of function
3660 3661
    }
  }
3662
  if (!errmsg && global_system_variables.log_warnings)
3663
  {
unknown's avatar
unknown committed
3664
    sql_print_information("Error reading relay log event: %s",
3665 3666 3667
                          "slave SQL thread was killed");
    DBUG_RETURN(0);
  }
unknown's avatar
unknown committed
3668

3669
err:
3670 3671
  if (errmsg)
    sql_print_error("Error reading relay log event: %s", errmsg);
unknown's avatar
unknown committed
3672
  DBUG_RETURN(0);
3673 3674
}

3675 3676 3677
/*
  Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
  because of size is simpler because when we do it we already have all relevant
  locks; here we don't, so this function is mainly taking locks).
  Returns nothing as we cannot catch any error (MYSQL_BIN_LOG::new_file()
  is void).

  SYNOPSIS
    rotate_relay_log()
    mi          Master info whose relay log (mi->rli.relay_log) is rotated

  NOTES
    mi->run_lock is held from entry until just before returning, on every
    path. If mi->rli.inited is false the function does nothing else.
*/

void rotate_relay_log(MASTER_INFO* mi)
{
  DBUG_ENTER("rotate_relay_log");
  RELAY_LOG_INFO* rli= &mi->rli;

  /* We don't lock rli->run_lock. This would lead to deadlocks. */
  pthread_mutex_lock(&mi->run_lock);

  /*
     We need to test inited because otherwise, new_file() will attempt to lock
     LOCK_log, which may not be inited (if we're not a slave).
  */
  if (!rli->inited)
  {
    DBUG_PRINT("info", ("rli->inited == 0"));
    goto end;
  }

  /* If the relay log is closed, new_file() will do nothing. */
  rli->relay_log.new_file();

  /*
    We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
    be counted, so imagine a succession of FLUSH LOGS  and assume the slave
    threads are started:
    relay_log_space decreases by the size of the deleted relay log, but does
    not increase, so flush-after-flush we may become negative, which is wrong.
    Even if this will be corrected as soon as a query is replicated on the
    slave (because the I/O thread will then call harvest_bytes_written() which
    will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
    output in SHOW SLAVE STATUS meanwhile. So we harvest now.
    If the log is closed, then this will just harvest the last writes, probably
    0 as they probably have been harvested.
  */
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);

end:
  /* Single exit point: the run_lock taken above is always released here. */
  pthread_mutex_unlock(&mi->run_lock);
  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
3723

unknown's avatar
unknown committed
3724 3725 3726 3727 3728 3729 3730 3731 3732 3733 3734 3735 3736 3737 3738 3739
/**
   Detects, based on master's version (as found in the relay log), if master
   has a certain bug.

   The master's version is taken from
   rli->relay_log.description_event_for_exec->server_version_split and is
   compared, as a 3-byte { major, minor, patch } triple, against a static
   table of affected version ranges. A range matches when
   introduced_in <= master version < fixed_in.

   On a match, a short error is raised through my_printf_error() and a
   verbose explanation is reported through slave_print_msg(), both using
   ER_UNKNOWN_ERROR.

   @param rli RELAY_LOG_INFO which tells the master's version
   @param bug_id Number of the bug as found in bugs.mysql.com
   @return TRUE if master has the bug, FALSE if it does not.
*/
bool rpl_master_has_bug(RELAY_LOG_INFO *rli, uint bug_id)
{
  struct st_version_range_for_one_bug {
    uint        bug_id;
    const uchar introduced_in[3]; // first version with bug
    const uchar fixed_in[3];      // first version with fix
  };
  /*
    One entry per (bug, release series); a bug present in several release
    series is listed once for each of them.
  */
  static const struct st_version_range_for_one_bug versions_for_all_bugs[]=
  {
    {24432, { 5, 0, 24 }, { 5, 0, 38 } },
    {24432, { 5, 1, 12 }, { 5, 1, 17 } }
  };
  const uchar *master_ver=
    rli->relay_log.description_event_for_exec->server_version_split;

  /* The memcmp() calls below compare exactly 3 bytes of the version. */
  DBUG_ASSERT(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);

  for (uint i= 0;
       i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
  {
    const uchar *introduced_in= versions_for_all_bugs[i].introduced_in,
      *fixed_in= versions_for_all_bugs[i].fixed_in;
    /* Byte-wise compare orders by major, then minor, then patch. */
    if ((versions_for_all_bugs[i].bug_id == bug_id) &&
        (memcmp(introduced_in, master_ver, 3) <= 0) &&
        (memcmp(fixed_in,      master_ver, 3) >  0))
    {
      // a short message for SHOW SLAVE STATUS (message length constraints)
      my_printf_error(ER_UNKNOWN_ERROR, "master may suffer from"
                      " http://bugs.mysql.com/bug.php?id=%u"
                      " so slave stops; check error log on slave"
                      " for more info", MYF(0), bug_id);
      // a verbose message for the error log
      slave_print_msg(ERROR_LEVEL, rli, ER_UNKNOWN_ERROR,
                      "According to the master's version ('%s'),"
                      " it is probable that master suffers from this bug:"
                      " http://bugs.mysql.com/bug.php?id=%u"
                      " and thus replicating the current binary log event"
                      " may make the slave's data become different from the"
                      " master's data."
                      " To take no risk, slave refuses to replicate"
                      " this event and stops."
                      " We recommend that all updates be stopped on the"
                      " master and slave, that the data of both be"
                      " manually synchronized,"
                      " that master's binary logs be deleted,"
                      " that master be upgraded to a version at least"
                      " equal to '%d.%d.%d'. Then replication can be"
                      " restarted.",
                      rli->relay_log.description_event_for_exec->server_version,
                      bug_id,
                      fixed_in[0], fixed_in[1], fixed_in[2]);
      return TRUE;
    }
  }
  return FALSE;
}

3788
/*
  Explicit template instantiations, compiled only when
  HAVE_EXPLICIT_TEMPLATE_INSTANTIATION is defined.
*/
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
template class I_List_iterator<i_string>;
template class I_List_iterator<i_string_pair>;
#endif
3792

unknown's avatar
SCRUM  
unknown committed
3793
#endif /* HAVE_REPLICATION */