Commit a1c92afa authored by joreland@mysql.com's avatar joreland@mysql.com

Merge joreland@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb

into mysql.com:/home/jonas/src/mysql-4.1-ndb
parents a3bd2cd1 164878bb
...@@ -1003,17 +1003,22 @@ public: ...@@ -1003,17 +1003,22 @@ public:
/* /*
* TUX index in TUP has single Uint32 array attribute which stores an * TUX index in TUP has single Uint32 array attribute which stores an
* index node. TUX uses following methods. * index node. TUX reads and writes the node directly via pointer.
*/ */
int tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pageOffset, Uint32*& node); int tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pageOffset, Uint32*& node);
void tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node); void tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node);
void tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node); void tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node);
/* /*
* TUX reads primary table attributes for 1) index key 2) primary key * TUX reads primary table attributes for index keys. Input is
* when returning keyinfo. TUX uses following methods. * attribute ids in AttributeHeader format. Output is pointers to
* attribute data within tuple or 0 for NULL value.
*/
void tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, Uint32 numAttrs, const Uint32* attrIds, const Uint32** attrData);
/*
* TUX reads primary key for md5 summing and when returning keyinfo.
*/ */
void tuxReadAttrs(); // under construction
void tuxReadKeys(); // under construction void tuxReadKeys(); // under construction
private: private:
......
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
void void
Dbtup::tuxGetTupAddr(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32& tupAddr) Dbtup::tuxGetTupAddr(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32& tupAddr)
{ {
ljamEntry();
FragrecordPtr fragPtr; FragrecordPtr fragPtr;
fragPtr.i = fragPtrI; fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
...@@ -54,6 +55,7 @@ Dbtup::tuxGetTupAddr(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32& ...@@ -54,6 +55,7 @@ Dbtup::tuxGetTupAddr(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32&
int int
Dbtup::tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pageOffset, Uint32*& node) Dbtup::tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pageOffset, Uint32*& node)
{ {
ljamEntry();
FragrecordPtr fragPtr; FragrecordPtr fragPtr;
fragPtr.i = fragPtrI; fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
...@@ -63,7 +65,7 @@ Dbtup::tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pag ...@@ -63,7 +65,7 @@ Dbtup::tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pag
PagePtr pagePtr; PagePtr pagePtr;
terrorCode = 0; terrorCode = 0;
if (! allocTh(fragPtr.p, tablePtr.p, NORMAL_PAGE, signal, pageOffset, pagePtr)) { if (! allocTh(fragPtr.p, tablePtr.p, NORMAL_PAGE, signal, pageOffset, pagePtr)) {
jam(); ljam();
ndbrequire(terrorCode != 0); ndbrequire(terrorCode != 0);
return terrorCode; return terrorCode;
} }
...@@ -77,6 +79,7 @@ Dbtup::tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pag ...@@ -77,6 +79,7 @@ Dbtup::tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pag
void void
Dbtup::tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node) Dbtup::tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node)
{ {
ljamEntry();
FragrecordPtr fragPtr; FragrecordPtr fragPtr;
fragPtr.i = fragPtrI; fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
...@@ -95,6 +98,7 @@ Dbtup::tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOf ...@@ -95,6 +98,7 @@ Dbtup::tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOf
void void
Dbtup::tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node) Dbtup::tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node)
{ {
ljamEntry();
FragrecordPtr fragPtr; FragrecordPtr fragPtr;
fragPtr.i = fragPtrI; fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
...@@ -109,9 +113,62 @@ Dbtup::tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& no ...@@ -109,9 +113,62 @@ Dbtup::tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& no
node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset; node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset;
} }
void // under construction void
Dbtup::tuxReadAttrs() Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, Uint32 numAttrs, const Uint32* attrIds, const Uint32** attrData)
{ {
ljamEntry();
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
TablerecPtr tablePtr;
tablePtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
PagePtr pagePtr;
pagePtr.i = pageId;
ptrCheckGuard(pagePtr, cnoOfPage, page);
// search for tuple version if not original
if (pagePtr.p->pageWord[pageOffset + 1] != tupVersion) {
ljam();
OperationrecPtr opPtr;
opPtr.i = pagePtr.p->pageWord[pageOffset];
Uint32 loopGuard = 0;
while (true) {
ptrCheckGuard(opPtr, cnoOfOprec, operationrec);
if (opPtr.p->realPageIdC != RNIL) {
pagePtr.i = opPtr.p->realPageIdC;
pageOffset = opPtr.p->pageOffsetC;
ptrCheckGuard(pagePtr, cnoOfPage, page);
if (pagePtr.p->pageWord[pageOffset + 1] == tupVersion) {
ljam();
break;
}
}
ljam();
opPtr.i = opPtr.p->nextActiveOp;
ndbrequire(++loopGuard < (1 << ZTUP_VERSION_BITS));
}
}
const Uint32 tabDescriptor = tablePtr.p->tabDescriptor;
const Uint32* tupleHeader = &pagePtr.p->pageWord[pageOffset];
for (Uint32 i = 0; i < numAttrs; i++) {
AttributeHeader ah(attrIds[i]);
Uint32 attrId = ah.getAttributeId();
Uint32 index = tabDescriptor + (attrId << ZAD_LOG_SIZE);
Uint32 desc1 = tableDescriptor[index].tabDescr;
Uint32 desc2 = tableDescriptor[index + 1].tabDescr;
if (AttributeDescriptor::getNullable(desc1)) {
Uint32 offset = AttributeOffset::getNullFlagOffset(desc2);
ndbrequire(offset < tablePtr.p->tupNullWords);
offset += tablePtr.p->tupNullIndex;
ndbrequire(offset < tablePtr.p->tupheadsize);
if (AttributeOffset::isNULL(tupleHeader[offset], desc2)) {
ljam();
attrData[i] = 0;
continue;
}
}
attrData[i] = tupleHeader + AttributeOffset::getOffset(desc2);
}
} }
void // under construction void // under construction
...@@ -259,10 +316,10 @@ Dbtup::execTUP_QUERY_TH(Signal* signal) ...@@ -259,10 +316,10 @@ Dbtup::execTUP_QUERY_TH(Signal* signal)
for this transaction and savepoint id. If its tuple version equals for this transaction and savepoint id. If its tuple version equals
the requested then we have a visible tuple otherwise not. the requested then we have a visible tuple otherwise not.
*/ */
jam(); ljam();
Uint32 read_tupVersion = pagePtr.p->pageWord[tempOp.pageOffset + 1]; Uint32 read_tupVersion = pagePtr.p->pageWord[tempOp.pageOffset + 1];
if (read_tupVersion == req_tupVersion) { if (read_tupVersion == req_tupVersion) {
jam(); ljam();
ret_result = 1; ret_result = 1;
} }
} }
...@@ -580,7 +637,7 @@ Dbtup::buildIndex(Signal* signal, Uint32 buildPtrI) ...@@ -580,7 +637,7 @@ Dbtup::buildIndex(Signal* signal, Uint32 buildPtrI)
tuple as a copy tuple. The original tuple is stable and is thus tuple as a copy tuple. The original tuple is stable and is thus
preferrable to store in TUX. preferrable to store in TUX.
*/ */
jam(); ljam();
ptrCheckGuard(pageOperPtr, cnoOfOprec, operationrec); ptrCheckGuard(pageOperPtr, cnoOfOprec, operationrec);
realPageId = pageOperPtr.p->realPageId; realPageId = pageOperPtr.p->realPageId;
pageOffset = pageOperPtr.p->pageOffset; pageOffset = pageOperPtr.p->pageOffset;
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include <new> #include <new>
#include <ndb_limits.h> #include <ndb_limits.h>
#include <SimulatedBlock.hpp> #include <SimulatedBlock.hpp>
#include <AttributeDescriptor.hpp>
#include <AttributeHeader.hpp> #include <AttributeHeader.hpp>
#include <ArrayPool.hpp> #include <ArrayPool.hpp>
#include <DataBuffer.hpp> #include <DataBuffer.hpp>
...@@ -84,6 +85,10 @@ ...@@ -84,6 +85,10 @@
#define jam() jamLine(90000 + __LINE__) #define jam() jamLine(90000 + __LINE__)
#define jamEntry() jamEntryLine(90000 + __LINE__) #define jamEntry() jamEntryLine(90000 + __LINE__)
#endif #endif
#ifndef jam
#define jam() jamLine(__LINE__)
#define jamEntry() jamEntryLine(__LINE__)
#endif
#undef max #undef max
#undef min #undef min
...@@ -115,7 +120,7 @@ private: ...@@ -115,7 +120,7 @@ private:
struct DescEnt; struct DescEnt;
/* /*
* Pointer to Uint32 data. Interpretation is context dependent. * Pointer to array of Uint32.
*/ */
struct Data { struct Data {
private: private:
...@@ -131,7 +136,7 @@ private: ...@@ -131,7 +136,7 @@ private:
friend class Data; friend class Data;
/* /*
* Pointer to constant Uint32 data. * Pointer to array of constant Uint32.
*/ */
struct ConstData; struct ConstData;
friend struct ConstData; friend struct ConstData;
...@@ -153,6 +158,11 @@ private: ...@@ -153,6 +158,11 @@ private:
// AttributeHeader size is assumed to be 1 word // AttributeHeader size is assumed to be 1 word
static const unsigned AttributeHeaderSize = 1; static const unsigned AttributeHeaderSize = 1;
/*
* Array of pointers to TUP table attributes. Always read-on|y.
*/
typedef const Uint32** TableData;
/* /*
* Logical tuple address, "local key". Identifies table tuples. * Logical tuple address, "local key". Identifies table tuples.
*/ */
...@@ -554,31 +564,6 @@ private: ...@@ -554,31 +564,6 @@ private:
ReadPar(); ReadPar();
}; };
/*
* Tree search for entry.
*/
struct SearchPar;
friend struct SearchPar;
struct SearchPar {
ConstData m_data; // input index key values
TreeEnt m_ent; // input tuple and version
SearchPar();
};
/*
* Attribute data comparison.
*/
struct CmpPar;
friend struct CmpPar;
struct CmpPar {
ConstData m_data1; // full search key
ConstData m_data2; // full or prefix data
unsigned m_len2; // words in data2 buffer
unsigned m_first; // first attribute
unsigned m_numEq; // number of initial equal attributes
CmpPar();
};
/* /*
* Scan bound comparison. * Scan bound comparison.
*/ */
...@@ -602,7 +587,10 @@ private: ...@@ -602,7 +587,10 @@ private:
void execSTTOR(Signal* signal); void execSTTOR(Signal* signal);
void execREAD_CONFIG_REQ(Signal* signal); void execREAD_CONFIG_REQ(Signal* signal);
// utils // utils
void setKeyAttrs(const Frag& frag);
void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, TableData keyData);
void copyAttrs(Data dst, ConstData src, CopyPar& copyPar); void copyAttrs(Data dst, ConstData src, CopyPar& copyPar);
void copyAttrs(const Frag& frag, TableData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
/* /*
* DbtuxMeta.cpp * DbtuxMeta.cpp
...@@ -645,7 +633,7 @@ private: ...@@ -645,7 +633,7 @@ private:
/* /*
* DbtuxTree.cpp * DbtuxTree.cpp
*/ */
void treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& treePos); void treeSearch(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent); void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent);
void treeRemove(Signal* signal, Frag& frag, TreePos treePos); void treeRemove(Signal* signal, Frag& frag, TreePos treePos);
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i); void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
...@@ -672,7 +660,8 @@ private: ...@@ -672,7 +660,8 @@ private:
/* /*
* DbtuxCmp.cpp * DbtuxCmp.cpp
*/ */
int cmpTreeAttrs(const Frag& frag, CmpPar& cmpPar); int cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstData data2, unsigned maxlen2 = MaxAttrDataSize);
int cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableData data2);
int cmpScanBound(const Frag& frag, const BoundPar boundPar); int cmpScanBound(const Frag& frag, const BoundPar boundPar);
/* /*
...@@ -716,12 +705,25 @@ private: ...@@ -716,12 +705,25 @@ private:
Uint32 c_internalStartPhase; Uint32 c_internalStartPhase;
Uint32 c_typeOfStart; Uint32 c_typeOfStart;
// buffers /*
Data c_keyBuffer; // search key or scan bound * Array of index key attribute ids in AttributeHeader format.
* Includes fixed attribute sizes. This is global data set at
* operation start and is not passed as a parameter.
*/
Data c_keyAttrs;
// buffer for search key data as pointers to TUP storage
TableData c_searchKey;
// buffer for current entry key data as pointers to TUP storage
TableData c_entryKey;
// buffer for scan bounds and keyinfo (primary key)
Data c_dataBuffer;
// inlined utils // inlined utils
DescEnt& getDescEnt(Uint32 descPage, Uint32 descOff); DescEnt& getDescEnt(Uint32 descPage, Uint32 descOff);
Uint32 getTupAddr(const Frag& frag, const TreeEnt ent); Uint32 getTupAddr(const Frag& frag, TreeEnt ent);
static unsigned min(unsigned x, unsigned y); static unsigned min(unsigned x, unsigned y);
static unsigned max(unsigned x, unsigned y); static unsigned max(unsigned x, unsigned y);
}; };
...@@ -1211,23 +1213,6 @@ Dbtux::ReadPar::ReadPar() : ...@@ -1211,23 +1213,6 @@ Dbtux::ReadPar::ReadPar() :
{ {
} }
inline
Dbtux::SearchPar::SearchPar() :
m_data(0),
m_ent()
{
}
inline
Dbtux::CmpPar::CmpPar() :
m_data1(0),
m_data2(0),
m_len2(0),
m_first(0),
m_numEq(0)
{
}
inline inline
Dbtux::BoundPar::BoundPar() : Dbtux::BoundPar::BoundPar() :
m_data1(0), m_data1(0),
...@@ -1267,12 +1252,13 @@ Dbtux::getDescEnt(Uint32 descPage, Uint32 descOff) ...@@ -1267,12 +1252,13 @@ Dbtux::getDescEnt(Uint32 descPage, Uint32 descOff)
} }
inline Uint32 inline Uint32
Dbtux::getTupAddr(const Frag& frag, const TreeEnt ent) Dbtux::getTupAddr(const Frag& frag, TreeEnt ent)
{ {
const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit]; const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
const TupLoc tupLoc = ent.m_tupLoc; const TupLoc tupLoc = ent.m_tupLoc;
Uint32 tupAddr = NullTupAddr; Uint32 tupAddr = NullTupAddr;
c_tup->tuxGetTupAddr(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, tupAddr); c_tup->tuxGetTupAddr(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, tupAddr);
jamEntry();
return tupAddr; return tupAddr;
} }
......
...@@ -18,50 +18,42 @@ ...@@ -18,50 +18,42 @@
#include "Dbtux.hpp" #include "Dbtux.hpp"
/* /*
* Search key vs tree entry. * Search key vs node prefix.
* *
* Compare search key and index attribute data. The attribute data may * The comparison starts at given attribute position (in fact 0). The
* be partial in which case CmpUnknown may be returned. Also counts how * position is updated by number of equal initial attributes found. The
* many (additional) initial attributes were equal. * prefix may be partial in which case CmpUnknown may be returned.
*/ */
int int
Dbtux::cmpTreeAttrs(const Frag& frag, CmpPar& cmpPar) Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstData data2, unsigned maxlen2)
{ {
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff); const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
ConstData data1 = cmpPar.m_data1;
ConstData data2 = cmpPar.m_data2;
// number of words of attribute data left // number of words of attribute data left
unsigned len2 = cmpPar.m_len2; unsigned len2 = maxlen2;
const unsigned numAttrs = frag.m_numAttrs; // skip to right position in search key
unsigned index = cmpPar.m_first; data1 += start;
ndbrequire(index < numAttrs);
// skip to right position in search key XXX do it before the call
for (unsigned i = 0; i < index; i++) {
jam();
data1 += AttributeHeaderSize + data1.ah().getDataSize();
}
unsigned numEq = 0;
int ret = 0; int ret = 0;
while (index < numAttrs) { while (start < numAttrs) {
if (len2 < AttributeHeaderSize) { if (len2 < AttributeHeaderSize) {
jam(); jam();
ret = NdbSqlUtil::CmpUnknown; ret = NdbSqlUtil::CmpUnknown;
break; break;
} }
len2 -= AttributeHeaderSize; len2 -= AttributeHeaderSize;
if (! data1.ah().isNULL()) { if (*data1 != 0) {
if (! data2.ah().isNULL()) { if (! data2.ah().isNULL()) {
jam(); jam();
// current attribute // current attribute
const DescAttr& descAttr = descEnt.m_descAttr[index]; const DescAttr& descAttr = descEnt.m_descAttr[start];
const unsigned typeId = descAttr.m_typeId; const unsigned typeId = descAttr.m_typeId;
// full data size // full data size
const unsigned size1 = data1.ah().getDataSize(); const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
ndbrequire(size1 != 0 && size1 == data2.ah().getDataSize()); ndbrequire(size1 != 0 && size1 == data2.ah().getDataSize());
const unsigned size2 = min(size1, len2); const unsigned size2 = min(size1, len2);
len2 -= size2; len2 -= size2;
// compare // compare
const Uint32* const p1 = &data1[AttributeHeaderSize]; const Uint32* const p1 = *data1;
const Uint32* const p2 = &data2[AttributeHeaderSize]; const Uint32* const p2 = &data2[AttributeHeaderSize];
ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2); ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2);
if (ret != 0) { if (ret != 0) {
...@@ -82,17 +74,70 @@ Dbtux::cmpTreeAttrs(const Frag& frag, CmpPar& cmpPar) ...@@ -82,17 +74,70 @@ Dbtux::cmpTreeAttrs(const Frag& frag, CmpPar& cmpPar)
break; break;
} }
} }
data1 += AttributeHeaderSize + data1.ah().getDataSize(); data1 += 1;
data2 += AttributeHeaderSize + data2.ah().getDataSize(); data2 += AttributeHeaderSize + data2.ah().getDataSize();
numEq++; start++;
index++;
} }
// XXX until data format errors are handled // XXX until data format errors are handled
ndbrequire(ret != NdbSqlUtil::CmpError); ndbrequire(ret != NdbSqlUtil::CmpError);
cmpPar.m_numEq += numEq; // add to previous count
return ret; return ret;
} }
/*
* Search key vs tree entry.
*
* Start position is updated as in previous routine.
*/
int
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableData data2)
{
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// skip to right position
data1 += start;
data2 += start;
int ret = 0;
while (start < numAttrs) {
if (*data1 != 0) {
if (*data2 != 0) {
jam();
// current attribute
const DescAttr& descAttr = descEnt.m_descAttr[start];
const unsigned typeId = descAttr.m_typeId;
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// compare
const Uint32* const p1 = *data1;
const Uint32* const p2 = *data2;
ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1);
if (ret != 0) {
jam();
break;
}
} else {
jam();
// not NULL < NULL
ret = -1;
break;
}
} else {
if (*data2 != 0) {
jam();
// NULL > not NULL
ret = +1;
break;
}
}
data1 += 1;
data2 += 1;
start++;
}
// XXX until data format errors are handled
ndbrequire(ret != NdbSqlUtil::CmpError);
return ret;
}
/* /*
* Scan bound vs tree entry. * Scan bound vs tree entry.
* *
......
...@@ -30,7 +30,7 @@ Dbtux::Dbtux(const Configuration& conf) : ...@@ -30,7 +30,7 @@ Dbtux::Dbtux(const Configuration& conf) :
#endif #endif
c_internalStartPhase(0), c_internalStartPhase(0),
c_typeOfStart(NodeState::ST_ILLEGAL_TYPE), c_typeOfStart(NodeState::ST_ILLEGAL_TYPE),
c_keyBuffer(0) c_dataBuffer(0)
{ {
BLOCK_CONSTRUCTOR(Dbtux); BLOCK_CONSTRUCTOR(Dbtux);
// verify size assumptions (also when release-compiled) // verify size assumptions (also when release-compiled)
...@@ -195,7 +195,10 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) ...@@ -195,7 +195,10 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
new (indexPtr.p) Index(); new (indexPtr.p) Index();
} }
// allocate buffers // allocate buffers
c_keyBuffer = (Uint32*)allocRecord("c_keyBuffer", sizeof(Uint64), (MaxAttrDataSize + 1) >> 1); c_keyAttrs = (Uint32*)allocRecord("c_keyAttrs", sizeof(Uint32), MaxIndexAttributes);
c_searchKey = (TableData)allocRecord("c_searchKey", sizeof(Uint32*), MaxIndexAttributes);
c_entryKey = (TableData)allocRecord("c_entryKey", sizeof(Uint32*), MaxIndexAttributes);
c_dataBuffer = (Uint32*)allocRecord("c_dataBuffer", sizeof(Uint64), (MaxAttrDataSize + 1) >> 1);
// ack // ack
ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend(); ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
conf->senderRef = reference(); conf->senderRef = reference();
...@@ -206,6 +209,37 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) ...@@ -206,6 +209,37 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
// utils // utils
void
Dbtux::setKeyAttrs(const Frag& frag)
{
Data keyAttrs = c_keyAttrs; // global
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
for (unsigned i = 0; i < numAttrs; i++) {
const DescAttr& descAttr = descEnt.m_descAttr[i];
Uint32 size = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// set attr id and fixed size
keyAttrs.ah() = AttributeHeader(descAttr.m_primaryAttrId, size);
keyAttrs += 1;
}
}
void
Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, TableData keyData)
{
ConstData keyAttrs = c_keyAttrs; // global
const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
const TupLoc tupLoc = ent.m_tupLoc;
const Uint32 tupVersion = ent.m_tupVersion;
ndbrequire(start < frag.m_numAttrs);
const unsigned numAttrs = frag.m_numAttrs - start;
// start applies to both keys and output data
keyAttrs += start;
keyData += start;
c_tup->tuxReadAttrs(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, tupVersion, numAttrs, keyAttrs, keyData);
jamEntry();
}
void void
Dbtux::copyAttrs(Data dst, ConstData src, CopyPar& copyPar) Dbtux::copyAttrs(Data dst, ConstData src, CopyPar& copyPar)
{ {
...@@ -240,4 +274,46 @@ Dbtux::copyAttrs(Data dst, ConstData src, CopyPar& copyPar) ...@@ -240,4 +274,46 @@ Dbtux::copyAttrs(Data dst, ConstData src, CopyPar& copyPar)
copyPar = c; copyPar = c;
} }
/*
* Input is pointers to table attributes. Output is array of attribute
* data with headers. Copies whatever fits.
*/
void
Dbtux::copyAttrs(const Frag& frag, TableData data1, Data data2, unsigned maxlen2)
{
ConstData keyAttrs = c_keyAttrs; // global
const unsigned numAttrs = frag.m_numAttrs;
unsigned len2 = maxlen2;
for (unsigned n = 0; n < numAttrs; n++) {
jam();
const unsigned attrId = keyAttrs.ah().getAttributeId();
const unsigned dataSize = keyAttrs.ah().getDataSize();
const Uint32* const p1 = *data1;
if (p1 != 0) {
if (len2 == 0)
return;
data2.ah() = AttributeHeader(attrId, dataSize);
data2 += 1;
len2 -= 1;
unsigned n = dataSize;
for (unsigned i = 0; i < dataSize; i++) {
if (len2 == 0)
return;
*data2 = p1[i];
data2 += 1;
len2 -= 1;
}
} else {
if (len2 == 0)
return;
data2.ah() = AttributeHeader(attrId, 0);
data2.ah().setNULL();
data2 += 1;
len2 -= 1;
}
keyAttrs += 1;
data1 += 1;
}
}
BLOCK_FUNCTIONS(Dbtux); BLOCK_FUNCTIONS(Dbtux);
...@@ -73,30 +73,25 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -73,30 +73,25 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
} }
ndbrequire(fragPtr.i != RNIL); ndbrequire(fragPtr.i != RNIL);
Frag& frag = *fragPtr.p; Frag& frag = *fragPtr.p;
// set up index entry // set up index keys for this operation
setKeyAttrs(frag);
// set up search entry
TreeEnt ent; TreeEnt ent;
ent.m_tupLoc = TupLoc(req->pageId, req->pageOffset); ent.m_tupLoc = TupLoc(req->pageId, req->pageOffset);
ent.m_tupVersion = req->tupVersion; ent.m_tupVersion = req->tupVersion;
ent.m_fragBit = fragBit; ent.m_fragBit = fragBit;
// read search key // read search key
ReadPar readPar; readKeyAttrs(frag, ent, 0, c_searchKey);
readPar.m_ent = ent;
readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs;
// output goes here
readPar.m_data = c_keyBuffer;
tupReadAttrs(signal, frag, readPar);
// check if all keys are null // check if all keys are null
{ {
const unsigned numAttrs = frag.m_numAttrs;
bool allNull = true; bool allNull = true;
ConstData data = readPar.m_data; for (unsigned i = 0; i < numAttrs; i++) {
for (unsigned i = 0; i < frag.m_numAttrs; i++) { if (c_searchKey[i] != 0) {
if (! data.ah().isNULL()) {
jam(); jam();
allNull = false; allNull = false;
break; break;
} }
data += AttributeHeaderSize + data.ah().getDataSize();
} }
if (allNull) { if (allNull) {
jam(); jam();
...@@ -105,11 +100,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -105,11 +100,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
return; return;
} }
} }
// find position in tree
SearchPar searchPar;
searchPar.m_data = c_keyBuffer;
searchPar.m_ent = ent;
TreePos treePos;
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
debugOut << "opCode=" << dec << opCode; debugOut << "opCode=" << dec << opCode;
...@@ -121,7 +111,9 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -121,7 +111,9 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
debugOut << endl; debugOut << endl;
} }
#endif #endif
treeSearch(signal, frag, searchPar, treePos); // find position in tree
TreePos treePos;
treeSearch(signal, frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
debugOut << treePos << endl; debugOut << treePos << endl;
......
...@@ -28,6 +28,7 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node) ...@@ -28,6 +28,7 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node)
Uint32 pageOffset = NullTupLoc.m_pageOffset; Uint32 pageOffset = NullTupLoc.m_pageOffset;
Uint32* node32 = 0; Uint32* node32 = 0;
int errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); int errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
jamEntry();
if (errorCode == 0) { if (errorCode == 0) {
jam(); jam();
node.m_loc = TupLoc(pageId, pageOffset); node.m_loc = TupLoc(pageId, pageOffset);
...@@ -63,6 +64,7 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc) ...@@ -63,6 +64,7 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc)
Uint32 pageOffset = loc.m_pageOffset; Uint32 pageOffset = loc.m_pageOffset;
Uint32* node32 = 0; Uint32* node32 = 0;
c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
jamEntry();
node.m_loc = loc; node.m_loc = loc;
node.m_node = reinterpret_cast<TreeNode*>(node32); node.m_node = reinterpret_cast<TreeNode*>(node32);
node.m_acc = AccNone; node.m_acc = AccNone;
...@@ -83,8 +85,8 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node, AccSize acc) ...@@ -83,8 +85,8 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node, AccSize acc)
new (node.m_node) TreeNode(); new (node.m_node) TreeNode();
#ifdef VM_TRACE #ifdef VM_TRACE
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
memset(tree.getPref(node.m_node, 0), 0xa2, tree.m_prefSize << 2); memset(node.getPref(0), 0xa2, tree.m_prefSize << 2);
memset(tree.getPref(node.m_node, 1), 0xa2, tree.m_prefSize << 2); memset(node.getPref(1), 0xa2, tree.m_prefSize << 2);
TreeEnt* entList = tree.getEntList(node.m_node); TreeEnt* entList = tree.getEntList(node.m_node);
memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2)); memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
#endif #endif
...@@ -103,35 +105,23 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node) ...@@ -103,35 +105,23 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node)
Uint32 pageOffset = loc.m_pageOffset; Uint32 pageOffset = loc.m_pageOffset;
Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node); Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node);
c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
jamEntry();
// invalidate handle and storage // invalidate handle and storage
node.m_loc = NullTupLoc; node.m_loc = NullTupLoc;
node.m_node = 0; node.m_node = 0;
} }
/* /*
* Set prefix. * Set prefix. Copies the number of words that fits. Includes
* attribute headers for now. XXX use null mask instead
*/ */
void void
Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i) Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i)
{ {
Frag& frag = node.m_frag; const Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
ReadPar readPar; readKeyAttrs(frag, node.getMinMax(i), 0, c_entryKey);
ndbrequire(i <= 1); copyAttrs(frag, c_entryKey, node.getPref(i), tree.m_prefSize);
readPar.m_ent = node.getMinMax(i);
readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs;
// leave in signal data
readPar.m_data = 0;
// XXX implement max words to read
tupReadAttrs(signal, frag, readPar);
// copy whatever fits
CopyPar copyPar;
copyPar.m_items = readPar.m_count;
copyPar.m_headers = true;
copyPar.m_maxwords = tree.m_prefSize;
Data pref = node.getPref(i);
copyAttrs(pref, readPar.m_data, copyPar);
} }
// node operations // node operations
......
...@@ -390,7 +390,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -390,7 +390,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
const TreeEnt ent = scan.m_scanPos.m_ent; const TreeEnt ent = scan.m_scanPos.m_ent;
// read tuple key // read tuple key
keyPar.m_ent = ent; keyPar.m_ent = ent;
keyPar.m_data = c_keyBuffer; keyPar.m_data = c_dataBuffer;
tupReadKeys(signal, frag, keyPar); tupReadKeys(signal, frag, keyPar);
// get read lock or exclusive lock // get read lock or exclusive lock
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
...@@ -483,7 +483,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -483,7 +483,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (keyPar.m_data == 0) { if (keyPar.m_data == 0) {
jam(); jam();
keyPar.m_ent = ent; keyPar.m_ent = ent;
keyPar.m_data = c_keyBuffer; keyPar.m_data = c_dataBuffer;
tupReadKeys(signal, frag, keyPar); tupReadKeys(signal, frag, keyPar);
} }
} }
...@@ -704,12 +704,12 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -704,12 +704,12 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
bound.first(iter); bound.first(iter);
for (unsigned j = 0; j < bound.getSize(); j++) { for (unsigned j = 0; j < bound.getSize(); j++) {
jam(); jam();
c_keyBuffer[j] = *iter.data; c_dataBuffer[j] = *iter.data;
bound.next(iter); bound.next(iter);
} }
// comparison parameters // comparison parameters
BoundPar boundPar; BoundPar boundPar;
boundPar.m_data1 = c_keyBuffer; boundPar.m_data1 = c_dataBuffer;
boundPar.m_count1 = scan.m_boundCnt[0]; boundPar.m_count1 = scan.m_boundCnt[0];
boundPar.m_dir = 0; boundPar.m_dir = 0;
loop: { loop: {
...@@ -847,12 +847,12 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -847,12 +847,12 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
bound.first(iter); bound.first(iter);
for (unsigned j = 0; j < bound.getSize(); j++) { for (unsigned j = 0; j < bound.getSize(); j++) {
jam(); jam();
c_keyBuffer[j] = *iter.data; c_dataBuffer[j] = *iter.data;
bound.next(iter); bound.next(iter);
} }
// comparison parameters // comparison parameters
BoundPar boundPar; BoundPar boundPar;
boundPar.m_data1 = c_keyBuffer; boundPar.m_data1 = c_dataBuffer;
boundPar.m_count1 = scan.m_boundCnt[1]; boundPar.m_count1 = scan.m_boundCnt[1];
boundPar.m_dir = 1; boundPar.m_dir = 1;
// use copy of position // use copy of position
......
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
* same in min/max need not be checked. * same in min/max need not be checked.
*/ */
void void
Dbtux::treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& treePos) Dbtux::treeSearch(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
...@@ -45,40 +45,25 @@ loop: { ...@@ -45,40 +45,25 @@ loop: {
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ndbrequire(occup != 0); ndbrequire(occup != 0);
// number of equal initial attributes in bounding node // number of equal initial attributes in bounding node
unsigned numEq = ZNIL; unsigned start = ZNIL;
for (unsigned i = 0; i <= 1; i++) { for (unsigned i = 0; i <= 1; i++) {
jam(); jam();
unsigned start1 = 0;
// compare prefix // compare prefix
CmpPar cmpPar; int ret = cmpSearchKey(frag, start1, searchKey, node.getPref(i), tree.m_prefSize);
cmpPar.m_data1 = searchPar.m_data;
cmpPar.m_data2 = node.getPref(i);
cmpPar.m_len2 = tree.m_prefSize;
cmpPar.m_first = 0;
cmpPar.m_numEq = 0;
int ret = cmpTreeAttrs(frag, cmpPar);
if (ret == NdbSqlUtil::CmpUnknown) { if (ret == NdbSqlUtil::CmpUnknown) {
jam(); jam();
// read full value // read and compare remaining attributes
ReadPar readPar; readKeyAttrs(frag, node.getMinMax(i), start1, c_entryKey);
readPar.m_ent = node.getMinMax(i); ret = cmpSearchKey(frag, start1, searchKey, c_entryKey);
ndbrequire(cmpPar.m_numEq < numAttrs);
readPar.m_first = cmpPar.m_numEq;
readPar.m_count = numAttrs - cmpPar.m_numEq;
readPar.m_data = 0; // leave in signal data
tupReadAttrs(signal, frag, readPar);
// compare full value
cmpPar.m_data2 = readPar.m_data;
cmpPar.m_len2 = ZNIL; // big
cmpPar.m_first = readPar.m_first;
ret = cmpTreeAttrs(frag, cmpPar);
ndbrequire(ret != NdbSqlUtil::CmpUnknown); ndbrequire(ret != NdbSqlUtil::CmpUnknown);
} }
if (numEq > cmpPar.m_numEq) if (start > start1)
numEq = cmpPar.m_numEq; start = start1;
if (ret == 0) { if (ret == 0) {
jam(); jam();
// keys are equal, compare entry values // keys are equal, compare entry values
ret = searchPar.m_ent.cmp(node.getMinMax(i)); ret = searchEnt.cmp(node.getMinMax(i));
} }
if (i == 0 ? (ret < 0) : (ret > 0)) { if (i == 0 ? (ret < 0) : (ret > 0)) {
jam(); jam();
...@@ -110,28 +95,18 @@ loop: { ...@@ -110,28 +95,18 @@ loop: {
for (unsigned j = 1; j <= numWithin; j++) { for (unsigned j = 1; j <= numWithin; j++) {
jam(); jam();
int ret = 0; int ret = 0;
// compare remaining attributes if (start < numAttrs) {
if (numEq < numAttrs) {
jam(); jam();
ReadPar readPar; // read and compare remaining attributes
readPar.m_ent = node.getEnt(j); unsigned start1 = start;
readPar.m_first = numEq; readKeyAttrs(frag, node.getEnt(j), start1, c_entryKey);
readPar.m_count = numAttrs - numEq; ret = cmpSearchKey(frag, start1, searchKey, c_entryKey);
readPar.m_data = 0; // leave in signal data
tupReadAttrs(signal, frag, readPar);
// compare
CmpPar cmpPar;
cmpPar.m_data1 = searchPar.m_data;
cmpPar.m_data2 = readPar.m_data;
cmpPar.m_len2 = ZNIL; // big
cmpPar.m_first = readPar.m_first;
ret = cmpTreeAttrs(frag, cmpPar);
ndbrequire(ret != NdbSqlUtil::CmpUnknown); ndbrequire(ret != NdbSqlUtil::CmpUnknown);
} }
if (ret == 0) { if (ret == 0) {
jam(); jam();
// keys are equal, compare entry values // keys are equal, compare entry values
ret = searchPar.m_ent.cmp(node.getEnt(j)); ret = searchEnt.cmp(node.getEnt(j));
} }
if (ret <= 0) { if (ret <= 0) {
jam(); jam();
......
...@@ -40,5 +40,13 @@ optim 7 mc02/a 42 ms 69 ms 61 pct ...@@ -40,5 +40,13 @@ optim 7 mc02/a 42 ms 69 ms 61 pct
optim 8 mc02/a 42 ms 69 ms 62 pct optim 8 mc02/a 42 ms 69 ms 62 pct
mc02/b 54 ms 104 ms 92 pct mc02/b 54 ms 104 ms 92 pct
optim 9 mc02/a 43 ms 67 ms 54 pct
mc02/b 53 ms 102 ms 91 pct
optim 10 mc02/a 44 ms 65 ms 46 pct
mc02/b 53 ms 88 ms 66 pct
optim 11 mc02/a 43 ms 63 ms 46 pct
mc02/b 52 ms 86 ms 63 pct
vim: set et: vim: set et:
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment