/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB & Sasha

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

// Sasha Pachev <sasha@mysql.com> is currently in charge of this file

#include "mysql_priv.h"
#include "sql_repl.h"
#include "sql_acl.h"
#include "log_event.h"
23
#include "mini_client.h"
unknown's avatar
unknown committed
24
#include <my_dir.h>
25
#include <assert.h>
26 27

extern const char* any_db;
28

29
int max_binlog_dump_events = 0; // unlimited
30
my_bool opt_sporadic_binlog_dump_fail = 0;
31
static int binlog_dump_count = 0;
unknown's avatar
unknown committed
32

33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
/*
  Check that a binary log starts with the expected magic number

  SYNOPSIS
    check_binlog_magic()
    log		IO cache positioned at the start of the log file
    errmsg	Set to a description of the problem on failure

  RETURN
    0	ok, the magic number matches BINLOG_MAGIC
    1	read error or wrong magic number; *errmsg is set
*/
int check_binlog_magic(IO_CACHE* log, const char** errmsg)
{
  char header_magic[4];
  DBUG_ASSERT(my_b_tell(log) == 0);

  if (!my_b_read(log, (byte*) header_magic, sizeof(header_magic)))
  {
    /* Header was read; now it only has to match */
    if (!memcmp(header_magic, BINLOG_MAGIC, sizeof(header_magic)))
      return 0;
    *errmsg = "Binlog has bad magic number;  It's not a binary log file that can be used by this version of MySQL";
    return 1;
  }

  /* Only the read failure is written to the error log */
  *errmsg = "I/O error reading the header from the binary log";
  sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno,
                  log->error);
  return 1;
}

unknown's avatar
unknown committed
53 54 55
/*
  Send a fabricated rotate event that tells the slave the name of the
  binary log it is about to receive events from

  SYNOPSIS
    fake_rotate_event()
    net			Connection to the slave
    packet		Packet buffer; the event is appended to its current
			contents and the whole packet is written to net
    log_file_name	Name of the binary log; any directory part is dropped
    errmsg		Set to a description of the problem on failure

  RETURN
    0	ok
    -1	my_net_write() failed; *errmsg is set
*/
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
                             const char**errmsg)
{
  char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN];
  memset(header, 0, 4); // when does not matter
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;

  /* Only the base name of the log is sent to the slave */
  char* p = log_file_name+dirname_length(log_file_name);
  uint ident_len = (uint) strlen(p);
  ulong event_len = ident_len + ROTATE_EVENT_OVERHEAD;
  int4store(header + SERVER_ID_OFFSET, server_id);
  int4store(header + EVENT_LEN_OFFSET, event_len);
  int2store(header + FLAGS_OFFSET, 0);

  // TODO: check what problems this may cause and fix them
  int4store(header + LOG_POS_OFFSET, 0);

  packet->append(header, sizeof(header));
  /*
    The position is written as two 4-byte stores; the original code notes
    that a single statement caused problems with cxx
  */
  int4store(buf,4); // tell slave to skip magic number
  int4store(buf+4,0);
  packet->append(buf, ROTATE_HEADER_LEN);
  packet->append(p,ident_len);
  if (my_net_write(net, (char*)packet->ptr(), packet->length()))
  {
    *errmsg = "failed on my_net_write()";
    return -1;
  }
  return 0;
}

84 85 86 87 88 89
/*
  Send the contents of a file requested by the client over the connection

  SYNOPSIS
    send_file()
    thd		Thread handler; the file name is read from thd->net and the
		file data is written back to it

  NOTES
    - The file name is taken from the packet the client sends, skipping the
      first byte of that packet.
    - A request for "/dev/null" sends no data, only the closing empty packet.
    - The transfer is closed with an empty packet, after which one packet
      is read back from the client.
    - thd->net.read_timeout is raised to net_wait_timeout for the duration
      of the call and restored before returning.

  RETURN
    0	ok
    1	error; the reason is written to the error log
*/
static int send_file(THD *thd)
{
  NET* net = &thd->net;
  int fd = -1,bytes, error = 1;
  char fname[FN_REFLEN+1];
  const char *errmsg = 0;
  int old_timeout;
  uint packet_len;
  char buf[IO_SIZE];				// It's safe to alloc this
  DBUG_ENTER("send_file");

  /*
    The client might be slow loading the data, give him wait_timeout to do
    the job
  */
  old_timeout = thd->net.read_timeout;
  thd->net.read_timeout = thd->variables.net_wait_timeout;

  /*
    We need net_flush here because the client will not know it needs to send
    us the file name until it has processed the load event entry.
    An empty packet is rejected as well: the name starts at read_pos + 1,
    which would be past the terminator written below.
  */
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error ||
      packet_len == 0)
  {
    errmsg = "while reading file name";
    goto err;
  }

  // terminate with \0 for fn_format
  *((char*)net->read_pos +  packet_len) = 0;
  fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
  // this is needed to make replicate-ignore-db
  if (!strcmp(fname,"/dev/null"))
    goto end;

  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
  {
    errmsg = "on open of file";
    goto err;
  }

  while ((bytes = (int) my_read(fd, (byte*) buf, IO_SIZE, MYF(0))) > 0)
  {
    if (my_net_write(net, buf, bytes))
    {
      errmsg = "while writing data to client";
      goto err;
    }
  }

 end:
  /* An empty packet marks the end of the file data */
  if (my_net_write(net, "", 0) || net_flush(net) ||
      (my_net_read(net) == packet_error))
  {
    errmsg = "while negotiating file transfer close";
    goto err;
  }
  error = 0;

 err:
  thd->net.read_timeout = old_timeout;
  if (fd >= 0)
    (void) my_close(fd, MYF(0));
  if (errmsg)
  {
    sql_print_error("Failed in send_file() %s", errmsg);
    /* Never pass the message itself as the format string */
    DBUG_PRINT("error", ("%s", errmsg));
  }
  DBUG_RETURN(error);
}

155

156
/*
  Open a binary log file for reading and verify its magic number

  SYNOPSIS
    open_binlog()
    log			IO cache to initialize as a read cache on the file
    log_file_name	Name of the binary log file to open
    errmsg		Set to a description of the problem on failure

  RETURN
    >= 0	File descriptor of the opened log
    -1		Error; *errmsg is set. If the file had been opened, it is
		closed and end_io_cache() is called on log.
*/
File open_binlog(IO_CACHE *log, const char *log_file_name,
                 const char **errmsg)
{
  File file;
  DBUG_ENTER("open_binlog");

  if ((file = my_open(log_file_name, O_RDONLY | O_BINARY, MYF(MY_WME))) < 0 ||
      init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0,
                    MYF(MY_WME | MY_DONT_CHECK_FILESIZE)))
  {
    *errmsg = "Could not open log file";	// This will not be sent
    goto err;
  }
  if (check_binlog_magic(log,errmsg))
    goto err;
  DBUG_RETURN(file);

err:
  /* file is negative only when my_open() itself failed */
  if (file >= 0)
  {
    my_close(file,MYF(0));
    end_io_cache(log);
  }
  DBUG_RETURN(-1);
}


183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
/*
  Adjust the position pointer in the binary log file for all running slaves

  SYNOPSIS
    adjust_linfo_offsets()
    purge_offset	Number of bytes removed from start of log index file

  NOTES
    - This is called when doing a PURGE when we delete lines from the
      index log file

  REQUIREMENTS
    - Before calling this function, we have to ensure that no threads are
      using any binary log file before purge_offset.

  TODO
    - Inform the slave threads that they should sync the position
      in the binary log file with flush_relay_log_info.
      Currently the sync is done on the next read.
*/

unknown's avatar
unknown committed
204 205 206
void adjust_linfo_offsets(my_off_t purge_offset)
{
  THD *tmp;

  /* LOCK_thread_count is held for the whole walk over the thread list */
  pthread_mutex_lock(&LOCK_thread_count);
  I_List_iterator<THD> it(threads);

  while ((tmp=it++))
  {
    LOG_INFO* linfo;
    /* Only threads that have registered a LOG_INFO are adjusted */
    if ((linfo = tmp->current_linfo))
    {
      pthread_mutex_lock(&linfo->lock);
      /*
        Index file offset can be less than purge offset only if
        we just started reading the index file. In that case
        we have nothing to adjust; any other offset below purge_offset
        is flagged as fatal.
      */
      if (linfo->index_file_offset < purge_offset)
        linfo->fatal = (linfo->index_file_offset != 0);
      else
        linfo->index_file_offset -= purge_offset;
      pthread_mutex_unlock(&linfo->lock);
    }
  }
  pthread_mutex_unlock(&LOCK_thread_count);
}

unknown's avatar
merge  
unknown committed
232

unknown's avatar
unknown committed
233 234 235 236 237
bool log_in_use(const char* log_name)
{
  int log_name_len = strlen(log_name) + 1;
  THD *tmp;
  bool result = 0;
238

unknown's avatar
unknown committed
239 240
  pthread_mutex_lock(&LOCK_thread_count);
  I_List_iterator<THD> it(threads);
241 242 243 244 245

  while ((tmp=it++))
  {
    LOG_INFO* linfo;
    if ((linfo = tmp->current_linfo))
unknown's avatar
unknown committed
246
    {
247 248 249
      pthread_mutex_lock(&linfo->lock);
      result = !memcmp(log_name, linfo->log_file_name, log_name_len);
      pthread_mutex_unlock(&linfo->lock);
250 251
      if (result)
	break;
252 253
    }
  }
unknown's avatar
unknown committed
254 255 256 257 258

  pthread_mutex_unlock(&LOCK_thread_count);
  return result;
}

unknown's avatar
merge  
unknown committed
259

unknown's avatar
unknown committed
260 261 262
/*
  Purge binary logs up to a given log and report the result to the client

  SYNOPSIS
    purge_master_logs()
    thd		Thread handler; the result is sent on thd->net
    to_log	Log name, expanded with make_log_name() and passed to
		purge_logs()

  RETURN
    0	ok, send_ok() was called
    1	error, an error message was sent with send_error()
*/
int purge_master_logs(THD* thd, const char* to_log)
{
  char search_file_name[FN_REFLEN];
  const char* errmsg = 0;

  mysql_bin_log.make_log_name(search_file_name, to_log);
  int res = mysql_bin_log.purge_logs(thd, search_file_name);

  /* Translate the purge_logs() result code into a client message */
  switch(res)  {
  case 0: break;
  case LOG_INFO_EOF:	 errmsg = "Target log not found in binlog index"; break;
  case LOG_INFO_IO:	 errmsg = "I/O error reading log index file"; break;
  case LOG_INFO_INVALID:
    errmsg = "Server configuration does not permit binlog purge";
    break;
  case LOG_INFO_SEEK:	errmsg = "Failed on fseek()"; break;
  case LOG_INFO_PURGE_NO_ROTATE: errmsg = "Cannot purge unrotatable log";
    break;
  case LOG_INFO_MEM:	errmsg = "Out of memory"; break;
  case LOG_INFO_FATAL:	errmsg = "Fatal error during purge"; break;
  case LOG_INFO_IN_USE: errmsg = "A purgeable log is in use, will not purge";
    break;
  default:		errmsg = "Unknown error during purge"; break;
  }

  if (errmsg)
  {
    send_error(&thd->net, 0, errmsg);
    return 1;
  }

  send_ok(&thd->net);
  return 0;
}

295 296 297
/*
  TODO: Clean up loop to only have one call to send_file()
*/
unknown's avatar
merge  
unknown committed
298

299 300
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
		       ushort flags)
301 302 303
{
  LOG_INFO linfo;
  char *log_file_name = linfo.log_file_name;
304
  char search_file_name[FN_REFLEN], *name;
305 306
  IO_CACHE log;
  File file = -1;
307 308 309 310
  String* packet = &thd->packet;
  int error;
  const char *errmsg = "Unknown error";
  NET* net = &thd->net;
311 312
#ifndef DBUG_OFF
  int left_events = max_binlog_dump_events;
313
#endif
314
  DBUG_ENTER("mysql_binlog_send");
315 316
  DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));

317
  bzero((char*) &log,sizeof(log));
318

unknown's avatar
unknown committed
319
#ifndef DBUG_OFF
320 321 322
  if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
  {
    errmsg = "Master failed COM_BINLOG_DUMP to test if slave can recover";
323
    my_errno= ER_UNKNOWN_ERROR;
324 325
    goto err;
  }
326
#endif
327

328
  if (!mysql_bin_log.is_open())
329 330
  {
    errmsg = "Binary log is not open";
331
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
332 333
    goto err;
  }
334 335 336
  if (!server_id_supplied)
  {
    errmsg = "Misconfigured master - server id was not set";
337
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
338 339 340
    goto err;
  }

341
  name=search_file_name;
342
  if (log_ident[0])
343 344
    mysql_bin_log.make_log_name(search_file_name, log_ident);
  else
345
    name=0;					// Find first log
346

unknown's avatar
unknown committed
347 348
  linfo.index_file_offset = 0;
  thd->current_linfo = &linfo;
349

350
  if (mysql_bin_log.find_log_pos(&linfo, name, 1))
351
  {
352
    errmsg = "Could not find first log file name in binary log index file";
353
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
354 355
    goto err;
  }
356

357
  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
358 359
  {
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
360
    goto err;
361 362
  }
  if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
363
  {
364
    errmsg= "Client requested master to start replication from \
unknown's avatar
unknown committed
365
impossible position";
366
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
367 368
    goto err;
  }
369

370
  my_b_seek(&log, pos);				// Seek will done on next read
371 372 373 374 375
  /*
    We need to start a packet with something other than 255
    to distiquish it from error
  */
  packet->set("\0", 1);
376

unknown's avatar
unknown committed
377
  // if we are at the start of the log
378
  if (pos == BIN_LOG_HEADER_SIZE)
unknown's avatar
unknown committed
379
  {
380
    // tell the client log name with a fake rotate_event
unknown's avatar
unknown committed
381
    if (fake_rotate_event(net, packet, log_file_name, &errmsg))
382 383
    {
      my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
unknown's avatar
unknown committed
384
      goto err;
385
    }
386
    packet->set("\0", 1);
unknown's avatar
unknown committed
387
  }
unknown's avatar
unknown committed
388

unknown's avatar
unknown committed
389
  while (!net->error && net->vio != 0 && !thd->killed)
390 391
  {
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
392

393 394
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
    {
395
#ifndef DBUG_OFF
396
      if (max_binlog_dump_events && !left_events--)
397 398 399
      {
	net_flush(net);
	errmsg = "Debugging binlog dump abort";
400
	my_errno= ER_UNKNOWN_ERROR;
401 402
	goto err;
      }
403
#endif
unknown's avatar
unknown committed
404
      if (my_net_write(net, (char*)packet->ptr(), packet->length()) )
405 406
      {
	errmsg = "Failed on my_net_write()";
407
	my_errno= ER_UNKNOWN_ERROR;
408 409 410 411
	goto err;
      }
      DBUG_PRINT("info", ("log event code %d",
			  (*packet)[LOG_EVENT_OFFSET+1] ));
unknown's avatar
unknown committed
412
      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
413
      {
414
	if (send_file(thd))
415
	{
416
	  errmsg = "failed in send_file()";
417
	  my_errno= ER_UNKNOWN_ERROR;
418 419
	  goto err;
	}
420
      }
421
      packet->set("\0", 1);
422
    }
423 424 425 426
    /*
      TODO: now that we are logging the offset, check to make sure
      the recorded offset and the actual match
    */
unknown's avatar
unknown committed
427
    if (error != LOG_READ_EOF)
428
    {
429
      my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
430
      switch (error) {
431
      case LOG_READ_BOGUS:
432 433
	errmsg = "bogus data in log event";
	break;
434
      case LOG_READ_TOO_LARGE:
435 436
	errmsg = "log event entry exceeded max_allowed_packet; \
Increase max_allowed_packet on master";
unknown's avatar
unknown committed
437
	break;
438 439 440 441 442 443 444 445 446
      case LOG_READ_IO:
	errmsg = "I/O error reading log event";
	break;
      case LOG_READ_MEM:
	errmsg = "memory allocation failed reading log event";
	break;
      case LOG_READ_TRUNC:
	errmsg = "binlog truncated in the middle of event";
	break;
unknown's avatar
unknown committed
447 448 449
      default:
	errmsg = "unknown error reading log event on the master";
	break;
450 451 452
      }
      goto err;
    }
453

454
    if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
455
       mysql_bin_log.is_active(log_file_name))
456
    {
457 458 459
      /*
	Block until there is more data in the log
      */
460
      if (net_flush(net))
461 462
      {
	errmsg = "failed on net_flush()";
463
	my_errno= ER_UNKNOWN_ERROR;
464 465
	goto err;
      }
466

467 468 469 470 471 472
      /*
	We may have missed the update broadcast from the log
	that has just happened, let's try to catch it if it did.
	If we did not miss anything, we just wait for other threads
	to signal us.
      */
473 474 475 476
      {
	log.error=0;
	bool read_packet = 0, fatal_error = 0;

477
#ifndef DBUG_OFF
478
	if (max_binlog_dump_events && !left_events--)
479 480
	{
	  errmsg = "Debugging binlog dump abort";
481
	  my_errno= ER_UNKNOWN_ERROR;
482 483
	  goto err;
	}
484
#endif
485

486 487 488 489
	/*
	  No one will update the log while we are reading
	  now, but we'll be quick and just read one record

490 491 492 493
	  TODO:
	  Add an counter that is incremented for each time we update
	  the binary log.  We can avoid the following read if the counter
	  has not been updated since last read.
494
	*/
495

unknown's avatar
unknown committed
496
	pthread_mutex_lock(log_lock);
497
	switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
498
	case 0:
499
	  /* we read successfully, so we'll need to send it to the slave */
500
	  pthread_mutex_unlock(log_lock);
501 502
	  read_packet = 1;
	  break;
503

504
	case LOG_READ_EOF:
505
	  DBUG_PRINT("wait",("waiting for data in binary log"));
unknown's avatar
unknown committed
506
	  if (!thd->killed)
507 508
	  {
	    /* Note that the following call unlocks lock_log */
509
	    mysql_bin_log.wait_for_update(thd);
510 511 512
	  }
	  else
	    pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
513
	  DBUG_PRINT("wait",("binary log received update"));
514 515 516
	  break;

	default:
517
	  pthread_mutex_unlock(log_lock);
518 519
	  fatal_error = 1;
	  break;
520
	}
521

522
	if (read_packet)
523 524
	{
	  thd->proc_info = "sending update to slave";
525
	  if (my_net_write(net, (char*)packet->ptr(), packet->length()) )
526 527
	  {
	    errmsg = "Failed on my_net_write()";
528
	    my_errno= ER_UNKNOWN_ERROR;
529 530
	    goto err;
	  }
unknown's avatar
unknown committed
531

532
	  if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
533
	  {
534
	    if (send_file(thd))
unknown's avatar
unknown committed
535
	    {
536
	      errmsg = "failed in send_file()";
537
	      my_errno= ER_UNKNOWN_ERROR;
unknown's avatar
unknown committed
538 539
	      goto err;
	    }
540
	  }
541 542 543 544 545
	  packet->set("\0", 1);
	  /*
	    No need to net_flush because we will get to flush later when
	    we hit EOF pretty quick
	  */
546
	}
547

548
	if (fatal_error)
549 550
	{
	  errmsg = "error reading log entry";
551
          my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
552
	  goto err;
553
	}
554 555
	log.error=0;
      }
556
    }
557 558 559 560 561
    else
    {
      bool loop_breaker = 0;
      // need this to break out of the for loop from switch
      thd->proc_info = "switching to next log";
562
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
563 564 565 566 567 568 569
      case LOG_INFO_EOF:
	loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
	break;
      case 0:
	break;
      default:
	errmsg = "could not find next log";
570
	my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
571 572 573
	goto err;
      }

574
      if (loop_breaker)
575 576 577 578
	break;

      end_io_cache(&log);
      (void) my_close(file, MYF(MY_WME));
579

580 581
      // fake Rotate_log event just in case it did not make it to the log
      // otherwise the slave make get confused about the offset
582
      if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
unknown's avatar
unknown committed
583
	  fake_rotate_event(net, packet, log_file_name, &errmsg))
584 585
      {
	my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
unknown's avatar
unknown committed
586
	goto err;
587
      }
unknown's avatar
unknown committed
588 589
      packet->length(0);
      packet->append("\0",1);
590
    }
591
  }
592

593 594
  end_io_cache(&log);
  (void)my_close(file, MYF(MY_WME));
595

596
  send_eof(&thd->net);
597
  thd->proc_info = "waiting to finalize termination";
unknown's avatar
unknown committed
598 599 600
  pthread_mutex_lock(&LOCK_thread_count);
  thd->current_linfo = 0;
  pthread_mutex_unlock(&LOCK_thread_count);
601
  DBUG_VOID_RETURN;
602

603
 err:
604
  thd->proc_info = "waiting to finalize termination";
605
  end_io_cache(&log);
unknown's avatar
unknown committed
606 607 608 609 610 611 612
  /*
    Exclude  iteration through thread list
    this is needed for purge_logs() - it will iterate through
    thread list and update thd->current_linfo->index_file_offset
    this mutex will make sure that it never tried to update our linfo
    after we return from this stack frame
  */
613
  pthread_mutex_lock(&LOCK_thread_count);
unknown's avatar
unknown committed
614 615
  thd->current_linfo = 0;
  pthread_mutex_unlock(&LOCK_thread_count);
616 617
  if (file >= 0)
    (void) my_close(file, MYF(MY_WME));
unknown's avatar
unknown committed
618
  send_error(&thd->net, my_errno, errmsg);
619 620 621
  DBUG_VOID_RETURN;
}

622
/*
  Start the slave threads that are not yet running

  SYNOPSIS
    start_slave()
    thd		Thread handler; if 0, current_thd is used
    mi		Master info for the slave
    net_report	If set, the result is sent to the client (ok or error)

  NOTES
    - Requires SUPER_ACL.
    - thd->lex.slave_thd_opt, if non-zero, restricts which threads are
      started.

  RETURN
    0	ok
    1	error (access denied, all requested threads already running,
	master info could not be initialized, or the threads failed to start)
*/
int start_slave(THD* thd , MASTER_INFO* mi,  bool net_report)
{
  int slave_errno = 0;
  if (!thd) thd = current_thd;
  NET* net = &thd->net;
  int thread_mask;
  DBUG_ENTER("start_slave");

  if (check_access(thd, SUPER_ACL, any_db))
    DBUG_RETURN(1);
  lock_slave_threads(mi);  // this allows us to cleanly read slave_running
  /* Inverse mask: the bits of the threads that are NOT running */
  init_thread_mask(&thread_mask,mi,1 /* inverse */);
  if (thd->lex.slave_thd_opt)
    thread_mask &= thd->lex.slave_thd_opt;
  if (thread_mask)
  {
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0))
      slave_errno=ER_MASTER_INFO;
    else if (server_id_supplied && *mi->host)
      slave_errno = start_slave_threads(0 /*no mutex */,
                                        1 /* wait for start */,
                                        mi,
                                        master_info_file,relay_log_info_file,
                                        thread_mask);
    else
      slave_errno = ER_BAD_SLAVE;
  }
  else
    slave_errno = ER_SLAVE_MUST_STOP;	// nothing left to start

  unlock_slave_threads(mi);

  if (slave_errno)
  {
    if (net_report)
      send_error(net, slave_errno);
    DBUG_RETURN(1);
  }
  else if (net_report)
    send_ok(net);

  DBUG_RETURN(0);
}

666
/*
  Stop the running slave threads

  SYNOPSIS
    stop_slave()
    thd		Thread handler; if 0, current_thd is used
    mi		Master info for the slave
    net_report	If set, the result is sent to the client (ok or error)

  NOTES
    - Requires SUPER_ACL.
    - thd->lex.slave_thd_opt, if non-zero, restricts which threads are
      stopped.

  RETURN
    0	ok
    1	error (access denied, no requested thread was running, or
	terminate_slave_threads() failed)
*/
int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report )
{
  int slave_errno = 0;
  if (!thd) thd = current_thd;
  NET* net = &thd->net;

  if (check_access(thd, SUPER_ACL, any_db))
    return 1;
  thd->proc_info = "Killing slave";
  int thread_mask;
  lock_slave_threads(mi);
  /* Mask of the threads that are running */
  init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
  if (thd->lex.slave_thd_opt)
    thread_mask &= thd->lex.slave_thd_opt;
  slave_errno = (thread_mask) ?
    terminate_slave_threads(mi,thread_mask,
                            1 /*skip lock */) :    ER_SLAVE_NOT_RUNNING;
  unlock_slave_threads(mi);
  thd->proc_info = 0;

  if (slave_errno)
  {
    if (net_report)
      send_error(net, slave_errno);
    return 1;
  }
  else if (net_report)
    send_ok(net);

  return 0;
}

698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719

/*
  Remove all relay logs and start replication from the start

  SYNOPSIS
    reset_slave()
    thd			Thread handler
    mi			Master info for the slave


  NOTES
    We don't send ok in this function, as it is called from
    reload_acl_and_cache(), which may have done other tasks that
    failed and for which we want to send an error.

  RETURN
    0	ok
    1	error
	In this case error is sent to the client with send_error()
*/


720
int reset_slave(THD *thd, MASTER_INFO* mi)
{
  MY_STAT stat_area;
  char fname[FN_REFLEN];
  int thread_mask= 0, error= 0;
  uint sql_errno=0;
  const char* errmsg=0;
  DBUG_ENTER("reset_slave");

  lock_slave_threads(mi);
  init_thread_mask(&thread_mask,mi,0 /* not inverse */);
  if (thread_mask) // We refuse if any slave thread is running
  {
    sql_errno= ER_SLAVE_MUST_STOP;
    error=1;
    goto err;
  }
  //delete relay logs, clear relay log coordinates
  if ((error= purge_relay_logs(&mi->rli, thd,
                               1 /* just reset */,
                               &errmsg)))
    goto err;

  //Clear master's log coordinates (only for good display of SHOW SLAVE STATUS)
  mi->master_log_name[0]= 0;
  mi->master_log_pos= BIN_LOG_HEADER_SIZE;
  //close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
  end_master_info(mi);
  //and delete these two files
  /* A file that does not exist (my_stat() fails) is not an error */
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
  if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
  {
    error=1;
    goto err;
  }
  fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
  if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
  {
    error=1;
    goto err;
  }

err:
  unlock_slave_threads(mi);
  /* sql_errno and errmsg may both still be 0 here (file deletion failure) */
  if (thd && error)
    send_error(&thd->net, sql_errno, errmsg);
  DBUG_RETURN(error);
}

769

770 771 772
/*
  Kill a binlog dump thread that serves a given slave

  SYNOPSIS
    kill_zombie_dump_threads()
    slave_server_id	Server id of the slave whose dump thread should die

  NOTES
    Only the first thread found with command COM_BINLOG_DUMP and a matching
    server_id is woken up with awake(1).
*/
void kill_zombie_dump_threads(uint32 slave_server_id)
{
  pthread_mutex_lock(&LOCK_thread_count);
  I_List_iterator<THD> it(threads);
  THD *tmp;

  /* tmp is 0 after the loop if no matching thread was found */
  while ((tmp=it++))
  {
    if (tmp->command == COM_BINLOG_DUMP &&
       tmp->server_id == slave_server_id)
    {
      pthread_mutex_lock(&tmp->LOCK_delete);	// Lock from delete
      break;
    }
  }
  pthread_mutex_unlock(&LOCK_thread_count);
  if (tmp)
  {
    /*
      Here we do not call kill_one_thread() as
      it will be slow because it will iterate through the list
      again. We just kill the thread ourselves.
    */
    tmp->awake(1/*prepare to die*/);
    pthread_mutex_unlock(&tmp->LOCK_delete);
  }
}

798

799
/*
  Change the master this slave replicates from

  SYNOPSIS
    change_master()
    thd		Thread handler; the new settings are taken from thd->lex.mi
		and the result is sent on thd->net
    mi		Master info to update

  NOTES
    - Refused with ER_SLAVE_MUST_STOP while any slave thread is running.
    - If host or port is given without a log name and position, the master
      log coordinates are reset.
    - Relay logs are purged unless a relay log name or position is given.

  RETURN
    0	ok, send_ok() was called
    1	error, an error was sent to the client
*/
int change_master(THD* thd, MASTER_INFO* mi)
{
  int thread_mask;
  const char* errmsg=0;
  bool need_relay_log_purge=1;
  DBUG_ENTER("change_master");

  lock_slave_threads(mi);
  init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
  if (thread_mask) // We refuse if any slave thread is running
  {
    net_printf(&thd->net,ER_SLAVE_MUST_STOP);
    unlock_slave_threads(mi);
    DBUG_RETURN(1);
  }

  thd->proc_info = "changing master";
  LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
  // TODO: see if needs re-write
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0))
  {
    send_error(&thd->net, 0, "Could not initialize master info");
    unlock_slave_threads(mi);
    DBUG_RETURN(1);
  }

  /*
    Data lock not needed since we have already stopped the running threads,
    and we have the hold on the run locks which will keep all threads that
    could possibly modify the data structures from running
  */
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
  {
    // if we change host or port, we must reset the position
    mi->master_log_name[0] = 0;
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
    mi->rli.pending = 0;
  }

  /*
    strmake() copies up to the given number of characters and then adds a
    terminating 0, so the limit has to be one less than the buffer size
  */
  if (lex_mi->log_file_name)
    strmake(mi->master_log_name, lex_mi->log_file_name,
            sizeof(mi->master_log_name)-1);
  if (lex_mi->pos)
  {
    mi->master_log_pos= lex_mi->pos;
    mi->rli.pending = 0;
  }
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));

  if (lex_mi->host)
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
  if (lex_mi->user)
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
  if (lex_mi->password)
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
  if (lex_mi->port)
    mi->port = lex_mi->port;
  if (lex_mi->connect_retry)
    mi->connect_retry = lex_mi->connect_retry;

  if (lex_mi->relay_log_name)
  {
    need_relay_log_purge= 0;
    strmake(mi->rli.relay_log_name,lex_mi->relay_log_name,
            sizeof(mi->rli.relay_log_name)-1);
  }

  if (lex_mi->relay_log_pos)
  {
    need_relay_log_purge= 0;
    mi->rli.relay_log_pos=lex_mi->relay_log_pos;
  }

  flush_master_info(mi);
  if (need_relay_log_purge)
  {
    mi->rli.skip_log_purge= 0;
    thd->proc_info="purging old relay logs";
    if (purge_relay_logs(&mi->rli, thd,
                         0 /* not only reset, but also reinit */,
                         &errmsg))
    {
      net_printf(&thd->net, 0, "Failed purging old relay logs: %s",errmsg);
      unlock_slave_threads(mi);
      DBUG_RETURN(1);
    }
  }
  else
  {
    const char* msg= 0;
    mi->rli.skip_log_purge= 1;
    /* Relay log is already initialized */
    if (init_relay_log_pos(&mi->rli,
                           mi->rli.relay_log_name,
                           mi->rli.relay_log_pos,
                           0 /*no data lock*/,
                           &msg))
    {
      net_printf(&thd->net,0,"Failed initializing relay log position: %s",msg);
      unlock_slave_threads(mi);
      DBUG_RETURN(1);
    }
  }
  mi->rli.master_log_pos = mi->master_log_pos;
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
  strmake(mi->rli.master_log_name,mi->master_log_name,
          sizeof(mi->rli.master_log_name)-1);
  if (!mi->rli.master_log_name[0]) // uninitialized case
    mi->rli.master_log_pos=0;

  /* Bump abort_pos_wait and broadcast data_cond under rli.data_lock */
  pthread_mutex_lock(&mi->rli.data_lock);
  mi->rli.abort_pos_wait++;
  pthread_cond_broadcast(&mi->data_cond);
  pthread_mutex_unlock(&mi->rli.data_lock);

  unlock_slave_threads(mi);
  thd->proc_info = 0;
  send_ok(&thd->net);
  DBUG_RETURN(0);
}

920
/*
  Reset the binary logs of the master

  SYNOPSIS
    reset_master()
    thd		Thread handler, passed on to reset_logs()

  RETURN
    1		The binary log is not open; ER_FLUSH_MASTER_BINLOG_CLOSED
		is reported with my_error()
    otherwise	The return value of mysql_bin_log.reset_logs()
*/
int reset_master(THD* thd)
{
  if (!mysql_bin_log.is_open())
  {
    my_error(ER_FLUSH_MASTER_BINLOG_CLOSED,  MYF(ME_BELL+ME_WAITTANG));
    return 1;
  }
  return mysql_bin_log.reset_logs(thd);
}

unknown's avatar
unknown committed
930 931 932 933
/*
  Compare two positions in the master's binary logs

  SYNOPSIS
    cmp_master_pos()
    log_file_name1	Log name of the first position
    log_pos1		Offset of the first position
    log_file_name2	Log name of the second position
    log_pos2		Offset of the second position

  NOTES
    Names of different length are ordered by length alone (the shorter name
    sorts first). Names of equal length are ordered by strcmp(), and the
    offsets are compared only when the names are identical.

  RETURN
    < 0	The first position is before the second
    0	The positions are equal
    > 0	The first position is after the second
*/
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
                   const char* log_file_name2, ulonglong log_pos2)
{
  int res;
  /* size_t holds the strlen() result without narrowing */
  size_t log_file_name1_len=  strlen(log_file_name1);
  size_t log_file_name2_len=  strlen(log_file_name2);

  //  We assume that both log names match up to '.'
  if (log_file_name1_len == log_file_name2_len)
  {
    if ((res= strcmp(log_file_name1, log_file_name2)))
      return res;
    return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1;
  }
  return ((log_file_name1_len < log_file_name2_len) ? -1 : 1);
}

unknown's avatar
unknown committed
947

unknown's avatar
unknown committed
948 949 950 951 952 953 954
/*
  Send the events of one binary log to the client

  SYNOPSIS
    show_binlog_events()
    thd		Thread specific variable. The log file name and the start
		position are read from thd->lex.mi, the row offset and row
		count from thd->lex.select.

  NOTES
    While the log is being read, thd->current_linfo points at a LOG_INFO
    that lives on this function's stack. It is therefore cleared on every
    exit that passes through the 'err' label, on failure as well as on
    success, so that the THD never keeps a pointer to a dead stack object.

  RETURN VALUES
    0	ok
    -1	error
*/

int show_binlog_events(THD* thd)
{
  DBUG_ENTER("show_binlog_events");
  List<Item> field_list;
  const char* errmsg = 0;
  IO_CACHE log;
  File file = -1;
  int ret = 0;

  Log_event::init_show_field_list(&field_list);
  if (send_fields(thd, field_list, 1))
    DBUG_RETURN(-1);

  if (mysql_bin_log.is_open())
  {
    LEX_MASTER_INFO *lex_mi = &thd->lex.mi;
    ha_rows event_count, limit_start, limit_end;
    my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
    char search_file_name[FN_REFLEN], *name;
    const char *log_file_name = lex_mi->log_file_name;
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
    LOG_INFO linfo;
    Log_event* ev;

    limit_start = thd->lex.select->offset_limit;
    limit_end = thd->lex.select->select_limit + limit_start;

    name= search_file_name;
    if (log_file_name)
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
    else
      name=0;					// Find first log

    linfo.index_file_offset = 0;
    thd->current_linfo = &linfo;

    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
    {
      errmsg = "Could not find target log";
      goto err;
    }

    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
      goto err;

    pthread_mutex_lock(log_lock);
    my_b_seek(&log, pos);

    for (event_count = 0;
	 (ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,0)); )
    {
      /* Events before the requested offset are read but not sent */
      if (event_count >= limit_start &&
	  ev->net_send(thd, linfo.log_file_name, pos))
      {
	errmsg = "Net error";
	delete ev;
	pthread_mutex_unlock(log_lock);
	goto err;
      }

      pos = my_b_tell(&log);
      delete ev;

      if (++event_count >= limit_end)
	break;
    }

    /* The read loop stopped early and the cache reports an error */
    if (event_count < limit_end && log.error)
    {
      errmsg = "Wrong offset or I/O error";
      pthread_mutex_unlock(log_lock);
      goto err;
    }

    pthread_mutex_unlock(log_lock);
  }

err:
  if (file >= 0)
  {
    end_io_cache(&log);
    (void) my_close(file, MYF(MY_WME));
  }

  if (errmsg)
  {
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
	     "SHOW BINLOG EVENTS", errmsg);
    ret= -1;
  }
  else
    send_eof(&thd->net);

  /*
    Must happen on the error paths too: linfo is a local of this function
    and thd->current_linfo may still point at it.
  */
  pthread_mutex_lock(&LOCK_thread_count);
  thd->current_linfo = 0;
  pthread_mutex_unlock(&LOCK_thread_count);
  DBUG_RETURN(ret);
}

unknown's avatar
unknown committed
1045

1046 1047 1048 1049 1050 1051 1052 1053 1054
/*
  Send the name and position of the current binary log to the client

  SYNOPSIS
    show_binlog_info()
    thd		Thread specific variable

  NOTES
    The result has the columns File, Position, Binlog_do_db and
    Binlog_ignore_db. A row is sent only if the binary log is open;
    otherwise the result set is empty.

  RETURN VALUES
    0	ok
    -1	error while sending the field list or the row
*/

int show_binlog_info(THD* thd)
{
  DBUG_ENTER("show_binlog_info");
  List<Item> field_list;
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
  field_list.push_back(new Item_empty_string("Position",20));
  field_list.push_back(new Item_empty_string("Binlog_do_db",20));
  field_list.push_back(new Item_empty_string("Binlog_ignore_db",20));

  if (send_fields(thd, field_list, 1))
    DBUG_RETURN(-1);

  String* row = &thd->packet;
  row->length(0);

  if (mysql_bin_log.is_open())
  {
    LOG_INFO current_log;
    mysql_bin_log.get_current_log(&current_log);

    /* Only the file name is reported, without its directory part */
    int dir_part_len = dirname_length(current_log.log_file_name);
    net_store_data(row, current_log.log_file_name + dir_part_len);
    net_store_data(row, (longlong)current_log.pos);
    net_store_data(row, &binlog_do_db);
    net_store_data(row, &binlog_ignore_db);

    if (my_net_write(&thd->net, (char*)row->ptr(), row->length()))
      DBUG_RETURN(-1);
  }

  send_eof(&thd->net);
  DBUG_RETURN(0);
}
unknown's avatar
unknown committed
1075

1076

1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088
/*
  Send a lost of all binary logs to client

  SYNOPSIS
    show_binlogs()
    thd		Thread specific variable

  RETURN VALUES
    0	ok
    1	error  (Error message sent to client)
*/

unknown's avatar
unknown committed
1089 1090
int show_binlogs(THD* thd)
{
1091 1092
  const char *errmsg;
  IO_CACHE *index_file;
unknown's avatar
unknown committed
1093 1094 1095
  char fname[FN_REFLEN];
  NET* net = &thd->net;
  List<Item> field_list;
1096
  String *packet = &thd->packet;
1097
  uint length;
1098 1099

  if (!mysql_bin_log.is_open())
1100
  {
1101 1102 1103
    //TODO:  Replace with ER() error message
    errmsg= "You are not using binary logging";
    goto err_with_msg;
1104
  }
unknown's avatar
unknown committed
1105

1106
  field_list.push_back(new Item_empty_string("Log_name", 255));
1107
  if (send_fields(thd, field_list, 1))
1108
    return 1;
unknown's avatar
unknown committed
1109
  mysql_bin_log.lock_index();
1110 1111 1112 1113 1114 1115
  index_file=mysql_bin_log.get_index_file();
  
  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);

  /* The file ends with EOF or empty line */
  while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
1116 1117 1118
  {
    int dir_len = dirname_length(fname);
    packet->length(0);
1119 1120
    /* The -1 is for removing newline from fname */
    net_store_data(packet, fname + dir_len, length-1-dir_len);
1121
    if (my_net_write(net, (char*) packet->ptr(), packet->length()))
1122
      goto err;
1123
  }
unknown's avatar
unknown committed
1124
  mysql_bin_log.unlock_index();
1125
  send_eof(net);
unknown's avatar
unknown committed
1126
  return 0;
1127

1128
err_with_msg:
1129
  send_error(net, 0, errmsg);
1130 1131
err:
  mysql_bin_log.unlock_index();
1132
  return 1;
unknown's avatar
unknown committed
1133 1134
}

1135

1136 1137 1138 1139
/*
  Write the block most recently read into an IO_CACHE to the binary log

  SYNOPSIS
    log_loaded_block()
    file	IO_CACHE that was read from; file->arg must point to a
		LOAD_FILE_INFO describing the load in progress

  NOTES
    The first block that is logged goes into a Create_file_log_event;
    every later block goes into an Append_block_log_event. Nothing is
    written if the block is empty, or if the cache position is not
    beyond the position recorded for the previously logged block.
    The result of mysql_bin_log.write() is not checked.

  RETURN VALUES
    0	always
*/

int log_loaded_block(IO_CACHE* file)
{
  /* file->request_pos contains position where we started last read */
  char* block_start = (char*) file->request_pos;
  uint block_len = (char*) file->read_end - (char*) block_start;
  if (!block_len)
    return 0;

  LOAD_FILE_INFO* load_info = (LOAD_FILE_INFO*) file->arg;

  /* Skip a block at or before the position that was logged last */
  if (load_info->last_pos_in_file != HA_POS_ERROR &&
      load_info->last_pos_in_file >= file->pos_in_file)
    return 0;
  load_info->last_pos_in_file = file->pos_in_file;

  if (!load_info->wrote_create_file)
  {
    /* First block of this load: it travels with the create-file event */
    Create_file_log_event create_ev(load_info->thd,load_info->ex,
				    load_info->db,
				    load_info->table_name, *load_info->fields,
				    load_info->handle_dup, block_start,
				    block_len, load_info->log_delayed);
    mysql_bin_log.write(&create_ev);
    load_info->wrote_create_file = 1;
    DBUG_SYNC_POINT("debug_lock.created_file_event",10);
    return 0;
  }

  Append_block_log_event append_ev(load_info->thd, block_start, block_len,
				   load_info->log_delayed);
  mysql_bin_log.write(&append_ev);
  return 0;
}