log_event.cc 54.7 KB
Newer Older
unknown's avatar
unknown committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
   
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
   
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
   
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


#ifndef MYSQL_CLIENT
#ifdef __GNUC__
#pragma implementation				// gcc: Class implementation
#endif
#include  "mysql_priv.h"
23
#include "slave.h"
24
#include <my_dir.h>
unknown's avatar
unknown committed
25 26
#endif /* MYSQL_CLIENT */

27 28
#include <assert.h>

unknown's avatar
unknown committed
29 30 31 32 33
/*
  Write len bytes from buf into the cache: my_b_append() is used when the
  cache type is SEQ_READ_APPEND, my_b_write() for every other type.
  The result of whichever call was made is returned unchanged.
*/
inline int my_b_safe_write(IO_CACHE* file, const byte *buf,
			   int len)
{
  /*
    Sasha: this is intentionally an if statement and not a ?: expression,
    to stay clear of a possible compiler bug. At least gcc 2.95 cannot
    deal with several layers of ternary operators that evaluate
    comma(,) operator expressions inside.
  */
  if (file->type != SEQ_READ_APPEND)
    return my_b_write(file, buf, len);
  return my_b_append(file, buf, len);
}
42

43
#ifdef MYSQL_CLIENT
44
/*
  Print the len bytes starting at str to file as a single-quoted string.
  Newline, carriage return, backslash, backspace, tab, single quote and
  the NUL byte are written as two-character backslash escapes; every
  other byte is written as is.
*/
static void pretty_print_str(FILE* file, char* str, int len)
{
  char *stop= str + len;
  char *cur;

  fputc('\'', file);
  for (cur= str; cur < stop; cur++)
  {
    switch (*cur) {
    case '\n': fputs("\\n", file);  break;
    case '\r': fputs("\\r", file);  break;
    case '\\': fputs("\\\\", file); break;
    case '\b': fputs("\\b", file);  break;
    case '\t': fputs("\\t", file);  break;
    case '\'': fputs("\\'", file);  break;
    case 0   : fputs("\\0", file);  break;
    default:
      fputc(*cur, file);
      break;
    }
  }
  fputc('\'', file);
}
66
#endif
unknown's avatar
unknown committed
67

unknown's avatar
unknown committed
68 69
#ifndef MYSQL_CLIENT

70 71 72 73 74
/*
  Return non-zero when use_slave_mask is on and err_code is set in
  slave_error_mask; return 0 otherwise.
*/
inline int ignored_error_code(int err_code)
{
  if (!use_slave_mask)
    return 0;
  return bitmap_is_set(&slave_error_mask, err_code) ? 1 : 0;
}

unknown's avatar
unknown committed
75

76
/*
  Append the len bytes starting at str to packet as a single-quoted
  string, using the same backslash escapes as the FILE* variant of this
  function: \n \r \\ \b \t \' and \0.
*/
static void pretty_print_str(String* packet, char* str, int len)
{
  char *stop= str + len;
  char *cur;

  packet->append('\'');
  for (cur= str; cur < stop; cur++)
  {
    const char *escape= 0;			// Set when the byte needs escaping

    switch (*cur) {
    case '\n': escape= "\\n";  break;
    case '\r': escape= "\\r";  break;
    case '\\': escape= "\\\\"; break;
    case '\b': escape= "\\b";  break;
    case '\t': escape= "\\t";  break;
    case '\'': escape= "\\'";  break;
    case 0   : escape= "\\0";  break;
    default:
      break;
    }
    if (escape)
      packet->append(escape);
    else
      packet->append((char) *cur);
  }
  packet->append('\'');
}

unknown's avatar
unknown committed
99

100 101 102
static inline char* slave_load_file_stem(char*buf, uint file_id,
					 int event_server_id)
{
103
  fn_format(buf,"SQL_LOAD-",slave_load_tmpdir,"",0); /* 4+32); */
104 105 106 107 108 109 110 111
  buf = strend(buf);
  buf = int10_to_str(::server_id, buf, 10);
  *buf++ = '-';
  buf = int10_to_str(event_server_id, buf, 10);
  *buf++ = '-';
  return int10_to_str(file_id, buf, 10);
}

unknown's avatar
unknown committed
112 113 114 115
#endif

const char* Log_event::get_type_str()
{
116
  switch(get_type_code()) {
unknown's avatar
unknown committed
117 118 119 120 121 122
  case START_EVENT:  return "Start";
  case STOP_EVENT:   return "Stop";
  case QUERY_EVENT:  return "Query";
  case ROTATE_EVENT: return "Rotate";
  case INTVAR_EVENT: return "Intvar";
  case LOAD_EVENT:   return "Load";
123
  case NEW_LOAD_EVENT:   return "New_load";
unknown's avatar
unknown committed
124
  case SLAVE_EVENT:  return "Slave";
125 126 127 128
  case CREATE_FILE_EVENT: return "Create_file";
  case APPEND_BLOCK_EVENT: return "Append_block";
  case DELETE_FILE_EVENT: return "Delete_file";
  case EXEC_LOAD_EVENT: return "Exec_load";
unknown's avatar
unknown committed
129 130 131 132 133
  default: /* impossible */ return "Unknown";
  }
}

#ifndef MYSQL_CLIENT
134 135 136
/*
  Create an event on behalf of thd_arg.  With a thread, server_id, when
  and log_pos are copied from it; without one, the global server_id, the
  current time and position 0 are used.
*/
Log_event::Log_event(THD* thd_arg, uint16 flags_arg)
  :exec_time(0), flags(flags_arg), cached_event_len(0),
   temp_buf(0), thd(thd_arg)
{
  if (!thd)
  {
    server_id= ::server_id;
    when= time(NULL);
    log_pos= 0;
    return;
  }
  server_id= thd->server_id;
  when= thd->start_time;
  log_pos= thd->log_pos;
}
151

152 153 154 155 156 157 158 159
/*
  Delete all temporary files used for SQL_LOAD.

  TODO
  - When we get a 'server start' event, we should only remove
    the files associated with the server id that just started.
    Easily fixable by adding server_id as a prefix to the log files.
*/
160

161 162 163 164 165 166 167 168
static void cleanup_load_tmpdir()
{
  MY_DIR *dirp;
  FILEINFO *file;
  uint i;
  if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME))))
    return;

unknown's avatar
unknown committed
169
  for (i=0 ; i < (uint)dirp->number_off_files; i++)
170 171
  {
    file=dirp->dir_entry+i;
unknown's avatar
unknown committed
172
    if (is_prefix(file->name,"SQL_LOAD-"))
173
      my_delete(file->name, MYF(0));
174 175 176 177 178
  }

  my_dirend(dirp);
}

179 180
#endif

181 182
/*
  Initialize the common header fields from a serialized event.
  The timestamp is read from the start of buf and server_id from
  SERVER_ID_OFFSET.  log_pos and flags are read from the buffer only for
  the new format; for old_format both are set to 0.
*/
Log_event::Log_event(const char* buf, bool old_format)
  :cached_event_len(0), temp_buf(0)
{
  when= uint4korr(buf);
  server_id= uint4korr(buf + SERVER_ID_OFFSET);
  if (!old_format)
  {
    log_pos= uint4korr(buf + LOG_POS_OFFSET);
    flags= uint2korr(buf + FLAGS_OFFSET);
  }
  else
  {
    log_pos= 0;
    flags= 0;
  }
#ifndef MYSQL_CLIENT
  thd= 0;
#endif
}


#ifndef MYSQL_CLIENT

204
int Log_event::exec_event(struct st_relay_log_info* rli)
205
{
206
  if (rli)					// QQ When is this not true ?
207
  {
208 209 210
    rli->inc_pos(get_event_len(),log_pos);
    DBUG_ASSERT(rli->sql_thd != 0);
    flush_relay_log_info(rli);
211 212 213
  }
  return 0;
}
unknown's avatar
unknown committed
214 215 216 217 218 219 220 221

/*
  Base implementation of pack_info(): stores an empty string into packet.
  net_send() in this file calls pack_info() after it has stored the other
  fields of the row; several event classes below provide their own
  pack_info() that stores a description instead.
*/
void Log_event::pack_info(String* packet)
{
  net_store_data(packet, "", 0);
}

/*
  Store "use `<db>`; <query>" into packet.  The "use" part is left out
  when there is no database name, the query part when there is no query.
*/
void Query_log_event::pack_info(String* packet)
{
  char text_buf[256];
  String info(text_buf, sizeof(text_buf), system_charset_info);
  info.length(0);

  const bool have_db= (db && db_len);
  const bool have_query= (query && q_len);

  if (have_db)
  {
    info.append("use `", 5);
    info.append(db, db_len);
    info.append("`; ", 3);
  }
  if (have_query)
    info.append(query, q_len);

  net_store_data(packet, (char*)info.ptr(), info.length());
}

/*
  Store "Server ver: <server_version>, Binlog ver: <binlog_version>"
  into packet.
*/
void Start_log_event::pack_info(String* packet)
{
  char text_buf[256];
  char num_buf[22];				// Work area for llstr()
  String info(text_buf, sizeof(text_buf), system_charset_info);
  info.length(0);

  info.append("Server ver: ");
  info.append(server_version);
  info.append(", Binlog ver: ");
  const char *binlog_ver_text= llstr(binlog_version, num_buf);
  info.append(binlog_ver_text);

  net_store_data(packet, info.ptr(), info.length());
}

/*
  Store a textual LOAD DATA INFILE statement describing this event into
  packet: optional "use <db>; ", the file name, REPLACE / IGNORE, the
  table name, the field and line options that have a non-zero length,
  the number of skipped lines and the field list.

  Fixes compared to the previous version:
  - The option flags are tested with bitwise & (the old logical && made
    every non-zero opt_flags print REPLACE and OPTIONALLY).
  - The number of lines to skip is formatted with llstr().  The old code
    passed it as the second argument of String::append() next to a
    printf-style string; the other two-argument calls in this file use
    that argument as a byte count.
*/
void Load_log_event::pack_info(String* packet)
{
  char buf[256];
  char num_buf[22];				// Work area for llstr()
  String tmp(buf, sizeof(buf), system_charset_info);
  tmp.length(0);
  if (db && db_len)
  {
   tmp.append("use ");
   tmp.append(db, db_len);
   tmp.append("; ", 2);
  }

  tmp.append("LOAD DATA INFILE '");
  tmp.append(fname, fname_len);
  tmp.append("' ", 2);
  if (sql_ex.opt_flags & REPLACE_FLAG)
    tmp.append(" REPLACE ");
  else if (sql_ex.opt_flags & IGNORE_FLAG)
    tmp.append(" IGNORE ");

  tmp.append("INTO TABLE ");
  tmp.append(table_name);
  if (sql_ex.field_term_len)
  {
    tmp.append(" FIELDS TERMINATED BY ");
    pretty_print_str(&tmp, sql_ex.field_term, sql_ex.field_term_len);
  }

  if (sql_ex.enclosed_len)
  {
    if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
      tmp.append(" OPTIONALLY ");
    tmp.append( " ENCLOSED BY ");
    pretty_print_str(&tmp, sql_ex.enclosed, sql_ex.enclosed_len);
  }

  if (sql_ex.escaped_len)
  {
    tmp.append( " ESCAPED BY ");
    pretty_print_str(&tmp, sql_ex.escaped, sql_ex.escaped_len);
  }

  if (sql_ex.line_term_len)
  {
    tmp.append(" LINES TERMINATED BY ");
    pretty_print_str(&tmp, sql_ex.line_term, sql_ex.line_term_len);
  }

  if (sql_ex.line_start_len)
  {
    tmp.append(" LINES STARTING BY ");
    pretty_print_str(&tmp, sql_ex.line_start, sql_ex.line_start_len);
  }

  if ((int)skip_lines > 0)
  {
    tmp.append(" IGNORE ");
    tmp.append(llstr((long) skip_lines, num_buf));
    tmp.append(" LINES ");
  }

  if (num_fields)
  {
    uint i;
    const char* field = fields;
    tmp.append(" (");
    for (i = 0; i < num_fields; i++)
    {
      if (i)
	tmp.append(" ,");
      tmp.append( field);

      /* Names are stored back to back, each followed by one extra byte */
      field += field_lens[i]  + 1;
    }
    tmp.append(')');
  }

  net_store_data(packet, tmp.ptr(), tmp.length());
}

/*
  Store "<new_log_ident>;pos=<pos>" into packet, followed by
  "; forced by master" when LOG_EVENT_FORCED_ROTATE_F is set in flags.
*/
void Rotate_log_event::pack_info(String* packet)
{
  char text_buf[256], num_buf[22];
  String info(text_buf, sizeof(text_buf), system_charset_info);
  info.length(0);

  info.append(new_log_ident, ident_len);
  info.append(";pos=");
  const char *pos_text= llstr(pos,num_buf);
  info.append(pos_text);
  if (flags & LOG_EVENT_FORCED_ROTATE_F)
    info.append("; forced by master");

  net_store_data(packet, info.ptr(), info.length());
}

/*
  Store "<variable name>=<val>" into packet; the name comes from
  get_var_type_name().
*/
void Intvar_log_event::pack_info(String* packet)
{
  char text_buf[256], num_buf[22];
  String info(text_buf, sizeof(text_buf), system_charset_info);
  info.length(0);

  info.append(get_var_type_name());
  info.append('=');
  const char *val_text= llstr(val, num_buf);
  info.append(val_text);

  net_store_data(packet, info.ptr(), info.length());
}

/*
  Store "host=<master_host>,port=<master_port>,log=<master_log>,
  pos=<master_pos>" (without the line break) into packet.
*/
void Slave_log_event::pack_info(String* packet)
{
  char text_buf[256], num_buf[22], *num_end;
  String info(text_buf, sizeof(text_buf), system_charset_info);
  info.length(0);

  info.append("host=");
  info.append(master_host);

  info.append(",port=");
  num_end= int10_to_str((long) master_port, num_buf, 10);
  info.append(num_buf, (uint32) (num_end-num_buf));

  info.append(",log=");
  info.append(master_log);

  info.append(",pos=");
  info.append(llstr(master_pos,num_buf));

  net_store_data(packet, info.ptr(), info.length());
}


/*
  Append one Item_empty_string of length 20 to field_list for each
  column name listed below, in the order given.
*/
void Log_event::init_show_field_list(List<Item>* field_list)
{
  static const char *const column_names[]=
  {
    "Log_name", "Pos", "Event_type", "Server_id", "Orig_log_pos", "Info"
  };
  uint i;

  for (i= 0; i < sizeof(column_names) / sizeof(column_names[0]); i++)
    field_list->push_back(new Item_empty_string(column_names[i], 20));
}

unknown's avatar
unknown committed
379
/*
  Build one row describing this event in thd->packet and send it with
  my_net_write().  The row holds, in order: log_name with everything up
  to the last FN_LIBCHAR removed, pos, the event type name, server_id,
  log_pos, and whatever pack_info() stores.

  Returns the result of my_net_write().
*/
int Log_event::net_send(THD* thd, const char* log_name, my_off_t pos)
{
  String* row= &thd->packet;
  const char* last_sep= strrchr(log_name, FN_LIBCHAR);
  const char* base_name= last_sep ? last_sep + 1 : log_name;
  const char* type_name;

  row->length(0);
  net_store_data(row, base_name, strlen(base_name));
  net_store_data(row, (longlong) pos);
  type_name= get_type_str();
  net_store_data(row, type_name, strlen(type_name));
  net_store_data(row, server_id);
  net_store_data(row, (longlong) log_pos);
  pack_info(row);
  return my_net_write(&thd->net, (char*) row->ptr(), row->length());
}

398 399
#endif /* MYSQL_CLIENT */

unknown's avatar
unknown committed
400

401
/*
  Write the event through Log_event::write(); an event without a query
  is not written and yields -1.
*/
int Query_log_event::write(IO_CACHE* file)
{
  if (!query)
    return -1;
  return Log_event::write(file);
}

406

407
/*
  Write the header and then the data part of the event to file.
  Returns 0 when both calls return zero; returns -1 otherwise.  The data
  part is not attempted when the header write returns non-zero.
*/
int Log_event::write(IO_CACHE* file)
{
  if (write_header(file))
    return -1;
  if (write_data(file))
    return -1;
  return 0;
}

412

413
/*
  Serialize the common event header and write it to file.
  Fields are stored back to back in this order: timestamp (4 bytes),
  type code (1), server_id (4), total event length (4), log_pos (4),
  flags (2).

  Returns the result of my_b_safe_write().
*/
int Log_event::write_header(IO_CACHE* file)
{
  char header[LOG_EVENT_HEADER_LEN];
  char *cur= header;
  long total_len;

  int4store(cur, (ulong) when);			// timestamp
  cur+= 4;
  *cur++= get_type_code();			// event type code
  int4store(cur, server_id);
  cur+= 4;
  total_len= get_data_size() + LOG_EVENT_HEADER_LEN;
  int4store(cur, total_len);
  cur+= 4;
  int4store(cur, log_pos);
  cur+= 4;
  int2store(cur, flags);
  cur+= 2;
  return (my_b_safe_write(file, (byte*) header, (uint) (cur - header)));
}

#ifndef MYSQL_CLIENT

434
int Log_event::read_log_event(IO_CACHE* file, String* packet,
435
			      pthread_mutex_t* log_lock)
unknown's avatar
unknown committed
436 437
{
  ulong data_len;
438
  int result=0;
unknown's avatar
unknown committed
439
  char buf[LOG_EVENT_HEADER_LEN];
440
  DBUG_ENTER("read_log_event");
441

442
  if (log_lock)
443
    pthread_mutex_lock(log_lock);
444 445
  if (my_b_read(file, (byte*) buf, sizeof(buf)))
  {
446 447 448 449 450
    /*
      If the read hits eof, we must report it as eof so the caller
      will know it can go into cond_wait to be woken up on the next
      update to the log.
    */
451
    DBUG_PRINT("error",("file->error: %d", file->error));
452 453 454
    if (!file->error)
      result= LOG_READ_EOF;
    else
455
      result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
456
    goto end;
457
  }
458
  data_len= uint4korr(buf + EVENT_LEN_OFFSET);
unknown's avatar
unknown committed
459 460
  if (data_len < LOG_EVENT_HEADER_LEN ||
      data_len > current_thd->variables.max_allowed_packet)
461
  {
462
    DBUG_PRINT("error",("data_len: %ld", data_len));
463 464 465
    result= ((data_len < LOG_EVENT_HEADER_LEN) ? LOG_READ_BOGUS :
	     LOG_READ_TOO_LARGE);
    goto end;
466
  }
unknown's avatar
unknown committed
467
  packet->append(buf, sizeof(buf));
468
  data_len-= LOG_EVENT_HEADER_LEN;
469 470 471
  if (data_len)
  {
    if (packet->append(file, data_len))
472
    {
473
      /*
474 475
	Here we should never hit EOF in a non-error condition.
	EOF means we are reading the event partially, which should
476 477 478 479
	never happen.
      */
      result= file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO;
      /* Implicit goto end; */
480
    }
481
  }
482 483 484 485

end:
  if (log_lock)
    pthread_mutex_unlock(log_lock);
486
  DBUG_RETURN(result);
unknown's avatar
unknown committed
487 488 489 490
}

#endif // MYSQL_CLIENT

unknown's avatar
unknown committed
491
#ifndef MYSQL_CLIENT
unknown's avatar
unknown committed
492 493
#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
unknown's avatar
unknown committed
494
#define max_allowed_packet current_thd->variables.max_allowed_packet
495
#else
496
#define UNLOCK_MUTEX
497 498 499
#define LOCK_MUTEX
#endif

unknown's avatar
unknown committed
500 501
// allocates memory - the caller is responsible for clean-up
#ifndef MYSQL_CLIENT
502 503 504
Log_event* Log_event::read_log_event(IO_CACHE* file,
				     pthread_mutex_t* log_lock,
				     bool old_format)
unknown's avatar
unknown committed
505
#else
506
Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format)
unknown's avatar
unknown committed
507
#endif  
unknown's avatar
unknown committed
508
{
509
  char head[LOG_EVENT_HEADER_LEN];
510 511
  uint header_size= old_format ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;

512
  LOCK_MUTEX;
513
  if (my_b_read(file, (byte *) head, header_size))
514
  {
unknown's avatar
merged  
unknown committed
515
    UNLOCK_MUTEX;
516
    return 0;
517
  }
unknown's avatar
unknown committed
518

519
  uint data_len = uint4korr(head + EVENT_LEN_OFFSET);
520 521 522
  char *buf= 0;
  const char *error= 0;
  Log_event *res=  0;
unknown's avatar
unknown committed
523

524
  if (data_len > max_allowed_packet)
unknown's avatar
unknown committed
525
  {
526 527
    error = "Event too big";
    goto err;
unknown's avatar
unknown committed
528 529
  }

530
  if (data_len < header_size)
unknown's avatar
unknown committed
531
  {
532 533
    error = "Event too small";
    goto err;
unknown's avatar
unknown committed
534
  }
535 536 537

  // some events use the extra byte to null-terminate strings
  if (!(buf = my_malloc(data_len+1, MYF(MY_WME))))
538 539 540
  {
    error = "Out of memory";
    goto err;
unknown's avatar
unknown committed
541
  }
542
  buf[data_len] = 0;
543
  memcpy(buf, head, header_size);
544
  if (my_b_read(file, (byte*) buf + header_size, data_len - header_size))
545 546 547 548
  {
    error = "read error";
    goto err;
  }
549
  if ((res = read_log_event(buf, data_len, &error, old_format)))
550
    res->register_temp_buf(buf);
551

552
err:
unknown's avatar
merged  
unknown committed
553
  UNLOCK_MUTEX;
554
  if (error)
555
  {
556 557
    sql_print_error("Error in Log_event::read_log_event(): '%s', \
data_len=%d,event_type=%d",error,data_len,head[EVENT_TYPE_OFFSET]);
558 559
    my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
  }
560
  return res;
unknown's avatar
unknown committed
561 562
}

563

564
/*
  Build a Log_event object from the event_len serialized bytes at buf.

  The length stored at EVENT_LEN_OFFSET has to equal event_len; the
  class to instantiate is picked from the byte at EVENT_TYPE_OFFSET.
  On failure *error is set to a message and NULL is returned.  On
  success cached_event_len is set to event_len and the new event is
  returned.
*/
Log_event* Log_event::read_log_event(const char* buf, int event_len,
				     const char **error, bool old_format)
{
  /* General sanity check - will fail on a partial read */
  if (event_len < EVENT_LEN_OFFSET ||
      (uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
  {
    *error="Sanity check failed";		// Needed to free buffer
    return NULL;
  }

  Log_event* ev = NULL;

  switch(buf[EVENT_TYPE_OFFSET]) {
  case QUERY_EVENT:
    ev= new Query_log_event(buf, event_len, old_format);
    break;
  case LOAD_EVENT:
  case CREATE_FILE_EVENT:
    ev= new Create_file_log_event(buf, event_len, old_format);
    break;
  case NEW_LOAD_EVENT:
    ev= new Load_log_event(buf, event_len, old_format);
    break;
  case ROTATE_EVENT:
    ev= new Rotate_log_event(buf, event_len, old_format);
    break;
  case SLAVE_EVENT:
    ev= new Slave_log_event(buf, event_len);
    break;
  case APPEND_BLOCK_EVENT:
    ev= new Append_block_log_event(buf, event_len);
    break;
  case DELETE_FILE_EVENT:
    ev= new Delete_file_log_event(buf, event_len);
    break;
  case EXEC_LOAD_EVENT:
    ev= new Execute_load_log_event(buf, event_len);
    break;
  case START_EVENT:
    ev= new Start_log_event(buf, old_format);
    break;
  case STOP_EVENT:
    ev= new Stop_log_event(buf, old_format);
    break;
  case INTVAR_EVENT:
    ev= new Intvar_log_event(buf, old_format);
    break;
  default:
    break;
  }

  if (ev && ev->is_valid())
  {
    ev->cached_event_len= event_len;
    return ev;
  }

  /* Unknown type code, no object created, or the object is not valid */
  *error= "Found invalid event in binary log";
  delete ev;
  return 0;
}

626

627
#ifdef MYSQL_CLIENT
628 629
void Log_event::print_header(FILE* file)
{
630
  char llbuff[22];
631 632
  fputc('#', file);
  print_timestamp(file);
633
  fprintf(file, " server id %d  log_pos %s ", server_id,
634
	  llstr(log_pos,llbuff)); 
635 636
}

637
void Log_event::print_timestamp(FILE* file, time_t* ts)
unknown's avatar
unknown committed
638
{
unknown's avatar
unknown committed
639
  struct tm *res;
640 641
  if (!ts)
    ts = &when;
642 643
#ifdef MYSQL_SERVER				// This is always false
  struct tm tm_tmp;
unknown's avatar
unknown committed
644
  localtime_r(ts,(res= &tm_tmp));
unknown's avatar
unknown committed
645
#else
646
  res=localtime(ts);
unknown's avatar
unknown committed
647
#endif
648 649

  fprintf(file,"%02d%02d%02d %2d:%02d:%02d",
650 651 652 653 654 655
	  res->tm_year % 100,
	  res->tm_mon+1,
	  res->tm_mday,
	  res->tm_hour,
	  res->tm_min,
	  res->tm_sec);
unknown's avatar
unknown committed
656 657 658
}


659
void Start_log_event::print(FILE* file, bool short_form, char* last_db)
unknown's avatar
unknown committed
660 661 662 663
{
  if (short_form)
    return;

664 665 666 667 668
  print_header(file);
  fprintf(file, "\tStart: binlog v %d, server v %s created ", binlog_version,
	  server_version);
  print_timestamp(file, (time_t*)&created);
  fputc('\n', file);
unknown's avatar
unknown committed
669 670 671
  fflush(file);
}

672
void Stop_log_event::print(FILE* file, bool short_form, char* last_db)
unknown's avatar
unknown committed
673 674 675 676
{
  if (short_form)
    return;

677
  print_header(file);
unknown's avatar
unknown committed
678 679 680 681
  fprintf(file, "\tStop\n");
  fflush(file);
}

682
void Rotate_log_event::print(FILE* file, bool short_form, char* last_db)
unknown's avatar
unknown committed
683
{
684
  char buf[22];
unknown's avatar
unknown committed
685 686 687
  if (short_form)
    return;

688
  print_header(file);
unknown's avatar
unknown committed
689 690 691 692
  fprintf(file, "\tRotate to ");
  if (new_log_ident)
    my_fwrite(file, (byte*) new_log_ident, (uint)ident_len, 
	      MYF(MY_NABP | MY_WME));
unknown's avatar
unknown committed
693 694 695 696
  fprintf(file, "  pos: %s", llstr(pos, buf));
  if (flags & LOG_EVENT_FORCED_ROTATE_F)
    fprintf(file,"  forced by master");
  fputc('\n', file);
unknown's avatar
unknown committed
697 698 699
  fflush(file);
}

700 701
#endif /* #ifdef MYSQL_CLIENT */

702

703
/*
  Build a start event from its serialized form.  The event-specific data
  follows the common header, whose length depends on old_format.
  ST_SERVER_VER_LEN bytes are copied into server_version as they are.
*/
Start_log_event::Start_log_event(const char* buf,
				 bool old_format)
  :Log_event(buf, old_format)
{
  const char *data= buf + ((old_format) ? OLD_HEADER_LEN :
			   LOG_EVENT_HEADER_LEN);

  binlog_version= uint2korr(data + ST_BINLOG_VER_OFFSET);
  memcpy(server_version, data + ST_SERVER_VER_OFFSET, ST_SERVER_VER_LEN);
  created= uint4korr(data + ST_CREATED_OFFSET);
}

714 715
/*
  Write the event-specific part of a start event: binlog version, server
  version and creation time.  Returns 0 on success, -1 on write failure.
*/
int Start_log_event::write_data(IO_CACHE* file)
{
  char data[START_HEADER_LEN];

  int2store(data + ST_BINLOG_VER_OFFSET,binlog_version);
  memcpy(data + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
  int4store(data + ST_CREATED_OFFSET,created);

  if (my_b_safe_write(file, (byte*) data, sizeof(data)))
    return -1;
  return 0;
}

723

724
/*
  Build a rotate event from its serialized form.

  Old format: the identifier of the new log follows the header directly
  and the position is fixed to 4.
  New format: the header is followed by the 8-byte position and then the
  identifier.

  new_log_ident stays NULL (and alloced 0) when the event is too short
  or the identifier cannot be duplicated.  The identifier is cut to
  FN_REFLEN-1 bytes.

  A new-format event shorter than ROTATE_EVENT_OVERHEAD is now rejected
  before anything is read.  Previously the subtraction for ident_len
  went negative, and the position and the identifier were read from
  beyond the end of the event.
*/
Rotate_log_event::Rotate_log_event(const char* buf, int event_len,
				   bool old_format)
  :Log_event(buf, old_format),new_log_ident(NULL),alloced(0)
{
  // The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
  int header_size = (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
  uint ident_offset;
  if (event_len < header_size)
    return;
  buf += header_size;
  if (old_format)
  {
    ident_len = (uint)(event_len - OLD_HEADER_LEN);
    pos = 4;
    ident_offset = 0;
  }
  else
  {
    /* event_len is known to be non-negative here, so the cast is safe */
    if ((uint) event_len < ROTATE_EVENT_OVERHEAD)
      return;
    ident_len = (uint)(event_len - ROTATE_EVENT_OVERHEAD);
    pos = uint8korr(buf + R_POS_OFFSET);
    ident_offset = ROTATE_HEADER_LEN;
  }
  set_if_smaller(ident_len,FN_REFLEN-1);
  if (!(new_log_ident= my_strdup_with_length((byte*) buf +
					     ident_offset,
					     (uint) ident_len,
					     MYF(MY_WME))))
    return;
  alloced = 1;
}

755

756
/*
  Write the event-specific part of a rotate event: the 8-byte position
  followed by the identifier of the new log.

  R_POS_OFFSET is an offset into the buffer (the constructor reads the
  position from buf + R_POS_OFFSET), so it is added to the address and
  not to the stored value as the previous version did.

  Returns zero when both writes succeed, non-zero otherwise.
*/
int Rotate_log_event::write_data(IO_CACHE* file)
{
  char buf[ROTATE_HEADER_LEN];
  int8store(buf + R_POS_OFFSET, pos);
  return (my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) ||
	  my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len));
}

764

765 766
#ifndef MYSQL_CLIENT
/*
  Create a query event for the statement query_arg run by thd_arg.
  The error code is ER_SERVER_SHUTDOWN when the thread has been killed,
  otherwise the last network error of the thread.  cache_stmt is set
  when using_trans is true and the thread has OPTION_NOT_AUTOCOMMIT or
  OPTION_BEGIN set.  exec_time is the time elapsed since the start time
  of the thread.
*/
Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
				 ulong query_length, bool using_trans)
  :Log_event(thd_arg), data_buf(0), query(query_arg),  db(thd_arg->db),
  q_len((uint32) query_length),
  error_code(thd_arg->killed ? ER_SERVER_SHUTDOWN: thd_arg->net.last_errno),
  thread_id(thd_arg->thread_id),
  cache_stmt(using_trans &&
	     (thd_arg->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
{
  time_t now= time(NULL);

  exec_time= (ulong) (now - thd->start_time);
  if (db)
    db_len= (uint32) strlen(db);
  else
    db_len= 0;
}
780 781
#endif

782
/*
  Build a query event from its serialized form.

  The data after the query header is "<db>" followed by one byte and
  then the query text.  It is copied into data_buf, with db and query
  pointing into that copy; one extra byte is allocated so the query can
  be null-terminated.

  query stays NULL when the event is too short, when the stored database
  length does not fit into the data, or when the allocation fails.

  The database length is a single byte and is now read as unsigned.
  It is also checked against the amount of data present: previously a
  value with the high bit set (on platforms where char is signed), or
  one larger than the data, made q_len underflow and placed the
  terminating zero outside data_buf.
*/
Query_log_event::Query_log_event(const char* buf, int event_len,
				 bool old_format)
  :Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL)
{
  ulong data_len;
  if (old_format)
  {
    if ((uint)event_len < OLD_HEADER_LEN + QUERY_HEADER_LEN)
      return;
    data_len = event_len - (QUERY_HEADER_LEN + OLD_HEADER_LEN);
    buf += OLD_HEADER_LEN;
  }
  else
  {
    if ((uint)event_len < QUERY_EVENT_OVERHEAD)
      return;
    data_len = event_len - QUERY_EVENT_OVERHEAD;
    buf += LOG_EVENT_HEADER_LEN;
  }

  exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
  error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
  thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET);

  db_len = (uint) (unsigned char) buf[Q_DB_LEN_OFFSET];
  /* The db name and the byte following it must fit into the data */
  if ((ulong) db_len + 1 > data_len)
    return;

  if (!(data_buf = (char*) my_malloc(data_len + 1, MYF(MY_WME))))
    return;

  memcpy(data_buf, buf + Q_DATA_OFFSET, data_len);
  db = data_buf;
  query=data_buf + db_len + 1;
  q_len = data_len - 1 - db_len;
  *((char*)query+q_len) = 0;			// This is data_buf[data_len]
}

817

818 819
#ifdef MYSQL_CLIENT

820
void Query_log_event::print(FILE* file, bool short_form, char* last_db)
unknown's avatar
unknown committed
821
{
822
  char buff[40],*end;				// Enough for SET TIMESTAMP
unknown's avatar
unknown committed
823 824
  if (!short_form)
  {
825
    print_header(file);
826 827
    fprintf(file, "\tQuery\tthread_id=%lu\texec_time=%lu\terror_code=%d\n",
	    (ulong) thread_id, (ulong) exec_time, error_code);
unknown's avatar
unknown committed
828 829
  }

830 831
  bool same_db = 0;

unknown's avatar
unknown committed
832
  if (db && last_db)
833 834 835 836
  {
    if (!(same_db = !memcmp(last_db, db, db_len + 1)))
      memcpy(last_db, db, db_len + 1);
  }
837 838
  
  if (db && db[0] && !same_db)
unknown's avatar
unknown committed
839
    fprintf(file, "use %s;\n", db);
840 841 842 843
  end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10);
  *end++=';';
  *end++='\n';
  my_fwrite(file, (byte*) buff, (uint) (end-buff),MYF(MY_NABP | MY_WME));
unknown's avatar
unknown committed
844 845 846
  my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME));
  fprintf(file, ";\n");
}
847 848
#endif

849

850
/*
  Write the event-specific part of a query event: the query header
  (thread id, execution time, database name length, error code), then
  db_len + 1 bytes of the database name (taken from an empty string when
  db is NULL), then the query text.

  Returns 0 on success and -1 when there is no query or a write fails.
*/
int Query_log_event::write_data(IO_CACHE* file)
{
  if (!query)
    return -1;

  char header[QUERY_HEADER_LEN];
  int4store(header + Q_THREAD_ID_OFFSET, thread_id);
  int4store(header + Q_EXEC_TIME_OFFSET, exec_time);
  header[Q_DB_LEN_OFFSET]= (char) db_len;
  int2store(header + Q_ERR_CODE_OFFSET, error_code);

  if (my_b_safe_write(file, (byte*) header, QUERY_HEADER_LEN))
    return -1;
  if (my_b_safe_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1))
    return -1;
  if (my_b_safe_write(file, (byte*) query, q_len))
    return -1;
  return 0;
}

866 867
/*
  Build an intvar event from its serialized form: the type byte and the
  8-byte value are read from behind the common header, whose length
  depends on old_format.
*/
Intvar_log_event::Intvar_log_event(const char* buf, bool old_format)
  :Log_event(buf, old_format)
{
  const char *data= buf + ((old_format) ? OLD_HEADER_LEN :
			   LOG_EVENT_HEADER_LEN);

  type= data[I_TYPE_OFFSET];
  val= uint8korr(data + I_VAL_OFFSET);
}

unknown's avatar
unknown committed
874 875
const char* Intvar_log_event::get_var_type_name()
{
876
  switch(type) {
unknown's avatar
unknown committed
877 878 879 880 881 882
  case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID";
  case INSERT_ID_EVENT: return "INSERT_ID";
  default: /* impossible */ return "UNKNOWN";
  }
}

883
/*
  Write the event-specific part of an intvar event: the type byte and
  the 8-byte value, 9 bytes in total.
  Returns the result of my_b_safe_write().
*/
int Intvar_log_event::write_data(IO_CACHE* file)
{
  char data[9];

  data[I_TYPE_OFFSET]= type;
  int8store(data + I_VAL_OFFSET, val);

  int write_res= my_b_safe_write(file, (byte*) data, sizeof(data));
  return write_res;
}

891
#ifdef MYSQL_CLIENT
892
/*
  Print the event as a "SET <variable>=<value>;" statement.

  file        Output stream.
  short_form  When false, the event header and an "Intvar" tag line are
              printed first.
  last_db     Unused here.

  An unrecognised type is printed as "UNKNOWN" (the same name
  get_var_type_name() uses) so that fprintf() is never handed a
  null or uninitialized string pointer.
*/
void Intvar_log_event::print(FILE* file, bool short_form, char* last_db)
{
  char llbuff[22];
  const char *msg;

  if (!short_form)
  {
    print_header(file);
    fprintf(file, "\tIntvar\n");
  }

  fprintf(file, "SET ");
  switch (type) {
  case LAST_INSERT_ID_EVENT:
    msg="LAST_INSERT_ID";
    break;
  case INSERT_ID_EVENT:
    msg="INSERT_ID";
    break;
  default:					// corrupted or newer event
    msg="UNKNOWN";
    break;
  }
  fprintf(file, "%s=%s;\n", msg, llstr(val,llbuff));
  fflush(file);
}
916
#endif
unknown's avatar
unknown committed
917

918

919
/*
  Write the fixed-size Load header: thread id, execution time, number of
  lines to skip, table name length, database name length and field count.

  Returns the result of my_b_safe_write().
*/
int Load_log_event::write_data_header(IO_CACHE* file)
{
  char header[LOAD_HEADER_LEN];

  int4store(header + L_THREAD_ID_OFFSET, thread_id);
  int4store(header + L_EXEC_TIME_OFFSET, exec_time);
  int4store(header + L_SKIP_LINES_OFFSET, skip_lines);
  int4store(header + L_NUM_FIELDS_OFFSET, num_fields);
  header[L_TBL_LEN_OFFSET]= (char) table_name_len;
  header[L_DB_LEN_OFFSET]= (char) db_len;

  return my_b_safe_write(file, (byte*) header, LOAD_HEADER_LEN);
}
unknown's avatar
unknown committed
930

931 932
/*
  Write the variable part of a Load event: the sql_ex block, the optional
  field length array and field name block, then table name, database name
  (both with their terminating zero) and the file name.

  Returns 0 on success, 1 as soon as any write fails.
*/
int Load_log_event::write_data_body(IO_CACHE* file)
{
  if (sql_ex.write_data(file))
    return 1;

  if (num_fields && fields && field_lens)
  {
    if (my_b_safe_write(file, (byte*) field_lens, num_fields))
      return 1;
    if (my_b_safe_write(file, (byte*) fields, field_block_len))
      return 1;
  }

  if (my_b_safe_write(file, (byte*) table_name, table_name_len + 1))
    return 1;
  if (my_b_safe_write(file, (byte*) db, db_len + 1))
    return 1;
  return my_b_safe_write(file, (byte*) fname, fname_len) ? 1 : 0;
}

946

947

948 949
/*
  Write a length-prefixed string: one length byte followed by
  'length' bytes of str. Returns true if either write fails.
*/
static bool write_str(IO_CACHE *file, char *str, byte length)
{
  if (my_b_safe_write(file, &length, 1))
    return 1;
  return my_b_safe_write(file, (byte*) str, (int) length) != 0;
}
953

954

955 956 957 958
/*
  Write the LOAD DATA delimiter description.

  In the old format only the first byte of every delimiter is stored,
  packed into an old_sql_ex structure together with opt_flags and
  empty_flags. In the new format every delimiter is written as a
  length-prefixed string, followed by the opt_flags byte.

  Returns non-zero if a write fails.
*/
int sql_ex_info::write_data(IO_CACHE* file)
{
  if (!new_format())
  {
    old_sql_ex legacy;
    legacy.field_term=  *field_term;
    legacy.enclosed=    *enclosed;
    legacy.line_term=   *line_term;
    legacy.line_start=  *line_start;
    legacy.escaped=     *escaped;
    legacy.opt_flags=   opt_flags;
    legacy.empty_flags= empty_flags;
    return my_b_safe_write(file, (byte*) &legacy, sizeof(legacy));
  }

  return (write_str(file, field_term, field_term_len) ||
          write_str(file, enclosed,   enclosed_len) ||
          write_str(file, line_term,  line_term_len) ||
          write_str(file, line_start, line_start_len) ||
          write_str(file, escaped,    escaped_len) ||
          my_b_safe_write(file,(byte*) &opt_flags,1));
}

980

981 982 983 984 985 986 987 988 989 990
/*
  Read one length-prefixed string from a raw buffer.

  buf      In: points at the length byte. Out: advanced past the string.
  buf_end  First byte after the readable data.
  str      Out: points at the string bytes inside the buffer (not copied).
  len      Out: string length taken from the prefix byte.

  Returns 0 on success, 1 if the prefix byte or the string would not lie
  completely before buf_end. On failure nothing is modified.
*/
static inline int read_str(char * &buf, char *buf_end, char * &str,
			   uint8 &len)
{
  /* Check that the length byte itself is inside the buffer before reading it */
  if (buf >= buf_end || buf + (uint) (uchar) *buf >= buf_end)
    return 1;
  len = (uint8) *buf;
  str= buf+1;
  buf+= (uint) len+1;
  return 0;
}
991

992

993 994 995 996 997
char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
{
  cached_new_format = use_new_format;
  if (use_new_format)
  {
998
    empty_flags=0;
unknown's avatar
unknown committed
999 1000 1001 1002 1003 1004
    /*
      The code below assumes that buf will not disappear from
      under our feet during the lifetime of the event. This assumption
      holds true in the slave thread if the log is in new format, but is not
      the case when we have old format because we will be reusing net buffer
      to read the actual file before we write out the Create_file event.
unknown's avatar
unknown committed
1005
    */
1006 1007 1008 1009 1010 1011
    if (read_str(buf, buf_end, field_term, field_term_len) ||
	read_str(buf, buf_end, enclosed,   enclosed_len) ||
	read_str(buf, buf_end, line_term,  line_term_len) ||
	read_str(buf, buf_end, line_start, line_start_len) ||
	read_str(buf, buf_end, escaped,	   escaped_len))
      return 0;
1012 1013 1014 1015
    opt_flags = *buf++;
  }
  else
  {
1016
    field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
1017 1018 1019 1020 1021 1022 1023
    field_term = buf++;			// Use first byte in string
    enclosed=	 buf++;
    line_term=   buf++;
    line_start=  buf++;
    escaped=     buf++;
    opt_flags =  *buf++;
    empty_flags= *buf++;
1024 1025 1026 1027 1028 1029 1030 1031 1032 1033
    if (empty_flags & FIELD_TERM_EMPTY)
      field_term_len=0;
    if (empty_flags & ENCLOSED_EMPTY)
      enclosed_len=0;
    if (empty_flags & LINE_TERM_EMPTY)
      line_term_len=0;
    if (empty_flags & LINE_START_EMPTY)
      line_start_len=0;
    if (empty_flags & ESCAPED_EMPTY)
      escaped_len=0;
1034 1035 1036 1037 1038
  }
  return buf;
}


1039 1040 1041
#ifndef MYSQL_CLIENT
/*
  Build a Load event on the master from the arguments of a LOAD DATA
  statement.

  thd             Executing thread; supplies thread id and start time.
  ex              File name, delimiters and options of the statement.
  db_arg          Database name.
  table_name_arg  Table name; a null pointer is stored as "".
  fields_arg      Explicit column list, possibly empty.
  handle_dup      Duplicate handling; selects the IGNORE/REPLACE flag.

  The delimiter pointers in sql_ex refer to the strings owned by ex.
  Column names and their lengths are collected in fields_buf and
  field_lens_buf.
*/
Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
			       const char* db_arg, const char* table_name_arg,
			       List<Item>& fields_arg,
			       enum enum_duplicates handle_dup)
  :Log_event(thd),thread_id(thd->thread_id), num_fields(0),fields(0),
  field_lens(0),field_block_len(0),
  table_name(table_name_arg ? table_name_arg : ""),
  db(db_arg), fname(ex->file_name)
{
  time_t now;
  time(&now);
  exec_time= (ulong) (now - thd->start_time);

  /* db can never be a zero pointer in 4.0 */
  db_len= (uint32) strlen(db);
  table_name_len= (uint32) strlen(table_name);
  fname_len= fname ? (uint) strlen(fname) : 0;

  sql_ex.cached_new_format= -1;
  sql_ex.opt_flags= 0;
  sql_ex.empty_flags= 0;

  /* Delimiters: remember pointer and length, and flag the empty ones */
  sql_ex.field_term= (char*) ex->field_term->ptr();
  sql_ex.field_term_len= (uint8) ex->field_term->length();
  if (!ex->field_term->length())
    sql_ex.empty_flags|= FIELD_TERM_EMPTY;

  sql_ex.enclosed= (char*) ex->enclosed->ptr();
  sql_ex.enclosed_len= (uint8) ex->enclosed->length();
  if (!ex->enclosed->length())
    sql_ex.empty_flags|= ENCLOSED_EMPTY;

  sql_ex.line_term= (char*) ex->line_term->ptr();
  sql_ex.line_term_len= (uint8) ex->line_term->length();
  if (!ex->line_term->length())
    sql_ex.empty_flags|= LINE_TERM_EMPTY;

  sql_ex.line_start= (char*) ex->line_start->ptr();
  sql_ex.line_start_len= (uint8) ex->line_start->length();
  if (!ex->line_start->length())
    sql_ex.empty_flags|= LINE_START_EMPTY;

  sql_ex.escaped= (char*) ex->escaped->ptr();
  sql_ex.escaped_len= (uint8) ex->escaped->length();
  if (!ex->escaped->length())
    sql_ex.empty_flags|= ESCAPED_EMPTY;

  /* Statement options */
  if (ex->dumpfile)
    sql_ex.opt_flags|= DUMPFILE_FLAG;
  if (ex->opt_enclosed)
    sql_ex.opt_flags|= OPT_ENCLOSED_FLAG;
  if (handle_dup == DUP_IGNORE)
    sql_ex.opt_flags|= IGNORE_FLAG;
  else if (handle_dup == DUP_REPLACE)
    sql_ex.opt_flags|= REPLACE_FLAG;

  skip_lines= ex->skip_lines;

  /* Column list: names are stored zero-terminated, lengths one byte each */
  List_iterator<Item> it(fields_arg);
  field_lens_buf.length(0);
  fields_buf.length(0);
  for (Item *col= it++; col; col= it++)
  {
    uchar name_len= (uchar) strlen(col->name);
    num_fields++;
    field_block_len+= name_len + 1;
    fields_buf.append(col->name, name_len + 1);
    field_lens_buf.append((char*) &name_len, 1);
  }

  field_lens= (const uchar*) field_lens_buf.ptr();
  fields= fields_buf.ptr();
}

1112 1113
#endif

1114 1115 1116 1117 1118
/*
  The caller must do buf[event_len] = 0 before he starts using the
  constructed event.
*/

1119 1120 1121
/*
  Build a Load event from a raw event buffer.

  An event_len of 0 means a derived class is being constructed and
  will call copy_log_event() itself; in that case all members keep
  their zero initial values. The result of copy_log_event() is not
  examined here.
*/
Load_log_event::Load_log_event(const char* buf, int event_len,
			       bool old_format):
  Log_event(buf, old_format),num_fields(0),fields(0),
  field_lens(0),field_block_len(0),
  table_name(0),db(0),fname(0)
{
  if (event_len)
    copy_log_event(buf, event_len, old_format);
}

1130 1131
int Load_log_event::copy_log_event(const char *buf, ulong event_len,
				   bool old_format)
1132
{
1133
  uint data_len;
1134
  char* buf_end = (char*)buf + event_len;
1135
  const char* data_head = buf + ((old_format) ?
1136
				 OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN);
1137 1138 1139 1140 1141 1142
  thread_id = uint4korr(data_head + L_THREAD_ID_OFFSET);
  exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
  skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
  table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
  db_len = (uint)data_head[L_DB_LEN_OFFSET];
  num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
unknown's avatar
unknown committed
1143
	  
1144 1145 1146
  int body_offset = ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
		     LOAD_HEADER_LEN + OLD_HEADER_LEN :
		     get_data_body_offset());
unknown's avatar
unknown committed
1147
  
unknown's avatar
unknown committed
1148
  if ((int) event_len < body_offset)
1149
    return 1;
1150 1151 1152 1153
  /*
    Sql_ex.init() on success returns the pointer to the first byte after
    the sql_ex structure, which is the start of field lengths array.
  */
1154 1155 1156 1157 1158 1159
  if (!(field_lens=(uchar*)sql_ex.init((char*)buf + body_offset,
		  buf_end,
		  buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
    return 1;
  
  data_len = event_len - body_offset;
1160
  if (num_fields > data_len) // simple sanity check against corruption
1161
    return 1;
1162
  for (uint i = 0; i < num_fields; i++)
1163
    field_block_len += (uint)field_lens[i] + 1;
1164

unknown's avatar
unknown committed
1165 1166 1167 1168
  fields = (char*)field_lens + num_fields;
  table_name  = fields + field_block_len;
  db = table_name + table_name_len + 1;
  fname = db + db_len + 1;
1169 1170
  fname_len = strlen(fname);
  // null termination is accomplished by the caller doing buf[event_len]=0
1171
  return 0;
unknown's avatar
unknown committed
1172 1173
}

1174
#ifdef MYSQL_CLIENT
unknown's avatar
unknown committed
1175

1176
/*
  Print the event as a LOAD DATA INFILE statement.

  file        Output stream.
  short_form  When false, the event header plus thread id and execution
              time are printed first.
  last_db     If non-null, holds the database of the previously printed
              event; "use <db>;" is only emitted when the database
              changes, and last_db is updated to the new name.

  Option flags are tested with bitwise AND so that only the options that
  are actually set get printed, and field names are written with fputs()
  so that a '%' in a name is never interpreted as a format directive.
*/
void Load_log_event::print(FILE* file, bool short_form, char* last_db)
{
  if (!short_form)
  {
    print_header(file);
    fprintf(file, "\tQuery\tthread_id=%ld\texec_time=%ld\n",
	    (long) thread_id, (long) exec_time);
  }

  bool same_db = 0;
  if (db && last_db)
  {
    if (!(same_db = !memcmp(last_db, db, db_len + 1)))
      memcpy(last_db, db, db_len + 1);
  }

  if (db && db[0] && !same_db)
    fprintf(file, "use %s;\n", db);

  /* The '*' width argument must be an int */
  fprintf(file, "LOAD DATA INFILE '%-*s' ", (int) fname_len, fname);

  if (sql_ex.opt_flags & REPLACE_FLAG)
    fprintf(file," REPLACE ");
  else if (sql_ex.opt_flags & IGNORE_FLAG)
    fprintf(file," IGNORE ");

  fprintf(file, "INTO TABLE %s ", table_name);
  if (sql_ex.field_term)
  {
    fprintf(file, " FIELDS TERMINATED BY ");
    pretty_print_str(file, sql_ex.field_term, sql_ex.field_term_len);
  }

  if (sql_ex.enclosed)
  {
    if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
      fprintf(file," OPTIONALLY ");
    fprintf(file, " ENCLOSED BY ");
    pretty_print_str(file, sql_ex.enclosed, sql_ex.enclosed_len);
  }

  if (sql_ex.escaped)
  {
    fprintf(file, " ESCAPED BY ");
    pretty_print_str(file, sql_ex.escaped, sql_ex.escaped_len);
  }

  if (sql_ex.line_term)
  {
    fprintf(file," LINES TERMINATED BY ");
    pretty_print_str(file, sql_ex.line_term, sql_ex.line_term_len);
  }

  if (sql_ex.line_start)
  {
    fprintf(file," LINES STARTING BY ");
    pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len);
  }

  if ((int)skip_lines > 0)
    fprintf(file, " IGNORE %ld LINES ", (long) skip_lines);

  if (num_fields)
  {
    uint i;
    const char* field = fields;
    fprintf(file, " (");
    for (i = 0; i < num_fields; i++)
    {
      if (i)
	fputc(',', file);
      fputs(field, file);			// never use data as a format
      field += field_lens[i]  + 1;
    }
    fputc(')', file);
  }

  fprintf(file, ";\n");
}

1257 1258
#endif /* #ifdef MYSQL_CLIENT */

unknown's avatar
unknown committed
1259 1260
#ifndef MYSQL_CLIENT

1261
/*
  Record the current position of the log file as this event's position,
  unless a position has already been set.
*/
void Log_event::set_log_pos(MYSQL_LOG* log)
{
  if (log_pos)
    return;
  log_pos= my_b_tell(&log->log_file);
}

1267

unknown's avatar
unknown committed
1268 1269 1270 1271
/*
  Append one Item_field per stored column name to the given list.
  The names are packed back to back in this->fields, each followed by
  a zero byte; field_lens[] gives the length of each name.
*/
void Load_log_event::set_fields(List<Item> &fields)
{
  const char *name= this->fields;
  for (uint idx= 0; idx < num_fields; name+= field_lens[idx] + 1, idx++)
    fields.push_back(new Item_field(db, table_name, name));
}

unknown's avatar
unknown committed
1279

1280 1281
/*
  Build a Slave event describing the current master coordinates.

  thd_arg  Thread creating the event.
  rli      Relay log info; host and port are read from rli->mi, log
           name and position from rli itself.

  If rli is not initialised, or memory cannot be allocated, the event is
  left with mem_pool and master_host equal to 0. Host and log name are
  copied into mem_pool behind the fixed-size part of the event data.
*/
Slave_log_event::Slave_log_event(THD* thd_arg,
				 struct st_relay_log_info* rli):
  Log_event(thd_arg),mem_pool(0),master_host(0)
{
  DBUG_ENTER("Slave_log_event");
  if (!rli->inited)				// QQ When can this happen ?
    DBUG_VOID_RETURN;

  MASTER_INFO* mi = rli->mi;
  // TODO: re-write this better without holding both locks at the same time
  pthread_mutex_lock(&mi->data_lock);
  pthread_mutex_lock(&rli->data_lock);
  master_host_len = strlen(mi->host);
  master_log_len = strlen(rli->master_log_name);
  // on OOM, just do not initialize the structure and print the error
  if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
				   MYF(MY_WME))))
  {
    master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
    memcpy(master_host, mi->host, master_host_len + 1);
    master_log = master_host + master_host_len + 1;
    memcpy(master_log, rli->master_log_name, master_log_len + 1);
    master_port = mi->port;
    master_pos = rli->master_log_pos;
    /* The position is passed as ulong, so it must be printed with %lu */
    DBUG_PRINT("info", ("master_log: %s  pos: %lu", master_log,
			(ulong) master_pos));
  }
  else
    sql_print_error("Out of memory while recording slave event");
  pthread_mutex_unlock(&rli->data_lock);
  pthread_mutex_unlock(&mi->data_lock);
  DBUG_VOID_RETURN;
}

1314
#endif /* ! MYSQL_CLIENT */
1315 1316 1317 1318 1319 1320 1321


/*
  Release the buffer holding the event data. mem_pool is 0 when the
  constructor could not allocate or returned early, hence
  MY_ALLOW_ZERO_PTR.
*/
Slave_log_event::~Slave_log_event()
{
  my_free(mem_pool, MYF(MY_ALLOW_ZERO_PTR));
}

1322 1323
#ifdef MYSQL_CLIENT

1324
/*
  Print the master host, port, log name and position recorded in the
  event. Nothing is printed in short form; last_db is not used.
*/
void Slave_log_event::print(FILE* file, bool short_form, char* last_db)
{
  char pos_buff[22];

  if (short_form)
    return;

  print_header(file);
  fputc('\n', file);
  fprintf(file,
          "Slave: master_host: '%s'  master_port: %d  "
          "master_log: '%s'  master_pos: %s\n",
          master_host, master_port, master_log,
          llstr(master_pos, pos_buff));
}

1336
#endif /* MYSQL_CLIENT */
1337

1338 1339
int Slave_log_event::get_data_size()
{
unknown's avatar
unknown committed
1340
  return master_host_len + master_log_len + 1 + SL_MASTER_HOST_OFFSET;
1341 1342 1343 1344
}

/*
  Write the event data to the log cache. Position and port are stored
  into mem_pool first; host and log name were placed there by the
  constructor.

  Returns -1 if the event has no buffer (the constructors leave mem_pool
  at 0 on allocation failure), otherwise the result of my_b_safe_write().
*/
int Slave_log_event::write_data(IO_CACHE* file)
{
  if (!mem_pool)				// event was never initialised
    return -1;
  int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
  int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
  // log and host are already there
  return my_b_safe_write(file, (byte*)mem_pool, get_data_size());
}

1351

1352 1353
void Slave_log_event::init_from_mem_pool(int data_size)
{
1354 1355 1356
  master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
  master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
  master_host = mem_pool + SL_MASTER_HOST_OFFSET;
1357 1358
  master_host_len = strlen(master_host);
  // safety
unknown's avatar
unknown committed
1359
  master_log = master_host + master_host_len + 1;
unknown's avatar
unknown committed
1360
  if (master_log > mem_pool + data_size)
1361 1362 1363 1364 1365 1366 1367
  {
    master_host = 0;
    return;
  }
  master_log_len = strlen(master_log);
}

1368 1369
/*
  Build a Slave event from a raw event buffer. The data following the
  common header is copied into a freshly allocated, zero-terminated
  mem_pool and then decoded. If the event is shorter than the header
  or the allocation fails, mem_pool and master_host stay 0.
*/
Slave_log_event::Slave_log_event(const char* buf, int event_len)
  :Log_event(buf,0),mem_pool(0),master_host(0)
{
  event_len-= LOG_EVENT_HEADER_LEN;
  if (event_len < 0)
    return;

  mem_pool= (char*) my_malloc(event_len + 1, MYF(MY_WME));
  if (!mem_pool)
    return;

  memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len);
  mem_pool[event_len]= 0;
  init_from_mem_pool(event_len);
}
1380 1381

#ifndef MYSQL_CLIENT
1382 1383 1384
/*
  Build a Create_file event on the master. The Load part is initialised
  from the statement arguments; block_arg/block_len_arg describe the
  first chunk of file data. A new file id is taken from mysql_bin_log
  and also stored in thd_arg->file_id. The new sql_ex format is forced.
*/
Create_file_log_event::Create_file_log_event(THD* thd_arg, sql_exchange* ex,
                                             const char* db_arg,
                                             const char* table_name_arg,
                                             List<Item>& fields_arg,
                                             enum enum_duplicates handle_dup,
                                             char* block_arg,
                                             uint block_len_arg)
  :Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup),
   fake_base(0),
   block(block_arg),
   block_len(block_len_arg),
   file_id(thd_arg->file_id = mysql_bin_log.next_file_id())
{
  sql_ex.force_new_format();
}
#endif

1394 1395 1396 1397 1398
/*
  Write the Load body and, unless the event is being written as a plain
  Load event (fake_base), a zero byte followed by the file data block.

  Returns the Load body result if that failed or fake_base is set;
  otherwise 0 on success and 1 on write failure.
*/
int Create_file_log_event::write_data_body(IO_CACHE* file)
{
  int res= Load_log_event::write_data_body(file);
  if (res || fake_base)
    return res;

  if (my_b_safe_write(file, (byte*) "", 1))
    return 1;
  return my_b_safe_write(file, (byte*) block, block_len) ? 1 : 0;
}

/*
  Write the Load header and, unless fake_base is set, the Create_file
  header holding the file id.

  Returns the Load header result if that failed or fake_base is set,
  otherwise the result of writing the Create_file header.
*/
int Create_file_log_event::write_data_header(IO_CACHE* file)
{
  int res= Load_log_event::write_data_header(file);
  if (res || fake_base)
    return res;

  byte header[CREATE_FILE_HEADER_LEN];
  int4store(header + CF_FILE_ID_OFFSET, file_id);
  return my_b_safe_write(file, header, CREATE_FILE_HEADER_LEN);
}

/*
  Write this event as if it were a plain Load event: fake_base is set
  for the duration of write() so that the Create_file specific header
  and data block are left out. Returns the result of write().
*/
int Create_file_log_event::write_base(IO_CACHE* file)
{
  fake_base= 1;				// pretend we are Load event
  int res= write(file);
  fake_base= 0;
  return res;
}

unknown's avatar
unknown committed
1422
/*
  Build a Create_file event from a raw event buffer.

  buf         Start of the event, common header included.
  len         Number of bytes in the event.
  old_format  True when the event comes from an old-format log; then
              only the Load part exists, the new sql_ex format is forced
              and inited_from_old is set.

  file_id and block_len start out as 0 so that they hold defined values
  on every path, including the early returns; block stays 0 when the
  event is too short.
*/
Create_file_log_event::Create_file_log_event(const char* buf, int len,
					     bool old_format)
  :Load_log_event(buf,0,old_format),fake_base(0),block(0),inited_from_old(0)
{
  int block_offset;
  file_id= 0;
  block_len= 0;
  if (copy_log_event(buf,len,old_format))
    return;
  if (!old_format)
  {
    file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN +
			LOAD_HEADER_LEN + CF_FILE_ID_OFFSET);
    // + 1 for \0 terminating fname
    block_offset = (LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() +
		    CREATE_FILE_HEADER_LEN + 1);
    if (len < block_offset)
      return;
    block = (char*)buf + block_offset;
    block_len = len - block_offset;
  }
  else
  {
    sql_ex.force_new_format();
    inited_from_old = 1;
  }
}
1447 1448


1449
#ifdef MYSQL_CLIENT
1450 1451
/*
  Print the Load part of the event in short form, followed by the file
  id and the block length. Nothing is printed when short_form is set.

  Both numbers are unsigned and are printed with %u, matching the
  pack_info() functions in this file.
*/
void Create_file_log_event::print(FILE* file, bool short_form,
				  char* last_db)
{
  if (short_form)
    return;
  Load_log_event::print(file, 1, last_db);
  fprintf(file, " file_id: %u  block_len: %u\n",
	  (uint) file_id, (uint) block_len);
}
#endif

#ifndef MYSQL_CLIENT
/*
  Store a one-line description of the event in packet, in the form
  "db=<db>;table=<table>;file_id=<n>;block_len=<n>".
*/
void Create_file_log_event::pack_info(String* packet)
{
  char storage[256];
  char digits[22];
  char *digits_end;
  String info(storage, sizeof(storage), system_charset_info);

  info.length(0);
  info.append("db=");
  info.append(db, db_len);
  info.append(";table=");
  info.append(table_name, table_name_len);

  info.append(";file_id=");
  digits_end= int10_to_str((long) file_id, digits, 10);
  info.append(digits, (uint32) (digits_end - digits));

  info.append(";block_len=");
  digits_end= int10_to_str((long) block_len, digits, 10);
  info.append(digits, (uint32) (digits_end - digits));

  net_store_data(packet, (char*) info.ptr(), info.length());
}
#endif  

#ifndef MYSQL_CLIENT  
/*
  Build an Append_block event on the master for one chunk of file data.
  The file id is taken from thd_arg->file_id.
*/
Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg,
                                               uint block_len_arg)
  :Log_event(thd_arg),
   block(block_arg),
   block_len(block_len_arg),
   file_id(thd_arg->file_id)
{}
#endif  
unknown's avatar
unknown committed
1488

1489
  
1490 1491
/*
  Build an Append_block event from a raw event buffer. block points
  into buf; nothing is copied.

  If the event is shorter than APPEND_BLOCK_EVENT_OVERHEAD, block stays
  0 and file_id and block_len keep the value 0 instead of being left
  uninitialized.
*/
Append_block_log_event::Append_block_log_event(const char* buf, int len)
  :Log_event(buf, 0),block(0)
{
  file_id= 0;
  block_len= 0;
  if ((uint)len < APPEND_BLOCK_EVENT_OVERHEAD)
    return;
  file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET);
  block = (char*)buf + APPEND_BLOCK_EVENT_OVERHEAD;
  block_len = len - APPEND_BLOCK_EVENT_OVERHEAD;
}

/*
  Write the Append_block header (file id) followed by the data block.
  Returns 0 on success and 1 if either write fails.
*/
int Append_block_log_event::write_data(IO_CACHE* file)
{
  byte header[APPEND_BLOCK_HEADER_LEN];
  int4store(header + AB_FILE_ID_OFFSET, file_id);

  if (my_b_safe_write(file, header, APPEND_BLOCK_HEADER_LEN))
    return 1;
  return my_b_safe_write(file, (byte*) block, block_len) ? 1 : 0;
}

#ifdef MYSQL_CLIENT  
1509 1510
/*
  Print the event header followed by a comment line with the file id
  and block length. Nothing is printed in short form; last_db is unused.

  Both numbers are unsigned and are printed with %u, as in pack_info().
*/
void Append_block_log_event::print(FILE* file, bool short_form,
				   char* last_db)
{
  if (short_form)
    return;
  print_header(file);
  fputc('\n', file);
  fprintf(file, "#Append_block: file_id: %u  block_len: %u\n",
	  (uint) file_id, (uint) block_len);
}
#endif  
1520

1521 1522 1523
#ifndef MYSQL_CLIENT
/*
  Store a one-line description of the event in packet, in the form
  ";file_id=<n>;block_len=<n>".
*/
void Append_block_log_event::pack_info(String* packet)
{
  char text[256];
  uint text_len= (uint) my_sprintf(text,
                                   (text, ";file_id=%u;block_len=%u",
                                    file_id, block_len));
  net_store_data(packet, text, (int32) text_len);
}

unknown's avatar
unknown committed
1532

1533 1534
/*
  Build a Delete_file event on the master for the file id currently
  stored in thd_arg->file_id.
*/
Delete_file_log_event::Delete_file_log_event(THD* thd_arg)
  :Log_event(thd_arg),
   file_id(thd_arg->file_id)
{}
unknown's avatar
unknown committed
1537
#endif
1538

1539 1540 1541

/*
  Build a Delete_file event from a raw event buffer. file_id stays 0
  if the event is shorter than DELETE_FILE_EVENT_OVERHEAD.

  The id is read at DF_FILE_ID_OFFSET, the same offset write_data()
  stores it at.
*/
Delete_file_log_event::Delete_file_log_event(const char* buf, int len)
  :Log_event(buf, 0),file_id(0)
{
  if ((uint)len < DELETE_FILE_EVENT_OVERHEAD)
    return;
  file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + DF_FILE_ID_OFFSET);
}

1548

1549 1550
/*
  Write the Delete_file header, which holds only the file id.
  Returns the result of my_b_safe_write().
*/
int Delete_file_log_event::write_data(IO_CACHE* file)
{
  byte header[DELETE_FILE_HEADER_LEN];
  int4store(header + DF_FILE_ID_OFFSET, file_id);
  return my_b_safe_write(file, header, DELETE_FILE_HEADER_LEN);
}

#ifdef MYSQL_CLIENT  
1557 1558
/*
  Print the event header followed by a comment line with the file id.
  Nothing is printed in short form; last_db is unused.
*/
void Delete_file_log_event::print(FILE* file, bool short_form,
                                  char* last_db)
{
  if (!short_form)
  {
    print_header(file);
    fputc('\n', file);
    fprintf(file, "#Delete_file: file_id=%u\n", file_id);
  }
}
unknown's avatar
unknown committed
1566
#endif
1567

1568 1569 1570
#ifndef MYSQL_CLIENT
/*
  Store a one-line description of the event in packet, in the form
  ";file_id=<n>".
*/
void Delete_file_log_event::pack_info(String* packet)
{
  char text[64];
  uint text_len= (uint) my_sprintf(text,
                                   (text, ";file_id=%u", (uint) file_id));
  net_store_data(packet, text, (int32) text_len);
}
#endif  

1578

1579
#ifndef MYSQL_CLIENT  
1580 1581
/*
  Build an Execute_load event on the master for the file id currently
  stored in thd_arg->file_id.
*/
Execute_load_log_event::Execute_load_log_event(THD* thd_arg)
  :Log_event(thd_arg),
   file_id(thd_arg->file_id)
{}
#endif  
  
unknown's avatar
unknown committed
1586

1587 1588
/*
  Build an Execute_load event from a raw event buffer. file_id stays 0
  if the event is shorter than EXEC_LOAD_EVENT_OVERHEAD.
*/
Execute_load_log_event::Execute_load_log_event(const char* buf,int len)
  :Log_event(buf, 0),file_id(0)
{
  if ((uint) len >= EXEC_LOAD_EVENT_OVERHEAD)
    file_id= uint4korr(buf + LOG_EVENT_HEADER_LEN + EL_FILE_ID_OFFSET);
}

unknown's avatar
unknown committed
1595

1596 1597
/*
  Write the Execute_load header, which holds only the file id.
  Returns the result of my_b_safe_write().
*/
int Execute_load_log_event::write_data(IO_CACHE* file)
{
  byte header[EXEC_LOAD_HEADER_LEN];

  int4store(header + EL_FILE_ID_OFFSET, file_id);
  return my_b_safe_write(file, header, EXEC_LOAD_HEADER_LEN);
}

#ifdef MYSQL_CLIENT  
1604 1605
/*
  Print the event header followed by a comment line with the file id.
  Nothing is printed in short form; last_db is unused.

  The id is unsigned and is printed with %u, as Delete_file_log_event
  does for the same kind of value.
*/
void Execute_load_log_event::print(FILE* file, bool short_form,
				   char* last_db)
{
  if (short_form)
    return;
  print_header(file);
  fputc('\n', file);
  fprintf(file, "#Exec_load: file_id=%u\n",
	  (uint) file_id);
}
#endif  
1615 1616 1617
#ifndef MYSQL_CLIENT
/*
  Store a one-line description of the event in packet, in the form
  ";file_id=<n>".
*/
void Execute_load_log_event::pack_info(String* packet)
{
  char text[64];
  uint text_len= (uint) my_sprintf(text,
                                   (text, ";file_id=%u", (uint) file_id));
  net_store_data(packet, text, (int32) text_len);
}
#endif

#ifndef MYSQL_CLIENT
1626
int Query_log_event::exec_event(struct st_relay_log_info* rli)
1627 1628 1629 1630
{
  int expected_error,actual_error = 0;
  init_sql_alloc(&thd->mem_root, 8192,0);
  thd->db = rewrite_db((char*)db);
1631 1632 1633 1634 1635 1636 1637 1638

  /*
    InnoDB internally stores the master log position it has processed so far;
    position to store is really pos + pending + event_len
    since we must store the pos of the END of the current log event
  */
  rli->event_len= get_event_len();

1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649
  if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
  {
    thd->query = (char*)query;
    thd->set_time((time_t)when);
    thd->current_tablenr = 0;
    VOID(pthread_mutex_lock(&LOCK_thread_count));
    thd->query_id = query_id++;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
    thd->query_error = 0;			// clear error
    thd->net.last_errno = 0;
    thd->net.last_error[0] = 0;
1650
    thd->slave_proxy_id = thread_id;		// for temp tables
1651
	
1652 1653 1654 1655
    /*
      Sanity check to make sure the master did not get a really bad
      error on the query.
    */
1656 1657
    if (ignored_error_code((expected_error = error_code)) ||
	!check_expected_error(thd,rli,expected_error))
1658
    {
unknown's avatar
unknown committed
1659 1660
      mysql_log.write(thd,COM_QUERY,"%s",thd->query);
      DBUG_PRINT("query",("%s",thd->query));
1661
      mysql_parse(thd, thd->query, q_len);
1662 1663 1664
      DBUG_PRINT("info",("expected_error: %d  last_errno: %d",
			 expected_error, thd->net.last_errno));
      if ((expected_error != (actual_error= thd->net.last_errno)) &&
1665
	  expected_error &&
unknown's avatar
unknown committed
1666 1667
	  !ignored_error_code(actual_error) &&
	  !ignored_error_code(expected_error))
1668 1669
      {
	const char* errmsg = "Slave: did not get the expected error\
unknown's avatar
unknown committed
1670
 running query from master - expected: '%s' (%d), got '%s' (%d)"; 
1671 1672
	sql_print_error(errmsg, ER_SAFE(expected_error),
			expected_error,
unknown's avatar
unknown committed
1673
			actual_error ? thd->net.last_error: "no error",
1674 1675 1676
			actual_error);
	thd->query_error = 1;
      }
1677 1678
      else if (expected_error == actual_error ||
	       ignored_error_code(actual_error))
1679
      {
1680
	DBUG_PRINT("info",("error ignored"));
1681
	thd->query_error = 0;
1682 1683
	*rli->last_slave_error = 0;
	rli->last_slave_errno = 0;
1684 1685 1686 1687 1688 1689
      }
    }
    else
    {
      // master could be inconsistent, abort and tell DBA to check/fix it
      thd->db = thd->query = 0;
unknown's avatar
unknown committed
1690
      thd->variables.convert_set = 0;
1691 1692 1693 1694 1695
      close_thread_tables(thd);
      free_root(&thd->mem_root,0);
      return 1;
    }
  }
1696 1697
  thd->db= 0;				// prevent db from being freed
  thd->query= 0;			// just to be sure
1698
  // assume no convert for next query unless set explictly
unknown's avatar
unknown committed
1699
  thd->variables.convert_set = 0;
1700 1701 1702 1703
  close_thread_tables(thd);
      
  if (thd->query_error || thd->fatal_error)
  {
1704
    slave_print_error(rli,actual_error, "error '%s' on query '%s'",
1705 1706
		      actual_error ? thd->net.last_error :
		      "unexpected success or fatal error", query);
1707 1708 1709 1710
    free_root(&thd->mem_root,0);
    return 1;
  }
  free_root(&thd->mem_root,0);
1711
  return Log_event::exec_event(rli); 
1712 1713
}

1714

1715
/*
  Execute a LOAD DATA INFILE event in the slave thread

  SYNOPSIS
    Load_log_event::exec_event()
    net		If non-zero, mysql_load() reads the file through this
		connection: thd->net borrows its vio and the packet
		number is kept in sync in both directions.
    rli		Relay log info. May be 0: Execute_load_log_event calls
		this function as exec_event(0,0).

  RETURN VALUES
    1		thd->query_error or thd->fatal_error was set
    other	Value returned by Log_event::exec_event(rli)
*/

int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
{
  init_sql_alloc(&thd->mem_root, 8192,0);
  thd->db = rewrite_db((char*)db);
  thd->query = 0;
  thd->query_error = 0;

  if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
  {
    thd->set_time((time_t)when);
    thd->current_tablenr = 0;
    VOID(pthread_mutex_lock(&LOCK_thread_count));
    thd->query_id = query_id++;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));

    TABLE_LIST tables;
    bzero((char*) &tables,sizeof(tables));
    tables.db = thd->db;
    tables.alias = tables.real_name = (char*)table_name;
    tables.lock_type = TL_WRITE;
    // the table will be opened in mysql_load
    if (table_rules_on && !tables_ok(thd, &tables))
    {
      // TODO: this is a bug - this needs to be moved to the I/O thread
      if (net)
        skip_load_data_infile(net);
    }
    else
    {
      char llbuff[22];
      enum enum_duplicates handle_dup = DUP_IGNORE;
      /*
        opt_flags is a bit mask: test the individual bits. A logical AND
        here would be true for any non-zero opt_flags.
      */
      if (sql_ex.opt_flags & REPLACE_FLAG)
        handle_dup = DUP_REPLACE;
      sql_exchange ex((char*)fname,
                      (sql_ex.opt_flags & DUMPFILE_FLAG) != 0);
      String field_term(sql_ex.field_term,sql_ex.field_term_len,
                        system_charset_info);
      String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,
                      system_charset_info);
      String line_term(sql_ex.line_term,sql_ex.line_term_len,
                       system_charset_info);
      String line_start(sql_ex.line_start,sql_ex.line_start_len,
                        system_charset_info);
      String escaped(sql_ex.escaped,sql_ex.escaped_len, system_charset_info);

      /*
        Hand the terminators stored in the event to the exchange
        descriptor. Without this the String objects built above are never
        used, and the length(0) call below would act on whatever the
        sql_exchange constructor pointed field_term at.
        NOTE(review): assumes enclosed/line_term/line_start/escaped are
        String* members of sql_exchange, like field_term - confirm.
      */
      ex.field_term= &field_term;
      ex.enclosed= &enclosed;
      ex.line_term= &line_term;
      ex.line_start= &line_start;
      ex.escaped= &escaped;

      ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
      if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
        ex.field_term->length(0);

      ex.skip_lines = skip_lines;
      List<Item> fields;
      set_fields(fields);
      thd->slave_proxy_id = thd->thread_id;
      if (net)
      {
        // mysql_load will use thd->net to read the file
        thd->net.vio = net->vio;
        /*
          Make sure the client does not get confused about the packet sequence
        */
        thd->net.pkt_nr = net->pkt_nr;
      }
      if (mysql_load(thd, &ex, &tables, fields, handle_dup, net != 0,
                     TL_WRITE))
        thd->query_error = 1;
      if (thd->cuted_fields)
      {
        /*
          rli is 0 when we are called from
          Execute_load_log_event::exec_event(); do not dereference it then.
        */
        if (rli)
          sql_print_error("Slave: load data infile at position %s in log "
                          "'%s' produced %d warning(s)",
                          llstr(rli->master_log_pos,llbuff), RPL_LOG_NAME,
                          thd->cuted_fields);
        else
          sql_print_error("Slave: load data infile produced %d warning(s)",
                          thd->cuted_fields);
      }
      if (net)
        net->pkt_nr= thd->net.pkt_nr;
    }
  }
  else
  {
    /*
      We will just ask the master to send us /dev/null if we do not
      want to load the data.
      TODO: this a bug - needs to be done in I/O thread
    */
    if (net)
      skip_load_data_infile(net);
  }

  thd->net.vio = 0;
  thd->db= 0;					// prevent db from being freed
  close_thread_tables(thd);
  if (thd->query_error)
  {
    int sql_error = thd->net.last_errno;
    if (!sql_error)
      sql_error = ER_UNKNOWN_ERROR;

    slave_print_error(rli,sql_error,
                      "Slave: Error '%s' running load data infile ",
                      ER_SAFE(sql_error));
    free_root(&thd->mem_root,0);
    return 1;
  }
  free_root(&thd->mem_root,0);

  if (thd->fatal_error)
  {
    sql_print_error("Slave: Fatal error running LOAD DATA INFILE ");
    return 1;
  }

  return Log_event::exec_event(rli);
}

1825

1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839
/*
  The master started

  IMPLEMENTATION
    - To handle the case where the master died without a stop event,
      we clean up all temporary tables + locks that we got.

  TODO
    - Remove all active user locks
    - If we have an active transaction at this point, the master died
      in the middle while writing the transaction to the binary log.
      In this case we should stop the slave.
*/

1840
int Start_log_event::exec_event(struct st_relay_log_info* rli)
1841
{
1842
  /* All temporary tables was deleted on the master */
unknown's avatar
unknown committed
1843
  close_temporary_tables(thd);
1844 1845 1846
  /*
    If we have old format, load_tmpdir is cleaned up by the I/O thread
  */
1847
  if (!rli->mi->old_format)
1848
    cleanup_load_tmpdir();
1849
  return Log_event::exec_event(rli);
1850 1851
}

1852

1853 1854 1855 1856 1857 1858 1859 1860
/*
  The master stopped. Clean up all temporary tables + locks that the
  master may have set.

  TODO
    - Remove all active user locks
*/

1861
int Stop_log_event::exec_event(struct st_relay_log_info* rli)
1862
{
1863
  // do not clean up immediately after rotate event
1864
  if (rli->master_log_pos > BIN_LOG_HEADER_SIZE) 
1865 1866
  {
    close_temporary_tables(thd);
1867
    cleanup_load_tmpdir();
1868
  }
unknown's avatar
unknown committed
1869 1870
  /*
    We do not want to update master_log pos because we get a rotate event
1871 1872
    before stop, so by now master_log_name is set to the next log.
    If we updated it, we will have incorrect master coordinates and this
unknown's avatar
unknown committed
1873
    could give false triggers in MASTER_POS_WAIT() that we have reached
1874
    the target position when in fact we have not.
unknown's avatar
unknown committed
1875
  */
1876 1877
  rli->inc_pos(get_event_len(), 0);  
  flush_relay_log_info(rli);
1878 1879 1880
  return 0;
}

1881 1882 1883 1884 1885

/*
  Got a rotate log event from the master

  IMPLEMENTATION
    This is mainly used so that we can later figure out the logname and
    position for the master.

    We can't rotate the slave as this will cause infinite rotations
    in an A -> B -> A setup.

  RETURN VALUES
    0	ok
 */
1895 1896
  

1897
int Rotate_log_event::exec_event(struct st_relay_log_info* rli)
1898
{
1899
  char* log_name = rli->master_log_name;
1900 1901
  DBUG_ENTER("Rotate_log_event::exec_event");

1902
  pthread_mutex_lock(&rli->data_lock);
unknown's avatar
unknown committed
1903
  memcpy(log_name, new_log_ident, ident_len+1);
1904 1905
  rli->master_log_pos = pos;
  rli->relay_log_pos += get_event_len();
1906
  DBUG_PRINT("info", ("master_log_pos: %d", (ulong) rli->master_log_pos));
1907
  pthread_mutex_unlock(&rli->data_lock);
unknown's avatar
unknown committed
1908
  pthread_cond_broadcast(&rli->data_cond);
1909
  flush_relay_log_info(rli);
1910
  DBUG_RETURN(0);
1911 1912
}

unknown's avatar
unknown committed
1913

1914
int Intvar_log_event::exec_event(struct st_relay_log_info* rli)
1915
{
1916
  switch (type) {
1917 1918 1919 1920 1921 1922 1923 1924
  case LAST_INSERT_ID_EVENT:
    thd->last_insert_id_used = 1;
    thd->last_insert_id = val;
    break;
  case INSERT_ID_EVENT:
    thd->next_insert_id = val;
    break;
  }
1925
  rli->inc_pending(get_event_len());
1926 1927 1928
  return 0;
}

1929
int Slave_log_event::exec_event(struct st_relay_log_info* rli)
1930
{
unknown's avatar
unknown committed
1931
  if (mysql_bin_log.is_open())
1932
    mysql_bin_log.write(this);
1933
  return Log_event::exec_event(rli);
1934 1935
}

1936
int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
1937 1938
{
  char fname_buf[FN_REFLEN+10];
unknown's avatar
unknown committed
1939
  char *p;
1940 1941 1942
  int fd = -1;
  IO_CACHE file;
  int error = 1;
unknown's avatar
unknown committed
1943

1944
  bzero((char*)&file, sizeof(file));
unknown's avatar
unknown committed
1945 1946
  p = slave_load_file_stem(fname_buf, file_id, server_id);
  strmov(p, ".info");			// strmov takes less code than memcpy
1947 1948 1949 1950 1951
  if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC,
		    MYF(MY_WME))) < 0 ||
      init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
		    MYF(MY_WME|MY_NABP)))
  {
1952
    slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf);
1953 1954 1955 1956
    goto err;
  }
  
  // a trick to avoid allocating another buffer
unknown's avatar
unknown committed
1957
  strmov(p, ".data");
1958 1959 1960 1961
  fname = fname_buf;
  fname_len = (uint)(p-fname) + 5;
  if (write_base(&file))
  {
unknown's avatar
unknown committed
1962
    strmov(p, ".info"); // to have it right in the error message
1963
    slave_print_error(rli,my_errno, "Could not write to file '%s'", fname_buf);
1964 1965 1966 1967 1968 1969 1970 1971 1972
    goto err;
  }
  end_io_cache(&file);
  my_close(fd, MYF(0));
  
  // fname_buf now already has .data, not .info, because we did our trick
  if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC,
		    MYF(MY_WME))) < 0)
  {
1973
    slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf);
1974 1975
    goto err;
  }
unknown's avatar
unknown committed
1976
  if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
1977
  {
1978
    slave_print_error(rli,my_errno, "Write to '%s' failed", fname_buf);
1979 1980 1981 1982
    goto err;
  }
  if (mysql_bin_log.is_open())
    mysql_bin_log.write(this);
1983 1984
  error=0;					// Everything is ok

1985 1986 1987 1988 1989
err:
  if (error)
    end_io_cache(&file);
  if (fd >= 0)
    my_close(fd, MYF(0));
1990
  return error ? 1 : Log_event::exec_event(rli);
1991 1992
}

1993
int Delete_file_log_event::exec_event(struct st_relay_log_info* rli)
1994 1995
{
  char fname[FN_REFLEN+10];
1996
  char *p= slave_load_file_stem(fname, file_id, server_id);
1997
  memcpy(p, ".data", 6);
1998
  (void) my_delete(fname, MYF(MY_WME));
1999
  memcpy(p, ".info", 6);
2000
  (void) my_delete(fname, MYF(MY_WME));
2001 2002
  if (mysql_bin_log.is_open())
    mysql_bin_log.write(this);
2003
  return Log_event::exec_event(rli);
2004 2005
}

2006
int Append_block_log_event::exec_event(struct st_relay_log_info* rli)
2007 2008
{
  char fname[FN_REFLEN+10];
2009 2010
  char *p= slave_load_file_stem(fname, file_id, server_id);
  int fd;
2011
  int error = 1;
2012

2013 2014 2015
  memcpy(p, ".data", 6);
  if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0)
  {
2016
    slave_print_error(rli,my_errno, "Could not open file '%s'", fname);
2017 2018
    goto err;
  }
unknown's avatar
unknown committed
2019
  if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
2020
  {
2021
    slave_print_error(rli,my_errno, "Write to '%s' failed", fname);
2022 2023 2024 2025 2026
    goto err;
  }
  if (mysql_bin_log.is_open())
    mysql_bin_log.write(this);
  error=0;
2027

2028 2029 2030
err:
  if (fd >= 0)
    my_close(fd, MYF(0));
2031
  return error ? error : Log_event::exec_event(rli);
2032 2033
}

2034
int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
2035 2036
{
  char fname[FN_REFLEN+10];
2037 2038
  char *p= slave_load_file_stem(fname, file_id, server_id);
  int fd;
2039
  int error = 1;
unknown's avatar
unknown committed
2040
  ulong save_options;
2041 2042
  IO_CACHE file;
  Load_log_event* lev = 0;
2043

2044 2045 2046 2047 2048
  memcpy(p, ".info", 6);
  if ((fd = my_open(fname, O_RDONLY|O_BINARY, MYF(MY_WME))) < 0 ||
      init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
		    MYF(MY_WME|MY_NABP)))
  {
2049
    slave_print_error(rli,my_errno, "Could not open file '%s'", fname);
2050 2051
    goto err;
  }
2052 2053
  if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
							 (pthread_mutex_t*)0,
2054 2055
							 (bool)0)) ||
      lev->get_type_code() != NEW_LOAD_EVENT)
2056
  {
2057
    slave_print_error(rli,0, "File '%s' appears corrupted", fname);
2058 2059
    goto err;
  }
unknown's avatar
unknown committed
2060 2061 2062 2063 2064 2065
  /*
    We want to disable binary logging in slave thread because we need the file
    events to appear in the same order as they do on the master relative to
    other events, so that we can preserve ascending order of log sequence
    numbers - needed to handle failover .
  */
2066
  save_options = thd->options;
unknown's avatar
unknown committed
2067
  thd->options &= ~ (ulong) (OPTION_BIN_LOG);
2068 2069 2070
  lev->thd = thd;
  if (lev->exec_event(0,0))
  {
2071
    slave_print_error(rli,my_errno, "Failed executing load from '%s'", fname);
2072 2073 2074 2075
    thd->options = save_options;
    goto err;
  }
  thd->options = save_options;
2076
  (void) my_delete(fname, MYF(MY_WME));
2077
  memcpy(p, ".data", 6);
2078
  (void) my_delete(fname, MYF(MY_WME));
2079 2080 2081
  if (mysql_bin_log.is_open())
    mysql_bin_log.write(this);
  error = 0;
2082

2083 2084 2085
err:
  delete lev;
  if (fd >= 0)
2086
  {
2087
    my_close(fd, MYF(0));
2088 2089
    end_io_cache(&file);
  }
2090
  return error ? error : Log_event::exec_event(rli);
2091 2092
}

2093
#endif /* !MYSQL_CLIENT */