Commit 0f0635ec authored by unknown's avatar unknown

Add extra ordered index in when UNIQUE and PRIMARY KEY are specified.

Extra ordered indexes are created primarily 
to support partial key scan/read and range scans of hash indexes.
I.e. the ordered index are used instead of the hash indexes for 
these queries.


sql/ha_ndbcluster.cc:
  Add creation and use of extra ordered indexes on pk and unique indexes.
sql/ha_ndbcluster.h:
  Added new functions for creating extra ordered indexes
parent 11538b81
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
#define USE_DISCOVER_ON_STARTUP #define USE_DISCOVER_ON_STARTUP
//#define USE_NDB_POOL //#define USE_NDB_POOL
//#define USE_EXTRA_ORDERED_INDEX
// Default value for parallelism // Default value for parallelism
static const int parallelism= 240; static const int parallelism= 240;
...@@ -64,6 +65,10 @@ typedef NdbDictionary::Dictionary NDBDICT; ...@@ -64,6 +65,10 @@ typedef NdbDictionary::Dictionary NDBDICT;
bool ndbcluster_inited= false; bool ndbcluster_inited= false;
#ifdef USE_EXTRA_ORDERED_INDEX
static const char* unique_suffix= "$unique";
#endif
// Handler synchronization // Handler synchronization
pthread_mutex_t ndbcluster_mutex; pthread_mutex_t ndbcluster_mutex;
...@@ -95,6 +100,7 @@ static const err_code_mapping err_map[]= ...@@ -95,6 +100,7 @@ static const err_code_mapping err_map[]=
{ 630, HA_ERR_FOUND_DUPP_KEY }, { 630, HA_ERR_FOUND_DUPP_KEY },
{ 893, HA_ERR_FOUND_DUPP_UNIQUE }, { 893, HA_ERR_FOUND_DUPP_UNIQUE },
{ 721, HA_ERR_TABLE_EXIST }, { 721, HA_ERR_TABLE_EXIST },
{ 4244, HA_ERR_TABLE_EXIST },
{ 241, HA_ERR_OLD_METADATA }, { 241, HA_ERR_OLD_METADATA },
{ -1, -1 } { -1, -1 }
}; };
...@@ -354,15 +360,37 @@ int ha_ndbcluster::get_metadata(const char *path) ...@@ -354,15 +360,37 @@ int ha_ndbcluster::get_metadata(const char *path)
m_table= (void*)tab; m_table= (void*)tab;
for (i= 0; i < MAX_KEY; i++) for (i= 0; i < MAX_KEY; i++)
{
m_indextype[i]= UNDEFINED_INDEX; m_indextype[i]= UNDEFINED_INDEX;
m_unique_index_name[i]= NULL;
}
// Save information about all known indexes // Save information about all known indexes
for (i= 0; i < table->keys; i++) for (i= 0; i < table->keys; i++)
m_indextype[i] = get_index_type_from_table(i); {
m_indextype[i]= get_index_type_from_table(i);
#ifdef USE_EXTRA_ORDERED_INDEX
if (m_indextype[i] == UNIQUE_INDEX)
{
char *name;
const char *index_name= get_index_name(i);
int name_len= strlen(index_name)+strlen(unique_suffix)+1;
if (!(name= my_malloc(name_len, MYF(MY_WME))))
DBUG_RETURN(2);
strxnmov(name, name_len, index_name, unique_suffix, NullS);
m_unique_index_name[i]= name;
DBUG_PRINT("info", ("Created unique index name: %s for index %d",
name, i));
}
#endif
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/* /*
Decode the type of an index from information Decode the type of an index from information
provided in table object provided in table object
...@@ -380,11 +408,19 @@ NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint index_no) const ...@@ -380,11 +408,19 @@ NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint index_no) const
void ha_ndbcluster::release_metadata() void ha_ndbcluster::release_metadata()
{ {
int i;
DBUG_ENTER("release_metadata"); DBUG_ENTER("release_metadata");
DBUG_PRINT("enter", ("m_tabname: %s", m_tabname)); DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
m_table= NULL; m_table= NULL;
for (i= 0; i < MAX_KEY; i++)
{
my_free((char*)m_unique_index_name[i], MYF(MY_ALLOW_ZERO_PTR));
m_unique_index_name[i]= NULL;
}
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -416,6 +452,18 @@ inline const char* ha_ndbcluster::get_index_name(uint idx_no) const ...@@ -416,6 +452,18 @@ inline const char* ha_ndbcluster::get_index_name(uint idx_no) const
return table->keynames.type_names[idx_no]; return table->keynames.type_names[idx_no];
} }
inline const char* ha_ndbcluster::get_unique_index_name(uint idx_no) const
{
#ifdef USE_EXTRA_ORDERED_INDEX
DBUG_ASSERT(idx_no < MAX_KEY);
DBUG_ASSERT(m_unique_index_name[idx_no]);
return m_unique_index_name[idx_no];
#else
return get_index_name(idx_no);
#endif
}
inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
{ {
DBUG_ASSERT(idx_no < MAX_KEY); DBUG_ASSERT(idx_no < MAX_KEY);
...@@ -550,7 +598,6 @@ int ha_ndbcluster::unique_index_read(const byte *key, ...@@ -550,7 +598,6 @@ int ha_ndbcluster::unique_index_read(const byte *key,
uint key_len, byte *buf) uint key_len, byte *buf)
{ {
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
const char *index_name;
NdbIndexOperation *op; NdbIndexOperation *op;
THD *thd= current_thd; THD *thd= current_thd;
byte *key_ptr; byte *key_ptr;
...@@ -560,9 +607,10 @@ int ha_ndbcluster::unique_index_read(const byte *key, ...@@ -560,9 +607,10 @@ int ha_ndbcluster::unique_index_read(const byte *key,
DBUG_ENTER("unique_index_read"); DBUG_ENTER("unique_index_read");
DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index)); DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index));
DBUG_DUMP("key", (char*)key, key_len); DBUG_DUMP("key", (char*)key, key_len);
DBUG_PRINT("enter", ("name: %s", get_unique_index_name(active_index)));
index_name= get_index_name(active_index); if (!(op= trans->getNdbIndexOperation(get_unique_index_name(active_index),
if (!(op= trans->getNdbIndexOperation(index_name, m_tabname)) || m_tabname)) ||
op->readTuple() != 0) op->readTuple() != 0)
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
...@@ -1349,15 +1397,35 @@ int ha_ndbcluster::index_read(byte *buf, ...@@ -1349,15 +1397,35 @@ int ha_ndbcluster::index_read(byte *buf,
DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d", DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d",
active_index, key_len, find_flag)); active_index, key_len, find_flag));
KEY* key_info;
int error= 1; int error= 1;
statistic_increment(ha_read_key_count, &LOCK_status); statistic_increment(ha_read_key_count, &LOCK_status);
switch (get_index_type(active_index)){ switch (get_index_type(active_index)){
case PRIMARY_KEY_INDEX: case PRIMARY_KEY_INDEX:
#ifdef USE_EXTRA_ORDERED_INDEX
key_info= table->key_info + active_index;
if (key_len < key_info->key_length ||
find_flag != HA_READ_KEY_EXACT)
{
error= ordered_index_scan(key, key_len, buf, find_flag);
break;
}
#endif
error= pk_read(key, key_len, buf); error= pk_read(key, key_len, buf);
break; break;
case UNIQUE_INDEX: case UNIQUE_INDEX:
#ifdef USE_EXTRA_ORDERED_INDEX
key_info= table->key_info + active_index;
if (key_len < key_info->key_length ||
find_flag != HA_READ_KEY_EXACT)
{
error= ordered_index_scan(key, key_len, buf, find_flag);
break;
}
#endif
error= unique_index_read(key, key_len, buf); error= unique_index_read(key, key_len, buf);
break; break;
...@@ -1391,7 +1459,7 @@ int ha_ndbcluster::index_next(byte *buf) ...@@ -1391,7 +1459,7 @@ int ha_ndbcluster::index_next(byte *buf)
int error = 1; int error = 1;
statistic_increment(ha_read_next_count,&LOCK_status); statistic_increment(ha_read_next_count,&LOCK_status);
if (get_index_type(active_index) == PRIMARY_KEY_INDEX) if (!m_active_cursor)
error= HA_ERR_END_OF_FILE; error= HA_ERR_END_OF_FILE;
else else
error = next_result(buf); error = next_result(buf);
...@@ -2041,9 +2109,8 @@ int ha_ndbcluster::create(const char *name, ...@@ -2041,9 +2109,8 @@ int ha_ndbcluster::create(const char *name,
NdbDictionary::Column::Type ndb_type; NdbDictionary::Column::Type ndb_type;
NDBCOL col; NDBCOL col;
uint pack_length, length, i; uint pack_length, length, i;
int res;
const void *data, *pack_data; const void *data, *pack_data;
const char **key_name= form->keynames.type_names; const char **key_names= form->keynames.type_names;
char name2[FN_HEADLEN]; char name2[FN_HEADLEN];
DBUG_ENTER("create"); DBUG_ENTER("create");
...@@ -2086,13 +2153,11 @@ int ha_ndbcluster::create(const char *name, ...@@ -2086,13 +2153,11 @@ int ha_ndbcluster::create(const char *name,
col.setPrimaryKey(field->flags & PRI_KEY_FLAG); col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
if (field->flags & AUTO_INCREMENT_FLAG) if (field->flags & AUTO_INCREMENT_FLAG)
{ {
DBUG_PRINT("info", ("Found auto_increment key"));
col.setAutoIncrement(TRUE); col.setAutoIncrement(TRUE);
ulonglong value = info->auto_increment_value ? ulonglong value= info->auto_increment_value ?
info->auto_increment_value -1 : info->auto_increment_value -1 : (ulonglong) 0;
(ulonglong) 0; DBUG_PRINT("info", ("Autoincrement key, initial: %d", value));
DBUG_PRINT("info", ("initial value=%ld", value)); col.setAutoIncrementInitialValue(value);
// col.setInitialAutIncValue(value);
} }
else else
col.setAutoIncrement(false); col.setAutoIncrement(false);
...@@ -2143,41 +2208,81 @@ int ha_ndbcluster::create(const char *name, ...@@ -2143,41 +2208,81 @@ int ha_ndbcluster::create(const char *name,
} }
// Create secondary indexes // Create secondary indexes
for (i= 0; i < form->keys; i++) KEY* key_info= form->key_info;
const char** key_name= key_names;
for (i= 0; i < form->keys; i++, key_info++, key_name++)
{ {
DBUG_PRINT("info", ("Found index %u: %s", i, key_name[i])); int error= 0;
DBUG_PRINT("info", ("Index %u: %s", i, *key_name));
if (i == form->primary_key) if (i == form->primary_key)
{ {
DBUG_PRINT("info", ("Skipping it, PK already created")); #ifdef USE_EXTRA_ORDERED_INDEX
continue; error= create_ordered_index(*key_name, key_info);
#endif
} }
else if (key_info->flags & HA_NOSAME)
error= create_unique_index(*key_name, key_info);
else
error= create_ordered_index(*key_name, key_info);
DBUG_PRINT("info", ("Creating index %u: %s", i, key_name[i]));
res= create_index(key_name[i], if (error)
form->key_info + i); {
switch(res){
case 0:
// OK
break;
default:
DBUG_PRINT("error", ("Failed to create index %u", i)); DBUG_PRINT("error", ("Failed to create index %u", i));
drop_table(); drop_table();
my_errno= res; my_errno= error;
goto err_end; break;
} }
} }
err_end:
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
} }
int ha_ndbcluster::create_ordered_index(const char *name,
KEY *key_info)
{
DBUG_ENTER("create_ordered_index");
DBUG_RETURN(create_index(name, key_info, false));
}
int ha_ndbcluster::create_unique_index(const char *name,
KEY *key_info)
{
int error;
const char* unique_name= name;
DBUG_ENTER("create_unique_index");
#ifdef USE_EXTRA_ORDERED_INDEX
char buf[FN_HEADLEN];
strxnmov(buf, FN_HEADLEN, name, unique_suffix, NullS);
unique_name= buf;
#endif
error= create_index(unique_name, key_info, true);
if (error)
DBUG_RETURN(error);
#ifdef USE_EXTRA_ORDERED_INDEX
/*
If unique index contains more then one attribute
an ordered index should be created to support
partial key search
*/
error= create_ordered_index(name, key_info);
#endif
DBUG_RETURN(error);
}
/* /*
Create an index in NDB Cluster Create an index in NDB Cluster
*/ */
int ha_ndbcluster::create_index(const char *name, int ha_ndbcluster::create_index(const char *name,
KEY *key_info){ KEY *key_info,
bool unique)
{
NdbDictionary::Dictionary *dict= m_ndb->getDictionary(); NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
KEY_PART_INFO *key_part= key_info->key_part; KEY_PART_INFO *key_part= key_info->key_part;
KEY_PART_INFO *end= key_part + key_info->key_parts; KEY_PART_INFO *end= key_part + key_info->key_parts;
...@@ -2185,12 +2290,8 @@ int ha_ndbcluster::create_index(const char *name, ...@@ -2185,12 +2290,8 @@ int ha_ndbcluster::create_index(const char *name,
DBUG_ENTER("create_index"); DBUG_ENTER("create_index");
DBUG_PRINT("enter", ("name: %s ", name)); DBUG_PRINT("enter", ("name: %s ", name));
// Check that an index with the same name do not already exist
if (dict->getIndex(name, m_tabname))
ERR_RETURN(dict->getNdbError());
NdbDictionary::Index ndb_index(name); NdbDictionary::Index ndb_index(name);
if (key_info->flags & HA_NOSAME) if (unique)
ndb_index.setType(NdbDictionary::Index::UniqueHashIndex); ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
else else
{ {
...@@ -2349,6 +2450,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): ...@@ -2349,6 +2450,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
HA_NOT_READ_AFTER_KEY), HA_NOT_READ_AFTER_KEY),
m_use_write(false) m_use_write(false)
{ {
int i;
DBUG_ENTER("ha_ndbcluster"); DBUG_ENTER("ha_ndbcluster");
...@@ -2360,6 +2462,12 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): ...@@ -2360,6 +2462,12 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
records= 100; records= 100;
block_size= 1024; block_size= 1024;
for (i= 0; i < MAX_KEY; i++)
{
m_indextype[i]= UNDEFINED_INDEX;
m_unique_index_name[i]= NULL;
}
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -2765,6 +2873,7 @@ ha_ndbcluster::records_in_range(int inx, ...@@ -2765,6 +2873,7 @@ ha_ndbcluster::records_in_range(int inx,
DBUG_PRINT("enter", ("end_key: %x, end_key_len: %d", end_key, end_key_len)); DBUG_PRINT("enter", ("end_key: %x, end_key_len: %d", end_key, end_key_len));
DBUG_PRINT("enter", ("end_search_flag: %d", end_search_flag)); DBUG_PRINT("enter", ("end_search_flag: %d", end_search_flag));
#ifndef USE_EXTRA_ORDERED_INDEX
/* /*
Check that start_key_len is equal to Check that start_key_len is equal to
the length of the used index and the length of the used index and
...@@ -2779,6 +2888,14 @@ ha_ndbcluster::records_in_range(int inx, ...@@ -2779,6 +2888,14 @@ ha_ndbcluster::records_in_range(int inx,
key_length)); key_length));
records= HA_POS_ERROR; records= HA_POS_ERROR;
} }
#else
/*
Extra ordered indexes are created primarily
to support partial key scan/read and range scans of hash indexes.
I.e. the ordered index are used instead of the hash indexes for
these queries.
*/
#endif
DBUG_RETURN(records); DBUG_RETURN(records);
} }
......
...@@ -135,11 +135,14 @@ class ha_ndbcluster: public handler ...@@ -135,11 +135,14 @@ class ha_ndbcluster: public handler
private: private:
int alter_table_name(const char *from, const char *to); int alter_table_name(const char *from, const char *to);
int drop_table(); int drop_table();
int create_index(const char *name, KEY *key_info); int create_index(const char *name, KEY *key_info, bool unique);
int create_ordered_index(const char *name, KEY *key_info);
int create_unique_index(const char *name, KEY *key_info);
int initialize_autoincrement(const void* table); int initialize_autoincrement(const void* table);
int get_metadata(const char* path); int get_metadata(const char* path);
void release_metadata(); void release_metadata();
const char* get_index_name(uint idx_no) const; const char* get_index_name(uint idx_no) const;
const char* get_unique_index_name(uint idx_no) const;
NDB_INDEX_TYPE get_index_type(uint idx_no) const; NDB_INDEX_TYPE get_index_type(uint idx_no) const;
NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const; NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const;
...@@ -193,6 +196,7 @@ class ha_ndbcluster: public handler ...@@ -193,6 +196,7 @@ class ha_ndbcluster: public handler
THR_LOCK_DATA m_lock; THR_LOCK_DATA m_lock;
NDB_SHARE *m_share; NDB_SHARE *m_share;
NDB_INDEX_TYPE m_indextype[MAX_KEY]; NDB_INDEX_TYPE m_indextype[MAX_KEY];
const char* m_unique_index_name[MAX_KEY];
NdbRecAttr *m_value[NDB_MAX_ATTRIBUTES_IN_TABLE]; NdbRecAttr *m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
bool m_use_write; bool m_use_write;
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment