ha_ndbcluster_binlog.cc 142 KB
Newer Older
unknown's avatar
unknown committed
1 2 3 4
/* Copyright (C) 2000-2003 MySQL AB

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
unknown's avatar
unknown committed
5
  the Free Software Foundation; version 2 of the License.
unknown's avatar
unknown committed
6 7 8 9 10 11 12 13 14 15 16 17

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*/

#include "mysql_priv.h"
18
#include "sql_show.h"
unknown's avatar
unknown committed
19
#ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
unknown's avatar
unknown committed
20 21 22 23
#include "ha_ndbcluster.h"

#ifdef HAVE_NDB_BINLOG
#include "rpl_injector.h"
24
#include "rpl_filter.h"
unknown's avatar
unknown committed
25 26
#include "slave.h"
#include "ha_ndbcluster_binlog.h"
27
#include "NdbDictionary.hpp"
unknown's avatar
unknown committed
28
#include "ndb_cluster_connection.hpp"
29
#include <util/NdbAutoPtr.hpp>
unknown's avatar
unknown committed
30

31 32 33 34 35
#ifdef ndb_dynamite
#undef assert
#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
#endif

/*
  defines for cluster replication table names
*/
#include "ha_ndbcluster_tables.h"
#define NDB_APPLY_TABLE_FILE "./" NDB_REP_DB "/" NDB_APPLY_TABLE
#define NDB_SCHEMA_TABLE_FILE "./" NDB_REP_DB "/" NDB_SCHEMA_TABLE

/*
  Timeout for syncing schema events between
  mysql servers, and between mysql server and the binlog
*/
const int opt_ndb_sync_timeout= 120;

/*
  State of the ndb injector thread:
     1  it is running
    -1  it was started but later stopped for some reason
     0  it was never started
*/
int ndb_binlog_thread_running= 0;
/*
  TRUE if the ndb binlog should be created, FALSE if not
*/
my_bool ndb_binlog_running= FALSE;
my_bool ndb_binlog_tables_inited= FALSE;

/*
  Global reference to the THD object of the ndb injector thread.

  Has one sole purpose, for setting the in_use table member variable
  in get_share(...)
*/
THD *injector_thd= 0;

/*
  Global reference to the Ndb object of the ndb injector thread.

  Used mainly by the binlog index thread, but exposed to the client sql
  thread for one reason: to set up the event operations for a table
  so that the ndb injector thread receives its events.

  Must therefore always be used with a surrounding
  pthread_mutex_lock(&injector_mutex), when doing create/dropEventOperation
*/
static Ndb *injector_ndb= 0;
static Ndb *schema_ndb= 0;

static int ndbcluster_binlog_inited= 0;
/*
  Flag "ndbcluster_binlog_terminating" set when shutting down mysqld.
  Server main loop should call handlerton function:

  ndbcluster_hton->binlog_func ==
  ndbcluster_binlog_func(...,BFN_BINLOG_END,...) ==
  ndbcluster_binlog_end

  at shutdown, which sets the flag. And then server needs to wait for it
  to complete.  Otherwise binlog will not be complete.

  ndbcluster_hton->panic == ndbcluster_end() will not return until
  ndb binlog is completed
*/
static int ndbcluster_binlog_terminating= 0;

/*
  Mutex and condition used for interacting between client sql thread
  and injector thread
*/
pthread_t ndb_binlog_thread;
pthread_mutex_t injector_mutex;
pthread_cond_t  injector_cond;

/* NDB Injector thread (used for binlog creation) */
static ulonglong ndb_latest_applied_binlog_epoch= 0;
static ulonglong ndb_latest_handled_binlog_epoch= 0;
static ulonglong ndb_latest_received_binlog_epoch= 0;

NDB_SHARE *ndb_apply_status_share= 0;
NDB_SHARE *ndb_schema_share= 0;
pthread_mutex_t ndb_schema_share_mutex;

extern my_bool opt_log_slave_updates;
static my_bool g_ndb_log_slave_updates;

/* Schema object distribution handling */
HASH ndb_schema_objects;
typedef struct st_ndb_schema_object {
  pthread_mutex_t mutex;
  char *key;
  uint key_length;
  uint use_count;
  MY_BITMAP slock_bitmap;
  uint32 slock[256/32]; // 256 bits for lock status of table
} NDB_SCHEMA_OBJECT;
static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
                                                my_bool create_if_not_exists,
                                                my_bool have_lock);
static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
                                   bool have_lock);

/*
  Epoch (GCI) to wait for in ndbcluster_binlog_wait(); dereferenced there
  while the ndb binlog is running.
*/
static Uint64 *p_latest_trans_gci= 0;

/*
  Global variables for holding the ndb_binlog_index table reference
*/
static TABLE *ndb_binlog_index= 0;
static TABLE_LIST binlog_tables;

/*
  Helper functions
*/

#ifndef DBUG_OFF
149
/* purecov: begin deadcode */
150
static void print_records(TABLE *table, const uchar *record)
unknown's avatar
unknown committed
151
{
unknown's avatar
unknown committed
152
  for (uint j= 0; j < table->s->fields; j++)
unknown's avatar
unknown committed
153
  {
unknown's avatar
unknown committed
154 155 156
    char buf[40];
    int pos= 0;
    Field *field= table->field[j];
157
    const uchar* field_ptr= field->ptr - table->record[0] + record;
unknown's avatar
unknown committed
158 159 160 161
    int pack_len= field->pack_length();
    int n= pack_len < 10 ? pack_len : 10;

    for (int i= 0; i < n && pos < 20; i++)
unknown's avatar
unknown committed
162
    {
unknown's avatar
unknown committed
163
      pos+= sprintf(&buf[pos]," %x", (int) (uchar) field_ptr[i]);
unknown's avatar
unknown committed
164
    }
unknown's avatar
unknown committed
165 166
    buf[pos]= 0;
    DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
unknown's avatar
unknown committed
167 168
  }
}
169
/* purecov: end */
unknown's avatar
unknown committed
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
#else
#define print_records(a,b)
#endif


#ifndef DBUG_OFF
static void dbug_print_table(const char *info, TABLE *table)
{
  if (table == 0)
  {
    DBUG_PRINT("info",("%s: (null)", info));
    return;
  }
  DBUG_PRINT("info",
             ("%s: %s.%s s->fields: %d  "
unknown's avatar
unknown committed
185 186
              "reclength: %lu  rec_buff_length: %u  record[0]: 0x%lx  "
              "record[1]: 0x%lx",
unknown's avatar
unknown committed
187 188 189 190 191 192
              info,
              table->s->db.str,
              table->s->table_name.str,
              table->s->fields,
              table->s->reclength,
              table->s->rec_buff_length,
unknown's avatar
unknown committed
193 194
              (long) table->record[0],
              (long) table->record[1]));
unknown's avatar
unknown committed
195 196 197 198 199 200 201 202 203

  for (unsigned int i= 0; i < table->s->fields; i++) 
  {
    Field *f= table->field[i];
    DBUG_PRINT("info",
               ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d  pack_length: %d  "
                "ptr: 0x%lx[+%d]  null_bit: %u  null_ptr: 0x%lx[+%d]",
                i,
                f->field_name,
unknown's avatar
unknown committed
204
                (long) f->flags,
unknown's avatar
unknown committed
205 206 207 208 209 210 211 212
                (f->flags & PRI_KEY_FLAG)  ? "pri"       : "attr",
                (f->flags & NOT_NULL_FLAG) ? ""          : ",nullable",
                (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
                (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
                (f->flags & BLOB_FLAG)     ? ",blob"     : "",
                (f->flags & BINARY_FLAG)   ? ",binary"   : "",
                f->real_type(),
                f->pack_length(),
unknown's avatar
unknown committed
213
                (long) f->ptr, (int) (f->ptr - table->record[0]),
unknown's avatar
unknown committed
214
                f->null_bit,
unknown's avatar
unknown committed
215
                (long) f->null_ptr,
216
                (int) ((uchar*) f->null_ptr - table->record[0])));
unknown's avatar
unknown committed
217 218 219 220
    if (f->type() == MYSQL_TYPE_BIT)
    {
      Field_bit *g= (Field_bit*) f;
      DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d  bit_ptr: 0x%lx[+%d] "
unknown's avatar
unknown committed
221 222
                                   "bit_ofs: %d  bit_len: %u",
                                   g->field_length, (long) g->bit_ptr,
223
                                   (int) ((uchar*) g->bit_ptr -
unknown's avatar
unknown committed
224
                                          table->record[0]),
unknown's avatar
unknown committed
225 226 227 228 229 230 231 232 233 234 235 236 237
                                   g->bit_ofs, g->bit_len));
    }
  }
}
#else
#define dbug_print_table(a,b)
#endif


/*
  Run a query through mysql_parse

  Used to:
238 239
  - purging the ndb_binlog_index
  - creating the ndb_apply_status table
unknown's avatar
unknown committed
240 241
*/
static void run_query(THD *thd, char *buf, char *end,
unknown's avatar
unknown committed
242
                      const int *no_print_error, my_bool disable_binlog)
unknown's avatar
unknown committed
243
{
unknown's avatar
unknown committed
244 245
  ulong save_thd_query_length= thd->query_length;
  char *save_thd_query= thd->query;
246
  ulong save_thread_id= thd->variables.pseudo_thread_id;
unknown's avatar
unknown committed
247 248 249
  struct system_status_var save_thd_status_var= thd->status_var;
  THD_TRANS save_thd_transaction_all= thd->transaction.all;
  THD_TRANS save_thd_transaction_stmt= thd->transaction.stmt;
unknown's avatar
unknown committed
250 251
  ulonglong save_thd_options= thd->options;
  DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->options));
unknown's avatar
unknown committed
252
  NET save_thd_net= thd->net;
253
  const char* found_semicolon= NULL;
unknown's avatar
unknown committed
254 255

  bzero((char*) &thd->net, sizeof(NET));
Gleb Shchepa's avatar
Gleb Shchepa committed
256
  thd->set_query(buf, (uint) (end - buf));
unknown's avatar
unknown committed
257
  thd->variables.pseudo_thread_id= thread_id;
unknown's avatar
unknown committed
258
  thd->transaction.stmt.modified_non_trans_table= FALSE;
unknown's avatar
unknown committed
259 260 261 262
  if (disable_binlog)
    thd->options&= ~OPTION_BIN_LOG;
    
  DBUG_PRINT("query", ("%s", thd->query));
263 264 265 266

  DBUG_ASSERT(!thd->in_sub_stmt);
  DBUG_ASSERT(!thd->prelocked_mode);

267
  mysql_parse(thd, thd->query, thd->query_length, &found_semicolon);
unknown's avatar
unknown committed
268

269
  if (no_print_error && thd->is_slave_error)
unknown's avatar
unknown committed
270
  {
unknown's avatar
unknown committed
271 272 273
    int i;
    Thd_ndb *thd_ndb= get_thd_ndb(thd);
    for (i= 0; no_print_error[i]; i++)
274
      if ((thd_ndb->m_error_code == no_print_error[i]) ||
275
          (thd->main_da.sql_errno() == (unsigned) no_print_error[i]))
unknown's avatar
unknown committed
276 277 278
        break;
    if (!no_print_error[i])
      sql_print_error("NDB: %s: error %s %d(ndb: %d) %d %d",
279 280 281
                      buf,
                      thd->main_da.message(),
                      thd->main_da.sql_errno(),
282
                      thd_ndb->m_error_code,
283
                      (int) thd->is_error(), thd->is_slave_error);
unknown's avatar
unknown committed
284
  }
285
  close_thread_tables(thd);
286 287 288 289 290 291 292 293 294 295 296
  /*
    XXX: this code is broken. mysql_parse()/mysql_reset_thd_for_next_command()
    can not be called from within a statement, and
    run_query() can be called from anywhere, including from within
    a sub-statement.
    This particular reset is a temporary hack to avoid an assert
    for double assignment of the diagnostics area when run_query()
    is called from ndbcluster_reset_logs(), which is called from
    mysql_flush().
  */
  thd->main_da.reset_diagnostics_area();
unknown's avatar
unknown committed
297 298

  thd->options= save_thd_options;
Gleb Shchepa's avatar
Gleb Shchepa committed
299
  thd->set_query(save_thd_query, save_thd_query_length);
300
  thd->variables.pseudo_thread_id= save_thread_id;
unknown's avatar
unknown committed
301 302 303 304
  thd->status_var= save_thd_status_var;
  thd->transaction.all= save_thd_transaction_all;
  thd->transaction.stmt= save_thd_transaction_stmt;
  thd->net= save_thd_net;
unknown's avatar
unknown committed
305 306 307 308

  if (thd == injector_thd)
  {
    /*
309
      running the query will close all tables, including the ndb_binlog_index
unknown's avatar
unknown committed
310 311
      used in injector_thd
    */
312
    ndb_binlog_index= 0;
unknown's avatar
unknown committed
313 314 315
  }
}

316 317 318 319 320 321
/*
  Close the TABLE that the binlog code keeps on 'share' (with
  closefrm(..., 1)), then clear share->table and share->table_share.
  Does nothing if no table share is set.
*/
static void
ndbcluster_binlog_close_table(THD *thd, NDB_SHARE *share)
{
  DBUG_ENTER("ndbcluster_binlog_close_table");
  if (!share->table_share)
  {
    /* No binlog table share: a TABLE must not be set either. */
    DBUG_ASSERT(share->table == 0);
    DBUG_VOID_RETURN;
  }
  closefrm(share->table, 1);
  share->table= 0;
  share->table_share= 0;
  DBUG_VOID_RETURN;
}

330 331 332 333 334 335 336 337

/*
  Creates a TABLE object for the ndb cluster table

  NOTES
    This does not open the underlying table
*/

338
static int
339
ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
340 341
                             TABLE_SHARE *table_share, TABLE *table,
                             int reopen)
342 343 344 345
{
  int error;
  DBUG_ENTER("ndbcluster_binlog_open_table");
  
unknown's avatar
unknown committed
346
  safe_mutex_assert_owner(&LOCK_open);
347
  init_tmp_table_share(thd, table_share, share->db, 0, share->table_name, 
348 349 350
                       share->key);
  if ((error= open_table_def(thd, table_share, 0)))
  {
351
    DBUG_PRINT("error", ("open_table_def failed: %d my_errno: %d", error, my_errno));
unknown's avatar
unknown committed
352
    free_table_share(table_share);
353 354
    DBUG_RETURN(error);
  }
355
  if ((error= open_table_from_share(thd, table_share, "", 0 /* fon't allocate buffers */, 
356 357
                                    (uint) READ_ALL, 0, table, FALSE)))
  {
358
    DBUG_PRINT("error", ("open_table_from_share failed %d my_errno: %d", error, my_errno));
unknown's avatar
unknown committed
359
    free_table_share(table_share);
360 361
    DBUG_RETURN(error);
  }
unknown's avatar
unknown committed
362
  assign_new_table_id(table_share);
363 364 365 366

  if (!reopen)
  {
    // allocate memory on ndb share so it can be reused after online alter table
367 368 369 370
    (void)multi_alloc_root(&share->mem_root,
                           &(share->record[0]), table->s->rec_buff_length,
                           &(share->record[1]), table->s->rec_buff_length,
                           NULL);
371
  }
372
  {
373 374 375 376 377 378
    my_ptrdiff_t row_offset= share->record[0] - table->record[0];
    Field **p_field;
    for (p_field= table->field; *p_field; p_field++)
      (*p_field)->move_field_offset(row_offset);
    table->record[0]= share->record[0];
    table->record[1]= share->record[1];
379
  }
380

381 382 383 384 385 386 387
  table->in_use= injector_thd;
  
  table->s->db.str= share->db;
  table->s->db.length= strlen(share->db);
  table->s->table_name.str= share->table_name;
  table->s->table_name.length= strlen(share->table_name);
  
388
  DBUG_ASSERT(share->table_share == 0);
389
  share->table_share= table_share;
390
  DBUG_ASSERT(share->table == 0);
391
  share->table= table;
392 393
  /* We can't use 'use_all_columns()' as the file object is not setup yet */
  table->column_bitmaps_set_no_signal(&table->s->all_set, &table->s->all_set);
394 395 396 397 398 399 400
#ifndef DBUG_OFF
  dbug_print_table("table", table);
#endif
  DBUG_RETURN(0);
}


unknown's avatar
unknown committed
401 402 403
/*
  Initialize the binlog part of the NDB_SHARE
*/
404
int ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
unknown's avatar
unknown committed
405 406 407
{
  THD *thd= current_thd;
  MEM_ROOT *mem_root= &share->mem_root;
408
  int do_event_op= ndb_binlog_running;
409
  int error= 0;
unknown's avatar
unknown committed
410
  DBUG_ENTER("ndbcluster_binlog_init_share");
unknown's avatar
unknown committed
411

412 413
  share->connect_count= g_ndb_cluster_connection->get_connect_count();

unknown's avatar
unknown committed
414 415
  share->op= 0;
  share->table= 0;
416

417
  if (!ndb_schema_share &&
418 419 420
      strcmp(share->db, NDB_REP_DB) == 0 &&
      strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
    do_event_op= 1;
421 422 423 424
  else if (!ndb_apply_status_share &&
           strcmp(share->db, NDB_REP_DB) == 0 &&
           strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
    do_event_op= 1;
425 426 427 428 429 430 431 432 433

  {
    int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
    share->subscriber_bitmap= (MY_BITMAP*)
      alloc_root(mem_root, no_nodes * sizeof(MY_BITMAP));
    for (i= 0; i < no_nodes; i++)
    {
      bitmap_init(&share->subscriber_bitmap[i],
                  (Uint32*)alloc_root(mem_root, max_ndb_nodes/8),
434
                  max_ndb_nodes, FALSE);
435 436 437 438 439
      bitmap_clear_all(&share->subscriber_bitmap[i]);
    }
  }

  if (!do_event_op)
unknown's avatar
unknown committed
440
  {
441 442 443 444 445 446 447 448 449 450 451
    if (_table)
    {
      if (_table->s->primary_key == MAX_KEY)
        share->flags|= NSF_HIDDEN_PK;
      if (_table->s->blob_fields != 0)
        share->flags|= NSF_BLOB_FLAG;
    }
    else
    {
      share->flags|= NSF_NO_BINLOG;
    }
452
    DBUG_RETURN(error);
unknown's avatar
unknown committed
453 454 455
  }
  while (1) 
  {
456
    int error;
457 458
    TABLE_SHARE *table_share= (TABLE_SHARE *) alloc_root(mem_root, sizeof(*table_share));
    TABLE *table= (TABLE*) alloc_root(mem_root, sizeof(*table));
459
    if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table, 0)))
unknown's avatar
unknown committed
460
      break;
461 462 463 464 465 466 467 468 469 470 471 472
    /*
      ! do not touch the contents of the table
      it may be in use by the injector thread
    */
    MEM_ROOT *mem_root= &share->mem_root;
    share->ndb_value[0]= (NdbValue*)
      alloc_root(mem_root, sizeof(NdbValue) *
                 (table->s->fields + 2 /*extra for hidden key and part key*/));
    share->ndb_value[1]= (NdbValue*)
      alloc_root(mem_root, sizeof(NdbValue) *
                 (table->s->fields + 2 /*extra for hidden key and part key*/));

unknown's avatar
unknown committed
473 474
    if (table->s->primary_key == MAX_KEY)
      share->flags|= NSF_HIDDEN_PK;
475 476
    if (table->s->blob_fields != 0)
      share->flags|= NSF_BLOB_FLAG;
unknown's avatar
unknown committed
477 478
    break;
  }
479
  DBUG_RETURN(error);
unknown's avatar
unknown committed
480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495
}

/*****************************************************************
  functions called from master sql client threads
****************************************************************/

/*
  Block until the ndb binlog has handled the epoch of the latest
  transaction (*p_latest_trans_gci), polling once per second for at most
  30 seconds. Used by RESET MASTER/FLUSH LOGS paths and BFN_BINLOG_WAIT.

  Returns immediately if the ndb binlog is not running. While waiting,
  thd->proc_info (if thd is set) shows the wait and is restored after.
*/
static void ndbcluster_binlog_wait(THD *thd)
{
  if (!ndb_binlog_running)
    return;

  DBUG_ENTER("ndbcluster_binlog_wait");
  const ulonglong target_epoch= *p_latest_trans_gci;
  const char *prev_info= 0;
  if (thd)
  {
    prev_info= thd->proc_info;
    thd->proc_info= "Waiting for ndbcluster binlog update to "
                    "reach current position";
  }

  for (int remaining= 30;
       remaining > 0 && ndb_binlog_running &&
         ndb_latest_handled_binlog_epoch < target_epoch;
       remaining--)
    sleep(1);

  if (thd)
    thd->proc_info= prev_info;
  DBUG_VOID_RETURN;
}

/*
 Called from MYSQL_BIN_LOG::reset_logs in log.cc when binlog is emptied.

 When the ndb binlog is running: first waits until all events originating
 from this mysql server have reached the binlog, then deletes every row
 of NDB_REP_DB.NDB_REP_TABLE (binlogging disabled). Always returns 0.
*/
static int ndbcluster_reset_logs(THD *thd)
{
  if (ndb_binlog_running)
  {
    DBUG_ENTER("ndbcluster_reset_logs");

    ndbcluster_binlog_wait(thd);

    char query[1024];
    char *query_end= strmov(query, "DELETE FROM " NDB_REP_DB "." NDB_REP_TABLE);
    run_query(thd, query, query_end, NULL, TRUE);

    DBUG_RETURN(0);
  }
  return 0;
}

/*
  Called from MYSQL_BIN_LOG::purge_logs in log.cc when the binlog "file"
  is removed.

  Deletes the rows of NDB_REP_DB.NDB_REP_TABLE whose File column equals
  'file' (binlogging disabled). Does nothing when the ndb binlog is not
  running or when called from the slave thread. Always returns 0.
*/

static int
ndbcluster_binlog_index_purge_file(THD *thd, const char *file)
{
  if (!ndb_binlog_running || thd->slave_thread)
    return 0;

  DBUG_ENTER("ndbcluster_binlog_index_purge_file");
  DBUG_PRINT("enter", ("file: %s", file));

  char buf[1024];
  /*
    Bounded build: strxnmov() copies at most sizeof(buf) - 1 characters and
    NUL-terminates, so an over-long 'file' cannot overflow 'buf'. A
    truncated statement loses its closing quote and fails to parse
    rather than deleting anything.
    NOTE(review): 'file' is not escaped; a quote in the binlog file name
    would break the statement.
  */
  char *end= strxnmov(buf, sizeof(buf) - 1,
                      "DELETE FROM " NDB_REP_DB "." NDB_REP_TABLE
                      " WHERE File='", file, "'", NullS);

  run_query(thd, buf, end, NULL, TRUE);

  DBUG_RETURN(0);
}

static void
567
ndbcluster_binlog_log_query(handlerton *hton, THD *thd, enum_binlog_command binlog_command,
unknown's avatar
unknown committed
568 569 570 571 572 573
                            const char *query, uint query_length,
                            const char *db, const char *table_name)
{
  DBUG_ENTER("ndbcluster_binlog_log_query");
  DBUG_PRINT("enter", ("db: %s  table_name: %s  query: %s",
                       db, table_name, query));
574 575 576 577 578 579
  enum SCHEMA_OP_TYPE type;
  int log= 0;
  switch (binlog_command)
  {
  case LOGCOM_CREATE_TABLE:
    type= SOT_CREATE_TABLE;
580
    DBUG_ASSERT(FALSE);
581 582 583
    break;
  case LOGCOM_ALTER_TABLE:
    type= SOT_ALTER_TABLE;
584
    log= 1;
585 586 587
    break;
  case LOGCOM_RENAME_TABLE:
    type= SOT_RENAME_TABLE;
588
    DBUG_ASSERT(FALSE);
589 590 591
    break;
  case LOGCOM_DROP_TABLE:
    type= SOT_DROP_TABLE;
592
    DBUG_ASSERT(FALSE);
593 594 595 596 597 598 599 600 601 602 603
    break;
  case LOGCOM_CREATE_DB:
    type= SOT_CREATE_DB;
    log= 1;
    break;
  case LOGCOM_ALTER_DB:
    type= SOT_ALTER_DB;
    log= 1;
    break;
  case LOGCOM_DROP_DB:
    type= SOT_DROP_DB;
604
    DBUG_ASSERT(FALSE);
605 606 607
    break;
  }
  if (log)
608
  {
609
    ndbcluster_log_schema_op(thd, 0, query, query_length,
610 611
                             db, table_name, 0, 0, type,
                             0, 0, 0);
612
  }
unknown's avatar
unknown committed
613 614 615
  DBUG_VOID_RETURN;
}

616

unknown's avatar
unknown committed
617
/*
  End use of the NDB Cluster binlog
   - signal the ndb util thread to terminate and wait for it
     (it uses the injector mutex)
   - signal the binlog injector thread and wait until it has stopped
   - destroy injector_mutex, injector_cond and ndb_schema_share_mutex

  Only the first call after initialization does any work
  (ndbcluster_binlog_inited is cleared). Always returns 0.
*/

static int ndbcluster_binlog_end(THD *thd)
{
  DBUG_ENTER("ndbcluster_binlog_end");

  if (!ndbcluster_binlog_inited)
    DBUG_RETURN(0);
  ndbcluster_binlog_inited= 0;

#ifdef HAVE_NDB_BINLOG
  if (ndb_util_thread_running > 0)
  {
    /*
      Wait for the util thread to die (as it uses the injector mutex).
      There is a very small chance that ndb_util_thread dies and the
      following mutex is freed before it is accessed. This should
      however be unlikely, as ndbcluster_binlog_end is supposed to
      be called before ndb_cluster_end().
    */
    pthread_mutex_lock(&LOCK_ndb_util_thread);
    /* Ensure the mutexes are not freed if ndb_cluster_end is running at the same time */
    ndb_util_thread_running++;
    ndbcluster_terminating= 1;
    pthread_cond_signal(&COND_ndb_util_thread);
    /*
      Wait until only our own increment is left.
      NOTE(review): presumably the util thread decrements the counter and
      signals COND_ndb_util_ready as it exits.
    */
    while (ndb_util_thread_running > 1)
      pthread_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
    ndb_util_thread_running--;
    pthread_mutex_unlock(&LOCK_ndb_util_thread);
  }

  /* wait for injector thread to finish */
  ndbcluster_binlog_terminating= 1;
  pthread_mutex_lock(&injector_mutex);
  pthread_cond_signal(&injector_cond);
  while (ndb_binlog_thread_running > 0)
    pthread_cond_wait(&injector_cond, &injector_mutex);
  pthread_mutex_unlock(&injector_mutex);

  pthread_mutex_destroy(&injector_mutex);
  pthread_cond_destroy(&injector_cond);
  pthread_mutex_destroy(&ndb_schema_share_mutex);
#endif

  DBUG_RETURN(0);
}

/*****************************************************************
  functions called from slave sql client threads
****************************************************************/
/*
  BFN_RESET_SLAVE handler: when the ndb binlog is running, delete every
  row of NDB_REP_DB.NDB_APPLY_TABLE (binlogging disabled).
*/
static void ndbcluster_reset_slave(THD *thd)
{
  if (ndb_binlog_running)
  {
    DBUG_ENTER("ndbcluster_reset_slave");
    char stmt[1024];
    char *stmt_end= strmov(stmt, "DELETE FROM " NDB_REP_DB "." NDB_APPLY_TABLE);
    run_query(thd, stmt, stmt_end, NULL, TRUE);
    DBUG_VOID_RETURN;
  }
}

/*
  Binlog hooks of the ndb handlerton; they are installed by
  ndbcluster_binlog_init_handlerton() below.
*/
685 686 687 688 689 690 691 692 693 694 695 696

/**
  Handlerton flush_logs hook.

  On FLUSH LOGS, wait until the outstanding ndb data to be logged has
  reached the binary log, so that log rotation happens at a
  deterministic point.

  @return FALSE (never reports failure)
*/
static bool ndbcluster_flush_logs(handlerton *hton)
{
  THD *caller= current_thd;
  ndbcluster_binlog_wait(caller);
  return false;
}

697 698 699
/*
  Handlerton binlog_func hook: dispatch 'fn' to the matching ndb binlog
  routine. 'arg' is only used by BFN_BINLOG_PURGE_FILE (the file name).
  Results of the routines are ignored; always returns 0.
*/
static int ndbcluster_binlog_func(handlerton *hton, THD *thd, 
                                  enum_binlog_func fn, 
                                  void *arg)
{
  switch (fn)
  {
  case BFN_RESET_LOGS:        ndbcluster_reset_logs(thd);        break;
  case BFN_RESET_SLAVE:       ndbcluster_reset_slave(thd);       break;
  case BFN_BINLOG_WAIT:       ndbcluster_binlog_wait(thd);       break;
  case BFN_BINLOG_END:        ndbcluster_binlog_end(thd);        break;
  case BFN_BINLOG_PURGE_FILE:
    ndbcluster_binlog_index_purge_file(thd, (const char *) arg);
    break;
  default:
    break;
  }
  return 0;
}

void ndbcluster_binlog_init_handlerton()
{
724
  handlerton *h= ndbcluster_hton;
725
  h->flush_logs=       ndbcluster_flush_logs;
726 727
  h->binlog_func=      ndbcluster_binlog_func;
  h->binlog_log_query= ndbcluster_binlog_log_query;
unknown's avatar
unknown committed
728 729 730 731 732 733 734
}





/*
735
  check the availability af the ndb_apply_status share
unknown's avatar
unknown committed
736 737 738
  - return share, but do not increase refcount
  - return 0 if there is no share
*/
739
static NDB_SHARE *ndbcluster_check_ndb_apply_status_share()
unknown's avatar
unknown committed
740 741 742 743
{
  pthread_mutex_lock(&ndbcluster_mutex);

  void *share= hash_search(&ndbcluster_open_tables, 
744
                           (uchar*) NDB_APPLY_TABLE_FILE,
unknown's avatar
unknown committed
745
                           sizeof(NDB_APPLY_TABLE_FILE) - 1);
746
  DBUG_PRINT("info",("ndbcluster_check_ndb_apply_status_share %s 0x%lx",
unknown's avatar
unknown committed
747
                     NDB_APPLY_TABLE_FILE, (long) share));
unknown's avatar
unknown committed
748 749 750 751 752
  pthread_mutex_unlock(&ndbcluster_mutex);
  return (NDB_SHARE*) share;
}

/*
753
  check the availability af the schema share
unknown's avatar
unknown committed
754 755 756
  - return share, but do not increase refcount
  - return 0 if there is no share
*/
757
static NDB_SHARE *ndbcluster_check_ndb_schema_share()
unknown's avatar
unknown committed
758 759 760 761
{
  pthread_mutex_lock(&ndbcluster_mutex);

  void *share= hash_search(&ndbcluster_open_tables, 
762
                           (uchar*) NDB_SCHEMA_TABLE_FILE,
unknown's avatar
unknown committed
763
                           sizeof(NDB_SCHEMA_TABLE_FILE) - 1);
764
  DBUG_PRINT("info",("ndbcluster_check_ndb_schema_share %s 0x%lx",
unknown's avatar
unknown committed
765
                     NDB_SCHEMA_TABLE_FILE, (long) share));
unknown's avatar
unknown committed
766 767 768 769 770
  pthread_mutex_unlock(&ndbcluster_mutex);
  return (NDB_SHARE*) share;
}

/*
771
  Create the ndb_apply_status table
unknown's avatar
unknown committed
772
*/
773
static int ndbcluster_create_ndb_apply_status_table(THD *thd)
unknown's avatar
unknown committed
774
{
775
  DBUG_ENTER("ndbcluster_create_ndb_apply_status_table");
unknown's avatar
unknown committed
776 777 778 779 780 781 782

  /*
    Check if we already have the apply status table.
    If so it should have been discovered at startup
    and thus have a share
  */

783
  if (ndbcluster_check_ndb_apply_status_share())
unknown's avatar
unknown committed
784 785 786 787 788
    DBUG_RETURN(0);

  if (g_ndb_cluster_connection->get_no_ready() <= 0)
    DBUG_RETURN(0);

789
  char buf[1024 + 1], *end;
unknown's avatar
unknown committed
790 791 792 793 794 795 796 797 798

  if (ndb_extra_logging)
    sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_APPLY_TABLE);

  /*
    Check if apply status table exists in MySQL "dictionary"
    if so, remove it since there is none in Ndb
  */
  {
799
    build_table_filename(buf, sizeof(buf) - 1,
800
                         NDB_REP_DB, NDB_APPLY_TABLE, reg_ext, 0);
unknown's avatar
unknown committed
801 802 803 804 805 806 807 808 809 810
    my_delete(buf, MYF(0));
  }

  /*
    Note, updating this table schema must be reflected in ndb_restore
  */
  end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
                   NDB_REP_DB "." NDB_APPLY_TABLE
                   " ( server_id INT UNSIGNED NOT NULL,"
                   " epoch BIGINT UNSIGNED NOT NULL, "
811 812 813
                   " log_name VARCHAR(255) BINARY NOT NULL, "
                   " start_pos BIGINT UNSIGNED NOT NULL, "
                   " end_pos BIGINT UNSIGNED NOT NULL, "
814
                   " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB CHARACTER SET latin1");
unknown's avatar
unknown committed
815

816
  const int no_print_error[6]= {ER_TABLE_EXISTS_ERROR,
817
                                701,
unknown's avatar
unknown committed
818
                                702,
819
                                721, // Table already exist
820 821
                                4009,
                                0}; // do not print error 701 etc
unknown's avatar
unknown committed
822
  run_query(thd, buf, end, no_print_error, TRUE);
unknown's avatar
unknown committed
823 824 825 826 827 828

  DBUG_RETURN(0);
}


/*
  Create the ndb_schema table.

  Nothing is done when the share for the schema table already exists
  (the table was then discovered at startup) or when no storage node is
  ready.  Otherwise any local .frm left behind for the table is removed,
  and the table is created with CREATE TABLE IF NOT EXISTS.  Errors from
  the statement are not propagated; the function always returns 0.
*/
static int ndbcluster_create_schema_table(THD *thd)
{
  DBUG_ENTER("ndbcluster_create_schema_table");

  /* Share already present => discovered at startup, nothing to create */
  if (ndbcluster_check_ndb_schema_share())
    DBUG_RETURN(0);

  /* Without a ready storage node the CREATE cannot succeed */
  if (g_ndb_cluster_connection->get_no_ready() <= 0)
    DBUG_RETURN(0);

  char query_buf[1024 + 1];
  char *query_end;

  if (ndb_extra_logging)
    sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_SCHEMA_TABLE);

  /*
    The MySQL "dictionary" may still hold a definition of the table;
    remove it since there is none in Ndb.
  */
  build_table_filename(query_buf, sizeof(query_buf) - 1,
                       NDB_REP_DB, NDB_SCHEMA_TABLE, reg_ext, 0);
  my_delete(query_buf, MYF(0));

  /*
    Any change to this definition must be mirrored in the SCHEMA_*_I
    column defines and in struct Cluster_schema.
  */
  query_end= strmov(query_buf,
                    "CREATE TABLE IF NOT EXISTS "
                    NDB_REP_DB "." NDB_SCHEMA_TABLE
                    " ( db VARBINARY(63) NOT NULL,"
                    " name VARBINARY(63) NOT NULL,"
                    " slock BINARY(32) NOT NULL,"
                    " query BLOB NOT NULL,"
                    " node_id INT UNSIGNED NOT NULL,"
                    " epoch BIGINT UNSIGNED NOT NULL,"
                    " id INT UNSIGNED NOT NULL,"
                    " version INT UNSIGNED NOT NULL,"
                    " type INT UNSIGNED NOT NULL,"
                    " PRIMARY KEY USING HASH (db,name) ) ENGINE=NDB CHARACTER SET latin1");

  /* Zero-terminated list of error codes run_query() should not print */
  const int quiet_errors[6]=
  {
    ER_TABLE_EXISTS_ERROR,
    701,
    702,
    721,   /* table already exists */
    4009,
    0
  };
  run_query(thd, query_buf, query_end, quiet_errors, TRUE);

  DBUG_RETURN(0);
}

889
int ndbcluster_setup_binlog_table_shares(THD *thd)
{
  /*
    Establish the shares for the ndb_schema and ndb_apply_status tables.

    For each table whose share is missing, first try to discover it from
    the engine (under LOCK_open).  If that does not produce a share, create
    the table.  Returns 1 if a share still cannot be obtained.

    ndb_schema is always handled before ndb_apply_status.  Once
    ndbcluster_find_all_files() reports 0, the binlog tables are marked as
    initialized, cached tables are closed and the injector thread is
    signalled.  Returns 0 in every other case.
  */
  if (!ndb_schema_share && ndbcluster_check_ndb_schema_share() == 0)
  {
    pthread_mutex_lock(&LOCK_open);
    ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE);
    pthread_mutex_unlock(&LOCK_open);

    if (!ndb_schema_share)
      ndbcluster_create_schema_table(thd);
    if (!ndb_schema_share)
      return 1;
  }

  if (!ndb_apply_status_share && ndbcluster_check_ndb_apply_status_share() == 0)
  {
    pthread_mutex_lock(&LOCK_open);
    ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE);
    pthread_mutex_unlock(&LOCK_open);

    if (!ndb_apply_status_share)
      ndbcluster_create_ndb_apply_status_table(thd);
    if (!ndb_apply_status_share)
      return 1;
  }

  if (ndbcluster_find_all_files(thd))
    return 0;

  pthread_mutex_lock(&LOCK_open);
  ndb_binlog_tables_inited= TRUE;
  if (ndb_extra_logging)
    sql_print_information("NDB Binlog: ndb tables writable");
  close_cached_tables(NULL, NULL, TRUE, FALSE, FALSE);
  pthread_mutex_unlock(&LOCK_open);

  /* Signal injector thread that all is setup */
  pthread_cond_signal(&injector_cond);
  return 0;
}

/*
  Column numbers of the ndb_schema table, followed by the column count
  and the byte size of the slock column.  Keep these in step with the
  CREATE TABLE issued by ndbcluster_create_schema_table() and with
  struct Cluster_schema.
*/
#define SCHEMA_DB_I       0u   /* db      VARBINARY(63)   */
#define SCHEMA_NAME_I     1u   /* name    VARBINARY(63)   */
#define SCHEMA_SLOCK_I    2u   /* slock   BINARY(32)      */
#define SCHEMA_QUERY_I    3u   /* query   BLOB            */
#define SCHEMA_NODE_ID_I  4u   /* node_id INT UNSIGNED    */
#define SCHEMA_EPOCH_I    5u   /* epoch   BIGINT UNSIGNED */
#define SCHEMA_ID_I       6u   /* id      INT UNSIGNED    */
#define SCHEMA_VERSION_I  7u   /* version INT UNSIGNED    */
#define SCHEMA_TYPE_I     8u   /* type    INT UNSIGNED    */
/* Number of columns: one past the last column index */
#define SCHEMA_SIZE       (SCHEMA_TYPE_I + 1u)
/* Bytes in the slock column (BINARY(32)) */
#define SCHEMA_SLOCK_SIZE 32u

948
struct Cluster_schema
unknown's avatar
unknown committed
949
{
unknown's avatar
unknown committed
950
  uchar db_length;
unknown's avatar
unknown committed
951
  char db[64];
unknown's avatar
unknown committed
952
  uchar name_length;
unknown's avatar
unknown committed
953
  char name[64];
unknown's avatar
unknown committed
954
  uchar slock_length;
unknown's avatar
unknown committed
955 956
  uint32 slock[SCHEMA_SLOCK_SIZE/4];
  unsigned short query_length;
957
  char *query;
unknown's avatar
unknown committed
958 959 960 961 962
  Uint64 epoch;
  uint32 node_id;
  uint32 id;
  uint32 version;
  uint32 type;
963
  uint32 any_value;
unknown's avatar
unknown committed
964 965 966 967 968
};

/*
  Transfer schema table data into corresponding struct
*/
969
static void ndbcluster_get_schema(NDB_SHARE *share,
970
                                  Cluster_schema *s)
unknown's avatar
unknown committed
971
{
972
  TABLE *table= share->table;
unknown's avatar
unknown committed
973
  Field **field;
974
  /* unpack blob values */
975
  uchar* blobs_buffer= 0;
976
  uint blobs_buffer_size= 0;
977
  my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
978 979 980 981 982 983 984 985 986
  {
    ptrdiff_t ptrdiff= 0;
    int ret= get_ndb_blobs_value(table, share->ndb_value[0],
                                 blobs_buffer, blobs_buffer_size,
                                 ptrdiff);
    if (ret != 0)
    {
      my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
      DBUG_PRINT("info", ("blob read error"));
987
      DBUG_ASSERT(FALSE);
988 989
    }
  }
990
  /* db varchar 1 length uchar */
unknown's avatar
unknown committed
991 992 993 994 995 996
  field= table->field;
  s->db_length= *(uint8*)(*field)->ptr;
  DBUG_ASSERT(s->db_length <= (*field)->field_length);
  DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db));
  memcpy(s->db, (*field)->ptr + 1, s->db_length);
  s->db[s->db_length]= 0;
997
  /* name varchar 1 length uchar */
unknown's avatar
unknown committed
998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008
  field++;
  s->name_length= *(uint8*)(*field)->ptr;
  DBUG_ASSERT(s->name_length <= (*field)->field_length);
  DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name));
  memcpy(s->name, (*field)->ptr + 1, s->name_length);
  s->name[s->name_length]= 0;
  /* slock fixed length */
  field++;
  s->slock_length= (*field)->field_length;
  DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
  memcpy(s->slock, (*field)->ptr, s->slock_length);
1009
  /* query blob */
unknown's avatar
unknown committed
1010
  field++;
1011 1012 1013
  {
    Field_blob *field_blob= (Field_blob*)(*field);
    uint blob_len= field_blob->get_length((*field)->ptr);
1014
    uchar *blob_ptr= 0;
1015
    field_blob->get_ptr(&blob_ptr);
1016
    DBUG_ASSERT(blob_len == 0 || blob_ptr != 0);
1017
    s->query_length= blob_len;
1018
    s->query= sql_strmake((char*) blob_ptr, blob_len);
1019
  }
unknown's avatar
unknown committed
1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034
  /* node_id */
  field++;
  s->node_id= ((Field_long *)*field)->val_int();
  /* epoch */
  field++;
  s->epoch= ((Field_long *)*field)->val_int();
  /* id */
  field++;
  s->id= ((Field_long *)*field)->val_int();
  /* version */
  field++;
  s->version= ((Field_long *)*field)->val_int();
  /* type */
  field++;
  s->type= ((Field_long *)*field)->val_int();
1035 1036
  /* free blobs buffer */
  my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
1037
  dbug_tmp_restore_column_map(table->read_set, old_map);
unknown's avatar
unknown committed
1038 1039 1040 1041 1042
}

/*
  helper function to pack a ndb varchar
*/
1043 1044
char *ndb_pack_varchar(const NDBCOL *col, char *buf,
                       const char *str, int sz)
unknown's avatar
unknown committed
1045 1046 1047 1048 1049 1050 1051
{
  switch (col->getArrayType())
  {
    case NDBCOL::ArrayTypeFixed:
      memcpy(buf, str, sz);
      break;
    case NDBCOL::ArrayTypeShortVar:
unknown's avatar
unknown committed
1052
      *(uchar*)buf= (uchar)sz;
unknown's avatar
unknown committed
1053 1054 1055 1056 1057 1058 1059 1060 1061 1062
      memcpy(buf + 1, str, sz);
      break;
    case NDBCOL::ArrayTypeMediumVar:
      int2store(buf, sz);
      memcpy(buf + 2, str, sz);
      break;
  }
  return buf;
}

1063 1064 1065 1066 1067 1068 1069 1070 1071
/*
  Acknowledge handling of a schema operation.

  Clears this node's bit in the slock column of the ndb_schema row for
  (db, table_name).  The same update also sets the row's node_id to this
  node and its type to SOT_CLEAR_SLOCK.  The row is read with an
  exclusive lock and then updated in the same transaction.  After a
  successful commit, a GCP wait is forced through the dictionary.

  Temporary NDB errors are retried up to 100 times, sleeping retry_sleep
  milliseconds between attempts.  Any other failure is pushed to the
  client as a warning.  The Ndb object's current database is restored
  before returning.  Always returns 0.
*/
static int
ndbcluster_update_slock(THD *thd,
                        const char *db,
                        const char *table_name)
{
  DBUG_ENTER("ndbcluster_update_slock");
  if (!ndb_schema_share)
  {
    DBUG_RETURN(0);
  }

  const NdbError *ndb_error= 0;
  uint32 node_id= g_ndb_cluster_connection->node_id();
  Ndb *ndb= check_ndb_in_thd(thd);
  if (!ndb)
  {
    /* No Ndb object could be attached to this thread: nothing to update */
    DBUG_PRINT("info", ("no Ndb object, cannot release lock on '%s.%s'",
                        db, table_name));
    DBUG_RETURN(0);
  }
  /* Same size as the save buffer in ndbcluster_log_schema_op() */
  char save_db[FN_REFLEN];
  strcpy(save_db, ndb->getDatabaseName());

  char tmp_buf[FN_REFLEN];
  NDBDICT *dict= ndb->getDictionary();
  ndb->setDatabaseName(NDB_REP_DB);
  Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
  const NDBTAB *ndbtab= ndbtab_g.get_table();
  NdbTransaction *trans= 0;
  int retries= 100;
  /* Delay between transaction retries, in milliseconds */
  int retry_sleep= 10;
  const NDBCOL *col[SCHEMA_SIZE];
  unsigned sz[SCHEMA_SIZE];

  MY_BITMAP slock;
  uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
  bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);

  if (ndbtab == 0)
  {
    /* A missing schema table at this point is treated as fatal */
    abort();
    DBUG_RETURN(0);
  }

  {
    uint i;
    for (i= 0; i < SCHEMA_SIZE; i++)
    {
      col[i]= ndbtab->getColumn(i);
      if (i != SCHEMA_QUERY_I)
      {
        sz[i]= col[i]->getLength();
        DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
      }
    }
  }

  while (1)
  {
    if ((trans= ndb->startTransaction()) == 0)
      goto err;
    {
      NdbOperation *op= 0;
      int r= 0;

      /* read the bitmap exclusive */
      r|= (op= trans->getNdbOperation(ndbtab)) == 0;
      DBUG_ASSERT(r == 0);
      r|= op->readTupleExclusive();
      DBUG_ASSERT(r == 0);

      /* db */
      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
      r|= op->equal(SCHEMA_DB_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* name */
      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
                       strlen(table_name));
      r|= op->equal(SCHEMA_NAME_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* slock */
      r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
      DBUG_ASSERT(r == 0);
    }
    if (trans->execute(NdbTransaction::NoCommit))
      goto err;
    bitmap_clear_bit(&slock, node_id);
    {
      NdbOperation *op= 0;
      int r= 0;

      /* now update the tuple */
      r|= (op= trans->getNdbOperation(ndbtab)) == 0;
      DBUG_ASSERT(r == 0);
      r|= op->updateTuple();
      DBUG_ASSERT(r == 0);

      /* db */
      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
      r|= op->equal(SCHEMA_DB_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* name */
      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
                       strlen(table_name));
      r|= op->equal(SCHEMA_NAME_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* slock */
      r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
      DBUG_ASSERT(r == 0);
      /* node_id */
      r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
      DBUG_ASSERT(r == 0);
      /* type */
      r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
      DBUG_ASSERT(r == 0);
    }
    if (trans->execute(NdbTransaction::Commit) == 0)
    {
      dict->forceGCPWait();
      DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
                          node_id, db, table_name));
      break;
    }
  err:
    const NdbError *this_error= trans ?
      &trans->getNdbError() : &ndb->getNdbError();
    if (this_error->status == NdbError::TemporaryError)
    {
      if (retries--)
      {
        if (trans)
          ndb->closeTransaction(trans);
        trans= 0;
        /* my_sleep() takes microseconds; retry_sleep is milliseconds */
        my_sleep(retry_sleep * 1000);
        continue; // retry
      }
    }
    ndb_error= this_error;
    break;
  }

  if (ndb_error)
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                        ndb_error->code,
                        ndb_error->message,
                        "Could not release lock on '%s.%s'",
                        db, table_name);
  if (trans)
    ndb->closeTransaction(trans);
  ndb->setDatabaseName(save_db);
  DBUG_RETURN(0);
}

unknown's avatar
unknown committed
1213 1214 1215
/*
  log query in schema table
*/
1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240
static void ndb_report_waiting(const char *key,
                               int the_time,
                               const char *op,
                               const char *obj)
{
  ulonglong ndb_latest_epoch= 0;
  const char *proc_info= "<no info>";
  pthread_mutex_lock(&injector_mutex);
  if (injector_ndb)
    ndb_latest_epoch= injector_ndb->getLatestGCI();
  if (injector_thd)
    proc_info= injector_thd->proc_info;
  pthread_mutex_unlock(&injector_mutex);
  sql_print_information("NDB %s:"
                        " waiting max %u sec for %s %s."
                        "  epochs: (%u,%u,%u)"
                        "  injector proc_info: %s"
                        ,key, the_time, op, obj
                        ,(uint)ndb_latest_handled_binlog_epoch
                        ,(uint)ndb_latest_received_binlog_epoch
                        ,(uint)ndb_latest_epoch
                        ,proc_info
                        );
}

unknown's avatar
unknown committed
1241 1242 1243 1244 1245
int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
                             const char *query, int query_length,
                             const char *db, const char *table_name,
                             uint32 ndb_table_id,
                             uint32 ndb_table_version,
1246
                             enum SCHEMA_OP_TYPE type,
1247 1248
                             const char *new_db, const char *new_table_name,
                             int have_lock_open)
unknown's avatar
unknown committed
1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264
{
  DBUG_ENTER("ndbcluster_log_schema_op");
  Thd_ndb *thd_ndb= get_thd_ndb(thd);
  if (!thd_ndb)
  {
    if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
    {
      sql_print_error("Could not allocate Thd_ndb object");
      DBUG_RETURN(1);
    }
    set_thd_ndb(thd, thd_ndb);
  }

  DBUG_PRINT("enter",
             ("query: %s  db: %s  table_name: %s  thd_ndb->options: %d",
              query, db, table_name, thd_ndb->options));
1265
  if (!ndb_schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP)
unknown's avatar
unknown committed
1266 1267 1268 1269 1270
  {
    DBUG_RETURN(0);
  }

  char tmp_buf2[FN_REFLEN];
1271
  const char *type_str;
unknown's avatar
unknown committed
1272 1273 1274 1275 1276 1277 1278 1279 1280
  switch (type)
  {
  case SOT_DROP_TABLE:
    /* drop database command, do not log at drop table */
    if (thd->lex->sql_command ==  SQLCOM_DROP_DB)
      DBUG_RETURN(0);
    /* redo the drop table query as is may contain several tables */
    query= tmp_buf2;
    query_length= (uint) (strxmov(tmp_buf2, "drop table `",
1281
                                  table_name, "`", NullS) - tmp_buf2);
1282
    type_str= "drop table";
1283
    break;
unknown's avatar
unknown committed
1284
  case SOT_RENAME_TABLE:
1285 1286 1287
    /* redo the rename table query as is may contain several tables */
    query= tmp_buf2;
    query_length= (uint) (strxmov(tmp_buf2, "rename table `",
1288 1289
                                  db, ".", table_name, "` to `",
                                  new_db, ".", new_table_name, "`", NullS) - tmp_buf2);
1290
    type_str= "rename table";
1291 1292
    break;
  case SOT_CREATE_TABLE:
1293 1294
    type_str= "create table";
    break;
unknown's avatar
unknown committed
1295
  case SOT_ALTER_TABLE:
1296
    type_str= "alter table";
unknown's avatar
unknown committed
1297 1298
    break;
  case SOT_DROP_DB:
1299
    type_str= "drop db";
unknown's avatar
unknown committed
1300 1301
    break;
  case SOT_CREATE_DB:
1302
    type_str= "create db";
unknown's avatar
unknown committed
1303 1304
    break;
  case SOT_ALTER_DB:
1305
    type_str= "alter db";
unknown's avatar
unknown committed
1306
    break;
1307
  case SOT_TABLESPACE:
1308
    type_str= "tablespace";
1309 1310
    break;
  case SOT_LOGFILE_GROUP:
1311
    type_str= "logfile group";
1312
    break;
unknown's avatar
unknown committed
1313 1314 1315
  case SOT_TRUNCATE_TABLE:
    type_str= "truncate table";
    break;
unknown's avatar
unknown committed
1316 1317 1318 1319
  default:
    abort(); /* should not happen, programming error */
  }

1320
  NDB_SCHEMA_OBJECT *ndb_schema_object;
1321
  {
1322 1323
    char key[FN_REFLEN + 1];
    build_table_filename(key, sizeof(key) - 1, db, table_name, "", 0);
1324
    ndb_schema_object= ndb_get_schema_object(key, TRUE, FALSE);
1325 1326
  }

unknown's avatar
unknown committed
1327 1328 1329 1330
  const NdbError *ndb_error= 0;
  uint32 node_id= g_ndb_cluster_connection->node_id();
  Uint64 epoch= 0;
  MY_BITMAP schema_subscribers;
1331
  uint32 bitbuf[sizeof(ndb_schema_object->slock)/4];
1332 1333
  char bitbuf_e[sizeof(bitbuf)];
  bzero(bitbuf_e, sizeof(bitbuf_e));
unknown's avatar
unknown committed
1334
  {
1335 1336
    int i, updated= 0;
    int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
1337
    bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, FALSE);
unknown's avatar
unknown committed
1338
    bitmap_set_all(&schema_subscribers);
1339 1340 1341 1342 1343 1344 1345 1346 1347 1348

    /* begin protect ndb_schema_share */
    pthread_mutex_lock(&ndb_schema_share_mutex);
    if (ndb_schema_share == 0)
    {
      pthread_mutex_unlock(&ndb_schema_share_mutex);
      if (ndb_schema_object)
        ndb_free_schema_object(&ndb_schema_object, FALSE);
      DBUG_RETURN(0);    
    }
1349
    (void) pthread_mutex_lock(&ndb_schema_share->mutex);
1350
    for (i= 0; i < no_storage_nodes; i++)
unknown's avatar
unknown committed
1351
    {
1352
      MY_BITMAP *table_subscribers= &ndb_schema_share->subscriber_bitmap[i];
unknown's avatar
unknown committed
1353
      if (!bitmap_is_clear_all(table_subscribers))
1354
      {
unknown's avatar
unknown committed
1355 1356
        bitmap_intersect(&schema_subscribers,
                         table_subscribers);
1357 1358
        updated= 1;
      }
unknown's avatar
unknown committed
1359
    }
1360
    (void) pthread_mutex_unlock(&ndb_schema_share->mutex);
1361 1362 1363
    pthread_mutex_unlock(&ndb_schema_share_mutex);
    /* end protect ndb_schema_share */

1364
    if (updated)
1365
    {
1366
      bitmap_clear_bit(&schema_subscribers, node_id);
1367 1368 1369 1370 1371 1372 1373 1374 1375
      /*
        if setting own acknowledge bit it is important that
        no other mysqld's are registred, as subsequent code
        will cause the original event to be hidden (by blob
        merge event code)
      */
      if (bitmap_is_clear_all(&schema_subscribers))
          bitmap_set_bit(&schema_subscribers, node_id);
    }
1376 1377 1378
    else
      bitmap_clear_all(&schema_subscribers);

1379
    if (ndb_schema_object)
unknown's avatar
unknown committed
1380
    {
1381 1382 1383 1384
      (void) pthread_mutex_lock(&ndb_schema_object->mutex);
      memcpy(ndb_schema_object->slock, schema_subscribers.bitmap,
             sizeof(ndb_schema_object->slock));
      (void) pthread_mutex_unlock(&ndb_schema_object->mutex);
unknown's avatar
unknown committed
1385 1386
    }

1387
    DBUG_DUMP("schema_subscribers", (uchar*)schema_subscribers.bitmap,
unknown's avatar
unknown committed
1388 1389 1390 1391 1392 1393
              no_bytes_in_map(&schema_subscribers));
    DBUG_PRINT("info", ("bitmap_is_clear_all(&schema_subscribers): %d",
                        bitmap_is_clear_all(&schema_subscribers)));
  }

  Ndb *ndb= thd_ndb->ndb;
1394 1395
  char save_db[FN_REFLEN];
  strcpy(save_db, ndb->getDatabaseName());
unknown's avatar
unknown committed
1396

1397
  char tmp_buf[FN_REFLEN];
unknown's avatar
unknown committed
1398 1399
  NDBDICT *dict= ndb->getDictionary();
  ndb->setDatabaseName(NDB_REP_DB);
1400 1401
  Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
  const NDBTAB *ndbtab= ndbtab_g.get_table();
unknown's avatar
unknown committed
1402 1403
  NdbTransaction *trans= 0;
  int retries= 100;
1404
  int retry_sleep= 10; /* 10 milliseconds, transaction */
unknown's avatar
unknown committed
1405 1406 1407 1408 1409 1410 1411 1412 1413 1414
  const NDBCOL *col[SCHEMA_SIZE];
  unsigned sz[SCHEMA_SIZE];

  if (ndbtab == 0)
  {
    if (strcmp(NDB_REP_DB, db) != 0 ||
        strcmp(NDB_SCHEMA_TABLE, table_name))
    {
      ndb_error= &dict->getNdbError();
    }
1415
    goto end;
unknown's avatar
unknown committed
1416 1417 1418 1419 1420 1421 1422
  }

  {
    uint i;
    for (i= 0; i < SCHEMA_SIZE; i++)
    {
      col[i]= ndbtab->getColumn(i);
1423 1424 1425 1426 1427
      if (i != SCHEMA_QUERY_I)
      {
        sz[i]= col[i]->getLength();
        DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
      }
unknown's avatar
unknown committed
1428 1429 1430 1431 1432
    }
  }

  while (1)
  {
1433 1434 1435 1436
    const char *log_db= db;
    const char *log_tab= table_name;
    const char *log_subscribers= (char*)schema_subscribers.bitmap;
    uint32 log_type= (uint32)type;
unknown's avatar
unknown committed
1437 1438
    if ((trans= ndb->startTransaction()) == 0)
      goto err;
1439
    while (1)
unknown's avatar
unknown committed
1440 1441 1442 1443 1444 1445 1446 1447 1448
    {
      NdbOperation *op= 0;
      int r= 0;
      r|= (op= trans->getNdbOperation(ndbtab)) == 0;
      DBUG_ASSERT(r == 0);
      r|= op->writeTuple();
      DBUG_ASSERT(r == 0);
      
      /* db */
1449
      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db));
unknown's avatar
unknown committed
1450 1451 1452
      r|= op->equal(SCHEMA_DB_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* name */
1453 1454
      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab,
                       strlen(log_tab));
unknown's avatar
unknown committed
1455 1456 1457 1458
      r|= op->equal(SCHEMA_NAME_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* slock */
      DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf));
1459
      r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers);
unknown's avatar
unknown committed
1460 1461
      DBUG_ASSERT(r == 0);
      /* query */
1462 1463 1464 1465 1466 1467 1468 1469
      {
        NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I);
        DBUG_ASSERT(ndb_blob != 0);
        uint blob_len= query_length;
        const char* blob_ptr= query;
        r|= ndb_blob->setValue(blob_ptr, blob_len);
        DBUG_ASSERT(r == 0);
      }
unknown's avatar
unknown committed
1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482
      /* node_id */
      r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
      DBUG_ASSERT(r == 0);
      /* epoch */
      r|= op->setValue(SCHEMA_EPOCH_I, epoch);
      DBUG_ASSERT(r == 0);
      /* id */
      r|= op->setValue(SCHEMA_ID_I, ndb_table_id);
      DBUG_ASSERT(r == 0);
      /* version */
      r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version);
      DBUG_ASSERT(r == 0);
      /* type */
1483
      r|= op->setValue(SCHEMA_TYPE_I, log_type);
unknown's avatar
unknown committed
1484
      DBUG_ASSERT(r == 0);
1485 1486 1487 1488 1489 1490
      /* any value */
      if (!(thd->options & OPTION_BIN_LOG))
        r|= op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
      else
        r|= op->setAnyValue(thd->server_id);
      DBUG_ASSERT(r == 0);
1491 1492 1493 1494
      if (log_db != new_db && new_db && new_table_name)
      {
        log_db= new_db;
        log_tab= new_table_name;
1495
        log_subscribers= bitbuf_e; // no ack expected on this
1496 1497 1498 1499
        log_type= (uint32)SOT_RENAME_TABLE_NEW;
        continue;
      }
      break;
unknown's avatar
unknown committed
1500 1501 1502 1503 1504 1505 1506
    }
    if (trans->execute(NdbTransaction::Commit) == 0)
    {
      DBUG_PRINT("info", ("logged: %s", query));
      break;
    }
err:
1507 1508 1509
    const NdbError *this_error= trans ?
      &trans->getNdbError() : &ndb->getNdbError();
    if (this_error->status == NdbError::TemporaryError)
unknown's avatar
unknown committed
1510 1511 1512
    {
      if (retries--)
      {
1513 1514
        if (trans)
          ndb->closeTransaction(trans);
1515
        my_sleep(retry_sleep);
unknown's avatar
unknown committed
1516 1517 1518
        continue; // retry
      }
    }
1519
    ndb_error= this_error;
unknown's avatar
unknown committed
1520 1521 1522 1523
    break;
  }
end:
  if (ndb_error)
1524
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
unknown's avatar
unknown committed
1525 1526 1527 1528 1529 1530 1531
                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                        ndb_error->code,
                        ndb_error->message,
                        "Could not log query '%s' on other mysqld's");
          
  if (trans)
    ndb->closeTransaction(trans);
1532
  ndb->setDatabaseName(save_db);
unknown's avatar
unknown committed
1533 1534 1535 1536 1537 1538 1539

  /*
    Wait for other mysqld's to acknowledge the table operation
  */
  if (ndb_error == 0 &&
      !bitmap_is_clear_all(&schema_subscribers))
  {
1540 1541 1542 1543 1544 1545 1546 1547 1548
    /*
      if own nodeid is set we are a single mysqld registred
      as an optimization we update the slock directly
    */
    if (bitmap_is_set(&schema_subscribers, node_id))
      ndbcluster_update_slock(thd, db, table_name);
    else
      dict->forceGCPWait();

1549
    int max_timeout= opt_ndb_sync_timeout;
1550
    (void) pthread_mutex_lock(&ndb_schema_object->mutex);
1551 1552 1553 1554 1555
    if (have_lock_open)
    {
      safe_mutex_assert_owner(&LOCK_open);
      (void) pthread_mutex_unlock(&LOCK_open);
    }
unknown's avatar
unknown committed
1556 1557 1558 1559
    while (1)
    {
      struct timespec abstime;
      int i;
1560
      int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
unknown's avatar
unknown committed
1561
      set_timespec(abstime, 1);
unknown's avatar
ndb:  
unknown committed
1562 1563 1564
      int ret= pthread_cond_timedwait(&injector_cond,
                                      &ndb_schema_object->mutex,
                                      &abstime);
1565 1566
      if (thd->killed)
        break;
1567 1568 1569 1570 1571 1572 1573 1574

      /* begin protect ndb_schema_share */
      pthread_mutex_lock(&ndb_schema_share_mutex);
      if (ndb_schema_share == 0)
      {
        pthread_mutex_unlock(&ndb_schema_share_mutex);
        break;
      }
1575
      (void) pthread_mutex_lock(&ndb_schema_share->mutex);
1576
      for (i= 0; i < no_storage_nodes; i++)
unknown's avatar
unknown committed
1577 1578
      {
        /* remove any unsubscribed from schema_subscribers */
1579
        MY_BITMAP *tmp= &ndb_schema_share->subscriber_bitmap[i];
unknown's avatar
unknown committed
1580 1581 1582
        if (!bitmap_is_clear_all(tmp))
          bitmap_intersect(&schema_subscribers, tmp);
      }
1583
      (void) pthread_mutex_unlock(&ndb_schema_share->mutex);
1584 1585
      pthread_mutex_unlock(&ndb_schema_share_mutex);
      /* end protect ndb_schema_share */
unknown's avatar
unknown committed
1586

1587 1588
      /* remove any unsubscribed from ndb_schema_object->slock */
      bitmap_intersect(&ndb_schema_object->slock_bitmap, &schema_subscribers);
unknown's avatar
unknown committed
1589

1590
      DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
1591
                (uchar*)ndb_schema_object->slock_bitmap.bitmap,
1592
                no_bytes_in_map(&ndb_schema_object->slock_bitmap));
unknown's avatar
unknown committed
1593

1594
      if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
unknown's avatar
unknown committed
1595 1596
        break;

unknown's avatar
ndb:  
unknown committed
1597
      if (ret)
unknown's avatar
unknown committed
1598
      {
unknown's avatar
ndb:  
unknown committed
1599 1600 1601
        max_timeout--;
        if (max_timeout == 0)
        {
1602
          sql_print_error("NDB %s: distributing %s timed out. Ignoring...",
unknown's avatar
ndb:  
unknown committed
1603 1604 1605 1606 1607 1608
                          type_str, ndb_schema_object->key);
          break;
        }
        if (ndb_extra_logging)
          ndb_report_waiting(type_str, max_timeout,
                             "distributing", ndb_schema_object->key);
unknown's avatar
unknown committed
1609 1610
      }
    }
1611 1612 1613 1614
    if (have_lock_open)
    {
      (void) pthread_mutex_lock(&LOCK_open);
    }
1615
    (void) pthread_mutex_unlock(&ndb_schema_object->mutex);
unknown's avatar
unknown committed
1616
  }
1617

1618 1619
  if (ndb_schema_object)
    ndb_free_schema_object(&ndb_schema_object, FALSE);
1620

unknown's avatar
unknown committed
1621 1622 1623 1624 1625 1626
  DBUG_RETURN(0);
}

/*
  Handle _non_ data events from the storage nodes
*/
1627
int
unknown's avatar
unknown committed
1628 1629 1630
ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
                         NDB_SHARE *share)
{
1631
  DBUG_ENTER("ndb_handle_schema_change");
1632
  TABLE* table= share->table;
1633
  TABLE_SHARE *table_share= share->table_share;
1634
  const char *dbname= table_share->db.str;
unknown's avatar
unknown committed
1635
  const char *tabname= table_share->table_name.str;
1636 1637 1638 1639 1640
  bool do_close_cached_tables= FALSE;
  bool is_online_alter_table= FALSE;
  bool is_rename_table= FALSE;
  bool is_remote_change=
    (uint) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id();
unknown's avatar
unknown committed
1641

1642 1643 1644 1645
  if (pOp->getEventType() == NDBEVENT::TE_ALTER)
  {
    if (pOp->tableFrmChanged())
    {
1646
      DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: table frm changed"));
1647 1648 1649 1650
      is_online_alter_table= TRUE;
    }
    else
    {
1651
      DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: name changed"));
1652 1653 1654 1655 1656
      DBUG_ASSERT(pOp->tableNameChanged());
      is_rename_table= TRUE;
    }
  }

1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667
  {
    ndb->setDatabaseName(dbname);
    Ndb_table_guard ndbtab_g(ndb->getDictionary(), tabname);
    const NDBTAB *ev_tab= pOp->getTable();
    const NDBTAB *cache_tab= ndbtab_g.get_table();
    if (cache_tab &&
        cache_tab->getObjectId() == ev_tab->getObjectId() &&
        cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
      ndbtab_g.invalidate();
  }

1668 1669 1670 1671 1672
  /*
    Refresh local frm file and dictionary cache if
    remote on-line alter table
  */
  if (is_remote_change && is_online_alter_table)
unknown's avatar
unknown committed
1673
  {
1674
    const char *tabname= table_share->table_name.str;
1675
    char key[FN_REFLEN + 1];
1676 1677
    uchar *data= 0, *pack_data= 0;
    size_t length, pack_length;
1678 1679 1680
    int error;
    NDBDICT *dict= ndb->getDictionary();
    const NDBTAB *altered_table= pOp->getTable();
1681
    
1682 1683
    DBUG_PRINT("info", ("Detected frm change of table %s.%s",
                        dbname, tabname));
1684
    build_table_filename(key, FN_LEN - 1, dbname, tabname, NullS, 0);
1685
    /*
1686 1687 1688
      If the there is no local table shadowing the altered table and 
      it has an frm that is different than the one on disk then 
      overwrite it with the new table definition
1689
    */
1690 1691
    if (!ndbcluster_check_if_local_table(dbname, tabname) &&
	readfrm(key, &data, &length) == 0 &&
1692 1693
        packfrm(data, length, &pack_data, &pack_length) == 0 &&
        cmp_frm(altered_table, pack_data, pack_length))
1694
    {
1695
      DBUG_DUMP("frm", (uchar*) altered_table->getFrmData(), 
1696 1697
                altered_table->getFrmLength());
      pthread_mutex_lock(&LOCK_open);
1698 1699
      Ndb_table_guard ndbtab_g(dict, tabname);
      const NDBTAB *old= ndbtab_g.get_table();
1700 1701 1702 1703
      if (!old &&
          old->getObjectVersion() != altered_table->getObjectVersion())
        dict->putTable(altered_table);
      
1704 1705
      my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
      data= NULL;
1706 1707
      if ((error= unpackfrm(&data, &length,
                            (const uchar*) altered_table->getFrmData())) ||
1708
          (error= writefrm(key, data, length)))
1709
      {
1710 1711
        sql_print_information("NDB: Failed write frm for %s.%s, error %d",
                              dbname, tabname, error);
1712
      }
1713 1714
      
      // copy names as memory will be freed
unknown's avatar
unknown committed
1715 1716
      NdbAutoPtr<char> a1((char *)(dbname= strdup(dbname)));
      NdbAutoPtr<char> a2((char *)(tabname= strdup(tabname)));
1717
      ndbcluster_binlog_close_table(thd, share);
1718 1719 1720 1721 1722

      TABLE_LIST table_list;
      bzero((char*) &table_list,sizeof(table_list));
      table_list.db= (char *)dbname;
      table_list.alias= table_list.table_name= (char *)tabname;
1723
      close_cached_tables(thd, &table_list, TRUE, FALSE, FALSE);
1724

1725 1726
      if ((error= ndbcluster_binlog_open_table(thd, share,
                                               table_share, table, 1)))
1727 1728
        sql_print_information("NDB: Failed to re-open table %s.%s",
                              dbname, tabname);
1729 1730 1731 1732 1733 1734

      table= share->table;
      table_share= share->table_share;
      dbname= table_share->db.str;
      tabname= table_share->table_name.str;

1735
      pthread_mutex_unlock(&LOCK_open);
1736
    }
1737 1738
    my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
    my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
unknown's avatar
unknown committed
1739 1740
  }

1741
  // If only frm was changed continue replicating
1742
  if (is_online_alter_table)
1743 1744 1745 1746 1747 1748
  {
    /* Signal ha_ndbcluster::alter_table that drop is done */
    (void) pthread_cond_signal(&injector_cond);
    DBUG_RETURN(0);
  }

unknown's avatar
unknown committed
1749
  (void) pthread_mutex_lock(&share->mutex);
1750 1751 1752 1753 1754 1755 1756 1757 1758 1759
  if (is_rename_table && !is_remote_change)
  {
    DBUG_PRINT("info", ("Detected name change of table %s.%s",
                        share->db, share->table_name));
    /* ToDo: remove printout */
    if (ndb_extra_logging)
      sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
                            share_prefix, share->table->s->db.str,
                            share->table->s->table_name.str,
                            share->key);
1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770
    {
      ndb->setDatabaseName(share->table->s->db.str);
      Ndb_table_guard ndbtab_g(ndb->getDictionary(),
                               share->table->s->table_name.str);
      const NDBTAB *ev_tab= pOp->getTable();
      const NDBTAB *cache_tab= ndbtab_g.get_table();
      if (cache_tab &&
          cache_tab->getObjectId() == ev_tab->getObjectId() &&
          cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
        ndbtab_g.invalidate();
    }
1771 1772 1773 1774 1775 1776
    /* do the rename of the table in the share */
    share->table->s->db.str= share->db;
    share->table->s->db.length= strlen(share->db);
    share->table->s->table_name.str= share->table_name;
    share->table->s->table_name.length= strlen(share->table_name);
  }
unknown's avatar
unknown committed
1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788
  DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
  if (share->op_old == pOp)
    share->op_old= 0;
  else
    share->op= 0;
  // either just us or drop table handling as well
      
  /* Signal ha_ndbcluster::delete/rename_table that drop is done */
  (void) pthread_mutex_unlock(&share->mutex);
  (void) pthread_cond_signal(&injector_cond);

  pthread_mutex_lock(&ndbcluster_mutex);
1789 1790 1791
  /* ndb_share reference binlog free */
  DBUG_PRINT("NDB_SHARE", ("%s binlog free  use_count: %u",
                           share->key, share->use_count));
unknown's avatar
unknown committed
1792
  free_share(&share, TRUE);
1793
  if (is_remote_change && share && share->state != NSS_DROPPED)
unknown's avatar
unknown committed
1794
  {
1795
    DBUG_PRINT("info", ("remote change"));
1796
    share->state= NSS_DROPPED;
unknown's avatar
unknown committed
1797
    if (share->use_count != 1)
1798 1799 1800
    {
      /* open handler holding reference */
      /* wait with freeing create ndb_share to below */
1801
      do_close_cached_tables= TRUE;
1802
    }
1803 1804
    else
    {
1805 1806 1807
      /* ndb_share reference create free */
      DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
                               share->key, share->use_count));
1808 1809 1810
      free_share(&share, TRUE);
      share= 0;
    }
unknown's avatar
unknown committed
1811
  }
1812 1813
  else
    share= 0;
unknown's avatar
unknown committed
1814 1815 1816
  pthread_mutex_unlock(&ndbcluster_mutex);

  pOp->setCustomData(0);
1817

unknown's avatar
unknown committed
1818
  pthread_mutex_lock(&injector_mutex);
1819
  ndb->dropEventOperation(pOp);
unknown's avatar
unknown committed
1820 1821 1822 1823
  pOp= 0;
  pthread_mutex_unlock(&injector_mutex);

  if (do_close_cached_tables)
1824 1825 1826 1827 1828
  {
    TABLE_LIST table_list;
    bzero((char*) &table_list,sizeof(table_list));
    table_list.db= (char *)dbname;
    table_list.alias= table_list.table_name= (char *)tabname;
1829
    close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
1830 1831 1832
    /* ndb_share reference create free */
    DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
                             share->key, share->use_count));
1833 1834
    free_share(&share);
  }
unknown's avatar
unknown committed
1835
  DBUG_RETURN(0);
unknown's avatar
unknown committed
1836 1837
}

1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855
/*
  Write the query carried by a distributed schema operation to the binlog.

  A value with any NDB_ANYVALUE_RESERVED bit set is treated as a signal,
  not a server id. NDB_ANYVALUE_FOR_NOLOGGING is skipped silently; any
  other reserved value is skipped with a warning. Otherwise the query is
  logged under schema->any_value as server id (or the local ::server_id
  when it is 0) and with schema->db as the current database. thd's
  server_id and db are restored afterwards.
*/
static void ndb_binlog_query(THD *thd, Cluster_schema *schema)
{
  const bool is_signal= (schema->any_value & NDB_ANYVALUE_RESERVED) != 0;
  if (is_signal)
  {
    if (schema->any_value != NDB_ANYVALUE_FOR_NOLOGGING)
      sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
                        "query not logged",
                        schema->any_value);
    return;
  }

  /* Save the session state that is borrowed for the duration of the call */
  const uint32 saved_server_id= thd->server_id;
  DBUG_ASSERT(sizeof(saved_server_id) == sizeof(thd->server_id));
  char *const saved_db= thd->db;

  thd->server_id= schema->any_value ? schema->any_value : ::server_id;
  thd->db= schema->db;

  const int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
  /* No "USE db" when either the object name or the database is empty */
  const bool suppress_use= schema->name[0] == 0 || thd->db[0] == 0;
  thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
                    schema->query_length, FALSE,
                    suppress_use, errcode);

  thd->db= saved_db;
  thd->server_id= saved_server_id;
}

unknown's avatar
unknown committed
1865 1866 1867
static int
ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
                                      NdbEventOperation *pOp,
1868
                                      List<Cluster_schema> 
1869
                                      *post_epoch_log_list,
1870
                                      List<Cluster_schema> 
1871 1872
                                      *post_epoch_unlock_list,
                                      MEM_ROOT *mem_root)
unknown's avatar
unknown committed
1873 1874
{
  DBUG_ENTER("ndb_binlog_thread_handle_schema_event");
1875
  NDB_SHARE *tmp_share= (NDB_SHARE *)pOp->getCustomData();
1876
  if (tmp_share && ndb_schema_share == tmp_share)
unknown's avatar
unknown committed
1877 1878 1879
  {
    NDBEVENT::TableEvent ev_type= pOp->getEventType();
    DBUG_PRINT("enter", ("%s.%s  ev_type: %d",
1880
                         tmp_share->db, tmp_share->table_name, ev_type));
1881 1882
    if (ev_type == NDBEVENT::TE_UPDATE ||
        ev_type == NDBEVENT::TE_INSERT)
unknown's avatar
unknown committed
1883
    {
1884 1885
      Cluster_schema *schema= (Cluster_schema *)
        sql_alloc(sizeof(Cluster_schema));
unknown's avatar
unknown committed
1886
      MY_BITMAP slock;
1887
      bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, FALSE);
unknown's avatar
unknown committed
1888
      uint node_id= g_ndb_cluster_connection->node_id();
1889 1890 1891 1892
      {
        ndbcluster_get_schema(tmp_share, schema);
        schema->any_value= pOp->getAnyValue();
      }
1893
      enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
1894 1895 1896 1897
      DBUG_PRINT("info",
                 ("%s.%s: log query_length: %d  query: '%s'  type: %d",
                  schema->db, schema->name,
                  schema->query_length, schema->query,
1898 1899
                  schema_type));
      if (schema_type == SOT_CLEAR_SLOCK)
1900
      {
1901 1902 1903 1904 1905 1906
        /*
          handle slock after epoch is completed to ensure that
          schema events get inserted in the binlog after any data
          events
        */
        post_epoch_log_list->push_back(schema, mem_root);
1907 1908
        DBUG_RETURN(0);
      }
unknown's avatar
unknown committed
1909 1910
      if (schema->node_id != node_id)
      {
1911
        int log_query= 0, post_epoch_unlock= 0;
1912
        switch (schema_type)
unknown's avatar
unknown committed
1913 1914
        {
        case SOT_DROP_TABLE:
1915
          // fall through
unknown's avatar
unknown committed
1916
        case SOT_RENAME_TABLE:
1917 1918 1919
          // fall through
        case SOT_RENAME_TABLE_NEW:
          // fall through
unknown's avatar
unknown committed
1920
        case SOT_ALTER_TABLE:
1921 1922 1923 1924
          post_epoch_log_list->push_back(schema, mem_root);
          /* acknowledge this query _after_ epoch completion */
          post_epoch_unlock= 1;
          break;
unknown's avatar
unknown committed
1925
	case SOT_TRUNCATE_TABLE:
1926
        {
1927 1928
          char key[FN_REFLEN + 1];
          build_table_filename(key, sizeof(key) - 1,
1929
                               schema->db, schema->name, "", 0);
1930
          /* ndb_share reference temporary, free below */
1931
          NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
1932 1933 1934 1935 1936
          if (share)
          {
            DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                                     share->key, share->use_count));
          }
1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949
          // invalidation already handled by binlog thread
          if (!share || !share->op)
          {
            {
              injector_ndb->setDatabaseName(schema->db);
              Ndb_table_guard ndbtab_g(injector_ndb->getDictionary(),
                                       schema->name);
              ndbtab_g.invalidate();
            }
            TABLE_LIST table_list;
            bzero((char*) &table_list,sizeof(table_list));
            table_list.db= schema->db;
            table_list.alias= table_list.table_name= schema->name;
1950
            close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
1951
          }
1952
          /* ndb_share reference temporary free */
1953
          if (share)
1954 1955 1956
          {
            DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                                     share->key, share->use_count));
1957
            free_share(&share);
1958
          }
1959 1960 1961
        }
        // fall through
        case SOT_CREATE_TABLE:
unknown's avatar
unknown committed
1962
          pthread_mutex_lock(&LOCK_open);
unknown's avatar
unknown committed
1963 1964
          if (ndbcluster_check_if_local_table(schema->db, schema->name))
          {
1965
            DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
unknown's avatar
unknown committed
1966
                                schema->db, schema->name));
1967
            sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
1968 1969 1970
                            "binlog schema event '%s' from node %d. ",
                            schema->db, schema->name, schema->query,
                            schema->node_id);
unknown's avatar
unknown committed
1971
          }
1972
          else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
unknown's avatar
unknown committed
1973
          {
1974
            sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
1975 1976
                            "binlog schema event '%s' from node %d. "
                            "my_errno: %d",
unknown's avatar
unknown committed
1977
                            schema->db, schema->name, schema->query,
1978 1979 1980 1981
                            schema->node_id, my_errno);
            List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
            MYSQL_ERROR *err;
            while ((err= it++))
1982
              sql_print_warning("NDB Binlog: (%d)%s", err->code, err->msg);
unknown's avatar
unknown committed
1983 1984 1985 1986 1987
          }
          pthread_mutex_unlock(&LOCK_open);
          log_query= 1;
          break;
        case SOT_DROP_DB:
unknown's avatar
unknown committed
1988 1989 1990
          /* Drop the database locally if it only contains ndb tables */
          if (! ndbcluster_check_if_local_tables_in_db(thd, schema->db))
          {
unknown's avatar
unknown committed
1991
            const int no_print_error[1]= {0};
unknown's avatar
unknown committed
1992 1993
            run_query(thd, schema->query,
                      schema->query + schema->query_length,
unknown's avatar
unknown committed
1994
                      no_print_error,    /* print error */
unknown's avatar
unknown committed
1995 1996 1997 1998 1999 2000 2001 2002 2003
                      TRUE);   /* don't binlog the query */
            /* binlog dropping database after any table operations */
            post_epoch_log_list->push_back(schema, mem_root);
            /* acknowledge this query _after_ epoch completion */
            post_epoch_unlock= 1;
          }
          else
          {
            /* Database contained local tables, leave it */
2004
            sql_print_error("NDB Binlog: Skipping drop database '%s' since it contained local tables "
2005 2006 2007
                            "binlog schema event '%s' from node %d. ",
                            schema->db, schema->query,
                            schema->node_id);
unknown's avatar
unknown committed
2008 2009
            log_query= 1;
          }
unknown's avatar
unknown committed
2010 2011 2012 2013
          break;
        case SOT_CREATE_DB:
          /* fall through */
        case SOT_ALTER_DB:
unknown's avatar
unknown committed
2014 2015
        {
          const int no_print_error[1]= {0};
unknown's avatar
unknown committed
2016 2017
          run_query(thd, schema->query,
                    schema->query + schema->query_length,
unknown's avatar
unknown committed
2018
                    no_print_error,    /* print error */
2019 2020
                    TRUE);   /* don't binlog the query */
          log_query= 1;
unknown's avatar
unknown committed
2021
          break;
unknown's avatar
unknown committed
2022
        }
2023 2024 2025 2026
        case SOT_TABLESPACE:
        case SOT_LOGFILE_GROUP:
          log_query= 1;
          break;
2027 2028
        case SOT_CLEAR_SLOCK:
          abort();
unknown's avatar
unknown committed
2029
        }
2030
        if (log_query && ndb_binlog_running)
2031
          ndb_binlog_query(thd, schema);
2032
        /* signal that schema operation has been handled */
2033
        DBUG_DUMP("slock", (uchar*) schema->slock, schema->slock_length);
2034 2035 2036 2037 2038 2039 2040
        if (bitmap_is_set(&slock, node_id))
        {
          if (post_epoch_unlock)
            post_epoch_unlock_list->push_back(schema, mem_root);
          else
            ndbcluster_update_slock(thd, schema->db, schema->name);
        }
unknown's avatar
unknown committed
2041
      }
2042
      DBUG_RETURN(0);
unknown's avatar
unknown committed
2043
    }
2044 2045 2046 2047 2048
    /*
      the normal case of UPDATE/INSERT has already been handled
    */
    switch (ev_type)
    {
unknown's avatar
unknown committed
2049 2050 2051 2052
    case NDBEVENT::TE_DELETE:
      // skip
      break;
    case NDBEVENT::TE_CLUSTER_FAILURE:
2053
      if (ndb_extra_logging)
2054
        sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
2055
                              ndb_schema_share->key, (unsigned) pOp->getGCI());
2056
      // fall through
unknown's avatar
unknown committed
2057
    case NDBEVENT::TE_DROP:
2058 2059 2060 2061
      if (ndb_extra_logging &&
          ndb_binlog_tables_inited && ndb_binlog_running)
        sql_print_information("NDB Binlog: ndb tables initially "
                              "read only on reconnect.");
2062 2063 2064

      /* begin protect ndb_schema_share */
      pthread_mutex_lock(&ndb_schema_share_mutex);
2065 2066 2067 2068
      /* ndb_share reference binlog extra free */
      DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
                               ndb_schema_share->key,
                               ndb_schema_share->use_count));
2069 2070
      free_share(&ndb_schema_share);
      ndb_schema_share= 0;
unknown's avatar
unknown committed
2071
      ndb_binlog_tables_inited= 0;
2072 2073 2074
      pthread_mutex_unlock(&ndb_schema_share_mutex);
      /* end protect ndb_schema_share */

2075
      close_cached_tables(NULL, NULL, FALSE, FALSE, FALSE);
2076 2077
      // fall through
    case NDBEVENT::TE_ALTER:
2078
      ndb_handle_schema_change(thd, ndb, pOp, tmp_share);
unknown's avatar
unknown committed
2079 2080 2081 2082 2083
      break;
    case NDBEVENT::TE_NODE_FAILURE:
    {
      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
      DBUG_ASSERT(node_id != 0xFF);
2084 2085
      (void) pthread_mutex_lock(&tmp_share->mutex);
      bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]);
unknown's avatar
unknown committed
2086
      DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id));
unknown's avatar
unknown committed
2087 2088 2089 2090 2091
      if (ndb_extra_logging)
      {
        sql_print_information("NDB Binlog: Node: %d, down,"
                              " Subscriber bitmask %x%x",
                              pOp->getNdbdNodeId(),
2092 2093
                              tmp_share->subscriber_bitmap[node_id].bitmap[1],
                              tmp_share->subscriber_bitmap[node_id].bitmap[0]);
unknown's avatar
unknown committed
2094
      }
2095
      (void) pthread_mutex_unlock(&tmp_share->mutex);
unknown's avatar
unknown committed
2096 2097 2098 2099 2100 2101 2102 2103
      (void) pthread_cond_signal(&injector_cond);
      break;
    }
    case NDBEVENT::TE_SUBSCRIBE:
    {
      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
      uint8 req_id= pOp->getReqNodeId();
      DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
2104 2105
      (void) pthread_mutex_lock(&tmp_share->mutex);
      bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
unknown's avatar
unknown committed
2106
      DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id));
unknown's avatar
unknown committed
2107 2108 2109 2110 2111 2112
      if (ndb_extra_logging)
      {
        sql_print_information("NDB Binlog: Node: %d, subscribe from node %d,"
                              " Subscriber bitmask %x%x",
                              pOp->getNdbdNodeId(),
                              req_id,
2113 2114
                              tmp_share->subscriber_bitmap[node_id].bitmap[1],
                              tmp_share->subscriber_bitmap[node_id].bitmap[0]);
unknown's avatar
unknown committed
2115
      }
2116
      (void) pthread_mutex_unlock(&tmp_share->mutex);
unknown's avatar
unknown committed
2117 2118 2119 2120 2121 2122 2123 2124
      (void) pthread_cond_signal(&injector_cond);
      break;
    }
    case NDBEVENT::TE_UNSUBSCRIBE:
    {
      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
      uint8 req_id= pOp->getReqNodeId();
      DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
2125 2126
      (void) pthread_mutex_lock(&tmp_share->mutex);
      bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
unknown's avatar
unknown committed
2127
      DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id));
unknown's avatar
unknown committed
2128 2129 2130 2131 2132 2133
      if (ndb_extra_logging)
      {
        sql_print_information("NDB Binlog: Node: %d, unsubscribe from node %d,"
                              " Subscriber bitmask %x%x",
                              pOp->getNdbdNodeId(),
                              req_id,
2134 2135
                              tmp_share->subscriber_bitmap[node_id].bitmap[1],
                              tmp_share->subscriber_bitmap[node_id].bitmap[0]);
unknown's avatar
unknown committed
2136
      }
2137
      (void) pthread_mutex_unlock(&tmp_share->mutex);
unknown's avatar
unknown committed
2138 2139 2140 2141 2142
      (void) pthread_cond_signal(&injector_cond);
      break;
    }
    default:
      sql_print_error("NDB Binlog: unknown non data event %d for %s. "
2143
                      "Ignoring...", (unsigned) ev_type, tmp_share->key);
unknown's avatar
unknown committed
2144 2145 2146 2147 2148
    }
  }
  DBUG_RETURN(0);
}

2149 2150 2151 2152 2153 2154
/*
  process any operations that should be done after
  the epoch is complete
*/
static void
ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
2155
                                                 List<Cluster_schema>
2156
                                                 *post_epoch_log_list,
2157
                                                 List<Cluster_schema>
2158 2159
                                                 *post_epoch_unlock_list)
{
2160 2161
  if (post_epoch_log_list->elements == 0)
    return;
2162
  DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
2163
  Cluster_schema *schema;
2164 2165
  while ((schema= post_epoch_log_list->pop()))
  {
2166 2167 2168 2169 2170 2171
    DBUG_PRINT("info",
               ("%s.%s: log query_length: %d  query: '%s'  type: %d",
                schema->db, schema->name,
                schema->query_length, schema->query,
                schema->type));
    int log_query= 0;
2172
    {
2173
      enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
2174 2175
      char key[FN_REFLEN + 1];
      build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0);
2176 2177 2178 2179 2180
      if (schema_type == SOT_CLEAR_SLOCK)
      {
        pthread_mutex_lock(&ndbcluster_mutex);
        NDB_SCHEMA_OBJECT *ndb_schema_object=
          (NDB_SCHEMA_OBJECT*) hash_search(&ndb_schema_objects,
2181
                                           (uchar*) key, strlen(key));
2182 2183 2184 2185 2186 2187
        if (ndb_schema_object)
        {
          pthread_mutex_lock(&ndb_schema_object->mutex);
          memcpy(ndb_schema_object->slock, schema->slock,
                 sizeof(ndb_schema_object->slock));
          DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
2188
                    (uchar*)ndb_schema_object->slock_bitmap.bitmap,
2189 2190 2191 2192 2193 2194 2195
                    no_bytes_in_map(&ndb_schema_object->slock_bitmap));
          pthread_mutex_unlock(&ndb_schema_object->mutex);
          pthread_cond_signal(&injector_cond);
        }
        pthread_mutex_unlock(&ndbcluster_mutex);
        continue;
      }
2196
      /* ndb_share reference temporary, free below */
2197
      NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
2198 2199 2200 2201 2202
      if (share)
      {
        DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                                 share->key, share->use_count));
      }
2203
      switch (schema_type)
2204 2205
      {
      case SOT_DROP_DB:
2206
        log_query= 1;
2207
        break;
2208
      case SOT_DROP_TABLE:
unknown's avatar
unknown committed
2209
        log_query= 1;
2210 2211 2212 2213 2214 2215
        // invalidation already handled by binlog thread
        if (share && share->op)
        {
          break;
        }
        // fall through
2216
      case SOT_RENAME_TABLE:
2217
        // fall through
2218
      case SOT_ALTER_TABLE:
2219 2220
        // invalidation already handled by binlog thread
        if (!share || !share->op)
2221
        {
2222 2223 2224 2225 2226 2227 2228 2229 2230 2231
          {
            injector_ndb->setDatabaseName(schema->db);
            Ndb_table_guard ndbtab_g(injector_ndb->getDictionary(),
                                     schema->name);
            ndbtab_g.invalidate();
          }
          TABLE_LIST table_list;
          bzero((char*) &table_list,sizeof(table_list));
          table_list.db= schema->db;
          table_list.alias= table_list.table_name= schema->name;
2232
          close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
2233
        }
2234 2235 2236 2237 2238
        if (schema_type != SOT_ALTER_TABLE)
          break;
        // fall through
      case SOT_RENAME_TABLE_NEW:
        log_query= 1;
2239
        if (ndb_binlog_running && (!share || !share->op))
2240
        {
2241 2242 2243 2244 2245 2246
          /*
            we need to free any share here as command below
            may need to call handle_trailing_share
          */
          if (share)
          {
2247 2248 2249
            /* ndb_share reference temporary free */
            DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                                     share->key, share->use_count));
2250 2251 2252 2253
            free_share(&share);
            share= 0;
          }
          pthread_mutex_lock(&LOCK_open);
unknown's avatar
unknown committed
2254 2255
          if (ndbcluster_check_if_local_table(schema->db, schema->name))
          {
2256
            DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
unknown's avatar
unknown committed
2257
                                schema->db, schema->name));
2258
            sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
2259 2260 2261
                            "binlog schema event '%s' from node %d. ",
                            schema->db, schema->name, schema->query,
                            schema->node_id);
unknown's avatar
unknown committed
2262
          }
2263
          else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
unknown's avatar
unknown committed
2264
          {
2265
            sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
2266
                            "binlog schema event '%s' from node %d. my_errno: %d",
2267
                            schema->db, schema->name, schema->query,
2268 2269 2270 2271
                            schema->node_id, my_errno);
            List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
            MYSQL_ERROR *err;
            while ((err= it++))
2272
              sql_print_warning("NDB Binlog: (%d)%s", err->code, err->msg);
2273 2274
          }
          pthread_mutex_unlock(&LOCK_open);
2275
        }
2276
        break;
2277
      default:
2278
        DBUG_ASSERT(FALSE);
2279 2280
      }
      if (share)
2281
      {
2282 2283 2284
        /* ndb_share reference temporary free */
        DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                                 share->key, share->use_count));
2285
        free_share(&share);
2286 2287
        share= 0;
      }
2288
    }
2289
    if (ndb_binlog_running && log_query)
2290
      ndb_binlog_query(thd, schema);
2291 2292 2293 2294
  }
  while ((schema= post_epoch_unlock_list->pop()))
  {
    ndbcluster_update_slock(thd, schema->db, schema->name);
unknown's avatar
unknown committed
2295 2296
  }
  DBUG_VOID_RETURN;
2297 2298
}

unknown's avatar
unknown committed
2299 2300 2301 2302 2303 2304
/*
  Timer class for doing performance measurements
*/

/*********************************************************************
  Internal helper functions for handling of the cluster replication tables
  - ndb_binlog_index
  - ndb_apply_status
*********************************************************************/

/*
  struct to hold the data to be inserted into the
2311
  ndb_binlog_index table
unknown's avatar
unknown committed
2312
*/
2313
struct ndb_binlog_index_row {
unknown's avatar
unknown committed
2314 2315 2316 2317 2318 2319 2320 2321 2322 2323
  ulonglong gci;
  const char *master_log_file;
  ulonglong master_log_pos;
  ulonglong n_inserts;
  ulonglong n_updates;
  ulonglong n_deletes;
  ulonglong n_schemaops;
};

/*
2324
  Open the ndb_binlog_index table
unknown's avatar
unknown committed
2325
*/
2326 2327
static int open_ndb_binlog_index(THD *thd, TABLE_LIST *tables,
                             TABLE **ndb_binlog_index)
unknown's avatar
unknown committed
2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342
{
  static char repdb[]= NDB_REP_DB;
  static char reptable[]= NDB_REP_TABLE;
  const char *save_proc_info= thd->proc_info;

  bzero((char*) tables, sizeof(*tables));
  tables->db= repdb;
  tables->alias= tables->table_name= reptable;
  tables->lock_type= TL_WRITE;
  thd->proc_info= "Opening " NDB_REP_DB "." NDB_REP_TABLE;
  tables->required_type= FRMTYPE_TABLE;
  uint counter;
  thd->clear_error();
  if (open_tables(thd, &tables, &counter, MYSQL_LOCK_IGNORE_FLUSH))
  {
2343 2344 2345 2346 2347 2348
    if (thd->killed)
      sql_print_error("NDB Binlog: Opening ndb_binlog_index: killed");
    else
      sql_print_error("NDB Binlog: Opening ndb_binlog_index: %d, '%s'",
                      thd->main_da.sql_errno(),
                      thd->main_da.message());
unknown's avatar
unknown committed
2349 2350 2351
    thd->proc_info= save_proc_info;
    return -1;
  }
2352
  *ndb_binlog_index= tables->table;
unknown's avatar
unknown committed
2353
  thd->proc_info= save_proc_info;
2354
  (*ndb_binlog_index)->use_all_columns();
unknown's avatar
unknown committed
2355 2356 2357
  return 0;
}

2358

unknown's avatar
unknown committed
2359
/*
2360
  Insert one row in the ndb_binlog_index
unknown's avatar
unknown committed
2361
*/
2362

2363
int ndb_add_ndb_binlog_index(THD *thd, void *_row)
unknown's avatar
unknown committed
2364
{
2365
  ndb_binlog_index_row &row= *(ndb_binlog_index_row *) _row;
unknown's avatar
unknown committed
2366 2367
  int error= 0;
  bool need_reopen;
2368 2369 2370 2371 2372 2373 2374
  /*
    Turn of binlogging to prevent the table changes to be written to
    the binary log.
  */
  ulong saved_options= thd->options;
  thd->options&= ~(OPTION_BIN_LOG);

unknown's avatar
unknown committed
2375 2376
  for ( ; ; ) /* loop for need_reopen */
  {
2377
    if (!ndb_binlog_index && open_ndb_binlog_index(thd, &binlog_tables, &ndb_binlog_index))
unknown's avatar
unknown committed
2378 2379
    {
      error= -1;
2380
      goto add_ndb_binlog_index_err;
unknown's avatar
unknown committed
2381 2382 2383 2384 2385 2386
    }

    if (lock_tables(thd, &binlog_tables, 1, &need_reopen))
    {
      if (need_reopen)
      {
unknown's avatar
unknown committed
2387 2388
        TABLE_LIST *p_binlog_tables= &binlog_tables;
        close_tables_for_reopen(thd, &p_binlog_tables);
unknown's avatar
unknown committed
2389
        ndb_binlog_index= 0;
unknown's avatar
unknown committed
2390 2391
        continue;
      }
2392
      sql_print_error("NDB Binlog: Unable to lock table ndb_binlog_index");
unknown's avatar
unknown committed
2393
      error= -1;
2394
      goto add_ndb_binlog_index_err;
unknown's avatar
unknown committed
2395 2396 2397 2398
    }
    break;
  }

unknown's avatar
unknown committed
2399 2400 2401 2402 2403
  /*
    Intialize ndb_binlog_index->record[0]
  */
  empty_record(ndb_binlog_index);

2404 2405
  ndb_binlog_index->field[0]->store(row.master_log_pos);
  ndb_binlog_index->field[1]->store(row.master_log_file,
unknown's avatar
unknown committed
2406 2407
                                strlen(row.master_log_file),
                                &my_charset_bin);
2408 2409 2410 2411 2412
  ndb_binlog_index->field[2]->store(row.gci);
  ndb_binlog_index->field[3]->store(row.n_inserts);
  ndb_binlog_index->field[4]->store(row.n_updates);
  ndb_binlog_index->field[5]->store(row.n_deletes);
  ndb_binlog_index->field[6]->store(row.n_schemaops);
unknown's avatar
unknown committed
2413

2414
  if ((error= ndb_binlog_index->file->ha_write_row(ndb_binlog_index->record[0])))
unknown's avatar
unknown committed
2415
  {
2416
    sql_print_error("NDB Binlog: Writing row to ndb_binlog_index: %d", error);
unknown's avatar
unknown committed
2417
    error= -1;
2418
    goto add_ndb_binlog_index_err;
unknown's avatar
unknown committed
2419 2420 2421 2422
  }

  mysql_unlock_tables(thd, thd->lock);
  thd->lock= 0;
2423
  thd->options= saved_options;
unknown's avatar
unknown committed
2424
  return 0;
2425
add_ndb_binlog_index_err:
unknown's avatar
unknown committed
2426
  close_thread_tables(thd);
2427
  ndb_binlog_index= 0;
2428
  thd->options= saved_options;
unknown's avatar
unknown committed
2429 2430 2431 2432 2433 2434 2435
  return error;
}

/*********************************************************************
  Functions for start, stop, wait for ndbcluster binlog thread
*********************************************************************/

/*
  Possible values of do_ndbcluster_binlog_close_connection.  The names
  suggest run / exit / restart requests concerning the binlog thread's
  cluster connection -- NOTE(review): confirm against the thread code.
*/
enum Binlog_thread_state
{
  BCCC_running,   /* 0 */
  BCCC_exit,      /* 1 */
  BCCC_restart    /* 2 */
};

/* Initially BCCC_restart. */
static enum Binlog_thread_state
do_ndbcluster_binlog_close_connection= BCCC_restart;
unknown's avatar
unknown committed
2444 2445 2446 2447 2448

int ndbcluster_binlog_start()
{
  DBUG_ENTER("ndbcluster_binlog_start");

2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460
  if (::server_id == 0)
  {
    sql_print_warning("NDB: server id set to zero will cause any other mysqld "
                      "with bin log to log with wrong server id");
  }
  else if (::server_id & 0x1 << 31)
  {
    sql_print_error("NDB: server id's with high bit set is reserved for internal "
                    "purposes");
    DBUG_RETURN(-1);
  }

unknown's avatar
unknown committed
2461 2462
  pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST);
  pthread_cond_init(&injector_cond, NULL);
2463
  pthread_mutex_init(&ndb_schema_share_mutex, MY_MUTEX_INIT_FAST);
unknown's avatar
unknown committed
2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474

  /* Create injector thread */
  if (pthread_create(&ndb_binlog_thread, &connection_attrib,
                     ndb_binlog_thread_func, 0))
  {
    DBUG_PRINT("error", ("Could not create ndb injector thread"));
    pthread_cond_destroy(&injector_cond);
    pthread_mutex_destroy(&injector_mutex);
    DBUG_RETURN(-1);
  }

2475 2476 2477
  ndbcluster_binlog_inited= 1;

  /* Wait for the injector thread to start */
unknown's avatar
unknown committed
2478 2479 2480 2481
  pthread_mutex_lock(&injector_mutex);
  while (!ndb_binlog_thread_running)
    pthread_cond_wait(&injector_cond, &injector_mutex);
  pthread_mutex_unlock(&injector_mutex);
2482

unknown's avatar
unknown committed
2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505
  if (ndb_binlog_thread_running < 0)
    DBUG_RETURN(-1);

  DBUG_RETURN(0);
}


/**************************************************************
  Internal helper functions for creating/dropping ndb events
  used by the client sql threads
**************************************************************/
/*
  Compose the name of the NDB event used for binlogging into *event_name:
  "REPL$<db>" when tbl is NULL, otherwise "REPL$<db>/<tbl>".
*/
void
ndb_rep_event_name(String *event_name, const char *db, const char *tbl)
{
  event_name->set_ascii("REPL$", 5);   /* 5 == strlen("REPL$") */
  event_name->append(db);
  if (tbl == NULL)
    return;                            /* database-level name only */
  event_name->append('/');
  event_name->append(tbl);
}

2506 2507 2508
bool
ndbcluster_check_if_local_table(const char *dbname, const char *tabname)
{
2509 2510
  char key[FN_REFLEN + 1];
  char ndb_file[FN_REFLEN + 1];
2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527

  DBUG_ENTER("ndbcluster_check_if_local_table");
  build_table_filename(key, FN_LEN-1, dbname, tabname, reg_ext, 0);
  build_table_filename(ndb_file, FN_LEN-1, dbname, tabname, ha_ndb_ext, 0);
  /* Check that any defined table is an ndb table */
  DBUG_PRINT("info", ("Looking for file %s and %s", key, ndb_file));
  if ((! my_access(key, F_OK)) && my_access(ndb_file, F_OK))
  {
    DBUG_PRINT("info", ("table file %s not on disk, local table", ndb_file));   
  
  
    DBUG_RETURN(true);
  }

  DBUG_RETURN(false);
}

2528 2529 2530 2531 2532
bool
ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname)
{
  DBUG_ENTER("ndbcluster_check_if_local_tables_in_db");
  DBUG_PRINT("info", ("Looking for files in directory %s", dbname));
2533 2534
  LEX_STRING *tabname;
  List<LEX_STRING> files;
2535
  char path[FN_REFLEN + 1];
2536

2537
  build_table_filename(path, sizeof(path) - 1, dbname, "", "", 0);
2538 2539 2540 2541 2542 2543 2544 2545
  if (find_files(thd, &files, dbname, path, NullS, 0) != FIND_FILES_OK)
  {
    DBUG_PRINT("info", ("Failed to find files"));
    DBUG_RETURN(true);
  }
  DBUG_PRINT("info",("found: %d files", files.elements));
  while ((tabname= files.pop()))
  {
2546 2547
    DBUG_PRINT("info", ("Found table %s", tabname->str));
    if (ndbcluster_check_if_local_table(dbname, tabname->str))
2548 2549 2550 2551 2552 2553
      DBUG_RETURN(true);
  }
  
  DBUG_RETURN(false);
}

unknown's avatar
unknown committed
2554 2555 2556 2557 2558
/*
  Common function for setting up everything for logging a table at
  create/discover.
*/
int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
2559
                                   uint key_len,
unknown's avatar
unknown committed
2560 2561
                                   const char *db,
                                   const char *table_name,
2562
                                   my_bool share_may_exist)
unknown's avatar
unknown committed
2563
{
2564
  int do_event_op= ndb_binlog_running;
unknown's avatar
unknown committed
2565
  DBUG_ENTER("ndbcluster_create_binlog_setup");
2566 2567
  DBUG_PRINT("enter",("key: %s  key_len: %d  %s.%s  share_may_exist: %d",
                      key, key_len, db, table_name, share_may_exist));
2568
  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
2569
  DBUG_ASSERT(strlen(key) == key_len);
unknown's avatar
unknown committed
2570 2571 2572 2573

  pthread_mutex_lock(&ndbcluster_mutex);

  /* Handle any trailing share */
2574
  NDB_SHARE *share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
2575
                                             (uchar*) key, key_len);
2576

2577
  if (share && share_may_exist)
unknown's avatar
unknown committed
2578
  {
2579 2580 2581 2582 2583 2584 2585
    if (share->flags & NSF_NO_BINLOG ||
        share->op != 0 ||
        share->op_old != 0)
    {
      pthread_mutex_unlock(&ndbcluster_mutex);
      DBUG_RETURN(0); // replication already setup, or should not
    }
unknown's avatar
unknown committed
2586
  }
2587 2588 2589

  if (share)
  {
2590 2591 2592 2593 2594 2595
    if (share->op || share->op_old)
    {
      my_errno= HA_ERR_TABLE_EXIST;
      pthread_mutex_unlock(&ndbcluster_mutex);
      DBUG_RETURN(1);
    }
2596
    if (!share_may_exist || share->connect_count != 
2597 2598 2599 2600 2601
        g_ndb_cluster_connection->get_connect_count())
    {
      handle_trailing_share(share);
      share= NULL;
    }
2602 2603
  }

unknown's avatar
unknown committed
2604
  /* Create share which is needed to hold replication information */
2605 2606
  if (share)
  {
2607
    /* ndb_share reference create */
2608
    ++share->use_count;
2609 2610
    DBUG_PRINT("NDB_SHARE", ("%s create  use_count: %u",
                             share->key, share->use_count));
2611
  }
2612 2613
  /* ndb_share reference create */
  else if (!(share= get_share(key, 0, TRUE, TRUE)))
unknown's avatar
unknown committed
2614 2615 2616 2617
  {
    sql_print_error("NDB Binlog: "
                    "allocating table share for %s failed", key);
  }
2618 2619 2620 2621 2622
  else
  {
    DBUG_PRINT("NDB_SHARE", ("%s create  use_count: %u",
                             share->key, share->use_count));
  }
2623

2624
  if (!ndb_schema_share &&
2625 2626 2627
      strcmp(share->db, NDB_REP_DB) == 0 &&
      strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
    do_event_op= 1;
2628 2629 2630 2631
  else if (!ndb_apply_status_share &&
           strcmp(share->db, NDB_REP_DB) == 0 &&
           strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
    do_event_op= 1;
2632 2633

  if (!do_event_op)
2634 2635 2636 2637 2638
  {
    share->flags|= NSF_NO_BINLOG;
    pthread_mutex_unlock(&ndbcluster_mutex);
    DBUG_RETURN(0);
  }
unknown's avatar
unknown committed
2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653
  pthread_mutex_unlock(&ndbcluster_mutex);

  while (share && !IS_TMP_PREFIX(table_name))
  {
    /*
      ToDo make sanity check of share so that the table is actually the same
      I.e. we need to do open file from frm in this case
      Currently awaiting this to be fixed in the 4.1 tree in the general
      case
    */

    /* Create the event in NDB */
    ndb->setDatabaseName(db);

    NDBDICT *dict= ndb->getDictionary();
2654 2655
    Ndb_table_guard ndbtab_g(dict, table_name);
    const NDBTAB *ndbtab= ndbtab_g.get_table();
unknown's avatar
unknown committed
2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669
    if (ndbtab == 0)
    {
      if (ndb_extra_logging)
        sql_print_information("NDB Binlog: Failed to get table %s from ndb: "
                              "%s, %d", key, dict->getNdbError().message,
                              dict->getNdbError().code);
      break; // error
    }
    String event_name(INJECTOR_EVENT_LEN);
    ndb_rep_event_name(&event_name, db, table_name);
    /*
      event should have been created by someone else,
      but let's make sure, and create if it doesn't exist
    */
2670 2671
    const NDBEVENT *ev= dict->getEvent(event_name.c_ptr());
    if (!ev)
unknown's avatar
unknown committed
2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685
    {
      if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share))
      {
        sql_print_error("NDB Binlog: "
                        "FAILED CREATE (DISCOVER) TABLE Event: %s",
                        event_name.c_ptr());
        break; // error
      }
      if (ndb_extra_logging)
        sql_print_information("NDB Binlog: "
                              "CREATE (DISCOVER) TABLE Event: %s",
                              event_name.c_ptr());
    }
    else
2686 2687
    {
      delete ev;
unknown's avatar
unknown committed
2688 2689 2690
      if (ndb_extra_logging)
        sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
                              event_name.c_ptr());
2691
    }
unknown's avatar
unknown committed
2692 2693 2694 2695

    /*
      create the event operations for receiving logging events
    */
unknown's avatar
unknown committed
2696
    if (ndbcluster_create_event_ops(share, ndbtab, event_name.c_ptr()))
unknown's avatar
unknown committed
2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710
    {
      sql_print_error("NDB Binlog:"
                      "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
                      event_name.c_ptr());
      /* a warning has been issued to the client */
      DBUG_RETURN(0);
    }
    DBUG_RETURN(0);
  }
  DBUG_RETURN(-1);
}

/*
  Create the NDB event event_name on table ndbtab, used to binlog changes
  of the table described by share.

  push_warning  0: push no client warnings; non-zero: push a warning for
                the unsupported "BLOB without primary key" case; > 1: also
                push warnings for NDB dictionary errors.

  Returns 0 when the event was created, already exists for this table, or
  is not needed (share is NULL or has NSF_NO_BINLOG); -1 on error.  A table
  with a hidden primary key and BLOB columns cannot be logged: the share
  gets NSF_NO_BINLOG and -1 is returned.

  When createEvent() reports that the event already exists but getEvent()
  cannot retrieve it, the existing event is treated as trailing from an
  older table: it is dropped (only if getEvent() failed with
  NDB_INVALID_SCHEMA_OBJECT) and creation is attempted once more.
*/
int
ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
                        const char *event_name, NDB_SHARE *share,
                        int push_warning)
{
  THD *thd= current_thd;
  DBUG_ENTER("ndbcluster_create_event");
  DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s",
                      ndbtab->getName(), ndbtab->getObjectVersion(),
                      event_name, share ? share->key : "(nil)"));
  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));

  /* Nothing to do without a share, or when binlogging is disabled. */
  if (share == NULL)
  {
    DBUG_PRINT("info", ("share == NULL"));
    DBUG_RETURN(0);
  }
  if (share->flags & NSF_NO_BINLOG)
  {
    DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d",
                        share->flags, share->flags & NSF_NO_BINLOG));
    DBUG_RETURN(0);
  }

  NDBDICT *dict= ndb->getDictionary();
  NDBEVENT my_event(event_name);
  my_event.setTable(*ndbtab);
  my_event.addTableEvent(NDBEVENT::TE_ALL);

  /* Choose what the event reports. */
  if (share->flags & NSF_HIDDEN_PK)
  {
    if (share->flags & NSF_BLOB_FLAG)
    {
      sql_print_error("NDB Binlog: logging of table %s "
                      "with BLOB attribute and no PK is not supported",
                      share->key);
      if (push_warning)
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                            ER_ILLEGAL_HA_CREATE_OPTION,
                            ER(ER_ILLEGAL_HA_CREATE_OPTION),
                            ndbcluster_hton_name,
                            "Binlog of table with BLOB attribute and no PK");

      share->flags|= NSF_NO_BINLOG;
      DBUG_RETURN(-1);
    }
    /* Without a primary key every attribute must be reported. */
    my_event.setReport(NDBEVENT::ER_ALL);
    DBUG_PRINT("info", ("subscription all"));
  }
  else if (!ndb_schema_share &&
           strcmp(share->db, NDB_REP_DB) == 0 &&
           strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
  {
    /* The schema table (while ndb_schema_share is unset) also subscribes. */
    my_event.setReport((NDBEVENT::EventReport)
                       (NDBEVENT::ER_ALL | NDBEVENT::ER_SUBSCRIBE));
    DBUG_PRINT("info", ("subscription all and subscribe"));
  }
  else
  {
    my_event.setReport(NDBEVENT::ER_UPDATED);
    DBUG_PRINT("info", ("subscription only updated"));
  }

  if (share->flags & NSF_BLOB_FLAG)
    my_event.mergeEvents(TRUE);

  /* Subscribe to every column of the table. */
  const int n_cols= ndbtab->getNoOfColumns();
  for (int col= 0; col < n_cols; col++)
    my_event.addEventColumn(col);

  if (!dict->createEvent(my_event)) // Add event to database
    DBUG_RETURN(0);

  /* createEvent() failed; anything but "already exists" is fatal. */
  if (dict->getNdbError().classification != NdbError::SchemaObjectExists)
  {
    if (push_warning > 1)
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                          ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                          dict->getNdbError().code,
                          dict->getNdbError().message, "NDB");
    sql_print_error("NDB Binlog: Unable to create event in database. "
                    "Event: %s  Error Code: %d  Message: %s", event_name,
                    dict->getNdbError().code, dict->getNdbError().message);
    DBUG_RETURN(-1);
  }

  /*
    The event exists.  If it can be retrieved, its table version/id match
    and it is usable as-is; otherwise it is left over from an older table.
  */
  const NDBEVENT *existing= dict->getEvent(event_name);
  if (existing)
  {
    delete existing;
    DBUG_RETURN(0);
  }

  /* Trailing event: an error, but try to correct it by dropping it. */
  if (dict->getNdbError().code == NDB_INVALID_SCHEMA_OBJECT &&
      dict->dropEvent(my_event.getName()))
  {
    if (push_warning > 1)
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                          ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                          dict->getNdbError().code,
                          dict->getNdbError().message, "NDB");
    sql_print_error("NDB Binlog: Unable to create event in database. "
                    " Attempt to correct with drop failed. "
                    "Event: %s Error Code: %d Message: %s",
                    event_name,
                    dict->getNdbError().code,
                    dict->getNdbError().message);
    DBUG_RETURN(-1);
  }

  /* Second and final attempt to create the event. */
  if (dict->createEvent(my_event))
  {
    if (push_warning > 1)
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                          ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                          dict->getNdbError().code,
                          dict->getNdbError().message, "NDB");
    sql_print_error("NDB Binlog: Unable to create event in database. "
                    " Attempt to correct with drop ok, but create failed. "
                    "Event: %s Error Code: %d Message: %s",
                    event_name,
                    dict->getNdbError().code,
                    dict->getNdbError().message);
    DBUG_RETURN(-1);
  }
#ifdef NDB_BINLOG_EXTRA_WARNINGS
  push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                      ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                      0, "NDB Binlog: Removed trailing event",
                      "NDB");
#endif
  DBUG_RETURN(0);
}

/*
  Non-zero when field is not a BLOB, not a BIT column and has a non-zero
  pack length.  ndbcluster_create_event_ops() uses this to decide whether
  a column's event values can be received directly into the table's
  record buffers.
*/
inline int is_ndb_compatible_type(Field *field)
{
  if (field->flags & BLOB_FLAG)
    return 0;
  if (field->type() == MYSQL_TYPE_BIT)
    return 0;
  return field->pack_length() != 0;
}

/*
  - create eventOperations for receiving log events
  - setup ndb recattrs for reception of log event data
  - "start" the event operation

  used at create/discover of tables

  share       share of the table (must not be NULL)
  ndbtab      NDB dictionary table the event is defined on
  event_name  name of the NDB event to subscribe to

  Returns 0 on success.  Also returns 0 in two other cases: binlogging is
  not wanted for the table (NSF_NO_BINLOG is set on the share), or the
  share already has an event operation (one share reference is then
  released with free_share()).  Returns -1 on failure; most failure paths
  also push a warning to the client.

  Temporary errors from NdbEventOperation::execute() (and error 1407) are
  retried up to 100 times with a 100 ms pause, with injector_mutex released
  between attempts.
*/
int
ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
                            const char *event_name)
{
  THD *thd= current_thd;
  /*
    we are in either create table or rename table so table should be
    locked, hence we can work with the share without locks
  */

  DBUG_ENTER("ndbcluster_create_event_ops");
  DBUG_PRINT("enter", ("table: %s event: %s", ndbtab->getName(), event_name));
  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));

  DBUG_ASSERT(share != 0);

  if (share->flags & NSF_NO_BINLOG)
  {
    DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x",
                        share->flags));
    DBUG_RETURN(0);
  }

  int do_ndb_schema_share= 0, do_ndb_apply_status_share= 0;
  if (!ndb_schema_share && strcmp(share->db, NDB_REP_DB) == 0 &&
      strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
    do_ndb_schema_share= 1;
  else if (!ndb_apply_status_share && strcmp(share->db, NDB_REP_DB) == 0 &&
           strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
    do_ndb_apply_status_share= 1;
  else if (!binlog_filter->db_ok(share->db) || !ndb_binlog_running)
  {
    share->flags|= NSF_NO_BINLOG;
    DBUG_RETURN(0);
  }

  if (share->op)
  {
    assert(share->op->getCustomData() == (void *) share);

    DBUG_ASSERT(share->use_count > 1);
    sql_print_error("NDB Binlog: discover reusing old ev op");
    /* ndb_share reference ToDo free */
    DBUG_PRINT("NDB_SHARE", ("%s ToDo free  use_count: %u",
                             share->key, share->use_count));
    free_share(&share); // old event op already has reference
    DBUG_RETURN(0);
  }

  TABLE *table= share->table;

  int retries= 100;
  /*
    100 milliseconds, temporary error on schema operation can
    take some time to be resolved
  */
  int retry_sleep= 100;
  for (;;)
  {
    pthread_mutex_lock(&injector_mutex);
    Ndb *ndb= do_ndb_schema_share ? schema_ndb : injector_ndb;

    if (ndb == 0)
    {
      pthread_mutex_unlock(&injector_mutex);
      DBUG_RETURN(-1);
    }

    NdbEventOperation *op;
    if (do_ndb_schema_share)
      op= ndb->createEventOperation(event_name);
    else
    {
      // set injector_ndb database/schema from table internal name
      int ret= ndb->setDatabaseAndSchemaName(ndbtab);
      assert(ret == 0);
      (void) ret;  // only checked by assert(); silence unused warnings
      op= ndb->createEventOperation(event_name);
      // reset to catch errors
      ndb->setDatabaseName("");
    }
    if (!op)
    {
      sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
                      " %s",event_name);
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                          ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                          ndb->getNdbError().code,
                          ndb->getNdbError().message,
                          "NDB");
      pthread_mutex_unlock(&injector_mutex);
      DBUG_RETURN(-1);
    }

    if (share->flags & NSF_BLOB_FLAG)
      op->mergeEvents(TRUE); // currently not inherited from event

    DBUG_PRINT("info", ("share->ndb_value[0]: 0x%lx  share->ndb_value[1]: 0x%lx",
                        (long) share->ndb_value[0],
                        (long) share->ndb_value[1]));
    int n_columns= ndbtab->getNoOfColumns();
    int n_fields= table ? table->s->fields : 0; // XXX ???
    for (int j= 0; j < n_columns; j++)
    {
      const char *col_name= ndbtab->getColumn(j)->getName();
      NdbValue attr0, attr1;
      if (j < n_fields)
      {
        Field *f= share->table->field[j];
        if (is_ndb_compatible_type(f))
        {
          /* Receive directly into record[0] (after) / record[1] (before). */
          DBUG_PRINT("info", ("%s compatible", col_name));
          attr0.rec= op->getValue(col_name, (char*) f->ptr);
          attr1.rec= op->getPreValue(col_name,
                                     (f->ptr - share->table->record[0]) +
                                     (char*) share->table->record[1]);
        }
        else if (! (f->flags & BLOB_FLAG))
        {
          DBUG_PRINT("info", ("%s non compatible", col_name));
          attr0.rec= op->getValue(col_name);
          attr1.rec= op->getPreValue(col_name);
        }
        else
        {
          DBUG_PRINT("info", ("%s blob", col_name));
          DBUG_ASSERT(share->flags & NSF_BLOB_FLAG);
          attr0.blob= op->getBlobHandle(col_name);
          attr1.blob= op->getPreBlobHandle(col_name);
          if (attr0.blob == NULL || attr1.blob == NULL)
          {
            sql_print_error("NDB Binlog: Creating NdbEventOperation"
                            " blob field %u handles failed (code=%d) for %s",
                            j, op->getNdbError().code, event_name);
            push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                                ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                                op->getNdbError().code,
                                op->getNdbError().message,
                                "NDB");
            ndb->dropEventOperation(op);
            pthread_mutex_unlock(&injector_mutex);
            DBUG_RETURN(-1);
          }
        }
      }
      else
      {
        DBUG_PRINT("info", ("%s hidden key", col_name));
        attr0.rec= op->getValue(col_name);
        attr1.rec= op->getPreValue(col_name);
      }
      share->ndb_value[0][j].ptr= attr0.ptr;
      share->ndb_value[1][j].ptr= attr1.ptr;
      DBUG_PRINT("info", ("&share->ndb_value[0][%d]: 0x%lx  "
                          "share->ndb_value[0][%d]: 0x%lx",
                          j, (long) &share->ndb_value[0][j],
                          j, (long) attr0.ptr));
      /* Print the address of the [1] slot (was wrongly printing [0]). */
      DBUG_PRINT("info", ("&share->ndb_value[1][%d]: 0x%lx  "
                          "share->ndb_value[1][%d]: 0x%lx",
                          j, (long) &share->ndb_value[1][j],
                          j, (long) attr1.ptr));
    }
    op->setCustomData((void *) share); // set before execute
    share->op= op; // assign op in NDB_SHARE
    if (op->execute())
    {
      share->op= NULL;
      retries--;
      /* Only temporary errors and error 1407 are worth retrying. */
      if (op->getNdbError().status != NdbError::TemporaryError &&
          op->getNdbError().code != 1407)
        retries= 0;
      if (retries == 0)
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                            ER_GET_ERRMSG, ER(ER_GET_ERRMSG), 
                            op->getNdbError().code, op->getNdbError().message,
                            "NDB");
        sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s",
                        event_name,
                        op->getNdbError().code, op->getNdbError().message);
      }
      ndb->dropEventOperation(op);
      pthread_mutex_unlock(&injector_mutex);
      if (retries)
      {
        /* my_sleep() takes microseconds; retry_sleep is in milliseconds. */
        my_sleep(retry_sleep * 1000);
        continue;
      }
      DBUG_RETURN(-1);
    }
    pthread_mutex_unlock(&injector_mutex);
    break;
  }

  /* ndb_share reference binlog */
  get_share(share);
  DBUG_PRINT("NDB_SHARE", ("%s binlog  use_count: %u",
                           share->key, share->use_count));
  if (do_ndb_apply_status_share)
  {
    /* ndb_share reference binlog extra */
    ndb_apply_status_share= get_share(share);
    DBUG_PRINT("NDB_SHARE", ("%s binlog extra  use_count: %u",
                             share->key, share->use_count));
    (void) pthread_cond_signal(&injector_cond);
  }
  else if (do_ndb_schema_share)
  {
    /* ndb_share reference binlog extra */
    ndb_schema_share= get_share(share);
    DBUG_PRINT("NDB_SHARE", ("%s binlog extra  use_count: %u",
                             share->key, share->use_count));
    (void) pthread_cond_signal(&injector_cond);
  }

  DBUG_PRINT("info",("%s share->op: 0x%lx  share->use_count: %u",
                     share->key, (long) share->op, share->use_count));

  if (ndb_extra_logging)
    sql_print_information("NDB Binlog: logging %s", share->key);
  DBUG_RETURN(0);
}

/*
  when entering the calling thread should have a share lock id share != 0
  then the injector thread will have  one as well, i.e. share->use_count == 0
  (unless it has already dropped... then share->op == 0)
*/
int
ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
3104
                             NDB_SHARE *share, const char *type_str)
unknown's avatar
unknown committed
3105 3106
{
  DBUG_ENTER("ndbcluster_handle_drop_table");
3107
  THD *thd= current_thd;
unknown's avatar
unknown committed
3108 3109 3110 3111 3112 3113 3114

  NDBDICT *dict= ndb->getDictionary();
  if (event_name && dict->dropEvent(event_name))
  {
    if (dict->getNdbError().code != 4710)
    {
      /* drop event failed for some reason, issue a warning */
3115
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
unknown's avatar
unknown committed
3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127
                          ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                          dict->getNdbError().code,
                          dict->getNdbError().message, "NDB");
      /* error is not that the event did not exist */
      sql_print_error("NDB Binlog: Unable to drop event in database. "
                      "Event: %s Error Code: %d Message: %s",
                      event_name,
                      dict->getNdbError().code,
                      dict->getNdbError().message);
      /* ToDo; handle error? */
      if (share && share->op &&
          share->op->getState() == NdbEventOperation::EO_EXECUTING &&
3128
          dict->getNdbError().mysql_code != HA_ERR_NO_CONNECTION)
unknown's avatar
unknown committed
3129
      {
3130
        DBUG_ASSERT(FALSE);
unknown's avatar
unknown committed
3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152
        DBUG_RETURN(-1);
      }
    }
  }

  if (share == 0 || share->op == 0)
  {
    DBUG_RETURN(0);
  }

/*
  Syncronized drop between client thread and injector thread is
  neccessary in order to maintain ordering in the binlog,
  such that the drop occurs _after_ any inserts/updates/deletes.

  The penalty for this is that the drop table becomes slow.

  This wait is however not strictly neccessary to produce a binlog
  that is usable.  However the slave does not currently handle
  these out of order, thus we are keeping the SYNC_DROP_ defined
  for now.
*/
3153
  const char *save_proc_info= thd->proc_info;
unknown's avatar
unknown committed
3154 3155
#define SYNC_DROP_
#ifdef SYNC_DROP_
3156
  thd->proc_info= "Syncing ndb table schema operation and binlog";
unknown's avatar
unknown committed
3157
  (void) pthread_mutex_lock(&share->mutex);
3158 3159 3160
  safe_mutex_assert_owner(&LOCK_open);
  (void) pthread_mutex_unlock(&LOCK_open);
  int max_timeout= opt_ndb_sync_timeout;
unknown's avatar
unknown committed
3161 3162 3163 3164
  while (share->op)
  {
    struct timespec abstime;
    set_timespec(abstime, 1);
unknown's avatar
ndb:  
unknown committed
3165 3166 3167
    int ret= pthread_cond_timedwait(&injector_cond,
                                    &share->mutex,
                                    &abstime);
3168 3169
    if (thd->killed ||
        share->op == 0)
unknown's avatar
unknown committed
3170
      break;
unknown's avatar
ndb:  
unknown committed
3171
    if (ret)
unknown's avatar
unknown committed
3172
    {
unknown's avatar
ndb:  
unknown committed
3173 3174 3175
      max_timeout--;
      if (max_timeout == 0)
      {
3176 3177
        sql_print_error("NDB %s: %s timed out. Ignoring...",
                        type_str, share->key);
unknown's avatar
ndb:  
unknown committed
3178 3179 3180 3181 3182
        break;
      }
      if (ndb_extra_logging)
        ndb_report_waiting(type_str, max_timeout,
                           type_str, share->key);
unknown's avatar
unknown committed
3183 3184
    }
  }
3185
  (void) pthread_mutex_lock(&LOCK_open);
unknown's avatar
unknown committed
3186 3187 3188 3189 3190 3191 3192
  (void) pthread_mutex_unlock(&share->mutex);
#else
  (void) pthread_mutex_lock(&share->mutex);
  share->op_old= share->op;
  share->op= 0;
  (void) pthread_mutex_unlock(&share->mutex);
#endif
3193
  thd->proc_info= save_proc_info;
unknown's avatar
unknown committed
3194 3195 3196 3197 3198 3199 3200 3201 3202 3203 3204 3205 3206 3207

  DBUG_RETURN(0);
}


/********************************************************************
  Internal helper functions for the different events from the storage nodes,
  used by the ndb injector thread
********************************************************************/

/*
  Handle error states on events from the storage nodes
*/
static int ndb_binlog_thread_handle_error(Ndb *ndb, NdbEventOperation *pOp,
3208
                                          ndb_binlog_index_row &row)
unknown's avatar
unknown committed
3209 3210 3211 3212 3213 3214 3215 3216
{
  NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
  DBUG_ENTER("ndb_binlog_thread_handle_error");

  int overrun= pOp->isOverrun();
  if (overrun)
  {
    /*
3217
      ToDo: this error should rather clear the ndb_binlog_index...
unknown's avatar
unknown committed
3218 3219 3220 3221 3222 3223 3224 3225 3226 3227 3228 3229
      and continue
    */
    sql_print_error("NDB Binlog: Overrun in event buffer, "
                    "this means we have dropped events. Cannot "
                    "continue binlog for %s", share->key);
    pOp->clearError();
    DBUG_RETURN(-1);
  }

  if (!pOp->isConsistent())
  {
    /*
3230
      ToDo: this error should rather clear the ndb_binlog_index...
unknown's avatar
unknown committed
3231 3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247
      and continue
    */
    sql_print_error("NDB Binlog: Not Consistent. Cannot "
                    "continue binlog for %s. Error code: %d"
                    " Message: %s", share->key,
                    pOp->getNdbError().code,
                    pOp->getNdbError().message);
    pOp->clearError();
    DBUG_RETURN(-1);
  }
  sql_print_error("NDB Binlog: unhandled error %d for table %s",
                  pOp->hasError(), share->key);
  pOp->clearError();
  DBUG_RETURN(0);
}

static int
unknown's avatar
ndb:  
unknown committed
3248 3249
ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
                                        NdbEventOperation *pOp,
3250
                                        ndb_binlog_index_row &row)
unknown's avatar
unknown committed
3251 3252 3253 3254 3255 3256 3257
{
  NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
  NDBEVENT::TableEvent type= pOp->getEventType();

  switch (type)
  {
  case NDBEVENT::TE_CLUSTER_FAILURE:
3258
    if (ndb_extra_logging)
3259 3260
      sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
                            share->key, (unsigned) pOp->getGCI());
3261
    if (ndb_apply_status_share == share)
unknown's avatar
unknown committed
3262
    {
3263 3264 3265 3266
      if (ndb_extra_logging &&
          ndb_binlog_tables_inited && ndb_binlog_running)
        sql_print_information("NDB Binlog: ndb tables initially "
                              "read only on reconnect.");
3267 3268 3269
      /* ndb_share reference binlog extra free */
      DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
                               share->key, share->use_count));
3270 3271
      free_share(&ndb_apply_status_share);
      ndb_apply_status_share= 0;
unknown's avatar
unknown committed
3272
      ndb_binlog_tables_inited= 0;
unknown's avatar
unknown committed
3273
    }
unknown's avatar
unknown committed
3274 3275 3276 3277 3278
    DBUG_PRINT("error", ("CLUSTER FAILURE EVENT: "
                        "%s  received share: 0x%lx  op: 0x%lx  share op: 0x%lx  "
                        "op_old: 0x%lx",
                         share->key, (long) share, (long) pOp,
                         (long) share->op, (long) share->op_old));
unknown's avatar
unknown committed
3279 3280
    break;
  case NDBEVENT::TE_DROP:
3281
    if (ndb_apply_status_share == share)
unknown's avatar
unknown committed
3282
    {
3283 3284 3285 3286
      if (ndb_extra_logging &&
          ndb_binlog_tables_inited && ndb_binlog_running)
        sql_print_information("NDB Binlog: ndb tables initially "
                              "read only on reconnect.");
3287 3288 3289
      /* ndb_share reference binlog extra free */
      DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
                               share->key, share->use_count));
3290 3291
      free_share(&ndb_apply_status_share);
      ndb_apply_status_share= 0;
unknown's avatar
unknown committed
3292
      ndb_binlog_tables_inited= 0;
unknown's avatar
unknown committed
3293 3294 3295 3296
    }
    /* ToDo: remove printout */
    if (ndb_extra_logging)
      sql_print_information("NDB Binlog: drop table %s.", share->key);
3297 3298
    // fall through
  case NDBEVENT::TE_ALTER:
unknown's avatar
unknown committed
3299
    row.n_schemaops++;
unknown's avatar
unknown committed
3300 3301 3302 3303 3304
    DBUG_PRINT("info", ("TABLE %s  EVENT: %s  received share: 0x%lx  op: 0x%lx  "
                        "share op: 0x%lx  op_old: 0x%lx",
                        type == NDBEVENT::TE_DROP ? "DROP" : "ALTER",
                        share->key, (long) share, (long) pOp,
                        (long) share->op, (long) share->op_old));
unknown's avatar
unknown committed
3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316 3317 3318
    break;
  case NDBEVENT::TE_NODE_FAILURE:
    /* fall through */
  case NDBEVENT::TE_SUBSCRIBE:
    /* fall through */
  case NDBEVENT::TE_UNSUBSCRIBE:
    /* ignore */
    return 0;
  default:
    sql_print_error("NDB Binlog: unknown non data event %d for %s. "
                    "Ignoring...", (unsigned) type, share->key);
    return 0;
  }

unknown's avatar
ndb:  
unknown committed
3319
  ndb_handle_schema_change(thd, ndb, pOp, share);
unknown's avatar
unknown committed
3320 3321 3322 3323 3324 3325 3326 3327
  return 0;
}

/*
  Handle data events from the storage nodes
*/
static int
ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
3328
                                    ndb_binlog_index_row &row,
unknown's avatar
unknown committed
3329 3330 3331
                                    injector::transaction &trans)
{
  NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
3332
  if (share == ndb_apply_status_share)
unknown's avatar
unknown committed
3333 3334
    return 0;

3335
  uint32 originating_server_id= pOp->getAnyValue();
3336 3337
  if (originating_server_id == 0)
    originating_server_id= ::server_id;
3338 3339 3340 3341 3342 3343 3344 3345
  else if (originating_server_id & NDB_ANYVALUE_RESERVED)
  {
    if (originating_server_id != NDB_ANYVALUE_FOR_NOLOGGING)
      sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
                        "event not logged",
                        originating_server_id);
    return 0;
  }
3346 3347 3348 3349 3350 3351 3352 3353
  else if (!g_ndb_log_slave_updates)
  {
    /*
      This event comes from a slave applier since it has an originating
      server id set. Since option to log slave updates is not set, skip it.
    */
    return 0;
  }
3354 3355

  TABLE *table= share->table;
3356 3357
  DBUG_ASSERT(trans.good());
  DBUG_ASSERT(table != 0);
unknown's avatar
unknown committed
3358 3359 3360 3361 3362 3363 3364 3365 3366

  dbug_print_table("table", table);

  TABLE_SHARE *table_s= table->s;
  uint n_fields= table_s->fields;
  MY_BITMAP b;
  /* Potential buffer for the bitmap */
  uint32 bitbuf[128 / (sizeof(uint32) * 8)];
  bitmap_init(&b, n_fields <= sizeof(bitbuf) * 8 ? bitbuf : NULL, 
3367
              n_fields, FALSE);
unknown's avatar
unknown committed
3368 3369 3370 3371 3372 3373 3374 3375
  bitmap_set_all(&b);

  /*
   row data is already in table->record[0]
   As we told the NdbEventOperation to do this
   (saves moving data about many times)
  */

3376 3377 3378 3379
  /*
    for now malloc/free blobs buffer each time
    TODO if possible share single permanent buffer with handlers
   */
3380
  uchar* blobs_buffer[2] = { 0, 0 };
3381 3382
  uint blobs_buffer_size[2] = { 0, 0 };

unknown's avatar
unknown committed
3383 3384 3385 3386
  switch(pOp->getEventType())
  {
  case NDBEVENT::TE_INSERT:
    row.n_inserts++;
3387 3388
    DBUG_PRINT("info", ("INSERT INTO %s.%s",
                        table_s->db.str, table_s->table_name.str));
unknown's avatar
unknown committed
3389
    {
3390 3391 3392
      if (share->flags & NSF_BLOB_FLAG)
      {
        my_ptrdiff_t ptrdiff= 0;
unknown's avatar
unknown committed
3393 3394 3395 3396
        IF_DBUG(int ret =) get_ndb_blobs_value(table, share->ndb_value[0],
                                               blobs_buffer[0],
                                               blobs_buffer_size[0],
                                               ptrdiff);
3397 3398
        DBUG_ASSERT(ret == 0);
      }
unknown's avatar
unknown committed
3399
      ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
3400
      IF_DBUG(int ret=) trans.write_row(originating_server_id,
unknown's avatar
unknown committed
3401 3402 3403
                                        injector::transaction::table(table,
                                                                     TRUE),
                                        &b, n_fields, table->record[0]);
unknown's avatar
unknown committed
3404
      DBUG_ASSERT(ret == 0);
unknown's avatar
unknown committed
3405 3406 3407 3408
    }
    break;
  case NDBEVENT::TE_DELETE:
    row.n_deletes++;
3409 3410
    DBUG_PRINT("info",("DELETE FROM %s.%s",
                       table_s->db.str, table_s->table_name.str));
unknown's avatar
unknown committed
3411 3412 3413 3414 3415 3416 3417 3418 3419 3420
    {
      /*
        table->record[0] contains only the primary key in this case
        since we do not have an after image
      */
      int n;
      if (table->s->primary_key != MAX_KEY)
        n= 0; /*
                use the primary key only as it save time and space and
                it is the only thing needed to log the delete
unknown's avatar
unknown committed
3421
              */
unknown's avatar
unknown committed
3422 3423 3424 3425 3426
      else
        n= 1; /*
                we use the before values since we don't have a primary key
                since the mysql server does not handle the hidden primary
                key
unknown's avatar
unknown committed
3427
              */
unknown's avatar
unknown committed
3428

3429 3430 3431
      if (share->flags & NSF_BLOB_FLAG)
      {
        my_ptrdiff_t ptrdiff= table->record[n] - table->record[0];
unknown's avatar
unknown committed
3432 3433 3434 3435
        IF_DBUG(int ret =) get_ndb_blobs_value(table, share->ndb_value[n],
                                               blobs_buffer[n],
                                               blobs_buffer_size[n],
                                               ptrdiff);
3436 3437
        DBUG_ASSERT(ret == 0);
      }
unknown's avatar
unknown committed
3438
      ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
unknown's avatar
unknown committed
3439
      DBUG_EXECUTE("info", print_records(table, table->record[n]););
3440
      IF_DBUG(int ret =) trans.delete_row(originating_server_id,
unknown's avatar
unknown committed
3441 3442 3443
                                          injector::transaction::table(table,
                                                                       TRUE),
                                          &b, n_fields, table->record[n]);
unknown's avatar
unknown committed
3444
      DBUG_ASSERT(ret == 0);
unknown's avatar
unknown committed
3445 3446 3447 3448
    }
    break;
  case NDBEVENT::TE_UPDATE:
    row.n_updates++;
3449 3450
    DBUG_PRINT("info", ("UPDATE %s.%s",
                        table_s->db.str, table_s->table_name.str));
unknown's avatar
unknown committed
3451
    {
3452 3453 3454
      if (share->flags & NSF_BLOB_FLAG)
      {
        my_ptrdiff_t ptrdiff= 0;
unknown's avatar
unknown committed
3455 3456 3457 3458
        IF_DBUG(int ret =) get_ndb_blobs_value(table, share->ndb_value[0],
                                               blobs_buffer[0],
                                               blobs_buffer_size[0],
                                               ptrdiff);
3459 3460
        DBUG_ASSERT(ret == 0);
      }
unknown's avatar
unknown committed
3461 3462
      ndb_unpack_record(table, share->ndb_value[0],
                        &b, table->record[0]);
unknown's avatar
unknown committed
3463
      DBUG_EXECUTE("info", print_records(table, table->record[0]););
unknown's avatar
unknown committed
3464 3465 3466
      if (table->s->primary_key != MAX_KEY) 
      {
        /*
3467
          since table has a primary key, we can do a write
unknown's avatar
unknown committed
3468
          using only after values
unknown's avatar
unknown committed
3469
        */
3470 3471
        trans.write_row(originating_server_id,
                        injector::transaction::table(table, TRUE),
unknown's avatar
unknown committed
3472 3473 3474 3475 3476 3477 3478
                        &b, n_fields, table->record[0]);// after values
      }
      else
      {
        /*
          mysql server cannot handle the ndb hidden key and
          therefore needs the before image as well
unknown's avatar
unknown committed
3479
        */
3480 3481 3482
        if (share->flags & NSF_BLOB_FLAG)
        {
          my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
unknown's avatar
unknown committed
3483 3484 3485 3486
          IF_DBUG(int ret =) get_ndb_blobs_value(table, share->ndb_value[1],
                                                 blobs_buffer[1],
                                                 blobs_buffer_size[1],
                                                 ptrdiff);
3487 3488
          DBUG_ASSERT(ret == 0);
        }
unknown's avatar
unknown committed
3489
        ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
unknown's avatar
unknown committed
3490
        DBUG_EXECUTE("info", print_records(table, table->record[1]););
3491
        IF_DBUG(int ret =) trans.update_row(originating_server_id,
unknown's avatar
unknown committed
3492 3493 3494 3495 3496
                                            injector::transaction::table(table,
                                                                         TRUE),
                                            &b, n_fields,
                                            table->record[1], // before values
                                            table->record[0]);// after values
unknown's avatar
unknown committed
3497
        DBUG_ASSERT(ret == 0);
unknown's avatar
unknown committed
3498 3499 3500 3501 3502 3503 3504 3505 3506
      }
    }
    break;
  default:
    /* We should REALLY never get here. */
    DBUG_PRINT("info", ("default - uh oh, a brain exploded."));
    break;
  }

3507 3508 3509 3510 3511 3512
  if (share->flags & NSF_BLOB_FLAG)
  {
    my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR));
    my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR));
  }

unknown's avatar
unknown committed
3513 3514 3515 3516 3517 3518 3519 3520 3521 3522 3523 3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539
  return 0;
}

//#define RUN_NDB_BINLOG_TIMER
#ifdef RUN_NDB_BINLOG_TIMER
/*
  Wall-clock stopwatch, compiled in only with RUN_NDB_BINLOG_TIMER.
  The injector main loop uses it to report how long each iteration took.
  Construction starts the clock; elapsed_ms() reports stop() - start()
  in milliseconds.
*/
class Timer
{
public:
  Timer() { start(); }
  void start() { gettimeofday(&m_begin, 0); }
  void stop() { gettimeofday(&m_end, 0); }
  ulong elapsed_ms()
  {
    longlong sec_diff= (longlong) m_end.tv_sec - (longlong) m_begin.tv_sec;
    longlong usec_diff= (longlong) m_end.tv_usec - (longlong) m_begin.tv_usec;
    /* +999 before dividing rounds a positive microsecond remainder up */
    return (ulong) (sec_diff * 1000 + (usec_diff + 999) / 1000);
  }
private:
  struct timeval m_begin;
  struct timeval m_end;
};
#endif

/****************************************************************
  Injector thread main loop
****************************************************************/

3540 3541 3542 3543
/*
  hash_get_key callback for the ndb_schema_objects hash: an object's hash
  key is its key string, key_length bytes long.
*/
static uchar *
ndb_schema_objects_get_key(NDB_SCHEMA_OBJECT *schema_object,
                           size_t *length,
                           my_bool not_used __attribute__((unused)))
{
  uchar *key_bytes= (uchar*) schema_object->key;
  *length= schema_object->key_length;
  return key_bytes;
}

/*
  Look up the schema distribution object for key in ndb_schema_objects,
  optionally creating it, and take a reference on it (use_count++).

  key                   NUL-terminated key; copied into a new object
  create_if_not_exists  allocate and insert a new object when none exists
  have_lock             caller already holds ndbcluster_mutex

  Returns the referenced object, or NULL when it does not exist (and
  creation was not requested) or could not be created.  The reference is
  dropped with ndb_free_schema_object().
*/
static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
                                                my_bool create_if_not_exists,
                                                my_bool have_lock)
{
  NDB_SCHEMA_OBJECT *ndb_schema_object;
  uint length= (uint) strlen(key);
  DBUG_ENTER("ndb_get_schema_object");
  DBUG_PRINT("enter", ("key: '%s'", key));

  if (!have_lock)
    pthread_mutex_lock(&ndbcluster_mutex);
  /*
    Not a real loop.  The body runs at most once, only when the lookup
    found nothing, and every path through it ends in 'break'.
  */
  while (!(ndb_schema_object=
           (NDB_SCHEMA_OBJECT*) hash_search(&ndb_schema_objects,
                                            (uchar*) key,
                                            length)))
  {
    if (!create_if_not_exists)
    {
      DBUG_PRINT("info", ("does not exist"));
      break;
    }
    /* The key string is stored in the same allocation, right after the object */
    if (!(ndb_schema_object=
          (NDB_SCHEMA_OBJECT*) my_malloc(sizeof(*ndb_schema_object) + length + 1,
                                         MYF(MY_WME | MY_ZEROFILL))))
    {
      DBUG_PRINT("info", ("malloc error"));
      break;
    }
    ndb_schema_object->key= (char *)(ndb_schema_object+1);
    memcpy(ndb_schema_object->key, key, length + 1);
    ndb_schema_object->key_length= length;
    if (my_hash_insert(&ndb_schema_objects, (uchar*) ndb_schema_object))
    {
      DBUG_PRINT("info", ("hash insert error"));
      my_free((uchar*) ndb_schema_object, 0);
      /*
        Reset the pointer.  Otherwise the code below would bump use_count
        on the object just freed and return a dangling pointer.
      */
      ndb_schema_object= 0;
      break;
    }
    pthread_mutex_init(&ndb_schema_object->mutex, MY_MUTEX_INIT_FAST);
    bitmap_init(&ndb_schema_object->slock_bitmap, ndb_schema_object->slock,
                sizeof(ndb_schema_object->slock)*8, FALSE);
    bitmap_clear_all(&ndb_schema_object->slock_bitmap);
    break;
  }
  if (ndb_schema_object)
  {
    ndb_schema_object->use_count++;
    DBUG_PRINT("info", ("use_count: %d", ndb_schema_object->use_count));
  }
  if (!have_lock)
    pthread_mutex_unlock(&ndbcluster_mutex);
  DBUG_RETURN(ndb_schema_object);
}


/*
  Drop one reference on *ndb_schema_object.

  When the last reference goes away, the object is removed from
  ndb_schema_objects, its mutex is destroyed, its memory is freed and
  *ndb_schema_object is set to 0.  ndbcluster_mutex is taken around the
  update unless have_lock says the caller already holds it.
*/
static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
                                   bool have_lock)
{
  NDB_SCHEMA_OBJECT *obj= *ndb_schema_object;
  DBUG_ENTER("ndb_free_schema_object");
  DBUG_PRINT("enter", ("key: '%s'", obj->key));
  if (!have_lock)
    pthread_mutex_lock(&ndbcluster_mutex);

  bool last_reference= (--obj->use_count == 0);
  DBUG_PRINT("info", ("use_count: %d", obj->use_count));
  if (last_reference)
  {
    hash_delete(&ndb_schema_objects, (uchar*) obj);
    pthread_mutex_destroy(&obj->mutex);
    my_free((uchar*) obj, MYF(0));
    *ndb_schema_object= 0;
  }

  if (!have_lock)
    pthread_mutex_unlock(&ndbcluster_mutex);
  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
3626

unknown's avatar
unknown committed
3627 3628 3629
pthread_handler_t ndb_binlog_thread_func(void *arg)
{
  THD *thd; /* needs to be first for thread_stack */
unknown's avatar
ndb:  
unknown committed
3630 3631
  Ndb *i_ndb= 0;
  Ndb *s_ndb= 0;
unknown's avatar
unknown committed
3632
  Thd_ndb *thd_ndb=0;
3633
  int ndb_update_ndb_binlog_index= 1;
unknown's avatar
unknown committed
3634
  injector *inj= injector::instance();
3635
  uint incident_id= 0;
unknown's avatar
unknown committed
3636

3637 3638 3639
#ifdef RUN_NDB_BINLOG_TIMER
  Timer main_timer;
#endif
unknown's avatar
unknown committed
3640 3641 3642 3643 3644 3645 3646 3647 3648 3649 3650

  pthread_mutex_lock(&injector_mutex);
  /*
    Set up the Thread
  */
  my_thread_init();
  DBUG_ENTER("ndb_binlog_thread");

  thd= new THD; /* note that contructor of THD uses DBUG_ */
  THD_CHECK_SENTRY(thd);

3651 3652 3653 3654 3655 3656 3657
  /* We need to set thd->thread_id before thd->store_globals, or it will
     set an invalid value for thd->variables.pseudo_thread_id.
  */
  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id= thread_id++;
  pthread_mutex_unlock(&LOCK_thread_count);

unknown's avatar
unknown committed
3658 3659 3660 3661 3662 3663 3664 3665
  thd->thread_stack= (char*) &thd; /* remember where our stack is */
  if (thd->store_globals())
  {
    thd->cleanup();
    delete thd;
    ndb_binlog_thread_running= -1;
    pthread_mutex_unlock(&injector_mutex);
    pthread_cond_signal(&injector_cond);
3666 3667

    DBUG_LEAVE;                               // Must match DBUG_ENTER()
unknown's avatar
unknown committed
3668 3669
    my_thread_end();
    pthread_exit(0);
3670
    return NULL;                              // Avoid compiler warnings
unknown's avatar
unknown committed
3671
  }
3672
  lex_start(thd);
unknown's avatar
unknown committed
3673 3674 3675 3676 3677 3678 3679 3680 3681 3682 3683 3684 3685 3686 3687 3688 3689 3690 3691 3692 3693 3694 3695

  thd->init_for_queries();
  thd->command= COM_DAEMON;
  thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
  thd->version= refresh_version;
  thd->main_security_ctx.host_or_ip= "";
  thd->client_capabilities= 0;
  my_net_init(&thd->net, 0);
  thd->main_security_ctx.master_access= ~0;
  thd->main_security_ctx.priv_user= 0;

  /*
    Set up ndb binlog
  */
  sql_print_information("Starting MySQL Cluster Binlog Thread");

  pthread_detach_this_thread();
  thd->real_id= pthread_self();
  pthread_mutex_lock(&LOCK_thread_count);
  threads.append(thd);
  pthread_mutex_unlock(&LOCK_thread_count);
  thd->lex->start_transaction_opt= 0;

unknown's avatar
ndb:  
unknown committed
3696 3697
  if (!(s_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
      s_ndb->init())
unknown's avatar
unknown committed
3698 3699
  {
    sql_print_error("NDB Binlog: Getting Schema Ndb object failed");
3700 3701 3702
    ndb_binlog_thread_running= -1;
    pthread_mutex_unlock(&injector_mutex);
    pthread_cond_signal(&injector_cond);
unknown's avatar
unknown committed
3703 3704 3705
    goto err;
  }

3706
  // empty database
unknown's avatar
ndb:  
unknown committed
3707 3708
  if (!(i_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
      i_ndb->init())
unknown's avatar
unknown committed
3709 3710 3711 3712 3713 3714 3715 3716
  {
    sql_print_error("NDB Binlog: Getting Ndb object failed");
    ndb_binlog_thread_running= -1;
    pthread_mutex_unlock(&injector_mutex);
    pthread_cond_signal(&injector_cond);
    goto err;
  }

3717 3718 3719 3720
  /* init hash for schema object distribution */
  (void) hash_init(&ndb_schema_objects, system_charset_info, 32, 0, 0,
                   (hash_get_key)ndb_schema_objects_get_key, 0, 0);

unknown's avatar
unknown committed
3721 3722 3723 3724 3725 3726 3727 3728
  /*
    Expose global reference to our ndb object.

    Used by both sql client thread and binlog thread to interact
    with the storage
    pthread_mutex_lock(&injector_mutex);
  */
  injector_thd= thd;
unknown's avatar
ndb:  
unknown committed
3729
  injector_ndb= i_ndb;
unknown's avatar
unknown committed
3730 3731
  p_latest_trans_gci= 
    injector_ndb->get_ndb_cluster_connection().get_latest_trans_gci();
unknown's avatar
ndb:  
unknown committed
3732
  schema_ndb= s_ndb;
3733

3734 3735
  if (opt_bin_log)
  {
unknown's avatar
unknown committed
3736
    ndb_binlog_running= TRUE;
3737
  }
3738 3739 3740

  /* Thread start up completed  */
  ndb_binlog_thread_running= 1;
unknown's avatar
unknown committed
3741 3742 3743
  pthread_mutex_unlock(&injector_mutex);
  pthread_cond_signal(&injector_cond);

3744 3745 3746 3747 3748 3749 3750 3751 3752 3753 3754 3755 3756 3757 3758 3759 3760 3761
  /*
    wait for mysql server to start (so that the binlog is started
    and thus can receive the first GAP event)
  */
  pthread_mutex_lock(&LOCK_server_started);
  while (!mysqld_server_started)
  {
    struct timespec abstime;
    set_timespec(abstime, 1);
    pthread_cond_timedwait(&COND_server_started, &LOCK_server_started,
                           &abstime);
    if (ndbcluster_terminating)
    {
      pthread_mutex_unlock(&LOCK_server_started);
      goto err;
    }
  }
  pthread_mutex_unlock(&LOCK_server_started);
3762
restart:
unknown's avatar
unknown committed
3763 3764 3765
  /*
    Main NDB Injector loop
  */
3766
  while (ndb_binlog_running)
3767 3768
  {
    /*
3769 3770
      check if it is the first log, if so we do not insert a GAP event
      as there is really no log to have a GAP in
3771
    */
3772
    if (incident_id == 0)
3773 3774 3775 3776 3777 3778 3779 3780 3781 3782 3783 3784 3785
    {
      LOG_INFO log_info;
      mysql_bin_log.get_current_log(&log_info);
      int len=  strlen(log_info.log_file_name);
      uint no= 0;
      if ((sscanf(log_info.log_file_name + len - 6, "%u", &no) == 1) &&
          no == 1)
      {
        /* this is the fist log, so skip GAP event */
        break;
      }
    }

3786
    /*
3787 3788
      Always insert a GAP event as we cannot know what has happened
      in the cluster while not being connected.
3789
    */
3790 3791 3792 3793 3794 3795 3796 3797
    LEX_STRING const msg[2]=
      {
        { C_STRING_WITH_LEN("mysqld startup")    },
        { C_STRING_WITH_LEN("cluster disconnect")}
      };
    IF_DBUG(int error=)
      inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg[incident_id]);
    DBUG_ASSERT(!error);
3798
    break;
3799
  }
3800
  incident_id= 1;
unknown's avatar
unknown committed
3801
  {
3802 3803 3804
    thd->proc_info= "Waiting for ndbcluster to start";

    pthread_mutex_lock(&injector_mutex);
3805 3806
    while (!ndb_schema_share ||
           (ndb_binlog_running && !ndb_apply_status_share))
3807 3808 3809 3810 3811
    {
      /* ndb not connected yet */
      struct timespec abstime;
      set_timespec(abstime, 1);
      pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
3812
      if (ndbcluster_binlog_terminating)
3813 3814 3815 3816 3817 3818
      {
        pthread_mutex_unlock(&injector_mutex);
        goto err;
      }
    }
    pthread_mutex_unlock(&injector_mutex);
3819 3820 3821

    if (thd_ndb == NULL)
    {
3822
      DBUG_ASSERT(ndbcluster_hton->slot != ~(uint)0);
3823 3824 3825 3826 3827 3828 3829 3830 3831
      if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
      {
        sql_print_error("Could not allocate Thd_ndb object");
        goto err;
      }
      set_thd_ndb(thd, thd_ndb);
      thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
      thd->query_id= 0; // to keep valgrind quiet
    }
unknown's avatar
unknown committed
3832 3833 3834
  }

  {
3835 3836
    // wait for the first event
    thd->proc_info= "Waiting for first event from ndbcluster";
3837 3838 3839
    int schema_res, res;
    Uint64 schema_gci;
    do
3840
    {
3841 3842 3843
      DBUG_PRINT("info", ("Waiting for the first event"));

      if (ndbcluster_binlog_terminating)
3844
        goto err;
3845

3846
      schema_res= s_ndb->pollEvents(100, &schema_gci);
3847
    } while (schema_gci == 0 || ndb_latest_received_binlog_epoch == schema_gci);
3848
    if (ndb_binlog_running)
3849
    {
3850 3851
      Uint64 gci= i_ndb->getLatestGCI();
      while (gci < schema_gci || gci == ndb_latest_received_binlog_epoch)
3852
      {
3853
        if (ndbcluster_binlog_terminating)
3854 3855
          goto err;
        res= i_ndb->pollEvents(10, &gci);
3856 3857 3858 3859 3860
      }
      if (gci > schema_gci)
      {
        schema_gci= gci;
      }
3861 3862
    }
    // now check that we have epochs consistant with what we had before the restart
unknown's avatar
unknown committed
3863 3864
    DBUG_PRINT("info", ("schema_res: %d  schema_gci: %lu", schema_res,
                        (long) schema_gci));
3865
    {
3866 3867
      i_ndb->flushIncompleteEvents(schema_gci);
      s_ndb->flushIncompleteEvents(schema_gci);
3868 3869 3870 3871 3872 3873
      if (schema_gci < ndb_latest_handled_binlog_epoch)
      {
        sql_print_error("NDB Binlog: cluster has been restarted --initial or with older filesystem. "
                        "ndb_latest_handled_binlog_epoch: %u, while current epoch: %u. "
                        "RESET MASTER should be issued. Resetting ndb_latest_handled_binlog_epoch.",
                        (unsigned) ndb_latest_handled_binlog_epoch, (unsigned) schema_gci);
unknown's avatar
unknown committed
3874
        *p_latest_trans_gci= 0;
3875 3876 3877 3878
        ndb_latest_handled_binlog_epoch= 0;
        ndb_latest_applied_binlog_epoch= 0;
        ndb_latest_received_binlog_epoch= 0;
      }
3879 3880 3881 3882 3883 3884
      else if (ndb_latest_applied_binlog_epoch > 0)
      {
        sql_print_warning("NDB Binlog: cluster has reconnected. "
                          "Changes to the database that occured while "
                          "disconnected will not be in the binlog");
      }
3885 3886 3887 3888 3889
      if (ndb_extra_logging)
      {
        sql_print_information("NDB Binlog: starting log at epoch %u",
                              (unsigned)schema_gci);
      }
3890 3891
    }
  }
3892 3893 3894 3895
  {
    static char db[]= "";
    thd->db= db;
    if (ndb_binlog_running)
3896
      open_ndb_binlog_index(thd, &binlog_tables, &ndb_binlog_index);
3897 3898
    thd->db= db;
  }
3899
  do_ndbcluster_binlog_close_connection= BCCC_running;
3900 3901
  for ( ; !((ndbcluster_binlog_terminating ||
             do_ndbcluster_binlog_close_connection) &&
unknown's avatar
unknown committed
3902
            ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci) &&
3903 3904 3905 3906 3907 3908
          do_ndbcluster_binlog_close_connection != BCCC_restart; )
  {
#ifndef DBUG_OFF
    if (do_ndbcluster_binlog_close_connection)
    {
      DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection: %d, "
unknown's avatar
unknown committed
3909 3910 3911 3912 3913
                          "ndb_latest_handled_binlog_epoch: %lu, "
                          "*p_latest_trans_gci: %lu",
                          do_ndbcluster_binlog_close_connection,
                          (ulong) ndb_latest_handled_binlog_epoch,
                          (ulong) *p_latest_trans_gci));
3914 3915
    }
#endif
unknown's avatar
unknown committed
3916 3917 3918 3919 3920 3921 3922 3923 3924 3925 3926 3927 3928
#ifdef RUN_NDB_BINLOG_TIMER
    main_timer.stop();
    sql_print_information("main_timer %ld ms",  main_timer.elapsed_ms());
    main_timer.start();
#endif

    /*
      now we don't want any events before next gci is complete
    */
    thd->proc_info= "Waiting for event from ndbcluster";
    thd->set_time();
    
    /* wait for event or 1000 ms */
3929 3930 3931 3932
    Uint64 gci= 0, schema_gci;
    int res= 0, tot_poll_wait= 1000;
    if (ndb_binlog_running)
    {
unknown's avatar
ndb:  
unknown committed
3933
      res= i_ndb->pollEvents(tot_poll_wait, &gci);
3934 3935
      tot_poll_wait= 0;
    }
3936 3937 3938 3939 3940 3941 3942 3943 3944 3945 3946
    else
    {
      /*
        Just consume any events, not used if no binlogging
        e.g. node failure events
      */
      Uint64 tmp_gci;
      if (i_ndb->pollEvents(0, &tmp_gci))
        while (i_ndb->nextEvent())
          ;
    }
unknown's avatar
ndb:  
unknown committed
3947
    int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci);
unknown's avatar
unknown committed
3948 3949 3950
    ndb_latest_received_binlog_epoch= gci;

    while (gci > schema_gci && schema_res >= 0)
3951 3952 3953 3954 3955
    {
      static char buf[64];
      thd->proc_info= "Waiting for schema epoch";
      my_snprintf(buf, sizeof(buf), "%s %u(%u)", thd->proc_info, (unsigned) schema_gci, (unsigned) gci);
      thd->proc_info= buf;
unknown's avatar
ndb:  
unknown committed
3956
      schema_res= s_ndb->pollEvents(10, &schema_gci);
3957
    }
unknown's avatar
unknown committed
3958

3959 3960
    if ((ndbcluster_binlog_terminating ||
         do_ndbcluster_binlog_close_connection) &&
unknown's avatar
unknown committed
3961
        (ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci ||
3962
         !ndb_binlog_running))
unknown's avatar
unknown committed
3963 3964
      break; /* Shutting down server */

3965
    if (ndb_binlog_index && ndb_binlog_index->s->version < refresh_version)
unknown's avatar
unknown committed
3966
    {
3967
      if (ndb_binlog_index->s->version < refresh_version)
unknown's avatar
unknown committed
3968 3969
      {
        close_thread_tables(thd);
3970
        ndb_binlog_index= 0;
unknown's avatar
unknown committed
3971 3972 3973 3974 3975 3976 3977 3978
      }
    }

    MEM_ROOT **root_ptr=
      my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
    MEM_ROOT *old_root= *root_ptr;
    MEM_ROOT mem_root;
    init_sql_alloc(&mem_root, 4096, 0);
3979 3980
    List<Cluster_schema> post_epoch_log_list;
    List<Cluster_schema> post_epoch_unlock_list;
unknown's avatar
unknown committed
3981 3982 3983 3984
    *root_ptr= &mem_root;

    if (unlikely(schema_res > 0))
    {
3985
      thd->proc_info= "Processing events from schema table";
unknown's avatar
ndb:  
unknown committed
3986
      s_ndb->
unknown's avatar
unknown committed
3987
        setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
unknown's avatar
ndb:  
unknown committed
3988
      s_ndb->
3989
        setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
unknown's avatar
ndb:  
unknown committed
3990
      NdbEventOperation *pOp= s_ndb->nextEvent();
unknown's avatar
unknown committed
3991 3992 3993
      while (pOp != NULL)
      {
        if (!pOp->hasError())
3994
        {
3995
          ndb_binlog_thread_handle_schema_event(thd, s_ndb, pOp,
3996 3997 3998
                                                &post_epoch_log_list,
                                                &post_epoch_unlock_list,
                                                &mem_root);
3999 4000 4001 4002 4003 4004 4005 4006 4007 4008 4009 4010
          DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
                              s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
                              "<empty>"));
          DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
                              i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
                              "<empty>"));
          if (i_ndb->getEventOperation() == NULL &&
              s_ndb->getEventOperation() == NULL &&
              do_ndbcluster_binlog_close_connection == BCCC_running)
          {
            DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
            do_ndbcluster_binlog_close_connection= BCCC_restart;
unknown's avatar
unknown committed
4011
            if (ndb_latest_received_binlog_epoch < *p_latest_trans_gci && ndb_binlog_running)
4012
            {
unknown's avatar
unknown committed
4013 4014 4015 4016
              sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
                              "as latest received epoch is %lu",
                              (ulong) *p_latest_trans_gci,
                              (ulong) ndb_latest_received_binlog_epoch);
4017 4018 4019
            }
          }
        }
unknown's avatar
unknown committed
4020 4021 4022 4023 4024
        else
          sql_print_error("NDB: error %lu (%s) on handling "
                          "binlog schema event",
                          (ulong) pOp->getNdbError().code,
                          pOp->getNdbError().message);
unknown's avatar
ndb:  
unknown committed
4025
        pOp= s_ndb->nextEvent();
unknown's avatar
unknown committed
4026 4027 4028 4029 4030 4031 4032
      }
    }

    if (res > 0)
    {
      DBUG_PRINT("info", ("pollEvents res: %d", res));
      thd->proc_info= "Processing events";
unknown's avatar
ndb:  
unknown committed
4033
      NdbEventOperation *pOp= i_ndb->nextEvent();
4034
      ndb_binlog_index_row row;
unknown's avatar
unknown committed
4035 4036
      while (pOp != NULL)
      {
4037 4038 4039 4040 4041
#ifdef RUN_NDB_BINLOG_TIMER
        Timer gci_timer, write_timer;
        int event_count= 0;
        gci_timer.start();
#endif
4042 4043
        gci= pOp->getGCI();
        DBUG_PRINT("info", ("Handling gci: %d", (unsigned)gci));
4044 4045
        // sometimes get TE_ALTER with invalid table
        DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
4046
                    ! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName()));
4047 4048
        DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch);

4049 4050
        /* initialize some variables for this epoch */
        g_ndb_log_slave_updates= opt_log_slave_updates;
unknown's avatar
ndb:  
unknown committed
4051
        i_ndb->
unknown's avatar
unknown committed
4052
          setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
unknown's avatar
ndb:  
unknown committed
4053
        i_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
unknown's avatar
unknown committed
4054 4055

        bzero((char*) &row, sizeof(row));
4056
        thd->variables.character_set_client= &my_charset_latin1;
4057 4058 4059 4060 4061
        injector::transaction trans;
        // pass table map before epoch
        {
          Uint32 iter= 0;
          const NdbEventOperation *gci_op;
4062
          Uint32 event_types;
unknown's avatar
ndb:  
unknown committed
4063
          while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
4064
                 != NULL)
4065
          {
4066
            NDB_SHARE *share= (NDB_SHARE*)gci_op->getCustomData();
unknown's avatar
unknown committed
4067 4068
            DBUG_PRINT("info", ("per gci_op: 0x%lx  share: 0x%lx  event_types: 0x%x",
                                (long) gci_op, (long) share, event_types));
4069 4070 4071 4072 4073 4074 4075 4076
            // workaround for interface returning TE_STOP events
            // which are normally filtered out below in the nextEvent loop
            if ((event_types & ~NdbDictionary::Event::TE_STOP) == 0)
            {
              DBUG_PRINT("info", ("Skipped TE_STOP on table %s",
                                  gci_op->getEvent()->getTable()->getName()));
              continue;
            }
4077 4078 4079
            // this should not happen
            if (share == NULL || share->table == NULL)
            {
4080 4081
              DBUG_PRINT("info", ("no share or table %s!",
                                  gci_op->getEvent()->getTable()->getName()));
4082 4083
              continue;
            }
4084
            if (share == ndb_apply_status_share)
4085
            {
4086 4087 4088 4089
              // skip this table, it is handled specially
              continue;
            }
            TABLE *table= share->table;
unknown's avatar
unknown committed
4090
#ifndef DBUG_OFF
4091
            const LEX_STRING &name= table->s->table_name;
unknown's avatar
unknown committed
4092
#endif
4093 4094 4095 4096 4097
            if ((event_types & (NdbDictionary::Event::TE_INSERT |
                                NdbDictionary::Event::TE_UPDATE |
                                NdbDictionary::Event::TE_DELETE)) == 0)
            {
              DBUG_PRINT("info", ("skipping non data event table: %.*s",
4098
                                  (int) name.length, name.str));
4099
              continue;
4100
            }
4101 4102 4103 4104 4105 4106
            if (!trans.good())
            {
              DBUG_PRINT("info",
                         ("Found new data event, initializing transaction"));
              inj->new_trans(thd, &trans);
            }
4107 4108
            DBUG_PRINT("info", ("use_table: %.*s",
                                (int) name.length, name.str));
4109
            injector::transaction::table tbl(table, TRUE);
unknown's avatar
unknown committed
4110
            IF_DBUG(int ret=) trans.use_table(::server_id, tbl);
unknown's avatar
unknown committed
4111
            DBUG_ASSERT(ret == 0);
4112 4113
          }
        }
4114
        if (trans.good())
unknown's avatar
unknown committed
4115
        {
4116
          if (ndb_apply_status_share)
4117
          {
4118
            TABLE *table= ndb_apply_status_share->table;
4119

unknown's avatar
unknown committed
4120 4121
#ifndef DBUG_OFF
            const LEX_STRING& name= table->s->table_name;
4122 4123
            DBUG_PRINT("info", ("use_table: %.*s",
                                (int) name.length, name.str));
unknown's avatar
unknown committed
4124
#endif
4125
            injector::transaction::table tbl(table, TRUE);
unknown's avatar
unknown committed
4126
            IF_DBUG(int ret=) trans.use_table(::server_id, tbl);
unknown's avatar
unknown committed
4127
            DBUG_ASSERT(ret == 0);
4128

unknown's avatar
unknown committed
4129 4130 4131 4132 4133
	    /* 
	       Intialize table->record[0] 
	    */
	    empty_record(table);

4134 4135
            table->field[0]->store((longlong)::server_id);
            table->field[1]->store((longlong)gci);
4136 4137 4138
            table->field[2]->store("", 0, &my_charset_bin);
            table->field[3]->store((longlong)0);
            table->field[4]->store((longlong)0);
4139
            trans.write_row(::server_id,
4140 4141
                            injector::transaction::table(table, TRUE),
                            &table->s->all_set, table->s->fields,
4142 4143 4144 4145 4146 4147
                            table->record[0]);
          }
          else
          {
            sql_print_error("NDB: Could not get apply status share");
          }
unknown's avatar
unknown committed
4148 4149 4150 4151 4152 4153 4154 4155 4156 4157
        }
#ifdef RUN_NDB_BINLOG_TIMER
        write_timer.start();
#endif
        do
        {
#ifdef RUN_NDB_BINLOG_TIMER
          event_count++;
#endif
          if (pOp->hasError() &&
unknown's avatar
ndb:  
unknown committed
4158
              ndb_binlog_thread_handle_error(i_ndb, pOp, row) < 0)
unknown's avatar
unknown committed
4159 4160 4161 4162 4163 4164
            goto err;

#ifndef DBUG_OFF
          {
            NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
            DBUG_PRINT("info",
unknown's avatar
unknown committed
4165 4166 4167 4168 4169 4170 4171
                       ("EVENT TYPE: %d  GCI: %ld  last applied: %ld  "
                        "share: 0x%lx (%s.%s)", pOp->getEventType(),
                        (long) gci,
                        (long) ndb_latest_applied_binlog_epoch,
                        (long) share,
                        share ? share->db :  "'NULL'",
                        share ? share->table_name : "'NULL'"));
unknown's avatar
unknown committed
4172 4173
            DBUG_ASSERT(share != 0);
          }
4174 4175 4176 4177 4178 4179
          // assert that there is consistancy between gci op list
          // and event list
          {
            Uint32 iter= 0;
            const NdbEventOperation *gci_op;
            Uint32 event_types;
unknown's avatar
ndb:  
unknown committed
4180
            while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
4181 4182 4183 4184 4185 4186 4187 4188
                   != NULL)
            {
              if (gci_op == pOp)
                break;
            }
            DBUG_ASSERT(gci_op == pOp);
            DBUG_ASSERT((event_types & pOp->getEventType()) != 0);
          }
unknown's avatar
unknown committed
4189 4190 4191
#endif
          if ((unsigned) pOp->getEventType() <
              (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
unknown's avatar
ndb:  
unknown committed
4192
            ndb_binlog_thread_handle_data_event(i_ndb, pOp, row, trans);
unknown's avatar
unknown committed
4193
          else
4194 4195
          {
            // set injector_ndb database/schema from table internal name
unknown's avatar
unknown committed
4196
            IF_DBUG(int ret=)
unknown's avatar
ndb:  
unknown committed
4197
              i_ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
4198
            DBUG_ASSERT(ret == 0);
unknown's avatar
ndb:  
unknown committed
4199
            ndb_binlog_thread_handle_non_data_event(thd, i_ndb, pOp, row);
4200
            // reset to catch errors
unknown's avatar
ndb:  
unknown committed
4201
            i_ndb->setDatabaseName("");
4202 4203 4204 4205 4206 4207 4208 4209 4210 4211 4212 4213
            DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
                                s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
                                "<empty>"));
            DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
                                i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
                                "<empty>"));
            if (i_ndb->getEventOperation() == NULL &&
                s_ndb->getEventOperation() == NULL &&
                do_ndbcluster_binlog_close_connection == BCCC_running)
            {
              DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
              do_ndbcluster_binlog_close_connection= BCCC_restart;
unknown's avatar
unknown committed
4214
              if (ndb_latest_received_binlog_epoch < *p_latest_trans_gci && ndb_binlog_running)
4215
              {
unknown's avatar
unknown committed
4216 4217 4218 4219
                sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
                                "as latest received epoch is %lu",
                                (ulong) *p_latest_trans_gci,
                                (ulong) ndb_latest_received_binlog_epoch);
4220 4221
              }
            }
4222
          }
unknown's avatar
unknown committed
4223

unknown's avatar
ndb:  
unknown committed
4224
          pOp= i_ndb->nextEvent();
unknown's avatar
unknown committed
4225 4226 4227 4228 4229
        } while (pOp && pOp->getGCI() == gci);

        /*
          note! pOp is not referring to an event in the next epoch
          or is == 0
unknown's avatar
unknown committed
4230
        */
unknown's avatar
unknown committed
4231 4232 4233 4234
#ifdef RUN_NDB_BINLOG_TIMER
        write_timer.stop();
#endif

4235
        if (trans.good())
unknown's avatar
unknown committed
4236
        {
4237
          //DBUG_ASSERT(row.n_inserts || row.n_updates || row.n_deletes);
4238
          thd->proc_info= "Committing events to binlog";
unknown's avatar
unknown committed
4239 4240 4241
          injector::transaction::binlog_pos start= trans.start_pos();
          if (int r= trans.commit())
          {
4242
            sql_print_error("NDB Binlog: "
unknown's avatar
unknown committed
4243 4244 4245 4246 4247 4248 4249 4250
                            "Error during COMMIT of GCI. Error: %d",
                            r);
            /* TODO: Further handling? */
          }
          row.gci= gci;
          row.master_log_file= start.file_name();
          row.master_log_pos= start.file_pos();

unknown's avatar
unknown committed
4251
          DBUG_PRINT("info", ("COMMIT gci: %lu", (ulong) gci));
4252 4253
          if (ndb_update_ndb_binlog_index)
            ndb_add_ndb_binlog_index(thd, &row);
4254
          ndb_latest_applied_binlog_epoch= gci;
unknown's avatar
unknown committed
4255 4256 4257 4258 4259 4260 4261 4262
        }
        ndb_latest_handled_binlog_epoch= gci;
#ifdef RUN_NDB_BINLOG_TIMER
        gci_timer.stop();
        sql_print_information("gci %ld event_count %d write time "
                              "%ld(%d e/s), total time %ld(%d e/s)",
                              (ulong)gci, event_count,
                              write_timer.elapsed_ms(),
4263
                              (1000*event_count) / write_timer.elapsed_ms(),
unknown's avatar
unknown committed
4264
                              gci_timer.elapsed_ms(),
4265
                              (1000*event_count) / gci_timer.elapsed_ms());
unknown's avatar
unknown committed
4266 4267 4268 4269
#endif
      }
    }

4270 4271 4272
    ndb_binlog_thread_handle_schema_event_post_epoch(thd,
                                                     &post_epoch_log_list,
                                                     &post_epoch_unlock_list);
unknown's avatar
unknown committed
4273 4274 4275 4276
    free_root(&mem_root, MYF(0));
    *root_ptr= old_root;
    ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
  }
4277
  if (do_ndbcluster_binlog_close_connection == BCCC_restart)
4278 4279 4280
  {
    ndb_binlog_tables_inited= FALSE;
    close_thread_tables(thd);
4281
    ndb_binlog_index= 0;
4282
    goto restart;
4283
  }
unknown's avatar
unknown committed
4284
err:
4285
  sql_print_information("Stopping Cluster Binlog");
unknown's avatar
unknown committed
4286
  DBUG_PRINT("info",("Shutting down cluster binlog thread"));
4287
  thd->proc_info= "Shutting down";
unknown's avatar
unknown committed
4288 4289 4290
  close_thread_tables(thd);
  pthread_mutex_lock(&injector_mutex);
  /* don't mess with the injector_ndb anymore from other threads */
unknown's avatar
ndb:  
unknown committed
4291
  injector_thd= 0;
unknown's avatar
unknown committed
4292
  injector_ndb= 0;
unknown's avatar
unknown committed
4293
  p_latest_trans_gci= 0;
unknown's avatar
ndb:  
unknown committed
4294
  schema_ndb= 0;
unknown's avatar
unknown committed
4295 4296 4297
  pthread_mutex_unlock(&injector_mutex);
  thd->db= 0; // as not to try to free memory

4298
  if (ndb_apply_status_share)
4299
  {
4300 4301 4302 4303
    /* ndb_share reference binlog extra free */
    DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
                             ndb_apply_status_share->key,
                             ndb_apply_status_share->use_count));
4304 4305
    free_share(&ndb_apply_status_share);
    ndb_apply_status_share= 0;
4306
  }
4307
  if (ndb_schema_share)
4308
  {
4309 4310
    /* begin protect ndb_schema_share */
    pthread_mutex_lock(&ndb_schema_share_mutex);
4311 4312 4313 4314
    /* ndb_share reference binlog extra free */
    DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
                             ndb_schema_share->key,
                             ndb_schema_share->use_count));
4315 4316
    free_share(&ndb_schema_share);
    ndb_schema_share= 0;
unknown's avatar
unknown committed
4317
    ndb_binlog_tables_inited= 0;
4318 4319
    pthread_mutex_unlock(&ndb_schema_share_mutex);
    /* end protect ndb_schema_share */
4320
  }
unknown's avatar
unknown committed
4321 4322

  /* remove all event operations */
unknown's avatar
ndb:  
unknown committed
4323
  if (s_ndb)
unknown's avatar
unknown committed
4324 4325 4326
  {
    NdbEventOperation *op;
    DBUG_PRINT("info",("removing all event operations"));
unknown's avatar
ndb:  
unknown committed
4327
    while ((op= s_ndb->getEventOperation()))
unknown's avatar
unknown committed
4328
    {
4329
      DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
unknown's avatar
unknown committed
4330 4331 4332
      DBUG_PRINT("info",("removing event operation on %s",
                         op->getEvent()->getName()));
      NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
4333
      DBUG_ASSERT(share != 0);
unknown's avatar
ndb:  
unknown committed
4334 4335 4336
      DBUG_ASSERT(share->op == op ||
                  share->op_old == op);
      share->op= share->op_old= 0;
4337 4338 4339
      /* ndb_share reference binlog free */
      DBUG_PRINT("NDB_SHARE", ("%s binlog free  use_count: %u",
                               share->key, share->use_count));
unknown's avatar
unknown committed
4340
      free_share(&share);
unknown's avatar
ndb:  
unknown committed
4341 4342 4343 4344 4345 4346 4347 4348 4349 4350 4351 4352 4353 4354 4355
      s_ndb->dropEventOperation(op);
    }
    delete s_ndb;
    s_ndb= 0;
  }
  if (i_ndb)
  {
    NdbEventOperation *op;
    DBUG_PRINT("info",("removing all event operations"));
    while ((op= i_ndb->getEventOperation()))
    {
      DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
      DBUG_PRINT("info",("removing event operation on %s",
                         op->getEvent()->getName()));
      NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
4356
      DBUG_ASSERT(share != 0);
unknown's avatar
ndb:  
unknown committed
4357 4358 4359
      DBUG_ASSERT(share->op == op ||
                  share->op_old == op);
      share->op= share->op_old= 0;
4360 4361 4362
      /* ndb_share reference binlog free */
      DBUG_PRINT("NDB_SHARE", ("%s binlog free  use_count: %u",
                               share->key, share->use_count));
unknown's avatar
ndb:  
unknown committed
4363 4364
      free_share(&share);
      i_ndb->dropEventOperation(op);
unknown's avatar
unknown committed
4365
    }
unknown's avatar
ndb:  
unknown committed
4366 4367
    delete i_ndb;
    i_ndb= 0;
unknown's avatar
unknown committed
4368 4369
  }

4370 4371
  hash_free(&ndb_schema_objects);

unknown's avatar
unknown committed
4372
  net_end(&thd->net);
4373
  thd->cleanup();
unknown's avatar
unknown committed
4374 4375 4376
  delete thd;

  ndb_binlog_thread_running= -1;
4377
  ndb_binlog_running= FALSE;
unknown's avatar
unknown committed
4378 4379 4380 4381
  (void) pthread_cond_signal(&injector_cond);

  DBUG_PRINT("exit", ("ndb_binlog_thread"));

4382 4383
  DBUG_LEAVE;                               // Must match DBUG_ENTER()
  my_thread_end();
unknown's avatar
unknown committed
4384
  pthread_exit(0);
4385
  return NULL;                              // Avoid compiler warnings
unknown's avatar
unknown committed
4386 4387 4388 4389 4390 4391 4392 4393 4394 4395 4396 4397 4398 4399
}

/*
  Report binlog injector epoch counters for SHOW ENGINE NDBCLUSTER STATUS.

  Emits a single "binlog" status row through stat_print containing the
  latest cluster epoch, the latest transaction epoch and the received /
  handled / applied binlog epochs.  Nothing is printed when no injector
  Ndb object is currently registered.

  @param thd        thread executing the SHOW statement
  @param stat_print callback used to emit the status row
  @param stat_type  requested status type (unused here)

  @return TRUE if stat_print reported a failure, FALSE otherwise
*/
bool
ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
                              enum ha_stat_type stat_type)
{
  char buf[IO_SIZE];
  uint buflen;
  ulonglong ndb_latest_epoch= 0;
  ulonglong latest_trans_epoch= 0;
  DBUG_ENTER("ndbcluster_show_status_binlog");

  pthread_mutex_lock(&injector_mutex);
  if (injector_ndb)
  {
    char buff1[22],buff2[22],buff3[22],buff4[22],buff5[22];
    ndb_latest_epoch= injector_ndb->getLatestGCI();
    /*
      Copy the latest transaction epoch while injector_mutex is still held:
      the binlog thread resets p_latest_trans_gci to 0 under this mutex when
      it shuts down, so dereferencing it after the unlock could read through
      a NULL/dangling pointer.
    */
    if (p_latest_trans_gci)
      latest_trans_epoch= *p_latest_trans_gci;
    pthread_mutex_unlock(&injector_mutex);

    int len=
      snprintf(buf, sizeof(buf),
               "latest_epoch=%s, "
               "latest_trans_epoch=%s, "
               "latest_received_binlog_epoch=%s, "
               "latest_handled_binlog_epoch=%s, "
               "latest_applied_binlog_epoch=%s",
               llstr(ndb_latest_epoch, buff1),
               llstr(latest_trans_epoch, buff2),
               llstr(ndb_latest_received_binlog_epoch, buff3),
               llstr(ndb_latest_handled_binlog_epoch, buff4),
               llstr(ndb_latest_applied_binlog_epoch, buff5));
    /*
      snprintf returns the untruncated length (or a negative value on
      error); clamp so the length handed to stat_print never exceeds
      what was actually written into buf.
    */
    if (len < 0)
      len= 0;
    else if ((size_t) len >= sizeof(buf))
      len= (int) sizeof(buf) - 1;
    buflen= (uint) len;

    if (stat_print(thd, ndbcluster_hton_name, ndbcluster_hton_name_length,
                   "binlog", strlen("binlog"),
                   buf, buflen))
      DBUG_RETURN(TRUE);
  }
  else
    pthread_mutex_unlock(&injector_mutex);
  DBUG_RETURN(FALSE);
}

#endif /* HAVE_NDB_BINLOG */
unknown's avatar
unknown committed
4427
#endif