Commit cb048699 authored by unknown's avatar unknown

tux optim 6 - remove node cache

parent becde519
......@@ -102,11 +102,6 @@ private:
// sizes are in words (Uint32)
static const unsigned MaxIndexFragments = 2 * NO_OF_FRAG_PER_NODE;
static const unsigned MaxIndexAttributes = MAX_ATTRIBUTES_IN_INDEX;
#ifdef VM_TRACE
static const unsigned MaxNodeHandles = 10000; // More space for printTree
#else
static const unsigned MaxNodeHandles = 128; // enough for 1 operation
#endif
static const unsigned MaxAttrDataSize = 2048;
public:
static const unsigned DescPageSize = 256;
......@@ -179,7 +174,7 @@ private:
};
/*
* There is no const variable NullTupLoc since the compiler may not be
* There is no const member NullTupLoc since the compiler may not be
* able to optimize it to TupLoc() constants. Instead null values are
* constructed on the stack with TupLoc().
*/
......@@ -462,8 +457,7 @@ private:
Uint16 m_descOff;
Uint16 m_numAttrs;
TreeHead m_tree;
Uint32 m_nodeList; // node cache of current operation
Uint32 m_nodeFree; // one node pre-allocated for insert
TupLoc m_freeLoc; // one node pre-allocated for insert
DLList<ScanOp> m_scanList; // current scans on this fragment
Uint32 m_tupIndexFragPtrI;
Uint32 m_tupTableFragPtrI[2];
......@@ -498,9 +492,8 @@ private:
// node handles
/*
* A tree operation builds a cache of accessed nodes. This allows
* different implementations of index memory access. The cache is
* committed and released at the end of the operation.
* A node handle is a reference to a tree node in TUP. It is used to
* operate on the node. Node handles are allocated on the stack.
*/
struct NodeHandle;
friend struct NodeHandle;
......@@ -509,11 +502,9 @@ private:
TupLoc m_loc; // physical node address
TreeNode* m_node; // pointer to node storage
AccSize m_acc; // accessed size
union {
Uint32 m_next; // next active node under fragment
Uint32 nextPool;
};
NodeHandle(Frag& frag);
NodeHandle(const NodeHandle& node);
NodeHandle& operator=(const NodeHandle& node);
// getters
TupLoc getLink(unsigned i);
unsigned getChilds(); // cannot spell
......@@ -521,20 +512,19 @@ private:
unsigned getOccup();
int getBalance();
Uint32 getNodeScan();
Data getPref(unsigned i);
TreeEnt getEnt(unsigned pos);
TreeEnt getMinMax(unsigned i);
// setters
void setLink(unsigned i, TupLoc loc);
void setSide(unsigned i);
void setOccup(unsigned n);
void setBalance(int b);
void setNodeScan(Uint32 scanPtrI);
// access other parts of the node
Data getPref(unsigned i);
TreeEnt getEnt(unsigned pos);
TreeEnt getMinMax(unsigned i);
// for ndbrequire and ndbassert
void progError(int line, int cause, const char* file);
};
typedef Ptr<NodeHandle> NodeHandlePtr;
ArrayPool<NodeHandle> c_nodeHandlePool;
// parameters for methods
......@@ -565,17 +555,6 @@ private:
ReadPar();
};
/*
* Node storage operation.
*/
struct StorePar {
TupStoreTh::OpCode m_opCode;// operation code
unsigned m_offset; // data offset in words
unsigned m_size; // number of words
Uint32 m_errorCode; // terrorCode from TUP
StorePar();
};
/*
* Tree search for entry.
*/
......@@ -649,15 +628,12 @@ private:
/*
* DbtuxNode.cpp
*/
void seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr);
void preallocNode(Signal* signal, Frag& frag, Uint32& errorCode);
void findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc);
void selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc, AccSize acc);
void insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc);
void deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr);
void accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc);
int allocNode(Signal* signal, NodeHandle& node);
void accessNode(Signal* signal, NodeHandle& node, AccSize acc);
void selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc);
void insertNode(Signal* signal, NodeHandle& node, AccSize acc);
void deleteNode(Signal* signal, NodeHandle& node);
void setNodePref(Signal* signal, NodeHandle& node, unsigned i);
void commitNodes(Signal* signal, Frag& frag, bool updateOk);
// node operations
void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent);
void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
......@@ -675,8 +651,8 @@ private:
void treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& treePos);
void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent);
void treeRemove(Signal* signal, Frag& frag, TreePos treePos);
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i);
void treeRotateDouble(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i);
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
void treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
/*
* DbtuxScan.cpp
......@@ -1054,8 +1030,7 @@ Dbtux::Frag::Frag(ArrayPool<ScanOp>& scanOpPool) :
m_descOff(0),
m_numAttrs(ZNIL),
m_tree(),
m_nodeList(RNIL),
m_nodeFree(RNIL),
m_freeLoc(),
m_scanList(scanOpPool),
m_tupIndexFragPtrI(RNIL)
{
......@@ -1086,11 +1061,29 @@ Dbtux::NodeHandle::NodeHandle(Frag& frag) :
m_frag(frag),
m_loc(),
m_node(0),
m_acc(AccNone),
m_next(RNIL)
m_acc(AccNone)
{
}
inline
Dbtux::NodeHandle::NodeHandle(const NodeHandle& node) :
m_frag(node.m_frag),
m_loc(node.m_loc),
m_node(node.m_node),
m_acc(node.m_acc)
{
}
inline Dbtux::NodeHandle&
Dbtux::NodeHandle::operator=(const NodeHandle& node)
{
ndbassert(&m_frag == &node.m_frag);
m_loc = node.m_loc;
m_node = node.m_node;
m_acc = node.m_acc;
return *this;
}
inline Dbtux::TupLoc
Dbtux::NodeHandle::getLink(unsigned i)
{
......@@ -1128,37 +1121,6 @@ Dbtux::NodeHandle::getNodeScan()
return m_node->m_nodeScan;
}
inline Dbtux::Data
Dbtux::NodeHandle::getPref(unsigned i)
{
TreeHead& tree = m_frag.m_tree;
ndbrequire(m_acc >= AccPref && i <= 1);
return tree.getPref(m_node, i);
}
inline Dbtux::TreeEnt
Dbtux::NodeHandle::getEnt(unsigned pos)
{
TreeHead& tree = m_frag.m_tree;
TreeEnt* entList = tree.getEntList(m_node);
const unsigned occup = m_node->m_occup;
ndbrequire(pos < occup);
if (pos == 0 || pos == occup - 1) {
ndbrequire(m_acc >= AccPref)
} else {
ndbrequire(m_acc == AccFull)
}
return entList[(1 + pos) % occup];
}
inline Dbtux::TreeEnt
Dbtux::NodeHandle::getMinMax(unsigned i)
{
const unsigned occup = m_node->m_occup;
ndbrequire(i <= 1 && occup != 0);
return getEnt(i == 0 ? 0 : occup - 1);
}
inline void
Dbtux::NodeHandle::setLink(unsigned i, TupLoc loc)
{
......@@ -1195,6 +1157,37 @@ Dbtux::NodeHandle::setNodeScan(Uint32 scanPtrI)
m_node->m_nodeScan = scanPtrI;
}
inline Dbtux::Data
Dbtux::NodeHandle::getPref(unsigned i)
{
TreeHead& tree = m_frag.m_tree;
ndbrequire(m_acc >= AccPref && i <= 1);
return tree.getPref(m_node, i);
}
inline Dbtux::TreeEnt
Dbtux::NodeHandle::getEnt(unsigned pos)
{
TreeHead& tree = m_frag.m_tree;
TreeEnt* entList = tree.getEntList(m_node);
const unsigned occup = m_node->m_occup;
ndbrequire(pos < occup);
if (pos == 0 || pos == occup - 1) {
ndbrequire(m_acc >= AccPref)
} else {
ndbrequire(m_acc == AccFull)
}
return entList[(1 + pos) % occup];
}
inline Dbtux::TreeEnt
Dbtux::NodeHandle::getMinMax(unsigned i)
{
const unsigned occup = m_node->m_occup;
ndbrequire(i <= 1 && occup != 0);
return getEnt(i == 0 ? 0 : occup - 1);
}
// parameters for methods
inline
......@@ -1217,15 +1210,6 @@ Dbtux::ReadPar::ReadPar() :
{
}
inline
Dbtux::StorePar::StorePar() :
m_opCode(TupStoreTh::OpUndefined),
m_offset(0),
m_size(0),
m_errorCode(0)
{
}
inline
Dbtux::SearchPar::SearchPar() :
m_data(0),
......
......@@ -106,13 +106,11 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
signal->theData[1] = 1;
execDUMP_STATE_ORD(signal);
if (debugFile != 0) {
commitNodes(signal, frag, false);
printTree(signal, frag, debugOut);
}
}
ndbrequire(false);
}
commitNodes(signal, frag, false);
}
void
......@@ -123,9 +121,9 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
return;
}
TreeHead& tree = frag.m_tree;
NodeHandlePtr nodePtr;
selectNode(signal, frag, nodePtr, loc, AccFull);
out << par.m_path << " " << *nodePtr.p << endl;
NodeHandle node(frag);
selectNode(signal, node, loc, AccFull);
out << par.m_path << " " << node << endl;
// check children
PrintPar cpar[2];
ndbrequire(strlen(par.m_path) + 1 < sizeof(par.m_path));
......@@ -134,56 +132,56 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
cpar[i].m_side = i;
cpar[i].m_depth = 0;
cpar[i].m_parent = loc;
printNode(signal, frag, out, nodePtr.p->getLink(i), cpar[i]);
printNode(signal, frag, out, node.getLink(i), cpar[i]);
if (! cpar[i].m_ok) {
par.m_ok = false;
}
}
// check child-parent links
if (nodePtr.p->getLink(2) != par.m_parent) {
if (node.getLink(2) != par.m_parent) {
par.m_ok = false;
out << par.m_path << " *** ";
out << "parent loc " << hex << nodePtr.p->getLink(2);
out << "parent loc " << hex << node.getLink(2);
out << " should be " << hex << par.m_parent << endl;
}
if (nodePtr.p->getSide() != par.m_side) {
if (node.getSide() != par.m_side) {
par.m_ok = false;
out << par.m_path << " *** ";
out << "side " << dec << nodePtr.p->getSide();
out << "side " << dec << node.getSide();
out << " should be " << dec << par.m_side << endl;
}
// check balance
const int balance = -cpar[0].m_depth + cpar[1].m_depth;
if (nodePtr.p->getBalance() != balance) {
if (node.getBalance() != balance) {
par.m_ok = false;
out << par.m_path << " *** ";
out << "balance " << nodePtr.p->getBalance();
out << "balance " << node.getBalance();
out << " should be " << balance << endl;
}
if (abs(nodePtr.p->getBalance()) > 1) {
if (abs(node.getBalance()) > 1) {
par.m_ok = false;
out << par.m_path << " *** ";
out << "balance " << nodePtr.p->getBalance() << " is invalid" << endl;
out << "balance " << node.getBalance() << " is invalid" << endl;
}
// check occupancy
if (nodePtr.p->getOccup() > tree.m_maxOccup) {
if (node.getOccup() > tree.m_maxOccup) {
par.m_ok = false;
out << par.m_path << " *** ";
out << "occupancy " << nodePtr.p->getOccup();
out << "occupancy " << node.getOccup();
out << " greater than max " << tree.m_maxOccup << endl;
}
// check for occupancy of interior node
if (nodePtr.p->getChilds() == 2 && nodePtr.p->getOccup() < tree.m_minOccup) {
if (node.getChilds() == 2 && node.getOccup() < tree.m_minOccup) {
par.m_ok = false;
out << par.m_path << " *** ";
out << "occupancy " << nodePtr.p->getOccup() << " of interior node";
out << "occupancy " << node.getOccup() << " of interior node";
out << " less than min " << tree.m_minOccup << endl;
}
// check missed half-leaf/leaf merge
for (unsigned i = 0; i <= 1; i++) {
if (nodePtr.p->getLink(i) != NullTupLoc &&
nodePtr.p->getLink(1 - i) == NullTupLoc &&
nodePtr.p->getOccup() + cpar[i].m_occup <= tree.m_maxOccup) {
if (node.getLink(i) != NullTupLoc &&
node.getLink(1 - i) == NullTupLoc &&
node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) {
par.m_ok = false;
out << par.m_path << " *** ";
out << "missed merge with child " << i << endl;
......@@ -191,7 +189,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
}
// return values
par.m_depth = 1 + max(cpar[0].m_depth, cpar[1].m_depth);
par.m_occup = nodePtr.p->getOccup();
par.m_occup = node.getOccup();
}
NdbOut&
......
......@@ -178,7 +178,6 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
c_fragPool.setSize(nFragment);
c_descPagePool.setSize(nDescPage);
c_fragOpPool.setSize(MaxIndexFragments);
c_nodeHandlePool.setSize(MaxNodeHandles);
c_scanOpPool.setSize(nScanOp);
c_scanBoundPool.setSize(nScanBoundWords);
/*
......
......@@ -72,7 +72,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
}
ndbrequire(fragPtr.i != RNIL);
Frag& frag = *fragPtr.p;
ndbrequire(frag.m_nodeList == RNIL);
// set up index entry
TreeEnt ent;
ent.m_tupAddr = req->tupAddr;
......@@ -143,17 +142,18 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
}
/*
* At most one new node is inserted in the operation. We keep one
* free node pre-allocated so the operation cannot fail. This also
* gives a real TupAddr for links to the new node.
* free node pre-allocated so the operation cannot fail.
*/
if (frag.m_nodeFree == RNIL) {
if (frag.m_freeLoc == NullTupLoc) {
jam();
preallocNode(signal, frag, req->errorCode);
NodeHandle node(frag);
req->errorCode = allocNode(signal, node);
if (req->errorCode != 0) {
jam();
break;
}
ndbrequire(frag.m_nodeFree != RNIL);
frag.m_freeLoc = node.m_loc;
ndbrequire(frag.m_freeLoc != NullTupLoc);
}
treeAdd(signal, frag, treePos, ent);
break;
......@@ -175,7 +175,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break;
}
// commit and release nodes
commitNodes(signal, frag, req->errorCode == 0);
#ifdef VM_TRACE
if (debugFlags & DebugTree) {
printTree(signal, frag, debugOut);
......
......@@ -316,12 +316,6 @@ Dbtux::dropIndex(Signal* signal, IndexPtr indexPtr, Uint32 senderRef, Uint32 sen
unsigned i = --indexPtr.p->m_numFrags;
FragPtr fragPtr;
c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]);
Frag& frag = *fragPtr.p;
ndbrequire(frag.m_nodeList == RNIL);
if (frag.m_nodeFree != RNIL) {
c_nodeHandlePool.release(frag.m_nodeFree);
frag.m_nodeFree = RNIL;
}
c_fragPool.release(fragPtr);
// the real time break is not used for anything currently
signal->theData[0] = TuxContinueB::DropIndex;
......
......@@ -18,161 +18,94 @@
#include "Dbtux.hpp"
/*
* Node handles.
*
* Temporary version between "cache" and "pointer" implementations.
* Allocate index node in TUP.
*/
// Dbtux
void
Dbtux::seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr)
{
if (! c_nodeHandlePool.seize(nodePtr)) {
jam();
return;
}
new (nodePtr.p) NodeHandle(frag);
nodePtr.p->m_next = frag.m_nodeList;
frag.m_nodeList = nodePtr.i;
}
void
Dbtux::preallocNode(Signal* signal, Frag& frag, Uint32& errorCode)
int
Dbtux::allocNode(Signal* signal, NodeHandle& node)
{
ndbrequire(frag.m_nodeFree == RNIL);
NodeHandlePtr nodePtr;
seizeNode(signal, frag, nodePtr);
ndbrequire(nodePtr.i != RNIL);
// remove from cache XXX ugly
frag.m_nodeFree = frag.m_nodeList;
frag.m_nodeList = nodePtr.p->m_next;
// alloc index node in TUP
Frag& frag = node.m_frag;
Uint32 pageId = NullTupLoc.m_pageId;
Uint32 pageOffset = NullTupLoc.m_pageOffset;
Uint32* node32 = 0;
errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
if (errorCode != 0) {
int errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
if (errorCode == 0) {
jam();
c_nodeHandlePool.release(nodePtr);
frag.m_nodeFree = RNIL;
return;
node.m_loc = TupLoc(pageId, pageOffset);
node.m_node = reinterpret_cast<TreeNode*>(node32);
node.m_acc = AccNone;
ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
}
nodePtr.p->m_loc = TupLoc(pageId, pageOffset);
nodePtr.p->m_node = reinterpret_cast<TreeNode*>(node32);
ndbrequire(nodePtr.p->m_loc != NullTupLoc && nodePtr.p->m_node != 0);
new (nodePtr.p->m_node) TreeNode();
#ifdef VM_TRACE
TreeHead& tree = frag.m_tree;
TreeNode* node = nodePtr.p->m_node;
memset(tree.getPref(node, 0), 0xa2, tree.m_prefSize << 2);
memset(tree.getPref(node, 1), 0xa2, tree.m_prefSize << 2);
TreeEnt* entList = tree.getEntList(node);
memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
#endif
return errorCode;
}
/*
* Find node in the cache. XXX too slow, use direct links instead
* Access more of the node.
*/
void
Dbtux::findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc)
Dbtux::accessNode(Signal* signal, NodeHandle& node, AccSize acc)
{
NodeHandlePtr tmpPtr;
tmpPtr.i = frag.m_nodeList;
while (tmpPtr.i != RNIL) {
jam();
c_nodeHandlePool.getPtr(tmpPtr);
if (tmpPtr.p->m_loc == loc) {
jam();
nodePtr = tmpPtr;
return;
}
tmpPtr.i = tmpPtr.p->m_next;
}
nodePtr.i = RNIL;
nodePtr.p = 0;
ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
if (node.m_acc >= acc)
return;
// XXX could do prefetch
node.m_acc = acc;
}
/*
* Get handle for existing node.
* Set handle to point to existing node.
*/
void
Dbtux::selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc, AccSize acc)
Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc)
{
ndbrequire(loc != NullTupLoc && acc > AccNone);
NodeHandlePtr tmpPtr;
// search in cache
findNode(signal, frag, tmpPtr, loc);
if (tmpPtr.i == RNIL) {
jam();
// add new node
seizeNode(signal, frag, tmpPtr);
ndbrequire(tmpPtr.i != RNIL);
tmpPtr.p->m_loc = loc;
Uint32 pageId = loc.m_pageId;
Uint32 pageOffset = loc.m_pageOffset;
Uint32* node32 = 0;
c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
tmpPtr.p->m_node = reinterpret_cast<TreeNode*>(node32);
ndbrequire(tmpPtr.p->m_loc != NullTupLoc && tmpPtr.p->m_node != 0);
}
if (tmpPtr.p->m_acc < acc) {
jam();
accessNode(signal, frag, tmpPtr, acc);
}
nodePtr = tmpPtr;
Frag& frag = node.m_frag;
ndbrequire(loc != NullTupLoc);
Uint32 pageId = loc.m_pageId;
Uint32 pageOffset = loc.m_pageOffset;
Uint32* node32 = 0;
c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
node.m_loc = loc;
node.m_node = reinterpret_cast<TreeNode*>(node32);
node.m_acc = AccNone;
ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
accessNode(signal, node, acc);
}
/*
* Create new node in the cache using the pre-allocated node.
* Set handle to point to new node. Uses the pre-allocated node.
*/
void
Dbtux::insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc)
Dbtux::insertNode(Signal* signal, NodeHandle& node, AccSize acc)
{
ndbrequire(acc > AccNone);
NodeHandlePtr tmpPtr;
// use the pre-allocated node
tmpPtr.i = frag.m_nodeFree;
frag.m_nodeFree = RNIL;
c_nodeHandlePool.getPtr(tmpPtr);
// move it to the cache
tmpPtr.p->m_next = frag.m_nodeList;
frag.m_nodeList = tmpPtr.i;
tmpPtr.p->m_acc = acc;
nodePtr = tmpPtr;
Frag& frag = node.m_frag;
TupLoc loc = frag.m_freeLoc;
frag.m_freeLoc = NullTupLoc;
selectNode(signal, node, loc, acc);
new (node.m_node) TreeNode();
#ifdef VM_TRACE
TreeHead& tree = frag.m_tree;
memset(tree.getPref(node.m_node, 0), 0xa2, tree.m_prefSize << 2);
memset(tree.getPref(node.m_node, 1), 0xa2, tree.m_prefSize << 2);
TreeEnt* entList = tree.getEntList(node.m_node);
memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
#endif
}
/*
* Delete existing node.
*/
void
Dbtux::deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr)
Dbtux::deleteNode(Signal* signal, NodeHandle& node)
{
NodeHandlePtr tmpPtr = nodePtr;
ndbrequire(tmpPtr.p->getOccup() == 0);
Uint32 pageId = tmpPtr.p->m_loc.m_pageId;
Uint32 pageOffset = tmpPtr.p->m_loc.m_pageOffset;
Uint32* node32 = reinterpret_cast<Uint32*>(tmpPtr.p->m_node);
Frag& frag = node.m_frag;
ndbrequire(node.getOccup() == 0);
TupLoc loc = node.m_loc;
Uint32 pageId = loc.m_pageId;
Uint32 pageOffset = loc.m_pageOffset;
Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node);
c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
// invalidate handle and storage
tmpPtr.p->m_loc = NullTupLoc;
tmpPtr.p->m_node = 0;
// scans have already been moved by nodePopDown or nodePopUp
}
/*
* Access more of the node.
*/
void
Dbtux::accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc)
{
NodeHandlePtr tmpPtr = nodePtr;
ndbrequire(tmpPtr.p->m_loc != NullTupLoc && tmpPtr.p->m_node != 0);
if (tmpPtr.p->m_acc >= acc)
return;
// XXX could do prefetch
tmpPtr.p->m_acc = acc;
node.m_loc = NullTupLoc;
node.m_node = 0;
}
/*
......@@ -201,25 +134,6 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i)
copyAttrs(pref, readPar.m_data, copyPar);
}
/*
* Commit and release nodes at the end of an operation. Used also on
* error since no changes have been made (updateOk false).
*/
void
Dbtux::commitNodes(Signal* signal, Frag& frag, bool updateOk)
{
NodeHandlePtr nodePtr;
nodePtr.i = frag.m_nodeList;
frag.m_nodeList = RNIL;
while (nodePtr.i != RNIL) {
c_nodeHandlePool.getPtr(nodePtr);
// release
NodeHandlePtr tmpPtr = nodePtr;
nodePtr.i = nodePtr.p->m_next;
c_nodeHandlePool.release(tmpPtr);
}
}
// node operations
/*
......
......@@ -42,7 +42,6 @@ Dbtux::execACC_SCANREQ(Signal* signal)
}
ndbrequire(fragPtr.i != RNIL);
Frag& frag = *fragPtr.p;
ndbrequire(frag.m_nodeList == RNIL);
// must be normal DIH/TC fragment
ndbrequire(frag.m_fragId < (1 << frag.m_fragOff));
TreeHead& tree = frag.m_tree;
......@@ -241,7 +240,6 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
debugOut << "NEXT_SCANREQ scan " << scanPtr.i << " " << scan << endl;
}
#endif
ndbrequire(frag.m_nodeList == RNIL);
// handle unlock previous and close scan
switch (req->scanFlag) {
case NextScanReq::ZSCAN_NEXT:
......@@ -278,9 +276,9 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
if (scan.m_scanPos.m_loc != NullTupLoc) {
jam();
const TupLoc loc = scan.m_scanPos.m_loc;
NodeHandlePtr nodePtr;
selectNode(signal, frag, nodePtr, loc, AccHead);
unlinkScan(*nodePtr.p, scanPtr);
NodeHandle node(frag);
selectNode(signal, node, loc, AccHead);
unlinkScan(node, scanPtr);
scan.m_scanPos.m_loc = NullTupLoc;
}
if (scan.m_lockwait) {
......@@ -295,7 +293,6 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_state = ScanOp::Aborting;
commitNodes(signal, frag, true);
return;
}
if (scan.m_state == ScanOp::Locked) {
......@@ -350,7 +347,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry();
commitNodes(signal, frag, true);
return; // stop
}
if (scan.m_lockwait) {
......@@ -365,7 +361,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
// if TC has ordered scan close, it will be detected here
sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB);
commitNodes(signal, frag, true);
return; // stop
}
if (scan.m_state == ScanOp::First) {
......@@ -444,7 +439,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry();
commitNodes(signal, frag, true);
return; // stop
break;
case AccLockReq::Refused:
......@@ -457,7 +451,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry();
commitNodes(signal, frag, true);
return; // stop
break;
case AccLockReq::NoFreeOp:
......@@ -470,7 +463,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry();
commitNodes(signal, frag, true);
return; // stop
break;
default:
......@@ -554,7 +546,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
scan.m_lastEnt = ent;
// next time look for next entry
scan.m_state = ScanOp::Next;
commitNodes(signal, frag, true);
return;
}
// XXX in ACC this is checked before req->checkLcpStop
......@@ -568,7 +559,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
unsigned signalLength = 3;
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB);
commitNodes(signal, frag, true);
return;
}
ndbrequire(false);
......@@ -707,7 +697,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
}
TreePos pos;
pos.m_loc = tree.m_root;
NodeHandlePtr nodePtr;
NodeHandle node(frag);
// unpack lower bound
const ScanBound& bound = *scan.m_bound[0];
ScanBoundIterator iter;
......@@ -724,20 +714,20 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
boundPar.m_dir = 0;
loop: {
jam();
selectNode(signal, frag, nodePtr, pos.m_loc, AccPref);
const unsigned occup = nodePtr.p->getOccup();
selectNode(signal, node, pos.m_loc, AccPref);
const unsigned occup = node.getOccup();
ndbrequire(occup != 0);
for (unsigned i = 0; i <= 1; i++) {
jam();
// compare prefix
boundPar.m_data2 = nodePtr.p->getPref(i);
boundPar.m_data2 = node.getPref(i);
boundPar.m_len2 = tree.m_prefSize;
int ret = cmpScanBound(frag, boundPar);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read full value
ReadPar readPar;
readPar.m_ent = nodePtr.p->getMinMax(i);
readPar.m_ent = node.getMinMax(i);
readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data
......@@ -750,7 +740,7 @@ loop: {
}
if (i == 0 && ret < 0) {
jam();
const TupLoc loc = nodePtr.p->getLink(i);
const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
......@@ -763,12 +753,12 @@ loop: {
pos.m_dir = 3;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
linkScan(*nodePtr.p, scanPtr);
linkScan(node, scanPtr);
return;
}
if (i == 1 && ret > 0) {
jam();
const TupLoc loc = nodePtr.p->getLink(i);
const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) {
jam();
// continue to right subtree
......@@ -779,18 +769,18 @@ loop: {
pos.m_dir = 1;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
linkScan(*nodePtr.p, scanPtr);
linkScan(node, scanPtr);
return;
}
}
// read rest of current node
accessNode(signal, frag, nodePtr, AccFull);
accessNode(signal, node, AccFull);
// look for first entry
ndbrequire(occup >= 2);
for (unsigned j = 1; j < occup; j++) {
jam();
ReadPar readPar;
readPar.m_ent = nodePtr.p->getEnt(j);
readPar.m_ent = node.getEnt(j);
readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data
......@@ -808,7 +798,7 @@ loop: {
pos.m_dir = 3;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
linkScan(*nodePtr.p, scanPtr);
linkScan(node, scanPtr);
return;
}
}
......@@ -868,11 +858,11 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
// use copy of position
TreePos pos = scan.m_scanPos;
// get and remember original node
NodeHandlePtr origNodePtr;
selectNode(signal, frag, origNodePtr, pos.m_loc, AccHead);
ndbrequire(islinkScan(*origNodePtr.p, scanPtr));
NodeHandle origNode(frag);
selectNode(signal, origNode, pos.m_loc, AccHead);
ndbrequire(islinkScan(origNode, scanPtr));
// current node in loop
NodeHandlePtr nodePtr = origNodePtr;
NodeHandle node = origNode;
while (true) {
jam();
if (pos.m_dir == 2) {
......@@ -882,14 +872,14 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_state = ScanOp::Last;
break;
}
if (nodePtr.p->m_loc != pos.m_loc) {
if (node.m_loc != pos.m_loc) {
jam();
selectNode(signal, frag, nodePtr, pos.m_loc, AccHead);
selectNode(signal, node, pos.m_loc, AccHead);
}
if (pos.m_dir == 4) {
// coming down from parent proceed to left child
jam();
TupLoc loc = nodePtr.p->getLink(0);
TupLoc loc = node.getLink(0);
if (loc != NullTupLoc) {
jam();
pos.m_loc = loc;
......@@ -909,10 +899,10 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
if (pos.m_dir == 3) {
// within node
jam();
unsigned occup = nodePtr.p->getOccup();
unsigned occup = node.getOccup();
ndbrequire(occup >= 1);
// access full node
accessNode(signal, frag, nodePtr, AccFull);
accessNode(signal, node, AccFull);
// advance position
if (! pos.m_match)
pos.m_match = true;
......@@ -920,7 +910,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
pos.m_pos++;
if (pos.m_pos < occup) {
jam();
pos.m_ent = nodePtr.p->getEnt(pos.m_pos);
pos.m_ent = node.getEnt(pos.m_pos);
pos.m_dir = 3; // unchanged
// XXX implement prefix optimization
ReadPar readPar;
......@@ -951,7 +941,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
break;
}
// after node proceed to right child
TupLoc loc = nodePtr.p->getLink(1);
TupLoc loc = node.getLink(1);
if (loc != NullTupLoc) {
jam();
pos.m_loc = loc;
......@@ -964,8 +954,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
if (pos.m_dir == 1) {
// coming from right child proceed to parent
jam();
pos.m_loc = nodePtr.p->getLink(2);
pos.m_dir = nodePtr.p->getSide();
pos.m_loc = node.getLink(2);
pos.m_dir = node.getSide();
continue;
}
ndbrequire(false);
......@@ -974,16 +964,16 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_scanPos = pos;
// relink
if (scan.m_state == ScanOp::Current) {
ndbrequire(pos.m_loc == nodePtr.p->m_loc);
if (origNodePtr.i != nodePtr.i) {
ndbrequire(pos.m_loc == node.m_loc);
if (origNode.m_loc != node.m_loc) {
jam();
unlinkScan(*origNodePtr.p, scanPtr);
linkScan(*nodePtr.p, scanPtr);
unlinkScan(origNode, scanPtr);
linkScan(node, scanPtr);
}
} else if (scan.m_state == ScanOp::Last) {
jam();
ndbrequire(pos.m_loc == NullTupLoc);
unlinkScan(*origNodePtr.p, scanPtr);
unlinkScan(origNode, scanPtr);
} else {
ndbrequire(false);
}
......@@ -1043,7 +1033,6 @@ void
Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scanPtr.p->m_fragPtrI);
ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL);
// unlock all not unlocked by LQH
for (unsigned i = 0; i < MaxAccLockOps; i++) {
......@@ -1068,7 +1057,6 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB);
releaseScanOp(scanPtr);
commitNodes(signal, frag, true);
}
void
......
......@@ -31,7 +31,6 @@ Dbtux::treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& tree
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
treePos.m_loc = tree.m_root;
NodeHandlePtr nodePtr;
if (treePos.m_loc == NullTupLoc) {
// empty tree
jam();
......@@ -39,10 +38,11 @@ Dbtux::treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& tree
treePos.m_match = false;
return;
}
NodeHandle node(frag);
loop: {
jam();
selectNode(signal, frag, nodePtr, treePos.m_loc, AccPref);
const unsigned occup = nodePtr.p->getOccup();
selectNode(signal, node, treePos.m_loc, AccPref);
const unsigned occup = node.getOccup();
ndbrequire(occup != 0);
// number of equal initial attributes in bounding node
unsigned numEq = ZNIL;
......@@ -51,7 +51,7 @@ loop: {
// compare prefix
CmpPar cmpPar;
cmpPar.m_data1 = searchPar.m_data;
cmpPar.m_data2 = nodePtr.p->getPref(i);
cmpPar.m_data2 = node.getPref(i);
cmpPar.m_len2 = tree.m_prefSize;
cmpPar.m_first = 0;
cmpPar.m_numEq = 0;
......@@ -60,7 +60,7 @@ loop: {
jam();
// read full value
ReadPar readPar;
readPar.m_ent = nodePtr.p->getMinMax(i);
readPar.m_ent = node.getMinMax(i);
ndbrequire(cmpPar.m_numEq < numAttrs);
readPar.m_first = cmpPar.m_numEq;
readPar.m_count = numAttrs - cmpPar.m_numEq;
......@@ -78,11 +78,11 @@ loop: {
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchPar.m_ent.cmp(nodePtr.p->getMinMax(i));
ret = searchPar.m_ent.cmp(node.getMinMax(i));
}
if (i == 0 ? (ret < 0) : (ret > 0)) {
jam();
const TupLoc loc = nodePtr.p->getLink(i);
const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) {
jam();
// continue to left/right subtree
......@@ -102,8 +102,8 @@ loop: {
return;
}
}
// read rest of the bounding node
accessNode(signal, frag, nodePtr, AccFull);
// access rest of the bounding node
accessNode(signal, node, AccFull);
// position is strictly within the node
ndbrequire(occup >= 2);
const unsigned numWithin = occup - 2;
......@@ -114,7 +114,7 @@ loop: {
if (numEq < numAttrs) {
jam();
ReadPar readPar;
readPar.m_ent = nodePtr.p->getEnt(j);
readPar.m_ent = node.getEnt(j);
readPar.m_first = numEq;
readPar.m_count = numAttrs - numEq;
readPar.m_data = 0; // leave in signal data
......@@ -131,7 +131,7 @@ loop: {
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchPar.m_ent.cmp(nodePtr.p->getEnt(j));
ret = searchPar.m_ent.cmp(node.getEnt(j));
}
if (ret <= 0) {
jam();
......@@ -156,31 +156,31 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
{
TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos;
NodeHandlePtr nodePtr;
NodeHandle node(frag);
// check for empty tree
if (treePos.m_loc == NullTupLoc) {
jam();
insertNode(signal, frag, nodePtr, AccPref);
nodePushUp(signal, *nodePtr.p, 0, ent);
nodePtr.p->setSide(2);
tree.m_root = nodePtr.p->m_loc;
insertNode(signal, node, AccPref);
nodePushUp(signal, node, 0, ent);
node.setSide(2);
tree.m_root = node.m_loc;
return;
}
// access full node
selectNode(signal, frag, nodePtr, treePos.m_loc, AccFull);
selectNode(signal, node, treePos.m_loc, AccFull);
// check if it is bounding node
if (pos != 0 && pos != nodePtr.p->getOccup()) {
if (pos != 0 && pos != node.getOccup()) {
jam();
// check if room for one more
if (nodePtr.p->getOccup() < tree.m_maxOccup) {
if (node.getOccup() < tree.m_maxOccup) {
jam();
nodePushUp(signal, *nodePtr.p, pos, ent);
nodePushUp(signal, node, pos, ent);
return;
}
// returns min entry
nodePushDown(signal, *nodePtr.p, pos - 1, ent);
nodePushDown(signal, node, pos - 1, ent);
// find position to add the removed min entry
TupLoc childLoc = nodePtr.p->getLink(0);
TupLoc childLoc = node.getLink(0);
if (childLoc == NullTupLoc) {
jam();
// left child will be added
......@@ -190,60 +190,60 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
// find glb node
while (childLoc != NullTupLoc) {
jam();
selectNode(signal, frag, nodePtr, childLoc, AccHead);
childLoc = nodePtr.p->getLink(1);
selectNode(signal, node, childLoc, AccHead);
childLoc = node.getLink(1);
}
// access full node again
accessNode(signal, frag, nodePtr, AccFull);
pos = nodePtr.p->getOccup();
accessNode(signal, node, AccFull);
pos = node.getOccup();
}
// fall thru to next case
}
// adding new min or max
unsigned i = (pos == 0 ? 0 : 1);
ndbrequire(nodePtr.p->getLink(i) == NullTupLoc);
ndbrequire(node.getLink(i) == NullTupLoc);
// check if the half-leaf/leaf has room for one more
if (nodePtr.p->getOccup() < tree.m_maxOccup) {
if (node.getOccup() < tree.m_maxOccup) {
jam();
nodePushUp(signal, *nodePtr.p, pos, ent);
nodePushUp(signal, node, pos, ent);
return;
}
// add a new node
NodeHandlePtr childPtr;
insertNode(signal, frag, childPtr, AccPref);
nodePushUp(signal, *childPtr.p, 0, ent);
NodeHandle childNode(frag);
insertNode(signal, childNode, AccPref);
nodePushUp(signal, childNode, 0, ent);
// connect parent and child
nodePtr.p->setLink(i, childPtr.p->m_loc);
childPtr.p->setLink(2, nodePtr.p->m_loc);
childPtr.p->setSide(i);
node.setLink(i, childNode.m_loc);
childNode.setLink(2, node.m_loc);
childNode.setSide(i);
// re-balance tree at each node
while (true) {
// height of subtree i has increased by 1
int j = (i == 0 ? -1 : +1);
int b = nodePtr.p->getBalance();
int b = node.getBalance();
if (b == 0) {
// perfectly balanced
jam();
nodePtr.p->setBalance(j);
node.setBalance(j);
// height change propagates up
} else if (b == -j) {
// height of shorter subtree increased
jam();
nodePtr.p->setBalance(0);
node.setBalance(0);
// height of tree did not change - done
break;
} else if (b == j) {
// height of longer subtree increased
jam();
NodeHandlePtr childPtr;
selectNode(signal, frag, childPtr, nodePtr.p->getLink(i), AccHead);
int b2 = childPtr.p->getBalance();
NodeHandle childNode(frag);
selectNode(signal, childNode, node.getLink(i), AccHead);
int b2 = childNode.getBalance();
if (b2 == b) {
jam();
treeRotateSingle(signal, frag, nodePtr, i);
treeRotateSingle(signal, frag, node, i);
} else if (b2 == -b) {
jam();
treeRotateDouble(signal, frag, nodePtr, i);
treeRotateDouble(signal, frag, node, i);
} else {
// height of subtree increased so it cannot be perfectly balanced
ndbrequire(false);
......@@ -253,14 +253,14 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
} else {
ndbrequire(false);
}
TupLoc parentLoc = nodePtr.p->getLink(2);
TupLoc parentLoc = node.getLink(2);
if (parentLoc == NullTupLoc) {
jam();
// root node - done
break;
}
i = nodePtr.p->getSide();
selectNode(signal, frag, nodePtr, parentLoc, AccHead);
i = node.getSide();
selectNode(signal, node, parentLoc, AccHead);
}
}
......@@ -272,101 +272,101 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
{
TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos;
NodeHandlePtr nodePtr;
NodeHandle node(frag);
// access full node
selectNode(signal, frag, nodePtr, treePos.m_loc, AccFull);
selectNode(signal, node, treePos.m_loc, AccFull);
TreeEnt ent;
// check interior node first
if (nodePtr.p->getChilds() == 2) {
if (node.getChilds() == 2) {
jam();
ndbrequire(nodePtr.p->getOccup() >= tree.m_minOccup);
ndbrequire(node.getOccup() >= tree.m_minOccup);
// check if no underflow
if (nodePtr.p->getOccup() > tree.m_minOccup) {
if (node.getOccup() > tree.m_minOccup) {
jam();
nodePopDown(signal, *nodePtr.p, pos, ent);
nodePopDown(signal, node, pos, ent);
return;
}
// save current handle
NodeHandlePtr parentPtr = nodePtr;
NodeHandle parentNode = node;
// find glb node
TupLoc childLoc = nodePtr.p->getLink(0);
TupLoc childLoc = node.getLink(0);
while (childLoc != NullTupLoc) {
jam();
selectNode(signal, frag, nodePtr, childLoc, AccHead);
childLoc = nodePtr.p->getLink(1);
selectNode(signal, node, childLoc, AccHead);
childLoc = node.getLink(1);
}
// access full node again
accessNode(signal, frag, nodePtr, AccFull);
accessNode(signal, node, AccFull);
// use glb max as new parent min
ent = nodePtr.p->getEnt(nodePtr.p->getOccup() - 1);
nodePopUp(signal, *parentPtr.p, pos, ent);
ent = node.getEnt(node.getOccup() - 1);
nodePopUp(signal, parentNode, pos, ent);
// set up to remove glb max
pos = nodePtr.p->getOccup() - 1;
pos = node.getOccup() - 1;
// fall thru to next case
}
// remove the element
nodePopDown(signal, *nodePtr.p, pos, ent);
ndbrequire(nodePtr.p->getChilds() <= 1);
nodePopDown(signal, node, pos, ent);
ndbrequire(node.getChilds() <= 1);
// handle half-leaf
for (unsigned i = 0; i <= 1; i++) {
jam();
TupLoc childLoc = nodePtr.p->getLink(i);
TupLoc childLoc = node.getLink(i);
if (childLoc != NullTupLoc) {
// move to child
selectNode(signal, frag, nodePtr, childLoc, AccFull);
selectNode(signal, node, childLoc, AccFull);
// balance of half-leaf parent requires child to be leaf
break;
}
}
ndbrequire(nodePtr.p->getChilds() == 0);
ndbrequire(node.getChilds() == 0);
// get parent if any
TupLoc parentLoc = nodePtr.p->getLink(2);
NodeHandlePtr parentPtr;
unsigned i = nodePtr.p->getSide();
TupLoc parentLoc = node.getLink(2);
NodeHandle parentNode(frag);
unsigned i = node.getSide();
// move all that fits into parent
if (parentLoc != NullTupLoc) {
jam();
selectNode(signal, frag, parentPtr, nodePtr.p->getLink(2), AccFull);
nodeSlide(signal, *parentPtr.p, *nodePtr.p, i);
selectNode(signal, parentNode, node.getLink(2), AccFull);
nodeSlide(signal, parentNode, node, i);
// fall thru to next case
}
// non-empty leaf
if (nodePtr.p->getOccup() >= 1) {
if (node.getOccup() >= 1) {
jam();
return;
}
// remove empty leaf
deleteNode(signal, frag, nodePtr);
deleteNode(signal, node);
if (parentLoc == NullTupLoc) {
jam();
// tree is now empty
tree.m_root = NullTupLoc;
return;
}
nodePtr = parentPtr;
nodePtr.p->setLink(i, NullTupLoc);
node = parentNode;
node.setLink(i, NullTupLoc);
#ifdef dbtux_min_occup_less_max_occup
// check if we created a half-leaf
if (nodePtr.p->getBalance() == 0) {
if (node.getBalance() == 0) {
jam();
// move entries from the other child
TupLoc childLoc = nodePtr.p->getLink(1 - i);
NodeHandlePtr childPtr;
selectNode(signal, frag, childPtr, childLoc, AccFull);
nodeSlide(signal, *nodePtr.p, *childPtr.i, 1 - i);
if (childPtr.p->getOccup() == 0) {
TupLoc childLoc = node.getLink(1 - i);
NodeHandle childNode(frag);
selectNode(signal, childNode, childLoc, AccFull);
nodeSlide(signal, node, childNode, 1 - i);
if (childNode.getOccup() == 0) {
jam();
deleteNode(signal, frag, childPtr);
nodePtr.p->setLink(1 - i, NullTupLoc);
deleteNode(signal, childNode);
node.setLink(1 - i, NullTupLoc);
// we are balanced again but our parent balance changes by -1
parentLoc = nodePtr.p->getLink(2);
parentLoc = node.getLink(2);
if (parentLoc == NullTupLoc) {
jam();
return;
}
// fix side and become parent
i = nodePtr.p->getSide();
selectNode(signal, frag, nodePtr, parentLoc, AccHead);
i = node.getSide();
selectNode(signal, node, parentLoc, AccHead);
}
}
#endif
......@@ -374,50 +374,50 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
while (true) {
// height of subtree i has decreased by 1
int j = (i == 0 ? -1 : +1);
int b = nodePtr.p->getBalance();
int b = node.getBalance();
if (b == 0) {
// perfectly balanced
jam();
nodePtr.p->setBalance(-j);
node.setBalance(-j);
// height of tree did not change - done
return;
} else if (b == j) {
// height of longer subtree has decreased
jam();
nodePtr.p->setBalance(0);
node.setBalance(0);
// height change propagates up
} else if (b == -j) {
// height of shorter subtree has decreased
jam();
NodeHandlePtr childPtr;
// child on the other side
selectNode(signal, frag, childPtr, nodePtr.p->getLink(1 - i), AccHead);
int b2 = childPtr.p->getBalance();
NodeHandle childNode(frag);
selectNode(signal, childNode, node.getLink(1 - i), AccHead);
int b2 = childNode.getBalance();
if (b2 == b) {
jam();
treeRotateSingle(signal, frag, nodePtr, 1 - i);
treeRotateSingle(signal, frag, node, 1 - i);
// height of tree decreased and propagates up
} else if (b2 == -b) {
jam();
treeRotateDouble(signal, frag, nodePtr, 1 - i);
treeRotateDouble(signal, frag, node, 1 - i);
// height of tree decreased and propagates up
} else {
jam();
treeRotateSingle(signal, frag, nodePtr, 1 - i);
treeRotateSingle(signal, frag, node, 1 - i);
// height of tree did not change - done
return;
}
} else {
ndbrequire(false);
}
TupLoc parentLoc = nodePtr.p->getLink(2);
TupLoc parentLoc = node.getLink(2);
if (parentLoc == NullTupLoc) {
jam();
// root node - done
return;
}
i = nodePtr.p->getSide();
selectNode(signal, frag, nodePtr, parentLoc, AccHead);
i = node.getSide();
selectNode(signal, node, parentLoc, AccHead);
}
}
......@@ -440,55 +440,55 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
void
Dbtux::treeRotateSingle(Signal* signal,
Frag& frag,
NodeHandlePtr& nodePtr,
NodeHandle& node,
unsigned i)
{
ndbrequire(i <= 1);
/*
5 is the old top node that have been unbalanced due to an insert or
delete. The balance is still the old balance before the update.
Verify that n5Bal is 1 if RR rotate and -1 if LL rotate.
Verify that bal5 is 1 if RR rotate and -1 if LL rotate.
*/
NodeHandlePtr n5Ptr = nodePtr;
const TupLoc n5Loc = n5Ptr.p->m_loc;
const int n5Bal = n5Ptr.p->getBalance();
const int n5side = n5Ptr.p->getSide();
ndbrequire(n5Bal + (1 - i) == i);
NodeHandle node5 = node;
const TupLoc loc5 = node5.m_loc;
const int bal5 = node5.getBalance();
const int side5 = node5.getSide();
ndbrequire(bal5 + (1 - i) == i);
/*
3 is the new root of this part of the tree which is to swap place with
node 5. For an insert to cause this it must have the same balance as 5.
For deletes it can have the balance 0.
*/
TupLoc n3Loc = n5Ptr.p->getLink(i);
NodeHandlePtr n3Ptr;
selectNode(signal, frag, n3Ptr, n3Loc, AccHead);
const int n3Bal = n3Ptr.p->getBalance();
TupLoc loc3 = node5.getLink(i);
NodeHandle node3(frag);
selectNode(signal, node3, loc3, AccHead);
const int bal3 = node3.getBalance();
/*
2 must always be there but is not changed. Thus we mereley check that it
exists.
*/
ndbrequire(n3Ptr.p->getLink(i) != NullTupLoc);
ndbrequire(node3.getLink(i) != NullTupLoc);
/*
4 is not necessarily there but if it is there it will move from one
side of 3 to the other side of 5. For LL it moves from the right side
to the left side and for RR it moves from the left side to the right
side. This means that it also changes parent from 3 to 5.
*/
TupLoc n4Loc = n3Ptr.p->getLink(1 - i);
NodeHandlePtr n4Ptr;
if (n4Loc != NullTupLoc) {
TupLoc loc4 = node3.getLink(1 - i);
NodeHandle node4(frag);
if (loc4 != NullTupLoc) {
jam();
selectNode(signal, frag, n4Ptr, n4Loc, AccHead);
ndbrequire(n4Ptr.p->getSide() == (1 - i) &&
n4Ptr.p->getLink(2) == n3Loc);
n4Ptr.p->setSide(i);
n4Ptr.p->setLink(2, n5Loc);
selectNode(signal, node4, loc4, AccHead);
ndbrequire(node4.getSide() == (1 - i) &&
node4.getLink(2) == loc3);
node4.setSide(i);
node4.setLink(2, loc5);
}//if
/*
Retrieve the address of 5's parent before it is destroyed
*/
TupLoc n0Loc = n5Ptr.p->getLink(2);
TupLoc loc0 = node5.getLink(2);
/*
The next step is to perform the rotation. 3 will inherit 5's parent
......@@ -502,22 +502,22 @@ Dbtux::treeRotateSingle(Signal* signal,
1. 3 must have had 5 as parent before the change.
2. 3's side is left for LL and right for RR before change.
*/
ndbrequire(n3Ptr.p->getLink(2) == n5Loc);
ndbrequire(n3Ptr.p->getSide() == i);
n3Ptr.p->setLink(1 - i, n5Loc);
n3Ptr.p->setLink(2, n0Loc);
n3Ptr.p->setSide(n5side);
n5Ptr.p->setLink(i, n4Loc);
n5Ptr.p->setLink(2, n3Loc);
n5Ptr.p->setSide(1 - i);
if (n0Loc != NullTupLoc) {
ndbrequire(node3.getLink(2) == loc5);
ndbrequire(node3.getSide() == i);
node3.setLink(1 - i, loc5);
node3.setLink(2, loc0);
node3.setSide(side5);
node5.setLink(i, loc4);
node5.setLink(2, loc3);
node5.setSide(1 - i);
if (loc0 != NullTupLoc) {
jam();
NodeHandlePtr n0Ptr;
selectNode(signal, frag, n0Ptr, n0Loc, AccHead);
n0Ptr.p->setLink(n5side, n3Loc);
NodeHandle node0(frag);
selectNode(signal, node0, loc0, AccHead);
node0.setLink(side5, loc3);
} else {
jam();
frag.m_tree.m_root = n3Loc;
frag.m_tree.m_root = loc3;
}//if
/* The final step of the change is to update the balance of 3 and
5 that changed places. There are two cases here. The first case is
......@@ -530,22 +530,22 @@ Dbtux::treeRotateSingle(Signal* signal,
In this case 5 will change balance but still be unbalanced and 3 will
be unbalanced in the opposite direction of 5.
*/
if (n3Bal == n5Bal) {
if (bal3 == bal5) {
jam();
n3Ptr.p->setBalance(0);
n5Ptr.p->setBalance(0);
} else if (n3Bal == 0) {
node3.setBalance(0);
node5.setBalance(0);
} else if (bal3 == 0) {
jam();
n3Ptr.p->setBalance(-n5Bal);
n5Ptr.p->setBalance(n5Bal);
node3.setBalance(-bal5);
node5.setBalance(bal5);
} else {
ndbrequire(false);
}//if
/*
Set nodePtr to 3 as return parameter for enabling caller to continue
Set node to 3 as return parameter for enabling caller to continue
traversing the tree.
*/
nodePtr = n3Ptr;
node = node3;
}
/*
......@@ -650,105 +650,105 @@ Dbtux::treeRotateSingle(Signal* signal,
*
*/
void
Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i)
Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i)
{
// old top node
NodeHandlePtr n6Ptr = nodePtr;
const TupLoc n6Loc = n6Ptr.p->m_loc;
NodeHandle node6 = node;
const TupLoc loc6 = node6.m_loc;
// the un-updated balance
const int n6Bal = n6Ptr.p->getBalance();
const unsigned n6Side = n6Ptr.p->getSide();
const int bal6 = node6.getBalance();
const unsigned side6 = node6.getSide();
// level 1
TupLoc n2Loc = n6Ptr.p->getLink(i);
NodeHandlePtr n2Ptr;
selectNode(signal, frag, n2Ptr, n2Loc, AccHead);
const int n2Bal = n2Ptr.p->getBalance();
TupLoc loc2 = node6.getLink(i);
NodeHandle node2(frag);
selectNode(signal, node2, loc2, AccHead);
const int bal2 = node2.getBalance();
// level 2
TupLoc n4Loc = n2Ptr.p->getLink(1 - i);
NodeHandlePtr n4Ptr;
selectNode(signal, frag, n4Ptr, n4Loc, AccHead);
const int n4Bal = n4Ptr.p->getBalance();
TupLoc loc4 = node2.getLink(1 - i);
NodeHandle node4(frag);
selectNode(signal, node4, loc4, AccHead);
const int bal4 = node4.getBalance();
ndbrequire(i <= 1);
ndbrequire(n6Bal + (1 - i) == i);
ndbrequire(n2Bal == -n6Bal);
ndbrequire(n2Ptr.p->getLink(2) == n6Loc);
ndbrequire(n2Ptr.p->getSide() == i);
ndbrequire(n4Ptr.p->getLink(2) == n2Loc);
ndbrequire(bal6 + (1 - i) == i);
ndbrequire(bal2 == -bal6);
ndbrequire(node2.getLink(2) == loc6);
ndbrequire(node2.getSide() == i);
ndbrequire(node4.getLink(2) == loc2);
// level 3
TupLoc n3Loc = n4Ptr.p->getLink(i);
TupLoc n5Loc = n4Ptr.p->getLink(1 - i);
TupLoc loc3 = node4.getLink(i);
TupLoc loc5 = node4.getLink(1 - i);
// fill up leaf before it becomes internal
if (n3Loc == NullTupLoc && n5Loc == NullTupLoc) {
if (loc3 == NullTupLoc && loc5 == NullTupLoc) {
jam();
TreeHead& tree = frag.m_tree;
accessNode(signal, frag, n2Ptr, AccFull);
accessNode(signal, frag, n4Ptr, AccFull);
nodeSlide(signal, *n4Ptr.p, *n2Ptr.p, i);
accessNode(signal, node2, AccFull);
accessNode(signal, node4, AccFull);
nodeSlide(signal, node4, node2, i);
// implied by rule of merging half-leaves with leaves
ndbrequire(n4Ptr.p->getOccup() >= tree.m_minOccup);
ndbrequire(n2Ptr.p->getOccup() != 0);
ndbrequire(node4.getOccup() >= tree.m_minOccup);
ndbrequire(node2.getOccup() != 0);
} else {
if (n3Loc != NullTupLoc) {
if (loc3 != NullTupLoc) {
jam();
NodeHandlePtr n3Ptr;
selectNode(signal, frag, n3Ptr, n3Loc, AccHead);
n3Ptr.p->setLink(2, n2Loc);
n3Ptr.p->setSide(1 - i);
NodeHandle node3(frag);
selectNode(signal, node3, loc3, AccHead);
node3.setLink(2, loc2);
node3.setSide(1 - i);
}
if (n5Loc != NullTupLoc) {
if (loc5 != NullTupLoc) {
jam();
NodeHandlePtr n5Ptr;
selectNode(signal, frag, n5Ptr, n5Loc, AccHead);
n5Ptr.p->setLink(2, n6Ptr.p->m_loc);
n5Ptr.p->setSide(i);
NodeHandle node5(frag);
selectNode(signal, node5, loc5, AccHead);
node5.setLink(2, node6.m_loc);
node5.setSide(i);
}
}
// parent
TupLoc n0Loc = n6Ptr.p->getLink(2);
NodeHandlePtr n0Ptr;
TupLoc loc0 = node6.getLink(2);
NodeHandle node0(frag);
// perform the rotation
n6Ptr.p->setLink(i, n5Loc);
n6Ptr.p->setLink(2, n4Loc);
n6Ptr.p->setSide(1 - i);
node6.setLink(i, loc5);
node6.setLink(2, loc4);
node6.setSide(1 - i);
n2Ptr.p->setLink(1 - i, n3Loc);
n2Ptr.p->setLink(2, n4Loc);
node2.setLink(1 - i, loc3);
node2.setLink(2, loc4);
n4Ptr.p->setLink(i, n2Loc);
n4Ptr.p->setLink(1 - i, n6Loc);
n4Ptr.p->setLink(2, n0Loc);
n4Ptr.p->setSide(n6Side);
node4.setLink(i, loc2);
node4.setLink(1 - i, loc6);
node4.setLink(2, loc0);
node4.setSide(side6);
if (n0Loc != NullTupLoc) {
if (loc0 != NullTupLoc) {
jam();
selectNode(signal, frag, n0Ptr, n0Loc, AccHead);
n0Ptr.p->setLink(n6Side, n4Loc);
selectNode(signal, node0, loc0, AccHead);
node0.setLink(side6, loc4);
} else {
jam();
frag.m_tree.m_root = n4Loc;
frag.m_tree.m_root = loc4;
}
// set balance of changed nodes
n4Ptr.p->setBalance(0);
if (n4Bal == 0) {
node4.setBalance(0);
if (bal4 == 0) {
jam();
n2Ptr.p->setBalance(0);
n6Ptr.p->setBalance(0);
} else if (n4Bal == -n2Bal) {
node2.setBalance(0);
node6.setBalance(0);
} else if (bal4 == -bal2) {
jam();
n2Ptr.p->setBalance(0);
n6Ptr.p->setBalance(n2Bal);
} else if (n4Bal == n2Bal) {
node2.setBalance(0);
node6.setBalance(bal2);
} else if (bal4 == bal2) {
jam();
n2Ptr.p->setBalance(-n2Bal);
n6Ptr.p->setBalance(0);
node2.setBalance(-bal2);
node6.setBalance(0);
} else {
ndbrequire(false);
}
// new top node
nodePtr = n4Ptr;
node = node4;
}
......@@ -31,6 +31,7 @@ optim 4 mc02/a 42 ms 80 ms 87 pct
optim 5 mc02/a 43 ms 77 ms 77 pct
mc02/b 54 ms 118 ms 117 pct
optim 6 mc02/a 42 ms 70 ms 66 pct
mc02/b 53 ms 109 ms 105 pct
vim: set et:
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment