Commit 4e41d649 authored by unknown's avatar unknown

tux optim 12 - remove max prefix + related

parent ef389ca0
...@@ -88,7 +88,7 @@ ...@@ -88,7 +88,7 @@
* Ordered index constants. Make configurable per index later. * Ordered index constants. Make configurable per index later.
*/ */
#define MAX_TTREE_NODE_SIZE 64 // total words in node #define MAX_TTREE_NODE_SIZE 64 // total words in node
#define MAX_TTREE_PREF_SIZE 4 // words in min/max prefix each #define MAX_TTREE_PREF_SIZE 4 // words in min prefix
#define MAX_TTREE_NODE_SLACK 3 // diff between max and min occupancy #define MAX_TTREE_NODE_SLACK 3 // diff between max and min occupancy
/* /*
......
...@@ -77,10 +77,14 @@ ...@@ -77,10 +77,14 @@
#define jam() jamLine(60000 + __LINE__) #define jam() jamLine(60000 + __LINE__)
#define jamEntry() jamEntryLine(60000 + __LINE__) #define jamEntry() jamEntryLine(60000 + __LINE__)
#endif #endif
#ifdef DBTUX_CMP_CPP #ifdef DBTUX_SEARCH_CPP
#define jam() jamLine(70000 + __LINE__) #define jam() jamLine(70000 + __LINE__)
#define jamEntry() jamEntryLine(70000 + __LINE__) #define jamEntry() jamEntryLine(70000 + __LINE__)
#endif #endif
#ifdef DBTUX_CMP_CPP
#define jam() jamLine(80000 + __LINE__)
#define jamEntry() jamEntryLine(80000 + __LINE__)
#endif
#ifdef DBTUX_DEBUG_CPP #ifdef DBTUX_DEBUG_CPP
#define jam() jamLine(90000 + __LINE__) #define jam() jamLine(90000 + __LINE__)
#define jamEntry() jamEntryLine(90000 + __LINE__) #define jamEntry() jamEntryLine(90000 + __LINE__)
...@@ -112,6 +116,7 @@ public: ...@@ -112,6 +116,7 @@ public:
static const unsigned DescPageSize = 256; static const unsigned DescPageSize = 256;
private: private:
static const unsigned MaxTreeNodeSize = MAX_TTREE_NODE_SIZE; static const unsigned MaxTreeNodeSize = MAX_TTREE_NODE_SIZE;
static const unsigned MaxPrefSize = MAX_TTREE_PREF_SIZE;
static const unsigned ScanBoundSegmentSize = 7; static const unsigned ScanBoundSegmentSize = 7;
static const unsigned MaxAccLockOps = MAX_PARALLEL_OP_PER_SCAN; static const unsigned MaxAccLockOps = MAX_PARALLEL_OP_PER_SCAN;
BLOCK_DEFINES(Dbtux); BLOCK_DEFINES(Dbtux);
...@@ -206,19 +211,19 @@ private: ...@@ -206,19 +211,19 @@ private:
unsigned m_fragBit : 1; // which duplicated table fragment unsigned m_fragBit : 1; // which duplicated table fragment
TreeEnt(); TreeEnt();
// methods // methods
bool eq(const TreeEnt ent) const;
int cmp(const TreeEnt ent) const; int cmp(const TreeEnt ent) const;
}; };
static const unsigned TreeEntSize = sizeof(TreeEnt) >> 2; static const unsigned TreeEntSize = sizeof(TreeEnt) >> 2;
static const TreeEnt NullTreeEnt; static const TreeEnt NullTreeEnt;
/* /*
* Tree node has 1) fixed part 2) actual table data for min and max * Tree node has 1) fixed part 2) a prefix of index key data for min
* prefix 3) max and min entries 4) rest of entries 5) one extra entry * entry 3) max and min entries 4) rest of entries 5) one extra entry
* used as work space. * used as work space.
* *
* struct TreeNode part 1, size 6 words * struct TreeNode part 1, size 6 words
* min prefix part 2, size TreeHead::m_prefSize * min prefix part 2, size TreeHead::m_prefSize
* max prefix part 2, size TreeHead::m_prefSize
* max entry part 3 * max entry part 3
* min entry part 3 * min entry part 3
* rest of entries part 4 * rest of entries part 4
...@@ -265,14 +270,14 @@ private: ...@@ -265,14 +270,14 @@ private:
friend struct TreeHead; friend struct TreeHead;
struct TreeHead { struct TreeHead {
Uint8 m_nodeSize; // words in tree node Uint8 m_nodeSize; // words in tree node
Uint8 m_prefSize; // words in min/max prefix each Uint8 m_prefSize; // words in min prefix
Uint8 m_minOccup; // min entries in internal node Uint8 m_minOccup; // min entries in internal node
Uint8 m_maxOccup; // max entries in node Uint8 m_maxOccup; // max entries in node
TupLoc m_root; // root node TupLoc m_root; // root node
TreeHead(); TreeHead();
// methods // methods
unsigned getSize(AccSize acc) const; unsigned getSize(AccSize acc) const;
Data getPref(TreeNode* node, unsigned i) const; Data getPref(TreeNode* node) const;
TreeEnt* getEntList(TreeNode* node) const; TreeEnt* getEntList(TreeNode* node) const;
}; };
...@@ -514,6 +519,8 @@ private: ...@@ -514,6 +519,8 @@ private:
NodeHandle(Frag& frag); NodeHandle(Frag& frag);
NodeHandle(const NodeHandle& node); NodeHandle(const NodeHandle& node);
NodeHandle& operator=(const NodeHandle& node); NodeHandle& operator=(const NodeHandle& node);
// check if unassigned
bool isNull();
// getters // getters
TupLoc getLink(unsigned i); TupLoc getLink(unsigned i);
unsigned getChilds(); // cannot spell unsigned getChilds(); // cannot spell
...@@ -528,7 +535,7 @@ private: ...@@ -528,7 +535,7 @@ private:
void setBalance(int b); void setBalance(int b);
void setNodeScan(Uint32 scanPtrI); void setNodeScan(Uint32 scanPtrI);
// access other parts of the node // access other parts of the node
Data getPref(unsigned i); Data getPref();
TreeEnt getEnt(unsigned pos); TreeEnt getEnt(unsigned pos);
TreeEnt getMinMax(unsigned i); TreeEnt getMinMax(unsigned i);
// for ndbrequire and ndbassert // for ndbrequire and ndbassert
...@@ -618,7 +625,7 @@ private: ...@@ -618,7 +625,7 @@ private:
void selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc); void selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc);
void insertNode(Signal* signal, NodeHandle& node, AccSize acc); void insertNode(Signal* signal, NodeHandle& node, AccSize acc);
void deleteNode(Signal* signal, NodeHandle& node); void deleteNode(Signal* signal, NodeHandle& node);
void setNodePref(Signal* signal, NodeHandle& node, unsigned i); void setNodePref(Signal* signal, NodeHandle& node);
// node operations // node operations
void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent); void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent);
void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
...@@ -633,7 +640,6 @@ private: ...@@ -633,7 +640,6 @@ private:
/* /*
* DbtuxTree.cpp * DbtuxTree.cpp
*/ */
void treeSearch(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent); void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent);
void treeRemove(Signal* signal, Frag& frag, TreePos treePos); void treeRemove(Signal* signal, Frag& frag, TreePos treePos);
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i); void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
...@@ -657,12 +663,20 @@ private: ...@@ -657,12 +663,20 @@ private:
void removeAccLockOp(ScanOp& scan, Uint32 accLockOp); void removeAccLockOp(ScanOp& scan, Uint32 accLockOp);
void releaseScanOp(ScanOpPtr& scanPtr); void releaseScanOp(ScanOpPtr& scanPtr);
/*
* DbtuxSearch.cpp
*/
void searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/* /*
* DbtuxCmp.cpp * DbtuxCmp.cpp
*/ */
int cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstData data2, unsigned maxlen2 = MaxAttrDataSize); int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
int cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableData data2); int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey);
int cmpScanBound(const Frag& frag, const BoundPar boundPar); int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey);
/* /*
* DbtuxDebug.cpp * DbtuxDebug.cpp
...@@ -675,6 +689,7 @@ private: ...@@ -675,6 +689,7 @@ private:
TupLoc m_parent; // expected parent address TupLoc m_parent; // expected parent address
int m_depth; // returned depth int m_depth; // returned depth
unsigned m_occup; // returned occupancy unsigned m_occup; // returned occupancy
TreeEnt m_minmax[2]; // returned subtree min and max
bool m_ok; // returned status bool m_ok; // returned status
PrintPar(); PrintPar();
}; };
...@@ -699,6 +714,8 @@ private: ...@@ -699,6 +714,8 @@ private:
DebugTree = 4, // log and check tree after each op DebugTree = 4, // log and check tree after each op
DebugScan = 8 // log scans DebugScan = 8 // log scans
}; };
static const int DataFillByte = 0xa2;
static const int NodeFillByte = 0xa4;
#endif #endif
// start up info // start up info
...@@ -859,13 +876,18 @@ Dbtux::TreeEnt::TreeEnt() : ...@@ -859,13 +876,18 @@ Dbtux::TreeEnt::TreeEnt() :
{ {
} }
inline bool
Dbtux::TreeEnt::eq(const TreeEnt ent) const
{
return
m_tupLoc == ent.m_tupLoc &&
m_tupVersion == ent.m_tupVersion &&
m_fragBit == ent.m_fragBit;
}
inline int inline int
Dbtux::TreeEnt::cmp(const TreeEnt ent) const Dbtux::TreeEnt::cmp(const TreeEnt ent) const
{ {
if (m_fragBit < ent.m_fragBit)
return -1;
if (m_fragBit > ent.m_fragBit)
return +1;
if (m_tupLoc.m_pageId < ent.m_tupLoc.m_pageId) if (m_tupLoc.m_pageId < ent.m_tupLoc.m_pageId)
return -1; return -1;
if (m_tupLoc.m_pageId > ent.m_tupLoc.m_pageId) if (m_tupLoc.m_pageId > ent.m_tupLoc.m_pageId)
...@@ -878,6 +900,10 @@ Dbtux::TreeEnt::cmp(const TreeEnt ent) const ...@@ -878,6 +900,10 @@ Dbtux::TreeEnt::cmp(const TreeEnt ent) const
return -1; return -1;
if (m_tupVersion > ent.m_tupVersion) if (m_tupVersion > ent.m_tupVersion)
return +1; return +1;
if (m_fragBit < ent.m_fragBit)
return -1;
if (m_fragBit > ent.m_fragBit)
return +1;
return 0; return 0;
} }
...@@ -920,7 +946,7 @@ Dbtux::TreeHead::getSize(AccSize acc) const ...@@ -920,7 +946,7 @@ Dbtux::TreeHead::getSize(AccSize acc) const
case AccHead: case AccHead:
return NodeHeadSize; return NodeHeadSize;
case AccPref: case AccPref:
return NodeHeadSize + 2 * m_prefSize + 2 * TreeEntSize; return NodeHeadSize + m_prefSize + 2 * TreeEntSize;
case AccFull: case AccFull:
return m_nodeSize; return m_nodeSize;
} }
...@@ -929,16 +955,16 @@ Dbtux::TreeHead::getSize(AccSize acc) const ...@@ -929,16 +955,16 @@ Dbtux::TreeHead::getSize(AccSize acc) const
} }
inline Dbtux::Data inline Dbtux::Data
Dbtux::TreeHead::getPref(TreeNode* node, unsigned i) const Dbtux::TreeHead::getPref(TreeNode* node) const
{ {
Uint32* ptr = (Uint32*)node + NodeHeadSize + i * m_prefSize; Uint32* ptr = (Uint32*)node + NodeHeadSize;
return ptr; return ptr;
} }
inline Dbtux::TreeEnt* inline Dbtux::TreeEnt*
Dbtux::TreeHead::getEntList(TreeNode* node) const Dbtux::TreeHead::getEntList(TreeNode* node) const
{ {
Uint32* ptr = (Uint32*)node + NodeHeadSize + 2 * m_prefSize; Uint32* ptr = (Uint32*)node + NodeHeadSize + m_prefSize;
return (TreeEnt*)ptr; return (TreeEnt*)ptr;
} }
...@@ -1087,6 +1113,12 @@ Dbtux::NodeHandle::operator=(const NodeHandle& node) ...@@ -1087,6 +1113,12 @@ Dbtux::NodeHandle::operator=(const NodeHandle& node)
return *this; return *this;
} }
inline bool
Dbtux::NodeHandle::isNull()
{
return m_node == 0;
}
inline Dbtux::TupLoc inline Dbtux::TupLoc
Dbtux::NodeHandle::getLink(unsigned i) Dbtux::NodeHandle::getLink(unsigned i)
{ {
...@@ -1161,11 +1193,11 @@ Dbtux::NodeHandle::setNodeScan(Uint32 scanPtrI) ...@@ -1161,11 +1193,11 @@ Dbtux::NodeHandle::setNodeScan(Uint32 scanPtrI)
} }
inline Dbtux::Data inline Dbtux::Data
Dbtux::NodeHandle::getPref(unsigned i) Dbtux::NodeHandle::getPref()
{ {
TreeHead& tree = m_frag.m_tree; TreeHead& tree = m_frag.m_tree;
ndbrequire(m_acc >= AccPref && i <= 1); ndbrequire(m_acc >= AccPref);
return tree.getPref(m_node, i); return tree.getPref(m_node);
} }
inline Dbtux::TreeEnt inline Dbtux::TreeEnt
......
...@@ -25,14 +25,14 @@ ...@@ -25,14 +25,14 @@
* prefix may be partial in which case CmpUnknown may be returned. * prefix may be partial in which case CmpUnknown may be returned.
*/ */
int int
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstData data2, unsigned maxlen2) Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen)
{ {
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff); const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// number of words of attribute data left // number of words of attribute data left
unsigned len2 = maxlen2; unsigned len2 = maxlen;
// skip to right position in search key // skip to right position in search key
data1 += start; searchKey += start;
int ret = 0; int ret = 0;
while (start < numAttrs) { while (start < numAttrs) {
if (len2 < AttributeHeaderSize) { if (len2 < AttributeHeaderSize) {
...@@ -41,20 +41,20 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstDat ...@@ -41,20 +41,20 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstDat
break; break;
} }
len2 -= AttributeHeaderSize; len2 -= AttributeHeaderSize;
if (*data1 != 0) { if (*searchKey != 0) {
if (! data2.ah().isNULL()) { if (! entryData.ah().isNULL()) {
jam(); jam();
// current attribute // current attribute
const DescAttr& descAttr = descEnt.m_descAttr[start]; const DescAttr& descAttr = descEnt.m_descAttr[start];
const unsigned typeId = descAttr.m_typeId; const unsigned typeId = descAttr.m_typeId;
// full data size // full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc); const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
ndbrequire(size1 != 0 && size1 == data2.ah().getDataSize()); ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
const unsigned size2 = min(size1, len2); const unsigned size2 = min(size1, len2);
len2 -= size2; len2 -= size2;
// compare // compare
const Uint32* const p1 = *data1; const Uint32* const p1 = *searchKey;
const Uint32* const p2 = &data2[AttributeHeaderSize]; const Uint32* const p2 = &entryData[AttributeHeaderSize];
ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2); ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2);
if (ret != 0) { if (ret != 0) {
jam(); jam();
...@@ -67,15 +67,15 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstDat ...@@ -67,15 +67,15 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstDat
break; break;
} }
} else { } else {
if (! data2.ah().isNULL()) { if (! entryData.ah().isNULL()) {
jam(); jam();
// NULL > not NULL // NULL > not NULL
ret = +1; ret = +1;
break; break;
} }
} }
data1 += 1; searchKey += 1;
data2 += AttributeHeaderSize + data2.ah().getDataSize(); entryData += AttributeHeaderSize + entryData.ah().getDataSize();
start++; start++;
} }
// XXX until data format errors are handled // XXX until data format errors are handled
...@@ -89,17 +89,17 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstDat ...@@ -89,17 +89,17 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstDat
* Start position is updated as in previous routine. * Start position is updated as in previous routine.
*/ */
int int
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableData data2) Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey)
{ {
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff); const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// skip to right position // skip to right position
data1 += start; searchKey += start;
data2 += start; entryKey += start;
int ret = 0; int ret = 0;
while (start < numAttrs) { while (start < numAttrs) {
if (*data1 != 0) { if (*searchKey != 0) {
if (*data2 != 0) { if (*entryKey != 0) {
jam(); jam();
// current attribute // current attribute
const DescAttr& descAttr = descEnt.m_descAttr[start]; const DescAttr& descAttr = descEnt.m_descAttr[start];
...@@ -107,8 +107,8 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableDat ...@@ -107,8 +107,8 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableDat
// full data size // full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc); const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// compare // compare
const Uint32* const p1 = *data1; const Uint32* const p1 = *searchKey;
const Uint32* const p2 = *data2; const Uint32* const p2 = *entryKey;
ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1); ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1);
if (ret != 0) { if (ret != 0) {
jam(); jam();
...@@ -121,15 +121,15 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableDat ...@@ -121,15 +121,15 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableDat
break; break;
} }
} else { } else {
if (*data2 != 0) { if (*entryKey != 0) {
jam(); jam();
// NULL > not NULL // NULL > not NULL
ret = +1; ret = +1;
break; break;
} }
} }
data1 += 1; searchKey += 1;
data2 += 1; entryKey += 1;
start++; start++;
} }
// XXX until data format errors are handled // XXX until data format errors are handled
...@@ -137,71 +137,68 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableDat ...@@ -137,71 +137,68 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableDat
return ret; return ret;
} }
/* /*
* Scan bound vs tree entry. * Scan bound vs node prefix.
* *
* Compare lower or upper bound and index attribute data. The attribute * Compare lower or upper bound and index attribute data. The attribute
* data may be partial in which case CmpUnknown may be returned. * data may be partial in which case CmpUnknown may be returned.
* Returns -1 if the boundary is to the left of the compared key and +1 if * Returns -1 if the boundary is to the left of the compared key and +1
* the boundary is to the right of the compared key. * if the boundary is to the right of the compared key.
* *
* To get this behaviour we treat equality a little bit special. * To get this behaviour we treat equality a little bit special. If the
* If the boundary is a lower bound then the boundary is to the left of all * boundary is a lower bound then the boundary is to the left of all
* equal keys and if it is an upper bound then the boundary is to the right * equal keys and if it is an upper bound then the boundary is to the
* of all equal keys. * right of all equal keys.
* *
* When searching for the first key we are using the lower bound to try * When searching for the first key we are using the lower bound to try
* to find the first key that is to the right of the boundary. * to find the first key that is to the right of the boundary. Then we
* Then we start scanning from this tuple (including the tuple itself) * start scanning from this tuple (including the tuple itself) until we
* until we find the first key which is to the right of the boundary. Then * find the first key which is to the right of the boundary. Then we
* we stop and do not include that key in the scan result. * stop and do not include that key in the scan result.
*/ */
int int
Dbtux::cmpScanBound(const Frag& frag, const BoundPar boundPar) Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen)
{ {
unsigned type = 4;
int ret = 0;
/*
No boundary means full scan, low boundary is to the right of all keys.
Thus we should always return -1. For upper bound we are to the right of
all keys, thus we should always return +1. We achieve this behaviour
by initialising return value to 0 and set type to 4.
*/
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff); const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
ConstData data1 = boundPar.m_data1;
ConstData data2 = boundPar.m_data2;
// direction 0-lower 1-upper // direction 0-lower 1-upper
const unsigned dir = boundPar.m_dir;
ndbrequire(dir <= 1); ndbrequire(dir <= 1);
// number of words of data left // number of words of data left
unsigned len2 = boundPar.m_len2; unsigned len2 = maxlen;
for (unsigned i = 0; i < boundPar.m_count1; i++) { /*
* No boundary means full scan, low boundary is to the right of all
* keys. Thus we should always return -1. For upper bound we are to
* the right of all keys, thus we should always return +1. We achieve
* this behaviour by initializing type to 4.
*/
unsigned type = 4;
while (boundCount != 0) {
if (len2 < AttributeHeaderSize) { if (len2 < AttributeHeaderSize) {
jam(); jam();
return NdbSqlUtil::CmpUnknown; return NdbSqlUtil::CmpUnknown;
} }
len2 -= AttributeHeaderSize; len2 -= AttributeHeaderSize;
// get and skip bound type // get and skip bound type
type = data1[0]; type = boundInfo[0];
data1 += 1; boundInfo += 1;
ndbrequire(! data1.ah().isNULL()); ndbrequire(! boundInfo.ah().isNULL());
if (! data2.ah().isNULL()) { if (! entryData.ah().isNULL()) {
jam(); jam();
// current attribute // current attribute
const unsigned index = data1.ah().getAttributeId(); const unsigned index = boundInfo.ah().getAttributeId();
const DescAttr& descAttr = descEnt.m_descAttr[index]; const DescAttr& descAttr = descEnt.m_descAttr[index];
const unsigned typeId = descAttr.m_typeId; const unsigned typeId = descAttr.m_typeId;
ndbrequire(data2.ah().getAttributeId() == descAttr.m_primaryAttrId); ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
// full data size // full data size
const unsigned size1 = data1.ah().getDataSize(); const unsigned size1 = boundInfo.ah().getDataSize();
ndbrequire(size1 != 0 && size1 == data2.ah().getDataSize()); ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
const unsigned size2 = min(size1, len2); const unsigned size2 = min(size1, len2);
len2 -= size2; len2 -= size2;
// compare // compare
const Uint32* const p1 = &data1[AttributeHeaderSize]; const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
const Uint32* const p2 = &data2[AttributeHeaderSize]; const Uint32* const p2 = &entryData[AttributeHeaderSize];
ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2); int ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2);
// XXX until data format errors are handled
ndbrequire(ret != NdbSqlUtil::CmpError);
if (ret != 0) { if (ret != 0) {
jam(); jam();
return ret; return ret;
...@@ -209,22 +206,22 @@ Dbtux::cmpScanBound(const Frag& frag, const BoundPar boundPar) ...@@ -209,22 +206,22 @@ Dbtux::cmpScanBound(const Frag& frag, const BoundPar boundPar)
} else { } else {
jam(); jam();
/* /*
NULL is bigger than any bound, thus the boundary is always to the * NULL is bigger than any bound, thus the boundary is always to
left of NULL * the left of NULL.
*/ */
return -1; return -1;
} }
data1 += AttributeHeaderSize + data1.ah().getDataSize(); boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
data2 += AttributeHeaderSize + data2.ah().getDataSize(); entryData += AttributeHeaderSize + entryData.ah().getDataSize();
boundCount -= 1;
} }
ndbassert(ret == 0);
if (dir == 0) { if (dir == 0) {
jam(); jam();
/* /*
Looking for the lower bound. If strict lower bound then the boundary is * Looking for the lower bound. If strict lower bound then the
to the right of the compared key and otherwise (equal included in range) * boundary is to the right of the compared key and otherwise (equal
then the boundary is to the left of the key. * included in range) then the boundary is to the left of the key.
*/ */
if (type == 1) { if (type == 1) {
jam(); jam();
return +1; return +1;
...@@ -233,10 +230,11 @@ Dbtux::cmpScanBound(const Frag& frag, const BoundPar boundPar) ...@@ -233,10 +230,11 @@ Dbtux::cmpScanBound(const Frag& frag, const BoundPar boundPar)
} else { } else {
jam(); jam();
/* /*
Looking for the upper bound. If strict upper bound then the boundary is * Looking for the upper bound. If strict upper bound then the
to the left of all equal keys and otherwise (equal included in the * boundary is to the left of all equal keys and otherwise (equal
range) then the boundary is to the right of all equal keys. * included in the range) then the boundary is to the right of all
*/ * equal keys.
*/
if (type == 3) { if (type == 3) {
jam(); jam();
return -1; return -1;
...@@ -245,3 +243,67 @@ Dbtux::cmpScanBound(const Frag& frag, const BoundPar boundPar) ...@@ -245,3 +243,67 @@ Dbtux::cmpScanBound(const Frag& frag, const BoundPar boundPar)
} }
} }
/*
* Scan bound vs tree entry.
*/
int
Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey)
{
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// direction 0-lower 1-upper
ndbrequire(dir <= 1);
// initialize type to equality
unsigned type = 4;
while (boundCount != 0) {
// get and skip bound type
type = boundInfo[0];
boundInfo += 1;
ndbrequire(! boundInfo.ah().isNULL());
if (*entryKey != 0) {
jam();
// current attribute
const unsigned index = boundInfo.ah().getAttributeId();
const DescAttr& descAttr = descEnt.m_descAttr[index];
const unsigned typeId = descAttr.m_typeId;
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// compare
const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
const Uint32* const p2 = *entryKey;
int ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1);
// XXX until data format errors are handled
ndbrequire(ret != NdbSqlUtil::CmpError);
if (ret != 0) {
jam();
return ret;
}
} else {
jam();
/*
* NULL is bigger than any bound, thus the boundary is always to
* the left of NULL.
*/
return -1;
}
boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
entryKey += 1;
boundCount -= 1;
}
if (dir == 0) {
// lower bound
jam();
if (type == 1) {
jam();
return +1;
}
return -1;
} else {
// upper bound
jam();
if (type == 3) {
jam();
return -1;
}
return +1;
}
}
...@@ -137,16 +137,17 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -137,16 +137,17 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
par.m_ok = false; par.m_ok = false;
} }
} }
static const char* const sep = " *** ";
// check child-parent links // check child-parent links
if (node.getLink(2) != par.m_parent) { if (node.getLink(2) != par.m_parent) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << sep;
out << "parent loc " << hex << node.getLink(2); out << "parent loc " << hex << node.getLink(2);
out << " should be " << hex << par.m_parent << endl; out << " should be " << hex << par.m_parent << endl;
} }
if (node.getSide() != par.m_side) { if (node.getSide() != par.m_side) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << sep;
out << "side " << dec << node.getSide(); out << "side " << dec << node.getSide();
out << " should be " << dec << par.m_side << endl; out << " should be " << dec << par.m_side << endl;
} }
...@@ -154,26 +155,26 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -154,26 +155,26 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
const int balance = -cpar[0].m_depth + cpar[1].m_depth; const int balance = -cpar[0].m_depth + cpar[1].m_depth;
if (node.getBalance() != balance) { if (node.getBalance() != balance) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << sep;
out << "balance " << node.getBalance(); out << "balance " << node.getBalance();
out << " should be " << balance << endl; out << " should be " << balance << endl;
} }
if (abs(node.getBalance()) > 1) { if (abs(node.getBalance()) > 1) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << sep;
out << "balance " << node.getBalance() << " is invalid" << endl; out << "balance " << node.getBalance() << " is invalid" << endl;
} }
// check occupancy // check occupancy
if (node.getOccup() > tree.m_maxOccup) { if (node.getOccup() == 0 || node.getOccup() > tree.m_maxOccup) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << sep;
out << "occupancy " << node.getOccup(); out << "occupancy " << node.getOccup();
out << " greater than max " << tree.m_maxOccup << endl; out << " zero or greater than max " << tree.m_maxOccup << endl;
} }
// check for occupancy of interior node // check for occupancy of interior node
if (node.getChilds() == 2 && node.getOccup() < tree.m_minOccup) { if (node.getChilds() == 2 && node.getOccup() < tree.m_minOccup) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << sep;
out << "occupancy " << node.getOccup() << " of interior node"; out << "occupancy " << node.getOccup() << " of interior node";
out << " less than min " << tree.m_minOccup << endl; out << " less than min " << tree.m_minOccup << endl;
} }
...@@ -183,13 +184,74 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -183,13 +184,74 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
node.getLink(1 - i) == NullTupLoc && node.getLink(1 - i) == NullTupLoc &&
node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) { node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << sep;
out << "missed merge with child " << i << endl; out << "missed merge with child " << i << endl;
} }
} }
// check inline prefix
{ ConstData data1 = node.getPref();
Uint32 data2[MaxPrefSize];
memset(data2, DataFillByte, MaxPrefSize << 2);
readKeyAttrs(frag, node.getMinMax(0), 0, c_searchKey);
copyAttrs(frag, c_searchKey, data2, tree.m_prefSize);
for (unsigned n = 0; n < tree.m_prefSize; n++) {
if (data1[n] != data2[n]) {
par.m_ok = false;
out << par.m_path << sep;
out << "inline prefix mismatch word " << n;
out << " value " << hex << data1[n];
out << " should be " << hex << data2[n] << endl;
break;
}
}
}
// check ordering within node
for (unsigned j = 1; j < node.getOccup(); j++) {
unsigned start = 0;
const TreeEnt ent1 = node.getEnt(j - 1);
const TreeEnt ent2 = node.getEnt(j);
if (j == 1) {
readKeyAttrs(frag, ent1, start, c_searchKey);
} else {
memcpy(c_searchKey, c_entryKey, frag.m_numAttrs << 2);
}
readKeyAttrs(frag, ent2, start, c_entryKey);
int ret = cmpSearchKey(frag, start, c_searchKey, c_entryKey);
if (ret == 0)
ret = ent1.cmp(ent2);
if (ret != -1) {
par.m_ok = false;
out << par.m_path << sep;
out << " disorder within node at pos " << j << endl;
}
}
// check ordering wrt subtrees
for (unsigned i = 0; i <= 1; i++) {
if (node.getLink(i) == NullTupLoc)
continue;
const TreeEnt ent1 = cpar[i].m_minmax[1 - i];
const TreeEnt ent2 = node.getMinMax(i);
unsigned start = 0;
readKeyAttrs(frag, ent1, start, c_searchKey);
readKeyAttrs(frag, ent2, start, c_entryKey);
int ret = cmpSearchKey(frag, start, c_searchKey, c_entryKey);
if (ret == 0)
ret = ent1.cmp(ent2);
if (ret != (i == 0 ? -1 : +1)) {
par.m_ok = false;
out << par.m_path << sep;
out << " disorder wrt subtree " << i << endl;
}
}
// return values // return values
par.m_depth = 1 + max(cpar[0].m_depth, cpar[1].m_depth); par.m_depth = 1 + max(cpar[0].m_depth, cpar[1].m_depth);
par.m_occup = node.getOccup(); par.m_occup = node.getOccup();
for (unsigned i = 0; i <= 1; i++) {
if (node.getLink(i) == NullTupLoc)
par.m_minmax[i] = node.getMinMax(i);
else
par.m_minmax[i] = cpar[i].m_minmax[i];
}
} }
NdbOut& NdbOut&
...@@ -355,20 +417,19 @@ operator<<(NdbOut& out, const Dbtux::NodeHandle& node) ...@@ -355,20 +417,19 @@ operator<<(NdbOut& out, const Dbtux::NodeHandle& node)
out << " [acc " << dec << node.m_acc << "]"; out << " [acc " << dec << node.m_acc << "]";
out << " [node " << *node.m_node << "]"; out << " [node " << *node.m_node << "]";
if (node.m_acc >= Dbtux::AccPref) { if (node.m_acc >= Dbtux::AccPref) {
for (unsigned i = 0; i <= 1; i++) { const Uint32* data;
out << " [pref " << dec << i; out << " [pref";
const Uint32* data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + i * tree.m_prefSize; data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize;
for (unsigned j = 0; j < node.m_frag.m_tree.m_prefSize; j++) for (unsigned j = 0; j < tree.m_prefSize; j++)
out << " " << hex << data[j]; out << " " << hex << data[j];
out << "]"; out << "]";
}
out << " [entList"; out << " [entList";
unsigned numpos = node.m_node->m_occup; unsigned numpos = node.m_node->m_occup;
if (node.m_acc < Dbtux::AccFull && numpos > 2) { if (node.m_acc < Dbtux::AccFull && numpos > 2) {
numpos = 2; numpos = 2;
out << "(" << dec << numpos << ")"; out << "(" << dec << numpos << ")";
} }
const Uint32* data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + 2 * tree.m_prefSize; data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + tree.m_prefSize;
const Dbtux::TreeEnt* entList = (const Dbtux::TreeEnt*)data; const Dbtux::TreeEnt* entList = (const Dbtux::TreeEnt*)data;
for (unsigned pos = 0; pos < numpos; pos++) for (unsigned pos = 0; pos < numpos; pos++)
out << " " << entList[pos]; out << " " << entList[pos];
......
...@@ -26,7 +26,12 @@ Dbtux::Dbtux(const Configuration& conf) : ...@@ -26,7 +26,12 @@ Dbtux::Dbtux(const Configuration& conf) :
#ifdef VM_TRACE #ifdef VM_TRACE
debugFile(0), debugFile(0),
debugOut(*new NullOutputStream()), debugOut(*new NullOutputStream()),
// until ndb_mgm supports dump
#ifdef DBTUX_DEBUG_TREE
debugFlags(DebugTree),
#else
debugFlags(0), debugFlags(0),
#endif
#endif #endif
c_internalStartPhase(0), c_internalStartPhase(0),
c_typeOfStart(NodeState::ST_ILLEGAL_TYPE), c_typeOfStart(NodeState::ST_ILLEGAL_TYPE),
...@@ -314,6 +319,9 @@ Dbtux::copyAttrs(const Frag& frag, TableData data1, Data data2, unsigned maxlen2 ...@@ -314,6 +319,9 @@ Dbtux::copyAttrs(const Frag& frag, TableData data1, Data data2, unsigned maxlen2
keyAttrs += 1; keyAttrs += 1;
data1 += 1; data1 += 1;
} }
#ifdef VM_TRACE
memset(data2, DataFillByte, len2 << 2);
#endif
} }
BLOCK_FUNCTIONS(Dbtux); BLOCK_FUNCTIONS(Dbtux);
...@@ -110,20 +110,19 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -110,20 +110,19 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
debugOut << " entry=" << ent; debugOut << " entry=" << ent;
debugOut << endl; debugOut << endl;
} }
#endif
// find position in tree
TreePos treePos;
treeSearch(signal, frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << endl;
}
#endif #endif
// do the operation // do the operation
req->errorCode = 0; req->errorCode = 0;
TreePos treePos;
switch (opCode) { switch (opCode) {
case TuxMaintReq::OpAdd: case TuxMaintReq::OpAdd:
jam(); jam();
searchToAdd(signal, frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << endl;
}
#endif
if (treePos.m_match) { if (treePos.m_match) {
jam(); jam();
// there is no "Building" state so this will have to do // there is no "Building" state so this will have to do
...@@ -152,6 +151,12 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -152,6 +151,12 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break; break;
case TuxMaintReq::OpRemove: case TuxMaintReq::OpRemove:
jam(); jam();
searchToRemove(signal, frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << endl;
}
#endif
if (! treePos.m_match) { if (! treePos.m_match) {
jam(); jam();
// there is no "Building" state so this will have to do // there is no "Building" state so this will have to do
...@@ -167,7 +172,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -167,7 +172,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
ndbrequire(false); ndbrequire(false);
break; break;
} }
// commit and release nodes
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugTree) { if (debugFlags & DebugTree) {
printTree(signal, frag, debugOut); printTree(signal, frag, debugOut);
......
...@@ -85,10 +85,9 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node, AccSize acc) ...@@ -85,10 +85,9 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node, AccSize acc)
new (node.m_node) TreeNode(); new (node.m_node) TreeNode();
#ifdef VM_TRACE #ifdef VM_TRACE
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
memset(node.getPref(0), 0xa2, tree.m_prefSize << 2); memset(node.getPref(), DataFillByte, tree.m_prefSize << 2);
memset(node.getPref(1), 0xa2, tree.m_prefSize << 2);
TreeEnt* entList = tree.getEntList(node.m_node); TreeEnt* entList = tree.getEntList(node.m_node);
memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2)); memset(entList, NodeFillByte, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
#endif #endif
} }
...@@ -116,12 +115,12 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node) ...@@ -116,12 +115,12 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node)
* attribute headers for now. XXX use null mask instead * attribute headers for now. XXX use null mask instead
*/ */
void void
Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i) Dbtux::setNodePref(Signal* signal, NodeHandle& node)
{ {
const Frag& frag = node.m_frag; const Frag& frag = node.m_frag;
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
readKeyAttrs(frag, node.getMinMax(i), 0, c_entryKey); readKeyAttrs(frag, node.getMinMax(0), 0, c_entryKey);
copyAttrs(frag, c_entryKey, node.getPref(i), tree.m_prefSize); copyAttrs(frag, c_entryKey, node.getPref(), tree.m_prefSize);
} }
// node operations // node operations
...@@ -173,11 +172,9 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ...@@ -173,11 +172,9 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
tmpList[pos] = ent; tmpList[pos] = ent;
entList[0] = entList[occup + 1]; entList[0] = entList[occup + 1];
node.setOccup(occup + 1); node.setOccup(occup + 1);
// fix prefixes // fix prefix
if (occup == 0 || pos == 0) if (occup == 0 || pos == 0)
setNodePref(signal, node, 0); setNodePref(signal, node);
if (occup == 0 || pos == occup)
setNodePref(signal, node, 1);
} }
/* /*
...@@ -248,11 +245,9 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) ...@@ -248,11 +245,9 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
} }
entList[0] = entList[occup - 1]; entList[0] = entList[occup - 1];
node.setOccup(occup - 1); node.setOccup(occup - 1);
// fix prefixes // fix prefix
if (occup != 1 && pos == 0) if (occup != 1 && pos == 0)
setNodePref(signal, node, 0); setNodePref(signal, node);
if (occup != 1 && pos == occup - 1)
setNodePref(signal, node, 1);
} }
/* /*
...@@ -325,11 +320,9 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent ...@@ -325,11 +320,9 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
tmpList[pos] = ent; tmpList[pos] = ent;
ent = oldMin; ent = oldMin;
entList[0] = entList[occup]; entList[0] = entList[occup];
// fix prefixes // fix prefix
if (true) if (true)
setNodePref(signal, node, 0); setNodePref(signal, node);
if (occup == 1 || pos == occup - 1)
setNodePref(signal, node, 1);
} }
/* /*
...@@ -403,11 +396,9 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) ...@@ -403,11 +396,9 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
} }
tmpList[0] = newMin; tmpList[0] = newMin;
entList[0] = entList[occup]; entList[0] = entList[occup];
// fix prefixes // fix prefix
if (true) if (true)
setNodePref(signal, node, 0); setNodePref(signal, node);
if (occup == 1 || pos == occup - 1)
setNodePref(signal, node, 1);
} }
/* /*
......
...@@ -689,16 +689,9 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -689,16 +689,9 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
if (tree.m_root == NullTupLoc) { // set up index keys for this operation
// tree may have become empty setKeyAttrs(frag);
jam(); // unpack lower bound into c_dataBuffer
scan.m_state = ScanOp::Last;
return;
}
TreePos pos;
pos.m_loc = tree.m_root;
NodeHandle node(frag);
// unpack lower bound
const ScanBound& bound = *scan.m_bound[0]; const ScanBound& bound = *scan.m_bound[0];
ScanBoundIterator iter; ScanBoundIterator iter;
bound.first(iter); bound.first(iter);
...@@ -707,103 +700,22 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -707,103 +700,22 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
c_dataBuffer[j] = *iter.data; c_dataBuffer[j] = *iter.data;
bound.next(iter); bound.next(iter);
} }
// comparison parameters // search for scan start position
BoundPar boundPar; TreePos treePos;
boundPar.m_data1 = c_dataBuffer; searchToScan(signal, frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
boundPar.m_count1 = scan.m_boundCnt[0]; if (treePos.m_loc == NullTupLoc) {
boundPar.m_dir = 0; // empty tree
loop: {
jam(); jam();
selectNode(signal, node, pos.m_loc, AccPref); scan.m_state = ScanOp::Last;
const unsigned occup = node.getOccup(); return;
ndbrequire(occup != 0);
for (unsigned i = 0; i <= 1; i++) {
jam();
// compare prefix
boundPar.m_data2 = node.getPref(i);
boundPar.m_len2 = tree.m_prefSize;
int ret = cmpScanBound(frag, boundPar);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read full value
ReadPar readPar;
readPar.m_ent = node.getMinMax(i);
readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data
tupReadAttrs(signal, frag, readPar);
// compare full value
boundPar.m_data2 = readPar.m_data;
boundPar.m_len2 = ZNIL; // big
ret = cmpScanBound(frag, boundPar);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (i == 0 && ret < 0) {
jam();
const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
pos.m_loc = loc;
goto loop;
}
// start scanning this node
pos.m_pos = 0;
pos.m_match = false;
pos.m_dir = 3;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
linkScan(node, scanPtr);
return;
}
if (i == 1 && ret > 0) {
jam();
const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) {
jam();
// continue to right subtree
pos.m_loc = loc;
goto loop;
}
// start scanning upwards
pos.m_dir = 1;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
linkScan(node, scanPtr);
return;
}
}
// read rest of current node
accessNode(signal, node, AccFull);
// look for first entry
ndbrequire(occup >= 2);
for (unsigned j = 1; j < occup; j++) {
jam();
ReadPar readPar;
readPar.m_ent = node.getEnt(j);
readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data
tupReadAttrs(signal, frag, readPar);
// compare
boundPar.m_data2 = readPar.m_data;
boundPar.m_len2 = ZNIL; // big
int ret = cmpScanBound(frag, boundPar);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) {
jam();
// start scanning this node
pos.m_pos = j;
pos.m_match = false;
pos.m_dir = 3;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
linkScan(node, scanPtr);
return;
}
}
ndbrequire(false);
} }
// set position and state
scan.m_scanPos = treePos;
scan.m_state = ScanOp::Next;
// link the scan to node found
NodeHandle node(frag);
selectNode(signal, node, treePos.m_loc, AccFull);
linkScan(node, scanPtr);
} }
/* /*
...@@ -841,7 +753,9 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -841,7 +753,9 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_accLockOp = RNIL; scan.m_accLockOp = RNIL;
scan.m_state = ScanOp::Current; scan.m_state = ScanOp::Current;
} }
// unpack upper bound // set up index keys for this operation
setKeyAttrs(frag);
// unpack upper bound into c_dataBuffer
const ScanBound& bound = *scan.m_bound[1]; const ScanBound& bound = *scan.m_bound[1];
ScanBoundIterator iter; ScanBoundIterator iter;
bound.first(iter); bound.first(iter);
...@@ -850,11 +764,6 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -850,11 +764,6 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
c_dataBuffer[j] = *iter.data; c_dataBuffer[j] = *iter.data;
bound.next(iter); bound.next(iter);
} }
// comparison parameters
BoundPar boundPar;
boundPar.m_data1 = c_dataBuffer;
boundPar.m_count1 = scan.m_boundCnt[1];
boundPar.m_dir = 1;
// use copy of position // use copy of position
TreePos pos = scan.m_scanPos; TreePos pos = scan.m_scanPos;
// get and remember original node // get and remember original node
...@@ -912,17 +821,9 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -912,17 +821,9 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
jam(); jam();
pos.m_ent = node.getEnt(pos.m_pos); pos.m_ent = node.getEnt(pos.m_pos);
pos.m_dir = 3; // unchanged pos.m_dir = 3; // unchanged
// XXX implement prefix optimization // read and compare all attributes
ReadPar readPar; readKeyAttrs(frag, pos.m_ent, 0, c_entryKey);
readPar.m_ent = pos.m_ent; int ret = cmpScanBound(frag, 1, c_dataBuffer, scan.m_boundCnt[1], c_entryKey);
readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data
tupReadAttrs(signal, frag, readPar);
// compare
boundPar.m_data2 = readPar.m_data;
boundPar.m_len2 = ZNIL; // big
int ret = cmpScanBound(frag, boundPar);
ndbrequire(ret != NdbSqlUtil::CmpUnknown); ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) { if (ret < 0) {
jam(); jam();
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#define DBTUX_SEARCH_CPP
#include "Dbtux.hpp"
/*
* Search for entry to add.
*
* Similar to searchToRemove (see below).
*
* TODO optimize for initial equal attrs in node min/max
*/
void
Dbtux::searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
if (currNode.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_match = false;
return;
}
NodeHandle glbNode(frag); // potential g.l.b of final node
/*
* In order to not (yet) change old behaviour, a position between
* 2 nodes returns the one at the bottom of the tree.
*/
NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc, AccPref);
int ret;
// compare prefix
unsigned start = 0;
ret = cmpSearchKey(frag, start, searchKey, currNode.getPref(), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare remaining attributes
ndbrequire(start < numAttrs);
readKeyAttrs(frag, currNode.getMinMax(0), start, c_entryKey);
ret = cmpSearchKey(frag, start, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(currNode.getMinMax(0));
}
if (ret < 0) {
jam();
const TupLoc loc = currNode.getLink(0);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
currNode.m_loc = loc;
continue;
}
if (! glbNode.isNull()) {
jam();
// move up to the g.l.b but remember the bottom node
bottomNode = currNode;
currNode = glbNode;
}
} else if (ret > 0) {
jam();
const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) {
jam();
// save potential g.l.b
glbNode = currNode;
// continue to right subtree
currNode.m_loc = loc;
continue;
}
} else {
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = true;
return;
}
break;
}
// access rest of current node
accessNode(signal, currNode, AccFull);
for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
jam();
int ret;
// read and compare attributes
unsigned start = 0;
readKeyAttrs(frag, currNode.getEnt(j), start, c_entryKey);
ret = cmpSearchKey(frag, start, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(currNode.getEnt(j));
}
if (ret <= 0) {
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = j;
treePos.m_match = (ret == 0);
return;
}
}
if (! bottomNode.isNull()) {
jam();
// backwards compatible for now
treePos.m_loc = bottomNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = false;
return;
}
treePos.m_loc = currNode.m_loc;
treePos.m_pos = currNode.getOccup();
treePos.m_match = false;
}
/*
* Search for entry to remove.
*
* Compares search key to each node min. A move to right subtree can
* overshoot target node. The last such node is saved. The final node
* is a half-leaf or leaf. If search key is less than final node min
* then the saved node is the g.l.b of the final node and we move back
* to it.
*/
void
Dbtux::searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
if (currNode.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_match = false;
return;
}
NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc, AccPref);
int ret;
// compare prefix
unsigned start = 0;
ret = cmpSearchKey(frag, start, searchKey, currNode.getPref(), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare remaining attributes
ndbrequire(start < numAttrs);
readKeyAttrs(frag, currNode.getMinMax(0), start, c_entryKey);
ret = cmpSearchKey(frag, start, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(currNode.getMinMax(0));
}
if (ret < 0) {
jam();
const TupLoc loc = currNode.getLink(0);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
currNode.m_loc = loc;
continue;
}
if (! glbNode.isNull()) {
jam();
// move up to the g.l.b
currNode = glbNode;
}
} else if (ret > 0) {
jam();
const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) {
jam();
// save potential g.l.b
glbNode = currNode;
// continue to right subtree
currNode.m_loc = loc;
continue;
}
} else {
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = true;
return;
}
break;
}
// access rest of current node
accessNode(signal, currNode, AccFull);
// pos 0 was handled above
for (unsigned j = 1, occup = currNode.getOccup(); j < occup; j++) {
jam();
// compare only the entry
if (searchEnt.eq(currNode.getEnt(j))) {
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = j;
treePos.m_match = true;
return;
}
}
treePos.m_loc = currNode.m_loc;
treePos.m_pos = currNode.getOccup();
treePos.m_match = false;
}
/*
* Search for scan start position.
*
* Similar to searchToAdd.
*/
void
Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
if (currNode.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_match = false;
return;
}
NodeHandle glbNode(frag); // potential g.l.b of final node
NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc, AccPref);
int ret;
// compare prefix
ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare all attributes
readKeyAttrs(frag, currNode.getMinMax(0), 0, c_entryKey);
ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret < 0) {
jam();
const TupLoc loc = currNode.getLink(0);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
currNode.m_loc = loc;
continue;
}
if (! glbNode.isNull()) {
jam();
// move up to the g.l.b but remember the bottom node
bottomNode = currNode;
currNode = glbNode;
} else {
// start scanning this node
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = false;
treePos.m_dir = 3;
return;
}
} else if (ret > 0) {
jam();
const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) {
jam();
// save potential g.l.b
glbNode = currNode;
// continue to right subtree
currNode.m_loc = loc;
continue;
}
} else {
ndbassert(false);
}
break;
}
// access rest of current node
accessNode(signal, currNode, AccFull);
for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
jam();
int ret;
// read and compare attributes
readKeyAttrs(frag, currNode.getEnt(j), 0, c_entryKey);
ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) {
// start scanning from current entry
treePos.m_loc = currNode.m_loc;
treePos.m_pos = j;
treePos.m_match = false;
treePos.m_dir = 3;
return;
}
}
if (! bottomNode.isNull()) {
jam();
// start scanning the l.u.b
treePos.m_loc = bottomNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = false;
treePos.m_dir = 3;
return;
}
// start scanning upwards (pretend we came from right child)
treePos.m_loc = currNode.m_loc;
treePos.m_dir = 1;
}
...@@ -17,112 +17,6 @@ ...@@ -17,112 +17,6 @@
#define DBTUX_TREE_CPP #define DBTUX_TREE_CPP
#include "Dbtux.hpp" #include "Dbtux.hpp"
/*
* Search for entry.
*
* Search key is index attribute data and tree entry value. Start from
* root node and compare the key to min/max of each node. Use linear
* search on the final (bounding) node. Initial attributes which are
* same in min/max need not be checked.
*/
void
Dbtux::treeSearch(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
treePos.m_loc = tree.m_root;
if (treePos.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_pos = 0;
treePos.m_match = false;
return;
}
NodeHandle node(frag);
loop: {
jam();
selectNode(signal, node, treePos.m_loc, AccPref);
const unsigned occup = node.getOccup();
ndbrequire(occup != 0);
// number of equal initial attributes in bounding node
unsigned start = ZNIL;
for (unsigned i = 0; i <= 1; i++) {
jam();
unsigned start1 = 0;
// compare prefix
int ret = cmpSearchKey(frag, start1, searchKey, node.getPref(i), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare remaining attributes
readKeyAttrs(frag, node.getMinMax(i), start1, c_entryKey);
ret = cmpSearchKey(frag, start1, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (start > start1)
start = start1;
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(node.getMinMax(i));
}
if (i == 0 ? (ret < 0) : (ret > 0)) {
jam();
const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) {
jam();
// continue to left/right subtree
treePos.m_loc = loc;
goto loop;
}
// position is immediately before/after this node
treePos.m_pos = (i == 0 ? 0 : occup);
treePos.m_match = false;
return;
}
if (ret == 0) {
jam();
// position is at first/last entry
treePos.m_pos = (i == 0 ? 0 : occup - 1);
treePos.m_match = true;
return;
}
}
// access rest of the bounding node
accessNode(signal, node, AccFull);
// position is strictly within the node
ndbrequire(occup >= 2);
const unsigned numWithin = occup - 2;
for (unsigned j = 1; j <= numWithin; j++) {
jam();
int ret = 0;
if (start < numAttrs) {
jam();
// read and compare remaining attributes
unsigned start1 = start;
readKeyAttrs(frag, node.getEnt(j), start1, c_entryKey);
ret = cmpSearchKey(frag, start1, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(node.getEnt(j));
}
if (ret <= 0) {
jam();
// position is before or at this entry
treePos.m_pos = j;
treePos.m_match = (ret == 0);
return;
}
}
// position is before last entry
treePos.m_pos = occup - 1;
treePos.m_match = false;
return;
}
}
/* /*
* Add entry. * Add entry.
*/ */
......
...@@ -7,6 +7,7 @@ libdbtux_a_SOURCES = \ ...@@ -7,6 +7,7 @@ libdbtux_a_SOURCES = \
DbtuxNode.cpp \ DbtuxNode.cpp \
DbtuxTree.cpp \ DbtuxTree.cpp \
DbtuxScan.cpp \ DbtuxScan.cpp \
DbtuxSearch.cpp \
DbtuxCmp.cpp \ DbtuxCmp.cpp \
DbtuxDebug.cpp DbtuxDebug.cpp
......
...@@ -49,4 +49,7 @@ optim 10 mc02/a 44 ms 65 ms 46 pct ...@@ -49,4 +49,7 @@ optim 10 mc02/a 44 ms 65 ms 46 pct
optim 11 mc02/a 43 ms 63 ms 46 pct optim 11 mc02/a 43 ms 63 ms 46 pct
mc02/b 52 ms 86 ms 63 pct mc02/b 52 ms 86 ms 63 pct
optim 12 mc02/a 38 ms 55 ms 43 pct
mc02/b 47 ms 77 ms 63 pct
vim: set et: vim: set et:
...@@ -2525,7 +2525,7 @@ tbusybuild(Par par) ...@@ -2525,7 +2525,7 @@ tbusybuild(Par par)
for (unsigned i = 0; i < par.m_subloop; i++) { for (unsigned i = 0; i < par.m_subloop; i++) {
RUNSTEP(par, pkupdateindexbuild, MT); RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, MT); RUNSTEP(par, readverify, ST);
RUNSTEP(par, dropindex, ST); RUNSTEP(par, dropindex, ST);
} }
return 0; return 0;
...@@ -2564,9 +2564,11 @@ ttimemaint(Par par) ...@@ -2564,9 +2564,11 @@ ttimemaint(Par par)
t1.off(par.m_totrows); t1.off(par.m_totrows);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
t2.on(); t2.on();
RUNSTEP(par, pkupdate, MT); RUNSTEP(par, pkupdate, MT);
t2.off(par.m_totrows); t2.off(par.m_totrows);
RUNSTEP(par, readverify, ST);
RUNSTEP(par, dropindex, ST); RUNSTEP(par, dropindex, ST);
} }
LL1("update - " << t1.time()); LL1("update - " << t1.time());
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment