Commit 4cbb9917 authored by unknown's avatar unknown

Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb

into mysql.com:/space/pekka/ndb/version/my41-tux

parents 012de9d7 4a9f7745
...@@ -110,7 +110,7 @@ ...@@ -110,7 +110,7 @@
*/ */
#define MAX_TTREE_NODE_SIZE 64 // total words in node #define MAX_TTREE_NODE_SIZE 64 // total words in node
#define MAX_TTREE_PREF_SIZE 4 // words in min prefix #define MAX_TTREE_PREF_SIZE 4 // words in min prefix
#define MAX_TTREE_NODE_SLACK 3 // diff between max and min occupancy #define MAX_TTREE_NODE_SLACK 2 // diff between max and min occupancy
/* /*
* Blobs. * Blobs.
......
...@@ -32,7 +32,6 @@ ...@@ -32,7 +32,6 @@
// signal classes // signal classes
#include <signaldata/DictTabInfo.hpp> #include <signaldata/DictTabInfo.hpp>
#include <signaldata/TuxContinueB.hpp> #include <signaldata/TuxContinueB.hpp>
#include <signaldata/BuildIndx.hpp>
#include <signaldata/TupFrag.hpp> #include <signaldata/TupFrag.hpp>
#include <signaldata/AlterIndx.hpp> #include <signaldata/AlterIndx.hpp>
#include <signaldata/DropTab.hpp> #include <signaldata/DropTab.hpp>
...@@ -478,7 +477,7 @@ private: ...@@ -478,7 +477,7 @@ private:
Uint16 m_numAttrs; Uint16 m_numAttrs;
bool m_storeNullKey; bool m_storeNullKey;
TreeHead m_tree; TreeHead m_tree;
TupLoc m_freeLoc; // one node pre-allocated for insert TupLoc m_freeLoc; // list of free index nodes
DLList<ScanOp> m_scanList; // current scans on this fragment DLList<ScanOp> m_scanList; // current scans on this fragment
Uint32 m_tupIndexFragPtrI; Uint32 m_tupIndexFragPtrI;
Uint32 m_tupTableFragPtrI[2]; Uint32 m_tupTableFragPtrI[2];
...@@ -582,17 +581,24 @@ private: ...@@ -582,17 +581,24 @@ private:
* DbtuxNode.cpp * DbtuxNode.cpp
*/ */
int allocNode(Signal* signal, NodeHandle& node); int allocNode(Signal* signal, NodeHandle& node);
void selectNode(Signal* signal, NodeHandle& node, TupLoc loc); void selectNode(NodeHandle& node, TupLoc loc);
void insertNode(Signal* signal, NodeHandle& node); void insertNode(NodeHandle& node);
void deleteNode(Signal* signal, NodeHandle& node); void deleteNode(NodeHandle& node);
void setNodePref(Signal* signal, NodeHandle& node); void setNodePref(NodeHandle& node);
// node operations // node operations
void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent); void nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList);
void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); void nodePushUpScans(NodeHandle& node, unsigned pos);
void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); void nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& en, Uint32* scanList);
void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); void nodePopDownScans(NodeHandle& node, unsigned pos);
void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i); void nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList);
void nodePushDownScans(NodeHandle& node, unsigned pos);
void nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList);
void nodePopUpScans(NodeHandle& node, unsigned pos);
void nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i);
// scans linked to node // scans linked to node
void addScanList(NodeHandle& node, unsigned pos, Uint32 scanList);
void removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList);
void moveScanList(NodeHandle& node, unsigned pos);
void linkScan(NodeHandle& node, ScanOpPtr scanPtr); void linkScan(NodeHandle& node, ScanOpPtr scanPtr);
void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr); void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr);
bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr); bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr);
...@@ -600,10 +606,21 @@ private: ...@@ -600,10 +606,21 @@ private:
/* /*
* DbtuxTree.cpp * DbtuxTree.cpp
*/ */
void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent); // add entry
void treeRemove(Signal* signal, Frag& frag, TreePos treePos); void treeAdd(Frag& frag, TreePos treePos, TreeEnt ent);
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i); void treeAddFull(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent);
void treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i); void treeAddNode(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i);
void treeAddRebalance(Frag& frag, NodeHandle node, unsigned i);
// remove entry
void treeRemove(Frag& frag, TreePos treePos);
void treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos);
void treeRemoveSemi(Frag& frag, NodeHandle node, unsigned i);
void treeRemoveLeaf(Frag& frag, NodeHandle node);
void treeRemoveNode(Frag& frag, NodeHandle node);
void treeRemoveRebalance(Frag& frag, NodeHandle node, unsigned i);
// rotate
void treeRotateSingle(Frag& frag, NodeHandle& node, unsigned i);
void treeRotateDouble(Frag& frag, NodeHandle& node, unsigned i);
/* /*
* DbtuxScan.cpp * DbtuxScan.cpp
...@@ -615,9 +632,9 @@ private: ...@@ -615,9 +632,9 @@ private:
void execACCKEYCONF(Signal* signal); void execACCKEYCONF(Signal* signal);
void execACCKEYREF(Signal* signal); void execACCKEYREF(Signal* signal);
void execACC_ABORTCONF(Signal* signal); void execACC_ABORTCONF(Signal* signal);
void scanFirst(Signal* signal, ScanOpPtr scanPtr); void scanFirst(ScanOpPtr scanPtr);
void scanNext(Signal* signal, ScanOpPtr scanPtr); void scanNext(ScanOpPtr scanPtr);
bool scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent); bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent);
void scanClose(Signal* signal, ScanOpPtr scanPtr); void scanClose(Signal* signal, ScanOpPtr scanPtr);
void addAccLockOp(ScanOp& scan, Uint32 accLockOp); void addAccLockOp(ScanOp& scan, Uint32 accLockOp);
void removeAccLockOp(ScanOp& scan, Uint32 accLockOp); void removeAccLockOp(ScanOp& scan, Uint32 accLockOp);
...@@ -626,9 +643,9 @@ private: ...@@ -626,9 +643,9 @@ private:
/* /*
* DbtuxSearch.cpp * DbtuxSearch.cpp
*/ */
void searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos); void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/* /*
* DbtuxCmp.cpp * DbtuxCmp.cpp
...@@ -652,7 +669,7 @@ private: ...@@ -652,7 +669,7 @@ private:
PrintPar(); PrintPar();
}; };
void printTree(Signal* signal, Frag& frag, NdbOut& out); void printTree(Signal* signal, Frag& frag, NdbOut& out);
void printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par); void printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par);
friend class NdbOut& operator<<(NdbOut&, const TupLoc&); friend class NdbOut& operator<<(NdbOut&, const TupLoc&);
friend class NdbOut& operator<<(NdbOut&, const TreeEnt&); friend class NdbOut& operator<<(NdbOut&, const TreeEnt&);
friend class NdbOut& operator<<(NdbOut&, const TreeNode&); friend class NdbOut& operator<<(NdbOut&, const TreeNode&);
......
...@@ -98,7 +98,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out) ...@@ -98,7 +98,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
strcpy(par.m_path, "."); strcpy(par.m_path, ".");
par.m_side = 2; par.m_side = 2;
par.m_parent = NullTupLoc; par.m_parent = NullTupLoc;
printNode(signal, frag, out, tree.m_root, par); printNode(frag, out, tree.m_root, par);
out.m_out->flush(); out.m_out->flush();
if (! par.m_ok) { if (! par.m_ok) {
if (debugFile == 0) { if (debugFile == 0) {
...@@ -114,7 +114,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out) ...@@ -114,7 +114,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
} }
void void
Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par) Dbtux::printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par)
{ {
if (loc == NullTupLoc) { if (loc == NullTupLoc) {
par.m_depth = 0; par.m_depth = 0;
...@@ -122,7 +122,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -122,7 +122,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
} }
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
NodeHandle node(frag); NodeHandle node(frag);
selectNode(signal, node, loc); selectNode(node, loc);
out << par.m_path << " " << node << endl; out << par.m_path << " " << node << endl;
// check children // check children
PrintPar cpar[2]; PrintPar cpar[2];
...@@ -132,7 +132,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -132,7 +132,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
cpar[i].m_side = i; cpar[i].m_side = i;
cpar[i].m_depth = 0; cpar[i].m_depth = 0;
cpar[i].m_parent = loc; cpar[i].m_parent = loc;
printNode(signal, frag, out, node.getLink(i), cpar[i]); printNode(frag, out, node.getLink(i), cpar[i]);
if (! cpar[i].m_ok) { if (! cpar[i].m_ok) {
par.m_ok = false; par.m_ok = false;
} }
...@@ -178,16 +178,19 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -178,16 +178,19 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
out << "occupancy " << node.getOccup() << " of interior node"; out << "occupancy " << node.getOccup() << " of interior node";
out << " less than min " << tree.m_minOccup << endl; out << " less than min " << tree.m_minOccup << endl;
} }
// check missed half-leaf/leaf merge #ifdef dbtux_totally_groks_t_trees
// check missed semi-leaf/leaf merge
for (unsigned i = 0; i <= 1; i++) { for (unsigned i = 0; i <= 1; i++) {
if (node.getLink(i) != NullTupLoc && if (node.getLink(i) != NullTupLoc &&
node.getLink(1 - i) == NullTupLoc && node.getLink(1 - i) == NullTupLoc &&
node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) { // our semi-leaf seems to satify interior minOccup condition
node.getOccup() < tree.m_minOccup) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << sep; out << par.m_path << sep;
out << "missed merge with child " << i << endl; out << "missed merge with child " << i << endl;
} }
} }
#endif
// check inline prefix // check inline prefix
{ ConstData data1 = node.getPref(); { ConstData data1 = node.getPref();
Uint32 data2[MaxPrefSize]; Uint32 data2[MaxPrefSize];
......
...@@ -117,7 +117,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -117,7 +117,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
switch (opCode) { switch (opCode) {
case TuxMaintReq::OpAdd: case TuxMaintReq::OpAdd:
jam(); jam();
searchToAdd(signal, frag, c_searchKey, ent, treePos); searchToAdd(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
debugOut << treePos << (treePos.m_match ? " - error" : "") << endl; debugOut << treePos << (treePos.m_match ? " - error" : "") << endl;
...@@ -133,8 +133,8 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -133,8 +133,8 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break; break;
} }
/* /*
* At most one new node is inserted in the operation. We keep one * At most one new node is inserted in the operation. Pre-allocate
* free node pre-allocated so the operation cannot fail. * it so that the operation cannot fail.
*/ */
if (frag.m_freeLoc == NullTupLoc) { if (frag.m_freeLoc == NullTupLoc) {
jam(); jam();
...@@ -144,14 +144,16 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -144,14 +144,16 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
jam(); jam();
break; break;
} }
// link to freelist
node.setLink(0, frag.m_freeLoc);
frag.m_freeLoc = node.m_loc; frag.m_freeLoc = node.m_loc;
ndbrequire(frag.m_freeLoc != NullTupLoc); ndbrequire(frag.m_freeLoc != NullTupLoc);
} }
treeAdd(signal, frag, treePos, ent); treeAdd(frag, treePos, ent);
break; break;
case TuxMaintReq::OpRemove: case TuxMaintReq::OpRemove:
jam(); jam();
searchToRemove(signal, frag, c_searchKey, ent, treePos); searchToRemove(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl; debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl;
...@@ -166,7 +168,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -166,7 +168,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
} }
break; break;
} }
treeRemove(signal, frag, treePos); treeRemove(frag, treePos);
break; break;
default: default:
ndbrequire(false); ndbrequire(false);
......
...@@ -211,11 +211,7 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal) ...@@ -211,11 +211,7 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
// make these configurable later // make these configurable later
tree.m_nodeSize = MAX_TTREE_NODE_SIZE; tree.m_nodeSize = MAX_TTREE_NODE_SIZE;
tree.m_prefSize = MAX_TTREE_PREF_SIZE; tree.m_prefSize = MAX_TTREE_PREF_SIZE;
#ifdef dbtux_min_occup_less_max_occup
const unsigned maxSlack = MAX_TTREE_NODE_SLACK; const unsigned maxSlack = MAX_TTREE_NODE_SLACK;
#else
const unsigned maxSlack = 0;
#endif
// size up to and including first 2 entries // size up to and including first 2 entries
const unsigned pref = tree.getSize(AccPref); const unsigned pref = tree.getSize(AccPref);
if (! (pref <= tree.m_nodeSize)) { if (! (pref <= tree.m_nodeSize)) {
......
...@@ -42,7 +42,7 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node) ...@@ -42,7 +42,7 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node)
* Set handle to point to existing node. * Set handle to point to existing node.
*/ */
void void
Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc) Dbtux::selectNode(NodeHandle& node, TupLoc loc)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
ndbrequire(loc != NullTupLoc); ndbrequire(loc != NullTupLoc);
...@@ -57,15 +57,15 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc) ...@@ -57,15 +57,15 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc)
} }
/* /*
* Set handle to point to new node. Uses the pre-allocated node. * Set handle to point to new node. Uses a pre-allocated node.
*/ */
void void
Dbtux::insertNode(Signal* signal, NodeHandle& node) Dbtux::insertNode(NodeHandle& node)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TupLoc loc = frag.m_freeLoc; // unlink from freelist
frag.m_freeLoc = NullTupLoc; selectNode(node, frag.m_freeLoc);
selectNode(signal, node, loc); frag.m_freeLoc = node.getLink(0);
new (node.m_node) TreeNode(); new (node.m_node) TreeNode();
#ifdef VM_TRACE #ifdef VM_TRACE
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -76,20 +76,17 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node) ...@@ -76,20 +76,17 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node)
} }
/* /*
* Delete existing node. * Delete existing node. Simply put it on the freelist.
*/ */
void void
Dbtux::deleteNode(Signal* signal, NodeHandle& node) Dbtux::deleteNode(NodeHandle& node)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
ndbrequire(node.getOccup() == 0); ndbrequire(node.getOccup() == 0);
TupLoc loc = node.m_loc; // link to freelist
Uint32 pageId = loc.getPageId(); node.setLink(0, frag.m_freeLoc);
Uint32 pageOffset = loc.getPageOffset(); frag.m_freeLoc = node.m_loc;
Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node); // invalidate the handle
c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
jamEntry();
// invalidate handle and storage
node.m_loc = NullTupLoc; node.m_loc = NullTupLoc;
node.m_node = 0; node.m_node = 0;
} }
...@@ -99,7 +96,7 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node) ...@@ -99,7 +96,7 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node)
* attribute headers for now. XXX use null mask instead * attribute headers for now. XXX use null mask instead
*/ */
void void
Dbtux::setNodePref(Signal* signal, NodeHandle& node) Dbtux::setNodePref(NodeHandle& node)
{ {
const Frag& frag = node.m_frag; const Frag& frag = node.m_frag;
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
...@@ -117,18 +114,45 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node) ...@@ -117,18 +114,45 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node)
* v * v
* A B C D E _ _ => A B C X D E _ * A B C D E _ _ => A B C X D E _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6 * 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Add list of scans at the new entry.
*/ */
void void
Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent) Dbtux::nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ndbrequire(occup < tree.m_maxOccup && pos <= occup); ndbrequire(occup < tree.m_maxOccup && pos <= occup);
// fix scans // fix old scans
if (node.getNodeScan() != RNIL)
nodePushUpScans(node, pos);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
for (unsigned i = occup; i > pos; i--) {
jam();
tmpList[i] = tmpList[i - 1];
}
tmpList[pos] = ent;
entList[0] = entList[occup + 1];
node.setOccup(occup + 1);
// add new scans
if (scanList != RNIL)
addScanList(node, pos, scanList);
// fix prefix
if (occup == 0 || pos == 0)
setNodePref(node);
}
void
Dbtux::nodePushUpScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr; ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan(); scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) { do {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
...@@ -144,21 +168,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ...@@ -144,21 +168,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
scanPos.m_pos++; scanPos.m_pos++;
} }
scanPtr.i = scanPtr.p->m_nodeScan; scanPtr.i = scanPtr.p->m_nodeScan;
} } while (scanPtr.i != RNIL);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
for (unsigned i = occup; i > pos; i--) {
jam();
tmpList[i] = tmpList[i - 1];
}
tmpList[pos] = ent;
entList[0] = entList[occup + 1];
node.setOccup(occup + 1);
// fix prefix
if (occup == 0 || pos == 0)
setNodePref(signal, node);
} }
/* /*
...@@ -169,42 +179,55 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ...@@ -169,42 +179,55 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
* ^ ^ * ^ ^
* A B C D E F _ => A B C E F _ _ * A B C D E F _ => A B C E F _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6 * 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Scans at removed entry are returned if non-zero location is passed or
* else moved forward.
*/ */
void void
Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) Dbtux::nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup); ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr; if (node.getNodeScan() != RNIL) {
// move scans whose entry disappears // remove or move scans at this position
scanPtr.i = node.getNodeScan(); if (scanList == 0)
while (scanPtr.i != RNIL) { moveScanList(node, pos);
else
removeScanList(node, pos, *scanList);
// fix other scans
if (node.getNodeScan() != RNIL)
nodePopDownScans(node, pos);
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
ent = tmpList[pos];
for (unsigned i = pos; i < occup - 1; i++) {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); tmpList[i] = tmpList[i + 1];
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At popDown pos=" << pos << " " << node << endl;
}
#endif
scanNext(signal, scanPtr);
}
scanPtr.i = nextPtrI;
} }
// fix other scans entList[0] = entList[occup - 1];
node.setOccup(occup - 1);
// fix prefix
if (occup != 1 && pos == 0)
setNodePref(node);
}
void
Dbtux::nodePopDownScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan(); scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) { do {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
// handled before
ndbrequire(scanPos.m_pos != pos); ndbrequire(scanPos.m_pos != pos);
if (scanPos.m_pos > pos) { if (scanPos.m_pos > pos) {
jam(); jam();
...@@ -217,21 +240,7 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) ...@@ -217,21 +240,7 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
scanPos.m_pos--; scanPos.m_pos--;
} }
scanPtr.i = scanPtr.p->m_nodeScan; scanPtr.i = scanPtr.p->m_nodeScan;
} } while (scanPtr.i != RNIL);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
ent = tmpList[pos];
for (unsigned i = pos; i < occup - 1; i++) {
jam();
tmpList[i] = tmpList[i + 1];
}
entList[0] = entList[occup - 1];
node.setOccup(occup - 1);
// fix prefix
if (occup != 1 && pos == 0)
setNodePref(signal, node);
} }
/* /*
...@@ -242,43 +251,52 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) ...@@ -242,43 +251,52 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
* ^ v ^ * ^ v ^
* A B C D E _ _ => B C D X E _ _ * A B C D E _ _ => B C D X E _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6 * 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Return list of scans at the removed position 0.
*/ */
void void
Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) Dbtux::nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup); ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr; if (node.getNodeScan() != RNIL) {
// move scans whose entry disappears // remove scans at 0
scanPtr.i = node.getNodeScan(); removeScanList(node, 0, scanList);
while (scanPtr.i != RNIL) { // fix other scans
if (node.getNodeScan() != RNIL)
nodePushDownScans(node, pos);
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt oldMin = tmpList[0];
for (unsigned i = 0; i < pos; i++) {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); tmpList[i] = tmpList[i + 1];
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == 0) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At pushDown pos=" << pos << " " << node << endl;
}
#endif
// here we may miss a valid entry "X" XXX known bug
scanNext(signal, scanPtr);
}
scanPtr.i = nextPtrI;
} }
// fix other scans tmpList[pos] = ent;
ent = oldMin;
entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(node);
}
void
Dbtux::nodePushDownScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan(); scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) { do {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
// handled before
ndbrequire(scanPos.m_pos != 0); ndbrequire(scanPos.m_pos != 0);
if (scanPos.m_pos <= pos) { if (scanPos.m_pos <= pos) {
jam(); jam();
...@@ -291,22 +309,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent ...@@ -291,22 +309,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
scanPos.m_pos--; scanPos.m_pos--;
} }
scanPtr.i = scanPtr.p->m_nodeScan; scanPtr.i = scanPtr.p->m_nodeScan;
} } while (scanPtr.i != RNIL);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt oldMin = tmpList[0];
for (unsigned i = 0; i < pos; i++) {
jam();
tmpList[i] = tmpList[i + 1];
}
tmpList[pos] = ent;
ent = oldMin;
entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(signal, node);
} }
/* /*
...@@ -318,39 +321,50 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent ...@@ -318,39 +321,50 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
* v ^ ^ * v ^ ^
* A B C D E _ _ => X A B C E _ _ * A B C D E _ _ => X A B C E _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6 * 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Move scans at removed entry and add scans at the new entry.
*/ */
void void
Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) Dbtux::nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup); ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr; if (node.getNodeScan() != RNIL) {
// move scans whose entry disappears // move scans whose entry disappears
scanPtr.i = node.getNodeScan(); moveScanList(node, pos);
while (scanPtr.i != RNIL) { // fix other scans
if (node.getNodeScan() != RNIL)
nodePopUpScans(node, pos);
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt newMin = ent;
ent = tmpList[pos];
for (unsigned i = pos; i > 0; i--) {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); tmpList[i] = tmpList[i - 1];
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At popUp pos=" << pos << " " << node << endl;
}
#endif
// here we may miss a valid entry "X" XXX known bug
scanNext(signal, scanPtr);
}
scanPtr.i = nextPtrI;
} }
// fix other scans tmpList[0] = newMin;
entList[0] = entList[occup];
// add scans
if (scanList != RNIL)
addScanList(node, 0, scanList);
// fix prefix
if (true)
setNodePref(node);
}
void
Dbtux::nodePopUpScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan(); scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) { do {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
...@@ -367,41 +381,123 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) ...@@ -367,41 +381,123 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
scanPos.m_pos++; scanPos.m_pos++;
} }
scanPtr.i = scanPtr.p->m_nodeScan; scanPtr.i = scanPtr.p->m_nodeScan;
} } while (scanPtr.i != RNIL);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt newMin = ent;
ent = tmpList[pos];
for (unsigned i = pos; i > 0; i--) {
jam();
tmpList[i] = tmpList[i - 1];
}
tmpList[0] = newMin;
entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(signal, node);
} }
/* /*
* Move all possible entries from another node before the min (i=0) or * Move number of entries from another node to this node before the min
* after the max (i=1). XXX can be optimized * (i=0) or after the max (i=1). Expensive but not often used.
*/ */
void void
Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i) Dbtux::nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i)
{ {
Frag& frag = dstNode.m_frag; Frag& frag = dstNode.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
ndbrequire(i <= 1); ndbrequire(i <= 1);
while (dstNode.getOccup() < tree.m_maxOccup && srcNode.getOccup() != 0) { while (cnt != 0) {
TreeEnt ent; TreeEnt ent;
nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent); Uint32 scanList = RNIL;
nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent); nodePopDown(srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList);
nodePushUp(dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList);
cnt--;
} }
} }
// scans linked to node
/*
* Add list of scans to node at given position.
*/
void
Dbtux::addScanList(NodeHandle& node, unsigned pos, Uint32 scanList)
{
ScanOpPtr scanPtr;
scanPtr.i = scanList;
do {
jam();
c_scanOpPool.getPtr(scanPtr);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Add scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "To pos=" << pos << " " << node << endl;
}
#endif
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
scanPtr.p->m_nodeScan = RNIL;
linkScan(node, scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
// set position but leave direction alone
scanPos.m_loc = node.m_loc;
scanPos.m_pos = pos;
scanPtr.i = nextPtrI;
} while (scanPtr.i != RNIL);
}
/*
* Remove list of scans from node at given position. The return
* location must point to existing list (in fact RNIL always).
*/
void
Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList)
{
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
do {
jam();
c_scanOpPool.getPtr(scanPtr);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc);
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Remove scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "Fron pos=" << pos << " " << node << endl;
}
#endif
unlinkScan(node, scanPtr);
scanPtr.p->m_nodeScan = scanList;
scanList = scanPtr.i;
// unset position but leave direction alone
scanPos.m_loc = NullTupLoc;
scanPos.m_pos = ZNIL;
}
scanPtr.i = nextPtrI;
} while (scanPtr.i != RNIL);
}
/*
* Move list of scans away from entry about to be removed. Uses scan
* method scanNext().
*/
void
Dbtux::moveScanList(NodeHandle& node, unsigned pos)
{
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
ndbrequire(scanPos.m_loc == node.m_loc);
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At pos=" << pos << " " << node << endl;
}
#endif
scanNext(scanPtr);
ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos));
}
scanPtr.i = nextPtrI;
} while (scanPtr.i != RNIL);
}
/* /*
* Link scan to the list under the node. The list is single-linked and * Link scan to the list under the node. The list is single-linked and
* ordering does not matter. * ordering does not matter.
......
...@@ -275,7 +275,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) ...@@ -275,7 +275,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
jam(); jam();
const TupLoc loc = scan.m_scanPos.m_loc; const TupLoc loc = scan.m_scanPos.m_loc;
NodeHandle node(frag); NodeHandle node(frag);
selectNode(signal, node, loc); selectNode(node, loc);
unlinkScan(node, scanPtr); unlinkScan(node, scanPtr);
scan.m_scanPos.m_loc = NullTupLoc; scan.m_scanPos.m_loc = NullTupLoc;
} }
...@@ -364,7 +364,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -364,7 +364,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::First) { if (scan.m_state == ScanOp::First) {
jam(); jam();
// search is done only once in single range scan // search is done only once in single range scan
scanFirst(signal, scanPtr); scanFirst(scanPtr);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugScan) { if (debugFlags & DebugScan) {
debugOut << "First scan " << scanPtr.i << " " << scan << endl; debugOut << "First scan " << scanPtr.i << " " << scan << endl;
...@@ -374,7 +374,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -374,7 +374,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::Next) { if (scan.m_state == ScanOp::Next) {
jam(); jam();
// look for next // look for next
scanNext(signal, scanPtr); scanNext(scanPtr);
} }
// for reading tuple key in Current or Locked state // for reading tuple key in Current or Locked state
Data pkData = c_dataBuffer; Data pkData = c_dataBuffer;
...@@ -680,7 +680,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal) ...@@ -680,7 +680,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal)
* by scanNext. * by scanNext.
*/ */
void void
Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) Dbtux::scanFirst(ScanOpPtr scanPtr)
{ {
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
...@@ -698,7 +698,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -698,7 +698,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
} }
// search for scan start position // search for scan start position
TreePos treePos; TreePos treePos;
searchToScan(signal, frag, c_dataBuffer, scan.m_boundCnt[0], treePos); searchToScan(frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
if (treePos.m_loc == NullTupLoc) { if (treePos.m_loc == NullTupLoc) {
// empty tree // empty tree
jam(); jam();
...@@ -710,13 +710,13 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -710,13 +710,13 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
scan.m_state = ScanOp::Next; scan.m_state = ScanOp::Next;
// link the scan to node found // link the scan to node found
NodeHandle node(frag); NodeHandle node(frag);
selectNode(signal, node, treePos.m_loc); selectNode(node, treePos.m_loc);
linkScan(node, scanPtr); linkScan(node, scanPtr);
} }
/* /*
* Move to next entry. The scan is already linked to some node. When * Move to next entry. The scan is already linked to some node. When
* we leave, if any entry was found, it will be linked to a possibly * we leave, if an entry was found, it will be linked to a possibly
* different node. The scan has a position, and a direction which tells * different node. The scan has a position, and a direction which tells
* from where we came to this position. This is one of: * from where we came to this position. This is one of:
* *
...@@ -725,9 +725,12 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -725,9 +725,12 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
* 2 - up from root (the scan ends) * 2 - up from root (the scan ends)
* 3 - left to right within node (at end proceed to right child) * 3 - left to right within node (at end proceed to right child)
* 4 - down from parent (proceed to left child) * 4 - down from parent (proceed to left child)
*
* If an entry was found, scan direction is 3. Therefore tree
* re-organizations need not worry about scan direction.
*/ */
void void
Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) Dbtux::scanNext(ScanOpPtr scanPtr)
{ {
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
...@@ -736,22 +739,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -736,22 +739,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
debugOut << "Next in scan " << scanPtr.i << " " << scan << endl; debugOut << "Next in scan " << scanPtr.i << " " << scan << endl;
} }
#endif #endif
if (scan.m_state == ScanOp::Locked) { // cannot be moved away from tuple we have locked
jam(); ndbrequire(scan.m_state != ScanOp::Locked);
// version of a tuple locked by us cannot disappear (assert only)
#ifdef dbtux_wl_1942_is_done
ndbassert(false);
#endif
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_accLockOp = RNIL;
scan.m_state = ScanOp::Current;
}
// set up index keys for this operation // set up index keys for this operation
setKeyAttrs(frag); setKeyAttrs(frag);
// unpack upper bound into c_dataBuffer // unpack upper bound into c_dataBuffer
...@@ -767,7 +756,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -767,7 +756,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
TreePos pos = scan.m_scanPos; TreePos pos = scan.m_scanPos;
// get and remember original node // get and remember original node
NodeHandle origNode(frag); NodeHandle origNode(frag);
selectNode(signal, origNode, pos.m_loc); selectNode(origNode, pos.m_loc);
ndbrequire(islinkScan(origNode, scanPtr)); ndbrequire(islinkScan(origNode, scanPtr));
// current node in loop // current node in loop
NodeHandle node = origNode; NodeHandle node = origNode;
...@@ -784,7 +773,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -784,7 +773,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
} }
if (node.m_loc != pos.m_loc) { if (node.m_loc != pos.m_loc) {
jam(); jam();
selectNode(signal, node, pos.m_loc); selectNode(node, pos.m_loc);
} }
if (pos.m_dir == 4) { if (pos.m_dir == 4) {
// coming down from parent proceed to left child // coming down from parent proceed to left child
...@@ -832,7 +821,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -832,7 +821,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
break; break;
} }
// can we see it // can we see it
if (! scanVisible(signal, scanPtr, ent)) { if (! scanVisible(scanPtr, ent)) {
jam(); jam();
continue; continue;
} }
...@@ -864,6 +853,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -864,6 +853,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_scanPos = pos; scan.m_scanPos = pos;
// relink // relink
if (scan.m_state == ScanOp::Current) { if (scan.m_state == ScanOp::Current) {
ndbrequire(pos.m_match == true && pos.m_dir == 3);
ndbrequire(pos.m_loc == node.m_loc); ndbrequire(pos.m_loc == node.m_loc);
if (origNode.m_loc != node.m_loc) { if (origNode.m_loc != node.m_loc) {
jam(); jam();
...@@ -894,7 +884,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -894,7 +884,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
* which are not analyzed or handled yet. * which are not analyzed or handled yet.
*/ */
bool bool
Dbtux::scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent) Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
{ {
const ScanOp& scan = *scanPtr.p; const ScanOp& scan = *scanPtr.p;
const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
* TODO optimize for initial equal attrs in node min/max * TODO optimize for initial equal attrs in node min/max
*/ */
void void
Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
...@@ -46,7 +46,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear ...@@ -46,7 +46,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
NodeHandle bottomNode(frag); NodeHandle bottomNode(frag);
while (true) { while (true) {
jam(); jam();
selectNode(signal, currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
int ret; int ret;
// compare prefix // compare prefix
unsigned start = 0; unsigned start = 0;
...@@ -159,12 +159,12 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear ...@@ -159,12 +159,12 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
* *
* Compares search key to each node min. A move to right subtree can * Compares search key to each node min. A move to right subtree can
* overshoot target node. The last such node is saved. The final node * overshoot target node. The last such node is saved. The final node
* is a half-leaf or leaf. If search key is less than final node min * is a semi-leaf or leaf. If search key is less than final node min
* then the saved node is the g.l.b of the final node and we move back * then the saved node is the g.l.b of the final node and we move back
* to it. * to it.
*/ */
void void
Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
...@@ -182,7 +182,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s ...@@ -182,7 +182,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
NodeHandle glbNode(frag); // potential g.l.b of final node NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) { while (true) {
jam(); jam();
selectNode(signal, currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
int ret; int ret;
// compare prefix // compare prefix
unsigned start = 0; unsigned start = 0;
...@@ -256,7 +256,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s ...@@ -256,7 +256,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
* Similar to searchToAdd. * Similar to searchToAdd.
*/ */
void void
Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos) Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag); NodeHandle currNode(frag);
...@@ -271,7 +271,7 @@ Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned bo ...@@ -271,7 +271,7 @@ Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned bo
NodeHandle bottomNode(frag); NodeHandle bottomNode(frag);
while (true) { while (true) {
jam(); jam();
selectNode(signal, currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
int ret; int ret;
// compare prefix // compare prefix
ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize); ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
......
...@@ -18,71 +18,105 @@ ...@@ -18,71 +18,105 @@
#include "Dbtux.hpp" #include "Dbtux.hpp"
/* /*
* Add entry. * Add entry. Handle the case when there is room for one more. This
* is the common case given slack in nodes.
*/ */
void void
Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) Dbtux::treeAdd(Frag& frag, TreePos treePos, TreeEnt ent)
{ {
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos;
NodeHandle node(frag); NodeHandle node(frag);
// check for empty tree if (treePos.m_loc != NullTupLoc) {
if (treePos.m_loc == NullTupLoc) { // non-empty tree
jam();
insertNode(signal, node);
nodePushUp(signal, node, 0, ent);
node.setSide(2);
tree.m_root = node.m_loc;
return;
}
selectNode(signal, node, treePos.m_loc);
// check if it is bounding node
if (pos != 0 && pos != node.getOccup()) {
jam(); jam();
// check if room for one more selectNode(node, treePos.m_loc);
unsigned pos = treePos.m_pos;
if (node.getOccup() < tree.m_maxOccup) { if (node.getOccup() < tree.m_maxOccup) {
// node has room
jam(); jam();
nodePushUp(signal, node, pos, ent); nodePushUp(node, pos, ent, RNIL);
return; return;
} }
// returns min entry treeAddFull(frag, node, pos, ent);
nodePushDown(signal, node, pos - 1, ent); return;
// find position to add the removed min entry }
TupLoc childLoc = node.getLink(0); jam();
if (childLoc == NullTupLoc) { insertNode(node);
nodePushUp(node, 0, ent, RNIL);
node.setSide(2);
tree.m_root = node.m_loc;
}
/*
* Add entry when node is full. Handle the case when there is g.l.b
* node in left subtree with room for one more. It will receive the min
* entry of this node. The min entry could be the entry to add.
*/
void
Dbtux::treeAddFull(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent)
{
TreeHead& tree = frag.m_tree;
TupLoc loc = lubNode.getLink(0);
if (loc != NullTupLoc) {
// find g.l.b node
NodeHandle glbNode(frag);
do {
jam(); jam();
// left child will be added selectNode(glbNode, loc);
pos = 0; loc = glbNode.getLink(1);
} else { } while (loc != NullTupLoc);
if (glbNode.getOccup() < tree.m_maxOccup) {
// g.l.b node has room
jam(); jam();
// find glb node Uint32 scanList = RNIL;
while (childLoc != NullTupLoc) { if (pos != 0) {
jam(); jam();
selectNode(signal, node, childLoc); // add the new entry and return min entry
childLoc = node.getLink(1); nodePushDown(lubNode, pos - 1, ent, scanList);
} }
pos = node.getOccup(); // g.l.b node receives min entry from l.u.b node
nodePushUp(glbNode, glbNode.getOccup(), ent, scanList);
return;
} }
// fall thru to next case treeAddNode(frag, lubNode, pos, ent, glbNode, 1);
}
// adding new min or max
unsigned i = (pos == 0 ? 0 : 1);
ndbrequire(node.getLink(i) == NullTupLoc);
// check if the half-leaf/leaf has room for one more
if (node.getOccup() < tree.m_maxOccup) {
jam();
nodePushUp(signal, node, pos, ent);
return; return;
} }
// add a new node treeAddNode(frag, lubNode, pos, ent, lubNode, 0);
NodeHandle childNode(frag); }
insertNode(signal, childNode);
nodePushUp(signal, childNode, 0, ent); /*
* Add entry when there is no g.l.b node in left subtree or the g.l.b
* node is full. We must add a new left or right child node which
* becomes the new g.l.b node.
*/
void
Dbtux::treeAddNode(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i)
{
NodeHandle glbNode(frag);
insertNode(glbNode);
// connect parent and child // connect parent and child
node.setLink(i, childNode.m_loc); parentNode.setLink(i, glbNode.m_loc);
childNode.setLink(2, node.m_loc); glbNode.setLink(2, parentNode.m_loc);
childNode.setSide(i); glbNode.setSide(i);
// re-balance tree at each node Uint32 scanList = RNIL;
if (pos != 0) {
jam();
// add the new entry and return min entry
nodePushDown(lubNode, pos - 1, ent, scanList);
}
// g.l.b node receives min entry from l.u.b node
nodePushUp(glbNode, 0, ent, scanList);
// re-balance the tree
treeAddRebalance(frag, parentNode, i);
}
/*
* Re-balance tree after adding a node. The process starts with the
* parent of the added node.
*/
void
Dbtux::treeAddRebalance(Frag& frag, NodeHandle node, unsigned i)
{
while (true) { while (true) {
// height of subtree i has increased by 1 // height of subtree i has increased by 1
int j = (i == 0 ? -1 : +1); int j = (i == 0 ? -1 : +1);
...@@ -102,14 +136,14 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) ...@@ -102,14 +136,14 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
// height of longer subtree increased // height of longer subtree increased
jam(); jam();
NodeHandle childNode(frag); NodeHandle childNode(frag);
selectNode(signal, childNode, node.getLink(i)); selectNode(childNode, node.getLink(i));
int b2 = childNode.getBalance(); int b2 = childNode.getBalance();
if (b2 == b) { if (b2 == b) {
jam(); jam();
treeRotateSingle(signal, frag, node, i); treeRotateSingle(frag, node, i);
} else if (b2 == -b) { } else if (b2 == -b) {
jam(); jam();
treeRotateDouble(signal, frag, node, i); treeRotateDouble(frag, node, i);
} else { } else {
// height of subtree increased so it cannot be perfectly balanced // height of subtree increased so it cannot be perfectly balanced
ndbrequire(false); ndbrequire(false);
...@@ -126,115 +160,169 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) ...@@ -126,115 +160,169 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
break; break;
} }
i = node.getSide(); i = node.getSide();
selectNode(signal, node, parentLoc); selectNode(node, parentLoc);
} }
} }
/* /*
* Remove entry. * Remove entry. Optimize for nodes with slack. Handle the case when
* there is no underflow i.e. occupancy remains at least minOccup. For
* interior nodes this is a requirement. For others it means that we do
* not need to consider merge of semi-leaf and leaf.
*/ */
void void
Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) Dbtux::treeRemove(Frag& frag, TreePos treePos)
{ {
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos; unsigned pos = treePos.m_pos;
NodeHandle node(frag); NodeHandle node(frag);
selectNode(signal, node, treePos.m_loc); selectNode(node, treePos.m_loc);
TreeEnt ent; TreeEnt ent;
// check interior node first if (node.getOccup() > tree.m_minOccup) {
if (node.getChilds() == 2) { // no underflow in any node type
jam(); jam();
ndbrequire(node.getOccup() >= tree.m_minOccup); nodePopDown(node, pos, ent, 0);
// check if no underflow return;
if (node.getOccup() > tree.m_minOccup) {
jam();
nodePopDown(signal, node, pos, ent);
return;
}
// save current handle
NodeHandle parentNode = node;
// find glb node
TupLoc childLoc = node.getLink(0);
while (childLoc != NullTupLoc) {
jam();
selectNode(signal, node, childLoc);
childLoc = node.getLink(1);
}
// use glb max as new parent min
ent = node.getEnt(node.getOccup() - 1);
nodePopUp(signal, parentNode, pos, ent);
// set up to remove glb max
pos = node.getOccup() - 1;
// fall thru to next case
} }
// remove the element if (node.getChilds() == 2) {
nodePopDown(signal, node, pos, ent); // underflow in interior node
ndbrequire(node.getChilds() <= 1);
// handle half-leaf
unsigned i;
for (i = 0; i <= 1; i++) {
jam(); jam();
TupLoc childLoc = node.getLink(i); treeRemoveInner(frag, node, pos);
if (childLoc != NullTupLoc) { return;
// move to child
selectNode(signal, node, childLoc);
// balance of half-leaf parent requires child to be leaf
break;
}
} }
ndbrequire(node.getChilds() == 0); // remove entry in semi/leaf
// get parent if any nodePopDown(node, pos, ent, 0);
TupLoc parentLoc = node.getLink(2); if (node.getLink(0) != NullTupLoc) {
NodeHandle parentNode(frag);
i = node.getSide();
// move all that fits into parent
if (parentLoc != NullTupLoc) {
jam(); jam();
selectNode(signal, parentNode, node.getLink(2)); treeRemoveSemi(frag, node, 0);
nodeSlide(signal, parentNode, node, i); return;
// fall thru to next case
} }
// non-empty leaf if (node.getLink(1) != NullTupLoc) {
if (node.getOccup() >= 1) {
jam(); jam();
treeRemoveSemi(frag, node, 1);
return; return;
} }
// remove empty leaf treeRemoveLeaf(frag, node);
deleteNode(signal, node); }
if (parentLoc == NullTupLoc) {
/*
* Remove entry when interior node underflows. There is g.l.b node in
* left subtree to borrow an entry from. The max entry of the g.l.b
* node becomes the min entry of this node.
*/
void
Dbtux::treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos)
{
TreeHead& tree = frag.m_tree;
TreeEnt ent;
// find g.l.b node
NodeHandle glbNode(frag);
TupLoc loc = lubNode.getLink(0);
do {
jam();
selectNode(glbNode, loc);
loc = glbNode.getLink(1);
} while (loc != NullTupLoc);
// borrow max entry from semi/leaf
Uint32 scanList = RNIL;
nodePopDown(glbNode, glbNode.getOccup() - 1, ent, &scanList);
nodePopUp(lubNode, pos, ent, scanList);
if (glbNode.getLink(0) != NullTupLoc) {
jam(); jam();
// tree is now empty treeRemoveSemi(frag, glbNode, 0);
tree.m_root = NullTupLoc;
return; return;
} }
node = parentNode; treeRemoveLeaf(frag, glbNode);
node.setLink(i, NullTupLoc); }
#ifdef dbtux_min_occup_less_max_occup
// check if we created a half-leaf /*
if (node.getBalance() == 0) { * Handle semi-leaf after removing an entry. Move entries from leaf to
* semi-leaf to bring semi-leaf occupancy above minOccup, if possible.
* The leaf may become empty.
*/
void
Dbtux::treeRemoveSemi(Frag& frag, NodeHandle semiNode, unsigned i)
{
TreeHead& tree = frag.m_tree;
ndbrequire(semiNode.getChilds() < 2);
TupLoc leafLoc = semiNode.getLink(i);
NodeHandle leafNode(frag);
selectNode(leafNode, leafLoc);
if (semiNode.getOccup() < tree.m_minOccup) {
jam();
unsigned cnt = min(leafNode.getOccup(), tree.m_minOccup - semiNode.getOccup());
nodeSlide(semiNode, leafNode, cnt, i);
if (leafNode.getOccup() == 0) {
// remove empty leaf
jam();
treeRemoveNode(frag, leafNode);
}
}
}
/*
* Handle leaf after removing an entry. If parent is semi-leaf, move
* entries to it as in the semi-leaf case. If parent is interior node,
* do nothing.
*/
void
Dbtux::treeRemoveLeaf(Frag& frag, NodeHandle leafNode)
{
TreeHead& tree = frag.m_tree;
TupLoc parentLoc = leafNode.getLink(2);
if (parentLoc != NullTupLoc) {
jam(); jam();
// move entries from the other child NodeHandle parentNode(frag);
TupLoc childLoc = node.getLink(1 - i); selectNode(parentNode, parentLoc);
NodeHandle childNode(frag); unsigned i = leafNode.getSide();
selectNode(signal, childNode, childLoc); if (parentNode.getLink(1 - i) == NullTupLoc) {
nodeSlide(signal, node, childNode, 1 - i); // parent is semi-leaf
if (childNode.getOccup() == 0) {
jam(); jam();
deleteNode(signal, childNode); if (parentNode.getOccup() < tree.m_minOccup) {
node.setLink(1 - i, NullTupLoc);
// we are balanced again but our parent balance changes by -1
parentLoc = node.getLink(2);
if (parentLoc == NullTupLoc) {
jam(); jam();
return; unsigned cnt = min(leafNode.getOccup(), tree.m_minOccup - parentNode.getOccup());
nodeSlide(parentNode, leafNode, cnt, i);
} }
// fix side and become parent
i = node.getSide();
selectNode(signal, node, parentLoc);
} }
} }
#endif if (leafNode.getOccup() == 0) {
// re-balance tree at each node jam();
// remove empty leaf
treeRemoveNode(frag, leafNode);
}
}
/*
* Remove empty leaf.
*/
void
Dbtux::treeRemoveNode(Frag& frag, NodeHandle leafNode)
{
TreeHead& tree = frag.m_tree;
ndbrequire(leafNode.getChilds() == 0);
TupLoc parentLoc = leafNode.getLink(2);
unsigned i = leafNode.getSide();
deleteNode(leafNode);
if (parentLoc != NullTupLoc) {
jam();
NodeHandle parentNode(frag);
selectNode(parentNode, parentLoc);
parentNode.setLink(i, NullTupLoc);
// re-balance the tree
treeRemoveRebalance(frag, parentNode, i);
return;
}
// tree is now empty
tree.m_root = NullTupLoc;
}
/*
* Re-balance tree after removing a node. The process starts with the
* parent of the removed node.
*/
void
Dbtux::treeRemoveRebalance(Frag& frag, NodeHandle node, unsigned i)
{
while (true) { while (true) {
// height of subtree i has decreased by 1 // height of subtree i has decreased by 1
int j = (i == 0 ? -1 : +1); int j = (i == 0 ? -1 : +1);
...@@ -255,19 +343,19 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -255,19 +343,19 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
jam(); jam();
// child on the other side // child on the other side
NodeHandle childNode(frag); NodeHandle childNode(frag);
selectNode(signal, childNode, node.getLink(1 - i)); selectNode(childNode, node.getLink(1 - i));
int b2 = childNode.getBalance(); int b2 = childNode.getBalance();
if (b2 == b) { if (b2 == b) {
jam(); jam();
treeRotateSingle(signal, frag, node, 1 - i); treeRotateSingle(frag, node, 1 - i);
// height of tree decreased and propagates up // height of tree decreased and propagates up
} else if (b2 == -b) { } else if (b2 == -b) {
jam(); jam();
treeRotateDouble(signal, frag, node, 1 - i); treeRotateDouble(frag, node, 1 - i);
// height of tree decreased and propagates up // height of tree decreased and propagates up
} else { } else {
jam(); jam();
treeRotateSingle(signal, frag, node, 1 - i); treeRotateSingle(frag, node, 1 - i);
// height of tree did not change - done // height of tree did not change - done
return; return;
} }
...@@ -281,7 +369,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -281,7 +369,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
return; return;
} }
i = node.getSide(); i = node.getSide();
selectNode(signal, node, parentLoc); selectNode(node, parentLoc);
} }
} }
...@@ -302,10 +390,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -302,10 +390,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
* all optional. If 4 are there it changes side. * all optional. If 4 are there it changes side.
*/ */
void void
Dbtux::treeRotateSingle(Signal* signal, Dbtux::treeRotateSingle(Frag& frag, NodeHandle& node, unsigned i)
Frag& frag,
NodeHandle& node,
unsigned i)
{ {
ndbrequire(i <= 1); ndbrequire(i <= 1);
/* /*
...@@ -325,7 +410,7 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -325,7 +410,7 @@ Dbtux::treeRotateSingle(Signal* signal,
*/ */
TupLoc loc3 = node5.getLink(i); TupLoc loc3 = node5.getLink(i);
NodeHandle node3(frag); NodeHandle node3(frag);
selectNode(signal, node3, loc3); selectNode(node3, loc3);
const int bal3 = node3.getBalance(); const int bal3 = node3.getBalance();
/* /*
2 must always be there but is not changed. Thus we mereley check that it 2 must always be there but is not changed. Thus we mereley check that it
...@@ -342,7 +427,7 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -342,7 +427,7 @@ Dbtux::treeRotateSingle(Signal* signal,
NodeHandle node4(frag); NodeHandle node4(frag);
if (loc4 != NullTupLoc) { if (loc4 != NullTupLoc) {
jam(); jam();
selectNode(signal, node4, loc4); selectNode(node4, loc4);
ndbrequire(node4.getSide() == (1 - i) && ndbrequire(node4.getSide() == (1 - i) &&
node4.getLink(2) == loc3); node4.getLink(2) == loc3);
node4.setSide(i); node4.setSide(i);
...@@ -377,7 +462,7 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -377,7 +462,7 @@ Dbtux::treeRotateSingle(Signal* signal,
if (loc0 != NullTupLoc) { if (loc0 != NullTupLoc) {
jam(); jam();
NodeHandle node0(frag); NodeHandle node0(frag);
selectNode(signal, node0, loc0); selectNode(node0, loc0);
node0.setLink(side5, loc3); node0.setLink(side5, loc3);
} else { } else {
jam(); jam();
...@@ -514,8 +599,10 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -514,8 +599,10 @@ Dbtux::treeRotateSingle(Signal* signal,
* *
*/ */
void void
Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i) Dbtux::treeRotateDouble(Frag& frag, NodeHandle& node, unsigned i)
{ {
TreeHead& tree = frag.m_tree;
// old top node // old top node
NodeHandle node6 = node; NodeHandle node6 = node;
const TupLoc loc6 = node6.m_loc; const TupLoc loc6 = node6.m_loc;
...@@ -526,13 +613,13 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i ...@@ -526,13 +613,13 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
// level 1 // level 1
TupLoc loc2 = node6.getLink(i); TupLoc loc2 = node6.getLink(i);
NodeHandle node2(frag); NodeHandle node2(frag);
selectNode(signal, node2, loc2); selectNode(node2, loc2);
const int bal2 = node2.getBalance(); const int bal2 = node2.getBalance();
// level 2 // level 2
TupLoc loc4 = node2.getLink(1 - i); TupLoc loc4 = node2.getLink(1 - i);
NodeHandle node4(frag); NodeHandle node4(frag);
selectNode(signal, node4, loc4); selectNode(node4, loc4);
const int bal4 = node4.getBalance(); const int bal4 = node4.getBalance();
ndbrequire(i <= 1); ndbrequire(i <= 1);
...@@ -549,23 +636,26 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i ...@@ -549,23 +636,26 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
// fill up leaf before it becomes internal // fill up leaf before it becomes internal
if (loc3 == NullTupLoc && loc5 == NullTupLoc) { if (loc3 == NullTupLoc && loc5 == NullTupLoc) {
jam(); jam();
TreeHead& tree = frag.m_tree; if (node4.getOccup() < tree.m_minOccup) {
nodeSlide(signal, node4, node2, i); jam();
// implied by rule of merging half-leaves with leaves unsigned cnt = tree.m_minOccup - node4.getOccup();
ndbrequire(node4.getOccup() >= tree.m_minOccup); ndbrequire(cnt < node2.getOccup());
ndbrequire(node2.getOccup() != 0); nodeSlide(node4, node2, cnt, i);
ndbrequire(node4.getOccup() >= tree.m_minOccup);
ndbrequire(node2.getOccup() != 0);
}
} else { } else {
if (loc3 != NullTupLoc) { if (loc3 != NullTupLoc) {
jam(); jam();
NodeHandle node3(frag); NodeHandle node3(frag);
selectNode(signal, node3, loc3); selectNode(node3, loc3);
node3.setLink(2, loc2); node3.setLink(2, loc2);
node3.setSide(1 - i); node3.setSide(1 - i);
} }
if (loc5 != NullTupLoc) { if (loc5 != NullTupLoc) {
jam(); jam();
NodeHandle node5(frag); NodeHandle node5(frag);
selectNode(signal, node5, loc5); selectNode(node5, loc5);
node5.setLink(2, node6.m_loc); node5.setLink(2, node6.m_loc);
node5.setSide(i); node5.setSide(i);
} }
...@@ -588,7 +678,7 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i ...@@ -588,7 +678,7 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
if (loc0 != NullTupLoc) { if (loc0 != NullTupLoc) {
jam(); jam();
selectNode(signal, node0, loc0); selectNode(node0, loc0);
node0.setLink(side6, loc4); node0.setLink(side6, loc4);
} else { } else {
jam(); jam();
......
...@@ -29,6 +29,7 @@ shows ms / 1000 rows for each and index time overhead ...@@ -29,6 +29,7 @@ shows ms / 1000 rows for each and index time overhead
samples 10% of all PKs (100,000 pk reads, 100,000 scans) samples 10% of all PKs (100,000 pk reads, 100,000 scans)
the "pct" values are from more accurate total times (not shown) the "pct" values are from more accurate total times (not shown)
comments [ ... ] are after the case
040616 mc02/a 40 ms 87 ms 114 pct 040616 mc02/a 40 ms 87 ms 114 pct
mc02/b 51 ms 128 ms 148 pct mc02/b 51 ms 128 ms 148 pct
...@@ -76,13 +77,12 @@ optim 13 mc02/a 40 ms 57 ms 42 pct ...@@ -76,13 +77,12 @@ optim 13 mc02/a 40 ms 57 ms 42 pct
mc02/c 9 ms 13 ms 50 pct mc02/c 9 ms 13 ms 50 pct
mc02/d 170 ms 256 ms 50 pct mc02/d 170 ms 256 ms 50 pct
after wl-1884 store all-NULL keys (the tests have pctnull=10 per column)
optim 13 mc02/a 39 ms 59 ms 50 pct optim 13 mc02/a 39 ms 59 ms 50 pct
mc02/b 47 ms 77 ms 61 pct mc02/b 47 ms 77 ms 61 pct
mc02/c 9 ms 12 ms 44 pct mc02/c 9 ms 12 ms 44 pct
mc02/d 246 ms 289 ms 17 pct mc02/d 246 ms 289 ms 17 pct
[ after wl-1884 store all-NULL keys (the tests have pctnull=10 per column) ]
[ case d: bug in testOIBasic killed PK read performance ] [ case d: bug in testOIBasic killed PK read performance ]
optim 14 mc02/a 41 ms 60 ms 44 pct optim 14 mc02/a 41 ms 60 ms 44 pct
...@@ -98,8 +98,7 @@ none mc02/a 35 ms 60 ms 71 pct ...@@ -98,8 +98,7 @@ none mc02/a 35 ms 60 ms 71 pct
mc02/c 5 ms 12 ms 106 pct mc02/c 5 ms 12 ms 106 pct
mc02/d 165 ms 238 ms 44 pct mc02/d 165 ms 238 ms 44 pct
[ johan re-installed mc02 as fedora gcc-3.3.2 ] [ johan re-installed mc02 as fedora gcc-3.3.2, tux uses more C++ stuff than tup]
[ case c: table scan has improved... ]
charsets mc02/a 35 ms 60 ms 71 pct charsets mc02/a 35 ms 60 ms 71 pct
mc02/b 42 ms 84 ms 97 pct mc02/b 42 ms 84 ms 97 pct
...@@ -118,6 +117,19 @@ optim 15 mc02/a 34 ms 60 ms 72 pct ...@@ -118,6 +117,19 @@ optim 15 mc02/a 34 ms 60 ms 72 pct
optim 16 mc02/a 34 ms 53 ms 53 pct optim 16 mc02/a 34 ms 53 ms 53 pct
mc02/b 42 ms 75 ms 75 pct mc02/b 42 ms 75 ms 75 pct
[ case a, b: binary search of bounding node when adding entry ] [ binary search of bounding node when adding entry ]
none mc02/a 35 ms 53 ms 51 pct
mc02/b 42 ms 75 ms 76 pct
[ rewrote treeAdd / treeRemove ]
optim 17 mc02/a 35 ms 52 ms 49 pct
mc02/b 43 ms 75 ms 75 pct
[ allow slack (2) in interior nodes - almost no effect?? ]
wl-1942 mc02/a 35 ms 52 ms 49 pct
mc02/b 42 ms 75 ms 76 pct
vim: set et: vim: set et:
...@@ -42,7 +42,7 @@ struct Opt { ...@@ -42,7 +42,7 @@ struct Opt {
CHARSET_INFO* m_cs; CHARSET_INFO* m_cs;
bool m_dups; bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype; NdbDictionary::Object::FragmentType m_fragtype;
unsigned m_idxloop; unsigned m_subsubloop;
const char* m_index; const char* m_index;
unsigned m_loop; unsigned m_loop;
bool m_nologging; bool m_nologging;
...@@ -66,7 +66,7 @@ struct Opt { ...@@ -66,7 +66,7 @@ struct Opt {
m_cs(0), m_cs(0),
m_dups(false), m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined), m_fragtype(NdbDictionary::Object::FragUndefined),
m_idxloop(4), m_subsubloop(4),
m_index(0), m_index(0),
m_loop(1), m_loop(1),
m_nologging(false), m_nologging(false),
...@@ -79,7 +79,7 @@ struct Opt { ...@@ -79,7 +79,7 @@ struct Opt {
m_seed(0), m_seed(0),
m_subloop(4), m_subloop(4),
m_table(0), m_table(0),
m_threads(4), m_threads(6), // table + 5 indexes
m_v(1) { m_v(1) {
} }
}; };
...@@ -208,6 +208,8 @@ struct Par : public Opt { ...@@ -208,6 +208,8 @@ struct Par : public Opt {
Set& set() const { assert(m_set != 0); return *m_set; } Set& set() const { assert(m_set != 0); return *m_set; }
Tmr* m_tmr; Tmr* m_tmr;
Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; } Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
unsigned m_lno;
unsigned m_slno;
unsigned m_totrows; unsigned m_totrows;
// value calculation // value calculation
unsigned m_range; unsigned m_range;
...@@ -226,6 +228,8 @@ struct Par : public Opt { ...@@ -226,6 +228,8 @@ struct Par : public Opt {
m_tab(0), m_tab(0),
m_set(0), m_set(0),
m_tmr(0), m_tmr(0),
m_lno(0),
m_slno(0),
m_totrows(m_threads * m_rows), m_totrows(m_threads * m_rows),
m_range(m_rows), m_range(m_rows),
m_pctrange(0), m_pctrange(0),
...@@ -2069,7 +2073,8 @@ pkinsert(Par par) ...@@ -2069,7 +2073,8 @@ pkinsert(Par par)
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
Lst lst; Lst lst;
for (unsigned j = 0; j < par.m_rows; j++) { for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j); unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock(); set.lock();
if (set.exist(i) || set.pending(i)) { if (set.exist(i) || set.pending(i)) {
set.unlock(); set.unlock();
...@@ -2174,7 +2179,8 @@ pkdelete(Par par) ...@@ -2174,7 +2179,8 @@ pkdelete(Par par)
Lst lst; Lst lst;
bool deadlock = false; bool deadlock = false;
for (unsigned j = 0; j < par.m_rows; j++) { for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j); unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock(); set.lock();
if (! set.exist(i) || set.pending(i)) { if (! set.exist(i) || set.pending(i)) {
set.unlock(); set.unlock();
...@@ -2398,7 +2404,7 @@ static int ...@@ -2398,7 +2404,7 @@ static int
scanreadindex(Par par, const ITab& itab) scanreadindex(Par par, const ITab& itab)
{ {
const Tab& tab = par.tab(); const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_idxloop; i++) { for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows); BSet bset(tab, itab, par.m_rows);
bset.calc(par); bset.calc(par);
CHK(scanreadindex(par, itab, bset) == 0); CHK(scanreadindex(par, itab, bset) == 0);
...@@ -2645,7 +2651,7 @@ static int ...@@ -2645,7 +2651,7 @@ static int
scanupdateindex(Par par, const ITab& itab) scanupdateindex(Par par, const ITab& itab)
{ {
const Tab& tab = par.tab(); const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_idxloop; i++) { for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows); BSet bset(tab, itab, par.m_rows);
bset.calc(par); bset.calc(par);
CHK(scanupdateindex(par, itab, bset) == 0); CHK(scanupdateindex(par, itab, bset) == 0);
...@@ -2685,6 +2691,53 @@ readverify(Par par) ...@@ -2685,6 +2691,53 @@ readverify(Par par)
return 0; return 0;
} }
static int
readverifyfull(Par par)
{
par.m_verify = true;
if (par.m_no == 0)
CHK(scanreadtable(par) == 0);
else {
const Tab& tab = par.tab();
unsigned i = par.m_no;
if (i <= tab.m_itabs && useindex(i)) {
const ITab& itab = tab.m_itab[i - 1];
BSet bset(tab, itab, par.m_rows);
CHK(scanreadindex(par, itab, bset) == 0);
}
}
return 0;
}
static int
pkops(Par par)
{
par.m_randomkey = true;
for (unsigned i = 0; i < par.m_subsubloop; i++) {
unsigned sel = urandom(10);
if (par.m_slno % 2 == 0) {
// favor insert
if (sel < 8) {
CHK(pkinsert(par) == 0);
} else if (sel < 9) {
CHK(pkupdate(par) == 0);
} else {
CHK(pkdelete(par) == 0);
}
} else {
// favor delete
if (sel < 1) {
CHK(pkinsert(par) == 0);
} else if (sel < 2) {
CHK(pkupdate(par) == 0);
} else {
CHK(pkdelete(par) == 0);
}
}
}
return 0;
}
static int static int
pkupdatescanread(Par par) pkupdatescanread(Par par)
{ {
...@@ -2930,6 +2983,8 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode) ...@@ -2930,6 +2983,8 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode)
thr.m_par.m_tab = par.m_tab; thr.m_par.m_tab = par.m_tab;
thr.m_par.m_set = par.m_set; thr.m_par.m_set = par.m_set;
thr.m_par.m_tmr = par.m_tmr; thr.m_par.m_tmr = par.m_tmr;
thr.m_par.m_lno = par.m_lno;
thr.m_par.m_slno = par.m_slno;
thr.m_func = func; thr.m_func = func;
thr.start(); thr.start();
} }
...@@ -2953,8 +3008,8 @@ tbuild(Par par) ...@@ -2953,8 +3008,8 @@ tbuild(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
if (i % 2 == 0) { if (par.m_slno % 2 == 0) {
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
...@@ -2963,9 +3018,10 @@ tbuild(Par par) ...@@ -2963,9 +3018,10 @@ tbuild(Par par)
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
} }
RUNSTEP(par, readverify, MT); RUNSTEP(par, pkupdate, MT);
RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, pkdelete, MT); RUNSTEP(par, pkdelete, MT);
RUNSTEP(par, readverify, MT); RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, dropindex, ST); RUNSTEP(par, dropindex, ST);
} }
return 0; return 0;
...@@ -2973,6 +3029,22 @@ tbuild(Par par) ...@@ -2973,6 +3029,22 @@ tbuild(Par par)
static int static int
tpkops(Par par) tpkops(Par par)
{
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkops, MT);
LL2("rows=" << par.set().count());
RUNSTEP(par, readverifyfull, MT);
}
return 0;
}
static int
tpkopsread(Par par)
{ {
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
...@@ -2981,7 +3053,7 @@ tpkops(Par par) ...@@ -2981,7 +3053,7 @@ tpkops(Par par)
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkupdatescanread, MT); RUNSTEP(par, pkupdatescanread, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
} }
...@@ -3000,7 +3072,7 @@ tmixedops(Par par) ...@@ -3000,7 +3072,7 @@ tmixedops(Par par)
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, mixedoperations, MT); RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
} }
...@@ -3014,7 +3086,7 @@ tbusybuild(Par par) ...@@ -3014,7 +3086,7 @@ tbusybuild(Par par)
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkupdateindexbuild, MT); RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
...@@ -3030,7 +3102,7 @@ ttimebuild(Par par) ...@@ -3030,7 +3102,7 @@ ttimebuild(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
t1.on(); t1.on();
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
...@@ -3049,7 +3121,7 @@ ttimemaint(Par par) ...@@ -3049,7 +3121,7 @@ ttimemaint(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
t1.on(); t1.on();
RUNSTEP(par, pkupdate, MT); RUNSTEP(par, pkupdate, MT);
...@@ -3074,7 +3146,7 @@ ttimescan(Par par) ...@@ -3074,7 +3146,7 @@ ttimescan(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
par.m_tmr = &t1; par.m_tmr = &t1;
...@@ -3096,7 +3168,7 @@ ttimepkread(Par par) ...@@ -3096,7 +3168,7 @@ ttimepkread(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
par.m_tmr = &t1; par.m_tmr = &t1;
...@@ -3132,9 +3204,10 @@ struct TCase { ...@@ -3132,9 +3204,10 @@ struct TCase {
static const TCase static const TCase
tcaselist[] = { tcaselist[] = {
TCase("a", tbuild, "index build"), TCase("a", tbuild, "index build"),
TCase("b", tpkops, "pk operations and scan reads"), TCase("b", tpkops, "pk operations"),
TCase("c", tmixedops, "pk operations and scan operations"), TCase("c", tpkopsread, "pk operations and scan reads"),
TCase("d", tbusybuild, "pk operations and index build"), TCase("d", tmixedops, "pk operations and scan operations"),
TCase("e", tbusybuild, "pk operations and index build"),
TCase("t", ttimebuild, "time index build"), TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"), TCase("u", ttimemaint, "time index maintenance"),
TCase("v", ttimescan, "time full scan table vs index on pk"), TCase("v", ttimescan, "time full scan table vs index on pk"),
...@@ -3192,10 +3265,10 @@ runtest(Par par) ...@@ -3192,10 +3265,10 @@ runtest(Par par)
Thr& thr = *g_thrlist[n]; Thr& thr = *g_thrlist[n];
assert(thr.m_thread != 0); assert(thr.m_thread != 0);
} }
for (unsigned l = 0; par.m_loop == 0 || l < par.m_loop; l++) { for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) {
LL1("loop " << l); LL1("loop " << par.m_lno);
if (par.m_seed == 0) if (par.m_seed == 0)
srandom(l); srandom(par.m_lno);
for (unsigned i = 0; i < tcasecount; i++) { for (unsigned i = 0; i < tcasecount; i++) {
const TCase& tcase = tcaselist[i]; const TCase& tcase = tcaselist[i];
if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0) if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment