Commit 59c9bcb3 authored by mskold@mysql.com's avatar mskold@mysql.com

Merge mskold@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb

into mysql.com:/usr/local/home/marty/MySQL/mysql-4.1-ndb
parents b9dbef8e 348cff41
DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7;
drop database if exists test2;
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL,
......@@ -349,3 +350,30 @@ select * from t7;
adress a b c
No adress 8 NULL 12
drop table t7;
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL,
attr2 INT,
attr3 VARCHAR(10)
) ENGINE=ndbcluster;
INSERT INTO t1 VALUES (9410,9412, NULL, '9412'), (9411,9413, 17, '9413');
create database test2;
use test2;
CREATE TABLE t2 (
a bigint unsigned NOT NULL PRIMARY KEY,
b int unsigned not null,
c int unsigned
) engine=ndbcluster;
insert into t2 select pk1,attr1,attr2 from test.t1;
select * from t2 order by a;
a b c
9410 9412 NULL
9411 9413 17
select b from test.t1, t2 where c = test.t1.attr2;
b
9413
select b,test.t1.attr1 from test.t1, t2 where test.t1.pk1 < a;
b attr1
9413 9412
drop table test.t1, t2;
drop database test2;
DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7;
drop database if exists test2;
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL
......@@ -206,3 +207,51 @@ begin;
drop table t2;
drop table t3;
drop table t4;
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL
) ENGINE=ndbcluster;
create database test2;
use test2;
CREATE TABLE t2 (
a bigint unsigned NOT NULL PRIMARY KEY,
b int unsigned not null,
c int unsigned
) engine=ndbcluster;
begin;
insert into test.t1 values(1,1);
insert into t2 values(1,1,1);
insert into test.t1 values(2,2);
insert into t2 values(2,2,2);
select count(*) from test.t1;
count(*)
2
select count(*) from t2;
count(*)
2
select * from test.t1 where pk1 = 1;
pk1 attr1
1 1
select * from t2 where a = 1;
a b c
1 1 1
select test.t1.attr1
from test.t1, test.t1 as t1x where test.t1.pk1 = t1x.pk1 + 1;
attr1
2
select t2.a
from t2, t2 as t2x where t2.a = t2x.a + 1;
a
2
select test.t1.pk1, a from test.t1,t2 where b > test.t1.attr1;
pk1 a
1 2
rollback;
select count(*) from test.t1;
count(*)
0
select count(*) from t2;
count(*)
0
drop table test.t1, t2;
drop database test2;
......@@ -2,6 +2,7 @@
--disable_warnings
DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7;
drop database if exists test2;
--enable_warnings
#
......@@ -319,3 +320,36 @@ delete from t7 where b=23;
select * from t7;
drop table t7;
#
# Test multiple databases in one statement
#
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL,
attr2 INT,
attr3 VARCHAR(10)
) ENGINE=ndbcluster;
INSERT INTO t1 VALUES (9410,9412, NULL, '9412'), (9411,9413, 17, '9413');
create database test2;
use test2;
CREATE TABLE t2 (
a bigint unsigned NOT NULL PRIMARY KEY,
b int unsigned not null,
c int unsigned
) engine=ndbcluster;
insert into t2 select pk1,attr1,attr2 from test.t1;
select * from t2 order by a;
select b from test.t1, t2 where c = test.t1.attr2;
select b,test.t1.attr1 from test.t1, t2 where test.t1.pk1 < a;
drop table test.t1, t2;
drop database test2;
......@@ -2,6 +2,7 @@
--disable_warnings
DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7;
drop database if exists test2;
--enable_warnings
#
......@@ -253,3 +254,45 @@ drop table t2;
drop table t3;
drop table t4;
#
# Test multiple databases in one transaction
#
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL
) ENGINE=ndbcluster;
create database test2;
use test2;
CREATE TABLE t2 (
a bigint unsigned NOT NULL PRIMARY KEY,
b int unsigned not null,
c int unsigned
) engine=ndbcluster;
begin;
insert into test.t1 values(1,1);
insert into t2 values(1,1,1);
insert into test.t1 values(2,2);
insert into t2 values(2,2,2);
select count(*) from test.t1;
select count(*) from t2;
select * from test.t1 where pk1 = 1;
select * from t2 where a = 1;
select test.t1.attr1
from test.t1, test.t1 as t1x where test.t1.pk1 = t1x.pk1 + 1;
select t2.a
from t2, t2 as t2x where t2.a = t2x.a + 1;
select test.t1.pk1, a from test.t1,t2 where b > test.t1.attr1;
rollback;
select count(*) from test.t1;
select count(*) from t2;
drop table test.t1, t2;
drop database test2;
......@@ -1416,9 +1416,14 @@ public:
*/
Uint64 getAutoIncrementValue(const char* aTableName,
Uint32 cacheSize = 1);
Uint64 getAutoIncrementValue(NdbDictionary::Table * aTable,
Uint32 cacheSize = 1);
Uint64 readAutoIncrementValue(const char* aTableName);
Uint64 readAutoIncrementValue(NdbDictionary::Table * aTable);
bool setAutoIncrementValue(const char* aTableName, Uint64 val,
bool increase = false);
bool setAutoIncrementValue(NdbDictionary::Table * aTable, Uint64 val,
bool increase = false);
Uint64 getTupleIdFromNdb(const char* aTableName,
Uint32 cacheSize = 1000);
Uint64 getTupleIdFromNdb(Uint32 aTableId,
......
......@@ -19,6 +19,7 @@
#include <ndb_types.h>
#include <NdbError.hpp>
#include <NdbDictionary.hpp>
class NdbConnection;
class NdbOperation;
......@@ -440,6 +441,14 @@ public:
*/
int executePendingBlobOps(Uint8 flags = 0xFF);
// Fast path calls for MySQL ha_ndbcluster
NdbOperation* getNdbOperation(NdbDictionary::Table * table);
NdbIndexOperation* getNdbIndexOperation(NdbDictionary::Index * index,
NdbDictionary::Table * table);
NdbScanOperation* getNdbScanOperation(NdbDictionary::Table * table);
NdbIndexScanOperation* getNdbIndexScanOperation(NdbDictionary::Index * index,
NdbDictionary::Table * table);
private:
/**
* Release completed operations
......@@ -553,6 +562,8 @@ private:
NdbIndexOperation* getNdbIndexOperation(class NdbIndexImpl* anIndex,
class NdbTableImpl* aTable,
NdbOperation* aNextOp = 0);
NdbIndexScanOperation* getNdbIndexScanOperation(NdbIndexImpl* index,
NdbTableImpl* table);
void handleExecuteCompletion();
......
......@@ -736,6 +736,17 @@ Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize)
return tupleId;
}
Uint64
Ndb::getAutoIncrementValue(NdbDictionary::Table * aTable, Uint32 cacheSize)
{
DEBUG_TRACE("getAutoIncrementValue");
if (aTable == 0)
return ~0;
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize);
return tupleId;
}
Uint64
Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize)
{
......@@ -770,6 +781,17 @@ Ndb::readAutoIncrementValue(const char* aTableName)
return tupleId;
}
Uint64
Ndb::readAutoIncrementValue(NdbDictionary::Table * aTable)
{
DEBUG_TRACE("readtAutoIncrementValue");
if (aTable == 0)
return ~0;
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
Uint64 tupleId = readTupleIdFromNdb(table->m_tableId);
return tupleId;
}
Uint64
Ndb::readTupleIdFromNdb(Uint32 aTableId)
{
......@@ -790,6 +812,16 @@ Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase)
return setTupleIdInNdb(table->m_tableId, val, increase);
}
bool
Ndb::setAutoIncrementValue(NdbDictionary::Table * aTable, Uint64 val, bool increase)
{
DEBUG_TRACE("setAutoIncrementValue " << val);
if (aTable == 0)
return ~0;
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
return setTupleIdInNdb(table->m_tableId, val, increase);
}
bool
Ndb::setTupleIdInNdb(const char* aTableName, Uint64 val, bool increase )
{
......
......@@ -995,6 +995,14 @@ NdbConnection::getNdbOperation(NdbTableImpl * tab, NdbOperation* aNextOp)
return NULL;
}//NdbConnection::getNdbOperation()
NdbOperation* NdbConnection::getNdbOperation(NdbDictionary::Table * table)
{
if (table)
return getNdbOperation(& NdbTableImpl::getImpl(*table));
else
return NULL;
}//NdbConnection::getNdbOperation()
// NdbScanOperation
/*****************************************************************************
NdbScanOperation* getNdbScanOperation(const char* aTableName);
......@@ -1037,15 +1045,24 @@ Remark: Get an operation from NdbScanOperation idlelist and get the NdbC
NdbIndexScanOperation*
NdbConnection::getNdbIndexScanOperation(const char* anIndexName,
const char* aTableName)
{
NdbIndexImpl* index =
theNdb->theDictionary->getIndex(anIndexName, aTableName);
NdbTableImpl* table = theNdb->theDictionary->getTable(aTableName);
return getNdbIndexScanOperation(index, table);
}
NdbIndexScanOperation*
NdbConnection::getNdbIndexScanOperation(NdbIndexImpl* index,
NdbTableImpl* table)
{
if (theCommitStatus == Started){
NdbIndexImpl* index =
theNdb->theDictionary->getIndex(anIndexName, aTableName);
NdbTableImpl* table = theNdb->theDictionary->getTable(aTableName);
NdbTableImpl* indexTable =
theNdb->theDictionary->getIndexTable(index, table);
const NdbTableImpl * indexTable = index->getIndexTable();
if (indexTable != 0){
NdbIndexScanOperation* tOp = getNdbScanOperation(indexTable);
NdbIndexScanOperation* tOp =
getNdbScanOperation((NdbTableImpl *) indexTable);
tOp->m_currentTable = table;
if(tOp) tOp->m_cursor_type = NdbScanOperation::IndexCursor;
return tOp;
} else {
......@@ -1056,7 +1073,18 @@ NdbConnection::getNdbIndexScanOperation(const char* anIndexName,
setOperationErrorCodeAbort(4114);
return NULL;
}//NdbConnection::getNdbScanOperation()
}//NdbConnection::getNdbIndexScanOperation()
NdbIndexScanOperation*
NdbConnection::getNdbIndexScanOperation(NdbDictionary::Index * index,
NdbDictionary::Table * table)
{
if (index && table)
return getNdbIndexScanOperation(& NdbIndexImpl::getImpl(*index),
& NdbTableImpl::getImpl(*table));
else
return NULL;
}//NdbConnection::getNdbIndexScanOperation()
/*****************************************************************************
NdbScanOperation* getNdbScanOperation(int aTableId);
......@@ -1097,6 +1125,14 @@ NdbConnection::getNdbScanOperation(NdbTableImpl * tab)
return NULL;
}//NdbConnection::getNdbScanOperation()
NdbScanOperation*
NdbConnection::getNdbScanOperation(NdbDictionary::Table * table)
{
if (table)
return getNdbScanOperation(& NdbTableImpl::getImpl(*table));
else
return NULL;
}//NdbConnection::getNdbScanOperation()
// IndexOperation
......@@ -1191,6 +1227,18 @@ NdbConnection::getNdbIndexOperation(NdbIndexImpl * anIndex,
return NULL;
}//NdbConnection::getNdbIndexOperation()
NdbIndexOperation*
NdbConnection::getNdbIndexOperation(NdbDictionary::Index * index,
NdbDictionary::Table * table)
{
if (index && table)
return getNdbIndexOperation(& NdbIndexImpl::getImpl(*index),
& NdbTableImpl::getImpl(*table));
else
return NULL;
}//NdbConnection::getNdbIndexOperation()
/*******************************************************************************
int receiveDIHNDBTAMPER(NdbApiSignal* aSignal)
......
......@@ -492,6 +492,12 @@ NdbIndexImpl::getTable() const
return m_tableName.c_str();
}
const NdbTableImpl *
NdbIndexImpl::getIndexTable() const
{
return m_table;
}
/**
* NdbEventImpl
*/
......
......@@ -170,6 +170,7 @@ public:
const char * getName() const;
void setTable(const char * table);
const char * getTable() const;
const NdbTableImpl * getIndexTable() const;
Uint32 m_indexId;
BaseString m_internalName;
......
......@@ -188,12 +188,15 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
m_keyInfo = lockExcl;
bool range = false;
if (m_currentTable->m_indexType == NdbDictionary::Index::OrderedIndex ||
m_currentTable->m_indexType == NdbDictionary::Index::UniqueOrderedIndex){
assert(m_currentTable == m_accessTable);
m_currentTable = theNdb->theDictionary->
getTable(m_currentTable->m_primaryTable.c_str());
assert(m_currentTable != NULL);
if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex ||
m_accessTable->m_indexType == NdbDictionary::Index::UniqueOrderedIndex){
if (m_currentTable == m_accessTable){
// Old way of scanning indexes, should not be allowed
m_currentTable = theNdb->theDictionary->
getTable(m_currentTable->m_primaryTable.c_str());
assert(m_currentTable != NULL);
}
assert (m_currentTable != m_accessTable);
// Modify operation state
theStatus = SetBound;
theOperationType = OpenRangeScanRequest;
......
......@@ -542,41 +542,95 @@ int ha_ndbcluster::get_metadata(const char *path)
// All checks OK, lets use the table
m_table= (void*)tab;
DBUG_RETURN(build_index_list());
DBUG_RETURN(build_index_list(table, ILBP_OPEN));
}
int ha_ndbcluster::build_index_list()
int ha_ndbcluster::build_index_list(TABLE *tab, enum IBLP phase)
{
int error= 0;
char *name;
const char *index_name;
static const char* unique_suffix= "$unique";
uint i, name_len;
KEY* key_info= tab->key_info;
const char **key_name= tab->keynames.type_names;
NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
DBUG_ENTER("build_index_list");
// Save information about all known indexes
for (i= 0; i < table->keys; i++)
for (i= 0; i < tab->keys; i++, key_info++, key_name++)
{
index_name= *key_name;
NDB_INDEX_TYPE idx_type= get_index_type_from_table(i);
m_indextype[i]= idx_type;
m_index[i].type= idx_type;
if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
{
index_name= get_index_name(i);
name_len= strlen(index_name)+strlen(unique_suffix)+1;
// Create name for unique index by appending "$unique";
if (!(name= my_malloc(name_len, MYF(MY_WME))))
DBUG_RETURN(2);
strxnmov(name, name_len, index_name, unique_suffix, NullS);
m_unique_index_name[i]= name;
m_index[i].unique_name= name;
DBUG_PRINT("info", ("Created unique index name: %s for index %d",
name, i));
}
// Create secondary indexes if in create phase
if (phase == ILBP_CREATE)
{
DBUG_PRINT("info", ("Creating index %u: %s", i, index_name));
switch (m_index[i].type){
case PRIMARY_KEY_INDEX:
// Do nothing, already created
break;
case PRIMARY_KEY_ORDERED_INDEX:
error= create_ordered_index(index_name, key_info);
break;
case UNIQUE_ORDERED_INDEX:
if (!(error= create_ordered_index(index_name, key_info)))
error= create_unique_index(get_unique_index_name(i), key_info);
break;
case UNIQUE_INDEX:
error= create_unique_index(get_unique_index_name(i), key_info);
break;
case ORDERED_INDEX:
error= create_ordered_index(index_name, key_info);
break;
default:
DBUG_ASSERT(false);
break;
}
if (error)
{
DBUG_PRINT("error", ("Failed to create index %u", i));
drop_table();
break;
}
}
// Add handles to index objects
DBUG_PRINT("info", ("Trying to add handle to index %s", index_name));
if ((m_index[i].type != PRIMARY_KEY_INDEX) &&
(m_index[i].type != UNIQUE_INDEX))
{
const NDBINDEX *index= dict->getIndex(index_name, m_tabname);
if (!index) DBUG_RETURN(1);
m_index[i].index= (void *) index;
}
if (m_index[i].unique_name)
{
const NDBINDEX *index= dict->getIndex(m_index[i].unique_name, m_tabname);
if (!index) DBUG_RETURN(1);
m_index[i].unique_index= (void *) index;
}
DBUG_PRINT("info", ("Added handle to index %s", index_name));
}
DBUG_RETURN(0);
DBUG_RETURN(error);
}
/*
Decode the type of an index from information
provided in table object
......@@ -605,9 +659,11 @@ void ha_ndbcluster::release_metadata()
// Release index list
for (i= 0; i < MAX_KEY; i++)
{
if (m_unique_index_name[i])
my_free((char*)m_unique_index_name[i], MYF(0));
m_unique_index_name[i]= NULL;
if (m_index[i].unique_name)
my_free((char*)m_index[i].unique_name, MYF(0));
m_index[i].unique_name= NULL;
m_index[i].unique_index= NULL;
m_index[i].index= NULL;
}
DBUG_VOID_RETURN;
......@@ -667,13 +723,13 @@ inline const char* ha_ndbcluster::get_index_name(uint idx_no) const
inline const char* ha_ndbcluster::get_unique_index_name(uint idx_no) const
{
return m_unique_index_name[idx_no];
return m_index[idx_no].unique_name;
}
inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
{
DBUG_ASSERT(idx_no < MAX_KEY);
return m_indextype[idx_no];
return m_index[idx_no].type;
}
......@@ -763,7 +819,8 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
DBUG_PRINT("enter", ("key_len: %u", key_len));
DBUG_DUMP("key", (char*)key, key_len);
if (!(op= trans->getNdbOperation(m_tabname)) || op->readTuple() != 0)
if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) ||
op->readTuple() != 0)
ERR_RETURN(trans->getNdbError());
if (table->primary_key == MAX_KEY)
......@@ -831,7 +888,8 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
// We have allready retrieved all fields, nothing to complement
DBUG_RETURN(0);
if (!(op= trans->getNdbOperation(m_tabname)) || op->readTuple() != 0)
if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) ||
op->readTuple() != 0)
ERR_RETURN(trans->getNdbError());
int res;
......@@ -882,8 +940,9 @@ int ha_ndbcluster::unique_index_read(const byte *key,
DBUG_DUMP("key", (char*)key, key_len);
DBUG_PRINT("enter", ("name: %s", get_unique_index_name(active_index)));
if (!(op= trans->getNdbIndexOperation(get_unique_index_name(active_index),
m_tabname)) ||
if (!(op= trans->getNdbIndexOperation((NDBINDEX *)
m_index[active_index].unique_index,
(NDBTAB *) m_table)) ||
op->readTuple() != 0)
ERR_RETURN(trans->getNdbError());
......@@ -1083,7 +1142,9 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
index_name= get_index_name(active_index);
if (!(op= trans->getNdbIndexScanOperation(index_name, m_tabname)))
if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
m_index[active_index].index,
(NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError());
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
......@@ -1146,7 +1207,7 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
DBUG_PRINT("info", ("Starting a new filtered scan on %s",
m_tabname));
if (!(op= trans->getNdbScanOperation(m_tabname)))
if (!(op= trans->getNdbScanOperation((NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError());
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
get_ndb_lock_type(m_lock.type);
......@@ -1217,7 +1278,7 @@ int ha_ndbcluster::full_table_scan(byte *buf)
DBUG_ENTER("full_table_scan");
DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
if (!(op=trans->getNdbScanOperation(m_tabname)))
if (!(op=trans->getNdbScanOperation((NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError());
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
get_ndb_lock_type(m_lock.type);
......@@ -1294,7 +1355,7 @@ int ha_ndbcluster::write_row(byte *record)
has_auto_increment= (table->next_number_field && record == table->record[0]);
skip_auto_increment= table->auto_increment_field_not_null;
if (!(op= trans->getNdbOperation(m_tabname)))
if (!(op= trans->getNdbOperation((NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError());
res= (m_use_write) ? op->writeTuple() :op->insertTuple();
......@@ -1304,7 +1365,7 @@ int ha_ndbcluster::write_row(byte *record)
if (table->primary_key == MAX_KEY)
{
// Table has hidden primary key
Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname);
Uint64 auto_value= m_ndb->getAutoIncrementValue((NDBTAB *) m_table);
if (set_hidden_key(op, table->fields, (const byte*)&auto_value))
ERR_RETURN(op->getNdbError());
}
......@@ -1360,7 +1421,7 @@ int ha_ndbcluster::write_row(byte *record)
Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1;
DBUG_PRINT("info",
("Trying to set next auto increment value to %u", next_val));
if (m_ndb->setAutoIncrementValue(m_tabname, next_val, true))
if (m_ndb->setAutoIncrementValue((NDBTAB *) m_table, next_val, true))
DBUG_PRINT("info",
("Setting next auto increment value to %u", next_val));
}
......@@ -1473,7 +1534,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
}
else
{
if (!(op= trans->getNdbOperation(m_tabname)) ||
if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) ||
op->updateTuple() != 0)
ERR_RETURN(trans->getNdbError());
......@@ -1551,7 +1612,7 @@ int ha_ndbcluster::delete_row(const byte *record)
else
{
if (!(op=trans->getNdbOperation(m_tabname)) ||
if (!(op=trans->getNdbOperation((NDBTAB *) m_table)) ||
op->deleteTuple() != 0)
ERR_RETURN(trans->getNdbError());
......@@ -2839,50 +2900,10 @@ int ha_ndbcluster::create(const char *name,
}
DBUG_PRINT("info", ("Table %s/%s created successfully",
m_dbname, m_tabname));
if ((my_errno= build_index_list()))
DBUG_RETURN(my_errno);
// Create secondary indexes
KEY* key_info= form->key_info;
const char** key_name= key_names;
for (i= 0; i < form->keys; i++, key_info++, key_name++)
{
int error= 0;
DBUG_PRINT("info", ("Index %u: %s", i, *key_name));
switch (get_index_type_from_table(i)){
case PRIMARY_KEY_INDEX:
// Do nothing, already created
break;
case PRIMARY_KEY_ORDERED_INDEX:
error= create_ordered_index(*key_name, key_info);
break;
case UNIQUE_ORDERED_INDEX:
if (!(error= create_ordered_index(*key_name, key_info)))
error= create_unique_index(get_unique_index_name(i), key_info);
break;
case UNIQUE_INDEX:
error= create_unique_index(get_unique_index_name(i), key_info);
break;
case ORDERED_INDEX:
error= create_ordered_index(*key_name, key_info);
break;
default:
DBUG_ASSERT(false);
break;
}
// Create secondary indexes
my_errno= build_index_list(form, ILBP_CREATE);
if (error)
{
DBUG_PRINT("error", ("Failed to create index %u", i));
drop_table();
my_errno= error;
break;
}
}
DBUG_RETURN(my_errno);
}
......@@ -2918,6 +2939,7 @@ int ha_ndbcluster::create_index(const char *name,
DBUG_ENTER("create_index");
DBUG_PRINT("enter", ("name: %s ", name));
// NdbDictionary::Index ndb_index(name);
NdbDictionary::Index ndb_index(name);
if (unique)
ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
......@@ -3059,8 +3081,8 @@ longlong ha_ndbcluster::get_auto_increment()
: autoincrement_prefetch;
Uint64 auto_value=
(skip_auto_increment) ?
m_ndb->readAutoIncrementValue(m_tabname)
: m_ndb->getAutoIncrementValue(m_tabname, cache_size);
m_ndb->readAutoIncrementValue((NDBTAB *) m_table)
: m_ndb->getAutoIncrementValue((NDBTAB *) m_table, cache_size);
DBUG_RETURN((longlong)auto_value);
}
......@@ -3104,8 +3126,10 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
for (i= 0; i < MAX_KEY; i++)
{
m_indextype[i]= UNDEFINED_INDEX;
m_unique_index_name[i]= NULL;
m_index[i].type= UNDEFINED_INDEX;
m_index[i].unique_name= NULL;
m_index[i].unique_index= NULL;
m_index[i].index= NULL;
}
DBUG_VOID_RETURN;
......
......@@ -37,6 +37,7 @@ class NdbScanOperation;
class NdbIndexScanOperation;
class NdbBlob;
typedef enum ndb_index_type {
UNDEFINED_INDEX = 0,
PRIMARY_KEY_INDEX = 1,
......@@ -46,6 +47,12 @@ typedef enum ndb_index_type {
ORDERED_INDEX = 5
} NDB_INDEX_TYPE;
typedef struct ndb_index_data {
NDB_INDEX_TYPE type;
void *index;
const char * unique_name;
void *unique_index;
} NDB_INDEX_DATA;
typedef struct st_ndbcluster_share {
THR_LOCK lock;
......@@ -148,8 +155,9 @@ class ha_ndbcluster: public handler
int create_index(const char *name, KEY *key_info, bool unique);
int create_ordered_index(const char *name, KEY *key_info);
int create_unique_index(const char *name, KEY *key_info);
int initialize_autoincrement(const void* table);
int build_index_list();
int initialize_autoincrement(const void *table);
enum IBLP {ILBP_CREATE = 0, ILBP_OPEN = 1}; // index_list_build_phase
int build_index_list(TABLE *tab, enum IBLP phase);
int get_metadata(const char* path);
void release_metadata();
const char* get_index_name(uint idx_no) const;
......@@ -211,8 +219,7 @@ class ha_ndbcluster: public handler
ulong m_table_flags;
THR_LOCK_DATA m_lock;
NDB_SHARE *m_share;
NDB_INDEX_TYPE m_indextype[MAX_KEY];
const char* m_unique_index_name[MAX_KEY];
NDB_INDEX_DATA m_index[MAX_KEY];
// NdbRecAttr has no reference to blob
typedef union { NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment