Commit 7dd76c48 authored by tomas@poseidon.ndb.mysql.com's avatar tomas@poseidon.ndb.mysql.com

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new-ndb

into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb
parents a19fd0ba 4183ab6d
...@@ -113,6 +113,7 @@ typedef struct st_ndbcluster_share { ...@@ -113,6 +113,7 @@ typedef struct st_ndbcluster_share {
char *old_names; // for rename table char *old_names; // for rename table
TABLE_SHARE *table_share; TABLE_SHARE *table_share;
TABLE *table; TABLE *table;
byte *record[2]; // pointer to allocated records for receiving data
NdbValue *ndb_value[2]; NdbValue *ndb_value[2];
MY_BITMAP *subscriber_bitmap; MY_BITMAP *subscriber_bitmap;
#endif #endif
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "slave.h" #include "slave.h"
#include "ha_ndbcluster_binlog.h" #include "ha_ndbcluster_binlog.h"
#include "NdbDictionary.hpp" #include "NdbDictionary.hpp"
#include <util/NdbAutoPtr.hpp>
#ifdef ndb_dynamite #ifdef ndb_dynamite
#undef assert #undef assert
...@@ -265,7 +266,8 @@ ndbcluster_binlog_close_table(THD *thd, NDB_SHARE *share) ...@@ -265,7 +266,8 @@ ndbcluster_binlog_close_table(THD *thd, NDB_SHARE *share)
static int static int
ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share, ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
TABLE_SHARE *table_share, TABLE *table) TABLE_SHARE *table_share, TABLE *table,
int reopen)
{ {
int error; int error;
DBUG_ENTER("ndbcluster_binlog_open_table"); DBUG_ENTER("ndbcluster_binlog_open_table");
...@@ -280,7 +282,7 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share, ...@@ -280,7 +282,7 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
free_table_share(table_share); free_table_share(table_share);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
if ((error= open_table_from_share(thd, table_share, "", 0, if ((error= open_table_from_share(thd, table_share, "", 0 /* fon't allocate buffers */,
(uint) READ_ALL, 0, table, FALSE))) (uint) READ_ALL, 0, table, FALSE)))
{ {
sql_print_error("Unable to open table for %s, error=%d(%d)", sql_print_error("Unable to open table for %s, error=%d(%d)",
...@@ -290,11 +292,22 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share, ...@@ -290,11 +292,22 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
DBUG_RETURN(error); DBUG_RETURN(error);
} }
assign_new_table_id(table_share); assign_new_table_id(table_share);
if (!table->record[1] || table->record[1] == table->record[0])
if (!reopen)
{
// allocate memory on ndb share so it can be reused after online alter table
share->record[0]= (byte*) alloc_root(&share->mem_root, table->s->rec_buff_length);
share->record[1]= (byte*) alloc_root(&share->mem_root, table->s->rec_buff_length);
}
{ {
table->record[1]= alloc_root(&table->mem_root, my_ptrdiff_t row_offset= share->record[0] - table->record[0];
table->s->rec_buff_length); Field **p_field;
for (p_field= table->field; *p_field; p_field++)
(*p_field)->move_field_offset(row_offset);
table->record[0]= share->record[0];
table->record[1]= share->record[1];
} }
table->in_use= injector_thd; table->in_use= injector_thd;
table->s->db.str= share->db; table->s->db.str= share->db;
...@@ -364,7 +377,7 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) ...@@ -364,7 +377,7 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
int error; int error;
TABLE_SHARE *table_share= (TABLE_SHARE *) alloc_root(mem_root, sizeof(*table_share)); TABLE_SHARE *table_share= (TABLE_SHARE *) alloc_root(mem_root, sizeof(*table_share));
TABLE *table= (TABLE*) alloc_root(mem_root, sizeof(*table)); TABLE *table= (TABLE*) alloc_root(mem_root, sizeof(*table));
if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table))) if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table, 0)))
break; break;
/* /*
! do not touch the contents of the table ! do not touch the contents of the table
...@@ -1530,6 +1543,10 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, ...@@ -1530,6 +1543,10 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
sql_print_information("NDB: Failed write frm for %s.%s, error %d", sql_print_information("NDB: Failed write frm for %s.%s, error %d",
dbname, tabname, error); dbname, tabname, error);
} }
// copy names as memory will be freed
NdbAutoPtr<char> a1((char *)dbname= strdup(dbname));
NdbAutoPtr<char> a2((char *)tabname= strdup(tabname));
ndbcluster_binlog_close_table(thd, share); ndbcluster_binlog_close_table(thd, share);
TABLE_LIST table_list; TABLE_LIST table_list;
...@@ -1538,10 +1555,16 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, ...@@ -1538,10 +1555,16 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
table_list.alias= table_list.table_name= (char *)tabname; table_list.alias= table_list.table_name= (char *)tabname;
close_cached_tables(thd, 0, &table_list, TRUE); close_cached_tables(thd, 0, &table_list, TRUE);
if ((error= ndbcluster_binlog_open_table(thd, share, if ((error= ndbcluster_binlog_open_table(thd, share,
table_share, table))) table_share, table, 1)))
sql_print_information("NDB: Failed to re-open table %s.%s", sql_print_information("NDB: Failed to re-open table %s.%s",
dbname, tabname); dbname, tabname);
table= share->table;
table_share= share->table_share;
dbname= table_share->db.str;
tabname= table_share->table_name.str;
pthread_mutex_unlock(&LOCK_open); pthread_mutex_unlock(&LOCK_open);
} }
my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR)); my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment