/* Copyright (c) 2006, 2013, Oracle and/or its affiliates.
   Copyright (c) 2010, 2013, Monty Program Ab

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software Foundation,
   51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
16

17 18
#include "sql_priv.h"
#include "unireg.h"                             // HAVE_*
19
#include "rpl_mi.h"
20
#include "rpl_rli.h"
21
#include "sql_base.h"                        // close_thread_tables
22 23
#include <my_dir.h>    // For MY_STAT
#include "sql_repl.h"  // For check_binlog_magic
24 25 26
#include "log_event.h" // Format_description_log_event, Log_event,
                       // FORMAT_DESCRIPTION_LOG_EVENT, ROTATE_EVENT,
                       // PREFIX_SQL_LOAD
27
#include "rpl_utility.h"
Konstantin Osipov's avatar
Konstantin Osipov committed
28
#include "transaction.h"
29
#include "sql_parse.h"                          // end_trans, ROLLBACK
30 31
#include <mysql/plugin.h>
#include <mysql/service_thd_wait.h>
32

33
/*
  Forward declaration: the function is defined further down in this file
  but is already called from init_relay_log_info(), which precedes the
  definition. Returns 0 on success and 1 on failure.
*/
static int count_relay_log_space(Relay_log_info* rli);
34

35 36 37 38 39
/**
   Current replication state (hash of last GTID executed, per replication
   domain).

   Defined here at namespace scope, so one instance is shared by everything
   that links against this translation unit.
*/
rpl_slave_state rpl_global_gtid_slave_state;
40 41
/*
  Object used for MASTER_GTID_WAIT().
  NOTE(review): presumably keeps track of the sessions waiting for a GTID
  position to be reached; confirm against the gtid_waiting class definition.
*/
gtid_waiting rpl_global_gtid_waiting;
42 43


44 45 46 47 48
// Defined in slave.cc
// Helpers used below to read one value at a time from the relay log info
// file: an int into *var, or a string of at most max_size bytes into var.
// Callers in this file treat a non-zero return value as a read error.
// NOTE(review): default_val is presumably what gets stored when the file has
// no further line to read; confirm against the definitions in slave.cc.
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
			  const char *default_val);

49
/**
  Construct a Relay_log_info in its "not yet initialised" state.

  Every counter, position and flag starts at zero, both file descriptors
  start at -1 and all log names start as empty strings. The mutexes,
  condition variables and the atomic rwlock owned by the object are created
  here and released again by the destructor.

  @param is_slave_recovery  Stored in is_relay_log_recovery.
*/
Relay_log_info::Relay_log_info(bool is_slave_recovery)
  :Slave_reporting_capability("SQL"),
   no_storage(FALSE),
   replicate_same_server_id(::replicate_same_server_id),
   info_fd(-1),
   cur_log_fd(-1),
   relay_log(&sync_relaylog_period),
   sync_counter(0),
   is_relay_log_recovery(is_slave_recovery),
   save_temporary_tables(0),
   mi(0),
   inuse_relaylog_list(0),
   last_inuse_relaylog(0),
   cur_log_old_open_count(0),
   group_relay_log_pos(0),
   event_relay_log_pos(0),
#if HAVE_valgrind
   is_fake(FALSE),
#endif
   group_master_log_pos(0),
   log_space_total(0),
   ignore_log_space_limit(0),
   last_master_timestamp(0),
   sql_thread_caught_up(true),
   slave_skip_counter(0),
   abort_pos_wait(0),
   slave_run_id(0),
   sql_driver_thd(),
   inited(0),
   abort_slave(0),
   stop_for_until(0),
   slave_running(0),
   until_condition(UNTIL_NONE),
   until_log_pos(0),
   retried_trans(0),
   executed_entries(0),
   m_flags(0)
{
  DBUG_ENTER("Relay_log_info::Relay_log_info");

  /* Mark the embedded log object as a relay log (as opposed to a binlog). */
  relay_log.is_relay_log= TRUE;
#ifdef HAVE_PSI_INTERFACE
  relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
                         key_RELAYLOG_update_cond,
                         key_file_relaylog,
                         key_file_relaylog_index,
                         key_RELAYLOG_COND_queue_busy);
#endif

  /* All name buffers start out as empty strings. */
  group_master_log_name[0]= 0;
  event_relay_log_name[0]= 0;
  group_relay_log_name[0]= 0;
  ign_master_log_name_end[0]= 0;
  until_log_name[0]= 0;

  max_relay_log_size= global_system_variables.max_relay_log_size;

  /* The two IO_CACHE members are zero-filled until a file is attached. */
  bzero((char*) &info_file, sizeof(info_file));
  bzero((char*) &cache_buf, sizeof(cache_buf));

  /* Mutexes. */
  mysql_mutex_init(key_relay_log_info_run_lock, &run_lock,
                   MY_MUTEX_INIT_FAST);
  mysql_mutex_init(key_relay_log_info_data_lock, &data_lock,
                   MY_MUTEX_INIT_FAST);
  mysql_mutex_init(key_relay_log_info_log_space_lock, &log_space_lock,
                   MY_MUTEX_INIT_FAST);

  /* Condition variables. */
  mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL);
  mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
  mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
  mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);

  my_atomic_rwlock_init(&inuse_relaylog_atomic_lock);
  relay_log.init_pthread_objects();
  DBUG_VOID_RETURN;
}


101
/**
  Release everything the constructor (and later use) attached to the object:
  the list of in-use relay log descriptors, the synchronisation primitives
  and the relay log object itself.
*/
Relay_log_info::~Relay_log_info()
{
  DBUG_ENTER("Relay_log_info::~Relay_log_info");

  /*
    Walk the singly linked inuse_relaylog list and free each node. The
    successor is fetched before the node is freed. The assertion checks that
    nothing is still queued against a node at this point.
  */
  for (inuse_relaylog *node= inuse_relaylog_list, *following; node;
       node= following)
  {
    DBUG_ASSERT(node->queued_count == node->dequeued_count);
    following= node->next;
    my_free(node);
  }

  mysql_mutex_destroy(&run_lock);
  mysql_mutex_destroy(&data_lock);
  mysql_mutex_destroy(&log_space_lock);

  mysql_cond_destroy(&data_cond);
  mysql_cond_destroy(&start_cond);
  mysql_cond_destroy(&stop_cond);
  mysql_cond_destroy(&log_space_cond);

  my_atomic_rwlock_destroy(&inuse_relaylog_atomic_lock);
  relay_log.cleanup();
  DBUG_VOID_RETURN;
}


127
int init_relay_log_info(Relay_log_info* rli,
128 129 130 131 132 133 134 135 136 137 138 139
			const char* info_fname)
{
  char fname[FN_REFLEN+128];
  int info_fd;
  const char* msg = 0;
  int error = 0;
  DBUG_ENTER("init_relay_log_info");
  DBUG_ASSERT(!rli->no_storage);         // Don't init if there is no storage

  if (rli->inited)                       // Set if this function called
    DBUG_RETURN(0);
  fn_format(fname, info_fname, mysql_data_home, "", 4+32);
Marc Alff's avatar
Marc Alff committed
140
  mysql_mutex_lock(&rli->data_lock);
141 142 143 144 145 146 147
  info_fd = rli->info_fd;
  rli->cur_log_fd = -1;
  rli->slave_skip_counter=0;
  rli->abort_pos_wait=0;
  rli->log_space_limit= relay_log_space_limit;
  rli->log_space_total= 0;

148
  char pattern[FN_REFLEN];
149 150
  (void) my_realpath(pattern, slave_load_tmpdir, 0);
  if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
151 152
            MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
  {
Marc Alff's avatar
Marc Alff committed
153
    mysql_mutex_unlock(&rli->data_lock);
154 155 156 157 158
    sql_print_error("Unable to use slave's temporary directory %s",
                    slave_load_tmpdir);
    DBUG_RETURN(1);
  }
  unpack_filename(rli->slave_patternload_file, pattern);
159 160
  rli->slave_patternload_file_size= strlen(rli->slave_patternload_file);

161 162 163
  /*
    The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
    Note that the I/O thread flushes it to disk after writing every
164
    event, in flush_master_info(mi, 1, ?).
165 166 167
  */

  {
168 169 170 171 172
    /* Reports an error and returns, if the --relay-log's path 
       is a directory.*/
    if (opt_relay_logname && 
        opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
    {
Marc Alff's avatar
Marc Alff committed
173
      mysql_mutex_unlock(&rli->data_lock);
174 175 176 177 178 179 180 181 182 183 184
      sql_print_error("Path '%s' is a directory name, please specify \
a file name for --relay-log option", opt_relay_logname);
      DBUG_RETURN(1);
    }

    /* Reports an error and returns, if the --relay-log-index's path 
       is a directory.*/
    if (opt_relaylog_index_name && 
        opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1] 
        == FN_LIBCHAR)
    {
Marc Alff's avatar
Marc Alff committed
185
      mysql_mutex_unlock(&rli->data_lock);
186 187 188 189 190
      sql_print_error("Path '%s' is a directory name, please specify \
a file name for --relay-log-index option", opt_relaylog_index_name);
      DBUG_RETURN(1);
    }

191 192 193 194 195 196
    char buf[FN_REFLEN];
    const char *ln;
    static bool name_warning_sent= 0;
    ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
                                     1, buf);
    /* We send the warning only at startup, not after every RESET SLAVE */
197 198
    if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent &&
        !opt_bootstrap)
199 200 201 202 203 204 205 206 207 208 209 210
    {
      /*
        User didn't give us info to name the relay log index file.
        Picking `hostname`-relay-bin.index like we do, causes replication to
        fail if this slave's hostname is changed later. So, we would like to
        instead require a name. But as we don't want to break many existing
        setups, we only give warning, not error.
      */
      sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
                        " so replication "
                        "may break when this MySQL server acts as a "
                        "slave and has his hostname changed!! Please "
211
                        "use '--log-basename=#' or '--relay-log=%s' to avoid "
212
                        "this problem.", ln);
213 214
      name_warning_sent= 1;
    }
215

216 217 218 219 220
    /* For multimaster, add connection name to relay log filenames */
    Master_info* mi= rli->mi;
    char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN];
    char *buf_relaylog_index_name= opt_relaylog_index_name;

221 222 223
    create_logfile_name_with_suffix(buf_relay_logname,
                                    sizeof(buf_relay_logname),
                                    ln, 1, &mi->cmp_connection_name);
224 225 226 227 228
    ln= buf_relay_logname;

    if (opt_relaylog_index_name)
    {
      buf_relaylog_index_name= buf_relaylog_index_name_buff; 
229
      create_logfile_name_with_suffix(buf_relaylog_index_name_buff,
230 231 232
                                      sizeof(buf_relaylog_index_name_buff),
                                      opt_relaylog_index_name, 0,
                                      &mi->cmp_connection_name);
233 234
    }

235 236 237 238
    /*
      note, that if open() fails, we'll still have index file open
      but a destructor will take care of that
    */
239
    if (rli->relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) ||
240
        rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND,
241
                            mi->rli.max_relay_log_size, 1, TRUE))
242
    {
Marc Alff's avatar
Marc Alff committed
243
      mysql_mutex_unlock(&rli->data_lock);
244
      sql_print_error("Failed when trying to open logs for '%s' in init_relay_log_info(). Error: %M", ln, my_errno);
245 246 247 248 249 250 251 252 253 254 255 256
      DBUG_RETURN(1);
    }
  }

  /* if file does not exist */
  if (access(fname,F_OK))
  {
    /*
      If someone removed the file from underneath our feet, just close
      the old descriptor and re-create the old file
    */
    if (info_fd >= 0)
Marc Alff's avatar
Marc Alff committed
257 258 259
      mysql_file_close(info_fd, MYF(MY_WME));
    if ((info_fd= mysql_file_open(key_file_relay_log_info,
                                  fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
260 261 262
    {
      sql_print_error("Failed to create a new relay log info file (\
file '%s', errno %d)", fname, my_errno);
263
      msg= current_thd->get_stmt_da()->message();
264 265 266 267 268 269 270
      goto err;
    }
    if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
                      MYF(MY_WME)))
    {
      sql_print_error("Failed to create a cache on relay log info file '%s'",
                      fname);
271
      msg= current_thd->get_stmt_da()->message();
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
      goto err;
    }

    /* Init relay log with first entry in the relay index file */
    if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
                           &msg, 0))
    {
      sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
      goto err;
    }
    rli->group_master_log_name[0]= 0;
    rli->group_master_log_pos= 0;
    rli->info_fd= info_fd;
  }
  else // file exists
  {
    if (info_fd >= 0)
      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
    else
    {
      int error=0;
Marc Alff's avatar
Marc Alff committed
293 294
      if ((info_fd= mysql_file_open(key_file_relay_log_info,
                                    fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
      {
        sql_print_error("\
Failed to open the existing relay log info file '%s' (errno %d)",
                        fname, my_errno);
        error= 1;
      }
      else if (init_io_cache(&rli->info_file, info_fd,
                             IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
      {
        sql_print_error("Failed to create a cache on relay log info file '%s'",
                        fname);
        error= 1;
      }
      if (error)
      {
        if (info_fd >= 0)
Marc Alff's avatar
Marc Alff committed
311
          mysql_file_close(info_fd, MYF(0));
312 313
        rli->info_fd= -1;
        rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
Marc Alff's avatar
Marc Alff committed
314
        mysql_mutex_unlock(&rli->data_lock);
315 316 317 318 319
        DBUG_RETURN(1);
      }
    }

    rli->info_fd = info_fd;
320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353
    int relay_log_pos, master_log_pos, lines;
    char *first_non_digit;
    /*
      In MySQL 5.6, there is a MASTER_DELAY option to CHANGE MASTER. This is
      not yet merged into MariaDB (as of 10.0.13). However, we detect the
      presense of the new option in relay-log.info, as a placeholder for
      possible later merge of the feature, and to maintain file format
      compatibility with MySQL 5.6+.
    */
    int dummy_sql_delay;

    /*
      Starting from MySQL 5.6.x, relay-log.info has a new format.
      Now, its first line contains the number of lines in the file.
      By reading this number we can determine which version our master.info
      comes from. We can't simply count the lines in the file, since
      versions before 5.6.x could generate files with more lines than
      needed. If first line doesn't contain a number, or if it
      contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
      then the file is treated like a file from pre-5.6.x version.
      There is no ambiguity when reading an old master.info: before
      5.6.x, the first line contained the binlog's name, which is
      either empty or has an extension (contains a '.'), so can't be
      confused with an integer.

      So we're just reading first line and trying to figure which
      version is this.
    */

    /*
      The first row is temporarily stored in mi->master_log_name, if
      it is line count and not binlog name (new format) it will be
      overwritten by the second row later.
    */
354 355
    if (init_strvar_from_file(rli->group_relay_log_name,
                              sizeof(rli->group_relay_log_name),
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384
                              &rli->info_file, ""))
    {
      msg="Error reading slave log configuration";
      goto err;
    }

    lines= strtoul(rli->group_relay_log_name, &first_non_digit, 10);

    if (rli->group_relay_log_name[0] != '\0' &&
        *first_non_digit == '\0' &&
        lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
    {
      DBUG_PRINT("info", ("relay_log_info file is in new format."));
      /* Seems to be new format => read relay log name from next line */
      if (init_strvar_from_file(rli->group_relay_log_name,
                                sizeof(rli->group_relay_log_name),
                                &rli->info_file, ""))
      {
        msg="Error reading slave log configuration";
        goto err;
      }
    }
    else
      DBUG_PRINT("info", ("relay_log_info file is in old format."));

    if (init_intvar_from_file(&relay_log_pos,
                              &rli->info_file, BIN_LOG_HEADER_SIZE) ||
        init_strvar_from_file(rli->group_master_log_name,
                              sizeof(rli->group_master_log_name),
385
                              &rli->info_file, "") ||
386 387 388
        init_intvar_from_file(&master_log_pos, &rli->info_file, 0) ||
        (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY &&
         init_intvar_from_file(&dummy_sql_delay, &rli->info_file, 0)))
389 390 391 392
    {
      msg="Error reading slave log configuration";
      goto err;
    }
393

394
    strmake_buf(rli->event_relay_log_name,rli->group_relay_log_name);
395 396 397
    rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
    rli->group_master_log_pos= master_log_pos;

398 399 400
    if (rli->is_relay_log_recovery && init_recovery(rli->mi, &msg))
      goto err;

401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431
    if (init_relay_log_pos(rli,
                           rli->group_relay_log_name,
                           rli->group_relay_log_pos,
                           0 /* no data lock*/,
                           &msg, 0))
    {
      char llbuf[22];
      sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s)",
                      rli->group_relay_log_name,
                      llstr(rli->group_relay_log_pos, llbuf));
      goto err;
    }
  }

#ifndef DBUG_OFF
  {
    char llbuf1[22], llbuf2[22];
    DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
                        llstr(my_b_tell(rli->cur_log),llbuf1),
                        llstr(rli->event_relay_log_pos,llbuf2)));
    DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
    DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
  }
#endif

  /*
    Now change the cache from READ to WRITE - must do this
    before flush_relay_log_info
  */
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
  if ((error= flush_relay_log_info(rli)))
432 433 434 435
  {
    msg= "Failed to flush relay log info file";
    goto err;
  }
436 437 438 439 440 441
  if (count_relay_log_space(rli))
  {
    msg="Error counting relay log space";
    goto err;
  }
  rli->inited= 1;
Marc Alff's avatar
Marc Alff committed
442
  mysql_mutex_unlock(&rli->data_lock);
443 444 445
  DBUG_RETURN(error);

err:
446
  sql_print_error("%s", msg);
447 448
  end_io_cache(&rli->info_file);
  if (info_fd >= 0)
Marc Alff's avatar
Marc Alff committed
449
    mysql_file_close(info_fd, MYF(0));
450 451
  rli->info_fd= -1;
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
Marc Alff's avatar
Marc Alff committed
452
  mysql_mutex_unlock(&rli->data_lock);
453 454 455 456
  DBUG_RETURN(1);
}


457
/*
  Add the size of one relay log file to rli->log_space_total.

  SYNOPSIS
    add_relay_log()
    rli     Relay log info whose log_space_total is increased
    linfo   Index entry; linfo->log_file_name is the file to stat

  RETURN VALUES
    0   ok
    1   the file could not be stat'ed (an error is written to the error log)
*/
static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
{
  MY_STAT file_stat;
  DBUG_ENTER("add_relay_log");

  if (mysql_file_stat(key_file_relaylog,
                      linfo->log_file_name, &file_stat, MYF(0)))
  {
    rli->log_space_total+= file_stat.st_size;
#ifndef DBUG_OFF
    char buf[22];
    DBUG_PRINT("info",("log_space_total: %s",
                       llstr(rli->log_space_total,buf)));
#endif
    DBUG_RETURN(0);
  }

  sql_print_error("log %s listed in the index, but failed to stat",
                  linfo->log_file_name);
  DBUG_RETURN(1);
}


477
/*
  Recompute rli->log_space_total from the files listed in the relay log
  index.

  SYNOPSIS
    count_relay_log_space()
    rli     Relay log info; log_space_total is reset and recomputed

  RETURN VALUES
    0   ok
    1   the first index entry could not be found, or one of the listed
        files could not be stat'ed
*/
static int count_relay_log_space(Relay_log_info* rli)
{
  LOG_INFO index_entry;
  DBUG_ENTER("count_relay_log_space");

  rli->log_space_total= 0;
  if (rli->relay_log.find_log_pos(&index_entry, NullS, 1))
  {
    sql_print_error("Could not find first log while counting relay log space");
    DBUG_RETURN(1);
  }

  /* Account for the current entry, then advance until no entry is left. */
  for (;;)
  {
    if (add_relay_log(rli, &index_entry))
      DBUG_RETURN(1);
    if (rli->relay_log.find_next_log(&index_entry, 1))
      break;
  }

  /*
     Everything has been counted, including what an earlier write may have
     added, so bytes_written is reset to avoid counting that space twice.
  */
  rli->relay_log.reset_bytes_written();
  DBUG_RETURN(0);
}


/*
   Reset UNTIL condition for Relay_log_info

   SYNOPSIS
    clear_until_condition()

   Takes no arguments: the UNTIL condition of the Relay_log_info object the
   method is called on is reset.
 */

510
void Relay_log_info::clear_until_condition()
511 512 513
{
  DBUG_ENTER("clear_until_condition");

514
  until_condition= Relay_log_info::UNTIL_NONE;
515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552
  until_log_name[0]= 0;
  until_log_pos= 0;
  DBUG_VOID_RETURN;
}


/*
  Open the given relay log

  SYNOPSIS
    init_relay_log_pos()
    rli                 Relay information (will be initialized)
    log                 Name of relay log file to read from. NULL = First log
    pos                 Position in relay log file
    need_data_lock      Set to 1 if this function should lock rli->data_lock
    errmsg              Store pointer to error message here
    look_for_description_event
                        1 if we should look for such an event. We only need
                        this when the SQL thread starts and opens an existing
                        relay log and has to execute it (possibly from an
                        offset >4); then we need to read the first event of
                        the relay log to be able to parse the events we have
                        to execute.

  DESCRIPTION
  - Close old open relay log files.
  - If we are using the same relay log as the running IO-thread, then set
    rli->cur_log to point to the same IO_CACHE entry.
  - If not, open the 'log' binary file.

  TODO
    - check proper initialization of group_master_log_name/group_master_log_pos

  RETURN VALUES
    0   ok
    1   error.  errmsg is set to point to the error message
*/

553
int init_relay_log_pos(Relay_log_info* rli,const char* log,
554 555 556 557 558
                       ulonglong pos, bool need_data_lock,
                       const char** errmsg,
                       bool look_for_description_event)
{
  DBUG_ENTER("init_relay_log_pos");
unknown's avatar
unknown committed
559
  DBUG_PRINT("info", ("pos: %lu", (ulong) pos));
560 561

  *errmsg=0;
Marc Alff's avatar
Marc Alff committed
562
  mysql_mutex_t *log_lock= rli->relay_log.get_log_lock();
563 564

  if (need_data_lock)
Marc Alff's avatar
Marc Alff committed
565
    mysql_mutex_lock(&rli->data_lock);
566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584

  /*
    Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
    is, too, and init_slave() too; these 2 functions allocate a description
    event in init_relay_log_pos, which is not freed by the terminating SQL slave
    thread as that thread is not started by these functions. So we have to free
    the description_event here, in case, so that there is no memory leak in
    running, say, CHANGE MASTER.
  */
  delete rli->relay_log.description_event_for_exec;
  /*
    By default the relay log is in binlog format 3 (4.0).
    Even if format is 4, this will work enough to read the first event
    (Format_desc) (remember that format 4 is just lenghtened compared to format
    3; format 3 is a prefix of format 4).
  */
  rli->relay_log.description_event_for_exec= new
    Format_description_log_event(3);

Marc Alff's avatar
Marc Alff committed
585
  mysql_mutex_lock(log_lock);
586 587 588 589 590

  /* Close log file and free buffers if it's already open */
  if (rli->cur_log_fd >= 0)
  {
    end_io_cache(&rli->cache_buf);
Marc Alff's avatar
Marc Alff committed
591
    mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
592 593 594 595
    rli->cur_log_fd = -1;
  }

  rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
Michael Widenius's avatar
Michael Widenius committed
596 597
  rli->clear_flag(Relay_log_info::IN_STMT);
  rli->clear_flag(Relay_log_info::IN_TRANSACTION);
598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613

  /*
    Test to see if the previous run was with the skip of purging
    If yes, we do not purge when we restart
  */
  if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
  {
    *errmsg="Could not find first log during relay log initialization";
    goto err;
  }

  if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
  {
    *errmsg="Could not find target log during relay log initialization";
    goto err;
  }
614 615
  strmake_buf(rli->group_relay_log_name,rli->linfo.log_file_name);
  strmake_buf(rli->event_relay_log_name,rli->linfo.log_file_name);
616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
  {
    /*
      The IO thread is using this log file.
      In this case, we will use the same IO_CACHE pointer to
      read data as the IO thread is using to write data.
    */
    my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
    if (check_binlog_magic(rli->cur_log,errmsg))
      goto err;
    rli->cur_log_old_open_count=rli->relay_log.get_open_count();
  }
  else
  {
    /*
      Open the relay log and set rli->cur_log to point at this one
    */
    if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
                                     rli->linfo.log_file_name,errmsg)) < 0)
      goto err;
    rli->cur_log = &rli->cache_buf;
  }
  /*
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
    sure.
  */
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
  {
    Log_event* ev;
    while (look_for_description_event)
    {
      /*
        Read the possible Format_description_log_event; if position
        was 4, no need, it will be read naturally.
      */
      DBUG_PRINT("info",("looking for a Format_description_log_event"));

      if (my_b_tell(rli->cur_log) >= pos)
        break;

      /*
        Because of we have rli->data_lock and log_lock, we can safely read an
        event
      */
660 661 662
      if (!(ev= Log_event::read_log_event(rli->cur_log, 0,
                                          rli->relay_log.description_event_for_exec,
                                          opt_slave_sql_verify_checksum)))
663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725
      {
        DBUG_PRINT("info",("could not read event, rli->cur_log->error=%d",
                           rli->cur_log->error));
        if (rli->cur_log->error) /* not EOF */
        {
          *errmsg= "I/O error reading event at position 4";
          goto err;
        }
        break;
      }
      else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
      {
        DBUG_PRINT("info",("found Format_description_log_event"));
        delete rli->relay_log.description_event_for_exec;
        rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
        /*
          As ev was returned by read_log_event, it has passed is_valid(), so
          my_malloc() in ctor worked, no need to check again.
        */
        /*
          Ok, we found a Format_description event. But it is not sure that this
          describes the whole relay log; indeed, one can have this sequence
          (starting from position 4):
          Format_desc (of slave)
          Rotate (of master)
          Format_desc (of master)
          So the Format_desc which really describes the rest of the relay log
          is the 3rd event (it can't be further than that, because we rotate
          the relay log when we queue a Rotate event from the master).
          But what describes the Rotate is the first Format_desc.
          So what we do is:
          go on searching for Format_description events, until you exceed the
          position (argument 'pos') or until you find another event than Rotate
          or Format_desc.
        */
      }
      else
      {
        DBUG_PRINT("info",("found event of another type=%d",
                           ev->get_type_code()));
        look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
        delete ev;
      }
    }
    my_b_seek(rli->cur_log,(off_t)pos);
#ifndef DBUG_OFF
  {
    char llbuf1[22], llbuf2[22];
    DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
                        llstr(my_b_tell(rli->cur_log),llbuf1),
                        llstr(rli->event_relay_log_pos,llbuf2)));
  }
#endif

  }

err:
  /*
    If we don't purge, we can't honour relay_log_space_limit ;
    silently discard it
  */
  if (!relay_log_purge)
    rli->log_space_limit= 0;
Marc Alff's avatar
Marc Alff committed
726
  mysql_cond_broadcast(&rli->data_cond);
727

Marc Alff's avatar
Marc Alff committed
728
  mysql_mutex_unlock(log_lock);
729 730

  if (need_data_lock)
Marc Alff's avatar
Marc Alff committed
731
    mysql_mutex_unlock(&rli->data_lock);
732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763
  if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
    *errmsg= "Invalid Format_description log event; could be out of memory";

  DBUG_RETURN ((*errmsg) ? 1 : 0);
}


/*
  Waits until the SQL thread reaches (has executed up to) the
  log/position or timed out.

  SYNOPSIS
    wait_for_pos()
    thd             client thread that sent SELECT MASTER_POS_WAIT
    log_name        log name to wait for
    log_pos         position to wait for
    timeout         timeout in seconds before giving up waiting

  NOTES
    timeout is longlong whereas it should be ulong ; but this is
    to catch if the user submitted a negative timeout.

  RETURN VALUES
    -2          improper arguments (log_pos<0)
                or slave not running, or master info changed
                during the function's execution,
                or client thread killed. -2 is translated to NULL by caller
    -1          timed out
    >=0         number of log events the function had to wait
                before reaching the desired log/position
 */

764
int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
765 766 767 768 769 770 771
                                    longlong log_pos,
                                    longlong timeout)
{
  int event_count = 0;
  ulong init_abort_pos_wait;
  int error=0;
  struct timespec abstime; // for timeout checking
Sergei Golubchik's avatar
Sergei Golubchik committed
772
  PSI_stage_info old_stage;
773
  DBUG_ENTER("Relay_log_info::wait_for_pos");
774 775

  if (!inited)
unknown's avatar
unknown committed
776
    DBUG_RETURN(-2);
777 778 779 780 781

  DBUG_PRINT("enter",("log_name: '%s'  log_pos: %lu  timeout: %lu",
                      log_name->c_ptr(), (ulong) log_pos, (ulong) timeout));

  set_timespec(abstime,timeout);
Marc Alff's avatar
Marc Alff committed
782
  mysql_mutex_lock(&data_lock);
Sergei Golubchik's avatar
Sergei Golubchik committed
783 784 785
  thd->ENTER_COND(&data_cond, &data_lock,
                  &stage_waiting_for_the_slave_thread_to_advance_position,
                  &old_stage);
786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809
  /*
     This function will abort when it notices that some CHANGE MASTER or
     RESET MASTER has changed the master info.
     To catch this, these commands modify abort_pos_wait ; We just monitor
     abort_pos_wait and see if it has changed.
     Why do we have this mechanism instead of simply monitoring slave_running
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
     the SQL thread be stopped?
     This is becasue if someones does:
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
     the change may happen very quickly and we may not notice that
     slave_running briefly switches between 1/0/1.
  */
  init_abort_pos_wait= abort_pos_wait;

  /*
    We'll need to
    handle all possible log names comparisons (e.g. 999 vs 1000).
    We use ulong for string->number conversion ; this is no
    stronger limitation than in find_uniq_filename in sql/log.cc
  */
  ulong log_name_extension;
  char log_name_tmp[FN_REFLEN]; //make a char[] from String

810
  strmake(log_name_tmp, log_name->ptr(), MY_MIN(log_name->length(), FN_REFLEN-1));
811 812 813 814 815 816 817 818 819

  char *p= fn_ext(log_name_tmp);
  char *p_end;
  if (!*p || log_pos<0)
  {
    error= -2; //means improper arguments
    goto err;
  }
  // Convert 0-3 to 4
820
  log_pos= MY_MAX(log_pos, BIN_LOG_HEADER_SIZE);
821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894
  /* p points to '.' */
  log_name_extension= strtoul(++p, &p_end, 10);
  /*
    p_end points to the first invalid character.
    If it equals to p, no digits were found, error.
    If it contains '\0' it means conversion went ok.
  */
  if (p_end==p || *p_end)
  {
    error= -2;
    goto err;
  }

  /* The "compare and wait" main loop */
  while (!thd->killed &&
         init_abort_pos_wait == abort_pos_wait &&
         slave_running)
  {
    bool pos_reached;
    int cmp_result= 0;

    DBUG_PRINT("info",
               ("init_abort_pos_wait: %ld  abort_pos_wait: %ld",
                init_abort_pos_wait, abort_pos_wait));
    DBUG_PRINT("info",("group_master_log_name: '%s'  pos: %lu",
                       group_master_log_name, (ulong) group_master_log_pos));

    /*
      group_master_log_name can be "", if we are just after a fresh
      replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
      (before we have executed one Rotate event from the master) or
      (rare) if the user is doing a weird slave setup (see next
      paragraph).  If group_master_log_name is "", we assume we don't
      have enough info to do the comparison yet, so we just wait until
      more data. In this case master_log_pos is always 0 except if
      somebody (wrongly) sets this slave to be a slave of itself
      without using --replicate-same-server-id (an unsupported
      configuration which does nothing), then group_master_log_pos
      will grow and group_master_log_name will stay "".
    */
    if (*group_master_log_name)
    {
      char *basename= (group_master_log_name +
                       dirname_length(group_master_log_name));
      /*
        First compare the parts before the extension.
        Find the dot in the master's log basename,
        and protect against user's input error :
        if the names do not match up to '.' included, return error
      */
      char *q= (char*)(fn_ext(basename)+1);
      if (strncmp(basename, log_name_tmp, (int)(q-basename)))
      {
        error= -2;
        break;
      }
      // Now compare extensions.
      char *q_end;
      ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
      if (group_master_log_name_extension < log_name_extension)
        cmp_result= -1 ;
      else
        cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;

      pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
                    cmp_result > 0);
      if (pos_reached || thd->killed)
        break;
    }

    //wait for master update, with optional timeout.

    DBUG_PRINT("info",("Waiting for master update"));
    /*
Marc Alff's avatar
Marc Alff committed
895
      We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
896 897
      will wake us up.
    */
898
    thd_wait_begin(thd, THD_WAIT_BINLOG);
899 900 901
    if (timeout > 0)
    {
      /*
Marc Alff's avatar
Marc Alff committed
902
        Note that mysql_cond_timedwait checks for the timeout
903 904 905 906 907
        before for the condition ; i.e. it returns ETIMEDOUT
        if the system time equals or exceeds the time specified by abstime
        before the condition variable is signaled or broadcast, _or_ if
        the absolute time specified by abstime has already passed at the time
        of the call.
Marc Alff's avatar
Marc Alff committed
908
        For that reason, mysql_cond_timedwait will do the "timeoutting" job
909 910 911
        even if its condition is always immediately signaled (case of a loaded
        master).
      */
Marc Alff's avatar
Marc Alff committed
912
      error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
913 914
    }
    else
Marc Alff's avatar
Marc Alff committed
915
      mysql_cond_wait(&data_cond, &data_lock);
916
    thd_wait_end(thd);
917 918 919 920 921 922 923 924 925 926 927 928
    DBUG_PRINT("info",("Got signal of master update or timed out"));
    if (error == ETIMEDOUT || error == ETIME)
    {
      error= -1;
      break;
    }
    error=0;
    event_count++;
    DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
  }

err:
Sergei Golubchik's avatar
Sergei Golubchik committed
929
  thd->EXIT_COND(&old_stage);
930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945
  DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
improper_arguments: %d  timed_out: %d",
                     thd->killed_errno(),
                     (int) (init_abort_pos_wait != abort_pos_wait),
                     (int) slave_running,
                     (int) (error == -2),
                     (int) (error == -1)));
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
      !slave_running)
  {
    error= -2;
  }
  DBUG_RETURN( error ? error : event_count );
}


946
void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
947 948
                                             rpl_group_info *rgi,
                                             bool skip_lock)
949
{
950
  DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
951 952

  if (!skip_lock)
Marc Alff's avatar
Marc Alff committed
953
    mysql_mutex_lock(&data_lock);
954
  rgi->inc_event_relay_log_pos();
unknown's avatar
unknown committed
955 956 957
  DBUG_PRINT("info", ("log_pos: %lu  group_master_log_pos: %lu",
                      (long) log_pos, (long) group_master_log_pos));
  if (rgi->is_parallel_exec)
958 959 960 961 962
  {
    /* In case of parallel replication, do not update the position backwards. */
    int cmp= strcmp(group_relay_log_name, event_relay_log_name);
    if (cmp < 0)
    {
963
      group_relay_log_pos= rgi->future_event_relay_log_pos;
964 965
      strmake_buf(group_relay_log_name, event_relay_log_name);
      notify_group_relay_log_name_update();
966 967
    } else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos)
      group_relay_log_pos= rgi->future_event_relay_log_pos;
968

969 970 971 972
    /*
      In the parallel case we need to update the master_log_name here, rather
      than in Rotate_log_event::do_update_pos().
    */
unknown's avatar
unknown committed
973 974 975 976 977 978 979
    cmp= strcmp(group_master_log_name, rgi->future_event_master_log_name);
    if (cmp <= 0)
    {
      if (cmp < 0)
      {
        strcpy(group_master_log_name, rgi->future_event_master_log_name);
        notify_group_master_log_name_update();
980
        group_master_log_pos= log_pos;
unknown's avatar
unknown committed
981
      }
982
      else if (group_master_log_pos < log_pos)
unknown's avatar
unknown committed
983 984
        group_master_log_pos= log_pos;
    }
985 986 987 988 989 990 991
  }
  else
  {
    /* Non-parallel case. */
    group_relay_log_pos= event_relay_log_pos;
    strmake_buf(group_relay_log_name, event_relay_log_name);
    notify_group_relay_log_name_update();
unknown's avatar
unknown committed
992 993
    if (log_pos) // 3.23 binlogs don't have log_posx
      group_master_log_pos= log_pos;
994
  }
995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025

  /*
    If the slave does not support transactions and replicates a transaction,
    users should not trust group_master_log_pos (which they can display with
    SHOW SLAVE STATUS or read from relay-log.info), because to compute
    group_master_log_pos the slave relies on log_pos stored in the master's
    binlog, but if we are in a master's transaction these positions are always
    the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
    not advance as it should on the non-transactional slave (it advances by
    big leaps, whereas it should advance by small leaps).
  */
  /*
    In 4.x we used the event's len to compute the positions here. This is
    wrong if the event was 3.23/4.0 and has been converted to 5.0, because
    then the event's len is not what is was in the master's binlog, so this
    will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
    replication: Exec_master_log_pos is wrong). Only way to solve this is to
    have the original offset of the end of the event the relay log. This is
    what we do in 5.0: log_pos has become "end_log_pos" (because the real use
    of log_pos in 4.0 was to compute the end_log_pos; so better to store
    end_log_pos instead of begin_log_pos.
    If we had not done this fix here, the problem would also have appeared
    when the slave and master are 5.0 but with different event length (for
    example the slave is more recent than the master and features the event
    UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
    SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
    value which would lead to badly broken replication.
    Even the relay_log_pos will be corrupted in this case, because the len is
    the relay log is not "val".
    With the end_log_pos solution, we avoid computations involving lengthes.
  */
Marc Alff's avatar
Marc Alff committed
1026
  mysql_cond_broadcast(&data_cond);
1027
  if (!skip_lock)
Marc Alff's avatar
Marc Alff committed
1028
    mysql_mutex_unlock(&data_lock);
1029 1030 1031 1032
  DBUG_VOID_RETURN;
}


1033
void Relay_log_info::close_temporary_tables()
1034 1035
{
  TABLE *table,*next;
1036
  DBUG_ENTER("Relay_log_info::close_temporary_tables");
1037 1038 1039 1040

  for (table=save_temporary_tables ; table ; table=next)
  {
    next=table->next;
Michael Widenius's avatar
Michael Widenius committed
1041 1042 1043

    /* Reset in_use as the table may have been created by another thd */
    table->in_use=0;
1044 1045 1046 1047
    /*
      Don't ask for disk deletion. For now, anyway they will be deleted when
      slave restarts, but it is a better intention to not delete them.
    */
unknown's avatar
unknown committed
1048
    DBUG_PRINT("info", ("table: 0x%lx", (long) table));
1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062
    close_temporary(table, 1, 0);
  }
  save_temporary_tables= 0;
  slave_open_temp_tables= 0;
  DBUG_VOID_RETURN;
}

/*
  purge_relay_logs()

  NOTES
    Assumes to have a run lock on rli and that no slave thread are running.
*/

1063
int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099
                     const char** errmsg)
{
  int error=0;
  DBUG_ENTER("purge_relay_logs");

  /*
    Even if rli->inited==0, we still try to empty rli->master_log_* variables.
    Indeed, rli->inited==0 does not imply that they already are empty.
    It could be that slave's info initialization partly succeeded :
    for example if relay-log.info existed but *relay-bin*.*
    have been manually removed, init_relay_log_info reads the old
    relay-log.info and fills rli->master_log_*, then init_relay_log_info
    checks for the existence of the relay log, this fails and
    init_relay_log_info leaves rli->inited to 0.
    In that pathological case, rli->master_log_pos* will be properly reinited
    at the next START SLAVE (as RESET SLAVE or CHANGE
    MASTER, the callers of purge_relay_logs, will delete bogus *.info files
    or replace them with correct files), however if the user does SHOW SLAVE
    STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
    In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
    to display fine in any case.
  */

  rli->group_master_log_name[0]= 0;
  rli->group_master_log_pos= 0;

  if (!rli->inited)
  {
    DBUG_PRINT("info", ("rli->inited == 0"));
    DBUG_RETURN(0);
  }

  DBUG_ASSERT(rli->slave_running == 0);
  DBUG_ASSERT(rli->mi->slave_running == 0);

  rli->slave_skip_counter=0;
Marc Alff's avatar
Marc Alff committed
1100
  mysql_mutex_lock(&rli->data_lock);
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110

  /*
    we close the relay log fd possibly left open by the slave SQL thread,
    to be able to delete it; the relay log fd possibly left open by the slave
    I/O thread will be closed naturally in reset_logs() by the
    close(LOG_CLOSE_TO_BE_OPENED) call
  */
  if (rli->cur_log_fd >= 0)
  {
    end_io_cache(&rli->cache_buf);
Marc Alff's avatar
Marc Alff committed
1111
    mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
1112 1113 1114
    rli->cur_log_fd= -1;
  }

unknown's avatar
unknown committed
1115
  if (rli->relay_log.reset_logs(thd, !just_reset, NULL, 0))
1116 1117 1118 1119 1120 1121
  {
    *errmsg = "Failed during log reset";
    error=1;
    goto err;
  }
  if (!just_reset)
1122 1123
  {
    /* Save name of used relay log file */
Sergei Golubchik's avatar
Sergei Golubchik committed
1124 1125
    strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname());
    strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname());
1126 1127 1128 1129 1130 1131 1132 1133 1134
    rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
    rli->log_space_total= 0;

    if (count_relay_log_space(rli))
    {
      *errmsg= "Error counting relay log space";
      error=1;
      goto err;
    }
1135 1136 1137
    error= init_relay_log_pos(rli, rli->group_relay_log_name,
                              rli->group_relay_log_pos,
                              0 /* do not need data lock */, errmsg, 0);
1138 1139 1140 1141 1142 1143
  }
  else
  {
    /* Ensure relay log names are not used */
    rli->group_relay_log_name[0]= rli->event_relay_log_name[0]= 0;
  }
1144 1145 1146 1147 1148 1149

err:
#ifndef DBUG_OFF
  char buf[22];
#endif
  DBUG_PRINT("info",("log_space_total: %s",llstr(rli->log_space_total,buf)));
Marc Alff's avatar
Marc Alff committed
1150
  mysql_mutex_unlock(&rli->data_lock);
1151 1152 1153 1154 1155 1156 1157
  DBUG_RETURN(error);
}


/*
     Check if condition stated in UNTIL clause of START SLAVE is reached.
   SYNOPSYS
1158
     Relay_log_info::is_until_satisfied()
1159 1160 1161 1162 1163
     master_beg_pos    position of the beginning of to be executed event
                       (not log_pos member of the event that points to the
                        beginning of the following event)


1164 1165 1166 1167
   DESCRIPTION
     Checks if UNTIL condition is reached. Uses caching result of last
     comparison of current log file name and target log file name. So cached
     value should be invalidated if current log file name changes
1168
     (see Relay_log_info::notify_... functions).
1169 1170 1171 1172 1173

     This caching is needed to avoid of expensive string comparisons and
     strtol() conversions needed for log names comparison. We don't need to
     compare them each time this function is called, we only need to do this
     when current log name changes. If we have UNTIL_MASTER_POS condition we
1174
     need to do this only after Rotate_log_event::do_apply_event() (which is
1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187
     rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS
     condition then we should invalidate cached comarison value after
     inc_group_relay_log_pos() which called for each group of events (so we
     have some benefit if we have something like queries that use
     autoincrement or if we have transactions).

     Should be called ONLY if until_condition != UNTIL_NONE !
   RETURN VALUE
     true - condition met or error happened (condition seems to have
            bad log file name)
     false - condition not met
*/

1188
bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
1189 1190 1191
{
  const char *log_name;
  ulonglong log_pos;
1192
  DBUG_ENTER("Relay_log_info::is_until_satisfied");
1193

unknown's avatar
unknown committed
1194 1195
  DBUG_ASSERT(until_condition == UNTIL_MASTER_POS ||
              until_condition == UNTIL_RELAY_POS);
1196 1197 1198

  if (until_condition == UNTIL_MASTER_POS)
  {
1199 1200
    if (ev && ev->server_id == (uint32) global_system_variables.server_id &&
        !replicate_same_server_id)
1201
      DBUG_RETURN(FALSE);
1202
    log_name= group_master_log_name;
Michael Widenius's avatar
Michael Widenius committed
1203 1204 1205
    log_pos= ((!ev)? group_master_log_pos :
              (get_flag(IN_TRANSACTION) || !ev->log_pos) ?
              group_master_log_pos : ev->log_pos - ev->data_written);
1206 1207 1208 1209 1210 1211 1212
  }
  else
  { /* until_condition == UNTIL_RELAY_POS */
    log_name= group_relay_log_name;
    log_pos= group_relay_log_pos;
  }

1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228
#ifndef DBUG_OFF
  {
    char buf[32];
    DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
                        group_master_log_name, llstr(group_master_log_pos, buf)));
    DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
                        group_relay_log_name, llstr(group_relay_log_pos, buf)));
    DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
                        until_condition == UNTIL_MASTER_POS ? "master" : "relay",
                        log_name, llstr(log_pos, buf)));
    DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
                        until_condition == UNTIL_MASTER_POS ? "master" : "relay",
                        until_log_name, llstr(until_log_pos, buf)));
  }
#endif

1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273
  if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
  {
    /*
      We have no cached comparison results so we should compare log names
      and cache result.
      If we are after RESET SLAVE, and the SQL slave thread has not processed
      any event yet, it could be that group_master_log_name is "". In that case,
      just wait for more events (as there is no sensible comparison to do).
    */

    if (*log_name)
    {
      const char *basename= log_name + dirname_length(log_name);

      const char *q= (const char*)(fn_ext(basename)+1);
      if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
      {
        /* Now compare extensions. */
        char *q_end;
        ulong log_name_extension= strtoul(q, &q_end, 10);
        if (log_name_extension < until_log_name_extension)
          until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
        else
          until_log_names_cmp_result=
            (log_name_extension > until_log_name_extension) ?
            UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
      }
      else
      {
        /* Probably error so we aborting */
        sql_print_error("Slave SQL thread is stopped because UNTIL "
                        "condition is bad.");
        DBUG_RETURN(TRUE);
      }
    }
    else
      DBUG_RETURN(until_log_pos == 0);
  }

  DBUG_RETURN(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
           log_pos >= until_log_pos) ||
          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
}


1274
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
1275
                               time_t event_creation_time, THD *thd,
unknown's avatar
unknown committed
1276
                               rpl_group_info *rgi)
1277
{
1278 1279 1280
#ifndef DBUG_OFF
  extern uint debug_not_change_ts_if_art_event;
#endif
Michael Widenius's avatar
Michael Widenius committed
1281
  DBUG_ENTER("Relay_log_info::stmt_done");
1282

1283
  DBUG_ASSERT(rgi->rli == this);
1284 1285 1286 1287 1288 1289
  /*
    If in a transaction, and if the slave supports transactions, just
    inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
    (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
    BEGIN/COMMIT, not with SET AUTOCOMMIT= .

Michael Widenius's avatar
Michael Widenius committed
1290 1291 1292
    We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN
    is also used for single row transactions.

1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309
    CAUTION: opt_using_transactions means innodb || bdb ; suppose the
    master supports InnoDB and BDB, but the slave supports only BDB,
    problems will arise: - suppose an InnoDB table is created on the
    master, - then it will be MyISAM on the slave - but as
    opt_using_transactions is true, the slave will believe he is
    transactional with the MyISAM table. And problems will come when
    one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will
    resume at BEGIN whereas there has not been any rollback).  This is
    the problem of using opt_using_transactions instead of a finer
    "does the slave support _transactional handler used on the
    master_".

    More generally, we'll have problems when a query mixes a
    transactional handler and MyISAM and STOP SLAVE is issued in the
    middle of the "transaction". START SLAVE will resume at BEGIN
    while the MyISAM table has already been updated.
  */
Michael Widenius's avatar
Michael Widenius committed
1310 1311
  if ((rgi->thd->variables.option_bits & OPTION_BEGIN) &&
      opt_using_transactions)
1312
    rgi->inc_event_relay_log_pos();
1313 1314
  else
  {
1315
    inc_group_relay_log_pos(event_master_log_pos, rgi);
1316
    if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi))
unknown's avatar
unknown committed
1317
    {
1318
      report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(),
unknown's avatar
unknown committed
1319 1320 1321
             "Failed to update GTID state in %s.%s, slave state may become "
             "inconsistent: %d: %s",
             "mysql", rpl_gtid_slave_state_table_name.str,
Sergei Golubchik's avatar
Sergei Golubchik committed
1322
             thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message());
unknown's avatar
unknown committed
1323 1324 1325 1326 1327 1328 1329
      /*
        At this point we are not in a transaction (for example after DDL),
        so we can not roll back. Anyway, normally updates to the slave
        state table should not fail, and if they do, at least we made the
        DBA aware of the problem in the error log.
      */
    }
unknown's avatar
unknown committed
1330
    DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE(););
unknown's avatar
unknown committed
1331 1332
    if (mi->using_gtid == Master_info::USE_GTID_NO)
      flush_relay_log_info(this);
unknown's avatar
unknown committed
1333
    DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
1334 1335 1336 1337 1338 1339 1340
    /*
      Note that Rotate_log_event::do_apply_event() does not call this
      function, so there is no chance that a fake rotate event resets
      last_master_timestamp.  Note that we update without mutex
      (probably ok - except in some very rare cases, only consequence
      is that value may take some time to display in
      Seconds_Behind_Master - not critical).
1341 1342 1343

      In parallel replication, we take care to not set last_master_timestamp
      backwards, in case of out-of-order calls here.
1344
    */
Sergei Golubchik's avatar
Sergei Golubchik committed
1345
    if (!(event_creation_time == 0 &&
1346 1347 1348
          IF_DBUG(debug_not_change_ts_if_art_event > 0, 1)) &&
        !(rgi->is_parallel_exec && event_creation_time <= last_master_timestamp)
        )
1349
        last_master_timestamp= event_creation_time;
1350
  }
1351
  DBUG_VOID_RETURN;
1352 1353
}

1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364

int
Relay_log_info::alloc_inuse_relaylog(const char *name)
{
  inuse_relaylog *ir;

  if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL))))
  {
    my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
    return 1;
  }
1365
  strmake_buf(ir->name, name);
1366 1367 1368 1369

  if (!inuse_relaylog_list)
    inuse_relaylog_list= ir;
  else
1370 1371
  {
    last_inuse_relaylog->completed= true;
1372
    last_inuse_relaylog->next= ir;
1373
  }
1374 1375 1376 1377 1378 1379
  last_inuse_relaylog= ir;

  return 0;
}


1380
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
unknown's avatar
unknown committed
1381 1382 1383 1384 1385 1386 1387
/**
  Load the slave GTID state from its table in the "mysql" database into
  rpl_global_gtid_slave_state, unless it has been loaded already.

  The table is scanned completely without holding LOCK_slave_state. Every
  row is remembered in a dynamic array, and a hash keyed on domain_id keeps,
  per domain, the row with the highest sub_id. Afterwards, under
  LOCK_slave_state, every row is fed to rpl_global_gtid_slave_state.update()
  and (when the binlog is enabled) the binlog sequence number counter is
  bumped for the newest GTID of each domain.

  The `loaded` flag is checked twice: once before the scan and once again
  after taking the mutex for the update, since another thread may have
  completed the load while this one was scanning.

  @param thd  Thread context used to open, lock and scan the table.

  @return 0 on success, including the case where the state turned out to be
          loaded already; non-zero on error.
*/
int
rpl_load_gtid_slave_state(THD *thd)
{
  TABLE_LIST tlist;
  TABLE *table;
  bool table_opened= false;
  bool table_scanned= false;
  bool array_inited= false;
  struct local_element { uint64 sub_id; rpl_gtid gtid; };
  struct local_element tmp_entry, *entry;
  HASH hash;
  DYNAMIC_ARRAY array;
  int err= 0;
  uint32 i;
  DBUG_ENTER("rpl_load_gtid_slave_state");

  /* Cheap early exit when the state is already in memory. */
  mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
  bool loaded= rpl_global_gtid_slave_state.loaded;
  mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
  if (loaded)
    DBUG_RETURN(0);

  /* Hash key is the domain_id embedded in local_element::gtid. */
  my_hash_init(&hash, &my_charset_bin, 32,
               offsetof(local_element, gtid) + offsetof(rpl_gtid, domain_id),
               sizeof(uint32), NULL, my_free, HASH_UNIQUE);
  if ((err= my_init_dynamic_array(&array, sizeof(local_element), 0, 0, MYF(0))))
    goto end;
  array_inited= true;

  mysql_reset_thd_for_next_command(thd);

  tlist.init_one_table(STRING_WITH_LEN("mysql"),
                       rpl_gtid_slave_state_table_name.str,
                       rpl_gtid_slave_state_table_name.length,
                       NULL, TL_READ);
  if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
    goto end;
  table_opened= true;
  table= tlist.table;

  if ((err= gtid_check_rpl_slave_state_table(table)))
    goto end;

  bitmap_set_all(table->read_set);
  if ((err= table->file->ha_rnd_init_with_error(1)))
  {
    table->file->print_error(err, MYF(0));
    goto end;
  }
  table_scanned= true;
  for (;;)
  {
    uint32 domain_id, server_id;
    uint64 sub_id, seq_no;
    uchar *rec;

    if ((err= table->file->ha_rnd_next(table->record[0])))
    {
      if (err == HA_ERR_RECORD_DELETED)
        continue;
      else if (err == HA_ERR_END_OF_FILE)
        break;
      else
      {
        table->file->print_error(err, MYF(0));
        goto end;
      }
    }
    domain_id= (ulonglong)table->field[0]->val_int();
    sub_id= (ulonglong)table->field[1]->val_int();
    server_id= (ulonglong)table->field[2]->val_int();
    seq_no= (ulonglong)table->field[3]->val_int();
    DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu\n",
                        (unsigned)domain_id, (unsigned)server_id,
                        (ulong)seq_no, (ulong)sub_id));

    /* Remember every row; all of them are passed to update() later. */
    tmp_entry.sub_id= sub_id;
    tmp_entry.gtid.domain_id= domain_id;
    tmp_entry.gtid.server_id= server_id;
    tmp_entry.gtid.seq_no= seq_no;
    if ((err= insert_dynamic(&array, (uchar *)&tmp_entry)))
    {
      my_error(ER_OUT_OF_RESOURCES, MYF(0));
      goto end;
    }

    /* Track, per domain, the row with the highest sub_id seen so far. */
    if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0)))
    {
      entry= (struct local_element *)rec;
      if (entry->sub_id >= sub_id)
        continue;
      entry->sub_id= sub_id;
      DBUG_ASSERT(entry->gtid.domain_id == domain_id);
      entry->gtid.server_id= server_id;
      entry->gtid.seq_no= seq_no;
    }
    else
    {
      if (!(entry= (struct local_element *)my_malloc(sizeof(*entry),
                                                     MYF(MY_WME))))
      {
        my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry));
        err= 1;
        goto end;
      }
      entry->sub_id= sub_id;
      entry->gtid.domain_id= domain_id;
      entry->gtid.server_id= server_id;
      entry->gtid.seq_no= seq_no;
      if ((err= my_hash_insert(&hash, (uchar *)entry)))
      {
        my_free(entry);
        my_error(ER_OUT_OF_RESOURCES, MYF(0));
        goto end;
      }
    }
  }

  /*
    The scan loop is only left through the HA_ERR_END_OF_FILE break, so err
    still holds that value here. Clear it now, before any of the exits
    below, so that a successful path (in particular "another thread already
    loaded the state") cannot return it as a bogus failure.
  */
  err= 0;

  mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
  if (rpl_global_gtid_slave_state.loaded)
  {
    /* Somebody else finished the load while we were scanning: success. */
    mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
    goto end;
  }

  for (i= 0; i < array.elements; ++i)
  {
    get_dynamic(&array, (uchar *)&tmp_entry, i);
    if ((err= rpl_global_gtid_slave_state.update(tmp_entry.gtid.domain_id,
                                                 tmp_entry.gtid.server_id,
                                                 tmp_entry.sub_id,
                                                 tmp_entry.gtid.seq_no,
                                                 NULL)))
    {
      mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
      my_error(ER_OUT_OF_RESOURCES, MYF(0));
      goto end;
    }
  }

  for (i= 0; i < hash.records; ++i)
  {
    entry= (struct local_element *)my_hash_element(&hash, i);
    if (opt_bin_log &&
        mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
                                                    entry->gtid.seq_no))
    {
      mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
      my_error(ER_OUT_OF_RESOURCES, MYF(0));
      /* Report the failure explicitly; err was cleared after the scan. */
      err= 1;
      goto end;
    }
  }

  rpl_global_gtid_slave_state.loaded= true;
  mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);

end:
  /* Release resources in reverse order of acquisition. */
  if (table_scanned)
  {
    table->file->ha_index_or_rnd_end();
    ha_commit_trans(thd, FALSE);
    ha_commit_trans(thd, TRUE);
  }
  if (table_opened)
  {
    close_thread_tables(thd);
    thd->mdl_context.release_transactional_locks();
  }
  if (array_inited)
    delete_dynamic(&array);
  my_hash_free(&hash);
  DBUG_RETURN(err);
}

1557

unknown's avatar
unknown committed
1558 1559 1560 1561 1562 1563 1564 1565
/**
  Reset this rpl_group_info to its initial per-group state and attach it to
  the given Relay_log_info.

  @param rli  Relay log info this group info belongs to from now on.
*/
void
rpl_group_info::reinit(Relay_log_info *rli)
{
  this->rli= rli;

  /* No tables queued for locking. */
  tables_to_lock= NULL;
  tables_to_lock_count= 0;

  /* No GTID / commit bookkeeping carried over. */
  gtid_sub_id= 0;
  commit_id= 0;
  gtid_pending= false;
  gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
  did_mark_start_commit= false;

  /* Retry, error and timing state. */
  trans_retries= 0;
  worker_error= 0;
  last_event_start_time= 0;
  row_stmt_start_timestamp= 0;
  long_find_row_note_printed= false;

  commit_orderer.reinit();
}

/**
  Construct a group info object bound to RLI.

  Members not covered by reinit() are cleared in the initializer list;
  the remaining per-group state is set up by reinit(). The sleep mutex and
  condition variable are initialized here and destroyed in the destructor.

  @param rli  Relay log info this object belongs to.
*/
rpl_group_info::rpl_group_info(Relay_log_info *rli)
  : thd(0),
    wait_commit_sub_id(0),
    wait_commit_group_info(0),
    parallel_entry(0),
    deferred_events(NULL),
    m_annotate_event(0),
    is_parallel_exec(false)
{
  mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
                   MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL);

  bzero(&current_gtid, sizeof(current_gtid));
  reinit(rli);
}


1590 1591 1592
/**
  Release everything owned by this object: the saved annotate event, the
  deferred events container, and the sleep condition variable and mutex
  created by the constructor.
*/
rpl_group_info::~rpl_group_info()
{
  free_annotate_event();
  delete deferred_events;

  mysql_cond_destroy(&sleep_cond);
  mysql_mutex_destroy(&sleep_lock);
}


1599 1600 1601
/**
  Start a new event group for the GTID carried by GEV.

  Obtains the next sub_id for the event's replication domain and copies the
  GTID and commit_id from the event into RGI, marking the GTID as pending.

  @param rgi  Group info to fill in.
  @param gev  GTID event that starts the group.

  @return 0 on success, 1 if no sub_id could be obtained.
*/
int
event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
{
  const uint64 new_sub_id=
    rpl_global_gtid_slave_state.next_sub_id(gev->domain_id);

  /* Out of memory caused hash insertion to fail. */
  if (new_sub_id == 0)
    return 1;

  rgi->gtid_sub_id= new_sub_id;
  rgi->current_gtid.domain_id= gev->domain_id;
  rgi->current_gtid.server_id= gev->server_id;
  rgi->current_gtid.seq_no= gev->seq_no;
  rgi->commit_id= gev->commit_id;
  rgi->gtid_pending= true;
  return 0;
}

1617 1618

void
1619
delete_or_keep_event_post_apply(rpl_group_info *rgi,
1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642
                                Log_event_type typ, Log_event *ev)
{
  /*
    ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be
    thread-safe for parallel replication.
  */

  switch (typ) {
  case FORMAT_DESCRIPTION_EVENT:
    /*
      Format_description_log_event should not be deleted because it
      will be used to read info about the relay log's format;
      it will be deleted when the SQL thread does not need it,
      i.e. when this thread terminates.
    */
    break;
  case ANNOTATE_ROWS_EVENT:
    /*
      Annotate_rows event should not be deleted because after it has
      been applied, thd->query points to the string inside this event.
      The thd->query will be used to generate new Annotate_rows event
      during applying the subsequent Rows events.
    */
1643
    rgi->set_annotate_event((Annotate_rows_log_event*) ev);
1644
    break;
1645 1646 1647
  case DELETE_ROWS_EVENT_V1:
  case UPDATE_ROWS_EVENT_V1:
  case WRITE_ROWS_EVENT_V1:
1648 1649 1650 1651 1652 1653 1654 1655
  case DELETE_ROWS_EVENT:
  case UPDATE_ROWS_EVENT:
  case WRITE_ROWS_EVENT:
    /*
      After the last Rows event has been applied, the saved Annotate_rows
      event (if any) is not needed anymore and can be deleted.
    */
    if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
1656
      rgi->free_annotate_event();
1657 1658 1659
    /* fall through */
  default:
    DBUG_PRINT("info", ("Deleting the event after it has been executed"));
1660
    if (!rgi->is_deferred_event(ev))
1661 1662 1663 1664 1665
      delete ev;
    break;
  }
}

unknown's avatar
unknown committed
1666 1667 1668

/**
  Bring the applier thread back to a clean state after an event group.

  Why this is needed:
  - Table_map_log_event::do_apply_event() may have opened tables that
    nobody closed afterwards (the matching Rows_log_event may never arrive,
    e.g. because the slave SQL thread is stopping or the relay log is
    truncated). All tables of the thread are therefore closed and the table
    mappings dropped.
  - Rows_log_event::do_apply_event() may have started statements or
    transactions which must be rolled back when there was an error.
  - A Format_description_log_event found after a BEGIN also requires a
    rollback before the next events are processed.

  @param thd    Thread being cleaned up; must be the thd of this object.
  @param error  True if the cleanup follows an error, in which case the
                statement and transaction are rolled back and transactional
                metadata locks are released.
*/
void rpl_group_info::cleanup_context(THD *thd, bool error)
{
  DBUG_ENTER("rpl_group_info::cleanup_context");
  DBUG_PRINT("enter", ("error: %d", (int) error));

  DBUG_ASSERT(this->thd == thd);

  if (error)
  {
    /* Undo a possible "statement transaction" ... */
    trans_rollback_stmt(thd);
    /* ... and a "real transaction"; this also resets OPTION_GTID_BEGIN. */
    trans_rollback(thd);
  }

  m_table_map.clear_tables();
  slave_close_thread_tables(thd);

  if (error)
    thd->mdl_context.release_transactional_locks();

  if (error && thd == rli->sql_driver_thd)
  {
    /*
      Reset flags. This is needed to handle incident events and errors in
      the relay log noticed by the sql driver thread.
    */
    rli->clear_flag(Relay_log_info::IN_STMT);
    rli->clear_flag(Relay_log_info::IN_TRANSACTION);
  }

  /* Drop the option flags that do_apply_event may have switched on. */
  thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
  thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;

  /*
    With --gtid-ignore-duplicates, always hand the domain back so that
    others can process it.
  */
  if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL)
    rpl_global_gtid_slave_state.release_domain_owner(this);

  /*
    Forget the state behind the long_find_row notes in the error log: the
    statement start timestamp and the "note already printed" flag.
  */
  reset_row_stmt_start_timestamp();
  unset_long_find_row_note_printed();

  DBUG_VOID_RETURN;
}


void rpl_group_info::clear_tables_to_lock()
{
1735
  DBUG_ENTER("rpl_group_info::clear_tables_to_lock()");
unknown's avatar
unknown committed
1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781
#ifndef DBUG_OFF
  /**
    When replicating in RBR and MyISAM Merge tables are involved
    open_and_lock_tables (called in do_apply_event) appends the 
    base tables to the list of tables_to_lock. Then these are 
    removed from the list in close_thread_tables (which is called 
    before we reach this point).

    This assertion just confirms that we get no surprises at this
    point.
   */
  uint i=0;
  for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
  DBUG_ASSERT(i == tables_to_lock_count);
#endif  

  while (tables_to_lock)
  {
    uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
    if (tables_to_lock->m_tabledef_valid)
    {
      tables_to_lock->m_tabledef.table_def::~table_def();
      tables_to_lock->m_tabledef_valid= FALSE;
    }

    /*
      If blob fields were used during conversion of field values 
      from the master table into the slave table, then we need to 
      free the memory used temporarily to store their values before
      copying into the slave's table.
    */
    if (tables_to_lock->m_conv_table)
      free_blobs(tables_to_lock->m_conv_table);

    tables_to_lock=
      static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
    tables_to_lock_count--;
    my_free(to_free);
  }
  DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
  DBUG_VOID_RETURN;
}


/**
  End the current statement on THD, close its tables, release the
  appropriate metadata locks and empty the tables_to_lock list.

  The statement is rolled back if THD is in an error state and committed
  otherwise.

  @param thd  Applier thread whose tables are to be closed.
*/
void rpl_group_info::slave_close_thread_tables(THD *thd)
{
  DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)");

  thd->get_stmt_da()->set_overwrite_status(true);
  if (thd->is_error())
    trans_rollback_stmt(thd);
  else
    trans_commit_stmt(thd);
  thd->get_stmt_da()->set_overwrite_status(false);

  close_thread_tables(thd);

  /*
    Metadata lock handling:
    - A transaction rollback was requested due to deadlock: perform it and
      release the metadata locks.
    - Inside a multi-statement transaction: only statement locks are
      released; the transactional locks stay until the transaction is
      committed or rolled back. This keeps other statements from modifying
      the table for the whole duration of the transaction, which provides
      commit ordering and guarantees serializability across multiple
      transactions.
    - Autocommit mode, or outside a transactional context: the metadata
      locks of the current statement are released right away.
  */
  if (thd->transaction_rollback_request)
  {
    trans_rollback_implicit(thd);
    thd->mdl_context.release_transactional_locks();
  }
  else if (thd->in_multi_stmt_transaction_mode())
    thd->mdl_context.release_statement_locks();
  else
    thd->mdl_context.release_transactional_locks();

  clear_tables_to_lock();
  DBUG_VOID_RETURN;
}


unknown's avatar
unknown committed
1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850

/*
  Count one more event group as having started to commit in entry E, and
  wake up waiters on the group commit orderer following GCO if its
  wait_count has now been reached.

  NOTE(review): callers appear to be expected to hold
  e->LOCK_parallel_entry (mark_start_commit() takes it around this call).
*/
static void
mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco)
{
  const uint64 committing= ++e->count_committing_event_groups;
  group_commit_orderer *following= gco->next_gco;

  if (!following)
    return;
  if (following->wait_count == committing)
    mysql_cond_broadcast(&following->COND_group_commit_orderer);
}


/*
  Record that this event group has started committing, without taking
  LOCK_parallel_entry here. Does nothing if it was already recorded.
*/
void
rpl_group_info::mark_start_commit_no_lock()
{
  if (!did_mark_start_commit)
  {
    mark_start_commit_inner(parallel_entry, gco);
    did_mark_start_commit= true;
  }
}


void
rpl_group_info::mark_start_commit()
{
  rpl_parallel_entry *e;

  if (did_mark_start_commit)
    return;

  e= this->parallel_entry;
  mysql_mutex_lock(&e->LOCK_parallel_entry);
  mark_start_commit_inner(e, gco);
  mysql_mutex_unlock(&e->LOCK_parallel_entry);
  did_mark_start_commit= true;
}


1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870
/*
  Produce a printable form of the current GTID for use in error messages.

  The text is written into a buffer that is a member of rpl_group_info, so
  the returned pointer stays valid until gtid_info() is called again or the
  rpl_group_info is destroyed.

  Returns NULL when there is no GTID available.
*/
char *
rpl_group_info::gtid_info()
{
  const bool have_gtid= gtid_sub_id && current_gtid.seq_no;

  if (have_gtid)
  {
    my_snprintf(gtid_info_buf, sizeof(gtid_info_buf), "Gtid %u-%u-%llu",
                current_gtid.domain_id, current_gtid.server_id,
                current_gtid.seq_no);
    return gtid_info_buf;
  }
  return NULL;
}


1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898
/*
  Undo the effect of a prior mark_start_commit().

  This is only used for retrying a transaction in parallel replication, after
  we have encountered a deadlock or other temporary error.

  When we get such a deadlock, it means that the current group of transactions
  did not yet all start committing (else they would not have deadlocked). So
  we will not yet have woken up anything in the next group, our rgi->gco is
  still live, and we can simply decrement the counter (to be incremented again
  later, when the retry succeeds and reaches the commit step).
*/
void
rpl_group_info::unmark_start_commit()
{
  rpl_parallel_entry *e;

  if (!did_mark_start_commit)
    return;

  e= this->parallel_entry;
  mysql_mutex_lock(&e->LOCK_parallel_entry);
  --e->count_committing_event_groups;
  mysql_mutex_unlock(&e->LOCK_parallel_entry);
  did_mark_start_commit= false;
}


1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927
/*
  Remember the replication filter to use and start out with an invalidated
  (all-zero) cached charset.
*/
rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter) :
  rpl_filter(filter)
{
  cached_charset_invalidate();
}


/*
  Mark the cached charset as uninitialized by zero-filling it, so that the
  next cached_charset_compare() against real charset data reports a change.
*/
void rpl_sql_thread_info::cached_charset_invalidate()
{
  /* Trace tag names the class this method actually belongs to. */
  DBUG_ENTER("rpl_sql_thread_info::cached_charset_invalidate");

  /* Full of zeroes means uninitialized. */
  bzero(cached_charset, sizeof(cached_charset));
  DBUG_VOID_RETURN;
}


/*
  Compare CHARSET with the cached charset and refresh the cache on mismatch.

  charset  Buffer of at least sizeof(cached_charset) bytes to compare.

  Returns true (1) if the data differed from the cache, in which case the
  cache has been overwritten with CHARSET; false (0) if they were equal.

  Note that although the method is declared const, it updates
  cached_charset through a const_cast when a difference is found.
*/
bool rpl_sql_thread_info::cached_charset_compare(char *charset) const
{
  /* Trace tag names the class this method actually belongs to. */
  DBUG_ENTER("rpl_sql_thread_info::cached_charset_compare");

  if (memcmp(cached_charset, charset, sizeof(cached_charset)))
  {
    memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
    DBUG_RETURN(1);
  }
  DBUG_RETURN(0);
}

1928
#endif