Commit ed51beb9 authored by pekka@mysql.com's avatar pekka@mysql.com

tux optim 6 - remove node cache

parent 38cf1a3f
...@@ -102,11 +102,6 @@ private: ...@@ -102,11 +102,6 @@ private:
// sizes are in words (Uint32) // sizes are in words (Uint32)
static const unsigned MaxIndexFragments = 2 * NO_OF_FRAG_PER_NODE; static const unsigned MaxIndexFragments = 2 * NO_OF_FRAG_PER_NODE;
static const unsigned MaxIndexAttributes = MAX_ATTRIBUTES_IN_INDEX; static const unsigned MaxIndexAttributes = MAX_ATTRIBUTES_IN_INDEX;
#ifdef VM_TRACE
static const unsigned MaxNodeHandles = 10000; // More space for printTree
#else
static const unsigned MaxNodeHandles = 128; // enough for 1 operation
#endif
static const unsigned MaxAttrDataSize = 2048; static const unsigned MaxAttrDataSize = 2048;
public: public:
static const unsigned DescPageSize = 256; static const unsigned DescPageSize = 256;
...@@ -179,7 +174,7 @@ private: ...@@ -179,7 +174,7 @@ private:
}; };
/* /*
* There is no const variable NullTupLoc since the compiler may not be * There is no const member NullTupLoc since the compiler may not be
* able to optimize it to TupLoc() constants. Instead null values are * able to optimize it to TupLoc() constants. Instead null values are
* constructed on the stack with TupLoc(). * constructed on the stack with TupLoc().
*/ */
...@@ -462,8 +457,7 @@ private: ...@@ -462,8 +457,7 @@ private:
Uint16 m_descOff; Uint16 m_descOff;
Uint16 m_numAttrs; Uint16 m_numAttrs;
TreeHead m_tree; TreeHead m_tree;
Uint32 m_nodeList; // node cache of current operation TupLoc m_freeLoc; // one node pre-allocated for insert
Uint32 m_nodeFree; // one node pre-allocated for insert
DLList<ScanOp> m_scanList; // current scans on this fragment DLList<ScanOp> m_scanList; // current scans on this fragment
Uint32 m_tupIndexFragPtrI; Uint32 m_tupIndexFragPtrI;
Uint32 m_tupTableFragPtrI[2]; Uint32 m_tupTableFragPtrI[2];
...@@ -498,9 +492,8 @@ private: ...@@ -498,9 +492,8 @@ private:
// node handles // node handles
/* /*
* A tree operation builds a cache of accessed nodes. This allows * A node handle is a reference to a tree node in TUP. It is used to
* different implementations of index memory access. The cache is * operate on the node. Node handles are allocated on the stack.
* committed and released at the end of the operation.
*/ */
struct NodeHandle; struct NodeHandle;
friend struct NodeHandle; friend struct NodeHandle;
...@@ -509,11 +502,9 @@ private: ...@@ -509,11 +502,9 @@ private:
TupLoc m_loc; // physical node address TupLoc m_loc; // physical node address
TreeNode* m_node; // pointer to node storage TreeNode* m_node; // pointer to node storage
AccSize m_acc; // accessed size AccSize m_acc; // accessed size
union {
Uint32 m_next; // next active node under fragment
Uint32 nextPool;
};
NodeHandle(Frag& frag); NodeHandle(Frag& frag);
NodeHandle(const NodeHandle& node);
NodeHandle& operator=(const NodeHandle& node);
// getters // getters
TupLoc getLink(unsigned i); TupLoc getLink(unsigned i);
unsigned getChilds(); // cannot spell unsigned getChilds(); // cannot spell
...@@ -521,20 +512,19 @@ private: ...@@ -521,20 +512,19 @@ private:
unsigned getOccup(); unsigned getOccup();
int getBalance(); int getBalance();
Uint32 getNodeScan(); Uint32 getNodeScan();
Data getPref(unsigned i);
TreeEnt getEnt(unsigned pos);
TreeEnt getMinMax(unsigned i);
// setters // setters
void setLink(unsigned i, TupLoc loc); void setLink(unsigned i, TupLoc loc);
void setSide(unsigned i); void setSide(unsigned i);
void setOccup(unsigned n); void setOccup(unsigned n);
void setBalance(int b); void setBalance(int b);
void setNodeScan(Uint32 scanPtrI); void setNodeScan(Uint32 scanPtrI);
// access other parts of the node
Data getPref(unsigned i);
TreeEnt getEnt(unsigned pos);
TreeEnt getMinMax(unsigned i);
// for ndbrequire and ndbassert // for ndbrequire and ndbassert
void progError(int line, int cause, const char* file); void progError(int line, int cause, const char* file);
}; };
typedef Ptr<NodeHandle> NodeHandlePtr;
ArrayPool<NodeHandle> c_nodeHandlePool;
// parameters for methods // parameters for methods
...@@ -565,17 +555,6 @@ private: ...@@ -565,17 +555,6 @@ private:
ReadPar(); ReadPar();
}; };
/*
* Node storage operation.
*/
struct StorePar {
TupStoreTh::OpCode m_opCode;// operation code
unsigned m_offset; // data offset in words
unsigned m_size; // number of words
Uint32 m_errorCode; // terrorCode from TUP
StorePar();
};
/* /*
* Tree search for entry. * Tree search for entry.
*/ */
...@@ -649,15 +628,12 @@ private: ...@@ -649,15 +628,12 @@ private:
/* /*
* DbtuxNode.cpp * DbtuxNode.cpp
*/ */
void seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr); int allocNode(Signal* signal, NodeHandle& node);
void preallocNode(Signal* signal, Frag& frag, Uint32& errorCode); void accessNode(Signal* signal, NodeHandle& node, AccSize acc);
void findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc); void selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc);
void selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc, AccSize acc); void insertNode(Signal* signal, NodeHandle& node, AccSize acc);
void insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc); void deleteNode(Signal* signal, NodeHandle& node);
void deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr);
void accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc);
void setNodePref(Signal* signal, NodeHandle& node, unsigned i); void setNodePref(Signal* signal, NodeHandle& node, unsigned i);
void commitNodes(Signal* signal, Frag& frag, bool updateOk);
// node operations // node operations
void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent); void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent);
void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
...@@ -675,8 +651,8 @@ private: ...@@ -675,8 +651,8 @@ private:
void treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& treePos); void treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& treePos);
void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent); void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent);
void treeRemove(Signal* signal, Frag& frag, TreePos treePos); void treeRemove(Signal* signal, Frag& frag, TreePos treePos);
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i); void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
void treeRotateDouble(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i); void treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
/* /*
* DbtuxScan.cpp * DbtuxScan.cpp
...@@ -1054,8 +1030,7 @@ Dbtux::Frag::Frag(ArrayPool<ScanOp>& scanOpPool) : ...@@ -1054,8 +1030,7 @@ Dbtux::Frag::Frag(ArrayPool<ScanOp>& scanOpPool) :
m_descOff(0), m_descOff(0),
m_numAttrs(ZNIL), m_numAttrs(ZNIL),
m_tree(), m_tree(),
m_nodeList(RNIL), m_freeLoc(),
m_nodeFree(RNIL),
m_scanList(scanOpPool), m_scanList(scanOpPool),
m_tupIndexFragPtrI(RNIL) m_tupIndexFragPtrI(RNIL)
{ {
...@@ -1086,11 +1061,29 @@ Dbtux::NodeHandle::NodeHandle(Frag& frag) : ...@@ -1086,11 +1061,29 @@ Dbtux::NodeHandle::NodeHandle(Frag& frag) :
m_frag(frag), m_frag(frag),
m_loc(), m_loc(),
m_node(0), m_node(0),
m_acc(AccNone), m_acc(AccNone)
m_next(RNIL)
{ {
} }
inline
Dbtux::NodeHandle::NodeHandle(const NodeHandle& node) :
m_frag(node.m_frag),
m_loc(node.m_loc),
m_node(node.m_node),
m_acc(node.m_acc)
{
}
inline Dbtux::NodeHandle&
Dbtux::NodeHandle::operator=(const NodeHandle& node)
{
ndbassert(&m_frag == &node.m_frag);
m_loc = node.m_loc;
m_node = node.m_node;
m_acc = node.m_acc;
return *this;
}
inline Dbtux::TupLoc inline Dbtux::TupLoc
Dbtux::NodeHandle::getLink(unsigned i) Dbtux::NodeHandle::getLink(unsigned i)
{ {
...@@ -1128,37 +1121,6 @@ Dbtux::NodeHandle::getNodeScan() ...@@ -1128,37 +1121,6 @@ Dbtux::NodeHandle::getNodeScan()
return m_node->m_nodeScan; return m_node->m_nodeScan;
} }
inline Dbtux::Data
Dbtux::NodeHandle::getPref(unsigned i)
{
TreeHead& tree = m_frag.m_tree;
ndbrequire(m_acc >= AccPref && i <= 1);
return tree.getPref(m_node, i);
}
inline Dbtux::TreeEnt
Dbtux::NodeHandle::getEnt(unsigned pos)
{
TreeHead& tree = m_frag.m_tree;
TreeEnt* entList = tree.getEntList(m_node);
const unsigned occup = m_node->m_occup;
ndbrequire(pos < occup);
if (pos == 0 || pos == occup - 1) {
ndbrequire(m_acc >= AccPref)
} else {
ndbrequire(m_acc == AccFull)
}
return entList[(1 + pos) % occup];
}
inline Dbtux::TreeEnt
Dbtux::NodeHandle::getMinMax(unsigned i)
{
const unsigned occup = m_node->m_occup;
ndbrequire(i <= 1 && occup != 0);
return getEnt(i == 0 ? 0 : occup - 1);
}
inline void inline void
Dbtux::NodeHandle::setLink(unsigned i, TupLoc loc) Dbtux::NodeHandle::setLink(unsigned i, TupLoc loc)
{ {
...@@ -1195,6 +1157,37 @@ Dbtux::NodeHandle::setNodeScan(Uint32 scanPtrI) ...@@ -1195,6 +1157,37 @@ Dbtux::NodeHandle::setNodeScan(Uint32 scanPtrI)
m_node->m_nodeScan = scanPtrI; m_node->m_nodeScan = scanPtrI;
} }
inline Dbtux::Data
Dbtux::NodeHandle::getPref(unsigned i)
{
TreeHead& tree = m_frag.m_tree;
ndbrequire(m_acc >= AccPref && i <= 1);
return tree.getPref(m_node, i);
}
inline Dbtux::TreeEnt
Dbtux::NodeHandle::getEnt(unsigned pos)
{
TreeHead& tree = m_frag.m_tree;
TreeEnt* entList = tree.getEntList(m_node);
const unsigned occup = m_node->m_occup;
ndbrequire(pos < occup);
if (pos == 0 || pos == occup - 1) {
ndbrequire(m_acc >= AccPref)
} else {
ndbrequire(m_acc == AccFull)
}
return entList[(1 + pos) % occup];
}
inline Dbtux::TreeEnt
Dbtux::NodeHandle::getMinMax(unsigned i)
{
const unsigned occup = m_node->m_occup;
ndbrequire(i <= 1 && occup != 0);
return getEnt(i == 0 ? 0 : occup - 1);
}
// parameters for methods // parameters for methods
inline inline
...@@ -1217,15 +1210,6 @@ Dbtux::ReadPar::ReadPar() : ...@@ -1217,15 +1210,6 @@ Dbtux::ReadPar::ReadPar() :
{ {
} }
inline
Dbtux::StorePar::StorePar() :
m_opCode(TupStoreTh::OpUndefined),
m_offset(0),
m_size(0),
m_errorCode(0)
{
}
inline inline
Dbtux::SearchPar::SearchPar() : Dbtux::SearchPar::SearchPar() :
m_data(0), m_data(0),
......
...@@ -106,13 +106,11 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out) ...@@ -106,13 +106,11 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
signal->theData[1] = 1; signal->theData[1] = 1;
execDUMP_STATE_ORD(signal); execDUMP_STATE_ORD(signal);
if (debugFile != 0) { if (debugFile != 0) {
commitNodes(signal, frag, false);
printTree(signal, frag, debugOut); printTree(signal, frag, debugOut);
} }
} }
ndbrequire(false); ndbrequire(false);
} }
commitNodes(signal, frag, false);
} }
void void
...@@ -123,9 +121,9 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -123,9 +121,9 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
return; return;
} }
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
NodeHandlePtr nodePtr; NodeHandle node(frag);
selectNode(signal, frag, nodePtr, loc, AccFull); selectNode(signal, node, loc, AccFull);
out << par.m_path << " " << *nodePtr.p << endl; out << par.m_path << " " << node << endl;
// check children // check children
PrintPar cpar[2]; PrintPar cpar[2];
ndbrequire(strlen(par.m_path) + 1 < sizeof(par.m_path)); ndbrequire(strlen(par.m_path) + 1 < sizeof(par.m_path));
...@@ -134,56 +132,56 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -134,56 +132,56 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
cpar[i].m_side = i; cpar[i].m_side = i;
cpar[i].m_depth = 0; cpar[i].m_depth = 0;
cpar[i].m_parent = loc; cpar[i].m_parent = loc;
printNode(signal, frag, out, nodePtr.p->getLink(i), cpar[i]); printNode(signal, frag, out, node.getLink(i), cpar[i]);
if (! cpar[i].m_ok) { if (! cpar[i].m_ok) {
par.m_ok = false; par.m_ok = false;
} }
} }
// check child-parent links // check child-parent links
if (nodePtr.p->getLink(2) != par.m_parent) { if (node.getLink(2) != par.m_parent) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << " *** ";
out << "parent loc " << hex << nodePtr.p->getLink(2); out << "parent loc " << hex << node.getLink(2);
out << " should be " << hex << par.m_parent << endl; out << " should be " << hex << par.m_parent << endl;
} }
if (nodePtr.p->getSide() != par.m_side) { if (node.getSide() != par.m_side) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << " *** ";
out << "side " << dec << nodePtr.p->getSide(); out << "side " << dec << node.getSide();
out << " should be " << dec << par.m_side << endl; out << " should be " << dec << par.m_side << endl;
} }
// check balance // check balance
const int balance = -cpar[0].m_depth + cpar[1].m_depth; const int balance = -cpar[0].m_depth + cpar[1].m_depth;
if (nodePtr.p->getBalance() != balance) { if (node.getBalance() != balance) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << " *** ";
out << "balance " << nodePtr.p->getBalance(); out << "balance " << node.getBalance();
out << " should be " << balance << endl; out << " should be " << balance << endl;
} }
if (abs(nodePtr.p->getBalance()) > 1) { if (abs(node.getBalance()) > 1) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << " *** ";
out << "balance " << nodePtr.p->getBalance() << " is invalid" << endl; out << "balance " << node.getBalance() << " is invalid" << endl;
} }
// check occupancy // check occupancy
if (nodePtr.p->getOccup() > tree.m_maxOccup) { if (node.getOccup() > tree.m_maxOccup) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << " *** ";
out << "occupancy " << nodePtr.p->getOccup(); out << "occupancy " << node.getOccup();
out << " greater than max " << tree.m_maxOccup << endl; out << " greater than max " << tree.m_maxOccup << endl;
} }
// check for occupancy of interior node // check for occupancy of interior node
if (nodePtr.p->getChilds() == 2 && nodePtr.p->getOccup() < tree.m_minOccup) { if (node.getChilds() == 2 && node.getOccup() < tree.m_minOccup) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << " *** ";
out << "occupancy " << nodePtr.p->getOccup() << " of interior node"; out << "occupancy " << node.getOccup() << " of interior node";
out << " less than min " << tree.m_minOccup << endl; out << " less than min " << tree.m_minOccup << endl;
} }
// check missed half-leaf/leaf merge // check missed half-leaf/leaf merge
for (unsigned i = 0; i <= 1; i++) { for (unsigned i = 0; i <= 1; i++) {
if (nodePtr.p->getLink(i) != NullTupLoc && if (node.getLink(i) != NullTupLoc &&
nodePtr.p->getLink(1 - i) == NullTupLoc && node.getLink(1 - i) == NullTupLoc &&
nodePtr.p->getOccup() + cpar[i].m_occup <= tree.m_maxOccup) { node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << " *** ";
out << "missed merge with child " << i << endl; out << "missed merge with child " << i << endl;
...@@ -191,7 +189,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -191,7 +189,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
} }
// return values // return values
par.m_depth = 1 + max(cpar[0].m_depth, cpar[1].m_depth); par.m_depth = 1 + max(cpar[0].m_depth, cpar[1].m_depth);
par.m_occup = nodePtr.p->getOccup(); par.m_occup = node.getOccup();
} }
NdbOut& NdbOut&
......
...@@ -178,7 +178,6 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) ...@@ -178,7 +178,6 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
c_fragPool.setSize(nFragment); c_fragPool.setSize(nFragment);
c_descPagePool.setSize(nDescPage); c_descPagePool.setSize(nDescPage);
c_fragOpPool.setSize(MaxIndexFragments); c_fragOpPool.setSize(MaxIndexFragments);
c_nodeHandlePool.setSize(MaxNodeHandles);
c_scanOpPool.setSize(nScanOp); c_scanOpPool.setSize(nScanOp);
c_scanBoundPool.setSize(nScanBoundWords); c_scanBoundPool.setSize(nScanBoundWords);
/* /*
......
...@@ -72,7 +72,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -72,7 +72,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
} }
ndbrequire(fragPtr.i != RNIL); ndbrequire(fragPtr.i != RNIL);
Frag& frag = *fragPtr.p; Frag& frag = *fragPtr.p;
ndbrequire(frag.m_nodeList == RNIL);
// set up index entry // set up index entry
TreeEnt ent; TreeEnt ent;
ent.m_tupAddr = req->tupAddr; ent.m_tupAddr = req->tupAddr;
...@@ -143,17 +142,18 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -143,17 +142,18 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
} }
/* /*
* At most one new node is inserted in the operation. We keep one * At most one new node is inserted in the operation. We keep one
* free node pre-allocated so the operation cannot fail. This also * free node pre-allocated so the operation cannot fail.
* gives a real TupAddr for links to the new node.
*/ */
if (frag.m_nodeFree == RNIL) { if (frag.m_freeLoc == NullTupLoc) {
jam(); jam();
preallocNode(signal, frag, req->errorCode); NodeHandle node(frag);
req->errorCode = allocNode(signal, node);
if (req->errorCode != 0) { if (req->errorCode != 0) {
jam(); jam();
break; break;
} }
ndbrequire(frag.m_nodeFree != RNIL); frag.m_freeLoc = node.m_loc;
ndbrequire(frag.m_freeLoc != NullTupLoc);
} }
treeAdd(signal, frag, treePos, ent); treeAdd(signal, frag, treePos, ent);
break; break;
...@@ -175,7 +175,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -175,7 +175,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break; break;
} }
// commit and release nodes // commit and release nodes
commitNodes(signal, frag, req->errorCode == 0);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugTree) { if (debugFlags & DebugTree) {
printTree(signal, frag, debugOut); printTree(signal, frag, debugOut);
......
...@@ -316,12 +316,6 @@ Dbtux::dropIndex(Signal* signal, IndexPtr indexPtr, Uint32 senderRef, Uint32 sen ...@@ -316,12 +316,6 @@ Dbtux::dropIndex(Signal* signal, IndexPtr indexPtr, Uint32 senderRef, Uint32 sen
unsigned i = --indexPtr.p->m_numFrags; unsigned i = --indexPtr.p->m_numFrags;
FragPtr fragPtr; FragPtr fragPtr;
c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]); c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]);
Frag& frag = *fragPtr.p;
ndbrequire(frag.m_nodeList == RNIL);
if (frag.m_nodeFree != RNIL) {
c_nodeHandlePool.release(frag.m_nodeFree);
frag.m_nodeFree = RNIL;
}
c_fragPool.release(fragPtr); c_fragPool.release(fragPtr);
// the real time break is not used for anything currently // the real time break is not used for anything currently
signal->theData[0] = TuxContinueB::DropIndex; signal->theData[0] = TuxContinueB::DropIndex;
......
...@@ -18,161 +18,94 @@ ...@@ -18,161 +18,94 @@
#include "Dbtux.hpp" #include "Dbtux.hpp"
/* /*
* Node handles. * Allocate index node in TUP.
*
* Temporary version between "cache" and "pointer" implementations.
*/ */
int
// Dbtux Dbtux::allocNode(Signal* signal, NodeHandle& node)
void
Dbtux::seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr)
{ {
if (! c_nodeHandlePool.seize(nodePtr)) { Frag& frag = node.m_frag;
jam();
return;
}
new (nodePtr.p) NodeHandle(frag);
nodePtr.p->m_next = frag.m_nodeList;
frag.m_nodeList = nodePtr.i;
}
void
Dbtux::preallocNode(Signal* signal, Frag& frag, Uint32& errorCode)
{
ndbrequire(frag.m_nodeFree == RNIL);
NodeHandlePtr nodePtr;
seizeNode(signal, frag, nodePtr);
ndbrequire(nodePtr.i != RNIL);
// remove from cache XXX ugly
frag.m_nodeFree = frag.m_nodeList;
frag.m_nodeList = nodePtr.p->m_next;
// alloc index node in TUP
Uint32 pageId = NullTupLoc.m_pageId; Uint32 pageId = NullTupLoc.m_pageId;
Uint32 pageOffset = NullTupLoc.m_pageOffset; Uint32 pageOffset = NullTupLoc.m_pageOffset;
Uint32* node32 = 0; Uint32* node32 = 0;
errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); int errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
if (errorCode != 0) { if (errorCode == 0) {
jam(); jam();
c_nodeHandlePool.release(nodePtr); node.m_loc = TupLoc(pageId, pageOffset);
frag.m_nodeFree = RNIL; node.m_node = reinterpret_cast<TreeNode*>(node32);
return; node.m_acc = AccNone;
ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
} }
nodePtr.p->m_loc = TupLoc(pageId, pageOffset); return errorCode;
nodePtr.p->m_node = reinterpret_cast<TreeNode*>(node32);
ndbrequire(nodePtr.p->m_loc != NullTupLoc && nodePtr.p->m_node != 0);
new (nodePtr.p->m_node) TreeNode();
#ifdef VM_TRACE
TreeHead& tree = frag.m_tree;
TreeNode* node = nodePtr.p->m_node;
memset(tree.getPref(node, 0), 0xa2, tree.m_prefSize << 2);
memset(tree.getPref(node, 1), 0xa2, tree.m_prefSize << 2);
TreeEnt* entList = tree.getEntList(node);
memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
#endif
} }
/* /*
* Find node in the cache. XXX too slow, use direct links instead * Access more of the node.
*/ */
void void
Dbtux::findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc) Dbtux::accessNode(Signal* signal, NodeHandle& node, AccSize acc)
{ {
NodeHandlePtr tmpPtr; ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
tmpPtr.i = frag.m_nodeList; if (node.m_acc >= acc)
while (tmpPtr.i != RNIL) {
jam();
c_nodeHandlePool.getPtr(tmpPtr);
if (tmpPtr.p->m_loc == loc) {
jam();
nodePtr = tmpPtr;
return; return;
} // XXX could do prefetch
tmpPtr.i = tmpPtr.p->m_next; node.m_acc = acc;
}
nodePtr.i = RNIL;
nodePtr.p = 0;
} }
/* /*
* Get handle for existing node. * Set handle to point to existing node.
*/ */
void void
Dbtux::selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc, AccSize acc) Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc)
{ {
ndbrequire(loc != NullTupLoc && acc > AccNone); Frag& frag = node.m_frag;
NodeHandlePtr tmpPtr; ndbrequire(loc != NullTupLoc);
// search in cache
findNode(signal, frag, tmpPtr, loc);
if (tmpPtr.i == RNIL) {
jam();
// add new node
seizeNode(signal, frag, tmpPtr);
ndbrequire(tmpPtr.i != RNIL);
tmpPtr.p->m_loc = loc;
Uint32 pageId = loc.m_pageId; Uint32 pageId = loc.m_pageId;
Uint32 pageOffset = loc.m_pageOffset; Uint32 pageOffset = loc.m_pageOffset;
Uint32* node32 = 0; Uint32* node32 = 0;
c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
tmpPtr.p->m_node = reinterpret_cast<TreeNode*>(node32); node.m_loc = loc;
ndbrequire(tmpPtr.p->m_loc != NullTupLoc && tmpPtr.p->m_node != 0); node.m_node = reinterpret_cast<TreeNode*>(node32);
} node.m_acc = AccNone;
if (tmpPtr.p->m_acc < acc) { ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
jam(); accessNode(signal, node, acc);
accessNode(signal, frag, tmpPtr, acc);
}
nodePtr = tmpPtr;
} }
/* /*
* Create new node in the cache using the pre-allocated node. * Set handle to point to new node. Uses the pre-allocated node.
*/ */
void void
Dbtux::insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc) Dbtux::insertNode(Signal* signal, NodeHandle& node, AccSize acc)
{ {
ndbrequire(acc > AccNone); Frag& frag = node.m_frag;
NodeHandlePtr tmpPtr; TupLoc loc = frag.m_freeLoc;
// use the pre-allocated node frag.m_freeLoc = NullTupLoc;
tmpPtr.i = frag.m_nodeFree; selectNode(signal, node, loc, acc);
frag.m_nodeFree = RNIL; new (node.m_node) TreeNode();
c_nodeHandlePool.getPtr(tmpPtr); #ifdef VM_TRACE
// move it to the cache TreeHead& tree = frag.m_tree;
tmpPtr.p->m_next = frag.m_nodeList; memset(tree.getPref(node.m_node, 0), 0xa2, tree.m_prefSize << 2);
frag.m_nodeList = tmpPtr.i; memset(tree.getPref(node.m_node, 1), 0xa2, tree.m_prefSize << 2);
tmpPtr.p->m_acc = acc; TreeEnt* entList = tree.getEntList(node.m_node);
nodePtr = tmpPtr; memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
#endif
} }
/* /*
* Delete existing node. * Delete existing node.
*/ */
void void
Dbtux::deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr) Dbtux::deleteNode(Signal* signal, NodeHandle& node)
{ {
NodeHandlePtr tmpPtr = nodePtr; Frag& frag = node.m_frag;
ndbrequire(tmpPtr.p->getOccup() == 0); ndbrequire(node.getOccup() == 0);
Uint32 pageId = tmpPtr.p->m_loc.m_pageId; TupLoc loc = node.m_loc;
Uint32 pageOffset = tmpPtr.p->m_loc.m_pageOffset; Uint32 pageId = loc.m_pageId;
Uint32* node32 = reinterpret_cast<Uint32*>(tmpPtr.p->m_node); Uint32 pageOffset = loc.m_pageOffset;
Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node);
c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
// invalidate handle and storage // invalidate handle and storage
tmpPtr.p->m_loc = NullTupLoc; node.m_loc = NullTupLoc;
tmpPtr.p->m_node = 0; node.m_node = 0;
// scans have already been moved by nodePopDown or nodePopUp
}
/*
* Access more of the node.
*/
void
Dbtux::accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc)
{
NodeHandlePtr tmpPtr = nodePtr;
ndbrequire(tmpPtr.p->m_loc != NullTupLoc && tmpPtr.p->m_node != 0);
if (tmpPtr.p->m_acc >= acc)
return;
// XXX could do prefetch
tmpPtr.p->m_acc = acc;
} }
/* /*
...@@ -201,25 +134,6 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i) ...@@ -201,25 +134,6 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i)
copyAttrs(pref, readPar.m_data, copyPar); copyAttrs(pref, readPar.m_data, copyPar);
} }
/*
* Commit and release nodes at the end of an operation. Used also on
* error since no changes have been made (updateOk false).
*/
void
Dbtux::commitNodes(Signal* signal, Frag& frag, bool updateOk)
{
NodeHandlePtr nodePtr;
nodePtr.i = frag.m_nodeList;
frag.m_nodeList = RNIL;
while (nodePtr.i != RNIL) {
c_nodeHandlePool.getPtr(nodePtr);
// release
NodeHandlePtr tmpPtr = nodePtr;
nodePtr.i = nodePtr.p->m_next;
c_nodeHandlePool.release(tmpPtr);
}
}
// node operations // node operations
/* /*
......
...@@ -42,7 +42,6 @@ Dbtux::execACC_SCANREQ(Signal* signal) ...@@ -42,7 +42,6 @@ Dbtux::execACC_SCANREQ(Signal* signal)
} }
ndbrequire(fragPtr.i != RNIL); ndbrequire(fragPtr.i != RNIL);
Frag& frag = *fragPtr.p; Frag& frag = *fragPtr.p;
ndbrequire(frag.m_nodeList == RNIL);
// must be normal DIH/TC fragment // must be normal DIH/TC fragment
ndbrequire(frag.m_fragId < (1 << frag.m_fragOff)); ndbrequire(frag.m_fragId < (1 << frag.m_fragOff));
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -241,7 +240,6 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) ...@@ -241,7 +240,6 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
debugOut << "NEXT_SCANREQ scan " << scanPtr.i << " " << scan << endl; debugOut << "NEXT_SCANREQ scan " << scanPtr.i << " " << scan << endl;
} }
#endif #endif
ndbrequire(frag.m_nodeList == RNIL);
// handle unlock previous and close scan // handle unlock previous and close scan
switch (req->scanFlag) { switch (req->scanFlag) {
case NextScanReq::ZSCAN_NEXT: case NextScanReq::ZSCAN_NEXT:
...@@ -278,9 +276,9 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) ...@@ -278,9 +276,9 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
if (scan.m_scanPos.m_loc != NullTupLoc) { if (scan.m_scanPos.m_loc != NullTupLoc) {
jam(); jam();
const TupLoc loc = scan.m_scanPos.m_loc; const TupLoc loc = scan.m_scanPos.m_loc;
NodeHandlePtr nodePtr; NodeHandle node(frag);
selectNode(signal, frag, nodePtr, loc, AccHead); selectNode(signal, node, loc, AccHead);
unlinkScan(*nodePtr.p, scanPtr); unlinkScan(node, scanPtr);
scan.m_scanPos.m_loc = NullTupLoc; scan.m_scanPos.m_loc = NullTupLoc;
} }
if (scan.m_lockwait) { if (scan.m_lockwait) {
...@@ -295,7 +293,6 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) ...@@ -295,7 +293,6 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
jamEntry(); jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success); ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_state = ScanOp::Aborting; scan.m_state = ScanOp::Aborting;
commitNodes(signal, frag, true);
return; return;
} }
if (scan.m_state == ScanOp::Locked) { if (scan.m_state == ScanOp::Locked) {
...@@ -350,7 +347,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -350,7 +347,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
signal->theData[1] = true; signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry(); jamEntry();
commitNodes(signal, frag, true);
return; // stop return; // stop
} }
if (scan.m_lockwait) { if (scan.m_lockwait) {
...@@ -365,7 +361,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -365,7 +361,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
// if TC has ordered scan close, it will be detected here // if TC has ordered scan close, it will be detected here
sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF, sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB); signal, signalLength, JBB);
commitNodes(signal, frag, true);
return; // stop return; // stop
} }
if (scan.m_state == ScanOp::First) { if (scan.m_state == ScanOp::First) {
...@@ -444,7 +439,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -444,7 +439,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
signal->theData[1] = true; signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry(); jamEntry();
commitNodes(signal, frag, true);
return; // stop return; // stop
break; break;
case AccLockReq::Refused: case AccLockReq::Refused:
...@@ -457,7 +451,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -457,7 +451,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
signal->theData[1] = true; signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry(); jamEntry();
commitNodes(signal, frag, true);
return; // stop return; // stop
break; break;
case AccLockReq::NoFreeOp: case AccLockReq::NoFreeOp:
...@@ -470,7 +463,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -470,7 +463,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
signal->theData[1] = true; signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry(); jamEntry();
commitNodes(signal, frag, true);
return; // stop return; // stop
break; break;
default: default:
...@@ -554,7 +546,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -554,7 +546,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
scan.m_lastEnt = ent; scan.m_lastEnt = ent;
// next time look for next entry // next time look for next entry
scan.m_state = ScanOp::Next; scan.m_state = ScanOp::Next;
commitNodes(signal, frag, true);
return; return;
} }
// XXX in ACC this is checked before req->checkLcpStop // XXX in ACC this is checked before req->checkLcpStop
...@@ -568,7 +559,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -568,7 +559,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
unsigned signalLength = 3; unsigned signalLength = 3;
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB); signal, signalLength, JBB);
commitNodes(signal, frag, true);
return; return;
} }
ndbrequire(false); ndbrequire(false);
...@@ -707,7 +697,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -707,7 +697,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
} }
TreePos pos; TreePos pos;
pos.m_loc = tree.m_root; pos.m_loc = tree.m_root;
NodeHandlePtr nodePtr; NodeHandle node(frag);
// unpack lower bound // unpack lower bound
const ScanBound& bound = *scan.m_bound[0]; const ScanBound& bound = *scan.m_bound[0];
ScanBoundIterator iter; ScanBoundIterator iter;
...@@ -724,20 +714,20 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -724,20 +714,20 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
boundPar.m_dir = 0; boundPar.m_dir = 0;
loop: { loop: {
jam(); jam();
selectNode(signal, frag, nodePtr, pos.m_loc, AccPref); selectNode(signal, node, pos.m_loc, AccPref);
const unsigned occup = nodePtr.p->getOccup(); const unsigned occup = node.getOccup();
ndbrequire(occup != 0); ndbrequire(occup != 0);
for (unsigned i = 0; i <= 1; i++) { for (unsigned i = 0; i <= 1; i++) {
jam(); jam();
// compare prefix // compare prefix
boundPar.m_data2 = nodePtr.p->getPref(i); boundPar.m_data2 = node.getPref(i);
boundPar.m_len2 = tree.m_prefSize; boundPar.m_len2 = tree.m_prefSize;
int ret = cmpScanBound(frag, boundPar); int ret = cmpScanBound(frag, boundPar);
if (ret == NdbSqlUtil::CmpUnknown) { if (ret == NdbSqlUtil::CmpUnknown) {
jam(); jam();
// read full value // read full value
ReadPar readPar; ReadPar readPar;
readPar.m_ent = nodePtr.p->getMinMax(i); readPar.m_ent = node.getMinMax(i);
readPar.m_first = 0; readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs; readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data readPar.m_data = 0; // leave in signal data
...@@ -750,7 +740,7 @@ loop: { ...@@ -750,7 +740,7 @@ loop: {
} }
if (i == 0 && ret < 0) { if (i == 0 && ret < 0) {
jam(); jam();
const TupLoc loc = nodePtr.p->getLink(i); const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) { if (loc != NullTupLoc) {
jam(); jam();
// continue to left subtree // continue to left subtree
...@@ -763,12 +753,12 @@ loop: { ...@@ -763,12 +753,12 @@ loop: {
pos.m_dir = 3; pos.m_dir = 3;
scan.m_scanPos = pos; scan.m_scanPos = pos;
scan.m_state = ScanOp::Next; scan.m_state = ScanOp::Next;
linkScan(*nodePtr.p, scanPtr); linkScan(node, scanPtr);
return; return;
} }
if (i == 1 && ret > 0) { if (i == 1 && ret > 0) {
jam(); jam();
const TupLoc loc = nodePtr.p->getLink(i); const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) { if (loc != NullTupLoc) {
jam(); jam();
// continue to right subtree // continue to right subtree
...@@ -779,18 +769,18 @@ loop: { ...@@ -779,18 +769,18 @@ loop: {
pos.m_dir = 1; pos.m_dir = 1;
scan.m_scanPos = pos; scan.m_scanPos = pos;
scan.m_state = ScanOp::Next; scan.m_state = ScanOp::Next;
linkScan(*nodePtr.p, scanPtr); linkScan(node, scanPtr);
return; return;
} }
} }
// read rest of current node // read rest of current node
accessNode(signal, frag, nodePtr, AccFull); accessNode(signal, node, AccFull);
// look for first entry // look for first entry
ndbrequire(occup >= 2); ndbrequire(occup >= 2);
for (unsigned j = 1; j < occup; j++) { for (unsigned j = 1; j < occup; j++) {
jam(); jam();
ReadPar readPar; ReadPar readPar;
readPar.m_ent = nodePtr.p->getEnt(j); readPar.m_ent = node.getEnt(j);
readPar.m_first = 0; readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs; readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data readPar.m_data = 0; // leave in signal data
...@@ -808,7 +798,7 @@ loop: { ...@@ -808,7 +798,7 @@ loop: {
pos.m_dir = 3; pos.m_dir = 3;
scan.m_scanPos = pos; scan.m_scanPos = pos;
scan.m_state = ScanOp::Next; scan.m_state = ScanOp::Next;
linkScan(*nodePtr.p, scanPtr); linkScan(node, scanPtr);
return; return;
} }
} }
...@@ -868,11 +858,11 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -868,11 +858,11 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
// use copy of position // use copy of position
TreePos pos = scan.m_scanPos; TreePos pos = scan.m_scanPos;
// get and remember original node // get and remember original node
NodeHandlePtr origNodePtr; NodeHandle origNode(frag);
selectNode(signal, frag, origNodePtr, pos.m_loc, AccHead); selectNode(signal, origNode, pos.m_loc, AccHead);
ndbrequire(islinkScan(*origNodePtr.p, scanPtr)); ndbrequire(islinkScan(origNode, scanPtr));
// current node in loop // current node in loop
NodeHandlePtr nodePtr = origNodePtr; NodeHandle node = origNode;
while (true) { while (true) {
jam(); jam();
if (pos.m_dir == 2) { if (pos.m_dir == 2) {
...@@ -882,14 +872,14 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -882,14 +872,14 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_state = ScanOp::Last; scan.m_state = ScanOp::Last;
break; break;
} }
if (nodePtr.p->m_loc != pos.m_loc) { if (node.m_loc != pos.m_loc) {
jam(); jam();
selectNode(signal, frag, nodePtr, pos.m_loc, AccHead); selectNode(signal, node, pos.m_loc, AccHead);
} }
if (pos.m_dir == 4) { if (pos.m_dir == 4) {
// coming down from parent proceed to left child // coming down from parent proceed to left child
jam(); jam();
TupLoc loc = nodePtr.p->getLink(0); TupLoc loc = node.getLink(0);
if (loc != NullTupLoc) { if (loc != NullTupLoc) {
jam(); jam();
pos.m_loc = loc; pos.m_loc = loc;
...@@ -909,10 +899,10 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -909,10 +899,10 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
if (pos.m_dir == 3) { if (pos.m_dir == 3) {
// within node // within node
jam(); jam();
unsigned occup = nodePtr.p->getOccup(); unsigned occup = node.getOccup();
ndbrequire(occup >= 1); ndbrequire(occup >= 1);
// access full node // access full node
accessNode(signal, frag, nodePtr, AccFull); accessNode(signal, node, AccFull);
// advance position // advance position
if (! pos.m_match) if (! pos.m_match)
pos.m_match = true; pos.m_match = true;
...@@ -920,7 +910,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -920,7 +910,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
pos.m_pos++; pos.m_pos++;
if (pos.m_pos < occup) { if (pos.m_pos < occup) {
jam(); jam();
pos.m_ent = nodePtr.p->getEnt(pos.m_pos); pos.m_ent = node.getEnt(pos.m_pos);
pos.m_dir = 3; // unchanged pos.m_dir = 3; // unchanged
// XXX implement prefix optimization // XXX implement prefix optimization
ReadPar readPar; ReadPar readPar;
...@@ -951,7 +941,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -951,7 +941,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
break; break;
} }
// after node proceed to right child // after node proceed to right child
TupLoc loc = nodePtr.p->getLink(1); TupLoc loc = node.getLink(1);
if (loc != NullTupLoc) { if (loc != NullTupLoc) {
jam(); jam();
pos.m_loc = loc; pos.m_loc = loc;
...@@ -964,8 +954,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -964,8 +954,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
if (pos.m_dir == 1) { if (pos.m_dir == 1) {
// coming from right child proceed to parent // coming from right child proceed to parent
jam(); jam();
pos.m_loc = nodePtr.p->getLink(2); pos.m_loc = node.getLink(2);
pos.m_dir = nodePtr.p->getSide(); pos.m_dir = node.getSide();
continue; continue;
} }
ndbrequire(false); ndbrequire(false);
...@@ -974,16 +964,16 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -974,16 +964,16 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_scanPos = pos; scan.m_scanPos = pos;
// relink // relink
if (scan.m_state == ScanOp::Current) { if (scan.m_state == ScanOp::Current) {
ndbrequire(pos.m_loc == nodePtr.p->m_loc); ndbrequire(pos.m_loc == node.m_loc);
if (origNodePtr.i != nodePtr.i) { if (origNode.m_loc != node.m_loc) {
jam(); jam();
unlinkScan(*origNodePtr.p, scanPtr); unlinkScan(origNode, scanPtr);
linkScan(*nodePtr.p, scanPtr); linkScan(node, scanPtr);
} }
} else if (scan.m_state == ScanOp::Last) { } else if (scan.m_state == ScanOp::Last) {
jam(); jam();
ndbrequire(pos.m_loc == NullTupLoc); ndbrequire(pos.m_loc == NullTupLoc);
unlinkScan(*origNodePtr.p, scanPtr); unlinkScan(origNode, scanPtr);
} else { } else {
ndbrequire(false); ndbrequire(false);
} }
...@@ -1043,7 +1033,6 @@ void ...@@ -1043,7 +1033,6 @@ void
Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr) Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
{ {
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scanPtr.p->m_fragPtrI);
ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL); ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL);
// unlock all not unlocked by LQH // unlock all not unlocked by LQH
for (unsigned i = 0; i < MaxAccLockOps; i++) { for (unsigned i = 0; i < MaxAccLockOps; i++) {
...@@ -1068,7 +1057,6 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr) ...@@ -1068,7 +1057,6 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB); signal, signalLength, JBB);
releaseScanOp(scanPtr); releaseScanOp(scanPtr);
commitNodes(signal, frag, true);
} }
void void
......
...@@ -31,7 +31,6 @@ Dbtux::treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& tree ...@@ -31,7 +31,6 @@ Dbtux::treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& tree
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
treePos.m_loc = tree.m_root; treePos.m_loc = tree.m_root;
NodeHandlePtr nodePtr;
if (treePos.m_loc == NullTupLoc) { if (treePos.m_loc == NullTupLoc) {
// empty tree // empty tree
jam(); jam();
...@@ -39,10 +38,11 @@ Dbtux::treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& tree ...@@ -39,10 +38,11 @@ Dbtux::treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& tree
treePos.m_match = false; treePos.m_match = false;
return; return;
} }
NodeHandle node(frag);
loop: { loop: {
jam(); jam();
selectNode(signal, frag, nodePtr, treePos.m_loc, AccPref); selectNode(signal, node, treePos.m_loc, AccPref);
const unsigned occup = nodePtr.p->getOccup(); const unsigned occup = node.getOccup();
ndbrequire(occup != 0); ndbrequire(occup != 0);
// number of equal initial attributes in bounding node // number of equal initial attributes in bounding node
unsigned numEq = ZNIL; unsigned numEq = ZNIL;
...@@ -51,7 +51,7 @@ loop: { ...@@ -51,7 +51,7 @@ loop: {
// compare prefix // compare prefix
CmpPar cmpPar; CmpPar cmpPar;
cmpPar.m_data1 = searchPar.m_data; cmpPar.m_data1 = searchPar.m_data;
cmpPar.m_data2 = nodePtr.p->getPref(i); cmpPar.m_data2 = node.getPref(i);
cmpPar.m_len2 = tree.m_prefSize; cmpPar.m_len2 = tree.m_prefSize;
cmpPar.m_first = 0; cmpPar.m_first = 0;
cmpPar.m_numEq = 0; cmpPar.m_numEq = 0;
...@@ -60,7 +60,7 @@ loop: { ...@@ -60,7 +60,7 @@ loop: {
jam(); jam();
// read full value // read full value
ReadPar readPar; ReadPar readPar;
readPar.m_ent = nodePtr.p->getMinMax(i); readPar.m_ent = node.getMinMax(i);
ndbrequire(cmpPar.m_numEq < numAttrs); ndbrequire(cmpPar.m_numEq < numAttrs);
readPar.m_first = cmpPar.m_numEq; readPar.m_first = cmpPar.m_numEq;
readPar.m_count = numAttrs - cmpPar.m_numEq; readPar.m_count = numAttrs - cmpPar.m_numEq;
...@@ -78,11 +78,11 @@ loop: { ...@@ -78,11 +78,11 @@ loop: {
if (ret == 0) { if (ret == 0) {
jam(); jam();
// keys are equal, compare entry values // keys are equal, compare entry values
ret = searchPar.m_ent.cmp(nodePtr.p->getMinMax(i)); ret = searchPar.m_ent.cmp(node.getMinMax(i));
} }
if (i == 0 ? (ret < 0) : (ret > 0)) { if (i == 0 ? (ret < 0) : (ret > 0)) {
jam(); jam();
const TupLoc loc = nodePtr.p->getLink(i); const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) { if (loc != NullTupLoc) {
jam(); jam();
// continue to left/right subtree // continue to left/right subtree
...@@ -102,8 +102,8 @@ loop: { ...@@ -102,8 +102,8 @@ loop: {
return; return;
} }
} }
// read rest of the bounding node // access rest of the bounding node
accessNode(signal, frag, nodePtr, AccFull); accessNode(signal, node, AccFull);
// position is strictly within the node // position is strictly within the node
ndbrequire(occup >= 2); ndbrequire(occup >= 2);
const unsigned numWithin = occup - 2; const unsigned numWithin = occup - 2;
...@@ -114,7 +114,7 @@ loop: { ...@@ -114,7 +114,7 @@ loop: {
if (numEq < numAttrs) { if (numEq < numAttrs) {
jam(); jam();
ReadPar readPar; ReadPar readPar;
readPar.m_ent = nodePtr.p->getEnt(j); readPar.m_ent = node.getEnt(j);
readPar.m_first = numEq; readPar.m_first = numEq;
readPar.m_count = numAttrs - numEq; readPar.m_count = numAttrs - numEq;
readPar.m_data = 0; // leave in signal data readPar.m_data = 0; // leave in signal data
...@@ -131,7 +131,7 @@ loop: { ...@@ -131,7 +131,7 @@ loop: {
if (ret == 0) { if (ret == 0) {
jam(); jam();
// keys are equal, compare entry values // keys are equal, compare entry values
ret = searchPar.m_ent.cmp(nodePtr.p->getEnt(j)); ret = searchPar.m_ent.cmp(node.getEnt(j));
} }
if (ret <= 0) { if (ret <= 0) {
jam(); jam();
...@@ -156,31 +156,31 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) ...@@ -156,31 +156,31 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
{ {
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos; unsigned pos = treePos.m_pos;
NodeHandlePtr nodePtr; NodeHandle node(frag);
// check for empty tree // check for empty tree
if (treePos.m_loc == NullTupLoc) { if (treePos.m_loc == NullTupLoc) {
jam(); jam();
insertNode(signal, frag, nodePtr, AccPref); insertNode(signal, node, AccPref);
nodePushUp(signal, *nodePtr.p, 0, ent); nodePushUp(signal, node, 0, ent);
nodePtr.p->setSide(2); node.setSide(2);
tree.m_root = nodePtr.p->m_loc; tree.m_root = node.m_loc;
return; return;
} }
// access full node // access full node
selectNode(signal, frag, nodePtr, treePos.m_loc, AccFull); selectNode(signal, node, treePos.m_loc, AccFull);
// check if it is bounding node // check if it is bounding node
if (pos != 0 && pos != nodePtr.p->getOccup()) { if (pos != 0 && pos != node.getOccup()) {
jam(); jam();
// check if room for one more // check if room for one more
if (nodePtr.p->getOccup() < tree.m_maxOccup) { if (node.getOccup() < tree.m_maxOccup) {
jam(); jam();
nodePushUp(signal, *nodePtr.p, pos, ent); nodePushUp(signal, node, pos, ent);
return; return;
} }
// returns min entry // returns min entry
nodePushDown(signal, *nodePtr.p, pos - 1, ent); nodePushDown(signal, node, pos - 1, ent);
// find position to add the removed min entry // find position to add the removed min entry
TupLoc childLoc = nodePtr.p->getLink(0); TupLoc childLoc = node.getLink(0);
if (childLoc == NullTupLoc) { if (childLoc == NullTupLoc) {
jam(); jam();
// left child will be added // left child will be added
...@@ -190,60 +190,60 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) ...@@ -190,60 +190,60 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
// find glb node // find glb node
while (childLoc != NullTupLoc) { while (childLoc != NullTupLoc) {
jam(); jam();
selectNode(signal, frag, nodePtr, childLoc, AccHead); selectNode(signal, node, childLoc, AccHead);
childLoc = nodePtr.p->getLink(1); childLoc = node.getLink(1);
} }
// access full node again // access full node again
accessNode(signal, frag, nodePtr, AccFull); accessNode(signal, node, AccFull);
pos = nodePtr.p->getOccup(); pos = node.getOccup();
} }
// fall thru to next case // fall thru to next case
} }
// adding new min or max // adding new min or max
unsigned i = (pos == 0 ? 0 : 1); unsigned i = (pos == 0 ? 0 : 1);
ndbrequire(nodePtr.p->getLink(i) == NullTupLoc); ndbrequire(node.getLink(i) == NullTupLoc);
// check if the half-leaf/leaf has room for one more // check if the half-leaf/leaf has room for one more
if (nodePtr.p->getOccup() < tree.m_maxOccup) { if (node.getOccup() < tree.m_maxOccup) {
jam(); jam();
nodePushUp(signal, *nodePtr.p, pos, ent); nodePushUp(signal, node, pos, ent);
return; return;
} }
// add a new node // add a new node
NodeHandlePtr childPtr; NodeHandle childNode(frag);
insertNode(signal, frag, childPtr, AccPref); insertNode(signal, childNode, AccPref);
nodePushUp(signal, *childPtr.p, 0, ent); nodePushUp(signal, childNode, 0, ent);
// connect parent and child // connect parent and child
nodePtr.p->setLink(i, childPtr.p->m_loc); node.setLink(i, childNode.m_loc);
childPtr.p->setLink(2, nodePtr.p->m_loc); childNode.setLink(2, node.m_loc);
childPtr.p->setSide(i); childNode.setSide(i);
// re-balance tree at each node // re-balance tree at each node
while (true) { while (true) {
// height of subtree i has increased by 1 // height of subtree i has increased by 1
int j = (i == 0 ? -1 : +1); int j = (i == 0 ? -1 : +1);
int b = nodePtr.p->getBalance(); int b = node.getBalance();
if (b == 0) { if (b == 0) {
// perfectly balanced // perfectly balanced
jam(); jam();
nodePtr.p->setBalance(j); node.setBalance(j);
// height change propagates up // height change propagates up
} else if (b == -j) { } else if (b == -j) {
// height of shorter subtree increased // height of shorter subtree increased
jam(); jam();
nodePtr.p->setBalance(0); node.setBalance(0);
// height of tree did not change - done // height of tree did not change - done
break; break;
} else if (b == j) { } else if (b == j) {
// height of longer subtree increased // height of longer subtree increased
jam(); jam();
NodeHandlePtr childPtr; NodeHandle childNode(frag);
selectNode(signal, frag, childPtr, nodePtr.p->getLink(i), AccHead); selectNode(signal, childNode, node.getLink(i), AccHead);
int b2 = childPtr.p->getBalance(); int b2 = childNode.getBalance();
if (b2 == b) { if (b2 == b) {
jam(); jam();
treeRotateSingle(signal, frag, nodePtr, i); treeRotateSingle(signal, frag, node, i);
} else if (b2 == -b) { } else if (b2 == -b) {
jam(); jam();
treeRotateDouble(signal, frag, nodePtr, i); treeRotateDouble(signal, frag, node, i);
} else { } else {
// height of subtree increased so it cannot be perfectly balanced // height of subtree increased so it cannot be perfectly balanced
ndbrequire(false); ndbrequire(false);
...@@ -253,14 +253,14 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) ...@@ -253,14 +253,14 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
} else { } else {
ndbrequire(false); ndbrequire(false);
} }
TupLoc parentLoc = nodePtr.p->getLink(2); TupLoc parentLoc = node.getLink(2);
if (parentLoc == NullTupLoc) { if (parentLoc == NullTupLoc) {
jam(); jam();
// root node - done // root node - done
break; break;
} }
i = nodePtr.p->getSide(); i = node.getSide();
selectNode(signal, frag, nodePtr, parentLoc, AccHead); selectNode(signal, node, parentLoc, AccHead);
} }
} }
...@@ -272,101 +272,101 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -272,101 +272,101 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
{ {
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos; unsigned pos = treePos.m_pos;
NodeHandlePtr nodePtr; NodeHandle node(frag);
// access full node // access full node
selectNode(signal, frag, nodePtr, treePos.m_loc, AccFull); selectNode(signal, node, treePos.m_loc, AccFull);
TreeEnt ent; TreeEnt ent;
// check interior node first // check interior node first
if (nodePtr.p->getChilds() == 2) { if (node.getChilds() == 2) {
jam(); jam();
ndbrequire(nodePtr.p->getOccup() >= tree.m_minOccup); ndbrequire(node.getOccup() >= tree.m_minOccup);
// check if no underflow // check if no underflow
if (nodePtr.p->getOccup() > tree.m_minOccup) { if (node.getOccup() > tree.m_minOccup) {
jam(); jam();
nodePopDown(signal, *nodePtr.p, pos, ent); nodePopDown(signal, node, pos, ent);
return; return;
} }
// save current handle // save current handle
NodeHandlePtr parentPtr = nodePtr; NodeHandle parentNode = node;
// find glb node // find glb node
TupLoc childLoc = nodePtr.p->getLink(0); TupLoc childLoc = node.getLink(0);
while (childLoc != NullTupLoc) { while (childLoc != NullTupLoc) {
jam(); jam();
selectNode(signal, frag, nodePtr, childLoc, AccHead); selectNode(signal, node, childLoc, AccHead);
childLoc = nodePtr.p->getLink(1); childLoc = node.getLink(1);
} }
// access full node again // access full node again
accessNode(signal, frag, nodePtr, AccFull); accessNode(signal, node, AccFull);
// use glb max as new parent min // use glb max as new parent min
ent = nodePtr.p->getEnt(nodePtr.p->getOccup() - 1); ent = node.getEnt(node.getOccup() - 1);
nodePopUp(signal, *parentPtr.p, pos, ent); nodePopUp(signal, parentNode, pos, ent);
// set up to remove glb max // set up to remove glb max
pos = nodePtr.p->getOccup() - 1; pos = node.getOccup() - 1;
// fall thru to next case // fall thru to next case
} }
// remove the element // remove the element
nodePopDown(signal, *nodePtr.p, pos, ent); nodePopDown(signal, node, pos, ent);
ndbrequire(nodePtr.p->getChilds() <= 1); ndbrequire(node.getChilds() <= 1);
// handle half-leaf // handle half-leaf
for (unsigned i = 0; i <= 1; i++) { for (unsigned i = 0; i <= 1; i++) {
jam(); jam();
TupLoc childLoc = nodePtr.p->getLink(i); TupLoc childLoc = node.getLink(i);
if (childLoc != NullTupLoc) { if (childLoc != NullTupLoc) {
// move to child // move to child
selectNode(signal, frag, nodePtr, childLoc, AccFull); selectNode(signal, node, childLoc, AccFull);
// balance of half-leaf parent requires child to be leaf // balance of half-leaf parent requires child to be leaf
break; break;
} }
} }
ndbrequire(nodePtr.p->getChilds() == 0); ndbrequire(node.getChilds() == 0);
// get parent if any // get parent if any
TupLoc parentLoc = nodePtr.p->getLink(2); TupLoc parentLoc = node.getLink(2);
NodeHandlePtr parentPtr; NodeHandle parentNode(frag);
unsigned i = nodePtr.p->getSide(); unsigned i = node.getSide();
// move all that fits into parent // move all that fits into parent
if (parentLoc != NullTupLoc) { if (parentLoc != NullTupLoc) {
jam(); jam();
selectNode(signal, frag, parentPtr, nodePtr.p->getLink(2), AccFull); selectNode(signal, parentNode, node.getLink(2), AccFull);
nodeSlide(signal, *parentPtr.p, *nodePtr.p, i); nodeSlide(signal, parentNode, node, i);
// fall thru to next case // fall thru to next case
} }
// non-empty leaf // non-empty leaf
if (nodePtr.p->getOccup() >= 1) { if (node.getOccup() >= 1) {
jam(); jam();
return; return;
} }
// remove empty leaf // remove empty leaf
deleteNode(signal, frag, nodePtr); deleteNode(signal, node);
if (parentLoc == NullTupLoc) { if (parentLoc == NullTupLoc) {
jam(); jam();
// tree is now empty // tree is now empty
tree.m_root = NullTupLoc; tree.m_root = NullTupLoc;
return; return;
} }
nodePtr = parentPtr; node = parentNode;
nodePtr.p->setLink(i, NullTupLoc); node.setLink(i, NullTupLoc);
#ifdef dbtux_min_occup_less_max_occup #ifdef dbtux_min_occup_less_max_occup
// check if we created a half-leaf // check if we created a half-leaf
if (nodePtr.p->getBalance() == 0) { if (node.getBalance() == 0) {
jam(); jam();
// move entries from the other child // move entries from the other child
TupLoc childLoc = nodePtr.p->getLink(1 - i); TupLoc childLoc = node.getLink(1 - i);
NodeHandlePtr childPtr; NodeHandle childNode(frag);
selectNode(signal, frag, childPtr, childLoc, AccFull); selectNode(signal, childNode, childLoc, AccFull);
nodeSlide(signal, *nodePtr.p, *childPtr.i, 1 - i); nodeSlide(signal, node, childNode, 1 - i);
if (childPtr.p->getOccup() == 0) { if (childNode.getOccup() == 0) {
jam(); jam();
deleteNode(signal, frag, childPtr); deleteNode(signal, childNode);
nodePtr.p->setLink(1 - i, NullTupLoc); node.setLink(1 - i, NullTupLoc);
// we are balanced again but our parent balance changes by -1 // we are balanced again but our parent balance changes by -1
parentLoc = nodePtr.p->getLink(2); parentLoc = node.getLink(2);
if (parentLoc == NullTupLoc) { if (parentLoc == NullTupLoc) {
jam(); jam();
return; return;
} }
// fix side and become parent // fix side and become parent
i = nodePtr.p->getSide(); i = node.getSide();
selectNode(signal, frag, nodePtr, parentLoc, AccHead); selectNode(signal, node, parentLoc, AccHead);
} }
} }
#endif #endif
...@@ -374,50 +374,50 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -374,50 +374,50 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
while (true) { while (true) {
// height of subtree i has decreased by 1 // height of subtree i has decreased by 1
int j = (i == 0 ? -1 : +1); int j = (i == 0 ? -1 : +1);
int b = nodePtr.p->getBalance(); int b = node.getBalance();
if (b == 0) { if (b == 0) {
// perfectly balanced // perfectly balanced
jam(); jam();
nodePtr.p->setBalance(-j); node.setBalance(-j);
// height of tree did not change - done // height of tree did not change - done
return; return;
} else if (b == j) { } else if (b == j) {
// height of longer subtree has decreased // height of longer subtree has decreased
jam(); jam();
nodePtr.p->setBalance(0); node.setBalance(0);
// height change propagates up // height change propagates up
} else if (b == -j) { } else if (b == -j) {
// height of shorter subtree has decreased // height of shorter subtree has decreased
jam(); jam();
NodeHandlePtr childPtr;
// child on the other side // child on the other side
selectNode(signal, frag, childPtr, nodePtr.p->getLink(1 - i), AccHead); NodeHandle childNode(frag);
int b2 = childPtr.p->getBalance(); selectNode(signal, childNode, node.getLink(1 - i), AccHead);
int b2 = childNode.getBalance();
if (b2 == b) { if (b2 == b) {
jam(); jam();
treeRotateSingle(signal, frag, nodePtr, 1 - i); treeRotateSingle(signal, frag, node, 1 - i);
// height of tree decreased and propagates up // height of tree decreased and propagates up
} else if (b2 == -b) { } else if (b2 == -b) {
jam(); jam();
treeRotateDouble(signal, frag, nodePtr, 1 - i); treeRotateDouble(signal, frag, node, 1 - i);
// height of tree decreased and propagates up // height of tree decreased and propagates up
} else { } else {
jam(); jam();
treeRotateSingle(signal, frag, nodePtr, 1 - i); treeRotateSingle(signal, frag, node, 1 - i);
// height of tree did not change - done // height of tree did not change - done
return; return;
} }
} else { } else {
ndbrequire(false); ndbrequire(false);
} }
TupLoc parentLoc = nodePtr.p->getLink(2); TupLoc parentLoc = node.getLink(2);
if (parentLoc == NullTupLoc) { if (parentLoc == NullTupLoc) {
jam(); jam();
// root node - done // root node - done
return; return;
} }
i = nodePtr.p->getSide(); i = node.getSide();
selectNode(signal, frag, nodePtr, parentLoc, AccHead); selectNode(signal, node, parentLoc, AccHead);
} }
} }
...@@ -440,55 +440,55 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -440,55 +440,55 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
void void
Dbtux::treeRotateSingle(Signal* signal, Dbtux::treeRotateSingle(Signal* signal,
Frag& frag, Frag& frag,
NodeHandlePtr& nodePtr, NodeHandle& node,
unsigned i) unsigned i)
{ {
ndbrequire(i <= 1); ndbrequire(i <= 1);
/* /*
5 is the old top node that have been unbalanced due to an insert or 5 is the old top node that have been unbalanced due to an insert or
delete. The balance is still the old balance before the update. delete. The balance is still the old balance before the update.
Verify that n5Bal is 1 if RR rotate and -1 if LL rotate. Verify that bal5 is 1 if RR rotate and -1 if LL rotate.
*/ */
NodeHandlePtr n5Ptr = nodePtr; NodeHandle node5 = node;
const TupLoc n5Loc = n5Ptr.p->m_loc; const TupLoc loc5 = node5.m_loc;
const int n5Bal = n5Ptr.p->getBalance(); const int bal5 = node5.getBalance();
const int n5side = n5Ptr.p->getSide(); const int side5 = node5.getSide();
ndbrequire(n5Bal + (1 - i) == i); ndbrequire(bal5 + (1 - i) == i);
/* /*
3 is the new root of this part of the tree which is to swap place with 3 is the new root of this part of the tree which is to swap place with
node 5. For an insert to cause this it must have the same balance as 5. node 5. For an insert to cause this it must have the same balance as 5.
For deletes it can have the balance 0. For deletes it can have the balance 0.
*/ */
TupLoc n3Loc = n5Ptr.p->getLink(i); TupLoc loc3 = node5.getLink(i);
NodeHandlePtr n3Ptr; NodeHandle node3(frag);
selectNode(signal, frag, n3Ptr, n3Loc, AccHead); selectNode(signal, node3, loc3, AccHead);
const int n3Bal = n3Ptr.p->getBalance(); const int bal3 = node3.getBalance();
/* /*
2 must always be there but is not changed. Thus we mereley check that it 2 must always be there but is not changed. Thus we mereley check that it
exists. exists.
*/ */
ndbrequire(n3Ptr.p->getLink(i) != NullTupLoc); ndbrequire(node3.getLink(i) != NullTupLoc);
/* /*
4 is not necessarily there but if it is there it will move from one 4 is not necessarily there but if it is there it will move from one
side of 3 to the other side of 5. For LL it moves from the right side side of 3 to the other side of 5. For LL it moves from the right side
to the left side and for RR it moves from the left side to the right to the left side and for RR it moves from the left side to the right
side. This means that it also changes parent from 3 to 5. side. This means that it also changes parent from 3 to 5.
*/ */
TupLoc n4Loc = n3Ptr.p->getLink(1 - i); TupLoc loc4 = node3.getLink(1 - i);
NodeHandlePtr n4Ptr; NodeHandle node4(frag);
if (n4Loc != NullTupLoc) { if (loc4 != NullTupLoc) {
jam(); jam();
selectNode(signal, frag, n4Ptr, n4Loc, AccHead); selectNode(signal, node4, loc4, AccHead);
ndbrequire(n4Ptr.p->getSide() == (1 - i) && ndbrequire(node4.getSide() == (1 - i) &&
n4Ptr.p->getLink(2) == n3Loc); node4.getLink(2) == loc3);
n4Ptr.p->setSide(i); node4.setSide(i);
n4Ptr.p->setLink(2, n5Loc); node4.setLink(2, loc5);
}//if }//if
/* /*
Retrieve the address of 5's parent before it is destroyed Retrieve the address of 5's parent before it is destroyed
*/ */
TupLoc n0Loc = n5Ptr.p->getLink(2); TupLoc loc0 = node5.getLink(2);
/* /*
The next step is to perform the rotation. 3 will inherit 5's parent The next step is to perform the rotation. 3 will inherit 5's parent
...@@ -502,22 +502,22 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -502,22 +502,22 @@ Dbtux::treeRotateSingle(Signal* signal,
1. 3 must have had 5 as parent before the change. 1. 3 must have had 5 as parent before the change.
2. 3's side is left for LL and right for RR before change. 2. 3's side is left for LL and right for RR before change.
*/ */
ndbrequire(n3Ptr.p->getLink(2) == n5Loc); ndbrequire(node3.getLink(2) == loc5);
ndbrequire(n3Ptr.p->getSide() == i); ndbrequire(node3.getSide() == i);
n3Ptr.p->setLink(1 - i, n5Loc); node3.setLink(1 - i, loc5);
n3Ptr.p->setLink(2, n0Loc); node3.setLink(2, loc0);
n3Ptr.p->setSide(n5side); node3.setSide(side5);
n5Ptr.p->setLink(i, n4Loc); node5.setLink(i, loc4);
n5Ptr.p->setLink(2, n3Loc); node5.setLink(2, loc3);
n5Ptr.p->setSide(1 - i); node5.setSide(1 - i);
if (n0Loc != NullTupLoc) { if (loc0 != NullTupLoc) {
jam(); jam();
NodeHandlePtr n0Ptr; NodeHandle node0(frag);
selectNode(signal, frag, n0Ptr, n0Loc, AccHead); selectNode(signal, node0, loc0, AccHead);
n0Ptr.p->setLink(n5side, n3Loc); node0.setLink(side5, loc3);
} else { } else {
jam(); jam();
frag.m_tree.m_root = n3Loc; frag.m_tree.m_root = loc3;
}//if }//if
/* The final step of the change is to update the balance of 3 and /* The final step of the change is to update the balance of 3 and
5 that changed places. There are two cases here. The first case is 5 that changed places. There are two cases here. The first case is
...@@ -530,22 +530,22 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -530,22 +530,22 @@ Dbtux::treeRotateSingle(Signal* signal,
In this case 5 will change balance but still be unbalanced and 3 will In this case 5 will change balance but still be unbalanced and 3 will
be unbalanced in the opposite direction of 5. be unbalanced in the opposite direction of 5.
*/ */
if (n3Bal == n5Bal) { if (bal3 == bal5) {
jam(); jam();
n3Ptr.p->setBalance(0); node3.setBalance(0);
n5Ptr.p->setBalance(0); node5.setBalance(0);
} else if (n3Bal == 0) { } else if (bal3 == 0) {
jam(); jam();
n3Ptr.p->setBalance(-n5Bal); node3.setBalance(-bal5);
n5Ptr.p->setBalance(n5Bal); node5.setBalance(bal5);
} else { } else {
ndbrequire(false); ndbrequire(false);
}//if }//if
/* /*
Set nodePtr to 3 as return parameter for enabling caller to continue Set node to 3 as return parameter for enabling caller to continue
traversing the tree. traversing the tree.
*/ */
nodePtr = n3Ptr; node = node3;
} }
/* /*
...@@ -650,105 +650,105 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -650,105 +650,105 @@ Dbtux::treeRotateSingle(Signal* signal,
* *
*/ */
void void
Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i) Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i)
{ {
// old top node // old top node
NodeHandlePtr n6Ptr = nodePtr; NodeHandle node6 = node;
const TupLoc n6Loc = n6Ptr.p->m_loc; const TupLoc loc6 = node6.m_loc;
// the un-updated balance // the un-updated balance
const int n6Bal = n6Ptr.p->getBalance(); const int bal6 = node6.getBalance();
const unsigned n6Side = n6Ptr.p->getSide(); const unsigned side6 = node6.getSide();
// level 1 // level 1
TupLoc n2Loc = n6Ptr.p->getLink(i); TupLoc loc2 = node6.getLink(i);
NodeHandlePtr n2Ptr; NodeHandle node2(frag);
selectNode(signal, frag, n2Ptr, n2Loc, AccHead); selectNode(signal, node2, loc2, AccHead);
const int n2Bal = n2Ptr.p->getBalance(); const int bal2 = node2.getBalance();
// level 2 // level 2
TupLoc n4Loc = n2Ptr.p->getLink(1 - i); TupLoc loc4 = node2.getLink(1 - i);
NodeHandlePtr n4Ptr; NodeHandle node4(frag);
selectNode(signal, frag, n4Ptr, n4Loc, AccHead); selectNode(signal, node4, loc4, AccHead);
const int n4Bal = n4Ptr.p->getBalance(); const int bal4 = node4.getBalance();
ndbrequire(i <= 1); ndbrequire(i <= 1);
ndbrequire(n6Bal + (1 - i) == i); ndbrequire(bal6 + (1 - i) == i);
ndbrequire(n2Bal == -n6Bal); ndbrequire(bal2 == -bal6);
ndbrequire(n2Ptr.p->getLink(2) == n6Loc); ndbrequire(node2.getLink(2) == loc6);
ndbrequire(n2Ptr.p->getSide() == i); ndbrequire(node2.getSide() == i);
ndbrequire(n4Ptr.p->getLink(2) == n2Loc); ndbrequire(node4.getLink(2) == loc2);
// level 3 // level 3
TupLoc n3Loc = n4Ptr.p->getLink(i); TupLoc loc3 = node4.getLink(i);
TupLoc n5Loc = n4Ptr.p->getLink(1 - i); TupLoc loc5 = node4.getLink(1 - i);
// fill up leaf before it becomes internal // fill up leaf before it becomes internal
if (n3Loc == NullTupLoc && n5Loc == NullTupLoc) { if (loc3 == NullTupLoc && loc5 == NullTupLoc) {
jam(); jam();
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
accessNode(signal, frag, n2Ptr, AccFull); accessNode(signal, node2, AccFull);
accessNode(signal, frag, n4Ptr, AccFull); accessNode(signal, node4, AccFull);
nodeSlide(signal, *n4Ptr.p, *n2Ptr.p, i); nodeSlide(signal, node4, node2, i);
// implied by rule of merging half-leaves with leaves // implied by rule of merging half-leaves with leaves
ndbrequire(n4Ptr.p->getOccup() >= tree.m_minOccup); ndbrequire(node4.getOccup() >= tree.m_minOccup);
ndbrequire(n2Ptr.p->getOccup() != 0); ndbrequire(node2.getOccup() != 0);
} else { } else {
if (n3Loc != NullTupLoc) { if (loc3 != NullTupLoc) {
jam(); jam();
NodeHandlePtr n3Ptr; NodeHandle node3(frag);
selectNode(signal, frag, n3Ptr, n3Loc, AccHead); selectNode(signal, node3, loc3, AccHead);
n3Ptr.p->setLink(2, n2Loc); node3.setLink(2, loc2);
n3Ptr.p->setSide(1 - i); node3.setSide(1 - i);
} }
if (n5Loc != NullTupLoc) { if (loc5 != NullTupLoc) {
jam(); jam();
NodeHandlePtr n5Ptr; NodeHandle node5(frag);
selectNode(signal, frag, n5Ptr, n5Loc, AccHead); selectNode(signal, node5, loc5, AccHead);
n5Ptr.p->setLink(2, n6Ptr.p->m_loc); node5.setLink(2, node6.m_loc);
n5Ptr.p->setSide(i); node5.setSide(i);
} }
} }
// parent // parent
TupLoc n0Loc = n6Ptr.p->getLink(2); TupLoc loc0 = node6.getLink(2);
NodeHandlePtr n0Ptr; NodeHandle node0(frag);
// perform the rotation // perform the rotation
n6Ptr.p->setLink(i, n5Loc); node6.setLink(i, loc5);
n6Ptr.p->setLink(2, n4Loc); node6.setLink(2, loc4);
n6Ptr.p->setSide(1 - i); node6.setSide(1 - i);
n2Ptr.p->setLink(1 - i, n3Loc); node2.setLink(1 - i, loc3);
n2Ptr.p->setLink(2, n4Loc); node2.setLink(2, loc4);
n4Ptr.p->setLink(i, n2Loc); node4.setLink(i, loc2);
n4Ptr.p->setLink(1 - i, n6Loc); node4.setLink(1 - i, loc6);
n4Ptr.p->setLink(2, n0Loc); node4.setLink(2, loc0);
n4Ptr.p->setSide(n6Side); node4.setSide(side6);
if (n0Loc != NullTupLoc) { if (loc0 != NullTupLoc) {
jam(); jam();
selectNode(signal, frag, n0Ptr, n0Loc, AccHead); selectNode(signal, node0, loc0, AccHead);
n0Ptr.p->setLink(n6Side, n4Loc); node0.setLink(side6, loc4);
} else { } else {
jam(); jam();
frag.m_tree.m_root = n4Loc; frag.m_tree.m_root = loc4;
} }
// set balance of changed nodes // set balance of changed nodes
n4Ptr.p->setBalance(0); node4.setBalance(0);
if (n4Bal == 0) { if (bal4 == 0) {
jam(); jam();
n2Ptr.p->setBalance(0); node2.setBalance(0);
n6Ptr.p->setBalance(0); node6.setBalance(0);
} else if (n4Bal == -n2Bal) { } else if (bal4 == -bal2) {
jam(); jam();
n2Ptr.p->setBalance(0); node2.setBalance(0);
n6Ptr.p->setBalance(n2Bal); node6.setBalance(bal2);
} else if (n4Bal == n2Bal) { } else if (bal4 == bal2) {
jam(); jam();
n2Ptr.p->setBalance(-n2Bal); node2.setBalance(-bal2);
n6Ptr.p->setBalance(0); node6.setBalance(0);
} else { } else {
ndbrequire(false); ndbrequire(false);
} }
// new top node // new top node
nodePtr = n4Ptr; node = node4;
} }
...@@ -31,6 +31,7 @@ optim 4 mc02/a 42 ms 80 ms 87 pct ...@@ -31,6 +31,7 @@ optim 4 mc02/a 42 ms 80 ms 87 pct
optim 5 mc02/a 43 ms 77 ms 77 pct optim 5 mc02/a 43 ms 77 ms 77 pct
mc02/b 54 ms 118 ms 117 pct mc02/b 54 ms 118 ms 117 pct
optim 6 mc02/a 42 ms 70 ms 66 pct
mc02/b 53 ms 109 ms 105 pct
vim: set et: vim: set et:
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment