Commit 8bd2c37d authored by msvensson@neptunus.(none)'s avatar msvensson@neptunus.(none)

Merge bk-internal:/home/bk/mysql-5.1-new

into  neptunus.(none):/home/msvensson/mysql/mysql-5.1
parents e316aa6d bdd57aee
--let $binlog_start=102
--replace_result $binlog_start <binlog_start>
--replace_column 2 # 4 # 5 #
--eval show binlog events from $binlog_start
drop database if exists mysqltest; drop database if exists mysqltest;
drop table if exists t1,t2; drop table if exists t1,t2,t3;
drop database if exists mysqltest; drop database if exists mysqltest;
drop table if exists t1,t2; drop table if exists t1,t2,t3;
reset master; reset master;
reset master; reset master;
create database mysqltest; create database mysqltest;
...@@ -123,3 +123,68 @@ master-bin1.000001 # Table_map # # cluster.apply_status ...@@ -123,3 +123,68 @@ master-bin1.000001 # Table_map # # cluster.apply_status
master-bin1.000001 # Write_rows # # master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # use `test`; drop table `t1` master-bin1.000001 # Query # # use `test`; drop table `t1`
reset master;
show tables;
Tables_in_test
reset master;
show tables;
Tables_in_test
create table t1 (a int key) engine=ndb;
create table t2 (a int key) engine=ndb;
create table t3 (a int key) engine=ndb;
rename table t3 to t4, t2 to t3, t1 to t2, t4 to t1;
show binlog events from <binlog_start>;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin1.000001 # Query # # use `test`; create table t1 (a int key) engine=ndb
master-bin1.000001 # Query # # use `test`; create table t2 (a int key) engine=ndb
master-bin1.000001 # Query # # use `test`; create table t3 (a int key) engine=ndb
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # use `test`; rename table `test.t3` to `test.t4`
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # use `test`; rename table `test.t2` to `test.t3`
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # use `test`; rename table `test.t1` to `test.t2`
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # use `test`; rename table `test.t4` to `test.t1`
drop table t1;
drop table t2;
drop table t3;
reset master;
show tables;
Tables_in_test
reset master;
show tables;
Tables_in_test
create table t1 (a int key) engine=ndb;
insert into t1 values(1);
rename table t1 to t2;
insert into t2 values(2);
show binlog events from <binlog_start>;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin1.000001 # Query # # use `test`; create table t1 (a int key) engine=ndb
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Table_map # # test.t1
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # use `test`; rename table `test.t1` to `test.t2`
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Table_map # # test.t2
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
drop table t2;
...@@ -5,18 +5,16 @@ ...@@ -5,18 +5,16 @@
--disable_warnings --disable_warnings
connection server2; connection server2;
drop database if exists mysqltest; drop database if exists mysqltest;
drop table if exists t1,t2; drop table if exists t1,t2,t3;
connection server1; connection server1;
drop database if exists mysqltest; drop database if exists mysqltest;
drop table if exists t1,t2; drop table if exists t1,t2,t3;
--connection server1 --connection server1
reset master; reset master;
--connection server2 --connection server2
reset master; reset master;
--enable_warnings --enable_warnings
--let $binlog_start=102
# #
# basic test to see if ddl distribution works across # basic test to see if ddl distribution works across
# multiple binlogs # multiple binlogs
...@@ -33,15 +31,10 @@ create table t1 (a int primary key) engine=ndb; ...@@ -33,15 +31,10 @@ create table t1 (a int primary key) engine=ndb;
--connection server2 --connection server2
create table t2 (a int primary key) engine=ndb; create table t2 (a int primary key) engine=ndb;
--replace_result $binlog_start <binlog_start> --source include/show_binlog_events.inc
--replace_column 2 # 4 # 5 #
--eval show binlog events from $binlog_start
--connection server1 --connection server1
--replace_result $binlog_start <binlog_start> -- source include/show_binlog_events.inc
--replace_column 2 # 4 # 5 #
--eval show binlog events from $binlog_start
# alter table # alter table
--connection server1 --connection server1
...@@ -53,9 +46,7 @@ reset master; ...@@ -53,9 +46,7 @@ reset master;
alter table t2 add column (b int); alter table t2 add column (b int);
--connections server1 --connections server1
--replace_result $binlog_start <binlog_start> --source include/show_binlog_events.inc
--replace_column 2 # 4 # 5 #
--eval show binlog events from $binlog_start
# alter database # alter database
...@@ -91,9 +82,7 @@ drop database mysqltest; ...@@ -91,9 +82,7 @@ drop database mysqltest;
create table t1 (a int primary key) engine=ndb; create table t1 (a int primary key) engine=ndb;
--connection server2 --connection server2
--replace_result $binlog_start <binlog_start> --source include/show_binlog_events.inc
--replace_column 2 # 4 # 5 #
--eval show binlog events from $binlog_start
--connection server2 --connection server2
drop table t2; drop table t2;
...@@ -144,6 +133,51 @@ ENGINE =NDB; ...@@ -144,6 +133,51 @@ ENGINE =NDB;
drop table t1; drop table t1;
--connection server2 --connection server2
--replace_result $binlog_start <binlog_start> --source include/show_binlog_events.inc
--replace_column 2 # 4 # 5 #
--eval show binlog events from $binlog_start #
# Bug #17827 cluster: rename of several tables in one statement,
# gets multiply logged
#
--connection server1
reset master;
show tables;
--connection server2
reset master;
show tables;
--connection server1
create table t1 (a int key) engine=ndb;
create table t2 (a int key) engine=ndb;
create table t3 (a int key) engine=ndb;
rename table t3 to t4, t2 to t3, t1 to t2, t4 to t1;
--connection server2
--source include/show_binlog_events.inc
drop table t1;
drop table t2;
drop table t3;
#
# Bug #17838 binlog not setup on seconday master after rename
#
#
--connection server1
reset master;
show tables;
--connection server2
reset master;
show tables;
--connection server1
create table t1 (a int key) engine=ndb;
insert into t1 values(1);
rename table t1 to t2;
insert into t2 values(2);
# now we should see data in table t1 _and_ t2
# prior to bug fix, data was missing for t2
--connection server2
--source include/show_binlog_events.inc
drop table t2;
...@@ -1367,15 +1367,17 @@ int ha_ndbcluster::drop_indexes(Ndb *ndb, TABLE *tab) ...@@ -1367,15 +1367,17 @@ int ha_ndbcluster::drop_indexes(Ndb *ndb, TABLE *tab)
*/ */
NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint inx) const NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint inx) const
{ {
return get_index_type_from_key(inx, table_share->key_info); return get_index_type_from_key(inx, table_share->key_info,
inx == table_share->primary_key);
} }
NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_key(uint inx, NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_key(uint inx,
KEY *key_info) const KEY *key_info,
bool primary) const
{ {
bool is_hash_index= (key_info[inx].algorithm == bool is_hash_index= (key_info[inx].algorithm ==
HA_KEY_ALG_HASH); HA_KEY_ALG_HASH);
if (inx == table_share->primary_key) if (primary)
return is_hash_index ? PRIMARY_KEY_INDEX : PRIMARY_KEY_ORDERED_INDEX; return is_hash_index ? PRIMARY_KEY_INDEX : PRIMARY_KEY_ORDERED_INDEX;
return ((key_info[inx].flags & HA_NOSAME) ? return ((key_info[inx].flags & HA_NOSAME) ?
...@@ -4644,7 +4646,7 @@ int ha_ndbcluster::add_index(TABLE *table_arg, ...@@ -4644,7 +4646,7 @@ int ha_ndbcluster::add_index(TABLE *table_arg,
KEY *key= key_info + idx; KEY *key= key_info + idx;
KEY_PART_INFO *key_part= key->key_part; KEY_PART_INFO *key_part= key->key_part;
KEY_PART_INFO *end= key_part + key->key_parts; KEY_PART_INFO *end= key_part + key->key_parts;
NDB_INDEX_TYPE idx_type= get_index_type_from_key(idx, key); NDB_INDEX_TYPE idx_type= get_index_type_from_key(idx, key, false);
DBUG_PRINT("info", ("Adding index: '%s'", key_info[idx].name)); DBUG_PRINT("info", ("Adding index: '%s'", key_info[idx].name));
// Add fields to key_part struct // Add fields to key_part struct
for (; key_part != end; key_part++) for (; key_part != end; key_part++)
......
...@@ -719,7 +719,8 @@ static void set_tabname(const char *pathname, char *tabname); ...@@ -719,7 +719,8 @@ static void set_tabname(const char *pathname, char *tabname);
void release_metadata(); void release_metadata();
NDB_INDEX_TYPE get_index_type(uint idx_no) const; NDB_INDEX_TYPE get_index_type(uint idx_no) const;
NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const; NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const;
NDB_INDEX_TYPE get_index_type_from_key(uint index_no, KEY *key_info) const; NDB_INDEX_TYPE get_index_type_from_key(uint index_no, KEY *key_info,
bool primary) const;
int check_index_fields_not_null(uint index_no); int check_index_fields_not_null(uint index_no);
uint set_up_partition_info(partition_info *part_info, uint set_up_partition_info(partition_info *part_info,
......
...@@ -230,12 +230,25 @@ static void run_query(THD *thd, char *buf, char *end, ...@@ -230,12 +230,25 @@ static void run_query(THD *thd, char *buf, char *end,
} }
} }
int static void
ndbcluster_binlog_close_table(THD *thd, NDB_SHARE *share)
{
DBUG_ENTER("ndbcluster_binlog_close_table");
if (share->table_share)
{
free_table_share(share->table_share);
share->table_share= 0;
share->table= 0;
}
DBUG_ASSERT(share->table == 0);
DBUG_VOID_RETURN;
}
static int
ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share, ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
TABLE_SHARE *table_share, TABLE *table) TABLE_SHARE *table_share, TABLE *table)
{ {
int error; int error;
MEM_ROOT *mem_root= &share->mem_root;
DBUG_ENTER("ndbcluster_binlog_open_table"); DBUG_ENTER("ndbcluster_binlog_open_table");
init_tmp_table_share(table_share, share->db, 0, share->table_name, init_tmp_table_share(table_share, share->db, 0, share->table_name,
...@@ -274,22 +287,13 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share, ...@@ -274,22 +287,13 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
table->s->table_name.str= share->table_name; table->s->table_name.str= share->table_name;
table->s->table_name.length= strlen(share->table_name); table->s->table_name.length= strlen(share->table_name);
DBUG_ASSERT(share->table_share == 0);
share->table_share= table_share; share->table_share= table_share;
DBUG_ASSERT(share->table == 0);
share->table= table; share->table= table;
#ifndef DBUG_OFF #ifndef DBUG_OFF
dbug_print_table("table", table); dbug_print_table("table", table);
#endif #endif
/*
! do not touch the contents of the table
it may be in use by the injector thread
*/
share->ndb_value[0]= (NdbValue*)
alloc_root(mem_root, sizeof(NdbValue) *
(table->s->fields + 2 /*extra for hidden key and part key*/));
share->ndb_value[1]= (NdbValue*)
alloc_root(mem_root, sizeof(NdbValue) *
(table->s->fields + 2 /*extra for hidden key and part key*/));
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -351,6 +355,18 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) ...@@ -351,6 +355,18 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME)); TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME));
if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table))) if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table)))
break; break;
/*
! do not touch the contents of the table
it may be in use by the injector thread
*/
MEM_ROOT *mem_root= &share->mem_root;
share->ndb_value[0]= (NdbValue*)
alloc_root(mem_root, sizeof(NdbValue) *
(table->s->fields + 2 /*extra for hidden key and part key*/));
share->ndb_value[1]= (NdbValue*)
alloc_root(mem_root, sizeof(NdbValue) *
(table->s->fields + 2 /*extra for hidden key and part key*/));
if (table->s->primary_key == MAX_KEY) if (table->s->primary_key == MAX_KEY)
share->flags|= NSF_HIDDEN_PK; share->flags|= NSF_HIDDEN_PK;
if (table->s->blob_fields != 0) if (table->s->blob_fields != 0)
...@@ -1156,8 +1172,11 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ...@@ -1156,8 +1172,11 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
(void) pthread_mutex_unlock(&share->mutex); (void) pthread_mutex_unlock(&share->mutex);
} }
if (get_a_share) if (get_a_share && share)
{
free_share(&share); free_share(&share);
share= 0;
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1314,22 +1333,35 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, ...@@ -1314,22 +1333,35 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
NDB_SHARE *share) NDB_SHARE *share)
{ {
DBUG_ENTER("ndb_handle_schema_change"); DBUG_ENTER("ndb_handle_schema_change");
int remote_drop_table= 0, do_close_cached_tables= 0; bool do_close_cached_tables= FALSE;
const char *dbname= share->table->s->db.str; bool is_online_alter_table= FALSE;
const char *tabname= share->table->s->table_name.str; bool is_rename_table= FALSE;
bool online_alter_table= (pOp->getEventType() == NDBEVENT::TE_ALTER && bool is_remote_change=
pOp->tableFrmChanged()); (uint) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id();
if (pOp->getEventType() == NDBEVENT::TE_ALTER)
{
if (pOp->tableFrmChanged())
{
is_online_alter_table= TRUE;
}
else
{
DBUG_ASSERT(pOp->tableNameChanged());
is_rename_table= TRUE;
}
}
if (pOp->getEventType() != NDBEVENT::TE_CLUSTER_FAILURE && if (is_remote_change) /* includes CLUSTER_FAILURE */
(uint) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id())
{ {
TABLE_SHARE *table_share= share->table->s;
TABLE* table= share->table; TABLE* table= share->table;
TABLE_SHARE *table_share= table->s;
const char *dbname= table_share->db.str;
/* /*
Invalidate table and all it's indexes Invalidate table and all it's indexes
*/ */
ndb->setDatabaseName(share->table->s->db.str); ndb->setDatabaseName(dbname);
Thd_ndb *thd_ndb= get_thd_ndb(thd); Thd_ndb *thd_ndb= get_thd_ndb(thd);
DBUG_ASSERT(thd_ndb != NULL); DBUG_ASSERT(thd_ndb != NULL);
Ndb* old_ndb= thd_ndb->ndb; Ndb* old_ndb= thd_ndb->ndb;
...@@ -1341,8 +1373,9 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, ...@@ -1341,8 +1373,9 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
table_handler.invalidate_dictionary_cache(TRUE); table_handler.invalidate_dictionary_cache(TRUE);
thd_ndb->ndb= old_ndb; thd_ndb->ndb= old_ndb;
if (online_alter_table) if (is_online_alter_table)
{ {
const char *tabname= table_share->table_name.str;
char key[FN_REFLEN]; char key[FN_REFLEN];
const void *data= 0, *pack_data= 0; const void *data= 0, *pack_data= 0;
uint length, pack_length; uint length, pack_length;
...@@ -1364,17 +1397,18 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, ...@@ -1364,17 +1397,18 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
DBUG_DUMP("frm", (char*)altered_table->getFrmData(), DBUG_DUMP("frm", (char*)altered_table->getFrmData(),
altered_table->getFrmLength()); altered_table->getFrmLength());
pthread_mutex_lock(&LOCK_open); pthread_mutex_lock(&LOCK_open);
const NDBTAB *old= dict->getTable(tabname); const NDBTAB *old= dict->getTable(tabname);
if (!old && if (!old &&
old->getObjectVersion() != altered_table->getObjectVersion()) old->getObjectVersion() != altered_table->getObjectVersion())
dict->putTable(altered_table); dict->putTable(altered_table);
if ((error= unpackfrm(&data, &length, altered_table->getFrmData())) || if ((error= unpackfrm(&data, &length, altered_table->getFrmData())) ||
(error= writefrm(key, data, length))) (error= writefrm(key, data, length)))
{ {
sql_print_information("NDB: Failed write frm for %s.%s, error %d", sql_print_information("NDB: Failed write frm for %s.%s, error %d",
dbname, tabname, error); dbname, tabname, error);
} }
ndbcluster_binlog_close_table(thd, share);
close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE); close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE);
if ((error= ndbcluster_binlog_open_table(thd, share, if ((error= ndbcluster_binlog_open_table(thd, share,
table_share, table))) table_share, table)))
...@@ -1383,11 +1417,10 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, ...@@ -1383,11 +1417,10 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
pthread_mutex_unlock(&LOCK_open); pthread_mutex_unlock(&LOCK_open);
} }
} }
remote_drop_table= 1;
} }
// If only frm was changed continue replicating // If only frm was changed continue replicating
if (online_alter_table) if (is_online_alter_table)
{ {
/* Signal ha_ndbcluster::alter_table that drop is done */ /* Signal ha_ndbcluster::alter_table that drop is done */
(void) pthread_cond_signal(&injector_cond); (void) pthread_cond_signal(&injector_cond);
...@@ -1395,6 +1428,22 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, ...@@ -1395,6 +1428,22 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
} }
(void) pthread_mutex_lock(&share->mutex); (void) pthread_mutex_lock(&share->mutex);
if (is_rename_table && !is_remote_change)
{
DBUG_PRINT("info", ("Detected name change of table %s.%s",
share->db, share->table_name));
/* ToDo: remove printout */
if (ndb_extra_logging)
sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
share_prefix, share->table->s->db.str,
share->table->s->table_name.str,
share->key);
/* do the rename of the table in the share */
share->table->s->db.str= share->db;
share->table->s->db.length= strlen(share->db);
share->table->s->table_name.str= share->table_name;
share->table->s->table_name.length= strlen(share->table_name);
}
DBUG_ASSERT(share->op == pOp || share->op_old == pOp); DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
if (share->op_old == pOp) if (share->op_old == pOp)
share->op_old= 0; share->op_old= 0;
...@@ -1408,11 +1457,11 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, ...@@ -1408,11 +1457,11 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
pthread_mutex_lock(&ndbcluster_mutex); pthread_mutex_lock(&ndbcluster_mutex);
free_share(&share, TRUE); free_share(&share, TRUE);
if (remote_drop_table && share && share->state != NSS_DROPPED) if (is_remote_change && share && share->state != NSS_DROPPED)
{ {
DBUG_PRINT("info", ("remote drop table")); DBUG_PRINT("info", ("remote change"));
if (share->use_count != 1) if (share->use_count != 1)
do_close_cached_tables= 1; do_close_cached_tables= TRUE;
share->state= NSS_DROPPED; share->state= NSS_DROPPED;
free_share(&share, TRUE); free_share(&share, TRUE);
} }
...@@ -1464,24 +1513,45 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1464,24 +1513,45 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
int log_query= 0; int log_query= 0;
DBUG_PRINT("info", ("log query_length: %d query: '%s'", DBUG_PRINT("info", ("log query_length: %d query: '%s'",
schema->query_length, schema->query)); schema->query_length, schema->query));
char key[FN_REFLEN];
build_table_filename(key, sizeof(key), schema->db, schema->name, "");
NDB_SHARE *share= get_share(key, 0, false, false);
switch ((enum SCHEMA_OP_TYPE)schema->type) switch ((enum SCHEMA_OP_TYPE)schema->type)
{ {
case SOT_DROP_TABLE: case SOT_DROP_TABLE:
/* binlog dropping table after any table operations */ /* binlog dropping table after any table operations */
if (ndb_binlog_running) if (share && share->op)
post_epoch_log_list->push_back(schema, mem_root); post_epoch_log_list->push_back(schema, mem_root);
log_query= 0; log_query= 0;
break; break;
case SOT_RENAME_TABLE: case SOT_RENAME_TABLE:
/* fall through */ if (share && share->op)
{
log_query= 0;
post_epoch_log_list->push_back(schema, mem_root);
break; /* discovery will be handled by binlog */
}
goto sot_create_table;
case SOT_ALTER_TABLE: case SOT_ALTER_TABLE:
if (ndb_binlog_running) if (share && share->op)
{ {
log_query= 1; log_query= 0;
post_epoch_log_list->push_back(schema, mem_root);
break; /* discovery will be handled by binlog */ break; /* discovery will be handled by binlog */
} }
/* fall through */ goto sot_create_table;
case SOT_CREATE_TABLE: case SOT_CREATE_TABLE:
sot_create_table:
/*
we need to free any share here as command below
may need to call handle_trailing_share
*/
if (share)
{
free_share(&share);
share= 0;
}
pthread_mutex_lock(&LOCK_open); pthread_mutex_lock(&LOCK_open);
if (ndb_create_table_from_engine(thd, schema->db, schema->name)) if (ndb_create_table_from_engine(thd, schema->db, schema->name))
{ {
...@@ -1514,10 +1584,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1514,10 +1584,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
break; break;
case SOT_CLEAR_SLOCK: case SOT_CLEAR_SLOCK:
{ {
char key[FN_REFLEN];
build_table_filename(key, sizeof(key),
schema->db, schema->name, "");
NDB_SHARE *share= get_share(key, 0, false, false);
if (share) if (share)
{ {
pthread_mutex_lock(&share->mutex); pthread_mutex_lock(&share->mutex);
...@@ -1528,6 +1594,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1528,6 +1594,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
pthread_mutex_unlock(&share->mutex); pthread_mutex_unlock(&share->mutex);
pthread_cond_signal(&injector_cond); pthread_cond_signal(&injector_cond);
free_share(&share); free_share(&share);
share= 0;
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1536,7 +1603,11 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1536,7 +1603,11 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
log_query= 1; log_query= 1;
break; break;
} }
if (share)
{
free_share(&share);
share= 0;
}
/* signal that schema operation has been handled */ /* signal that schema operation has been handled */
if ((enum SCHEMA_OP_TYPE)schema->type != SOT_CLEAR_SLOCK) if ((enum SCHEMA_OP_TYPE)schema->type != SOT_CLEAR_SLOCK)
{ {
...@@ -1571,23 +1642,12 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1571,23 +1642,12 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
case NDBEVENT::TE_DELETE: case NDBEVENT::TE_DELETE:
// skip // skip
break; break;
case NDBEVENT::TE_ALTER:
if (pOp->tableNameChanged())
{
DBUG_PRINT("info", ("Detected name change of table %s.%s",
share->db, share->table_name));
/* do the rename of the table in the share */
share->table->s->db.str= share->db;
share->table->s->db.length= strlen(share->db);
share->table->s->table_name.str= share->table_name;
share->table->s->table_name.length= strlen(share->table_name);
}
ndb_handle_schema_change(thd, ndb, pOp, share);
break;
case NDBEVENT::TE_CLUSTER_FAILURE: case NDBEVENT::TE_CLUSTER_FAILURE:
case NDBEVENT::TE_DROP: case NDBEVENT::TE_DROP:
free_share(&schema_share); free_share(&schema_share);
schema_share= 0; schema_share= 0;
// fall through
case NDBEVENT::TE_ALTER:
ndb_handle_schema_change(thd, ndb, pOp, share); ndb_handle_schema_change(thd, ndb, pOp, share);
break; break;
case NDBEVENT::TE_NODE_FAILURE: case NDBEVENT::TE_NODE_FAILURE:
...@@ -1659,6 +1719,72 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1659,6 +1719,72 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/*
process any operations that should be done after
the epoch is complete
*/
static void
ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
List<Cluster_replication_schema>
*post_epoch_log_list,
List<Cluster_replication_schema>
*post_epoch_unlock_list)
{
DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
Cluster_replication_schema *schema;
while ((schema= post_epoch_log_list->pop()))
{
DBUG_PRINT("info", ("log query_length: %d query: '%s'",
schema->query_length, schema->query));
{
char key[FN_REFLEN];
build_table_filename(key, sizeof(key), schema->db, schema->name, "");
NDB_SHARE *share= get_share(key, 0, false, false);
switch ((enum SCHEMA_OP_TYPE)schema->type)
{
case SOT_DROP_DB:
case SOT_DROP_TABLE:
break;
case SOT_RENAME_TABLE:
case SOT_ALTER_TABLE:
if (share && share->op)
{
break; /* discovery handled by binlog */
}
pthread_mutex_lock(&LOCK_open);
if (ndb_create_table_from_engine(thd, schema->db, schema->name))
{
sql_print_error("Could not discover table '%s.%s' from "
"binlog schema event '%s' from node %d",
schema->db, schema->name, schema->query,
schema->node_id);
}
pthread_mutex_unlock(&LOCK_open);
default:
DBUG_ASSERT(false);
}
if (share)
{
free_share(&share);
share= 0;
}
}
{
char *thd_db_save= thd->db;
thd->db= schema->db;
thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
schema->query_length, FALSE,
schema->name[0] == 0);
thd->db= thd_db_save;
}
}
while ((schema= post_epoch_unlock_list->pop()))
{
ndbcluster_update_slock(thd, schema->db, schema->name);
}
DBUG_VOID_RETURN;
}
/* /*
Timer class for doing performance measurements Timer class for doing performance measurements
*/ */
...@@ -2206,6 +2332,10 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ...@@ -2206,6 +2332,10 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
if (share->flags & NSF_BLOB_FLAG) if (share->flags & NSF_BLOB_FLAG)
op->mergeEvents(true); // currently not inherited from event op->mergeEvents(true); // currently not inherited from event
DBUG_PRINT("info", ("share->ndb_value[0]: 0x%x",
share->ndb_value[0]));
DBUG_PRINT("info", ("share->ndb_value[1]: 0x%x",
share->ndb_value[1]));
int n_columns= ndbtab->getNoOfColumns(); int n_columns= ndbtab->getNoOfColumns();
int n_fields= table ? table->s->fields : 0; // XXX ??? int n_fields= table ? table->s->fields : 0; // XXX ???
for (int j= 0; j < n_columns; j++) for (int j= 0; j < n_columns; j++)
...@@ -2258,6 +2388,12 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ...@@ -2258,6 +2388,12 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
} }
share->ndb_value[0][j].ptr= attr0.ptr; share->ndb_value[0][j].ptr= attr0.ptr;
share->ndb_value[1][j].ptr= attr1.ptr; share->ndb_value[1][j].ptr= attr1.ptr;
DBUG_PRINT("info", ("&share->ndb_value[0][%d]: 0x%x "
"share->ndb_value[0][%d]: 0x%x",
j, &share->ndb_value[0][j], j, attr0.ptr));
DBUG_PRINT("info", ("&share->ndb_value[1][%d]: 0x%x "
"share->ndb_value[1][%d]: 0x%x",
j, &share->ndb_value[0][j], j, attr1.ptr));
} }
op->setCustomData((void *) share); // set before execute op->setCustomData((void *) share); // set before execute
share->op= op; // assign op in NDB_SHARE share->op= op; // assign op in NDB_SHARE
...@@ -2468,24 +2604,6 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2468,24 +2604,6 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
"op_old: %lx", "op_old: %lx",
share->key, share, pOp, share->op, share->op_old)); share->key, share, pOp, share->op, share->op_old));
break; break;
case NDBEVENT::TE_ALTER:
if (pOp->tableNameChanged())
{
DBUG_PRINT("info", ("Detected name change of table %s.%s",
share->db, share->table_name));
/* ToDo: remove printout */
if (ndb_extra_logging)
sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
share_prefix, share->table->s->db.str,
share->table->s->table_name.str,
share->key);
/* do the rename of the table in the share */
share->table->s->db.str= share->db;
share->table->s->db.length= strlen(share->db);
share->table->s->table_name.str= share->table_name;
share->table->s->table_name.length= strlen(share->table_name);
}
goto drop_alter_common;
case NDBEVENT::TE_DROP: case NDBEVENT::TE_DROP:
if (apply_status_share == share) if (apply_status_share == share)
{ {
...@@ -2495,7 +2613,8 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2495,7 +2613,8 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
/* ToDo: remove printout */ /* ToDo: remove printout */
if (ndb_extra_logging) if (ndb_extra_logging)
sql_print_information("NDB Binlog: drop table %s.", share->key); sql_print_information("NDB Binlog: drop table %s.", share->key);
drop_alter_common: // fall through
case NDBEVENT::TE_ALTER:
row.n_schemaops++; row.n_schemaops++;
DBUG_PRINT("info", ("TABLE %s EVENT: %s received share: 0x%lx op: %lx " DBUG_PRINT("info", ("TABLE %s EVENT: %s received share: 0x%lx op: %lx "
"share op: %lx op_old: %lx", "share op: %lx op_old: %lx",
...@@ -3075,26 +3194,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3075,26 +3194,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
} }
} }
/* ndb_binlog_thread_handle_schema_event_post_epoch(thd,
process any operations that should be done after &post_epoch_log_list,
the epoch is complete &post_epoch_unlock_list);
*/
{
Cluster_replication_schema *schema;
while ((schema= post_epoch_unlock_list.pop()))
{
ndbcluster_update_slock(thd, schema->db, schema->name);
}
while ((schema= post_epoch_log_list.pop()))
{
char *thd_db_save= thd->db;
thd->db= schema->db;
thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
schema->query_length, FALSE,
schema->name[0] == 0);
thd->db= thd_db_save;
}
}
free_root(&mem_root, MYF(0)); free_root(&mem_root, MYF(0));
*root_ptr= old_root; *root_ptr= old_root;
ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch; ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
...@@ -3110,9 +3212,15 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3110,9 +3212,15 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
sql_print_information("Stopping Cluster Binlog"); sql_print_information("Stopping Cluster Binlog");
if (apply_status_share) if (apply_status_share)
{
free_share(&apply_status_share); free_share(&apply_status_share);
apply_status_share= 0;
}
if (schema_share) if (schema_share)
{
free_share(&schema_share); free_share(&schema_share);
schema_share= 0;
}
/* remove all event operations */ /* remove all event operations */
if (ndb) if (ndb)
......
...@@ -292,6 +292,7 @@ private: ...@@ -292,6 +292,7 @@ private:
}; };
Buf theKeyBuf; Buf theKeyBuf;
Buf theAccessKeyBuf; Buf theAccessKeyBuf;
Buf thePackKeyBuf;
Buf theHeadInlineBuf; Buf theHeadInlineBuf;
Buf theHeadInlineCopyBuf; // for writeTuple Buf theHeadInlineCopyBuf; // for writeTuple
Buf thePartBuf; Buf thePartBuf;
...@@ -328,6 +329,9 @@ private: ...@@ -328,6 +329,9 @@ private:
Uint32 getPartNumber(Uint64 pos); Uint32 getPartNumber(Uint64 pos);
Uint32 getPartCount(); Uint32 getPartCount();
Uint32 getDistKey(Uint32 part); Uint32 getDistKey(Uint32 part);
// pack / unpack
int packKeyValue(const NdbTableImpl* aTable, const Buf& srcBuf);
int unpackKeyValue(const NdbTableImpl* aTable, Buf& dstBuf);
// getters and setters // getters and setters
int getTableKeyValue(NdbOperation* anOp); int getTableKeyValue(NdbOperation* anOp);
int setTableKeyValue(NdbOperation* anOp); int setTableKeyValue(NdbOperation* anOp);
......
...@@ -881,7 +881,7 @@ protected: ...@@ -881,7 +881,7 @@ protected:
Uint32 ptr2int() { return theReceiver.getId(); }; Uint32 ptr2int() { return theReceiver.getId(); };
// get table or index key from prepared signals // get table or index key from prepared signals
int getKeyFromTCREQ(Uint32* data, unsigned size); int getKeyFromTCREQ(Uint32* data, Uint32 & size);
/****************************************************************************** /******************************************************************************
* These are the private variables that are defined in the operation objects. * These are the private variables that are defined in the operation objects.
......
...@@ -240,7 +240,7 @@ protected: ...@@ -240,7 +240,7 @@ protected:
void receiver_completed(NdbReceiver*); void receiver_completed(NdbReceiver*);
void execCLOSE_SCAN_REP(); void execCLOSE_SCAN_REP();
int getKeyFromKEYINFO20(Uint32* data, unsigned size); int getKeyFromKEYINFO20(Uint32* data, Uint32 & size);
NdbOperation* takeOverScanOp(OperationType opType, NdbTransaction*); NdbOperation* takeOverScanOp(OperationType opType, NdbTransaction*);
bool m_ordered; bool m_ordered;
......
...@@ -310,7 +310,7 @@ NdbBlob::Buf::alloc(unsigned n) ...@@ -310,7 +310,7 @@ NdbBlob::Buf::alloc(unsigned n)
void void
NdbBlob::Buf::copyfrom(const NdbBlob::Buf& src) NdbBlob::Buf::copyfrom(const NdbBlob::Buf& src)
{ {
assert(size == src.size); size = src.size;
memcpy(data, src.data, size); memcpy(data, src.data, size);
} }
...@@ -408,6 +408,75 @@ NdbBlob::getDistKey(Uint32 part) ...@@ -408,6 +408,75 @@ NdbBlob::getDistKey(Uint32 part)
return (part / theStripeSize) % theStripeSize; return (part / theStripeSize) % theStripeSize;
} }
// pack/unpack table/index key XXX support routines, shortcuts
int
NdbBlob::packKeyValue(const NdbTableImpl* aTable, const Buf& srcBuf)
{
DBUG_ENTER("NdbBlob::packKeyValue");
const Uint32* data = (const Uint32*)srcBuf.data;
unsigned pos = 0;
Uint32* pack_data = (Uint32*)thePackKeyBuf.data;
unsigned pack_pos = 0;
for (unsigned i = 0; i < aTable->m_columns.size(); i++) {
NdbColumnImpl* c = aTable->m_columns[i];
assert(c != NULL);
if (c->m_pk) {
unsigned len = c->m_attrSize * c->m_arraySize;
Uint32 pack_len;
bool ok = c->get_var_length(&data[pos], pack_len);
if (! ok) {
setErrorCode(NdbBlobImpl::ErrCorruptPK);
DBUG_RETURN(-1);
}
memcpy(&pack_data[pack_pos], &data[pos], pack_len);
while (pack_len % 4 != 0) {
char* p = (char*)&pack_data[pack_pos] + pack_len++;
*p = 0;
}
pos += (len + 3) / 4;
pack_pos += pack_len / 4;
}
}
assert(4 * pos == srcBuf.size);
assert(4 * pack_pos <= thePackKeyBuf.maxsize);
thePackKeyBuf.size = 4 * pack_pos;
DBUG_RETURN(0);
}
int
NdbBlob::unpackKeyValue(const NdbTableImpl* aTable, Buf& dstBuf)
{
DBUG_ENTER("NdbBlob::unpackKeyValue");
Uint32* data = (Uint32*)dstBuf.data;
unsigned pos = 0;
const Uint32* pack_data = (const Uint32*)thePackKeyBuf.data;
unsigned pack_pos = 0;
for (unsigned i = 0; i < aTable->m_columns.size(); i++) {
NdbColumnImpl* c = aTable->m_columns[i];
assert(c != NULL);
if (c->m_pk) {
unsigned len = c->m_attrSize * c->m_arraySize;
Uint32 pack_len;
bool ok = c->get_var_length(&pack_data[pack_pos], pack_len);
if (! ok) {
setErrorCode(NdbBlobImpl::ErrCorruptPK);
DBUG_RETURN(-1);
}
memcpy(&data[pos], &pack_data[pack_pos], pack_len);
while (pack_len % 4 != 0) {
char* p = (char*)&data[pos] + pack_len++;
*p = 0;
}
pos += (len + 3) / 4;
pack_pos += pack_len / 4;
}
}
assert(4 * pos == dstBuf.size);
assert(4 * pack_pos == thePackKeyBuf.size);
DBUG_RETURN(0);
}
// getters and setters // getters and setters
int int
...@@ -489,12 +558,10 @@ int ...@@ -489,12 +558,10 @@ int
NdbBlob::setPartKeyValue(NdbOperation* anOp, Uint32 part) NdbBlob::setPartKeyValue(NdbOperation* anOp, Uint32 part)
{ {
DBUG_ENTER("NdbBlob::setPartKeyValue"); DBUG_ENTER("NdbBlob::setPartKeyValue");
DBUG_PRINT("info", ("dist=%u part=%u key=", getDistKey(part), part)); DBUG_PRINT("info", ("dist=%u part=%u packkey=", getDistKey(part), part));
DBUG_DUMP("info", theKeyBuf.data, 4 * theTable->m_keyLenInWords); DBUG_DUMP("info", thePackKeyBuf.data, 4 * thePackKeyBuf.size);
//Uint32* data = (Uint32*)theKeyBuf.data;
//unsigned size = theTable->m_keyLenInWords;
// TODO use attr ids after compatibility with 4.1.7 not needed // TODO use attr ids after compatibility with 4.1.7 not needed
if (anOp->equal("PK", theKeyBuf.data) == -1 || if (anOp->equal("PK", thePackKeyBuf.data) == -1 ||
anOp->equal("DIST", getDistKey(part)) == -1 || anOp->equal("DIST", getDistKey(part)) == -1 ||
anOp->equal("PART", part) == -1) { anOp->equal("PART", part) == -1) {
setErrorCode(anOp); setErrorCode(anOp);
...@@ -1242,21 +1309,27 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl ...@@ -1242,21 +1309,27 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl
if (isKeyOp()) { if (isKeyOp()) {
if (isTableOp()) { if (isTableOp()) {
// get table key // get table key
Uint32* data = (Uint32*)theKeyBuf.data; Uint32* data = (Uint32*)thePackKeyBuf.data;
unsigned size = theTable->m_keyLenInWords; Uint32 size = theTable->m_keyLenInWords; // in-out
if (theNdbOp->getKeyFromTCREQ(data, size) == -1) { if (theNdbOp->getKeyFromTCREQ(data, size) == -1) {
setErrorCode(NdbBlobImpl::ErrUsage); setErrorCode(NdbBlobImpl::ErrUsage);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
thePackKeyBuf.size = 4 * size;
if (unpackKeyValue(theTable, theKeyBuf) == -1)
DBUG_RETURN(-1);
} }
if (isIndexOp()) { if (isIndexOp()) {
// get index key // get index key
Uint32* data = (Uint32*)theAccessKeyBuf.data; Uint32* data = (Uint32*)thePackKeyBuf.data;
unsigned size = theAccessTable->m_keyLenInWords; Uint32 size = theAccessTable->m_keyLenInWords; // in-out
if (theNdbOp->getKeyFromTCREQ(data, size) == -1) { if (theNdbOp->getKeyFromTCREQ(data, size) == -1) {
setErrorCode(NdbBlobImpl::ErrUsage); setErrorCode(NdbBlobImpl::ErrUsage);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
thePackKeyBuf.size = 4 * size;
if (unpackKeyValue(theAccessTable, theAccessKeyBuf) == -1)
DBUG_RETURN(-1);
} }
if (isReadOp()) { if (isReadOp()) {
// add read of head+inline in this op // add read of head+inline in this op
...@@ -1303,6 +1376,7 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, ...@@ -1303,6 +1376,7 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp,
theEventOp = anOp; theEventOp = anOp;
theBlobEventOp = aBlobOp; theBlobEventOp = aBlobOp;
theTable = anOp->m_eventImpl->m_tableImpl; theTable = anOp->m_eventImpl->m_tableImpl;
theAccessTable = theTable;
theColumn = aColumn; theColumn = aColumn;
// prepare blob column and table // prepare blob column and table
if (prepareColumn() == -1) if (prepareColumn() == -1)
...@@ -1321,7 +1395,7 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, ...@@ -1321,7 +1395,7 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp,
if (theBlobEventOp != NULL) { if (theBlobEventOp != NULL) {
if ((theBlobEventPkRecAttr = if ((theBlobEventPkRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)0), theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)0),
theKeyBuf.data, version)) == NULL || thePackKeyBuf.data, version)) == NULL ||
(theBlobEventDistRecAttr = (theBlobEventDistRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)1), theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)1),
(char*)0, version)) == NULL || (char*)0, version)) == NULL ||
...@@ -1380,6 +1454,7 @@ NdbBlob::prepareColumn() ...@@ -1380,6 +1454,7 @@ NdbBlob::prepareColumn()
} }
// these buffers are always used // these buffers are always used
theKeyBuf.alloc(theTable->m_keyLenInWords << 2); theKeyBuf.alloc(theTable->m_keyLenInWords << 2);
thePackKeyBuf.alloc(max(theTable->m_keyLenInWords, theAccessTable->m_keyLenInWords) << 2);
theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize); theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize);
theHead = (Head*)theHeadInlineBuf.data; theHead = (Head*)theHeadInlineBuf.data;
theInlineData = theHeadInlineBuf.data + sizeof(Head); theInlineData = theHeadInlineBuf.data + sizeof(Head);
...@@ -1464,7 +1539,7 @@ NdbBlob::preExecute(NdbTransaction::ExecType anExecType, bool& batch) ...@@ -1464,7 +1539,7 @@ NdbBlob::preExecute(NdbTransaction::ExecType anExecType, bool& batch)
if (tOp == NULL || if (tOp == NULL ||
tOp->readTuple() == -1 || tOp->readTuple() == -1 ||
setAccessKeyValue(tOp) == -1 || setAccessKeyValue(tOp) == -1 ||
tOp->getValue(pkAttrId, theKeyBuf.data) == NULL) { tOp->getValue(pkAttrId, thePackKeyBuf.data) == NULL) {
setErrorCode(tOp); setErrorCode(tOp);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
...@@ -1553,10 +1628,12 @@ NdbBlob::postExecute(NdbTransaction::ExecType anExecType) ...@@ -1553,10 +1628,12 @@ NdbBlob::postExecute(NdbTransaction::ExecType anExecType)
assert(isKeyOp()); assert(isKeyOp());
if (isIndexOp()) { if (isIndexOp()) {
NdbBlob* tFirstBlob = theNdbOp->theBlobList; NdbBlob* tFirstBlob = theNdbOp->theBlobList;
if (this != tFirstBlob) { if (this == tFirstBlob) {
packKeyValue(theTable, theKeyBuf);
} else {
// copy key from first blob // copy key from first blob
assert(theKeyBuf.size == tFirstBlob->theKeyBuf.size); theKeyBuf.copyfrom(tFirstBlob->theKeyBuf);
memcpy(theKeyBuf.data, tFirstBlob->theKeyBuf.data, tFirstBlob->theKeyBuf.size); thePackKeyBuf.copyfrom(tFirstBlob->thePackKeyBuf);
} }
} }
if (isReadOp()) { if (isReadOp()) {
...@@ -1710,12 +1787,16 @@ NdbBlob::atNextResult() ...@@ -1710,12 +1787,16 @@ NdbBlob::atNextResult()
DBUG_RETURN(-1); DBUG_RETURN(-1);
assert(isScanOp()); assert(isScanOp());
// get primary key // get primary key
{ Uint32* data = (Uint32*)theKeyBuf.data; { NdbScanOperation* tScanOp = (NdbScanOperation*)theNdbOp;
unsigned size = theTable->m_keyLenInWords; Uint32* data = (Uint32*)thePackKeyBuf.data;
if (((NdbScanOperation*)theNdbOp)->getKeyFromKEYINFO20(data, size) == -1) { unsigned size = theTable->m_keyLenInWords; // in-out
if (tScanOp->getKeyFromKEYINFO20(data, size) == -1) {
setErrorCode(NdbBlobImpl::ErrUsage); setErrorCode(NdbBlobImpl::ErrUsage);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
thePackKeyBuf.size = 4 * size;
if (unpackKeyValue(theTable, theKeyBuf) == -1)
DBUG_RETURN(-1);
} }
getHeadFromRecAttr(); getHeadFromRecAttr();
if (setPos(0) == -1) if (setPos(0) == -1)
......
...@@ -34,6 +34,8 @@ public: ...@@ -34,6 +34,8 @@ public:
STATIC_CONST( ErrAbort = 4268 ); STATIC_CONST( ErrAbort = 4268 );
// "Unknown blob error" // "Unknown blob error"
STATIC_CONST( ErrUnknown = 4269 ); STATIC_CONST( ErrUnknown = 4269 );
// "Corrupted main table PK in blob operation"
STATIC_CONST( ErrCorruptPK = 4274 );
}; };
#endif #endif
...@@ -689,9 +689,45 @@ NdbEventOperationImpl::receive_event() ...@@ -689,9 +689,45 @@ NdbEventOperationImpl::receive_event()
error.code)); error.code));
DBUG_RETURN_EVENT(1); DBUG_RETURN_EVENT(1);
} }
if ( m_eventImpl->m_tableImpl)
delete m_eventImpl->m_tableImpl; NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl;
m_eventImpl->m_tableImpl = at; m_eventImpl->m_tableImpl = at;
DBUG_PRINT("info", ("switching table impl 0x%x -> 0x%x",
tmp_table_impl, at));
// change the rec attrs to refer to the new table object
int i;
for (i = 0; i < 2; i++)
{
NdbRecAttr *p = theFirstPkAttrs[i];
while (p)
{
int no = p->getColumn()->getColumnNo();
NdbColumnImpl *tAttrInfo = at->getColumn(no);
DBUG_PRINT("info", ("rec_attr: 0x%x "
"switching column impl 0x%x -> 0x%x",
p, p->m_column, tAttrInfo));
p->m_column = tAttrInfo;
p = p->next();
}
}
for (i = 0; i < 2; i++)
{
NdbRecAttr *p = theFirstDataAttrs[i];
while (p)
{
int no = p->getColumn()->getColumnNo();
NdbColumnImpl *tAttrInfo = at->getColumn(no);
DBUG_PRINT("info", ("rec_attr: 0x%x "
"switching column impl 0x%x -> 0x%x",
p, p->m_column, tAttrInfo));
p->m_column = tAttrInfo;
p = p->next();
}
}
if (tmp_table_impl)
delete tmp_table_impl;
} }
if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT)) if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
...@@ -1979,7 +2015,7 @@ split_concatenated_pk(const NdbTableImpl* t, Uint32* ah_buffer, ...@@ -1979,7 +2015,7 @@ split_concatenated_pk(const NdbTableImpl* t, Uint32* ah_buffer,
ah_buffer[n++] = ah.m_value; ah_buffer[n++] = ah.m_value;
sz += ah.getDataSize(); sz += ah.getDataSize();
} }
assert(n == t->m_noOfKeys && sz == pk_sz); assert(n == t->m_noOfKeys && sz <= pk_sz);
} }
int int
......
...@@ -472,7 +472,8 @@ void ...@@ -472,7 +472,8 @@ void
NdbOperation::reorderKEYINFO() NdbOperation::reorderKEYINFO()
{ {
Uint32 data[4000]; Uint32 data[4000];
getKeyFromTCREQ(data, 4000); Uint32 size = 4000;
getKeyFromTCREQ(data, size);
Uint32 pos = 1; Uint32 pos = 1;
Uint32 k; Uint32 k;
for (k = 0; k < m_accessTable->m_noOfKeys; k++) { for (k = 0; k < m_accessTable->m_noOfKeys; k++) {
...@@ -501,7 +502,7 @@ NdbOperation::reorderKEYINFO() ...@@ -501,7 +502,7 @@ NdbOperation::reorderKEYINFO()
} }
int int
NdbOperation::getKeyFromTCREQ(Uint32* data, unsigned size) NdbOperation::getKeyFromTCREQ(Uint32* data, Uint32 & size)
{ {
assert(size >= theTupKeyLen && theTupKeyLen > 0); assert(size >= theTupKeyLen && theTupKeyLen > 0);
size = theTupKeyLen; size = theTupKeyLen;
......
...@@ -199,7 +199,7 @@ NdbOut& operator<<(NdbOut& out, const NdbRecAttr &r) ...@@ -199,7 +199,7 @@ NdbOut& operator<<(NdbOut& out, const NdbRecAttr &r)
out << hex << "H'" << r.u_32_value() << dec; out << hex << "H'" << r.u_32_value() << dec;
break; break;
case NdbDictionary::Column::Unsigned: case NdbDictionary::Column::Unsigned:
out << r.u_32_value(); out << *((Uint32*)r.aRef() + j);
break; break;
case NdbDictionary::Column::Smallunsigned: case NdbDictionary::Column::Smallunsigned:
out << r.u_short_value(); out << r.u_short_value();
......
...@@ -912,13 +912,20 @@ NdbScanOperation::doSendScan(int aProcessorId) ...@@ -912,13 +912,20 @@ NdbScanOperation::doSendScan(int aProcessorId)
* the scan process. * the scan process.
****************************************************************************/ ****************************************************************************/
int int
NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size) NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, Uint32 & size)
{ {
NdbRecAttr * tRecAttr = m_curr_row; NdbRecAttr * tRecAttr = m_curr_row;
if(tRecAttr) if(tRecAttr)
{ {
const Uint32 * src = (Uint32*)tRecAttr->aRef(); const Uint32 * src = (Uint32*)tRecAttr->aRef();
memcpy(data, src, 4*size);
assert(tRecAttr->get_size_in_bytes() > 0);
assert(tRecAttr->get_size_in_bytes() < 65536);
const Uint32 len = (tRecAttr->get_size_in_bytes() + 3)/4-1;
assert(size >= len);
memcpy(data, src, 4*len);
size = len;
return 0; return 0;
} }
return -1; return -1;
......
...@@ -599,7 +599,8 @@ ErrorBundle ErrorCodes[] = { ...@@ -599,7 +599,8 @@ ErrorBundle ErrorCodes[] = {
{ 4336, DMEC, AE, "Auto-increment value set below current value" }, { 4336, DMEC, AE, "Auto-increment value set below current value" },
{ 4271, DMEC, AE, "Invalid index object, not retrieved via getIndex()" }, { 4271, DMEC, AE, "Invalid index object, not retrieved via getIndex()" },
{ 4272, DMEC, AE, "Table definition has undefined column" }, { 4272, DMEC, AE, "Table definition has undefined column" },
{ 4273, DMEC, IE, "No blob table in dict cache" } { 4273, DMEC, IE, "No blob table in dict cache" },
{ 4274, DMEC, IE, "Corrupted main table PK in blob operation" }
}; };
static static
......
...@@ -223,7 +223,7 @@ dropTable() ...@@ -223,7 +223,7 @@ dropTable()
{ {
NdbDictionary::Table tab(g_opt.m_tname); NdbDictionary::Table tab(g_opt.m_tname);
if (g_dic->getTable(g_opt.m_tname) != 0) if (g_dic->getTable(g_opt.m_tname) != 0)
CHK(g_dic->dropTable(tab) == 0); CHK(g_dic->dropTable(g_opt.m_tname) == 0);
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment