sql_repl.cc 29 KB
Newer Older
1
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB & Sasha
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

// Sasha Pachev <sasha@mysql.com> is currently in charge of this file

#include "mysql_priv.h"
#include "sql_repl.h"
#include "sql_acl.h"
#include "log_event.h"
23
#include "mini_client.h"
24
#include <my_dir.h>
25
#include <assert.h>
26 27

extern const char* any_db;
28

29
int max_binlog_dump_events = 0; // unlimited
30
my_bool opt_sporadic_binlog_dump_fail = 0;
31
static int binlog_dump_count = 0;
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
32

33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
/*
  Check that a binary log starts with the expected magic bytes

  SYNOPSIS
    check_binlog_magic()
    log		IO cache for the log; must be positioned at offset 0
    errmsg	Receives a static error text on failure

  RETURN VALUES
    0	the first bytes equal BINLOG_MAGIC
    1	the header could not be read, or the magic differs
*/

int check_binlog_magic(IO_CACHE* log, const char** errmsg)
{
  char file_magic[4];
  int failed = 1;
  DBUG_ASSERT(my_b_tell(log) == 0);

  if (my_b_read(log, (byte*) file_magic, sizeof(file_magic)))
  {
    /* Read error: details go to the error log, errmsg to the caller */
    *errmsg = "I/O error reading the header from the binary log";
    sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno,
                    log->error);
  }
  else if (memcmp(file_magic, BINLOG_MAGIC, sizeof(file_magic)))
    *errmsg = "Binlog has bad magic number;  It's not a binary log file that can be used by this version of MySQL";
  else
    failed = 0;

  return failed;
}

53 54 55
/*
  Send a synthetic Rotate event that tells the slave the current log name

  SYNOPSIS
    fake_rotate_event()
    net			Connection to write the packet to
    packet		Packet buffer; the event is appended to whatever it
			already holds and the whole buffer is written
    log_file_name	Name of the binary log; only the part after the
			directory is put into the event
    errmsg		Receives a static error text if the write fails

  RETURN VALUES
    0	ok
    -1	my_net_write() failed
*/

static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
			     const char**errmsg)
{
  char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN];
  memset(header, 0, 4); // when does not matter
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;

  /* Strip the directory part; the event carries the bare file name */
  char* p = log_file_name+dirname_length(log_file_name);
  uint ident_len = (uint) strlen(p);
  ulong event_len = ident_len + ROTATE_EVENT_OVERHEAD;
  int4store(header + SERVER_ID_OFFSET, server_id);
  int4store(header + EVENT_LEN_OFFSET, event_len);
  int2store(header + FLAGS_OFFSET, 0);

  // TODO: check what problems this may cause and fix them
  int4store(header + LOG_POS_OFFSET, 0);

  packet->append(header, sizeof(header));
  /* We need to split the next statement because of problem with cxx */
  int4store(buf,4); // tell slave to skip magic number
  int4store(buf+4,0);
  packet->append(buf, ROTATE_HEADER_LEN);
  packet->append(p,ident_len);
  if (my_net_write(net, (char*)packet->ptr(), packet->length()))
  {
    *errmsg = "failed on my_net_write()";
    return -1;
  }
  return 0;
}

84 85 86 87 88 89
/*
  Send the contents of a file requested by the client over the connection

  SYNOPSIS
    send_file()
    thd		Thread handle; thd->net is the connection used

  NOTES
    Called by mysql_binlog_send() right after a LOAD_EVENT has been written.
    The function flushes the net buffer, reads one packet holding the file
    name, sends the file in chunks of at most IO_SIZE bytes, then writes an
    empty packet and reads the client's reply.  If the name resolves to
    "/dev/null" no data is sent, only the closing exchange is done.
    thd->net.read_timeout is raised to net_wait_timeout for the duration
    of the call and restored before returning.

  RETURN VALUES
    0	ok
    1	error (message written with sql_print_error)
*/

static int send_file(THD *thd)
{
  NET* net = &thd->net;
  int fd = -1,bytes, error = 1;
  char fname[FN_REFLEN+1];
  const char *errmsg = 0;
  int old_timeout;
  uint packet_len;
  char buf[IO_SIZE];				// It's safe to alloc this
  DBUG_ENTER("send_file");

  /*
    The client might be slow loading the data, give him wait_timeout to do
    the job
  */
  old_timeout = thd->net.read_timeout;
  thd->net.read_timeout = thd->variables.net_wait_timeout;

  /*
    We need net_flush here because the client will not know it needs to send
    us the file name until it has processed the load event entry
  */
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
  {
    errmsg = "while reading file name";
    goto err;
  }

  // terminate with \0 for fn_format
  *((char*)net->read_pos +  packet_len) = 0;
  /* The name is taken from the second byte of the packet onwards */
  fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
  // this is needed to make replicate-ignore-db
  if (!strcmp(fname,"/dev/null"))
    goto end;

  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
  {
    errmsg = "on open of file";
    goto err;
  }

  while ((bytes = (int) my_read(fd, (byte*) buf, IO_SIZE, MYF(0))) > 0)
  {
    if (my_net_write(net, buf, bytes))
    {
      errmsg = "while writing data to client";
      goto err;
    }
  }

 end:
  /* An empty packet marks end of data; then wait for the client's reply */
  if (my_net_write(net, "", 0) || net_flush(net) ||
      (my_net_read(net) == packet_error))
  {
    errmsg = "while negotiating file transfer close";
    goto err;
  }
  error = 0;

 err:
  thd->net.read_timeout = old_timeout;
  if (fd >= 0)
    (void) my_close(fd, MYF(0));
  if (errmsg)
  {
    sql_print_error("Failed in send_file() %s", errmsg);
    /* Never pass the message itself as the format string */
    DBUG_PRINT("error", ("%s", errmsg));
  }
  DBUG_RETURN(error);
}

155

156
/*
  Open a binary log for reading and verify its magic number

  SYNOPSIS
    open_binlog()
    log			IO cache to initialise as a READ_CACHE on the file
    log_file_name	Name of the file to open
    errmsg		Receives a static error text on failure

  RETURN VALUES
    >= 0	file descriptor; the caller must end_io_cache(log) and
		close the descriptor
    -1		error; a descriptor opened here has been closed again
*/

File open_binlog(IO_CACHE *log, const char *log_file_name,
		 const char **errmsg)
{
  File file;
  DBUG_ENTER("open_binlog");

  if ((file = my_open(log_file_name, O_RDONLY | O_BINARY, MYF(MY_WME))) < 0 ||
      init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0,
		    MYF(MY_WME | MY_DONT_CHECK_FILESIZE)))
  {
    *errmsg = "Could not open log file";	// This will not be sent
    goto err;
  }
  if (check_binlog_magic(log,errmsg))
    goto err;
  DBUG_RETURN(file);

err:
  /* file is < 0 only when my_open() itself failed; then nothing to undo */
  if (file >= 0)
  {
    my_close(file,MYF(0));
    end_io_cache(log);
  }
  DBUG_RETURN(-1);
}


183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
/*
  Adjust the binary log index file offset stored for all running slaves

  SYNOPSIS
    adjust_linfo_offsets()
    purge_offset	Number of bytes removed from start of log index file

  NOTES
    - This is called when doing a PURGE when we delete lines from the
      index log file

  REQUIREMENTS
    - Before calling this function, we have to ensure that no threads are
      using any binary log file before purge_offset.

  TODO
    - Inform the slave threads that they should sync the position
      in the binary log file with flush_relay_log_info.
      For now the sync is done on the next read.
*/

204 205 206
void adjust_linfo_offsets(my_off_t purge_offset)
{
  THD *tmp;

  /* The thread list is walked with LOCK_thread_count held */
  pthread_mutex_lock(&LOCK_thread_count);
  I_List_iterator<THD> it(threads);

  while ((tmp=it++))
  {
    LOG_INFO* linfo;
    /* Only threads that have published a LOG_INFO are of interest */
    if ((linfo = tmp->current_linfo))
    {
      pthread_mutex_lock(&linfo->lock);
      /*
	Index file offset can be less than purge offset only if
	we just started reading the index file. In that case
	we have nothing to adjust
      */
      if (linfo->index_file_offset < purge_offset)
	linfo->fatal = (linfo->index_file_offset != 0);
      else
	linfo->index_file_offset -= purge_offset;
      pthread_mutex_unlock(&linfo->lock);
    }
  }
  pthread_mutex_unlock(&LOCK_thread_count);
}

monty@narttu.mysql.fi's avatar
merge  
monty@narttu.mysql.fi committed
232

233 234 235 236 237
bool log_in_use(const char* log_name)
{
  int log_name_len = strlen(log_name) + 1;
  THD *tmp;
  bool result = 0;
238

239 240
  pthread_mutex_lock(&LOCK_thread_count);
  I_List_iterator<THD> it(threads);
241 242 243 244 245

  while ((tmp=it++))
  {
    LOG_INFO* linfo;
    if ((linfo = tmp->current_linfo))
246
    {
247 248 249
      pthread_mutex_lock(&linfo->lock);
      result = !memcmp(log_name, linfo->log_file_name, log_name_len);
      pthread_mutex_unlock(&linfo->lock);
250 251
      if (result)
	break;
252 253
    }
  }
254 255 256 257 258

  pthread_mutex_unlock(&LOCK_thread_count);
  return result;
}

monty@narttu.mysql.fi's avatar
merge  
monty@narttu.mysql.fi committed
259

260 261 262
/*
  Execute a purge of master logs up to a given log and report to the client

  SYNOPSIS
    purge_master_logs()
    thd		Thread handle
    to_log	Log name given by the user; expanded with make_log_name()

  RETURN VALUES
    0	ok (send_ok() was called)
    1	error (a message was sent with send_error())
*/

int purge_master_logs(THD* thd, const char* to_log)
{
  char search_file_name[FN_REFLEN];
  const char* errmsg = 0;

  mysql_bin_log.make_log_name(search_file_name, to_log);
  int res = mysql_bin_log.purge_logs(thd, search_file_name);

  /* Translate the purge_logs() result code into a message for the client */
  switch(res)  {
  case 0: break;
  case LOG_INFO_EOF:	 errmsg = "Target log not found in binlog index"; break;
  case LOG_INFO_IO:	 errmsg = "I/O error reading log index file"; break;
  case LOG_INFO_INVALID: errmsg = "Server configuration does not permit "
				  "binlog purge"; break;
  case LOG_INFO_SEEK:	errmsg = "Failed on fseek()"; break;
  case LOG_INFO_PURGE_NO_ROTATE: errmsg = "Cannot purge unrotatable log";
    break;
  case LOG_INFO_MEM:	errmsg = "Out of memory"; break;
  case LOG_INFO_FATAL:	errmsg = "Fatal error during purge"; break;
  case LOG_INFO_IN_USE: errmsg = "A purgeable log is in use, will not purge";
    break;
  default:		errmsg = "Unknown error during purge"; break;
  }

  if (errmsg)
  {
    send_error(thd, 0, errmsg);
    return 1;
  }
  else
    send_ok(thd);

  return 0;
}

295 296 297
/*
  TODO: Clean up loop to only have one call to send_file()
*/
monty@narttu.mysql.fi's avatar
merge  
monty@narttu.mysql.fi committed
298

299 300
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
		       ushort flags)
301 302 303
{
  LOG_INFO linfo;
  char *log_file_name = linfo.log_file_name;
304
  char search_file_name[FN_REFLEN], *name;
305 306
  IO_CACHE log;
  File file = -1;
307 308 309 310
  String* packet = &thd->packet;
  int error;
  const char *errmsg = "Unknown error";
  NET* net = &thd->net;
311 312
#ifndef DBUG_OFF
  int left_events = max_binlog_dump_events;
313
#endif
314
  DBUG_ENTER("mysql_binlog_send");
315 316
  DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));

317
  bzero((char*) &log,sizeof(log));
318

sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
319
#ifndef DBUG_OFF
320 321 322
  if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
  {
    errmsg = "Master failed COM_BINLOG_DUMP to test if slave can recover";
323
    my_errno= ER_UNKNOWN_ERROR;
324 325
    goto err;
  }
326
#endif
327

328
  if (!mysql_bin_log.is_open())
329 330
  {
    errmsg = "Binary log is not open";
331
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
332 333
    goto err;
  }
334 335 336
  if (!server_id_supplied)
  {
    errmsg = "Misconfigured master - server id was not set";
337
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
338 339 340
    goto err;
  }

341
  name=search_file_name;
342
  if (log_ident[0])
343 344
    mysql_bin_log.make_log_name(search_file_name, log_ident);
  else
345
    name=0;					// Find first log
346

347 348
  linfo.index_file_offset = 0;
  thd->current_linfo = &linfo;
349

350
  if (mysql_bin_log.find_log_pos(&linfo, name, 1))
351
  {
352
    errmsg = "Could not find first log file name in binary log index file";
353
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
354 355
    goto err;
  }
356

357
  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
358 359
  {
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
360
    goto err;
361 362
  }
  if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
363
  {
364
    errmsg= "Client requested master to start replication from \
365
impossible position";
366
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
367 368
    goto err;
  }
369

370
  my_b_seek(&log, pos);				// Seek will done on next read
371 372 373 374
  /*
    We need to start a packet with something other than 255
    to distiquish it from error
  */
375
  packet->set("\0", 1, system_charset_info);
376

monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
377
  // if we are at the start of the log
378
  if (pos == BIN_LOG_HEADER_SIZE)
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
379
  {
380
    // tell the client log name with a fake rotate_event
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
381
    if (fake_rotate_event(net, packet, log_file_name, &errmsg))
382 383
    {
      my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
384
      goto err;
385
    }
386
    packet->set("\0", 1, system_charset_info);
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
387
  }
388

monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
389
  while (!net->error && net->vio != 0 && !thd->killed)
390 391
  {
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
392

393 394
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
    {
395
#ifndef DBUG_OFF
396
      if (max_binlog_dump_events && !left_events--)
397 398 399
      {
	net_flush(net);
	errmsg = "Debugging binlog dump abort";
400
	my_errno= ER_UNKNOWN_ERROR;
401 402
	goto err;
      }
403
#endif
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
404
      if (my_net_write(net, (char*)packet->ptr(), packet->length()) )
405 406
      {
	errmsg = "Failed on my_net_write()";
407
	my_errno= ER_UNKNOWN_ERROR;
408 409 410 411
	goto err;
      }
      DBUG_PRINT("info", ("log event code %d",
			  (*packet)[LOG_EVENT_OFFSET+1] ));
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
412
      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
413
      {
414
	if (send_file(thd))
415
	{
416
	  errmsg = "failed in send_file()";
417
	  my_errno= ER_UNKNOWN_ERROR;
418 419
	  goto err;
	}
420
      }
421
      packet->set("\0", 1, system_charset_info);
422
    }
423 424 425 426
    /*
      TODO: now that we are logging the offset, check to make sure
      the recorded offset and the actual match
    */
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
427
    if (error != LOG_READ_EOF)
428
    {
429
      my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
430
      switch (error) {
431
      case LOG_READ_BOGUS:
432 433
	errmsg = "bogus data in log event";
	break;
434
      case LOG_READ_TOO_LARGE:
435 436
	errmsg = "log event entry exceeded max_allowed_packet; \
Increase max_allowed_packet on master";
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
437
	break;
438 439 440 441 442 443 444 445 446
      case LOG_READ_IO:
	errmsg = "I/O error reading log event";
	break;
      case LOG_READ_MEM:
	errmsg = "memory allocation failed reading log event";
	break;
      case LOG_READ_TRUNC:
	errmsg = "binlog truncated in the middle of event";
	break;
447 448 449
      default:
	errmsg = "unknown error reading log event on the master";
	break;
450 451 452
      }
      goto err;
    }
453

454
    if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
455
       mysql_bin_log.is_active(log_file_name))
456
    {
457 458 459
      /*
	Block until there is more data in the log
      */
460
      if (net_flush(net))
461 462
      {
	errmsg = "failed on net_flush()";
463
	my_errno= ER_UNKNOWN_ERROR;
464 465
	goto err;
      }
466

467 468 469 470 471 472
      /*
	We may have missed the update broadcast from the log
	that has just happened, let's try to catch it if it did.
	If we did not miss anything, we just wait for other threads
	to signal us.
      */
473 474 475 476
      {
	log.error=0;
	bool read_packet = 0, fatal_error = 0;

477
#ifndef DBUG_OFF
478
	if (max_binlog_dump_events && !left_events--)
479 480
	{
	  errmsg = "Debugging binlog dump abort";
481
	  my_errno= ER_UNKNOWN_ERROR;
482 483
	  goto err;
	}
484
#endif
485

486 487 488 489
	/*
	  No one will update the log while we are reading
	  now, but we'll be quick and just read one record

490 491 492 493
	  TODO:
	  Add an counter that is incremented for each time we update
	  the binary log.  We can avoid the following read if the counter
	  has not been updated since last read.
494
	*/
495

monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
496
	pthread_mutex_lock(log_lock);
497
	switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
498
	case 0:
499
	  /* we read successfully, so we'll need to send it to the slave */
500
	  pthread_mutex_unlock(log_lock);
501 502
	  read_packet = 1;
	  break;
503

504
	case LOG_READ_EOF:
505
	  DBUG_PRINT("wait",("waiting for data in binary log"));
506
	  if (!thd->killed)
507 508
	  {
	    /* Note that the following call unlocks lock_log */
509
	    mysql_bin_log.wait_for_update(thd);
510 511 512
	  }
	  else
	    pthread_mutex_unlock(log_lock);
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
513
	  DBUG_PRINT("wait",("binary log received update"));
514 515 516
	  break;

	default:
517
	  pthread_mutex_unlock(log_lock);
518 519
	  fatal_error = 1;
	  break;
520
	}
521

522
	if (read_packet)
523 524
	{
	  thd->proc_info = "sending update to slave";
525
	  if (my_net_write(net, (char*)packet->ptr(), packet->length()) )
526 527
	  {
	    errmsg = "Failed on my_net_write()";
528
	    my_errno= ER_UNKNOWN_ERROR;
529 530
	    goto err;
	  }
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
531

532
	  if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
533
	  {
534
	    if (send_file(thd))
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
535
	    {
536
	      errmsg = "failed in send_file()";
537
	      my_errno= ER_UNKNOWN_ERROR;
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
538 539
	      goto err;
	    }
540
	  }
541
	  packet->set("\0", 1, system_charset_info);
542 543 544 545
	  /*
	    No need to net_flush because we will get to flush later when
	    we hit EOF pretty quick
	  */
546
	}
547

548
	if (fatal_error)
549 550
	{
	  errmsg = "error reading log entry";
551
          my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
552
	  goto err;
553
	}
554 555
	log.error=0;
      }
556
    }
557 558 559 560 561
    else
    {
      bool loop_breaker = 0;
      // need this to break out of the for loop from switch
      thd->proc_info = "switching to next log";
562
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
563 564 565 566 567 568 569
      case LOG_INFO_EOF:
	loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
	break;
      case 0:
	break;
      default:
	errmsg = "could not find next log";
570
	my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
571 572 573
	goto err;
      }

574
      if (loop_breaker)
575 576 577 578
	break;

      end_io_cache(&log);
      (void) my_close(file, MYF(MY_WME));
579

580 581
      // fake Rotate_log event just in case it did not make it to the log
      // otherwise the slave make get confused about the offset
582
      if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
583
	  fake_rotate_event(net, packet, log_file_name, &errmsg))
584 585
      {
	my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
586
	goto err;
587
      }
588 589
      packet->length(0);
      packet->append("\0",1);
590
    }
591
  }
592

593 594
  end_io_cache(&log);
  (void)my_close(file, MYF(MY_WME));
595

596
  send_eof(thd);
597
  thd->proc_info = "waiting to finalize termination";
598 599 600
  pthread_mutex_lock(&LOCK_thread_count);
  thd->current_linfo = 0;
  pthread_mutex_unlock(&LOCK_thread_count);
601
  DBUG_VOID_RETURN;
602

603
 err:
604
  thd->proc_info = "waiting to finalize termination";
605
  end_io_cache(&log);
606 607 608 609 610 611 612
  /*
    Exclude  iteration through thread list
    this is needed for purge_logs() - it will iterate through
    thread list and update thd->current_linfo->index_file_offset
    this mutex will make sure that it never tried to update our linfo
    after we return from this stack frame
  */
613
  pthread_mutex_lock(&LOCK_thread_count);
614 615
  thd->current_linfo = 0;
  pthread_mutex_unlock(&LOCK_thread_count);
616 617
  if (file >= 0)
    (void) my_close(file, MYF(MY_WME));
618
  send_error(thd, my_errno, errmsg);
619 620 621
  DBUG_VOID_RETURN;
}

622
/*
  Start slave threads

  SYNOPSIS
    start_slave()
    thd		Thread handle; if 0, current_thd is used
    mi		Master info of the slave to start
    net_report	If set, the result is sent to the client with
		send_ok()/send_error()

  NOTES
    The caller needs SUPER_ACL.  If thd->lex.slave_thd_opt is non-zero the
    set of threads to start is restricted to it.  An empty thread mask is
    reported as ER_SLAVE_MUST_STOP.

  RETURN VALUES
    0	ok
    1	error (including failed access check)
*/

int start_slave(THD* thd , MASTER_INFO* mi,  bool net_report)
{
  int slave_errno = 0;
  if (!thd)
    thd = current_thd;
  int thread_mask;
  DBUG_ENTER("start_slave");

  if (check_access(thd, SUPER_ACL, any_db))
    DBUG_RETURN(1);
  lock_slave_threads(mi);  // this allows us to cleanly read slave_running
  init_thread_mask(&thread_mask,mi,1 /* inverse */);
  if (thd->lex.slave_thd_opt)
    thread_mask &= thd->lex.slave_thd_opt;
  if (thread_mask)
  {
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0))
      slave_errno=ER_MASTER_INFO;
    else if (server_id_supplied && *mi->host)
      slave_errno = start_slave_threads(0 /*no mutex */,
					1 /* wait for start */,
					mi,
					master_info_file,relay_log_info_file,
					thread_mask);
    else
      slave_errno = ER_BAD_SLAVE;	/* no server id or no master host */
  }
  else
    slave_errno = ER_SLAVE_MUST_STOP;

  unlock_slave_threads(mi);

  if (slave_errno)
  {
    if (net_report)
      send_error(thd, slave_errno);
    DBUG_RETURN(1);
  }
  else if (net_report)
    send_ok(thd);

  DBUG_RETURN(0);
}

666

667
/*
  Stop slave threads

  SYNOPSIS
    stop_slave()
    thd		Thread handle; if 0, current_thd is used
    mi		Master info of the slave to stop
    net_report	If set, the result is sent to the client with
		send_ok()/send_error()

  NOTES
    The caller needs SUPER_ACL.  If thd->lex.slave_thd_opt is non-zero the
    set of threads to stop is restricted to it.  An empty thread mask is
    reported as ER_SLAVE_NOT_RUNNING.

  RETURN VALUES
    0	ok
    1	error (including failed access check)
*/

int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report )
{
  int slave_errno = 0;
  if (!thd)
    thd = current_thd;

  if (check_access(thd, SUPER_ACL, any_db))
    return 1;
  thd->proc_info = "Killing slave";
  int thread_mask;
  lock_slave_threads(mi);
  init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
  if (thd->lex.slave_thd_opt)
    thread_mask &= thd->lex.slave_thd_opt;
  slave_errno = (thread_mask) ?
    terminate_slave_threads(mi,thread_mask,
			    1 /*skip lock */) :    ER_SLAVE_NOT_RUNNING;
  unlock_slave_threads(mi);
  thd->proc_info = 0;

  if (slave_errno)
  {
    if (net_report)
      send_error(thd, slave_errno);
    return 1;
  }
  else if (net_report)
    send_ok(thd);

  return 0;
}

699
/*
  Reset a slave: stop its threads, reset the relay logs and delete the
  master info and relay log info files

  SYNOPSIS
    reset_slave()
    thd		Thread handle (passed on to purge_relay_logs())
    mi		Master info of the slave to reset

  NOTES
    Threads that were terminated here are started again at the end.
    Nothing is sent to the client by this function.

  RETURN VALUES
    0	ok
    != 0	error
*/

int reset_slave(THD *thd, MASTER_INFO* mi)
{
  MY_STAT stat_area;
  char fname[FN_REFLEN];
  int restart_thread_mask = 0,error=0;
  const char* errmsg=0;
  DBUG_ENTER("reset_slave");

  lock_slave_threads(mi);
  init_thread_mask(&restart_thread_mask,mi,0 /* not inverse */);
  if ((error=terminate_slave_threads(mi,restart_thread_mask,1 /*skip lock*/))
      || (error=purge_relay_logs(&mi->rli, thd,
				 1 /* just reset */,
				 &errmsg)))
    goto err;

  end_master_info(mi);
  /* Delete each info file only if it exists; a failed delete is an error */
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
  if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
  {
    error=1;
    goto err;
  }
  fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
  if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
  {
    error=1;
    goto err;
  }
  if (restart_thread_mask)
    error=start_slave_threads(0 /* mutex not needed */,
			      1 /* wait for start*/,
			      mi,master_info_file,relay_log_info_file,
			      restart_thread_mask);
  // TODO: fix error messages so they get to the client
err:
  unlock_slave_threads(mi);
  DBUG_RETURN(error);
}

/*
  Kill the first binlog dump thread found that serves a given slave

  SYNOPSIS
    kill_zombie_dump_threads()
    slave_server_id	Server id of the slave whose dump thread should die

  NOTES
    Only the first thread with command COM_BINLOG_DUMP and a matching
    server_id is handled.  Its LOCK_delete is taken before
    LOCK_thread_count is released and is held across the awake() call.
*/

void kill_zombie_dump_threads(uint32 slave_server_id)
{
  pthread_mutex_lock(&LOCK_thread_count);
  I_List_iterator<THD> it(threads);
  THD *tmp;

  while ((tmp=it++))
  {
    if (tmp->command == COM_BINLOG_DUMP &&
       tmp->server_id == slave_server_id)
    {
      pthread_mutex_lock(&tmp->LOCK_delete);	// Lock from delete
      break;
    }
  }
  pthread_mutex_unlock(&LOCK_thread_count);
  if (tmp)
  {
    /*
      Here we do not call kill_one_thread() as
      it will be slow because it will iterate through the list
      again. We just kill the thread ourselves.
    */
    tmp->awake(1/*prepare to die*/);
    pthread_mutex_unlock(&tmp->LOCK_delete);
  }
}

767

768
/*
  Execute CHANGE MASTER: update the master info from the parsed statement

  SYNOPSIS
    change_master()
    thd		Thread handle; the new settings are read from thd->lex.mi
    mi		Master info to update

  NOTES
    Running slave threads are terminated first and started again at the
    end.  The slave thread locks taken with lock_slave_threads() are
    released on every return path.
    If neither a relay log name nor a relay log position is given, the old
    relay logs are purged.

  RETURN VALUES
    0	the settings were applied (a failure to restart the threads is
	reported to the client but still returns 0)
    1	error; a message was sent to the client
*/

int change_master(THD* thd, MASTER_INFO* mi)
{
  int error=0,restart_thread_mask;
  const char* errmsg=0;
  bool need_relay_log_purge=1;
  DBUG_ENTER("change_master");

  // kill slave thread
  lock_slave_threads(mi);
  init_thread_mask(&restart_thread_mask,mi,0 /*not inverse*/);
  if (restart_thread_mask &&
      (error=terminate_slave_threads(mi,
				     restart_thread_mask,
				     1 /*skip lock*/)))
  {
    send_error(thd,error);
    unlock_slave_threads(mi);
    DBUG_RETURN(1);
  }
  thd->proc_info = "changing master";
  LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
  // TODO: see if needs re-write
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0))
  {
    send_error(thd, 0, "Could not initialize master info");
    unlock_slave_threads(mi);
    DBUG_RETURN(1);
  }

  /*
    Data lock not needed since we have already stopped the running threads,
    and we have the hold on the run locks which will keep all threads that
    could possibly modify the data structures from running
  */
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
  {
    // if we change host or port, we must reset the position
    mi->master_log_name[0] = 0;
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
    mi->rli.pending = 0;
  }

  /*
    strmake() stores the terminating NUL at dst[length] when the source is
    too long, so the length passed must be one less than the buffer size.
  */
  if (lex_mi->log_file_name)
    strmake(mi->master_log_name, lex_mi->log_file_name,
	    sizeof(mi->master_log_name)-1);
  if (lex_mi->pos)
  {
    mi->master_log_pos= lex_mi->pos;
    mi->rli.pending = 0;
  }
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));

  if (lex_mi->host)
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
  if (lex_mi->user)
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
  if (lex_mi->password)
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
  if (lex_mi->port)
    mi->port = lex_mi->port;
  if (lex_mi->connect_retry)
    mi->connect_retry = lex_mi->connect_retry;

  if (lex_mi->relay_log_name)
  {
    need_relay_log_purge = 0;
    mi->rli.skip_log_purge=1;
    strmake(mi->rli.relay_log_name,lex_mi->relay_log_name,
	    sizeof(mi->rli.relay_log_name)-1);
  }

  if (lex_mi->relay_log_pos)
  {
    need_relay_log_purge=0;
    mi->rli.relay_log_pos=lex_mi->relay_log_pos;
  }

  flush_master_info(mi);
  if (need_relay_log_purge)
  {
    mi->rli.skip_log_purge=0;
    thd->proc_info="purging old relay logs";
    if (purge_relay_logs(&mi->rli, thd,
			 0 /* not only reset, but also reinit */,
			 &errmsg))
    {
      net_printf(thd, 0, "Failed purging old relay logs: %s",errmsg);
      /* Release the slave thread locks, as every other error path does */
      unlock_slave_threads(mi);
      DBUG_RETURN(1);
    }
  }
  else
  {
    const char* msg;
    /* Relay log is already initialized */
    if (init_relay_log_pos(&mi->rli,
			   mi->rli.relay_log_name,
			   mi->rli.relay_log_pos,
			   0 /*no data lock*/,
			   &msg))
    {
      net_printf(thd,0,"Failed initializing relay log position: %s",msg);
      unlock_slave_threads(mi);
      DBUG_RETURN(1);
    }
  }
  mi->rli.master_log_pos = mi->master_log_pos;
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
  strmake(mi->rli.master_log_name,mi->master_log_name,
	  sizeof(mi->rli.master_log_name)-1);
  if (!mi->rli.master_log_name[0]) // uninitialized case
    mi->rli.master_log_pos=0;

  /* Bump abort_pos_wait and wake waiters on data_cond */
  pthread_mutex_lock(&mi->rli.data_lock);
  mi->rli.abort_pos_wait++;
  pthread_cond_broadcast(&mi->data_cond);
  pthread_mutex_unlock(&mi->rli.data_lock);

  thd->proc_info = "starting slave";
  if (restart_thread_mask)
    error=start_slave_threads(0 /* mutex not needed*/,
			      1 /* wait for start*/,
			      mi,master_info_file,relay_log_info_file,
			      restart_thread_mask);
  unlock_slave_threads(mi);
  thd->proc_info = 0;
  if (error)
    send_error(thd,error);
  else
    send_ok(thd);
  DBUG_RETURN(0);
}

900
/*
  Execute RESET MASTER by resetting the binary logs

  SYNOPSIS
    reset_master()
    thd		Thread handle

  RETURN VALUES
    1	the binary log is not open (ER_FLUSH_MASTER_BINLOG_CLOSED raised)
    else	the result of mysql_bin_log.reset_logs()
*/

int reset_master(THD* thd)
{
  if (!mysql_bin_log.is_open())
  {
    my_error(ER_FLUSH_MASTER_BINLOG_CLOSED,  MYF(ME_BELL+ME_WAITTANG));
    return 1;
  }
  return mysql_bin_log.reset_logs(thd);
}

910 911 912 913
/*
  Find the numeric extension of a log file name

  Returns a pointer to the first digit after the last '.' if that
  extension is non-empty and consists of digits only, otherwise 0.
  *base_len receives the length of the name up to and including the '.'.
*/

static const char *numeric_log_ext(const char *name, size_t *base_len)
{
  const char *dot= strrchr(name, '.');
  const char *p;

  if (!dot || !dot[1])
    return 0;
  for (p= dot + 1; *p; p++)
  {
    if (*p < '0' || *p > '9')
      return 0;
  }
  *base_len= (size_t) (dot + 1 - name);
  return dot + 1;
}


/*
  Compare two positions in the master's binary logs

  SYNOPSIS
    cmp_master_pos()
    log_file_name1, log_pos1	First position
    log_file_name2, log_pos2	Second position

  NOTES
    When both names have the same base and a purely numeric extension, the
    extensions are compared as numbers, so that '.999' sorts before
    '.1000'.  In every other case the names are compared with strcmp().
    Positions are only compared when the names are equal.

  RETURN VALUES
    < 0	first position is before the second
    0	positions are equal
    > 0	first position is after the second
*/

int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
		   const char* log_file_name2, ulonglong log_pos2)
{
  int res;
  size_t base_len1= 0, base_len2= 0;
  const char *num1= numeric_log_ext(log_file_name1, &base_len1);
  const char *num2= numeric_log_ext(log_file_name2, &base_len2);

  if (num1 && num2 && base_len1 == base_len2 &&
      !memcmp(log_file_name1, log_file_name2, base_len1))
  {
    size_t digits1, digits2;
    /* Leading zeros do not change the value of the sequence number */
    while (*num1 == '0')
      num1++;
    while (*num2 == '0')
      num2++;
    digits1= strlen(num1);
    digits2= strlen(num2);
    /* More significant digits means a larger number */
    if (digits1 != digits2)
      return digits1 < digits2 ? -1 : 1;
    if ((res = strcmp(num1, num2)))
      return res;
    /* Same number: fall through to the plain name comparison */
  }

  if ((res = strcmp(log_file_name1, log_file_name2)))
    return res;
  if (log_pos1 > log_pos2)
    return 1;
  else if (log_pos1 == log_pos2)
    return 0;
  return -1;
}

monty@mashka.mysql.fi's avatar
monty@mashka.mysql.fi committed
928

929 930 931 932 933 934 935
/*
  Execute SHOW BINLOG EVENTS

  SYNOPSIS
    show_binlog_events()
    thd		Thread handle; log name and start position are taken from
		thd->lex.mi, offset and limit from thd->lex.current_select

  NOTES
    Events are read starting at the given position.  The first
    offset_limit events are read but not sent; reading stops once
    offset_limit + select_limit events have been seen.
    thd->current_linfo points at a local LOG_INFO while the log is read and
    is reset under LOCK_thread_count before the function returns, so that
    adjust_linfo_offsets() and log_in_use() never see a pointer into a
    stack frame that is gone.

  RETURN VALUES
    0	ok (send_eof() was called)
    -1	error
*/

int show_binlog_events(THD* thd)
{
  DBUG_ENTER("show_binlog_events");
  List<Item> field_list;
  const char* errmsg = 0;
  IO_CACHE log;
  File file = -1;
  /* Function scope: must stay alive until current_linfo is reset below */
  LOG_INFO linfo;

  Log_event::init_show_field_list(&field_list);
  if (send_fields(thd, field_list, 1))
    DBUG_RETURN(-1);

  if (mysql_bin_log.is_open())
  {
    LEX_MASTER_INFO *lex_mi = &thd->lex.mi;
    uint event_count, limit_start, limit_end;
    my_off_t pos = lex_mi->pos;
    char search_file_name[FN_REFLEN], *name;
    const char *log_file_name = lex_mi->log_file_name;
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
    Log_event* ev;

    limit_start = thd->lex.current_select->offset_limit;
    limit_end = thd->lex.current_select->select_limit + limit_start;

    name= search_file_name;
    if (log_file_name)
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
    else
      name=0;					// Find first log

    linfo.index_file_offset = 0;
    thd->current_linfo = &linfo;

    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
    {
      errmsg = "Could not find target log";
      goto err;
    }

    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
      goto err;

    if (pos < 4)
    {
      errmsg = "Invalid log position";
      goto err;
    }

    pthread_mutex_lock(log_lock);
    my_b_seek(&log, pos);

    for (event_count = 0;
	 (ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,0)); )
    {
      /* Events before limit_start are skipped, not sent */
      if (event_count >= limit_start &&
	  ev->net_send(thd, linfo.log_file_name, pos))
      {
	errmsg = "Net error";
	delete ev;
	pthread_mutex_unlock(log_lock);
	goto err;
      }

      pos = my_b_tell(&log);
      delete ev;

      if (++event_count >= limit_end)
	break;
    }

    /* The loop ended early: tell a read error from a normal end of log */
    if (event_count < limit_end && log.error)
    {
      errmsg = "Wrong offset or I/O error";
      pthread_mutex_unlock(log_lock);
      goto err;
    }

    pthread_mutex_unlock(log_lock);
  }

err:
  if (file >= 0)
  {
    end_io_cache(&log);
    (void) my_close(file, MYF(MY_WME));
  }

  /* Unpublish linfo before it goes out of scope */
  pthread_mutex_lock(&LOCK_thread_count);
  thd->current_linfo = 0;
  pthread_mutex_unlock(&LOCK_thread_count);

  if (errmsg)
  {
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
	     "SHOW BINLOG EVENTS", errmsg);
    DBUG_RETURN(-1);
  }

  send_eof(thd);
  DBUG_RETURN(0);
}

monty@mashka.mysql.fi's avatar
monty@mashka.mysql.fi committed
1029

1030 1031 1032 1033 1034 1035 1036 1037 1038
/*
  Send the current binary log file name and position to the client

  SYNOPSIS
    show_binlog_info()
    thd		Thread specific variable

  NOTES
    The result set has the columns File, Position, Binlog_do_db and
    Binlog_ignore_db.  One row is sent when the binary log is open;
    otherwise the result set is empty.

  RETURN VALUES
    0	ok
    -1	error  (sending the field list or the row failed)
*/

int show_binlog_info(THD* thd)
{
  DBUG_ENTER("show_binlog_info");
  List<Item> columns;
  String *row= &thd->packet;

  columns.push_back(new Item_empty_string("File", FN_REFLEN));
  columns.push_back(new Item_empty_string("Position",20));
  columns.push_back(new Item_empty_string("Binlog_do_db",20));
  columns.push_back(new Item_empty_string("Binlog_ignore_db",20));

  if (send_fields(thd, columns, 1))
    DBUG_RETURN(-1);
  row->length(0);

  if (!mysql_bin_log.is_open())
  {
    /* Nothing to report: finish the (empty) result set */
    send_eof(thd);
    DBUG_RETURN(0);
  }

  LOG_INFO log_info;
  mysql_bin_log.get_current_log(&log_info);

  /* Only the file name is sent; the directory part is skipped */
  int dir_part_len= dirname_length(log_info.log_file_name);
  net_store_data(row, log_info.log_file_name + dir_part_len);
  net_store_data(row, (longlong)log_info.pos);
  net_store_data(row, &binlog_do_db);
  net_store_data(row, &binlog_ignore_db);

  if (my_net_write(&thd->net, (char*)row->ptr(), row->length()))
    DBUG_RETURN(-1);

  send_eof(thd);
  DBUG_RETURN(0);
}
1059

1060

1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072
/*
  Send a lost of all binary logs to client

  SYNOPSIS
    show_binlogs()
    thd		Thread specific variable

  RETURN VALUES
    0	ok
    1	error  (Error message sent to client)
*/

1073 1074
int show_binlogs(THD* thd)
{
1075 1076
  const char *errmsg;
  IO_CACHE *index_file;
1077 1078 1079
  char fname[FN_REFLEN];
  NET* net = &thd->net;
  List<Item> field_list;
1080
  String *packet = &thd->packet;
1081
  uint length;
1082 1083

  if (!mysql_bin_log.is_open())
1084
  {
1085 1086 1087
    //TODO:  Replace with ER() error message
    errmsg= "You are not using binary logging";
    goto err_with_msg;
1088
  }
1089

1090
  field_list.push_back(new Item_empty_string("Log_name", 255));
1091
  if (send_fields(thd, field_list, 1))
1092
    return 1;
1093
  mysql_bin_log.lock_index();
1094 1095 1096 1097 1098 1099
  index_file=mysql_bin_log.get_index_file();
  
  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);

  /* The file ends with EOF or empty line */
  while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
1100 1101 1102
  {
    int dir_len = dirname_length(fname);
    packet->length(0);
1103 1104
    /* The -1 is for removing newline from fname */
    net_store_data(packet, fname + dir_len, length-1-dir_len);
1105
    if (my_net_write(net, (char*) packet->ptr(), packet->length()))
1106
      goto err;
1107
  }
1108
  mysql_bin_log.unlock_index();
1109
  send_eof(thd);
1110
  return 0;
1111

1112
err_with_msg:
1113
  send_error(thd, ER_UNKNOWN_ERROR, errmsg);
1114 1115
err:
  mysql_bin_log.unlock_index();
1116
  return 1;
1117 1118
}

1119

1120 1121 1122 1123
/*
  Write the block last read into an IO_CACHE to the binary log

  SYNOPSIS
    log_loaded_block()
    file	IO_CACHE being read; file->arg must point to a
		LOAD_FILE_INFO

  NOTES
    The first block logged for a given LOAD_FILE_INFO is written as a
    Create_file event; every later block is written as an Append_block
    event.  Nothing is written when the block is empty or when
    last_pos_in_file shows that this cache position is not newer than
    the one recorded on the previous call.

  RETURN VALUES
    0	always
*/

int log_loaded_block(IO_CACHE* file)
{
  /* file->request_pos contains position where we started last read */
  char *block_start= (char*) file->request_pos;
  uint block_len= (char*) file->read_end - block_start;
  LOAD_FILE_INFO *lf_info;

  if (!block_len)
    return 0;					// Empty block

  lf_info= (LOAD_FILE_INFO*) file->arg;

  /* Skip positions at or before the one recorded by the previous call */
  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
      lf_info->last_pos_in_file >= file->pos_in_file)
    return 0;
  lf_info->last_pos_in_file= file->pos_in_file;

  if (!lf_info->wrote_create_file)
  {
    /* First block: log it together with the description of the load */
    Create_file_log_event create_ev(lf_info->thd, lf_info->ex, lf_info->db,
                                    lf_info->table_name, *lf_info->fields,
                                    lf_info->handle_dup, block_start,
                                    block_len);
    mysql_bin_log.write(&create_ev);
    lf_info->wrote_create_file= 1;
    DBUG_SYNC_POINT("debug_lock.created_file_event",10);
    return 0;
  }

  /* Later blocks are appended to the already logged file */
  Append_block_log_event append_ev(lf_info->thd, block_start, block_len);
  mysql_bin_log.write(&append_ev);
  return 0;
}