log_event.cc 67.6 KB
Newer Older
unknown's avatar
unknown committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
   
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
   
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
   
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


#ifndef MYSQL_CLIENT
#ifdef __GNUC__
#pragma implementation				// gcc: Class implementation
#endif
#include  "mysql_priv.h"
23
#include "slave.h"
24
#include <my_dir.h>
unknown's avatar
unknown committed
25 26
#endif /* MYSQL_CLIENT */

27 28
#include <assert.h>

29
#ifdef MYSQL_CLIENT
30
static void pretty_print_str(FILE* file, char* str, int len)
unknown's avatar
unknown committed
31
{
32
  char* end = str + len;
unknown's avatar
unknown committed
33
  fputc('\'', file);
34 35
  while (str < end)
  {
unknown's avatar
unknown committed
36
    char c;
37 38 39 40 41 42 43 44 45 46 47 48
    switch ((c=*str++)) {
    case '\n': fprintf(file, "\\n"); break;
    case '\r': fprintf(file, "\\r"); break;
    case '\\': fprintf(file, "\\\\"); break;
    case '\b': fprintf(file, "\\b"); break;
    case '\t': fprintf(file, "\\t"); break;
    case '\'': fprintf(file, "\\'"); break;
    case 0   : fprintf(file, "\\0"); break;
    default:
      fputc(c, file);
      break;
    }
49 50
  }
  fputc('\'', file);
unknown's avatar
unknown committed
51
}
52
#endif
unknown's avatar
unknown committed
53

unknown's avatar
unknown committed
54 55
#ifndef MYSQL_CLIENT

56 57
inline int ignored_error_code(int err_code)
{
unknown's avatar
unknown committed
58 59
  return ((err_code == ER_SLAVE_IGNORED_TABLE) ||
          (use_slave_mask && bitmap_is_set(&slave_error_mask, err_code)));
60 61
}

unknown's avatar
unknown committed
62

63
static void pretty_print_str(String* packet, char* str, int len)
unknown's avatar
unknown committed
64
{
65
  char* end = str + len;
unknown's avatar
unknown committed
66
  packet->append('\'');
67 68 69
  while (str < end)
  {
    char c;
70
    switch ((c=*str++)) {
71 72 73 74 75 76 77 78 79 80 81
    case '\n': packet->append( "\\n"); break;
    case '\r': packet->append( "\\r"); break;
    case '\\': packet->append( "\\\\"); break;
    case '\b': packet->append( "\\b"); break;
    case '\t': packet->append( "\\t"); break;
    case '\'': packet->append( "\\'"); break;
    case 0   : packet->append( "\\0"); break;
    default:
      packet->append((char)c);
      break;
    }
unknown's avatar
unknown committed
82 83 84 85
  }
  packet->append('\'');
}

unknown's avatar
unknown committed
86

87 88 89
static inline char* slave_load_file_stem(char*buf, uint file_id,
					 int event_server_id)
{
unknown's avatar
unknown committed
90
  fn_format(buf,"SQL_LOAD-",slave_load_tmpdir, "", MY_UNPACK_FILENAME);
91 92 93 94 95 96 97 98
  buf = strend(buf);
  buf = int10_to_str(::server_id, buf, 10);
  *buf++ = '-';
  buf = int10_to_str(event_server_id, buf, 10);
  *buf++ = '-';
  return int10_to_str(file_id, buf, 10);
}

unknown's avatar
unknown committed
99 100 101 102
#endif

const char* Log_event::get_type_str()
{
103
  switch(get_type_code()) {
unknown's avatar
unknown committed
104 105 106 107 108 109
  case START_EVENT:  return "Start";
  case STOP_EVENT:   return "Stop";
  case QUERY_EVENT:  return "Query";
  case ROTATE_EVENT: return "Rotate";
  case INTVAR_EVENT: return "Intvar";
  case LOAD_EVENT:   return "Load";
110
  case NEW_LOAD_EVENT:   return "New_load";
unknown's avatar
unknown committed
111
  case SLAVE_EVENT:  return "Slave";
112 113 114 115
  case CREATE_FILE_EVENT: return "Create_file";
  case APPEND_BLOCK_EVENT: return "Append_block";
  case DELETE_FILE_EVENT: return "Delete_file";
  case EXEC_LOAD_EVENT: return "Exec_load";
unknown's avatar
unknown committed
116 117 118 119 120
  default: /* impossible */ return "Unknown";
  }
}

#ifndef MYSQL_CLIENT
121
Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
122 123
  :log_pos(0), temp_buf(0), exec_time(0), cached_event_len(0), 
   flags(flags_arg), thd(thd_arg)
124
{
125 126 127 128 129 130 131 132 133 134 135 136 137 138
  server_id = thd->server_id;
  when = thd->start_time;
  cache_stmt= (using_trans &&
	       (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)));
}


Log_event::Log_event()
  :temp_buf(0), exec_time(0), cached_event_len(0), flags(0), cache_stmt(0),
   thd(0)
{
  server_id = ::server_id;
  when = time(NULL);
  log_pos=0;
139
}
140

141 142 143
/*
  Delete all temporary files used for SQL_LOAD.
*/
144

145 146 147 148 149
static void cleanup_load_tmpdir()
{
  MY_DIR *dirp;
  FILEINFO *file;
  uint i;
150 151 152 153
  char fname[FN_REFLEN];
  char prefbuf[31];
  char *p;
  
154 155
  if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME))))
    return;
156 157 158 159 160 161 162 163 164 165 166 167 168 169
  
  /* 
     When we are deleting temporary files, we should only remove
     the files associated with the server id of our server.
     We don't use event_server_id here because since we've disabled
     direct binlogging of Create_file/Append_file/Exec_load events
     we cannot meet Start_log event in the middle of events from one 
     LOAD DATA.
  */
  p= strmake(prefbuf,"SQL_LOAD-",9);
  p= int10_to_str(::server_id, p, 10);
  *(p++)= '-';
  *p= 0;

unknown's avatar
unknown committed
170
  for (i=0 ; i < (uint)dirp->number_off_files; i++)
171 172
  {
    file=dirp->dir_entry+i;
173
    if (is_prefix(file->name, prefbuf))
174
    {
unknown's avatar
unknown committed
175
      fn_format(fname,file->name,slave_load_tmpdir,"",MY_UNPACK_FILENAME);
176 177
      my_delete(fname, MYF(0));
    }
178 179 180 181
  }

  my_dirend(dirp);
}
182 183
#endif

unknown's avatar
unknown committed
184

185
Log_event::Log_event(const char* buf, bool old_format)
186
  :temp_buf(0), cached_event_len(0), cache_stmt(0)
187 188 189
{
  when = uint4korr(buf);
  server_id = uint4korr(buf + SERVER_ID_OFFSET);
190 191
  if (old_format)
  {
192
    log_pos=0;
193 194 195 196
    flags=0;
  }
  else
  {
197
    log_pos = uint4korr(buf + LOG_POS_OFFSET);
198 199
    flags = uint2korr(buf + FLAGS_OFFSET);
  }
200 201 202 203 204 205 206 207
#ifndef MYSQL_CLIENT
  thd = 0;
#endif  
}


#ifndef MYSQL_CLIENT

208
int Log_event::exec_event(struct st_relay_log_info* rli)
209
{
210 211 212 213 214 215 216 217 218 219 220 221 222
  /*
    rli is null when (as far as I (Guilhem) know)
    the caller is
    Load_log_event::exec_event *and* that one is called from
    Execute_load_log_event::exec_event. 
    In this case, we don't do anything here ;
    Execute_load_log_event::exec_event will call Log_event::exec_event
    again later with the proper rli.
    Strictly speaking, if we were sure that rli is null
    only in the case discussed above, 'if (rli)' is useless here.
    But as we are not 100% sure, keep it for now.
  */
  if (rli)  
223
  {
224 225 226 227 228 229 230
    if (rli->inside_transaction)
      rli->inc_pending(get_event_len());
    else
    {
      rli->inc_pos(get_event_len(),log_pos);
      flush_relay_log_info(rli);
    }
231 232 233
  }
  return 0;
}
unknown's avatar
unknown committed
234 235 236 237 238 239 240 241

void Log_event::pack_info(String* packet)
{
  net_store_data(packet, "", 0);
}

void Query_log_event::pack_info(String* packet)
{
242 243 244
  char buf[256];
  String tmp(buf, sizeof(buf));
  tmp.length(0);
245
  if (db && db_len)
unknown's avatar
unknown committed
246
  {
247
   tmp.append("use `",5);
unknown's avatar
unknown committed
248
   tmp.append(db, db_len);
249
   tmp.append("`; ", 3);
unknown's avatar
unknown committed
250 251
  }

252
  if (query && q_len)
unknown's avatar
unknown committed
253 254 255 256 257 258
    tmp.append(query, q_len);
  net_store_data(packet, (char*)tmp.ptr(), tmp.length());
}

void Start_log_event::pack_info(String* packet)
{
259 260 261
  char buf1[256];
  String tmp(buf1, sizeof(buf1));
  tmp.length(0);
unknown's avatar
unknown committed
262 263 264 265 266 267 268 269 270 271 272
  char buf[22];

  tmp.append("Server ver: ");
  tmp.append(server_version);
  tmp.append(", Binlog ver: ");
  tmp.append(llstr(binlog_version, buf));
  net_store_data(packet, tmp.ptr(), tmp.length());
}

void Load_log_event::pack_info(String* packet)
{
273 274 275
  char buf[256];
  String tmp(buf, sizeof(buf));
  tmp.length(0);
unknown's avatar
unknown committed
276
  if (db && db_len)
unknown's avatar
unknown committed
277 278 279 280 281 282 283
  {
   tmp.append("use ");
   tmp.append(db, db_len);
   tmp.append("; ", 2);
  }

  tmp.append("LOAD DATA INFILE '");
284
  tmp.append(fname, fname_len);
unknown's avatar
unknown committed
285
  tmp.append("' ", 2);
unknown's avatar
unknown committed
286
  if (sql_ex.opt_flags & REPLACE_FLAG)
unknown's avatar
unknown committed
287
    tmp.append(" REPLACE ");
unknown's avatar
unknown committed
288
  else if (sql_ex.opt_flags & IGNORE_FLAG)
unknown's avatar
unknown committed
289 290 291 292
    tmp.append(" IGNORE ");
  
  tmp.append("INTO TABLE ");
  tmp.append(table_name);
293
  if (sql_ex.field_term_len)
unknown's avatar
unknown committed
294 295
  {
    tmp.append(" FIELDS TERMINATED BY ");
296
    pretty_print_str(&tmp, sql_ex.field_term, sql_ex.field_term_len);
unknown's avatar
unknown committed
297 298
  }

299
  if (sql_ex.enclosed_len)
unknown's avatar
unknown committed
300
  {
unknown's avatar
unknown committed
301
    if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG )
unknown's avatar
unknown committed
302 303
      tmp.append(" OPTIONALLY ");
    tmp.append( " ENCLOSED BY ");
304
    pretty_print_str(&tmp, sql_ex.enclosed, sql_ex.enclosed_len);
unknown's avatar
unknown committed
305 306
  }
     
307
  if (sql_ex.escaped_len)
unknown's avatar
unknown committed
308 309
  {
    tmp.append( " ESCAPED BY ");
310
    pretty_print_str(&tmp, sql_ex.escaped, sql_ex.escaped_len);
unknown's avatar
unknown committed
311 312
  }
     
unknown's avatar
unknown committed
313
  bool line_lexem_added= false;
314
  if (sql_ex.line_term_len)
unknown's avatar
unknown committed
315 316
  {
    tmp.append(" LINES TERMINATED BY ");
317
    pretty_print_str(&tmp, sql_ex.line_term, sql_ex.line_term_len);
unknown's avatar
unknown committed
318
    line_lexem_added= true;
unknown's avatar
unknown committed
319 320
  }

321
  if (sql_ex.line_start_len)
unknown's avatar
unknown committed
322
  {
unknown's avatar
unknown committed
323 324 325
    if (!line_lexem_added)
      tmp.append(" LINES");
    tmp.append(" STARTING BY ");
326
    pretty_print_str(&tmp, sql_ex.line_start, sql_ex.line_start_len);
unknown's avatar
unknown committed
327 328
  }
     
329 330 331 332 333 334 335 336
  if ((long) skip_lines > 0)
  {
    char nr_buff[32], *end;
    tmp.append( " IGNORE ");
    end= longlong10_to_str((longlong) skip_lines, nr_buff, 10);
    tmp.append(nr_buff, (uint) (end-nr_buff));
    tmp.append( " LINES");
  }
unknown's avatar
unknown committed
337 338 339 340 341 342

  if (num_fields)
  {
    uint i;
    const char* field = fields;
    tmp.append(" (");
343
    for (i = 0; i < num_fields; i++)
unknown's avatar
unknown committed
344
    {
unknown's avatar
unknown committed
345
      if (i)
unknown's avatar
unknown committed
346 347 348 349 350 351 352 353 354 355 356 357 358
	tmp.append(" ,");
      tmp.append( field);
	  
      field += field_lens[i]  + 1;
    }
    tmp.append(')');
  }

  net_store_data(packet, tmp.ptr(), tmp.length());
}

void Rotate_log_event::pack_info(String* packet)
{
359
  char buf1[256], buf[22];
360 361
  String tmp(buf1, sizeof(buf1));
  tmp.length(0);
362
  tmp.append(new_log_ident, ident_len);
363 364
  tmp.append(";pos=");
  tmp.append(llstr(pos,buf));
unknown's avatar
unknown committed
365
  if (flags & LOG_EVENT_FORCED_ROTATE_F)
366 367
    tmp.append("; forced by master");
  net_store_data(packet, tmp.ptr(), tmp.length());
unknown's avatar
unknown committed
368 369 370 371
}

void Intvar_log_event::pack_info(String* packet)
{
372
  char buf1[256], buf[22];
373 374
  String tmp(buf1, sizeof(buf1));
  tmp.length(0);
unknown's avatar
unknown committed
375 376 377 378 379 380
  tmp.append(get_var_type_name());
  tmp.append('=');
  tmp.append(llstr(val, buf));
  net_store_data(packet, tmp.ptr(), tmp.length());
}

unknown's avatar
unknown committed
381 382
void Rand_log_event::pack_info(String* packet)
{
383 384 385 386 387 388
  char buf1[256], *pos;
  pos=strmov(buf1,"rand_seed1=");
  pos=int10_to_str((long) seed1, pos, 10);
  pos=strmov(pos, ",rand_seed2=");
  pos=int10_to_str((long) seed2, pos, 10);
  net_store_data(packet, buf1, (uint) (pos-buf1));
unknown's avatar
unknown committed
389 390
}

unknown's avatar
unknown committed
391 392
void Slave_log_event::pack_info(String* packet)
{
393
  char buf1[256], buf[22], *end;
394 395
  String tmp(buf1, sizeof(buf1));
  tmp.length(0);
unknown's avatar
unknown committed
396 397 398
  tmp.append("host=");
  tmp.append(master_host);
  tmp.append(",port=");
399 400
  end= int10_to_str((long) master_port, buf, 10);
  tmp.append(buf, (uint32) (end-buf));
unknown's avatar
unknown committed
401 402 403 404 405
  tmp.append(",log=");
  tmp.append(master_log);
  tmp.append(",pos=");
  tmp.append(llstr(master_pos,buf));
  net_store_data(packet, tmp.ptr(), tmp.length());
unknown's avatar
unknown committed
406 407 408 409 410 411 412 413 414
}


void Log_event::init_show_field_list(List<Item>* field_list)
{
  field_list->push_back(new Item_empty_string("Log_name", 20));
  field_list->push_back(new Item_empty_string("Pos", 20));
  field_list->push_back(new Item_empty_string("Event_type", 20));
  field_list->push_back(new Item_empty_string("Server_id", 20));
415
  field_list->push_back(new Item_empty_string("Orig_log_pos", 20));
unknown's avatar
unknown committed
416 417 418
  field_list->push_back(new Item_empty_string("Info", 20));
}

unknown's avatar
unknown committed
419 420 421
/*
 * only called by SHOW BINLOG EVENTS
 */
422
int Log_event::net_send(THD* thd_arg, const char* log_name, my_off_t pos)
unknown's avatar
unknown committed
423
{
424
  String* packet = &thd_arg->packet;
unknown's avatar
unknown committed
425 426 427 428 429 430 431
  const char* p = strrchr(log_name, FN_LIBCHAR);
  const char* event_type;
  if (p)
    log_name = p + 1;
  
  packet->length(0);
  net_store_data(packet, log_name, strlen(log_name));
unknown's avatar
unknown committed
432
  net_store_data(packet, (longlong) pos);
unknown's avatar
unknown committed
433 434 435
  event_type = get_type_str();
  net_store_data(packet, event_type, strlen(event_type));
  net_store_data(packet, server_id);
unknown's avatar
unknown committed
436
  net_store_data(packet, (longlong) log_pos);
unknown's avatar
unknown committed
437
  pack_info(packet);
438
  return my_net_write(&thd_arg->net, (char*) packet->ptr(), packet->length());
unknown's avatar
unknown committed
439 440
}

441 442
#endif /* MYSQL_CLIENT */

unknown's avatar
unknown committed
443

444
int Query_log_event::write(IO_CACHE* file)
unknown's avatar
unknown committed
445 446 447 448
{
  return query ? Log_event::write(file) : -1; 
}

449

450
int Log_event::write(IO_CACHE* file)
unknown's avatar
unknown committed
451
{
452
  return (write_header(file) || write_data(file)) ? -1 : 0;
unknown's avatar
unknown committed
453 454
}

455

456
int Log_event::write_header(IO_CACHE* file)
unknown's avatar
unknown committed
457
{
458
  char buf[LOG_EVENT_HEADER_LEN];
unknown's avatar
unknown committed
459
  char* pos = buf;
unknown's avatar
unknown committed
460
  int4store(pos, (ulong) when); // timestamp
unknown's avatar
unknown committed
461 462
  pos += 4;
  *pos++ = get_type_code(); // event type code
463 464
  int4store(pos, server_id);
  pos += 4;
465 466
  long tmp=get_data_size() + LOG_EVENT_HEADER_LEN;
  int4store(pos, tmp);
unknown's avatar
unknown committed
467
  pos += 4;
468
  int4store(pos, log_pos);
469 470 471
  pos += 4;
  int2store(pos, flags);
  pos += 2;
472
  return (my_b_safe_write(file, (byte*) buf, (uint) (pos - buf)));
unknown's avatar
unknown committed
473 474 475 476
}

#ifndef MYSQL_CLIENT

477
int Log_event::read_log_event(IO_CACHE* file, String* packet,
478
			      pthread_mutex_t* log_lock)
unknown's avatar
unknown committed
479 480
{
  ulong data_len;
481
  int result=0;
unknown's avatar
unknown committed
482
  char buf[LOG_EVENT_HEADER_LEN];
483
  DBUG_ENTER("read_log_event");
484

485
  if (log_lock)
486
    pthread_mutex_lock(log_lock);
487 488
  if (my_b_read(file, (byte*) buf, sizeof(buf)))
  {
489 490 491 492 493
    /*
      If the read hits eof, we must report it as eof so the caller
      will know it can go into cond_wait to be woken up on the next
      update to the log.
    */
494
    DBUG_PRINT("error",("file->error: %d", file->error));
495 496 497
    if (!file->error)
      result= LOG_READ_EOF;
    else
498
      result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
499
    goto end;
500
  }
501
  data_len= uint4korr(buf + EVENT_LEN_OFFSET);
unknown's avatar
unknown committed
502 503
  if (data_len < LOG_EVENT_HEADER_LEN ||
      data_len > current_thd->variables.max_allowed_packet)
504
  {
505
    DBUG_PRINT("error",("data_len: %ld", data_len));
506 507 508
    result= ((data_len < LOG_EVENT_HEADER_LEN) ? LOG_READ_BOGUS :
	     LOG_READ_TOO_LARGE);
    goto end;
509
  }
unknown's avatar
unknown committed
510
  packet->append(buf, sizeof(buf));
511
  data_len-= LOG_EVENT_HEADER_LEN;
512 513 514
  if (data_len)
  {
    if (packet->append(file, data_len))
515
    {
516
      /*
517 518
	Here we should never hit EOF in a non-error condition.
	EOF means we are reading the event partially, which should
519 520 521 522
	never happen.
      */
      result= file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO;
      /* Implicit goto end; */
523
    }
524
  }
525 526 527 528

end:
  if (log_lock)
    pthread_mutex_unlock(log_lock);
529
  DBUG_RETURN(result);
unknown's avatar
unknown committed
530 531 532 533
}

#endif // MYSQL_CLIENT

unknown's avatar
unknown committed
534
#ifndef MYSQL_CLIENT
unknown's avatar
unknown committed
535 536
#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
unknown's avatar
unknown committed
537
#define max_allowed_packet current_thd->variables.max_allowed_packet
538
#else
539
#define UNLOCK_MUTEX
540 541 542
#define LOCK_MUTEX
#endif

unknown's avatar
unknown committed
543 544
// allocates memory - the caller is responsible for clean-up
#ifndef MYSQL_CLIENT
545 546 547
Log_event* Log_event::read_log_event(IO_CACHE* file,
				     pthread_mutex_t* log_lock,
				     bool old_format)
unknown's avatar
unknown committed
548
#else
549
Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format)
unknown's avatar
unknown committed
550
#endif  
unknown's avatar
unknown committed
551
{
552
  char head[LOG_EVENT_HEADER_LEN];
553 554
  uint header_size= old_format ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;

555
  LOCK_MUTEX;
556
  if (my_b_read(file, (byte *) head, header_size))
557
  {
unknown's avatar
unknown committed
558
    UNLOCK_MUTEX;
559
    return 0;
560
  }
unknown's avatar
unknown committed
561

562
  uint data_len = uint4korr(head + EVENT_LEN_OFFSET);
563 564 565
  char *buf= 0;
  const char *error= 0;
  Log_event *res=  0;
unknown's avatar
unknown committed
566

567
  if (data_len > max_allowed_packet)
unknown's avatar
unknown committed
568
  {
569 570
    error = "Event too big";
    goto err;
unknown's avatar
unknown committed
571 572
  }

573
  if (data_len < header_size)
unknown's avatar
unknown committed
574
  {
575 576
    error = "Event too small";
    goto err;
unknown's avatar
unknown committed
577
  }
578 579 580

  // some events use the extra byte to null-terminate strings
  if (!(buf = my_malloc(data_len+1, MYF(MY_WME))))
581 582 583
  {
    error = "Out of memory";
    goto err;
unknown's avatar
unknown committed
584
  }
585
  buf[data_len] = 0;
586
  memcpy(buf, head, header_size);
587
  if (my_b_read(file, (byte*) buf + header_size, data_len - header_size))
588 589 590 591
  {
    error = "read error";
    goto err;
  }
592
  if ((res = read_log_event(buf, data_len, &error, old_format)))
593
    res->register_temp_buf(buf);
594

595
err:
unknown's avatar
unknown committed
596
  UNLOCK_MUTEX;
597
  if (error)
598
  {
599 600 601
    sql_print_error("\
Error in Log_event::read_log_event(): '%s', data_len: %d, event_type: %d",
		    error,data_len,head[EVENT_TYPE_OFFSET]);
602
    my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
603 604 605 606 607 608 609 610 611
    /*
      The SQL slave thread will check if file->error<0 to know
      if there was an I/O error. Even if there is no "low-level" I/O errors
      with 'file', any of the high-level above errors is worrying
      enough to stop the SQL thread now ; as we are skipping the current event,
      going on with reading and successfully executing other events can
      only corrupt the slave's databases. So stop.
    */
    file->error= -1;
612
  }
613
  return res;
unknown's avatar
unknown committed
614 615
}

616

617
Log_event* Log_event::read_log_event(const char* buf, int event_len,
618
				     const char **error, bool old_format)
unknown's avatar
unknown committed
619
{
620
  if (event_len < EVENT_LEN_OFFSET ||
621 622 623
      (uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
  {
    *error="Sanity check failed";		// Needed to free buffer
624
    return NULL; // general sanity check - will fail on a partial read
625
  }
626
  
627 628
  Log_event* ev = NULL;
  
629
  switch(buf[EVENT_TYPE_OFFSET]) {
unknown's avatar
unknown committed
630
  case QUERY_EVENT:
631
    ev  = new Query_log_event(buf, event_len, old_format);
632
    break;
unknown's avatar
unknown committed
633
  case LOAD_EVENT:
unknown's avatar
unknown committed
634 635
    ev = new Create_file_log_event(buf, event_len, old_format);
    break;
636
  case NEW_LOAD_EVENT:
637
    ev = new Load_log_event(buf, event_len, old_format);
638
    break;
unknown's avatar
unknown committed
639
  case ROTATE_EVENT:
640
    ev = new Rotate_log_event(buf, event_len, old_format);
641
    break;
unknown's avatar
unknown committed
642
  case SLAVE_EVENT:
643 644 645
    ev = new Slave_log_event(buf, event_len);
    break;
  case CREATE_FILE_EVENT:
unknown's avatar
unknown committed
646
    ev = new Create_file_log_event(buf, event_len, old_format);
647 648 649 650 651 652 653 654 655 656 657
    break;
  case APPEND_BLOCK_EVENT:
    ev = new Append_block_log_event(buf, event_len);
    break;
  case DELETE_FILE_EVENT:
    ev = new Delete_file_log_event(buf, event_len);
    break;
  case EXEC_LOAD_EVENT:
    ev = new Execute_load_log_event(buf, event_len);
    break;
  case START_EVENT:
658
    ev = new Start_log_event(buf, old_format);
659 660
    break;
  case STOP_EVENT:
661
    ev = new Stop_log_event(buf, old_format);
662 663
    break;
  case INTVAR_EVENT:
664
    ev = new Intvar_log_event(buf, old_format);
665
    break;
unknown's avatar
unknown committed
666 667 668
  case RAND_EVENT:
    ev = new Rand_log_event(buf, old_format);
    break;
669 670
  default:
    break;
unknown's avatar
unknown committed
671
  }
672
  if (!ev || !ev->is_valid())
673 674
  {
    delete ev;
675 676 677 678 679 680 681 682 683
#ifdef MYSQL_CLIENT
    if (!force_opt)
    {
      *error= "Found invalid event in binary log";
      return 0;
    }
    ev= new Unknown_log_event(buf, old_format);
#else
    *error= "Found invalid event in binary log";
684
    return 0;
685
#endif
686 687 688
  }
  ev->cached_event_len = event_len;
  return ev;  
unknown's avatar
unknown committed
689 690
}

691

692
#ifdef MYSQL_CLIENT
693 694
void Log_event::print_header(FILE* file)
{
695
  char llbuff[22];
696 697
  fputc('#', file);
  print_timestamp(file);
698
  fprintf(file, " server id %d  log_pos %s ", server_id,
699
	  llstr(log_pos,llbuff)); 
700 701
}

702
void Log_event::print_timestamp(FILE* file, time_t* ts)
unknown's avatar
unknown committed
703
{
unknown's avatar
unknown committed
704
  struct tm *res;
705 706
  if (!ts)
    ts = &when;
707 708
#ifdef MYSQL_SERVER				// This is always false
  struct tm tm_tmp;
unknown's avatar
unknown committed
709
  localtime_r(ts,(res= &tm_tmp));
unknown's avatar
unknown committed
710
#else
711
  res=localtime(ts);
unknown's avatar
unknown committed
712
#endif
713 714

  fprintf(file,"%02d%02d%02d %2d:%02d:%02d",
715 716 717 718 719 720
	  res->tm_year % 100,
	  res->tm_mon+1,
	  res->tm_mday,
	  res->tm_hour,
	  res->tm_min,
	  res->tm_sec);
unknown's avatar
unknown committed
721 722 723
}


724
void Start_log_event::print(FILE* file, bool short_form, char* last_db)
unknown's avatar
unknown committed
725 726 727 728
{
  if (short_form)
    return;

729 730 731
  print_header(file);
  fprintf(file, "\tStart: binlog v %d, server v %s created ", binlog_version,
	  server_version);
732
  print_timestamp(file);
733
  if (created)
734
    fprintf(file," at startup");
735
  fputc('\n', file);
unknown's avatar
unknown committed
736 737 738
  fflush(file);
}

739
void Stop_log_event::print(FILE* file, bool short_form, char* last_db)
unknown's avatar
unknown committed
740 741 742 743
{
  if (short_form)
    return;

744
  print_header(file);
unknown's avatar
unknown committed
745 746 747 748
  fprintf(file, "\tStop\n");
  fflush(file);
}

749
void Rotate_log_event::print(FILE* file, bool short_form, char* last_db)
unknown's avatar
unknown committed
750
{
751
  char buf[22];
unknown's avatar
unknown committed
752 753 754
  if (short_form)
    return;

755
  print_header(file);
unknown's avatar
unknown committed
756 757 758 759
  fprintf(file, "\tRotate to ");
  if (new_log_ident)
    my_fwrite(file, (byte*) new_log_ident, (uint)ident_len, 
	      MYF(MY_NABP | MY_WME));
unknown's avatar
unknown committed
760 761 762 763
  fprintf(file, "  pos: %s", llstr(pos, buf));
  if (flags & LOG_EVENT_FORCED_ROTATE_F)
    fprintf(file,"  forced by master");
  fputc('\n', file);
unknown's avatar
unknown committed
764 765 766
  fflush(file);
}

767 768
#endif /* #ifdef MYSQL_CLIENT */

769

770
Start_log_event::Start_log_event(const char* buf,
771 772
				 bool old_format)
  :Log_event(buf, old_format)
773
{
774 775 776
  buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
  binlog_version = uint2korr(buf+ST_BINLOG_VER_OFFSET);
  memcpy(server_version, buf+ST_SERVER_VER_OFFSET,
777
	 ST_SERVER_VER_LEN);
778
  created = uint4korr(buf+ST_CREATED_OFFSET);
779 780
}

781 782
int Start_log_event::write_data(IO_CACHE* file)
{
783 784 785 786
  char buff[START_HEADER_LEN];
  int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
  memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
  int4store(buff + ST_CREATED_OFFSET,created);
787
  return (my_b_safe_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0);
788 789
}

790

791
Rotate_log_event::Rotate_log_event(const char* buf, int event_len,
792 793
				   bool old_format)
  :Log_event(buf, old_format),new_log_ident(NULL),alloced(0)
unknown's avatar
unknown committed
794
{
unknown's avatar
unknown committed
795
  // The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
796 797
  int header_size = (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
  uint ident_offset;
unknown's avatar
unknown committed
798
  if (event_len < header_size)
unknown's avatar
unknown committed
799
    return;
800 801 802
  buf += header_size;
  if (old_format)
  {
unknown's avatar
unknown committed
803
    ident_len = (uint)(event_len - OLD_HEADER_LEN);
804 805 806 807 808
    pos = 4;
    ident_offset = 0;
  }
  else
  {
unknown's avatar
unknown committed
809
    ident_len = (uint)(event_len - ROTATE_EVENT_OVERHEAD);
810 811 812
    pos = uint8korr(buf + R_POS_OFFSET);
    ident_offset = ROTATE_HEADER_LEN;
  }
unknown's avatar
unknown committed
813
  set_if_smaller(ident_len,FN_REFLEN-1);
814 815 816 817
  if (!(new_log_ident= my_strdup_with_length((byte*) buf +
					     ident_offset,
					     (uint) ident_len,
					     MYF(MY_WME))))
unknown's avatar
unknown committed
818 819 820 821
    return;
  alloced = 1;
}

822

823
int Rotate_log_event::write_data(IO_CACHE* file)
unknown's avatar
unknown committed
824
{
825
  char buf[ROTATE_HEADER_LEN];
826
  int8store(buf + R_POS_OFFSET, pos);
827 828
  return (my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) ||
	  my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len));
unknown's avatar
unknown committed
829 830
}

831

832 833
#ifndef MYSQL_CLIENT
Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
834
				 ulong query_length, bool using_trans)
835 836
  :Log_event(thd_arg, 0, using_trans), data_buf(0), query(query_arg),
   db(thd_arg->db), q_len((uint32) query_length),
unknown's avatar
unknown committed
837 838 839 840 841
   error_code(thd_arg->killed ? ER_SERVER_SHUTDOWN: thd_arg->net.last_errno),
   thread_id(thd_arg->thread_id),
   /* save the original thread id; we already know the server id */
   slave_proxy_id(thd_arg->slave_proxy_id)

842 843 844 845 846 847
{
  time_t end_time;
  time(&end_time);
  exec_time = (ulong) (end_time  - thd->start_time);
  db_len = (db) ? (uint32) strlen(db) : 0;
}
848 849
#endif

850
Query_log_event::Query_log_event(const char* buf, int event_len,
unknown's avatar
unknown committed
851 852
				 bool old_format)
  :Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL)
unknown's avatar
unknown committed
853 854
{
  ulong data_len;
855 856 857 858 859 860 861 862 863 864 865 866 867 868
  if (old_format)
  {
    if ((uint)event_len < OLD_HEADER_LEN + QUERY_HEADER_LEN)
      return;				
    data_len = event_len - (QUERY_HEADER_LEN + OLD_HEADER_LEN);
    buf += OLD_HEADER_LEN;
  }
  else
  {
    if ((uint)event_len < QUERY_EVENT_OVERHEAD)
      return;				
    data_len = event_len - QUERY_EVENT_OVERHEAD;
    buf += LOG_EVENT_HEADER_LEN;
  }
unknown's avatar
unknown committed
869

870 871
  exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
  error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
unknown's avatar
unknown committed
872

873
  if (!(data_buf = (char*) my_malloc(data_len + 1, MYF(MY_WME))))
unknown's avatar
unknown committed
874 875
    return;

876
  memcpy(data_buf, buf + Q_DATA_OFFSET, data_len);
unknown's avatar
unknown committed
877
  slave_proxy_id= thread_id= uint4korr(buf + Q_THREAD_ID_OFFSET);
unknown's avatar
unknown committed
878
  db = data_buf;
879
  db_len = (uint)buf[Q_DB_LEN_OFFSET];
unknown's avatar
unknown committed
880 881 882 883 884
  query=data_buf + db_len + 1;
  q_len = data_len - 1 - db_len;
  *((char*)query+q_len) = 0;
}

885

886 887
#ifdef MYSQL_CLIENT

888
void Query_log_event::print(FILE* file, bool short_form, char* last_db)
unknown's avatar
unknown committed
889
{
890
  char buff[40],*end;				// Enough for SET TIMESTAMP
unknown's avatar
unknown committed
891 892
  if (!short_form)
  {
893
    print_header(file);
894 895
    fprintf(file, "\tQuery\tthread_id=%lu\texec_time=%lu\terror_code=%d\n",
	    (ulong) thread_id, (ulong) exec_time, error_code);
unknown's avatar
unknown committed
896 897
  }

898 899
  bool same_db = 0;

unknown's avatar
unknown committed
900
  if (db && last_db)
901 902 903 904
  {
    if (!(same_db = !memcmp(last_db, db, db_len + 1)))
      memcpy(last_db, db, db_len + 1);
  }
905 906
  
  if (db && db[0] && !same_db)
unknown's avatar
unknown committed
907
    fprintf(file, "use %s;\n", db);
908 909 910 911
  end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10);
  *end++=';';
  *end++='\n';
  my_fwrite(file, (byte*) buff, (uint) (end-buff),MYF(MY_NABP | MY_WME));
unknown's avatar
unknown committed
912 913 914
  my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME));
  fprintf(file, ";\n");
}
915 916
#endif

917

918
int Query_log_event::write_data(IO_CACHE* file)
unknown's avatar
unknown committed
919
{
920 921
  if (!query)
    return -1;
unknown's avatar
unknown committed
922 923
  
  char buf[QUERY_HEADER_LEN]; 
unknown's avatar
unknown committed
924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957
  /*
    We want to store the thread id:
    (- as an information for the user when he reads the binlog)
    - if the query uses temporary table: for the slave SQL thread to know to
    which master connection the temp table belongs.
    Now imagine we (write_data()) are called by the slave SQL thread (we are
    logging a query executed by this thread; the slave runs with
    --log-slave-updates). Then this query will be logged with
    thread_id=the_thread_id_of_the_SQL_thread. Imagine that 2 temp tables of the
    same name were created simultaneously on the master (in the master binlog
    you have
    CREATE TEMPORARY TABLE t; (thread 1)
    CREATE TEMPORARY TABLE t; (thread 2)
    ...)
    then in the slave's binlog there will be
    CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
    CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
    which is bad (same thread id!).
    To avoid this, we log the thread's thread id EXCEPT for the SQL slave thread
    for which we log the original (master's) thread id.
    Now this moves the bug: what happens if the thread id on the master was 10
    and when the slave replicates the query, a connection number 10 is opened by
    a normal client on the slave, and updates a temp table of the same name? We
    get a problem again. To avoid this, in the handling of temp tables
    (sql_base.cc) we use thread_id AND server_id.
    TODO when this is merged into 4.1: in 4.1, slave_proxy_id has been renamed
    to pseudo_thread_id and is a session variable: that's to make mysqlbinlog
    work with temp tables. We probably need to introduce
    SET PSEUDO_SERVER_ID
    for mysqlbinlog in 4.1. mysqlbinlog would print:
    SET PSEUDO_SERVER_ID=
    SET PSEUDO_THREAD_ID=
    for each query using temp tables.
  */
unknown's avatar
unknown committed
958
  int4store(buf + Q_THREAD_ID_OFFSET, slave_proxy_id);
959
  int4store(buf + Q_EXEC_TIME_OFFSET, exec_time);
960
  buf[Q_DB_LEN_OFFSET] = (char) db_len;
961
  int2store(buf + Q_ERR_CODE_OFFSET, error_code);
unknown's avatar
unknown committed
962

963 964 965
  return (my_b_safe_write(file, (byte*) buf, QUERY_HEADER_LEN) ||
	  my_b_safe_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) ||
	  my_b_safe_write(file, (byte*) query, q_len)) ? -1 : 0;
unknown's avatar
unknown committed
966 967
}

968 969
Intvar_log_event::Intvar_log_event(const char* buf, bool old_format)
  :Log_event(buf, old_format)
unknown's avatar
unknown committed
970
{
971
  buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
972 973
  type = buf[I_TYPE_OFFSET];
  val = uint8korr(buf+I_VAL_OFFSET);
unknown's avatar
unknown committed
974 975
}

unknown's avatar
unknown committed
976 977
const char* Intvar_log_event::get_var_type_name()
{
978
  switch(type) {
unknown's avatar
unknown committed
979 980 981 982 983 984
  case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID";
  case INSERT_ID_EVENT: return "INSERT_ID";
  default: /* impossible */ return "UNKNOWN";
  }
}

985
int Intvar_log_event::write_data(IO_CACHE* file)
unknown's avatar
unknown committed
986 987
{
  char buf[9];
988 989
  buf[I_TYPE_OFFSET] = type;
  int8store(buf + I_VAL_OFFSET, val);
990
  return my_b_safe_write(file, (byte*) buf, sizeof(buf));
unknown's avatar
unknown committed
991 992
}

993
#ifdef MYSQL_CLIENT
994
void Intvar_log_event::print(FILE* file, bool short_form, char* last_db)
unknown's avatar
unknown committed
995 996
{
  char llbuff[22];
997 998 999
  const char *msg;
  LINT_INIT(msg);

unknown's avatar
unknown committed
1000
  if (!short_form)
unknown's avatar
unknown committed
1001
  {
1002
    print_header(file);
unknown's avatar
unknown committed
1003 1004 1005 1006
    fprintf(file, "\tIntvar\n");
  }

  fprintf(file, "SET ");
1007
  switch (type) {
unknown's avatar
unknown committed
1008
  case LAST_INSERT_ID_EVENT:
1009
    msg="LAST_INSERT_ID";
unknown's avatar
unknown committed
1010 1011
    break;
  case INSERT_ID_EVENT:
1012
    msg="INSERT_ID";
unknown's avatar
unknown committed
1013 1014
    break;
  }
1015
  fprintf(file, "%s=%s;\n", msg, llstr(val,llbuff));
unknown's avatar
unknown committed
1016 1017
  fflush(file);
}
1018
#endif
unknown's avatar
unknown committed
1019

unknown's avatar
unknown committed
1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043
/*****************************************************************************
 *
 *  Rand log event
 *
 ****************************************************************************/
Rand_log_event::Rand_log_event(const char* buf, bool old_format)
  :Log_event(buf, old_format)
{
  buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
  seed1 = uint8korr(buf+RAND_SEED1_OFFSET);
  seed2 = uint8korr(buf+RAND_SEED2_OFFSET);
}

int Rand_log_event::write_data(IO_CACHE* file)
{
  char buf[16];
  int8store(buf + RAND_SEED1_OFFSET, seed1);
  int8store(buf + RAND_SEED2_OFFSET, seed2);
  return my_b_safe_write(file, (byte*) buf, sizeof(buf));
}

#ifdef MYSQL_CLIENT
void Rand_log_event::print(FILE* file, bool short_form, char* last_db)
{
1044
  char llbuff[22],llbuff2[22];
unknown's avatar
unknown committed
1045 1046 1047 1048 1049
  if (!short_form)
  {
    print_header(file);
    fprintf(file, "\tRand\n");
  }
1050
  fprintf(file, "SET @@RAND_SEED1=%s, @@RAND_SEED2=%s;\n",
1051
	  llstr(seed1, llbuff),llstr(seed2, llbuff2));
unknown's avatar
unknown committed
1052 1053 1054
  fflush(file);
}
#endif
1055

1056
int Load_log_event::write_data_header(IO_CACHE* file)
unknown's avatar
unknown committed
1057 1058
{
  char buf[LOAD_HEADER_LEN];
unknown's avatar
unknown committed
1059
  int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id);
1060 1061 1062 1063 1064
  int4store(buf + L_EXEC_TIME_OFFSET, exec_time);
  int4store(buf + L_SKIP_LINES_OFFSET, skip_lines);
  buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
  buf[L_DB_LEN_OFFSET] = (char)db_len;
  int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
1065
  return my_b_safe_write(file, (byte*)buf, LOAD_HEADER_LEN);
1066
}
unknown's avatar
unknown committed
1067

1068 1069
int Load_log_event::write_data_body(IO_CACHE* file)
{
1070 1071
  if (sql_ex.write_data(file))
    return 1;
1072 1073
  if (num_fields && fields && field_lens)
  {
1074 1075
    if (my_b_safe_write(file, (byte*)field_lens, num_fields) ||
	my_b_safe_write(file, (byte*)fields, field_block_len))
1076 1077
      return 1;
  }
1078 1079 1080
  return (my_b_safe_write(file, (byte*)table_name, table_name_len + 1) ||
	  my_b_safe_write(file, (byte*)db, db_len + 1) ||
	  my_b_safe_write(file, (byte*)fname, fname_len));
unknown's avatar
unknown committed
1081 1082
}

1083

1084

1085 1086
static bool write_str(IO_CACHE *file, char *str, byte length)
{
1087 1088
  return (my_b_safe_write(file, &length, 1) ||
	  my_b_safe_write(file, (byte*) str, (int) length));
1089
}
1090

1091

1092 1093 1094 1095
int sql_ex_info::write_data(IO_CACHE* file)
{
  if (new_format())
  {
1096 1097 1098 1099 1100
    return (write_str(file, field_term, field_term_len) ||
	    write_str(file, enclosed,   enclosed_len) ||
	    write_str(file, line_term,  line_term_len) ||
	    write_str(file, line_start, line_start_len) ||
	    write_str(file, escaped,    escaped_len) ||
1101
	    my_b_safe_write(file,(byte*) &opt_flags,1));
1102 1103 1104 1105
  }
  else
  {
    old_sql_ex old_ex;
1106 1107 1108 1109 1110 1111 1112
    old_ex.field_term= *field_term;
    old_ex.enclosed=   *enclosed;
    old_ex.line_term=  *line_term;
    old_ex.line_start= *line_start;
    old_ex.escaped=    *escaped;
    old_ex.opt_flags=  opt_flags;
    old_ex.empty_flags=empty_flags;
1113
    return my_b_safe_write(file, (byte*) &old_ex, sizeof(old_ex));
1114 1115 1116
  }
}

1117

1118 1119 1120 1121 1122 1123 1124 1125 1126 1127
static inline int read_str(char * &buf, char *buf_end, char * &str,
			   uint8 &len)
{
  if (buf + (uint) (uchar) *buf >= buf_end)
    return 1;
  len = (uint8) *buf;
  str= buf+1;
  buf+= (uint) len+1;
  return 0;
}
1128

1129

1130 1131 1132 1133 1134
char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
{
  cached_new_format = use_new_format;
  if (use_new_format)
  {
1135
    empty_flags=0;
unknown's avatar
unknown committed
1136 1137 1138 1139 1140 1141
    /*
      The code below assumes that buf will not disappear from
      under our feet during the lifetime of the event. This assumption
      holds true in the slave thread if the log is in new format, but is not
      the case when we have old format because we will be reusing net buffer
      to read the actual file before we write out the Create_file event.
unknown's avatar
unknown committed
1142
    */
1143 1144 1145 1146 1147 1148
    if (read_str(buf, buf_end, field_term, field_term_len) ||
	read_str(buf, buf_end, enclosed,   enclosed_len) ||
	read_str(buf, buf_end, line_term,  line_term_len) ||
	read_str(buf, buf_end, line_start, line_start_len) ||
	read_str(buf, buf_end, escaped,	   escaped_len))
      return 0;
1149 1150 1151 1152
    opt_flags = *buf++;
  }
  else
  {
1153
    field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
1154 1155 1156 1157 1158 1159 1160
    field_term = buf++;			// Use first byte in string
    enclosed=	 buf++;
    line_term=   buf++;
    line_start=  buf++;
    escaped=     buf++;
    opt_flags =  *buf++;
    empty_flags= *buf++;
1161 1162 1163 1164 1165 1166 1167 1168 1169 1170
    if (empty_flags & FIELD_TERM_EMPTY)
      field_term_len=0;
    if (empty_flags & ENCLOSED_EMPTY)
      enclosed_len=0;
    if (empty_flags & LINE_TERM_EMPTY)
      line_term_len=0;
    if (empty_flags & LINE_START_EMPTY)
      line_start_len=0;
    if (empty_flags & ESCAPED_EMPTY)
      escaped_len=0;
1171 1172 1173 1174 1175
  }
  return buf;
}


1176
#ifndef MYSQL_CLIENT
1177
Load_log_event::Load_log_event(THD* thd_arg, sql_exchange* ex,
1178
			       const char* db_arg, const char* table_name_arg,
1179
			       List<Item>& fields_arg,
1180 1181
			       enum enum_duplicates handle_dup,
			       bool using_trans)
unknown's avatar
merge  
unknown committed
1182
  :Log_event(thd_arg, 0, using_trans),thread_id(thd_arg->thread_id),
unknown's avatar
unknown committed
1183 1184 1185 1186 1187
   slave_proxy_id(thd_arg->slave_proxy_id),
   num_fields(0),fields(0),
   field_lens(0),field_block_len(0),
   table_name(table_name_arg ? table_name_arg : ""),
   db(db_arg), fname(ex->file_name)
unknown's avatar
unknown committed
1188 1189 1190
{
  time_t end_time;
  time(&end_time);
1191
  exec_time = (ulong) (end_time  - thd_arg->start_time);
1192 1193 1194
  /* db can never be a zero pointer in 4.0 */
  db_len = (uint32) strlen(db);
  table_name_len = (uint32) strlen(table_name);
unknown's avatar
unknown committed
1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207
  fname_len = (fname) ? (uint) strlen(fname) : 0;
  sql_ex.field_term = (char*) ex->field_term->ptr();
  sql_ex.field_term_len = (uint8) ex->field_term->length();
  sql_ex.enclosed = (char*) ex->enclosed->ptr();
  sql_ex.enclosed_len = (uint8) ex->enclosed->length();
  sql_ex.line_term = (char*) ex->line_term->ptr();
  sql_ex.line_term_len = (uint8) ex->line_term->length();
  sql_ex.line_start = (char*) ex->line_start->ptr();
  sql_ex.line_start_len = (uint8) ex->line_start->length();
  sql_ex.escaped = (char*) ex->escaped->ptr();
  sql_ex.escaped_len = (uint8) ex->escaped->length();
  sql_ex.opt_flags = 0;
  sql_ex.cached_new_format = -1;
1208
    
unknown's avatar
unknown committed
1209
  if (ex->dumpfile)
unknown's avatar
unknown committed
1210
    sql_ex.opt_flags|= DUMPFILE_FLAG;
unknown's avatar
unknown committed
1211
  if (ex->opt_enclosed)
unknown's avatar
unknown committed
1212
    sql_ex.opt_flags|= OPT_ENCLOSED_FLAG;
1213

unknown's avatar
unknown committed
1214
  sql_ex.empty_flags = 0;
1215

1216
  switch (handle_dup) {
unknown's avatar
unknown committed
1217 1218
  case DUP_IGNORE: sql_ex.opt_flags|= IGNORE_FLAG; break;
  case DUP_REPLACE: sql_ex.opt_flags|= REPLACE_FLAG; break;
unknown's avatar
unknown committed
1219 1220
  case DUP_ERROR: break;	
  }
1221

unknown's avatar
unknown committed
1222
  if (!ex->field_term->length())
unknown's avatar
unknown committed
1223
    sql_ex.empty_flags|= FIELD_TERM_EMPTY;
unknown's avatar
unknown committed
1224
  if (!ex->enclosed->length())
unknown's avatar
unknown committed
1225
    sql_ex.empty_flags|= ENCLOSED_EMPTY;
unknown's avatar
unknown committed
1226
  if (!ex->line_term->length())
unknown's avatar
unknown committed
1227
    sql_ex.empty_flags|= LINE_TERM_EMPTY;
unknown's avatar
unknown committed
1228
  if (!ex->line_start->length())
unknown's avatar
unknown committed
1229
    sql_ex.empty_flags|= LINE_START_EMPTY;
unknown's avatar
unknown committed
1230
  if (!ex->escaped->length())
unknown's avatar
unknown committed
1231
    sql_ex.empty_flags|= ESCAPED_EMPTY;
1232
    
unknown's avatar
unknown committed
1233
  skip_lines = ex->skip_lines;
1234

unknown's avatar
unknown committed
1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245
  List_iterator<Item> li(fields_arg);
  field_lens_buf.length(0);
  fields_buf.length(0);
  Item* item;
  while ((item = li++))
  {
    num_fields++;
    uchar len = (uchar) strlen(item->name);
    field_block_len += len + 1;
    fields_buf.append(item->name, len + 1);
    field_lens_buf.append((char*)&len, 1);
1246 1247
  }

unknown's avatar
unknown committed
1248 1249 1250 1251
  field_lens = (const uchar*)field_lens_buf.ptr();
  fields = fields_buf.ptr();
}

1252 1253
#endif

1254 1255 1256 1257 1258
/*
  The caller must do buf[event_len] = 0 before he starts using the
  constructed event.
*/

1259
Load_log_event::Load_log_event(const char* buf, int event_len,
1260 1261
			       bool old_format)
  :Log_event(buf, old_format),num_fields(0),fields(0),
1262
  field_lens(0),field_block_len(0),
1263
  table_name(0),db(0),fname(0)
unknown's avatar
unknown committed
1264
{
1265
  if (!event_len) // derived class, will call copy_log_event() itself
1266
    return;
1267
  copy_log_event(buf, event_len, old_format);
1268 1269
}

1270 1271
int Load_log_event::copy_log_event(const char *buf, ulong event_len,
				   bool old_format)
1272
{
1273
  uint data_len;
1274
  char* buf_end = (char*)buf + event_len;
1275 1276
  uint header_len= old_format ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
  const char* data_head = buf + header_len;
unknown's avatar
unknown committed
1277
  slave_proxy_id= thread_id= uint4korr(data_head + L_THREAD_ID_OFFSET);
1278 1279 1280 1281 1282
  exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
  skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
  table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
  db_len = (uint)data_head[L_DB_LEN_OFFSET];
  num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
unknown's avatar
unknown committed
1283
	  
1284
  int body_offset = ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
1285
		     LOAD_HEADER_LEN + header_len :
1286
		     get_data_body_offset());
unknown's avatar
unknown committed
1287
  
unknown's avatar
unknown committed
1288
  if ((int) event_len < body_offset)
1289
    return 1;
1290 1291 1292 1293
  /*
    Sql_ex.init() on success returns the pointer to the first byte after
    the sql_ex structure, which is the start of field lengths array.
  */
1294 1295 1296 1297 1298 1299
  if (!(field_lens=(uchar*)sql_ex.init((char*)buf + body_offset,
		  buf_end,
		  buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
    return 1;
  
  data_len = event_len - body_offset;
1300
  if (num_fields > data_len) // simple sanity check against corruption
1301
    return 1;
1302
  for (uint i = 0; i < num_fields; i++)
1303
    field_block_len += (uint)field_lens[i] + 1;
1304

unknown's avatar
unknown committed
1305 1306 1307 1308
  fields = (char*)field_lens + num_fields;
  table_name  = fields + field_block_len;
  db = table_name + table_name_len + 1;
  fname = db + db_len + 1;
1309 1310
  fname_len = strlen(fname);
  // null termination is accomplished by the caller doing buf[event_len]=0
1311
  return 0;
unknown's avatar
unknown committed
1312 1313
}

1314
#ifdef MYSQL_CLIENT
unknown's avatar
unknown committed
1315

1316
void Load_log_event::print(FILE* file, bool short_form, char* last_db)
unknown's avatar
unknown committed
1317 1318 1319 1320 1321
{
  print(file, short_form, last_db, 0);
}

void Load_log_event::print(FILE* file, bool short_form, char* last_db, bool commented)
unknown's avatar
unknown committed
1322 1323 1324
{
  if (!short_form)
  {
1325
    print_header(file);
1326
    fprintf(file, "\tQuery\tthread_id=%ld\texec_time=%ld\n",
unknown's avatar
unknown committed
1327 1328 1329
	    thread_id, exec_time);
  }

1330
  bool same_db = 0;
unknown's avatar
unknown committed
1331 1332 1333 1334 1335
  if (db && last_db)
  {
    if (!(same_db = !memcmp(last_db, db, db_len + 1)))
      memcpy(last_db, db, db_len + 1);
  }
1336
  
unknown's avatar
unknown committed
1337
  if (db && db[0] && !same_db)
unknown's avatar
unknown committed
1338 1339 1340
    fprintf(file, "%suse %s;\n", 
            commented ? "# " : "",
            db);
unknown's avatar
unknown committed
1341

unknown's avatar
unknown committed
1342 1343
  fprintf(file, "%sLOAD DATA ",
          commented ? "# " : "");
unknown's avatar
unknown committed
1344 1345 1346
  if (check_fname_outside_temp_buf())
    fprintf(file, "LOCAL ");
  fprintf(file, "INFILE '%-*s' ", fname_len, fname);
unknown's avatar
unknown committed
1347

unknown's avatar
unknown committed
1348
  if (sql_ex.opt_flags & REPLACE_FLAG )
unknown's avatar
unknown committed
1349
    fprintf(file," REPLACE ");
unknown's avatar
unknown committed
1350
  else if (sql_ex.opt_flags & IGNORE_FLAG )
unknown's avatar
unknown committed
1351 1352 1353
    fprintf(file," IGNORE ");
  
  fprintf(file, "INTO TABLE %s ", table_name);
unknown's avatar
unknown committed
1354
  if (sql_ex.field_term)
unknown's avatar
unknown committed
1355 1356
  {
    fprintf(file, " FIELDS TERMINATED BY ");
1357
    pretty_print_str(file, sql_ex.field_term, sql_ex.field_term_len);
unknown's avatar
unknown committed
1358 1359
  }

unknown's avatar
unknown committed
1360
  if (sql_ex.enclosed)
unknown's avatar
unknown committed
1361
  {
unknown's avatar
unknown committed
1362
    if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG )
unknown's avatar
unknown committed
1363 1364
      fprintf(file," OPTIONALLY ");
    fprintf(file, " ENCLOSED BY ");
1365
    pretty_print_str(file, sql_ex.enclosed, sql_ex.enclosed_len);
unknown's avatar
unknown committed
1366 1367
  }
     
1368
  if (sql_ex.escaped)
unknown's avatar
unknown committed
1369 1370
  {
    fprintf(file, " ESCAPED BY ");
1371
    pretty_print_str(file, sql_ex.escaped, sql_ex.escaped_len);
unknown's avatar
unknown committed
1372 1373
  }
     
unknown's avatar
unknown committed
1374
  bool line_lexem_added= false;
1375
  if (sql_ex.line_term)
unknown's avatar
unknown committed
1376 1377
  {
    fprintf(file," LINES TERMINATED BY ");
1378
    pretty_print_str(file, sql_ex.line_term, sql_ex.line_term_len);
unknown's avatar
unknown committed
1379
    line_lexem_added= true;
unknown's avatar
unknown committed
1380 1381
  }

1382
  if (sql_ex.line_start)
unknown's avatar
unknown committed
1383
  {
unknown's avatar
unknown committed
1384 1385 1386
    if (!line_lexem_added)
      fprintf(file," LINES");
    fprintf(file," STARTING BY ");
1387
    pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len);
unknown's avatar
unknown committed
1388 1389
  }
     
1390 1391
  if ((long) skip_lines > 0)
    fprintf(file, " IGNORE %ld LINES", (long) skip_lines);
unknown's avatar
unknown committed
1392

1393 1394 1395 1396
  if (num_fields)
  {
    uint i;
    const char* field = fields;
1397 1398
    fprintf(file, " (");
    for (i = 0; i < num_fields; i++)
unknown's avatar
unknown committed
1399
    {
unknown's avatar
unknown committed
1400
      if (i)
1401 1402
	fputc(',', file);
      fprintf(file, field);
unknown's avatar
unknown committed
1403
	  
1404
      field += field_lens[i]  + 1;
unknown's avatar
unknown committed
1405
    }
1406 1407
    fputc(')', file);
  }
unknown's avatar
unknown committed
1408 1409 1410 1411

  fprintf(file, ";\n");
}

1412 1413
#endif /* #ifdef MYSQL_CLIENT */

unknown's avatar
unknown committed
1414 1415
#ifndef MYSQL_CLIENT

1416
void Log_event::set_log_pos(MYSQL_LOG* log)
1417 1418 1419 1420 1421
{
  if (!log_pos)
    log_pos = my_b_tell(&log->log_file);
}

1422

1423
void Load_log_event::set_fields(List<Item> &field_list)
unknown's avatar
unknown committed
1424 1425
{
  uint i;
1426 1427
  const char *field= fields;
  for (i= 0; i < num_fields; i++)
unknown's avatar
unknown committed
1428
  {
1429 1430
    field_list.push_back(new Item_field(db, table_name, field));	  
    field+= field_lens[i]  + 1;
unknown's avatar
unknown committed
1431
  }
unknown's avatar
unknown committed
1432 1433
}

unknown's avatar
unknown committed
1434

1435 1436
Slave_log_event::Slave_log_event(THD* thd_arg,
				 struct st_relay_log_info* rli):
1437
  Log_event(thd_arg,0,0),mem_pool(0),master_host(0)
1438
{
1439
  DBUG_ENTER("Slave_log_event");
1440
  if (!rli->inited)				// QQ When can this happen ?
1441
    DBUG_VOID_RETURN;
1442 1443
  
  MASTER_INFO* mi = rli->mi;
unknown's avatar
unknown committed
1444
  // TODO: re-write this better without holding both locks at the same time
1445 1446
  pthread_mutex_lock(&mi->data_lock);
  pthread_mutex_lock(&rli->data_lock);
1447
  master_host_len = strlen(mi->host);
1448
  master_log_len = strlen(rli->master_log_name);
1449
  // on OOM, just do not initialize the structure and print the error
unknown's avatar
unknown committed
1450 1451
  if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
				   MYF(MY_WME))))
1452
  {
unknown's avatar
unknown committed
1453
    master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
1454 1455
    memcpy(master_host, mi->host, master_host_len + 1);
    master_log = master_host + master_host_len + 1;
1456
    memcpy(master_log, rli->master_log_name, master_log_len + 1);
1457
    master_port = mi->port;
1458
    master_pos = rli->master_log_pos;
1459 1460
    DBUG_PRINT("info", ("master_log: %s  pos: %d", master_log,
			(ulong) master_pos));
1461 1462 1463
  }
  else
    sql_print_error("Out of memory while recording slave event");
1464 1465
  pthread_mutex_unlock(&rli->data_lock);
  pthread_mutex_unlock(&mi->data_lock);
1466
  DBUG_VOID_RETURN;
1467 1468
}

1469
#endif /* ! MYSQL_CLIENT */
1470 1471 1472 1473 1474 1475 1476


Slave_log_event::~Slave_log_event()
{
  my_free(mem_pool, MYF(MY_ALLOW_ZERO_PTR));
}

1477 1478
#ifdef MYSQL_CLIENT

1479
void Slave_log_event::print(FILE* file, bool short_form, char* last_db)
1480 1481
{
  char llbuff[22];
unknown's avatar
unknown committed
1482
  if (short_form)
1483 1484 1485
    return;
  print_header(file);
  fputc('\n', file);
1486 1487
  fprintf(file, "\
Slave: master_host: '%s'  master_port: %d  master_log: '%s'  master_pos: %s\n",
unknown's avatar
unknown committed
1488
	  master_host, master_port, master_log, llstr(master_pos, llbuff));
1489 1490
}

1491
#endif /* MYSQL_CLIENT */
1492

1493 1494
int Slave_log_event::get_data_size()
{
unknown's avatar
unknown committed
1495
  return master_host_len + master_log_len + 1 + SL_MASTER_HOST_OFFSET;
1496 1497 1498 1499
}

int Slave_log_event::write_data(IO_CACHE* file)
{
1500 1501
  int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
  int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
1502
  // log and host are already there
1503
  return my_b_safe_write(file, (byte*)mem_pool, get_data_size());
1504 1505
}

1506

1507 1508
void Slave_log_event::init_from_mem_pool(int data_size)
{
1509 1510 1511
  master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
  master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
  master_host = mem_pool + SL_MASTER_HOST_OFFSET;
1512 1513
  master_host_len = strlen(master_host);
  // safety
unknown's avatar
unknown committed
1514
  master_log = master_host + master_host_len + 1;
unknown's avatar
unknown committed
1515
  if (master_log > mem_pool + data_size)
1516 1517 1518 1519 1520 1521 1522
  {
    master_host = 0;
    return;
  }
  master_log_len = strlen(master_log);
}

1523 1524
Slave_log_event::Slave_log_event(const char* buf, int event_len)
  :Log_event(buf,0),mem_pool(0),master_host(0)
1525
{
unknown's avatar
unknown committed
1526
  event_len -= LOG_EVENT_HEADER_LEN;
unknown's avatar
unknown committed
1527
  if (event_len < 0)
unknown's avatar
unknown committed
1528
    return;
1529
  if (!(mem_pool = (char*) my_malloc(event_len + 1, MYF(MY_WME))))
1530
    return;
unknown's avatar
unknown committed
1531
  memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len);
1532 1533 1534
  mem_pool[event_len] = 0;
  init_from_mem_pool(event_len);
}
1535 1536

#ifndef MYSQL_CLIENT
1537 1538 1539 1540 1541 1542 1543
Create_file_log_event::
Create_file_log_event(THD* thd_arg, sql_exchange* ex,
		      const char* db_arg, const char* table_name_arg,
		      List<Item>& fields_arg, enum enum_duplicates handle_dup,
		      char* block_arg, uint block_len_arg, bool using_trans)
  :Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup,
		  using_trans),
1544 1545
   fake_base(0),block(block_arg),block_len(block_len_arg),
   file_id(thd_arg->file_id = mysql_bin_log.next_file_id())
1546
{
1547
  sql_ex.force_new_format();
1548 1549 1550
}
#endif

1551 1552 1553 1554 1555
int Create_file_log_event::write_data_body(IO_CACHE* file)
{
  int res;
  if ((res = Load_log_event::write_data_body(file)) || fake_base)
    return res;
1556 1557
  return (my_b_safe_write(file, (byte*) "", 1) ||
	  my_b_safe_write(file, (byte*) block, block_len));
1558 1559 1560
}

int Create_file_log_event::write_data_header(IO_CACHE* file)
1561
{
1562 1563 1564
  int res;
  if ((res = Load_log_event::write_data_header(file)) || fake_base)
    return res;
unknown's avatar
unknown committed
1565
  byte buf[CREATE_FILE_HEADER_LEN];
1566
  int4store(buf + CF_FILE_ID_OFFSET, file_id);
1567
  return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN);
1568 1569 1570 1571 1572 1573 1574 1575 1576
}

int Create_file_log_event::write_base(IO_CACHE* file)
{
  int res;
  fake_base = 1; // pretend we are Load event
  res = write(file);
  fake_base = 0;
  return res;
1577 1578
}

unknown's avatar
unknown committed
1579
Create_file_log_event::Create_file_log_event(const char* buf, int len,
1580 1581
					     bool old_format)
  :Load_log_event(buf,0,old_format),fake_base(0),block(0),inited_from_old(0)
1582
{
1583
  int block_offset;
unknown's avatar
unknown committed
1584
  if (copy_log_event(buf,len,old_format))
1585
    return;
unknown's avatar
unknown committed
1586 1587 1588 1589
  if (!old_format)
  {
    file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN +
			+ LOAD_HEADER_LEN + CF_FILE_ID_OFFSET);
1590 1591 1592
    // + 1 for \0 terminating fname  
    block_offset = (LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() +
		    CREATE_FILE_HEADER_LEN + 1);
unknown's avatar
unknown committed
1593 1594 1595 1596 1597 1598 1599 1600 1601 1602
    if (len < block_offset)
      return;
    block = (char*)buf + block_offset;
    block_len = len - block_offset;
  }
  else
  {
    sql_ex.force_new_format();
    inited_from_old = 1;
  }
1603
}
1604 1605


1606
#ifdef MYSQL_CLIENT
unknown's avatar
unknown committed
1607 1608
void Create_file_log_event::print(FILE* file, bool short_form, 
				  char* last_db, bool enable_local)
1609
{
1610
  if (short_form)
unknown's avatar
unknown committed
1611 1612 1613
  {
    if (enable_local && check_fname_outside_temp_buf())
      Load_log_event::print(file, 1, last_db);
1614
    return;
unknown's avatar
unknown committed
1615 1616 1617 1618
  }

  if (enable_local)
  {
unknown's avatar
unknown committed
1619 1620 1621 1622 1623 1624
    Load_log_event::print(file, 1, last_db, !check_fname_outside_temp_buf());
    /* 
       That one is for "file_id: etc" below: in mysqlbinlog we want the #, in
       SHOW BINLOG EVENTS we don't.
    */
    fprintf(file, "#"); 
unknown's avatar
unknown committed
1625 1626
  }

unknown's avatar
unknown committed
1627
  fprintf(file, " file_id: %d  block_len: %d\n", file_id, block_len);
1628
}
unknown's avatar
unknown committed
1629 1630 1631 1632 1633 1634

void Create_file_log_event::print(FILE* file, bool short_form,
				  char* last_db)
{
  print(file,short_form,last_db,0);
}
1635 1636 1637 1638 1639
#endif

#ifndef MYSQL_CLIENT
void Create_file_log_event::pack_info(String* packet)
{
1640
  char buf1[256],buf[22], *end;
1641 1642 1643 1644 1645
  String tmp(buf1, sizeof(buf1));
  tmp.length(0);
  tmp.append("db=");
  tmp.append(db, db_len);
  tmp.append(";table=");
1646
  tmp.append(table_name, table_name_len);
1647
  tmp.append(";file_id=");
1648 1649
  end= int10_to_str((long) file_id, buf, 10);
  tmp.append(buf, (uint32) (end-buf));
1650
  tmp.append(";block_len=");
1651 1652 1653
  end= int10_to_str((long) block_len, buf, 10);
  tmp.append(buf, (uint32) (end-buf));
  net_store_data(packet, (char*) tmp.ptr(), tmp.length());
1654 1655 1656 1657
}
#endif  

#ifndef MYSQL_CLIENT  
unknown's avatar
unknown committed
1658 1659
Append_block_log_event::Append_block_log_event(THD* thd_arg, const char* db_arg,
                                               char* block_arg,
1660 1661 1662
					       uint block_len_arg,
					       bool using_trans)
  :Log_event(thd_arg,0, using_trans), block(block_arg),
unknown's avatar
unknown committed
1663
   block_len(block_len_arg), file_id(thd_arg->file_id), db(db_arg)
1664 1665 1666 1667
{
}
#endif  
  
1668 1669
Append_block_log_event::Append_block_log_event(const char* buf, int len)
  :Log_event(buf, 0),block(0)
1670
{
unknown's avatar
unknown committed
1671
  if ((uint)len < APPEND_BLOCK_EVENT_OVERHEAD)
1672 1673 1674 1675 1676 1677 1678 1679
    return;
  file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET);
  block = (char*)buf + APPEND_BLOCK_EVENT_OVERHEAD;
  block_len = len - APPEND_BLOCK_EVENT_OVERHEAD;
}

int Append_block_log_event::write_data(IO_CACHE* file)
{
unknown's avatar
unknown committed
1680
  byte buf[APPEND_BLOCK_HEADER_LEN];
1681
  int4store(buf + AB_FILE_ID_OFFSET, file_id);
1682 1683
  return (my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
	  my_b_safe_write(file, (byte*) block, block_len));
1684 1685 1686
}

#ifdef MYSQL_CLIENT  
1687 1688
void Append_block_log_event::print(FILE* file, bool short_form,
				   char* last_db)
1689 1690 1691 1692 1693
{
  if (short_form)
    return;
  print_header(file);
  fputc('\n', file);
unknown's avatar
unknown committed
1694
  fprintf(file, "#Append_block: file_id: %d  block_len: %d\n",
1695 1696 1697
	  file_id, block_len);
}
#endif  
1698

1699 1700 1701 1702
#ifndef MYSQL_CLIENT
void Append_block_log_event::pack_info(String* packet)
{
  char buf1[256];
1703 1704
  sprintf(buf1, ";file_id=%u;block_len=%u", file_id, block_len);
  net_store_data(packet, buf1);
1705 1706
}

unknown's avatar
unknown committed
1707 1708 1709
Delete_file_log_event::Delete_file_log_event(THD* thd_arg, const char* db_arg,
                                             bool using_trans) 
  :Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
1710 1711 1712 1713
{
}
#endif  

1714 1715 1716

Delete_file_log_event::Delete_file_log_event(const char* buf, int len)
  :Log_event(buf, 0),file_id(0)
1717
{
unknown's avatar
unknown committed
1718
  if ((uint)len < DELETE_FILE_EVENT_OVERHEAD)
1719 1720 1721 1722
    return;
  file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET);
}

1723

1724 1725
int Delete_file_log_event::write_data(IO_CACHE* file)
{
unknown's avatar
unknown committed
1726
 byte buf[DELETE_FILE_HEADER_LEN];
1727
 int4store(buf + DF_FILE_ID_OFFSET, file_id);
1728
 return my_b_safe_write(file, buf, DELETE_FILE_HEADER_LEN);
1729 1730 1731
}

#ifdef MYSQL_CLIENT  
1732 1733
void Delete_file_log_event::print(FILE* file, bool short_form,
				  char* last_db)
1734 1735 1736 1737 1738
{
  if (short_form)
    return;
  print_header(file);
  fputc('\n', file);
1739
  fprintf(file, "#Delete_file: file_id=%u\n", file_id);
1740 1741
}
#endif  
1742

1743 1744 1745
#ifndef MYSQL_CLIENT
void Delete_file_log_event::pack_info(String* packet)
{
1746 1747 1748
  char buf1[64];
  sprintf(buf1, ";file_id=%u", (uint) file_id);
  net_store_data(packet, buf1);
1749 1750 1751
}
#endif  

1752

1753
#ifndef MYSQL_CLIENT  
unknown's avatar
unknown committed
1754 1755 1756
Execute_load_log_event::Execute_load_log_event(THD* thd_arg, const char* db_arg,
                                               bool using_trans)
  :Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
1757 1758 1759 1760
{
}
#endif  
  
1761 1762
Execute_load_log_event::Execute_load_log_event(const char* buf, int len)
  :Log_event(buf, 0), file_id(0)
1763
{
unknown's avatar
unknown committed
1764
  if ((uint)len < EXEC_LOAD_EVENT_OVERHEAD)
1765 1766 1767 1768 1769 1770
    return;
  file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + EL_FILE_ID_OFFSET);
}

int Execute_load_log_event::write_data(IO_CACHE* file)
{
unknown's avatar
unknown committed
1771 1772
  byte buf[EXEC_LOAD_HEADER_LEN];
  int4store(buf + EL_FILE_ID_OFFSET, file_id);
1773
  return my_b_safe_write(file, buf, EXEC_LOAD_HEADER_LEN);
1774 1775 1776
}

#ifdef MYSQL_CLIENT  
1777 1778
void Execute_load_log_event::print(FILE* file, bool short_form,
				   char* last_db)
1779 1780 1781 1782 1783 1784 1785
{
  if (short_form)
    return;
  print_header(file);
  fputc('\n', file);
  fprintf(file, "#Exec_load: file_id=%d\n",
	  file_id);
1786 1787
}
#endif  
1788 1789 1790
#ifndef MYSQL_CLIENT
void Execute_load_log_event::pack_info(String* packet)
{
1791 1792 1793
  char buf[64];
  sprintf(buf, ";file_id=%u", (uint) file_id);
  net_store_data(packet, buf);
1794 1795 1796
}
#endif

1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807
#ifdef MYSQL_CLIENT
void Unknown_log_event::print(FILE* file, bool short_form, char* last_db)
{
  if (short_form)
    return;
  print_header(file);
  fputc('\n', file);
  fprintf(file, "# %s", "Unknown event\n");
}
#endif  

1808
#ifndef MYSQL_CLIENT
1809
int Query_log_event::exec_event(struct st_relay_log_info* rli)
1810
{
1811
  int expected_error, actual_error= 0;
1812
  init_sql_alloc(&thd->mem_root, thd->variables.query_alloc_block_size,0);
1813
  thd->db= (char*) rewrite_db(db);
1814 1815 1816 1817 1818 1819 1820 1821

  /*
    InnoDB internally stores the master log position it has processed so far;
    position to store is really pos + pending + event_len
    since we must store the pos of the END of the current log event
  */
  rli->event_len= get_event_len();

1822 1823 1824 1825
  if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
  {
    thd->set_time((time_t)when);
    thd->current_tablenr = 0;
1826
    thd->query_length= q_len;
unknown's avatar
unknown committed
1827
    thd->query= (char *) query;
1828 1829 1830 1831 1832 1833
    VOID(pthread_mutex_lock(&LOCK_thread_count));
    thd->query_id = query_id++;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
    thd->query_error = 0;			// clear error
    thd->net.last_errno = 0;
    thd->net.last_error[0] = 0;
1834
    thd->slave_proxy_id = thread_id;		// for temp tables
1835
	
1836 1837 1838 1839
    /*
      Sanity check to make sure the master did not get a really bad
      error on the query.
    */
1840 1841
    if (ignored_error_code((expected_error = error_code)) ||
	!check_expected_error(thd,rli,expected_error))
1842
    {
unknown's avatar
unknown committed
1843 1844
      mysql_log.write(thd,COM_QUERY,"%s",thd->query);
      DBUG_PRINT("query",("%s",thd->query));
1845
      mysql_parse(thd, thd->query, q_len);
1846 1847 1848 1849 1850 1851 1852 1853 1854 1855

      /*
	Set a flag if we are inside an transaction so that we can restart
	the transaction from the start if we are killed

	This will only be done if we are supporting transactional tables
	in the slave.
      */
      if (!strcmp(thd->query,"BEGIN"))
	rli->inside_transaction= opt_using_transactions;
unknown's avatar
unknown committed
1856
      else if (!(strcmp(thd->query,"COMMIT") && strcmp(thd->query,"ROLLBACK")))
1857 1858
	rli->inside_transaction=0;

1859 1860 1861 1862
      /*
        If we expected a non-zero error code, and we don't get the same error
        code, and none of them should be ignored.
      */
1863 1864
      if ((expected_error != (actual_error = thd->net.last_errno)) &&
	  expected_error &&
unknown's avatar
unknown committed
1865 1866
	  !ignored_error_code(actual_error) &&
	  !ignored_error_code(expected_error))
1867
      {
1868
	slave_print_error(rli, 0,
1869 1870 1871 1872
                          "\
Query '%s' caused different errors on master and slave. \
Error on master: '%s' (%d), Error on slave: '%s' (%d). \
Default database: '%s'",
1873 1874 1875 1876 1877
                          query,
                          ER_SAFE(expected_error),
                          expected_error,
                          actual_error ? thd->net.last_error: "no error",
                          actual_error,
1878
                          print_slave_db_safe(db));
1879
	thd->query_error= 1;
1880
      }
1881 1882 1883
      /*
        If we get the same error code as expected, or they should be ignored. 
      */
1884 1885
      else if (expected_error == actual_error ||
	       ignored_error_code(actual_error))
1886 1887
      {
	thd->query_error = 0;
1888 1889
	*rli->last_slave_error = 0;
	rli->last_slave_errno = 0;
1890
      }
1891 1892 1893 1894 1895
      /*
        Other cases: mostly we expected no error and get one.
      */
      else if (thd->query_error || thd->fatal_error)
      {
1896 1897 1898 1899 1900 1901
        slave_print_error(rli,actual_error,
			  "Error '%s' on query '%s'. Default database: '%s'",
                          (actual_error ? thd->net.last_error :
			   "unexpected success or fatal error"),
			  query,
                          print_slave_db_safe(db));
1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912
        thd->query_error= 1;
      }
    } 
    /* 
       End of sanity check. If the test was wrong, the query got a really bad
       error on the master, which could be inconsistent, abort and tell DBA to
       check/fix it. check_expected_error() already printed the message to
       stderr and rli, and set thd->query_error to 1.
    */
  } /* End of if (db_ok(... */

1913
  VOID(pthread_mutex_lock(&LOCK_thread_count));
1914
  thd->db= 0;	                        // prevent db from being freed
1915
  thd->query= 0;			// just to be sure
1916
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
1917
  // assume no convert for next query unless set explictly
unknown's avatar
unknown committed
1918
  thd->variables.convert_set = 0;
1919
  close_thread_tables(thd);      
1920
  free_root(&thd->mem_root,0);
1921
  return (thd->query_error ? thd->query_error : Log_event::exec_event(rli)); 
1922 1923
}

1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948
/*
  Does the data loading job when executing a LOAD DATA on the slave

  SYNOPSIS
    Load_log_event::exec_event
      net  
      rli                             
      use_rli_only_for_errors	  - if set to 1, rli is provided to 
                                  Load_log_event::exec_event only for this 
				  function to have RPL_LOG_NAME and 
				  rli->last_slave_error, both being used by 
				  error reports. rli's position advancing
				  is skipped (done by the caller which is
				  Execute_load_log_event::exec_event).
				  - if set to 0, rli is provided for full use,
				  i.e. for error reports and position
				  advancing.

  DESCRIPTION
    Does the data loading job when executing a LOAD DATA on the slave
 
  RETURN VALUE
    0           Success                                                 
    1    	Failure
*/
1949

1950 1951
int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, 
			       bool use_rli_only_for_errors)
1952
{
1953
  init_sql_alloc(&thd->mem_root, thd->variables.query_alloc_block_size, 0);
1954
  thd->db= (char*) rewrite_db(db);
1955 1956
  DBUG_ASSERT(thd->query == 0);
  thd->query = 0;				// Should not be needed
1957
  thd->query_error = 0;
unknown's avatar
unknown committed
1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970

  /*
    We test replicate_*_db rules. Note that we have already prepared the file to
    load, even if we are going to ignore and delete it now. So it is possible
    that we did a lot of disk writes for nothing. In other words, a big LOAD
    DATA INFILE on the master will still consume a lot of space on the slave
    (space in the relay log + space of temp files: twice the space of the file
    to load...) even if it will finally be ignored.
    TODO: fix this; this can be done by testing rules in
    Create_file_log_event::exec_event() and then discarding Append_block and
    al. Another way is do the filtering in the I/O thread (more efficient: no
    disk writes at all).
  */
unknown's avatar
unknown committed
1971
  if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
1972 1973 1974 1975 1976 1977 1978 1979 1980 1981
  {
    thd->set_time((time_t)when);
    thd->current_tablenr = 0;
    VOID(pthread_mutex_lock(&LOCK_thread_count));
    thd->query_id = query_id++;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));

    TABLE_LIST tables;
    bzero((char*) &tables,sizeof(tables));
    tables.db = thd->db;
1982
    tables.alias = tables.real_name = (char*)table_name;
1983
    tables.lock_type = TL_WRITE;
1984
    tables.updating= 1;
1985
    // the table will be opened in mysql_load    
unknown's avatar
unknown committed
1986
    if (table_rules_on && !tables_ok(thd, &tables))
1987
    {
1988
      // TODO: this is a bug - this needs to be moved to the I/O thread
1989 1990 1991 1992 1993 1994
      if (net)
        skip_load_data_infile(net);
    }
    else
    {
      char llbuff[22];
1995
      enum enum_duplicates handle_dup;
unknown's avatar
unknown committed
1996
      if (sql_ex.opt_flags & REPLACE_FLAG)
unknown's avatar
unknown committed
1997
	handle_dup= DUP_REPLACE;
1998 1999 2000
      else if (sql_ex.opt_flags & IGNORE_FLAG)
        handle_dup= DUP_IGNORE;
      else
2001
      {
2002
        /*
2003
          When replication is running fine, if it was DUP_ERROR on the 
2004 2005 2006 2007
          master then we could choose DUP_IGNORE here, because if DUP_ERROR
          suceeded on master, and data is identical on the master and slave,
          then there should be no uniqueness errors on slave, so DUP_IGNORE is
          the same as DUP_ERROR. But in the unlikely case of uniqueness errors
2008 2009 2010
          (because the data on the master and slave happen to be different
	  (user error or bug), we want LOAD DATA to print an error message on
	  the slave to discover the problem.
2011 2012 2013 2014 2015

          If reading from net (a 3.23 master), mysql_load() will change this
          to DUP_IGNORE.
        */
        handle_dup= DUP_ERROR;
2016
      }
2017

unknown's avatar
unknown committed
2018
      sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
2019 2020 2021 2022 2023
      String field_term(sql_ex.field_term,sql_ex.field_term_len);
      String enclosed(sql_ex.enclosed,sql_ex.enclosed_len);
      String line_term(sql_ex.line_term,sql_ex.line_term_len);
      String line_start(sql_ex.line_start,sql_ex.line_start_len);
      String escaped(sql_ex.escaped,sql_ex.escaped_len);
2024 2025 2026 2027 2028
      ex.field_term= &field_term;
      ex.enclosed= &enclosed;
      ex.line_term= &line_term;
      ex.line_start= &line_start;
      ex.escaped= &escaped;
2029 2030
	    
      ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
2031
      if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
2032
	ex.field_term->length(0);
2033

2034
      ex.skip_lines = skip_lines;
2035 2036
      List<Item> field_list;
      set_fields(field_list);
2037
      thd->slave_proxy_id = thread_id;
2038 2039 2040 2041
      if (net)
      {
	// mysql_load will use thd->net to read the file
	thd->net.vio = net->vio;
2042 2043 2044
	/*
	  Make sure the client does not get confused about the packet sequence
	*/
2045 2046
	thd->net.pkt_nr = net->pkt_nr;
      }
2047
      if (mysql_load(thd, &ex, &tables, field_list, handle_dup, net != 0,
2048
		     TL_WRITE))
2049
	thd->query_error = 1;
2050
      /* log_pos is the position of the LOAD event in the master log */
unknown's avatar
unknown committed
2051
      if (thd->cuted_fields)
2052 2053 2054 2055
	sql_print_error("\
Slave: load data infile on table '%s' at log position %s in log \
'%s' produced %ld warning(s). Default database: '%s'",
                        (char*) table_name,
2056
                        llstr(log_pos,llbuff), RPL_LOG_NAME, 
2057 2058
			(ulong) thd->cuted_fields,
                        print_slave_db_safe(db));
unknown's avatar
unknown committed
2059
      if (net)
2060
        net->pkt_nr= thd->net.pkt_nr;
2061 2062 2063 2064
    }
  }
  else
  {
2065 2066 2067 2068 2069
    /*
      We will just ask the master to send us /dev/null if we do not
      want to load the data.
      TODO: this a bug - needs to be done in I/O thread
    */
2070 2071 2072 2073 2074
    if (net)
      skip_load_data_infile(net);
  }
	    
  thd->net.vio = 0; 
2075
  thd->db= 0;					// prevent db from being freed
2076
  close_thread_tables(thd);
unknown's avatar
unknown committed
2077
  if (thd->query_error)
2078
  {
2079 2080 2081 2082 2083 2084 2085 2086 2087 2088
    /* this err/sql_errno code is copy-paste from send_error() */
    const char *err;
    int sql_errno;
    if ((err=thd->net.last_error)[0])
      sql_errno=thd->net.last_errno;
    else
    {
      sql_errno=ER_UNKNOWN_ERROR;
      err=ER(sql_errno);       
    }
2089
    slave_print_error(rli,sql_errno,"\
2090
Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'",
2091
		      err, (char*)table_name, print_slave_db_safe(db));
2092 2093 2094 2095 2096
    free_root(&thd->mem_root,0);
    return 1;
  }
  free_root(&thd->mem_root,0);
	    
unknown's avatar
unknown committed
2097
  if (thd->fatal_error)
2098
  {
2099 2100 2101
    slave_print_error(rli,ER_UNKNOWN_ERROR, "\
Fatal error running LOAD DATA INFILE on table '%s'. Default database: '%s'",
		      (char*)table_name, print_slave_db_safe(db));
2102 2103 2104
    return 1;
  }

2105
  return ( use_rli_only_for_errors ? 0 : Log_event::exec_event(rli) ); 
2106 2107
}

2108

2109 2110 2111 2112 2113
/*
  The master started

  IMPLEMENTATION
    - To handle the case where the master died without a stop event,
unknown's avatar
unknown committed
2114 2115
      we clean up all temporary tables that we got, if we are sure we
      can (see below).
2116 2117 2118 2119 2120

  TODO
    - Remove all active user locks
*/

2121
int Start_log_event::exec_event(struct st_relay_log_info* rli)
2122
{
unknown's avatar
unknown committed
2123 2124 2125

  switch (rli->mi->old_format) {
  case BINLOG_FORMAT_CURRENT : 
unknown's avatar
unknown committed
2126
    /* 
unknown's avatar
unknown committed
2127 2128
       This is 4.x, so a Start_log_event is only at master startup,
       so we are sure the master has restarted and cleared his temp tables.
unknown's avatar
unknown committed
2129 2130
    */
    close_temporary_tables(thd);
unknown's avatar
unknown committed
2131
    cleanup_load_tmpdir();
2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142
    /*
      As a transaction NEVER spans on 2 or more binlogs:
      if we have an active transaction at this point, the master died while
      writing the transaction to the binary log, i.e. while flushing the binlog
      cache to the binlog. As the write was started, the transaction had been
      committed on the master, so we lack of information to replay this
      transaction on the slave; all we can do is stop with error.
    */
    if (rli->inside_transaction)
    {
      slave_print_error(rli, 0,
unknown's avatar
unknown committed
2143 2144 2145 2146
                        "\
Rolling back unfinished transaction (no COMMIT or ROLLBACK) from relay log. \
Probably cause is that the master died while writing the transaction to it's \
binary log.");
2147 2148
      return(1);
    }
unknown's avatar
unknown committed
2149 2150 2151 2152 2153 2154
    break;
  /* 
     Now the older formats; in that case load_tmpdir is cleaned up by the I/O
     thread.
  */
  case BINLOG_FORMAT_323_LESS_57 :
unknown's avatar
unknown committed
2155
    /*
unknown's avatar
unknown committed
2156 2157 2158
      Cannot distinguish a Start_log_event generated at master startup and
      one generated by master FLUSH LOGS, so cannot be sure temp tables
      have to be dropped. So do nothing.
unknown's avatar
unknown committed
2159
    */
unknown's avatar
unknown committed
2160 2161
    break;
 case BINLOG_FORMAT_323_GEQ_57 : 
unknown's avatar
unknown committed
2162 2163 2164 2165 2166 2167
    /*
      Can distinguish, based on the value of 'created',
      which was generated at master startup.
    */
    if (created)
      close_temporary_tables(thd);
unknown's avatar
unknown committed
2168 2169 2170 2171
    break;
  default :
    /* this case is impossible */
    return 1;
unknown's avatar
unknown committed
2172
  }
unknown's avatar
unknown committed
2173

2174
  return Log_event::exec_event(rli);
2175 2176
}

2177

2178 2179 2180 2181 2182 2183 2184 2185
/*
  The master stopped. Clean up all temporary tables + locks that the
  master may have set.

  TODO
    - Remove all active user locks
*/

2186
int Stop_log_event::exec_event(struct st_relay_log_info* rli)
2187
{
unknown's avatar
unknown committed
2188 2189 2190 2191 2192 2193 2194 2195
  /*
    do not clean up immediately after rotate event;
    QQ: this should be a useless test: the only case when it is false is when
    shutdown occured just after FLUSH LOGS. It has nothing to do with Rotate?
    By the way, immediately after a Rotate
    the I/O thread does not write the Stop to the relay log,
    so we won't come here in that case.
  */
2196
  if (rli->master_log_pos > BIN_LOG_HEADER_SIZE) 
2197 2198
  {
    close_temporary_tables(thd);
2199
    cleanup_load_tmpdir();
2200
  }
unknown's avatar
unknown committed
2201 2202
  /*
    We do not want to update master_log pos because we get a rotate event
2203 2204
    before stop, so by now master_log_name is set to the next log.
    If we updated it, we will have incorrect master coordinates and this
unknown's avatar
unknown committed
2205
    could give false triggers in MASTER_POS_WAIT() that we have reached
2206
    the target position when in fact we have not.
unknown's avatar
unknown committed
2207
  */
2208 2209
  rli->inc_pos(get_event_len(), 0);  
  flush_relay_log_info(rli);
2210 2211 2212
  return 0;
}

2213 2214 2215 2216 2217

/*
  Got a rotate log even from the master

  IMPLEMENTATION
unknown's avatar
unknown committed
2218 2219
    This is mainly used so that we can later figure out the logname and
    position for the master.
2220

unknown's avatar
unknown committed
2221 2222
    We can't rotate the slave as this will cause infinitive rotations
    in a A -> B -> A setup.
2223 2224

  RETURN VALUES
unknown's avatar
unknown committed
2225
    0	ok
unknown's avatar
unknown committed
2226
*/
2227

2228
int Rotate_log_event::exec_event(struct st_relay_log_info* rli)
2229
{
2230 2231
  DBUG_ENTER("Rotate_log_event::exec_event");

2232
  pthread_mutex_lock(&rli->data_lock);
2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245
  /*
    If we are in a transaction: the only normal case is when the I/O thread was
    copying a big transaction, then it was stopped and restarted: we have this
    in the relay log:
    BEGIN
    ...
    ROTATE (a fake one)
    ...
    COMMIT or ROLLBACK
    In that case, we don't want to touch the coordinates which correspond to the
    beginning of the transaction.
  */
  if (!rli->inside_transaction)
unknown's avatar
unknown committed
2246
  {
2247 2248
    memcpy(rli->master_log_name, new_log_ident, ident_len+1);
    rli->master_log_pos= pos;
unknown's avatar
unknown committed
2249
    DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) rli->master_log_pos));
unknown's avatar
unknown committed
2250
  }
2251 2252
  rli->relay_log_pos += get_event_len();
  pthread_mutex_unlock(&rli->data_lock);
unknown's avatar
unknown committed
2253
  pthread_cond_broadcast(&rli->data_cond);
2254
  flush_relay_log_info(rli);
2255
  DBUG_RETURN(0);
2256 2257
}

unknown's avatar
unknown committed
2258

2259
int Intvar_log_event::exec_event(struct st_relay_log_info* rli)
2260
{
2261
  switch (type) {
2262 2263 2264 2265 2266 2267 2268 2269
  case LAST_INSERT_ID_EVENT:
    thd->last_insert_id_used = 1;
    thd->last_insert_id = val;
    break;
  case INSERT_ID_EVENT:
    thd->next_insert_id = val;
    break;
  }
2270
  rli->inc_pending(get_event_len());
2271 2272 2273
  return 0;
}

unknown's avatar
unknown committed
2274 2275
int Rand_log_event::exec_event(struct st_relay_log_info* rli)
{
2276 2277
  thd->rand.seed1 = (ulong) seed1;
  thd->rand.seed2 = (ulong) seed2;
unknown's avatar
unknown committed
2278 2279 2280 2281
  rli->inc_pending(get_event_len());
  return 0;
}

2282
int Slave_log_event::exec_event(struct st_relay_log_info* rli)
2283
{
unknown's avatar
unknown committed
2284
  if (mysql_bin_log.is_open())
2285
    mysql_bin_log.write(this);
2286
  return Log_event::exec_event(rli);
2287 2288
}

2289
int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
2290 2291
{
  char fname_buf[FN_REFLEN+10];
unknown's avatar
unknown committed
2292
  char *p;
2293 2294 2295
  int fd = -1;
  IO_CACHE file;
  int error = 1;
unknown's avatar
unknown committed
2296

2297
  bzero((char*)&file, sizeof(file));
unknown's avatar
unknown committed
2298 2299
  p = slave_load_file_stem(fname_buf, file_id, server_id);
  strmov(p, ".info");			// strmov takes less code than memcpy
2300 2301 2302 2303 2304
  if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC,
		    MYF(MY_WME))) < 0 ||
      init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
		    MYF(MY_WME|MY_NABP)))
  {
unknown's avatar
unknown committed
2305
    slave_print_error(rli,my_errno, "Error in Create_file event: could not open file '%s'", fname_buf);
2306 2307 2308 2309
    goto err;
  }
  
  // a trick to avoid allocating another buffer
unknown's avatar
unknown committed
2310
  strmov(p, ".data");
2311 2312 2313 2314
  fname = fname_buf;
  fname_len = (uint)(p-fname) + 5;
  if (write_base(&file))
  {
unknown's avatar
unknown committed
2315
    strmov(p, ".info"); // to have it right in the error message
unknown's avatar
unknown committed
2316
    slave_print_error(rli,my_errno, "Error in Create_file event: could not write to file '%s'", fname_buf);
2317 2318 2319 2320 2321 2322 2323 2324 2325
    goto err;
  }
  end_io_cache(&file);
  my_close(fd, MYF(0));
  
  // fname_buf now already has .data, not .info, because we did our trick
  if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC,
		    MYF(MY_WME))) < 0)
  {
unknown's avatar
unknown committed
2326
    slave_print_error(rli,my_errno, "Error in Create_file event: could not open file '%s'", fname_buf);
2327 2328
    goto err;
  }
unknown's avatar
unknown committed
2329
  if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
2330
  {
unknown's avatar
unknown committed
2331
    slave_print_error(rli,my_errno, "Error in Create_file event: write to '%s' failed", fname_buf);
2332 2333
    goto err;
  }
2334 2335
  error=0;					// Everything is ok

2336 2337 2338 2339 2340
err:
  if (error)
    end_io_cache(&file);
  if (fd >= 0)
    my_close(fd, MYF(0));
2341
  return error ? 1 : Log_event::exec_event(rli);
2342 2343
}

2344
int Delete_file_log_event::exec_event(struct st_relay_log_info* rli)
2345 2346
{
  char fname[FN_REFLEN+10];
2347
  char *p= slave_load_file_stem(fname, file_id, server_id);
2348
  memcpy(p, ".data", 6);
2349
  (void) my_delete(fname, MYF(MY_WME));
2350
  memcpy(p, ".info", 6);
2351
  (void) my_delete(fname, MYF(MY_WME));
2352
  return Log_event::exec_event(rli);
2353 2354
}

2355
int Append_block_log_event::exec_event(struct st_relay_log_info* rli)
2356 2357
{
  char fname[FN_REFLEN+10];
2358 2359
  char *p= slave_load_file_stem(fname, file_id, server_id);
  int fd;
2360
  int error = 1;
2361

2362 2363 2364
  memcpy(p, ".data", 6);
  if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0)
  {
unknown's avatar
unknown committed
2365
    slave_print_error(rli,my_errno, "Error in Append_block event: could not open file '%s'", fname);
2366 2367
    goto err;
  }
unknown's avatar
unknown committed
2368
  if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
2369
  {
unknown's avatar
unknown committed
2370
    slave_print_error(rli,my_errno, "Error in Append_block event: write to '%s' failed", fname);
2371 2372 2373
    goto err;
  }
  error=0;
2374

2375 2376 2377
err:
  if (fd >= 0)
    my_close(fd, MYF(0));
2378
  return error ? error : Log_event::exec_event(rli);
2379 2380
}

2381
int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
2382 2383
{
  char fname[FN_REFLEN+10];
2384 2385
  char *p= slave_load_file_stem(fname, file_id, server_id);
  int fd;
2386 2387 2388
  int error = 1;
  IO_CACHE file;
  Load_log_event* lev = 0;
2389

2390 2391 2392 2393 2394
  memcpy(p, ".info", 6);
  if ((fd = my_open(fname, O_RDONLY|O_BINARY, MYF(MY_WME))) < 0 ||
      init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
		    MYF(MY_WME|MY_NABP)))
  {
unknown's avatar
unknown committed
2395
    slave_print_error(rli,my_errno, "Error in Exec_load event: could not open file '%s'", fname);
2396 2397
    goto err;
  }
2398 2399
  if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
							 (pthread_mutex_t*)0,
2400 2401
							 (bool)0)) ||
      lev->get_type_code() != NEW_LOAD_EVENT)
2402
  {
unknown's avatar
unknown committed
2403
    slave_print_error(rli,0, "Error in Exec_load event: file '%s' appears corrupted", fname);
2404 2405
    goto err;
  }
unknown's avatar
unknown committed
2406

2407
  lev->thd = thd;
2408 2409
  /*
    lev->exec_event should use rli only for errors
unknown's avatar
unknown committed
2410 2411 2412
    i.e. should not advance rli's position.
    lev->exec_event is the place where the table is loaded (it calls
    mysql_load()).
2413 2414
  */
  if (lev->exec_event(0,rli,1)) 
2415
  {
2416 2417 2418 2419 2420 2421 2422 2423 2424
    /*
      We want to indicate the name of the file that could not be loaded
      (SQL_LOADxxx).
      But as we are here we are sure the error is in rli->last_slave_error and
      rli->last_slave_errno (example of error: duplicate entry for key), so we
      don't want to overwrite it with the filename.
      What we want instead is add the filename to the current error message.
    */
    char *tmp= my_strdup(rli->last_slave_error,MYF(MY_WME));
2425 2426 2427 2428 2429 2430 2431 2432
    if (tmp)
    {
      slave_print_error(rli,
			rli->last_slave_errno, /* ok to re-use error code */
			"%s. Failed executing load from '%s'", 
			tmp, fname);
      my_free(tmp,MYF(0));
    }
2433 2434
    goto err;
  }
unknown's avatar
unknown committed
2435 2436 2437 2438 2439 2440 2441 2442 2443 2444
  /*
    We have an open file descriptor to the .info file; we need to close it
    or Windows will refuse to delete the file in my_delete().
  */
  if (fd >= 0)
  {
    my_close(fd, MYF(0));
    end_io_cache(&file);
    fd= -1;
  }
2445
  (void) my_delete(fname, MYF(MY_WME));
2446
  memcpy(p, ".data", 6);
2447
  (void) my_delete(fname, MYF(MY_WME));
2448
  error = 0;
2449

2450 2451 2452
err:
  delete lev;
  if (fd >= 0)
2453
  {
2454
    my_close(fd, MYF(0));
2455 2456
    end_io_cache(&file);
  }
2457
  return error ? error : Log_event::exec_event(rli);
2458 2459
}

2460
#endif /* !MYSQL_CLIENT */