slave.cc 45.1 KB
Newer Older
unknown's avatar
unknown committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
   
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
   
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
   
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


#include "mysql_priv.h"
#include <mysql.h>
20
#include <myisam.h>
unknown's avatar
unknown committed
21
#include "mini_client.h"
22
#include "slave.h"
unknown's avatar
unknown committed
23 24
#include <thr_alarm.h>
#include <my_dir.h>
25
#include <assert.h>
unknown's avatar
unknown committed
26

unknown's avatar
unknown committed
27 28 29
#define RPL_LOG_NAME (glob_mi.log_file_name[0] ? glob_mi.log_file_name :\
 "FIRST")

30
volatile bool slave_running = 0;
unknown's avatar
unknown committed
31 32
pthread_t slave_real_id;
MASTER_INFO glob_mi;
unknown's avatar
unknown committed
33 34
MY_BITMAP slave_error_mask;
bool use_slave_mask = 0;
35
HASH replicate_do_table, replicate_ignore_table;
unknown's avatar
unknown committed
36
DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
37
bool do_table_inited = 0, ignore_table_inited = 0;
unknown's avatar
unknown committed
38
bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
39
bool table_rules_on = 0;
40 41
uint32 slave_skip_counter = 0; 
static TABLE* save_temporary_tables = 0;
42
THD* slave_thd = 0;
43 44
// when slave thread exits, we need to remember the temporary tables so we
// can re-use them on slave start
45 46 47

static int last_slave_errno = 0;
static char last_slave_error[1024] = "";
unknown's avatar
unknown committed
48
#ifndef DBUG_OFF
49 50
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
static int events_till_disconnect = -1, events_till_abort = -1;
unknown's avatar
unknown committed
51 52
static int stuck_count = 0;
#endif
unknown's avatar
unknown committed
53

unknown's avatar
unknown committed
54 55
inline void skip_load_data_infile(NET* net);
inline bool slave_killed(THD* thd);
unknown's avatar
unknown committed
56
static int init_slave_thread(THD* thd);
57
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
unknown's avatar
unknown committed
58
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
59
			  bool suppress_warnings);
unknown's avatar
unknown committed
60 61 62 63
static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, char* db, char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
				  const char* table_name);
unknown's avatar
unknown committed
64
inline char* rewrite_db(char* db);
65 66
static int check_expected_error(THD* thd, int expected_error);

67 68
/*
  Free callback registered with the table-rule hashes
  (see init_table_rule_hash): releases one rule entry.
*/
static void free_table_ent(TABLE_RULE_ENT* e)
{
  gptr entry_mem= (gptr) e;
  my_free(entry_mem, MYF(0));
}

/*
  Key-extraction callback for the table-rule hashes.

  The key is the e->key_len bytes starting at e->db, i.e. the whole
  "db.table" spec stored by add_table_rule(). The length is returned
  through *len; the third argument is unused.
*/
static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
                           my_bool not_used __attribute__((unused)))
{
  byte* key= (byte*) e->db;
  *len= e->key_len;
  return key;
}

unknown's avatar
unknown committed
79 80 81
/* called from get_options() in mysqld.cc on start-up */
void init_slave_skip_errors(char* arg)
{
unknown's avatar
unknown committed
82
  char* p;
unknown's avatar
unknown committed
83 84 85 86 87 88 89 90
  if (bitmap_init(&slave_error_mask,MAX_SLAVE_ERROR,0))
  {
    fprintf(stderr, "Badly out of memory, please check your system status\n");
    exit(1);
  }
  use_slave_mask = 1;
  for (;isspace(*arg);++arg)
    /* empty */;
unknown's avatar
unknown committed
91
  if (!my_casecmp(arg,"all",3))
unknown's avatar
unknown committed
92 93 94 95
  {
    bitmap_set_all(&slave_error_mask);
    return;
  }
unknown's avatar
unknown committed
96
  for (p= arg ; *p; )
unknown's avatar
unknown committed
97
  {
unknown's avatar
unknown committed
98 99 100 101 102 103 104
    long err_code;
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
      break;
    if (err_code < MAX_SLAVE_ERROR)
       bitmap_set_bit(&slave_error_mask,(uint)err_code);
    while (!isdigit(*p) && *p)
      p++;
unknown's avatar
unknown committed
105 106
  }
}
107 108 109 110 111 112 113 114

/*
  Set up a table-rule hash keyed through get_table_key() whose entries are
  released with free_table_ent(), then mark it as initialized.
  The result of hash_init() is not checked.
*/
void init_table_rule_hash(HASH* h, bool* h_inited)
{
  hash_get_key key_cb= (hash_get_key) get_table_key;
  void (*free_cb)(void*)= (void (*)(void*)) free_table_ent;

  hash_init(h, TABLE_RULE_HASH_SIZE, 0, 0, key_cb, free_cb, 0);
  *h_inited= 1;
}
unknown's avatar
unknown committed
115

unknown's avatar
unknown committed
116 117
/*
  Set up a dynamic array holding TABLE_RULE_ENT pointers (wildcard rules)
  and mark it as initialized. TABLE_RULE_ARR_SIZE is passed both as the
  third and fourth argument of my_init_dynamic_array().
*/
void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited)
{
  const uint element_size= sizeof(TABLE_RULE_ENT*);

  my_init_dynamic_array(a, element_size, TABLE_RULE_ARR_SIZE,
                        TABLE_RULE_ARR_SIZE);
  *a_inited= 1;
}

/*
  Return the first wildcard rule in 'a' whose pattern matches the
  len-byte string 'key' according to wild_case_compare() (with '\\' as
  the escape character), or 0 if no rule matches.
*/
static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
{
  const char* const key_end= key + len;

  for (uint idx= 0; idx < a->elements; idx++)
  {
    TABLE_RULE_ENT* rule;
    get_dynamic(a, (gptr) &rule, idx);

    const char* pattern= (const char*) rule->db;
    const char* pattern_end= (const char*) (rule->db + rule->key_len);
    if (wild_case_compare(key, key_end, pattern, pattern_end, '\\') == 0)
      return rule;
  }

  return 0;
}

140 141
/*
  Decide whether a statement touching 'tables' may be executed, based on
  the table rules.

  Only tables marked as updating are considered. For each of them the key
  "db.table" is built (falling back to thd->db when the table has no db)
  and checked, in this order, against: the do hash (-> 1), the ignore
  hash (-> 0), the wild-do list (-> 1) and the wild-ignore list (-> 0).
  The first rule hit decides the result for the whole list.

  Returns 1 if ok to replicate, 0 otherwise.
*/
int tables_ok(THD* thd, TABLE_LIST* tables)
{
  for (TABLE_LIST* tbl= tables; tbl; tbl= tbl->next)
  {
    if (!tbl->updating)
      continue;

    char hash_key[2*NAME_LEN+2];
    char* tbl_part= strmov(hash_key, tbl->db ? tbl->db : thd->db);
    *tbl_part++ = '.';
    uint len= strmov(tbl_part, tbl->real_name) - hash_key;

    // if there are any do's
    if (do_table_inited &&
        hash_search(&replicate_do_table, (byte*) hash_key, len))
      return 1;

    // if there are any ignore's
    if (ignore_table_inited &&
        hash_search(&replicate_ignore_table, (byte*) hash_key, len))
      return 0;

    if (wild_do_table_inited &&
        find_wild(&replicate_wild_do_table, hash_key, len))
      return 1;

    if (wild_ignore_table_inited &&
        find_wild(&replicate_wild_ignore_table, hash_key, len))
      return 0;
  }

  // No explicit rule matched: if there was a do list (plain or wild),
  // do not replicate; if there was no do list, go ahead
  if (do_table_inited || wild_do_table_inited)
    return 0;
  return 1;
}


/*
  Add a "db.table" rule to the hash h.

  The entry and a copy of table_spec are allocated in one chunk; e->db
  points at the copy, e->tbl_name at the part after the first '.', and
  e->key_len is the length of the whole spec (the hash key).

  The copy includes the terminating NUL so that e->db and e->tbl_name are
  valid C strings, and the entry is freed again if it cannot be inserted.

  Returns 0 on success, 1 if table_spec has no '.', on out of memory, or
  if the hash insert fails.
*/
int add_table_rule(HASH* h, const char* table_spec)
{
  const char* dot = strchr(table_spec, '.');
  if (!dot)
    return 1;
  // len is always > 0 because we know that there exists a '.'
  uint len = (uint)strlen(table_spec);
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
                                                 + len + 1, MYF(MY_WME));
  if (!e)
    return 1;
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len + 1);           // include the trailing NUL
  if (hash_insert(h, (byte*)e))
  {
    my_free((gptr) e, MYF(0));                  // not owned by the hash
    return 1;
  }
  return 0;
}

unknown's avatar
unknown committed
193 194
/*
  Append a wildcard "db.table" rule to the dynamic array a.

  The entry layout is the same as in add_table_rule(): one allocation
  holding the TABLE_RULE_ENT followed by a copy of table_spec, with
  e->key_len set to the length of the spec. The array stores the pointer
  to the entry.

  The copy includes the terminating NUL so that e->db and e->tbl_name are
  valid C strings, and the entry is freed again if it cannot be appended.

  Returns 0 on success, 1 if table_spec has no '.', on out of memory, or
  if the array insert fails.
*/
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
{
  const char* dot = strchr(table_spec, '.');
  if (!dot)
    return 1;
  uint len = (uint)strlen(table_spec);
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
                                                 + len + 1, MYF(MY_WME));
  if (!e)
    return 1;
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len + 1);           // include the trailing NUL
  if (insert_dynamic(a, (gptr)&e))
  {
    my_free((gptr) e, MYF(0));                  // not owned by the array
    return 1;
  }
  return 0;
}

209 210 211 212 213 214
/*
  Free every pointer stored in the dynamic array and then the array itself.
  end_slave() uses this for the wildcard rule arrays, whose elements are
  the entries allocated by add_wild_table_rule().
*/
static void free_string_array(DYNAMIC_ARRAY *a)
{
  for (uint idx= 0; idx < a->elements; idx++)
  {
    char* elem;
    get_dynamic(a, (gptr) &elem, idx);
    my_free(elem, MYF(MY_WME));
  }
  delete_dynamic(a);
}

void end_slave()
{
223
  pthread_mutex_lock(&LOCK_slave);
224 225 226 227 228 229 230 231 232 233
  if (slave_running)
  {
    abort_slave = 1;
    thr_alarm_kill(slave_real_id);
#ifdef SIGNAL_WITH_VIO_CLOSE
    slave_thd->close_active_vio();
#endif    
    while (slave_running)
      pthread_cond_wait(&COND_slave_stopped, &LOCK_slave);
  }
234 235
  pthread_mutex_unlock(&LOCK_slave);
  
236 237 238 239 240 241 242 243 244 245
  end_master_info(&glob_mi);
  if(do_table_inited)
    hash_free(&replicate_do_table);
  if(ignore_table_inited)
    hash_free(&replicate_ignore_table);
  if(wild_do_table_inited)
    free_string_array(&replicate_wild_do_table);
  if(wild_ignore_table_inited)
    free_string_array(&replicate_wild_ignore_table);
}
unknown's avatar
unknown committed
246

unknown's avatar
unknown committed
247
/*
  True if the slave should stop: abort_slave or abort_loop is set, or the
  given thread has been killed.
*/
inline bool slave_killed(THD* thd)
{
  if (abort_slave || abort_loop)
    return 1;
  return thd->killed;
}

unknown's avatar
unknown committed
252
/*
  Writes a 10-byte packet (0xfb followed by "/dev/null") on net, flushes
  it, reads and discards one reply packet and finally sends an OK.
  All return values of the network calls are ignored.
*/
inline void skip_load_data_infile(NET* net)
{
  static const char file_request[]= "\xfb/dev/null";

  (void) my_net_write(net, file_request, sizeof(file_request) - 1);
  (void) net_flush(net);
  (void) my_net_read(net);              // discard response
  send_ok(net);                         // the master expects it
}

unknown's avatar
unknown committed
260
inline char* rewrite_db(char* db)
unknown's avatar
unknown committed
261 262 263 264 265 266 267 268 269 270 271 272 273
{
  if(replicate_rewrite_db.is_empty() || !db) return db;
  I_List_iterator<i_string_pair> it(replicate_rewrite_db);
  i_string_pair* tmp;

  while((tmp=it++))
    {
      if(!strcmp(tmp->key, db))
	return tmp->val;
    }

  return db;
}
274

unknown's avatar
unknown committed
275 276 277 278 279 280
int db_ok(const char* db, I_List<i_string> &do_list,
	  I_List<i_string> &ignore_list )
{
  if(do_list.is_empty() && ignore_list.is_empty())
    return 1; // ok to replicate if the user puts no constraints

unknown's avatar
unknown committed
281
  // if the user has specified restrictions on which databases to replicate
unknown's avatar
unknown committed
282
  // and db was not selected, do not replicate
unknown's avatar
unknown committed
283 284
  if(!db)
    return 0;
unknown's avatar
unknown committed
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312

  if(!do_list.is_empty()) // if the do's are not empty
    {
      I_List_iterator<i_string> it(do_list);
      i_string* tmp;

      while((tmp=it++))
	{
	  if(!strcmp(tmp->ptr, db))
	    return 1; // match
	}
      return 0;
    }
  else // there are some elements in the don't, otherwise we cannot get here
    {
      I_List_iterator<i_string> it(ignore_list);
      i_string* tmp;

      while((tmp=it++))
	{
	  if(!strcmp(tmp->ptr, db))
	    return 0; // match
	}
      
      return 1;
    }
}

unknown's avatar
unknown committed
313
static int init_strvar_from_file(char* var, int max_size, IO_CACHE* f,
unknown's avatar
unknown committed
314 315
			       char* default_val)
{
unknown's avatar
unknown committed
316 317 318 319 320 321 322
  uint length;
  if ((length=my_b_gets(f,var, max_size)))
  {
    char* last_p = var + length -1;
    if (*last_p == '\n')
      *last_p = 0; // if we stopped on newline, kill it
    else
unknown's avatar
unknown committed
323
    {
unknown's avatar
unknown committed
324 325
      // if we truncated a line or stopped on last char, remove all chars
      // up to and including newline
unknown's avatar
unknown committed
326 327
      int c;
      while( ((c=my_b_get(f)) != '\n' && c != my_b_EOF));
unknown's avatar
unknown committed
328
    }
unknown's avatar
unknown committed
329 330 331 332
    return 0;
  }
  else if (default_val)
  {
unknown's avatar
unknown committed
333
    strmake(var,  default_val, max_size-1);
unknown's avatar
unknown committed
334 335
    return 0;
  }
unknown's avatar
unknown committed
336
  return 1;
unknown's avatar
unknown committed
337 338
}

unknown's avatar
unknown committed
339
static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
unknown's avatar
unknown committed
340 341 342
{
  char buf[32];
  
unknown's avatar
unknown committed
343 344 345 346 347
  if (my_b_gets(f, buf, sizeof(buf))) 
  {
    *var = atoi(buf);
    return 0;
  }
unknown's avatar
unknown committed
348
  else if(default_val)
unknown's avatar
unknown committed
349 350 351 352
  {
    *var = default_val;
    return 0;
  }
unknown's avatar
unknown committed
353
  return 1;
unknown's avatar
unknown committed
354 355 356 357 358 359 360
}


/*
  Re-create a table from a dump sent by the master.

  Steps:
    1. read one packet from 'net' and run it as a query through
       mysql_parse() with thd->db temporarily set to thd->last_nx_db;
    2. open db.table_name with a write lock;
    3. let the handler read the table data from 'net';
    4. run a quick repair on the table (the "Rebuilding the index" step).

  Errors are reported to the client on thd->net. thd->net.no_send_ok is
  set while working and cleared on the 'err' exit; the three early
  returns happen before it is set.

  Returns 0 on success, non-zero on failure.
*/
static int create_table_from_dump(THD* thd, NET* net, const char* db,
                                  const char* table_name)
{
  uint packet_len = my_net_read(net);   /* read create table statement */
  Vio* save_vio;
  HA_CHECK_OPT check_opt;
  TABLE_LIST tables;
  int error= 1;
  handler *file;
  char *query;

  if (packet_len == packet_error)
  {
    send_error(&thd->net, ER_MASTER_NET_READ);
    return 1;
  }
  if (net->read_pos[0] == 255)          /* error from master */
  {
    net->read_pos[packet_len] = 0;
    net_printf(&thd->net, ER_MASTER, net->read_pos + 3);
    return 1;
  }
  thd->command = COM_TABLE_DUMP;

  /* Note that we should not set thd->query until the area is initialized */
  query= sql_alloc(packet_len + 1);
  if (!query)
  {
    sql_print_error("create_table_from_dump: out of memory");
    net_printf(&thd->net, ER_GET_ERRNO, "Out of memory");
    return 1;
  }
  memcpy(query, net->read_pos, packet_len);
  query[packet_len]= 0;
  thd->query_length= packet_len;
  /*
    We make the following lock in an attempt to ensure that the compiler will
    not rearrange the code so that thd->query is set too soon
  */
  VOID(pthread_mutex_lock(&LOCK_thread_count));
  thd->query= query;
  VOID(pthread_mutex_unlock(&LOCK_thread_count));

  thd->current_tablenr = 0;
  thd->query_error = 0;
  thd->net.no_send_ok = 1;
  thd->proc_info = "Creating table from master dump";

  /* Save old db in case we are creating in a different database */
  char* save_db = thd->db;
  thd->db = thd->last_nx_db;
  mysql_parse(thd, thd->query, packet_len);     /* run create table */
  thd->db = save_db;                    /* leave things the way they were */

  if (thd->query_error)
    goto err;                           /* mysql_parse sent the error */

  bzero((char*) &tables,sizeof(tables));
  tables.db = (char*)db;
  tables.alias= tables.real_name= (char*)table_name;
  tables.lock_type = TL_WRITE;
  thd->proc_info = "Opening master dump table";
  if (!open_ltable(thd, &tables, TL_WRITE))
  {
    send_error(&thd->net,0,0);          /* Send error from open_ltable */
    sql_print_error("create_table_from_dump: could not open created table");
    goto err;
  }

  file = tables.table->file;
  thd->proc_info = "Reading master dump table data";
  if (file->net_read_dump(net))
  {
    net_printf(&thd->net, ER_MASTER_NET_READ);
    sql_print_error("create_table_from_dump::failed in handler::net_read_dump()");
    goto err;
  }

  check_opt.init();
  check_opt.flags|= T_VERY_SILENT | T_CALC_CHECKSUM;
  check_opt.quick = 1;
  thd->proc_info = "Rebuilding the index on master dump table";
  /*
    We do not want repair() to spam us with messages; just send them to
    the error log, and report the failure in case of problems. This is
    done by detaching the vio for the duration of the call.
  */
  save_vio = thd->net.vio;
  thd->net.vio = 0;
  error= (file->repair(thd,&check_opt) != 0);
  thd->net.vio = save_vio;
  if (error)
    net_printf(&thd->net, ER_INDEX_REBUILD,tables.table->real_name);

err:
  close_thread_tables(thd);
  thd->net.no_send_ok = 0;
  return error;
}

/*
  Fetch the table thd->last_nx_db.thd->last_nx_table from the master
  described by mi: connect, request a table dump and re-create the table
  locally from it.

  If an error code was recorded and thd still has a vio, the error is
  sent to the client. Failures inside create_table_from_dump() are
  reported by that function itself.

  Returns 0 on success, 1 on failure (including being killed while
  connecting).
*/
int fetch_nx_table(THD* thd, MASTER_INFO* mi)
{
  MYSQL* mysql = mc_mysql_init(NULL);
  int error = 1;
  int nx_errno = 0;

  if (!mysql)
  {
    sql_print_error("fetch_nx_table: Error in mysql_init()");
    nx_errno = ER_GET_ERRNO;
    goto err;
  }

  if (!(mi->host && *mi->host))
  {
    nx_errno = ER_BAD_HOST_ERROR;
    goto err;
  }

  /* The result is not checked; only a kill during connect aborts here */
  safe_connect(thd, mysql, mi);
  if (slave_killed(thd))
    goto err;

  if (request_table_dump(mysql, thd->last_nx_db, thd->last_nx_table))
  {
    nx_errno = ER_GET_ERRNO;
    sql_print_error("fetch_nx_table: failed on table dump request ");
    goto err;
  }

  if (create_table_from_dump(thd, &mysql->net, thd->last_nx_db,
                             thd->last_nx_table))
  {
    /* create_table_from_dump will have sent the error already */
    sql_print_error("fetch_nx_table: failed on create table ");
    goto err;
  }

  error = 0;

 err:
  if (mysql)
    mc_mysql_close(mysql);
  if (nx_errno && thd->net.vio)
    send_error(&thd->net, nx_errno, "Error in fetch_nx_table");
  thd->net.no_send_ok = 0;      /* Clear up garbage after create_table_from_dump */
  return error;
}

500 501 502 503 504 505 506 507 508 509 510
/*
  Close the master info file if it is open (end its IO cache, close the
  descriptor, reset mi->fd to -1) and mark mi as not initialized.
*/
void end_master_info(MASTER_INFO* mi)
{
  const bool file_is_open= (mi->fd >= 0);

  if (file_is_open)
  {
    end_io_cache(&mi->file);
    (void) my_close(mi->fd, MYF(MY_WME));
    mi->fd= -1;
  }
  mi->inited= 0;
}

511
/*
  Initialize mi from the master info file (master_info_file in
  mysql_data_home), creating the file if it does not exist.

  - File missing: (re)create it and fill mi from the master_* start-up
    settings, with an empty log name and position 4.
  - File present: read log name, position, host, user, password, port and
    connect retry from it, falling back to the master_* settings for
    lines that are missing.

  On success the IO cache is switched to write mode and the info is
  flushed back to the file. Does nothing if mi is already initialized.
  mi->lock is held while mi is being modified.

  Whenever the descriptor is closed on a failure path, mi->fd is reset to
  -1 so that a later end_master_info() or init_master_info() does not
  close it a second time.

  Returns 0 on success, 1 on error.
*/
int init_master_info(MASTER_INFO* mi)
{
  if (mi->inited)
    return 0;
  int fd,length,error;
  MY_STAT stat_area;
  char fname[FN_REFLEN+128];
  const char *msg;
  fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32);

  // we need a mutex while we are changing master info parameters to
  // keep other threads from reading bogus info

  pthread_mutex_lock(&mi->lock);
  mi->pending = 0;
  fd = mi->fd;

  // we do not want any messages if the file does not exist
  if (!my_stat(fname, &stat_area, MYF(0)))
  {
    // if someone removed the file from underneath our feet, just close
    // the old descriptor and re-create the old file
    if (fd >= 0)
    {
      end_io_cache(&mi->file);          // release the cache of the old file
      my_close(fd, MYF(MY_WME));
      mi->fd = -1;                      // the old descriptor is gone
    }
    if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0
        || init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
                         MYF(MY_WME)))
    {
      if (fd >= 0)
        my_close(fd, MYF(0));
      pthread_mutex_unlock(&mi->lock);
      return 1;
    }
    mi->log_file_name[0] = 0;
    mi->pos = 4; // skip magic number
    mi->fd = fd;

    if (master_host)
      strmake(mi->host, master_host, sizeof(mi->host) - 1);
    if (master_user)
      strmake(mi->user, master_user, sizeof(mi->user) - 1);
    if (master_password)
      strmake(mi->password, master_password, HASH_PASSWORD_LENGTH);
    mi->port = master_port;
    mi->connect_retry = master_connect_retry;
  }
  else // file exists
  {
    if (fd >= 0)
      reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
    else if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0
             || init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
                              0, MYF(MY_WME)))
    {
      // mi->fd is still negative here, only the local descriptor
      // needs closing
      if (fd >= 0)
        my_close(fd, MYF(0));
      pthread_mutex_unlock(&mi->lock);
      return 1;
    }

    if ((length=my_b_gets(&mi->file, mi->log_file_name,
                          sizeof(mi->log_file_name))) < 1)
    {
      msg="Error reading log file name from master info file ";
      goto error;
    }

    mi->log_file_name[length-1]= 0; // kill \n
    /* Reuse fname buffer */
    if (!my_b_gets(&mi->file, fname, sizeof(fname)))
    {
      msg="Error reading log file position from master info file";
      goto error;
    }
    mi->pos = strtoull(fname,(char**) 0, 10);

    mi->fd = fd;
    if (init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
                              master_host) ||
        init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
                              master_user) ||
        init_strvar_from_file(mi->password, HASH_PASSWORD_LENGTH+1, &mi->file,
                              master_password) ||
        init_intvar_from_file((int*)&mi->port, &mi->file, master_port) ||
        init_intvar_from_file((int*)&mi->connect_retry, &mi->file,
                              master_connect_retry))
    {
      msg="Error reading master configuration";
      goto error;
    }
  }

  mi->inited = 1;
  // now change the cache from READ to WRITE - must do this
  // before flush_master_info
  reinit_io_cache(&mi->file, WRITE_CACHE, 0L,0,1);
  error=test(flush_master_info(mi));
  pthread_mutex_unlock(&mi->lock);
  return error;

error:
  sql_print_error(msg);
  end_io_cache(&mi->file);
  if (fd >= 0)
    my_close(fd, MYF(0));
  mi->fd = -1;                          // do not leave a closed fd behind
  pthread_mutex_unlock(&mi->lock);
  return 1;
}

/*
  Send the slave status to the client as a one-row result set with the
  columns Master_Host, Master_User, Master_Port, Connect_retry, Log_File,
  Pos, Slave_Running, Replicate_do_db, Replicate_ignore_db, Last_errno,
  Last_error and Skip_counter.

  The master info fields are read under glob_mi.lock and slave_running
  under LOCK_slave. The position is sent as its full decimal string
  (via llstr) instead of being cast to uint32, so positions that do not
  fit in 32 bits are reported correctly.

  Returns 0 on success, -1 if sending the field list or the row fails.
*/
int show_master_info(THD* thd)
{
  DBUG_ENTER("show_master_info");
  List<Item> field_list;
  field_list.push_back(new Item_empty_string("Master_Host",
                                             sizeof(glob_mi.host)));
  field_list.push_back(new Item_empty_string("Master_User",
                                             sizeof(glob_mi.user)));
  field_list.push_back(new Item_empty_string("Master_Port", 6));
  field_list.push_back(new Item_empty_string("Connect_retry", 6));
  field_list.push_back( new Item_empty_string("Log_File",
                                              FN_REFLEN));
  field_list.push_back(new Item_empty_string("Pos", 12));
  field_list.push_back(new Item_empty_string("Slave_Running", 3));
  field_list.push_back(new Item_empty_string("Replicate_do_db", 20));
  field_list.push_back(new Item_empty_string("Replicate_ignore_db", 20));
  field_list.push_back(new Item_empty_string("Last_errno", 4));
  field_list.push_back(new Item_empty_string("Last_error", 20));
  field_list.push_back(new Item_empty_string("Skip_counter", 12));
  if (send_fields(thd, field_list, 1))
    DBUG_RETURN(-1);

  String* packet = &thd->packet;
  char pos_buf[22];                     // same size as used with llstr below
  packet->length(0);

  pthread_mutex_lock(&glob_mi.lock);
  net_store_data(packet, glob_mi.host);
  net_store_data(packet, glob_mi.user);
  net_store_data(packet, (uint32) glob_mi.port);
  net_store_data(packet, (uint32) glob_mi.connect_retry);
  net_store_data(packet, glob_mi.log_file_name);
  // full-width position, no truncation to 32 bits
  net_store_data(packet, llstr(glob_mi.pos, pos_buf));
  pthread_mutex_unlock(&glob_mi.lock);
  pthread_mutex_lock(&LOCK_slave);
  net_store_data(packet, slave_running ? "Yes":"No");
  pthread_mutex_unlock(&LOCK_slave);
  net_store_data(packet, &replicate_do_db);
  net_store_data(packet, &replicate_ignore_db);
  net_store_data(packet, (uint32)last_slave_errno);
  net_store_data(packet, last_slave_error);
  net_store_data(packet, slave_skip_counter);

  if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length()))
    DBUG_RETURN(-1);

  send_eof(&thd->net);
  DBUG_RETURN(0);
}

/*
  Rewrite the master info file from mi: seek to the start of the cache,
  print log file name, position, host, user, password, port and connect
  retry (one per line) and flush the cache.

  Always returns 0; the results of the write and flush are not checked.
*/
int flush_master_info(MASTER_INFO* mi)
{
  char pos_buf[22];
  IO_CACHE* info_file= &mi->file;

  my_b_seek(info_file, 0L);
  my_b_printf(info_file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n",
              mi->log_file_name,
              llstr(mi->pos, pos_buf),
              mi->host,
              mi->user,
              mi->password,
              mi->port,
              mi->connect_retry);
  flush_io_cache(info_file);
  return 0;
}

unknown's avatar
unknown committed
681
/*
  Block until the recorded master position has reached
  (log_name, log_pos) or the thread is killed.

  The base name of log_file_name is compared with log_name using
  strncmp() over log_name->length() bytes; an empty log_file_name counts
  as an equal name. The position is reached when the names are equal and
  pos >= log_pos, or when our log name compares greater.
  Between checks the thread waits on 'cond' (with 'lock' held).

  Returns -1 if the master info is not initialized or the thread was
  killed, otherwise the number of times the condition was waited on.
*/
int st_master_info::wait_for_pos(THD* thd, String* log_name, ulonglong log_pos)
{
  if (!inited)
    return -1;

  bool pos_reached;
  int event_count= 0;

  pthread_mutex_lock(&lock);
  while (!thd->killed)
  {
    int cmp_result= 0;
    if (*log_file_name)
    {
      /*
        We should use dirname_length() here when we have a version of
        this that doesn't modify the argument
      */
      char *basename= strrchr(log_file_name, FN_LIBCHAR);
      basename= basename ? basename + 1 : log_file_name;
      cmp_result= strncmp(basename, log_name->ptr(), log_name->length());
    }

    pos_reached= ((!cmp_result && pos >= log_pos) || cmp_result > 0);
    if (pos_reached || thd->killed)
      break;

    const char* msg= thd->enter_cond(&cond, &lock,
                                     "Waiting for master update");
    pthread_cond_wait(&cond, &lock);
    thd->exit_cond(msg);
    event_count++;
  }
  pthread_mutex_unlock(&lock);

  if (thd->killed)
    return -1;
  return event_count;
}

unknown's avatar
unknown committed
720 721 722 723 724 725 726

/*
  Prepare thd for use as the slave thread: mark it as a system/bootstrap/
  slave thread, initialize its net structure, give it full access,
  assign a thread id, register the thread-specific pointers and (except
  on Windows and OS/2) record the signal mask.

  Also records the calling thread in slave_real_id.

  Returns 0 on success, -1 if the thread-specific setup fails (the
  connection is closed and the thread ended in that case).
*/
static int init_slave_thread(THD* thd)
{
  DBUG_ENTER("init_slave_thread");
  thd->bootstrap= 1;
  thd->system_thread= 1;
  thd->client_capabilities= 0;
  my_net_init(&thd->net, 0);
  thd->net.timeout= slave_net_timeout;
  thd->max_packet_length= thd->net.max_packet;
  thd->master_access= ~0;
  thd->priv_user= 0;
  thd->slave_thread= 1;
  /* Updates are written to the binary log only if opt_log_slave_updates */
  thd->options= (((opt_log_slave_updates) ? OPTION_BIN_LOG:0) | OPTION_AUTO_IS_NULL) ;
  thd->system_thread= 1;
  thd->client_capabilities= CLIENT_LOCAL_FILES;
  slave_real_id= thd->real_id= pthread_self();

  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id= thread_id++;
  pthread_mutex_unlock(&LOCK_thread_count);

  if (init_thr_lock() ||
      my_pthread_setspecific_ptr(THR_THD,  thd) ||
      my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
      my_pthread_setspecific_ptr(THR_NET,  &thd->net))
  {
    close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed?
    end_thread(thd,0);
    DBUG_RETURN(-1);
  }

  thd->mysys_var= my_thread_var;
  thd->dbug_thread_id= my_thread_id();
#if !defined(__WIN__) && !defined(OS2)
  sigset_t set;
  VOID(sigemptyset(&set));                      // Get mask in use
  VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif

  thd->mem_root.free= thd->mem_root.used= 0;    // Probably not needed
  if (thd->max_join_size == (ulong) ~0L)
    thd->options|= OPTION_BIG_SELECTS;

  thd->proc_info= "Waiting for master update";
  thd->version= refresh_version;
  thd->set_time();

  DBUG_RETURN(0);
}

/*
  Sleep for 'sec' seconds in a way that can be interrupted.

  Each round sleeps for the remaining time with an alarm armed at twice
  that time; after waking up, a still-pending alarm is cancelled and the
  kill state is checked.

  Returns 1 if the slave was killed while sleeping, 0 when the full time
  has elapsed.
*/
static int safe_sleep(THD* thd, int sec)
{
  thr_alarm_t alarmed;
  thr_alarm_init(&alarmed);
  time_t now= time((time_t*) 0);
  const time_t deadline= now + sec;
  ALARM  alarm_buff;

  for ( ; now < deadline; now= time((time_t*) 0))
  {
    int nap_time= (int) (deadline - now);
    /*
      The only reason we are asking for an alarm is so that we will be
      woken up when killed; if that does not happen, the alarm is set to
      go off only after we wake up naturally
    */
    thr_alarm(&alarmed, 2 * nap_time,&alarm_buff);
    sleep(nap_time);
    /* We woke up before the alarm went off: cancel it */
    if (thr_alarm_in_use(&alarmed))
      thr_end_alarm(&alarmed);

    if (slave_killed(thd))
      return 1;
  }
  return 0;
}


/*
  Ask the master to start sending binary log events (COM_BINLOG_DUMP).

  Packet body as assembled below:
    bytes 0-3    position to start reading from (mi->pos)
    bytes 4-5    binlog flags (always 0 for now)
    bytes 6-9    our server id
    bytes 10...  name of the master log file, without a terminating NUL

  Returns 0 if the command was sent, 1 on failure (the error is logged and
  the caller is expected to reconnect and retry).
*/
static int request_dump(MYSQL* mysql, MASTER_INFO* mi)
{
  char packet[FN_REFLEN + 10];
  int dump_flags= 0;				// for now
  char *file_name= mi->log_file_name;
  int name_len= (uint) strlen(file_name);

  int4store(packet, mi->pos);
  int2store(packet + 4, dump_flags);
  int4store(packet + 6, server_id);
  memcpy(packet + 10, file_name, name_len);

  if (!mc_simple_command(mysql, COM_BINLOG_DUMP, packet, name_len + 10, 1))
    return 0;

  /*
    Something went wrong.  No detailed error analysis is done yet: we only
    log the failure and let the caller reconnect and try again later.
  */
  sql_print_error("Error on COM_BINLOG_DUMP: %s, will retry in %d secs",
                  mc_mysql_error(mysql), master_connect_retry);
  return 1;
}

/*
  Ask the master to send a dump of db.table (COM_TABLE_DUMP).

  Packet body as assembled below:
    1 byte   length of the database name
    N bytes  database name (no terminating NUL)
    1 byte   length of the table name
    M bytes  table name (no terminating NUL)

  Returns 0 if the command was sent, 1 on failure (an error is logged).
*/
static int request_table_dump(MYSQL* mysql, char* db, char* table)
{
  char buf[1024];
  char *p= buf;
  uint table_len= (uint) strlen(table);
  uint db_len= (uint) strlen(db);

  /*
    Each length prefix occupies a single byte, so a longer name would be
    silently truncated in the prefix and produce a corrupt packet.
  */
  if (db_len > 255 || table_len > 255)
  {
    sql_print_error("request_table_dump: database or table name too long");
    return 1;
  }

  // Two length bytes plus both names must fit into buf
  if (table_len + db_len > sizeof(buf) - 2)
  {
    sql_print_error("request_table_dump: Buffer overrun");
    return 1;
  }

  *p++= (char) db_len;
  memcpy(p, db, db_len);
  p+= db_len;
  *p++= (char) table_len;
  memcpy(p, table, table_len);
  p+= table_len;

  if (mc_simple_command(mysql, COM_TABLE_DUMP, buf, (uint) (p - buf), 1))
  {
    sql_print_error("request_table_dump: Error sending the table dump "
                    "command");
    return 1;
  }

  return 0;
}


853 854 855 856 857 858 859 860
/*
  Read the next packet from the master.

  *suppress_warnings is set to TRUE when the read failed with
  ER_NET_READ_INTERRUPTED, i.e. a normal net read timeout is making us
  try a reconnect.  Nothing is printed to the error log in that case,
  because this is a normal event on an idle server.  On every other path
  that reaches the read it is left at 0.

  Returns packet_error on any failure (including a packet of length 1,
  which is reported as an apparent master shutdown); otherwise the packet
  length minus one.  The `mi` argument is not used here.
*/

static uint read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
{
  /*
    The network read times out by itself; the caller checks whether the
    slave was told to die and, if not, tries reading again.
  */
#ifndef DBUG_OFF
  // Debug builds only: fake a read failure once the countdown reaches zero
  if (disconnect_slave_event_count && !(events_till_disconnect--))
    return packet_error;
#endif

  *suppress_warnings= 0;

  uint packet_len= mc_net_safe_read(mysql);

  if (packet_len == packet_error || (long) packet_len < 1)
  {
    if (mc_mysql_errno(mysql) != ER_NET_READ_INTERRUPTED)
      sql_print_error("Error reading packet from server: %s ("
                      "server_errno=%d)",
                      mc_mysql_error(mysql), mc_mysql_errno(mysql));
    else
    {
      /*
        We are trying a normal reconnect after a read timeout; prints to
        the .err file are suppressed as long as the reconnect happens
        without problems.
      */
      *suppress_warnings= TRUE;
    }
    return packet_error;
  }

  if (packet_len == 1)
  {
    sql_print_error("Slave: received 0 length packet from server, apparent"
                    " master shutdown: %s",
                    mc_mysql_error(mysql));
    return packet_error;
  }

  DBUG_PRINT("info",( "len=%u, net->read_pos[4] = %d\n",
                      packet_len, mysql->net.read_pos[4]));
  return packet_len - 1;
}

905 906 907 908 909 910 911 912 913 914 915 916 917 918 919
/*
  Check whether the error code the master recorded for a query means that
  the query was cut short on the master (network read/write error, server
  shutdown, aborted connection).

  For those codes the explanation is stored in last_slave_error, the code
  in last_slave_errno, the message is written to the error log and 1 is
  returned.  For every other code nothing is touched and 0 is returned.
*/
static int check_expected_error(THD* thd, int expected_error)
{
  const bool aborted_on_master=
    (expected_error == ER_NET_READ_ERROR ||
     expected_error == ER_NET_ERROR_ON_WRITE ||
     expected_error == ER_SERVER_SHUTDOWN ||
     expected_error == ER_NEW_ABORTING_CONNECTION);

  if (!aborted_on_master)
    return 0;

  my_snprintf(last_slave_error, sizeof(last_slave_error),
              "Slave: query '%s' partially completed on the master "
              "and was aborted. There is a chance that your master is "
              "inconsistent at this "
              "point. If you are sure that your master is ok, run this "
              "query manually on the"
              " slave and then restart the slave with SET "
              "SQL_SLAVE_SKIP_COUNTER=1;"
              " SLAVE START;", thd->query);
  last_slave_errno= expected_error;
  sql_print_error("%s", last_slave_error);
  return 1;
}
926

unknown's avatar
unknown committed
927 928 929 930 931
/*
  Returns 1 if the slave error mask is in use and err_code is set in it,
  0 otherwise.
*/
inline int ignored_error_code(int err_code)
{
  if (!use_slave_mask)
    return 0;
  return bitmap_is_set(&slave_error_mask, err_code) ? 1 : 0;
}

unknown's avatar
unknown committed
932 933
static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
{
unknown's avatar
unknown committed
934 935
  Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1,
					     event_len);
unknown's avatar
unknown committed
936
  char llbuff[22];
937
  
unknown's avatar
unknown committed
938 939 940 941 942 943
  mi->event_len = event_len; /* Added by Heikki: InnoDB internally stores the
				master log position it has processed so far;
				position to store is really
				mi->pos + mi->pending + mi->event_len
				since we must store the pos of the END of the
				current log event */
944 945
  if (ev)
  {
946
    int type_code = ev->get_type_code();
unknown's avatar
unknown committed
947
    if (ev->server_id == ::server_id || slave_skip_counter)
unknown's avatar
unknown committed
948
    {
unknown's avatar
unknown committed
949
      if(type_code == LOAD_EVENT)
unknown's avatar
unknown committed
950
	skip_load_data_infile(net);
951
	
unknown's avatar
unknown committed
952 953
      mi->inc_pos(event_len);
      flush_master_info(mi);
unknown's avatar
unknown committed
954 955 956 957 958 959
      if(slave_skip_counter && /* protect against common user error of
				  setting the counter to 1 instead of 2
			          while recovering from an failed
			          auto-increment insert */
	 	 !(type_code == INTVAR_EVENT &&
				 slave_skip_counter == 1))
unknown's avatar
unknown committed
960
        --slave_skip_counter;
unknown's avatar
unknown committed
961 962 963
      delete ev;     
      return 0;					// avoid infinite update loops
    }
964 965
  
    thd->server_id = ev->server_id; // use the original server id for logging
unknown's avatar
unknown committed
966
    thd->set_time();				// time the query
unknown's avatar
unknown committed
967 968
    if(!ev->when)
      ev->when = time(NULL);
969
    
unknown's avatar
unknown committed
970
    switch(type_code) {
971 972 973 974
    case QUERY_EVENT:
    {
      Query_log_event* qev = (Query_log_event*)ev;
      int q_len = qev->q_len;
975
      int expected_error,actual_error = 0;
976
      init_sql_alloc(&thd->mem_root, 8192,0);
unknown's avatar
unknown committed
977
      thd->db = rewrite_db((char*)qev->db);
unknown's avatar
unknown committed
978
      if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
979
      {
980
	thd->query_length= q_len;
981 982 983
	thd->set_time((time_t)qev->when);
	thd->current_tablenr = 0;
	VOID(pthread_mutex_lock(&LOCK_thread_count));
984
	thd->query = (char*)qev->query;
985 986 987
	thd->query_id = query_id++;
	VOID(pthread_mutex_unlock(&LOCK_thread_count));
	thd->last_nx_table = thd->last_nx_db = 0;
unknown's avatar
unknown committed
988
	thd->query_error = 0;			// clear error
989 990
	thd->net.last_errno = 0;
	thd->net.last_error[0] = 0;
unknown's avatar
unknown committed
991
	thd->slave_proxy_id = qev->thread_id;	// for temp tables
992 993 994
	
	// sanity check to make sure the master did not get a really bad
	// error on the query
unknown's avatar
unknown committed
995 996
	if (ignored_error_code((expected_error=qev->error_code)) ||
	    !check_expected_error(thd, expected_error))
997 998 999
	{
	  mysql_parse(thd, thd->query, q_len);
	  if (expected_error !=
unknown's avatar
unknown committed
1000 1001
	      (actual_error = thd->net.last_errno) && expected_error &&
	      !ignored_error_code(actual_error))
1002
	  {
1003
	    const char* errmsg = "Slave: did not get the expected error\
1004
 running query from master - expected: '%s' (%d), got '%s' (%d)"; 
1005 1006 1007 1008 1009
	    sql_print_error(errmsg, ER_SAFE(expected_error),
			    expected_error,
			    actual_error ? thd->net.last_error:"no error",
			    actual_error);
	    thd->query_error = 1;
1010
	  }
unknown's avatar
unknown committed
1011 1012
	  else if (expected_error == actual_error ||
		   ignored_error_code(actual_error))
1013
	  {
1014 1015 1016
	    thd->query_error = 0;
	    *last_slave_error = 0;
	    last_slave_errno = 0;
1017
	  }
1018 1019 1020 1021
	}
	else
	{
	  // master could be inconsistent, abort and tell DBA to check/fix it
1022
	  VOID(pthread_mutex_lock(&LOCK_thread_count));
1023
	  thd->db = thd->query = 0;
1024
	  VOID(pthread_mutex_unlock(&LOCK_thread_count));
1025 1026 1027 1028 1029 1030
	  thd->convert_set = 0;
	  close_thread_tables(thd);
	  free_root(&thd->mem_root,0);
	  delete ev;
	  return 1;
	}
1031
      }
unknown's avatar
unknown committed
1032
      thd->db = 0;				// prevent db from being freed
1033
      VOID(pthread_mutex_lock(&LOCK_thread_count));
unknown's avatar
unknown committed
1034
      thd->query = 0;				// just to be sure
1035
      VOID(pthread_mutex_unlock(&LOCK_thread_count));
unknown's avatar
unknown committed
1036 1037
      // assume no convert for next query unless set explictly
      thd->convert_set = 0;
1038
      close_thread_tables(thd);
unknown's avatar
unknown committed
1039
      
1040
      if (thd->query_error || thd->fatal_error)
1041 1042 1043
      {
	sql_print_error("Slave:  error running query '%s' ",
			qev->query);
1044 1045 1046 1047 1048 1049 1050
	last_slave_errno = actual_error ? actual_error : -1;
	my_snprintf(last_slave_error, sizeof(last_slave_error),
		    "error '%s' on query '%s'",
		    actual_error ? thd->net.last_error :
		    "unexpected success or fatal error",
		    qev->query
		    );
1051
        free_root(&thd->mem_root,0);
1052
	delete ev;
1053 1054
	return 1;
      }
1055
      free_root(&thd->mem_root,0);
1056
      delete ev;
1057 1058

      mi->inc_pos(event_len);
unknown's avatar
unknown committed
1059
      
unknown's avatar
unknown committed
1060
      if (!(thd->options & OPTION_BEGIN)) {
unknown's avatar
unknown committed
1061 1062 1063 1064 1065 1066 1067 1068

      	/* We only flush the master info position to the master.info file if
        the transaction is not open any more: an incomplete transaction will
      	be rolled back automatically in crash recovery in transactional
      	table handlers */

        flush_master_info(mi);
      }
1069 1070
      break;
    }
unknown's avatar
unknown committed
1071
	  
1072 1073 1074 1075
    case LOAD_EVENT:
    {
      Load_log_event* lev = (Load_log_event*)ev;
      init_sql_alloc(&thd->mem_root, 8192,0);
unknown's avatar
unknown committed
1076
      thd->db = rewrite_db((char*)lev->db);
1077
      DBUG_ASSERT(thd->query == 0);
1078 1079
      thd->query = 0;
      thd->query_error = 0;
unknown's avatar
unknown committed
1080
	    
1081
      if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
1082 1083 1084 1085 1086 1087 1088
      {
	thd->set_time((time_t)lev->when);
	thd->current_tablenr = 0;
	VOID(pthread_mutex_lock(&LOCK_thread_count));
	thd->query_id = query_id++;
	VOID(pthread_mutex_unlock(&LOCK_thread_count));

unknown's avatar
unknown committed
1089 1090 1091
	TABLE_LIST tables;
	bzero((char*) &tables,sizeof(tables));
	tables.db = thd->db;
unknown's avatar
unknown committed
1092
	tables.alias= tables.real_name= (char*)lev->table_name;
unknown's avatar
unknown committed
1093 1094 1095
	tables.lock_type = TL_WRITE;
	// the table will be opened in mysql_load    
        if(table_rules_on && !tables_ok(thd, &tables))
unknown's avatar
unknown committed
1096
	{
unknown's avatar
unknown committed
1097
	  skip_load_data_infile(net);
unknown's avatar
unknown committed
1098
	}
unknown's avatar
unknown committed
1099
	else
unknown's avatar
unknown committed
1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110
	{
	  enum enum_duplicates handle_dup = DUP_IGNORE;
	  if(lev->sql_ex.opt_flags && REPLACE_FLAG)
	    handle_dup = DUP_REPLACE;
	  sql_exchange ex((char*)lev->fname, lev->sql_ex.opt_flags &&
			  DUMPFILE_FLAG );
	  String field_term(&lev->sql_ex.field_term, 1),
	    enclosed(&lev->sql_ex.enclosed, 1),
	    line_term(&lev->sql_ex.line_term,1),
	    escaped(&lev->sql_ex.escaped, 1),
	    line_start(&lev->sql_ex.line_start, 1);
unknown's avatar
unknown committed
1111
	    
unknown's avatar
unknown committed
1112 1113 1114
	  ex.field_term = &field_term;
	  if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
	    ex.field_term->length(0);
unknown's avatar
unknown committed
1115
	    
unknown's avatar
unknown committed
1116 1117 1118
	  ex.enclosed = &enclosed;
	  if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY)
	    ex.enclosed->length(0);
unknown's avatar
unknown committed
1119

unknown's avatar
unknown committed
1120 1121 1122
	  ex.line_term = &line_term;
	  if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY)
	    ex.line_term->length(0);
unknown's avatar
unknown committed
1123

unknown's avatar
unknown committed
1124 1125 1126
	  ex.line_start = &line_start;
	  if(lev->sql_ex.empty_flags & LINE_START_EMPTY)
	    ex.line_start->length(0);
unknown's avatar
unknown committed
1127

unknown's avatar
unknown committed
1128 1129 1130
	  ex.escaped = &escaped;
	  if(lev->sql_ex.empty_flags & ESCAPED_EMPTY)
	    ex.escaped->length(0);
unknown's avatar
unknown committed
1131

unknown's avatar
unknown committed
1132 1133 1134
	  ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
	  if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
	    ex.field_term->length(0);
unknown's avatar
unknown committed
1135
	    
unknown's avatar
unknown committed
1136
	  ex.skip_lines = lev->skip_lines;
unknown's avatar
unknown committed
1137
	    
1138

unknown's avatar
unknown committed
1139 1140 1141 1142 1143 1144
	  List<Item> fields;
	  lev->set_fields(fields);
	  thd->slave_proxy_id = thd->thread_id;
	  thd->net.vio = net->vio;
	  // mysql_load will use thd->net to read the file
	  thd->net.pkt_nr = net->pkt_nr;
unknown's avatar
unknown committed
1145
	  // make sure the client does not get confused
unknown's avatar
unknown committed
1146 1147 1148 1149 1150
	  // about the packet sequence
	  if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1,
			TL_WRITE))
	    thd->query_error = 1;
	  if(thd->cuted_fields)
unknown's avatar
unknown committed
1151 1152 1153
	    sql_print_error("Slave: load data infile at position %s in log \
'%s' produced %d warning(s)", llstr(glob_mi.pos,llbuff), RPL_LOG_NAME,
			    thd->cuted_fields );
unknown's avatar
unknown committed
1154 1155
	  net->pkt_nr = thd->net.pkt_nr;
	}
1156
      }
unknown's avatar
unknown committed
1157
      else
1158
      {
unknown's avatar
unknown committed
1159 1160
	// we will just ask the master to send us /dev/null if we do not
	// want to load the data :-)
unknown's avatar
unknown committed
1161
	skip_load_data_infile(net);
1162
      }
unknown's avatar
unknown committed
1163
	    
1164 1165 1166 1167 1168 1169 1170 1171
      thd->net.vio = 0; 
      thd->db = 0;// prevent db from being freed
      close_thread_tables(thd);
      if(thd->query_error)
      {
	int sql_error = thd->net.last_errno;
	if(!sql_error)
	  sql_error = ER_UNKNOWN_ERROR;
unknown's avatar
unknown committed
1172
		
unknown's avatar
unknown committed
1173
	sql_print_error("Slave: Error '%s' running load data infile ",
1174 1175
			ER(sql_error));
	delete ev;
1176
        free_root(&thd->mem_root,0);
1177 1178
	return 1;
      }
1179
      
1180
      delete ev;
1181
      free_root(&thd->mem_root,0);
unknown's avatar
unknown committed
1182
	    
1183 1184 1185 1186 1187 1188
      if(thd->fatal_error)
      {
	sql_print_error("Slave: Fatal error running query '%s' ",
			thd->query);
	return 1;
      }
unknown's avatar
unknown committed
1189

1190
      mi->inc_pos(event_len);
unknown's avatar
unknown committed
1191

unknown's avatar
unknown committed
1192
      if (!(thd->options & OPTION_BEGIN))
unknown's avatar
unknown committed
1193 1194
        flush_master_info(mi);

1195 1196 1197
      break;
    }

unknown's avatar
unknown committed
1198 1199 1200
    /* Question: in a START or STOP event, what happens if we have transaction
    open? */

1201 1202 1203
    case START_EVENT:
      mi->inc_pos(event_len);
      flush_master_info(mi);
1204
      delete ev;
1205
      break;
unknown's avatar
unknown committed
1206
                  
1207
    case STOP_EVENT:
1208 1209 1210 1211 1212 1213
      if(mi->pos > 4) // stop event should be ignored after rotate event
	{
          close_temporary_tables(thd);
          mi->inc_pos(event_len);
          flush_master_info(mi);
	}
1214
      delete ev;
1215 1216 1217 1218 1219
      break;
    case ROTATE_EVENT:
    {
      Rotate_log_event* rev = (Rotate_log_event*)ev;
      int ident_len = rev->ident_len;
unknown's avatar
unknown committed
1220
      pthread_mutex_lock(&mi->lock);
1221 1222
      memcpy(mi->log_file_name, rev->new_log_ident,ident_len );
      mi->log_file_name[ident_len] = 0;
unknown's avatar
unknown committed
1223
      mi->pos = 4; // skip magic number
unknown's avatar
unknown committed
1224 1225
      pthread_cond_broadcast(&mi->cond);
      pthread_mutex_unlock(&mi->lock);
unknown's avatar
unknown committed
1226

unknown's avatar
unknown committed
1227
      if (!(thd->options & OPTION_BEGIN))
unknown's avatar
unknown committed
1228
        flush_master_info(mi);
unknown's avatar
unknown committed
1229 1230 1231 1232
#ifndef DBUG_OFF
      if(abort_slave_event_count)
	++events_till_abort;
#endif      
1233
      delete ev;
1234 1235
      break;
    }
unknown's avatar
unknown committed
1236

1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248
    case INTVAR_EVENT:
    {
      Intvar_log_event* iev = (Intvar_log_event*)ev;
      switch(iev->type)
      {
      case LAST_INSERT_ID_EVENT:
	thd->last_insert_id_used = 1;
	thd->last_insert_id = iev->val;
	break;
      case INSERT_ID_EVENT:
	thd->next_insert_id = iev->val;
	break;
unknown's avatar
unknown committed
1249
		
1250
      }
unknown's avatar
unknown committed
1251
      mi->inc_pending(event_len);
1252
      delete ev;
1253 1254
      break;
    }
unknown's avatar
unknown committed
1255
    }
1256
  }
unknown's avatar
unknown committed
1257
  else
1258
  {
unknown's avatar
unknown committed
1259 1260 1261 1262
    sql_print_error("\
Could not parse log event entry, check the master for binlog corruption\n\
This may also be a network problem, or just a bug in the master or slave code.\
");
1263 1264 1265
    return 1;
  }
  return 0;	  
unknown's avatar
unknown committed
1266 1267 1268 1269 1270 1271
}
      
// slave thread

pthread_handler_decl(handle_slave,arg __attribute__((unused)))
{
unknown's avatar
unknown committed
1272 1273 1274
#ifndef DBUG_OFF
 slave_begin:  
#endif  
1275
  THD *thd; // needs to be first for thread_stack
unknown's avatar
unknown committed
1276
  MYSQL *mysql = NULL ;
unknown's avatar
unknown committed
1277
  char llbuff[22];
unknown's avatar
unknown committed
1278

1279
  pthread_mutex_lock(&LOCK_slave);
unknown's avatar
unknown committed
1280 1281 1282 1283 1284 1285 1286
  if (!server_id)
  {
    pthread_cond_broadcast(&COND_slave_start);
    pthread_mutex_unlock(&LOCK_slave);
    sql_print_error("Server id not set, will not start slave");
    pthread_exit((void*)1);
  }
1287
  
unknown's avatar
unknown committed
1288 1289
  if(slave_running)
    {
1290
      pthread_cond_broadcast(&COND_slave_start);
unknown's avatar
unknown committed
1291
      pthread_mutex_unlock(&LOCK_slave);
1292
      pthread_exit((void*)1);  // safety just in case
unknown's avatar
unknown committed
1293 1294 1295
    }
  slave_running = 1;
  abort_slave = 0;
1296 1297 1298
#ifndef DBUG_OFF  
  events_till_abort = abort_slave_event_count;
#endif  
unknown's avatar
unknown committed
1299 1300
  pthread_cond_broadcast(&COND_slave_start);
  pthread_mutex_unlock(&LOCK_slave);
unknown's avatar
unknown committed
1301
  
unknown's avatar
unknown committed
1302
  // int error = 1;
unknown's avatar
unknown committed
1303
  bool retried_once = 0;
unknown's avatar
unknown committed
1304
  ulonglong last_failed_pos = 0;
unknown's avatar
unknown committed
1305
  
unknown's avatar
unknown committed
1306 1307
  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  my_thread_init();
1308
  slave_thd = thd = new THD; // note that contructor of THD uses DBUG_ !
1309
  thd->set_time();
unknown's avatar
unknown committed
1310 1311 1312
  DBUG_ENTER("handle_slave");

  pthread_detach_this_thread();
unknown's avatar
unknown committed
1313
  if (init_slave_thread(thd) || init_master_info(&glob_mi))
unknown's avatar
unknown committed
1314 1315 1316 1317
    {
      sql_print_error("Failed during slave thread initialization");
      goto err;
    }
unknown's avatar
unknown committed
1318
  thd->thread_stack = (char*)&thd; // remember where our stack is
1319
  thd->temporary_tables = save_temporary_tables; // restore temp tables
1320
  (void) pthread_mutex_lock(&LOCK_thread_count);
unknown's avatar
unknown committed
1321
  threads.append(thd);
1322
  (void) pthread_mutex_unlock(&LOCK_thread_count);
1323 1324
  glob_mi.pending = 0;  //this should always be set to 0 when the slave thread
  // is started
unknown's avatar
unknown committed
1325
  
unknown's avatar
unknown committed
1326 1327
  DBUG_PRINT("info",("master info: log_file_name=%s, position=%s",
		     glob_mi.log_file_name, llstr(glob_mi.pos,llbuff)));
unknown's avatar
unknown committed
1328

unknown's avatar
unknown committed
1329 1330 1331 1332 1333 1334
  
  if (!(mysql = mc_mysql_init(NULL)))
  {
    sql_print_error("Slave thread: error in mc_mysql_init()");
    goto err;
  }
unknown's avatar
unknown committed
1335
  
unknown's avatar
unknown committed
1336

unknown's avatar
unknown committed
1337
  thd->proc_info = "connecting to master";
unknown's avatar
unknown committed
1338 1339
#ifndef DBUG_OFF  
  sql_print_error("Slave thread initialized");
1340 1341
#endif
  // we can get killed during safe_connect
unknown's avatar
unknown committed
1342
  if (!safe_connect(thd, mysql, &glob_mi))
1343
   sql_print_error("Slave: connected to master '%s@%s:%d',\
unknown's avatar
unknown committed
1344 1345 1346 1347
  replication started in log '%s' at position %s", glob_mi.user,
		   glob_mi.host, glob_mi.port,
		   RPL_LOG_NAME,
		   llstr(glob_mi.pos,llbuff));
1348
  else
unknown's avatar
unknown committed
1349 1350 1351 1352
  {
    sql_print_error("Slave thread killed while connecting to master");
    goto err;
  }
unknown's avatar
unknown committed
1353
  
unknown's avatar
unknown committed
1354
connected:
unknown's avatar
unknown committed
1355 1356

  mysql->net.timeout=slave_net_timeout;
unknown's avatar
unknown committed
1357 1358
  while (!slave_killed(thd))
  {
unknown's avatar
unknown committed
1359
      thd->proc_info = "Requesting binlog dump";
unknown's avatar
unknown committed
1360 1361 1362 1363
      if(request_dump(mysql, &glob_mi))
	{
	  sql_print_error("Failed on request_dump()");
	  if(slave_killed(thd))
unknown's avatar
unknown committed
1364
	    {
unknown's avatar
unknown committed
1365 1366 1367 1368
	      sql_print_error("Slave thread killed while requesting master \
dump");
              goto err;
	    }
unknown's avatar
unknown committed
1369
	  
unknown's avatar
unknown committed
1370
	  thd->proc_info = "Waiiting to reconnect after a failed dump request";
unknown's avatar
unknown committed
1371
	  if(mysql->net.vio)
unknown's avatar
unknown committed
1372 1373 1374 1375 1376 1377 1378 1379 1380
	    vio_close(mysql->net.vio);
	  // first time retry immediately, assuming that we can recover
	  // right away - if first time fails, sleep between re-tries
	  // hopefuly the admin can fix the problem sometime
	  if(retried_once)
	    safe_sleep(thd, glob_mi.connect_retry);
	  else
	    retried_once = 1;
	  
unknown's avatar
unknown committed
1381
	  if(slave_killed(thd))
unknown's avatar
unknown committed
1382 1383 1384
	    {
	      sql_print_error("Slave thread killed while retrying master \
dump");
unknown's avatar
unknown committed
1385
	      goto err;
unknown's avatar
unknown committed
1386
	    }
unknown's avatar
unknown committed
1387

unknown's avatar
unknown committed
1388
	  thd->proc_info = "Reconnecting after a failed dump request";
unknown's avatar
unknown committed
1389
	  last_failed_pos=glob_mi.pos;
unknown's avatar
unknown committed
1390
          sql_print_error("Slave: failed dump request, reconnecting to \
unknown's avatar
unknown committed
1391 1392
try again, log '%s' at postion %s", RPL_LOG_NAME,
			  llstr(last_failed_pos,llbuff));
unknown's avatar
unknown committed
1393
	  if(safe_reconnect(thd, mysql, &glob_mi, 0) || slave_killed(thd))
unknown's avatar
unknown committed
1394 1395
	    {
	      sql_print_error("Slave thread killed during or after reconnect");
unknown's avatar
unknown committed
1396
	      goto err;
unknown's avatar
unknown committed
1397
	    }
unknown's avatar
unknown committed
1398

unknown's avatar
unknown committed
1399
	  goto connected;
unknown's avatar
unknown committed
1400 1401 1402 1403
	}

      while(!slave_killed(thd))
	{
1404
          bool suppress_warnings= 0;
unknown's avatar
unknown committed
1405
	
unknown's avatar
unknown committed
1406
	  thd->proc_info = "Reading master update";
1407
	  uint event_len = read_event(mysql, &glob_mi, &suppress_warnings);
unknown's avatar
unknown committed
1408
	  
unknown's avatar
unknown committed
1409
	  if(slave_killed(thd))
unknown's avatar
unknown committed
1410 1411 1412 1413
	    {
	      sql_print_error("Slave thread killed while reading event");
	      goto err;
	    }
1414
	  	  
unknown's avatar
unknown committed
1415 1416
	  if (event_len == packet_error)
	  {
1417 1418 1419 1420 1421 1422 1423 1424 1425
	    if(mc_mysql_errno(mysql) == ER_NET_PACKET_TOO_LARGE)
	      {
		sql_print_error("Log entry on master is longer than \
max_allowed_packet on slave. Slave thread will be aborted. If the entry is \
really supposed to be that long, restart the server with a higher value of \
max_allowed_packet. The current value is %ld", max_allowed_packet);
		goto err;
	      }
	    
unknown's avatar
unknown committed
1426
	    thd->proc_info = "Waiting to reconnect after a failed read";
unknown's avatar
unknown committed
1427
	    if(mysql->net.vio)
unknown's avatar
unknown committed
1428 1429 1430 1431 1432 1433
 	      vio_close(mysql->net.vio);
	    if(retried_once) // punish repeat offender with sleep
	      safe_sleep(thd, glob_mi.connect_retry);
	    else
	      retried_once = 1; 
	    
unknown's avatar
unknown committed
1434
	    if(slave_killed(thd))
unknown's avatar
unknown committed
1435 1436 1437 1438 1439
	      {
		sql_print_error("Slave thread killed while waiting to \
reconnect after a failed read");
	        goto err;
	      }
unknown's avatar
unknown committed
1440
	    thd->proc_info = "Reconnecting after a failed read";
unknown's avatar
unknown committed
1441
	    last_failed_pos= glob_mi.pos;
unknown's avatar
unknown committed
1442
	    
1443
	    if (!suppress_warnings)
unknown's avatar
unknown committed
1444
	      sql_print_error("Slave: Failed reading log event, \
unknown's avatar
unknown committed
1445 1446
reconnecting to retry, log '%s' position %s", RPL_LOG_NAME,
			    llstr(last_failed_pos, llbuff));
unknown's avatar
unknown committed
1447
	    if(safe_reconnect(thd, mysql, &glob_mi,
1448
			      suppress_warnings)
unknown's avatar
unknown committed
1449
			 || slave_killed(thd))
unknown's avatar
unknown committed
1450 1451 1452 1453 1454
	      {
		sql_print_error("Slave thread killed during or after a \
reconnect done to recover from failed read");
	        goto err;
	      }
unknown's avatar
unknown committed
1455 1456 1457
	    
	    goto connected;
	  } // if(event_len == packet_error)
unknown's avatar
unknown committed
1458
	  
unknown's avatar
unknown committed
1459
	  thd->proc_info = "Processing master log event"; 
unknown's avatar
unknown committed
1460 1461
	  if(exec_event(thd, &mysql->net, &glob_mi, event_len))
	    {
1462 1463 1464
	      sql_print_error("\
Error running query, slave aborted. Fix the problem, and re-start \
the slave thread with \"mysqladmin start-slave\". We stopped at log \
unknown's avatar
unknown committed
1465 1466
'%s' position %s",
			      RPL_LOG_NAME, llstr(glob_mi.pos, llbuff));
unknown's avatar
unknown committed
1467 1468
	      goto err;
	      // there was an error running the query
unknown's avatar
unknown committed
1469 1470 1471
	      // abort the slave thread, when the problem is fixed, the user
	      // should restart the slave with mysqladmin start-slave
	    }
1472 1473 1474 1475 1476 1477 1478
#ifndef DBUG_OFF
	  if(abort_slave_event_count && !--events_till_abort)
	    {
	      sql_print_error("Slave: debugging abort");
	      goto err;
	    }
#endif	  
unknown's avatar
unknown committed
1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491
	  
	  // successful exec with offset advance,
	  // the slave repents and his sins are forgiven!
	  if(glob_mi.pos > last_failed_pos)
	    {
	     retried_once = 0;
#ifndef DBUG_OFF
	     stuck_count = 0;
#endif
	    }
#ifndef DBUG_OFF
	  else
	    {
unknown's avatar
unknown committed
1492 1493
	      // show a little mercy, allow slave to read one more event
	      // before cutting him off - otherwise he gets stuck
1494
	      // on Intvar events, since they do not advance the offset
unknown's avatar
unknown committed
1495 1496
	      // immediately
	      if (++stuck_count > 2)
unknown's avatar
unknown committed
1497 1498 1499
	        events_till_disconnect++;
	    }
#endif	  
unknown's avatar
unknown committed
1500 1501
	} // while(!slave_killed(thd)) - read/exec loop
  } // while(!slave_killed(thd)) - slave loop
unknown's avatar
unknown committed
1502

unknown's avatar
unknown committed
1503
  // error = 0;
unknown's avatar
unknown committed
1504
 err:
unknown's avatar
unknown committed
1505 1506
  // print the current replication position 
  sql_print_error("Slave thread exiting, replication stopped in log '%s' at \
unknown's avatar
unknown committed
1507 1508
position %s",
		  RPL_LOG_NAME, llstr(glob_mi.pos,llbuff));
1509
  VOID(pthread_mutex_lock(&LOCK_thread_count));
unknown's avatar
unknown committed
1510
  thd->query = thd->db = 0; // extra safety
1511 1512 1513
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
  if (mysql)
    mc_mysql_close(mysql);
unknown's avatar
unknown committed
1514
  thd->proc_info = "Waiting for slave mutex on exit";
unknown's avatar
unknown committed
1515 1516 1517
  pthread_mutex_lock(&LOCK_slave);
  slave_running = 0;
  abort_slave = 0;
1518 1519
  save_temporary_tables = thd->temporary_tables;
  thd->temporary_tables = 0; // remove tempation from destructor to close them
unknown's avatar
unknown committed
1520 1521
  pthread_cond_broadcast(&COND_slave_stopped); // tell the world we are done
  pthread_mutex_unlock(&LOCK_slave);
1522
  net_end(&thd->net); // destructor will not free it, because we are weird
1523
  slave_thd = 0;
1524
  (void) pthread_mutex_lock(&LOCK_thread_count);
unknown's avatar
unknown committed
1525
  delete thd;
1526
  (void) pthread_mutex_unlock(&LOCK_thread_count);
unknown's avatar
unknown committed
1527
  my_thread_end();
unknown's avatar
unknown committed
1528 1529 1530 1531
#ifndef DBUG_OFF
  if(abort_slave_event_count && !events_till_abort)
    goto slave_begin;
#endif  
unknown's avatar
unknown committed
1532 1533 1534 1535
  pthread_exit(0);
  DBUG_RETURN(0);				// Can't return anything here
}

unknown's avatar
unknown committed
1536 1537 1538

/* try to connect until successful or slave killed */

1539
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
unknown's avatar
unknown committed
1540
{
1541
  int slave_was_killed;
unknown's avatar
unknown committed
1542 1543 1544
#ifndef DBUG_OFF
  events_till_disconnect = disconnect_slave_event_count;
#endif  
1545
  while(!(slave_was_killed = slave_killed(thd)) &&
unknown's avatar
unknown committed
1546 1547 1548
	!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
			  mi->port, 0, 0))
  {
1549
    sql_print_error("Slave thread: error connecting to master: %s (%d),\
unknown's avatar
unknown committed
1550
 retry in %d sec", mc_mysql_error(mysql), errno, mi->connect_retry);
unknown's avatar
unknown committed
1551 1552 1553
    safe_sleep(thd, mi->connect_retry);
  }
  
1554
  if(!slave_was_killed)
1555 1556
    {
      mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
1557
		  mi->user, mi->host, mi->port);
1558 1559
#ifdef SIGNAL_WITH_VIO_CLOSE
      thd->set_active_vio(mysql->net.vio);
1560 1561
#endif      
    }
unknown's avatar
unknown committed
1562
  
1563
  return slave_was_killed;
unknown's avatar
unknown committed
1564 1565
}

1566 1567 1568 1569
/*
  Try to connect until successful or slave killed or we have retried
  master_retry_count times
*/
unknown's avatar
unknown committed
1570

unknown's avatar
unknown committed
1571
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
1572
			  bool suppress_warnings)
unknown's avatar
unknown committed
1573
{
1574
  int slave_was_killed;
1575 1576
  int last_errno= -2;				// impossible error
  ulong err_count=0;
unknown's avatar
unknown committed
1577 1578
  char llbuff[22];

1579 1580 1581 1582
  /*
    If we lost connection after reading a state set event
    we will be re-reading it, so pending needs to be cleared
  */
unknown's avatar
unknown committed
1583
  mi->pending = 0;
unknown's avatar
unknown committed
1584 1585 1586
#ifndef DBUG_OFF
  events_till_disconnect = disconnect_slave_event_count;
#endif
1587
  while (!(slave_was_killed = slave_killed(thd)) && mc_mysql_reconnect(mysql))
unknown's avatar
unknown committed
1588
  {
1589 1590 1591
    /* Don't repeat last error */
    if (mc_mysql_errno(mysql) != last_errno)
    {
1592
      suppress_warnings= 0;
1593
      sql_print_error("Slave thread: error re-connecting to master: \
1594
%s, last_errno=%d, retry in %d sec",
1595 1596 1597
		      mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql),
		      mi->connect_retry);
    }
unknown's avatar
unknown committed
1598 1599 1600
    safe_sleep(thd, mi->connect_retry);
    /* if master_retry_count is not set, keep trying until success */
    if (master_retry_count && err_count++ == master_retry_count)
1601 1602 1603 1604
    {
      slave_was_killed=1;
      break;
    }
unknown's avatar
unknown committed
1605
  }
1606

1607 1608
  if (!slave_was_killed)
  {
1609
    if (!suppress_warnings)
unknown's avatar
unknown committed
1610
      sql_print_error("Slave: reconnected to master '%s@%s:%d',\
unknown's avatar
unknown committed
1611 1612 1613 1614
replication resumed in log '%s' at position %s", glob_mi.user,
		    glob_mi.host, glob_mi.port,
		    RPL_LOG_NAME,
		    llstr(glob_mi.pos,llbuff));
1615
#ifdef SIGNAL_WITH_VIO_CLOSE
1616
    thd->set_active_vio(mysql->net.vio);
1617
#endif      
1618
  }
unknown's avatar
unknown committed
1619

1620
  return slave_was_killed;
unknown's avatar
unknown committed
1621 1622 1623 1624
}

#ifdef __GNUC__
template class I_List_iterator<i_string>;
unknown's avatar
unknown committed
1625
template class I_List_iterator<i_string_pair>;
unknown's avatar
unknown committed
1626
#endif