/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
   
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
   
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
   
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


#include "mysql_priv.h"
#include <mysql.h>
#include "mini_client.h"
#include "slave.h"
#include <thr_alarm.h>
#include <my_dir.h>

// Master binlog name to show in log messages: the name stored in glob_mi,
// or the literal "FIRST" while that name is still empty.
#define RPL_LOG_NAME (glob_mi.log_file_name[0] ? glob_mi.log_file_name :\
 "FIRST")

// set to 1 by handle_slave() while the slave thread is alive and cleared on
// its exit; both writes happen under LOCK_slave
bool slave_running = 0;
// pthread id of the slave thread, recorded in init_slave_thread()
pthread_t slave_real_id;
// connection parameters and replication position of the master
MASTER_INFO glob_mi;
// exact "db.table" rules, keyed by the full "db.table" string
HASH replicate_do_table, replicate_ignore_table;
// wildcard "db.table" rules, stored as TABLE_RULE_ENT pointers
DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
// the *_inited flags tell tables_ok() and end_slave() which of the rule
// containers above have been initialised
bool do_table_inited = 0, ignore_table_inited = 0;
bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
// not referenced in this file - presumably set by the option parser when
// any table rule is given (TODO confirm)
bool table_rules_on = 0;

// when the slave thread exits, we need to remember the temporary tables so
// we can re-use them on the next slave start
static TABLE* save_temporary_tables = 0;
#ifndef DBUG_OFF
// debug-build fault injection: force a disconnect / an abort of the slave
// after the given number of events (0 disables the feature)
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
// countdown counters re-armed from the two settings above
static int events_till_disconnect = -1, events_till_abort = -1;
// number of consecutive events that did not advance past the last failed
// position (see handle_slave)
static int stuck_count = 0;
#endif


// Forward declarations of the file-local helpers defined further down.
static inline void skip_load_data_infile(NET* net);
static inline bool slave_killed(THD* thd);
static int init_slave_thread(THD* thd);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, char* db, char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
				  const char* table_name);
static inline char* rewrite_db(char* db);
/*
  Element destructor registered with the table-rule hashes: releases one
  TABLE_RULE_ENT.
*/
static void free_table_ent(TABLE_RULE_ENT* e)
{
  gptr chunk = (gptr) e;
  my_free(chunk, MYF(0));
}

/*
  Key extractor registered with the table-rule hashes.

  Stores the key length of the entry in *len and returns a pointer to the
  key bytes, which start at e->db.
*/
static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
                           my_bool not_used __attribute__((unused)))
{
  byte* key_start = (byte*) e->db;
  *len = e->key_len;
  return key_start;
}


/*
  Initialise a hash of table rules and mark it as initialised.

  h         hash to set up; get_table_key() and free_table_ent() are
            registered as its key-extraction and element-free callbacks
  h_inited  flag that is set to 1
*/
void init_table_rule_hash(HASH* h, bool* h_inited)
{
  hash_get_key key_cb = (hash_get_key) get_table_key;
  void (*free_cb)(void*) = (void (*)(void*)) free_table_ent;

  hash_init(h, TABLE_RULE_HASH_SIZE, 0, 0, key_cb, free_cb, 0);
  *h_inited = 1;
}

/*
  Initialise a dynamic array that holds TABLE_RULE_ENT pointers (the
  wildcard rules) and mark it as initialised by setting *a_inited to 1.
*/
void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited)
{
  init_dynamic_array(a,
                     sizeof(TABLE_RULE_ENT*),
                     TABLE_RULE_ARR_SIZE,
                     TABLE_RULE_ARR_SIZE);
  *a_inited = 1;
}

/*
  Look for the first wildcard rule in `a` whose pattern matches `key`.

  a    array of TABLE_RULE_ENT pointers
  key  "db.table" string to test, `len` bytes long

  Returns the matching rule, or 0 when no pattern matches.  Patterns are
  compared with wild_case_compare() using '\\' as the escape character.
*/
static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
{
  const char* const key_end = key + len;

  for (uint idx = 0; idx != a->elements; ++idx)
  {
    TABLE_RULE_ENT* rule;
    get_dynamic(a, (gptr)&rule, idx);

    const char* pattern = (const char*) rule->db;
    const char* pattern_end = (const char*) (rule->db + rule->key_len);
    if (!wild_case_compare(key, key_end, pattern, pattern_end, '\\'))
      return rule;
  }

  return 0;
}

/*
  Decide whether a statement touching `tables` should be replicated
  according to the replicate-(wild-)do/ignore-table rules.

  Only tables marked as `updating` are examined.  For each one the key
  "db.table" is built (falling back to thd->db when the table has no
  explicit database) and tested, in this order, against:
    1. the exact do list      -> replicate
    2. the exact ignore list  -> do not replicate
    3. the wildcard do list   -> replicate
    4. the wildcard ignore list -> do not replicate

  If no rule matched any table, the statement is replicated only when no
  do list (exact or wildcard) exists at all.

  Returns 1 to replicate, 0 to skip.
*/
int tables_ok(THD* thd, TABLE_LIST* tables)
{
  for (TABLE_LIST* cur = tables; cur; cur = cur->next)
  {
    if (!cur->updating)
      continue;

    char hash_key[2*NAME_LEN+2];
    char* dot_pos = strmov(hash_key, cur->db ? cur->db : thd->db);
    *dot_pos = '.';
    char* key_end = strmov(dot_pos + 1, cur->real_name);
    uint len = key_end - hash_key;

    // exact do rules win over everything else
    if (do_table_inited &&
        hash_search(&replicate_do_table, (byte*) hash_key, len))
      return 1;

    // exact ignore rules
    if (ignore_table_inited &&
        hash_search(&replicate_ignore_table, (byte*) hash_key, len))
      return 0;

    // wildcard do rules
    if (wild_do_table_inited &&
        find_wild(&replicate_wild_do_table, hash_key, len))
      return 1;

    // wildcard ignore rules
    if (wild_ignore_table_inited &&
        find_wild(&replicate_wild_ignore_table, hash_key, len))
      return 0;
  }

  // no explicit rule matched: replicate unless some do list exists
  return !(do_table_inited || wild_do_table_inited);
}


/*
  Add an exact "db.table" rule to a table-rule hash.

  h           hash set up by init_table_rule_hash()
  table_spec  rule text; must contain a '.' separating db and table

  The entry and its key text are allocated as one chunk: the key bytes
  follow the TABLE_RULE_ENT header and are NOT NUL-terminated (key_len
  gives their length).

  Returns 0 on success, 1 if the spec has no '.', on out of memory, or if
  the entry could not be inserted into the hash (in which case the entry
  is freed again instead of being leaked).
*/
int add_table_rule(HASH* h, const char* table_spec)
{
  const char* dot = strchr(table_spec, '.');
  if(!dot) return 1;
  // a spec that contains a '.' is at least one byte long
  uint len = (uint)strlen(table_spec);
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
                                                 + len, MYF(MY_WME));
  if(!e) return 1;
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len);
  if(hash_insert(h, (byte*)e))
  {
    // the hash did not take ownership, so release the entry ourselves
    my_free((gptr) e, MYF(0));
    return 1;
  }
  return 0;
}

/*
  Add a wildcard "db.table" rule to a table-rule array.

  a           array set up by init_table_rule_array()
  table_spec  pattern text; must contain a '.' separating db and table

  The entry and its pattern text are allocated as one chunk: the pattern
  bytes follow the TABLE_RULE_ENT header and are NOT NUL-terminated
  (key_len gives their length).

  Returns 0 on success, 1 if the spec has no '.', on out of memory, or if
  the pointer could not be appended to the array (in which case the entry
  is freed again instead of being leaked).
*/
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
{
  const char* dot = strchr(table_spec, '.');
  if(!dot) return 1;
  // a spec that contains a '.' is at least one byte long
  uint len = (uint)strlen(table_spec);
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
                                                 + len, MYF(MY_WME));
  if(!e) return 1;
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len);
  if(insert_dynamic(a, (gptr)&e))
  {
    // the array does not reference the entry, so release it ourselves
    my_free((gptr) e, MYF(0));
    return 1;
  }
  return 0;
}

/*
  Free every pointer stored in the dynamic array `a`, then the array
  itself.
*/
static void free_string_array(DYNAMIC_ARRAY *a)
{
  for (uint idx = 0; idx != a->elements; ++idx)
  {
    char* entry;
    get_dynamic(a, (gptr)&entry, idx);
    my_free(entry, MYF(MY_WME));
  }
  delete_dynamic(a);
}

/*
  Release the slave's global resources: close the master info file and
  free whichever table-rule containers were initialised.
*/
void end_slave()
{
  end_master_info(&glob_mi);

  // exact-match rules
  if (do_table_inited)
  {
    hash_free(&replicate_do_table);
  }
  if (ignore_table_inited)
  {
    hash_free(&replicate_ignore_table);
  }

  // wildcard rules
  if (wild_do_table_inited)
  {
    free_string_array(&replicate_wild_do_table);
  }
  if (wild_ignore_table_inited)
  {
    free_string_array(&replicate_wild_ignore_table);
  }
}

/*
  True when the slave thread should stop: the slave was asked to abort,
  the server main loop is aborting, or this thread was killed.
*/
static inline bool slave_killed(THD* thd)
{
  if (abort_slave)
    return 1;
  if (abort_loop)
    return 1;
  return thd->killed ? 1 : 0;
}

/*
  Skip the data transfer of a LOAD DATA INFILE event we do not want to
  execute: a 10-byte packet (byte 0xfb followed by "/dev/null") is written
  to the master, the reply is read and thrown away, and an OK is sent.
*/
static inline void skip_load_data_infile(NET* net)
{
  const uint request_len = 10; // 1 marker byte + strlen("/dev/null")

  (void)my_net_write(net, "\xfb/dev/null", request_len);
  (void)net_flush(net);
  (void)my_net_read(net); // the response is of no interest
  send_ok(net);           // the master expects it
}

static inline char* rewrite_db(char* db)
{
  if(replicate_rewrite_db.is_empty() || !db) return db;
  I_List_iterator<i_string_pair> it(replicate_rewrite_db);
  i_string_pair* tmp;

  while((tmp=it++))
    {
      if(!strcmp(tmp->key, db))
	return tmp->val;
    }

  return db;
}

int db_ok(const char* db, I_List<i_string> &do_list,
	  I_List<i_string> &ignore_list )
{
  if(do_list.is_empty() && ignore_list.is_empty())
    return 1; // ok to replicate if the user puts no constraints

  if(!db)
    return 0; // if the user has specified restrictions on which databases to replicate
  // and db was not selected, do not replicate

  if(!do_list.is_empty()) // if the do's are not empty
    {
      I_List_iterator<i_string> it(do_list);
      i_string* tmp;

      while((tmp=it++))
	{
	  if(!strcmp(tmp->ptr, db))
	    return 1; // match
	}
      return 0;
    }
  else // there are some elements in the don't, otherwise we cannot get here
    {
      I_List_iterator<i_string> it(ignore_list);
      i_string* tmp;

      while((tmp=it++))
	{
	  if(!strcmp(tmp->ptr, db))
	    return 0; // match
	}
      
      return 1;
      
    }
}

/*
  Read one line of the master info file into a string variable.

  var          destination buffer
  max_size     total size of `var` in bytes, including the terminating NUL
  f            cache to read the line from
  default_val  value to use when no line could be read; may be NULL, in
               which case `var` is left untouched

  A trailing newline is stripped.  If the line did not end in a newline
  within the buffer, the rest of the line is consumed and discarded so
  that the next read starts on the following line.
*/
static void init_strvar_from_file(char* var, int max_size, IO_CACHE* f,
                               char* default_val)
{

  if(my_b_gets(f,var, max_size))
    {
      char* last_p = strend(var) - 1;
      int c;
      if(*last_p == '\n') *last_p = 0; // if we stopped on newline, kill it
      else
        while( ((c=my_b_get(f)) != '\n' && c != my_b_EOF));
      // if we truncated a line or stopped on last char, remove all chars
      // up to and including newline
    }
  else if(default_val && max_size > 0)
    // strmake() is given the number of characters to copy and writes the
    // NUL after them, so leave room for it (the other strmake() calls in
    // this file pass sizeof(buf) - 1 for the same reason)
    strmake(var,  default_val, max_size - 1);
}

/*
  Read one line of the master info file into an integer variable.

  If a line can be read, *var receives atoi() of it.  Otherwise *var is
  set to default_val, but only when default_val is non-zero; with a zero
  default *var is left untouched.
*/
static void init_intvar_from_file(int* var, IO_CACHE* f,
                               int default_val)
{
  char line[32];

  if (!my_b_gets(f, line, sizeof(line)))
  {
    if (default_val)
      *var = default_val;
    return;
  }

  *var = atoi(line);
}


/*
  Create a local copy of a master table from a table dump.

  thd         thread doing the work; errors are reported on thd->net
  net         connection to the master, positioned right after the
              COM_TABLE_DUMP request
  db          database of the table to open after creation
  table_name  name of the table to open after creation

  Steps: read the CREATE TABLE statement sent by the master and run it,
  open the new table with a write lock, let the handler read the table
  data from `net`, then rebuild the indexes with a quick repair.

  thd->net.no_send_ok is raised while the work is done and is lowered
  again on every exit path, so that a failure here cannot leave it set.

  Returns 0 on success, 1 on failure.
*/
static int create_table_from_dump(THD* thd, NET* net, const char* db,
                                  const char* table_name)
{
  uint packet_len = my_net_read(net); // read create table statement
  TABLE_LIST tables;
  int error = 0;

  if(packet_len == packet_error)
    {
      send_error(&thd->net, ER_MASTER_NET_READ);
      return 1;
    }
  if(net->read_pos[0] == 255) // error from master
    {
      net->read_pos[packet_len] = 0;
      net_printf(&thd->net, ER_MASTER, net->read_pos + 3);
      return 1;
    }
  thd->command = COM_TABLE_DUMP;
  thd->query = sql_alloc(packet_len + 1);
  if(!thd->query)
    {
      sql_print_error("create_table_from_dump: out of memory");
      net_printf(&thd->net, ER_GET_ERRNO, "Out of memory");
      return 1;
    }
  memcpy(thd->query, net->read_pos, packet_len);
  thd->query[packet_len] = 0;
  thd->current_tablenr = 0;
  thd->query_error = 0;
  thd->net.no_send_ok = 1;
  thd->proc_info = "Creating table from master dump";
  char* save_db = thd->db;
  thd->db = thd->last_nx_db; // in case we are creating in a different
  // database
  mysql_parse(thd, thd->query, packet_len); // run create table
  thd->db = save_db; // leave things the way the were before

  if(thd->query_error)
    {
      close_thread_tables(thd); // mysql_parse takes care of the error send
      thd->net.no_send_ok = 0;
      return 1;
    }

  bzero((char*) &tables,sizeof(tables));
  tables.db = (char*)db;
  tables.name = tables.real_name = (char*)table_name;
  tables.lock_type = TL_WRITE;
  thd->proc_info = "Opening master dump table";
  if(!open_ltable(thd, &tables, TL_WRITE))
    {
      // open tables will send the error
      sql_print_error("create_table_from_dump: could not open created table");
      close_thread_tables(thd);
      thd->net.no_send_ok = 0;
      return 1;
    }

  handler *file = tables.table->file;
  thd->proc_info = "Reading master dump table data";
  if(file->net_read_dump(net))
    {
      net_printf(&thd->net, ER_MASTER_NET_READ);
      sql_print_error("create_table_from_dump::failed in\
 handler::net_read_dump()");
      close_thread_tables(thd);
      thd->net.no_send_ok = 0;
      return 1;
    }

  HA_CHECK_OPT check_opt;
  check_opt.init();
  check_opt.quick = 1;
  thd->proc_info = "rebuilding the index on master dump table";
  Vio* save_vio = thd->net.vio;
  thd->net.vio = 0; // we do not want repair() to spam us with messages
  // just send them to the error log, and report the failure in case of
  // problems
  if(file->repair(thd,&check_opt ))
    error = 1;
  thd->net.vio = save_vio;
  // Report the failure only after the connection has been restored
  // (presumably nothing can be sent while vio is 0), and before the table
  // is closed because the message uses its name.
  if(error)
    net_printf(&thd->net, ER_INDEX_REBUILD,tables.table->real_name );
  close_thread_tables(thd);

  thd->net.no_send_ok = 0;
  return error;
}

/*
  Fetch the table named by thd->last_nx_db / thd->last_nx_table from the
  master and create it locally.

  A dedicated connection to the master is opened for the transfer and is
  always closed again before returning.  Failures of the client setup or
  of the dump request are reported on thd->net (when it has a connection);
  failures inside create_table_from_dump() are reported by that function.

  Returns 0 on success, 1 on failure or if the slave was killed.
*/
int fetch_nx_table(THD* thd, MASTER_INFO* mi)
{
  int result = 1;       // pessimistic until everything has worked
  int report_errno = 0; // error still to be sent to the client, if any
  MYSQL* master = mc_mysql_init(NULL);

  do
  {
    if (!master)
    {
      sql_print_error("fetch_nx_table: Error in mysql_init()");
      report_errno = ER_GET_ERRNO;
      break;
    }

    // retries until connected or killed; the kill case is detected below
    safe_connect(thd, master, mi);
    if (slave_killed(thd))
      break;

    if (request_table_dump(master, thd->last_nx_db, thd->last_nx_table))
    {
      sql_print_error("fetch_nx_table: failed on table dump request ");
      report_errno = ER_GET_ERRNO;
      break;
    }

    if (create_table_from_dump(thd, &master->net, thd->last_nx_db,
                               thd->last_nx_table))
    {
      // create_table_from_dump has already sent the error
      sql_print_error("fetch_nx_table: failed on create table ");
      break;
    }

    result = 0;
  } while (0);

  if (master)
  {
    mc_mysql_close(master);
    master = 0;
  }
  if (report_errno && thd->net.vio)
    send_error(&thd->net, report_errno, "Error in fetch_nx_table");

  return result;
}

/*
  Close the master info file, if it is open, and mark the structure as
  not initialised.
*/
void end_master_info(MASTER_INFO* mi)
{
  const int open_fd = mi->fd;

  if (open_fd >= 0)
  {
    end_io_cache(&mi->file);
    (void)my_close(open_fd, MYF(MY_WME));
    mi->fd = -1;
  }

  mi->inited = 0;
}

/*
  Load the master connection parameters and replication position.

  If the master info file does not exist it is created and `mi` is filled
  from the master_* startup settings, with the position set to 4 (just
  past the binlog magic number).  If the file exists, the log name,
  position, host, user, password, port and connect-retry interval are
  read from it, one per line; the startup settings serve as defaults for
  the lines after the position.

  On success the file cache is switched to write mode and the (possibly
  defaulted) values are written back.  mi->lock is held while `mi` is
  modified.  On failure any descriptor or cache this call was using is
  released and mi->fd is reset to -1.

  Returns 0 on success or when `mi` is already initialised, 1 on error.
*/
int init_master_info(MASTER_INFO* mi)
{
  if(mi->inited)
    return 0;
  int fd;
  MY_STAT stat_area;
  char fname[FN_REFLEN+128];
  fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32);


  // we need a mutex while we are changing master info parameters to
  // keep other threads from reading bogus info

  pthread_mutex_lock(&mi->lock);
  mi->pending = 0;
  fd = mi->fd;

  if(!my_stat(fname, &stat_area, MYF(0))) // we do not want any messages
    // if the file does not exist
    {
      // if someone removed the file from underneath our feet, just close
      // the old descriptor and re-create the old file
      if(fd >= 0)
        {
          // release the cache together with its descriptor, the same way
          // end_master_info() does, and forget the closed descriptor
          end_io_cache(&mi->file);
          my_close(fd, MYF(MY_WME));
          mi->fd = -1;
        }
      if((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0
          || init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
                           MYF(MY_WME)))
        {
          if(fd >= 0) // the file was opened but the cache setup failed
            my_close(fd, MYF(0));
          pthread_mutex_unlock(&mi->lock);
          return 1;
        }
      mi->log_file_name[0] = 0;
      mi->pos = 4; // skip magic number
      mi->fd = fd;

      if(master_host)
        strmake(mi->host, master_host, sizeof(mi->host) - 1);
      if(master_user)
        strmake(mi->user, master_user, sizeof(mi->user) - 1);
      if(master_password)
        strmake(mi->password, master_password, sizeof(mi->password) - 1);
      mi->port = master_port;
      mi->connect_retry = master_connect_retry;

    }
  else // file exists
    {
      if(fd >= 0)
        reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
      else if((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0
          || init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
                           0, MYF(MY_WME)))
        {
          if(fd >= 0) // the file was opened but the cache setup failed
            my_close(fd, MYF(0));
          pthread_mutex_unlock(&mi->lock);
          return 1;
        }

      if(!my_b_gets(&mi->file, mi->log_file_name, sizeof(mi->log_file_name)))
        {
          sql_print_error("Error reading log file name from master info file ");
          end_io_cache(&mi->file);
          my_close(fd, MYF(0));
          mi->fd = -1;
          pthread_mutex_unlock(&mi->lock);
          return 1;
        }

      *(strend(mi->log_file_name) - 1) = 0; // kill \n
      char buf[FN_REFLEN];
      if(!my_b_gets(&mi->file, buf, sizeof(buf)))
        {
          sql_print_error("Error reading log file position from master info file");
          end_io_cache(&mi->file);
          my_close(fd, MYF(0));
          mi->fd = -1;
          pthread_mutex_unlock(&mi->lock);
          return 1;
        }

      // the position is written with llstr() and may exceed the range
      // of int, so it must not be parsed with atoi()
      mi->pos = strtoull(buf, (char**) 0, 10);
      mi->fd = fd;
      init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
                            master_host);
      init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
                            master_user);
      init_strvar_from_file(mi->password, sizeof(mi->password), &mi->file,
                         master_password);

      init_intvar_from_file((int*)&mi->port, &mi->file, master_port);
      init_intvar_from_file((int*)&mi->connect_retry, &mi->file,
                            master_connect_retry);

    }

  mi->inited = 1;
  // now change the cache from READ to WRITE - must do this
  // before flush_master_info
  reinit_io_cache(&mi->file, WRITE_CACHE, 0L,0,1);
  if(flush_master_info(mi))
    {
      pthread_mutex_unlock(&mi->lock);
      return 1;
    }
  pthread_mutex_unlock(&mi->lock);

  return 0;
}

/*
  Send the master/slave status to the client as a one-row result set.

  Columns: Master_Host, Master_User, Master_Port, Connect_retry, Log_File,
  Pos, Slave_Running, Replicate_do_db, Replicate_ignore_db.

  glob_mi is read under its own lock and slave_running under LOCK_slave.

  Returns 0 on success, -1 if sending the header or the row failed.
*/
int show_master_info(THD* thd)
{
  DBUG_ENTER("show_master_info");

  // column name and display width, in output order
  static const struct { const char* name; uint width; } columns[] =
  {
    { "Master_Host",         sizeof(glob_mi.host) },
    { "Master_User",         sizeof(glob_mi.user) },
    { "Master_Port",         6 },
    { "Connect_retry",       6 },
    { "Log_File",            FN_REFLEN },
    { "Pos",                 12 },
    { "Slave_Running",       3 },
    { "Replicate_do_db",     20 },
    { "Replicate_ignore_db", 20 }
  };

  List<Item> field_list;
  for (uint col = 0; col < sizeof(columns) / sizeof(columns[0]); col++)
    field_list.push_back(new Item_empty_string(columns[col].name,
                                               columns[col].width));
  if (send_fields(thd, field_list, 1))
    DBUG_RETURN(-1);

  String* row = &thd->packet;
  row->length(0);

  // master parameters and position
  pthread_mutex_lock(&glob_mi.lock);
  net_store_data(row, glob_mi.host);
  net_store_data(row, glob_mi.user);
  net_store_data(row, (uint32) glob_mi.port);
  net_store_data(row, (uint32) glob_mi.connect_retry);
  net_store_data(row, glob_mi.log_file_name);
  net_store_data(row, (longlong)glob_mi.pos);
  pthread_mutex_unlock(&glob_mi.lock);

  // slave thread state
  pthread_mutex_lock(&LOCK_slave);
  net_store_data(row, slave_running ? "Yes":"No");
  pthread_mutex_unlock(&LOCK_slave);

  // database filters
  net_store_data(row, &replicate_do_db);
  net_store_data(row, &replicate_ignore_db);

  if (my_net_write(&thd->net, (char*)row->ptr(), row->length()))
    DBUG_RETURN(-1);

  send_eof(&thd->net);
  DBUG_RETURN(0);
}

/*
  Rewrite the master info file from the start with the current contents
  of `mi`, one value per line: log file name, position, host, user,
  password, port, connect-retry interval.

  Always returns 0; the results of the write and of the flush are not
  checked.
*/
int flush_master_info(MASTER_INFO* mi)
{
  char pos_buf[22];
  IO_CACHE* info_file = &mi->file;
  const char* pos_text = llstr(mi->pos, pos_buf);

  my_b_seek(info_file, 0L);
  my_b_printf(info_file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n",
              mi->log_file_name,
              pos_text,
              mi->host,
              mi->user,
              mi->password,
              mi->port,
              mi->connect_retry);
  flush_io_cache(info_file);
  return 0;
}


/*
  Prepare a THD for use by the slave thread.

  Marks the THD as a system/bootstrap/slave thread with all privileges,
  initialises its NET, assigns a thread id, registers the thread-specific
  pointers and sets the initial options and status text.

  Returns 0 on success.  If the thread-specific setup fails, the
  connection is closed, end_thread() is called and -1 is returned.
*/
static int init_slave_thread(THD* thd)
{
  DBUG_ENTER("init_slave_thread");

  // identity and privileges
  thd->bootstrap = 1;
  thd->system_thread = 1;
  thd->client_capabilities = 0;
  my_net_init(&thd->net, 0);
  thd->max_packet_length = thd->net.max_packet;
  thd->master_access = ~0;
  thd->priv_user = 0;
  thd->slave_thread = 1;

  // binlog our own updates only when log-slave-updates is on
  thd->options = (((opt_log_slave_updates) ? OPTION_BIN_LOG:0) |
                  OPTION_AUTO_IS_NULL);
  thd->system_thread = 1;
  thd->client_capabilities = CLIENT_LOCAL_FILES;
  slave_real_id = thd->real_id = pthread_self();

  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id = thread_id++;
  pthread_mutex_unlock(&LOCK_thread_count);

  const bool setup_failed =
    init_thr_lock() ||
    my_pthread_setspecific_ptr(THR_THD,  thd) ||
    my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
    my_pthread_setspecific_ptr(THR_NET,  &thd->net);
  if (setup_failed)
  {
    close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed?
    end_thread(thd,0);
    DBUG_RETURN(-1);
  }

  thd->mysys_var = my_thread_var;
  thd->dbug_thread_id = my_thread_id();
#ifndef __WIN__
  sigset_t set;
  VOID(sigemptyset(&set));                      // Get mask in use
  VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif

  thd->mem_root.free = thd->mem_root.used = 0;  // Probably not needed
  if (thd->max_join_size == (ulong) ~0L)
    thd->options |= OPTION_BIG_SELECTS;

  thd->proc_info = "Waiting for master update";
  thd->version = refresh_version;
  thd->set_time();

  DBUG_RETURN(0);
}

/*
  Sleep for `sec` seconds, waking up early if the slave is killed.

  An alarm of twice the remaining time is armed before each nap so that
  the thread can be woken up when it is told to die; if the nap ends on
  its own, the alarm is cancelled.

  Returns 1 if the slave was killed, 0 if the full time has elapsed.
*/
static int safe_sleep(THD* thd, int sec)
{
  thr_alarm_t alarmed;
  thr_alarm_init(&alarmed);
  time_t now = time((time_t*) 0);
  const time_t deadline = now + sec;
  ALARM alarm_buff;

  for (; now < deadline; now = time((time_t*) 0))
  {
    int nap_time = (int) (deadline - now);

    // the alarm exists only so that a kill can interrupt the sleep
    thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
    sleep(nap_time);

    // woke up by ourselves: cancel the alarm before it goes off
    if (thr_alarm_in_use(&alarmed))
      thr_end_alarm(&alarmed);

    if (slave_killed(thd))
      return 1;
  }
  return 0;
}


/*
  Ask the master to start sending the binary log (COM_BINLOG_DUMP).

  Request layout: 4 bytes position, 2 bytes flags (always 0 for now),
  4 bytes server id of this slave, followed by the log file name without
  a terminating NUL.

  Returns 0 on success, 1 if the command could not be sent; the caller is
  expected to reconnect and retry.
*/
static int request_dump(MYSQL* mysql, MASTER_INFO* mi)
{
  char request[FN_REFLEN + 10];
  int binlog_flags = 0; // for now
  char* log_name = mi->log_file_name;
  int name_len = (uint) strlen(log_name);

  // fixed 10-byte header
  int4store(request, mi->pos);
  int2store(request + 4, binlog_flags);
  int4store(request + 6, server_id);
  // variable-length log name
  memcpy(request + 10, log_name, name_len);

  if (!mc_simple_command(mysql, COM_BINLOG_DUMP, request, name_len + 10, 1))
    return 0;

  // something went wrong, so we will just reconnect and retry later
  // in the future, we should do a better error analysis, but for
  // now we just fill up the error log :-)
  sql_print_error("Error on COM_BINLOG_DUMP: %s, will retry in %d secs",
                  mc_mysql_error(mysql), master_connect_retry);
  return 1;
}

/*
  Ask the master for a dump of one table (COM_TABLE_DUMP).

  Request layout: 1 byte db name length, the db name, 1 byte table name
  length, the table name (no terminating NULs).

  Because each length travels in a single byte, names longer than 255
  bytes cannot be encoded and are rejected rather than silently truncated
  into a corrupt request.

  Returns 0 on success, 1 if the names do not fit or the command could
  not be sent.
*/
static int request_table_dump(MYSQL* mysql, char* db, char* table)
{
  char buf[1024];
  char * p = buf;
  uint table_len = (uint) strlen(table);
  uint db_len = (uint) strlen(db);
  if(db_len > 255 || table_len > 255 ||
     table_len + db_len > sizeof(buf) - 2)
    {
      sql_print_error("request_table_dump: Buffer overrun");
      return 1;
    }

  *p++ = (char) db_len;
  memcpy(p, db, db_len);
  p += db_len;
  *p++ = (char) table_len;
  memcpy(p, table, table_len);

  if(mc_simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1))
    {
      sql_print_error("request_table_dump: Error sending the table dump \
command");
      return 1;
    }

  return 0;
}


/*
  Read the next packet (one binlog event) from the master.

  The read is repeated for as long as it fails with errno == EINTR and
  neither the slave nor the server is being aborted.

  Returns the packet length minus one, or packet_error when aborting, on
  a read error, or when the packet is only one byte long (logged as an
  apparent master shutdown).  In debug builds a disconnect can be
  simulated through disconnect_slave_event_count.
*/
static uint read_event(MYSQL* mysql, MASTER_INFO *mi)
{
  uint pkt_len = packet_error;
  // pretend we start out interrupted so that the loop below is entered;
  // my_real_read() will time us out, after which we check if we were
  // told to die, and if not, try reading again
  int read_errno = EINTR;

#ifndef DBUG_OFF
  if(disconnect_slave_event_count && !(events_till_disconnect--))
    return packet_error;
#endif

  for (;;)
  {
    if (abort_loop || abort_slave)
      return packet_error;
    if (pkt_len != packet_error || read_errno != EINTR)
      break;
    pkt_len = mc_net_safe_read(mysql);
    read_errno = errno;
  }

  if (pkt_len == packet_error || (int) pkt_len < 1)
  {
    sql_print_error("Error reading packet from server: %s (read_errno %d,\
server_errno=%d)",
                    mc_mysql_error(mysql), read_errno, mc_mysql_errno(mysql));
    return packet_error;
  }

  if (pkt_len == 1)
  {
    sql_print_error("Slave: received 0 length packet from server, apparent\
 master shutdown: %s (%d)",
                    mc_mysql_error(mysql), read_errno);
    return packet_error;
  }

  DBUG_PRINT("info",( "len=%u, net->read_pos[4] = %d\n",
                      pkt_len, mysql->net.read_pos[4]));

  return pkt_len - 1;
}


/*
  Parse and execute one binlog event received from the master.

  thd        slave thread
  net        connection to the master; the event starts at read_pos + 1
  mi         master info whose position is advanced and flushed
  event_len  length of the event in bytes

  Events that originated on this server (same server id) are skipped to
  avoid infinite update loops, but the position is still advanced.

  Handled event types:
    QUERY_EVENT   run the statement if its database passes db_ok(); an
                  error different from the one recorded on the master is
                  treated as a failure
    LOAD_EVENT    run LOAD DATA INFILE, reading the file over `net`, or
                  ask the master for /dev/null when the database is
                  filtered out
    START_EVENT,
    STOP_EVENT    close the temporary tables
    ROTATE_EVENT  switch to the new log file name, position 4
    INTVAR_EVENT  remember LAST_INSERT_ID / INSERT_ID for the following
                  statement; only the pending offset is advanced
  Events of any other type are discarded without advancing the position.

  Returns 0 on success, 1 if the event could not be parsed or executed.
*/
static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
{
  Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1,
                                             event_len);

  if (ev)
  {
    int type_code = ev->get_type_code();
    if(ev->server_id == ::server_id)
      {
        // the master still expects us to handle the file transfer
        if(type_code == LOAD_EVENT)
          skip_load_data_infile(net);

        mi->inc_pos(event_len);
        flush_master_info(mi);
        delete ev;
        return 0; // avoid infinite update loops
      }

    thd->server_id = ev->server_id; // use the original server id for logging
    thd->set_time(); // time the query
    ev->when = time(NULL);

    switch(type_code)
    {
    case QUERY_EVENT:
    {
      Query_log_event* qev = (Query_log_event*)ev;
      int q_len = qev->q_len;
      init_sql_alloc(&thd->mem_root, 8192,0);
      thd->db = rewrite_db((char*)qev->db);
      if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
      {
        thd->query = (char*)qev->query;
        thd->set_time((time_t)qev->when);
        thd->current_tablenr = 0;
        VOID(pthread_mutex_lock(&LOCK_thread_count));
        thd->query_id = query_id++;
        VOID(pthread_mutex_unlock(&LOCK_thread_count));
        thd->last_nx_table = thd->last_nx_db = 0;
        thd->query_error = 0; // clear error
        thd->net.last_errno = 0;
        thd->net.last_error[0] = 0;
        thd->slave_proxy_id = qev->thread_id; // for temp tables
        mysql_parse(thd, thd->query, q_len);
        int expected_error,actual_error;
        // a statement that failed on the master must fail here with the
        // same error
        if((expected_error = qev->error_code) !=
           (actual_error = thd->net.last_errno) && expected_error)
          {
            sql_print_error("Slave: did not get the expected error\
 running query from master - expected: '%s', got '%s'",
                            ER(expected_error),
                            actual_error ? ER(actual_error):"no error"
                            );
            thd->query_error = 1;
          }
        else if(expected_error == actual_error)
          thd->query_error = 0;
      }
      thd->db = 0;// prevent db from being freed
      thd->query = 0; // just to be sure
      thd->convert_set = 0; // assume no convert for next query
      // unless set explictly
      close_thread_tables(thd);

      if (thd->query_error || thd->fatal_error)
      {
        sql_print_error("Slave:  error running query '%s' ",
                        qev->query);
        free_root(&thd->mem_root,0);
        delete ev;
        return 1;
      }
      free_root(&thd->mem_root,0);
      delete ev;

      mi->inc_pos(event_len);
      flush_master_info(mi);
      break;
    }

    case LOAD_EVENT:
    {
      Load_log_event* lev = (Load_log_event*)ev;
      init_sql_alloc(&thd->mem_root, 8192,0);
      thd->db = rewrite_db((char*)lev->db);
      thd->query = 0;
      thd->query_error = 0;

      if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
      {
        thd->set_time((time_t)lev->when);
        thd->current_tablenr = 0;
        VOID(pthread_mutex_lock(&LOCK_thread_count));
        thd->query_id = query_id++;
        VOID(pthread_mutex_unlock(&LOCK_thread_count));

        // opt_flags is a bit mask: the individual flags must be tested
        // with a bitwise AND, not a logical one
        enum enum_duplicates handle_dup = DUP_IGNORE;
        if(lev->sql_ex.opt_flags & REPLACE_FLAG)
          handle_dup = DUP_REPLACE;
        sql_exchange ex((char*)lev->fname,
                        (lev->sql_ex.opt_flags & DUMPFILE_FLAG) != 0);
        String field_term(&lev->sql_ex.field_term, 1),
          enclosed(&lev->sql_ex.enclosed, 1),
          line_term(&lev->sql_ex.line_term,1),
          escaped(&lev->sql_ex.escaped, 1),
          line_start(&lev->sql_ex.line_start, 1);

        // each separator is one character, or empty when the matching
        // bit of empty_flags is set
        ex.field_term = &field_term;
        if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
          ex.field_term->length(0);

        ex.enclosed = &enclosed;
        if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY)
          ex.enclosed->length(0);

        ex.line_term = &line_term;
        if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY)
          ex.line_term->length(0);

        ex.line_start = &line_start;
        if(lev->sql_ex.empty_flags & LINE_START_EMPTY)
          ex.line_start->length(0);

        ex.escaped = &escaped;
        if(lev->sql_ex.empty_flags & ESCAPED_EMPTY)
          ex.escaped->length(0);

        ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
        if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
          ex.field_term->length(0);

        ex.skip_lines = lev->skip_lines;

        TABLE_LIST tables;
        bzero((char*) &tables,sizeof(tables));
        tables.db = thd->db;
        tables.name = tables.real_name = (char*)lev->table_name;
        tables.lock_type = TL_WRITE;
        // the table will be opened in mysql_load

        List<Item> fields;
        lev->set_fields(fields);
        thd->slave_proxy_id = thd->thread_id;
        thd->net.vio = net->vio;
        // mysql_load will use thd->net to read the file
        thd->net.pkt_nr = net->pkt_nr;
        // make sure the client does get confused
        // about the packet sequence
        if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1,
                      TL_WRITE))
          thd->query_error = 1;
        if(thd->cuted_fields)
          sql_print_error("Slave: load data infile at position %d in log \
'%s' produced %d warning(s)", glob_mi.pos, RPL_LOG_NAME, thd->cuted_fields );
        net->pkt_nr = thd->net.pkt_nr;
      }
      else // we will just ask the master to send us /dev/null if we do not want to
        // load the data :-)
      {
        skip_load_data_infile(net);
      }

      thd->net.vio = 0;
      thd->db = 0;// prevent db from being freed
      close_thread_tables(thd);
      if(thd->query_error)
      {
        int sql_error = thd->net.last_errno;
        if(!sql_error)
          sql_error = ER_UNKNOWN_ERROR;

        sql_print_error("Slave:  error '%s' running load data infile ",
                        ER(sql_error));
        delete ev;
        free_root(&thd->mem_root,0);
        return 1;
      }

      delete ev;
      free_root(&thd->mem_root,0);

      if(thd->fatal_error)
      {
        // thd->query was cleared above, so never hand a NULL to '%s'
        sql_print_error("Slave: Fatal error running query '%s' ",
                        thd->query ? thd->query : "LOAD DATA INFILE");
        return 1;
      }

      mi->inc_pos(event_len);
      flush_master_info(mi);
      break;
    }

    case START_EVENT:
      close_temporary_tables(thd);
      mi->inc_pos(event_len);
      flush_master_info(mi);
      delete ev;
      break;

    case STOP_EVENT:
      close_temporary_tables(thd);
      mi->inc_pos(event_len);
      flush_master_info(mi);
      delete ev;
      break;
    case ROTATE_EVENT:
    {
      Rotate_log_event* rev = (Rotate_log_event*)ev;
      int ident_len = rev->ident_len;
      memcpy(mi->log_file_name, rev->new_log_ident,ident_len );
      mi->log_file_name[ident_len] = 0;
      mi->pos = 4; // skip magic number
      flush_master_info(mi);
      delete ev;
      break;
    }

    case INTVAR_EVENT:
    {
      Intvar_log_event* iev = (Intvar_log_event*)ev;
      switch(iev->type)
      {
      case LAST_INSERT_ID_EVENT:
        thd->last_insert_id_used = 1;
        thd->last_insert_id = iev->val;
        break;
      case INSERT_ID_EVENT:
        thd->next_insert_id = iev->val;
        break;

      }
      // the offset is only committed together with the statement that
      // uses the value
      mi->inc_pending(event_len);
      delete ev;
      break;
    }

    default:
      // event type this code does not handle: as before the position is
      // left alone, but the event object is no longer leaked
      delete ev;
      break;
    }

  }
  else
  {
    sql_print_error("Could not parse log event entry, check the master for binlog corruption\n\
 This may also be a network problem, or just a bug in the master or slave code");
    return 1;
  }
  return 0;
}
      
// slave thread

/*
  Entry point of the replication slave thread.

  After registering itself as the running slave (under LOCK_slave) the
  thread sets up its THD, loads the master info, connects to the master
  and then loops: request a binlog dump, read events and execute them.
  A failed dump request or read leads to a reconnect - immediately the
  first time, after glob_mi.connect_retry seconds on repeated failures.
  An event that fails to execute, or a kill request, ends the thread.

  On exit the current position is logged, the temporary tables are saved
  for the next slave start, and COND_slave_stopped is broadcast.
*/
pthread_handler_decl(handle_slave,arg __attribute__((unused)))
{
  THD *thd; // needs to be first for thread_stack
  MYSQL *mysql = NULL ;

  pthread_mutex_lock(&LOCK_slave);
  // a slave cannot run without a server id; wake up whoever is waiting
  // for the start and bail out
  if(!server_id)
    {
     pthread_cond_broadcast(&COND_slave_start);
     pthread_mutex_unlock(&LOCK_slave);
     sql_print_error("Server id not set, will not start slave");
     pthread_exit((void*)1);
    }
  
  // only one slave thread may run at a time
  if(slave_running)
    {
      pthread_cond_broadcast(&COND_slave_start);
      pthread_mutex_unlock(&LOCK_slave);
      pthread_exit((void*)1);  // safety just in case
    }
  slave_running = 1;
  abort_slave = 0;
#ifndef DBUG_OFF  
  events_till_abort = abort_slave_event_count;
#endif  
 pthread_cond_broadcast(&COND_slave_start);
 pthread_mutex_unlock(&LOCK_slave);
  
  // NOTE(review): `error` is assigned below but never read
  int error = 1;
  // set after the first failed request/read; a repeated failure then
  // sleeps before reconnecting
  bool retried_once = 0;
  // position at which the last failure happened; progress beyond it
  // resets retried_once
  ulonglong last_failed_pos = 0;
  
  my_thread_init(); // needs to be up here, otherwise we get a coredump
  // trying to use DBUG_ stuff
  thd = new THD; // note that contructor of THD uses DBUG_ !
  thd->set_time();
  DBUG_ENTER("handle_slave");

  pthread_detach_this_thread();
  // NOTE(review): when init_slave_thread() fails it has already called
  // end_thread(thd,0), yet the err: path below still uses thd - confirm
  // that this is safe
  if(init_slave_thread(thd) || init_master_info(&glob_mi))
    goto err;
  thd->thread_stack = (char*)&thd; // remember where our stack is
  thd->temporary_tables = save_temporary_tables; // restore temp tables
  threads.append(thd);
  
  DBUG_PRINT("info",("master info: log_file_name=%s, position=%d",
		     glob_mi.log_file_name, glob_mi.pos));

  mysql = mc_mysql_init(NULL);
  if(!mysql)
    {
      sql_print_error("Slave thread: error in mc_mysql_init()");
      goto err;
    }
  
  thd->proc_info = "connecting to master";
#ifndef DBUG_OFF  
  sql_print_error("Slave thread initialized");
#endif
  // we can get killed during safe_connect
  // NOTE(review): glob_mi.pos is printed with %ld here and further down
  // while other code treats it as a 64-bit value - confirm the type
  if(!safe_connect(thd, mysql, &glob_mi))
   sql_print_error("Slave: connected to master '%s@%s:%d',\
  replication started in log '%s' at position %ld", glob_mi.user,
		  glob_mi.host, glob_mi.port,
		  RPL_LOG_NAME,
		  glob_mi.pos);
  else
    goto err;
  
  // outer loop: one iteration per (re)connection to the master
  while(!slave_killed(thd))
    {
      thd->proc_info = "requesting binlog dump";
      if(request_dump(mysql, &glob_mi))
	{
	  sql_print_error("Failed on request_dump()");
	  if(slave_killed(thd))
           goto err;
	  
	  thd->proc_info = "waiting to reconnect after a failed dump request";
	  if(mysql->net.vio)
	    vio_close(mysql->net.vio);
	  // first time retry immediately, assuming that we can recover
	  // right away - if first time fails, sleep between re-tries
	  // hopefuly the admin can fix the problem sometime
	  if(retried_once)
	    safe_sleep(thd, glob_mi.connect_retry);
	  else
	    retried_once = 1;
	  
	  if(slave_killed(thd))
	      goto err;

	  thd->proc_info = "reconnecting after a failed dump request";
          sql_print_error("Slave: failed dump request, reconnecting to \
try again, log '%s' at postion %ld", RPL_LOG_NAME,
			  last_failed_pos = glob_mi.pos );
	  if(safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd))
	      goto err;

	  continue;
	}


      // inner loop: one iteration per event received from the master
      while(!slave_killed(thd))
	{
	  thd->proc_info = "reading master update";
	  uint event_len = read_event(mysql, &glob_mi);
	  if(slave_killed(thd))
	    goto err;
	  
	  if (event_len == packet_error)
	  {
	    thd->proc_info = "waiting to reconnect after a failed read";
	    if(mysql->net.vio)
 	      vio_close(mysql->net.vio);
	    if(retried_once) // punish repeat offender with sleep
	      safe_sleep(thd, glob_mi.connect_retry);
	    else
	      retried_once = 1; 
	    
	    if(slave_killed(thd))
	      goto err;
	    thd->proc_info = "reconnecting after a failed read";
	    sql_print_error("Slave: Failed reading log event, \
reconnecting to retry, log '%s' position %ld", RPL_LOG_NAME,
			    last_failed_pos = glob_mi.pos);
	    if(safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd))
	      goto err;
	    // back to the outer loop to request the dump again
	    break;
	  }
	  
	  thd->proc_info = "processing master log event"; 
	  if(exec_event(thd, &mysql->net, &glob_mi, event_len))
	    {
	      sql_print_error("Error running query, slave aborted. Fix the problem, and re-start\
 the slave thread with mysqladmin start-slave - log '%s' position %ld",
			      RPL_LOG_NAME, glob_mi.pos);
	      goto err;
	      // there was an error running the query
	      // abort the slave thread, when the problem is fixed, the user
	      // should restart the slave with mysqladmin start-slave
	    }
#ifndef DBUG_OFF
	  // debug builds: abort after a configured number of events
	  if(abort_slave_event_count && !--events_till_abort)
	    {
	      sql_print_error("Slave: debugging abort");
	      goto err;
	    }
#endif	  
	  
	  // successful exec with offset advance,
	  // the slave repents and his sins are forgiven!
	  if(glob_mi.pos > last_failed_pos)
	    {
	     retried_once = 0;
#ifndef DBUG_OFF
	     stuck_count = 0;
#endif
	    }
#ifndef DBUG_OFF
	  else
	    {
	      stuck_count++;
	    // show a little mercy, allow slave to read one more event
	       // before cutting him off - otherwise he gets stuck
	       // on Invar events, since they do not advance the offset
	       // immediately
	      if(stuck_count > 2)
	        events_till_disconnect++;
	    }
#endif	  

	}
    }

  error = 0;
 err:
  // print the current replication position 
  sql_print_error("Slave thread exiting, replication stopped in log '%s' at \
position %ld",
		  RPL_LOG_NAME, glob_mi.pos);
  thd->query = thd->db = 0; // extra safety
  if(mysql)
    {
      mc_mysql_close(mysql);
      mysql = 0;
    }
  thd->proc_info = "waiting for slave mutex on exit";
  pthread_mutex_lock(&LOCK_slave);
  slave_running = 0;
  abort_slave = 0;
  // keep the temporary tables for the next slave start
  save_temporary_tables = thd->temporary_tables;
  thd->temporary_tables = 0; // remove tempation from destructor to close them
  pthread_cond_broadcast(&COND_slave_stopped); // tell the world we are done
  pthread_mutex_unlock(&LOCK_slave);
  net_end(&thd->net); // destructor will not free it, because we are weird
  delete thd;
  my_thread_end();
  pthread_exit(0);
  DBUG_RETURN(0);				// Can't return anything here
}

/*
  Connect to the master, retrying every mi->connect_retry seconds until
  the connection succeeds or the slave is killed.

  A successful connection is recorded in the general log.  In debug
  builds the simulated-disconnect countdown is re-armed first.

  Returns 0 once connected, non-zero if the slave was killed.
*/
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
{
  int slave_was_killed;
#ifndef DBUG_OFF
  events_till_disconnect = disconnect_slave_event_count;
#endif

  for (;;)
  {
    slave_was_killed = slave_killed(thd);
    if (slave_was_killed)
      break;
    if (mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
                         mi->port, 0, 0))
      break; // connected

    sql_print_error("Slave thread: error connecting to master:%s(%d),\
 retry in %d sec", mc_mysql_error(mysql), errno, mi->connect_retry);
    safe_sleep(thd, mi->connect_retry);
  }

  if (slave_was_killed)
    return slave_was_killed;

  mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
                  mi->user, mi->host, mi->port);
  return slave_was_killed;
}

/*
  Re-establish the connection to the master, retrying every
  mi->connect_retry seconds until it succeeds or the slave is killed.

  The pending offset is cleared first: if the connection was lost after
  reading a state-setting event, that event will be read again.  In debug
  builds the simulated-disconnect countdown is re-armed as well.

  Returns 0 once reconnected, non-zero if the slave was killed.
*/
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
{
  int slave_was_killed;
  mi->pending = 0;
#ifndef DBUG_OFF
  events_till_disconnect = disconnect_slave_event_count;
#endif

  for (;;)
  {
    slave_was_killed = slave_killed(thd);
    if (slave_was_killed)
      break;
    if (!mc_mysql_reconnect(mysql))
      break; // reconnected

    sql_print_error("Slave thread: error re-connecting to master:\
%s, last_errno=%d, retry in %d sec",
                    mc_mysql_error(mysql), errno, mi->connect_retry);
    safe_sleep(thd, mi->connect_retry);
  }

  if (slave_was_killed)
    return slave_was_killed;

  sql_print_error("Slave: reconnected to master '%s@%s:%d',\
replication resumed in log '%s' at position %ld", glob_mi.user,
                  glob_mi.host, glob_mi.port,
                  RPL_LOG_NAME,
                  glob_mi.pos);
  return slave_was_killed;
}

// Explicit instantiation of the list iterators used in this file
// (db_ok() and rewrite_db()), done only when compiling with GCC.
#ifdef __GNUC__
template class I_List_iterator<i_string>;
template class I_List_iterator<i_string_pair>;
#endif