Commit aa6fb646 authored by unknown's avatar unknown

NDB wl-1942 dbtux - move scans correctly at index tree re-org

parent d49b8ac4
...@@ -587,12 +587,19 @@ private: ...@@ -587,12 +587,19 @@ private:
void deleteNode(Signal* signal, NodeHandle& node); void deleteNode(Signal* signal, NodeHandle& node);
void setNodePref(Signal* signal, NodeHandle& node); void setNodePref(Signal* signal, NodeHandle& node);
// node operations // node operations
void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent); void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList);
void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); void nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos);
void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& en, Uint32* scanList);
void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); void nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos);
void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList);
void nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos);
void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList);
void nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos);
void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i); void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i);
// scans linked to node // scans linked to node
void addScanList(NodeHandle& node, unsigned pos, Uint32 scanList);
void removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList);
void moveScanList(Signal* signal, NodeHandle& node, unsigned pos);
void linkScan(NodeHandle& node, ScanOpPtr scanPtr); void linkScan(NodeHandle& node, ScanOpPtr scanPtr);
void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr); void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr);
bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr); bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr);
......
...@@ -117,18 +117,45 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node) ...@@ -117,18 +117,45 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node)
* v * v
* A B C D E _ _ => A B C X D E _ * A B C D E _ _ => A B C X D E _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6 * 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Add list of scans at the new entry.
*/ */
void void
Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent) Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ndbrequire(occup < tree.m_maxOccup && pos <= occup); ndbrequire(occup < tree.m_maxOccup && pos <= occup);
// fix scans // fix old scans
if (node.getNodeScan() != RNIL)
nodePushUpScans(signal, node, pos);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
for (unsigned i = occup; i > pos; i--) {
jam();
tmpList[i] = tmpList[i - 1];
}
tmpList[pos] = ent;
entList[0] = entList[occup + 1];
node.setOccup(occup + 1);
// add new scans
if (scanList != RNIL)
addScanList(node, pos, scanList);
// fix prefix
if (occup == 0 || pos == 0)
setNodePref(signal, node);
}
void
Dbtux::nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr; ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan(); scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) { do {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
...@@ -144,21 +171,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ...@@ -144,21 +171,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
scanPos.m_pos++; scanPos.m_pos++;
} }
scanPtr.i = scanPtr.p->m_nodeScan; scanPtr.i = scanPtr.p->m_nodeScan;
} } while (scanPtr.i != RNIL);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
for (unsigned i = occup; i > pos; i--) {
jam();
tmpList[i] = tmpList[i - 1];
}
tmpList[pos] = ent;
entList[0] = entList[occup + 1];
node.setOccup(occup + 1);
// fix prefix
if (occup == 0 || pos == 0)
setNodePref(signal, node);
} }
/* /*
...@@ -169,42 +182,55 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ...@@ -169,42 +182,55 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
* ^ ^ * ^ ^
* A B C D E F _ => A B C E F _ _ * A B C D E F _ => A B C E F _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6 * 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Scans at removed entry are returned if non-zero location is passed or
* else moved forward.
*/ */
void void
Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup); ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr; if (node.getNodeScan() != RNIL) {
// move scans whose entry disappears // remove or move scans at this position
scanPtr.i = node.getNodeScan(); if (scanList == 0)
while (scanPtr.i != RNIL) { moveScanList(signal, node, pos);
jam(); else
c_scanOpPool.getPtr(scanPtr); removeScanList(node, pos, *scanList);
TreePos& scanPos = scanPtr.p->m_scanPos; // fix other scans
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); if (node.getNodeScan() != RNIL)
const Uint32 nextPtrI = scanPtr.p->m_nodeScan; nodePopDownScans(signal, node, pos);
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At popDown pos=" << pos << " " << node << endl;
}
#endif
scanNext(signal, scanPtr);
} }
scanPtr.i = nextPtrI; // fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
ent = tmpList[pos];
for (unsigned i = pos; i < occup - 1; i++) {
jam();
tmpList[i] = tmpList[i + 1];
} }
// fix other scans entList[0] = entList[occup - 1];
node.setOccup(occup - 1);
// fix prefix
if (occup != 1 && pos == 0)
setNodePref(signal, node);
}
void
Dbtux::nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan(); scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) { do {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
// handled before
ndbrequire(scanPos.m_pos != pos); ndbrequire(scanPos.m_pos != pos);
if (scanPos.m_pos > pos) { if (scanPos.m_pos > pos) {
jam(); jam();
...@@ -217,21 +243,7 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) ...@@ -217,21 +243,7 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
scanPos.m_pos--; scanPos.m_pos--;
} }
scanPtr.i = scanPtr.p->m_nodeScan; scanPtr.i = scanPtr.p->m_nodeScan;
} } while (scanPtr.i != RNIL);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
ent = tmpList[pos];
for (unsigned i = pos; i < occup - 1; i++) {
jam();
tmpList[i] = tmpList[i + 1];
}
entList[0] = entList[occup - 1];
node.setOccup(occup - 1);
// fix prefix
if (occup != 1 && pos == 0)
setNodePref(signal, node);
} }
/* /*
...@@ -242,43 +254,52 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) ...@@ -242,43 +254,52 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
* ^ v ^ * ^ v ^
* A B C D E _ _ => B C D X E _ _ * A B C D E _ _ => B C D X E _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6 * 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Return list of scans at the removed position 0.
*/ */
void void
Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup); ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr; if (node.getNodeScan() != RNIL) {
// move scans whose entry disappears // remove scans at 0
scanPtr.i = node.getNodeScan(); removeScanList(node, 0, scanList);
while (scanPtr.i != RNIL) { // fix other scans
jam(); if (node.getNodeScan() != RNIL)
c_scanOpPool.getPtr(scanPtr); nodePushDownScans(signal, node, pos);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == 0) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At pushDown pos=" << pos << " " << node << endl;
}
#endif
// here we may miss a valid entry "X" XXX known bug
scanNext(signal, scanPtr);
} }
scanPtr.i = nextPtrI; // fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt oldMin = tmpList[0];
for (unsigned i = 0; i < pos; i++) {
jam();
tmpList[i] = tmpList[i + 1];
} }
// fix other scans tmpList[pos] = ent;
ent = oldMin;
entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(signal, node);
}
void
Dbtux::nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan(); scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) { do {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
// handled before
ndbrequire(scanPos.m_pos != 0); ndbrequire(scanPos.m_pos != 0);
if (scanPos.m_pos <= pos) { if (scanPos.m_pos <= pos) {
jam(); jam();
...@@ -291,22 +312,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent ...@@ -291,22 +312,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
scanPos.m_pos--; scanPos.m_pos--;
} }
scanPtr.i = scanPtr.p->m_nodeScan; scanPtr.i = scanPtr.p->m_nodeScan;
} } while (scanPtr.i != RNIL);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt oldMin = tmpList[0];
for (unsigned i = 0; i < pos; i++) {
jam();
tmpList[i] = tmpList[i + 1];
}
tmpList[pos] = ent;
ent = oldMin;
entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(signal, node);
} }
/* /*
...@@ -318,39 +324,50 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent ...@@ -318,39 +324,50 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
* v ^ ^ * v ^ ^
* A B C D E _ _ => X A B C E _ _ * A B C D E _ _ => X A B C E _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6 * 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Move scans at removed entry and add scans at the new entry.
*/ */
void void
Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup); ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr; if (node.getNodeScan() != RNIL) {
// move scans whose entry disappears // move scans whose entry disappears
scanPtr.i = node.getNodeScan(); moveScanList(signal, node, pos);
while (scanPtr.i != RNIL) { // fix other scans
jam(); if (node.getNodeScan() != RNIL)
c_scanOpPool.getPtr(scanPtr); nodePopUpScans(signal, node, pos);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At popUp pos=" << pos << " " << node << endl;
}
#endif
// here we may miss a valid entry "X" XXX known bug
scanNext(signal, scanPtr);
} }
scanPtr.i = nextPtrI; // fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt newMin = ent;
ent = tmpList[pos];
for (unsigned i = pos; i > 0; i--) {
jam();
tmpList[i] = tmpList[i - 1];
} }
// fix other scans tmpList[0] = newMin;
entList[0] = entList[occup];
// add scans
if (scanList != RNIL)
addScanList(node, 0, scanList);
// fix prefix
if (true)
setNodePref(signal, node);
}
void
Dbtux::nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan(); scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) { do {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
...@@ -367,22 +384,7 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) ...@@ -367,22 +384,7 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
scanPos.m_pos++; scanPos.m_pos++;
} }
scanPtr.i = scanPtr.p->m_nodeScan; scanPtr.i = scanPtr.p->m_nodeScan;
} } while (scanPtr.i != RNIL);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt newMin = ent;
ent = tmpList[pos];
for (unsigned i = pos; i > 0; i--) {
jam();
tmpList[i] = tmpList[i - 1];
}
tmpList[0] = newMin;
entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(signal, node);
} }
/* /*
...@@ -397,12 +399,108 @@ Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsig ...@@ -397,12 +399,108 @@ Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsig
ndbrequire(i <= 1); ndbrequire(i <= 1);
while (cnt != 0) { while (cnt != 0) {
TreeEnt ent; TreeEnt ent;
nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent); Uint32 scanList = RNIL;
nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent); nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList);
nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList);
cnt--; cnt--;
} }
} }
// scans linked to node
/*
* Add list of scans to node at given position.
*/
void
Dbtux::addScanList(NodeHandle& node, unsigned pos, Uint32 scanList)
{
ScanOpPtr scanPtr;
scanPtr.i = scanList;
do {
jam();
c_scanOpPool.getPtr(scanPtr);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Add scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "To pos=" << pos << " " << node << endl;
}
#endif
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
scanPtr.p->m_nodeScan = RNIL;
linkScan(node, scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
// set position but leave direction alone
scanPos.m_loc = node.m_loc;
scanPos.m_pos = pos;
scanPtr.i = nextPtrI;
} while (scanPtr.i != RNIL);
}
/*
* Remove list of scans from node at given position. The return
* location must point to existing list (in fact RNIL always).
*/
void
Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList)
{
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
do {
jam();
c_scanOpPool.getPtr(scanPtr);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc);
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Remove scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "Fron pos=" << pos << " " << node << endl;
}
#endif
unlinkScan(node, scanPtr);
scanPtr.p->m_nodeScan = scanList;
scanList = scanPtr.i;
// unset position but leave direction alone
scanPos.m_loc = NullTupLoc;
scanPos.m_pos = ZNIL;
}
scanPtr.i = nextPtrI;
} while (scanPtr.i != RNIL);
}
/*
* Move list of scans away from entry about to be removed. Uses scan
* method scanNext().
*/
void
Dbtux::moveScanList(Signal* signal, NodeHandle& node, unsigned pos)
{
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
ndbrequire(scanPos.m_loc == node.m_loc);
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At pos=" << pos << " " << node << endl;
}
#endif
scanNext(signal, scanPtr);
ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos));
}
scanPtr.i = nextPtrI;
} while (scanPtr.i != RNIL);
}
/* /*
* Link scan to the list under the node. The list is single-linked and * Link scan to the list under the node. The list is single-linked and
* ordering does not matter. * ordering does not matter.
......
...@@ -716,7 +716,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -716,7 +716,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
/* /*
* Move to next entry. The scan is already linked to some node. When * Move to next entry. The scan is already linked to some node. When
* we leave, if any entry was found, it will be linked to a possibly * we leave, if an entry was found, it will be linked to a possibly
* different node. The scan has a position, and a direction which tells * different node. The scan has a position, and a direction which tells
* from where we came to this position. This is one of: * from where we came to this position. This is one of:
* *
...@@ -725,6 +725,9 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -725,6 +725,9 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
* 2 - up from root (the scan ends) * 2 - up from root (the scan ends)
* 3 - left to right within node (at end proceed to right child) * 3 - left to right within node (at end proceed to right child)
* 4 - down from parent (proceed to left child) * 4 - down from parent (proceed to left child)
*
* If an entry was found, scan direction is 3. Therefore tree
* re-organizations need not worry about scan direction.
*/ */
void void
Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
...@@ -739,9 +742,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -739,9 +742,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
if (scan.m_state == ScanOp::Locked) { if (scan.m_state == ScanOp::Locked) {
jam(); jam();
// version of a tuple locked by us cannot disappear (assert only) // version of a tuple locked by us cannot disappear (assert only)
#ifdef dbtux_wl_1942_is_done
ndbassert(false); ndbassert(false);
#endif
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL; lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock; lockReq->requestInfo = AccLockReq::Unlock;
...@@ -864,6 +865,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -864,6 +865,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_scanPos = pos; scan.m_scanPos = pos;
// relink // relink
if (scan.m_state == ScanOp::Current) { if (scan.m_state == ScanOp::Current) {
ndbrequire(pos.m_match == true && pos.m_dir == 3);
ndbrequire(pos.m_loc == node.m_loc); ndbrequire(pos.m_loc == node.m_loc);
if (origNode.m_loc != node.m_loc) { if (origNode.m_loc != node.m_loc) {
jam(); jam();
......
...@@ -34,7 +34,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) ...@@ -34,7 +34,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
if (node.getOccup() < tree.m_maxOccup) { if (node.getOccup() < tree.m_maxOccup) {
// node has room // node has room
jam(); jam();
nodePushUp(signal, node, pos, ent); nodePushUp(signal, node, pos, ent, RNIL);
return; return;
} }
treeAddFull(signal, frag, node, pos, ent); treeAddFull(signal, frag, node, pos, ent);
...@@ -42,7 +42,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) ...@@ -42,7 +42,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
} }
jam(); jam();
insertNode(signal, node); insertNode(signal, node);
nodePushUp(signal, node, 0, ent); nodePushUp(signal, node, 0, ent, RNIL);
node.setSide(2); node.setSide(2);
tree.m_root = node.m_loc; tree.m_root = node.m_loc;
} }
...@@ -68,13 +68,14 @@ Dbtux::treeAddFull(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, ...@@ -68,13 +68,14 @@ Dbtux::treeAddFull(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos,
if (glbNode.getOccup() < tree.m_maxOccup) { if (glbNode.getOccup() < tree.m_maxOccup) {
// g.l.b node has room // g.l.b node has room
jam(); jam();
Uint32 scanList = RNIL;
if (pos != 0) { if (pos != 0) {
jam(); jam();
// add the new entry and return min entry // add the new entry and return min entry
nodePushDown(signal, lubNode, pos - 1, ent); nodePushDown(signal, lubNode, pos - 1, ent, scanList);
} }
// g.l.b node receives min entry from l.u.b node // g.l.b node receives min entry from l.u.b node
nodePushUp(signal, glbNode, glbNode.getOccup(), ent); nodePushUp(signal, glbNode, glbNode.getOccup(), ent, scanList);
return; return;
} }
treeAddNode(signal, frag, lubNode, pos, ent, glbNode, 1); treeAddNode(signal, frag, lubNode, pos, ent, glbNode, 1);
...@@ -97,13 +98,14 @@ Dbtux::treeAddNode(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, ...@@ -97,13 +98,14 @@ Dbtux::treeAddNode(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos,
parentNode.setLink(i, glbNode.m_loc); parentNode.setLink(i, glbNode.m_loc);
glbNode.setLink(2, parentNode.m_loc); glbNode.setLink(2, parentNode.m_loc);
glbNode.setSide(i); glbNode.setSide(i);
Uint32 scanList = RNIL;
if (pos != 0) { if (pos != 0) {
jam(); jam();
// add the new entry and return min entry // add the new entry and return min entry
nodePushDown(signal, lubNode, pos - 1, ent); nodePushDown(signal, lubNode, pos - 1, ent, scanList);
} }
// g.l.b node receives min entry from l.u.b node // g.l.b node receives min entry from l.u.b node
nodePushUp(signal, glbNode, 0, ent); nodePushUp(signal, glbNode, 0, ent, scanList);
// re-balance the tree // re-balance the tree
treeAddRebalance(signal, frag, parentNode, i); treeAddRebalance(signal, frag, parentNode, i);
} }
...@@ -179,7 +181,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -179,7 +181,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
if (node.getOccup() > tree.m_minOccup) { if (node.getOccup() > tree.m_minOccup) {
// no underflow in any node type // no underflow in any node type
jam(); jam();
nodePopDown(signal, node, pos, ent); nodePopDown(signal, node, pos, ent, 0);
return; return;
} }
if (node.getChilds() == 2) { if (node.getChilds() == 2) {
...@@ -189,7 +191,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -189,7 +191,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
return; return;
} }
// remove entry in semi/leaf // remove entry in semi/leaf
nodePopDown(signal, node, pos, ent); nodePopDown(signal, node, pos, ent, 0);
if (node.getLink(0) != NullTupLoc) { if (node.getLink(0) != NullTupLoc) {
jam(); jam();
treeRemoveSemi(signal, frag, node, 0); treeRemoveSemi(signal, frag, node, 0);
...@@ -222,8 +224,9 @@ Dbtux::treeRemoveInner(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned ...@@ -222,8 +224,9 @@ Dbtux::treeRemoveInner(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned
loc = glbNode.getLink(1); loc = glbNode.getLink(1);
} while (loc != NullTupLoc); } while (loc != NullTupLoc);
// borrow max entry from semi/leaf // borrow max entry from semi/leaf
nodePopDown(signal, glbNode, glbNode.getOccup() - 1, ent); Uint32 scanList = RNIL;
nodePopUp(signal, lubNode, pos, ent); nodePopDown(signal, glbNode, glbNode.getOccup() - 1, ent, &scanList);
nodePopUp(signal, lubNode, pos, ent, scanList);
if (glbNode.getLink(0) != NullTupLoc) { if (glbNode.getLink(0) != NullTupLoc) {
jam(); jam();
treeRemoveSemi(signal, frag, glbNode, 0); treeRemoveSemi(signal, frag, glbNode, 0);
......
...@@ -129,4 +129,7 @@ optim 17 mc02/a 35 ms 52 ms 49 pct ...@@ -129,4 +129,7 @@ optim 17 mc02/a 35 ms 52 ms 49 pct
[ allow slack (2) in interior nodes - almost no effect?? ] [ allow slack (2) in interior nodes - almost no effect?? ]
wl-1942 mc02/a 35 ms 52 ms 49 pct
mc02/b 42 ms 75 ms 76 pct
vim: set et: vim: set et:
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment