Commit 3e1493b8 authored by mskold@mysql.com's avatar mskold@mysql.com

Fix for WL#1731 Handler: multiple databases

parent 4fe6ab1f
DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7; DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7;
drop database if exists test2;
CREATE TABLE t1 ( CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY, pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL, attr1 INT NOT NULL,
...@@ -349,3 +350,30 @@ select * from t7; ...@@ -349,3 +350,30 @@ select * from t7;
adress a b c adress a b c
No adress 8 NULL 12 No adress 8 NULL 12
drop table t7; drop table t7;
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL,
attr2 INT,
attr3 VARCHAR(10)
) ENGINE=ndbcluster;
INSERT INTO t1 VALUES (9410,9412, NULL, '9412'), (9411,9413, 17, '9413');
create database test2;
use test2;
CREATE TABLE t2 (
a bigint unsigned NOT NULL PRIMARY KEY,
b int unsigned not null,
c int unsigned
) engine=ndbcluster;
insert into t2 select pk1,attr1,attr2 from test.t1;
select * from t2 order by a;
a b c
9410 9412 NULL
9411 9413 17
select b from test.t1, t2 where c = test.t1.attr2;
b
9413
select b,test.t1.attr1 from test.t1, t2 where test.t1.pk1 < a;
b attr1
9413 9412
drop table test.t1, t2;
drop database test2;
DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7; DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7;
drop database if exists test2;
CREATE TABLE t1 ( CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY, pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL attr1 INT NOT NULL
...@@ -206,3 +207,51 @@ begin; ...@@ -206,3 +207,51 @@ begin;
drop table t2; drop table t2;
drop table t3; drop table t3;
drop table t4; drop table t4;
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL
) ENGINE=ndbcluster;
create database test2;
use test2;
CREATE TABLE t2 (
a bigint unsigned NOT NULL PRIMARY KEY,
b int unsigned not null,
c int unsigned
) engine=ndbcluster;
begin;
insert into test.t1 values(1,1);
insert into t2 values(1,1,1);
insert into test.t1 values(2,2);
insert into t2 values(2,2,2);
select count(*) from test.t1;
count(*)
2
select count(*) from t2;
count(*)
2
select * from test.t1 where pk1 = 1;
pk1 attr1
1 1
select * from t2 where a = 1;
a b c
1 1 1
select test.t1.attr1
from test.t1, test.t1 as t1x where test.t1.pk1 = t1x.pk1 + 1;
attr1
2
select t2.a
from t2, t2 as t2x where t2.a = t2x.a + 1;
a
2
select test.t1.pk1, a from test.t1,t2 where b > test.t1.attr1;
pk1 a
1 2
rollback;
select count(*) from test.t1;
count(*)
0
select count(*) from t2;
count(*)
0
drop table test.t1, t2;
drop database test2;
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
--disable_warnings --disable_warnings
DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7; DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7;
drop database if exists test2;
--enable_warnings --enable_warnings
# #
...@@ -319,3 +320,36 @@ delete from t7 where b=23; ...@@ -319,3 +320,36 @@ delete from t7 where b=23;
select * from t7; select * from t7;
drop table t7; drop table t7;
#
# Test multiple databases in one statement
#
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL,
attr2 INT,
attr3 VARCHAR(10)
) ENGINE=ndbcluster;
INSERT INTO t1 VALUES (9410,9412, NULL, '9412'), (9411,9413, 17, '9413');
create database test2;
use test2;
CREATE TABLE t2 (
a bigint unsigned NOT NULL PRIMARY KEY,
b int unsigned not null,
c int unsigned
) engine=ndbcluster;
insert into t2 select pk1,attr1,attr2 from test.t1;
select * from t2 order by a;
select b from test.t1, t2 where c = test.t1.attr2;
select b,test.t1.attr1 from test.t1, t2 where test.t1.pk1 < a;
drop table test.t1, t2;
drop database test2;
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
--disable_warnings --disable_warnings
DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7; DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7;
drop database if exists test2;
--enable_warnings --enable_warnings
# #
...@@ -253,3 +254,45 @@ drop table t2; ...@@ -253,3 +254,45 @@ drop table t2;
drop table t3; drop table t3;
drop table t4; drop table t4;
#
# Test multiple databases in one transaction
#
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL
) ENGINE=ndbcluster;
create database test2;
use test2;
CREATE TABLE t2 (
a bigint unsigned NOT NULL PRIMARY KEY,
b int unsigned not null,
c int unsigned
) engine=ndbcluster;
begin;
insert into test.t1 values(1,1);
insert into t2 values(1,1,1);
insert into test.t1 values(2,2);
insert into t2 values(2,2,2);
select count(*) from test.t1;
select count(*) from t2;
select * from test.t1 where pk1 = 1;
select * from t2 where a = 1;
select test.t1.attr1
from test.t1, test.t1 as t1x where test.t1.pk1 = t1x.pk1 + 1;
select t2.a
from t2, t2 as t2x where t2.a = t2x.a + 1;
select test.t1.pk1, a from test.t1,t2 where b > test.t1.attr1;
rollback;
select count(*) from test.t1;
select count(*) from t2;
drop table test.t1, t2;
drop database test2;
...@@ -1416,9 +1416,14 @@ public: ...@@ -1416,9 +1416,14 @@ public:
*/ */
Uint64 getAutoIncrementValue(const char* aTableName, Uint64 getAutoIncrementValue(const char* aTableName,
Uint32 cacheSize = 1); Uint32 cacheSize = 1);
Uint64 getAutoIncrementValue(NdbDictionary::Table * aTable,
Uint32 cacheSize = 1);
Uint64 readAutoIncrementValue(const char* aTableName); Uint64 readAutoIncrementValue(const char* aTableName);
Uint64 readAutoIncrementValue(NdbDictionary::Table * aTable);
bool setAutoIncrementValue(const char* aTableName, Uint64 val, bool setAutoIncrementValue(const char* aTableName, Uint64 val,
bool increase = false); bool increase = false);
bool setAutoIncrementValue(NdbDictionary::Table * aTable, Uint64 val,
bool increase = false);
Uint64 getTupleIdFromNdb(const char* aTableName, Uint64 getTupleIdFromNdb(const char* aTableName,
Uint32 cacheSize = 1000); Uint32 cacheSize = 1000);
Uint64 getTupleIdFromNdb(Uint32 aTableId, Uint64 getTupleIdFromNdb(Uint32 aTableId,
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include <ndb_types.h> #include <ndb_types.h>
#include <NdbError.hpp> #include <NdbError.hpp>
#include <NdbDictionary.hpp>
class NdbConnection; class NdbConnection;
class NdbOperation; class NdbOperation;
...@@ -440,6 +441,14 @@ public: ...@@ -440,6 +441,14 @@ public:
*/ */
int executePendingBlobOps(Uint8 flags = 0xFF); int executePendingBlobOps(Uint8 flags = 0xFF);
// Fast path calls for MySQL ha_ndbcluster
NdbOperation* getNdbOperation(NdbDictionary::Table * table);
NdbIndexOperation* getNdbIndexOperation(NdbDictionary::Index * index,
NdbDictionary::Table * table);
NdbScanOperation* getNdbScanOperation(NdbDictionary::Table * table);
NdbIndexScanOperation* getNdbIndexScanOperation(NdbDictionary::Index * index,
NdbDictionary::Table * table);
private: private:
/** /**
* Release completed operations * Release completed operations
...@@ -553,6 +562,8 @@ private: ...@@ -553,6 +562,8 @@ private:
NdbIndexOperation* getNdbIndexOperation(class NdbIndexImpl* anIndex, NdbIndexOperation* getNdbIndexOperation(class NdbIndexImpl* anIndex,
class NdbTableImpl* aTable, class NdbTableImpl* aTable,
NdbOperation* aNextOp = 0); NdbOperation* aNextOp = 0);
NdbIndexScanOperation* getNdbIndexScanOperation(NdbIndexImpl* index,
NdbTableImpl* table);
void handleExecuteCompletion(); void handleExecuteCompletion();
......
...@@ -736,6 +736,17 @@ Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize) ...@@ -736,6 +736,17 @@ Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize)
return tupleId; return tupleId;
} }
Uint64
Ndb::getAutoIncrementValue(NdbDictionary::Table * aTable, Uint32 cacheSize)
{
DEBUG_TRACE("getAutoIncrementValue");
if (aTable == 0)
return ~0;
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize);
return tupleId;
}
Uint64 Uint64
Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize) Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize)
{ {
...@@ -770,6 +781,17 @@ Ndb::readAutoIncrementValue(const char* aTableName) ...@@ -770,6 +781,17 @@ Ndb::readAutoIncrementValue(const char* aTableName)
return tupleId; return tupleId;
} }
Uint64
Ndb::readAutoIncrementValue(NdbDictionary::Table * aTable)
{
DEBUG_TRACE("readtAutoIncrementValue");
if (aTable == 0)
return ~0;
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
Uint64 tupleId = readTupleIdFromNdb(table->m_tableId);
return tupleId;
}
Uint64 Uint64
Ndb::readTupleIdFromNdb(Uint32 aTableId) Ndb::readTupleIdFromNdb(Uint32 aTableId)
{ {
...@@ -790,6 +812,16 @@ Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase) ...@@ -790,6 +812,16 @@ Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase)
return setTupleIdInNdb(table->m_tableId, val, increase); return setTupleIdInNdb(table->m_tableId, val, increase);
} }
bool
Ndb::setAutoIncrementValue(NdbDictionary::Table * aTable, Uint64 val, bool increase)
{
DEBUG_TRACE("setAutoIncrementValue " << val);
if (aTable == 0)
return ~0;
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
return setTupleIdInNdb(table->m_tableId, val, increase);
}
bool bool
Ndb::setTupleIdInNdb(const char* aTableName, Uint64 val, bool increase ) Ndb::setTupleIdInNdb(const char* aTableName, Uint64 val, bool increase )
{ {
......
...@@ -995,6 +995,14 @@ NdbConnection::getNdbOperation(NdbTableImpl * tab, NdbOperation* aNextOp) ...@@ -995,6 +995,14 @@ NdbConnection::getNdbOperation(NdbTableImpl * tab, NdbOperation* aNextOp)
return NULL; return NULL;
}//NdbConnection::getNdbOperation() }//NdbConnection::getNdbOperation()
NdbOperation* NdbConnection::getNdbOperation(NdbDictionary::Table * table)
{
if (table)
return getNdbOperation(& NdbTableImpl::getImpl(*table));
else
return NULL;
}//NdbConnection::getNdbOperation()
// NdbScanOperation // NdbScanOperation
/***************************************************************************** /*****************************************************************************
NdbScanOperation* getNdbScanOperation(const char* aTableName); NdbScanOperation* getNdbScanOperation(const char* aTableName);
...@@ -1037,15 +1045,24 @@ Remark: Get an operation from NdbScanOperation idlelist and get the NdbC ...@@ -1037,15 +1045,24 @@ Remark: Get an operation from NdbScanOperation idlelist and get the NdbC
NdbIndexScanOperation* NdbIndexScanOperation*
NdbConnection::getNdbIndexScanOperation(const char* anIndexName, NdbConnection::getNdbIndexScanOperation(const char* anIndexName,
const char* aTableName) const char* aTableName)
{
NdbIndexImpl* index =
theNdb->theDictionary->getIndex(anIndexName, aTableName);
NdbTableImpl* table = theNdb->theDictionary->getTable(aTableName);
return getNdbIndexScanOperation(index, table);
}
NdbIndexScanOperation*
NdbConnection::getNdbIndexScanOperation(NdbIndexImpl* index,
NdbTableImpl* table)
{ {
if (theCommitStatus == Started){ if (theCommitStatus == Started){
NdbIndexImpl* index = const NdbTableImpl * indexTable = index->getIndexTable();
theNdb->theDictionary->getIndex(anIndexName, aTableName);
NdbTableImpl* table = theNdb->theDictionary->getTable(aTableName);
NdbTableImpl* indexTable =
theNdb->theDictionary->getIndexTable(index, table);
if (indexTable != 0){ if (indexTable != 0){
NdbIndexScanOperation* tOp = getNdbScanOperation(indexTable); NdbIndexScanOperation* tOp =
getNdbScanOperation((NdbTableImpl *) indexTable);
tOp->m_currentTable = table;
if(tOp) tOp->m_cursor_type = NdbScanOperation::IndexCursor; if(tOp) tOp->m_cursor_type = NdbScanOperation::IndexCursor;
return tOp; return tOp;
} else { } else {
...@@ -1056,7 +1073,18 @@ NdbConnection::getNdbIndexScanOperation(const char* anIndexName, ...@@ -1056,7 +1073,18 @@ NdbConnection::getNdbIndexScanOperation(const char* anIndexName,
setOperationErrorCodeAbort(4114); setOperationErrorCodeAbort(4114);
return NULL; return NULL;
}//NdbConnection::getNdbScanOperation() }//NdbConnection::getNdbIndexScanOperation()
NdbIndexScanOperation*
NdbConnection::getNdbIndexScanOperation(NdbDictionary::Index * index,
NdbDictionary::Table * table)
{
if (index && table)
return getNdbIndexScanOperation(& NdbIndexImpl::getImpl(*index),
& NdbTableImpl::getImpl(*table));
else
return NULL;
}//NdbConnection::getNdbIndexScanOperation()
/***************************************************************************** /*****************************************************************************
NdbScanOperation* getNdbScanOperation(int aTableId); NdbScanOperation* getNdbScanOperation(int aTableId);
...@@ -1097,6 +1125,14 @@ getNdbOp_error1: ...@@ -1097,6 +1125,14 @@ getNdbOp_error1:
return NULL; return NULL;
}//NdbConnection::getNdbScanOperation() }//NdbConnection::getNdbScanOperation()
NdbScanOperation*
NdbConnection::getNdbScanOperation(NdbDictionary::Table * table)
{
if (table)
return getNdbScanOperation(& NdbTableImpl::getImpl(*table));
else
return NULL;
}//NdbConnection::getNdbScanOperation()
// IndexOperation // IndexOperation
...@@ -1191,6 +1227,18 @@ NdbConnection::getNdbIndexOperation(NdbIndexImpl * anIndex, ...@@ -1191,6 +1227,18 @@ NdbConnection::getNdbIndexOperation(NdbIndexImpl * anIndex,
return NULL; return NULL;
}//NdbConnection::getNdbIndexOperation() }//NdbConnection::getNdbIndexOperation()
NdbIndexOperation*
NdbConnection::getNdbIndexOperation(NdbDictionary::Index * index,
NdbDictionary::Table * table)
{
if (index && table)
return getNdbIndexOperation(& NdbIndexImpl::getImpl(*index),
& NdbTableImpl::getImpl(*table));
else
return NULL;
}//NdbConnection::getNdbIndexOperation()
/******************************************************************************* /*******************************************************************************
int receiveDIHNDBTAMPER(NdbApiSignal* aSignal) int receiveDIHNDBTAMPER(NdbApiSignal* aSignal)
......
...@@ -492,6 +492,12 @@ NdbIndexImpl::getTable() const ...@@ -492,6 +492,12 @@ NdbIndexImpl::getTable() const
return m_tableName.c_str(); return m_tableName.c_str();
} }
const NdbTableImpl *
NdbIndexImpl::getIndexTable() const
{
return m_table;
}
/** /**
* NdbEventImpl * NdbEventImpl
*/ */
......
...@@ -170,6 +170,7 @@ public: ...@@ -170,6 +170,7 @@ public:
const char * getName() const; const char * getName() const;
void setTable(const char * table); void setTable(const char * table);
const char * getTable() const; const char * getTable() const;
const NdbTableImpl * getIndexTable() const;
Uint32 m_indexId; Uint32 m_indexId;
BaseString m_internalName; BaseString m_internalName;
......
...@@ -188,12 +188,15 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, ...@@ -188,12 +188,15 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
m_keyInfo = lockExcl; m_keyInfo = lockExcl;
bool range = false; bool range = false;
if (m_currentTable->m_indexType == NdbDictionary::Index::OrderedIndex || if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex ||
m_currentTable->m_indexType == NdbDictionary::Index::UniqueOrderedIndex){ m_accessTable->m_indexType == NdbDictionary::Index::UniqueOrderedIndex){
assert(m_currentTable == m_accessTable); if (m_currentTable == m_accessTable){
m_currentTable = theNdb->theDictionary-> // Old way of scanning indexes, should not be allowed
getTable(m_currentTable->m_primaryTable.c_str()); m_currentTable = theNdb->theDictionary->
assert(m_currentTable != NULL); getTable(m_currentTable->m_primaryTable.c_str());
assert(m_currentTable != NULL);
}
assert (m_currentTable != m_accessTable);
// Modify operation state // Modify operation state
theStatus = SetBound; theStatus = SetBound;
theOperationType = OpenRangeScanRequest; theOperationType = OpenRangeScanRequest;
......
...@@ -545,20 +545,19 @@ int ha_ndbcluster::get_metadata(const char *path) ...@@ -545,20 +545,19 @@ int ha_ndbcluster::get_metadata(const char *path)
DBUG_RETURN(build_index_list()); DBUG_RETURN(build_index_list());
} }
int ha_ndbcluster::build_index_list() int ha_ndbcluster::build_index_list0()
{ {
char *name; char *name;
const char *index_name; const char *index_name;
static const char* unique_suffix= "$unique"; static const char* unique_suffix= "$unique";
uint i, name_len; uint i, name_len;
DBUG_ENTER("build_index_list"); DBUG_ENTER("build_index_list0");
// Save information about all known indexes // Save information about all known indexes
for (i= 0; i < table->keys; i++) for (i= 0; i < table->keys; i++)
{ {
NDB_INDEX_TYPE idx_type= get_index_type_from_table(i); NDB_INDEX_TYPE idx_type= get_index_type_from_table(i);
m_indextype[i]= idx_type; m_index[i].type= idx_type;
if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX) if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
{ {
index_name= get_index_name(i); index_name= get_index_name(i);
...@@ -567,7 +566,7 @@ int ha_ndbcluster::build_index_list() ...@@ -567,7 +566,7 @@ int ha_ndbcluster::build_index_list()
if (!(name= my_malloc(name_len, MYF(MY_WME)))) if (!(name= my_malloc(name_len, MYF(MY_WME))))
DBUG_RETURN(2); DBUG_RETURN(2);
strxnmov(name, name_len, index_name, unique_suffix, NullS); strxnmov(name, name_len, index_name, unique_suffix, NullS);
m_unique_index_name[i]= name; m_index[i].unique_name = name;
DBUG_PRINT("info", ("Created unique index name: %s for index %d", DBUG_PRINT("info", ("Created unique index name: %s for index %d",
name, i)); name, i));
} }
...@@ -575,7 +574,44 @@ int ha_ndbcluster::build_index_list() ...@@ -575,7 +574,44 @@ int ha_ndbcluster::build_index_list()
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int ha_ndbcluster::build_index_list1()
{
uint i;
NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
DBUG_ENTER("build_index_object_list1");
// Add direct references to index objects
for (i= 0; i < table->keys; i++)
{
DBUG_PRINT("info", ("Trying to add handle to index %s", get_index_name(i)));
if ((m_index[i].type != PRIMARY_KEY_INDEX) &&
(m_index[i].type != UNIQUE_INDEX))
{
const NDBINDEX *index= dict->getIndex(get_index_name(i), m_tabname);
if (!index) DBUG_RETURN(1);
m_index[i].index = (void *) index;
}
if (m_index[i].unique_name)
{
const NDBINDEX *index= dict->getIndex(m_index[i].unique_name, m_tabname);
if (!index) DBUG_RETURN(1);
m_index[i].unique_index = (void *) index;
}
DBUG_PRINT("info", ("Added handle to index %s", get_index_name(i)));
}
DBUG_RETURN(0);
}
int ha_ndbcluster::build_index_list()
{
int res;
DBUG_ENTER("build_index_list");
if ((res= build_index_list0()))
DBUG_RETURN(res);
if ((res= build_index_list1()))
DBUG_RETURN(res);
DBUG_RETURN(0);
}
/* /*
Decode the type of an index from information Decode the type of an index from information
...@@ -605,9 +641,11 @@ void ha_ndbcluster::release_metadata() ...@@ -605,9 +641,11 @@ void ha_ndbcluster::release_metadata()
// Release index list // Release index list
for (i= 0; i < MAX_KEY; i++) for (i= 0; i < MAX_KEY; i++)
{ {
if (m_unique_index_name[i]) if (m_index[i].unique_name)
my_free((char*)m_unique_index_name[i], MYF(0)); my_free((char*)m_index[i].unique_name, MYF(0));
m_unique_index_name[i]= NULL; m_index[i].unique_name= NULL;
m_index[i].unique_index= NULL;
m_index[i].index= NULL;
} }
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
...@@ -667,13 +705,13 @@ inline const char* ha_ndbcluster::get_index_name(uint idx_no) const ...@@ -667,13 +705,13 @@ inline const char* ha_ndbcluster::get_index_name(uint idx_no) const
inline const char* ha_ndbcluster::get_unique_index_name(uint idx_no) const inline const char* ha_ndbcluster::get_unique_index_name(uint idx_no) const
{ {
return m_unique_index_name[idx_no]; return m_index[idx_no].unique_name;
} }
inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
{ {
DBUG_ASSERT(idx_no < MAX_KEY); DBUG_ASSERT(idx_no < MAX_KEY);
return m_indextype[idx_no]; return m_index[idx_no].type;
} }
...@@ -763,7 +801,8 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) ...@@ -763,7 +801,8 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
DBUG_PRINT("enter", ("key_len: %u", key_len)); DBUG_PRINT("enter", ("key_len: %u", key_len));
DBUG_DUMP("key", (char*)key, key_len); DBUG_DUMP("key", (char*)key, key_len);
if (!(op= trans->getNdbOperation(m_tabname)) || op->readTuple() != 0) if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) ||
op->readTuple() != 0)
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
if (table->primary_key == MAX_KEY) if (table->primary_key == MAX_KEY)
...@@ -831,7 +870,8 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data) ...@@ -831,7 +870,8 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
// We have allready retrieved all fields, nothing to complement // We have allready retrieved all fields, nothing to complement
DBUG_RETURN(0); DBUG_RETURN(0);
if (!(op= trans->getNdbOperation(m_tabname)) || op->readTuple() != 0) if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) ||
op->readTuple() != 0)
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
int res; int res;
...@@ -882,8 +922,9 @@ int ha_ndbcluster::unique_index_read(const byte *key, ...@@ -882,8 +922,9 @@ int ha_ndbcluster::unique_index_read(const byte *key,
DBUG_DUMP("key", (char*)key, key_len); DBUG_DUMP("key", (char*)key, key_len);
DBUG_PRINT("enter", ("name: %s", get_unique_index_name(active_index))); DBUG_PRINT("enter", ("name: %s", get_unique_index_name(active_index)));
if (!(op= trans->getNdbIndexOperation(get_unique_index_name(active_index), if (!(op= trans->getNdbIndexOperation((NDBINDEX *)
m_tabname)) || m_index[active_index].unique_index,
(NDBTAB *) m_table)) ||
op->readTuple() != 0) op->readTuple() != 0)
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
...@@ -1083,7 +1124,9 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -1083,7 +1124,9 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname)); DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
index_name= get_index_name(active_index); index_name= get_index_name(active_index);
if (!(op= trans->getNdbIndexScanOperation(index_name, m_tabname))) if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
m_index[active_index].index,
(NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode) NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
...@@ -1146,7 +1189,7 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, ...@@ -1146,7 +1189,7 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
DBUG_PRINT("info", ("Starting a new filtered scan on %s", DBUG_PRINT("info", ("Starting a new filtered scan on %s",
m_tabname)); m_tabname));
if (!(op= trans->getNdbScanOperation(m_tabname))) if (!(op= trans->getNdbScanOperation((NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode) NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
get_ndb_lock_type(m_lock.type); get_ndb_lock_type(m_lock.type);
...@@ -1217,7 +1260,7 @@ int ha_ndbcluster::full_table_scan(byte *buf) ...@@ -1217,7 +1260,7 @@ int ha_ndbcluster::full_table_scan(byte *buf)
DBUG_ENTER("full_table_scan"); DBUG_ENTER("full_table_scan");
DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname)); DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
if (!(op=trans->getNdbScanOperation(m_tabname))) if (!(op=trans->getNdbScanOperation((NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode) NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
get_ndb_lock_type(m_lock.type); get_ndb_lock_type(m_lock.type);
...@@ -1294,7 +1337,7 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -1294,7 +1337,7 @@ int ha_ndbcluster::write_row(byte *record)
has_auto_increment= (table->next_number_field && record == table->record[0]); has_auto_increment= (table->next_number_field && record == table->record[0]);
skip_auto_increment= table->auto_increment_field_not_null; skip_auto_increment= table->auto_increment_field_not_null;
if (!(op= trans->getNdbOperation(m_tabname))) if (!(op= trans->getNdbOperation((NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
res= (m_use_write) ? op->writeTuple() :op->insertTuple(); res= (m_use_write) ? op->writeTuple() :op->insertTuple();
...@@ -1304,7 +1347,7 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -1304,7 +1347,7 @@ int ha_ndbcluster::write_row(byte *record)
if (table->primary_key == MAX_KEY) if (table->primary_key == MAX_KEY)
{ {
// Table has hidden primary key // Table has hidden primary key
Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname); Uint64 auto_value= m_ndb->getAutoIncrementValue((NDBTAB *) m_table);
if (set_hidden_key(op, table->fields, (const byte*)&auto_value)) if (set_hidden_key(op, table->fields, (const byte*)&auto_value))
ERR_RETURN(op->getNdbError()); ERR_RETURN(op->getNdbError());
} }
...@@ -1360,7 +1403,7 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -1360,7 +1403,7 @@ int ha_ndbcluster::write_row(byte *record)
Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1; Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1;
DBUG_PRINT("info", DBUG_PRINT("info",
("Trying to set next auto increment value to %u", next_val)); ("Trying to set next auto increment value to %u", next_val));
if (m_ndb->setAutoIncrementValue(m_tabname, next_val, true)) if (m_ndb->setAutoIncrementValue((NDBTAB *) m_table, next_val, true))
DBUG_PRINT("info", DBUG_PRINT("info",
("Setting next auto increment value to %u", next_val)); ("Setting next auto increment value to %u", next_val));
} }
...@@ -1473,7 +1516,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) ...@@ -1473,7 +1516,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
} }
else else
{ {
if (!(op= trans->getNdbOperation(m_tabname)) || if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) ||
op->updateTuple() != 0) op->updateTuple() != 0)
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
...@@ -1551,7 +1594,7 @@ int ha_ndbcluster::delete_row(const byte *record) ...@@ -1551,7 +1594,7 @@ int ha_ndbcluster::delete_row(const byte *record)
else else
{ {
if (!(op=trans->getNdbOperation(m_tabname)) || if (!(op=trans->getNdbOperation((NDBTAB *) m_table)) ||
op->deleteTuple() != 0) op->deleteTuple() != 0)
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
...@@ -2840,7 +2883,7 @@ int ha_ndbcluster::create(const char *name, ...@@ -2840,7 +2883,7 @@ int ha_ndbcluster::create(const char *name,
DBUG_PRINT("info", ("Table %s/%s created successfully", DBUG_PRINT("info", ("Table %s/%s created successfully",
m_dbname, m_tabname)); m_dbname, m_tabname));
if ((my_errno= build_index_list())) if ((my_errno= build_index_list0()))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
// Create secondary indexes // Create secondary indexes
...@@ -2882,6 +2925,10 @@ int ha_ndbcluster::create(const char *name, ...@@ -2882,6 +2925,10 @@ int ha_ndbcluster::create(const char *name,
break; break;
} }
} }
if (!(my_errno) && (my_errno= build_index_list1()))
DBUG_RETURN(my_errno);
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
} }
...@@ -2918,6 +2965,7 @@ int ha_ndbcluster::create_index(const char *name, ...@@ -2918,6 +2965,7 @@ int ha_ndbcluster::create_index(const char *name,
DBUG_ENTER("create_index"); DBUG_ENTER("create_index");
DBUG_PRINT("enter", ("name: %s ", name)); DBUG_PRINT("enter", ("name: %s ", name));
// NdbDictionary::Index ndb_index(name);
NdbDictionary::Index ndb_index(name); NdbDictionary::Index ndb_index(name);
if (unique) if (unique)
ndb_index.setType(NdbDictionary::Index::UniqueHashIndex); ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
...@@ -3059,8 +3107,8 @@ longlong ha_ndbcluster::get_auto_increment() ...@@ -3059,8 +3107,8 @@ longlong ha_ndbcluster::get_auto_increment()
: autoincrement_prefetch; : autoincrement_prefetch;
Uint64 auto_value= Uint64 auto_value=
(skip_auto_increment) ? (skip_auto_increment) ?
m_ndb->readAutoIncrementValue(m_tabname) m_ndb->readAutoIncrementValue((NDBTAB *) m_table)
: m_ndb->getAutoIncrementValue(m_tabname, cache_size); : m_ndb->getAutoIncrementValue((NDBTAB *) m_table, cache_size);
DBUG_RETURN((longlong)auto_value); DBUG_RETURN((longlong)auto_value);
} }
...@@ -3104,8 +3152,10 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): ...@@ -3104,8 +3152,10 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
for (i= 0; i < MAX_KEY; i++) for (i= 0; i < MAX_KEY; i++)
{ {
m_indextype[i]= UNDEFINED_INDEX; m_index[i].type= UNDEFINED_INDEX;
m_unique_index_name[i]= NULL; m_index[i].unique_name= NULL;
m_index[i].unique_index= NULL;
m_index[i].index= NULL;
} }
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
......
...@@ -37,6 +37,7 @@ class NdbScanOperation; ...@@ -37,6 +37,7 @@ class NdbScanOperation;
class NdbIndexScanOperation; class NdbIndexScanOperation;
class NdbBlob; class NdbBlob;
typedef enum ndb_index_type { typedef enum ndb_index_type {
UNDEFINED_INDEX = 0, UNDEFINED_INDEX = 0,
PRIMARY_KEY_INDEX = 1, PRIMARY_KEY_INDEX = 1,
...@@ -46,6 +47,12 @@ typedef enum ndb_index_type { ...@@ -46,6 +47,12 @@ typedef enum ndb_index_type {
ORDERED_INDEX = 5 ORDERED_INDEX = 5
} NDB_INDEX_TYPE; } NDB_INDEX_TYPE;
typedef struct ndb_index_data {
NDB_INDEX_TYPE type;
void *index;
const char * unique_name;
void *unique_index;
} NDB_INDEX_DATA;
typedef struct st_ndbcluster_share { typedef struct st_ndbcluster_share {
THR_LOCK lock; THR_LOCK lock;
...@@ -149,6 +156,8 @@ class ha_ndbcluster: public handler ...@@ -149,6 +156,8 @@ class ha_ndbcluster: public handler
int create_ordered_index(const char *name, KEY *key_info); int create_ordered_index(const char *name, KEY *key_info);
int create_unique_index(const char *name, KEY *key_info); int create_unique_index(const char *name, KEY *key_info);
int initialize_autoincrement(const void* table); int initialize_autoincrement(const void* table);
int build_index_list0();
int build_index_list1();
int build_index_list(); int build_index_list();
int get_metadata(const char* path); int get_metadata(const char* path);
void release_metadata(); void release_metadata();
...@@ -211,8 +220,7 @@ class ha_ndbcluster: public handler ...@@ -211,8 +220,7 @@ class ha_ndbcluster: public handler
ulong m_table_flags; ulong m_table_flags;
THR_LOCK_DATA m_lock; THR_LOCK_DATA m_lock;
NDB_SHARE *m_share; NDB_SHARE *m_share;
NDB_INDEX_TYPE m_indextype[MAX_KEY]; NDB_INDEX_DATA m_index[MAX_KEY];
const char* m_unique_index_name[MAX_KEY];
// NdbRecAttr has no reference to blob // NdbRecAttr has no reference to blob
typedef union { NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue; typedef union { NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE]; NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment