/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB & Sasha

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

// Sasha Pachev <sasha@mysql.com> is currently in charge of this file

#include "mysql_priv.h"
#include "sql_repl.h"
#include "sql_acl.h"
#include "log_event.h"
23
#include "mini_client.h"
unknown's avatar
unknown committed
24
#include <my_dir.h>
25
#include <assert.h>
26 27

extern const char* any_db;
28

29
int max_binlog_dump_events = 0; // unlimited
30
my_bool opt_sporadic_binlog_dump_fail = 0;
31
static int binlog_dump_count = 0;
unknown's avatar
unknown committed
32

33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
/*
  Check that a binary log starts with the expected magic number

  SYNOPSIS
    check_binlog_magic()
    log		Cache on the log file; must be positioned at offset 0
    errmsg	Set to a static error text if the check fails

  RETURN
    0	The magic number matches BINLOG_MAGIC
    1	Read error or wrong magic number; *errmsg has been set
*/

int check_binlog_magic(IO_CACHE* log, const char** errmsg)
{
  char file_magic[4];
  DBUG_ASSERT(my_b_tell(log) == 0);

  /* my_b_read() returns non-zero if the header could not be read */
  if (my_b_read(log, (byte*) file_magic, sizeof(file_magic)))
  {
    *errmsg = "I/O error reading the header from the binary log";
    sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno,
                    log->error);
    return 1;
  }

  if (memcmp(file_magic, BINLOG_MAGIC, sizeof(file_magic)) != 0)
  {
    *errmsg = "Binlog has bad magic number;  It's not a binary log file that can be used by this version of MySQL";
    return 1;
  }

  return 0;
}

unknown's avatar
unknown committed
53
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
54
                             ulonglong position, const char**errmsg)
unknown's avatar
unknown committed
55
{
56
  char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN];
unknown's avatar
unknown committed
57 58 59
  memset(header, 0, 4); // when does not matter
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;

60
  char* p = log_file_name+dirname_length(log_file_name);
unknown's avatar
unknown committed
61
  uint ident_len = (uint) strlen(p);
62
  ulong event_len = ident_len + ROTATE_EVENT_OVERHEAD;
63
  int4store(header + SERVER_ID_OFFSET, server_id);
unknown's avatar
unknown committed
64
  int4store(header + EVENT_LEN_OFFSET, event_len);
65
  int2store(header + FLAGS_OFFSET, 0);
66 67 68 69
  
  // TODO: check what problems this may cause and fix them
  int4store(header + LOG_POS_OFFSET, 0);
  
unknown's avatar
unknown committed
70
  packet->append(header, sizeof(header));
71
  /* We need to split the next statement because of problem with cxx */
72
  int4store(buf,position);
73
  int4store(buf+4,0);
74
  packet->append(buf, ROTATE_HEADER_LEN);
unknown's avatar
unknown committed
75
  packet->append(p,ident_len);
76 77 78 79 80
  if (my_net_write(net, (char*)packet->ptr(), packet->length()))
  {
    *errmsg = "failed on my_net_write()";
    return -1;
  }
unknown's avatar
unknown committed
81 82 83
  return 0;
}

84 85 86 87 88 89
/*
  Send the contents of a file requested by the client over the connection

  SYNOPSIS
    send_file()
    thd		Thread handler; thd->net is the connection to the client

  NOTES
    The client sends the file name in a packet; the name is taken from the
    second byte of that packet onwards. The file is then written to the
    client in chunks of at most IO_SIZE bytes, followed by an empty packet,
    after which one more packet is read from the client.
    If the name resolves to "/dev/null" no file is opened and only the
    closing handshake is done.
    While this function runs, the read timeout of the connection is set to
    thd->variables.net_wait_timeout; the old value is restored on exit.

  RETURN
    0	ok
    1	error; the reason has been written to the error log
*/

static int send_file(THD *thd)
{
  NET* net = &thd->net;
  int fd = -1,bytes, error = 1;
  char fname[FN_REFLEN+1];
  const char *errmsg = 0;
  int old_timeout;
  uint packet_len;
  char buf[IO_SIZE];				// It's safe to alloc this
  DBUG_ENTER("send_file");

  /*
    The client might be slow loading the data, give him wait_timeout to do
    the job
  */
  old_timeout = thd->net.read_timeout;
  thd->net.read_timeout = thd->variables.net_wait_timeout;

  /*
    We need net_flush here because the client will not know it needs to send
    us the file name until it has processed the load event entry
  */
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
  {
    errmsg = "while reading file name";
    goto err;
  }

  // terminate with \0 for fn_format
  *((char*)net->read_pos +  packet_len) = 0;
  fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
  // this is needed to make replicate-ignore-db
  if (!strcmp(fname,"/dev/null"))
    goto end;

  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
  {
    errmsg = "on open of file";
    goto err;
  }

  while ((bytes = (int) my_read(fd, (byte*) buf, IO_SIZE, MYF(0))) > 0)
  {
    if (my_net_write(net, buf, bytes))
    {
      errmsg = "while writing data to client";
      goto err;
    }
  }

 end:
  /* An empty packet marks the end of the file */
  if (my_net_write(net, "", 0) || net_flush(net) ||
      (my_net_read(net) == packet_error))
  {
    errmsg = "while negotiating file transfer close";
    goto err;
  }
  error = 0;

 err:
  thd->net.read_timeout = old_timeout;
  if (fd >= 0)
    (void) my_close(fd, MYF(0));
  if (errmsg)
  {
    sql_print_error("Failed in send_file() %s", errmsg);
    /* Never pass the message itself as the format string */
    DBUG_PRINT("error", ("%s", errmsg));
  }
  DBUG_RETURN(error);
}

155

156
/*
  Open a binary log file for reading and verify its magic number

  SYNOPSIS
    open_binlog()
    log			Cache to initialize on the opened file
    log_file_name	Name of the file to open
    errmsg		Set to a static error text on failure

  RETURN
    >= 0	File descriptor of the opened log; 'log' is ready for reading
    -1		Error. *errmsg has been set; if the file had been opened it
		has been closed again and end_io_cache() called on 'log'
*/

File open_binlog(IO_CACHE *log, const char *log_file_name,
                 const char **errmsg)
{
  File fd;
  DBUG_ENTER("open_binlog");

  fd = my_open(log_file_name, O_RDONLY | O_BINARY, MYF(MY_WME));
  if (fd < 0)
  {
    sql_print_error("Failed to open log (file '%s', errno %d)",
                    log_file_name, my_errno);
    *errmsg = "Could not open log file";	// This will not be sent
    DBUG_RETURN(-1);				// Nothing to clean up yet
  }

  if (init_io_cache(log, fd, IO_SIZE*2, READ_CACHE, 0, 0,
                    MYF(MY_WME | MY_DONT_CHECK_FILESIZE)))
  {
    sql_print_error("Failed to create a cache on log (file '%s')",
                    log_file_name);
    *errmsg = "Could not open log file";	// This will not be sent
  }
  else if (!check_binlog_magic(log, errmsg))
    DBUG_RETURN(fd);				// Everything went fine

  /* The file was opened but can't be used: release it */
  my_close(fd, MYF(0));
  end_io_cache(log);
  DBUG_RETURN(-1);
}


191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
/*
  Adjust the position pointer in the binary log file for all running slaves

  SYNOPSIS
    adjust_linfo_offsets()
    purge_offset	Number of bytes removed from start of log index file

  NOTES
    - This is called when doing a PURGE when we delete lines from the
      index log file

  REQUIREMENTS
    - Before calling this function, we have to ensure that no threads are
      using any binary log file before purge_offset.a

  TODO
    - Inform the slave threads that they should sync the position
      in the binary log file with flush_relay_log_info.
      Now they sync is done for next read.
*/

unknown's avatar
unknown committed
212 213 214
void adjust_linfo_offsets(my_off_t purge_offset)
{
  THD *tmp;
215

unknown's avatar
unknown committed
216 217 218
  pthread_mutex_lock(&LOCK_thread_count);
  I_List_iterator<THD> it(threads);

219 220 221 222 223 224
  while ((tmp=it++))
  {
    LOG_INFO* linfo;
    if ((linfo = tmp->current_linfo))
    {
      pthread_mutex_lock(&linfo->lock);
225 226 227 228
      /*
	Index file offset can be less that purge offset only if
	we just started reading the index file. In that case
	we have nothing to adjust
229
      */
230 231 232 233 234 235 236
      if (linfo->index_file_offset < purge_offset)
	linfo->fatal = (linfo->index_file_offset != 0);
      else
	linfo->index_file_offset -= purge_offset;
      pthread_mutex_unlock(&linfo->lock);
    }
  }
unknown's avatar
unknown committed
237 238 239
  pthread_mutex_unlock(&LOCK_thread_count);
}

unknown's avatar
merge  
unknown committed
240

unknown's avatar
unknown committed
241 242 243 244 245
bool log_in_use(const char* log_name)
{
  int log_name_len = strlen(log_name) + 1;
  THD *tmp;
  bool result = 0;
246

unknown's avatar
unknown committed
247 248
  pthread_mutex_lock(&LOCK_thread_count);
  I_List_iterator<THD> it(threads);
249 250 251 252 253

  while ((tmp=it++))
  {
    LOG_INFO* linfo;
    if ((linfo = tmp->current_linfo))
unknown's avatar
unknown committed
254
    {
255 256 257
      pthread_mutex_lock(&linfo->lock);
      result = !memcmp(log_name, linfo->log_file_name, log_name_len);
      pthread_mutex_unlock(&linfo->lock);
258 259
      if (result)
	break;
260 261
    }
  }
unknown's avatar
unknown committed
262 263 264 265 266

  pthread_mutex_unlock(&LOCK_thread_count);
  return result;
}

unknown's avatar
merge  
unknown committed
267

unknown's avatar
unknown committed
268 269 270
/*
  Purge binary logs up to the given log and report the result to the client

  SYNOPSIS
    purge_master_logs()
    thd		Thread handler; the result is sent on thd->net
    to_log	Log name given by the user; expanded with make_log_name()

  RETURN
    0	ok; send_ok() has been called
    1	error; an error message has been sent with send_error()
*/

int purge_master_logs(THD* thd, const char* to_log)
{
  char search_file_name[FN_REFLEN];
  const char* errmsg;

  mysql_bin_log.make_log_name(search_file_name, to_log);
  int purge_result = mysql_bin_log.purge_logs(thd, search_file_name);

  /* Translate the purge_logs() return code to a message for the client */
  switch (purge_result) {
  case 0:
    errmsg = 0;
    break;
  case LOG_INFO_EOF:
    errmsg = "Target log not found in binlog index";
    break;
  case LOG_INFO_IO:
    errmsg = "I/O error reading log index file";
    break;
  case LOG_INFO_INVALID:
    errmsg = "Server configuration does not permit binlog purge";
    break;
  case LOG_INFO_SEEK:
    errmsg = "Failed on fseek()";
    break;
  case LOG_INFO_PURGE_NO_ROTATE:
    errmsg = "Cannot purge unrotatable log";
    break;
  case LOG_INFO_MEM:
    errmsg = "Out of memory";
    break;
  case LOG_INFO_FATAL:
    errmsg = "Fatal error during purge";
    break;
  case LOG_INFO_IN_USE:
    errmsg = "A purgeable log is in use, will not purge";
    break;
  default:
    errmsg = "Unknown error during purge";
    break;
  }

  if (!errmsg)
  {
    send_ok(&thd->net);
    return 0;
  }
  send_error(&thd->net, 0, errmsg);
  return 1;
}

303 304 305
/*
  TODO: Clean up loop to only have one call to send_file()
*/
unknown's avatar
merge  
unknown committed
306

307 308
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
		       ushort flags)
309 310 311
{
  LOG_INFO linfo;
  char *log_file_name = linfo.log_file_name;
312
  char search_file_name[FN_REFLEN], *name;
313 314
  IO_CACHE log;
  File file = -1;
315 316 317 318
  String* packet = &thd->packet;
  int error;
  const char *errmsg = "Unknown error";
  NET* net = &thd->net;
319 320
#ifndef DBUG_OFF
  int left_events = max_binlog_dump_events;
321
#endif
322
  DBUG_ENTER("mysql_binlog_send");
323 324
  DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));

325
  bzero((char*) &log,sizeof(log));
326

unknown's avatar
unknown committed
327
#ifndef DBUG_OFF
328 329 330
  if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
  {
    errmsg = "Master failed COM_BINLOG_DUMP to test if slave can recover";
331
    my_errno= ER_UNKNOWN_ERROR;
332 333
    goto err;
  }
334
#endif
335

336
  if (!mysql_bin_log.is_open())
337 338
  {
    errmsg = "Binary log is not open";
339
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
340 341
    goto err;
  }
342 343 344
  if (!server_id_supplied)
  {
    errmsg = "Misconfigured master - server id was not set";
345
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
346 347 348
    goto err;
  }

349
  name=search_file_name;
350
  if (log_ident[0])
351 352
    mysql_bin_log.make_log_name(search_file_name, log_ident);
  else
353
    name=0;					// Find first log
354

unknown's avatar
unknown committed
355 356
  linfo.index_file_offset = 0;
  thd->current_linfo = &linfo;
357

358
  if (mysql_bin_log.find_log_pos(&linfo, name, 1))
359
  {
360
    errmsg = "Could not find first log file name in binary log index file";
361
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
362 363
    goto err;
  }
364

365
  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
366 367
  {
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
368
    goto err;
369 370
  }
  if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
371
  {
372
    errmsg= "Client requested master to start replication from \
unknown's avatar
unknown committed
373
impossible position";
374
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
375 376
    goto err;
  }
377

378
  my_b_seek(&log, pos);				// Seek will done on next read
379 380 381 382 383
  /*
    We need to start a packet with something other than 255
    to distiquish it from error
  */
  packet->set("\0", 1);
384

385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
  /*
    Before 4.0.14 we called fake_rotate_event below only if 
    (pos == BIN_LOG_HEADER_SIZE), because if this is false then the slave
    already knows the binlog's name.
    Now we always call fake_rotate_event; if the slave already knew the log's
    name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is useless but does not
    harm much. It is nice for 3.23 (>=.58) slaves which test Rotate events
    to see if the master is 4.0 (then they choose to stop because they can't
    replicate 4.0); by always calling fake_rotate_event we are sure that 3.23.58
    and newer will detect the problem as soon as replication starts (BUG#198).
    Always calling fake_rotate_event makes sending of normal
    (=from-binlog) Rotate events a priori unneeded, but it is not so simple: the
    2 Rotate events are not equivalent, the normal one is before the Stop event,
    the fake one is after. If we don't send the normal one, then the Stop event
    will be interpreted (by existing 4.0 slaves) as "the master stopped", which
    is wrong. So for safety, given that we want minimum modification of 4.0, we
    send the normal and fake Rotates.
  */
  if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
unknown's avatar
unknown committed
404
  {
405 406
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
    goto err;
unknown's avatar
unknown committed
407
  }
408
  packet->set("\0", 1);
unknown's avatar
unknown committed
409

unknown's avatar
unknown committed
410
  while (!net->error && net->vio != 0 && !thd->killed)
411 412
  {
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
413

414 415
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
    {
416
#ifndef DBUG_OFF
417
      if (max_binlog_dump_events && !left_events--)
418 419 420
      {
	net_flush(net);
	errmsg = "Debugging binlog dump abort";
421
	my_errno= ER_UNKNOWN_ERROR;
422 423
	goto err;
      }
424
#endif
unknown's avatar
unknown committed
425
      if (my_net_write(net, (char*)packet->ptr(), packet->length()) )
426 427
      {
	errmsg = "Failed on my_net_write()";
428
	my_errno= ER_UNKNOWN_ERROR;
429 430 431 432
	goto err;
      }
      DBUG_PRINT("info", ("log event code %d",
			  (*packet)[LOG_EVENT_OFFSET+1] ));
unknown's avatar
unknown committed
433
      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
434
      {
435
	if (send_file(thd))
436
	{
437
	  errmsg = "failed in send_file()";
438
	  my_errno= ER_UNKNOWN_ERROR;
439 440
	  goto err;
	}
441
      }
442
      packet->set("\0", 1);
443
    }
444 445 446 447
    /*
      TODO: now that we are logging the offset, check to make sure
      the recorded offset and the actual match
    */
unknown's avatar
unknown committed
448
    if (error != LOG_READ_EOF)
449
    {
450
      my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
451
      switch (error) {
452
      case LOG_READ_BOGUS:
453 454
	errmsg = "bogus data in log event";
	break;
455
      case LOG_READ_TOO_LARGE:
456 457
	errmsg = "log event entry exceeded max_allowed_packet; \
Increase max_allowed_packet on master";
unknown's avatar
unknown committed
458
	break;
459 460 461 462 463 464 465 466 467
      case LOG_READ_IO:
	errmsg = "I/O error reading log event";
	break;
      case LOG_READ_MEM:
	errmsg = "memory allocation failed reading log event";
	break;
      case LOG_READ_TRUNC:
	errmsg = "binlog truncated in the middle of event";
	break;
unknown's avatar
unknown committed
468 469 470
      default:
	errmsg = "unknown error reading log event on the master";
	break;
471 472 473
      }
      goto err;
    }
474

475
    if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
476
       mysql_bin_log.is_active(log_file_name))
477
    {
478 479 480
      /*
	Block until there is more data in the log
      */
481
      if (net_flush(net))
482 483
      {
	errmsg = "failed on net_flush()";
484
	my_errno= ER_UNKNOWN_ERROR;
485 486
	goto err;
      }
487

488 489 490 491 492 493
      /*
	We may have missed the update broadcast from the log
	that has just happened, let's try to catch it if it did.
	If we did not miss anything, we just wait for other threads
	to signal us.
      */
494 495 496 497
      {
	log.error=0;
	bool read_packet = 0, fatal_error = 0;

498
#ifndef DBUG_OFF
499
	if (max_binlog_dump_events && !left_events--)
500 501
	{
	  errmsg = "Debugging binlog dump abort";
502
	  my_errno= ER_UNKNOWN_ERROR;
503 504
	  goto err;
	}
505
#endif
506

507 508 509 510
	/*
	  No one will update the log while we are reading
	  now, but we'll be quick and just read one record

511 512 513 514
	  TODO:
	  Add an counter that is incremented for each time we update
	  the binary log.  We can avoid the following read if the counter
	  has not been updated since last read.
515
	*/
516

unknown's avatar
unknown committed
517
	pthread_mutex_lock(log_lock);
518
	switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
519
	case 0:
520
	  /* we read successfully, so we'll need to send it to the slave */
521
	  pthread_mutex_unlock(log_lock);
522 523
	  read_packet = 1;
	  break;
524

525
	case LOG_READ_EOF:
526
	  DBUG_PRINT("wait",("waiting for data in binary log"));
unknown's avatar
unknown committed
527
	  if (!thd->killed)
528 529
	  {
	    /* Note that the following call unlocks lock_log */
530
	    mysql_bin_log.wait_for_update(thd);
531 532 533
	  }
	  else
	    pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
534
	  DBUG_PRINT("wait",("binary log received update"));
535 536 537
	  break;

	default:
538
	  pthread_mutex_unlock(log_lock);
539 540
	  fatal_error = 1;
	  break;
541
	}
542

543
	if (read_packet)
544 545
	{
	  thd->proc_info = "sending update to slave";
546
	  if (my_net_write(net, (char*)packet->ptr(), packet->length()) )
547 548
	  {
	    errmsg = "Failed on my_net_write()";
549
	    my_errno= ER_UNKNOWN_ERROR;
550 551
	    goto err;
	  }
unknown's avatar
unknown committed
552

553
	  if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
554
	  {
555
	    if (send_file(thd))
unknown's avatar
unknown committed
556
	    {
557
	      errmsg = "failed in send_file()";
558
	      my_errno= ER_UNKNOWN_ERROR;
unknown's avatar
unknown committed
559 560
	      goto err;
	    }
561
	  }
562 563 564 565 566
	  packet->set("\0", 1);
	  /*
	    No need to net_flush because we will get to flush later when
	    we hit EOF pretty quick
	  */
567
	}
568

569
	if (fatal_error)
570 571
	{
	  errmsg = "error reading log entry";
572
          my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
573
	  goto err;
574
	}
575 576
	log.error=0;
      }
577
    }
578 579 580 581 582
    else
    {
      bool loop_breaker = 0;
      // need this to break out of the for loop from switch
      thd->proc_info = "switching to next log";
583
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
584 585 586 587 588 589 590
      case LOG_INFO_EOF:
	loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
	break;
      case 0:
	break;
      default:
	errmsg = "could not find next log";
591
	my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
592 593 594
	goto err;
      }

595
      if (loop_breaker)
596 597 598 599
	break;

      end_io_cache(&log);
      (void) my_close(file, MYF(MY_WME));
600

601 602 603 604
      /*
        Even if the previous log contained a Rotate_log_event, we still fake
        one.
      */
605
      if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
606
	  fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE, &errmsg))
607 608
      {
	my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
unknown's avatar
unknown committed
609
	goto err;
610
      }
unknown's avatar
unknown committed
611 612
      packet->length(0);
      packet->append("\0",1);
613
    }
614
  }
615

616 617
  end_io_cache(&log);
  (void)my_close(file, MYF(MY_WME));
618

619
  send_eof(&thd->net);
620
  thd->proc_info = "waiting to finalize termination";
unknown's avatar
unknown committed
621 622 623
  pthread_mutex_lock(&LOCK_thread_count);
  thd->current_linfo = 0;
  pthread_mutex_unlock(&LOCK_thread_count);
624
  DBUG_VOID_RETURN;
625

626
 err:
627
  thd->proc_info = "waiting to finalize termination";
628
  end_io_cache(&log);
unknown's avatar
unknown committed
629 630 631 632 633 634 635
  /*
    Exclude  iteration through thread list
    this is needed for purge_logs() - it will iterate through
    thread list and update thd->current_linfo->index_file_offset
    this mutex will make sure that it never tried to update our linfo
    after we return from this stack frame
  */
636
  pthread_mutex_lock(&LOCK_thread_count);
unknown's avatar
unknown committed
637 638
  thd->current_linfo = 0;
  pthread_mutex_unlock(&LOCK_thread_count);
639 640
  if (file >= 0)
    (void) my_close(file, MYF(MY_WME));
unknown's avatar
unknown committed
641
  send_error(&thd->net, my_errno, errmsg);
642 643 644
  DBUG_VOID_RETURN;
}

645
/*
  Start the slave threads that are not yet running

  SYNOPSIS
    start_slave()
    thd		Thread handler; if 0, current_thd is used
    mi		Master info for the slave
    net_report	If set, the result is sent to the client (ok or error)

  NOTES
    Requires SUPER_ACL. If thd->lex.slave_thd_opt is set, only the threads
    selected by it are considered.

  RETURN
    0	ok
    1	error (access denied, all requested threads already running,
	master info could not be initialized, bad slave configuration or
	the threads failed to start)
*/

int start_slave(THD* thd , MASTER_INFO* mi,  bool net_report)
{
  int slave_errno;
  if (!thd)
    thd = current_thd;
  NET* net = &thd->net;
  int threads_to_start;
  DBUG_ENTER("start_slave");

  if (check_access(thd, SUPER_ACL, any_db))
    DBUG_RETURN(1);

  lock_slave_threads(mi);  // this allows us to cleanly read slave_running
  /* Inverse mask: the bits of the threads that are NOT running */
  init_thread_mask(&threads_to_start, mi, 1 /* inverse */);
  if (thd->lex.slave_thd_opt)
    threads_to_start &= thd->lex.slave_thd_opt;

  if (!threads_to_start)
    slave_errno = ER_SLAVE_MUST_STOP;
  else if (init_master_info(mi, master_info_file, relay_log_info_file, 0))
    slave_errno = ER_MASTER_INFO;
  else if (!server_id_supplied || !*mi->host)
    slave_errno = ER_BAD_SLAVE;
  else
    slave_errno = start_slave_threads(0 /*no mutex */,
                                      1 /* wait for start */,
                                      mi,
                                      master_info_file, relay_log_info_file,
                                      threads_to_start);

  unlock_slave_threads(mi);

  if (net_report)
  {
    if (slave_errno)
      send_error(net, slave_errno);
    else
      send_ok(net);
  }
  DBUG_RETURN(slave_errno ? 1 : 0);
}

689
/*
  Stop the running slave threads

  SYNOPSIS
    stop_slave()
    thd		Thread handler; if 0, current_thd is used
    mi		Master info for the slave
    net_report	If set, the result is sent to the client (ok or error)

  NOTES
    Requires SUPER_ACL. If thd->lex.slave_thd_opt is set, only the threads
    selected by it are considered.

  RETURN
    0	ok
    1	error (access denied, no requested thread was running or the
	threads could not be terminated)
*/

int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report )
{
  int slave_errno;
  if (!thd)
    thd = current_thd;
  NET* net = &thd->net;

  if (check_access(thd, SUPER_ACL, any_db))
    return 1;

  thd->proc_info = "Killing slave";
  int running_threads;
  lock_slave_threads(mi);
  init_thread_mask(&running_threads, mi, 0 /* not inverse*/);
  if (thd->lex.slave_thd_opt)
    running_threads &= thd->lex.slave_thd_opt;

  if (running_threads)
    slave_errno = terminate_slave_threads(mi, running_threads,
                                          1 /*skip lock */);
  else
    slave_errno = ER_SLAVE_NOT_RUNNING;
  unlock_slave_threads(mi);
  thd->proc_info = 0;

  if (net_report)
  {
    if (slave_errno)
      send_error(net, slave_errno);
    else
      send_ok(net);
  }
  return slave_errno ? 1 : 0;
}

721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742

/*
  Remove all relay logs and start replication from the start

  SYNOPSIS
    reset_slave()
    thd			Thread handler
    mi			Master info for the slave


  NOTES
    We don't send ok in this functions as this is called from
    reload_acl_and_cache() which may have done other tasks, which may
    have failed for which we want to send and error.

  RETURN
    0	ok
    1	error
	In this case error is sent to the client with send_error()
*/


743
int reset_slave(THD *thd, MASTER_INFO* mi)
744 745 746
{
  MY_STAT stat_area;
  char fname[FN_REFLEN];
747 748
  int thread_mask= 0, error= 0;
  uint sql_errno=0;
749
  const char* errmsg=0;
unknown's avatar
unknown committed
750 751
  DBUG_ENTER("reset_slave");

752
  lock_slave_threads(mi);
753 754 755 756 757 758 759
  init_thread_mask(&thread_mask,mi,0 /* not inverse */);
  if (thread_mask) // We refuse if any slave thread is running
  {
    sql_errno= ER_SLAVE_MUST_STOP;
    error=1;
    goto err;
  }
760
  //delete relay logs, clear relay log coordinates
761 762 763
  if ((error= purge_relay_logs(&mi->rli, thd,
			       1 /* just reset */,
			       &errmsg)))
764 765
    goto err;
  
766 767 768
  //Clear master's log coordinates (only for good display of SHOW SLAVE STATUS)
  mi->master_log_name[0]= 0;
  mi->master_log_pos= BIN_LOG_HEADER_SIZE;
769 770 771
  //Clear the errors displayed by SHOW SLAVE STATUS
  mi->rli.last_slave_error[0]=0;
  mi->rli.last_slave_errno=0;
772
  //close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
773
  end_master_info(mi);
774
  //and delete these two files
775 776
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
  if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
777
  {
778 779
    error=1;
    goto err;
780
  }
781
  fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
782
  if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
783 784 785 786
  {
    error=1;
    goto err;
  }
787

788 789
err:
  unlock_slave_threads(mi);
790 791
  if (thd && error) 
    send_error(&thd->net, sql_errno, errmsg);
unknown's avatar
unknown committed
792
  DBUG_RETURN(error);
793 794
}

795

796 797 798
/*
  Kill a binlog dump thread that serves the given slave

  SYNOPSIS
    kill_zombie_dump_threads()
    slave_server_id	Server id of the slave

  NOTES
    The first thread found whose command is COM_BINLOG_DUMP and whose
    server_id matches is woken up with awake(1). Its LOCK_delete is taken
    while LOCK_thread_count is still held and released after the awake()
    call.
*/

void kill_zombie_dump_threads(uint32 slave_server_id)
{
  THD *zombie= 0;
  THD *cur_thd;

  pthread_mutex_lock(&LOCK_thread_count);
  I_List_iterator<THD> thd_it(threads);

  while ((cur_thd= thd_it++))
  {
    if (cur_thd->command != COM_BINLOG_DUMP ||
        cur_thd->server_id != slave_server_id)
      continue;

    pthread_mutex_lock(&cur_thd->LOCK_delete);	// Lock from delete
    zombie= cur_thd;
    break;
  }
  pthread_mutex_unlock(&LOCK_thread_count);

  if (!zombie)
    return;

  /*
    Here we do not call kill_one_thread() as
    it will be slow because it will iterate through the list
    again. We just kill the thread ourselves.
  */
  zombie->awake(1/*prepare to die*/);
  pthread_mutex_unlock(&zombie->LOCK_delete);
}

824

825
/*
  Execute CHANGE MASTER TO: update the master info of the slave

  SYNOPSIS
    change_master()
    thd		Thread handler; the new values are taken from thd->lex.mi
    mi		Master info to change

  NOTES
    The change is refused if any slave thread is running.
    Only the values given in thd->lex.mi are changed. If host or port is
    given without a log name and position, the master log coordinates are
    reset. The old relay logs are purged unless a relay log name or
    position was given.

  RETURN
    0	ok; send_ok() has been called
    1	error; an error message has been sent to the client
*/

int change_master(THD* thd, MASTER_INFO* mi)
{
  int thread_mask;
  const char* errmsg=0;
  bool need_relay_log_purge=1;
  DBUG_ENTER("change_master");

  lock_slave_threads(mi);
  init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
  if (thread_mask) // We refuse if any slave thread is running
  {
    net_printf(&thd->net,ER_SLAVE_MUST_STOP);
    unlock_slave_threads(mi);
    DBUG_RETURN(1);
  }

  thd->proc_info = "changing master";
  LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
  // TODO: see if needs re-write
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0))
  {
    send_error(&thd->net, ER_MASTER_INFO);
    unlock_slave_threads(mi);
    DBUG_RETURN(1);
  }

  /*
    Data lock not needed since we have already stopped the running threads,
    and we have the hold on the run locks which will keep all threads that
    could possibly modify the data structures from running
  */
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
  {
    // if we change host or port, we must reset the position
    mi->master_log_name[0] = 0;
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
    mi->rli.pending = 0;
  }

  /*
    strmake() stores the terminating \0 at dst[length], so the length given
    must be one less than the size of the destination buffer.
  */
  if (lex_mi->log_file_name)
    strmake(mi->master_log_name, lex_mi->log_file_name,
	    sizeof(mi->master_log_name)-1);
  if (lex_mi->pos)
  {
    mi->master_log_pos= lex_mi->pos;
    mi->rli.pending = 0;
  }
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));

  if (lex_mi->host)
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
  if (lex_mi->user)
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
  if (lex_mi->password)
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
  if (lex_mi->port)
    mi->port = lex_mi->port;
  if (lex_mi->connect_retry)
    mi->connect_retry = lex_mi->connect_retry;

  /* Explicit relay log coordinates mean the relay logs must be kept */
  if (lex_mi->relay_log_name)
  {
    need_relay_log_purge= 0;
    strmake(mi->rli.relay_log_name,lex_mi->relay_log_name,
	    sizeof(mi->rli.relay_log_name)-1);
  }

  if (lex_mi->relay_log_pos)
  {
    need_relay_log_purge= 0;
    mi->rli.relay_log_pos=lex_mi->relay_log_pos;
  }

  flush_master_info(mi);
  if (need_relay_log_purge)
  {
    mi->rli.skip_log_purge= 0;
    thd->proc_info="purging old relay logs";
    if (purge_relay_logs(&mi->rli, thd,
			 0 /* not only reset, but also reinit */,
			 &errmsg))
    {
      net_printf(&thd->net, 0, "Failed purging old relay logs: %s",errmsg);
      unlock_slave_threads(mi);
      DBUG_RETURN(1);
    }
  }
  else
  {
    const char* msg;
    mi->rli.skip_log_purge= 1;
    /* Relay log is already initialized */
    if (init_relay_log_pos(&mi->rli,
			   mi->rli.relay_log_name,
			   mi->rli.relay_log_pos,
			   0 /*no data lock*/,
			   &msg))
    {
      net_printf(&thd->net,0,"Failed initializing relay log position: %s",msg);
      unlock_slave_threads(mi);
      DBUG_RETURN(1);
    }
  }
  mi->rli.master_log_pos = mi->master_log_pos;
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
  strmake(mi->rli.master_log_name,mi->master_log_name,
	  sizeof(mi->rli.master_log_name)-1);
  if (!mi->rli.master_log_name[0]) // uninitialized case
    mi->rli.master_log_pos=0;

  /* Bump abort_pos_wait and wake up the threads waiting on data_cond */
  pthread_mutex_lock(&mi->rli.data_lock);
  mi->rli.abort_pos_wait++;
  pthread_cond_broadcast(&mi->data_cond);
  pthread_mutex_unlock(&mi->rli.data_lock);

  unlock_slave_threads(mi);
  thd->proc_info = 0;
  send_ok(&thd->net);
  DBUG_RETURN(0);
}

946
/*
  Execute RESET MASTER: reset the binary logs

  SYNOPSIS
    reset_master()
    thd		Thread handler

  RETURN
    1	The binary log is not open; the error has been registered with
	my_error()
    Otherwise the return value of mysql_bin_log.reset_logs()
*/

int reset_master(THD* thd)
{
  if (mysql_bin_log.is_open())
    return mysql_bin_log.reset_logs(thd);

  my_error(ER_FLUSH_MASTER_BINLOG_CLOSED,  MYF(ME_BELL+ME_WAITTANG));
  return 1;
}

unknown's avatar
unknown committed
956 957 958 959
/*
  Compare two binary log coordinates (log file name + position)

  SYNOPSIS
    cmp_master_pos()
    log_file_name1	Log file name of the first coordinate (not NULL)
    log_pos1		Position inside the first log file
    log_file_name2	Log file name of the second coordinate (not NULL)
    log_pos2		Position inside the second log file

  NOTES
    Names of different length are ordered by length alone: the shorter
    name sorts first, so a name with more digits in its extension is
    considered later even if strcmp() would say otherwise.
    Names of equal length are ordered with strcmp(); the positions are
    only looked at when the names are identical.

  RETURN VALUES
    <0	first coordinate is before the second
    0	coordinates are equal
    >0	first coordinate is after the second
*/

int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
		   const char* log_file_name2, ulonglong log_pos2)
{
  int res;
  /* size_t is what strlen() returns; avoids a narrowing conversion */
  size_t log_file_name1_len= strlen(log_file_name1);
  size_t log_file_name2_len= strlen(log_file_name2);

  //  We assume that both log names match up to '.'
  if (log_file_name1_len != log_file_name2_len)
    return (log_file_name1_len < log_file_name2_len) ? -1 : 1;

  if ((res= strcmp(log_file_name1, log_file_name2)))
    return res;
  return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1;
}

unknown's avatar
unknown committed
973

unknown's avatar
unknown committed
974 975 976 977 978 979 980
/*
  Send the events of one binary log file to the client

  SYNOPSIS
    show_binlog_events()
    thd		Thread specific variable. The log file name and start
		position are read from thd->lex.mi, the offset/limit
		from thd->lex.select.

  NOTES
    The start position is never smaller than BIN_LOG_HEADER_SIZE.
    Events before the requested offset are read but not sent.
    If the binary log is not open, only the field list and EOF are sent.

  RETURN VALUES
    0	ok
    -1	error
*/

int show_binlog_events(THD* thd)
{
  DBUG_ENTER("show_binlog_events");
  List<Item> field_list;
  const char* errmsg = 0;
  IO_CACHE log;
  File file = -1;

  Log_event::init_show_field_list(&field_list);
  if (send_fields(thd, field_list, 1))
    DBUG_RETURN(-1);

  if (mysql_bin_log.is_open())
  {
    LEX_MASTER_INFO *lex_mi = &thd->lex.mi;
    ha_rows event_count, limit_start, limit_end;
    my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
    char search_file_name[FN_REFLEN], *name;
    const char *log_file_name = lex_mi->log_file_name;
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
    LOG_INFO linfo;
    Log_event* ev;

    limit_start = thd->lex.select->offset_limit;
    limit_end = thd->lex.select->select_limit + limit_start;

    name= search_file_name;
    if (log_file_name)
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
    else
      name=0;					// Find first log

    linfo.index_file_offset = 0;
    /* Published on the THD; must be cleared again before we return */
    thd->current_linfo = &linfo;

    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
    {
      errmsg = "Could not find target log";
      goto err;
    }

    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
      goto err;

    /* The log lock is held for the whole scan; every exit below drops it */
    pthread_mutex_lock(log_lock);
    my_b_seek(&log, pos);

    for (event_count = 0;
	 (ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,0)); )
    {
      /* Events in front of the requested offset are skipped, not sent */
      if (event_count >= limit_start &&
	  ev->net_send(thd, linfo.log_file_name, pos))
      {
	errmsg = "Net error";
	delete ev;
	pthread_mutex_unlock(log_lock);
	goto err;
      }

      pos = my_b_tell(&log);
      delete ev;

      if (++event_count >= limit_end)
	break;
    }

    /* The loop ended before the limit: tell end of log from a read error */
    if (event_count < limit_end && log.error)
    {
      errmsg = "Wrong offset or I/O error";
      pthread_mutex_unlock(log_lock);
      goto err;
    }

    pthread_mutex_unlock(log_lock);
  }

err:
  if (file >= 0)
  {
    end_io_cache(&log);
    (void) my_close(file, MYF(MY_WME));
  }

  /*
    thd->current_linfo may point to 'linfo', a local of the block above.
    Clear it on the error path too, so that the THD is never left with a
    pointer into a stack frame that is about to disappear.
  */
  pthread_mutex_lock(&LOCK_thread_count);
  thd->current_linfo = 0;
  pthread_mutex_unlock(&LOCK_thread_count);

  if (errmsg)
  {
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
	     "SHOW BINLOG EVENTS", errmsg);
    DBUG_RETURN(-1);
  }

  send_eof(&thd->net);
  DBUG_RETURN(0);
}

unknown's avatar
unknown committed
1071

1072 1073 1074 1075 1076 1077 1078 1079 1080
/*
  Send the name and position of the current binary log to the client

  SYNOPSIS
    show_binlog_info()
    thd		Thread specific variable

  NOTES
    The row holds the log file name without its directory part, the
    position, and the binlog_do_db / binlog_ignore_db lists.
    If the binary log is not open, no row is sent, only the field list
    and EOF.

  RETURN VALUES
    0	ok
    -1	error
*/

int show_binlog_info(THD* thd)
{
  DBUG_ENTER("show_binlog_info");
  List<Item> field_list;
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
  field_list.push_back(new Item_empty_string("Position",20));
  field_list.push_back(new Item_empty_string("Binlog_do_db",20));
  field_list.push_back(new Item_empty_string("Binlog_ignore_db",20));

  if (send_fields(thd, field_list, 1))
    DBUG_RETURN(-1);

  String* packet = &thd->packet;
  packet->length(0);

  if (!mysql_bin_log.is_open())
  {
    /* No binary log: empty result set */
    send_eof(&thd->net);
    DBUG_RETURN(0);
  }

  LOG_INFO li;
  mysql_bin_log.get_current_log(&li);
  /* Strip the directory part; only the base name is reported */
  int dir_prefix_len = dirname_length(li.log_file_name);
  net_store_data(packet, li.log_file_name + dir_prefix_len);
  net_store_data(packet, (longlong)li.pos);
  net_store_data(packet, &binlog_do_db);
  net_store_data(packet, &binlog_ignore_db);
  if (my_net_write(&thd->net, (char*) packet->ptr(), packet->length()))
    DBUG_RETURN(-1);

  send_eof(&thd->net);
  DBUG_RETURN(0);
}
unknown's avatar
unknown committed
1101

1102

1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114
/*
  Send a list of all binary logs to client

  SYNOPSIS
    show_binlogs()
    thd		Thread specific variable

  RETURN VALUES
    0	ok
    1	error  (Error message sent to client)
*/

unknown's avatar
unknown committed
1115 1116
int show_binlogs(THD* thd)
{
  IO_CACHE *index_file;
  char fname[FN_REFLEN];
  NET* net = &thd->net;
  List<Item> field_list;
  String *packet = &thd->packet;
  uint length;

  if (!mysql_bin_log.is_open())
  {
    /*
      The index lock has not been taken at this point, so report the
      error and return directly; there is nothing to unlock.
    */
    //TODO:  Replace with ER() error message
    send_error(net, 0, "You are not using binary logging");
    return 1;
  }

  field_list.push_back(new Item_empty_string("Log_name", 255));
  if (send_fields(thd, field_list, 1))
    return 1;

  /* The index lock is held from here until every name has been sent */
  mysql_bin_log.lock_index();
  index_file=mysql_bin_log.get_index_file();

  /* Read the index file from its beginning */
  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);

  /* The file ends with EOF or empty line */
  while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
  {
    int dir_len = dirname_length(fname);
    packet->length(0);
    /* The -1 is for removing newline from fname */
    net_store_data(packet, fname + dir_len, length-1-dir_len);
    if (my_net_write(net, (char*) packet->ptr(), packet->length()))
    {
      /* Write to the client failed: drop the index lock and give up */
      mysql_bin_log.unlock_index();
      return 1;
    }
  }
  mysql_bin_log.unlock_index();
  send_eof(net);
  return 0;
}

1161

1162 1163 1164 1165
/*
  Write the block most recently read into an IO_CACHE to the binary log

  SYNOPSIS
    log_loaded_block()
    file	IO_CACHE whose 'arg' member points to a LOAD_FILE_INFO

  NOTES
    The block is the byte range from file->request_pos up to
    file->read_end. The first block logged for a LOAD_FILE_INFO goes
    into a Create_file_log_event, every later one into an
    Append_block_log_event.
    Nothing is logged when the block is empty, or when
    last_pos_in_file is set and file->pos_in_file has not moved past it
    (presumably so that the same block is not logged twice).

  RETURN VALUES
    0	always
*/

int log_loaded_block(IO_CACHE* file)
{
  /* file->request_pos contains position where we started last read */
  char *block_start= (char*) file->request_pos;
  uint block_len= (char*) file->read_end - block_start;
  if (!block_len)
    return 0;

  LOAD_FILE_INFO *lf_info= (LOAD_FILE_INFO*) file->arg;
  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
      file->pos_in_file <= lf_info->last_pos_in_file)
    return 0;
  lf_info->last_pos_in_file= file->pos_in_file;

  if (!lf_info->wrote_create_file)
  {
    /* First block: it travels inside the Create_file event itself */
    Create_file_log_event c(lf_info->thd,lf_info->ex,lf_info->db,
			    lf_info->table_name, *lf_info->fields,
			    lf_info->handle_dup, block_start,
			    block_len, lf_info->log_delayed);
    mysql_bin_log.write(&c);
    lf_info->wrote_create_file = 1;
    DBUG_SYNC_POINT("debug_lock.created_file_event",10);
    return 0;
  }

  /* Every later block is appended with its own event */
  Append_block_log_event a(lf_info->thd, block_start, block_len,
			   lf_info->log_delayed);
  mysql_bin_log.write(&a);
  return 0;
}