slave.cc 135 KB
Newer Older
1
/* Copyright (C) 2000-2003 MySQL AB
unknown's avatar
unknown committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
   
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
   
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
   
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "mysql_priv.h"
unknown's avatar
SCRUM  
unknown committed
18 19 20

#ifdef HAVE_REPLICATION

unknown's avatar
unknown committed
21
#include <mysql.h>
22
#include <myisam.h>
23
#include "slave.h"
24
#include "sql_repl.h"
25
#include "repl_failsafe.h"
unknown's avatar
unknown committed
26
#include <thr_alarm.h>
unknown's avatar
unknown committed
27
#include <my_dir.h>
unknown's avatar
unknown committed
28
#include <sql_common.h>
unknown's avatar
SCRUM  
unknown committed
29

30 31 32
bool use_slave_mask = 0;
MY_BITMAP slave_error_mask;

33 34
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);

35
volatile bool slave_sql_running = 0, slave_io_running = 0;
36
char* slave_load_tmpdir = 0;
unknown's avatar
unknown committed
37
MASTER_INFO *active_mi;
38
HASH replicate_do_table, replicate_ignore_table;
unknown's avatar
unknown committed
39
DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
40
bool do_table_inited = 0, ignore_table_inited = 0;
unknown's avatar
unknown committed
41
bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
42
bool table_rules_on= 0, replicate_same_server_id;
43
ulonglong relay_log_space_limit = 0;
unknown's avatar
unknown committed
44 45 46 47 48 49 50

/*
  When slave thread exits, we need to remember the temporary tables so we
  can re-use them on slave start.

  TODO: move the vars below under MASTER_INFO
*/
51

52
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
53
int events_till_abort = -1;
54
static int events_till_disconnect = -1;
unknown's avatar
unknown committed
55

56
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
unknown's avatar
unknown committed
57

58
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
unknown's avatar
unknown committed
59
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
unknown's avatar
unknown committed
60
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli);
61 62
static inline bool io_slave_killed(THD* thd,MASTER_INFO* mi);
static inline bool sql_slave_killed(THD* thd,RELAY_LOG_INFO* rli);
unknown's avatar
unknown committed
63
static int count_relay_log_space(RELAY_LOG_INFO* rli);
64
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
65
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
unknown's avatar
unknown committed
66 67
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
			  bool suppress_warnings);
unknown's avatar
unknown committed
68
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
unknown's avatar
unknown committed
69
			     bool reconnect, bool suppress_warnings);
70 71
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
		      void* thread_killed_arg);
72
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
73
static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
74
				  const char* table_name, bool overwrite);
75
static int get_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi);
76

77 78

/*
unknown's avatar
unknown committed
79
  Find out which replications threads are running
80

unknown's avatar
unknown committed
81 82 83 84 85 86 87 88 89 90 91 92 93
  SYNOPSIS
    init_thread_mask()
    mask		Return value here
    mi			master_info for slave
    inverse		If set, returns which threads are not running

  IMPLEMENTATION
    Get a bit mask for which threads are running so that we can later restart
    these threads.

  RETURN
    mask	If inverse == 0, running threads
		If inverse == 1, stopped threads    
94 95
*/

96 97 98 99 100 101 102 103
void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse)
{
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
  register int tmp_mask=0;
  if (set_io)
    tmp_mask |= SLAVE_IO;
  if (set_sql)
    tmp_mask |= SLAVE_SQL;
104 105
  if (inverse)
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
106 107 108
  *mask = tmp_mask;
}

109

unknown's avatar
unknown committed
110
/*
111
  lock_slave_threads()
unknown's avatar
unknown committed
112
*/
113

114 115 116 117 118 119 120
void lock_slave_threads(MASTER_INFO* mi)
{
  //TODO: see if we can do this without dual mutex
  pthread_mutex_lock(&mi->run_lock);
  pthread_mutex_lock(&mi->rli.run_lock);
}

121

unknown's avatar
unknown committed
122
/*
123
  unlock_slave_threads()
unknown's avatar
unknown committed
124
*/
125

126 127 128 129 130 131 132
void unlock_slave_threads(MASTER_INFO* mi)
{
  //TODO: see if we can do this without dual mutex
  pthread_mutex_unlock(&mi->rli.run_lock);
  pthread_mutex_unlock(&mi->run_lock);
}

133

unknown's avatar
unknown committed
134
/* Initialize slave structures */
135

136 137
int init_slave()
{
138
  DBUG_ENTER("init_slave");
139

140 141 142 143 144 145
  /*
    This is called when mysqld starts. Before client connections are
    accepted. However bootstrap may conflict with us if it does START SLAVE.
    So it's safer to take the lock.
  */
  pthread_mutex_lock(&LOCK_active_mi);
146 147 148 149
  /*
    TODO: re-write this to interate through the list of files
    for multi-master
  */
unknown's avatar
unknown committed
150
  active_mi= new MASTER_INFO;
151 152

  /*
153 154 155
    If master_host is not specified, try to read it from the master_info file.
    If master_host is specified, create the master_info file if it doesn't
    exists.
156
  */
157 158 159 160 161 162
  if (!active_mi)
  {
    sql_print_error("Failed to allocate memory for the master info structure");
    goto err;
  }
    
unknown's avatar
unknown committed
163
  if (init_master_info(active_mi,master_info_file,relay_log_info_file,
164
		       !master_host))
165
  {
166
    sql_print_error("Failed to initialize the master info structure");
unknown's avatar
unknown committed
167
    goto err;
168
  }
169 170 171 172

  if (server_id && !master_host && active_mi->host[0])
    master_host= active_mi->host;

173 174
  /* If server id is not set, start_slave_thread() will say it */

175
  if (master_host && !opt_skip_slave_start)
176
  {
177 178 179 180 181 182
    if (start_slave_threads(1 /* need mutex */,
			    0 /* no wait for start*/,
			    active_mi,
			    master_info_file,
			    relay_log_info_file,
			    SLAVE_IO | SLAVE_SQL))
unknown's avatar
unknown committed
183
    {
184
      sql_print_error("Failed to create slave threads");
unknown's avatar
unknown committed
185 186
      goto err;
    }
187
  }
188
  pthread_mutex_unlock(&LOCK_active_mi);
189
  DBUG_RETURN(0);
190

unknown's avatar
unknown committed
191
err:
192
  pthread_mutex_unlock(&LOCK_active_mi);
unknown's avatar
unknown committed
193
  DBUG_RETURN(1);
194
}
195

196

197 198
/*
  Free callback for table-rule hashes (registered in
  init_table_rule_hash()). add_table_rule() allocates an entry and its key
  as one my_malloc() block, so a single my_free() releases both.
*/
static void free_table_ent(TABLE_RULE_ENT* e)
{
  my_free((gptr) e, MYF(0));
}

/*
  Key callback for table-rule hashes: the key is the "db.table" text stored
  at e->db, e->key_len bytes long (see add_table_rule()).
*/
static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
			   my_bool not_used __attribute__((unused)))
{
  *len = e->key_len;
  return (byte*)e->db;
}

/*
  Open the given relay log

  SYNOPSIS
    init_relay_log_pos()
    rli			Relay information (will be initialized)
    log			Name of relay log file to read from. NULL = First log
    pos			Position in relay log file 
    need_data_lock	Set to 1 if this functions should do mutex locks
    errmsg		Store pointer to error message here

  DESCRIPTION
  - Close old open relay log files.
  - If we are using the same relay log as the running IO-thread, then set
    rli->cur_log to point to the same IO_CACHE entry.
  - If not, open the 'log' binary file.

  TODO
229
    - check proper initialization of group_master_log_name/group_master_log_pos
230 231 232 233 234

  RETURN VALUES
    0	ok
    1	error.  errmsg is set to point to the error message
*/
unknown's avatar
unknown committed
235

236 237 238 239
int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
		       ulonglong pos, bool need_data_lock,
		       const char** errmsg)
{
unknown's avatar
unknown committed
240 241
  DBUG_ENTER("init_relay_log_pos");

242
  *errmsg=0;
243
  pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
244
  
245 246 247
  if (need_data_lock)
    pthread_mutex_lock(&rli->data_lock);
  
248 249
  pthread_mutex_lock(log_lock);
  
250
  /* Close log file and free buffers if it's already open */
251 252 253 254 255 256 257
  if (rli->cur_log_fd >= 0)
  {
    end_io_cache(&rli->cache_buf);
    my_close(rli->cur_log_fd, MYF(MY_WME));
    rli->cur_log_fd = -1;
  }
  
258
  rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
unknown's avatar
unknown committed
259

unknown's avatar
unknown committed
260 261 262 263
  /*
    Test to see if the previous run was with the skip of purging
    If yes, we do not purge when we restart
  */
264
  if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
265 266 267 268
  {
    *errmsg="Could not find first log during relay log initialization";
    goto err;
  }
269

270
  if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
unknown's avatar
unknown committed
271
  {
272 273
    *errmsg="Could not find target log during relay log initialization";
    goto err;
unknown's avatar
unknown committed
274
  }
275 276 277 278
  strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
	  sizeof(rli->group_relay_log_name)-1);
  strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
	  sizeof(rli->event_relay_log_name)-1);
279 280
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
  {
281 282 283 284 285
    /*
      The IO thread is using this log file.
      In this case, we will use the same IO_CACHE pointer to
      read data as the IO thread is using to write data.
    */
unknown's avatar
unknown committed
286 287 288
    rli->cur_log= rli->relay_log.get_log_file();
    if (my_b_tell(rli->cur_log) == 0 &&
	check_binlog_magic(rli->cur_log, errmsg))
289
      goto err;
unknown's avatar
unknown committed
290
    rli->cur_log_old_open_count=rli->relay_log.get_open_count();
291 292 293
  }
  else
  {
294 295 296
    /*
      Open the relay log and set rli->cur_log to point at this one
    */
297 298 299 300 301
    if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
				     rli->linfo.log_file_name,errmsg)) < 0)
      goto err;
    rli->cur_log = &rli->cache_buf;
  }
302
  if (pos >= BIN_LOG_HEADER_SIZE)
unknown's avatar
unknown committed
303 304
    my_b_seek(rli->cur_log,(off_t)pos);

305
err:
306 307 308 309
  /*
    If we don't purge, we can't honour relay_log_space_limit ;
    silently discard it
  */
310
  if (!relay_log_purge)
311
    rli->log_space_limit= 0;
unknown's avatar
unknown committed
312
  pthread_cond_broadcast(&rli->data_cond);
313 314 315
  
  pthread_mutex_unlock(log_lock);

unknown's avatar
unknown committed
316 317
  if (need_data_lock)
    pthread_mutex_unlock(&rli->data_lock);
318

unknown's avatar
unknown committed
319
  DBUG_RETURN ((*errmsg) ? 1 : 0);
320 321
}

322

unknown's avatar
unknown committed
323 324
/*
  Init functio to set up array for errors that should be skipped for slave
unknown's avatar
unknown committed
325

unknown's avatar
unknown committed
326 327 328 329 330 331 332
  SYNOPSIS
    init_slave_skip_errors()
    arg		List of errors numbers to skip, separated with ','

  NOTES
    Called from get_options() in mysqld.cc on start-up
*/
333

unknown's avatar
unknown committed
334
void init_slave_skip_errors(const char* arg)
335
{
unknown's avatar
unknown committed
336
  const char *p;
337
  if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
338 339 340 341 342
  {
    fprintf(stderr, "Badly out of memory, please check your system status\n");
    exit(1);
  }
  use_slave_mask = 1;
343
  for (;my_isspace(system_charset_info,*arg);++arg)
344
    /* empty */;
345
  if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
346 347 348 349 350 351 352 353 354 355 356
  {
    bitmap_set_all(&slave_error_mask);
    return;
  }
  for (p= arg ; *p; )
  {
    long err_code;
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
      break;
    if (err_code < MAX_SLAVE_ERROR)
       bitmap_set_bit(&slave_error_mask,(uint)err_code);
357
    while (!my_isdigit(system_charset_info,*p) && *p)
358 359 360 361
      p++;
  }
}

362

unknown's avatar
unknown committed
363 364 365
/*
  Advance the event relay log position by 'val' and make it the new group
  position (name and offset); update group_master_log_pos from 'log_pos'.

  SYNOPSIS
    val		Amount added to the relay log position (presumably the
		length of the event just executed - see the 3.23 note below)
    log_pos	Position stored in the event; 0 when not available
		(3.23 binlogs), in which case group_master_log_pos is kept
    skip_lock	If set, the caller already holds data_lock

  Broadcasts data_cond when done.
*/
void st_relay_log_info::inc_group_relay_log_pos(ulonglong val,
                                                ulonglong log_pos,
                                                bool skip_lock)
{
  if (!skip_lock)
    pthread_mutex_lock(&data_lock);
  inc_event_relay_log_pos(val);
  group_relay_log_pos= event_relay_log_pos;
  strmake(group_relay_log_name,event_relay_log_name,
          sizeof(group_relay_log_name)-1);

  notify_group_relay_log_name_update();

  /*
    If the slave does not support transactions and replicates a transaction,
    users should not trust group_master_log_pos (which they can display with
    SHOW SLAVE STATUS or read from relay-log.info), because to compute
    group_master_log_pos the slave relies on log_pos stored in the master's
    binlog, but if we are in a master's transaction these positions are always
    the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
    not advance as it should on the non-transactional slave (it advances by
    big leaps, whereas it should advance by small leaps).
  */
  if (log_pos) // 3.23 binlogs don't have log_pos
  {
#if MYSQL_VERSION_ID < 50000
    /*
      If the event was converted from a 3.23 format, get_event_len() has
      grown by 6 bytes (at least for most events, except LOAD DATA INFILE
      which is already a big problem for 3.23->4.0 replication); 6 bytes is
      the difference between the header's size in 4.0 (LOG_EVENT_HEADER_LEN)
      and the header's size in 3.23 (OLD_HEADER_LEN). Note that using
      mi->old_format will not help if the I/O thread has not started yet.
      Yes this is a hack but it's just to make 3.23->4.x replication work;
      3.23->5.0 replication is working much better.
    */
    group_master_log_pos= log_pos + val -
      (mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0);
#else
    group_master_log_pos= log_pos+ val;
#endif /* MYSQL_VERSION_ID < 50000 */
  }
  pthread_cond_broadcast(&data_cond);
  if (!skip_lock)
    pthread_mutex_unlock(&data_lock);
}

/*
  Close every table in the save_temporary_tables list, without asking for
  disk deletion, then empty the list and reset slave_open_temp_tables.
*/
void st_relay_log_info::close_temporary_tables()
{
  TABLE *table,*next;

  for (table=save_temporary_tables ; table ; table=next)
  {
    /* Fetch the successor first: close_temporary() may free 'table' */
    next=table->next;
    /*
      Don't ask for disk deletion. For now, anyway they will be deleted when
      slave restarts, but it is a better intention to not delete them.
    */
    close_temporary(table, 0);
  }
  save_temporary_tables= 0;
  slave_open_temp_tables= 0;
}

/*
429 430
  purge_relay_logs()

unknown's avatar
unknown committed
431 432
  NOTES
    Assumes to have a run lock on rli and that no slave thread are running.
unknown's avatar
unknown committed
433 434
*/

435 436
int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
		     const char** errmsg)
437
{
438
  int error=0;
unknown's avatar
unknown committed
439
  DBUG_ENTER("purge_relay_logs");
440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458

  /*
    Even if rli->inited==0, we still try to empty rli->master_log_* variables.
    Indeed, rli->inited==0 does not imply that they already are empty.
    It could be that slave's info initialization partly succeeded : 
    for example if relay-log.info existed but *relay-bin*.*
    have been manually removed, init_relay_log_info reads the old 
    relay-log.info and fills rli->master_log_*, then init_relay_log_info
    checks for the existence of the relay log, this fails and
    init_relay_log_info leaves rli->inited to 0.
    In that pathological case, rli->master_log_pos* will be properly reinited
    at the next START SLAVE (as RESET SLAVE or CHANGE
    MASTER, the callers of purge_relay_logs, will delete bogus *.info files
    or replace them with correct files), however if the user does SHOW SLAVE
    STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
    In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS 
    to display fine in any case.
  */

459 460
  rli->group_master_log_name[0]= 0;
  rli->group_master_log_pos= 0;
461

462
  if (!rli->inited)
463 464
  {
    DBUG_PRINT("info", ("rli->inited == 0"));
465
    DBUG_RETURN(0);
466
  }
unknown's avatar
unknown committed
467

468 469
  DBUG_ASSERT(rli->slave_running == 0);
  DBUG_ASSERT(rli->mi->slave_running == 0);
unknown's avatar
unknown committed
470

471 472
  rli->slave_skip_counter=0;
  pthread_mutex_lock(&rli->data_lock);
473
  if (rli->relay_log.reset_logs(thd))
474 475 476 477 478
  {
    *errmsg = "Failed during log reset";
    error=1;
    goto err;
  }
479
  /* Save name of used relay log file */
480 481 482 483
  strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
	  sizeof(rli->group_relay_log_name)-1);
  strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
 	  sizeof(rli->event_relay_log_name)-1);
unknown's avatar
unknown committed
484 485
  // Just first log with magic number and nothing else
  rli->log_space_total= BIN_LOG_HEADER_SIZE;
486
  rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
unknown's avatar
unknown committed
487
  rli->relay_log.reset_bytes_written();
488
  if (!just_reset)
489 490 491
    error= init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos,
  			      0 /* do not need data lock */, errmsg);
  
unknown's avatar
unknown committed
492 493 494 495
err:
#ifndef DBUG_OFF
  char buf[22];
#endif  
unknown's avatar
unknown committed
496
  DBUG_PRINT("info",("log_space_total: %s",llstr(rli->log_space_total,buf)));
497
  pthread_mutex_unlock(&rli->data_lock);
unknown's avatar
unknown committed
498
  DBUG_RETURN(error);
499 500
}

501

502 503 504 505 506 507 508
/*
  Stop the replication threads selected by thread_mask.

  SYNOPSIS
    mi		Master info whose threads should be stopped
    thread_mask	SLAVE_IO and/or SLAVE_SQL; SLAVE_FORCE_ALL selects both
		threads and keeps going after a failure
    skip_lock	If set, mi->run_lock / mi->rli.run_lock are not taken here;
		they are still waited on, so the caller must already hold them

  RETURN
    0		ok (also when mi is not initialized, and always with
		SLAVE_FORCE_ALL)
    #		error from terminate_slave_thread() for the first thread
		that could not be stopped
*/
int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
{
  if (!mi->inited)
    return 0; /* successfully do nothing */
  int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
  pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
  pthread_mutex_t *sql_cond_lock,*io_cond_lock;
  DBUG_ENTER("terminate_slave_threads");

  /* The condition waits always use the run locks, even when skip_lock */
  sql_cond_lock=sql_lock;
  io_cond_lock=io_lock;

  if (skip_lock)
  {
    sql_lock = io_lock = 0;
  }
  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) && mi->slave_running)
  {
    DBUG_PRINT("info",("Terminating IO thread"));
    mi->abort_slave=1;
    if ((error=terminate_slave_thread(mi->io_thd,io_lock,
				      io_cond_lock,
				      &mi->stop_cond,
				      &mi->slave_running)) &&
	!force_all)
      DBUG_RETURN(error);
  }
  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) && mi->rli.slave_running)
  {
    DBUG_PRINT("info",("Terminating SQL thread"));
    DBUG_ASSERT(mi->rli.sql_thd != 0) ;
    mi->rli.abort_slave=1;
    if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
				      sql_cond_lock,
				      &mi->rli.stop_cond,
				      &mi->rli.slave_running)) &&
	!force_all)
      DBUG_RETURN(error);
  }
  DBUG_RETURN(0);
}

/*
  Kick one slave thread and wait until it reports that it has stopped.

  SYNOPSIS
    thd			THD of the thread to stop
    term_lock		Mutex to take for the whole operation, or 0 when the
			caller already holds cond_lock
    cond_lock		Mutex paired with term_cond; must be held while waiting
    term_cond		Condition waited on between kicks
    slave_running	Flag polled until it is cleared

  DESCRIPTION
    The thread is kicked again after every 2-second wait until
    *slave_running is cleared, in case it missed an earlier signal.

  RETURN
    0			ok
    ER_SLAVE_NOT_RUNNING	term_lock given and the thread was not running
*/
int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
			   pthread_mutex_t *cond_lock,
			   pthread_cond_t* term_cond,
			   volatile bool* slave_running)
{
  if (term_lock)
  {
    pthread_mutex_lock(term_lock);
    if (!*slave_running)
    {
      pthread_mutex_unlock(term_lock);
      return ER_SLAVE_NOT_RUNNING;
    }
  }
  DBUG_ASSERT(thd != 0);
  /*
    It is critical to test whether the slave is running. Otherwise we might
    be referencing freed memory when trying to kick it.
  */
  THD_CHECK_SENTRY(thd);

  while (*slave_running)			// Should always be true
  {
    KICK_SLAVE(thd);
    /*
      There is a small chance that slave thread might miss the first
      alarm. To protect against it, resend the signal until it reacts
    */
    struct timespec abstime;
    set_timespec(abstime,2);
    pthread_cond_timedwait(term_cond, cond_lock, &abstime);
  }
  if (term_lock)
    pthread_mutex_unlock(term_lock);
  return 0;
}

int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
584
		       pthread_mutex_t *cond_lock,
unknown's avatar
unknown committed
585 586 587
		       pthread_cond_t *start_cond,
		       volatile bool *slave_running,
		       volatile ulong *slave_run_id,
588 589
		       MASTER_INFO* mi,
                       bool high_priority)
590 591
{
  pthread_t th;
unknown's avatar
unknown committed
592
  ulong start_id;
593
  DBUG_ASSERT(mi->inited);
unknown's avatar
unknown committed
594 595
  DBUG_ENTER("start_slave_thread");

596 597 598 599 600 601 602 603 604
  if (start_lock)
    pthread_mutex_lock(start_lock);
  if (!server_id)
  {
    if (start_cond)
      pthread_cond_broadcast(start_cond);
    if (start_lock)
      pthread_mutex_unlock(start_lock);
    sql_print_error("Server id not set, will not start slave");
unknown's avatar
unknown committed
605
    DBUG_RETURN(ER_BAD_SLAVE);
606 607 608
  }
  
  if (*slave_running)
609 610 611 612 613
  {
    if (start_cond)
      pthread_cond_broadcast(start_cond);
    if (start_lock)
      pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
614
    DBUG_RETURN(ER_SLAVE_MUST_STOP);
615
  }
unknown's avatar
unknown committed
616 617
  start_id= *slave_run_id;
  DBUG_PRINT("info",("Creating new slave thread"));
618 619
  if (high_priority)
    my_pthread_attr_setprio(&connection_attrib,CONNECT_PRIOR);
620 621 622 623
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
  {
    if (start_lock)
      pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
624
    DBUG_RETURN(ER_SLAVE_THREAD);
625 626 627 628
  }
  if (start_cond && cond_lock)
  {
    THD* thd = current_thd;
unknown's avatar
unknown committed
629
    while (start_id == *slave_run_id)
630
    {
unknown's avatar
unknown committed
631
      DBUG_PRINT("sleep",("Waiting for slave thread to start"));
632
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
633
					    "Waiting for slave thread to start");
634 635 636 637 638
      pthread_cond_wait(start_cond,cond_lock);
      thd->exit_cond(old_msg);
      if (thd->killed)
      {
	pthread_mutex_unlock(cond_lock);
unknown's avatar
unknown committed
639
	DBUG_RETURN(ER_SERVER_SHUTDOWN);
640 641 642 643 644
      }
    }
  }
  if (start_lock)
    pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
645
  DBUG_RETURN(0);
646
}
unknown's avatar
unknown committed
647

648

unknown's avatar
unknown committed
649
/*
650
  start_slave_threads()
unknown's avatar
unknown committed
651

unknown's avatar
unknown committed
652 653 654 655
  NOTES
    SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
    sense to do that for starting a slave--we always care if it actually
    started the threads that were not previously running
656
*/
unknown's avatar
unknown committed
657

658 659 660 661 662 663 664
int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
			MASTER_INFO* mi, const char* master_info_fname,
			const char* slave_info_fname, int thread_mask)
{
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
  pthread_cond_t* cond_io=0,*cond_sql=0;
  int error=0;
665
  DBUG_ENTER("start_slave_threads");
666 667 668 669 670 671 672 673 674 675 676 677 678
  
  if (need_slave_mutex)
  {
    lock_io = &mi->run_lock;
    lock_sql = &mi->rli.run_lock;
  }
  if (wait_for_start)
  {
    cond_io = &mi->start_cond;
    cond_sql = &mi->rli.start_cond;
    lock_cond_io = &mi->run_lock;
    lock_cond_sql = &mi->rli.run_lock;
  }
679 680 681

  if (thread_mask & SLAVE_IO)
    error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
unknown's avatar
unknown committed
682 683
			     cond_io,
			     &mi->slave_running, &mi->slave_run_id,
684
			     mi, 1); //high priority, to read the most possible
685
  if (!error && (thread_mask & SLAVE_SQL))
686
  {
687 688
    error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
			     cond_sql,
unknown's avatar
unknown committed
689
			     &mi->rli.slave_running, &mi->rli.slave_run_id,
690
			     mi, 0);
691 692 693
    if (error)
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
  }
694
  DBUG_RETURN(error);
695
}
696

697

698 699
/*
  Set up a hash of TABLE_RULE_ENT entries keyed by their "db.table" text
  (get_table_key()); the hash frees its entries with free_table_ent().
  *h_inited is set afterwards.
  NOTE(review): any failure reported by hash_init() is not checked here.
*/
void init_table_rule_hash(HASH* h, bool* h_inited)
{
  hash_init(h, system_charset_info,TABLE_RULE_HASH_SIZE,0,0,
	    (hash_get_key) get_table_key,
	    (hash_free_key) free_table_ent, 0);
  *h_inited = 1;
}

/*
  Set up a dynamic array of TABLE_RULE_ENT pointers (used for the
  wildcard rules searched by find_wild()) and set *a_inited.
*/
void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited)
{
  my_init_dynamic_array(a, sizeof(TABLE_RULE_ENT*), TABLE_RULE_ARR_SIZE,
		     TABLE_RULE_ARR_SIZE);
  *a_inited = 1;
}

/*
  Return the first wildcard rule in 'a' whose pattern (e->db, e->key_len
  bytes; '\\' escapes, wild_one / wild_many are the wildcards) matches the
  first 'len' bytes of 'key' according to my_wildcmp(), or 0 if none does.
*/
static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
{
  uint i;
  const char* key_end = key + len;

  for (i = 0; i < a->elements; i++)
  {
    TABLE_RULE_ENT* e ;
    get_dynamic(a, (gptr)&e, i);
    /* my_wildcmp() returns 0 on a match */
    if (!my_wildcmp(system_charset_info, key, key_end, 
                    (const char*)e->db,
                    (const char*)(e->db + e->key_len),
                    '\\',wild_one,wild_many))
      return e;
  }

  return 0;
}

/*
  Checks whether tables match some (wild_)do_table and (wild_)ignore_table
  rules (for replication)

  SYNOPSIS
    tables_ok()
    thd             thread (SQL slave thread normally)
    tables          list of tables to check

  NOTES
    Note that changing the order of the tables in the list can lead to
    different results. Note also the order of precedence of the do/ignore 
    rules (see code below). For that reason, users should not set conflicting 
748 749 750 751 752 753 754 755 756 757
    rules because they may get unpredicted results (precedence order is
    explained in the manual).
    If no table of the list is marked "updating" (so far this can only happen
    if the statement is a multi-delete (SQLCOM_DELETE_MULTI) and the "tables"
    is the tables in the FROM): then we always return 0, because there is no
    reason we play this statement on this slave if it updates nothing. In the
    case of SQLCOM_DELETE_MULTI, there will be a second call to tables_ok(),
    with tables having "updating==TRUE" (those after the DELETE), so this
    second call will make the decision (because
    all_tables_not_ok() = !tables_ok(1st_list) && !tables_ok(2nd_list)).
758 759 760 761 762

  RETURN VALUES
    0           should not be logged/replicated
    1           should be logged/replicated                  
*/
763

764 765
int tables_ok(THD* thd, TABLE_LIST* tables)
{
766
  bool some_tables_updating= 0;
767 768
  DBUG_ENTER("tables_ok");

unknown's avatar
unknown committed
769 770
  for (; tables; tables = tables->next)
  {
771 772 773 774
    char hash_key[2*NAME_LEN+2];
    char *end;
    uint len;

unknown's avatar
unknown committed
775 776
    if (!tables->updating) 
      continue;
777
    some_tables_updating= 1;
778 779 780
    end= strmov(hash_key, tables->db ? tables->db : thd->db);
    *end++= '.';
    len= (uint) (strmov(end, tables->real_name) - hash_key);
unknown's avatar
unknown committed
781
    if (do_table_inited) // if there are any do's
782
    {
unknown's avatar
unknown committed
783
      if (hash_search(&replicate_do_table, (byte*) hash_key, len))
784
	DBUG_RETURN(1);
unknown's avatar
unknown committed
785
    }
786
    if (ignore_table_inited) // if there are any ignores
unknown's avatar
unknown committed
787 788
    {
      if (hash_search(&replicate_ignore_table, (byte*) hash_key, len))
789
	DBUG_RETURN(0); 
790
    }
unknown's avatar
unknown committed
791 792
    if (wild_do_table_inited && find_wild(&replicate_wild_do_table,
					  hash_key, len))
793
      DBUG_RETURN(1);
unknown's avatar
unknown committed
794 795
    if (wild_ignore_table_inited && find_wild(&replicate_wild_ignore_table,
					      hash_key, len))
796
      DBUG_RETURN(0);
unknown's avatar
unknown committed
797
  }
798

unknown's avatar
unknown committed
799
  /*
800 801
    If no table was to be updated, ignore statement (no reason we play it on
    slave, slave is supposed to replicate _changes_ only).
unknown's avatar
unknown committed
802 803 804
    If no explicit rule found and there was a do list, do not replicate.
    If there was no do list, go ahead
  */
805 806
  DBUG_RETURN(some_tables_updating &&
              !do_table_inited && !wild_do_table_inited);
807 808
}

809

810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859
/*
  Checks whether a db matches wild_do_table and wild_ignore_table
  rules (for replication)

  SYNOPSIS
    db_ok_with_wild_table()
    db		name of the db to check.
		Is tested with check_db_name() before calling this function.

  NOTES
    Here is the reason for this function.
    We advise users who want to exclude a database 'db1' safely to do it
    with replicate_wild_ignore_table='db1.%' instead of binlog_ignore_db or
    replicate_ignore_db because the latter two only check the selected db,
    which won't work in that case:
    USE db2;
    UPDATE db1.t SET ... #this will be replicated and should not
    whereas replicate_wild_ignore_table will work in all cases.
    With replicate_wild_ignore_table, we only check tables. When
    one does 'DROP DATABASE db1', tables are not involved and the
    statement will be replicated, while users could expect it would not (as it
    roughly means 'DROP db1.first_table, DROP db1.second_table...').
    In other words, we want to interpret 'db1.%' as "everything touching db1".
    That is why we want to match 'db1' against 'db1.%' wild table rules.

  RETURN VALUES
    0           should not be logged/replicated
    1           should be logged/replicated
 */

int db_ok_with_wild_table(const char *db)
{
  /* "db" + '.' + '\0'; db length is bounded by check_db_name() */
  char hash_key[NAME_LEN+2];
  char *end;
  int len;
  end= strmov(hash_key, db);
  *end++= '.';
  len= end - hash_key ;
  if (wild_do_table_inited && find_wild(&replicate_wild_do_table,
                                        hash_key, len))
    return 1;
  if (wild_ignore_table_inited && find_wild(&replicate_wild_ignore_table,
                                            hash_key, len))
    return 0;

  /*
    If no explicit rule found and there was a do list, do not replicate.
    If there was no do list, go ahead
  */
  return !wild_do_table_inited;
}

/*
  Add a "db.table" rule to a replication table-rule hash.

  SYNOPSIS
    add_table_rule()
    h           hash to insert the rule into
    table_spec  rule text; must contain a '.'

  NOTES
    The TABLE_RULE_ENT header and a copy of table_spec live in one
    my_malloc() block. db points at the copy and tbl_name just past the
    first '.'. The copy is not NUL-terminated; key_len holds its length.

  RETURN VALUES
    0   rule added
    1   no '.' in table_spec, out of memory, or the hash insert failed
        (the entry is freed in that case, not leaked)
*/

int add_table_rule(HASH* h, const char* table_spec)
{
  const char* dot = strchr(table_spec, '.');
  if (!dot) return 1;
  // len is always > 0 because we know that there exists a '.'
  uint len = (uint)strlen(table_spec);
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
						 + len, MYF(MY_WME));
  if (!e) return 1;
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len);
  if (my_hash_insert(h, (byte*)e))
  {
    /* Not stored in the hash, so nothing else will ever free it */
    my_free((gptr) e, MYF(0));
    return 1;
  }
  return 0;
}

880

unknown's avatar
unknown committed
881 882 883
/*
  Add table expression with wildcards to dynamic array
*/
884

unknown's avatar
unknown committed
885 886
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
{
unknown's avatar
unknown committed
887
  const char* dot = strchr(table_spec, '.');
unknown's avatar
unknown committed
888
  if (!dot) return 1;
unknown's avatar
unknown committed
889 890 891
  uint len = (uint)strlen(table_spec);
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
						 + len, MYF(MY_WME));
unknown's avatar
unknown committed
892
  if (!e) return 1;
unknown's avatar
unknown committed
893 894 895 896 897 898 899 900
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len);
  insert_dynamic(a, (gptr)&e);
  return 0;
}

901

902 903 904
/*
  my_free() every pointer stored in the DYNAMIC_ARRAY, then release the
  array storage itself with delete_dynamic().
*/
static void free_string_array(DYNAMIC_ARRAY *a)
{
  char *entry;

  for (uint idx= 0; idx < a->elements; ++idx)
  {
    get_dynamic(a, (gptr) &entry, idx);   /* copy out the stored pointer */
    my_free(entry, MYF(MY_WME));
  }
  delete_dynamic(a);
}

914

915
#ifdef NOT_USED_YET
/*
  list_walk() callback: release one master's info. Always returns 0 so
  the walk goes on to the next element.
*/
static int end_slave_on_walk(MASTER_INFO *master, gptr /* unused */)
{
  end_master_info(master);
  return 0;
}
#endif
922

923

unknown's avatar
unknown committed
924 925 926 927 928 929
/*
  Free all resources used by slave

  SYNOPSIS
    end_slave()
*/
930

931 932
void end_slave()
{
933 934 935 936 937 938 939 940
  /*
    This is called when the server terminates, in close_connections().
    It terminates slave threads. However, some CHANGE MASTER etc may still be
    running presently. If a START SLAVE was in progress, the mutex lock below
    will make us wait until slave threads have started, and START SLAVE
    returns, then we terminate them here.
  */
  pthread_mutex_lock(&LOCK_active_mi);
unknown's avatar
unknown committed
941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960
  if (active_mi)
  {
    /*
      TODO: replace the line below with
      list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
      once multi-master code is ready.
    */
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
    end_master_info(active_mi);
    if (do_table_inited)
      hash_free(&replicate_do_table);
    if (ignore_table_inited)
      hash_free(&replicate_ignore_table);
    if (wild_do_table_inited)
      free_string_array(&replicate_wild_do_table);
    if (wild_ignore_table_inited)
      free_string_array(&replicate_wild_ignore_table);
    delete active_mi;
    active_mi= 0;
  }
961
  pthread_mutex_unlock(&LOCK_active_mi);
962
}
unknown's avatar
unknown committed
963

964

965
/*
  Tell whether the slave I/O thread must stop. This is true when
  mi->abort_slave is set, when the global abort_loop flag is set
  (presumably server shutdown), or when thd->killed is set.
*/
static bool io_slave_killed(THD* thd, MASTER_INFO* mi)
{
  DBUG_ASSERT(mi->io_thd == thd);
  DBUG_ASSERT(mi->slave_running == 1); // tracking buffer overrun
  if (mi->abort_slave || abort_loop)
    return 1;
  return thd->killed;
}

972

973
/*
  Tell whether the slave SQL thread must stop. This is true when
  rli->abort_slave is set, when the global abort_loop flag is set
  (presumably server shutdown), or when thd->killed is set.
*/
static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
{
  DBUG_ASSERT(rli->sql_thd == thd);
  DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
  if (rli->abort_slave || abort_loop)
    return 1;
  return thd->killed;
}

980

981 982 983 984 985 986 987 988 989 990 991 992 993 994
/*
  Writes an error message to rli->last_slave_error and rli->last_slave_errno
  (which will be displayed by SHOW SLAVE STATUS), and prints it to stderr.

  SYNOPSIS
    slave_print_error()
    rli		
    err_code    The error code
    msg         The error message (usually related to the error code, but can
                contain more information).
    ...         (this is printf-like format, with % symbols in msg)

  RETURN VALUES
    void
995
*/
996

997
void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...)
998 999 1000
{
  va_list args;
  va_start(args,msg);
1001 1002 1003
  my_vsnprintf(rli->last_slave_error,
	       sizeof(rli->last_slave_error), msg, args);
  rli->last_slave_errno = err_code;
1004 1005
  /* If the error string ends with '.', do not add a ',' it would be ugly */
  if (rli->last_slave_error[0] && 
1006 1007
      (*(strend(rli->last_slave_error)-1) == '.'))
    sql_print_error("Slave: %s Error_code: %d", rli->last_slave_error,
1008 1009
                    err_code);
  else
1010
    sql_print_error("Slave: %s, Error_code: %d", rli->last_slave_error,
1011 1012
                    err_code);

1013 1014
}

unknown's avatar
unknown committed
1015
/*
1016 1017
  skip_load_data_infile()

unknown's avatar
unknown committed
1018 1019 1020
  NOTES
    This is used to tell a 3.23 master to break send_file()
*/
1021 1022 1023 1024 1025 1026 1027 1028

void skip_load_data_infile(NET *net)
{
  (void)net_request_file(net, "/dev/null");
  (void)my_net_read(net);				// discard response
  (void)net_write_command(net, 0, "", 0, "", 0);	// Send ok
}

1029

1030
/*
  Ask the peer on net to send the file named fname. The request is a
  packet with command byte 251 followed by the file name.
  Returns net_write_command()'s result (non-zero on failure).
*/
bool net_request_file(NET* net, const char* fname)
{
  DBUG_ENTER("net_request_file");
  size_t name_length= strlen(fname);
  DBUG_RETURN(net_write_command(net, 251, fname, name_length, "", 0));
}

1036

1037
const char *rewrite_db(const char* db)
unknown's avatar
unknown committed
1038
{
unknown's avatar
unknown committed
1039 1040
  if (replicate_rewrite_db.is_empty() || !db)
    return db;
unknown's avatar
unknown committed
1041 1042 1043
  I_List_iterator<i_string_pair> it(replicate_rewrite_db);
  i_string_pair* tmp;

unknown's avatar
unknown committed
1044 1045 1046 1047 1048
  while ((tmp=it++))
  {
    if (!strcmp(tmp->key, db))
      return tmp->val;
  }
unknown's avatar
unknown committed
1049 1050
  return db;
}
1051

1052 1053
/*
  From other comments and tests in code, it looks like
1054
  sometimes Query_log_event and Load_log_event can have db == 0
1055 1056 1057
  (see rewrite_db() above for example)
  (cases where this happens are unclear; it may be when the master is 3.23).
*/
1058 1059

const char *print_slave_db_safe(const char* db)
1060
{
1061
  return (db ? rewrite_db(db) : "");
1062
}
1063

1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077
/*
  Checks whether a db matches some do_db and ignore_db rules
  (for logging or replication)

  SYNOPSIS
    db_ok()
    db              name of the db to check
    do_list         either binlog_do_db or replicate_do_db
    ignore_list     either binlog_ignore_db or replicate_ignore_db

  RETURN VALUES
    0           should not be logged/replicated
    1           should be logged/replicated                  
*/
unknown's avatar
unknown committed
1078

unknown's avatar
unknown committed
1079 1080 1081
int db_ok(const char* db, I_List<i_string> &do_list,
	  I_List<i_string> &ignore_list )
{
unknown's avatar
unknown committed
1082
  if (do_list.is_empty() && ignore_list.is_empty())
unknown's avatar
unknown committed
1083 1084
    return 1; // ok to replicate if the user puts no constraints

unknown's avatar
unknown committed
1085 1086 1087 1088 1089
  /*
    If the user has specified restrictions on which databases to replicate
    and db was not selected, do not replicate.
  */
  if (!db)
unknown's avatar
unknown committed
1090
    return 0;
unknown's avatar
unknown committed
1091

unknown's avatar
unknown committed
1092 1093 1094 1095
  if (!do_list.is_empty()) // if the do's are not empty
  {
    I_List_iterator<i_string> it(do_list);
    i_string* tmp;
unknown's avatar
unknown committed
1096

unknown's avatar
unknown committed
1097 1098 1099 1100
    while ((tmp=it++))
    {
      if (!strcmp(tmp->ptr, db))
	return 1; // match
unknown's avatar
unknown committed
1101
    }
unknown's avatar
unknown committed
1102 1103
    return 0;
  }
unknown's avatar
unknown committed
1104
  else // there are some elements in the don't, otherwise we cannot get here
unknown's avatar
unknown committed
1105 1106 1107
  {
    I_List_iterator<i_string> it(ignore_list);
    i_string* tmp;
unknown's avatar
unknown committed
1108

unknown's avatar
unknown committed
1109 1110 1111 1112
    while ((tmp=it++))
    {
      if (!strcmp(tmp->ptr, db))
	return 0; // match
unknown's avatar
unknown committed
1113
    }
unknown's avatar
unknown committed
1114 1115
    return 1;
  }
unknown's avatar
unknown committed
1116 1117
}

1118

unknown's avatar
unknown committed
1119 1120
static int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
				 const char *default_val)
unknown's avatar
unknown committed
1121
{
unknown's avatar
unknown committed
1122 1123 1124 1125 1126 1127 1128
  uint length;
  if ((length=my_b_gets(f,var, max_size)))
  {
    char* last_p = var + length -1;
    if (*last_p == '\n')
      *last_p = 0; // if we stopped on newline, kill it
    else
unknown's avatar
unknown committed
1129
    {
unknown's avatar
unknown committed
1130 1131 1132 1133
      /*
	If we truncated a line or stopped on last char, remove all chars
	up to and including newline.
      */
unknown's avatar
unknown committed
1134
      int c;
unknown's avatar
unknown committed
1135
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF));
unknown's avatar
unknown committed
1136
    }
unknown's avatar
unknown committed
1137 1138 1139 1140
    return 0;
  }
  else if (default_val)
  {
unknown's avatar
unknown committed
1141
    strmake(var,  default_val, max_size-1);
unknown's avatar
unknown committed
1142 1143
    return 0;
  }
unknown's avatar
unknown committed
1144
  return 1;
unknown's avatar
unknown committed
1145 1146
}

1147

unknown's avatar
unknown committed
1148
static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
unknown's avatar
unknown committed
1149 1150 1151
{
  char buf[32];
  
unknown's avatar
unknown committed
1152 1153 1154 1155 1156
  if (my_b_gets(f, buf, sizeof(buf))) 
  {
    *var = atoi(buf);
    return 0;
  }
unknown's avatar
unknown committed
1157
  else if (default_val)
unknown's avatar
unknown committed
1158 1159 1160 1161
  {
    *var = default_val;
    return 0;
  }
unknown's avatar
unknown committed
1162
  return 1;
unknown's avatar
unknown committed
1163 1164
}

1165

unknown's avatar
unknown committed
1166
/*
  Check that the master can be replicated from, and record facts about it.

  SYNOPSIS
    get_master_version_and_clock()
    mysql       open connection to the master
    mi          master info; mi->old_format is set from the master's
                version and mi->clock_diff_with_master from its clock

  NOTES
    Checks are done in this order:
    - major version (first char of server_version; 3 and 4 are accepted);
    - clock difference via SELECT UNIX_TIMESTAMP() (never fatal);
    - master server id differs from ours (unless replicate_same_server_id);
    - equal global COLLATION_SERVER;
    - equal global TIME_ZONE.
    A query that fails on the master (old servers) skips its check. A NULL
    column in a result row also skips the check; it is never dereferenced.
    If several checks fail, the last failing check's message is logged.

  RETURN VALUES
    0   ok
    1   master is not usable; the reason was written to the error log
*/

static int get_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi)
{
  const char* errmsg= 0;
  
  /*
    Note the following switch will bug when we have MySQL branch 30 ;)
  */
  switch (*mysql->server_version) {
  case '3':
    mi->old_format = 
      (strncmp(mysql->server_version, "3.23.57", 7) < 0) /* < .57 */ ?
      BINLOG_FORMAT_323_LESS_57 : 
      BINLOG_FORMAT_323_GEQ_57 ;
    break;
  case '4':
    mi->old_format = BINLOG_FORMAT_CURRENT;
    break;
  default:
    /* 5.0 is not supported */
    errmsg = "Master reported an unrecognized MySQL version. Note that 4.1 \
slaves can't replicate a 5.0 or newer master.";
    break;
  }

  /*
    Compare the master and slave's clock. Do not die if master's clock is
    unavailable (very old master not supporting UNIX_TIMESTAMP()?).
  */
  MYSQL_RES *master_res= 0;
  MYSQL_ROW master_row;
  
  if (!mysql_real_query(mysql, "SELECT UNIX_TIMESTAMP()", 23) &&
      (master_res= mysql_store_result(mysql)) &&
      (master_row= mysql_fetch_row(master_res)) &&
      master_row[0])
  {
    mi->clock_diff_with_master= 
      (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
  }
  else
  {
    mi->clock_diff_with_master= 0; /* The "most sensible" value */
    sql_print_error("Warning: \"SELECT UNIX_TIMESTAMP()\" failed on master, \
do not trust column Seconds_Behind_Master of SHOW SLAVE STATUS");
  }
  if (master_res)
    mysql_free_result(master_res);
 
  /*
    Check that the master's server id and ours are different. Because if they
    are equal (which can result from a simple copy of master's datadir to slave,
    thus copying some my.cnf), replication will work but all events will be
    skipped.
    Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
    master?).
    Note: we could have put a @@SERVER_ID in the previous SELECT
    UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
  */
  if (!mysql_real_query(mysql, "SHOW VARIABLES LIKE 'SERVER_ID'", 31) &&
      (master_res= mysql_store_result(mysql)))
  {
    if ((master_row= mysql_fetch_row(master_res)) &&
        master_row[1] &&
        (::server_id == strtoul(master_row[1], 0, 10)) &&
        !replicate_same_server_id)
      errmsg= "The slave I/O thread stops because master and slave have equal \
MySQL server ids; these ids must be different for replication to work (or \
the --replicate-same-server-id option must be used on slave but this does \
not always make sense; please check the manual before using it).";
    mysql_free_result(master_res);
  }

  /*
    Check that the master's global character_set_server and ours are the same.
    Not fatal if query fails (old master?).
    Note that we don't check for equality of global character_set_client and
    collation_connection (neither do we prevent their setting in
    set_var.cc). That's because from what I (Guilhem) have tested, the global
    values of these 2 are never used (new connections don't use them).
    We don't test equality of global collation_database either as it's
    going to be deprecated (made read-only) in 4.1 very soon.
  */
  if (!mysql_real_query(mysql, "SELECT @@GLOBAL.COLLATION_SERVER", 32) &&
      (master_res= mysql_store_result(mysql)))
  {
    if ((master_row= mysql_fetch_row(master_res)) &&
        master_row[0] &&
        strcmp(master_row[0], global_system_variables.collation_server->name))
      errmsg= "The slave I/O thread stops because master and slave have \
different values for the COLLATION_SERVER global variable. The values must \
be equal for replication to work";
    mysql_free_result(master_res);
  }

  /*
    Perform analogous check for time zone. Theoretically we also should
    perform check here to verify that SYSTEM time zones are the same on
    slave and master, but we can't rely on value of @@system_time_zone
    variable (it is time zone abbreviation) since it is determined at start
    time and so could differ for slave and master even if they are really
    in the same system time zone. So we are omitting this check and just
    relying on documentation. Also according to Monty there are many users
    who are using replication between servers in various time zones. Hence
    such a check would break everything for them. (And now everything will
    work for them because by default both their master and slave will have
    'SYSTEM' time zone).
  */
  if (!mysql_real_query(mysql, "SELECT @@GLOBAL.TIME_ZONE", 25) &&
      (master_res= mysql_store_result(mysql)))
  {
    if ((master_row= mysql_fetch_row(master_res)) &&
        master_row[0] &&
        strcmp(master_row[0], 
               global_system_variables.time_zone->get_name()->ptr()))
      errmsg= "The slave I/O thread stops because master and slave have \
different values for the TIME_ZONE global variable. The values must \
be equal for replication to work";
    mysql_free_result(master_res);
  }

  if (errmsg)
  {
    /* Never use a message as the format string itself */
    sql_print_error("%s", errmsg);
    return 1;
  }

  return 0;
}

1291 1292 1293 1294 1295 1296 1297 1298 1299
/*
  Used by fetch_master_table (used by LOAD TABLE tblname FROM MASTER and LOAD
  DATA FROM MASTER). Drops the table (if 'overwrite' is true) and recreates it
  from the dump. Honours replication inclusion/exclusion rules.

  RETURN VALUES
    0           success
    1           error
*/
1300

1301
static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
1302
				  const char* table_name, bool overwrite)
unknown's avatar
unknown committed
1303
{
1304
  ulong packet_len;
1305
  char *query;
1306
  char* save_db;
1307 1308
  Vio* save_vio;
  HA_CHECK_OPT check_opt;
unknown's avatar
unknown committed
1309
  TABLE_LIST tables;
1310 1311
  int error= 1;
  handler *file;
1312
  ulong save_options;
1313
  NET *net= &mysql->net;
unknown's avatar
unknown committed
1314 1315
  DBUG_ENTER("create_table_from_dump");  

1316
  packet_len= my_net_read(net); // read create table statement
1317 1318
  if (packet_len == packet_error)
  {
1319
    send_error(thd, ER_MASTER_NET_READ);
unknown's avatar
unknown committed
1320
    DBUG_RETURN(1);
1321 1322 1323
  }
  if (net->read_pos[0] == 255) // error from master
  {
1324 1325 1326 1327 1328
    char *err_msg; 
    err_msg= (char*) net->read_pos + ((mysql->server_capabilities &
				       CLIENT_PROTOCOL_41) ?
				      3+SQLSTATE_LENGTH+1 : 3);
    net_printf(thd, ER_MASTER, err_msg);
unknown's avatar
unknown committed
1329
    DBUG_RETURN(1);
1330
  }
unknown's avatar
unknown committed
1331
  thd->command = COM_TABLE_DUMP;
1332
  thd->query_length= packet_len;
1333
  /* Note that we should not set thd->query until the area is initalized */
1334
  if (!(query = thd->strmake((char*) net->read_pos, packet_len)))
1335 1336
  {
    sql_print_error("create_table_from_dump: out of memory");
1337
    net_printf(thd, ER_GET_ERRNO, "Out of memory");
unknown's avatar
unknown committed
1338
    DBUG_RETURN(1);
1339
  }
1340
  thd->query= query;
unknown's avatar
unknown committed
1341 1342
  thd->query_error = 0;
  thd->net.no_send_ok = 1;
1343

unknown's avatar
unknown committed
1344 1345 1346
  bzero((char*) &tables,sizeof(tables));
  tables.db = (char*)db;
  tables.alias= tables.real_name= (char*)table_name;
unknown's avatar
unknown committed
1347

unknown's avatar
unknown committed
1348 1349 1350 1351 1352 1353 1354 1355
  /* Drop the table if 'overwrite' is true */
  if (overwrite && mysql_rm_table(thd,&tables,1,0)) /* drop if exists */
  {
    send_error(thd);
    sql_print_error("create_table_from_dump: failed to drop the table");
    goto err;
  }

1356
  /* Create the table. We do not want to log the "create table" statement */
1357
  save_options = thd->options;
1358
  thd->options &= ~(ulong) (OPTION_BIN_LOG);
unknown's avatar
unknown committed
1359
  thd->proc_info = "Creating table from master dump";
unknown's avatar
unknown committed
1360
  // save old db in case we are creating in a different database
1361
  save_db = thd->db;
1362
  thd->db = (char*)db;
unknown's avatar
unknown committed
1363
  mysql_parse(thd, thd->query, packet_len); // run create table
1364
  thd->db = save_db;		// leave things the way the were before
1365
  thd->options = save_options;
unknown's avatar
unknown committed
1366
  
1367 1368
  if (thd->query_error)
    goto err;			// mysql_parse took care of the error send
unknown's avatar
unknown committed
1369 1370

  thd->proc_info = "Opening master dump table";
1371
  tables.lock_type = TL_WRITE;
unknown's avatar
unknown committed
1372 1373
  if (!open_ltable(thd, &tables, TL_WRITE))
  {
1374
    send_error(thd,0,0);			// Send error from open_ltable
unknown's avatar
unknown committed
1375
    sql_print_error("create_table_from_dump: could not open created table");
1376
    goto err;
unknown's avatar
unknown committed
1377
  }
unknown's avatar
unknown committed
1378
  
1379
  file = tables.table->file;
unknown's avatar
unknown committed
1380
  thd->proc_info = "Reading master dump table data";
1381
  /* Copy the data file */
unknown's avatar
unknown committed
1382 1383
  if (file->net_read_dump(net))
  {
1384
    net_printf(thd, ER_MASTER_NET_READ);
1385
    sql_print_error("create_table_from_dump: failed in\
unknown's avatar
unknown committed
1386
 handler::net_read_dump()");
1387
    goto err;
unknown's avatar
unknown committed
1388
  }
unknown's avatar
unknown committed
1389 1390

  check_opt.init();
unknown's avatar
unknown committed
1391
  check_opt.flags|= T_VERY_SILENT | T_CALC_CHECKSUM | T_QUICK;
unknown's avatar
unknown committed
1392
  thd->proc_info = "Rebuilding the index on master dump table";
unknown's avatar
unknown committed
1393 1394 1395 1396 1397
  /*
    We do not want repair() to spam us with messages
    just send them to the error log, and report the failure in case of
    problems.
  */
1398
  save_vio = thd->net.vio;
unknown's avatar
unknown committed
1399
  thd->net.vio = 0;
1400
  /* Rebuild the index file from the copied data file (with REPAIR) */
1401
  error=file->repair(thd,&check_opt) != 0;
unknown's avatar
unknown committed
1402
  thd->net.vio = save_vio;
1403
  if (error)
1404
    net_printf(thd, ER_INDEX_REBUILD,tables.table->real_name);
1405 1406

err:
unknown's avatar
unknown committed
1407 1408
  close_thread_tables(thd);
  thd->net.no_send_ok = 0;
unknown's avatar
unknown committed
1409
  DBUG_RETURN(error); 
unknown's avatar
unknown committed
1410 1411
}

1412

1413
int fetch_master_table(THD *thd, const char *db_name, const char *table_name,
1414
		       MASTER_INFO *mi, MYSQL *mysql, bool overwrite)
unknown's avatar
unknown committed
1415
{
1416 1417 1418 1419 1420 1421
  int error= 1;
  const char *errmsg=0;
  bool called_connected= (mysql != NULL);
  DBUG_ENTER("fetch_master_table");
  DBUG_PRINT("enter", ("db_name: '%s'  table_name: '%s'",
		       db_name,table_name));
unknown's avatar
unknown committed
1422

unknown's avatar
merge  
unknown committed
1423
  if (!called_connected)
1424
  { 
unknown's avatar
SCRUM  
unknown committed
1425
    if (!(mysql = mysql_init(NULL)))
1426
    {
1427
      send_error(thd);			// EOM
1428 1429
      DBUG_RETURN(1);
    }
unknown's avatar
merge  
unknown committed
1430
    if (connect_to_master(thd, mysql, mi))
1431
    {
unknown's avatar
SCRUM  
unknown committed
1432 1433
      net_printf(thd, ER_CONNECT_TO_MASTER, mysql_error(mysql));
      mysql_close(mysql);
1434
      DBUG_RETURN(1);
1435
    }
1436 1437
    if (thd->killed)
      goto err;
1438
  }
unknown's avatar
unknown committed
1439

unknown's avatar
unknown committed
1440
  if (request_table_dump(mysql, db_name, table_name))
1441
  {
1442 1443
    error= ER_UNKNOWN_ERROR;
    errmsg= "Failed on table dump request";
1444 1445
    goto err;
  }
unknown's avatar
unknown committed
1446 1447 1448
  if (create_table_from_dump(thd, mysql, db_name,
			     table_name, overwrite))
    goto err;    // create_table_from_dump have sent the error already
unknown's avatar
unknown committed
1449
  error = 0;
1450

unknown's avatar
unknown committed
1451
 err:
1452
  thd->net.no_send_ok = 0; // Clear up garbage after create_table_from_dump
1453
  if (!called_connected)
unknown's avatar
SCRUM  
unknown committed
1454
    mysql_close(mysql);
1455
  if (errmsg && thd->vio_ok())
1456
    send_error(thd, error, errmsg);
1457
  DBUG_RETURN(test(error));			// Return 1 on error
unknown's avatar
unknown committed
1458 1459
}

1460

1461 1462
/*
  Release the resources of an initialized MASTER_INFO: its relay log info
  and, if open, the master info file cache and descriptor. Does nothing when
  mi->inited is not set; clears mi->inited afterwards.
*/
void end_master_info(MASTER_INFO* mi)
{
  DBUG_ENTER("end_master_info");

  if (mi->inited)
  {
    end_relay_log_info(&mi->rli);
    if (mi->fd >= 0)
    {
      end_io_cache(&mi->file);
      (void) my_close(mi->fd, MYF(MY_WME));
      mi->fd= -1;
    }
    mi->inited= 0;
  }

  DBUG_VOID_RETURN;
}

1479

1480 1481 1482 1483 1484 1485
/*
  Initialize a RELAY_LOG_INFO: open the relay log (and its index), then
  read or create the relay log info file.

  SYNOPSIS
    init_relay_log_info()
    rli         relay log info to initialize; returns 0 at once if
                rli->inited is already set
    info_fname  name of the relay log info file (passed to fn_format()
                with mysql_data_home as the directory)

  NOTES
    rli->data_lock is held during initialization and released on every
    return path.
    If the info file does not exist, it is created and the relay log is
    positioned on the first log of the index.
    Otherwise four values are read from the file: relay log name, relay
    log position, master log name, master log position.
    On success the info file cache is switched to WRITE_CACHE and flushed.
    Error messages are always logged with a "%s" format, because they can
    contain file names with '%' in them.

  RETURN VALUES
    0   ok
    1   error (logged); the relay log and the info file are closed again.
        A failing flush_relay_log_info() also gives a non-zero result, but
        rli is then still marked as initialized.
*/
int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
{
  char fname[FN_REFLEN+128];
  int info_fd;
  const char* msg = 0;
  int error = 0;
  DBUG_ENTER("init_relay_log_info");

  if (rli->inited)				// Set if this function called
    DBUG_RETURN(0);
  fn_format(fname, info_fname, mysql_data_home, "", 4+32);
  pthread_mutex_lock(&rli->data_lock);
  info_fd = rli->info_fd;
  rli->cur_log_fd = -1;
  rli->slave_skip_counter=0;
  rli->abort_pos_wait=0;
  rli->log_space_limit= relay_log_space_limit;
  rli->log_space_total= 0;

  // TODO: make this work with multi-master
  if (!opt_relay_logname)
  {
    char tmp[FN_REFLEN];
    /*
      TODO: The following should be using fn_format();  We just need to
      first change fn_format() to cut the file name if it's too long.
    */
    strmake(tmp,glob_hostname,FN_REFLEN-5);
    strmov(strcend(tmp,'.'),"-relay-bin");
    opt_relay_logname=my_strdup(tmp,MYF(MY_WME));
  }

  /*
    The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
    Note that the I/O thread flushes it to disk after writing every event, in
    flush_master_info(mi, 1).
  */

  /*
    For the maximum log size, we choose max_relay_log_size if it is
    non-zero, max_binlog_size otherwise. If later the user does SET
    GLOBAL on one of these variables, fix_max_binlog_size and
    fix_max_relay_log_size will reconsider the choice (for example
    if the user changes max_relay_log_size to zero, we have to
    switch to using max_binlog_size for the relay log) and update
    rli->relay_log.max_size (and mysql_bin_log.max_size).
  */

  if (open_log(&rli->relay_log, glob_hostname, opt_relay_logname,
	       "-relay-bin", opt_relaylog_index_name,
	       LOG_BIN, 1 /* read_append cache */,
	       1 /* no auto events */,
               max_relay_log_size ? max_relay_log_size : max_binlog_size))
  {
    pthread_mutex_unlock(&rli->data_lock);
    sql_print_error("Failed in open_log() called from init_relay_log_info()");
    DBUG_RETURN(1);
  }

  /* if file does not exist */
  if (access(fname,F_OK))
  {
    /*
      If someone removed the file from underneath our feet, just close
      the old descriptor and re-create the old file
    */
    if (info_fd >= 0)
      my_close(info_fd, MYF(MY_WME));
    if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
    {
      sql_print_error("Failed to create a new relay log info file (\
file '%s', errno %d)", fname, my_errno);
      /* Defensive: do not dereference a missing THD */
      msg= current_thd ? current_thd->net.last_error : 0;
      goto err;
    }
    if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
		      MYF(MY_WME))) 
    {
      sql_print_error("Failed to create a cache on relay log info file '%s'",
		      fname);
      msg= current_thd ? current_thd->net.last_error : 0;
      goto err;
    }

    /* Init relay log with first entry in the relay index file */
    if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
			   &msg))
    {
      sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
      goto err;
    }
    rli->group_master_log_name[0]= 0;
    rli->group_master_log_pos= 0;
    rli->info_fd= info_fd;
  }
  else // file exists
  {
    if (info_fd >= 0)
      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
    else 
    {
      int open_failed= 0;               // distinct from the outer 'error'
      if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
      {
        sql_print_error("\
Failed to open the existing relay log info file '%s' (errno %d)",
			fname, my_errno);
        open_failed= 1;
      }
      else if (init_io_cache(&rli->info_file, info_fd,
                             IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
      {
        sql_print_error("Failed to create a cache on relay log info file '%s'",
			fname);
        open_failed= 1;
      }
      if (open_failed)
      {
        if (info_fd >= 0)
          my_close(info_fd, MYF(0));
        rli->info_fd= -1;
        rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
        pthread_mutex_unlock(&rli->data_lock);
        DBUG_RETURN(1);
      }
    }

    rli->info_fd = info_fd;
    int relay_log_pos, master_log_pos;
    if (init_strvar_from_file(rli->group_relay_log_name,
			      sizeof(rli->group_relay_log_name),
                              &rli->info_file, "") ||
       init_intvar_from_file(&relay_log_pos,
			     &rli->info_file, BIN_LOG_HEADER_SIZE) ||
       init_strvar_from_file(rli->group_master_log_name,
			     sizeof(rli->group_master_log_name),
                             &rli->info_file, "") ||
       init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
    {
      msg="Error reading slave log configuration";
      goto err;
    }
    strmake(rli->event_relay_log_name,rli->group_relay_log_name,
            sizeof(rli->event_relay_log_name)-1);
    rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
    rli->group_master_log_pos= master_log_pos;

    if (init_relay_log_pos(rli,
			   rli->group_relay_log_name,
			   rli->group_relay_log_pos,
			   0 /* no data lock*/,
			   &msg))
    {
      char llbuf[22];
      sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s)",
		      rli->group_relay_log_name,
		      llstr(rli->group_relay_log_pos, llbuf));
      goto err;
    }
  }
  DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
  DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
  /*
    Now change the cache from READ to WRITE - must do this
    before flush_relay_log_info
  */
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
  if ((error= flush_relay_log_info(rli)))
    sql_print_error("Failed to flush relay log info file");
  if (count_relay_log_space(rli))
  {
    msg="Error counting relay log space";
    goto err;
  }
  rli->inited= 1;
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_RETURN(error);

err:
  /* msg may contain '%' (file names) and may be NULL */
  if (msg)
    sql_print_error("%s", msg);
  end_io_cache(&rli->info_file);
  if (info_fd >= 0)
    my_close(info_fd, MYF(0));
  rli->info_fd= -1;
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_RETURN(1);
}

1669

unknown's avatar
unknown committed
1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683
/*
  Add the on-disk size of one relay log (as listed in the index) to
  rli->log_space_total.

  SYNOPSIS
    add_relay_log()
    rli             relay log info whose log_space_total is increased
    linfo           index entry naming the relay log file to stat

  RETURN VALUES
    0   size added
    1   the file could not be stat'ed (an error is logged)
*/
static inline int add_relay_log(RELAY_LOG_INFO* rli,LOG_INFO* linfo)
{
  MY_STAT file_stat;
  DBUG_ENTER("add_relay_log");

  if (my_stat(linfo->log_file_name, &file_stat, MYF(0)))
  {
    rli->log_space_total+= file_stat.st_size;
#ifndef DBUG_OFF
    char llbuf[22];
    DBUG_PRINT("info",("log_space_total: %s",
                       llstr(rli->log_space_total, llbuf)));
#endif
    DBUG_RETURN(0);
  }

  sql_print_error("log %s listed in the index, but failed to stat",
                  linfo->log_file_name);
  DBUG_RETURN(1);
}

1688

unknown's avatar
unknown committed
1689 1690
/*
  Block the slave I/O thread while the relay logs use more space than
  rli->log_space_limit allows.

  SYNOPSIS
    wait_for_relay_log_space()
    rli             relay log info of the slave

  NOTES
    Waits on rli->log_space_cond under rli->log_space_lock. The wait ends
    when log_space_total is no longer above log_space_limit, when the I/O
    thread is killed, or when rli->ignore_log_space_limit is set.

  RETURN VALUES
    0   stopped waiting without being killed
    1   the I/O thread was killed while waiting
*/
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli)
{
  MASTER_INFO* mi= rli->mi;
  THD* thd= mi->io_thd;
  bool killed= 0;
  const char *prev_proc_info;
  DBUG_ENTER("wait_for_relay_log_space");

  pthread_mutex_lock(&rli->log_space_lock);
  prev_proc_info= thd->enter_cond(&rli->log_space_cond,
                                  &rli->log_space_lock,
                                  "Waiting for the slave SQL thread to free "
                                  "enough relay log space");
  for (;;)
  {
    if (rli->log_space_limit >= rli->log_space_total)
      break;                                    // within the limit again
    if ((killed= io_slave_killed(thd, mi)))
      break;
    if (rli->ignore_log_space_limit)
      break;                                    // limit is being ignored
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
  }
  thd->exit_cond(prev_proc_info);
  /*
    NOTE(review): confirm whether THD::exit_cond() already releases the
    mutex registered with enter_cond(); if it does, this unlock is a
    second unlock of rli->log_space_lock.
  */
  pthread_mutex_unlock(&rli->log_space_lock);
  DBUG_RETURN(killed);
}

unknown's avatar
unknown committed
1712

unknown's avatar
unknown committed
1713 1714 1715 1716
/*
  Recompute rli->log_space_total as the total size of every relay log
  listed in the relay log index.

  SYNOPSIS
    count_relay_log_space()
    rli             relay log info to update

  RETURN VALUES
    0   ok
    1   the first index entry could not be found, or a listed log could
        not be stat'ed (an error is logged)
*/
static int count_relay_log_space(RELAY_LOG_INFO* rli)
{
  LOG_INFO log_info;
  DBUG_ENTER("count_relay_log_space");

  rli->log_space_total= 0;
  if (rli->relay_log.find_log_pos(&log_info, NullS, 1))
  {
    sql_print_error("Could not find first log while counting relay log space");
    DBUG_RETURN(1);
  }

  for (;;)
  {
    if (add_relay_log(rli, &log_info))
      DBUG_RETURN(1);
    if (rli->relay_log.find_next_log(&log_info, 1))
      break;                            // no further entry in the index
  }

  /*
    Every byte written so far (including earlier writes) has just been
    counted, so reset bytes_written to avoid counting that space twice.
  */
  rli->relay_log.reset_bytes_written();
  DBUG_RETURN(0);
}
unknown's avatar
unknown committed
1736

1737

unknown's avatar
unknown committed
1738 1739 1740 1741 1742 1743 1744 1745 1746 1747
/*
  Fill a MASTER_INFO from the master-* startup options.

  SYNOPSIS
    init_master_info_with_options()
    mi              master info to fill

  NOTES
    The binlog coordinates are reset to an empty log name and the position
    just past the binlog header. A string field whose option is unset is
    left untouched. Every string copy is bounded by the size of its
    destination buffer (strmake() writes at most length characters plus a
    terminating NUL).
*/
void init_master_info_with_options(MASTER_INFO* mi)
{
  mi->master_log_name[0] = 0;
  mi->master_log_pos = BIN_LOG_HEADER_SIZE;		// skip magic number

  if (master_host)
    strmake(mi->host, master_host, sizeof(mi->host) - 1);
  if (master_user)
    strmake(mi->user, master_user, sizeof(mi->user) - 1);
  /*
    Bound the password copy by the real size of mi->password, like every
    other field here. MAX_PASSWORD_LENGTH, used before, is not tied to the
    buffer's declaration.
  */
  if (master_password)
    strmake(mi->password, master_password, sizeof(mi->password) - 1);

  mi->port = master_port;
  mi->connect_retry = master_connect_retry;

  mi->ssl= master_ssl;
  if (master_ssl_ca)
    strmake(mi->ssl_ca, master_ssl_ca, sizeof(mi->ssl_ca)-1);
  if (master_ssl_capath)
    strmake(mi->ssl_capath, master_ssl_capath, sizeof(mi->ssl_capath)-1);
  if (master_ssl_cert)
    strmake(mi->ssl_cert, master_ssl_cert, sizeof(mi->ssl_cert)-1);
  if (master_ssl_cipher)
    strmake(mi->ssl_cipher, master_ssl_cipher, sizeof(mi->ssl_cipher)-1);
  if (master_ssl_key)
    strmake(mi->ssl_key, master_ssl_key, sizeof(mi->ssl_key)-1);
}

unknown's avatar
unknown committed
1765
/*
  Forget the last SQL thread error (number and message) that
  SHOW SLAVE STATUS reports.
*/
static void clear_slave_error(RELAY_LOG_INFO* rli)
{
  rli->last_slave_errno= 0;
  rli->last_slave_error[0]= '\0';
}
1771

unknown's avatar
unknown committed
1772 1773 1774 1775 1776
/*
  Clear the SQL thread error information and reset
  rli->last_master_timestamp to 0. While that timestamp is 0,
  SHOW SLAVE STATUS reports Seconds_Behind_Master as NULL.
*/
void clear_slave_error_timestamp(RELAY_LOG_INFO* rli)
{
  clear_slave_error(rli);
  rli->last_master_timestamp= 0;
}
unknown's avatar
unknown committed
1777

1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791
/*
  Reset the UNTIL condition of a RELAY_LOG_INFO: no condition, empty log
  name and position 0 (SHOW SLAVE STATUS then reports Until_Condition as
  "None").

  SYNOPSIS
    clear_until_condition()
    rli             RELAY_LOG_INFO whose UNTIL condition is reset
*/
void clear_until_condition(RELAY_LOG_INFO* rli)
{
  rli->until_log_pos= 0;
  rli->until_log_name[0]= 0;
  rli->until_condition= RELAY_LOG_INFO::UNTIL_NONE;
}


unknown's avatar
unknown committed
1792
/*
  Number of lines flush_master_info() writes to master.info: the line
  count itself followed by 13 values (binlog name, binlog position, host,
  user, password, port, connect_retry and the six SSL fields).
  init_master_info() treats a file whose first line is a number >= this
  value as the line-counted (4.1) format that includes the SSL fields.
*/
#define LINES_IN_MASTER_INFO_WITH_SSL 14
1793

unknown's avatar
unknown committed
1794

1795
int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
1796 1797
		     const char* slave_info_fname,
		     bool abort_if_no_master_info_file)
unknown's avatar
unknown committed
1798
{
unknown's avatar
unknown committed
1799 1800 1801 1802
  int fd,error;
  char fname[FN_REFLEN+128];
  DBUG_ENTER("init_master_info");

unknown's avatar
unknown committed
1803
  if (mi->inited)
unknown's avatar
unknown committed
1804 1805 1806 1807 1808 1809 1810 1811 1812
  {
    /*
      We have to reset read position of relay-log-bin as we may have
      already been reading from 'hotlog' when the slave was stopped
      last time. If this case pos_in_file would be set and we would
      get a crash when trying to read the signature for the binary
      relay log.
    */
    my_b_seek(mi->rli.cur_log, (my_off_t) 0);
unknown's avatar
unknown committed
1813
    DBUG_RETURN(0);
unknown's avatar
unknown committed
1814 1815
  }

unknown's avatar
unknown committed
1816 1817
  mi->mysql=0;
  mi->file_id=1;
1818
  fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
unknown's avatar
unknown committed
1819

unknown's avatar
unknown committed
1820 1821 1822 1823
  /*
    We need a mutex while we are changing master info parameters to
    keep other threads from reading bogus info
  */
unknown's avatar
unknown committed
1824

1825
  pthread_mutex_lock(&mi->data_lock);
unknown's avatar
unknown committed
1826
  fd = mi->fd;
1827 1828

  /* does master.info exist ? */
unknown's avatar
unknown committed
1829
  
1830
  if (access(fname,F_OK))
unknown's avatar
unknown committed
1831
  {
1832 1833 1834 1835 1836
    if (abort_if_no_master_info_file)
    {
      pthread_mutex_unlock(&mi->data_lock);
      DBUG_RETURN(0);
    }
unknown's avatar
unknown committed
1837 1838 1839 1840
    /*
      if someone removed the file from underneath our feet, just close
      the old descriptor and re-create the old file
    */
unknown's avatar
unknown committed
1841 1842
    if (fd >= 0)
      my_close(fd, MYF(MY_WME));
1843 1844 1845 1846 1847 1848 1849
    if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
    {
      sql_print_error("Failed to create a new master info file (\
file '%s', errno %d)", fname, my_errno);
      goto err;
    }
    if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
unknown's avatar
unknown committed
1850
		      MYF(MY_WME)))
1851 1852 1853
    {
      sql_print_error("Failed to create a cache on master info file (\
file '%s')", fname);
unknown's avatar
unknown committed
1854
      goto err;
1855
    }
unknown's avatar
unknown committed
1856

unknown's avatar
unknown committed
1857
    mi->fd = fd;
unknown's avatar
unknown committed
1858 1859
    init_master_info_with_options(mi);

unknown's avatar
unknown committed
1860
  }
1861
  else // file exists
unknown's avatar
unknown committed
1862
  {
unknown's avatar
unknown committed
1863
    if (fd >= 0)
unknown's avatar
unknown committed
1864
      reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880
    else 
    {
      if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
      {
        sql_print_error("Failed to open the existing master info file (\
file '%s', errno %d)", fname, my_errno);
        goto err;
      }
      if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
                        0, MYF(MY_WME)))
      {
        sql_print_error("Failed to create a cache on master info file (\
file '%s')", fname);
        goto err;
      }
    }
unknown's avatar
unknown committed
1881

unknown's avatar
unknown committed
1882
    mi->fd = fd;
unknown's avatar
unknown committed
1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908
    int port, connect_retry, master_log_pos, ssl= 0, lines;
    char *first_non_digit;
    
    /*
       Starting from 4.1.x master.info has new format. Now its
       first line contains number of lines in file. By reading this 
       number we will be always distinguish to which version our 
       master.info corresponds to. We can't simply count lines in 
       file since versions before 4.1.x could generate files with more
       lines than needed.
       If first line doesn't contain a number or contain number less than 
       14 then such file is treated like file from pre 4.1.1 version.
       There is no ambiguity when reading an old master.info, as before 
       4.1.1, the first line contained the binlog's name, which is either
       empty or has an extension (contains a '.'), so can't be confused 
       with an integer.

       So we're just reading first line and trying to figure which version 
       is this.
    */
    
    /* 
       The first row is temporarily stored in mi->master_log_name, 
       if it is line count and not binlog name (new format) it will be 
       overwritten by the second row later.
    */
1909
    if (init_strvar_from_file(mi->master_log_name,
unknown's avatar
unknown committed
1910
			      sizeof(mi->master_log_name), &mi->file,
unknown's avatar
unknown committed
1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926
			      ""))
      goto errwithmsg;
    
    lines= strtoul(mi->master_log_name, &first_non_digit, 10);

    if (mi->master_log_name[0]!='\0' && 
        *first_non_digit=='\0' && lines >= LINES_IN_MASTER_INFO_WITH_SSL)
    {                                          // Seems to be new format
      if (init_strvar_from_file(mi->master_log_name,     
            sizeof(mi->master_log_name), &mi->file, ""))
        goto errwithmsg;
    }
    else
      lines= 7;
    
    if (init_intvar_from_file(&master_log_pos, &mi->file, 4) ||
unknown's avatar
unknown committed
1927 1928 1929 1930
	init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
			      master_host) ||
	init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
			      master_user) || 
1931 1932
        init_strvar_from_file(mi->password, SCRAMBLED_PASSWORD_CHAR_LENGTH+1,
                              &mi->file, master_password) ||
1933 1934
	init_intvar_from_file(&port, &mi->file, master_port) ||
	init_intvar_from_file(&connect_retry, &mi->file,
unknown's avatar
unknown committed
1935
			      master_connect_retry))
unknown's avatar
unknown committed
1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963
      goto errwithmsg;

    /* 
       If file has ssl part use it even if we have server without 
       SSL support. But these option will be ignored later when 
       slave will try connect to master, so in this case warning 
       is printed.
     */
    if (lines >= LINES_IN_MASTER_INFO_WITH_SSL && 
        (init_intvar_from_file(&ssl, &mi->file, master_ssl) ||
         init_strvar_from_file(mi->ssl_ca, sizeof(mi->ssl_ca), 
                               &mi->file, master_ssl_ca) ||
         init_strvar_from_file(mi->ssl_capath, sizeof(mi->ssl_capath), 
                               &mi->file, master_ssl_capath) ||
         init_strvar_from_file(mi->ssl_cert, sizeof(mi->ssl_cert),
                               &mi->file, master_ssl_cert) ||
         init_strvar_from_file(mi->ssl_cipher, sizeof(mi->ssl_cipher),
                               &mi->file, master_ssl_cipher) ||
         init_strvar_from_file(mi->ssl_key, sizeof(mi->ssl_key),
                              &mi->file, master_ssl_key)))
      goto errwithmsg;
#ifndef HAVE_OPENSSL
    if (ssl)
      sql_print_error("SSL information in the master info file "
                      "('%s') are ignored because this MySQL slave was compiled "
                      "without SSL support.", fname);
#endif /* HAVE_OPENSSL */
    
1964 1965 1966 1967 1968 1969 1970
    /*
      This has to be handled here as init_intvar_from_file can't handle
      my_off_t types
    */
    mi->master_log_pos= (my_off_t) master_log_pos;
    mi->port= (uint) port;
    mi->connect_retry= (uint) connect_retry;
unknown's avatar
unknown committed
1971
    mi->ssl= (my_bool) ssl;
unknown's avatar
unknown committed
1972
  }
1973 1974 1975
  DBUG_PRINT("master_info",("log_file_name: %s  position: %ld",
			    mi->master_log_name,
			    (ulong) mi->master_log_pos));
1976

1977
  mi->rli.mi = mi;
1978 1979 1980
  if (init_relay_log_info(&mi->rli, slave_info_fname))
    goto err;

unknown's avatar
unknown committed
1981
  mi->inited = 1;
unknown's avatar
unknown committed
1982
  // now change cache READ -> WRITE - must do this before flush_master_info
1983
  reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1);
unknown's avatar
unknown committed
1984
  if ((error=test(flush_master_info(mi, 1))))
1985
    sql_print_error("Failed to flush master info file");
1986
  pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
1987
  DBUG_RETURN(error);
unknown's avatar
unknown committed
1988 1989 1990 1991
  
errwithmsg:
  sql_print_error("Error reading master configuration");
  
1992
err:
unknown's avatar
unknown committed
1993 1994 1995 1996 1997 1998
  if (fd >= 0)
  {
    my_close(fd, MYF(0));
    end_io_cache(&mi->file);
  }
  mi->fd= -1;
1999
  pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
2000
  DBUG_RETURN(1);
unknown's avatar
unknown committed
2001 2002
}

2003

2004 2005
/*
  Register this slave with its master by sending COM_REGISTER_SLAVE with
  server_id, report_host/user/password, report_port and rpl_recovery_rank.

  SYNOPSIS
    register_slave_on_master()
    mysql           connection to the master

  NOTES
    Nothing is sent when report_host is unset, or when the report strings
    would not fit in the local packet buffer (the latter is logged, but is
    not treated as an error).

  RETURN VALUES
    0   registered, or registration skipped
    1   the command failed (an error is logged)
*/
int register_slave_on_master(MYSQL* mysql)
{
  char buf[1024], *pos= buf;
  uint report_host_len, report_user_len, report_password_len;
  /*
    Use "" for an unset report_user/report_password so net_store_data() is
    never handed a NULL source pointer. A zero-length copy from NULL is
    still undefined behavior if the copy is done with memcpy().
  */
  const char *user= report_user ? report_user : "";
  const char *password= report_password ? report_password : "";

  if (!report_host)
    return 0;
  report_host_len= (uint) strlen(report_host);
  report_user_len= (uint) strlen(user);
  report_password_len= (uint) strlen(password);
  /* 30 is a good safety margin */
  if (report_host_len + report_user_len + report_password_len + 30 >
      sizeof(buf))
  {
    sql_print_error("Not registering slave on master: report-host, "
                    "report-user and report-password are too long");
    return 0;					// safety
  }

  int4store(pos, server_id); pos+= 4;
  pos= net_store_data(pos, report_host, report_host_len);
  pos= net_store_data(pos, user, report_user_len);
  pos= net_store_data(pos, password, report_password_len);
  int2store(pos, (uint16) report_port); pos+= 2;
  int4store(pos, rpl_recovery_rank);	pos+= 4;
  /* The master will fill in master_id */
  int4store(pos, 0);			pos+= 4;

  if (simple_command(mysql, COM_REGISTER_SLAVE, (char*) buf,
			(uint) (pos- buf), 0))
  {
    sql_print_error("Error on COM_REGISTER_SLAVE: %u '%s'",
		    mysql_errno(mysql),
		    mysql_error(mysql));
    return 1;
  }
  return 0;
}

2041

2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083
/*
  Build a comma-separated list from a HASH whose elements are
  TABLE_RULE_ENT. It cannot be used for any other kind of hash, because
  every element is cast to TABLE_RULE_ENT*.

  SYNOPSIS
    table_rule_ent_hash_to_str()
    s               String to fill; its previous contents are discarded
    h               HASH to read; for each element, key_len bytes starting
                    at db are appended

  RETURN VALUES
    none
*/

void table_rule_ent_hash_to_str(String* s, HASH* h)
{
  s->length(0);
  uint idx= 0;
  while (idx < h->records)
  {
    TABLE_RULE_ENT* rule= (TABLE_RULE_ENT*) hash_element(h, idx++);
    if (s->length() != 0)
      s->append(',');                     // separator between entries
    s->append(rule->db, rule->key_len);
  }
}

/*
  Build a comma-separated list from a DYNAMIC_ARRAY whose elements are
  TABLE_RULE_ENT pointers.

  SYNOPSIS
    table_rule_ent_dynamic_array_to_str()
    s               String to fill; its previous contents are discarded
    a               array to read; for each element, key_len bytes starting
                    at db are appended

  RETURN VALUES
    none
*/

void table_rule_ent_dynamic_array_to_str(String* s, DYNAMIC_ARRAY* a)
{
  s->length(0);
  uint idx= 0;
  while (idx < a->elements)
  {
    TABLE_RULE_ENT* rule;
    get_dynamic(a, (gptr) &rule, idx++);
    if (s->length() != 0)
      s->append(',');                     // separator between entries
    s->append(rule->db, rule->key_len);
  }
}

2084
/*
  Send the result of SHOW SLAVE STATUS to the client.

  SYNOPSIS
    show_master_info()
    thd             client thread; the result is sent through thd->protocol
    mi              master info to report on

  NOTES
    The column definitions are always sent. A data row is sent only when
    mi->host is non-empty, and it is built while holding mi->data_lock and
    mi->rli.data_lock. The replicate_* columns show the global replication
    filter settings.
    TODO: fix this for multi-master.

  RETURN VALUES
    0   ok
    -1  sending the column definitions or the row failed
*/
int show_master_info(THD* thd, MASTER_INFO* mi)
{
  List<Item> fields;
  Protocol *proto= thd->protocol;
  DBUG_ENTER("show_master_info");

  /* Connection and coordinate columns */
  fields.push_back(new Item_empty_string("Slave_IO_State", 14));
  fields.push_back(new Item_empty_string("Master_Host", sizeof(mi->host)));
  fields.push_back(new Item_empty_string("Master_User", sizeof(mi->user)));
  fields.push_back(new Item_return_int("Master_Port", 7, MYSQL_TYPE_LONG));
  fields.push_back(new Item_return_int("Connect_Retry", 10, MYSQL_TYPE_LONG));
  fields.push_back(new Item_empty_string("Master_Log_File", FN_REFLEN));
  fields.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
                                       MYSQL_TYPE_LONGLONG));
  fields.push_back(new Item_empty_string("Relay_Log_File", FN_REFLEN));
  fields.push_back(new Item_return_int("Relay_Log_Pos", 10,
                                       MYSQL_TYPE_LONGLONG));
  fields.push_back(new Item_empty_string("Relay_Master_Log_File", FN_REFLEN));
  fields.push_back(new Item_empty_string("Slave_IO_Running", 3));
  fields.push_back(new Item_empty_string("Slave_SQL_Running", 3));

  /* Replication filter columns */
  fields.push_back(new Item_empty_string("Replicate_Do_DB", 20));
  fields.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
  fields.push_back(new Item_empty_string("Replicate_Do_Table", 20));
  fields.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
  fields.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
  fields.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table", 28));

  /* SQL thread error, execution and UNTIL columns */
  fields.push_back(new Item_return_int("Last_Errno", 4, MYSQL_TYPE_LONG));
  fields.push_back(new Item_empty_string("Last_Error", 20));
  fields.push_back(new Item_return_int("Skip_Counter", 10, MYSQL_TYPE_LONG));
  fields.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
                                       MYSQL_TYPE_LONGLONG));
  fields.push_back(new Item_return_int("Relay_Log_Space", 10,
                                       MYSQL_TYPE_LONGLONG));
  fields.push_back(new Item_empty_string("Until_Condition", 6));
  fields.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
  fields.push_back(new Item_return_int("Until_Log_Pos", 10,
                                       MYSQL_TYPE_LONGLONG));

  /* SSL columns */
  fields.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
  fields.push_back(new Item_empty_string("Master_SSL_CA_File",
                                         sizeof(mi->ssl_ca)));
  fields.push_back(new Item_empty_string("Master_SSL_CA_Path",
                                         sizeof(mi->ssl_capath)));
  fields.push_back(new Item_empty_string("Master_SSL_Cert",
                                         sizeof(mi->ssl_cert)));
  fields.push_back(new Item_empty_string("Master_SSL_Cipher",
                                         sizeof(mi->ssl_cipher)));
  fields.push_back(new Item_empty_string("Master_SSL_Key",
                                         sizeof(mi->ssl_key)));

  /* Lag column */
  fields.push_back(new Item_return_int("Seconds_Behind_Master", 10,
                                       MYSQL_TYPE_LONGLONG));

  if (proto->send_fields(&fields, 1))
    DBUG_RETURN(-1);

  if (!mi->host[0])
  {
    /* No master configured: header only, no data row */
    send_eof(thd);
    DBUG_RETURN(0);
  }

  DBUG_PRINT("info",("host is set: '%s'", mi->host));
  String *packet= &thd->packet;
  proto->prepare_for_resend();

  pthread_mutex_lock(&mi->data_lock);
  pthread_mutex_lock(&mi->rli.data_lock);

  proto->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
  proto->store(mi->host, &my_charset_bin);
  proto->store(mi->user, &my_charset_bin);
  proto->store((uint32) mi->port);
  proto->store((uint32) mi->connect_retry);
  proto->store(mi->master_log_name, &my_charset_bin);
  proto->store((ulonglong) mi->master_log_pos);

  /* Only the base name of the relay log is shown */
  const char *relay_name= mi->rli.group_relay_log_name;
  proto->store(relay_name + dirname_length(relay_name), &my_charset_bin);
  proto->store((ulonglong) mi->rli.group_relay_log_pos);
  proto->store(mi->rli.group_master_log_name, &my_charset_bin);
  proto->store(mi->slave_running ? "Yes":"No", &my_charset_bin);
  proto->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
  proto->store(&replicate_do_db);
  proto->store(&replicate_ignore_db);

  /*
    Protocol cannot store TABLE_RULE_ENT containers directly, so each one
    is first rendered into a String.
  */
  char rules_buf[256];
  String rules(rules_buf, sizeof(rules_buf), &my_charset_bin);
  table_rule_ent_hash_to_str(&rules, &replicate_do_table);
  proto->store(&rules);
  table_rule_ent_hash_to_str(&rules, &replicate_ignore_table);
  proto->store(&rules);
  table_rule_ent_dynamic_array_to_str(&rules, &replicate_wild_do_table);
  proto->store(&rules);
  table_rule_ent_dynamic_array_to_str(&rules, &replicate_wild_ignore_table);
  proto->store(&rules);

  proto->store((uint32) mi->rli.last_slave_errno);
  proto->store(mi->rli.last_slave_error, &my_charset_bin);
  proto->store((uint32) mi->rli.slave_skip_counter);
  proto->store((ulonglong) mi->rli.group_master_log_pos);
  proto->store((ulonglong) mi->rli.log_space_total);

  const char *until_cond_name;
  switch (mi->rli.until_condition) {
  case RELAY_LOG_INFO::UNTIL_NONE:
    until_cond_name= "None";
    break;
  case RELAY_LOG_INFO::UNTIL_MASTER_POS:
    until_cond_name= "Master";
    break;
  default:
    until_cond_name= "Relay";
    break;
  }
  proto->store(until_cond_name, &my_charset_bin);
  proto->store(mi->rli.until_log_name, &my_charset_bin);
  proto->store((ulonglong) mi->rli.until_log_pos);

#ifdef HAVE_OPENSSL
  proto->store(mi->ssl? "Yes":"No", &my_charset_bin);
#else
  proto->store(mi->ssl? "Ignored":"No", &my_charset_bin);
#endif
  proto->store(mi->ssl_ca, &my_charset_bin);
  proto->store(mi->ssl_capath, &my_charset_bin);
  proto->store(mi->ssl_cert, &my_charset_bin);
  proto->store(mi->ssl_cipher, &my_charset_bin);
  proto->store(mi->ssl_key, &my_charset_bin);

  if (!mi->rli.last_master_timestamp)
    proto->store_null();
  else
  {
    long secs_behind= (long)((time_t) time((time_t*) 0) -
                             mi->rli.last_master_timestamp) -
                      mi->clock_diff_with_master;
    /*
      The lag can come out negative. For example, the master may be a slave
      of a master whose clock is ahead, or someone may have used SET
      TIMESTAMP on the master. It can also be -1 because both timestamps
      have one-second granularity. Negative values confuse users, so 0 is
      reported instead.
    */
    proto->store((longlong) (secs_behind > 0 ? secs_behind : 0));
  }

  pthread_mutex_unlock(&mi->rli.data_lock);
  pthread_mutex_unlock(&mi->data_lock);

  if (my_net_write(&thd->net, (char*) packet->ptr(), packet->length()))
    DBUG_RETURN(-1);

  send_eof(thd);
  DBUG_RETURN(0);
}

2252

unknown's avatar
unknown committed
2253
/*
  Write the current master info to master.info.

  SYNOPSIS
    flush_master_info()
    mi                      master info to save
    flush_relay_log_cache   if set, flush the relay log's IO_CACHE first.
                            The caller must only set it when the relay log
                            is initialized.

  RETURN VALUES
    0   ok
    1   flushing the relay log, or writing master.info, failed
*/
bool flush_master_info(MASTER_INFO* mi, bool flush_relay_log_cache)
{
  IO_CACHE* file = &mi->file;
  char lbuf[22];
  DBUG_ENTER("flush_master_info");
  DBUG_PRINT("enter",("master_pos: %ld", (long) mi->master_log_pos));

  /*
    Flush the relay log to disk. If we don't do it, then the relay log will
    have some part (its last kilobytes) in memory only. Suppose the slave
    server dies now, with events from master's position 100 to 150 in memory
    only (not on disk) and position 150 in master.info. When the slave
    restarts, the I/O thread will fetch binlogs from 150, so the relay log
    will hold "[0, 100] U [150, infinity[" and nobody will notice. The SQL
    thread will then jump from 100 to 150, and replication will silently
    break. For the same reason, if this flush fails, the new position must
    not be recorded in master.info: report the error instead.

    When we come to this place in code, the relay log may or may not be
    initialized; the caller is responsible for setting
    'flush_relay_log_cache' accordingly.
  */
  if (flush_relay_log_cache &&
      flush_io_cache(mi->rli.relay_log.get_log_file()))
    DBUG_RETURN(1);

  /*
    We flushed the relay log BEFORE the master.info file, because if we crash
    now, we will get a duplicate event in the relay log at restart. If we
    flushed in the other order, we would get a hole in the relay log.
    And duplicate is better than hole (with a duplicate, in later versions we
    can add detection and scrap one event; with a hole there's nothing we can
    do).
  */

  /*
     In certain cases this code may create master.info files that seem
     corrupted, because of extra lines filled with garbage at the end of
     the file (this happens if the new contents take less space than the
     previous contents of the file). Because the first line holds the
     number of lines, that garbage is ignored.
  */
  my_b_seek(file, 0L);
  my_b_printf(file, "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n",
	      LINES_IN_MASTER_INFO_WITH_SSL,
              mi->master_log_name, llstr(mi->master_log_pos, lbuf),
	      mi->host, mi->user,
	      mi->password, mi->port, mi->connect_retry,
              (int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert,
              mi->ssl_cipher, mi->ssl_key);
  /* Report a failed write so callers can detect it */
  bool write_failed= flush_io_cache(file) != 0;
  DBUG_RETURN(write_failed);
}

2304

unknown's avatar
unknown committed
2305
/*
  Build an empty RELAY_LOG_INFO: no open info or relay log file
  descriptors (-1), zeroed counters and positions, empty names and error
  message, no UNTIL condition, and initialized mutexes and condition
  variables (including those of relay_log).
*/
st_relay_log_info::st_relay_log_info()
  :info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
   cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0),
   ignore_log_space_limit(0), last_master_timestamp(0), slave_skip_counter(0),
   abort_pos_wait(0), slave_run_id(0), sql_thd(0), last_slave_errno(0),
   inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
   until_log_pos(0)
{
  /* Empty strings for all log names and the last error message */
  group_relay_log_name[0]= 0;
  event_relay_log_name[0]= 0;
  group_master_log_name[0]= 0;
  until_log_name[0]= 0;
  last_slave_error[0]= 0;

  /* The IO_CACHE structures start out zeroed (not yet set up) */
  bzero((char*) &cache_buf, sizeof(cache_buf));
  bzero((char*) &info_file, sizeof(info_file));

  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);

  pthread_cond_init(&start_cond, NULL);
  pthread_cond_init(&stop_cond, NULL);
  pthread_cond_init(&data_cond, NULL);
  pthread_cond_init(&log_space_cond, NULL);

  relay_log.init_pthread_objects();
}


/*
  Destroy the mutexes and condition variables that the constructor
  initialized, in reverse order of kind (conditions first, then mutexes).
*/
st_relay_log_info::~st_relay_log_info()
{
  pthread_cond_destroy(&log_space_cond);
  pthread_cond_destroy(&stop_cond);
  pthread_cond_destroy(&start_cond);
  pthread_cond_destroy(&data_cond);

  pthread_mutex_destroy(&log_space_lock);
  pthread_mutex_destroy(&data_lock);
  pthread_mutex_destroy(&run_lock);
}

2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364
/*
  Waits until the SQL thread reaches (has executed up to) the
  log/position or timed out.

  SYNOPSIS
    wait_for_pos()
    thd             client thread that sent SELECT MASTER_POS_WAIT
    log_name        log name to wait for
    log_pos         position to wait for 
    timeout         timeout in seconds before giving up waiting

  NOTES
    timeout is longlong whereas it should be ulong ; but this is
    to catch if the user submitted a negative timeout.

  RETURN VALUES
    -2          improper arguments (log_pos<0)
                or slave not running, or master info changed
                during the function's execution,
                or client thread killed. -2 is translated to NULL by caller
    -1          timed out
    >=0         number of log events the function had to wait
                before reaching the desired log/position
 */
2365

2366
int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
2367 2368
                                    longlong log_pos,
                                    longlong timeout)
unknown's avatar
unknown committed
2369
{
2370 2371
  if (!inited)
    return -1;
unknown's avatar
unknown committed
2372
  int event_count = 0;
2373
  ulong init_abort_pos_wait;
2374 2375 2376 2377
  int error=0;
  struct timespec abstime; // for timeout checking
  set_timespec(abstime,timeout);

2378
  DBUG_ENTER("wait_for_pos");
2379 2380
  DBUG_PRINT("enter",("group_master_log_name: '%s'  pos: %lu timeout: %ld",
                      group_master_log_name, (ulong) group_master_log_pos, 
2381
                      (long) timeout));
2382

2383
  pthread_mutex_lock(&data_lock);
2384
  /* 
unknown's avatar
unknown committed
2385 2386 2387 2388 2389 2390 2391 2392
     This function will abort when it notices that some CHANGE MASTER or
     RESET MASTER has changed the master info.
     To catch this, these commands modify abort_pos_wait ; We just monitor
     abort_pos_wait and see if it has changed.
     Why do we have this mechanism instead of simply monitoring slave_running
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
     the SQL thread be stopped?
     This is becasue if someones does:
2393
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
unknown's avatar
unknown committed
2394 2395
     the change may happen very quickly and we may not notice that
     slave_running briefly switches between 1/0/1.
2396
  */
2397
  init_abort_pos_wait= abort_pos_wait;
2398

2399 2400 2401 2402 2403 2404 2405 2406
  /*
    We'll need to 
    handle all possible log names comparisons (e.g. 999 vs 1000).
    We use ulong for string->number conversion ; this is no 
    stronger limitation than in find_uniq_filename in sql/log.cc
  */
  ulong log_name_extension;
  char log_name_tmp[FN_REFLEN]; //make a char[] from String
unknown's avatar
unknown committed
2407 2408
  char *end= strmake(log_name_tmp, log_name->ptr(), min(log_name->length(),
							FN_REFLEN-1));
2409 2410 2411 2412 2413 2414 2415
  char *p= fn_ext(log_name_tmp);
  char *p_end;
  if (!*p || log_pos<0)   
  {
    error= -2; //means improper arguments
    goto err;
  }
2416 2417
  // Convert 0-3 to 4
  log_pos= max(log_pos, BIN_LOG_HEADER_SIZE);
unknown's avatar
unknown committed
2418
  /* p points to '.' */
2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430
  log_name_extension= strtoul(++p, &p_end, 10);
  /*
    p_end points to the first invalid character.
    If it equals to p, no digits were found, error.
    If it contains '\0' it means conversion went ok.
  */
  if (p_end==p || *p_end)
  {
    error= -2;
    goto err;
  }    

unknown's avatar
unknown committed
2431
  /* The "compare and wait" main loop */
2432
  while (!thd->killed &&
2433
         init_abort_pos_wait == abort_pos_wait &&
2434
         slave_running)
unknown's avatar
unknown committed
2435
  {
2436 2437
    bool pos_reached;
    int cmp_result= 0;
unknown's avatar
unknown committed
2438

2439
    /*
unknown's avatar
unknown committed
2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450
      group_master_log_name can be "", if we are just after a fresh
      replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
      (before we have executed one Rotate event from the master) or
      (rare) if the user is doing a weird slave setup (see next
      paragraph).  If group_master_log_name is "", we assume we don't
      have enough info to do the comparison yet, so we just wait until
      more data. In this case master_log_pos is always 0 except if
      somebody (wrongly) sets this slave to be a slave of itself
      without using --replicate-same-server-id (an unsupported
      configuration which does nothing), then group_master_log_pos
      will grow and group_master_log_name will stay "".
2451
    */
2452
    if (*group_master_log_name)
unknown's avatar
unknown committed
2453
    {
unknown's avatar
unknown committed
2454 2455
      char *basename= (group_master_log_name +
                       dirname_length(group_master_log_name));
unknown's avatar
unknown committed
2456
      /*
2457 2458 2459 2460
        First compare the parts before the extension.
        Find the dot in the master's log basename,
        and protect against user's input error :
        if the names do not match up to '.' included, return error
2461
      */
2462 2463 2464 2465 2466 2467 2468 2469
      char *q= (char*)(fn_ext(basename)+1);
      if (strncmp(basename, log_name_tmp, (int)(q-basename)))
      {
        error= -2;
        break;
      }
      // Now compare extensions.
      char *q_end;
2470 2471
      ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
      if (group_master_log_name_extension < log_name_extension)
2472
        cmp_result= -1 ;
2473
      else
2474
        cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
2475

unknown's avatar
unknown committed
2476
      pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
2477 2478 2479
                    cmp_result > 0);
      if (pos_reached || thd->killed)
        break;
unknown's avatar
unknown committed
2480
    }
2481 2482

    //wait for master update, with optional timeout.
2483
    
unknown's avatar
unknown committed
2484
    DBUG_PRINT("info",("Waiting for master update"));
2485
    const char* msg = thd->enter_cond(&data_cond, &data_lock,
2486
                                      "Waiting for the slave SQL thread to \
2487
advance position");
2488 2489 2490 2491
    /*
      We are going to pthread_cond_(timed)wait(); if the SQL thread stops it
      will wake us up.
    */
2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508
    if (timeout > 0)
    {
      /*
        Note that pthread_cond_timedwait checks for the timeout
        before for the condition ; i.e. it returns ETIMEDOUT 
        if the system time equals or exceeds the time specified by abstime
        before the condition variable is signaled or broadcast, _or_ if
        the absolute time specified by abstime has already passed at the time
        of the call.
        For that reason, pthread_cond_timedwait will do the "timeoutting" job
        even if its condition is always immediately signaled (case of a loaded
        master).
      */
      error=pthread_cond_timedwait(&data_cond, &data_lock, &abstime);
    }
    else
      pthread_cond_wait(&data_cond, &data_lock);
2509
    DBUG_PRINT("info",("Got signal of master update"));
2510
    thd->exit_cond(msg);
2511 2512 2513 2514 2515
    if (error == ETIMEDOUT || error == ETIME)
    {
      error= -1;
      break;
    }
unknown's avatar
unknown committed
2516
    error=0;
2517
    event_count++;
2518
    DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
2519
  }
2520 2521

err:
2522
  pthread_mutex_unlock(&data_lock);
2523
  DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
unknown's avatar
unknown committed
2524
improper_arguments: %d  timed_out: %d",
2525 2526
                     (int) thd->killed,
                     (int) (init_abort_pos_wait != abort_pos_wait),
2527
                     (int) slave_running,
2528 2529 2530
                     (int) (error == -2),
                     (int) (error == -1)));
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
2531
      !slave_running) 
2532 2533 2534 2535
  {
    error= -2;
  }
  DBUG_RETURN( error ? error : event_count );
unknown's avatar
unknown committed
2536 2537
}

2538

unknown's avatar
unknown committed
2539
/*
2540
  init_slave_thread()
unknown's avatar
unknown committed
2541
*/
2542

2543
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
unknown's avatar
unknown committed
2544 2545
{
  DBUG_ENTER("init_slave_thread");
2546 2547
  thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO; 
2548
  thd->host_or_ip= "";
unknown's avatar
unknown committed
2549 2550
  thd->client_capabilities = 0;
  my_net_init(&thd->net, 0);
unknown's avatar
unknown committed
2551
  thd->net.read_timeout = slave_net_timeout;
unknown's avatar
unknown committed
2552 2553
  thd->master_access= ~0;
  thd->priv_user = 0;
2554
  thd->slave_thread = 1;
2555
  /* 
2556
     It's nonsense to constrain the slave threads with max_join_size; if a
2557 2558 2559 2560 2561 2562
     query succeeded on master, we HAVE to execute it. So set
     OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
     (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
     SELECT examining more than 4 billion rows would still fail (yes, because
     when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
     only for client threads.
2563
  */
2564 2565
  thd->options = ((opt_log_slave_updates) ? OPTION_BIN_LOG:0) |
    OPTION_AUTO_IS_NULL | OPTION_BIG_SELECTS;
unknown's avatar
unknown committed
2566
  thd->client_capabilities = CLIENT_LOCAL_FILES;
2567
  thd->real_id=pthread_self();
unknown's avatar
unknown committed
2568 2569 2570 2571
  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id = thread_id++;
  pthread_mutex_unlock(&LOCK_thread_count);

2572
  if (init_thr_lock() || thd->store_globals())
unknown's avatar
unknown committed
2573
  {
2574 2575
    thd->cleanup();
    delete thd;
unknown's avatar
unknown committed
2576 2577 2578
    DBUG_RETURN(-1);
  }

unknown's avatar
unknown committed
2579
#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
unknown's avatar
unknown committed
2580 2581 2582 2583 2584
  sigset_t set;
  VOID(sigemptyset(&set));			// Get mask in use
  VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif

2585
  if (thd_type == SLAVE_THD_SQL)
2586
    thd->proc_info= "Waiting for the next event in relay log";
2587
  else
2588
    thd->proc_info= "Waiting for master update";
unknown's avatar
unknown committed
2589 2590 2591 2592 2593
  thd->version=refresh_version;
  thd->set_time();
  DBUG_RETURN(0);
}

2594

2595 2596
/*
  Sleep for up to 'sec' seconds, in naps, asking thread_killed() after each
  nap whether the thread has been killed.

  RETURN VALUES
    1   thread_killed(thd, thread_killed_arg) returned true after a nap
    0   the whole interval elapsed without a kill being reported
*/

static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
                      void* thread_killed_arg)
{
  thr_alarm_t alarmed;
  thr_alarm_init(&alarmed);
  time_t now= time((time_t*) 0);
  const time_t deadline= now + sec;

  for (int remaining; (remaining= (int) (deadline - now)) > 0; )
  {
    ALARM alarm_buff;
    /*
      The alarm exists only to wake us if the thread is killed. It is set to
      twice the nap length so that, when no kill happens, it fires only
      after sleep() has already returned on its own.
    */
    thr_alarm(&alarmed, 2 * remaining, &alarm_buff);
    sleep(remaining);
    thr_end_alarm(&alarmed);

    if ((*thread_killed)(thd, thread_killed_arg))
      return 1;
    now= time((time_t*) 0);
  }
  return 0;
}

2623

unknown's avatar
unknown committed
2624 2625
/*
  Send COM_BINLOG_DUMP to the master, asking it to stream its binary log
  starting at mi->master_log_name / mi->master_log_pos.

  Packet layout built below:
    4 bytes   start position (mi->master_log_pos)
    2 bytes   binlog_flags (currently always 0)
    4 bytes   server_id of this slave
    rest      master binlog file name (not NUL-terminated)

  SYNOPSIS
    request_dump()
    mysql               connection to the master
    mi                  master info (log name/position, retry interval)
    suppress_warnings   set to 1 when the failure was ER_NET_READ_INTERRUPTED,
                        in which case nothing is printed to the error log

  RETURN VALUES
    0   ok
    1   the command could not be sent; the caller reconnects and retries
*/

static int request_dump(MYSQL* mysql, MASTER_INFO* mi,
                        bool *suppress_warnings)
{
  char buf[FN_REFLEN + 10];
  int len;
  int binlog_flags = 0; // for now
  char* logname = mi->master_log_name;
  DBUG_ENTER("request_dump");

  // TODO if big log files: Change next to int8store()
  int4store(buf, (longlong) mi->master_log_pos);
  int2store(buf + 4, binlog_flags);
  int4store(buf + 6, server_id);
  len = (uint) strlen(logname);
  memcpy(buf + 10, logname,len);
  if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
  {
    /*
      Something went wrong, so we will just reconnect and retry later
      in the future, we should do a better error analysis, but for
      now we just fill up the error log :-)
    */
    if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
      *suppress_warnings= 1;			// Suppress reconnect warning
    else
      /*
        Report the interval the I/O thread actually sleeps between retries
        (mi->connect_retry, which CHANGE MASTER can set), not the global
        startup default.
      */
      sql_print_error("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs",
                      mysql_errno(mysql), mysql_error(mysql),
                      (int) mi->connect_retry);
    DBUG_RETURN(1);
  }

  DBUG_RETURN(0);
}

2658

2659
/*
  Send COM_TABLE_DUMP for db.table to the master.

  Packet layout: <db_len:1 byte><db><table_len:1 byte><table>.
  The lengths are single bytes, so each name must be at most 255 bytes.
  Longer names used to be silently truncated into a malformed packet.

  RETURN VALUES
    0   ok
    1   names too long for the packet, or the command could not be sent
*/

static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
{
  const uint max_name_len= 255;        // largest value of a 1-byte length
  char buf[1024];
  char *p= buf;
  uint table_len= (uint) strlen(table);
  uint db_len= (uint) strlen(db);

  if (db_len > max_name_len || table_len > max_name_len ||
      table_len + db_len > sizeof(buf) - 2)
  {
    sql_print_error("request_table_dump: Buffer overrun");
    return 1;
  }

  *p++= (char) db_len;
  memcpy(p, db, db_len);
  p+= db_len;
  *p++= (char) table_len;
  memcpy(p, table, table_len);
  p+= table_len;

  if (simple_command(mysql, COM_TABLE_DUMP, buf, p - buf, 1))
  {
    sql_print_error("request_table_dump: Error sending the table dump "
                    "command");
    return 1;
  }

  return 0;
}

2687

unknown's avatar
unknown committed
2688
/*
  Read one event from the master

  SYNOPSIS
    read_event()
    mysql               MySQL connection
    mi                  Master connection information (not used here)
    suppress_warnings   Set to TRUE when a normal net read timeout has caused
                        us to try a reconnect.  We do not want to print
                        anything to the error log in this case because this
                        is a normal event in an idle server.

  RETURN VALUES
    'packet_error'      Error (read failure, empty packet, or an end packet
                        from the master)
    number              len - 1, i.e. the packet length without its first
                        byte (presumably the OK/status marker; the end-packet
                        check below looks at that byte)
*/

static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
{
  ulong len;

  *suppress_warnings= 0;
  /*
    my_real_read() will time us out
    We check if we were told to die, and if not, try reading again

    TODO:  Move 'events_till_disconnect' to the MASTER_INFO structure
  */
#ifndef DBUG_OFF
  /* Debug builds can force a disconnect after a configured event count. */
  if (disconnect_slave_event_count && !(events_till_disconnect--))
    return packet_error;
#endif

  len = net_safe_read(mysql);
  if (len == packet_error || (long) len < 1)
  {
    if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
    {
      /*
        We are trying a normal reconnect after a read timeout;
        we suppress prints to .err file as long as the reconnect
        happens without problems
      */
      *suppress_warnings= TRUE;
    }
    else
      sql_print_error("Error reading packet from server: %s ("
                      "server_errno=%d)",
                      mysql_error(mysql), mysql_errno(mysql));
    return packet_error;
  }

  /* Check if eof packet */
  if (len < 8 && mysql->net.read_pos[0] == 254)
  {
     sql_print_error("Slave: received end packet from server, apparent"
                     " master shutdown: %s",
                     mysql_error(mysql));
     return packet_error;
  }

  /* len is ulong: print it with %lu (was %u, a format/argument mismatch) */
  DBUG_PRINT("info",( "len=%lu, net->read_pos[4] = %d\n",
                      len, mysql->net.read_pos[4]));
  return len - 1;
}

unknown's avatar
unknown committed
2754

2755
/*
  Tell whether expected_error is one of the network/shutdown error codes
  ER_NET_READ_ERROR, ER_NET_ERROR_ON_WRITE, ER_SERVER_SHUTDOWN or
  ER_NEW_ABORTING_CONNECTION.

  thd and rli are accepted but not used.
  NOTE(review): presumably callers use this to recognise an error recorded
  on the master that came from the connection rather than from the
  statement itself; confirm at the call sites.

  RETURN VALUES
    1   expected_error is one of the codes above
    0   any other code
*/

int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error)
{
  return (expected_error == ER_NET_READ_ERROR ||
          expected_error == ER_NET_ERROR_ON_WRITE ||
          expected_error == ER_SERVER_SHUTDOWN ||
          expected_error == ER_NEW_ABORTING_CONNECTION);
}
2767

2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844
/*
     Check if condition stated in UNTIL clause of START SLAVE is reached.
   SYNOPSYS
     st_relay_log_info::is_until_satisfied()
   DESCRIPTION
     Checks if UNTIL condition is reached. Uses caching result of last 
     comparison of current log file name and target log file name. So cached 
     value should be invalidated if current log file name changes 
     (see st_relay_log_info::notify_... functions).
     
     This caching is needed to avoid of expensive string comparisons and 
     strtol() conversions needed for log names comparison. We don't need to
     compare them each time this function is called, we only need to do this 
     when current log name changes. If we have UNTIL_MASTER_POS condition we 
     need to do this only after Rotate_log_event::exec_event() (which is 
     rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS 
     condition then we should invalidate cached comarison value after 
     inc_group_relay_log_pos() which called for each group of events (so we
     have some benefit if we have something like queries that use 
     autoincrement or if we have transactions).
     
     Should be called ONLY if until_condition != UNTIL_NONE !
   RETURN VALUE
     true - condition met or error happened (condition seems to have 
            bad log file name)
     false - condition not met
*/

bool st_relay_log_info::is_until_satisfied()
{
  const char *log_name;
  ulonglong log_pos;

  DBUG_ASSERT(until_condition != UNTIL_NONE);
  
  if (until_condition == UNTIL_MASTER_POS)
  {
    log_name= group_master_log_name;
    log_pos= group_master_log_pos;
  }
  else
  { /* until_condition == UNTIL_RELAY_POS */
    log_name= group_relay_log_name;
    log_pos= group_relay_log_pos;
  }
  
  if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
  {
    /* 
       We have no cached comaprison results so we should compare log names
       and cache result
    */

    DBUG_ASSERT(*log_name || log_pos == 0);
    
    if (*log_name)
    {
      const char *basename= log_name + dirname_length(log_name);
      
      const char *q= (const char*)(fn_ext(basename)+1);
      if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
      {
        /* Now compare extensions. */
        char *q_end;
        ulong log_name_extension= strtoul(q, &q_end, 10);
        if (log_name_extension < until_log_name_extension)
          until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
        else
          until_log_names_cmp_result= 
            (log_name_extension > until_log_name_extension) ? 
            UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
      }
      else  
      {
        /* Probably error so we aborting */
        sql_print_error("Slave SQL thread is stopped because UNTIL "
                        "condition is bad.");
unknown's avatar
unknown committed
2845
        return TRUE;
2846 2847 2848 2849 2850 2851 2852 2853 2854 2855 2856
      }
    }
    else
      return until_log_pos == 0;
  }
    
  return ((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL && 
           log_pos >= until_log_pos) ||
          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER);
}

2857

2858
/*
  Fetch the next event from the relay log and either execute it or skip it.

  rli->data_lock is taken at the start and released on every path before
  the event is executed (and before returning).

  An event is skipped when it originates from this server (unless
  replicate_same_server_id is set), or when rli->slave_skip_counter is
  non-zero and the event is not a ROTATE_EVENT.

  RETURN VALUES
    0          event skipped
    1          UNTIL condition reached, SQL thread killed, or the event could
               not be read/parsed
    otherwise  the value returned by the event's exec_event()
*/

static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
{
  /*
    data_lock protects everything here except the actual event execution.
    next_event() may release it temporarily while it waits.
  */
  pthread_mutex_lock(&rli->data_lock);

  if (rli->until_condition != RELAY_LOG_INFO::UNTIL_NONE &&
      rli->is_until_satisfied())
  {
    sql_print_error("Slave SQL thread stopped because it reached its"
                    " UNTIL position %ld", (long) rli->until_pos());
    /* abort_slave suppresses an extra "error in query execution" message. */
    rli->abort_slave= 1;
    pthread_mutex_unlock(&rli->data_lock);
    return 1;
  }

  Log_event *ev= next_event(rli);
  DBUG_ASSERT(rli->sql_thd == thd);

  if (sql_slave_killed(thd, rli))
  {
    pthread_mutex_unlock(&rli->data_lock);
    delete ev;
    return 1;
  }

  if (!ev)
  {
    pthread_mutex_unlock(&rli->data_lock);
    slave_print_error(rli, 0,
                      "Could not parse relay log event entry. The possible reasons are: the master's "
                      "binary log is corrupted (you can check this by running 'mysqlbinlog' on the "
                      "binary log), the slave's relay log is corrupted (you can check this by running "
                      "'mysqlbinlog' on the relay log), a network problem, or a bug in the master's "
                      "or slave's MySQL code. If you want to check the master's binary log or slave's "
                      "relay log, you will be able to know their names by issuing 'SHOW SLAVE STATUS' "
                      "on this slave.");
    return 1;
  }

  int type_code= ev->get_type_code();

  /*
    Rotate events are never skipped by the counter because they concern the
    log files themselves.
  */
  const bool skip_event=
    ((ev->server_id == (uint32) ::server_id && !replicate_same_server_id) ||
     (rli->slave_skip_counter && type_code != ROTATE_EVENT));

  if (skip_event)
  {
    /* TODO: I/O thread should not even log events with the same server id */
    rli->inc_group_relay_log_pos(ev->get_event_len(),
                                 type_code != STOP_EVENT ? ev->log_pos : LL(0),
                                 1 /* skip lock */);
    flush_relay_log_info(rli);

    /*
      Keep the counter at 1 on an INTVAR_EVENT or STOP_EVENT. This guards
      against the common mistake of setting it to 1 instead of 2 when
      recovering from a failed auto-increment insert.
    */
    if (rli->slave_skip_counter &&
        !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) &&
          rli->slave_skip_counter == 1))
      --rli->slave_skip_counter;
    pthread_mutex_unlock(&rli->data_lock);
    delete ev;
    return 0;                                   // avoid infinite update loops
  }
  pthread_mutex_unlock(&rli->data_lock);

  thd->server_id= ev->server_id;      // use the original server id for logging
  thd->set_time();                              // time the query
  thd->lex->current_select= 0;
  if (!ev->when)
    ev->when= time(NULL);
  ev->thd= thd;

  int exec_res= ev->exec_event(rli);
  DBUG_ASSERT(rli->sql_thd == thd);
  delete ev;
  return exec_res;
}

2953

unknown's avatar
unknown committed
2954
/* Slave I/O Thread entry point */
2955

2956
extern "C" pthread_handler_decl(handle_slave_io,arg)
unknown's avatar
unknown committed
2957
{
unknown's avatar
unknown committed
2958 2959 2960 2961 2962 2963 2964 2965
  THD *thd; // needs to be first for thread_stack
  MYSQL *mysql;
  MASTER_INFO *mi = (MASTER_INFO*)arg; 
  char llbuff[22];
  uint retry_count;
  
  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  my_thread_init();
2966
  DBUG_ENTER("handle_slave_io");
unknown's avatar
unknown committed
2967

unknown's avatar
unknown committed
2968
#ifndef DBUG_OFF
unknown's avatar
unknown committed
2969
slave_begin:
unknown's avatar
unknown committed
2970
#endif  
2971
  DBUG_ASSERT(mi->inited);
unknown's avatar
unknown committed
2972 2973 2974
  mysql= NULL ;
  retry_count= 0;

2975
  pthread_mutex_lock(&mi->run_lock);
unknown's avatar
unknown committed
2976 2977 2978
  /* Inform waiting threads that slave has started */
  mi->slave_run_id++;

2979
#ifndef DBUG_OFF  
2980
  mi->events_till_abort = abort_slave_event_count;
2981
#endif  
unknown's avatar
unknown committed
2982
  
2983
  thd= new THD; // note that contructor of THD uses DBUG_ !
2984
  THD_CHECK_SENTRY(thd);
unknown's avatar
unknown committed
2985 2986

  pthread_detach_this_thread();
2987
  if (init_slave_thread(thd, SLAVE_THD_IO))
unknown's avatar
unknown committed
2988 2989 2990 2991 2992 2993
  {
    pthread_cond_broadcast(&mi->start_cond);
    pthread_mutex_unlock(&mi->run_lock);
    sql_print_error("Failed during slave I/O thread initialization");
    goto err;
  }
2994
  mi->io_thd = thd;
unknown's avatar
unknown committed
2995
  thd->thread_stack = (char*)&thd; // remember where our stack is
2996
  pthread_mutex_lock(&LOCK_thread_count);
unknown's avatar
unknown committed
2997
  threads.append(thd);
2998
  pthread_mutex_unlock(&LOCK_thread_count);
2999 3000 3001
  mi->slave_running = 1;
  mi->abort_slave = 0;
  pthread_mutex_unlock(&mi->run_lock);
3002
  pthread_cond_broadcast(&mi->start_cond);
unknown's avatar
unknown committed
3003
  
3004 3005 3006
  DBUG_PRINT("master_info",("log_file_name: '%s'  position: %s",
			    mi->master_log_name,
			    llstr(mi->master_log_pos,llbuff)));
unknown's avatar
unknown committed
3007
  
unknown's avatar
SCRUM  
unknown committed
3008
  if (!(mi->mysql = mysql = mysql_init(NULL)))
unknown's avatar
unknown committed
3009
  {
unknown's avatar
unknown committed
3010
    sql_print_error("Slave I/O thread: error in mysql_init()");
unknown's avatar
unknown committed
3011 3012
    goto err;
  }
unknown's avatar
unknown committed
3013
  
unknown's avatar
unknown committed
3014

3015
  thd->proc_info = "Connecting to master";
3016
  // we can get killed during safe_connect
3017
  if (!safe_connect(thd, mysql, mi))
unknown's avatar
unknown committed
3018
    sql_print_error("Slave I/O thread: connected to master '%s@%s:%d',\
3019
  replication started in log '%s' at position %s", mi->user,
unknown's avatar
unknown committed
3020 3021 3022
		    mi->host, mi->port,
		    IO_RPL_LOG_NAME,
		    llstr(mi->master_log_pos,llbuff));
3023
  else
unknown's avatar
unknown committed
3024
  {
3025
    sql_print_error("Slave I/O thread killed while connecting to master");
unknown's avatar
unknown committed
3026 3027
    goto err;
  }
3028

3029
connected:
3030

3031
  thd->slave_net = &mysql->net;
3032
  thd->proc_info = "Checking master version";
unknown's avatar
unknown committed
3033
  if (get_master_version_and_clock(mysql, mi))
3034
    goto err;
3035
  if (!mi->old_format)
3036
  {
unknown's avatar
unknown committed
3037 3038 3039 3040 3041
    /*
      Register ourselves with the master.
      If fails, this is not fatal - we just print the error message and go
      on with life.
    */
3042
    thd->proc_info = "Registering slave on master";
3043
    if (register_slave_on_master(mysql) ||  update_slave_list(mysql, mi))
3044 3045
      goto err;
  }
unknown's avatar
unknown committed
3046
  
3047
  DBUG_PRINT("info",("Starting reading binary log from master"));
3048
  while (!io_slave_killed(thd,mi))
unknown's avatar
unknown committed
3049
  {
unknown's avatar
unknown committed
3050
    bool suppress_warnings= 0;    
unknown's avatar
unknown committed
3051
    thd->proc_info = "Requesting binlog dump";
unknown's avatar
unknown committed
3052
    if (request_dump(mysql, mi, &suppress_warnings))
unknown's avatar
unknown committed
3053 3054
    {
      sql_print_error("Failed on request_dump()");
unknown's avatar
unknown committed
3055
      if (io_slave_killed(thd,mi))
unknown's avatar
unknown committed
3056 3057
      {
	sql_print_error("Slave I/O thread killed while requesting master \
unknown's avatar
unknown committed
3058
dump");
unknown's avatar
unknown committed
3059 3060
	goto err;
      }
unknown's avatar
unknown committed
3061
	  
3062
      thd->proc_info= "Waiting to reconnect after a failed binlog dump request";
unknown's avatar
SCRUM  
unknown committed
3063
      end_server(mysql);
unknown's avatar
unknown committed
3064 3065 3066 3067 3068
      /*
	First time retry immediately, assuming that we can recover
	right away - if first time fails, sleep between re-tries
	hopefuly the admin can fix the problem sometime
      */
3069 3070 3071 3072
      if (retry_count++)
      {
	if (retry_count > master_retry_count)
	  goto err;				// Don't retry forever
3073 3074
	safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
		   (void*)mi);
3075
      }
3076
      if (io_slave_killed(thd,mi))
unknown's avatar
unknown committed
3077 3078
      {
	sql_print_error("Slave I/O thread killed while retrying master \
unknown's avatar
unknown committed
3079
dump");
unknown's avatar
unknown committed
3080 3081
	goto err;
      }
unknown's avatar
unknown committed
3082

3083
      thd->proc_info = "Reconnecting after a failed binlog dump request";
unknown's avatar
unknown committed
3084 3085
      if (!suppress_warnings)
	sql_print_error("Slave I/O thread: failed dump request, \
3086
reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME,
unknown's avatar
unknown committed
3087 3088 3089
			llstr(mi->master_log_pos,llbuff));
      if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
	  io_slave_killed(thd,mi))
unknown's avatar
unknown committed
3090 3091
      {
	sql_print_error("Slave I/O thread killed during or \
3092
after reconnect");
unknown's avatar
unknown committed
3093 3094
	goto err;
      }
unknown's avatar
unknown committed
3095

unknown's avatar
unknown committed
3096 3097
      goto connected;
    }
unknown's avatar
unknown committed
3098

3099
    while (!io_slave_killed(thd,mi))
unknown's avatar
unknown committed
3100
    {
unknown's avatar
unknown committed
3101
      bool suppress_warnings= 0;    
3102 3103 3104 3105 3106 3107 3108
      /* 
         We say "waiting" because read_event() will wait if there's nothing to
         read. But if there's something to read, it will not wait. The important
         thing is to not confuse users by saying "reading" whereas we're in fact
         receiving nothing.
      */
      thd->proc_info = "Waiting for master to send event";
unknown's avatar
unknown committed
3109
      ulong event_len = read_event(mysql, mi, &suppress_warnings);
3110
      if (io_slave_killed(thd,mi))
unknown's avatar
unknown committed
3111
      {
3112 3113
	if (global_system_variables.log_warnings)
	  sql_print_error("Slave I/O thread killed while reading event");
unknown's avatar
unknown committed
3114 3115
	goto err;
      }
3116
	  	  
unknown's avatar
unknown committed
3117 3118
      if (event_len == packet_error)
      {
unknown's avatar
SCRUM  
unknown committed
3119
	uint mysql_error_number= mysql_errno(mysql);
3120
	if (mysql_error_number == ER_NET_PACKET_TOO_LARGE)
unknown's avatar
unknown committed
3121
	{
3122 3123 3124 3125
	  sql_print_error("\
Log entry on master is longer than max_allowed_packet (%ld) on \
slave. If the entry is correct, restart the server with a higher value of \
max_allowed_packet",
unknown's avatar
unknown committed
3126
			  thd->variables.max_allowed_packet);
unknown's avatar
unknown committed
3127 3128
	  goto err;
	}
3129 3130 3131
	if (mysql_error_number == ER_MASTER_FATAL_ERROR_READING_BINLOG)
	{
	  sql_print_error(ER(mysql_error_number), mysql_error_number,
unknown's avatar
SCRUM  
unknown committed
3132
			  mysql_error(mysql));
3133 3134
	  goto err;
	}
unknown's avatar
unknown committed
3135
	thd->proc_info = "Waiting to reconnect after a failed master event read";
unknown's avatar
SCRUM  
unknown committed
3136
	end_server(mysql);
3137 3138 3139 3140
	if (retry_count++)
	{
	  if (retry_count > master_retry_count)
	    goto err;				// Don't retry forever
3141
	  safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3142 3143
		     (void*) mi);
	}	    
3144
	if (io_slave_killed(thd,mi))
unknown's avatar
unknown committed
3145
	{
3146 3147
	  if (global_system_variables.log_warnings)
	    sql_print_error("Slave I/O thread killed while waiting to \
unknown's avatar
unknown committed
3148
reconnect after a failed read");
unknown's avatar
unknown committed
3149 3150
	  goto err;
	}
3151
	thd->proc_info = "Reconnecting after a failed master event read";
unknown's avatar
unknown committed
3152 3153
	if (!suppress_warnings)
	  sql_print_error("Slave I/O thread: Failed reading log event, \
3154
reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME,
unknown's avatar
unknown committed
3155 3156 3157
			  llstr(mi->master_log_pos, llbuff));
	if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
	    io_slave_killed(thd,mi))
unknown's avatar
unknown committed
3158
	{
3159 3160
	  if (global_system_variables.log_warnings)
	    sql_print_error("Slave I/O thread killed during or after a \
unknown's avatar
unknown committed
3161
reconnect done to recover from failed read");
unknown's avatar
unknown committed
3162 3163 3164
	  goto err;
	}
	goto connected;
unknown's avatar
unknown committed
3165
      } // if (event_len == packet_error)
unknown's avatar
unknown committed
3166
	  
3167
      retry_count=0;			// ok event, reset retry counter
3168
      thd->proc_info = "Queueing master event to the relay log";
unknown's avatar
unknown committed
3169 3170 3171
      if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
		      event_len))
      {
3172
	sql_print_error("Slave I/O thread could not queue event from master");
unknown's avatar
unknown committed
3173 3174
	goto err;
      }
unknown's avatar
unknown committed
3175
      flush_master_info(mi, 1); /* sure that we can flush the relay log */
3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187
      /*
        See if the relay logs take too much space.
        We don't lock mi->rli.log_space_lock here; this dirty read saves time
        and does not introduce any problem:
        - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
        the clean value is 0), then we are reading only one more event as we
        should, and we'll block only at the next event. No big deal.
        - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
        the clean value is 1), then we are going into wait_for_relay_log_space()
        for no reason, but this function will do a clean read, notice the clean
        value and exit immediately.
      */
3188 3189 3190 3191 3192 3193 3194 3195 3196 3197 3198
#ifndef DBUG_OFF
      {
        char llbuf1[22], llbuf2[22];
        DBUG_PRINT("info", ("log_space_limit=%s log_space_total=%s \
ignore_log_space_limit=%d",
                            llstr(mi->rli.log_space_limit,llbuf1),
                            llstr(mi->rli.log_space_total,llbuf2),
                            (int) mi->rli.ignore_log_space_limit)); 
      }
#endif

unknown's avatar
unknown committed
3199
      if (mi->rli.log_space_limit && mi->rli.log_space_limit <
3200 3201
	  mi->rli.log_space_total &&
          !mi->rli.ignore_log_space_limit)
unknown's avatar
unknown committed
3202 3203 3204 3205 3206 3207
	if (wait_for_relay_log_space(&mi->rli))
	{
	  sql_print_error("Slave I/O thread aborted while waiting for relay \
log space");
	  goto err;
	}
unknown's avatar
unknown committed
3208
      // TODO: check debugging abort code
3209
#ifndef DBUG_OFF
unknown's avatar
unknown committed
3210 3211 3212 3213 3214
      if (abort_slave_event_count && !--events_till_abort)
      {
	sql_print_error("Slave I/O thread: debugging abort");
	goto err;
      }
3215
#endif
3216
    } 
3217
  }
unknown's avatar
unknown committed
3218

unknown's avatar
unknown committed
3219
  // error = 0;
unknown's avatar
unknown committed
3220
err:
3221 3222 3223
  // print the current replication position
  sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s",
		  IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
3224
  VOID(pthread_mutex_lock(&LOCK_thread_count));
unknown's avatar
unknown committed
3225
  thd->query = thd->db = 0; // extra safety
unknown's avatar
unknown committed
3226
  thd->query_length = 0;
3227
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
unknown's avatar
unknown committed
3228 3229
  if (mysql)
  {
unknown's avatar
SCRUM  
unknown committed
3230
    mysql_close(mysql);
unknown's avatar
unknown committed
3231 3232
    mi->mysql=0;
  }
unknown's avatar
unknown committed
3233
  thd->proc_info = "Waiting for slave mutex on exit";
3234 3235 3236 3237
  pthread_mutex_lock(&mi->run_lock);
  mi->slave_running = 0;
  mi->io_thd = 0;
  // TODO: make rpl_status part of MASTER_INFO
3238
  change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
3239 3240
  mi->abort_slave = 0; // TODO: check if this is needed
  DBUG_ASSERT(thd->net.buff != 0);
unknown's avatar
unknown committed
3241
  net_end(&thd->net); // destructor will not free it, because net.vio is 0
3242
  pthread_mutex_lock(&LOCK_thread_count);
3243
  THD_CHECK_SENTRY(thd);
unknown's avatar
unknown committed
3244
  delete thd;
3245
  pthread_mutex_unlock(&LOCK_thread_count);
unknown's avatar
unknown committed
3246
  pthread_cond_broadcast(&mi->stop_cond);	// tell the world we are done
3247
  pthread_mutex_unlock(&mi->run_lock);
unknown's avatar
unknown committed
3248
#ifndef DBUG_OFF
unknown's avatar
unknown committed
3249
  if (abort_slave_event_count && !events_till_abort)
unknown's avatar
unknown committed
3250 3251
    goto slave_begin;
#endif  
unknown's avatar
unknown committed
3252
  my_thread_end();
unknown's avatar
unknown committed
3253 3254 3255 3256
  pthread_exit(0);
  DBUG_RETURN(0);				// Can't return anything here
}

unknown's avatar
unknown committed
3257

unknown's avatar
unknown committed
3258
/* Slave SQL Thread entry point */
unknown's avatar
unknown committed
3259

3260
extern "C" pthread_handler_decl(handle_slave_sql,arg)
3261
{
3262
  THD *thd;			/* needs to be first for thread_stack */
3263 3264
  char llbuff[22],llbuff1[22];
  RELAY_LOG_INFO* rli = &((MASTER_INFO*)arg)->rli; 
unknown's avatar
unknown committed
3265 3266 3267 3268
  const char *errmsg;

  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  my_thread_init();
3269
  DBUG_ENTER("handle_slave_sql");
unknown's avatar
unknown committed
3270 3271 3272 3273 3274

#ifndef DBUG_OFF
slave_begin:  
#endif  

3275 3276 3277
  DBUG_ASSERT(rli->inited);
  pthread_mutex_lock(&rli->run_lock);
  DBUG_ASSERT(!rli->slave_running);
unknown's avatar
unknown committed
3278
  errmsg= 0;
3279 3280 3281
#ifndef DBUG_OFF  
  rli->events_till_abort = abort_slave_event_count;
#endif  
3282

unknown's avatar
unknown committed
3283
  thd = new THD; // note that contructor of THD uses DBUG_ !
3284 3285
  thd->thread_stack = (char*)&thd; // remember where our stack is
  
unknown's avatar
unknown committed
3286 3287 3288
  /* Inform waiting threads that slave has started */
  rli->slave_run_id++;

3289 3290
  pthread_detach_this_thread();
  if (init_slave_thread(thd, SLAVE_THD_SQL))
unknown's avatar
unknown committed
3291 3292 3293 3294 3295 3296 3297 3298 3299 3300
  {
    /*
      TODO: this is currently broken - slave start and change master
      will be stuck if we fail here
    */
    pthread_cond_broadcast(&rli->start_cond);
    pthread_mutex_unlock(&rli->run_lock);
    sql_print_error("Failed during slave thread initialization");
    goto err;
  }
3301
  thd->init_for_queries();
3302
  rli->sql_thd= thd;
3303
  thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
3304
  pthread_mutex_lock(&LOCK_thread_count);
3305
  threads.append(thd);
3306
  pthread_mutex_unlock(&LOCK_thread_count);
3307 3308 3309
  rli->slave_running = 1;
  rli->abort_slave = 0;
  pthread_mutex_unlock(&rli->run_lock);
3310
  pthread_cond_broadcast(&rli->start_cond);
3311

unknown's avatar
unknown committed
3312 3313 3314
  /*
    Reset errors for a clean start (otherwise, if the master is idle, the SQL
    thread may execute no Query_log_event, so the error will remain even
unknown's avatar
unknown committed
3315
    though there's no problem anymore). Do not reset the master timestamp
3316 3317 3318 3319
    (imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
    as we are not sure that we are going to receive a query, we want to
    remember the last master timestamp (to say how many seconds behind we are
    now.
unknown's avatar
unknown committed
3320
    But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
unknown's avatar
unknown committed
3321
  */
unknown's avatar
unknown committed
3322
  clear_slave_error(rli);
3323 3324

  //tell the I/O thread to take relay_log_space_limit into account from now on
3325
  pthread_mutex_lock(&rli->log_space_lock);
3326
  rli->ignore_log_space_limit= 0;
3327
  pthread_mutex_unlock(&rli->log_space_lock);
3328

3329
  if (init_relay_log_pos(rli,
3330 3331
			 rli->group_relay_log_name,
			 rli->group_relay_log_pos,
3332
			 1 /*need data lock*/, &errmsg))
3333 3334 3335 3336 3337
  {
    sql_print_error("Error initializing relay log position: %s",
		    errmsg);
    goto err;
  }
3338
  THD_CHECK_SENTRY(thd);
3339 3340
  DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
  DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
3341
  DBUG_ASSERT(rli->sql_thd == thd);
3342 3343

  DBUG_PRINT("master_info",("log_file_name: %s  position: %s",
3344 3345
			    rli->group_master_log_name,
			    llstr(rli->group_master_log_pos,llbuff)));
3346 3347
  if (global_system_variables.log_warnings)
    sql_print_error("Slave SQL thread initialized, starting replication in \
unknown's avatar
unknown committed
3348
log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
3349 3350
		    llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name,
		    llstr(rli->group_relay_log_pos,llbuff1));
3351

unknown's avatar
unknown committed
3352
  /* execute init_slave variable */
unknown's avatar
unknown committed
3353
  if (sys_init_slave.value_length)
unknown's avatar
unknown committed
3354
  {
3355
    execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
unknown's avatar
unknown committed
3356 3357 3358 3359 3360 3361 3362 3363
    if (thd->query_error)
    {
      sql_print_error("\
Slave SQL thread aborted. Can't execute init_slave query");
      goto err;
    }
  }

3364 3365
  /* Read queries from the IO/THREAD until this thread is killed */

3366
  while (!sql_slave_killed(thd,rli))
3367
  {
3368
    thd->proc_info = "Reading event from the relay log"; 
3369
    DBUG_ASSERT(rli->sql_thd == thd);
3370
    THD_CHECK_SENTRY(thd);
3371 3372 3373
    if (exec_relay_log_event(thd,rli))
    {
      // do not scare the user if SQL thread was simply killed or stopped
3374
      if (!sql_slave_killed(thd,rli))
3375 3376
        sql_print_error("\
Error running query, slave SQL thread aborted. Fix the problem, and restart \
unknown's avatar
unknown committed
3377
the slave SQL thread with \"SLAVE START\". We stopped at log \
3378
'%s' position %s",
3379
		      RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff));
3380 3381
      goto err;
    }
3382
  }
3383

3384
  /* Thread stopped. Print the current replication position to the log */
3385 3386
  sql_print_error("Slave SQL thread exiting, replication stopped in log \
 '%s' at position %s",
3387
		  RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff));
3388 3389

 err:
3390
  VOID(pthread_mutex_lock(&LOCK_thread_count));
3391
  thd->query = thd->db = 0; // extra safety
unknown's avatar
unknown committed
3392
  thd->query_length = 0;
3393
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
3394 3395
  thd->proc_info = "Waiting for slave mutex on exit";
  pthread_mutex_lock(&rli->run_lock);
3396 3397
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
  pthread_mutex_lock(&rli->data_lock);
3398
  DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
3399 3400
  /* When master_pos_wait() wakes up it will check this and terminate */
  rli->slave_running= 0; 
unknown's avatar
unknown committed
3401 3402 3403 3404 3405
  /* 
     Going out of the transaction. Necessary to mark it, in case the user
     restarts replication from a non-transactional statement (with CHANGE
     MASTER).
  */
3406 3407 3408 3409
  /* Wake up master_pos_wait() */
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
  pthread_cond_broadcast(&rli->data_cond);
unknown's avatar
unknown committed
3410
  rli->ignore_log_space_limit= 0; /* don't need any lock */
3411
  rli->save_temporary_tables = thd->temporary_tables;
unknown's avatar
unknown committed
3412 3413 3414 3415 3416

  /*
    TODO: see if we can do this conditionally in next_event() instead
    to avoid unneeded position re-init
  */
3417 3418 3419 3420
  thd->temporary_tables = 0; // remove tempation from destructor to close them
  DBUG_ASSERT(thd->net.buff != 0);
  net_end(&thd->net); // destructor will not free it, because we are weird
  DBUG_ASSERT(rli->sql_thd == thd);
3421
  THD_CHECK_SENTRY(thd);
3422
  rli->sql_thd= 0;
3423
  pthread_mutex_lock(&LOCK_thread_count);
3424
  THD_CHECK_SENTRY(thd);
3425 3426 3427 3428 3429 3430 3431 3432 3433
  delete thd;
  pthread_mutex_unlock(&LOCK_thread_count);
  pthread_cond_broadcast(&rli->stop_cond);
  // tell the world we are done
  pthread_mutex_unlock(&rli->run_lock);
#ifndef DBUG_OFF // TODO: reconsider the code below
  if (abort_slave_event_count && !rli->events_till_abort)
    goto slave_begin;
#endif  
unknown's avatar
unknown committed
3434
  my_thread_end();
3435 3436 3437
  pthread_exit(0);
  DBUG_RETURN(0);				// Can't return anything here
}
unknown's avatar
unknown committed
3438

3439

unknown's avatar
unknown committed
3440
/*
3441
  process_io_create_file()
unknown's avatar
unknown committed
3442
*/
3443

unknown's avatar
unknown committed
3444 3445 3446 3447 3448
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
{
  int error = 1;
  ulong num_bytes;
  bool cev_not_written;
3449 3450
  THD *thd = mi->io_thd;
  NET *net = &mi->mysql->net;
unknown's avatar
unknown committed
3451
  DBUG_ENTER("process_io_create_file");
unknown's avatar
unknown committed
3452 3453

  if (unlikely(!cev->is_valid()))
unknown's avatar
unknown committed
3454
    DBUG_RETURN(1);
unknown's avatar
unknown committed
3455 3456 3457 3458 3459 3460
  /*
    TODO: fix to honor table rules, not only db rules
  */
  if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db))
  {
    skip_load_data_infile(net);
unknown's avatar
unknown committed
3461
    DBUG_RETURN(0);
unknown's avatar
unknown committed
3462 3463 3464
  }
  DBUG_ASSERT(cev->inited_from_old);
  thd->file_id = cev->file_id = mi->file_id++;
3465
  thd->server_id = cev->server_id;
unknown's avatar
unknown committed
3466 3467 3468 3469 3470 3471 3472 3473 3474
  cev_not_written = 1;
  
  if (unlikely(net_request_file(net,cev->fname)))
  {
    sql_print_error("Slave I/O: failed requesting download of '%s'",
		    cev->fname);
    goto err;
  }

unknown's avatar
unknown committed
3475 3476 3477 3478
  /*
    This dummy block is so we could instantiate Append_block_log_event
    once and then modify it slightly instead of doing it multiple times
    in the loop
unknown's avatar
unknown committed
3479 3480
  */
  {
3481
    Append_block_log_event aev(thd,0,0,0,0);
unknown's avatar
unknown committed
3482 3483 3484 3485 3486 3487 3488 3489 3490 3491 3492
  
    for (;;)
    {
      if (unlikely((num_bytes=my_net_read(net)) == packet_error))
      {
	sql_print_error("Network read error downloading '%s' from master",
			cev->fname);
	goto err;
      }
      if (unlikely(!num_bytes)) /* eof */
      {
3493
	net_write_command(net, 0, "", 0, "", 0);/* 3.23 master wants it */
unknown's avatar
unknown committed
3494 3495 3496 3497 3498 3499 3500 3501
        /*
          If we wrote Create_file_log_event, then we need to write
          Execute_load_log_event. If we did not write Create_file_log_event,
          then this is an empty file and we can just do as if the LOAD DATA
          INFILE had not existed, i.e. write nothing.
        */
        if (unlikely(cev_not_written))
	  break;
unknown's avatar
unknown committed
3502
	Execute_load_log_event xev(thd,0,0);
3503
	xev.log_pos = mi->master_log_pos;
unknown's avatar
unknown committed
3504 3505 3506 3507 3508 3509
	if (unlikely(mi->rli.relay_log.append(&xev)))
	{
	  sql_print_error("Slave I/O: error writing Exec_load event to \
relay log");
	  goto err;
	}
unknown's avatar
unknown committed
3510
	mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
unknown's avatar
unknown committed
3511 3512 3513 3514 3515 3516
	break;
      }
      if (unlikely(cev_not_written))
      {
	cev->block = (char*)net->read_pos;
	cev->block_len = num_bytes;
3517
	cev->log_pos = mi->master_log_pos;
unknown's avatar
unknown committed
3518 3519 3520 3521 3522 3523 3524
	if (unlikely(mi->rli.relay_log.append(cev)))
	{
	  sql_print_error("Slave I/O: error writing Create_file event to \
relay log");
	  goto err;
	}
	cev_not_written=0;
unknown's avatar
unknown committed
3525
	mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
unknown's avatar
unknown committed
3526 3527 3528 3529 3530
      }
      else
      {
	aev.block = (char*)net->read_pos;
	aev.block_len = num_bytes;
3531
	aev.log_pos = mi->master_log_pos;
unknown's avatar
unknown committed
3532 3533 3534 3535 3536 3537
	if (unlikely(mi->rli.relay_log.append(&aev)))
	{
	  sql_print_error("Slave I/O: error writing Append_block event to \
relay log");
	  goto err;
	}
unknown's avatar
unknown committed
3538
	mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
unknown's avatar
unknown committed
3539 3540 3541 3542 3543
      }
    }
  }
  error=0;
err:
unknown's avatar
unknown committed
3544
  DBUG_RETURN(error);
unknown's avatar
unknown committed
3545
}
unknown's avatar
unknown committed
3546

3547

unknown's avatar
unknown committed
3548
/*
unknown's avatar
unknown committed
3549 3550 3551 3552 3553 3554 3555 3556
  Start using a new binary log on the master

  SYNOPSIS
    process_io_rotate()
    mi			master_info for the slave
    rev			The rotate log event read from the binary log

  DESCRIPTION
3557
    Updates the master info with the place in the next binary
unknown's avatar
unknown committed
3558 3559 3560 3561 3562 3563 3564 3565
    log where we should start reading.

  NOTES
    We assume we already locked mi->data_lock

  RETURN VALUES
    0		ok
    1	        Log event is illegal
unknown's avatar
unknown committed
3566 3567 3568

*/

unknown's avatar
unknown committed
3569
static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev)
3570
{
3571
  DBUG_ENTER("process_io_rotate");
unknown's avatar
unknown committed
3572
  safe_mutex_assert_owner(&mi->data_lock);
3573

unknown's avatar
unknown committed
3574
  if (unlikely(!rev->is_valid()))
3575
    DBUG_RETURN(1);
unknown's avatar
unknown committed
3576 3577 3578 3579 3580

  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
  mi->master_log_pos= rev->pos;
  DBUG_PRINT("info", ("master_log_pos: '%s' %d",
		      mi->master_log_name, (ulong) mi->master_log_pos));
3581
#ifndef DBUG_OFF
unknown's avatar
unknown committed
3582 3583 3584 3585 3586 3587
  /*
    If we do not do this, we will be getting the first
    rotate event forever, so we need to not disconnect after one.
  */
  if (disconnect_slave_event_count)
    events_till_disconnect++;
3588
#endif
3589
  DBUG_RETURN(0);
3590 3591
}

3592

unknown's avatar
unknown committed
3593
/*
  queue_old_event()

  Writes a 3.23 (old format) event to the relay log and advances
  mi->master_log_pos, all under mi->data_lock.
    - STOP events are not written, but the position still advances.
    - ROTATE events update the master log name/position through
      process_io_rotate() and do not advance the position.
    - LOAD events are turned into CREATE_FILE events by read_log_event();
      the file is then downloaded by process_io_create_file().

  RETURN
    0	ok
    #	error

  TODO:
    Test this code before release - it has to be tested on a separate
    setup with 3.23 master
*/

static int queue_old_event(MASTER_INFO *mi, const char *buf,
			   ulong event_len)
{
  const char *errmsg = 0;
  ulong inc_pos;
  bool ignore_event= 0;
  char *tmp_buf = 0;
  RELAY_LOG_INFO *rli= &mi->rli;
  DBUG_ENTER("queue_old_event");

  /*
    If we get Load event, we need to pass a non-reusable buffer
    to read_log_event, so we do a trick
  */
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
  {
    if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
    {
      sql_print_error("Slave I/O: out of memory for Load event");
      DBUG_RETURN(1);
    }
    memcpy(tmp_buf,buf,event_len);
    /*
      Create_file constructor wants a 0 as last char of buffer, this 0 will
      serve as the string-termination char for the file's name (which is at the
      end of the buffer)
      We must increment event_len, otherwise the event constructor will not see
      this end 0, which leads to segfault.
    */
    tmp_buf[event_len++]=0;
    int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
    buf = (const char*)tmp_buf;
  }
  /*
    This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
    send the loaded file, and write it to the relay log in the form of
    Append_block/Exec_load (the SQL thread needs the data, as that thread is not
    connected to the master).
  */
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
					    1 /*old format*/ );
  if (unlikely(!ev))
  {
    sql_print_error("Read invalid event from master: '%s',\
 master could be corrupt but a more likely cause of this is a bug",
		    errmsg);
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
    DBUG_RETURN(1);
  }
  pthread_mutex_lock(&mi->data_lock);
  ev->log_pos = mi->master_log_pos;
  switch (ev->get_type_code()) {
  case STOP_EVENT:
    ignore_event= 1;
    inc_pos= event_len;
    break;
  case ROTATE_EVENT:
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
    {
      delete ev;
      pthread_mutex_unlock(&mi->data_lock);
      /* tmp_buf is expected to be 0 here; freeing is a no-op then */
      my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
      DBUG_RETURN(1);
    }
    inc_pos= 0;
    break;
  case CREATE_FILE_EVENT:
    /*
      Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
      queue_old_event() which is for 3.23 events which don't comprise
      CREATE_FILE_EVENT. This is because read_log_event() above has just
      transformed LOAD_EVENT into CREATE_FILE_EVENT.
    */
  {
    /* We come here when and only when tmp_buf != 0 */
    DBUG_ASSERT(tmp_buf);
    int error = process_io_create_file(mi,(Create_file_log_event*)ev);
    delete ev;
    /*
      We had incremented event_len, but now when it is used to calculate the
      position in the master's log, we must use the original value.
    */
    mi->master_log_pos += --event_len;
    DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
    pthread_mutex_unlock(&mi->data_lock);
    my_free((char*)tmp_buf, MYF(0));
    DBUG_RETURN(error);
  }
  default:
    inc_pos= event_len;
    break;
  }
  if (likely(!ignore_event))
  {
    if (unlikely(rli->relay_log.append(ev)))
    {
      delete ev;
      pthread_mutex_unlock(&mi->data_lock);
      my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
      DBUG_RETURN(1);
    }
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  }
  delete ev;
  mi->master_log_pos+= inc_pos;
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
  pthread_mutex_unlock(&mi->data_lock);
  /* Defensive: release tmp_buf on every exit path (no-op when it is 0) */
  my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
  DBUG_RETURN(0);
}

3711

unknown's avatar
unknown committed
3712
/*
  queue_event()

  Write an event received from the master to the relay log and advance
  mi->master_log_pos, holding mi->data_lock for the whole update.
  Old-format (3.23) masters are delegated to queue_old_event().

  RETURN
    0	ok (including events that are deliberately not written)
    #	error from process_io_rotate() or from writing the relay log
*/

int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
{
  int error= 0;
  ulong inc_pos;
  RELAY_LOG_INFO *rli= &mi->rli;
  DBUG_ENTER("queue_event");

  if (mi->old_format)
    DBUG_RETURN(queue_old_event(mi,buf,event_len));

  pthread_mutex_lock(&mi->data_lock);

  /*
    TODO: figure out if other events in addition to Rotate
    require special processing.
    Guilhem 2003-06 : I don't think so.
  */
  switch (buf[EVENT_TYPE_OFFSET]) {
  case STOP_EVENT:
    /*
      We needn't write this event to the relay log. Indeed, it just indicates a
      master server shutdown. The only thing this does is cleaning. But
      cleaning is already done on a per-master-thread basis (as the master
      server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
      and DO RELEASE_LOCK; prepared statements' deletion are TODO).

      We don't even increment mi->master_log_pos, because we may be just after
      a Rotate event. Btw, in a few milliseconds we are going to have a Start
      event from the next binlog (unless the master is presently running
      without --log-bin).
    */
    goto err;
  case ROTATE_EVENT:
  {
    Rotate_log_event rev(buf,event_len,0);
    if (unlikely(process_io_rotate(mi,&rev)))
    {
      error= 1;
      goto err;
    }
    /*
      Now the I/O thread has just changed its mi->master_log_name, so
      incrementing mi->master_log_pos is nonsense.
    */
    inc_pos= 0;
    break;
  }
  default:
    inc_pos= event_len;
    break;
  }

  /*
     If this event is originating from this server, don't queue it.
     We don't check this for 3.23 events because it's simpler like this; 3.23
     will be filtered anyway by the SQL slave thread which also tests the
     server id (we must also keep this test in the SQL thread, in case somebody
     upgrades a 4.0 slave which has a not-filtered relay log).

     ANY event coming from ourselves can be ignored: it is obvious for queries;
     for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
     (--log-slave-updates would not log that) unless this slave is also its
     direct master (an unsupported, useless setup!).
  */

  if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
      !replicate_same_server_id)
  {
    /*
      Do not write it to the relay log.
      We still want to increment, so that we won't re-read this event from the
      master if the slave IO thread is now stopped/restarted (more efficient if
      the events we are ignoring are big LOAD DATA INFILE).
    */
    mi->master_log_pos+= inc_pos;
    /* %lu matches the (ulong) cast; %d was a format/argument mismatch */
    DBUG_PRINT("info", ("master_log_pos: %lu, event originating from the same server, ignored",
                        (ulong) mi->master_log_pos));
  }
  else
  {
    /* write the event to the relay log */
    if (likely(!(error= rli->relay_log.appendv(buf,event_len,0))))
    {
      mi->master_log_pos+= inc_pos;
      DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
    }
  }

err:
  pthread_mutex_unlock(&mi->data_lock);
  DBUG_RETURN(error);
}

3810

3811 3812
/*
  Release everything held by a relay log info structure.

  SYNOPSIS
    end_relay_log_info()
    rli                 Relay log information

  NOTES
    Nothing is done when rli->inited is 0. Otherwise:
    - the relay-log info file (info_file / info_fd) is closed if open,
    - the relay log file read through rli->cache_buf (cur_log_fd) is closed
      if open,
    - rli->inited is cleared,
    - the relay log and its index are closed,
    - the slave's temporary tables are removed from memory.
*/
void end_relay_log_info(RELAY_LOG_INFO* rli)
{
  DBUG_ENTER("end_relay_log_info");

  if (rli->inited)
  {
    /* Info file holding the SQL thread's coordinates */
    if (rli->info_fd >= 0)
    {
      end_io_cache(&rli->info_file);
      (void) my_close(rli->info_fd, MYF(MY_WME));
      rli->info_fd= -1;
    }

    /* Relay log file this structure opened itself (read via cache_buf) */
    if (rli->cur_log_fd >= 0)
    {
      end_io_cache(&rli->cache_buf);
      (void) my_close(rli->cur_log_fd, MYF(MY_WME));
      rli->cur_log_fd= -1;
    }

    rli->inited= 0;
    rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);

    /*
      Drop the slave's temporary tables from memory. Some day other actions
      may be needed here so that slave temp tables survive a shutdown.
    */
    rli->close_temporary_tables();
  }

  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
3840 3841
/*
  Try to connect until successful or slave killed
3842

unknown's avatar
unknown committed
3843 3844 3845 3846 3847
  SYNPOSIS
    safe_connect()
    thd			Thread handler for slave
    mysql		MySQL connection handle
    mi			Replication handle
3848

unknown's avatar
unknown committed
3849 3850 3851 3852
  RETURN
    0	ok
    #	Error
*/
3853

3854
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
unknown's avatar
unknown committed
3855
{
unknown's avatar
unknown committed
3856
  return connect_to_master(thd, mysql, mi, 0, 0);
unknown's avatar
unknown committed
3857 3858
}

3859

3860
/*
unknown's avatar
unknown committed
3861 3862
  SYNPOSIS
    connect_to_master()
unknown's avatar
unknown committed
3863

unknown's avatar
unknown committed
3864 3865 3866
  IMPLEMENTATION
    Try to connect until successful or slave killed or we have retried
    master_retry_count times
3867
*/
unknown's avatar
unknown committed
3868

unknown's avatar
unknown committed
3869
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
unknown's avatar
unknown committed
3870
			     bool reconnect, bool suppress_warnings)
unknown's avatar
unknown committed
3871
{
3872
  int slave_was_killed;
3873 3874
  int last_errno= -2;				// impossible error
  ulong err_count=0;
unknown's avatar
unknown committed
3875
  char llbuff[22];
3876
  DBUG_ENTER("connect_to_master");
unknown's avatar
unknown committed
3877

unknown's avatar
unknown committed
3878 3879 3880
#ifndef DBUG_OFF
  events_till_disconnect = disconnect_slave_event_count;
#endif
3881
  ulong client_flag= CLIENT_REMEMBER_OPTIONS;
3882 3883 3884
  if (opt_slave_compressed_protocol)
    client_flag=CLIENT_COMPRESS;		/* We will use compression */

3885 3886
  mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
  mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
unknown's avatar
unknown committed
3887 3888 3889 3890 3891 3892 3893 3894 3895 3896 3897
 
#ifdef HAVE_OPENSSL
  if (mi->ssl)
    mysql_ssl_set(mysql, 
                  mi->ssl_key[0]?mi->ssl_key:0,
                  mi->ssl_cert[0]?mi->ssl_cert:0, 
                  mi->ssl_ca[0]?mi->ssl_ca:0,
                  mi->ssl_capath[0]?mi->ssl_capath:0,
                  mi->ssl_cipher[0]?mi->ssl_cipher:0);
#endif

3898 3899 3900 3901
  mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
  /* This one is not strictly needed but we have it here for completeness */
  mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);

3902
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
3903 3904 3905
	 (reconnect ? mysql_reconnect(mysql) != 0 :
	  mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
			     mi->port, 0, client_flag) == 0))
unknown's avatar
unknown committed
3906
  {
3907
    /* Don't repeat last error */
unknown's avatar
SCRUM  
unknown committed
3908
    if ((int)mysql_errno(mysql) != last_errno)
3909
    {
unknown's avatar
SCRUM  
unknown committed
3910
      last_errno=mysql_errno(mysql);
unknown's avatar
unknown committed
3911
      suppress_warnings= 0;
3912
      sql_print_error("Slave I/O thread: error %s to master \
3913
'%s@%s:%d': \
3914
Error: '%s'  errno: %d  retry-time: %d  retries: %d",
3915
		      (reconnect ? "reconnecting" : "connecting"),
3916
		      mi->user,mi->host,mi->port,
unknown's avatar
SCRUM  
unknown committed
3917
		      mysql_error(mysql), last_errno,
3918 3919
		      mi->connect_retry,
		      master_retry_count);
3920
    }
unknown's avatar
unknown committed
3921 3922 3923
    /*
      By default we try forever. The reason is that failure will trigger
      master election, so if the user did not set master_retry_count we
3924
      do not want to have election triggered on the first failure to
unknown's avatar
unknown committed
3925
      connect
3926
    */
3927
    if (++err_count == master_retry_count)
3928 3929
    {
      slave_was_killed=1;
unknown's avatar
unknown committed
3930 3931
      if (reconnect)
        change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
3932 3933
      break;
    }
3934 3935
    safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
	       (void*)mi);
unknown's avatar
unknown committed
3936
  }
3937

3938 3939
  if (!slave_was_killed)
  {
unknown's avatar
unknown committed
3940
    if (reconnect)
unknown's avatar
unknown committed
3941
    { 
3942
      if (!suppress_warnings && global_system_variables.log_warnings)
unknown's avatar
unknown committed
3943
	sql_print_error("Slave: connected to master '%s@%s:%d',\
3944
replication resumed in log '%s' at position %s", mi->user,
unknown's avatar
unknown committed
3945 3946 3947 3948
			mi->host, mi->port,
			IO_RPL_LOG_NAME,
			llstr(mi->master_log_pos,llbuff));
    }
unknown's avatar
unknown committed
3949 3950 3951 3952
    else
    {
      change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
      mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
unknown's avatar
unknown committed
3953
		      mi->user, mi->host, mi->port);
unknown's avatar
unknown committed
3954
    }
3955
#ifdef SIGNAL_WITH_VIO_CLOSE
3956
    thd->set_active_vio(mysql->net.vio);
3957
#endif      
3958
  }
3959 3960
  DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed));
  DBUG_RETURN(slave_was_killed);
unknown's avatar
unknown committed
3961 3962
}

3963

unknown's avatar
unknown committed
3964
/*
3965
  safe_reconnect()
unknown's avatar
unknown committed
3966

unknown's avatar
unknown committed
3967 3968 3969
  IMPLEMENTATION
    Try to connect until successful or slave killed or we have retried
    master_retry_count times
unknown's avatar
unknown committed
3970 3971
*/

unknown's avatar
unknown committed
3972 3973
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
			  bool suppress_warnings)
unknown's avatar
unknown committed
3974
{
unknown's avatar
unknown committed
3975 3976
  DBUG_ENTER("safe_reconnect");
  DBUG_RETURN(connect_to_master(thd, mysql, mi, 1, suppress_warnings));
unknown's avatar
unknown committed
3977 3978
}

unknown's avatar
unknown committed
3979

3980 3981 3982 3983 3984 3985 3986 3987 3988 3989 3990 3991 3992 3993 3994 3995 3996 3997 3998 3999 4000 4001 4002 4003 4004 4005 4006 4007 4008 4009
/*
  Store the file and position where the execute-slave thread is in the
  relay log.

  SYNOPSIS
    flush_relay_log_info()
    rli                 Relay log information

  NOTES
    - The info file is rewritten from offset 0 with four '\n'-terminated
      lines: group_relay_log_name, group_relay_log_pos,
      group_master_log_name, group_master_log_pos.
    - As this is only called by the slave thread, we don't need to
      have a lock on this.
    - This function does not itself look at transactions: it always writes
      the current group_* coordinates. Re-executing an interrupted
      transaction after a crash depends on those coordinates only moving at
      group boundaries (NOTE(review): enforced by callers, confirm).
    - As a transaction never spans binary logs, we don't have to handle the
      case where we do a relay-log-rotation in the middle of the transaction.
      If this would not be the case, we would have to ensure that we
      don't delete the relay log file where the transaction started when
      we switch to a new relay log file.
    - Flushing the relay log itself is done by the slave I/O thread.

  TODO
    - Change the log file information to a binary format to avoid calling
      longlong2str.

  RETURN VALUES
    0   ok
    1   write error
*/

bool flush_relay_log_info(RELAY_LOG_INFO* rli)
{
  IO_CACHE *info= &rli->info_file;
  /* Room for two file names, two numbers and four newlines */
  char line_buf[FN_REFLEN*2+22*2+4];
  char *end;

  my_b_seek(info, 0L);

  end= strmov(line_buf, rli->group_relay_log_name);
  *end++= '\n';
  end= longlong2str(rli->group_relay_log_pos, end, 10);
  *end++= '\n';
  end= strmov(end, rli->group_master_log_name);
  *end++= '\n';
  end= longlong2str(rli->group_master_log_pos, end, 10);
  *end++= '\n';

  /* The flush is attempted even when the write failed */
  bool write_failed= my_b_write(info, (byte*) line_buf,
                                (ulong) (end - line_buf)) != 0;
  bool flush_failed= flush_io_cache(info) != 0;

  return write_failed || flush_failed;
}

4032

unknown's avatar
unknown committed
4033
/*
  Switch the SQL thread from the hot relay log to its own cold copy.

  Called when we notice that the current "hot" log got rotated under our
  feet. rli->cur_log is pointed at rli->cache_buf, the file named
  rli->event_relay_log_name is opened into it, and the read position is set
  to rli->event_relay_log_pos so reading resumes exactly where it was.

  RETURN
    &rli->cache_buf     on success
    0                   if open_binlog() failed (NOTE(review): presumably
                        open_binlog() has set *errmsg; confirm)
*/

static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
{
  DBUG_ENTER("reopen_relay_log");
  /* We must be reading the hot log through a descriptor we don't own */
  DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
  DBUG_ASSERT(rli->cur_log_fd == -1);

  IO_CACHE *cold_log= &rli->cache_buf;
  rli->cur_log= cold_log;

  rli->cur_log_fd= open_binlog(cold_log, rli->event_relay_log_name, errmsg);
  if (rli->cur_log_fd < 0)
    DBUG_RETURN(0);

  /* Resume at the start of the event we were about to read */
  my_b_seek(cold_log, rli->event_relay_log_pos);
  DBUG_RETURN(cold_log);
}

unknown's avatar
unknown committed
4056

4057 4058 4059 4060
Log_event* next_event(RELAY_LOG_INFO* rli)
{
  Log_event* ev;
  IO_CACHE* cur_log = rli->cur_log;
4061
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock(); 
4062 4063
  const char* errmsg=0;
  THD* thd = rli->sql_thd;
4064
  
unknown's avatar
unknown committed
4065
  DBUG_ENTER("next_event");
4066 4067
  DBUG_ASSERT(thd != 0);

unknown's avatar
unknown committed
4068 4069
  /*
    For most operations we need to protect rli members with data_lock,
4070 4071 4072 4073
    so we assume calling function acquired this mutex for us and we will
    hold it for the most of the loop below However, we will release it
    whenever it is worth the hassle,  and in the cases when we go into a
    pthread_cond_wait() with the non-data_lock mutex
unknown's avatar
unknown committed
4074
  */
4075
  safe_mutex_assert_owner(&rli->data_lock);
4076
  
4077
  while (!sql_slave_killed(thd,rli))
unknown's avatar
unknown committed
4078 4079 4080
  {
    /*
      We can have two kinds of log reading:
unknown's avatar
unknown committed
4081 4082 4083 4084 4085 4086 4087 4088
      hot_log:
        rli->cur_log points at the IO_CACHE of relay_log, which
        is actively being updated by the I/O thread. We need to be careful
        in this case and make sure that we are not looking at a stale log that
        has already been rotated. If it has been, we reopen the log.

      The other case is much simpler:
        We just have a read only log that nobody else will be updating.
unknown's avatar
unknown committed
4089
    */
4090 4091 4092 4093 4094
    bool hot_log;
    if ((hot_log = (cur_log != &rli->cache_buf)))
    {
      DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor
      pthread_mutex_lock(log_lock);
unknown's avatar
unknown committed
4095 4096

      /*
unknown's avatar
unknown committed
4097
	Reading xxx_file_id is safe because the log will only
unknown's avatar
unknown committed
4098 4099
	be rotated when we hold relay_log.LOCK_log
      */
unknown's avatar
unknown committed
4100
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
4101
      {
unknown's avatar
unknown committed
4102 4103 4104 4105
	// The master has switched to a new log file; Reopen the old log file
	cur_log=reopen_relay_log(rli, &errmsg);
	pthread_mutex_unlock(log_lock);
	if (!cur_log)				// No more log files
4106
	  goto err;
unknown's avatar
unknown committed
4107
	hot_log=0;				// Using old binary log
4108 4109
      }
    }
4110 4111 4112
#ifndef DBUG_OFF
    {
      char llbuf1[22], llbuf2[22];
unknown's avatar
unknown committed
4113 4114 4115 4116 4117 4118
      DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
      /*
	The next assertion sometimes (very rarely) fails, let's try to track
	it
      */
      DBUG_PRINT("info", ("\
unknown's avatar
unknown committed
4119
Before assert, my_b_tell(cur_log)=%s  rli->event_relay_log_pos=%s",
4120
                          llstr(my_b_tell(cur_log),llbuf1), 
unknown's avatar
unknown committed
4121
                          llstr(rli->group_relay_log_pos,llbuf2)));
unknown's avatar
unknown committed
4122
       DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
4123 4124
    }
#endif
unknown's avatar
unknown committed
4125 4126 4127
    /*
      Relay log is always in new format - if the master is 3.23, the
      I/O thread will convert the format for us
unknown's avatar
unknown committed
4128
    */
unknown's avatar
unknown committed
4129
    if ((ev=Log_event::read_log_event(cur_log,0,(bool)0 /* new format */)))
4130 4131 4132 4133
    {
      DBUG_ASSERT(thd==rli->sql_thd);
      if (hot_log)
	pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
4134
      DBUG_RETURN(ev);
4135 4136
    }
    DBUG_ASSERT(thd==rli->sql_thd);
unknown's avatar
unknown committed
4137
    if (opt_reckless_slave)			// For mysql-test
unknown's avatar
unknown committed
4138
      cur_log->error = 0;
unknown's avatar
unknown committed
4139
    if (cur_log->error < 0)
unknown's avatar
unknown committed
4140 4141
    {
      errmsg = "slave SQL thread aborted because of I/O error";
unknown's avatar
unknown committed
4142 4143
      if (hot_log)
	pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
4144 4145
      goto err;
    }
4146 4147
    if (!cur_log->error) /* EOF */
    {
unknown's avatar
unknown committed
4148 4149 4150 4151 4152
      /*
	On a hot log, EOF means that there are no more updates to
	process and we must block until I/O thread adds some and
	signals us to continue
      */
4153 4154
      if (hot_log)
      {
unknown's avatar
unknown committed
4155
	DBUG_ASSERT(rli->relay_log.get_open_count() == rli->cur_log_old_open_count);
unknown's avatar
unknown committed
4156 4157 4158 4159
	/*
	  We can, and should release data_lock while we are waiting for
	  update. If we do not, show slave status will block
	*/
4160
	pthread_mutex_unlock(&rli->data_lock);
4161 4162 4163 4164 4165 4166 4167 4168 4169 4170 4171 4172 4173 4174 4175 4176 4177 4178 4179 4180 4181 4182

        /*
          Possible deadlock : 
          - the I/O thread has reached log_space_limit
          - the SQL thread has read all relay logs, but cannot purge for some
          reason:
            * it has already purged all logs except the current one
            * there are other logs than the current one but they're involved in
            a transaction that finishes in the current one (or is not finished)
          Solution :
          Wake up the possibly waiting I/O thread, and set a boolean asking
          the I/O thread to temporarily ignore the log_space_limit
          constraint, because we do not want the I/O thread to block because of
          space (it's ok if it blocks for any other reason (e.g. because the
          master does not send anything). Then the I/O thread stops waiting 
          and reads more events.
          The SQL thread decides when the I/O thread should take log_space_limit
          into account again : ignore_log_space_limit is reset to 0 
          in purge_first_log (when the SQL thread purges the just-read relay
          log), and also when the SQL thread starts. We should also reset
          ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
          fact, no need as RESET SLAVE requires that the slave
unknown's avatar
unknown committed
4183 4184
          be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
          it stops.
4185 4186 4187 4188
        */
        pthread_mutex_lock(&rli->log_space_lock);
        // prevent the I/O thread from blocking next times
        rli->ignore_log_space_limit= 1; 
4189 4190 4191 4192 4193 4194
        /*
          If the I/O thread is blocked, unblock it.
          Ok to broadcast after unlock, because the mutex is only destroyed in
          ~st_relay_log_info(), i.e. when rli is destroyed, and rli will not be
          destroyed before we exit the present function.
        */
4195
        pthread_mutex_unlock(&rli->log_space_lock);
4196
        pthread_cond_broadcast(&rli->log_space_cond);
4197
        // Note that wait_for_update unlocks lock_log !
4198
        rli->relay_log.wait_for_update(rli->sql_thd, 1);
4199 4200
        // re-acquire data lock since we released it earlier
        pthread_mutex_lock(&rli->data_lock);
4201 4202
	continue;
      }
unknown's avatar
unknown committed
4203 4204 4205 4206 4207 4208 4209 4210 4211 4212
      /*
	If the log was not hot, we need to move to the next log in
	sequence. The next log could be hot or cold, we deal with both
	cases separately after doing some common initialization
      */
      end_io_cache(cur_log);
      DBUG_ASSERT(rli->cur_log_fd >= 0);
      my_close(rli->cur_log_fd, MYF(MY_WME));
      rli->cur_log_fd = -1;
	
4213
      if (relay_log_purge)
unknown's avatar
unknown committed
4214
      {
4215 4216 4217 4218 4219 4220 4221 4222 4223 4224 4225 4226 4227 4228 4229
	/*
          purge_first_log will properly set up relay log coordinates in rli.
          If the group's coordinates are equal to the event's coordinates
          (i.e. the relay log was not rotated in the middle of a group),
          we can purge this relay log too.
          We do ulonglong and string comparisons, this may be slow but
          - purging the last relay log is nice (it can save 1GB of disk), so we
          like to detect the case where we can do it, and given this,
          - I see no better detection method
          - purge_first_log is not called that often
        */
	if (rli->relay_log.purge_first_log
            (rli,
             rli->group_relay_log_pos == rli->event_relay_log_pos
             && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
unknown's avatar
unknown committed
4230
	{
4231
	  errmsg = "Error purging processed logs";
unknown's avatar
unknown committed
4232 4233 4234
	  goto err;
	}
      }
4235 4236
      else
      {
unknown's avatar
unknown committed
4237
	/*
4238 4239 4240 4241 4242
	  If hot_log is set, then we already have a lock on
	  LOCK_log.  If not, we have to get the lock.

	  According to Sasha, the only time this code will ever be executed
	  is if we are recovering from a bug.
unknown's avatar
unknown committed
4243
	*/
4244
	if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
4245
	{
unknown's avatar
unknown committed
4246 4247
	  errmsg = "error switching to the next log";
	  goto err;
4248
	}
4249 4250 4251
	rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
	strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
		sizeof(rli->event_relay_log_name)-1);
unknown's avatar
unknown committed
4252 4253
	flush_relay_log_info(rli);
      }
4254 4255 4256 4257 4258 4259 4260 4261 4262 4263 4264 4265 4266 4267

      /*
        Now we want to open this next log. To know if it's a hot log (the one
        being written by the I/O thread now) or a cold log, we can use
        is_active(); if it is hot, we use the I/O cache; if it's cold we open
        the file normally. But if is_active() reports that the log is hot, this
        may change between the test and the consequence of the test. So we may
        open the I/O cache whereas the log is now cold, which is nonsense.
        To guard against this, we need to have LOCK_log.
      */

      DBUG_PRINT("info",("hot_log: %d",hot_log));
      if (!hot_log) /* if hot_log, we already have this mutex */
        pthread_mutex_lock(log_lock);
unknown's avatar
unknown committed
4268 4269
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
      {
4270
#ifdef EXTRA_DEBUG
unknown's avatar
unknown committed
4271 4272 4273
	if (global_system_variables.log_warnings)
	  sql_print_error("next log '%s' is currently active",
			  rli->linfo.log_file_name);
4274
#endif	  
unknown's avatar
unknown committed
4275 4276 4277
	rli->cur_log= cur_log= rli->relay_log.get_log_file();
	rli->cur_log_old_open_count= rli->relay_log.get_open_count();
	DBUG_ASSERT(rli->cur_log_fd == -1);
4278
	  
unknown's avatar
unknown committed
4279
	/*
unknown's avatar
unknown committed
4280
	  Read pointer has to be at the start since we are the only
4281 4282 4283 4284
	  reader.
          We must keep the LOCK_log to read the 4 first bytes, as this is a hot
          log (same as when we call read_log_event() above: for a hot log we
          take the mutex).
unknown's avatar
unknown committed
4285
	*/
unknown's avatar
unknown committed
4286
	if (check_binlog_magic(cur_log,&errmsg))
4287 4288
        {
          if (!hot_log) pthread_mutex_unlock(log_lock);
4289
	  goto err;
4290 4291
        }
        if (!hot_log) pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
4292
	continue;
4293
      }
4294
      if (!hot_log) pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
4295
      /*
4296 4297 4298
	if we get here, the log was not hot, so we will have to open it
	ourselves. We are sure that the log is still not hot now (a log can get
	from hot to cold, but not from cold to hot). No need for LOCK_log.
unknown's avatar
unknown committed
4299 4300
      */
#ifdef EXTRA_DEBUG
unknown's avatar
unknown committed
4301 4302 4303
      if (global_system_variables.log_warnings)
	sql_print_error("next log '%s' is not active",
			rli->linfo.log_file_name);
unknown's avatar
unknown committed
4304 4305 4306 4307 4308
#endif	  
      // open_binlog() will check the magic header
      if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
				       &errmsg)) <0)
	goto err;
4309
    }
unknown's avatar
unknown committed
4310
    else
4311
    {
unknown's avatar
unknown committed
4312 4313 4314 4315 4316 4317
      /*
	Read failed with a non-EOF error.
	TODO: come up with something better to handle this error
      */
      if (hot_log)
	pthread_mutex_unlock(log_lock);
4318
      sql_print_error("Slave SQL thread: I/O error reading \
unknown's avatar
unknown committed
4319
event(errno: %d  cur_log->error: %d)",
4320
		      my_errno,cur_log->error);
unknown's avatar
unknown committed
4321
      // set read position to the beginning of the event
4322
      my_b_seek(cur_log,rli->event_relay_log_pos);
unknown's avatar
unknown committed
4323 4324
      /* otherwise, we have had a partial read */
      errmsg = "Aborting slave SQL thread because of partial event read";
4325
      break;					// To end of function
4326 4327
    }
  }
4328
  if (!errmsg && global_system_variables.log_warnings)
4329
    errmsg = "slave SQL thread was killed";
unknown's avatar
unknown committed
4330

4331
err:
4332 4333
  if (errmsg)
    sql_print_error("Error reading relay log event: %s", errmsg);
unknown's avatar
unknown committed
4334
  DBUG_RETURN(0);
4335 4336
}

4337 4338 4339 4340 4341 4342 4343 4344 4345 4346 4347
/*
  Rotate a relay log on FLUSH LOGS.

  This is used only by FLUSH LOGS. Rotation because of size is simpler,
  because all relevant locks are already held when it happens; here they
  are not, so this function mostly takes locks. It returns nothing because
  no error can be caught (MYSQL_LOG::new_file() is void).
*/

void rotate_relay_log(MASTER_INFO* mi)
{
  DBUG_ENTER("rotate_relay_log");
  RELAY_LOG_INFO* rli= &mi->rli;

  lock_slave_threads(mi);
  pthread_mutex_lock(&rli->data_lock);

  /*
    new_file() would try to lock LOCK_log, which may not be initialized if
    we are not a slave; hence the check on rli->inited.
  */
  if (!rli->inited)
  {
    DBUG_PRINT("info", ("rli->inited == 0"));
  }
  else
  {
    /* new_file() does nothing if the relay log is closed */
    rli->relay_log.new_file(1);

    /*
      Harvest right away. Otherwise the BIN_LOG_HEADER_SIZE of each new file
      is not counted at once, and after a series of FLUSH LOGS (with slave
      threads running) relay_log_space, which shrinks by the size of each
      deleted relay log, could become negative. The I/O thread would fix it
      at its next harvest_bytes_written(), but SHOW SLAVE STATUS would show
      odd values until then. If the log is closed, this only harvests the
      last writes, probably 0 as they have likely been harvested already.
    */
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  }

  pthread_mutex_unlock(&rli->data_lock);
  unlock_slave_threads(mi);
  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
4384

unknown's avatar
unknown committed
4385 4386
#ifdef __GNUC__
template class I_List_iterator<i_string>;
unknown's avatar
unknown committed
4387
template class I_List_iterator<i_string_pair>;
unknown's avatar
unknown committed
4388
#endif
4389

unknown's avatar
SCRUM  
unknown committed
4390
#endif /* HAVE_REPLICATION */