Commit 3f78e8b8 authored by marty@linux.site's avatar marty@linux.site

Added possibillity to check what fields will get added indexes (ndb does...

Added possibillity to check what fields will get added indexes (ndb does currently not support indexes on disk stored fields), WL#1892
parent c61ee0fb
......@@ -63,6 +63,7 @@ public:
const char **table_name, *field_name;
LEX_STRING comment;
query_id_t query_id; // For quick test of used fields
bool add_index; // For check if field will be indexed
/* Field is part of the following keys */
key_map key_start,part_of_key,part_of_sortkey;
/*
......
......@@ -1241,10 +1241,9 @@ int ha_ndbcluster::open_indexes(Ndb *ndb, TABLE *tab)
Renumber indexes in index list by shifting out
indexes that are to be dropped
*/
int ha_ndbcluster::renumber_indexes(Ndb *ndb, TABLE *tab)
void ha_ndbcluster::renumber_indexes(Ndb *ndb, TABLE *tab)
{
uint i;
int error= 0;
const char *index_name;
KEY* key_info= tab->key_info;
const char **key_name= tab->s->keynames.type_names;
......@@ -1273,7 +1272,7 @@ int ha_ndbcluster::renumber_indexes(Ndb *ndb, TABLE *tab)
}
}
DBUG_RETURN(error);
DBUG_VOID_RETURN;
}
/*
......@@ -4393,7 +4392,7 @@ int ha_ndbcluster::create_handler_files(const char *file)
NDBDICT *dict= ndb->getDictionary();
if (!(tab= dict->getTable(m_tabname)))
DBUG_RETURN(0); // Must be a create, ignore since frm is saved in create
DBUG_ASSERT(m_share->state == NSS_ALTERED);
name= table->s->normalized_path.str;
DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, name));
if (readfrm(name, &data, &length) ||
......@@ -4402,17 +4401,18 @@ int ha_ndbcluster::create_handler_files(const char *file)
DBUG_PRINT("info", ("Missing frm for %s", m_tabname));
my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
DBUG_RETURN(1);
error= 1;
}
if (cmp_frm(tab, pack_data, pack_length))
{
else
{
DBUG_PRINT("info", ("Table %s has changed, altering frm in ndb",
m_tabname));
error= table_changed(pack_data, pack_length);
m_share->state= NSS_INITIAL;
my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
}
my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
m_share->state= NSS_INITIAL;
free_share(&m_share); // Decrease ref_count
DBUG_RETURN(error);
}
......@@ -4529,6 +4529,7 @@ int ha_ndbcluster::add_index(TABLE *table_arg,
int error= 0;
uint idx;
DBUG_ASSERT(m_share->state == NSS_INITIAL);
for (idx= 0; idx < num_of_keys; idx++)
{
KEY *key= key_info + idx;
......@@ -4544,7 +4545,11 @@ int ha_ndbcluster::add_index(TABLE *table_arg,
if((error= create_index(key_info[idx].name, key, idx_type, idx)))
break;
}
m_share->state= NSS_ALTERED;
if (!error)
{
ndbcluster_get_share(m_share); // Increase ref_count
m_share->state= NSS_ALTERED;
}
DBUG_RETURN(error);
}
......@@ -4568,6 +4573,7 @@ int ha_ndbcluster::prepare_drop_index(TABLE *table_arg,
uint *key_num, uint num_of_keys)
{
DBUG_ENTER("ha_ndbcluster::prepare_drop_index");
DBUG_ASSERT(m_share->state == NSS_INITIAL);
// Mark indexes for deletion
uint idx;
for (idx= 0; idx < num_of_keys; idx++)
......@@ -4579,8 +4585,10 @@ int ha_ndbcluster::prepare_drop_index(TABLE *table_arg,
THD *thd= current_thd;
Thd_ndb *thd_ndb= get_thd_ndb(thd);
Ndb *ndb= thd_ndb->ndb;
renumber_indexes(ndb, table_arg);
ndbcluster_get_share(m_share); // Increase ref_count
m_share->state= NSS_ALTERED;
DBUG_RETURN(renumber_indexes(ndb, table_arg));
DBUG_RETURN(0);
}
/*
......@@ -4588,13 +4596,19 @@ int ha_ndbcluster::prepare_drop_index(TABLE *table_arg,
*/
int ha_ndbcluster::final_drop_index(TABLE *table_arg)
{
int error;
DBUG_ENTER("ha_ndbcluster::final_drop_index");
DBUG_PRINT("info", ("ha_ndbcluster::final_drop_index"));
// Really drop indexes
THD *thd= current_thd;
Thd_ndb *thd_ndb= get_thd_ndb(thd);
Ndb *ndb= thd_ndb->ndb;
DBUG_RETURN(drop_indexes(ndb, table_arg));
if((error= drop_indexes(ndb, table_arg)))
{
m_share->state= NSS_INITIAL;
free_share(&m_share); // Decrease ref_count
}
DBUG_RETURN(error);
}
/*
......@@ -5212,41 +5226,66 @@ int ndbcluster_discover(THD* thd, const char *db, const char *name,
const void* data;
const NDBTAB* tab;
Ndb* ndb;
char key[FN_REFLEN];
DBUG_ENTER("ndbcluster_discover");
DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
ndb->setDatabaseName(db);
NDBDICT* dict= ndb->getDictionary();
dict->set_local_table_data_size(sizeof(Ndb_local_table_statistics));
dict->invalidateTable(name);
if (!(tab= dict->getTable(name)))
{
const NdbError err= dict->getNdbError();
if (err.code == 709 || err.code == 723)
DBUG_RETURN(-1);
ERR_RETURN(err);
}
DBUG_PRINT("info", ("Found table %s", tab->getName()));
len= tab->getFrmLength();
if (len == 0 || tab->getFrmData() == NULL)
strxnmov(key, FN_LEN-1, mysql_data_home, "/", db, "/", name, NullS);
NDB_SHARE *share= get_share(key, 0, false);
if (share && share->state == NSS_ALTERED)
{
DBUG_PRINT("error", ("No frm data found."));
DBUG_RETURN(1);
// Frm has been altered on disk, but not yet written to ndb
if (readfrm(key, &data, &len))
{
DBUG_PRINT("error", ("Could not read frm"));
if (share)
free_share(&share);
DBUG_RETURN(1);
}
}
if (unpackfrm(&data, &len, tab->getFrmData()))
else
{
DBUG_PRINT("error", ("Could not unpack table"));
DBUG_RETURN(1);
if (!(tab= dict->getTable(name)))
{
const NdbError err= dict->getNdbError();
if (share)
free_share(&share);
if (err.code == 709 || err.code == 723)
DBUG_RETURN(-1);
ERR_RETURN(err);
}
DBUG_PRINT("info", ("Found table %s", tab->getName()));
len= tab->getFrmLength();
if (len == 0 || tab->getFrmData() == NULL)
{
DBUG_PRINT("error", ("No frm data found."));
if (share)
free_share(&share);
DBUG_RETURN(1);
}
if (unpackfrm(&data, &len, tab->getFrmData()))
{
DBUG_PRINT("error", ("Could not unpack table"));
if (share)
free_share(&share);
DBUG_RETURN(1);
}
}
*frmlen= len;
*frmblob= data;
if (share)
free_share(&share);
DBUG_RETURN(0);
}
......@@ -9192,21 +9231,38 @@ uint ha_ndbcluster::set_up_partition_info(partition_info *part_info,
bool ha_ndbcluster::check_if_incompatible_data(HA_CREATE_INFO *info,
uint table_changes)
{
return COMPATIBLE_DATA_NO; // Disable fast add/drop index
DBUG_ENTER("ha_ndbcluster::check_if_incompatible_data");
uint i;
const NDBTAB *tab= (const NDBTAB *) m_table;
#ifdef HAVE_NDB_BINLOG
DBUG_PRINT("info", ("add/drop index not supported with binlog"));
DBUG_RETURN(COMPATIBLE_DATA_NO); // Disable fast add/drop index with binlog
#endif
for (i= 0; i < table->s->fields; i++)
{
Field *field= table->field[i];
const NDBCOL *col= tab->getColumn(field->field_name);
if (field->add_index &&
col->getStorageType() == NdbDictionary::Column::StorageTypeDisk)
{
DBUG_PRINT("info", ("add/drop index not supported for disk stored column"));
DBUG_RETURN(COMPATIBLE_DATA_NO);
}
}
if (table_changes != IS_EQUAL_YES)
return COMPATIBLE_DATA_NO;
DBUG_RETURN(COMPATIBLE_DATA_NO);
/* Check that auto_increment value was not changed */
if ((info->used_fields & HA_CREATE_USED_AUTO) &&
info->auto_increment_value != 0)
return COMPATIBLE_DATA_NO;
DBUG_RETURN(COMPATIBLE_DATA_NO);
/* Check that row format didn't change */
if ((info->used_fields & HA_CREATE_USED_AUTO) &&
get_row_type() != info->row_type)
return COMPATIBLE_DATA_NO;
DBUG_RETURN(COMPATIBLE_DATA_NO);
return COMPATIBLE_DATA_YES;
DBUG_RETURN(COMPATIBLE_DATA_YES);
}
bool set_up_tablespace(st_alter_tablespace *info,
......
......@@ -685,7 +685,7 @@ private:
void clear_index(int i);
void clear_indexes();
int open_indexes(Ndb *ndb, TABLE *tab);
int renumber_indexes(Ndb *ndb, TABLE *tab);
void renumber_indexes(Ndb *ndb, TABLE *tab);
int drop_indexes(Ndb *ndb, TABLE *tab);
int add_index_handle(THD *thd, NdbDictionary::Dictionary *dict,
KEY *key_info, const char *index_name, uint index_no);
......
......@@ -3705,12 +3705,10 @@ static uint compare_tables(TABLE *table, List<create_field> *create_list,
/* Evaluate changes bitmap and send to check_if_incompatible_data() */
if (!(tmp= field->is_equal(new_field)))
DBUG_RETURN(ALTER_TABLE_DATA_CHANGED);
// Clear indexed marker
field->add_index= 0;
changes|= tmp;
}
/* Check if changes are compatible with current handler without a copy */
if (table->file->check_if_incompatible_data(create_info, changes))
DBUG_RETURN(ALTER_TABLE_DATA_CHANGED);
/*
Go through keys and check if the original ones are compatible
......@@ -3778,6 +3776,11 @@ static uint compare_tables(TABLE *table, List<create_field> *create_list,
/* Key modified. Add the offset of the key to both buffers. */
index_drop_buffer[(*index_drop_count)++]= table_key - table->key_info;
index_add_buffer[(*index_add_count)++]= new_key - key_info_buffer;
field= table->field[new_key->key_part->fieldnr];
// Add field to the key
new_key->key_part->field= table->field[new_key->key_part->fieldnr];
// Mark field to be part of new key
field->add_index= 1;
DBUG_PRINT("info", ("index changed: '%s'", table_key->name));
}
/*end of for (; table_key < table_key_end;) */
......@@ -3797,9 +3800,19 @@ static uint compare_tables(TABLE *table, List<create_field> *create_list,
{
/* Key not found. Add the offset of the key to the add buffer. */
index_add_buffer[(*index_add_count)++]= new_key - key_info_buffer;
DBUG_PRINT("info", ("index added: '%s'", new_key->name));
field= table->field[new_key->key_part->fieldnr];
// Add field to the key
new_key->key_part->field= table->field[new_key->key_part->fieldnr];
// Mark field to be part of new key
field->add_index= 1;
DBUG_PRINT("info", ("index added: '%s'", new_key->name));
}
}
/* Check if changes are compatible with current handler without a copy */
if (table->file->check_if_incompatible_data(create_info, changes))
DBUG_RETURN(ALTER_TABLE_DATA_CHANGED);
if (*index_drop_count || *index_add_count)
DBUG_RETURN(ALTER_TABLE_INDEX_CHANGED);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment