slave.cc 43.5 KB
Newer Older
unknown's avatar
unknown committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
   
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
   
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
   
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


#include "mysql_priv.h"
#include <mysql.h>
20
#include <myisam.h>
unknown's avatar
unknown committed
21
#include "mini_client.h"
22
#include "slave.h"
unknown's avatar
unknown committed
23 24 25
#include <thr_alarm.h>
#include <my_dir.h>

unknown's avatar
unknown committed
26 27 28
#define RPL_LOG_NAME (glob_mi.log_file_name[0] ? glob_mi.log_file_name :\
 "FIRST")

29
volatile bool slave_running = 0;
unknown's avatar
unknown committed
30 31
pthread_t slave_real_id;
MASTER_INFO glob_mi;
unknown's avatar
unknown committed
32 33
MY_BITMAP slave_error_mask;
bool use_slave_mask = 0;
34
HASH replicate_do_table, replicate_ignore_table;
unknown's avatar
unknown committed
35
DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
36
bool do_table_inited = 0, ignore_table_inited = 0;
unknown's avatar
unknown committed
37
bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
38
bool table_rules_on = 0;
39 40
uint32 slave_skip_counter = 0; 
static TABLE* save_temporary_tables = 0;
41
THD* slave_thd = 0;
42 43
// when slave thread exits, we need to remember the temporary tables so we
// can re-use them on slave start
44 45 46

static int last_slave_errno = 0;
static char last_slave_error[1024] = "";
unknown's avatar
unknown committed
47
#ifndef DBUG_OFF
48 49
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
static int events_till_disconnect = -1, events_till_abort = -1;
unknown's avatar
unknown committed
50 51
static int stuck_count = 0;
#endif
unknown's avatar
unknown committed
52

unknown's avatar
unknown committed
53

unknown's avatar
unknown committed
54 55
inline void skip_load_data_infile(NET* net);
inline bool slave_killed(THD* thd);
unknown's avatar
unknown committed
56
static int init_slave_thread(THD* thd);
57 58
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
unknown's avatar
unknown committed
59 60 61 62
static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, char* db, char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
				  const char* table_name);
unknown's avatar
unknown committed
63
inline char* rewrite_db(char* db);
64 65
static int check_expected_error(THD* thd, int expected_error);

66 67
static void free_table_ent(TABLE_RULE_ENT* e)
{
unknown's avatar
unknown committed
68
  my_free((gptr) e, MYF(0));
69 70 71 72 73 74 75 76 77
}

static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
			   my_bool not_used __attribute__((unused)))
{
  *len = e->key_len;
  return (byte*)e->db;
}

unknown's avatar
unknown committed
78 79 80
/* called from get_options() in mysqld.cc on start-up */
void init_slave_skip_errors(char* arg)
{
unknown's avatar
unknown committed
81
  char* p;
unknown's avatar
unknown committed
82 83 84 85 86 87 88 89
  if (bitmap_init(&slave_error_mask,MAX_SLAVE_ERROR,0))
  {
    fprintf(stderr, "Badly out of memory, please check your system status\n");
    exit(1);
  }
  use_slave_mask = 1;
  for (;isspace(*arg);++arg)
    /* empty */;
unknown's avatar
unknown committed
90
  if (!my_casecmp(arg,"all",3))
unknown's avatar
unknown committed
91 92 93 94
  {
    bitmap_set_all(&slave_error_mask);
    return;
  }
unknown's avatar
unknown committed
95
  for (p= arg ; *p; )
unknown's avatar
unknown committed
96
  {
unknown's avatar
unknown committed
97 98 99 100 101 102 103
    long err_code;
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
      break;
    if (err_code < MAX_SLAVE_ERROR)
       bitmap_set_bit(&slave_error_mask,(uint)err_code);
    while (!isdigit(*p) && *p)
      p++;
unknown's avatar
unknown committed
104 105
  }
}
106 107 108 109 110 111 112 113

void init_table_rule_hash(HASH* h, bool* h_inited)
{
  hash_init(h, TABLE_RULE_HASH_SIZE,0,0,
	    (hash_get_key) get_table_key,
	    (void (*)(void*)) free_table_ent, 0);
  *h_inited = 1;
}
unknown's avatar
unknown committed
114

unknown's avatar
unknown committed
115 116
void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited)
{
unknown's avatar
unknown committed
117
  my_init_dynamic_array(a, sizeof(TABLE_RULE_ENT*), TABLE_RULE_ARR_SIZE,
unknown's avatar
unknown committed
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
		     TABLE_RULE_ARR_SIZE);
  *a_inited = 1;
}

static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
{
  uint i;
  const char* key_end = key + len;
  
  for(i = 0; i < a->elements; i++)
    {
      TABLE_RULE_ENT* e ;
      get_dynamic(a, (gptr)&e, i);
      if(!wild_case_compare(key, key_end, (const char*)e->db,
			    (const char*)(e->db + e->key_len),'\\'))
	return e;
    }
  
  return 0;
}

139 140
int tables_ok(THD* thd, TABLE_LIST* tables)
{
unknown's avatar
unknown committed
141 142 143 144 145 146 147 148 149 150
  for (; tables; tables = tables->next)
  {
    if (!tables->updating) 
      continue;
    char hash_key[2*NAME_LEN+2];
    char* p;
    p = strmov(hash_key, tables->db ? tables->db : thd->db);
    *p++ = '.';
    uint len = strmov(p, tables->real_name) - hash_key ;
    if (do_table_inited) // if there are any do's
151
    {
unknown's avatar
unknown committed
152 153 154 155 156 157 158
      if (hash_search(&replicate_do_table, (byte*) hash_key, len))
	return 1;
    }
    if (ignore_table_inited) // if there are any do's
    {
      if (hash_search(&replicate_ignore_table, (byte*) hash_key, len))
	return 0; 
159
    }
unknown's avatar
unknown committed
160 161 162 163 164 165 166
    if (wild_do_table_inited && find_wild(&replicate_wild_do_table,
					  hash_key, len))
      return 1;
    if (wild_ignore_table_inited && find_wild(&replicate_wild_ignore_table,
					      hash_key, len))
      return 0;
  }
167

unknown's avatar
unknown committed
168
  // if no explicit rule found
169 170
  // and there was a do list, do not replicate. If there was
  // no do list, go ahead
unknown's avatar
unknown committed
171
  return !do_table_inited && !wild_do_table_inited;
172 173 174 175 176
}


int add_table_rule(HASH* h, const char* table_spec)
{
unknown's avatar
unknown committed
177
  const char* dot = strchr(table_spec, '.');
178
  if(!dot) return 1;
unknown's avatar
unknown committed
179
  // len is always > 0 because we know the there exists a '.'
180 181 182 183 184 185 186 187 188 189 190 191
  uint len = (uint)strlen(table_spec);
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
						 + len, MYF(MY_WME));
  if(!e) return 1;
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len);
  (void)hash_insert(h, (byte*)e);
  return 0;
}

unknown's avatar
unknown committed
192 193
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
{
unknown's avatar
unknown committed
194
  const char* dot = strchr(table_spec, '.');
unknown's avatar
unknown committed
195 196 197 198 199 200 201 202 203 204 205 206 207
  if(!dot) return 1;
  uint len = (uint)strlen(table_spec);
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
						 + len, MYF(MY_WME));
  if(!e) return 1;
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len);
  insert_dynamic(a, (gptr)&e);
  return 0;
}

208 209 210 211 212 213
static void free_string_array(DYNAMIC_ARRAY *a)
{
  uint i;
  for(i = 0; i < a->elements; i++)
    {
      char* p;
unknown's avatar
unknown committed
214
      get_dynamic(a, (gptr) &p, i);
215 216 217 218 219 220 221
      my_free(p, MYF(MY_WME));
    }
  delete_dynamic(a);
}

void end_slave()
{
222
  pthread_mutex_lock(&LOCK_slave);
223 224 225 226 227 228 229 230 231 232
  if (slave_running)
  {
    abort_slave = 1;
    thr_alarm_kill(slave_real_id);
#ifdef SIGNAL_WITH_VIO_CLOSE
    slave_thd->close_active_vio();
#endif    
    while (slave_running)
      pthread_cond_wait(&COND_slave_stopped, &LOCK_slave);
  }
233 234
  pthread_mutex_unlock(&LOCK_slave);
  
235 236 237 238 239 240 241 242 243 244
  end_master_info(&glob_mi);
  if(do_table_inited)
    hash_free(&replicate_do_table);
  if(ignore_table_inited)
    hash_free(&replicate_ignore_table);
  if(wild_do_table_inited)
    free_string_array(&replicate_wild_do_table);
  if(wild_ignore_table_inited)
    free_string_array(&replicate_wild_ignore_table);
}
unknown's avatar
unknown committed
245

unknown's avatar
unknown committed
246
inline bool slave_killed(THD* thd)
unknown's avatar
unknown committed
247 248 249 250
{
  return abort_slave || abort_loop || thd->killed;
}

unknown's avatar
unknown committed
251
inline void skip_load_data_infile(NET* net)
252 253 254 255 256 257 258
{
  (void)my_net_write(net, "\xfb/dev/null", 10);
  (void)net_flush(net);
  (void)my_net_read(net); // discard response
  send_ok(net); // the master expects it
}

unknown's avatar
unknown committed
259
inline char* rewrite_db(char* db)
unknown's avatar
unknown committed
260 261 262 263 264 265 266 267 268 269 270 271 272
{
  if(replicate_rewrite_db.is_empty() || !db) return db;
  I_List_iterator<i_string_pair> it(replicate_rewrite_db);
  i_string_pair* tmp;

  while((tmp=it++))
    {
      if(!strcmp(tmp->key, db))
	return tmp->val;
    }

  return db;
}
273

unknown's avatar
unknown committed
274 275 276 277 278 279
int db_ok(const char* db, I_List<i_string> &do_list,
	  I_List<i_string> &ignore_list )
{
  if(do_list.is_empty() && ignore_list.is_empty())
    return 1; // ok to replicate if the user puts no constraints

unknown's avatar
unknown committed
280
  // if the user has specified restrictions on which databases to replicate
unknown's avatar
unknown committed
281
  // and db was not selected, do not replicate
unknown's avatar
unknown committed
282 283
  if(!db)
    return 0;
unknown's avatar
unknown committed
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311

  if(!do_list.is_empty()) // if the do's are not empty
    {
      I_List_iterator<i_string> it(do_list);
      i_string* tmp;

      while((tmp=it++))
	{
	  if(!strcmp(tmp->ptr, db))
	    return 1; // match
	}
      return 0;
    }
  else // there are some elements in the don't, otherwise we cannot get here
    {
      I_List_iterator<i_string> it(ignore_list);
      i_string* tmp;

      while((tmp=it++))
	{
	  if(!strcmp(tmp->ptr, db))
	    return 0; // match
	}
      
      return 1;
    }
}

unknown's avatar
unknown committed
312
static int init_strvar_from_file(char* var, int max_size, IO_CACHE* f,
unknown's avatar
unknown committed
313 314
			       char* default_val)
{
unknown's avatar
unknown committed
315 316 317 318 319 320 321
  uint length;
  if ((length=my_b_gets(f,var, max_size)))
  {
    char* last_p = var + length -1;
    if (*last_p == '\n')
      *last_p = 0; // if we stopped on newline, kill it
    else
unknown's avatar
unknown committed
322
    {
unknown's avatar
unknown committed
323 324
      // if we truncated a line or stopped on last char, remove all chars
      // up to and including newline
unknown's avatar
unknown committed
325 326
      int c;
      while( ((c=my_b_get(f)) != '\n' && c != my_b_EOF));
unknown's avatar
unknown committed
327
    }
unknown's avatar
unknown committed
328 329 330 331
    return 0;
  }
  else if (default_val)
  {
unknown's avatar
unknown committed
332
    strmake(var,  default_val, max_size-1);
unknown's avatar
unknown committed
333 334
    return 0;
  }
unknown's avatar
unknown committed
335
  return 1;
unknown's avatar
unknown committed
336 337
}

unknown's avatar
unknown committed
338
static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
unknown's avatar
unknown committed
339 340 341
{
  char buf[32];
  
unknown's avatar
unknown committed
342 343 344 345 346
  if (my_b_gets(f, buf, sizeof(buf))) 
  {
    *var = atoi(buf);
    return 0;
  }
unknown's avatar
unknown committed
347
  else if(default_val)
unknown's avatar
unknown committed
348 349 350 351
  {
    *var = default_val;
    return 0;
  }
unknown's avatar
unknown committed
352
  return 1;
unknown's avatar
unknown committed
353 354 355 356 357 358 359
}


static int create_table_from_dump(THD* thd, NET* net, const char* db,
				  const char* table_name)
{
  uint packet_len = my_net_read(net); // read create table statement
360 361
  Vio* save_vio;
  HA_CHECK_OPT check_opt;
unknown's avatar
unknown committed
362
  TABLE_LIST tables;
363 364
  int error= 1;
  handler *file;
unknown's avatar
unknown committed
365
  
366 367 368 369 370 371 372 373 374 375 376
  if (packet_len == packet_error)
  {
    send_error(&thd->net, ER_MASTER_NET_READ);
    return 1;
  }
  if (net->read_pos[0] == 255) // error from master
  {
    net->read_pos[packet_len] = 0;
    net_printf(&thd->net, ER_MASTER, net->read_pos + 3);
    return 1;
  }
unknown's avatar
unknown committed
377 378
  thd->command = COM_TABLE_DUMP;
  thd->query = sql_alloc(packet_len + 1);
379 380 381 382 383 384
  if (!thd->query)
  {
    sql_print_error("create_table_from_dump: out of memory");
    net_printf(&thd->net, ER_GET_ERRNO, "Out of memory");
    return 1;
  }
unknown's avatar
unknown committed
385 386 387 388 389 390
  memcpy(thd->query, net->read_pos, packet_len);
  thd->query[packet_len] = 0;
  thd->current_tablenr = 0;
  thd->query_error = 0;
  thd->net.no_send_ok = 1;
  thd->proc_info = "Creating table from master dump";
unknown's avatar
unknown committed
391
  // save old db in case we are creating in a different database
unknown's avatar
unknown committed
392
  char* save_db = thd->db;
unknown's avatar
unknown committed
393
  thd->db = thd->last_nx_db;
unknown's avatar
unknown committed
394
  mysql_parse(thd, thd->query, packet_len); // run create table
395
  thd->db = save_db;		// leave things the way the were before
unknown's avatar
unknown committed
396
  
397 398
  if (thd->query_error)
    goto err;			// mysql_parse took care of the error send
unknown's avatar
unknown committed
399 400 401 402 403 404

  bzero((char*) &tables,sizeof(tables));
  tables.db = (char*)db;
  tables.name = tables.real_name = (char*)table_name;
  tables.lock_type = TL_WRITE;
  thd->proc_info = "Opening master dump table";
unknown's avatar
unknown committed
405 406
  if (!open_ltable(thd, &tables, TL_WRITE))
  {
407
    send_error(&thd->net,0,0);			// Send error from open_ltable
unknown's avatar
unknown committed
408
    sql_print_error("create_table_from_dump: could not open created table");
409
    goto err;
unknown's avatar
unknown committed
410
  }
unknown's avatar
unknown committed
411
  
412
  file = tables.table->file;
unknown's avatar
unknown committed
413
  thd->proc_info = "Reading master dump table data";
unknown's avatar
unknown committed
414 415 416 417
  if (file->net_read_dump(net))
  {
    net_printf(&thd->net, ER_MASTER_NET_READ);
    sql_print_error("create_table_from_dump::failed in\
unknown's avatar
unknown committed
418
 handler::net_read_dump()");
419
    goto err;
unknown's avatar
unknown committed
420
  }
unknown's avatar
unknown committed
421 422

  check_opt.init();
423
  check_opt.flags|= T_VERY_SILENT | T_CALC_CHECKSUM;
unknown's avatar
unknown committed
424
  check_opt.quick = 1;
unknown's avatar
unknown committed
425
  thd->proc_info = "Rebuilding the index on master dump table";
unknown's avatar
unknown committed
426
  // we do not want repair() to spam us with messages
unknown's avatar
unknown committed
427 428
  // just send them to the error log, and report the failure in case of
  // problems
429
  save_vio = thd->net.vio;
unknown's avatar
unknown committed
430
  thd->net.vio = 0;
431
  error=file->repair(thd,&check_opt) != 0;
unknown's avatar
unknown committed
432
  thd->net.vio = save_vio;
433 434 435 436
  if (error)
    net_printf(&thd->net, ER_INDEX_REBUILD,tables.table->real_name);

err:
unknown's avatar
unknown committed
437 438 439 440 441 442 443 444 445 446
  close_thread_tables(thd);
  thd->net.no_send_ok = 0;
  return error; 
}

int fetch_nx_table(THD* thd, MASTER_INFO* mi)
{
  MYSQL* mysql = mc_mysql_init(NULL);
  int error = 1;
  int nx_errno = 0;
447 448 449 450 451 452
  if (!mysql)
  {
    sql_print_error("fetch_nx_table: Error in mysql_init()");
    nx_errno = ER_GET_ERRNO;
    goto err;
  }
unknown's avatar
unknown committed
453

454 455 456 457 458
  if (!mi->host || !*mi->host)
  {
    nx_errno = ER_BAD_HOST_ERROR;
    goto err;
  }
unknown's avatar
unknown committed
459
  mysql->net.timeout=slave_net_timeout;
unknown's avatar
unknown committed
460
  safe_connect(thd, mysql, mi);
461
  if (slave_killed(thd))
unknown's avatar
unknown committed
462 463
    goto err;

464 465 466 467 468 469
  if (request_table_dump(mysql, thd->last_nx_db, thd->last_nx_table))
  {
    nx_errno = ER_GET_ERRNO;
    sql_print_error("fetch_nx_table: failed on table dump request ");
    goto err;
  }
unknown's avatar
unknown committed
470

471 472 473 474 475 476 477
  if (create_table_from_dump(thd, &mysql->net, thd->last_nx_db,
			     thd->last_nx_table))
  {
    // create_table_from_dump will have sent the error alread
    sql_print_error("fetch_nx_table: failed on create table ");
    goto err;
  }
unknown's avatar
unknown committed
478 479
  
  error = 0;
unknown's avatar
unknown committed
480

unknown's avatar
unknown committed
481
 err:
unknown's avatar
unknown committed
482 483 484
  if (mysql)
    mc_mysql_close(mysql);
  if (nx_errno && thd->net.vio)
unknown's avatar
unknown committed
485
    send_error(&thd->net, nx_errno, "Error in fetch_nx_table");
486
  thd->net.no_send_ok = 0; // Clear up garbage after create_table_from_dump
unknown's avatar
unknown committed
487 488 489
  return error;
}

490 491 492 493 494 495 496 497 498 499 500
void end_master_info(MASTER_INFO* mi)
{
  if(mi->fd >= 0)
    {
      end_io_cache(&mi->file);
      (void)my_close(mi->fd, MYF(MY_WME));
      mi->fd = -1;
    }
  mi->inited = 0;
}

501
int init_master_info(MASTER_INFO* mi)
unknown's avatar
unknown committed
502
{
unknown's avatar
unknown committed
503
  if (mi->inited)
504
    return 0;
unknown's avatar
unknown committed
505
  int fd,length,error;
unknown's avatar
unknown committed
506
  MY_STAT stat_area;
unknown's avatar
unknown committed
507
  char fname[FN_REFLEN+128];
unknown's avatar
unknown committed
508
  const char *msg;
unknown's avatar
unknown committed
509 510 511 512 513 514
  fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32);

  // we need a mutex while we are changing master info parameters to
  // keep other threads from reading bogus info

  pthread_mutex_lock(&mi->lock);
515
  mi->pending = 0;
unknown's avatar
unknown committed
516
  fd = mi->fd;
unknown's avatar
unknown committed
517
  
unknown's avatar
unknown committed
518 519 520 521 522 523 524 525 526 527
  // we do not want any messages if the file does not exist
  if (!my_stat(fname, &stat_area, MYF(0)))
  {
    // if someone removed the file from underneath our feet, just close
    // the old descriptor and re-create the old file
    if (fd >= 0)
      my_close(fd, MYF(MY_WME));
    if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0
	|| init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
			 MYF(MY_WME)))
unknown's avatar
unknown committed
528
    {
unknown's avatar
unknown committed
529 530 531 532
      if(fd >= 0)
	my_close(fd, MYF(0));
      pthread_mutex_unlock(&mi->lock);
      return 1;
unknown's avatar
unknown committed
533
    }
unknown's avatar
unknown committed
534 535 536 537 538 539 540 541 542
    mi->log_file_name[0] = 0;
    mi->pos = 4; // skip magic number
    mi->fd = fd;
      
    if (master_host)
      strmake(mi->host, master_host, sizeof(mi->host) - 1);
    if (master_user)
      strmake(mi->user, master_user, sizeof(mi->user) - 1);
    if (master_password)
543
      strmake(mi->password, master_password, HASH_PASSWORD_LENGTH);
unknown's avatar
unknown committed
544 545 546
    mi->port = master_port;
    mi->connect_retry = master_connect_retry;
  }
547
  else // file exists
unknown's avatar
unknown committed
548 549 550 551 552 553
  {
    if(fd >= 0)
      reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
    else if((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0
	    || init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
			     0, MYF(MY_WME)))
unknown's avatar
unknown committed
554
    {
unknown's avatar
unknown committed
555
      if(fd >= 0)
unknown's avatar
unknown committed
556 557 558 559
	my_close(fd, MYF(0));
      pthread_mutex_unlock(&mi->lock);
      return 1;
    }
unknown's avatar
unknown committed
560
      
561 562
    if ((length=my_b_gets(&mi->file, mi->log_file_name,
			   sizeof(mi->log_file_name))) < 1)
unknown's avatar
unknown committed
563 564 565 566
    {
      msg="Error reading log file name from master info file ";
      goto error;
    }
unknown's avatar
unknown committed
567

568
    mi->log_file_name[length-1]= 0; // kill \n
unknown's avatar
unknown committed
569 570
    /* Reuse fname buffer */
    if(!my_b_gets(&mi->file, fname, sizeof(fname)))
unknown's avatar
unknown committed
571 572 573 574
    {
      msg="Error reading log file position from master info file";
      goto error;
    }
unknown's avatar
unknown committed
575
    mi->pos = strtoull(fname,(char**) 0, 10);
unknown's avatar
unknown committed
576

unknown's avatar
unknown committed
577 578 579 580 581
    mi->fd = fd;
    if(init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
			     master_host) ||
       init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
			     master_user) || 
582
       init_strvar_from_file(mi->password, HASH_PASSWORD_LENGTH+1, &mi->file,
unknown's avatar
unknown committed
583 584 585 586 587 588 589
			     master_password) ||
       init_intvar_from_file((int*)&mi->port, &mi->file, master_port) ||
       init_intvar_from_file((int*)&mi->connect_retry, &mi->file,
			     master_connect_retry))
    {
      msg="Error reading master configuration";
      goto error;
unknown's avatar
unknown committed
590
    }
unknown's avatar
unknown committed
591
  }
unknown's avatar
unknown committed
592
  
unknown's avatar
unknown committed
593
  mi->inited = 1;
unknown's avatar
unknown committed
594 595 596
  // now change the cache from READ to WRITE - must do this
  // before flush_master_info
  reinit_io_cache(&mi->file, WRITE_CACHE, 0L,0,1);
unknown's avatar
unknown committed
597
  error=test(flush_master_info(mi));
unknown's avatar
unknown committed
598
  pthread_mutex_unlock(&mi->lock);
unknown's avatar
unknown committed
599 600 601 602 603 604 605 606
  return error;

error:
  sql_print_error(msg);
  end_io_cache(&mi->file);
  my_close(fd, MYF(0));
  pthread_mutex_unlock(&mi->lock);
  return 1;
unknown's avatar
unknown committed
607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624
}

int show_master_info(THD* thd)
{
  DBUG_ENTER("show_master_info");
  List<Item> field_list;
  field_list.push_back(new Item_empty_string("Master_Host",
						     sizeof(glob_mi.host)));
  field_list.push_back(new Item_empty_string("Master_User",
						     sizeof(glob_mi.user)));
  field_list.push_back(new Item_empty_string("Master_Port", 6));
  field_list.push_back(new Item_empty_string("Connect_retry", 6));
  field_list.push_back( new Item_empty_string("Log_File",
						     FN_REFLEN));
  field_list.push_back(new Item_empty_string("Pos", 12));
  field_list.push_back(new Item_empty_string("Slave_Running", 3));
  field_list.push_back(new Item_empty_string("Replicate_do_db", 20));
  field_list.push_back(new Item_empty_string("Replicate_ignore_db", 20));
625 626 627
  field_list.push_back(new Item_empty_string("Last_errno", 4));
  field_list.push_back(new Item_empty_string("Last_error", 20));
  field_list.push_back(new Item_empty_string("Skip_counter", 12));
unknown's avatar
unknown committed
628 629 630 631 632 633 634 635 636 637 638 639
  if(send_fields(thd, field_list, 1))
    DBUG_RETURN(-1);

  String* packet = &thd->packet;
  packet->length(0);
  
  pthread_mutex_lock(&glob_mi.lock);
  net_store_data(packet, glob_mi.host);
  net_store_data(packet, glob_mi.user);
  net_store_data(packet, (uint32) glob_mi.port);
  net_store_data(packet, (uint32) glob_mi.connect_retry);
  net_store_data(packet, glob_mi.log_file_name);
unknown's avatar
unknown committed
640
  net_store_data(packet, (uint32) glob_mi.pos);	// QQ: Should be fixed
unknown's avatar
unknown committed
641 642 643 644 645 646
  pthread_mutex_unlock(&glob_mi.lock);
  pthread_mutex_lock(&LOCK_slave);
  net_store_data(packet, slave_running ? "Yes":"No");
  pthread_mutex_unlock(&LOCK_slave);
  net_store_data(packet, &replicate_do_db);
  net_store_data(packet, &replicate_ignore_db);
647 648 649
  net_store_data(packet, (uint32)last_slave_errno);
  net_store_data(packet, last_slave_error);
  net_store_data(packet, slave_skip_counter);
unknown's avatar
unknown committed
650
  
unknown's avatar
unknown committed
651
  if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length()))
unknown's avatar
unknown committed
652 653 654 655 656 657 658 659
    DBUG_RETURN(-1);

  send_eof(&thd->net);
  DBUG_RETURN(0);
}

int flush_master_info(MASTER_INFO* mi)
{
unknown's avatar
unknown committed
660
  IO_CACHE* file = &mi->file;
unknown's avatar
unknown committed
661 662
  char lbuf[22];
  
unknown's avatar
unknown committed
663 664
  my_b_seek(file, 0L);
  my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n",
unknown's avatar
unknown committed
665 666
	      mi->log_file_name, llstr(mi->pos, lbuf), mi->host, mi->user,
	      mi->password, mi->port, mi->connect_retry);
unknown's avatar
unknown committed
667
  flush_io_cache(file);
unknown's avatar
unknown committed
668 669 670
  return 0;
}

unknown's avatar
unknown committed
671
int st_master_info::wait_for_pos(THD* thd, String* log_name, ulonglong log_pos)
unknown's avatar
unknown committed
672
{
unknown's avatar
unknown committed
673 674
  if (!inited) return -1;
  bool pos_reached;
unknown's avatar
unknown committed
675
  int event_count = 0;
unknown's avatar
unknown committed
676
  pthread_mutex_lock(&lock);
677
  while(!thd->killed)
unknown's avatar
unknown committed
678 679 680
  {
    int cmp_result;
    if (*log_file_name)
unknown's avatar
unknown committed
681
    {
unknown's avatar
unknown committed
682 683 684 685 686 687
      /*
	We should use dirname_length() here when we have a version of
	this that doesn't modify the argument */
      char *basename = strrchr(log_file_name, FN_LIBCHAR);
      if (basename)
	++basename;
unknown's avatar
unknown committed
688
      else
unknown's avatar
unknown committed
689 690 691 692 693 694
	basename = log_file_name;
      cmp_result =  strncmp(basename, log_name->ptr(),
			    log_name->length());
    }
    else
      cmp_result = 0;
unknown's avatar
unknown committed
695
      
unknown's avatar
unknown committed
696 697 698
    pos_reached = ((!cmp_result && pos >= log_pos) || cmp_result > 0);
    if (pos_reached || thd->killed)
      break;
699 700 701 702 703 704 705
    
    const char* msg = thd->enter_cond(&cond, &lock,
				      "Waiting for master update");
    pthread_cond_wait(&cond, &lock);
    thd->exit_cond(msg);
    event_count++;
  }
unknown's avatar
unknown committed
706 707
  pthread_mutex_unlock(&lock);
  return thd->killed ? -1 : event_count;
unknown's avatar
unknown committed
708 709
}

unknown's avatar
unknown committed
710 711 712 713 714 715 716

static int init_slave_thread(THD* thd)
{
  DBUG_ENTER("init_slave_thread");
  thd->system_thread = thd->bootstrap = 1;
  thd->client_capabilities = 0;
  my_net_init(&thd->net, 0);
unknown's avatar
unknown committed
717
  thd->net.timeout = slave_net_timeout;
unknown's avatar
unknown committed
718 719 720
  thd->max_packet_length=thd->net.max_packet;
  thd->master_access= ~0;
  thd->priv_user = 0;
721
  thd->slave_thread = 1;
722
  thd->options = (((opt_log_slave_updates) ? OPTION_BIN_LOG:0) | OPTION_AUTO_IS_NULL) ;
unknown's avatar
unknown committed
723 724 725 726 727 728 729 730 731
  thd->system_thread = 1;
  thd->client_capabilities = CLIENT_LOCAL_FILES;
  slave_real_id=thd->real_id=pthread_self();
  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id = thread_id++;
  pthread_mutex_unlock(&LOCK_thread_count);

  if (init_thr_lock() ||
      my_pthread_setspecific_ptr(THR_THD,  thd) ||
732
      my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
unknown's avatar
unknown committed
733 734 735 736 737 738 739 740 741
      my_pthread_setspecific_ptr(THR_NET,  &thd->net))
  {
    close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed?
    end_thread(thd,0);
    DBUG_RETURN(-1);
  }

  thd->mysys_var=my_thread_var;
  thd->dbug_thread_id=my_thread_id();
unknown's avatar
unknown committed
742
#if !defined(__WIN__) && !defined(OS2)
unknown's avatar
unknown committed
743 744 745 746 747
  sigset_t set;
  VOID(sigemptyset(&set));			// Get mask in use
  VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif

748
  thd->mem_root.free=thd->mem_root.used=0;	// Probably not needed
unknown's avatar
unknown committed
749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769
  if (thd->max_join_size == (ulong) ~0L)
    thd->options |= OPTION_BIG_SELECTS;

  thd->proc_info="Waiting for master update";
  thd->version=refresh_version;
  thd->set_time();

  DBUG_RETURN(0);
}

static int safe_sleep(THD* thd, int sec)
{
  thr_alarm_t alarmed;
  thr_alarm_init(&alarmed);
  time_t start_time= time((time_t*) 0);
  time_t end_time= start_time+sec;
  ALARM  alarm_buff;

  while (start_time < end_time)
  {
    int nap_time = (int) (end_time - start_time);
unknown's avatar
unknown committed
770 771 772 773 774 775
    /*
      the only reason we are asking for alarm is so that
      we will be woken up in case of murder, so if we do not get killed,
      set the alarm so it goes off after we wake up naturally
    */
    thr_alarm(&alarmed, 2 * nap_time,&alarm_buff);
unknown's avatar
unknown committed
776
    sleep(nap_time);
unknown's avatar
unknown committed
777 778 779 780
    // if we wake up before the alarm goes off, hit the button
    // so it will not wake up the wife and kids :-)
    if (thr_alarm_in_use(&alarmed))
      thr_end_alarm(&alarmed);
unknown's avatar
unknown committed
781 782 783 784 785 786 787 788 789 790 791
    
    if (slave_killed(thd))
      return 1;
    start_time=time((time_t*) 0);
  }
  return 0;
}


static int request_dump(MYSQL* mysql, MASTER_INFO* mi)
{
792
  char buf[FN_REFLEN + 10];
unknown's avatar
unknown committed
793 794 795 796 797
  int len;
  int binlog_flags = 0; // for now
  char* logname = mi->log_file_name;
  int4store(buf, mi->pos);
  int2store(buf + 4, binlog_flags);
798
  int4store(buf + 6, server_id);
unknown's avatar
unknown committed
799
  len = (uint) strlen(logname);
800
  memcpy(buf + 10, logname,len);
unknown's avatar
unknown committed
801 802 803 804 805 806 807 808 809
  if (mc_simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
  {
    // something went wrong, so we will just reconnect and retry later
    // in the future, we should do a better error analysis, but for
    // now we just fill up the error log :-)
    sql_print_error("Error on COM_BINLOG_DUMP: %s, will retry in %d secs",
		    mc_mysql_error(mysql), master_connect_retry);
    return 1;
  }
unknown's avatar
unknown committed
810 811 812 813 814 815 816 817

  return 0;
}

static int request_table_dump(MYSQL* mysql, char* db, char* table)
{
  char buf[1024];
  char * p = buf;
unknown's avatar
unknown committed
818 819
  uint table_len = (uint) strlen(table);
  uint db_len = (uint) strlen(db);
unknown's avatar
unknown committed
820 821 822 823 824 825 826 827 828 829 830 831
  if(table_len + db_len > sizeof(buf) - 2)
    {
      sql_print_error("request_table_dump: Buffer overrun");
      return 1;
    } 
  
  *p++ = db_len;
  memcpy(p, db, db_len);
  p += db_len;
  *p++ = table_len;
  memcpy(p, table, table_len);
  
unknown's avatar
unknown committed
832 833 834
  if (mc_simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1))
  {
    sql_print_error("request_table_dump: Error sending the table dump \
unknown's avatar
unknown committed
835
command");
unknown's avatar
unknown committed
836 837
    return 1;
  }
unknown's avatar
unknown committed
838 839 840 841 842 843 844 845

  return 0;
}


static uint read_event(MYSQL* mysql, MASTER_INFO *mi)
{
  uint len = packet_error;
unknown's avatar
unknown committed
846

unknown's avatar
unknown committed
847 848
  // my_real_read() will time us out
  // we check if we were told to die, and if not, try reading again
unknown's avatar
unknown committed
849
#ifndef DBUG_OFF
unknown's avatar
unknown committed
850
  if (disconnect_slave_event_count && !(events_till_disconnect--))
unknown's avatar
unknown committed
851 852 853
    return packet_error;      
#endif
  
854 855 856
  len = mc_net_safe_read(mysql);

  if (len == packet_error || (long) len < 1)
unknown's avatar
unknown committed
857
  {
858
    sql_print_error("Error reading packet from server: %s (\
unknown's avatar
unknown committed
859
server_errno=%d)",
860
		    mc_mysql_error(mysql), mc_mysql_errno(mysql));
unknown's avatar
unknown committed
861 862 863
    return packet_error;
  }

unknown's avatar
unknown committed
864 865
  if (len == 1)
  {
unknown's avatar
unknown committed
866
     sql_print_error("Slave: received 0 length packet from server, apparent\
867 868
 master shutdown: %s",
		     mc_mysql_error(mysql));
unknown's avatar
unknown committed
869
     return packet_error;
unknown's avatar
unknown committed
870
  }
unknown's avatar
unknown committed
871 872
  
  DBUG_PRINT("info",( "len=%u, net->read_pos[4] = %d\n",
873
		      len, mysql->net.read_pos[4]));
unknown's avatar
unknown committed
874 875 876
  return len - 1;   
}

877 878 879 880 881 882 883 884 885 886 887 888 889 890 891
static int check_expected_error(THD* thd, int expected_error)
{
  switch(expected_error)
    {
    case ER_NET_READ_ERROR:
    case ER_NET_ERROR_ON_WRITE:  
    case ER_SERVER_SHUTDOWN:  
    case ER_NEW_ABORTING_CONNECTION:
      my_snprintf(last_slave_error, sizeof(last_slave_error), 
		 "Slave: query '%s' partially completed on the master \
and was aborted. There is a chance that your master is inconsistent at this \
point. If you are sure that your master is ok, run this query manually on the\
 slave and then restart the slave with SET SQL_SLAVE_SKIP_COUNTER=1;\
 SLAVE START;", thd->query);
      last_slave_errno = expected_error;
892
      sql_print_error("%s",last_slave_error);
893 894 895 896 897
      return 1;
    default:
      return 0;
    }
}
898

unknown's avatar
unknown committed
899 900 901 902 903
inline int ignored_error_code(int err_code)
{
  return use_slave_mask && bitmap_is_set(&slave_error_mask, err_code);
}

unknown's avatar
unknown committed
904 905
static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
{
unknown's avatar
unknown committed
906 907
  Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1,
					     event_len);
unknown's avatar
unknown committed
908
  char llbuff[22];
909
  
unknown's avatar
unknown committed
910 911 912 913 914 915
  mi->event_len = event_len; /* Added by Heikki: InnoDB internally stores the
				master log position it has processed so far;
				position to store is really
				mi->pos + mi->pending + mi->event_len
				since we must store the pos of the END of the
				current log event */
916 917
  if (ev)
  {
918
    int type_code = ev->get_type_code();
unknown's avatar
unknown committed
919
    if (ev->server_id == ::server_id || slave_skip_counter)
unknown's avatar
unknown committed
920
    {
unknown's avatar
unknown committed
921
      if(type_code == LOAD_EVENT)
unknown's avatar
unknown committed
922
	skip_load_data_infile(net);
923
	
unknown's avatar
unknown committed
924 925
      mi->inc_pos(event_len);
      flush_master_info(mi);
unknown's avatar
unknown committed
926 927 928 929 930 931
      if(slave_skip_counter && /* protect against common user error of
				  setting the counter to 1 instead of 2
			          while recovering from an failed
			          auto-increment insert */
	 	 !(type_code == INTVAR_EVENT &&
				 slave_skip_counter == 1))
unknown's avatar
unknown committed
932
        --slave_skip_counter;
unknown's avatar
unknown committed
933 934 935
      delete ev;     
      return 0;					// avoid infinite update loops
    }
936 937
  
    thd->server_id = ev->server_id; // use the original server id for logging
unknown's avatar
unknown committed
938
    thd->set_time();				// time the query
unknown's avatar
unknown committed
939 940
    if(!ev->when)
      ev->when = time(NULL);
941
    
unknown's avatar
unknown committed
942
    switch(type_code) {
943 944 945 946
    case QUERY_EVENT:
    {
      Query_log_event* qev = (Query_log_event*)ev;
      int q_len = qev->q_len;
947
      int expected_error,actual_error = 0;
948
      init_sql_alloc(&thd->mem_root, 8192,0);
unknown's avatar
unknown committed
949
      thd->db = rewrite_db((char*)qev->db);
unknown's avatar
unknown committed
950
      if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
951 952 953 954 955 956 957 958
      {
	thd->query = (char*)qev->query;
	thd->set_time((time_t)qev->when);
	thd->current_tablenr = 0;
	VOID(pthread_mutex_lock(&LOCK_thread_count));
	thd->query_id = query_id++;
	VOID(pthread_mutex_unlock(&LOCK_thread_count));
	thd->last_nx_table = thd->last_nx_db = 0;
unknown's avatar
unknown committed
959
	thd->query_error = 0;			// clear error
960 961
	thd->net.last_errno = 0;
	thd->net.last_error[0] = 0;
unknown's avatar
unknown committed
962
	thd->slave_proxy_id = qev->thread_id;	// for temp tables
963 964 965
	
	// sanity check to make sure the master did not get a really bad
	// error on the query
unknown's avatar
unknown committed
966 967
	if (ignored_error_code((expected_error=qev->error_code)) ||
	    !check_expected_error(thd, expected_error))
968 969 970
	{
	  mysql_parse(thd, thd->query, q_len);
	  if (expected_error !=
unknown's avatar
unknown committed
971 972
	      (actual_error = thd->net.last_errno) && expected_error &&
	      !ignored_error_code(actual_error))
973
	  {
974
	    const char* errmsg = "Slave: did not get the expected error\
975
 running query from master - expected: '%s' (%d), got '%s' (%d)"; 
976 977 978 979 980
	    sql_print_error(errmsg, ER_SAFE(expected_error),
			    expected_error,
			    actual_error ? thd->net.last_error:"no error",
			    actual_error);
	    thd->query_error = 1;
981
	  }
unknown's avatar
unknown committed
982 983
	  else if (expected_error == actual_error ||
		   ignored_error_code(actual_error))
984
	  {
985 986 987
	    thd->query_error = 0;
	    *last_slave_error = 0;
	    last_slave_errno = 0;
988
	  }
989 990 991 992 993 994 995 996 997 998 999
	}
	else
	{
	  // master could be inconsistent, abort and tell DBA to check/fix it
	  thd->db = thd->query = 0;
	  thd->convert_set = 0;
	  close_thread_tables(thd);
	  free_root(&thd->mem_root,0);
	  delete ev;
	  return 1;
	}
1000
      }
unknown's avatar
unknown committed
1001 1002 1003 1004
      thd->db = 0;				// prevent db from being freed
      thd->query = 0;				// just to be sure
      // assume no convert for next query unless set explictly
      thd->convert_set = 0;
1005
      close_thread_tables(thd);
unknown's avatar
unknown committed
1006
      
1007
      if (thd->query_error || thd->fatal_error)
1008 1009 1010
      {
	sql_print_error("Slave:  error running query '%s' ",
			qev->query);
1011 1012 1013 1014 1015 1016 1017
	last_slave_errno = actual_error ? actual_error : -1;
	my_snprintf(last_slave_error, sizeof(last_slave_error),
		    "error '%s' on query '%s'",
		    actual_error ? thd->net.last_error :
		    "unexpected success or fatal error",
		    qev->query
		    );
1018
        free_root(&thd->mem_root,0);
1019
	delete ev;
1020 1021
	return 1;
      }
1022
      free_root(&thd->mem_root,0);
1023
      delete ev;
1024 1025

      mi->inc_pos(event_len);
unknown's avatar
unknown committed
1026
      
unknown's avatar
unknown committed
1027
      if (!(thd->options & OPTION_BEGIN)) {
unknown's avatar
unknown committed
1028 1029 1030 1031 1032 1033 1034 1035

      	/* We only flush the master info position to the master.info file if
        the transaction is not open any more: an incomplete transaction will
      	be rolled back automatically in crash recovery in transactional
      	table handlers */

        flush_master_info(mi);
      }
1036 1037
      break;
    }
unknown's avatar
unknown committed
1038
	  
1039 1040 1041 1042
    case LOAD_EVENT:
    {
      Load_log_event* lev = (Load_log_event*)ev;
      init_sql_alloc(&thd->mem_root, 8192,0);
unknown's avatar
unknown committed
1043
      thd->db = rewrite_db((char*)lev->db);
1044 1045
      thd->query = 0;
      thd->query_error = 0;
unknown's avatar
unknown committed
1046
	    
1047 1048 1049 1050 1051 1052 1053 1054
      if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
      {
	thd->set_time((time_t)lev->when);
	thd->current_tablenr = 0;
	VOID(pthread_mutex_lock(&LOCK_thread_count));
	thd->query_id = query_id++;
	VOID(pthread_mutex_unlock(&LOCK_thread_count));

unknown's avatar
unknown committed
1055 1056 1057 1058 1059 1060 1061
	TABLE_LIST tables;
	bzero((char*) &tables,sizeof(tables));
	tables.db = thd->db;
	tables.name = tables.real_name = (char*)lev->table_name;
	tables.lock_type = TL_WRITE;
	// the table will be opened in mysql_load    
        if(table_rules_on && !tables_ok(thd, &tables))
unknown's avatar
unknown committed
1062
	{
unknown's avatar
unknown committed
1063
	  skip_load_data_infile(net);
unknown's avatar
unknown committed
1064
	}
unknown's avatar
unknown committed
1065
	else
unknown's avatar
unknown committed
1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076
	{
	  enum enum_duplicates handle_dup = DUP_IGNORE;
	  if(lev->sql_ex.opt_flags && REPLACE_FLAG)
	    handle_dup = DUP_REPLACE;
	  sql_exchange ex((char*)lev->fname, lev->sql_ex.opt_flags &&
			  DUMPFILE_FLAG );
	  String field_term(&lev->sql_ex.field_term, 1),
	    enclosed(&lev->sql_ex.enclosed, 1),
	    line_term(&lev->sql_ex.line_term,1),
	    escaped(&lev->sql_ex.escaped, 1),
	    line_start(&lev->sql_ex.line_start, 1);
unknown's avatar
unknown committed
1077
	    
unknown's avatar
unknown committed
1078 1079 1080
	  ex.field_term = &field_term;
	  if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
	    ex.field_term->length(0);
unknown's avatar
unknown committed
1081
	    
unknown's avatar
unknown committed
1082 1083 1084
	  ex.enclosed = &enclosed;
	  if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY)
	    ex.enclosed->length(0);
unknown's avatar
unknown committed
1085

unknown's avatar
unknown committed
1086 1087 1088
	  ex.line_term = &line_term;
	  if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY)
	    ex.line_term->length(0);
unknown's avatar
unknown committed
1089

unknown's avatar
unknown committed
1090 1091 1092
	  ex.line_start = &line_start;
	  if(lev->sql_ex.empty_flags & LINE_START_EMPTY)
	    ex.line_start->length(0);
unknown's avatar
unknown committed
1093

unknown's avatar
unknown committed
1094 1095 1096
	  ex.escaped = &escaped;
	  if(lev->sql_ex.empty_flags & ESCAPED_EMPTY)
	    ex.escaped->length(0);
unknown's avatar
unknown committed
1097

unknown's avatar
unknown committed
1098 1099 1100
	  ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
	  if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
	    ex.field_term->length(0);
unknown's avatar
unknown committed
1101
	    
unknown's avatar
unknown committed
1102
	  ex.skip_lines = lev->skip_lines;
unknown's avatar
unknown committed
1103
	    
1104

unknown's avatar
unknown committed
1105 1106 1107 1108 1109 1110
	  List<Item> fields;
	  lev->set_fields(fields);
	  thd->slave_proxy_id = thd->thread_id;
	  thd->net.vio = net->vio;
	  // mysql_load will use thd->net to read the file
	  thd->net.pkt_nr = net->pkt_nr;
1111
	  // make sure the client does not get confused
unknown's avatar
unknown committed
1112 1113 1114 1115 1116
	  // about the packet sequence
	  if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1,
			TL_WRITE))
	    thd->query_error = 1;
	  if(thd->cuted_fields)
unknown's avatar
unknown committed
1117 1118 1119
	    sql_print_error("Slave: load data infile at position %s in log \
'%s' produced %d warning(s)", llstr(glob_mi.pos,llbuff), RPL_LOG_NAME,
			    thd->cuted_fields );
unknown's avatar
unknown committed
1120 1121
	  net->pkt_nr = thd->net.pkt_nr;
	}
1122
      }
unknown's avatar
unknown committed
1123
      else
1124
      {
unknown's avatar
unknown committed
1125 1126
	// we will just ask the master to send us /dev/null if we do not
	// want to load the data :-)
unknown's avatar
unknown committed
1127
	skip_load_data_infile(net);
1128
      }
unknown's avatar
unknown committed
1129
	    
1130 1131 1132 1133 1134 1135 1136 1137
      thd->net.vio = 0; 
      thd->db = 0;// prevent db from being freed
      close_thread_tables(thd);
      if(thd->query_error)
      {
	int sql_error = thd->net.last_errno;
	if(!sql_error)
	  sql_error = ER_UNKNOWN_ERROR;
unknown's avatar
unknown committed
1138
		
unknown's avatar
unknown committed
1139
	sql_print_error("Slave: Error '%s' running load data infile ",
1140 1141
			ER(sql_error));
	delete ev;
1142
        free_root(&thd->mem_root,0);
1143 1144
	return 1;
      }
1145
      
1146
      delete ev;
1147
      free_root(&thd->mem_root,0);
unknown's avatar
unknown committed
1148
	    
1149 1150 1151 1152 1153 1154
      if(thd->fatal_error)
      {
	sql_print_error("Slave: Fatal error running query '%s' ",
			thd->query);
	return 1;
      }
unknown's avatar
unknown committed
1155

1156
      mi->inc_pos(event_len);
unknown's avatar
unknown committed
1157

unknown's avatar
unknown committed
1158
      if (!(thd->options & OPTION_BEGIN))
unknown's avatar
unknown committed
1159 1160
        flush_master_info(mi);

1161 1162 1163
      break;
    }

unknown's avatar
unknown committed
1164 1165 1166
    /* Question: in a START or STOP event, what happens if we have transaction
    open? */

1167 1168 1169
    case START_EVENT:
      mi->inc_pos(event_len);
      flush_master_info(mi);
1170
      delete ev;
1171
      break;
unknown's avatar
unknown committed
1172
                  
1173
    case STOP_EVENT:
1174 1175 1176 1177 1178 1179
      if(mi->pos > 4) // stop event should be ignored after rotate event
	{
          close_temporary_tables(thd);
          mi->inc_pos(event_len);
          flush_master_info(mi);
	}
1180
      delete ev;
1181 1182 1183 1184 1185
      break;
    case ROTATE_EVENT:
    {
      Rotate_log_event* rev = (Rotate_log_event*)ev;
      int ident_len = rev->ident_len;
unknown's avatar
unknown committed
1186
      pthread_mutex_lock(&mi->lock);
1187 1188
      memcpy(mi->log_file_name, rev->new_log_ident,ident_len );
      mi->log_file_name[ident_len] = 0;
unknown's avatar
unknown committed
1189
      mi->pos = 4; // skip magic number
unknown's avatar
unknown committed
1190 1191
      pthread_cond_broadcast(&mi->cond);
      pthread_mutex_unlock(&mi->lock);
unknown's avatar
unknown committed
1192

unknown's avatar
unknown committed
1193
      if (!(thd->options & OPTION_BEGIN))
unknown's avatar
unknown committed
1194
        flush_master_info(mi);
unknown's avatar
unknown committed
1195 1196 1197 1198
#ifndef DBUG_OFF
      if(abort_slave_event_count)
	++events_till_abort;
#endif      
1199
      delete ev;
1200 1201
      break;
    }
unknown's avatar
unknown committed
1202

1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214
    case INTVAR_EVENT:
    {
      Intvar_log_event* iev = (Intvar_log_event*)ev;
      switch(iev->type)
      {
      case LAST_INSERT_ID_EVENT:
	thd->last_insert_id_used = 1;
	thd->last_insert_id = iev->val;
	break;
      case INSERT_ID_EVENT:
	thd->next_insert_id = iev->val;
	break;
unknown's avatar
unknown committed
1215
		
1216
      }
unknown's avatar
unknown committed
1217
      mi->inc_pending(event_len);
1218
      delete ev;
1219 1220
      break;
    }
unknown's avatar
unknown committed
1221
    }
1222
  }
unknown's avatar
unknown committed
1223
  else
1224
  {
unknown's avatar
unknown committed
1225 1226 1227 1228
    sql_print_error("\
Could not parse log event entry, check the master for binlog corruption\n\
This may also be a network problem, or just a bug in the master or slave code.\
");
1229 1230 1231
    return 1;
  }
  return 0;	  
unknown's avatar
unknown committed
1232 1233 1234 1235 1236 1237
}
      
// slave thread

pthread_handler_decl(handle_slave,arg __attribute__((unused)))
{
unknown's avatar
unknown committed
1238 1239 1240
#ifndef DBUG_OFF
 slave_begin:  
#endif  
1241
  THD *thd; // needs to be first for thread_stack
unknown's avatar
unknown committed
1242
  MYSQL *mysql = NULL ;
unknown's avatar
unknown committed
1243
  char llbuff[22];
unknown's avatar
unknown committed
1244

1245
  pthread_mutex_lock(&LOCK_slave);
unknown's avatar
unknown committed
1246 1247 1248 1249 1250 1251 1252
  if (!server_id)
  {
    pthread_cond_broadcast(&COND_slave_start);
    pthread_mutex_unlock(&LOCK_slave);
    sql_print_error("Server id not set, will not start slave");
    pthread_exit((void*)1);
  }
1253
  
unknown's avatar
unknown committed
1254 1255
  if(slave_running)
    {
1256
      pthread_cond_broadcast(&COND_slave_start);
unknown's avatar
unknown committed
1257
      pthread_mutex_unlock(&LOCK_slave);
1258
      pthread_exit((void*)1);  // safety just in case
unknown's avatar
unknown committed
1259 1260 1261
    }
  slave_running = 1;
  abort_slave = 0;
1262 1263 1264
#ifndef DBUG_OFF  
  events_till_abort = abort_slave_event_count;
#endif  
unknown's avatar
unknown committed
1265 1266
  pthread_cond_broadcast(&COND_slave_start);
  pthread_mutex_unlock(&LOCK_slave);
unknown's avatar
unknown committed
1267
  
unknown's avatar
unknown committed
1268
  // int error = 1;
unknown's avatar
unknown committed
1269
  bool retried_once = 0;
unknown's avatar
unknown committed
1270
  ulonglong last_failed_pos = 0;
unknown's avatar
unknown committed
1271
  
unknown's avatar
unknown committed
1272 1273
  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  my_thread_init();
1274
  slave_thd = thd = new THD; // note that contructor of THD uses DBUG_ !
1275
  thd->set_time();
unknown's avatar
unknown committed
1276 1277 1278
  DBUG_ENTER("handle_slave");

  pthread_detach_this_thread();
unknown's avatar
unknown committed
1279
  if (init_slave_thread(thd) || init_master_info(&glob_mi))
unknown's avatar
unknown committed
1280 1281 1282 1283
    {
      sql_print_error("Failed during slave thread initialization");
      goto err;
    }
unknown's avatar
unknown committed
1284
  thd->thread_stack = (char*)&thd; // remember where our stack is
1285
  thd->temporary_tables = save_temporary_tables; // restore temp tables
unknown's avatar
unknown committed
1286
  threads.append(thd);
1287 1288
  glob_mi.pending = 0;  //this should always be set to 0 when the slave thread
  // is started
unknown's avatar
unknown committed
1289
  
unknown's avatar
unknown committed
1290 1291
  DBUG_PRINT("info",("master info: log_file_name=%s, position=%s",
		     glob_mi.log_file_name, llstr(glob_mi.pos,llbuff)));
unknown's avatar
unknown committed
1292

unknown's avatar
unknown committed
1293 1294 1295 1296 1297 1298
  
  if (!(mysql = mc_mysql_init(NULL)))
  {
    sql_print_error("Slave thread: error in mc_mysql_init()");
    goto err;
  }
unknown's avatar
unknown committed
1299
  
unknown's avatar
unknown committed
1300
  mysql->net = thd->net;
unknown's avatar
unknown committed
1301
  thd->proc_info = "connecting to master";
unknown's avatar
unknown committed
1302 1303
#ifndef DBUG_OFF  
  sql_print_error("Slave thread initialized");
1304 1305
#endif
  // we can get killed during safe_connect
unknown's avatar
unknown committed
1306
  if (!safe_connect(thd, mysql, &glob_mi))
1307
   sql_print_error("Slave: connected to master '%s@%s:%d',\
unknown's avatar
unknown committed
1308 1309 1310 1311
  replication started in log '%s' at position %s", glob_mi.user,
		   glob_mi.host, glob_mi.port,
		   RPL_LOG_NAME,
		   llstr(glob_mi.pos,llbuff));
1312
  else
unknown's avatar
unknown committed
1313 1314 1315 1316
  {
    sql_print_error("Slave thread killed while connecting to master");
    goto err;
  }
unknown's avatar
unknown committed
1317
  
1318 1319
connected:
  
unknown's avatar
unknown committed
1320 1321
  while (!slave_killed(thd))
  {
unknown's avatar
unknown committed
1322
      thd->proc_info = "Requesting binlog dump";
unknown's avatar
unknown committed
1323 1324 1325 1326
      if(request_dump(mysql, &glob_mi))
	{
	  sql_print_error("Failed on request_dump()");
	  if(slave_killed(thd))
unknown's avatar
unknown committed
1327
	    {
unknown's avatar
unknown committed
1328 1329 1330 1331
	      sql_print_error("Slave thread killed while requesting master \
dump");
              goto err;
	    }
unknown's avatar
unknown committed
1332
	  
unknown's avatar
unknown committed
1333
	  thd->proc_info = "Waiiting to reconnect after a failed dump request";
unknown's avatar
unknown committed
1334
	  if(mysql->net.vio)
unknown's avatar
unknown committed
1335 1336 1337 1338 1339 1340 1341 1342 1343
	    vio_close(mysql->net.vio);
	  // first time retry immediately, assuming that we can recover
	  // right away - if first time fails, sleep between re-tries
	  // hopefuly the admin can fix the problem sometime
	  if(retried_once)
	    safe_sleep(thd, glob_mi.connect_retry);
	  else
	    retried_once = 1;
	  
unknown's avatar
unknown committed
1344
	  if(slave_killed(thd))
unknown's avatar
unknown committed
1345 1346 1347
	    {
	      sql_print_error("Slave thread killed while retrying master \
dump");
unknown's avatar
unknown committed
1348
	      goto err;
unknown's avatar
unknown committed
1349
	    }
unknown's avatar
unknown committed
1350

unknown's avatar
unknown committed
1351
	  thd->proc_info = "Reconnecting after a failed dump request";
unknown's avatar
unknown committed
1352
	  last_failed_pos=glob_mi.pos;
unknown's avatar
unknown committed
1353
          sql_print_error("Slave: failed dump request, reconnecting to \
unknown's avatar
unknown committed
1354 1355
try again, log '%s' at postion %s", RPL_LOG_NAME,
			  llstr(last_failed_pos,llbuff));
1356
	  if(safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd))
unknown's avatar
unknown committed
1357 1358
	    {
	      sql_print_error("Slave thread killed during or after reconnect");
unknown's avatar
unknown committed
1359
	      goto err;
unknown's avatar
unknown committed
1360
	    }
unknown's avatar
unknown committed
1361

1362
	  goto connected;
unknown's avatar
unknown committed
1363 1364 1365 1366
	}

      while(!slave_killed(thd))
	{
unknown's avatar
unknown committed
1367
	  thd->proc_info = "Reading master update";
unknown's avatar
unknown committed
1368 1369
	  uint event_len = read_event(mysql, &glob_mi);
	  if(slave_killed(thd))
unknown's avatar
unknown committed
1370 1371 1372 1373
	    {
	      sql_print_error("Slave thread killed while reading event");
	      goto err;
	    }
1374
	  	  
unknown's avatar
unknown committed
1375 1376
	  if (event_len == packet_error)
	  {
1377 1378 1379 1380 1381 1382 1383 1384 1385
	    if(mc_mysql_errno(mysql) == ER_NET_PACKET_TOO_LARGE)
	      {
		sql_print_error("Log entry on master is longer than \
max_allowed_packet on slave. Slave thread will be aborted. If the entry is \
really supposed to be that long, restart the server with a higher value of \
max_allowed_packet. The current value is %ld", max_allowed_packet);
		goto err;
	      }
	    
unknown's avatar
unknown committed
1386
	    thd->proc_info = "Waiting to reconnect after a failed read";
unknown's avatar
unknown committed
1387
	    if(mysql->net.vio)
unknown's avatar
unknown committed
1388 1389 1390 1391 1392 1393
 	      vio_close(mysql->net.vio);
	    if(retried_once) // punish repeat offender with sleep
	      safe_sleep(thd, glob_mi.connect_retry);
	    else
	      retried_once = 1; 
	    
unknown's avatar
unknown committed
1394
	    if(slave_killed(thd))
unknown's avatar
unknown committed
1395 1396 1397 1398 1399
	      {
		sql_print_error("Slave thread killed while waiting to \
reconnect after a failed read");
	        goto err;
	      }
unknown's avatar
unknown committed
1400
	    thd->proc_info = "Reconnecting after a failed read";
unknown's avatar
unknown committed
1401
	    last_failed_pos= glob_mi.pos;
unknown's avatar
unknown committed
1402
	    sql_print_error("Slave: Failed reading log event, \
unknown's avatar
unknown committed
1403 1404
reconnecting to retry, log '%s' position %s", RPL_LOG_NAME,
			    llstr(last_failed_pos, llbuff));
1405
	    if(safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd))
unknown's avatar
unknown committed
1406 1407 1408 1409 1410
	      {
		sql_print_error("Slave thread killed during or after a \
reconnect done to recover from failed read");
	        goto err;
	      }
1411 1412 1413
	    
	    goto connected;
	  } // if(event_len == packet_error)
unknown's avatar
unknown committed
1414
	  
unknown's avatar
unknown committed
1415
	  thd->proc_info = "Processing master log event"; 
unknown's avatar
unknown committed
1416 1417
	  if(exec_event(thd, &mysql->net, &glob_mi, event_len))
	    {
1418 1419 1420
	      sql_print_error("\
Error running query, slave aborted. Fix the problem, and re-start \
the slave thread with \"mysqladmin start-slave\". We stopped at log \
unknown's avatar
unknown committed
1421 1422
'%s' position %s",
			      RPL_LOG_NAME, llstr(glob_mi.pos, llbuff));
unknown's avatar
unknown committed
1423 1424
	      goto err;
	      // there was an error running the query
unknown's avatar
unknown committed
1425 1426 1427
	      // abort the slave thread, when the problem is fixed, the user
	      // should restart the slave with mysqladmin start-slave
	    }
1428 1429 1430 1431 1432 1433 1434
#ifndef DBUG_OFF
	  if(abort_slave_event_count && !--events_till_abort)
	    {
	      sql_print_error("Slave: debugging abort");
	      goto err;
	    }
#endif	  
unknown's avatar
unknown committed
1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447
	  
	  // successful exec with offset advance,
	  // the slave repents and his sins are forgiven!
	  if(glob_mi.pos > last_failed_pos)
	    {
	     retried_once = 0;
#ifndef DBUG_OFF
	     stuck_count = 0;
#endif
	    }
#ifndef DBUG_OFF
	  else
	    {
unknown's avatar
unknown committed
1448 1449
	      // show a little mercy, allow slave to read one more event
	      // before cutting him off - otherwise he gets stuck
1450
	      // on Intvar events, since they do not advance the offset
unknown's avatar
unknown committed
1451 1452
	      // immediately
	      if (++stuck_count > 2)
unknown's avatar
unknown committed
1453 1454 1455
	        events_till_disconnect++;
	    }
#endif	  
1456 1457
	} // while(!slave_killed(thd)) - read/exec loop
  } // while(!slave_killed(thd)) - slave loop
unknown's avatar
unknown committed
1458

unknown's avatar
unknown committed
1459
  // error = 0;
unknown's avatar
unknown committed
1460
 err:
unknown's avatar
unknown committed
1461 1462
  // print the current replication position 
  sql_print_error("Slave thread exiting, replication stopped in log '%s' at \
unknown's avatar
unknown committed
1463 1464
position %s",
		  RPL_LOG_NAME, llstr(glob_mi.pos,llbuff));
unknown's avatar
unknown committed
1465 1466
  thd->query = thd->db = 0; // extra safety
  if(mysql)
unknown's avatar
unknown committed
1467
      mc_mysql_close(mysql);
unknown's avatar
unknown committed
1468
  thd->proc_info = "Waiting for slave mutex on exit";
unknown's avatar
unknown committed
1469 1470 1471
  pthread_mutex_lock(&LOCK_slave);
  slave_running = 0;
  abort_slave = 0;
1472 1473
  save_temporary_tables = thd->temporary_tables;
  thd->temporary_tables = 0; // remove tempation from destructor to close them
unknown's avatar
unknown committed
1474 1475
  pthread_cond_broadcast(&COND_slave_stopped); // tell the world we are done
  pthread_mutex_unlock(&LOCK_slave);
1476
  net_end(&thd->net); // destructor will not free it, because we are weird
1477
  slave_thd = 0;
unknown's avatar
unknown committed
1478 1479
  delete thd;
  my_thread_end();
unknown's avatar
unknown committed
1480 1481 1482 1483
#ifndef DBUG_OFF
  if(abort_slave_event_count && !events_till_abort)
    goto slave_begin;
#endif  
unknown's avatar
unknown committed
1484 1485 1486 1487
  pthread_exit(0);
  DBUG_RETURN(0);				// Can't return anything here
}

unknown's avatar
unknown committed
1488 1489 1490

/* try to connect until successful or slave killed */

1491
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
unknown's avatar
unknown committed
1492
{
1493
  int slave_was_killed;
unknown's avatar
unknown committed
1494 1495 1496
#ifndef DBUG_OFF
  events_till_disconnect = disconnect_slave_event_count;
#endif  
1497
  while(!(slave_was_killed = slave_killed(thd)) &&
unknown's avatar
unknown committed
1498 1499 1500
	!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
			  mi->port, 0, 0))
  {
1501
    sql_print_error("Slave thread: error connecting to master: %s (%d),\
unknown's avatar
unknown committed
1502
 retry in %d sec", mc_mysql_error(mysql), errno, mi->connect_retry);
unknown's avatar
unknown committed
1503 1504 1505
    safe_sleep(thd, mi->connect_retry);
  }
  
1506
  if(!slave_was_killed)
1507 1508
    {
      mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
1509
		  mi->user, mi->host, mi->port);
1510 1511
#ifdef SIGNAL_WITH_VIO_CLOSE
      thd->set_active_vio(mysql->net.vio);
1512 1513
#endif      
    }
unknown's avatar
unknown committed
1514
  
1515
  return slave_was_killed;
unknown's avatar
unknown committed
1516 1517
}

1518 1519 1520 1521
/*
  Try to connect until successful or slave killed or we have retried
  master_retry_count times
*/
unknown's avatar
unknown committed
1522

1523
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
unknown's avatar
unknown committed
1524
{
1525
  int slave_was_killed;
1526 1527
  int last_errno= -2;				// impossible error
  ulong err_count=0;
unknown's avatar
unknown committed
1528 1529
  char llbuff[22];

1530 1531 1532 1533
  /*
    If we lost connection after reading a state set event
    we will be re-reading it, so pending needs to be cleared
  */
unknown's avatar
unknown committed
1534
  mi->pending = 0;
unknown's avatar
unknown committed
1535 1536 1537
#ifndef DBUG_OFF
  events_till_disconnect = disconnect_slave_event_count;
#endif
1538
  while (!(slave_was_killed = slave_killed(thd)) && mc_mysql_reconnect(mysql))
unknown's avatar
unknown committed
1539
  {
1540 1541 1542 1543
    /* Don't repeat last error */
    if (mc_mysql_errno(mysql) != last_errno)
    {
      sql_print_error("Slave thread: error re-connecting to master: \
1544
%s, last_errno=%d, retry in %d sec",
1545 1546 1547
		      mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql),
		      mi->connect_retry);
    }
unknown's avatar
unknown committed
1548 1549 1550
    safe_sleep(thd, mi->connect_retry);
    /* if master_retry_count is not set, keep trying until success */
    if (master_retry_count && err_count++ == master_retry_count)
1551 1552 1553 1554
    {
      slave_was_killed=1;
      break;
    }
unknown's avatar
unknown committed
1555
  }
1556

1557 1558 1559
  if (!slave_was_killed)
  {
    sql_print_error("Slave: reconnected to master '%s@%s:%d',\
unknown's avatar
unknown committed
1560 1561 1562 1563
replication resumed in log '%s' at position %s", glob_mi.user,
		    glob_mi.host, glob_mi.port,
		    RPL_LOG_NAME,
		    llstr(glob_mi.pos,llbuff));
1564
#ifdef SIGNAL_WITH_VIO_CLOSE
1565
    thd->set_active_vio(mysql->net.vio);
1566
#endif      
1567
  }
unknown's avatar
unknown committed
1568

1569
  return slave_was_killed;
unknown's avatar
unknown committed
1570 1571 1572 1573
}

#ifdef __GNUC__
template class I_List_iterator<i_string>;
unknown's avatar
unknown committed
1574
template class I_List_iterator<i_string_pair>;
unknown's avatar
unknown committed
1575
#endif