log_event.cc 84.7 KB
Newer Older
unknown's avatar
unknown committed
1
/* Copyright (C) 2000-2003 MySQL AB
unknown's avatar
unknown committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
   
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
   
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
   
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


#ifndef MYSQL_CLIENT
#ifdef __GNUC__
#pragma implementation				// gcc: Class implementation
#endif
#include  "mysql_priv.h"
23
#include "slave.h"
24
#include <my_dir.h>
unknown's avatar
unknown committed
25 26
#endif /* MYSQL_CLIENT */

unknown's avatar
unknown committed
27
#define log_cs	&my_charset_latin1
28

unknown's avatar
unknown committed
29
/*
30
  pretty_print_str()
unknown's avatar
unknown committed
31
*/
32

33
#ifdef MYSQL_CLIENT
34
static void pretty_print_str(FILE* file, char* str, int len)
unknown's avatar
unknown committed
35
{
36
  char* end = str + len;
unknown's avatar
unknown committed
37
  fputc('\'', file);
38 39
  while (str < end)
  {
unknown's avatar
unknown committed
40
    char c;
41 42 43 44 45 46 47 48 49 50 51 52
    switch ((c=*str++)) {
    case '\n': fprintf(file, "\\n"); break;
    case '\r': fprintf(file, "\\r"); break;
    case '\\': fprintf(file, "\\\\"); break;
    case '\b': fprintf(file, "\\b"); break;
    case '\t': fprintf(file, "\\t"); break;
    case '\'': fprintf(file, "\\'"); break;
    case 0   : fprintf(file, "\\0"); break;
    default:
      fputc(c, file);
      break;
    }
53 54
  }
  fputc('\'', file);
unknown's avatar
unknown committed
55
}
unknown's avatar
unknown committed
56
#endif /* MYSQL_CLIENT */
unknown's avatar
unknown committed
57

unknown's avatar
unknown committed
58

unknown's avatar
unknown committed
59
/*
60
  ignored_error_code()
unknown's avatar
unknown committed
61
*/
62

unknown's avatar
SCRUM  
unknown committed
63
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
64 65
inline int ignored_error_code(int err_code)
{
unknown's avatar
unknown committed
66 67
  return ((err_code == ER_SLAVE_IGNORED_TABLE) ||
          (use_slave_mask && bitmap_is_set(&slave_error_mask, err_code)));
68
}
unknown's avatar
SCRUM  
unknown committed
69
#endif
70

unknown's avatar
unknown committed
71

unknown's avatar
unknown committed
72
/*
73
  pretty_print_str()
unknown's avatar
unknown committed
74
*/
unknown's avatar
unknown committed
75

76
#ifndef MYSQL_CLIENT
unknown's avatar
unknown committed
77
static char *pretty_print_str(char *packet, char *str, int len)
unknown's avatar
unknown committed
78
{
unknown's avatar
unknown committed
79 80
  char *end= str + len;
  char *pos= packet;
81
  *pos++= '\'';
82 83 84
  while (str < end)
  {
    char c;
85
    switch ((c=*str++)) {
unknown's avatar
unknown committed
86 87 88 89 90 91 92
    case '\n': *pos++= '\\'; *pos++= 'n'; break;
    case '\r': *pos++= '\\'; *pos++= 'r'; break;
    case '\\': *pos++= '\\'; *pos++= '\\'; break;
    case '\b': *pos++= '\\'; *pos++= 'b'; break;
    case '\t': *pos++= '\\'; *pos++= 't'; break;
    case '\'': *pos++= '\\'; *pos++= '\''; break;
    case 0   : *pos++= '\\'; *pos++= '0'; break;
93
    default:
unknown's avatar
unknown committed
94
      *pos++= c;
95 96
      break;
    }
unknown's avatar
unknown committed
97
  }
98 99
  *pos++= '\'';
  return pos;
unknown's avatar
unknown committed
100
}
unknown's avatar
unknown committed
101
#endif /* !MYSQL_CLIENT */
102

unknown's avatar
unknown committed
103

unknown's avatar
unknown committed
104
/*
105
  slave_load_file_stem()
unknown's avatar
unknown committed
106
*/
unknown's avatar
unknown committed
107

unknown's avatar
SCRUM  
unknown committed
108
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
109 110 111
static inline char* slave_load_file_stem(char*buf, uint file_id,
					 int event_server_id)
{
unknown's avatar
unknown committed
112
  fn_format(buf,"SQL_LOAD-",slave_load_tmpdir, "", MY_UNPACK_FILENAME);
113 114 115 116 117 118 119
  buf = strend(buf);
  buf = int10_to_str(::server_id, buf, 10);
  *buf++ = '-';
  buf = int10_to_str(event_server_id, buf, 10);
  *buf++ = '-';
  return int10_to_str(file_id, buf, 10);
}
unknown's avatar
SCRUM  
unknown committed
120
#endif
121

unknown's avatar
unknown committed
122

unknown's avatar
unknown committed
123
/*
124 125
  Delete all temporary files used for SQL_LOAD.

unknown's avatar
unknown committed
126 127
  SYNOPSIS
    cleanup_load_tmpdir()
unknown's avatar
unknown committed
128
*/
129

unknown's avatar
SCRUM  
unknown committed
130
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
131 132 133 134 135
static void cleanup_load_tmpdir()
{
  MY_DIR *dirp;
  FILEINFO *file;
  uint i;
unknown's avatar
unknown committed
136
  char fname[FN_REFLEN], prefbuf[31], *p;
unknown's avatar
unknown committed
137

138 139 140
  if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME))))
    return;

141 142 143 144 145 146 147 148 149 150 151 152 153
  /* 
     When we are deleting temporary files, we should only remove
     the files associated with the server id of our server.
     We don't use event_server_id here because since we've disabled
     direct binlogging of Create_file/Append_file/Exec_load events
     we cannot meet Start_log event in the middle of events from one 
     LOAD DATA.
  */
  p= strmake(prefbuf,"SQL_LOAD-",9);
  p= int10_to_str(::server_id, p, 10);
  *(p++)= '-';
  *p= 0;

154 155 156
  for (i=0 ; i < (uint)dirp->number_off_files; i++)
  {
    file=dirp->dir_entry+i;
157
    if (is_prefix(file->name, prefbuf))
unknown's avatar
unknown committed
158 159 160 161
    {
      fn_format(fname,file->name,slave_load_tmpdir,"",MY_UNPACK_FILENAME);
      my_delete(fname, MYF(0));
    }
162 163 164 165
  }

  my_dirend(dirp);
}
unknown's avatar
SCRUM  
unknown committed
166
#endif
167 168


unknown's avatar
unknown committed
169
/*
170
  write_str()
unknown's avatar
unknown committed
171
*/
172 173 174 175 176 177 178 179

static bool write_str(IO_CACHE *file, char *str, byte length)
{
  return (my_b_safe_write(file, &length, 1) ||
	  my_b_safe_write(file, (byte*) str, (int) length));
}


unknown's avatar
unknown committed
180
/*
181
  read_str()
unknown's avatar
unknown committed
182
*/
183 184 185 186 187 188 189 190 191 192 193 194 195

static inline int read_str(char * &buf, char *buf_end, char * &str,
			   uint8 &len)
{
  if (buf + (uint) (uchar) *buf >= buf_end)
    return 1;
  len = (uint8) *buf;
  str= buf+1;
  buf+= (uint) len+1;
  return 0;
}


unknown's avatar
unknown committed
196
/**************************************************************************
unknown's avatar
unknown committed
197
	Log_event methods
unknown's avatar
unknown committed
198
**************************************************************************/
199

unknown's avatar
unknown committed
200
/*
201
  Log_event::get_type_str()
unknown's avatar
unknown committed
202
*/
203

unknown's avatar
unknown committed
204 205
const char* Log_event::get_type_str()
{
206
  switch(get_type_code()) {
unknown's avatar
unknown committed
207 208 209 210 211 212
  case START_EVENT:  return "Start";
  case STOP_EVENT:   return "Stop";
  case QUERY_EVENT:  return "Query";
  case ROTATE_EVENT: return "Rotate";
  case INTVAR_EVENT: return "Intvar";
  case LOAD_EVENT:   return "Load";
213
  case NEW_LOAD_EVENT:   return "New_load";
unknown's avatar
unknown committed
214
  case SLAVE_EVENT:  return "Slave";
215 216 217 218
  case CREATE_FILE_EVENT: return "Create_file";
  case APPEND_BLOCK_EVENT: return "Append_block";
  case DELETE_FILE_EVENT: return "Delete_file";
  case EXEC_LOAD_EVENT: return "Exec_load";
219
  case RAND_EVENT: return "RAND";
unknown's avatar
unknown committed
220
  case USER_VAR_EVENT: return "User var";
unknown's avatar
unknown committed
221
  default: return "Unknown";				/* impossible */ 
unknown's avatar
unknown committed
222 223 224
  }
}

225

unknown's avatar
unknown committed
226
/*
227
  Log_event::Log_event()
unknown's avatar
unknown committed
228
*/
229

unknown's avatar
unknown committed
230
#ifndef MYSQL_CLIENT
231
Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
unknown's avatar
unknown committed
232
  :log_pos(0), temp_buf(0), exec_time(0), cached_event_len(0),
233
   flags(flags_arg), thd(thd_arg)
234
{
unknown's avatar
unknown committed
235 236 237 238
  server_id=	thd->server_id;
  when=		thd->start_time;
  cache_stmt=	(using_trans &&
		 (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)));
239 240 241
}


unknown's avatar
unknown committed
242 243 244 245 246 247 248
/*
  This minimal constructor is for when you are not even sure that there is a
  valid THD. For example in the server when we are shutting down or flushing
  logs after receiving a SIGHUP (then we must write a Rotate to the binlog but
  we have no THD, so we need this minimal constructor).
*/

249 250 251 252
Log_event::Log_event()
  :temp_buf(0), exec_time(0), cached_event_len(0), flags(0), cache_stmt(0),
   thd(0)
{
unknown's avatar
unknown committed
253 254 255
  server_id=	::server_id;
  when=		time(NULL);
  log_pos=	0;
256
}
unknown's avatar
unknown committed
257
#endif /* !MYSQL_CLIENT */
258 259


unknown's avatar
unknown committed
260
/*
261
  Log_event::Log_event()
262
*/
263

264
Log_event::Log_event(const char* buf, bool old_format)
265
  :temp_buf(0), cached_event_len(0), cache_stmt(0)
266 267 268
{
  when = uint4korr(buf);
  server_id = uint4korr(buf + SERVER_ID_OFFSET);
269 270
  if (old_format)
  {
271
    log_pos=0;
272 273 274 275
    flags=0;
  }
  else
  {
276
    log_pos = uint4korr(buf + LOG_POS_OFFSET);
277 278
    flags = uint2korr(buf + FLAGS_OFFSET);
  }
279 280 281 282 283 284
#ifndef MYSQL_CLIENT
  thd = 0;
#endif  
}

#ifndef MYSQL_CLIENT
unknown's avatar
SCRUM  
unknown committed
285
#ifdef HAVE_REPLICATION
286

unknown's avatar
unknown committed
287
/*
288
  Log_event::exec_event()
unknown's avatar
unknown committed
289
*/
290

291
int Log_event::exec_event(struct st_relay_log_info* rli)
292
{
293 294 295 296 297 298 299 300 301 302 303 304 305
  /*
    rli is null when (as far as I (Guilhem) know)
    the caller is
    Load_log_event::exec_event *and* that one is called from
    Execute_load_log_event::exec_event. 
    In this case, we don't do anything here ;
    Execute_load_log_event::exec_event will call Log_event::exec_event
    again later with the proper rli.
    Strictly speaking, if we were sure that rli is null
    only in the case discussed above, 'if (rli)' is useless here.
    But as we are not 100% sure, keep it for now.
  */
  if (rli)  
306
  {
307 308 309 310 311 312 313 314 315 316 317 318 319 320
    /*
      If in a transaction, and if the slave supports transactions,
      just inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
      (not OPTION_NOT_AUTOCOMMIT) as transactions are logged
      with BEGIN/COMMIT, not with SET AUTOCOMMIT= .
      
      CAUTION: opt_using_transactions means
      innodb || bdb ; suppose the master supports InnoDB and BDB, 
      but the slave supports only BDB, problems
      will arise: 
      - suppose an InnoDB table is created on the master,
      - then it will be MyISAM on the slave
      - but as opt_using_transactions is true, the slave will believe he is
      transactional with the MyISAM table. And problems will come when one
unknown's avatar
unknown committed
321 322
      does START SLAVE; STOP SLAVE; START SLAVE; (the slave will resume at
      BEGIN whereas there has not been any rollback). This is the problem of
323 324 325 326 327
      using opt_using_transactions instead of a finer
      "does the slave support _the_transactional_handler_used_on_the_master_".
      
      More generally, we'll have problems when a query mixes a transactional
      handler and MyISAM and STOP SLAVE is issued in the middle of the
unknown's avatar
unknown committed
328 329
      "transaction". START SLAVE will resume at BEGIN while the MyISAM table
      has already been updated.
330 331 332
    */
    if ((thd->options & OPTION_BEGIN) && opt_using_transactions)
      rli->inc_event_relay_log_pos(get_event_len());
333 334
    else
    {
335
      rli->inc_group_relay_log_pos(get_event_len(),log_pos);
336
      flush_relay_log_info(rli);
337 338 339 340 341 342
      /* 
         Note that Rotate_log_event::exec_event() does not call this function,
         so there is no chance that a fake rotate event resets
         last_master_timestamp.
      */
      rli->last_master_timestamp= when;
343
    }
344 345 346
  }
  return 0;
}
unknown's avatar
unknown committed
347

348

unknown's avatar
unknown committed
349
/*
350
  Log_event::pack_info()
unknown's avatar
unknown committed
351
*/
352

353
void Log_event::pack_info(Protocol *protocol)
unknown's avatar
unknown committed
354
{
355
  protocol->store("", &my_charset_bin);
unknown's avatar
unknown committed
356 357 358
}


unknown's avatar
unknown committed
359
/*
360
  Log_event::net_send()
unknown's avatar
unknown committed
361

362
  Only called by SHOW BINLOG EVENTS
unknown's avatar
unknown committed
363
*/
unknown's avatar
SCRUM  
unknown committed
364

365
int Log_event::net_send(Protocol *protocol, const char* log_name, my_off_t pos)
366
{
unknown's avatar
unknown committed
367 368
  const char *p= strrchr(log_name, FN_LIBCHAR);
  const char *event_type;
369 370 371
  if (p)
    log_name = p + 1;
  
372
  protocol->prepare_for_resend();
373
  protocol->store(log_name, &my_charset_bin);
374
  protocol->store((ulonglong) pos);
375
  event_type = get_type_str();
376
  protocol->store(event_type, strlen(event_type), &my_charset_bin);
377 378 379 380
  protocol->store((uint32) server_id);
  protocol->store((ulonglong) log_pos);
  pack_info(protocol);
  return protocol->write();
381
}
unknown's avatar
SCRUM  
unknown committed
382 383 384
#endif /* HAVE_REPLICATION */


unknown's avatar
unknown committed
385
/*
unknown's avatar
SCRUM  
unknown committed
386
  Log_event::init_show_field_list()
unknown's avatar
unknown committed
387
*/
unknown's avatar
SCRUM  
unknown committed
388 389 390 391 392 393 394 395 396 397 398 399 400 401

void Log_event::init_show_field_list(List<Item>* field_list)
{
  field_list->push_back(new Item_empty_string("Log_name", 20));
  field_list->push_back(new Item_return_int("Pos", 11,
					    MYSQL_TYPE_LONGLONG));
  field_list->push_back(new Item_empty_string("Event_type", 20));
  field_list->push_back(new Item_return_int("Server_id", 10,
					    MYSQL_TYPE_LONG));
  field_list->push_back(new Item_return_int("Orig_log_pos", 11,
					    MYSQL_TYPE_LONGLONG));
  field_list->push_back(new Item_empty_string("Info", 20));
}

unknown's avatar
unknown committed
402
#endif /* !MYSQL_CLIENT */
unknown's avatar
unknown committed
403

unknown's avatar
unknown committed
404
/*
405
  Log_event::write()
unknown's avatar
unknown committed
406
*/
407

408
int Log_event::write(IO_CACHE* file)
unknown's avatar
unknown committed
409
{
410
  return (write_header(file) || write_data(file)) ? -1 : 0;
unknown's avatar
unknown committed
411 412
}

413

unknown's avatar
unknown committed
414
/*
415
  Log_event::write_header()
unknown's avatar
unknown committed
416
*/
417

418
int Log_event::write_header(IO_CACHE* file)
unknown's avatar
unknown committed
419
{
420
  char buf[LOG_EVENT_HEADER_LEN];
unknown's avatar
unknown committed
421
  char* pos = buf;
unknown's avatar
unknown committed
422
  int4store(pos, (ulong) when); // timestamp
unknown's avatar
unknown committed
423 424
  pos += 4;
  *pos++ = get_type_code(); // event type code
425 426
  int4store(pos, server_id);
  pos += 4;
427 428
  long tmp=get_data_size() + LOG_EVENT_HEADER_LEN;
  int4store(pos, tmp);
unknown's avatar
unknown committed
429
  pos += 4;
430
  int4store(pos, log_pos);
431 432 433
  pos += 4;
  int2store(pos, flags);
  pos += 2;
434
  return (my_b_safe_write(file, (byte*) buf, (uint) (pos - buf)));
unknown's avatar
unknown committed
435 436 437
}


unknown's avatar
unknown committed
438
/*
439
  Log_event::read_log_event()
unknown's avatar
unknown committed
440
*/
441 442

#ifndef MYSQL_CLIENT
443
int Log_event::read_log_event(IO_CACHE* file, String* packet,
444
			      pthread_mutex_t* log_lock)
unknown's avatar
unknown committed
445 446
{
  ulong data_len;
447
  int result=0;
unknown's avatar
unknown committed
448
  char buf[LOG_EVENT_HEADER_LEN];
449
  DBUG_ENTER("read_log_event");
450

451
  if (log_lock)
452
    pthread_mutex_lock(log_lock);
453 454
  if (my_b_read(file, (byte*) buf, sizeof(buf)))
  {
455 456 457 458 459
    /*
      If the read hits eof, we must report it as eof so the caller
      will know it can go into cond_wait to be woken up on the next
      update to the log.
    */
460
    DBUG_PRINT("error",("file->error: %d", file->error));
461 462 463
    if (!file->error)
      result= LOG_READ_EOF;
    else
464
      result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
465
    goto end;
466
  }
467
  data_len= uint4korr(buf + EVENT_LEN_OFFSET);
unknown's avatar
unknown committed
468 469
  if (data_len < LOG_EVENT_HEADER_LEN ||
      data_len > current_thd->variables.max_allowed_packet)
470
  {
471
    DBUG_PRINT("error",("data_len: %ld", data_len));
472 473 474
    result= ((data_len < LOG_EVENT_HEADER_LEN) ? LOG_READ_BOGUS :
	     LOG_READ_TOO_LARGE);
    goto end;
475
  }
unknown's avatar
unknown committed
476
  packet->append(buf, sizeof(buf));
477
  data_len-= LOG_EVENT_HEADER_LEN;
478 479 480
  if (data_len)
  {
    if (packet->append(file, data_len))
481
    {
482
      /*
483 484
	Here we should never hit EOF in a non-error condition.
	EOF means we are reading the event partially, which should
485 486 487 488
	never happen.
      */
      result= file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO;
      /* Implicit goto end; */
489
    }
490
  }
491 492 493 494

end:
  if (log_lock)
    pthread_mutex_unlock(log_lock);
495
  DBUG_RETURN(result);
unknown's avatar
unknown committed
496
}
unknown's avatar
unknown committed
497
#endif /* !MYSQL_CLIENT */
unknown's avatar
unknown committed
498

unknown's avatar
unknown committed
499
#ifndef MYSQL_CLIENT
unknown's avatar
unknown committed
500 501
#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
unknown's avatar
unknown committed
502
#define max_allowed_packet current_thd->variables.max_allowed_packet
503
#else
504
#define UNLOCK_MUTEX
505 506 507
#define LOCK_MUTEX
#endif

unknown's avatar
unknown committed
508
/*
509 510
  Log_event::read_log_event()

unknown's avatar
unknown committed
511 512 513
  NOTE:
    Allocates memory;  The caller is responsible for clean-up
*/
514

unknown's avatar
unknown committed
515
#ifndef MYSQL_CLIENT
516 517 518
Log_event* Log_event::read_log_event(IO_CACHE* file,
				     pthread_mutex_t* log_lock,
				     bool old_format)
unknown's avatar
unknown committed
519
#else
520
Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format)
unknown's avatar
unknown committed
521
#endif  
unknown's avatar
unknown committed
522
{
523
  char head[LOG_EVENT_HEADER_LEN];
524 525
  uint header_size= old_format ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;

526
  LOCK_MUTEX;
527
  if (my_b_read(file, (byte *) head, header_size))
528
  {
unknown's avatar
unknown committed
529
    UNLOCK_MUTEX;
530
    return 0;
531
  }
unknown's avatar
unknown committed
532

533
  uint data_len = uint4korr(head + EVENT_LEN_OFFSET);
534 535 536
  char *buf= 0;
  const char *error= 0;
  Log_event *res=  0;
unknown's avatar
unknown committed
537

538
  if (data_len > max_allowed_packet)
unknown's avatar
unknown committed
539
  {
540 541
    error = "Event too big";
    goto err;
unknown's avatar
unknown committed
542 543
  }

544
  if (data_len < header_size)
unknown's avatar
unknown committed
545
  {
546 547
    error = "Event too small";
    goto err;
unknown's avatar
unknown committed
548
  }
549 550 551

  // some events use the extra byte to null-terminate strings
  if (!(buf = my_malloc(data_len+1, MYF(MY_WME))))
552 553 554
  {
    error = "Out of memory";
    goto err;
unknown's avatar
unknown committed
555
  }
556
  buf[data_len] = 0;
557
  memcpy(buf, head, header_size);
558
  if (my_b_read(file, (byte*) buf + header_size, data_len - header_size))
559 560 561 562
  {
    error = "read error";
    goto err;
  }
563
  if ((res = read_log_event(buf, data_len, &error, old_format)))
564
    res->register_temp_buf(buf);
565

566
err:
unknown's avatar
unknown committed
567
  UNLOCK_MUTEX;
568
  if (error)
569
  {
570 571 572
    sql_print_error("\
Error in Log_event::read_log_event(): '%s', data_len: %d, event_type: %d",
		    error,data_len,head[EVENT_TYPE_OFFSET]);
573
    my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
574 575 576 577 578 579 580 581 582
    /*
      The SQL slave thread will check if file->error<0 to know
      if there was an I/O error. Even if there is no "low-level" I/O errors
      with 'file', any of the high-level above errors is worrying
      enough to stop the SQL thread now ; as we are skipping the current event,
      going on with reading and successfully executing other events can
      only corrupt the slave's databases. So stop.
    */
    file->error= -1;
583
  }
584
  return res;
unknown's avatar
unknown committed
585 586
}

587

unknown's avatar
unknown committed
588
/*
589
  Log_event::read_log_event()
unknown's avatar
unknown committed
590
*/
591

592
Log_event* Log_event::read_log_event(const char* buf, int event_len,
593
				     const char **error, bool old_format)
unknown's avatar
unknown committed
594
{
595
  if (event_len < EVENT_LEN_OFFSET ||
596 597 598
      (uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
  {
    *error="Sanity check failed";		// Needed to free buffer
599
    return NULL; // general sanity check - will fail on a partial read
600
  }
601
  
602 603
  Log_event* ev = NULL;
  
604
  switch(buf[EVENT_TYPE_OFFSET]) {
unknown's avatar
unknown committed
605
  case QUERY_EVENT:
606
    ev  = new Query_log_event(buf, event_len, old_format);
607
    break;
unknown's avatar
unknown committed
608
  case LOAD_EVENT:
unknown's avatar
unknown committed
609 610
    ev = new Create_file_log_event(buf, event_len, old_format);
    break;
611
  case NEW_LOAD_EVENT:
612
    ev = new Load_log_event(buf, event_len, old_format);
613
    break;
unknown's avatar
unknown committed
614
  case ROTATE_EVENT:
615
    ev = new Rotate_log_event(buf, event_len, old_format);
616
    break;
unknown's avatar
SCRUM  
unknown committed
617
#ifdef HAVE_REPLICATION
unknown's avatar
unknown committed
618
  case SLAVE_EVENT:
619 620
    ev = new Slave_log_event(buf, event_len);
    break;
unknown's avatar
SCRUM  
unknown committed
621
#endif /* HAVE_REPLICATION */
622
  case CREATE_FILE_EVENT:
unknown's avatar
unknown committed
623
    ev = new Create_file_log_event(buf, event_len, old_format);
624 625 626 627 628 629 630 631 632 633 634
    break;
  case APPEND_BLOCK_EVENT:
    ev = new Append_block_log_event(buf, event_len);
    break;
  case DELETE_FILE_EVENT:
    ev = new Delete_file_log_event(buf, event_len);
    break;
  case EXEC_LOAD_EVENT:
    ev = new Execute_load_log_event(buf, event_len);
    break;
  case START_EVENT:
635
    ev = new Start_log_event(buf, old_format);
636
    break;
unknown's avatar
SCRUM  
unknown committed
637
#ifdef HAVE_REPLICATION
638
  case STOP_EVENT:
639
    ev = new Stop_log_event(buf, old_format);
640
    break;
unknown's avatar
SCRUM  
unknown committed
641
#endif /* HAVE_REPLICATION */
642
  case INTVAR_EVENT:
643
    ev = new Intvar_log_event(buf, old_format);
644
    break;
unknown's avatar
unknown committed
645 646 647
  case RAND_EVENT:
    ev = new Rand_log_event(buf, old_format);
    break;
unknown's avatar
unknown committed
648 649 650
  case USER_VAR_EVENT:
    ev = new User_var_log_event(buf, old_format);
    break;
651 652
  default:
    break;
unknown's avatar
unknown committed
653
  }
654
  if (!ev || !ev->is_valid())
655 656
  {
    delete ev;
657 658 659 660 661 662 663 664 665
#ifdef MYSQL_CLIENT
    if (!force_opt)
    {
      *error= "Found invalid event in binary log";
      return 0;
    }
    ev= new Unknown_log_event(buf, old_format);
#else
    *error= "Found invalid event in binary log";
666
    return 0;
667
#endif
668 669 670
  }
  ev->cached_event_len = event_len;
  return ev;  
unknown's avatar
unknown committed
671 672
}

673
#ifdef MYSQL_CLIENT
674

unknown's avatar
unknown committed
675
/*
676
  Log_event::print_header()
unknown's avatar
unknown committed
677
*/
678

679 680
void Log_event::print_header(FILE* file)
{
681
  char llbuff[22];
682 683
  fputc('#', file);
  print_timestamp(file);
684
  fprintf(file, " server id %d  log_pos %s ", server_id,
685
	  llstr(log_pos,llbuff)); 
686 687
}

unknown's avatar
unknown committed
688
/*
689
  Log_event::print_timestamp()
unknown's avatar
unknown committed
690
*/
691

692
void Log_event::print_timestamp(FILE* file, time_t* ts)
unknown's avatar
unknown committed
693
{
unknown's avatar
unknown committed
694
  struct tm *res;
695 696
  if (!ts)
    ts = &when;
697 698
#ifdef MYSQL_SERVER				// This is always false
  struct tm tm_tmp;
unknown's avatar
unknown committed
699
  localtime_r(ts,(res= &tm_tmp));
unknown's avatar
unknown committed
700
#else
701
  res=localtime(ts);
unknown's avatar
unknown committed
702
#endif
703 704

  fprintf(file,"%02d%02d%02d %2d:%02d:%02d",
705 706 707 708 709 710
	  res->tm_year % 100,
	  res->tm_mon+1,
	  res->tm_mday,
	  res->tm_hour,
	  res->tm_min,
	  res->tm_sec);
unknown's avatar
unknown committed
711 712
}

unknown's avatar
unknown committed
713
#endif /* MYSQL_CLIENT */
unknown's avatar
unknown committed
714 715


unknown's avatar
unknown committed
716
/*
717
  Log_event::set_log_pos()
unknown's avatar
unknown committed
718
*/
unknown's avatar
unknown committed
719

720 721
#ifndef MYSQL_CLIENT
void Log_event::set_log_pos(MYSQL_LOG* log)
unknown's avatar
unknown committed
722
{
723 724
  if (!log_pos)
    log_pos = my_b_tell(&log->log_file);
unknown's avatar
unknown committed
725
}
unknown's avatar
unknown committed
726
#endif /* !MYSQL_CLIENT */
unknown's avatar
unknown committed
727 728


unknown's avatar
unknown committed
729
/**************************************************************************
unknown's avatar
unknown committed
730
	Query_log_event methods
unknown's avatar
unknown committed
731
**************************************************************************/
732

unknown's avatar
SCRUM  
unknown committed
733
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
734

unknown's avatar
unknown committed
735
/*
736
  Query_log_event::pack_info()
unknown's avatar
unknown committed
737
*/
738

739
void Query_log_event::pack_info(Protocol *protocol)
unknown's avatar
unknown committed
740
{
741 742 743 744
  char *buf, *pos;
  if (!(buf= my_malloc(9 + db_len + q_len, MYF(MY_WME))))
    return;
  pos= buf;    
745
  if (db && db_len)
746
  {
747 748
    pos= strmov(buf, "use `");
    memcpy(pos, db, db_len);
unknown's avatar
unknown committed
749
    pos= strmov(pos+db_len, "`; ");
750
  }
751
  if (query && q_len)
752 753 754 755
  {
    memcpy(pos, query, q_len);
    pos+= q_len;
  }
756
  protocol->store(buf, pos-buf, &my_charset_bin);
757
  my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
758
}
unknown's avatar
SCRUM  
unknown committed
759
#endif
760 761


unknown's avatar
unknown committed
762
/*
763
  Query_log_event::write()
unknown's avatar
unknown committed
764
*/
765 766 767 768

int Query_log_event::write(IO_CACHE* file)
{
  return query ? Log_event::write(file) : -1; 
unknown's avatar
unknown committed
769 770
}

771

unknown's avatar
unknown committed
772
/*
773
  Query_log_event::write_data()
unknown's avatar
unknown committed
774
*/
775 776

int Query_log_event::write_data(IO_CACHE* file)
unknown's avatar
unknown committed
777
{
unknown's avatar
unknown committed
778 779
  char buf[QUERY_HEADER_LEN]; 

780 781 782
  if (!query)
    return -1;
  
unknown's avatar
unknown committed
783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820
  /*
    We want to store the thread id:
    (- as an information for the user when he reads the binlog)
    - if the query uses temporary table: for the slave SQL thread to know to
    which master connection the temp table belongs.
    Now imagine we (write_data()) are called by the slave SQL thread (we are
    logging a query executed by this thread; the slave runs with
    --log-slave-updates). Then this query will be logged with
    thread_id=the_thread_id_of_the_SQL_thread. Imagine that 2 temp tables of
    the same name were created simultaneously on the master (in the master
    binlog you have
    CREATE TEMPORARY TABLE t; (thread 1)
    CREATE TEMPORARY TABLE t; (thread 2)
    ...)
    then in the slave's binlog there will be
    CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
    CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
    which is bad (same thread id!).

    To avoid this, we log the thread's thread id EXCEPT for the SQL
    slave thread for which we log the original (master's) thread id.
    Now this moves the bug: what happens if the thread id on the
    master was 10 and when the slave replicates the query, a
    connection number 10 is opened by a normal client on the slave,
    and updates a temp table of the same name? We get a problem
    again. To avoid this, in the handling of temp tables (sql_base.cc)
    we use thread_id AND server_id.  TODO when this is merged into
    4.1: in 4.1, slave_proxy_id has been renamed to pseudo_thread_id
    and is a session variable: that's to make mysqlbinlog work with
    temp tables. We probably need to introduce

    SET PSEUDO_SERVER_ID
    for mysqlbinlog in 4.1. mysqlbinlog would print:
    SET PSEUDO_SERVER_ID=
    SET PSEUDO_THREAD_ID=
    for each query using temp tables.
  */
  int4store(buf + Q_THREAD_ID_OFFSET, slave_proxy_id);
821 822 823 824 825 826 827
  int4store(buf + Q_EXEC_TIME_OFFSET, exec_time);
  buf[Q_DB_LEN_OFFSET] = (char) db_len;
  int2store(buf + Q_ERR_CODE_OFFSET, error_code);

  return (my_b_safe_write(file, (byte*) buf, QUERY_HEADER_LEN) ||
	  my_b_safe_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) ||
	  my_b_safe_write(file, (byte*) query, q_len)) ? -1 : 0;
unknown's avatar
unknown committed
828 829
}

830

unknown's avatar
unknown committed
831
/*
832
  Query_log_event::Query_log_event()
unknown's avatar
unknown committed
833
*/
834

835 836
#ifndef MYSQL_CLIENT
Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
837
				 ulong query_length, bool using_trans)
unknown's avatar
unknown committed
838 839
  :Log_event(thd_arg, !thd_arg->tmp_table_used ?
	     0 : LOG_EVENT_THREAD_SPECIFIC_F, using_trans),
unknown's avatar
unknown committed
840
   data_buf(0), query(query_arg),
841
   db(thd_arg->db), q_len((uint32) query_length),
unknown's avatar
unknown committed
842 843 844
   error_code(thd_arg->killed ? ER_SERVER_SHUTDOWN: thd_arg->net.last_errno),
   thread_id(thd_arg->thread_id),
   /* save the original thread id; we already know the server id */
845
   slave_proxy_id(thd_arg->variables.pseudo_thread_id)
846 847 848 849 850 851
{
  time_t end_time;
  time(&end_time);
  exec_time = (ulong) (end_time  - thd->start_time);
  db_len = (db) ? (uint32) strlen(db) : 0;
}
unknown's avatar
unknown committed
852
#endif /* MYSQL_CLIENT */
853

unknown's avatar
unknown committed
854

unknown's avatar
unknown committed
855
/*
856
  Query_log_event::Query_log_event()
unknown's avatar
unknown committed
857
*/
858

859
Query_log_event::Query_log_event(const char* buf, int event_len,
unknown's avatar
unknown committed
860 861
				 bool old_format)
  :Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL)
unknown's avatar
unknown committed
862 863
{
  ulong data_len;
864 865 866 867 868 869 870 871 872 873 874 875 876 877
  if (old_format)
  {
    if ((uint)event_len < OLD_HEADER_LEN + QUERY_HEADER_LEN)
      return;				
    data_len = event_len - (QUERY_HEADER_LEN + OLD_HEADER_LEN);
    buf += OLD_HEADER_LEN;
  }
  else
  {
    if ((uint)event_len < QUERY_EVENT_OVERHEAD)
      return;				
    data_len = event_len - QUERY_EVENT_OVERHEAD;
    buf += LOG_EVENT_HEADER_LEN;
  }
unknown's avatar
unknown committed
878

879 880
  exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
  error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
unknown's avatar
unknown committed
881

882
  if (!(data_buf = (char*) my_malloc(data_len + 1, MYF(MY_WME))))
unknown's avatar
unknown committed
883 884
    return;

885
  memcpy(data_buf, buf + Q_DATA_OFFSET, data_len);
unknown's avatar
unknown committed
886
  slave_proxy_id= thread_id= uint4korr(buf + Q_THREAD_ID_OFFSET);
unknown's avatar
unknown committed
887
  db = data_buf;
888
  db_len = (uint)buf[Q_DB_LEN_OFFSET];
unknown's avatar
unknown committed
889 890 891 892 893
  query=data_buf + db_len + 1;
  q_len = data_len - 1 - db_len;
  *((char*)query+q_len) = 0;
}

894

unknown's avatar
unknown committed
895
/*
896
  Query_log_event::print()
unknown's avatar
unknown committed
897
*/
898

899
#ifdef MYSQL_CLIENT
900
void Query_log_event::print(FILE* file, bool short_form, char* last_db)
unknown's avatar
unknown committed
901
{
902
  char buff[40],*end;				// Enough for SET TIMESTAMP
unknown's avatar
unknown committed
903 904
  if (!short_form)
  {
905
    print_header(file);
906 907
    fprintf(file, "\tQuery\tthread_id=%lu\texec_time=%lu\terror_code=%d\n",
	    (ulong) thread_id, (ulong) exec_time, error_code);
unknown's avatar
unknown committed
908 909
  }

910 911
  bool same_db = 0;

unknown's avatar
unknown committed
912
  if (db && last_db)
913 914 915 916
  {
    if (!(same_db = !memcmp(last_db, db, db_len + 1)))
      memcpy(last_db, db, db_len + 1);
  }
917 918
  
  if (db && db[0] && !same_db)
unknown's avatar
unknown committed
919
    fprintf(file, "use %s;\n", db);
920 921 922 923
  end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10);
  *end++=';';
  *end++='\n';
  my_fwrite(file, (byte*) buff, (uint) (end-buff),MYF(MY_NABP | MY_WME));
unknown's avatar
unknown committed
924 925
  if (flags & LOG_EVENT_THREAD_SPECIFIC_F)
    fprintf(file,"SET @@session.pseudo_thread_id=%lu;\n",(ulong)thread_id);
unknown's avatar
unknown committed
926 927 928
  my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME));
  fprintf(file, ";\n");
}
unknown's avatar
unknown committed
929
#endif /* MYSQL_CLIENT */
930

unknown's avatar
unknown committed
931

unknown's avatar
unknown committed
932
/*
933
  Query_log_event::exec_event()
unknown's avatar
unknown committed
934
*/
unknown's avatar
unknown committed
935

unknown's avatar
SCRUM  
unknown committed
936
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
937
int Query_log_event::exec_event(struct st_relay_log_info* rli)
unknown's avatar
unknown committed
938
{
unknown's avatar
unknown committed
939 940
  int expected_error,actual_error= 0;
  thd->db= (char*) rewrite_db(db);
unknown's avatar
unknown committed
941

942
  /*
943 944 945 946 947 948 949
    InnoDB internally stores the master log position it has executed so far,
    i.e. the position just after the COMMIT event.
    When InnoDB will want to store, the positions in rli won't have
    been updated yet, so group_master_log_* will point to old BEGIN
    and event_master_log* will point to the beginning of current COMMIT.
    So the position to store is event_master_log_pos + event_len
    since we must store the pos of the END of the current log event (COMMIT).
950 951 952 953 954 955
  */
  rli->event_len= get_event_len();

  if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
  {
    thd->set_time((time_t)when);
956 957
    thd->query_length= q_len;
    thd->query = (char*)query;
unknown's avatar
unknown committed
958
    VOID(pthread_mutex_lock(&LOCK_thread_count));
959 960
    thd->query_id = query_id++;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
unknown's avatar
unknown committed
961
    thd->query_error= 0;				// clear error
unknown's avatar
unknown committed
962
    thd->clear_error();
unknown's avatar
unknown committed
963
    thd->variables.pseudo_thread_id= thread_id;		// for temp tables
unknown's avatar
unknown committed
964

965 966 967 968 969 970 971 972 973 974
    /*
      Sanity check to make sure the master did not get a really bad
      error on the query.
    */
    if (ignored_error_code((expected_error = error_code)) ||
	!check_expected_error(thd,rli,expected_error))
    {
      mysql_log.write(thd,COM_QUERY,"%s",thd->query);
      DBUG_PRINT("query",("%s",thd->query));
      mysql_parse(thd, thd->query, q_len);
unknown's avatar
unknown committed
975

unknown's avatar
unknown committed
976 977 978 979
      /*
        If we expected a non-zero error code, and we don't get the same error
        code, and none of them should be ignored.
      */
980 981 982 983 984 985 986
      DBUG_PRINT("info",("expected_error: %d  last_errno: %d",
			 expected_error, thd->net.last_errno));
      if ((expected_error != (actual_error= thd->net.last_errno)) &&
	  expected_error &&
	  !ignored_error_code(actual_error) &&
	  !ignored_error_code(expected_error))
      {
unknown's avatar
unknown committed
987 988 989 990 991 992 993 994 995 996 997 998
	slave_print_error(rli, 0,
                          "\
Query '%s' caused different errors on master and slave. \
Error on master: '%s' (%d), Error on slave: '%s' (%d). \
Default database: '%s'",
                          query,
                          ER_SAFE(expected_error),
                          expected_error,
                          actual_error ? thd->net.last_error: "no error",
                          actual_error,
                          print_slave_db_safe(db));
	thd->query_error= 1;
999
      }
unknown's avatar
unknown committed
1000 1001 1002
      /*
        If we get the same error code as expected, or they should be ignored. 
      */
1003 1004 1005 1006 1007 1008 1009 1010
      else if (expected_error == actual_error ||
	       ignored_error_code(actual_error))
      {
	DBUG_PRINT("info",("error ignored"));
	thd->query_error = 0;
	*rli->last_slave_error = 0;
	rli->last_slave_errno = 0;
      }
unknown's avatar
unknown committed
1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032
      /*
        Other cases: mostly we expected no error and get one.
      */
      else if (thd->query_error || thd->is_fatal_error)
      {
        slave_print_error(rli,actual_error,
			  "Error '%s' on query '%s'. Default database: '%s'",
                          (actual_error ? thd->net.last_error :
			   "unexpected success or fatal error"),
			  query,
                          print_slave_db_safe(db));
        thd->query_error= 1;
      }
    } 
    /* 
       End of sanity check. If the test was wrong, the query got a really bad
       error on the master, which could be inconsistent, abort and tell DBA to
       check/fix it. check_expected_error() already printed the message to
       stderr and rli, and set thd->query_error to 1.
    */
  } /* End of if (db_ok(... */

1033
  VOID(pthread_mutex_lock(&LOCK_thread_count));
unknown's avatar
unknown committed
1034
  thd->db= 0;	                        // prevent db from being freed
1035
  thd->query= 0;			// just to be sure
1036
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
1037
  // assume no convert for next query unless set explictly
unknown's avatar
unknown committed
1038 1039 1040
#ifdef TO_BE_REMOVED
  thd->variables.convert_set = 0;
#endif
unknown's avatar
unknown committed
1041
  close_thread_tables(thd);      
unknown's avatar
unknown committed
1042
  free_root(&thd->mem_root,MYF(MY_KEEP_PREALLOC));
unknown's avatar
unknown committed
1043
  return (thd->query_error ? thd->query_error : Log_event::exec_event(rli)); 
unknown's avatar
unknown committed
1044
}
unknown's avatar
SCRUM  
unknown committed
1045
#endif
unknown's avatar
unknown committed
1046

unknown's avatar
unknown committed
1047

unknown's avatar
unknown committed
1048
/**************************************************************************
unknown's avatar
unknown committed
1049
	Start_log_event methods
unknown's avatar
unknown committed
1050
**************************************************************************/
1051

unknown's avatar
unknown committed
1052
/*
1053
  Start_log_event::pack_info()
unknown's avatar
unknown committed
1054
*/
1055

unknown's avatar
SCRUM  
unknown committed
1056
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
1057
void Start_log_event::pack_info(Protocol *protocol)
unknown's avatar
unknown committed
1058
{
1059 1060 1061 1062
  char buf[12 + ST_SERVER_VER_LEN + 14 + 22], *pos;
  pos= strmov(buf, "Server ver: ");
  pos= strmov(pos, server_version);
  pos= strmov(pos, ", Binlog ver: ");
unknown's avatar
unknown committed
1063 1064
  pos= int10_to_str(binlog_version, pos, 10);
  protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
unknown's avatar
unknown committed
1065
}
unknown's avatar
SCRUM  
unknown committed
1066
#endif
1067 1068


unknown's avatar
unknown committed
1069
/*
1070
  Start_log_event::print()
unknown's avatar
unknown committed
1071
*/
unknown's avatar
unknown committed
1072 1073

#ifdef MYSQL_CLIENT
1074
void Start_log_event::print(FILE* file, bool short_form, char* last_db)
unknown's avatar
unknown committed
1075
{
1076 1077 1078 1079 1080 1081
  if (short_form)
    return;

  print_header(file);
  fprintf(file, "\tStart: binlog v %d, server v %s created ", binlog_version,
	  server_version);
unknown's avatar
unknown committed
1082 1083 1084
  print_timestamp(file);
  if (created)
    fprintf(file," at startup");
1085
  fputc('\n', file);
unknown's avatar
unknown committed
1086 1087
  fflush(file);
}
unknown's avatar
unknown committed
1088
#endif /* MYSQL_CLIENT */
unknown's avatar
unknown committed
1089

unknown's avatar
unknown committed
1090
/*
1091
  Start_log_event::Start_log_event()
unknown's avatar
unknown committed
1092
*/
1093 1094 1095 1096

Start_log_event::Start_log_event(const char* buf,
				 bool old_format)
  :Log_event(buf, old_format)
1097
{
1098 1099 1100 1101 1102
  buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
  binlog_version = uint2korr(buf+ST_BINLOG_VER_OFFSET);
  memcpy(server_version, buf+ST_SERVER_VER_OFFSET,
	 ST_SERVER_VER_LEN);
  created = uint4korr(buf+ST_CREATED_OFFSET);
unknown's avatar
unknown committed
1103 1104
}

1105

unknown's avatar
unknown committed
1106
/*
1107
  Start_log_event::write_data()
unknown's avatar
unknown committed
1108
*/
1109

1110
int Start_log_event::write_data(IO_CACHE* file)
1111
{
1112 1113 1114 1115 1116
  char buff[START_HEADER_LEN];
  int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
  memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
  int4store(buff + ST_CREATED_OFFSET,created);
  return (my_b_safe_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0);
1117
}
1118

unknown's avatar
unknown committed
1119
/*
1120 1121 1122 1123 1124
  Start_log_event::exec_event()

  The master started

  IMPLEMENTATION
unknown's avatar
unknown committed
1125 1126 1127 1128
    - To handle the case where the master died without having time to write
      DROP TEMPORARY TABLE, DO RELEASE_LOCK (prepared statements' deletion is
      TODO), we clean up all temporary tables that we got, if we are sure we
      can (see below).
1129 1130

  TODO
1131 1132 1133 1134 1135
    - Remove all active user locks.
      Guilhem 2003-06: this is true but not urgent: the worst it can cause is
      the use of a bit of memory for a user lock which will not be used
      anymore. If the user lock is later used, the old one will be released. In
      other words, no deadlock problem.
unknown's avatar
unknown committed
1136 1137
*/

unknown's avatar
SCRUM  
unknown committed
1138
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
1139 1140
int Start_log_event::exec_event(struct st_relay_log_info* rli)
{
1141
  DBUG_ENTER("Start_log_event::exec_event");
unknown's avatar
unknown committed
1142

unknown's avatar
unknown committed
1143 1144 1145 1146 1147 1148 1149 1150
  switch (rli->mi->old_format) {
  case BINLOG_FORMAT_CURRENT:
    /* 
       This is 4.x, so a Start_log_event is only at master startup,
       so we are sure the master has restarted and cleared his temp tables.
    */
    close_temporary_tables(thd);
    cleanup_load_tmpdir();
unknown's avatar
unknown committed
1151 1152 1153 1154 1155 1156 1157 1158 1159 1160
    /*
      As a transaction NEVER spans on 2 or more binlogs:
      if we have an active transaction at this point, the master died while
      writing the transaction to the binary log, i.e. while flushing the binlog
      cache to the binlog. As the write was started, the transaction had been
      committed on the master, so we lack of information to replay this
      transaction on the slave; all we can do is stop with error.
    */
    if (thd->options & OPTION_BEGIN)
    {
unknown's avatar
unknown committed
1161 1162 1163 1164
      slave_print_error(rli, 0, "\
Rolling back unfinished transaction (no COMMIT or ROLLBACK) from relay log. \
Probably cause is that the master died while writing the transaction to it's \
binary log.");
unknown's avatar
unknown committed
1165 1166
      return(1);
    }
unknown's avatar
unknown committed
1167 1168
    break;

unknown's avatar
unknown committed
1169
    /* 
unknown's avatar
unknown committed
1170 1171
       Now the older formats; in that case load_tmpdir is cleaned up by the I/O
       thread.
unknown's avatar
unknown committed
1172
    */
unknown's avatar
unknown committed
1173
  case BINLOG_FORMAT_323_LESS_57:
unknown's avatar
unknown committed
1174
    /*
unknown's avatar
unknown committed
1175 1176 1177
      Cannot distinguish a Start_log_event generated at master startup and
      one generated by master FLUSH LOGS, so cannot be sure temp tables
      have to be dropped. So do nothing.
unknown's avatar
unknown committed
1178
    */
unknown's avatar
unknown committed
1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190
    break;
  case BINLOG_FORMAT_323_GEQ_57:
    /*
      Can distinguish, based on the value of 'created',
      which was generated at master startup.
    */
    if (created)
      close_temporary_tables(thd);
    break;
  default:
    /* this case is impossible */
    return 1;
unknown's avatar
unknown committed
1191
  }
unknown's avatar
unknown committed
1192

1193
  DBUG_RETURN(Log_event::exec_event(rli));
1194
}
unknown's avatar
unknown committed
1195
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
1196

unknown's avatar
unknown committed
1197
/**************************************************************************
unknown's avatar
unknown committed
1198
	Load_log_event methods
unknown's avatar
unknown committed
1199
**************************************************************************/
1200

unknown's avatar
unknown committed
1201
/*
1202
  Load_log_event::pack_info()
unknown's avatar
unknown committed
1203
*/
1204

unknown's avatar
SCRUM  
unknown committed
1205
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
1206
void Load_log_event::pack_info(Protocol *protocol)
1207
{
1208 1209 1210 1211 1212 1213
  char *buf, *pos;
  uint buf_len;

  buf_len= 
    5 + db_len + 3 +                        // "use DB; "
    18 + fname_len + 2 +                    // "LOAD DATA INFILE 'file''"
unknown's avatar
unknown committed
1214
    7 +					    // LOCAL
1215 1216 1217 1218 1219 1220 1221 1222 1223 1224
    9 +                                     // " REPLACE or IGNORE "
    11 + table_name_len +                   // "INTO TABLE table"
    21 + sql_ex.field_term_len*4 + 2 +      // " FIELDS TERMINATED BY 'str'"
    23 + sql_ex.enclosed_len*4 + 2 +        // " OPTIONALLY ENCLOSED BY 'str'"
    12 + sql_ex.escaped_len*4 + 2 +         // " ESCAPED BY 'str'"
    21 + sql_ex.line_term_len*4 + 2 +       // " FIELDS TERMINATED BY 'str'"
    19 + sql_ex.line_start_len*4 + 2 +      // " LINES STARTING BY 'str'" 
    15 + 22 +                               // " IGNORE xxx  LINES" 
    3 + (num_fields-1)*2 + field_block_len; // " (field1, field2, ...)"

unknown's avatar
unknown committed
1225
  if (!(buf= my_malloc(buf_len, MYF(MY_WME))))
1226 1227
    return;
  pos= buf;
1228
  if (db && db_len)
1229
  {
1230 1231
    pos= strmov(pos, "use `");
    memcpy(pos, db, db_len);
unknown's avatar
unknown committed
1232
    pos= strmov(pos+db_len, "`; ");
1233
  }
1234

unknown's avatar
unknown committed
1235 1236 1237 1238
  pos= strmov(pos, "LOAD DATA ");
  if (check_fname_outside_temp_buf())
    pos= strmov(pos, "LOCAL ");
  pos= strmov(pos, "INFILE '");
1239
  memcpy(pos, fname, fname_len);
unknown's avatar
unknown committed
1240
  pos= strmov(pos+fname_len, "' ");
1241

unknown's avatar
unknown committed
1242
  if (sql_ex.opt_flags & REPLACE_FLAG)
1243
    pos= strmov(pos, " REPLACE ");
unknown's avatar
unknown committed
1244
  else if (sql_ex.opt_flags & IGNORE_FLAG)
1245 1246 1247 1248 1249 1250
    pos= strmov(pos, " IGNORE ");

  pos= strmov(pos ,"INTO TABLE ");
  memcpy(pos, table_name, table_name_len);
  pos+= table_name_len;

1251
  if (sql_ex.field_term_len)
1252
  {
1253 1254
    pos= strmov(pos, " FIELDS TERMINATED BY ");
    pos= pretty_print_str(pos, sql_ex.field_term, sql_ex.field_term_len);
1255 1256 1257 1258
  }

  if (sql_ex.enclosed_len)
  {
unknown's avatar
unknown committed
1259
    if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
1260 1261 1262
      pos= strmov(pos, " OPTIONALLY ");
    pos= strmov(pos, " ENCLOSED BY ");
    pos= pretty_print_str(pos, sql_ex.enclosed, sql_ex.enclosed_len);
1263
  }
1264

1265 1266
  if (sql_ex.escaped_len)
  {
1267 1268
    pos= strmov(pos, " ESCAPED BY ");
    pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len);
1269
  }
1270

1271
  bool line_lexem_added= false;
1272 1273
  if (sql_ex.line_term_len)
  {
1274 1275
    pos= strmov(pos, " LINES TERMINATED BY ");
    pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len);
1276
    line_lexem_added= true;
1277 1278 1279 1280
  }

  if (sql_ex.line_start_len)
  {
1281 1282 1283
    if (!line_lexem_added)
      pos= strmov(pos," LINES");
    pos= strmov(pos, " STARTING BY ");
1284
    pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len);
1285
  }
1286

unknown's avatar
unknown committed
1287
  if ((long) skip_lines > 0)
1288 1289
  {
    pos= strmov(pos, " IGNORE ");
unknown's avatar
unknown committed
1290
    pos= longlong10_to_str((longlong) skip_lines, pos, 10);
1291 1292
    pos= strmov(pos," LINES ");    
  }
1293 1294 1295 1296

  if (num_fields)
  {
    uint i;
unknown's avatar
unknown committed
1297
    const char *field= fields;
1298
    pos= strmov(pos, " (");
1299 1300 1301
    for (i = 0; i < num_fields; i++)
    {
      if (i)
unknown's avatar
unknown committed
1302 1303 1304 1305
      {
        *pos++= ' ';
        *pos++= ',';
      }
1306
      memcpy(pos, field, field_lens[i]);
unknown's avatar
unknown committed
1307 1308
      pos+=   field_lens[i];
      field+= field_lens[i]  + 1;
1309
    }
1310
    *pos++= ')';
1311
  }
1312

1313
  protocol->store(buf, pos-buf, &my_charset_bin);
unknown's avatar
unknown committed
1314
  my_free(buf, MYF(0));
1315
}
unknown's avatar
unknown committed
1316
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
1317

1318

unknown's avatar
unknown committed
1319
/*
1320
  Load_log_event::write_data_header()
unknown's avatar
unknown committed
1321
*/
1322 1323

int Load_log_event::write_data_header(IO_CACHE* file)
1324
{
1325
  char buf[LOAD_HEADER_LEN];
unknown's avatar
unknown committed
1326
  int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id);
1327 1328 1329 1330 1331 1332
  int4store(buf + L_EXEC_TIME_OFFSET, exec_time);
  int4store(buf + L_SKIP_LINES_OFFSET, skip_lines);
  buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
  buf[L_DB_LEN_OFFSET] = (char)db_len;
  int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
  return my_b_safe_write(file, (byte*)buf, LOAD_HEADER_LEN);
1333
}
1334

1335

unknown's avatar
unknown committed
1336
/*
1337
  Load_log_event::write_data_body()
unknown's avatar
unknown committed
1338
*/
1339 1340

int Load_log_event::write_data_body(IO_CACHE* file)
1341
{
1342 1343 1344
  if (sql_ex.write_data(file))
    return 1;
  if (num_fields && fields && field_lens)
1345
  {
1346 1347 1348
    if (my_b_safe_write(file, (byte*)field_lens, num_fields) ||
	my_b_safe_write(file, (byte*)fields, field_block_len))
      return 1;
1349
  }
1350 1351 1352
  return (my_b_safe_write(file, (byte*)table_name, table_name_len + 1) ||
	  my_b_safe_write(file, (byte*)db, db_len + 1) ||
	  my_b_safe_write(file, (byte*)fname, fname_len));
1353 1354
}

1355

unknown's avatar
unknown committed
1356
/*
1357
  Load_log_event::Load_log_event()
unknown's avatar
unknown committed
1358
*/
1359

1360
#ifndef MYSQL_CLIENT
unknown's avatar
unknown committed
1361 1362 1363
Load_log_event::Load_log_event(THD *thd_arg, sql_exchange *ex,
			       const char *db_arg, const char *table_name_arg,
			       List<Item> &fields_arg,
unknown's avatar
unknown committed
1364 1365 1366
			       enum enum_duplicates handle_dup,
			       bool using_trans)
  :Log_event(thd_arg, 0, using_trans), thread_id(thd_arg->thread_id),
1367
   slave_proxy_id(thd_arg->variables.pseudo_thread_id),
unknown's avatar
unknown committed
1368 1369
   num_fields(0),fields(0),
   field_lens(0),field_block_len(0),
unknown's avatar
unknown committed
1370 1371
   table_name(table_name_arg ? table_name_arg : ""),
   db(db_arg), fname(ex->file_name)
unknown's avatar
unknown committed
1372 1373 1374
{
  time_t end_time;
  time(&end_time);
1375
  exec_time = (ulong) (end_time  - thd_arg->start_time);
1376 1377 1378
  /* db can never be a zero pointer in 4.0 */
  db_len = (uint32) strlen(db);
  table_name_len = (uint32) strlen(table_name);
unknown's avatar
unknown committed
1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391
  fname_len = (fname) ? (uint) strlen(fname) : 0;
  sql_ex.field_term = (char*) ex->field_term->ptr();
  sql_ex.field_term_len = (uint8) ex->field_term->length();
  sql_ex.enclosed = (char*) ex->enclosed->ptr();
  sql_ex.enclosed_len = (uint8) ex->enclosed->length();
  sql_ex.line_term = (char*) ex->line_term->ptr();
  sql_ex.line_term_len = (uint8) ex->line_term->length();
  sql_ex.line_start = (char*) ex->line_start->ptr();
  sql_ex.line_start_len = (uint8) ex->line_start->length();
  sql_ex.escaped = (char*) ex->escaped->ptr();
  sql_ex.escaped_len = (uint8) ex->escaped->length();
  sql_ex.opt_flags = 0;
  sql_ex.cached_new_format = -1;
1392
    
unknown's avatar
unknown committed
1393
  if (ex->dumpfile)
unknown's avatar
unknown committed
1394
    sql_ex.opt_flags|= DUMPFILE_FLAG;
unknown's avatar
unknown committed
1395
  if (ex->opt_enclosed)
unknown's avatar
unknown committed
1396
    sql_ex.opt_flags|= OPT_ENCLOSED_FLAG;
1397

unknown's avatar
unknown committed
1398
  sql_ex.empty_flags= 0;
1399

1400
  switch (handle_dup) {
unknown's avatar
unknown committed
1401
  case DUP_IGNORE:
unknown's avatar
unknown committed
1402
    sql_ex.opt_flags|= IGNORE_FLAG;
unknown's avatar
unknown committed
1403 1404
    break;
  case DUP_REPLACE:
unknown's avatar
unknown committed
1405
    sql_ex.opt_flags|= REPLACE_FLAG;
unknown's avatar
unknown committed
1406 1407 1408 1409
    break;
  case DUP_UPDATE:				// Impossible here
  case DUP_ERROR:
    break;	
unknown's avatar
unknown committed
1410
  }
1411

unknown's avatar
unknown committed
1412 1413 1414 1415 1416 1417 1418 1419 1420 1421
  if (!ex->field_term->length())
    sql_ex.empty_flags |= FIELD_TERM_EMPTY;
  if (!ex->enclosed->length())
    sql_ex.empty_flags |= ENCLOSED_EMPTY;
  if (!ex->line_term->length())
    sql_ex.empty_flags |= LINE_TERM_EMPTY;
  if (!ex->line_start->length())
    sql_ex.empty_flags |= LINE_START_EMPTY;
  if (!ex->escaped->length())
    sql_ex.empty_flags |= ESCAPED_EMPTY;
1422
    
unknown's avatar
unknown committed
1423
  skip_lines = ex->skip_lines;
1424

unknown's avatar
unknown committed
1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435
  List_iterator<Item> li(fields_arg);
  field_lens_buf.length(0);
  fields_buf.length(0);
  Item* item;
  while ((item = li++))
  {
    num_fields++;
    uchar len = (uchar) strlen(item->name);
    field_block_len += len + 1;
    fields_buf.append(item->name, len + 1);
    field_lens_buf.append((char*)&len, 1);
1436 1437
  }

unknown's avatar
unknown committed
1438 1439 1440
  field_lens = (const uchar*)field_lens_buf.ptr();
  fields = fields_buf.ptr();
}
unknown's avatar
unknown committed
1441
#endif /* !MYSQL_CLIENT */
1442

1443
/*
1444
  Load_log_event::Load_log_event()
1445

unknown's avatar
unknown committed
1446 1447 1448
  NOTE
    The caller must do buf[event_len] = 0 before he starts using the
    constructed event.
1449 1450
*/

unknown's avatar
unknown committed
1451
Load_log_event::Load_log_event(const char *buf, int event_len,
1452 1453
			       bool old_format)
  :Log_event(buf, old_format),num_fields(0),fields(0),
unknown's avatar
unknown committed
1454 1455
   field_lens(0),field_block_len(0),
   table_name(0),db(0),fname(0)
unknown's avatar
unknown committed
1456
{
1457
  if (!event_len) // derived class, will call copy_log_event() itself
1458
    return;
1459
  copy_log_event(buf, event_len, old_format);
1460 1461
}

1462

unknown's avatar
unknown committed
1463
/*
1464
  Load_log_event::copy_log_event()
unknown's avatar
unknown committed
1465
*/
1466

1467 1468
int Load_log_event::copy_log_event(const char *buf, ulong event_len,
				   bool old_format)
1469
{
1470
  uint data_len;
1471
  char* buf_end = (char*)buf + event_len;
1472
  uint header_len= old_format ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
1473
  const char* data_head = buf + header_len;
unknown's avatar
unknown committed
1474
  slave_proxy_id= thread_id= uint4korr(data_head + L_THREAD_ID_OFFSET);
1475 1476 1477 1478 1479
  exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
  skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
  table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
  db_len = (uint)data_head[L_DB_LEN_OFFSET];
  num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
unknown's avatar
unknown committed
1480
	  
1481
  int body_offset = ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
1482
		     LOAD_HEADER_LEN + header_len :
1483
		     get_data_body_offset());
unknown's avatar
unknown committed
1484
  
unknown's avatar
unknown committed
1485
  if ((int) event_len < body_offset)
1486
    return 1;
1487 1488 1489 1490
  /*
    Sql_ex.init() on success returns the pointer to the first byte after
    the sql_ex structure, which is the start of field lengths array.
  */
1491 1492 1493 1494 1495 1496
  if (!(field_lens=(uchar*)sql_ex.init((char*)buf + body_offset,
		  buf_end,
		  buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
    return 1;
  
  data_len = event_len - body_offset;
1497
  if (num_fields > data_len) // simple sanity check against corruption
1498
    return 1;
1499
  for (uint i = 0; i < num_fields; i++)
1500
    field_block_len += (uint)field_lens[i] + 1;
1501

unknown's avatar
unknown committed
1502 1503 1504 1505
  fields = (char*)field_lens + num_fields;
  table_name  = fields + field_block_len;
  db = table_name + table_name_len + 1;
  fname = db + db_len + 1;
1506 1507
  fname_len = strlen(fname);
  // null termination is accomplished by the caller doing buf[event_len]=0
1508
  return 0;
unknown's avatar
unknown committed
1509 1510 1511
}


unknown's avatar
unknown committed
1512
/*
1513
  Load_log_event::print()
unknown's avatar
unknown committed
1514
*/
1515 1516

#ifdef MYSQL_CLIENT
1517
void Load_log_event::print(FILE* file, bool short_form, char* last_db)
unknown's avatar
unknown committed
1518 1519 1520 1521
{
  print(file, short_form, last_db, 0);
}

unknown's avatar
unknown committed
1522 1523 1524

void Load_log_event::print(FILE* file, bool short_form, char* last_db,
			   bool commented)
unknown's avatar
unknown committed
1525 1526 1527
{
  if (!short_form)
  {
1528
    print_header(file);
1529
    fprintf(file, "\tQuery\tthread_id=%ld\texec_time=%ld\n",
unknown's avatar
unknown committed
1530 1531 1532
	    thread_id, exec_time);
  }

1533
  bool same_db = 0;
unknown's avatar
unknown committed
1534 1535 1536 1537 1538
  if (db && last_db)
  {
    if (!(same_db = !memcmp(last_db, db, db_len + 1)))
      memcpy(last_db, db, db_len + 1);
  }
1539
  
unknown's avatar
unknown committed
1540
  if (db && db[0] && !same_db)
unknown's avatar
unknown committed
1541 1542 1543
    fprintf(file, "%suse %s;\n", 
            commented ? "# " : "",
            db);
unknown's avatar
unknown committed
1544

unknown's avatar
unknown committed
1545 1546
  fprintf(file, "%sLOAD DATA ",
          commented ? "# " : "");
1547 1548
  if (check_fname_outside_temp_buf())
    fprintf(file, "LOCAL ");
1549
  fprintf(file, "INFILE '%-*s' ", fname_len, fname);
unknown's avatar
unknown committed
1550

unknown's avatar
unknown committed
1551
  if (sql_ex.opt_flags & REPLACE_FLAG)
unknown's avatar
unknown committed
1552
    fprintf(file," REPLACE ");
unknown's avatar
unknown committed
1553
  else if (sql_ex.opt_flags & IGNORE_FLAG)
unknown's avatar
unknown committed
1554 1555 1556
    fprintf(file," IGNORE ");
  
  fprintf(file, "INTO TABLE %s ", table_name);
unknown's avatar
unknown committed
1557
  if (sql_ex.field_term)
unknown's avatar
unknown committed
1558 1559
  {
    fprintf(file, " FIELDS TERMINATED BY ");
1560
    pretty_print_str(file, sql_ex.field_term, sql_ex.field_term_len);
unknown's avatar
unknown committed
1561 1562
  }

unknown's avatar
unknown committed
1563
  if (sql_ex.enclosed)
unknown's avatar
unknown committed
1564
  {
unknown's avatar
unknown committed
1565
    if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
unknown's avatar
unknown committed
1566 1567
      fprintf(file," OPTIONALLY ");
    fprintf(file, " ENCLOSED BY ");
1568
    pretty_print_str(file, sql_ex.enclosed, sql_ex.enclosed_len);
unknown's avatar
unknown committed
1569 1570
  }
     
1571
  if (sql_ex.escaped)
unknown's avatar
unknown committed
1572 1573
  {
    fprintf(file, " ESCAPED BY ");
1574
    pretty_print_str(file, sql_ex.escaped, sql_ex.escaped_len);
unknown's avatar
unknown committed
1575 1576
  }
     
1577
  bool line_lexem_added= false;
1578
  if (sql_ex.line_term)
unknown's avatar
unknown committed
1579 1580
  {
    fprintf(file," LINES TERMINATED BY ");
1581
    pretty_print_str(file, sql_ex.line_term, sql_ex.line_term_len);
1582
    line_lexem_added= true;
unknown's avatar
unknown committed
1583 1584
  }

1585
  if (sql_ex.line_start)
unknown's avatar
unknown committed
1586
  {
1587 1588 1589
    if (!line_lexem_added)
      fprintf(file," LINES");
    fprintf(file," STARTING BY ");
1590
    pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len);
unknown's avatar
unknown committed
1591 1592
  }
     
1593 1594
  if ((long) skip_lines > 0)
    fprintf(file, " IGNORE %ld LINES", (long) skip_lines);
unknown's avatar
unknown committed
1595

1596 1597 1598 1599
  if (num_fields)
  {
    uint i;
    const char* field = fields;
1600 1601
    fprintf(file, " (");
    for (i = 0; i < num_fields; i++)
unknown's avatar
unknown committed
1602
    {
unknown's avatar
unknown committed
1603
      if (i)
1604 1605
	fputc(',', file);
      fprintf(file, field);
unknown's avatar
unknown committed
1606
	  
1607
      field += field_lens[i]  + 1;
unknown's avatar
unknown committed
1608
    }
1609 1610
    fputc(')', file);
  }
unknown's avatar
unknown committed
1611 1612 1613

  fprintf(file, ";\n");
}
unknown's avatar
unknown committed
1614
#endif /* MYSQL_CLIENT */
1615

1616

unknown's avatar
unknown committed
1617
/*
1618
  Load_log_event::set_fields()
unknown's avatar
unknown committed
1619
*/
1620

1621
#ifndef MYSQL_CLIENT
1622
void Load_log_event::set_fields(List<Item> &field_list)
unknown's avatar
unknown committed
1623 1624
{
  uint i;
unknown's avatar
unknown committed
1625
  const char* field = fields;
1626
  for (i= 0; i < num_fields; i++)
unknown's avatar
unknown committed
1627
  {
1628 1629
    field_list.push_back(new Item_field(db, table_name, field));	  
    field+= field_lens[i]  + 1;
unknown's avatar
unknown committed
1630
  }
unknown's avatar
unknown committed
1631
}
unknown's avatar
unknown committed
1632
#endif /* !MYSQL_CLIENT */
unknown's avatar
unknown committed
1633 1634


unknown's avatar
SCRUM  
unknown committed
1635
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
unknown's avatar
unknown committed
1636 1637
/*
  Does the data loading job when executing a LOAD DATA on the slave
1638

unknown's avatar
unknown committed
1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652
  SYNOPSIS
    Load_log_event::exec_event
      net  
      rli                             
      use_rli_only_for_errors	  - if set to 1, rli is provided to 
                                  Load_log_event::exec_event only for this 
				  function to have RPL_LOG_NAME and 
				  rli->last_slave_error, both being used by 
				  error reports. rli's position advancing
				  is skipped (done by the caller which is
				  Execute_load_log_event::exec_event).
				  - if set to 0, rli is provided for full use,
				  i.e. for error reports and position
				  advancing.
1653

unknown's avatar
unknown committed
1654 1655 1656 1657 1658 1659 1660
  DESCRIPTION
    Does the data loading job when executing a LOAD DATA on the slave
 
  RETURN VALUE
    0           Success                                                 
    1    	Failure
*/
1661

unknown's avatar
unknown committed
1662 1663
int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, 
			       bool use_rli_only_for_errors)
1664
{
unknown's avatar
unknown committed
1665
  thd->db= (char*) rewrite_db(db);
1666 1667
  DBUG_ASSERT(thd->query == 0);
  thd->query = 0;				// Should not be needed
1668
  thd->query_error = 0;
1669

unknown's avatar
unknown committed
1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681
  /*
    We test replicate_*_db rules. Note that we have already prepared the file
    to load, even if we are going to ignore and delete it now. So it is
    possible that we did a lot of disk writes for nothing. In other words, a
    big LOAD DATA INFILE on the master will still consume a lot of space on
    the slave (space in the relay log + space of temp files: twice the space
    of the file to load...) even if it will finally be ignored.
    TODO: fix this; this can be done by testing rules in
    Create_file_log_event::exec_event() and then discarding Append_block and
    al. Another way is do the filtering in the I/O thread (more efficient: no
    disk writes at all).
  */
1682
  if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
1683
  {
1684 1685 1686 1687 1688 1689 1690 1691 1692 1693
    thd->set_time((time_t)when);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
    thd->query_id = query_id++;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));

    TABLE_LIST tables;
    bzero((char*) &tables,sizeof(tables));
    tables.db = thd->db;
    tables.alias = tables.real_name = (char*)table_name;
    tables.lock_type = TL_WRITE;
unknown's avatar
unknown committed
1694
    tables.updating= 1;
1695 1696 1697 1698 1699 1700 1701 1702 1703 1704
    // the table will be opened in mysql_load    
    if (table_rules_on && !tables_ok(thd, &tables))
    {
      // TODO: this is a bug - this needs to be moved to the I/O thread
      if (net)
        skip_load_data_infile(net);
    }
    else
    {
      char llbuff[22];
unknown's avatar
unknown committed
1705
      enum enum_duplicates handle_dup;
unknown's avatar
unknown committed
1706 1707
      if (sql_ex.opt_flags & REPLACE_FLAG)
	handle_dup= DUP_REPLACE;
unknown's avatar
unknown committed
1708 1709 1710
      else if (sql_ex.opt_flags & IGNORE_FLAG)
        handle_dup= DUP_IGNORE;
      else
unknown's avatar
unknown committed
1711
      {
unknown's avatar
unknown committed
1712
        /*
unknown's avatar
unknown committed
1713
	  When replication is running fine, if it was DUP_ERROR on the
unknown's avatar
unknown committed
1714 1715 1716 1717
          master then we could choose DUP_IGNORE here, because if DUP_ERROR
          suceeded on master, and data is identical on the master and slave,
          then there should be no uniqueness errors on slave, so DUP_IGNORE is
          the same as DUP_ERROR. But in the unlikely case of uniqueness errors
unknown's avatar
unknown committed
1718 1719 1720
          (because the data on the master and slave happen to be different
	  (user error or bug), we want LOAD DATA to print an error message on
	  the slave to discover the problem.
unknown's avatar
unknown committed
1721 1722 1723 1724 1725

          If reading from net (a 3.23 master), mysql_load() will change this
          to DUP_IGNORE.
        */
        handle_dup= DUP_ERROR;
unknown's avatar
unknown committed
1726
      }
unknown's avatar
unknown committed
1727

unknown's avatar
unknown committed
1728
      sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
1729 1730 1731 1732 1733
      String field_term(sql_ex.field_term,sql_ex.field_term_len,log_cs);
      String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,log_cs);
      String line_term(sql_ex.line_term,sql_ex.line_term_len,log_cs);
      String line_start(sql_ex.line_start,sql_ex.line_start_len,log_cs);
      String escaped(sql_ex.escaped,sql_ex.escaped_len, log_cs);
unknown's avatar
unknown committed
1734 1735 1736 1737 1738
      ex.field_term= &field_term;
      ex.enclosed= &enclosed;
      ex.line_term= &line_term;
      ex.line_start= &line_start;
      ex.escaped= &escaped;
1739 1740 1741 1742 1743 1744

      ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
      if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
	ex.field_term->length(0);

      ex.skip_lines = skip_lines;
unknown's avatar
unknown committed
1745 1746
      List<Item> field_list;
      set_fields(field_list);
unknown's avatar
unknown committed
1747
      thd->variables.pseudo_thread_id= thread_id;
1748 1749 1750 1751 1752 1753 1754 1755 1756
      if (net)
      {
	// mysql_load will use thd->net to read the file
	thd->net.vio = net->vio;
	/*
	  Make sure the client does not get confused about the packet sequence
	*/
	thd->net.pkt_nr = net->pkt_nr;
      }
unknown's avatar
unknown committed
1757
      if (mysql_load(thd, &ex, &tables, field_list, handle_dup, net != 0,
1758 1759 1760
		     TL_WRITE))
	thd->query_error = 1;
      if (thd->cuted_fields)
unknown's avatar
unknown committed
1761
      {
unknown's avatar
unknown committed
1762 1763 1764 1765 1766 1767 1768 1769
	/* log_pos is the position of the LOAD event in the master log */
	sql_print_error("\
Slave: load data infile on table '%s' at log position %s in log \
'%s' produced %ld warning(s). Default database: '%s'",
                        (char*) table_name,
                        llstr(log_pos,llbuff), RPL_LOG_NAME, 
			(ulong) thd->cuted_fields,
                        print_slave_db_safe(db));
unknown's avatar
unknown committed
1770
      }
1771 1772 1773
      if (net)
        net->pkt_nr= thd->net.pkt_nr;
    }
1774 1775
  }
  else
1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790
  {
    /*
      We will just ask the master to send us /dev/null if we do not
      want to load the data.
      TODO: this a bug - needs to be done in I/O thread
    */
    if (net)
      skip_load_data_infile(net);
  }
	    
  thd->net.vio = 0; 
  thd->db= 0;					// prevent db from being freed
  close_thread_tables(thd);
  if (thd->query_error)
  {
unknown's avatar
unknown committed
1791 1792 1793 1794 1795 1796 1797 1798 1799 1800
    /* this err/sql_errno code is copy-paste from send_error() */
    const char *err;
    int sql_errno;
    if ((err=thd->net.last_error)[0])
      sql_errno=thd->net.last_errno;
    else
    {
      sql_errno=ER_UNKNOWN_ERROR;
      err=ER(sql_errno);       
    }
unknown's avatar
unknown committed
1801
    slave_print_error(rli,sql_errno,"\
unknown's avatar
unknown committed
1802
Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'",
unknown's avatar
unknown committed
1803
		      err, (char*)table_name, print_slave_db_safe(db));
unknown's avatar
unknown committed
1804
    free_root(&thd->mem_root,MYF(MY_KEEP_PREALLOC));
1805 1806
    return 1;
  }
unknown's avatar
unknown committed
1807
  free_root(&thd->mem_root,MYF(MY_KEEP_PREALLOC));
1808
	    
1809
  if (thd->is_fatal_error)
1810
  {
unknown's avatar
unknown committed
1811 1812 1813
    slave_print_error(rli,ER_UNKNOWN_ERROR, "\
Fatal error running LOAD DATA INFILE on table '%s'. Default database: '%s'",
		      (char*)table_name, print_slave_db_safe(db));
1814 1815 1816
    return 1;
  }

unknown's avatar
unknown committed
1817
  return ( use_rli_only_for_errors ? 0 : Log_event::exec_event(rli) ); 
1818
}
unknown's avatar
SCRUM  
unknown committed
1819
#endif
1820 1821


unknown's avatar
unknown committed
1822
/**************************************************************************
unknown's avatar
unknown committed
1823
  Rotate_log_event methods
unknown's avatar
unknown committed
1824
**************************************************************************/
1825

unknown's avatar
unknown committed
1826
/*
1827
  Rotate_log_event::pack_info()
unknown's avatar
unknown committed
1828
*/
1829

unknown's avatar
SCRUM  
unknown committed
1830
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
1831
void Rotate_log_event::pack_info(Protocol *protocol)
1832
{
unknown's avatar
unknown committed
1833
  char buf1[256], buf[22];
unknown's avatar
unknown committed
1834
  String tmp(buf1, sizeof(buf1), log_cs);
1835
  tmp.length(0);
unknown's avatar
unknown committed
1836 1837 1838 1839
  tmp.append(new_log_ident, ident_len);
  tmp.append(";pos=");
  tmp.append(llstr(pos,buf));
  protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
1840
}
unknown's avatar
SCRUM  
unknown committed
1841
#endif
1842

1843

unknown's avatar
unknown committed
1844
/*
1845
  Rotate_log_event::print()
unknown's avatar
unknown committed
1846
*/
1847 1848 1849

#ifdef MYSQL_CLIENT
void Rotate_log_event::print(FILE* file, bool short_form, char* last_db)
1850
{
1851
  char buf[22];
unknown's avatar
unknown committed
1852
  if (short_form)
1853
    return;
1854

1855
  print_header(file);
1856 1857 1858 1859 1860
  fprintf(file, "\tRotate to ");
  if (new_log_ident)
    my_fwrite(file, (byte*) new_log_ident, (uint)ident_len, 
	      MYF(MY_NABP | MY_WME));
  fprintf(file, "  pos: %s", llstr(pos, buf));
1861
  fputc('\n', file);
1862
  fflush(file);
1863
}
unknown's avatar
unknown committed
1864
#endif /* MYSQL_CLIENT */
1865 1866


unknown's avatar
unknown committed
1867
/*
1868
  Rotate_log_event::Rotate_log_event()
unknown's avatar
unknown committed
1869
*/
1870

1871 1872 1873
Rotate_log_event::Rotate_log_event(const char* buf, int event_len,
				   bool old_format)
  :Log_event(buf, old_format),new_log_ident(NULL),alloced(0)
1874
{
1875 1876 1877 1878
  // The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
  int header_size = (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
  uint ident_offset;
  if (event_len < header_size)
1879
    return;
1880 1881 1882 1883 1884 1885
  buf += header_size;
  if (old_format)
  {
    ident_len = (uint)(event_len - OLD_HEADER_LEN);
    pos = 4;
    ident_offset = 0;
1886
  }
1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897
  else
  {
    ident_len = (uint)(event_len - ROTATE_EVENT_OVERHEAD);
    pos = uint8korr(buf + R_POS_OFFSET);
    ident_offset = ROTATE_HEADER_LEN;
  }
  set_if_smaller(ident_len,FN_REFLEN-1);
  if (!(new_log_ident= my_strdup_with_length((byte*) buf +
					     ident_offset,
					     (uint) ident_len,
					     MYF(MY_WME))))
1898
    return;
1899
  alloced = 1;
1900
}
1901 1902


unknown's avatar
unknown committed
1903
/*
1904
  Rotate_log_event::write_data()
unknown's avatar
unknown committed
1905
*/
1906

1907
int Rotate_log_event::write_data(IO_CACHE* file)
1908
{
1909
  char buf[ROTATE_HEADER_LEN];
unknown's avatar
unknown committed
1910
  int8store(buf + R_POS_OFFSET, pos);
1911 1912
  return (my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) ||
	  my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len));
1913 1914
}

1915

unknown's avatar
unknown committed
1916
/*
1917 1918 1919
  Rotate_log_event::exec_event()

  Got a rotate log even from the master
1920

1921 1922 1923
  IMPLEMENTATION
    This is mainly used so that we can later figure out the logname and
    position for the master.
1924

1925 1926 1927 1928 1929
    We can't rotate the slave as this will cause infinitive rotations
    in a A -> B -> A setup.

  RETURN VALUES
    0	ok
unknown's avatar
unknown committed
1930
*/
1931

unknown's avatar
SCRUM  
unknown committed
1932
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
1933
int Rotate_log_event::exec_event(struct st_relay_log_info* rli)
1934
{
1935 1936 1937
  DBUG_ENTER("Rotate_log_event::exec_event");

  pthread_mutex_lock(&rli->data_lock);
unknown's avatar
unknown committed
1938
  rli->event_relay_log_pos += get_event_len();
unknown's avatar
unknown committed
1939 1940 1941 1942 1943 1944 1945 1946 1947
  /*
    If we are in a transaction: the only normal case is when the I/O thread was
    copying a big transaction, then it was stopped and restarted: we have this
    in the relay log:
    BEGIN
    ...
    ROTATE (a fake one)
    ...
    COMMIT or ROLLBACK
unknown's avatar
unknown committed
1948 1949
    In that case, we don't want to touch the coordinates which correspond to
    the beginning of the transaction.
unknown's avatar
unknown committed
1950
  */
unknown's avatar
unknown committed
1951
  if (!(thd->options & OPTION_BEGIN))
unknown's avatar
unknown committed
1952
  {
unknown's avatar
unknown committed
1953 1954 1955 1956 1957 1958
    memcpy(rli->group_master_log_name, new_log_ident, ident_len+1);
    rli->notify_group_master_log_name_update();
    rli->group_master_log_pos = pos;
    rli->group_relay_log_pos = rli->event_relay_log_pos;
    DBUG_PRINT("info", ("group_master_log_pos: %lu",
                        (ulong) rli->group_master_log_pos));
unknown's avatar
unknown committed
1959
  }
1960 1961 1962 1963
  pthread_mutex_unlock(&rli->data_lock);
  pthread_cond_broadcast(&rli->data_cond);
  flush_relay_log_info(rli);
  DBUG_RETURN(0);
1964
}
unknown's avatar
SCRUM  
unknown committed
1965
#endif
1966 1967


unknown's avatar
unknown committed
1968
/**************************************************************************
unknown's avatar
unknown committed
1969
	Intvar_log_event methods
unknown's avatar
unknown committed
1970
**************************************************************************/
1971

unknown's avatar
unknown committed
1972
/*
1973
  Intvar_log_event::pack_info()
unknown's avatar
unknown committed
1974
*/
1975

unknown's avatar
SCRUM  
unknown committed
1976
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
1977
void Intvar_log_event::pack_info(Protocol *protocol)
1978
{
unknown's avatar
unknown committed
1979 1980
  char buf[256], *pos;
  pos= strmake(buf, get_var_type_name(), sizeof(buf)-23);
unknown's avatar
unknown committed
1981
  *pos++= '=';
unknown's avatar
unknown committed
1982
  pos= longlong10_to_str(val, pos, -10);
unknown's avatar
unknown committed
1983
  protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
1984
}
unknown's avatar
SCRUM  
unknown committed
1985
#endif
1986

unknown's avatar
unknown committed
1987

unknown's avatar
unknown committed
1988
/*
1989
  Intvar_log_event::Intvar_log_event()
unknown's avatar
unknown committed
1990
*/
1991

1992 1993
Intvar_log_event::Intvar_log_event(const char* buf, bool old_format)
  :Log_event(buf, old_format)
1994
{
1995 1996 1997
  buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
  type = buf[I_TYPE_OFFSET];
  val = uint8korr(buf+I_VAL_OFFSET);
1998 1999
}

2000

unknown's avatar
unknown committed
2001
/*
2002
  Intvar_log_event::get_var_type_name()
unknown's avatar
unknown committed
2003
*/
2004 2005

const char* Intvar_log_event::get_var_type_name()
2006
{
2007 2008 2009 2010 2011
  switch(type) {
  case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID";
  case INSERT_ID_EVENT: return "INSERT_ID";
  default: /* impossible */ return "UNKNOWN";
  }
2012 2013
}

unknown's avatar
unknown committed
2014

unknown's avatar
unknown committed
2015
/*
2016
  Intvar_log_event::write_data()
unknown's avatar
unknown committed
2017
*/
2018 2019

int Intvar_log_event::write_data(IO_CACHE* file)
2020
{
2021 2022 2023 2024
  char buf[9];
  buf[I_TYPE_OFFSET] = type;
  int8store(buf + I_VAL_OFFSET, val);
  return my_b_safe_write(file, (byte*) buf, sizeof(buf));
2025 2026
}

2027

unknown's avatar
unknown committed
2028
/*
2029
  Intvar_log_event::print()
unknown's avatar
unknown committed
2030
*/
2031 2032 2033

#ifdef MYSQL_CLIENT
void Intvar_log_event::print(FILE* file, bool short_form, char* last_db)
2034
{
2035 2036 2037
  char llbuff[22];
  const char *msg;
  LINT_INIT(msg);
2038

2039 2040 2041 2042 2043
  if (!short_form)
  {
    print_header(file);
    fprintf(file, "\tIntvar\n");
  }
2044

2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055
  fprintf(file, "SET ");
  switch (type) {
  case LAST_INSERT_ID_EVENT:
    msg="LAST_INSERT_ID";
    break;
  case INSERT_ID_EVENT:
    msg="INSERT_ID";
    break;
  }
  fprintf(file, "%s=%s;\n", msg, llstr(val,llbuff));
  fflush(file);
2056
}
2057
#endif
2058

2059

unknown's avatar
unknown committed
2060
/*
2061
  Intvar_log_event::exec_event()
unknown's avatar
unknown committed
2062
*/
2063

unknown's avatar
SCRUM  
unknown committed
2064
#if defined(HAVE_REPLICATION)&& !defined(MYSQL_CLIENT)
2065
int Intvar_log_event::exec_event(struct st_relay_log_info* rli)
2066
{
2067 2068 2069 2070 2071 2072 2073 2074 2075
  switch (type) {
  case LAST_INSERT_ID_EVENT:
    thd->last_insert_id_used = 1;
    thd->last_insert_id = val;
    break;
  case INSERT_ID_EVENT:
    thd->next_insert_id = val;
    break;
  }
2076
  rli->inc_event_relay_log_pos(get_event_len());
2077
  return 0;
2078
}
unknown's avatar
SCRUM  
unknown committed
2079
#endif
2080

2081

unknown's avatar
unknown committed
2082
/**************************************************************************
2083
  Rand_log_event methods
unknown's avatar
unknown committed
2084
**************************************************************************/
2085

unknown's avatar
SCRUM  
unknown committed
2086
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
2087
void Rand_log_event::pack_info(Protocol *protocol)
2088
{
unknown's avatar
unknown committed
2089 2090 2091 2092 2093
  char buf1[256], *pos;
  pos= strmov(buf1,"rand_seed1=");
  pos= int10_to_str((long) seed1, pos, 10);
  pos= strmov(pos, ",rand_seed2=");
  pos= int10_to_str((long) seed2, pos, 10);
2094
  protocol->store(buf1, (uint) (pos-buf1), &my_charset_bin);
2095
}
unknown's avatar
SCRUM  
unknown committed
2096
#endif
2097 2098 2099 2100


Rand_log_event::Rand_log_event(const char* buf, bool old_format)
  :Log_event(buf, old_format)
2101
{
2102 2103 2104
  buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
  seed1 = uint8korr(buf+RAND_SEED1_OFFSET);
  seed2 = uint8korr(buf+RAND_SEED2_OFFSET);
2105 2106
}

2107 2108

int Rand_log_event::write_data(IO_CACHE* file)
2109
{
2110 2111 2112 2113 2114
  char buf[16];
  int8store(buf + RAND_SEED1_OFFSET, seed1);
  int8store(buf + RAND_SEED2_OFFSET, seed2);
  return my_b_safe_write(file, (byte*) buf, sizeof(buf));
}
2115

2116 2117 2118 2119

#ifdef MYSQL_CLIENT
void Rand_log_event::print(FILE* file, bool short_form, char* last_db)
{
unknown's avatar
unknown committed
2120
  char llbuff[22],llbuff2[22];
2121
  if (!short_form)
2122
  {
2123 2124
    print_header(file);
    fprintf(file, "\tRand\n");
2125
  }
unknown's avatar
unknown committed
2126
  fprintf(file, "SET @@RAND_SEED1=%s, @@RAND_SEED2=%s;\n",
unknown's avatar
unknown committed
2127
	  llstr(seed1, llbuff),llstr(seed2, llbuff2));
2128
  fflush(file);
2129
}
unknown's avatar
unknown committed
2130
#endif /* MYSQL_CLIENT */
2131

2132

unknown's avatar
SCRUM  
unknown committed
2133
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
2134
int Rand_log_event::exec_event(struct st_relay_log_info* rli)
2135
{
unknown's avatar
unknown committed
2136 2137
  thd->rand.seed1= (ulong) seed1;
  thd->rand.seed2= (ulong) seed2;
2138
  rli->inc_event_relay_log_pos(get_event_len());
2139 2140
  return 0;
}
unknown's avatar
unknown committed
2141
#endif /* !MYSQL_CLIENT */
2142

unknown's avatar
unknown committed
2143

unknown's avatar
unknown committed
2144
/**************************************************************************
2145
  User_var_log_event methods
unknown's avatar
unknown committed
2146
**************************************************************************/
unknown's avatar
unknown committed
2147

unknown's avatar
unknown committed
2148
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
unknown's avatar
unknown committed
2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167
void User_var_log_event::pack_info(Protocol* protocol)
{
  char *buf= 0;
  uint val_offset= 2 + name_len;
  uint event_len= val_offset;

  if (is_null)
  {
    buf= my_malloc(val_offset + 5, MYF(MY_WME));
    strmov(buf + val_offset, "NULL");
    event_len= val_offset + 4;
  }
  else
  {
    switch (type) {
    case REAL_RESULT:
      double real_val;
      float8get(real_val, val);
      buf= my_malloc(val_offset + FLOATING_POINT_BUFFER, MYF(MY_WME));
2168 2169
      event_len+= my_sprintf(buf + val_offset,
			     (buf + val_offset, "%.14g", real_val));
unknown's avatar
unknown committed
2170 2171 2172 2173 2174 2175 2176 2177 2178 2179 2180
      break;
    case INT_RESULT:
      buf= my_malloc(val_offset + 22, MYF(MY_WME));
      event_len= longlong10_to_str(uint8korr(val), buf + val_offset,-10)-buf;
      break;
    case STRING_RESULT:
      /*
	This is correct as pack_info is used for SHOW BINLOG command
	only. But be carefull this is may be incorrect in other cases as
	string may contain \ and '.
      */
2181 2182
      event_len= val_offset + 2 + val_len;
      buf= my_malloc(event_len, MYF(MY_WME));
unknown's avatar
unknown committed
2183 2184
      buf[val_offset]= '\'';
      memcpy(buf + val_offset + 1, val, val_len);
2185
      buf[val_offset + val_len + 1]= '\'';
unknown's avatar
unknown committed
2186
      break;
2187
    case ROW_RESULT:
unknown's avatar
unknown committed
2188
    default:
unknown's avatar
unknown committed
2189 2190 2191 2192 2193 2194 2195
      DBUG_ASSERT(1);
      return;
    }
  }
  buf[0]= '@';
  buf[1+name_len]= '=';
  memcpy(buf+1, name, name_len);
2196
  protocol->store(buf, event_len, &my_charset_bin);
unknown's avatar
unknown committed
2197 2198
  my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
}
unknown's avatar
unknown committed
2199
#endif /* !MYSQL_CLIENT */
unknown's avatar
unknown committed
2200 2201 2202 2203 2204 2205 2206 2207


User_var_log_event::User_var_log_event(const char* buf, bool old_format)
  :Log_event(buf, old_format)
{
  buf+= (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
  name_len= uint4korr(buf);
  name= (char *) buf + UV_NAME_LEN_SIZE;
2208 2209
  buf+= UV_NAME_LEN_SIZE + name_len;
  is_null= (bool) *buf;
unknown's avatar
unknown committed
2210 2211 2212
  if (is_null)
  {
    type= STRING_RESULT;
2213
    charset_number= my_charset_bin.number;
unknown's avatar
unknown committed
2214 2215 2216 2217 2218
    val_len= 0;
    val= 0;  
  }
  else
  {
2219 2220 2221
    type= (Item_result) buf[UV_VAL_IS_NULL];
    charset_number= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE);
    val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + 
unknown's avatar
unknown committed
2222
		       UV_CHARSET_NUMBER_SIZE);
2223 2224
    val= (char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
		   UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE);
unknown's avatar
unknown committed
2225 2226 2227 2228 2229 2230 2231 2232 2233
  }
}


int User_var_log_event::write_data(IO_CACHE* file)
{
  char buf[UV_NAME_LEN_SIZE];
  char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + 
	    UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE];
2234 2235 2236
  char buf2[8], *pos= buf2;
  uint buf1_length;

unknown's avatar
unknown committed
2237
  int4store(buf, name_len);
2238 2239 2240 2241 2242 2243 2244
  
  if ((buf1[0]= is_null))
  {
    buf1_length= 1;
    val_len= 0;
  }    
  else
unknown's avatar
unknown committed
2245 2246 2247 2248
  {
    buf1[1]= type;
    int4store(buf1 + 2, charset_number);
    int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len);
2249
    buf1_length= 10;
unknown's avatar
unknown committed
2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260

    switch (type) {
    case REAL_RESULT:
      float8store(buf2, *(double*) val);
      break;
    case INT_RESULT:
      int8store(buf2, *(longlong*) val);
      break;
    case STRING_RESULT:
      pos= val;
      break;
2261
    case ROW_RESULT:
unknown's avatar
unknown committed
2262
    default:
unknown's avatar
unknown committed
2263 2264 2265 2266 2267
      DBUG_ASSERT(1);
      return 0;
    }
  }
  return (my_b_safe_write(file, (byte*) buf, sizeof(buf))   ||
2268 2269 2270
	  my_b_safe_write(file, (byte*) name, name_len)     ||
	  my_b_safe_write(file, (byte*) buf1, buf1_length) ||
	  my_b_safe_write(file, (byte*) pos, val_len));
unknown's avatar
unknown committed
2271 2272
}

2273

unknown's avatar
unknown committed
2274
/*
unknown's avatar
unknown committed
2275
  User_var_log_event::print()
unknown's avatar
unknown committed
2276
*/
unknown's avatar
unknown committed
2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309

#ifdef MYSQL_CLIENT
void User_var_log_event::print(FILE* file, bool short_form, char* last_db)
{
  if (!short_form)
  {
    print_header(file);
    fprintf(file, "\tUser_var\n");
  }

  fprintf(file, "SET @");
  my_fwrite(file, (byte*) name, (uint) (name_len), MYF(MY_NABP | MY_WME));

  if (is_null)
  {
    fprintf(file, ":=NULL;\n");
  }
  else
  {
    switch (type) {
    case REAL_RESULT:
      double real_val;
      float8get(real_val, val);
      fprintf(file, ":=%.14g;\n", real_val);
      break;
    case INT_RESULT:
      char int_buf[22];
      longlong10_to_str(uint8korr(val), int_buf, -10);
      fprintf(file, ":=%s;\n", int_buf);
      break;
    case STRING_RESULT:
      fprintf(file, ":='%s';\n", val);
      break;
2310
    case ROW_RESULT:
unknown's avatar
unknown committed
2311
    default:
unknown's avatar
unknown committed
2312
      DBUG_ASSERT(1);
unknown's avatar
unknown committed
2313 2314 2315 2316 2317
      return;
    }
  }
  fflush(file);
}
unknown's avatar
SCRUM  
unknown committed
2318
#endif
2319

unknown's avatar
unknown committed
2320

unknown's avatar
unknown committed
2321
/*
unknown's avatar
unknown committed
2322
  User_var_log_event::exec_event()
unknown's avatar
unknown committed
2323
*/
unknown's avatar
unknown committed
2324

unknown's avatar
unknown committed
2325
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
unknown's avatar
unknown committed
2326 2327 2328
int User_var_log_event::exec_event(struct st_relay_log_info* rli)
{
  Item *it= 0;
2329
  CHARSET_INFO *charset= get_charset(charset_number, MYF(0));
unknown's avatar
unknown committed
2330 2331 2332
  LEX_STRING user_var_name;
  user_var_name.str= name;
  user_var_name.length= name_len;
2333 2334
  double real_val;
  longlong int_val;
unknown's avatar
unknown committed
2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345

  if (is_null)
  {
    it= new Item_null();
  }
  else
  {
    switch (type) {
    case REAL_RESULT:
      float8get(real_val, val);
      it= new Item_real(real_val);
2346
      val= (char*) &real_val;		// Pointer to value in native format
2347
      val_len= 8;
unknown's avatar
unknown committed
2348 2349
      break;
    case INT_RESULT:
2350 2351 2352
      int_val= (longlong) uint8korr(val);
      it= new Item_int(int_val);
      val= (char*) &int_val;		// Pointer to value in native format
2353
      val_len= 8;
unknown's avatar
unknown committed
2354 2355 2356 2357
      break;
    case STRING_RESULT:
      it= new Item_string(val, val_len, charset);
      break;
2358
    case ROW_RESULT:
unknown's avatar
unknown committed
2359
    default:
unknown's avatar
unknown committed
2360 2361 2362 2363 2364 2365
      DBUG_ASSERT(1);
      return 0;
    }
  }
  Item_func_set_user_var e(user_var_name, it);
  e.fix_fields(thd, 0, 0);
2366
  e.update_hash(val, val_len, type, charset, DERIVATION_NONE);
unknown's avatar
unknown committed
2367 2368
  free_root(&thd->mem_root,0);

2369
  rli->inc_event_relay_log_pos(get_event_len());
unknown's avatar
unknown committed
2370 2371
  return 0;
}
unknown's avatar
unknown committed
2372
#endif /* !MYSQL_CLIENT */
2373 2374


unknown's avatar
unknown committed
2375
/**************************************************************************
2376
  Slave_log_event methods
unknown's avatar
unknown committed
2377
**************************************************************************/
unknown's avatar
unknown committed
2378

unknown's avatar
SCRUM  
unknown committed
2379
#ifdef HAVE_REPLICATION
2380 2381 2382 2383 2384 2385 2386 2387 2388 2389
#ifdef MYSQL_CLIENT
void Unknown_log_event::print(FILE* file, bool short_form, char* last_db)
{
  if (short_form)
    return;
  print_header(file);
  fputc('\n', file);
  fprintf(file, "# %s", "Unknown event\n");
}
#endif  
2390

2391
#ifndef MYSQL_CLIENT
2392
void Slave_log_event::pack_info(Protocol *protocol)
2393
{
unknown's avatar
unknown committed
2394
  char buf[256+HOSTNAME_LENGTH], *pos;
2395 2396 2397 2398 2399 2400 2401
  pos= strmov(buf, "host=");
  pos= strnmov(pos, master_host, HOSTNAME_LENGTH);
  pos= strmov(pos, ",port=");
  pos= int10_to_str((long) master_port, pos, 10);
  pos= strmov(pos, ",log=");
  pos= strmov(pos, master_log);
  pos= strmov(pos, ",pos=");
unknown's avatar
unknown committed
2402
  pos= longlong10_to_str(master_pos, pos, 10);
2403
  protocol->store(buf, pos-buf, &my_charset_bin);
2404
}
unknown's avatar
unknown committed
2405
#endif /* !MYSQL_CLIENT */
2406 2407 2408 2409


#ifndef MYSQL_CLIENT
Slave_log_event::Slave_log_event(THD* thd_arg,
unknown's avatar
unknown committed
2410 2411
				 struct st_relay_log_info* rli)
  :Log_event(thd_arg, 0, 0), mem_pool(0), master_host(0)
2412 2413 2414 2415 2416 2417 2418 2419 2420 2421
{
  DBUG_ENTER("Slave_log_event");
  if (!rli->inited)				// QQ When can this happen ?
    DBUG_VOID_RETURN;
  
  MASTER_INFO* mi = rli->mi;
  // TODO: re-write this better without holding both locks at the same time
  pthread_mutex_lock(&mi->data_lock);
  pthread_mutex_lock(&rli->data_lock);
  master_host_len = strlen(mi->host);
2422
  master_log_len = strlen(rli->group_master_log_name);
2423 2424 2425
  // on OOM, just do not initialize the structure and print the error
  if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
				   MYF(MY_WME))))
2426
  {
2427 2428 2429
    master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
    memcpy(master_host, mi->host, master_host_len + 1);
    master_log = master_host + master_host_len + 1;
2430
    memcpy(master_log, rli->group_master_log_name, master_log_len + 1);
2431
    master_port = mi->port;
2432
    master_pos = rli->group_master_log_pos;
2433 2434
    DBUG_PRINT("info", ("master_log: %s  pos: %d", master_log,
			(ulong) master_pos));
2435
  }
2436 2437 2438 2439 2440 2441
  else
    sql_print_error("Out of memory while recording slave event");
  pthread_mutex_unlock(&rli->data_lock);
  pthread_mutex_unlock(&mi->data_lock);
  DBUG_VOID_RETURN;
}
unknown's avatar
unknown committed
2442
#endif /* !MYSQL_CLIENT */
2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458


Slave_log_event::~Slave_log_event()
{
  my_free(mem_pool, MYF(MY_ALLOW_ZERO_PTR));
}


#ifdef MYSQL_CLIENT
void Slave_log_event::print(FILE* file, bool short_form, char* last_db)
{
  char llbuff[22];
  if (short_form)
    return;
  print_header(file);
  fputc('\n', file);
unknown's avatar
unknown committed
2459 2460
  fprintf(file, "\
Slave: master_host: '%s'  master_port: %d  master_log: '%s'  master_pos: %s\n",
2461 2462
	  master_host, master_port, master_log, llstr(master_pos, llbuff));
}
unknown's avatar
unknown committed
2463
#endif /* MYSQL_CLIENT */
2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489


int Slave_log_event::get_data_size()
{
  return master_host_len + master_log_len + 1 + SL_MASTER_HOST_OFFSET;
}


int Slave_log_event::write_data(IO_CACHE* file)
{
  int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
  int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
  // log and host are already there
  return my_b_safe_write(file, (byte*)mem_pool, get_data_size());
}


void Slave_log_event::init_from_mem_pool(int data_size)
{
  master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
  master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
  master_host = mem_pool + SL_MASTER_HOST_OFFSET;
  master_host_len = strlen(master_host);
  // safety
  master_log = master_host + master_host_len + 1;
  if (master_log > mem_pool + data_size)
2490
  {
2491 2492
    master_host = 0;
    return;
2493
  }
2494 2495
  master_log_len = strlen(master_log);
}
2496

2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508

Slave_log_event::Slave_log_event(const char* buf, int event_len)
  :Log_event(buf,0),mem_pool(0),master_host(0)
{
  event_len -= LOG_EVENT_HEADER_LEN;
  if (event_len < 0)
    return;
  if (!(mem_pool = (char*) my_malloc(event_len + 1, MYF(MY_WME))))
    return;
  memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len);
  mem_pool[event_len] = 0;
  init_from_mem_pool(event_len);
2509 2510
}

2511

2512 2513 2514 2515 2516 2517 2518
#ifndef MYSQL_CLIENT
int Slave_log_event::exec_event(struct st_relay_log_info* rli)
{
  if (mysql_bin_log.is_open())
    mysql_bin_log.write(this);
  return Log_event::exec_event(rli);
}
unknown's avatar
unknown committed
2519
#endif /* !MYSQL_CLIENT */
2520 2521


unknown's avatar
unknown committed
2522
/**************************************************************************
unknown's avatar
unknown committed
2523
	Stop_log_event methods
unknown's avatar
unknown committed
2524
**************************************************************************/
2525

unknown's avatar
unknown committed
2526
/*
2527
  Stop_log_event::print()
2528
*/
2529 2530 2531 2532 2533 2534 2535 2536 2537 2538

#ifdef MYSQL_CLIENT
void Stop_log_event::print(FILE* file, bool short_form, char* last_db)
{
  if (short_form)
    return;

  print_header(file);
  fprintf(file, "\tStop\n");
  fflush(file);
2539
}
unknown's avatar
unknown committed
2540
#endif /* MYSQL_CLIENT */
2541

2542

2543
/*
2544
  Stop_log_event::exec_event()
2545

2546
  The master stopped. 
unknown's avatar
unknown committed
2547 2548
  We used to clean up all temporary tables but this is useless as, as the
  master has shut down properly, it has written all DROP TEMPORARY TABLE and DO
2549 2550 2551 2552 2553 2554
  RELEASE_LOCK (prepared statements' deletion is TODO).
  We used to clean up slave_load_tmpdir, but this is useless as it has been
  cleared at the end of LOAD DATA INFILE.
  So we have nothing to do here.
  The place were we must do this cleaning is in Start_log_event::exec_event(),
  not here. Because if we come here, the master was sane.
2555 2556
*/

2557
#ifndef MYSQL_CLIENT
2558
int Stop_log_event::exec_event(struct st_relay_log_info* rli)
2559
{
unknown's avatar
unknown committed
2560 2561
  /*
    We do not want to update master_log pos because we get a rotate event
2562
    before stop, so by now group_master_log_name is set to the next log.
2563
    If we updated it, we will have incorrect master coordinates and this
unknown's avatar
unknown committed
2564
    could give false triggers in MASTER_POS_WAIT() that we have reached
2565
    the target position when in fact we have not.
unknown's avatar
unknown committed
2566
  */
unknown's avatar
unknown committed
2567
  rli->inc_group_relay_log_pos(get_event_len(), 0);
2568
  flush_relay_log_info(rli);
2569 2570
  return 0;
}
unknown's avatar
unknown committed
2571
#endif /* !MYSQL_CLIENT */
unknown's avatar
SCRUM  
unknown committed
2572
#endif /* HAVE_REPLICATION */
2573

2574

unknown's avatar
unknown committed
2575
/**************************************************************************
unknown's avatar
unknown committed
2576
	Create_file_log_event methods
unknown's avatar
unknown committed
2577
**************************************************************************/
2578 2579

/*
2580
  Create_file_log_event ctor
unknown's avatar
unknown committed
2581
*/
2582 2583

#ifndef MYSQL_CLIENT
unknown's avatar
unknown committed
2584 2585 2586 2587 2588 2589 2590
Create_file_log_event::
Create_file_log_event(THD* thd_arg, sql_exchange* ex,
		      const char* db_arg, const char* table_name_arg,
		      List<Item>& fields_arg, enum enum_duplicates handle_dup,
		      char* block_arg, uint block_len_arg, bool using_trans)
  :Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup,
		  using_trans),
2591 2592
   fake_base(0),block(block_arg),block_len(block_len_arg),
   file_id(thd_arg->file_id = mysql_bin_log.next_file_id())
2593
{
2594 2595
  sql_ex.force_new_format();
}
unknown's avatar
unknown committed
2596
#endif /* !MYSQL_CLIENT */
2597

2598

unknown's avatar
unknown committed
2599
/*
2600
  Create_file_log_event::write_data_body()
unknown's avatar
unknown committed
2601
*/
2602 2603 2604 2605 2606 2607 2608 2609

int Create_file_log_event::write_data_body(IO_CACHE* file)
{
  int res;
  if ((res = Load_log_event::write_data_body(file)) || fake_base)
    return res;
  return (my_b_safe_write(file, (byte*) "", 1) ||
	  my_b_safe_write(file, (byte*) block, block_len));
2610 2611
}

2612

unknown's avatar
unknown committed
2613
/*
2614
  Create_file_log_event::write_data_header()
unknown's avatar
unknown committed
2615
*/
unknown's avatar
unknown committed
2616

2617
int Create_file_log_event::write_data_header(IO_CACHE* file)
2618
{
2619 2620 2621 2622 2623 2624 2625 2626 2627
  int res;
  if ((res = Load_log_event::write_data_header(file)) || fake_base)
    return res;
  byte buf[CREATE_FILE_HEADER_LEN];
  int4store(buf + CF_FILE_ID_OFFSET, file_id);
  return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN);
}


unknown's avatar
unknown committed
2628
/*
2629
  Create_file_log_event::write_base()
unknown's avatar
unknown committed
2630
*/
2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641

int Create_file_log_event::write_base(IO_CACHE* file)
{
  int res;
  fake_base = 1; // pretend we are Load event
  res = write(file);
  fake_base = 0;
  return res;
}


unknown's avatar
unknown committed
2642
/*
2643
  Create_file_log_event ctor
unknown's avatar
unknown committed
2644
*/
2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668

Create_file_log_event::Create_file_log_event(const char* buf, int len,
					     bool old_format)
  :Load_log_event(buf,0,old_format),fake_base(0),block(0),inited_from_old(0)
{
  int block_offset;
  if (copy_log_event(buf,len,old_format))
    return;
  if (!old_format)
  {
    file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN +
			+ LOAD_HEADER_LEN + CF_FILE_ID_OFFSET);
    // + 1 for \0 terminating fname  
    block_offset = (LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() +
		    CREATE_FILE_HEADER_LEN + 1);
    if (len < block_offset)
      return;
    block = (char*)buf + block_offset;
    block_len = len - block_offset;
  }
  else
  {
    sql_ex.force_new_format();
    inited_from_old = 1;
2669 2670 2671
  }
}

2672

unknown's avatar
unknown committed
2673
/*
2674
  Create_file_log_event::print()
unknown's avatar
unknown committed
2675
*/
2676 2677

#ifdef MYSQL_CLIENT
2678 2679
void Create_file_log_event::print(FILE* file, bool short_form, 
				  char* last_db, bool enable_local)
unknown's avatar
unknown committed
2680
{
2681
  if (short_form)
2682 2683 2684
  {
    if (enable_local && check_fname_outside_temp_buf())
      Load_log_event::print(file, 1, last_db);
2685
    return;
2686 2687 2688 2689
  }

  if (enable_local)
  {
unknown's avatar
unknown committed
2690 2691 2692 2693 2694 2695
    Load_log_event::print(file, 1, last_db, !check_fname_outside_temp_buf());
    /* 
       That one is for "file_id: etc" below: in mysqlbinlog we want the #, in
       SHOW BINLOG EVENTS we don't.
    */
    fprintf(file, "#"); 
2696 2697
  }

2698
  fprintf(file, " file_id: %d  block_len: %d\n", file_id, block_len);
unknown's avatar
unknown committed
2699
}
2700

unknown's avatar
unknown committed
2701

2702 2703 2704 2705 2706
void Create_file_log_event::print(FILE* file, bool short_form,
				  char* last_db)
{
  print(file,short_form,last_db,0);
}
unknown's avatar
unknown committed
2707
#endif /* MYSQL_CLIENT */
unknown's avatar
unknown committed
2708

2709

unknown's avatar
unknown committed
2710
/*
2711
  Create_file_log_event::pack_info()
unknown's avatar
unknown committed
2712
*/
2713

unknown's avatar
SCRUM  
unknown committed
2714
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
2715
void Create_file_log_event::pack_info(Protocol *protocol)
2716
{
2717 2718 2719
  char buf[NAME_LEN*2 + 30 + 21*2], *pos;
  pos= strmov(buf, "db=");
  memcpy(pos, db, db_len);
unknown's avatar
unknown committed
2720
  pos= strmov(pos + db_len, ";table=");
2721
  memcpy(pos, table_name, table_name_len);
unknown's avatar
unknown committed
2722
  pos= strmov(pos + table_name_len, ";file_id=");
2723 2724 2725
  pos= int10_to_str((long) file_id, pos, 10);
  pos= strmov(pos, ";block_len=");
  pos= int10_to_str((long) block_len, pos, 10);
unknown's avatar
unknown committed
2726
  protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
2727
}
unknown's avatar
unknown committed
2728
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
2729 2730


unknown's avatar
unknown committed
2731
/*
2732
  Create_file_log_event::exec_event()
unknown's avatar
unknown committed
2733
*/
2734

unknown's avatar
SCRUM  
unknown committed
2735
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
2736
int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
2737 2738
{
  char fname_buf[FN_REFLEN+10];
unknown's avatar
unknown committed
2739
  char *p;
2740 2741 2742
  int fd = -1;
  IO_CACHE file;
  int error = 1;
unknown's avatar
unknown committed
2743

2744
  bzero((char*)&file, sizeof(file));
unknown's avatar
unknown committed
2745 2746
  p = slave_load_file_stem(fname_buf, file_id, server_id);
  strmov(p, ".info");			// strmov takes less code than memcpy
2747 2748 2749 2750 2751
  if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC,
		    MYF(MY_WME))) < 0 ||
      init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
		    MYF(MY_WME|MY_NABP)))
  {
unknown's avatar
unknown committed
2752
    slave_print_error(rli,my_errno, "Error in Create_file event: could not open file '%s'", fname_buf);
2753 2754 2755 2756
    goto err;
  }
  
  // a trick to avoid allocating another buffer
unknown's avatar
unknown committed
2757
  strmov(p, ".data");
2758 2759 2760 2761
  fname = fname_buf;
  fname_len = (uint)(p-fname) + 5;
  if (write_base(&file))
  {
unknown's avatar
unknown committed
2762
    strmov(p, ".info"); // to have it right in the error message
unknown's avatar
unknown committed
2763 2764 2765
    slave_print_error(rli,my_errno,
		      "Error in Create_file event: could not write to file '%s'",
		      fname_buf);
2766 2767 2768 2769 2770 2771 2772 2773 2774
    goto err;
  }
  end_io_cache(&file);
  my_close(fd, MYF(0));
  
  // fname_buf now already has .data, not .info, because we did our trick
  if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC,
		    MYF(MY_WME))) < 0)
  {
unknown's avatar
unknown committed
2775
    slave_print_error(rli,my_errno, "Error in Create_file event: could not open file '%s'", fname_buf);
2776 2777
    goto err;
  }
unknown's avatar
unknown committed
2778
  if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
2779
  {
unknown's avatar
unknown committed
2780
    slave_print_error(rli,my_errno, "Error in Create_file event: write to '%s' failed", fname_buf);
2781 2782
    goto err;
  }
2783 2784
  error=0;					// Everything is ok

2785 2786 2787 2788 2789
err:
  if (error)
    end_io_cache(&file);
  if (fd >= 0)
    my_close(fd, MYF(0));
2790
  return error ? 1 : Log_event::exec_event(rli);
2791
}
unknown's avatar
unknown committed
2792
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
2793

2794

unknown's avatar
unknown committed
2795
/**************************************************************************
unknown's avatar
unknown committed
2796
	Append_block_log_event methods
unknown's avatar
unknown committed
2797
**************************************************************************/
2798

unknown's avatar
unknown committed
2799
/*
2800
  Append_block_log_event ctor
unknown's avatar
unknown committed
2801
*/
2802 2803

#ifndef MYSQL_CLIENT  
unknown's avatar
unknown committed
2804 2805
Append_block_log_event::Append_block_log_event(THD* thd_arg, const char* db_arg,
					       char* block_arg,
unknown's avatar
unknown committed
2806 2807 2808
					       uint block_len_arg,
					       bool using_trans)
  :Log_event(thd_arg,0, using_trans), block(block_arg),
unknown's avatar
unknown committed
2809
   block_len(block_len_arg), file_id(thd_arg->file_id), db(db_arg)
2810 2811
{
}
unknown's avatar
unknown committed
2812
#endif
2813 2814


unknown's avatar
unknown committed
2815
/*
2816
  Append_block_log_event ctor
unknown's avatar
unknown committed
2817
*/
2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829

Append_block_log_event::Append_block_log_event(const char* buf, int len)
  :Log_event(buf, 0),block(0)
{
  if ((uint)len < APPEND_BLOCK_EVENT_OVERHEAD)
    return;
  file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET);
  block = (char*)buf + APPEND_BLOCK_EVENT_OVERHEAD;
  block_len = len - APPEND_BLOCK_EVENT_OVERHEAD;
}


unknown's avatar
unknown committed
2830
/*
2831
  Append_block_log_event::write_data()
unknown's avatar
unknown committed
2832
*/
2833 2834 2835 2836 2837 2838 2839 2840 2841 2842

int Append_block_log_event::write_data(IO_CACHE* file)
{
  byte buf[APPEND_BLOCK_HEADER_LEN];
  int4store(buf + AB_FILE_ID_OFFSET, file_id);
  return (my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
	  my_b_safe_write(file, (byte*) block, block_len));
}


unknown's avatar
unknown committed
2843
/*
2844
  Append_block_log_event::print()
unknown's avatar
unknown committed
2845
*/
2846 2847 2848 2849 2850 2851 2852 2853 2854 2855 2856 2857

#ifdef MYSQL_CLIENT  
void Append_block_log_event::print(FILE* file, bool short_form,
				   char* last_db)
{
  if (short_form)
    return;
  print_header(file);
  fputc('\n', file);
  fprintf(file, "#Append_block: file_id: %d  block_len: %d\n",
	  file_id, block_len);
}
unknown's avatar
unknown committed
2858
#endif /* MYSQL_CLIENT */
2859 2860


unknown's avatar
unknown committed
2861
/*
2862
  Append_block_log_event::pack_info()
unknown's avatar
unknown committed
2863
*/
2864

unknown's avatar
SCRUM  
unknown committed
2865
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
2866
void Append_block_log_event::pack_info(Protocol *protocol)
2867 2868 2869 2870 2871 2872
{
  char buf[256];
  uint length;
  length= (uint) my_sprintf(buf,
			    (buf, ";file_id=%u;block_len=%u", file_id,
			     block_len));
unknown's avatar
unknown committed
2873
  protocol->store(buf, length, &my_charset_bin);
2874
}
unknown's avatar
unknown committed
2875
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
2876 2877


unknown's avatar
unknown committed
2878
/*
2879
  Append_block_log_event::exec_event()
unknown's avatar
unknown committed
2880
*/
2881

unknown's avatar
SCRUM  
unknown committed
2882
#if defined( HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
2883
int Append_block_log_event::exec_event(struct st_relay_log_info* rli)
2884 2885
{
  char fname[FN_REFLEN+10];
2886 2887
  char *p= slave_load_file_stem(fname, file_id, server_id);
  int fd;
2888
  int error = 1;
2889

2890 2891 2892
  memcpy(p, ".data", 6);
  if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0)
  {
unknown's avatar
unknown committed
2893
    slave_print_error(rli,my_errno, "Error in Append_block event: could not open file '%s'", fname);
2894 2895
    goto err;
  }
unknown's avatar
unknown committed
2896
  if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
2897
  {
unknown's avatar
unknown committed
2898
    slave_print_error(rli,my_errno, "Error in Append_block event: write to '%s' failed", fname);
2899 2900 2901
    goto err;
  }
  error=0;
2902

2903 2904 2905
err:
  if (fd >= 0)
    my_close(fd, MYF(0));
2906
  return error ? error : Log_event::exec_event(rli);
2907
}
unknown's avatar
SCRUM  
unknown committed
2908
#endif
2909 2910


unknown's avatar
unknown committed
2911
/**************************************************************************
unknown's avatar
unknown committed
2912
	Delete_file_log_event methods
unknown's avatar
unknown committed
2913
**************************************************************************/
2914

unknown's avatar
unknown committed
2915
/*
2916
  Delete_file_log_event ctor
unknown's avatar
unknown committed
2917
*/
2918 2919

#ifndef MYSQL_CLIENT
unknown's avatar
unknown committed
2920 2921 2922
Delete_file_log_event::Delete_file_log_event(THD *thd_arg, const char* db_arg,
					     bool using_trans)
  :Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
2923 2924
{
}
unknown's avatar
unknown committed
2925
#endif
2926

unknown's avatar
unknown committed
2927
/*
2928
  Delete_file_log_event ctor
unknown's avatar
unknown committed
2929
*/
2930 2931 2932 2933 2934 2935 2936 2937 2938 2939

Delete_file_log_event::Delete_file_log_event(const char* buf, int len)
  :Log_event(buf, 0),file_id(0)
{
  if ((uint)len < DELETE_FILE_EVENT_OVERHEAD)
    return;
  file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET);
}


unknown's avatar
unknown committed
2940
/*
2941
  Delete_file_log_event::write_data()
unknown's avatar
unknown committed
2942
*/
2943 2944 2945 2946 2947 2948 2949 2950 2951

int Delete_file_log_event::write_data(IO_CACHE* file)
{
 byte buf[DELETE_FILE_HEADER_LEN];
 int4store(buf + DF_FILE_ID_OFFSET, file_id);
 return my_b_safe_write(file, buf, DELETE_FILE_HEADER_LEN);
}


unknown's avatar
unknown committed
2952
/*
2953
  Delete_file_log_event::print()
unknown's avatar
unknown committed
2954
*/
2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965

#ifdef MYSQL_CLIENT  
void Delete_file_log_event::print(FILE* file, bool short_form,
				  char* last_db)
{
  if (short_form)
    return;
  print_header(file);
  fputc('\n', file);
  fprintf(file, "#Delete_file: file_id=%u\n", file_id);
}
unknown's avatar
unknown committed
2966
#endif /* MYSQL_CLIENT */
2967

unknown's avatar
unknown committed
2968
/*
2969
  Delete_file_log_event::pack_info()
unknown's avatar
unknown committed
2970
*/
2971

unknown's avatar
SCRUM  
unknown committed
2972
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
2973
void Delete_file_log_event::pack_info(Protocol *protocol)
2974 2975 2976 2977
{
  char buf[64];
  uint length;
  length= (uint) my_sprintf(buf, (buf, ";file_id=%u", (uint) file_id));
2978
  protocol->store(buf, (int32) length, &my_charset_bin);
2979
}
unknown's avatar
SCRUM  
unknown committed
2980
#endif
2981

unknown's avatar
unknown committed
2982
/*
2983
  Delete_file_log_event::exec_event()
unknown's avatar
unknown committed
2984
*/
2985

unknown's avatar
SCRUM  
unknown committed
2986
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
2987 2988 2989 2990 2991 2992 2993 2994 2995 2996
int Delete_file_log_event::exec_event(struct st_relay_log_info* rli)
{
  char fname[FN_REFLEN+10];
  char *p= slave_load_file_stem(fname, file_id, server_id);
  memcpy(p, ".data", 6);
  (void) my_delete(fname, MYF(MY_WME));
  memcpy(p, ".info", 6);
  (void) my_delete(fname, MYF(MY_WME));
  return Log_event::exec_event(rli);
}
unknown's avatar
unknown committed
2997
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
2998 2999


unknown's avatar
unknown committed
3000
/**************************************************************************
unknown's avatar
unknown committed
3001
	Execute_load_log_event methods
unknown's avatar
unknown committed
3002
**************************************************************************/
3003

unknown's avatar
unknown committed
3004
/*
3005
  Execute_load_log_event ctor
unknown's avatar
unknown committed
3006
*/
3007 3008

#ifndef MYSQL_CLIENT  
unknown's avatar
unknown committed
3009 3010 3011
Execute_load_log_event::Execute_load_log_event(THD *thd_arg, const char* db_arg,
					       bool using_trans)
  :Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
3012 3013
{
}
unknown's avatar
unknown committed
3014
#endif
3015 3016
  

unknown's avatar
unknown committed
3017
/*
3018
  Execute_load_log_event ctor
unknown's avatar
unknown committed
3019
*/
3020

unknown's avatar
unknown committed
3021 3022
Execute_load_log_event::Execute_load_log_event(const char* buf, int len)
  :Log_event(buf, 0), file_id(0)
3023 3024 3025 3026 3027 3028 3029
{
  if ((uint)len < EXEC_LOAD_EVENT_OVERHEAD)
    return;
  file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + EL_FILE_ID_OFFSET);
}


unknown's avatar
unknown committed
3030
/*
3031
  Execute_load_log_event::write_data()
unknown's avatar
unknown committed
3032
*/
3033 3034 3035 3036 3037 3038 3039 3040 3041

int Execute_load_log_event::write_data(IO_CACHE* file)
{
  byte buf[EXEC_LOAD_HEADER_LEN];
  int4store(buf + EL_FILE_ID_OFFSET, file_id);
  return my_b_safe_write(file, buf, EXEC_LOAD_HEADER_LEN);
}


unknown's avatar
unknown committed
3042
/*
3043
  Execute_load_log_event::print()
unknown's avatar
unknown committed
3044
*/
3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056

#ifdef MYSQL_CLIENT  
void Execute_load_log_event::print(FILE* file, bool short_form,
				   char* last_db)
{
  if (short_form)
    return;
  print_header(file);
  fputc('\n', file);
  fprintf(file, "#Exec_load: file_id=%d\n",
	  file_id);
}
unknown's avatar
unknown committed
3057
#endif
3058

unknown's avatar
unknown committed
3059
/*
3060
  Execute_load_log_event::pack_info()
unknown's avatar
unknown committed
3061
*/
3062

unknown's avatar
SCRUM  
unknown committed
3063
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
3064
void Execute_load_log_event::pack_info(Protocol *protocol)
3065 3066 3067 3068
{
  char buf[64];
  uint length;
  length= (uint) my_sprintf(buf, (buf, ";file_id=%u", (uint) file_id));
3069
  protocol->store(buf, (int32) length, &my_charset_bin);
3070 3071 3072
}


unknown's avatar
unknown committed
3073
/*
3074
  Execute_load_log_event::exec_event()
unknown's avatar
unknown committed
3075
*/
unknown's avatar
SCRUM  
unknown committed
3076

3077
int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
3078 3079
{
  char fname[FN_REFLEN+10];
3080 3081
  char *p= slave_load_file_stem(fname, file_id, server_id);
  int fd;
3082 3083 3084
  int error = 1;
  IO_CACHE file;
  Load_log_event* lev = 0;
3085

3086 3087 3088 3089 3090
  memcpy(p, ".info", 6);
  if ((fd = my_open(fname, O_RDONLY|O_BINARY, MYF(MY_WME))) < 0 ||
      init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
		    MYF(MY_WME|MY_NABP)))
  {
unknown's avatar
unknown committed
3091
    slave_print_error(rli,my_errno, "Error in Exec_load event: could not open file '%s'", fname);
3092 3093
    goto err;
  }
3094 3095
  if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
							 (pthread_mutex_t*)0,
3096 3097
							 (bool)0)) ||
      lev->get_type_code() != NEW_LOAD_EVENT)
3098
  {
unknown's avatar
unknown committed
3099
    slave_print_error(rli,0, "Error in Exec_load event: file '%s' appears corrupted", fname);
3100 3101
    goto err;
  }
unknown's avatar
unknown committed
3102

3103
  lev->thd = thd;
3104 3105
  /*
    lev->exec_event should use rli only for errors
unknown's avatar
unknown committed
3106 3107 3108
    i.e. should not advance rli's position.
    lev->exec_event is the place where the table is loaded (it calls
    mysql_load()).
3109 3110
  */
  if (lev->exec_event(0,rli,1)) 
3111
  {
3112 3113 3114 3115 3116 3117 3118 3119 3120
    /*
      We want to indicate the name of the file that could not be loaded
      (SQL_LOADxxx).
      But as we are here we are sure the error is in rli->last_slave_error and
      rli->last_slave_errno (example of error: duplicate entry for key), so we
      don't want to overwrite it with the filename.
      What we want instead is add the filename to the current error message.
    */
    char *tmp= my_strdup(rli->last_slave_error,MYF(MY_WME));
3121 3122 3123 3124 3125 3126 3127 3128
    if (tmp)
    {
      slave_print_error(rli,
			rli->last_slave_errno, /* ok to re-use error code */
			"%s. Failed executing load from '%s'", 
			tmp, fname);
      my_free(tmp,MYF(0));
    }
3129 3130
    goto err;
  }
unknown's avatar
unknown committed
3131 3132 3133 3134 3135 3136 3137 3138 3139 3140
  /*
    We have an open file descriptor to the .info file; we need to close it
    or Windows will refuse to delete the file in my_delete().
  */
  if (fd >= 0)
  {
    my_close(fd, MYF(0));
    end_io_cache(&file);
    fd= -1;
  }
3141
  (void) my_delete(fname, MYF(MY_WME));
3142
  memcpy(p, ".data", 6);
3143
  (void) my_delete(fname, MYF(MY_WME));
3144
  error = 0;
3145

3146 3147 3148
err:
  delete lev;
  if (fd >= 0)
3149
  {
3150
    my_close(fd, MYF(0));
3151 3152
    end_io_cache(&file);
  }
3153
  return error ? error : Log_event::exec_event(rli);
3154
}
unknown's avatar
SCRUM  
unknown committed
3155

unknown's avatar
unknown committed
3156
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
3157 3158


unknown's avatar
unknown committed
3159
/**************************************************************************
unknown's avatar
unknown committed
3160
	sql_ex_info methods
unknown's avatar
unknown committed
3161
**************************************************************************/
3162

unknown's avatar
unknown committed
3163
/*
3164
  sql_ex_info::write_data()
unknown's avatar
unknown committed
3165
*/
3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192

int sql_ex_info::write_data(IO_CACHE* file)
{
  if (new_format())
  {
    return (write_str(file, field_term, field_term_len) ||
	    write_str(file, enclosed,   enclosed_len) ||
	    write_str(file, line_term,  line_term_len) ||
	    write_str(file, line_start, line_start_len) ||
	    write_str(file, escaped,    escaped_len) ||
	    my_b_safe_write(file,(byte*) &opt_flags,1));
  }
  else
  {
    old_sql_ex old_ex;
    old_ex.field_term= *field_term;
    old_ex.enclosed=   *enclosed;
    old_ex.line_term=  *line_term;
    old_ex.line_start= *line_start;
    old_ex.escaped=    *escaped;
    old_ex.opt_flags=  opt_flags;
    old_ex.empty_flags=empty_flags;
    return my_b_safe_write(file, (byte*) &old_ex, sizeof(old_ex));
  }
}


unknown's avatar
unknown committed
3193
/*
3194
  sql_ex_info::init()
unknown's avatar
unknown committed
3195
*/
3196 3197 3198 3199 3200 3201 3202 3203 3204 3205 3206 3207 3208 3209 3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226 3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239 3240

char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
{
  cached_new_format = use_new_format;
  if (use_new_format)
  {
    empty_flags=0;
    /*
      The code below assumes that buf will not disappear from
      under our feet during the lifetime of the event. This assumption
      holds true in the slave thread if the log is in new format, but is not
      the case when we have old format because we will be reusing net buffer
      to read the actual file before we write out the Create_file event.
    */
    if (read_str(buf, buf_end, field_term, field_term_len) ||
	read_str(buf, buf_end, enclosed,   enclosed_len) ||
	read_str(buf, buf_end, line_term,  line_term_len) ||
	read_str(buf, buf_end, line_start, line_start_len) ||
	read_str(buf, buf_end, escaped,	   escaped_len))
      return 0;
    opt_flags = *buf++;
  }
  else
  {
    field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
    field_term = buf++;			// Use first byte in string
    enclosed=	 buf++;
    line_term=   buf++;
    line_start=  buf++;
    escaped=     buf++;
    opt_flags =  *buf++;
    empty_flags= *buf++;
    if (empty_flags & FIELD_TERM_EMPTY)
      field_term_len=0;
    if (empty_flags & ENCLOSED_EMPTY)
      enclosed_len=0;
    if (empty_flags & LINE_TERM_EMPTY)
      line_term_len=0;
    if (empty_flags & LINE_START_EMPTY)
      line_start_len=0;
    if (empty_flags & ESCAPED_EMPTY)
      escaped_len=0;
  }
  return buf;
}