Commit bf6d480a authored by unknown's avatar unknown

Bug #16997 Table rename that changes database does not rename indexes: Moved...

Bug #16997 Table rename that changes database does not rename indexes: Moved index tables to system database


parent d4aa0b8d
DROP TABLE IF EXISTS t1,t2;
drop database if exists mysqltest;
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL,
attr2 INT,
attr3 VARCHAR(10),
INDEX i1(attr1)
) ENGINE=ndbcluster;
INSERT INTO t1 VALUES (0,0,0,"zero"),(1,1,1,"one"),(2,2,2,"two");
SELECT * FROM t1 WHERE attr1 = 1;
pk1 attr1 attr2 attr3
1 1 1 one
alter table t1 rename t2;
SELECT * FROM t2 WHERE attr1 = 1;
pk1 attr1 attr2 attr3
1 1 1 one
create database ndbtest;
alter table t2 rename ndbtest.t2;
SELECT * FROM ndbtest.t2 WHERE attr1 = 1;
pk1 attr1 attr2 attr3
1 1 1 one
drop table ndbtest.t2;
drop database ndbtest;
-- source include/have_ndb.inc
-- source include/not_embedded.inc
--disable_warnings
DROP TABLE IF EXISTS t1,t2;
drop database if exists mysqltest;
--enable_warnings
#
# Table rename tests
#
#
# Create a normal table with primary key
#
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL,
attr2 INT,
attr3 VARCHAR(10),
INDEX i1(attr1)
) ENGINE=ndbcluster;
INSERT INTO t1 VALUES (0,0,0,"zero"),(1,1,1,"one"),(2,2,2,"two");
SELECT * FROM t1 WHERE attr1 = 1;
alter table t1 rename t2;
SELECT * FROM t2 WHERE attr1 = 1;
create database ndbtest;
alter table t2 rename ndbtest.t2;
SELECT * FROM ndbtest.t2 WHERE attr1 = 1;
drop table ndbtest.t2;
drop database ndbtest;
# End of 4.1 tests
...@@ -4922,13 +4922,17 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) ...@@ -4922,13 +4922,17 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
{ {
NDBDICT *dict; NDBDICT *dict;
char old_dbname[FN_HEADLEN]; char old_dbname[FN_HEADLEN];
char new_dbname[FN_HEADLEN];
char new_tabname[FN_HEADLEN]; char new_tabname[FN_HEADLEN];
const NDBTAB *orig_tab; const NDBTAB *orig_tab;
int result; int result;
bool recreate_indexes= FALSE;
NDBDICT::List index_list;
DBUG_ENTER("ha_ndbcluster::rename_table"); DBUG_ENTER("ha_ndbcluster::rename_table");
DBUG_PRINT("info", ("Renaming %s to %s", from, to)); DBUG_PRINT("info", ("Renaming %s to %s", from, to));
set_dbname(from, old_dbname); set_dbname(from, old_dbname);
set_dbname(to, new_dbname);
set_tabname(from); set_tabname(from);
set_tabname(to, new_tabname); set_tabname(to, new_tabname);
...@@ -4953,6 +4957,11 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) ...@@ -4953,6 +4957,11 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
DBUG_ASSERT(r == 0); DBUG_ASSERT(r == 0);
} }
#endif #endif
if (my_strcasecmp(system_charset_info, new_dbname, old_dbname))
{
dict->listIndexes(index_list, *orig_tab);
recreate_indexes= TRUE;
}
// Change current database to that of target table // Change current database to that of target table
set_dbname(to); set_dbname(to);
ndb->setDatabaseName(m_dbname); ndb->setDatabaseName(m_dbname);
...@@ -5033,6 +5042,32 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) ...@@ -5033,6 +5042,32 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
SOT_RENAME_TABLE, SOT_RENAME_TABLE,
m_dbname, new_tabname); m_dbname, new_tabname);
} }
// If we are moving tables between databases, we need to recreate
// indexes
if (recreate_indexes)
{
for (unsigned i = 0; i < index_list.count; i++)
{
NDBDICT::List::Element& index_el = index_list.elements[i];
// Recreate any indexes not stored in the system database
if (my_strcasecmp(system_charset_info,
index_el.database, NDB_SYSTEM_DATABASE))
{
set_dbname(from);
ndb->setDatabaseName(m_dbname);
const NDBINDEX * index= dict->getIndexGlobal(index_el.name, new_tab);
DBUG_PRINT("info", ("Creating index %s/%s",
index_el.database, index->getName()));
dict->createIndex(*index, new_tab);
DBUG_PRINT("info", ("Dropping index %s/%s",
index_el.database, index->getName()));
set_dbname(from);
ndb->setDatabaseName(m_dbname);
dict->dropIndexGlobal(*index);
}
}
}
if (share) if (share)
free_share(&share); free_share(&share);
#endif #endif
......
...@@ -1001,6 +1001,9 @@ typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*); ...@@ -1001,6 +1001,9 @@ typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*);
#define WAITFOR_RESPONSE_TIMEOUT 120000 // Milliseconds #define WAITFOR_RESPONSE_TIMEOUT 120000 // Milliseconds
#endif #endif
#define NDB_SYSTEM_DATABASE "sys"
#define NDB_SYSTEM_SCHEMA "def"
/** /**
* @class Ndb * @class Ndb
* @brief Represents the NDB kernel and is the main class of the NDB API. * @brief Represents the NDB kernel and is the main class of the NDB API.
...@@ -1648,6 +1651,8 @@ private: ...@@ -1648,6 +1651,8 @@ private:
const char * externalizeIndexName(const char * internalIndexName, const char * externalizeIndexName(const char * internalIndexName,
bool fullyQualifiedNames); bool fullyQualifiedNames);
const char * externalizeIndexName(const char * internalIndexName); const char * externalizeIndexName(const char * internalIndexName);
const BaseString old_internalize_index_name(const NdbTableImpl * table,
const char * external_name) const;
const BaseString internalize_index_name(const NdbTableImpl * table, const BaseString internalize_index_name(const NdbTableImpl * table,
const char * external_name) const; const char * external_name) const;
......
...@@ -1635,6 +1635,16 @@ public: ...@@ -1635,6 +1635,16 @@ public:
int listIndexes(List & list, const char * tableName); int listIndexes(List & list, const char * tableName);
int listIndexes(List & list, const char * tableName) const; int listIndexes(List & list, const char * tableName) const;
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/**
* Fetch list of indexes of given table.
* @param list Reference to list where to store the listed indexes
* @param table Reference to table that index belongs to.
* @return 0 if successful, otherwise -1
*/
int listIndexes(List & list, const Table &table) const;
#endif
/** @} *******************************************************************/ /** @} *******************************************************************/
/** /**
* @name Events * @name Events
......
...@@ -1196,6 +1196,35 @@ Ndb::internalize_table_name(const char *external_name) const ...@@ -1196,6 +1196,35 @@ Ndb::internalize_table_name(const char *external_name) const
DBUG_RETURN(ret); DBUG_RETURN(ret);
} }
const BaseString
Ndb::old_internalize_index_name(const NdbTableImpl * table,
const char * external_name) const
{
BaseString ret;
DBUG_ENTER("old_internalize_index_name");
DBUG_PRINT("enter", ("external_name: %s, table_id: %d",
external_name, table ? table->m_id : ~0));
if (!table)
{
DBUG_PRINT("error", ("!table"));
DBUG_RETURN(ret);
}
if (fullyQualifiedNames)
{
/* Internal index name format <db>/<schema>/<tabid>/<table> */
ret.assfmt("%s%d%c%s",
theImpl->m_prefix.c_str(),
table->m_id,
table_name_separator,
external_name);
}
else
ret.assign(external_name);
DBUG_PRINT("exit", ("internal_name: %s", ret.c_str()));
DBUG_RETURN(ret);
}
const BaseString const BaseString
Ndb::internalize_index_name(const NdbTableImpl * table, Ndb::internalize_index_name(const NdbTableImpl * table,
...@@ -1213,9 +1242,9 @@ Ndb::internalize_index_name(const NdbTableImpl * table, ...@@ -1213,9 +1242,9 @@ Ndb::internalize_index_name(const NdbTableImpl * table,
if (fullyQualifiedNames) if (fullyQualifiedNames)
{ {
/* Internal index name format <db>/<schema>/<tabid>/<table> */ /* Internal index name format sys/def/<tabid>/<table> */
ret.assfmt("%s%d%c%s", ret.assfmt("%s%d%c%s",
theImpl->m_prefix.c_str(), theImpl->m_systemPrefix.c_str(),
table->m_id, table->m_id,
table_name_separator, table_name_separator,
external_name); external_name);
......
...@@ -1618,6 +1618,14 @@ NdbDictionary::Dictionary::listIndexes(List& list, ...@@ -1618,6 +1618,14 @@ NdbDictionary::Dictionary::listIndexes(List& list,
return m_impl.listIndexes(list, tab->getTableId()); return m_impl.listIndexes(list, tab->getTableId());
} }
int
NdbDictionary::Dictionary::listIndexes(List& list,
const NdbDictionary::Table &table) const
{
return m_impl.listIndexes(list, table.getTableId());
}
const struct NdbError & const struct NdbError &
NdbDictionary::Dictionary::getNdbError() const { NdbDictionary::Dictionary::getNdbError() const {
return m_impl.getNdbError(); return m_impl.getNdbError();
......
...@@ -1512,9 +1512,21 @@ NdbTableImpl * ...@@ -1512,9 +1512,21 @@ NdbTableImpl *
NdbDictionaryImpl::getIndexTable(NdbIndexImpl * index, NdbDictionaryImpl::getIndexTable(NdbIndexImpl * index,
NdbTableImpl * table) NdbTableImpl * table)
{ {
const char *current_db= m_ndb.getDatabaseName();
NdbTableImpl *index_table;
const BaseString internalName( const BaseString internalName(
m_ndb.internalize_index_name(table, index->getName())); m_ndb.internalize_index_name(table, index->getName()));
return getTable(m_ndb.externalizeTableName(internalName.c_str())); // Get index table in system database
m_ndb.setDatabaseName(NDB_SYSTEM_DATABASE);
index_table= getTable(m_ndb.externalizeTableName(internalName.c_str()));
m_ndb.setDatabaseName(current_db);
if (!index_table)
{
// Index table not found
// Try geting index table in current database (old format)
index_table= getTable(m_ndb.externalizeTableName(internalName.c_str()));
}
return index_table;
} }
#if 0 #if 0
......
...@@ -1022,6 +1022,33 @@ NdbDictionaryImpl::getIndexGlobal(const char * index_name, ...@@ -1022,6 +1022,33 @@ NdbDictionaryImpl::getIndexGlobal(const char * index_name,
} }
break; break;
} }
{
// Index not found, try old format
const BaseString
old_internal_indexname(m_ndb.old_internalize_index_name(&ndbtab,
index_name));
retry= 2;
while (retry)
{
NdbTableImpl *tab=
fetchGlobalTableImplRef(InitIndex(old_internal_indexname,
index_name, ndbtab));
if (tab)
{
// tab->m_index sould be set. otherwise tab == 0
NdbIndexImpl *idx= tab->m_index;
if (idx->m_table_id != (unsigned)ndbtab.getObjectId() ||
idx->m_table_version != (unsigned)ndbtab.getObjectVersion())
{
releaseIndexGlobal(*idx, 1);
retry--;
continue;
}
DBUG_RETURN(idx);
}
break;
}
}
m_error.code= 4243; m_error.code= 4243;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1088,17 +1115,41 @@ NdbDictionaryImpl::getIndex(const char* index_name, ...@@ -1088,17 +1115,41 @@ NdbDictionaryImpl::getIndex(const char* index_name,
index_name, index_name,
prim)); prim));
if (!tab) if (!tab)
goto err; goto retry;
info= Ndb_local_table_info::create(tab, 0); info= Ndb_local_table_info::create(tab, 0);
if (!info) if (!info)
goto err; goto retry;
m_localHash.put(internal_indexname.c_str(), info); m_localHash.put(internal_indexname.c_str(), info);
} }
else else
tab= info->m_table_impl; tab= info->m_table_impl;
return tab->m_index; return tab->m_index;
retry:
// Index not found, try fetching it from current database
const BaseString
old_internal_indexname(m_ndb.old_internalize_index_name(&prim, index_name));
info= m_localHash.get(old_internal_indexname.c_str());
if (info == 0)
{
tab= fetchGlobalTableImplRef(InitIndex(old_internal_indexname,
index_name,
prim));
if (!tab)
goto err;
info= Ndb_local_table_info::create(tab, 0);
if (!info)
goto err;
m_localHash.put(old_internal_indexname.c_str(), info);
}
else
tab= info->m_table_impl;
return tab->m_index;
err: err:
m_error.code= 4243; m_error.code= 4243;
......
...@@ -93,6 +93,8 @@ public: ...@@ -93,6 +93,8 @@ public:
m_schemaname.c_str(), table_name_separator); m_schemaname.c_str(), table_name_separator);
} }
BaseString m_systemPrefix; // Buffer for preformatted for <sys>/<def>/
/** /**
* NOTE free lists must be _after_ theNdbObjectIdMap take * NOTE free lists must be _after_ theNdbObjectIdMap take
* assure that destructors are run in correct order * assure that destructors are run in correct order
......
...@@ -223,6 +223,9 @@ NdbImpl::NdbImpl(Ndb_cluster_connection *ndb_cluster_connection, ...@@ -223,6 +223,9 @@ NdbImpl::NdbImpl(Ndb_cluster_connection *ndb_cluster_connection,
} }
m_optimized_node_selection= m_optimized_node_selection=
m_ndb_cluster_connection.m_optimized_node_selection; m_ndb_cluster_connection.m_optimized_node_selection;
m_systemPrefix.assfmt("%s%c%s%c", NDB_SYSTEM_DATABASE, table_name_separator,
NDB_SYSTEM_SCHEMA, table_name_separator);
} }
NdbImpl::~NdbImpl() NdbImpl::~NdbImpl()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment