Commit 23dd2e15 authored by pekka@mysql.com's avatar pekka@mysql.com

ndb charsets: TUX must use Dbtup::readAttributes

parent 82c0e3b4
...@@ -1004,17 +1004,20 @@ public: ...@@ -1004,17 +1004,20 @@ public:
void tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node); void tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node);
/* /*
* TUX reads primary table attributes for index keys. Input is * TUX reads primary table attributes for index keys. Tuple is
* attribute ids in AttributeHeader format. Output is pointers to * specified by location of original tuple and version number. Input
* attribute data within tuple or 0 for NULL value. * is attribute ids in AttributeHeader format. Output is attribute
* data with headers. Uses readAttributes with xfrm option set.
* Returns number of words or negative (-terrorCode) on error.
*/ */
void tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, Uint32 numAttrs, const Uint32* attrIds, const Uint32** attrData); int tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, const Uint32* attrIds, Uint32 numAttrs, Uint32* dataOut);
/* /*
* TUX reads primary key without headers into an array of words. Used * TUX reads primary key without headers into an array of words. Used
* for md5 summing and when returning keyinfo. * for md5 summing and when returning keyinfo. Returns number of
* words or negative (-terrorCode) on error.
*/ */
void tuxReadKeys(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* pkSize, Uint32* pkData); int tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut);
/* /*
* TUX checks if tuple is visible to scan. * TUX checks if tuple is visible to scan.
...@@ -1368,7 +1371,7 @@ private: ...@@ -1368,7 +1371,7 @@ private:
//------------------------------------------------------------------ //------------------------------------------------------------------
int readAttributes(Page* const pagePtr, int readAttributes(Page* const pagePtr,
Uint32 TupHeadOffset, Uint32 TupHeadOffset,
Uint32* inBuffer, const Uint32* inBuffer,
Uint32 inBufLen, Uint32 inBufLen,
Uint32* outBuffer, Uint32* outBuffer,
Uint32 TmaxRead); Uint32 TmaxRead);
......
...@@ -112,10 +112,11 @@ Dbtup::tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& no ...@@ -112,10 +112,11 @@ Dbtup::tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& no
node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset; node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset;
} }
void int
Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, Uint32 numAttrs, const Uint32* attrIds, const Uint32** attrData) Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, const Uint32* attrIds, Uint32 numAttrs, Uint32* dataOut)
{ {
ljamEntry(); ljamEntry();
// use own variables instead of globals
FragrecordPtr fragPtr; FragrecordPtr fragPtr;
fragPtr.i = fragPtrI; fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
...@@ -134,6 +135,7 @@ Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tu ...@@ -134,6 +135,7 @@ Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tu
while (true) { while (true) {
ptrCheckGuard(opPtr, cnoOfOprec, operationrec); ptrCheckGuard(opPtr, cnoOfOprec, operationrec);
if (opPtr.p->realPageIdC != RNIL) { if (opPtr.p->realPageIdC != RNIL) {
// update page and offset
pagePtr.i = opPtr.p->realPageIdC; pagePtr.i = opPtr.p->realPageIdC;
pageOffset = opPtr.p->pageOffsetC; pageOffset = opPtr.p->pageOffsetC;
ptrCheckGuard(pagePtr, cnoOfPage, page); ptrCheckGuard(pagePtr, cnoOfPage, page);
...@@ -147,33 +149,34 @@ Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tu ...@@ -147,33 +149,34 @@ Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tu
ndbrequire(++loopGuard < (1 << ZTUP_VERSION_BITS)); ndbrequire(++loopGuard < (1 << ZTUP_VERSION_BITS));
} }
} }
const Uint32 tabDescriptor = tablePtr.p->tabDescriptor; // read key attributes from found tuple version
const Uint32* tupleHeader = &pagePtr.p->pageWord[pageOffset]; // save globals
for (Uint32 i = 0; i < numAttrs; i++) { TablerecPtr tabptr_old = tabptr;
AttributeHeader ah(attrIds[i]); FragrecordPtr fragptr_old = fragptr;
const Uint32 attrId = ah.getAttributeId(); OperationrecPtr operPtr_old = operPtr;
const Uint32 index = tabDescriptor + (attrId << ZAD_LOG_SIZE); // new globals
const Uint32 desc1 = tableDescriptor[index].tabDescr; tabptr = tablePtr;
const Uint32 desc2 = tableDescriptor[index + 1].tabDescr; fragptr = fragPtr;
if (AttributeDescriptor::getNullable(desc1)) { operPtr.i = RNIL;
Uint32 offset = AttributeOffset::getNullFlagOffset(desc2); operPtr.p = NULL;
ndbrequire(offset < tablePtr.p->tupNullWords); // do it
offset += tablePtr.p->tupNullIndex; int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL);
ndbrequire(offset < tablePtr.p->tupheadsize); // restore globals
if (AttributeOffset::isNULL(tupleHeader[offset], desc2)) { tabptr = tabptr_old;
ljam(); fragptr = fragptr_old;
attrData[i] = 0; operPtr = operPtr_old;
continue; // done
} if (ret == (Uint32)-1) {
} ret = terrorCode ? (-(int)terrorCode) : -1;
attrData[i] = tupleHeader + AttributeOffset::getOffset(desc2); }
} return ret;
} }
void int
Dbtup::tuxReadKeys(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* pkSize, Uint32* pkData) Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut)
{ {
ljamEntry(); ljamEntry();
// use own variables instead of globals
FragrecordPtr fragPtr; FragrecordPtr fragPtr;
fragPtr.i = fragPtrI; fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
...@@ -184,25 +187,45 @@ Dbtup::tuxReadKeys(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* pk ...@@ -184,25 +187,45 @@ Dbtup::tuxReadKeys(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* pk
pagePtr.i = pageId; pagePtr.i = pageId;
ptrCheckGuard(pagePtr, cnoOfPage, page); ptrCheckGuard(pagePtr, cnoOfPage, page);
const Uint32 tabDescriptor = tablePtr.p->tabDescriptor; const Uint32 tabDescriptor = tablePtr.p->tabDescriptor;
const Uint32 numAttrs = tablePtr.p->noOfKeyAttr;
const Uint32* attrIds = &tableDescriptor[tablePtr.p->readKeyArray].tabDescr; const Uint32* attrIds = &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
const Uint32* tupleHeader = &pagePtr.p->pageWord[pageOffset]; const Uint32 numAttrs = tablePtr.p->noOfKeyAttr;
Uint32 size = 0; // read pk attributes from original tuple
for (Uint32 i = 0; i < numAttrs; i++) { // save globals
AttributeHeader ah(attrIds[i]); TablerecPtr tabptr_old = tabptr;
const Uint32 attrId = ah.getAttributeId(); FragrecordPtr fragptr_old = fragptr;
const Uint32 index = tabDescriptor + (attrId << ZAD_LOG_SIZE); OperationrecPtr operPtr_old = operPtr;
const Uint32 desc1 = tableDescriptor[index].tabDescr; // new globals
const Uint32 desc2 = tableDescriptor[index + 1].tabDescr; tabptr = tablePtr;
ndbrequire(! AttributeDescriptor::getNullable(desc1)); fragptr = fragPtr;
const Uint32 attrSize = AttributeDescriptor::getSizeInWords(desc1); operPtr.i = RNIL;
const Uint32* attrData = tupleHeader + AttributeOffset::getOffset(desc2); operPtr.p = NULL;
for (Uint32 j = 0; j < attrSize; j++) { // do it
pkData[size + j] = attrData[j]; int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL);
} // restore globals
size += attrSize; tabptr = tabptr_old;
} fragptr = fragptr_old;
*pkSize = size; operPtr = operPtr_old;
// done
if (ret != (Uint32)-1) {
// remove headers
Uint32 n = 0;
Uint32 i = 0;
while (n < numAttrs) {
const AttributeHeader ah(dataOut[i]);
Uint32 size = ah.getDataSize();
ndbrequire(size != 0);
for (Uint32 j = 0; j < size; j++) {
dataOut[i + j - n] = dataOut[i + j + 1];
}
n += 1;
i += 1 + size;
}
ndbrequire(i == ret);
ret -= numAttrs;
} else {
ret = terrorCode ? (-(int)terrorCode) : -1;
}
return ret;
} }
bool bool
......
...@@ -146,7 +146,7 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr) ...@@ -146,7 +146,7 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
/* ---------------------------------------------------------------- */ /* ---------------------------------------------------------------- */
int Dbtup::readAttributes(Page* const pagePtr, int Dbtup::readAttributes(Page* const pagePtr,
Uint32 tupHeadOffset, Uint32 tupHeadOffset,
Uint32* inBuffer, const Uint32* inBuffer,
Uint32 inBufLen, Uint32 inBufLen,
Uint32* outBuffer, Uint32* outBuffer,
Uint32 maxRead) Uint32 maxRead)
......
...@@ -162,11 +162,6 @@ private: ...@@ -162,11 +162,6 @@ private:
// AttributeHeader size is assumed to be 1 word // AttributeHeader size is assumed to be 1 word
static const unsigned AttributeHeaderSize = 1; static const unsigned AttributeHeaderSize = 1;
/*
* Array of pointers to TUP table attributes. Always read-on|y.
*/
typedef const Uint32** TableData;
/* /*
* Logical tuple address, "local key". Identifies table tuples. * Logical tuple address, "local key". Identifies table tuples.
*/ */
...@@ -557,9 +552,9 @@ private: ...@@ -557,9 +552,9 @@ private:
void execREAD_CONFIG_REQ(Signal* signal); void execREAD_CONFIG_REQ(Signal* signal);
// utils // utils
void setKeyAttrs(const Frag& frag); void setKeyAttrs(const Frag& frag);
void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, TableData keyData); void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData);
void readTablePk(const Frag& frag, TreeEnt ent, unsigned& pkSize, Data pkData); void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize);
void copyAttrs(const Frag& frag, TableData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize); void copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
/* /*
* DbtuxMeta.cpp * DbtuxMeta.cpp
...@@ -626,17 +621,15 @@ private: ...@@ -626,17 +621,15 @@ private:
/* /*
* DbtuxSearch.cpp * DbtuxSearch.cpp
*/ */
void searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos); void searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos); void searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos); void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/* /*
* DbtuxCmp.cpp * DbtuxCmp.cpp
*/ */
int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize); int cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey);
int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen = MaxAttrDataSize); int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey);
/* /*
* DbtuxDebug.cpp * DbtuxDebug.cpp
...@@ -683,17 +676,27 @@ private: ...@@ -683,17 +676,27 @@ private:
Uint32 c_typeOfStart; Uint32 c_typeOfStart;
/* /*
* Array of index key attribute ids in AttributeHeader format. * Global data set at operation start. Unpacked from index metadata.
* Includes fixed attribute sizes. This is global data set at * Not passed as parameter to methods. Invalid across timeslices.
* operation start and is not passed as a parameter. *
* TODO inline all into index metadata
*/ */
// index key attr ids with sizes in AttributeHeader format
Data c_keyAttrs; Data c_keyAttrs;
// buffer for search key data as pointers to TUP storage // pointers to index key comparison functions
TableData c_searchKey; NdbSqlUtil::Cmp** c_sqlCmp;
/*
* Other buffers used during the operation.
*/
// buffer for search key data with headers
Data c_searchKey;
// buffer for current entry key data as pointers to TUP storage // buffer for current entry key data with headers
TableData c_entryKey; Data c_entryKey;
// buffer for scan bounds and keyinfo (primary key) // buffer for scan bounds and keyinfo (primary key)
Data c_dataBuffer; Data c_dataBuffer;
......
...@@ -18,21 +18,24 @@ ...@@ -18,21 +18,24 @@
#include "Dbtux.hpp" #include "Dbtux.hpp"
/* /*
* Search key vs node prefix. * Search key vs node prefix or entry
* *
* The comparison starts at given attribute position (in fact 0). The * The comparison starts at given attribute position. The position is
* position is updated by number of equal initial attributes found. The * updated by number of equal initial attributes found. The entry data
* prefix may be partial in which case CmpUnknown may be returned. * may be partial in which case CmpUnknown may be returned.
*/ */
int int
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen) Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen)
{ {
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff); const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// number of words of attribute data left // number of words of attribute data left
unsigned len2 = maxlen; unsigned len2 = maxlen;
// skip to right position in search key // skip to right position in search key only
searchKey += start; for (unsigned i = 0; i < start; i++) {
jam();
searchKey += AttributeHeaderSize + searchKey.ah().getDataSize();
}
int ret = 0; int ret = 0;
while (start < numAttrs) { while (start < numAttrs) {
if (len2 <= AttributeHeaderSize) { if (len2 <= AttributeHeaderSize) {
...@@ -41,22 +44,21 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons ...@@ -41,22 +44,21 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
break; break;
} }
len2 -= AttributeHeaderSize; len2 -= AttributeHeaderSize;
if (*searchKey != 0) { if (! searchKey.ah().isNULL()) {
if (! entryData.ah().isNULL()) { if (! entryData.ah().isNULL()) {
jam(); jam();
// current attribute // current attribute
const DescAttr& descAttr = descEnt.m_descAttr[start]; const DescAttr& descAttr = descEnt.m_descAttr[start];
const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined);
// full data size // full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc); const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize()); ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
const unsigned size2 = min(size1, len2); const unsigned size2 = min(size1, len2);
len2 -= size2; len2 -= size2;
// compare // compare
const Uint32* const p1 = *searchKey; NdbSqlUtil::Cmp* const cmp = c_sqlCmp[start];
const Uint32* const p1 = &searchKey[AttributeHeaderSize];
const Uint32* const p2 = &entryData[AttributeHeaderSize]; const Uint32* const p2 = &entryData[AttributeHeaderSize];
ret = (*type.m_cmp)(p1, p2, size1, size2); ret = (*cmp)(p1, p2, size1, size2);
if (ret != 0) { if (ret != 0) {
jam(); jam();
break; break;
...@@ -75,7 +77,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons ...@@ -75,7 +77,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
break; break;
} }
} }
searchKey += 1; searchKey += AttributeHeaderSize + searchKey.ah().getDataSize();
entryData += AttributeHeaderSize + entryData.ah().getDataSize(); entryData += AttributeHeaderSize + entryData.ah().getDataSize();
start++; start++;
} }
...@@ -83,60 +85,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons ...@@ -83,60 +85,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
} }
/* /*
* Search key vs tree entry. * Scan bound vs node prefix or entry.
*
* Start position is updated as in previous routine.
*/
int
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey)
{
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// skip to right position
searchKey += start;
entryKey += start;
int ret = 0;
while (start < numAttrs) {
if (*searchKey != 0) {
if (*entryKey != 0) {
jam();
// current attribute
const DescAttr& descAttr = descEnt.m_descAttr[start];
const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined);
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// compare
const Uint32* const p1 = *searchKey;
const Uint32* const p2 = *entryKey;
ret = (*type.m_cmp)(p1, p2, size1, size1);
if (ret != 0) {
jam();
break;
}
} else {
jam();
// not NULL > NULL
ret = +1;
break;
}
} else {
if (*entryKey != 0) {
jam();
// NULL < not NULL
ret = -1;
break;
}
}
searchKey += 1;
entryKey += 1;
start++;
}
return ret;
}
/*
* Scan bound vs node prefix.
* *
* Compare lower or upper bound and index attribute data. The attribute * Compare lower or upper bound and index attribute data. The attribute
* data may be partial in which case CmpUnknown may be returned. * data may be partial in which case CmpUnknown may be returned.
...@@ -184,8 +133,6 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne ...@@ -184,8 +133,6 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
// current attribute // current attribute
const unsigned index = boundInfo.ah().getAttributeId(); const unsigned index = boundInfo.ah().getAttributeId();
const DescAttr& descAttr = descEnt.m_descAttr[index]; const DescAttr& descAttr = descEnt.m_descAttr[index];
const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined);
ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId); ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
// full data size // full data size
const unsigned size1 = boundInfo.ah().getDataSize(); const unsigned size1 = boundInfo.ah().getDataSize();
...@@ -193,9 +140,10 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne ...@@ -193,9 +140,10 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
const unsigned size2 = min(size1, len2); const unsigned size2 = min(size1, len2);
len2 -= size2; len2 -= size2;
// compare // compare
NdbSqlUtil::Cmp* const cmp = c_sqlCmp[index];
const Uint32* const p1 = &boundInfo[AttributeHeaderSize]; const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
const Uint32* const p2 = &entryData[AttributeHeaderSize]; const Uint32* const p2 = &entryData[AttributeHeaderSize];
int ret = (*type.m_cmp)(p1, p2, size1, size2); int ret = (*cmp)(p1, p2, size1, size2);
if (ret != 0) { if (ret != 0) {
jam(); jam();
return ret; return ret;
...@@ -244,72 +192,3 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne ...@@ -244,72 +192,3 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
return +1; return +1;
} }
} }
/*
* Scan bound vs tree entry.
*/
int
Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey)
{
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// direction 0-lower 1-upper
ndbrequire(dir <= 1);
// initialize type to equality
unsigned type = 4;
while (boundCount != 0) {
// get and skip bound type
type = boundInfo[0];
boundInfo += 1;
if (! boundInfo.ah().isNULL()) {
if (*entryKey != 0) {
jam();
// current attribute
const unsigned index = boundInfo.ah().getAttributeId();
const DescAttr& descAttr = descEnt.m_descAttr[index];
const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined);
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// compare
const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
const Uint32* const p2 = *entryKey;
int ret = (*type.m_cmp)(p1, p2, size1, size1);
if (ret != 0) {
jam();
return ret;
}
} else {
jam();
// not NULL > NULL
return +1;
}
} else {
jam();
if (*entryKey != 0) {
jam();
// NULL < not NULL
return -1;
}
}
boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
entryKey += 1;
boundCount -= 1;
}
if (dir == 0) {
// lower bound
jam();
if (type == 1) {
jam();
return +1;
}
return -1;
} else {
// upper bound
jam();
if (type == 3) {
jam();
return -1;
}
return +1;
}
}
...@@ -207,14 +207,10 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -207,14 +207,10 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
} }
// check ordering within node // check ordering within node
for (unsigned j = 1; j < node.getOccup(); j++) { for (unsigned j = 1; j < node.getOccup(); j++) {
unsigned start = 0;
const TreeEnt ent1 = node.getEnt(j - 1); const TreeEnt ent1 = node.getEnt(j - 1);
const TreeEnt ent2 = node.getEnt(j); const TreeEnt ent2 = node.getEnt(j);
if (j == 1) { unsigned start = 0;
readKeyAttrs(frag, ent1, start, c_searchKey); readKeyAttrs(frag, ent1, start, c_searchKey);
} else {
memcpy(c_searchKey, c_entryKey, frag.m_numAttrs << 2);
}
readKeyAttrs(frag, ent2, start, c_entryKey); readKeyAttrs(frag, ent2, start, c_entryKey);
int ret = cmpSearchKey(frag, start, c_searchKey, c_entryKey); int ret = cmpSearchKey(frag, start, c_searchKey, c_entryKey);
if (ret == 0) if (ret == 0)
......
...@@ -16,8 +16,6 @@ ...@@ -16,8 +16,6 @@
#define DBTUX_GEN_CPP #define DBTUX_GEN_CPP
#include "Dbtux.hpp" #include "Dbtux.hpp"
#include <signaldata/TuxContinueB.hpp>
#include <signaldata/TuxContinueB.hpp>
Dbtux::Dbtux(const Configuration& conf) : Dbtux::Dbtux(const Configuration& conf) :
SimulatedBlock(DBTUX, conf), SimulatedBlock(DBTUX, conf),
...@@ -202,8 +200,9 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) ...@@ -202,8 +200,9 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
} }
// allocate buffers // allocate buffers
c_keyAttrs = (Uint32*)allocRecord("c_keyAttrs", sizeof(Uint32), MaxIndexAttributes); c_keyAttrs = (Uint32*)allocRecord("c_keyAttrs", sizeof(Uint32), MaxIndexAttributes);
c_searchKey = (TableData)allocRecord("c_searchKey", sizeof(Uint32*), MaxIndexAttributes); c_sqlCmp = (NdbSqlUtil::Cmp**)allocRecord("c_sqlCmp", sizeof(NdbSqlUtil::Cmp*), MaxIndexAttributes);
c_entryKey = (TableData)allocRecord("c_entryKey", sizeof(Uint32*), MaxIndexAttributes); c_searchKey = (Uint32*)allocRecord("c_searchKey", sizeof(Uint32*), MaxIndexAttributes);
c_entryKey = (Uint32*)allocRecord("c_entryKey", sizeof(Uint32*), MaxIndexAttributes);
c_dataBuffer = (Uint32*)allocRecord("c_dataBuffer", sizeof(Uint64), (MaxAttrDataSize + 1) >> 1); c_dataBuffer = (Uint32*)allocRecord("c_dataBuffer", sizeof(Uint64), (MaxAttrDataSize + 1) >> 1);
// ack // ack
ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend(); ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
...@@ -219,6 +218,7 @@ void ...@@ -219,6 +218,7 @@ void
Dbtux::setKeyAttrs(const Frag& frag) Dbtux::setKeyAttrs(const Frag& frag)
{ {
Data keyAttrs = c_keyAttrs; // global Data keyAttrs = c_keyAttrs; // global
NdbSqlUtil::Cmp** sqlCmp = c_sqlCmp; // global
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff); const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
for (unsigned i = 0; i < numAttrs; i++) { for (unsigned i = 0; i < numAttrs; i++) {
...@@ -227,75 +227,71 @@ Dbtux::setKeyAttrs(const Frag& frag) ...@@ -227,75 +227,71 @@ Dbtux::setKeyAttrs(const Frag& frag)
// set attr id and fixed size // set attr id and fixed size
keyAttrs.ah() = AttributeHeader(descAttr.m_primaryAttrId, size); keyAttrs.ah() = AttributeHeader(descAttr.m_primaryAttrId, size);
keyAttrs += 1; keyAttrs += 1;
// set comparison method pointer
const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getTypeBinary(descAttr.m_typeId);
ndbrequire(sqlType.m_cmp != 0);
*(sqlCmp++) = sqlType.m_cmp;
} }
} }
void void
Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, TableData keyData) Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData)
{ {
ConstData keyAttrs = c_keyAttrs; // global ConstData keyAttrs = c_keyAttrs; // global
const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit]; const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
const TupLoc tupLoc = ent.m_tupLoc; const TupLoc tupLoc = ent.m_tupLoc;
const Uint32 tupVersion = ent.m_tupVersion; const Uint32 tupVersion = ent.m_tupVersion;
ndbrequire(start < frag.m_numAttrs); ndbrequire(start < frag.m_numAttrs);
const unsigned numAttrs = frag.m_numAttrs - start; const Uint32 numAttrs = frag.m_numAttrs - start;
// start applies to both keys and output data // skip to start position in keyAttrs only
keyAttrs += start; keyAttrs += start;
keyData += start; int ret = c_tup->tuxReadAttrs(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, tupVersion, keyAttrs, numAttrs, keyData);
c_tup->tuxReadAttrs(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, tupVersion, numAttrs, keyAttrs, keyData);
jamEntry(); jamEntry();
// TODO handle error
ndbrequire(ret > 0);
} }
void void
Dbtux::readTablePk(const Frag& frag, TreeEnt ent, unsigned& pkSize, Data pkData) Dbtux::readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize)
{ {
const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit]; const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
const TupLoc tupLoc = ent.m_tupLoc; const TupLoc tupLoc = ent.m_tupLoc;
Uint32 size = 0; int ret = c_tup->tuxReadPk(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, pkData);
c_tup->tuxReadKeys(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, &size, pkData); jamEntry();
ndbrequire(size != 0); // TODO handle error
pkSize = size; ndbrequire(ret > 0);
pkSize = ret;
} }
/* /*
* Input is pointers to table attributes. Output is array of attribute * Copy attribute data with headers. Input is all index key data.
* data with headers. Copies whatever fits. * Copies whatever fits.
*/ */
void void
Dbtux::copyAttrs(const Frag& frag, TableData data1, Data data2, unsigned maxlen2) Dbtux::copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2)
{ {
ConstData keyAttrs = c_keyAttrs; // global unsigned n = frag.m_numAttrs;
const unsigned numAttrs = frag.m_numAttrs;
unsigned len2 = maxlen2; unsigned len2 = maxlen2;
for (unsigned n = 0; n < numAttrs; n++) { while (n != 0) {
jam(); jam();
const unsigned attrId = keyAttrs.ah().getAttributeId(); const unsigned dataSize = data1.ah().getDataSize();
const unsigned dataSize = keyAttrs.ah().getDataSize(); // copy header
const Uint32* const p1 = *data1;
if (p1 != 0) {
if (len2 == 0) if (len2 == 0)
return; return;
data2.ah() = AttributeHeader(attrId, dataSize); data2[0] = data1[0];
data1 += 1;
data2 += 1; data2 += 1;
len2 -= 1; len2 -= 1;
unsigned n = dataSize; // copy data
for (unsigned i = 0; i < dataSize; i++) { for (unsigned i = 0; i < dataSize; i++) {
if (len2 == 0) if (len2 == 0)
return; return;
*data2 = p1[i]; data2[i] = data1[i];
data2 += 1;
len2 -= 1;
}
} else {
if (len2 == 0)
return;
data2.ah() = AttributeHeader(attrId, 0);
data2.ah().setNULL();
data2 += 1;
len2 -= 1; len2 -= 1;
} }
keyAttrs += 1; data1 += dataSize;
data1 += 1; data2 += dataSize;
n -= 1;
} }
#ifdef VM_TRACE #ifdef VM_TRACE
memset(data2, DataFillByte, len2 << 2); memset(data2, DataFillByte, len2 << 2);
......
...@@ -389,7 +389,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -389,7 +389,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
jam(); jam();
const TreeEnt ent = scan.m_scanPos.m_ent; const TreeEnt ent = scan.m_scanPos.m_ent;
// read tuple key // read tuple key
readTablePk(frag, ent, pkSize, pkData); readTablePk(frag, ent, pkData, pkSize);
// get read lock or exclusive lock // get read lock or exclusive lock
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL; lockReq->returnCode = RNIL;
...@@ -480,7 +480,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -480,7 +480,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
jam(); jam();
if (pkSize == 0) { if (pkSize == 0) {
jam(); jam();
readTablePk(frag, ent, pkSize, pkData); readTablePk(frag, ent, pkData, pkSize);
} }
} }
// conf signal // conf signal
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
* TODO optimize for initial equal attrs in node min/max * TODO optimize for initial equal attrs in node min/max
*/ */
void void
Dbtux::searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos) Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
...@@ -144,7 +144,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt sear ...@@ -144,7 +144,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt sear
* to it. * to it.
*/ */
void void
Dbtux::searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos) Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment