sql_repl.cc 32.5 KB
Newer Older
1
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB & Sasha
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

// Sasha Pachev <sasha@mysql.com> is currently in charge of this file

#include "mysql_priv.h"
#include "sql_repl.h"
#include "sql_acl.h"
#include "log_event.h"
23
#include "mini_client.h"
unknown's avatar
unknown committed
24
#include <my_dir.h>
25
#include <assert.h>
26 27

extern const char* any_db;

/*
  Debug-only knobs read by mysql_binlog_send() under #ifndef DBUG_OFF:
    max_binlog_dump_events         If non-zero, the dump is aborted after
                                   this many events (0 = unlimited)
    opt_sporadic_binlog_dump_fail  If set, every second COM_BINLOG_DUMP
                                   request is made to fail on purpose
    binlog_dump_count              Counter used to pick "every second"
                                   request
*/
int max_binlog_dump_events = 0; // unlimited
my_bool opt_sporadic_binlog_dump_fail = 0;
static int binlog_dump_count = 0;
unknown's avatar
unknown committed
32

33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
/*
  Verify that a log starts with the binlog magic number

  SYNOPSIS
    check_binlog_magic()
    log		IO cache positioned at offset 0 of the log
    errmsg	Set to a static error text on failure

  RETURN
    0	magic number matches BINLOG_MAGIC
    1	read error or wrong magic number (*errmsg is set)
*/
int check_binlog_magic(IO_CACHE* log, const char** errmsg)
{
  char file_magic[4];
  DBUG_ASSERT(my_b_tell(log) == 0);

  if (!my_b_read(log, (byte*) file_magic, sizeof(file_magic)))
  {
    /* Header was read; compare it with the expected signature */
    if (!memcmp(file_magic, BINLOG_MAGIC, sizeof(file_magic)))
      return 0;
    *errmsg = "Binlog has bad magic number;  It's not a binary log file that can be used by this version of MySQL";
    return 1;
  }

  /* Reading the header itself failed */
  *errmsg = "I/O error reading the header from the binary log";
  sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno,
                  log->error);
  return 1;
}

unknown's avatar
unknown committed
53
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
54
                             ulonglong position, const char**errmsg)
unknown's avatar
unknown committed
55
{
56
  char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN];
unknown's avatar
unknown committed
57 58 59
  memset(header, 0, 4); // when does not matter
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;

60
  char* p = log_file_name+dirname_length(log_file_name);
unknown's avatar
unknown committed
61
  uint ident_len = (uint) strlen(p);
62
  ulong event_len = ident_len + ROTATE_EVENT_OVERHEAD;
63
  int4store(header + SERVER_ID_OFFSET, server_id);
unknown's avatar
unknown committed
64
  int4store(header + EVENT_LEN_OFFSET, event_len);
65
  int2store(header + FLAGS_OFFSET, 0);
66 67 68 69
  
  // TODO: check what problems this may cause and fix them
  int4store(header + LOG_POS_OFFSET, 0);
  
unknown's avatar
unknown committed
70
  packet->append(header, sizeof(header));
71
  int8store(buf+R_POS_OFFSET,position);
72
  packet->append(buf, ROTATE_HEADER_LEN);
unknown's avatar
unknown committed
73
  packet->append(p,ident_len);
74 75 76 77 78
  if (my_net_write(net, (char*)packet->ptr(), packet->length()))
  {
    *errmsg = "failed on my_net_write()";
    return -1;
  }
unknown's avatar
unknown committed
79 80 81
  return 0;
}

82 83 84 85 86 87
/*
  Send the contents of a file requested by the client over the connection

  SYNOPSIS
    send_file()
    thd		Thread handler; the file name is read from, and the file
		data written to, thd->net

  NOTES
    - The client sends the file name in a packet; the name is taken from
      the second byte of that packet onwards.
    - If the resulting name is "/dev/null" no file is opened and only the
      closing handshake is done.
    - The end of the data is signalled with an empty packet, after which
      one packet is read back from the client.
    - net.read_timeout is set to net_wait_timeout for the duration of the
      call and restored before returning.

  RETURN
    0	ok
    1	error (a message has been written with sql_print_error())
*/
static int send_file(THD *thd)
{
  NET* net = &thd->net;
  int fd = -1,bytes, error = 1;
  char fname[FN_REFLEN+1];
  const char *errmsg = 0;
  int old_timeout;
  uint packet_len;
  char buf[IO_SIZE];				// It's safe to alloc this
  DBUG_ENTER("send_file");

  /*
    The client might be slow loading the data, give him wait_timeout to do
    the job
  */
  old_timeout = thd->net.read_timeout;
  thd->net.read_timeout = thd->variables.net_wait_timeout;

  /*
    We need net_flush here because the client will not know it needs to send
    us the file name until it has processed the load event entry
  */
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
  {
    errmsg = "while reading file name";
    goto err;
  }

  // terminate with \0 for fn_format
  *((char*)net->read_pos +  packet_len) = 0;
  fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
  // this is needed to make replicate-ignore-db
  if (!strcmp(fname,"/dev/null"))
    goto end;

  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
  {
    errmsg = "on open of file";
    goto err;
  }

  /*
    NOTE(review): the loop stops on any non-positive result, so a read
    error looks to be handled the same way as end of file - confirm
    whether that is intended.
  */
  while ((bytes = (int) my_read(fd, (byte*) buf, IO_SIZE, MYF(0))) > 0)
  {
    if (my_net_write(net, buf, bytes))
    {
      errmsg = "while writing data to client";
      goto err;
    }
  }

 end:
  /* Empty packet marks end of data; then wait for the client's reply */
  if (my_net_write(net, "", 0) || net_flush(net) ||
      (my_net_read(net) == packet_error))
  {
    errmsg = "while negotiating file transfer close";
    goto err;
  }
  error = 0;

 err:
  thd->net.read_timeout = old_timeout;
  if (fd >= 0)
    (void) my_close(fd, MYF(0));
  if (errmsg)
  {
    sql_print_error("Failed in send_file() %s", errmsg);
    /* Never pass the message itself as the format string */
    DBUG_PRINT("error", ("%s", errmsg));
  }
  DBUG_RETURN(error);
}

153

154
/*
  Open a binary log file for reading and check its magic number

  SYNOPSIS
    open_binlog()
    log			IO cache to initialize on the opened file
    log_file_name	Name of the log file to open
    errmsg		Set to a static error text on failure

  RETURN
    >= 0	File descriptor of the opened log
    -1		Error (*errmsg is set); the file is closed and the cache
		ended if the file had been opened
*/
File open_binlog(IO_CACHE *log, const char *log_file_name,
                 const char **errmsg)
{
  DBUG_ENTER("open_binlog");

  File fd= my_open(log_file_name, O_RDONLY | O_BINARY, MYF(MY_WME));
  if (fd < 0)
  {
    /* Nothing was opened, so there is nothing to clean up */
    sql_print_error("Failed to open log (file '%s', errno %d)",
                    log_file_name, my_errno);
    *errmsg = "Could not open log file";	// This will not be sent
    DBUG_RETURN(-1);
  }

  if (init_io_cache(log, fd, IO_SIZE*2, READ_CACHE, 0, 0,
                    MYF(MY_WME | MY_DONT_CHECK_FILESIZE)))
  {
    sql_print_error("Failed to create a cache on log (file '%s')",
                    log_file_name);
    *errmsg = "Could not open log file";	// This will not be sent
  }
  else if (!check_binlog_magic(log, errmsg))
    DBUG_RETURN(fd);

  /* The file was opened but is not usable: release it and the cache */
  my_close(fd, MYF(0));
  end_io_cache(log);
  DBUG_RETURN(-1);
}


189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
/*
  Adjust the position pointer in the binary log file for all running slaves

  SYNOPSIS
    adjust_linfo_offsets()
    purge_offset	Number of bytes removed from start of log index file

  NOTES
    - This is called when doing a PURGE when we delete lines from the
      index log file

  REQUIREMENTS
    - Before calling this function, we have to ensure that no threads are
      using any binary log file before purge_offset.

  TODO
    - Inform the slave threads that they should sync the position
      in the binary log file with flush_relay_log_info.
      For now, the sync is done on the next read.
*/

unknown's avatar
unknown committed
210 211 212
void adjust_linfo_offsets(my_off_t purge_offset)
{
  pthread_mutex_lock(&LOCK_thread_count);
  I_List_iterator<THD> thread_it(threads);

  for (THD *cur= thread_it++; cur; cur= thread_it++)
  {
    LOG_INFO *li= cur->current_linfo;
    if (!li)
      continue;				// Thread is not reading a log

    pthread_mutex_lock(&li->lock);
    if (li->index_file_offset >= purge_offset)
      li->index_file_offset-= purge_offset;
    else
    {
      /*
        An offset below the purge offset is only acceptable when the
        thread has just started reading the index file (offset 0);
        then there is nothing to adjust. Anything else is flagged fatal.
      */
      li->fatal= (li->index_file_offset != 0);
    }
    pthread_mutex_unlock(&li->lock);
  }
  pthread_mutex_unlock(&LOCK_thread_count);
}

unknown's avatar
merge  
unknown committed
238

unknown's avatar
unknown committed
239 240 241 242 243
bool log_in_use(const char* log_name)
{
  int log_name_len = strlen(log_name) + 1;
  THD *tmp;
  bool result = 0;
244

unknown's avatar
unknown committed
245 246
  pthread_mutex_lock(&LOCK_thread_count);
  I_List_iterator<THD> it(threads);
247 248 249 250 251

  while ((tmp=it++))
  {
    LOG_INFO* linfo;
    if ((linfo = tmp->current_linfo))
unknown's avatar
unknown committed
252
    {
253 254 255
      pthread_mutex_lock(&linfo->lock);
      result = !memcmp(log_name, linfo->log_file_name, log_name_len);
      pthread_mutex_unlock(&linfo->lock);
256 257
      if (result)
	break;
258 259
    }
  }
unknown's avatar
unknown committed
260 261 262 263 264

  pthread_mutex_unlock(&LOCK_thread_count);
  return result;
}

unknown's avatar
merge  
unknown committed
265

unknown's avatar
unknown committed
266 267 268
/*
  Purge binary logs up to the given log and report the result to the client

  SYNOPSIS
    purge_master_logs()
    thd		Thread handler; the result is sent on thd->net
    to_log	Log name passed to make_log_name() to build the purge target

  NOTES
    If the binary log is not open nothing is purged and ok is sent.

  RETURN
    0	ok (send_ok() was called)
    1	error (send_error() was called)
*/
int purge_master_logs(THD* thd, const char* to_log)
{
  if (mysql_bin_log.is_open())
  {
    char search_file_name[FN_REFLEN];
    const char *errmsg;

    mysql_bin_log.make_log_name(search_file_name, to_log);

    /* Map the purge result code to a message for the client */
    switch (mysql_bin_log.purge_logs(thd, search_file_name)) {
    case 0:
      errmsg= 0;
      break;
    case LOG_INFO_EOF:
      errmsg= "Target log not found in binlog index";
      break;
    case LOG_INFO_IO:
      errmsg= "I/O error reading log index file";
      break;
    case LOG_INFO_INVALID:
      errmsg= "Server configuration does not permit binlog purge";
      break;
    case LOG_INFO_SEEK:
      errmsg= "Failed on fseek()";
      break;
    case LOG_INFO_MEM:
      errmsg= "Out of memory";
      break;
    case LOG_INFO_FATAL:
      errmsg= "Fatal error during purge";
      break;
    case LOG_INFO_IN_USE:
      errmsg= "A purgeable log is in use, will not purge";
      break;
    default:
      errmsg= "Unknown error during purge";
      break;
    }

    if (errmsg)
    {
      send_error(&thd->net, 0, errmsg);
      return 1;
    }
  }

  send_ok(&thd->net);
  return 0;
}

303 304 305
/*
  TODO: Clean up loop to only have one call to send_file()
*/
unknown's avatar
merge  
unknown committed
306

307 308
/*
  Send binary log events to the client, starting at a given log and position

  SYNOPSIS
    mysql_binlog_send()
    thd		Thread handler; events are written to thd->net
    log_ident	Name of the log to start from; if empty, the first log
		found in the index is used
    pos		Position to start from; rejected if it is below
		BIN_LOG_HEADER_SIZE or beyond the length of the log
    flags	If BINLOG_DUMP_NON_BLOCK is set, the function does not wait
		for new events and stops when the index has no next log

  NOTES
    - While running, thd->current_linfo points to a LOG_INFO on this
      function's stack; it is reset to 0 under LOCK_thread_count on both
      exit paths.
    - On the normal path ("end") send_eof() is called; on failure ("err")
      send_error() is called with my_errno and errmsg.
*/
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
		       ushort flags)
{
  LOG_INFO linfo;
  char *log_file_name = linfo.log_file_name;
  char search_file_name[FN_REFLEN], *name;
  IO_CACHE log;
  File file = -1;
  String* packet = &thd->packet;
  int error;
  const char *errmsg = "Unknown error";
  NET* net = &thd->net;
#ifndef DBUG_OFF
  int left_events = max_binlog_dump_events;
#endif
  DBUG_ENTER("mysql_binlog_send");
  DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));

  /* Zero the cache so that end_io_cache() at "err" is reached with a known state */
  bzero((char*) &log,sizeof(log));

#ifndef DBUG_OFF
  /* Debug builds: optionally fail every second dump request on purpose */
  if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
  {
    errmsg = "Master failed COM_BINLOG_DUMP to test if slave can recover";
    my_errno= ER_UNKNOWN_ERROR;
    goto err;
  }
#endif

  if (!mysql_bin_log.is_open())
  {
    errmsg = "Binary log is not open";
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
    goto err;
  }
  if (!server_id_supplied)
  {
    errmsg = "Misconfigured master - server id was not set";
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
    goto err;
  }

  name=search_file_name;
  if (log_ident[0])
    mysql_bin_log.make_log_name(search_file_name, log_ident);
  else
    name=0;					// Find first log

  /* Publish linfo so that other threads can see which log we are using */
  linfo.index_file_offset = 0;
  thd->current_linfo = &linfo;

  if (mysql_bin_log.find_log_pos(&linfo, name, 1))
  {
    errmsg = "Could not find first log file name in binary log index file";
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
    goto err;
  }

  /* On failure open_binlog() has already set errmsg */
  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
  {
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
    goto err;
  }
  if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
  {
    errmsg= "Client requested master to start replication from \
impossible position";
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
    goto err;
  }

  my_b_seek(&log, pos);				// Seek will be done on next read
  /*
    We need to start a packet with something other than 255
    to distinguish it from error
  */
  packet->set("\0", 1);

  /*
    Before 4.0.14 we called fake_rotate_event below only if 
    (pos == BIN_LOG_HEADER_SIZE), because if this is false then the slave
    already knows the binlog's name.
    Now we always call fake_rotate_event; if the slave already knew the log's
    name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is useless but does not
    harm much. It is nice for 3.23 (>=.58) slaves which test Rotate events
    to see if the master is 4.0 (then they choose to stop because they can't
    replicate 4.0); by always calling fake_rotate_event we are sure that 3.23.58
    and newer will detect the problem as soon as replication starts (BUG#198).
    Always calling fake_rotate_event makes sending of normal
    (=from-binlog) Rotate events a priori unneeded, but it is not so simple: the
    2 Rotate events are not equivalent, the normal one is before the Stop event,
    the fake one is after. If we don't send the normal one, then the Stop event
    will be interpreted (by existing 4.0 slaves) as "the master stopped", which
    is wrong. So for safety, given that we want minimum modification of 4.0, we
    send the normal and fake Rotates.
  */
  if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
  {
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
    goto err;
  }
  packet->set("\0", 1);

  /* Main loop: runs until the connection fails or the thread is killed */
  while (!net->error && net->vio != 0 && !thd->killed)
  {
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();

    /* Send every event that can be read from the current log */
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
    {
#ifndef DBUG_OFF
      if (max_binlog_dump_events && !left_events--)
      {
	net_flush(net);
	errmsg = "Debugging binlog dump abort";
	my_errno= ER_UNKNOWN_ERROR;
	goto err;
      }
#endif
      if (my_net_write(net, (char*)packet->ptr(), packet->length()) )
      {
	errmsg = "Failed on my_net_write()";
	my_errno= ER_UNKNOWN_ERROR;
	goto err;
      }
      DBUG_PRINT("info", ("log event code %d",
			  (*packet)[LOG_EVENT_OFFSET+1] ));
      /* A load event is followed by a file transfer handled by send_file() */
      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
      {
	if (send_file(thd))
	{
	  errmsg = "failed in send_file()";
	  my_errno= ER_UNKNOWN_ERROR;
	  goto err;
	}
      }
      packet->set("\0", 1);
    }
    /*
      TODO: now that we are logging the offset, check to make sure
      the recorded offset and the actual match
    */
    /* Anything other than end of file is reported as a fatal read error */
    if (error != LOG_READ_EOF)
    {
      my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
      switch (error) {
      case LOG_READ_BOGUS:
	errmsg = "bogus data in log event";
	break;
      case LOG_READ_TOO_LARGE:
	errmsg = "log event entry exceeded max_allowed_packet; \
Increase max_allowed_packet on master";
	break;
      case LOG_READ_IO:
	errmsg = "I/O error reading log event";
	break;
      case LOG_READ_MEM:
	errmsg = "memory allocation failed reading log event";
	break;
      case LOG_READ_TRUNC:
	errmsg = "binlog truncated in the middle of event";
	break;
      default:
	errmsg = "unknown error reading log event on the master";
	break;
      }
      goto err;
    }

    /*
      End of the current log was reached. If blocking is allowed and this
      is the active log, wait for more data; otherwise move to the next log.
    */
    if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
       mysql_bin_log.is_active(log_file_name))
    {
      /*
	Block until there is more data in the log
      */
      if (net_flush(net))
      {
	errmsg = "failed on net_flush()";
	my_errno= ER_UNKNOWN_ERROR;
	goto err;
      }

      /*
	We may have missed the update broadcast from the log
	that has just happened, let's try to catch it if it did.
	If we did not miss anything, we just wait for other threads
	to signal us.
      */
      {
	log.error=0;
	bool read_packet = 0, fatal_error = 0;

#ifndef DBUG_OFF
	if (max_binlog_dump_events && !left_events--)
	{
	  errmsg = "Debugging binlog dump abort";
	  my_errno= ER_UNKNOWN_ERROR;
	  goto err;
	}
#endif

	/*
	  No one will update the log while we are reading
	  now, but we'll be quick and just read one record

	  TODO:
	  Add a counter that is incremented for each time we update
	  the binary log.  We can avoid the following read if the counter
	  has not been updated since last read.
	*/

	/*
	  log_lock is taken here and a null lock is passed to
	  read_log_event(); every case below releases log_lock itself.
	*/
	pthread_mutex_lock(log_lock);
	switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
	case 0:
	  /* we read successfully, so we'll need to send it to the slave */
	  pthread_mutex_unlock(log_lock);
	  read_packet = 1;
	  break;

	case LOG_READ_EOF:
	  DBUG_PRINT("wait",("waiting for data in binary log"));
	  if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
	  {
	    pthread_mutex_unlock(log_lock);
	    goto end;
	  }
	  if (!thd->killed)
	  {
	    /* Note that the following call unlocks lock_log */
	    mysql_bin_log.wait_for_update(thd, 0);
	  }
	  else
	    pthread_mutex_unlock(log_lock);
	  DBUG_PRINT("wait",("binary log received update"));
	  break;

	default:
	  pthread_mutex_unlock(log_lock);
	  fatal_error = 1;
	  break;
	}

	if (read_packet)
	{
	  thd->proc_info = "Sending binlog event to slave";
	  if (my_net_write(net, (char*)packet->ptr(), packet->length()) )
	  {
	    errmsg = "Failed on my_net_write()";
	    my_errno= ER_UNKNOWN_ERROR;
	    goto err;
	  }

	  if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
	  {
	    if (send_file(thd))
	    {
	      errmsg = "failed in send_file()";
	      my_errno= ER_UNKNOWN_ERROR;
	      goto err;
	    }
	  }
	  packet->set("\0", 1);
	  /*
	    No need to net_flush because we will get to flush later when
	    we hit EOF pretty quick
	  */
	}

	if (fatal_error)
	{
	  errmsg = "error reading log entry";
          my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
	  goto err;
	}
	log.error=0;
      }
    }
    else
    {
      bool loop_breaker = 0;
      // need this to break out of the outer while loop from the switch
      thd->proc_info = "Finished reading one binlog; switching to next binlog";
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
      case LOG_INFO_EOF:
	/* No next log: stop only if the client asked for non-blocking mode */
	loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
	break;
      case 0:
	break;
      default:
	errmsg = "could not find next log";
	my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
	goto err;
      }

      if (loop_breaker)
	break;

      /* Close the finished log before opening the one linfo now names */
      end_io_cache(&log);
      (void) my_close(file, MYF(MY_WME));

      /*
        Even if the previous log contained a Rotate_log_event, we still fake
        one.
      */
      if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
	  fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE, &errmsg))
      {
	my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
	goto err;
      }
      packet->length(0);
      packet->append("\0",1);
    }
  }

end:
  /* Normal termination: release the log and tell the client we are done */
  end_io_cache(&log);
  (void)my_close(file, MYF(MY_WME));

  send_eof(&thd->net);
  thd->proc_info = "Waiting to finalize termination";
  pthread_mutex_lock(&LOCK_thread_count);
  thd->current_linfo = 0;
  pthread_mutex_unlock(&LOCK_thread_count);
  DBUG_VOID_RETURN;

err:
  thd->proc_info = "Waiting to finalize termination";
  end_io_cache(&log);
  /*
    Exclude iteration through the thread list.
    This is needed for purge_logs() - it will iterate through the
    thread list and update thd->current_linfo->index_file_offset;
    this mutex will make sure that it never tries to update our linfo
    after we return from this stack frame
  */
  pthread_mutex_lock(&LOCK_thread_count);
  thd->current_linfo = 0;
  pthread_mutex_unlock(&LOCK_thread_count);
  /* file is still -1 if we failed before (or while) opening a log */
  if (file >= 0)
    (void) my_close(file, MYF(MY_WME));
  send_error(&thd->net, my_errno, errmsg);
  DBUG_VOID_RETURN;
}

651
/*
  Start the slave threads that are not already running

  SYNOPSIS
    start_slave()
    thd		Thread handler; current_thd is used if this is 0
    mi		Master info for the slave
    net_report	If set, send ok or the error to the client

  RETURN
    0	ok
    1	error (access denied, or one of ER_MASTER_INFO, ER_BAD_SLAVE,
	ER_SLAVE_MUST_STOP or a start_slave_threads() error)
*/
int start_slave(THD* thd , MASTER_INFO* mi,  bool net_report)
{
  int err_code = 0;
  if (!thd)
    thd = current_thd;
  NET* net = &thd->net;
  int start_mask;
  DBUG_ENTER("start_slave");

  if (check_access(thd, SUPER_ACL, any_db))
    DBUG_RETURN(1);

  lock_slave_threads(mi);  // this allows us to cleanly read slave_running
  init_thread_mask(&start_mask, mi, 1 /* inverse */);
  /* Restrict to the threads named in the statement, if any were named */
  if (thd->lex.slave_thd_opt)
    start_mask &= thd->lex.slave_thd_opt;

  if (!start_mask)
    err_code = ER_SLAVE_MUST_STOP;
  else if (init_master_info(mi, master_info_file, relay_log_info_file, 0))
    err_code = ER_MASTER_INFO;
  else if (!server_id_supplied || !*mi->host)
    err_code = ER_BAD_SLAVE;
  else
    err_code = start_slave_threads(0 /*no mutex */,
                                   1 /* wait for start */,
                                   mi,
                                   master_info_file, relay_log_info_file,
                                   start_mask);

  unlock_slave_threads(mi);

  if (!err_code)
  {
    if (net_report)
      send_ok(net);
    DBUG_RETURN(0);
  }

  if (net_report)
    send_error(net, err_code);
  DBUG_RETURN(1);
}

695
/*
  Stop the running slave threads

  SYNOPSIS
    stop_slave()
    thd		Thread handler; current_thd is used if this is 0
    mi		Master info for the slave
    net_report	If set, send ok or the error to the client

  RETURN
    0	ok
    1	error (access denied, ER_SLAVE_NOT_RUNNING or a
	terminate_slave_threads() error)
*/
int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report )
{
  int err_code = 0;
  if (!thd)
    thd = current_thd;
  NET* net = &thd->net;

  if (check_access(thd, SUPER_ACL, any_db))
    return 1;

  thd->proc_info = "Killing slave";
  int stop_mask;
  lock_slave_threads(mi);
  init_thread_mask(&stop_mask, mi, 0 /* not inverse*/);
  /* Restrict to the threads named in the statement, if any were named */
  if (thd->lex.slave_thd_opt)
    stop_mask &= thd->lex.slave_thd_opt;

  if (stop_mask)
    err_code = terminate_slave_threads(mi, stop_mask, 1 /*skip lock */);
  else
    err_code = ER_SLAVE_NOT_RUNNING;
  unlock_slave_threads(mi);
  thd->proc_info = 0;

  if (!err_code)
  {
    if (net_report)
      send_ok(net);
    return 0;
  }

  if (net_report)
    send_error(net, err_code);
  return 1;
}

727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748

/*
  Remove all relay logs and start replication from the start

  SYNOPSIS
    reset_slave()
    thd			Thread handler
    mi			Master info for the slave


  NOTES
    We don't send ok in this function as it is called from
    reload_acl_and_cache() which may have done other tasks, which may
    have failed, for which we want to send an error.

  RETURN
    0	ok
    1	error
	In this case error is sent to the client with send_error()
*/


749
int reset_slave(THD *thd, MASTER_INFO* mi)
{
  MY_STAT stat_area;
  char fname[FN_REFLEN];
  int running_mask= 0, error= 0;
  uint sql_errno= 0;
  const char *errmsg= 0;
  DBUG_ENTER("reset_slave");

  lock_slave_threads(mi);
  init_thread_mask(&running_mask, mi, 0 /* not inverse */);

  /* Single-pass block: "break" leaves for the common cleanup below */
  do
  {
    if (running_mask) // We refuse if any slave thread is running
    {
      sql_errno= ER_SLAVE_MUST_STOP;
      error= 1;
      break;
    }

    // delete relay logs, clear relay log coordinates
    if ((error= purge_relay_logs(&mi->rli, thd,
                                 1 /* just reset */,
                                 &errmsg)))
      break;

    /*
      Clear master's log coordinates and reset host/user/etc to the values
      specified in mysqld's options (only for good display of SHOW SLAVE
      STATUS; the next init_master_info() (in start_slave() for example)
      would have set them the same way, but this covers the case where the
      user does SHOW SLAVE STATUS before doing START SLAVE).
    */
    init_master_info_with_options(mi);
    clear_last_slave_error(&mi->rli);

    // close master_info_file, relay_log_info_file,
    // set mi->inited=rli->inited=0
    end_master_info(mi);

    // and delete these two files (each only if it exists)
    fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
    if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
    {
      error= 1;
      break;
    }

    fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
    if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
      error= 1;
  } while (0);

  unlock_slave_threads(mi);
  if (thd && error)
    send_error(&thd->net, sql_errno, errmsg);
  DBUG_RETURN(error);
}

804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822
/*

  Kill all Binlog_dump threads which previously talked to the same slave
  ("same" means with the same server id). Indeed, if the slave stops, if the
  Binlog_dump thread is waiting (pthread_cond_wait) for binlog update, then it
  will keep existing until a query is written to the binlog. If the master is
  idle, then this could last long, and if the slave reconnects, we could have 2
  Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the
  binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP,
  the master kills any existing thread with the slave's server id (if this id is
  not zero; it will be true for real slaves, but false for mysqlbinlog when it
  sends COM_BINLOG_DUMP to get a remote binlog dump).

  SYNOPSIS
    kill_zombie_dump_threads()
    slave_server_id     the slave's server id

*/
  
823

824 825 826
void kill_zombie_dump_threads(uint32 slave_server_id)
{
  THD *victim;

  pthread_mutex_lock(&LOCK_thread_count);
  I_List_iterator<THD> thread_it(threads);

  /* Find the first dump thread registered with this server id */
  for (victim= thread_it++; victim; victim= thread_it++)
  {
    if (victim->command != COM_BINLOG_DUMP ||
        victim->server_id != slave_server_id)
      continue;
    pthread_mutex_lock(&victim->LOCK_delete);	// Lock from delete
    break;
  }
  pthread_mutex_unlock(&LOCK_thread_count);

  if (!victim)
    return;					// No matching thread

  /*
    kill_one_thread() is not used here because it would walk the thread
    list a second time; the thread is woken up directly instead.
  */
  victim->awake(1/*prepare to die*/);
  pthread_mutex_unlock(&victim->LOCK_delete);
}

852

853
/*
  Apply the master options found in thd->lex.mi to the master info

  SYNOPSIS
    change_master()
    thd		Thread handler; options are read from thd->lex.mi and the
		result is sent on thd->net
    mi		Master info to update

  NOTES
    - Refused with ER_SLAVE_MUST_STOP if any slave thread is running.
    - Old relay logs are purged unless a relay log name or relay log
      position was given, in which case the relay log position is
      re-initialized instead.
    - The new master info and relay log info are flushed before returning.

  RETURN
    0	ok (send_ok() was called)
    1	error (an error was already sent to the client)
*/
int change_master(THD* thd, MASTER_INFO* mi)
{
  int thread_mask;
  const char* errmsg=0;
  bool need_relay_log_purge=1;
  DBUG_ENTER("change_master");

  lock_slave_threads(mi);
  init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
  if (thread_mask) // We refuse if any slave thread is running
  {
    net_printf(&thd->net,ER_SLAVE_MUST_STOP);
    unlock_slave_threads(mi);
    DBUG_RETURN(1);
  }

  thd->proc_info = "Changing master";
  LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
  // TODO: see if needs re-write
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0))
  {
    send_error(&thd->net, ER_MASTER_INFO);
    unlock_slave_threads(mi);
    DBUG_RETURN(1);
  }

  /*
    Data lock not needed since we have already stopped the running threads,
    and we have the hold on the run locks which will keep all threads that
    could possibly modify the data structures from running
  */

  /*
    If the user specified host or port without binlog or position, 
    reset binlog's name to FIRST and position to 4.
  */ 

  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
  {
    mi->master_log_name[0] = 0;
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
    mi->rli.pending = 0;
  }

  if (lex_mi->log_file_name)
    strmake(mi->master_log_name, lex_mi->log_file_name,
	    sizeof(mi->master_log_name)-1);
  if (lex_mi->pos)
  {
    mi->master_log_pos= lex_mi->pos;
    mi->rli.pending = 0;
  }
  /* %lu matches the (ulong) cast of the argument */
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));

  /* Copy over only the connection options that were actually given */
  if (lex_mi->host)
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
  if (lex_mi->user)
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
  if (lex_mi->password)
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
  if (lex_mi->port)
    mi->port = lex_mi->port;
  if (lex_mi->connect_retry)
    mi->connect_retry = lex_mi->connect_retry;

  /* An explicit relay log name or position disables the relay log purge */
  if (lex_mi->relay_log_name)
  {
    need_relay_log_purge= 0;
    strmake(mi->rli.relay_log_name,lex_mi->relay_log_name,
	    sizeof(mi->rli.relay_log_name)-1);
  }

  if (lex_mi->relay_log_pos)
  {
    need_relay_log_purge= 0;
    mi->rli.relay_log_pos=lex_mi->relay_log_pos;
  }

  flush_master_info(mi);
  if (need_relay_log_purge)
  {
    mi->rli.skip_log_purge= 0;
    thd->proc_info="Purging old relay logs";
    if (purge_relay_logs(&mi->rli, thd,
			 0 /* not only reset, but also reinit */,
			 &errmsg))
    {
      net_printf(&thd->net, 0, "Failed purging old relay logs: %s",errmsg);
      unlock_slave_threads(mi);
      DBUG_RETURN(1);
    }
  }
  else
  {
    const char* msg;
    mi->rli.skip_log_purge= 1;
    /* Relay log is already initialized */
    if (init_relay_log_pos(&mi->rli,
			   mi->rli.relay_log_name,
			   mi->rli.relay_log_pos,
			   0 /*no data lock*/,
			   &msg))
    {
      net_printf(&thd->net,0,"Failed initializing relay log position: %s",msg);
      unlock_slave_threads(mi);
      DBUG_RETURN(1);
    }
  }
  /* %lu matches the (ulong) cast of the argument */
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
  /* If changing RELAY_LOG_FILE or RELAY_LOG_POS, this will be nonsense: */
  mi->rli.master_log_pos = mi->master_log_pos;
  strmake(mi->rli.master_log_name,mi->master_log_name,
	  sizeof(mi->rli.master_log_name)-1);
  if (!mi->rli.master_log_name[0]) // uninitialized case
    mi->rli.master_log_pos=0;

  pthread_mutex_lock(&mi->rli.data_lock);
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
  /* Clear the error, for a clean start. */
  clear_last_slave_error(&mi->rli);
  /* 
     If we don't write new coordinates to disk now, then old will remain in
     relay-log.info until START SLAVE is issued; but if mysqld is shutdown
     before START SLAVE, then old will remain in relay-log.info, and will be the
     in-memory value at restart (thus causing errors, as the old relay log does
     not exist anymore).
  */
  flush_relay_log_info(&mi->rli); 
  pthread_cond_broadcast(&mi->data_cond);
  pthread_mutex_unlock(&mi->rli.data_lock);

  unlock_slave_threads(mi);
  thd->proc_info = 0;
  send_ok(&thd->net);
  DBUG_RETURN(0);
}

990
/*
  Reset the binary logs

  SYNOPSIS
    reset_master()
    thd		Thread specific variable, handed on to reset_logs()

  RETURN VALUES
    1	The binary log is not open (ER_FLUSH_MASTER_BINLOG_CLOSED is raised)
    #	Otherwise, whatever mysql_bin_log.reset_logs() returns
*/

int reset_master(THD* thd)
{
  if (mysql_bin_log.is_open())
    return mysql_bin_log.reset_logs(thd);

  /* There is nothing to reset when the binary log is closed */
  my_error(ER_FLUSH_MASTER_BINLOG_CLOSED,  MYF(ME_BELL+ME_WAITTANG));
  return 1;
}

unknown's avatar
unknown committed
1000 1001 1002 1003
/*
  Compare two binary log coordinates (log file name + position)

  SYNOPSIS
    cmp_master_pos()
    log_file_name1	Name of the first log file
    log_pos1		Position inside the first log file
    log_file_name2	Name of the second log file
    log_pos2		Position inside the second log file

  NOTES
    We assume that both log names match up to '.'
    A shorter name always sorts before a longer one; names of equal length
    are ordered with strcmp(), and the positions are only looked at when
    the names are identical.

  RETURN VALUES
    <0	First coordinate is before the second
    0	Both coordinates are equal
    >0	First coordinate is after the second
*/

int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
                   const char* log_file_name2, ulonglong log_pos2)
{
  uint name1_length= strlen(log_file_name1);
  uint name2_length= strlen(log_file_name2);
  int name_order;

  /* Names of different length: the length alone decides the order */
  if (name1_length != name2_length)
    return (name1_length < name2_length) ? -1 : 1;

  name_order= strcmp(log_file_name1, log_file_name2);
  if (name_order)
    return name_order;

  /* Same file: order by position */
  if (log_pos1 < log_pos2)
    return -1;
  return (log_pos1 > log_pos2) ? 1 : 0;
}

unknown's avatar
unknown committed
1017

unknown's avatar
unknown committed
1018 1019 1020 1021 1022 1023 1024
/*
  Send the events of a binary log to the client

  SYNOPSIS
    show_binlog_events()
    thd		Thread specific variable. The log file name and start
		position are read from thd->lex.mi, the offset/limit from
		thd->lex.select

  NOTES
    If the binary log is not open, only the field list and an EOF packet
    are sent.
    thd->current_linfo points to a local LOG_INFO while the log is read.
    It is reset on every exit path after it has been set, so that it never
    refers to a dead stack object once this function has returned.

  RETURN VALUES
    0	ok
    -1	error
*/

int show_binlog_events(THD* thd)
{
  DBUG_ENTER("show_binlog_events");
  List<Item> field_list;
  const char* errmsg = 0;
  IO_CACHE log;
  File file = -1;
  /*
    Declared at function scope (not inside the block below) so that it
    stays alive for as long as thd->current_linfo may point to it.
  */
  LOG_INFO linfo;

  Log_event::init_show_field_list(&field_list);
  if (send_fields(thd, field_list, 1))
    DBUG_RETURN(-1);

  if (mysql_bin_log.is_open())
  {
    LEX_MASTER_INFO *lex_mi = &thd->lex.mi;
    ha_rows event_count, limit_start, limit_end;
    my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
    char search_file_name[FN_REFLEN], *name;
    const char *log_file_name = lex_mi->log_file_name;
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
    Log_event* ev;

    limit_start = thd->lex.select->offset_limit;
    limit_end = thd->lex.select->select_limit + limit_start;

    name= search_file_name;
    if (log_file_name)
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
    else
      name=0;					// Find first log

    linfo.index_file_offset = 0;
    thd->current_linfo = &linfo;

    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
    {
      errmsg = "Could not find target log";
      goto err;
    }

    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
      goto err;

    pthread_mutex_lock(log_lock);
    my_b_seek(&log, pos);

    for (event_count = 0;
         (ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,0)); )
    {
      /* Events before the requested offset are read but not sent */
      if (event_count >= limit_start &&
          ev->net_send(thd, linfo.log_file_name, pos))
      {
        errmsg = "Net error";
        delete ev;
        break;
      }

      pos = my_b_tell(&log);
      delete ev;

      if (++event_count >= limit_end)
        break;
    }

    /* The loop stopped before the limit: distinguish EOF from a read error */
    if (!errmsg && event_count < limit_end && log.error)
      errmsg = "Wrong offset or I/O error";

    /* Single unlock for the normal and both error exits of the loop */
    pthread_mutex_unlock(log_lock);
  }

err:
  if (file >= 0)
  {
    end_io_cache(&log);
    (void) my_close(file, MYF(MY_WME));
  }

  /*
    Reset current_linfo on the error paths too. It used to be cleared only
    on success, which left it pointing at a destroyed stack object when
    this function returned -1.
  */
  pthread_mutex_lock(&LOCK_thread_count);
  thd->current_linfo = 0;
  pthread_mutex_unlock(&LOCK_thread_count);

  if (errmsg)
  {
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
             "SHOW BINLOG EVENTS", errmsg);
    DBUG_RETURN(-1);
  }

  send_eof(&thd->net);
  DBUG_RETURN(0);
}

unknown's avatar
unknown committed
1115

1116 1117 1118 1119 1120 1121 1122 1123 1124
/*
  Send the current binary log name and position to the client

  SYNOPSIS
    show_binlog_info()
    thd		Thread specific variable

  NOTES
    The result has the columns File, Position, Binlog_do_db and
    Binlog_ignore_db. One row is sent if the binary log is open; otherwise
    the result set is empty.

  RETURN VALUES
    0	ok
    -1	error while sending the fields or the row
*/

int show_binlog_info(THD* thd)
{
  DBUG_ENTER("show_binlog_info");
  List<Item> columns;
  columns.push_back(new Item_empty_string("File", FN_REFLEN));
  columns.push_back(new Item_empty_string("Position",20));
  columns.push_back(new Item_empty_string("Binlog_do_db",20));
  columns.push_back(new Item_empty_string("Binlog_ignore_db",20));
  if (send_fields(thd, columns, 1))
    DBUG_RETURN(-1);

  String* packet = &thd->packet;
  packet->length(0);

  if (mysql_bin_log.is_open())
  {
    LOG_INFO cur_log;
    mysql_bin_log.get_current_log(&cur_log);
    /* Only the file name is shown, not the directory part */
    int dir_part_len = dirname_length(cur_log.log_file_name);
    net_store_data(packet, cur_log.log_file_name + dir_part_len);
    net_store_data(packet, (longlong)cur_log.pos);
    net_store_data(packet, &binlog_do_db);
    net_store_data(packet, &binlog_ignore_db);
    if (my_net_write(&thd->net, (char*)packet->ptr(), packet->length()))
      DBUG_RETURN(-1);
  }
  send_eof(&thd->net);
  DBUG_RETURN(0);
}
unknown's avatar
unknown committed
1145

1146

1147
/*
unknown's avatar
unknown committed
1148
  Send a list of all binary logs to client
1149 1150 1151 1152 1153 1154 1155 1156 1157 1158

  SYNOPSIS
    show_binlogs()
    thd		Thread specific variable

  RETURN VALUES
    0	ok
    1	error  (Error message sent to client)
*/

unknown's avatar
unknown committed
1159 1160
int show_binlogs(THD* thd)
{
1161
  IO_CACHE *index_file;
unknown's avatar
unknown committed
1162 1163 1164
  char fname[FN_REFLEN];
  NET* net = &thd->net;
  List<Item> field_list;
1165
  String *packet = &thd->packet;
1166
  uint length;
1167

unknown's avatar
unknown committed
1168
  if (!mysql_bin_log.is_open())
1169
  {
1170
    //TODO:  Replace with ER() error message
1171 1172
    send_error(net, 0, "You are not using binary logging");
    return 1;
1173
  }
unknown's avatar
unknown committed
1174

1175
  field_list.push_back(new Item_empty_string("Log_name", 255));
1176
  if (send_fields(thd, field_list, 1))
1177
    return 1;
unknown's avatar
unknown committed
1178
  mysql_bin_log.lock_index();
1179 1180 1181 1182 1183 1184
  index_file=mysql_bin_log.get_index_file();
  
  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);

  /* The file ends with EOF or empty line */
  while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
1185 1186 1187
  {
    int dir_len = dirname_length(fname);
    packet->length(0);
1188 1189
    /* The -1 is for removing newline from fname */
    net_store_data(packet, fname + dir_len, length-1-dir_len);
1190
    if (my_net_write(net, (char*) packet->ptr(), packet->length()))
1191
      goto err;
1192
  }
unknown's avatar
unknown committed
1193
  mysql_bin_log.unlock_index();
1194
  send_eof(net);
unknown's avatar
unknown committed
1195
  return 0;
1196

1197 1198
err:
  mysql_bin_log.unlock_index();
1199
  return 1;
unknown's avatar
unknown committed
1200 1201
}

1202

1203 1204 1205 1206
/*
  Write the block last read into an IO_CACHE to the binary log

  SYNOPSIS
    log_loaded_block()
    file	IO_CACHE that was read from; file->arg must point to a
		LOAD_FILE_INFO

  NOTES
    The first block logged for a LOAD_FILE_INFO is written as a
    Create_file_log_event; every later block as an Append_block_log_event.
    Nothing is logged for an empty block, or for a block whose
    file->pos_in_file is not beyond the last position already logged.
    The result of mysql_bin_log.write() is not checked.

  RETURN VALUES
    0	always
*/

int log_loaded_block(IO_CACHE* file)
{
  /* file->request_pos contains position where we started last read */
  char* block_start = (char*) file->request_pos;
  uint block_len = (char*) file->read_end - block_start;
  if (!block_len)
    return 0;					// Nothing was read

  LOAD_FILE_INFO* lf_info = (LOAD_FILE_INFO*) file->arg;
  /* Skip a block at or before the last logged position */
  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
      lf_info->last_pos_in_file >= file->pos_in_file)
    return 0;
  lf_info->last_pos_in_file = file->pos_in_file;

  if (!lf_info->wrote_create_file)
  {
    /* First block of this load: it goes into the create-file event */
    Create_file_log_event c(lf_info->thd,lf_info->ex,lf_info->db,
                            lf_info->table_name, *lf_info->fields,
                            lf_info->handle_dup, block_start,
                            block_len, lf_info->log_delayed);
    mysql_bin_log.write(&c);
    lf_info->wrote_create_file = 1;
    DBUG_SYNC_POINT("debug_lock.created_file_event",10);
  }
  else
  {
    Append_block_log_event a(lf_info->thd, lf_info->db, block_start,
                             block_len, lf_info->log_delayed);
    mysql_bin_log.write(&a);
  }
  return 0;
}