Commit 4a9f7745 authored by unknown's avatar unknown

NDB wl-1533 tux optim: after wl-1942 can remove signal from many methods


ndb/src/kernel/blocks/dbtux/Dbtux.hpp:
  wl-1533 tux optim: after wl-1942 can remove signal from many methods
ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp:
  wl-1533 tux optim: after wl-1942 can remove signal from many methods
ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp:
  wl-1533 tux optim: after wl-1942 can remove signal from many methods
ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp:
  wl-1533 tux optim: after wl-1942 can remove signal from many methods
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp:
  wl-1533 tux optim: after wl-1942 can remove signal from many methods
ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp:
  wl-1533 tux optim: after wl-1942 can remove signal from many methods
ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp:
  wl-1533 tux optim: after wl-1942 can remove signal from many methods
ndb/test/ndbapi/testOIBasic.cpp:
  wl-1533 tux optim: after wl-1942 can remove signal from many methods
parent aa6fb646
...@@ -32,7 +32,6 @@ ...@@ -32,7 +32,6 @@
// signal classes // signal classes
#include <signaldata/DictTabInfo.hpp> #include <signaldata/DictTabInfo.hpp>
#include <signaldata/TuxContinueB.hpp> #include <signaldata/TuxContinueB.hpp>
#include <signaldata/BuildIndx.hpp>
#include <signaldata/TupFrag.hpp> #include <signaldata/TupFrag.hpp>
#include <signaldata/AlterIndx.hpp> #include <signaldata/AlterIndx.hpp>
#include <signaldata/DropTab.hpp> #include <signaldata/DropTab.hpp>
...@@ -478,7 +477,7 @@ private: ...@@ -478,7 +477,7 @@ private:
Uint16 m_numAttrs; Uint16 m_numAttrs;
bool m_storeNullKey; bool m_storeNullKey;
TreeHead m_tree; TreeHead m_tree;
TupLoc m_freeLoc; // one node pre-allocated for insert TupLoc m_freeLoc; // list of free index nodes
DLList<ScanOp> m_scanList; // current scans on this fragment DLList<ScanOp> m_scanList; // current scans on this fragment
Uint32 m_tupIndexFragPtrI; Uint32 m_tupIndexFragPtrI;
Uint32 m_tupTableFragPtrI[2]; Uint32 m_tupTableFragPtrI[2];
...@@ -582,24 +581,24 @@ private: ...@@ -582,24 +581,24 @@ private:
* DbtuxNode.cpp * DbtuxNode.cpp
*/ */
int allocNode(Signal* signal, NodeHandle& node); int allocNode(Signal* signal, NodeHandle& node);
void selectNode(Signal* signal, NodeHandle& node, TupLoc loc); void selectNode(NodeHandle& node, TupLoc loc);
void insertNode(Signal* signal, NodeHandle& node); void insertNode(NodeHandle& node);
void deleteNode(Signal* signal, NodeHandle& node); void deleteNode(NodeHandle& node);
void setNodePref(Signal* signal, NodeHandle& node); void setNodePref(NodeHandle& node);
// node operations // node operations
void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList); void nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList);
void nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos); void nodePushUpScans(NodeHandle& node, unsigned pos);
void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& en, Uint32* scanList); void nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& en, Uint32* scanList);
void nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos); void nodePopDownScans(NodeHandle& node, unsigned pos);
void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList); void nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList);
void nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos); void nodePushDownScans(NodeHandle& node, unsigned pos);
void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList); void nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList);
void nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos); void nodePopUpScans(NodeHandle& node, unsigned pos);
void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i); void nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i);
// scans linked to node // scans linked to node
void addScanList(NodeHandle& node, unsigned pos, Uint32 scanList); void addScanList(NodeHandle& node, unsigned pos, Uint32 scanList);
void removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList); void removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList);
void moveScanList(Signal* signal, NodeHandle& node, unsigned pos); void moveScanList(NodeHandle& node, unsigned pos);
void linkScan(NodeHandle& node, ScanOpPtr scanPtr); void linkScan(NodeHandle& node, ScanOpPtr scanPtr);
void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr); void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr);
bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr); bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr);
...@@ -608,20 +607,20 @@ private: ...@@ -608,20 +607,20 @@ private:
* DbtuxTree.cpp * DbtuxTree.cpp
*/ */
// add entry // add entry
void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent); void treeAdd(Frag& frag, TreePos treePos, TreeEnt ent);
void treeAddFull(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent); void treeAddFull(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent);
void treeAddNode(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i); void treeAddNode(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i);
void treeAddRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned i); void treeAddRebalance(Frag& frag, NodeHandle node, unsigned i);
// remove entry // remove entry
void treeRemove(Signal* signal, Frag& frag, TreePos treePos); void treeRemove(Frag& frag, TreePos treePos);
void treeRemoveInner(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos); void treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos);
void treeRemoveSemi(Signal* signal, Frag& frag, NodeHandle node, unsigned i); void treeRemoveSemi(Frag& frag, NodeHandle node, unsigned i);
void treeRemoveLeaf(Signal* signal, Frag& frag, NodeHandle node); void treeRemoveLeaf(Frag& frag, NodeHandle node);
void treeRemoveNode(Signal* signal, Frag& frag, NodeHandle node); void treeRemoveNode(Frag& frag, NodeHandle node);
void treeRemoveRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned i); void treeRemoveRebalance(Frag& frag, NodeHandle node, unsigned i);
// rotate // rotate
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i); void treeRotateSingle(Frag& frag, NodeHandle& node, unsigned i);
void treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i); void treeRotateDouble(Frag& frag, NodeHandle& node, unsigned i);
/* /*
* DbtuxScan.cpp * DbtuxScan.cpp
...@@ -633,9 +632,9 @@ private: ...@@ -633,9 +632,9 @@ private:
void execACCKEYCONF(Signal* signal); void execACCKEYCONF(Signal* signal);
void execACCKEYREF(Signal* signal); void execACCKEYREF(Signal* signal);
void execACC_ABORTCONF(Signal* signal); void execACC_ABORTCONF(Signal* signal);
void scanFirst(Signal* signal, ScanOpPtr scanPtr); void scanFirst(ScanOpPtr scanPtr);
void scanNext(Signal* signal, ScanOpPtr scanPtr); void scanNext(ScanOpPtr scanPtr);
bool scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent); bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent);
void scanClose(Signal* signal, ScanOpPtr scanPtr); void scanClose(Signal* signal, ScanOpPtr scanPtr);
void addAccLockOp(ScanOp& scan, Uint32 accLockOp); void addAccLockOp(ScanOp& scan, Uint32 accLockOp);
void removeAccLockOp(ScanOp& scan, Uint32 accLockOp); void removeAccLockOp(ScanOp& scan, Uint32 accLockOp);
...@@ -644,9 +643,9 @@ private: ...@@ -644,9 +643,9 @@ private:
/* /*
* DbtuxSearch.cpp * DbtuxSearch.cpp
*/ */
void searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos); void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/* /*
* DbtuxCmp.cpp * DbtuxCmp.cpp
...@@ -670,7 +669,7 @@ private: ...@@ -670,7 +669,7 @@ private:
PrintPar(); PrintPar();
}; };
void printTree(Signal* signal, Frag& frag, NdbOut& out); void printTree(Signal* signal, Frag& frag, NdbOut& out);
void printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par); void printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par);
friend class NdbOut& operator<<(NdbOut&, const TupLoc&); friend class NdbOut& operator<<(NdbOut&, const TupLoc&);
friend class NdbOut& operator<<(NdbOut&, const TreeEnt&); friend class NdbOut& operator<<(NdbOut&, const TreeEnt&);
friend class NdbOut& operator<<(NdbOut&, const TreeNode&); friend class NdbOut& operator<<(NdbOut&, const TreeNode&);
......
...@@ -98,7 +98,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out) ...@@ -98,7 +98,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
strcpy(par.m_path, "."); strcpy(par.m_path, ".");
par.m_side = 2; par.m_side = 2;
par.m_parent = NullTupLoc; par.m_parent = NullTupLoc;
printNode(signal, frag, out, tree.m_root, par); printNode(frag, out, tree.m_root, par);
out.m_out->flush(); out.m_out->flush();
if (! par.m_ok) { if (! par.m_ok) {
if (debugFile == 0) { if (debugFile == 0) {
...@@ -114,7 +114,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out) ...@@ -114,7 +114,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
} }
void void
Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par) Dbtux::printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par)
{ {
if (loc == NullTupLoc) { if (loc == NullTupLoc) {
par.m_depth = 0; par.m_depth = 0;
...@@ -122,7 +122,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -122,7 +122,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
} }
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
NodeHandle node(frag); NodeHandle node(frag);
selectNode(signal, node, loc); selectNode(node, loc);
out << par.m_path << " " << node << endl; out << par.m_path << " " << node << endl;
// check children // check children
PrintPar cpar[2]; PrintPar cpar[2];
...@@ -132,7 +132,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -132,7 +132,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
cpar[i].m_side = i; cpar[i].m_side = i;
cpar[i].m_depth = 0; cpar[i].m_depth = 0;
cpar[i].m_parent = loc; cpar[i].m_parent = loc;
printNode(signal, frag, out, node.getLink(i), cpar[i]); printNode(frag, out, node.getLink(i), cpar[i]);
if (! cpar[i].m_ok) { if (! cpar[i].m_ok) {
par.m_ok = false; par.m_ok = false;
} }
......
...@@ -117,7 +117,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -117,7 +117,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
switch (opCode) { switch (opCode) {
case TuxMaintReq::OpAdd: case TuxMaintReq::OpAdd:
jam(); jam();
searchToAdd(signal, frag, c_searchKey, ent, treePos); searchToAdd(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
debugOut << treePos << (treePos.m_match ? " - error" : "") << endl; debugOut << treePos << (treePos.m_match ? " - error" : "") << endl;
...@@ -133,8 +133,8 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -133,8 +133,8 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break; break;
} }
/* /*
* At most one new node is inserted in the operation. We keep one * At most one new node is inserted in the operation. Pre-allocate
* free node pre-allocated so the operation cannot fail. * it so that the operation cannot fail.
*/ */
if (frag.m_freeLoc == NullTupLoc) { if (frag.m_freeLoc == NullTupLoc) {
jam(); jam();
...@@ -144,14 +144,16 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -144,14 +144,16 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
jam(); jam();
break; break;
} }
// link to freelist
node.setLink(0, frag.m_freeLoc);
frag.m_freeLoc = node.m_loc; frag.m_freeLoc = node.m_loc;
ndbrequire(frag.m_freeLoc != NullTupLoc); ndbrequire(frag.m_freeLoc != NullTupLoc);
} }
treeAdd(signal, frag, treePos, ent); treeAdd(frag, treePos, ent);
break; break;
case TuxMaintReq::OpRemove: case TuxMaintReq::OpRemove:
jam(); jam();
searchToRemove(signal, frag, c_searchKey, ent, treePos); searchToRemove(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl; debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl;
...@@ -166,7 +168,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -166,7 +168,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
} }
break; break;
} }
treeRemove(signal, frag, treePos); treeRemove(frag, treePos);
break; break;
default: default:
ndbrequire(false); ndbrequire(false);
......
...@@ -42,7 +42,7 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node) ...@@ -42,7 +42,7 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node)
* Set handle to point to existing node. * Set handle to point to existing node.
*/ */
void void
Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc) Dbtux::selectNode(NodeHandle& node, TupLoc loc)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
ndbrequire(loc != NullTupLoc); ndbrequire(loc != NullTupLoc);
...@@ -57,15 +57,15 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc) ...@@ -57,15 +57,15 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc)
} }
/* /*
* Set handle to point to new node. Uses the pre-allocated node. * Set handle to point to new node. Uses a pre-allocated node.
*/ */
void void
Dbtux::insertNode(Signal* signal, NodeHandle& node) Dbtux::insertNode(NodeHandle& node)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TupLoc loc = frag.m_freeLoc; // unlink from freelist
frag.m_freeLoc = NullTupLoc; selectNode(node, frag.m_freeLoc);
selectNode(signal, node, loc); frag.m_freeLoc = node.getLink(0);
new (node.m_node) TreeNode(); new (node.m_node) TreeNode();
#ifdef VM_TRACE #ifdef VM_TRACE
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -76,20 +76,17 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node) ...@@ -76,20 +76,17 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node)
} }
/* /*
* Delete existing node. * Delete existing node. Simply put it on the freelist.
*/ */
void void
Dbtux::deleteNode(Signal* signal, NodeHandle& node) Dbtux::deleteNode(NodeHandle& node)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
ndbrequire(node.getOccup() == 0); ndbrequire(node.getOccup() == 0);
TupLoc loc = node.m_loc; // link to freelist
Uint32 pageId = loc.getPageId(); node.setLink(0, frag.m_freeLoc);
Uint32 pageOffset = loc.getPageOffset(); frag.m_freeLoc = node.m_loc;
Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node); // invalidate the handle
c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
jamEntry();
// invalidate handle and storage
node.m_loc = NullTupLoc; node.m_loc = NullTupLoc;
node.m_node = 0; node.m_node = 0;
} }
...@@ -99,7 +96,7 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node) ...@@ -99,7 +96,7 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node)
* attribute headers for now. XXX use null mask instead * attribute headers for now. XXX use null mask instead
*/ */
void void
Dbtux::setNodePref(Signal* signal, NodeHandle& node) Dbtux::setNodePref(NodeHandle& node)
{ {
const Frag& frag = node.m_frag; const Frag& frag = node.m_frag;
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
...@@ -121,7 +118,7 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node) ...@@ -121,7 +118,7 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node)
* Add list of scans at the new entry. * Add list of scans at the new entry.
*/ */
void void
Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList) Dbtux::nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -129,7 +126,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ...@@ -129,7 +126,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
ndbrequire(occup < tree.m_maxOccup && pos <= occup); ndbrequire(occup < tree.m_maxOccup && pos <= occup);
// fix old scans // fix old scans
if (node.getNodeScan() != RNIL) if (node.getNodeScan() != RNIL)
nodePushUpScans(signal, node, pos); nodePushUpScans(node, pos);
// fix node // fix node
TreeEnt* const entList = tree.getEntList(node.m_node); TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0]; entList[occup] = entList[0];
...@@ -146,11 +143,11 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ...@@ -146,11 +143,11 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
addScanList(node, pos, scanList); addScanList(node, pos, scanList);
// fix prefix // fix prefix
if (occup == 0 || pos == 0) if (occup == 0 || pos == 0)
setNodePref(signal, node); setNodePref(node);
} }
void void
Dbtux::nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos) Dbtux::nodePushUpScans(NodeHandle& node, unsigned pos)
{ {
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ScanOpPtr scanPtr; ScanOpPtr scanPtr;
...@@ -187,7 +184,7 @@ Dbtux::nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos) ...@@ -187,7 +184,7 @@ Dbtux::nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos)
* else moved forward. * else moved forward.
*/ */
void void
Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList) Dbtux::nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -196,12 +193,12 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, ...@@ -196,12 +193,12 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent,
if (node.getNodeScan() != RNIL) { if (node.getNodeScan() != RNIL) {
// remove or move scans at this position // remove or move scans at this position
if (scanList == 0) if (scanList == 0)
moveScanList(signal, node, pos); moveScanList(node, pos);
else else
removeScanList(node, pos, *scanList); removeScanList(node, pos, *scanList);
// fix other scans // fix other scans
if (node.getNodeScan() != RNIL) if (node.getNodeScan() != RNIL)
nodePopDownScans(signal, node, pos); nodePopDownScans(node, pos);
} }
// fix node // fix node
TreeEnt* const entList = tree.getEntList(node.m_node); TreeEnt* const entList = tree.getEntList(node.m_node);
...@@ -216,11 +213,11 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, ...@@ -216,11 +213,11 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent,
node.setOccup(occup - 1); node.setOccup(occup - 1);
// fix prefix // fix prefix
if (occup != 1 && pos == 0) if (occup != 1 && pos == 0)
setNodePref(signal, node); setNodePref(node);
} }
void void
Dbtux::nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos) Dbtux::nodePopDownScans(NodeHandle& node, unsigned pos)
{ {
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ScanOpPtr scanPtr; ScanOpPtr scanPtr;
...@@ -258,7 +255,7 @@ Dbtux::nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos) ...@@ -258,7 +255,7 @@ Dbtux::nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos)
* Return list of scans at the removed position 0. * Return list of scans at the removed position 0.
*/ */
void void
Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList) Dbtux::nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -269,7 +266,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent ...@@ -269,7 +266,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
removeScanList(node, 0, scanList); removeScanList(node, 0, scanList);
// fix other scans // fix other scans
if (node.getNodeScan() != RNIL) if (node.getNodeScan() != RNIL)
nodePushDownScans(signal, node, pos); nodePushDownScans(node, pos);
} }
// fix node // fix node
TreeEnt* const entList = tree.getEntList(node.m_node); TreeEnt* const entList = tree.getEntList(node.m_node);
...@@ -285,11 +282,11 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent ...@@ -285,11 +282,11 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
entList[0] = entList[occup]; entList[0] = entList[occup];
// fix prefix // fix prefix
if (true) if (true)
setNodePref(signal, node); setNodePref(node);
} }
void void
Dbtux::nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos) Dbtux::nodePushDownScans(NodeHandle& node, unsigned pos)
{ {
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ScanOpPtr scanPtr; ScanOpPtr scanPtr;
...@@ -328,7 +325,7 @@ Dbtux::nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos) ...@@ -328,7 +325,7 @@ Dbtux::nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos)
* Move scans at removed entry and add scans at the new entry. * Move scans at removed entry and add scans at the new entry.
*/ */
void void
Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList) Dbtux::nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -336,10 +333,10 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, U ...@@ -336,10 +333,10 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, U
ndbrequire(occup <= tree.m_maxOccup && pos < occup); ndbrequire(occup <= tree.m_maxOccup && pos < occup);
if (node.getNodeScan() != RNIL) { if (node.getNodeScan() != RNIL) {
// move scans whose entry disappears // move scans whose entry disappears
moveScanList(signal, node, pos); moveScanList(node, pos);
// fix other scans // fix other scans
if (node.getNodeScan() != RNIL) if (node.getNodeScan() != RNIL)
nodePopUpScans(signal, node, pos); nodePopUpScans(node, pos);
} }
// fix node // fix node
TreeEnt* const entList = tree.getEntList(node.m_node); TreeEnt* const entList = tree.getEntList(node.m_node);
...@@ -358,11 +355,11 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, U ...@@ -358,11 +355,11 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, U
addScanList(node, 0, scanList); addScanList(node, 0, scanList);
// fix prefix // fix prefix
if (true) if (true)
setNodePref(signal, node); setNodePref(node);
} }
void void
Dbtux::nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos) Dbtux::nodePopUpScans(NodeHandle& node, unsigned pos)
{ {
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ScanOpPtr scanPtr; ScanOpPtr scanPtr;
...@@ -392,7 +389,7 @@ Dbtux::nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos) ...@@ -392,7 +389,7 @@ Dbtux::nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos)
* (i=0) or after the max (i=1). Expensive but not often used. * (i=0) or after the max (i=1). Expensive but not often used.
*/ */
void void
Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i) Dbtux::nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i)
{ {
Frag& frag = dstNode.m_frag; Frag& frag = dstNode.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -400,8 +397,8 @@ Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsig ...@@ -400,8 +397,8 @@ Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsig
while (cnt != 0) { while (cnt != 0) {
TreeEnt ent; TreeEnt ent;
Uint32 scanList = RNIL; Uint32 scanList = RNIL;
nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList); nodePopDown(srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList);
nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList); nodePushUp(dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList);
cnt--; cnt--;
} }
} }
...@@ -476,7 +473,7 @@ Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList) ...@@ -476,7 +473,7 @@ Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList)
* method scanNext(). * method scanNext().
*/ */
void void
Dbtux::moveScanList(Signal* signal, NodeHandle& node, unsigned pos) Dbtux::moveScanList(NodeHandle& node, unsigned pos)
{ {
ScanOpPtr scanPtr; ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan(); scanPtr.i = node.getNodeScan();
...@@ -494,7 +491,7 @@ Dbtux::moveScanList(Signal* signal, NodeHandle& node, unsigned pos) ...@@ -494,7 +491,7 @@ Dbtux::moveScanList(Signal* signal, NodeHandle& node, unsigned pos)
debugOut << "At pos=" << pos << " " << node << endl; debugOut << "At pos=" << pos << " " << node << endl;
} }
#endif #endif
scanNext(signal, scanPtr); scanNext(scanPtr);
ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos)); ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos));
} }
scanPtr.i = nextPtrI; scanPtr.i = nextPtrI;
......
...@@ -275,7 +275,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) ...@@ -275,7 +275,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
jam(); jam();
const TupLoc loc = scan.m_scanPos.m_loc; const TupLoc loc = scan.m_scanPos.m_loc;
NodeHandle node(frag); NodeHandle node(frag);
selectNode(signal, node, loc); selectNode(node, loc);
unlinkScan(node, scanPtr); unlinkScan(node, scanPtr);
scan.m_scanPos.m_loc = NullTupLoc; scan.m_scanPos.m_loc = NullTupLoc;
} }
...@@ -364,7 +364,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -364,7 +364,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::First) { if (scan.m_state == ScanOp::First) {
jam(); jam();
// search is done only once in single range scan // search is done only once in single range scan
scanFirst(signal, scanPtr); scanFirst(scanPtr);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugScan) { if (debugFlags & DebugScan) {
debugOut << "First scan " << scanPtr.i << " " << scan << endl; debugOut << "First scan " << scanPtr.i << " " << scan << endl;
...@@ -374,7 +374,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -374,7 +374,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::Next) { if (scan.m_state == ScanOp::Next) {
jam(); jam();
// look for next // look for next
scanNext(signal, scanPtr); scanNext(scanPtr);
} }
// for reading tuple key in Current or Locked state // for reading tuple key in Current or Locked state
Data pkData = c_dataBuffer; Data pkData = c_dataBuffer;
...@@ -680,7 +680,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal) ...@@ -680,7 +680,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal)
* by scanNext. * by scanNext.
*/ */
void void
Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) Dbtux::scanFirst(ScanOpPtr scanPtr)
{ {
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
...@@ -698,7 +698,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -698,7 +698,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
} }
// search for scan start position // search for scan start position
TreePos treePos; TreePos treePos;
searchToScan(signal, frag, c_dataBuffer, scan.m_boundCnt[0], treePos); searchToScan(frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
if (treePos.m_loc == NullTupLoc) { if (treePos.m_loc == NullTupLoc) {
// empty tree // empty tree
jam(); jam();
...@@ -710,7 +710,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -710,7 +710,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
scan.m_state = ScanOp::Next; scan.m_state = ScanOp::Next;
// link the scan to node found // link the scan to node found
NodeHandle node(frag); NodeHandle node(frag);
selectNode(signal, node, treePos.m_loc); selectNode(node, treePos.m_loc);
linkScan(node, scanPtr); linkScan(node, scanPtr);
} }
...@@ -730,7 +730,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -730,7 +730,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
* re-organizations need not worry about scan direction. * re-organizations need not worry about scan direction.
*/ */
void void
Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) Dbtux::scanNext(ScanOpPtr scanPtr)
{ {
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
...@@ -739,20 +739,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -739,20 +739,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
debugOut << "Next in scan " << scanPtr.i << " " << scan << endl; debugOut << "Next in scan " << scanPtr.i << " " << scan << endl;
} }
#endif #endif
if (scan.m_state == ScanOp::Locked) { // cannot be moved away from tuple we have locked
jam(); ndbrequire(scan.m_state != ScanOp::Locked);
// version of a tuple locked by us cannot disappear (assert only)
ndbassert(false);
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_accLockOp = RNIL;
scan.m_state = ScanOp::Current;
}
// set up index keys for this operation // set up index keys for this operation
setKeyAttrs(frag); setKeyAttrs(frag);
// unpack upper bound into c_dataBuffer // unpack upper bound into c_dataBuffer
...@@ -768,7 +756,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -768,7 +756,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
TreePos pos = scan.m_scanPos; TreePos pos = scan.m_scanPos;
// get and remember original node // get and remember original node
NodeHandle origNode(frag); NodeHandle origNode(frag);
selectNode(signal, origNode, pos.m_loc); selectNode(origNode, pos.m_loc);
ndbrequire(islinkScan(origNode, scanPtr)); ndbrequire(islinkScan(origNode, scanPtr));
// current node in loop // current node in loop
NodeHandle node = origNode; NodeHandle node = origNode;
...@@ -785,7 +773,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -785,7 +773,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
} }
if (node.m_loc != pos.m_loc) { if (node.m_loc != pos.m_loc) {
jam(); jam();
selectNode(signal, node, pos.m_loc); selectNode(node, pos.m_loc);
} }
if (pos.m_dir == 4) { if (pos.m_dir == 4) {
// coming down from parent proceed to left child // coming down from parent proceed to left child
...@@ -833,7 +821,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -833,7 +821,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
break; break;
} }
// can we see it // can we see it
if (! scanVisible(signal, scanPtr, ent)) { if (! scanVisible(scanPtr, ent)) {
jam(); jam();
continue; continue;
} }
...@@ -896,7 +884,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -896,7 +884,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
* which are not analyzed or handled yet. * which are not analyzed or handled yet.
*/ */
bool bool
Dbtux::scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent) Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
{ {
const ScanOp& scan = *scanPtr.p; const ScanOp& scan = *scanPtr.p;
const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
* TODO optimize for initial equal attrs in node min/max * TODO optimize for initial equal attrs in node min/max
*/ */
void void
Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
...@@ -46,7 +46,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear ...@@ -46,7 +46,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
NodeHandle bottomNode(frag); NodeHandle bottomNode(frag);
while (true) { while (true) {
jam(); jam();
selectNode(signal, currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
int ret; int ret;
// compare prefix // compare prefix
unsigned start = 0; unsigned start = 0;
...@@ -164,7 +164,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear ...@@ -164,7 +164,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
* to it. * to it.
*/ */
void void
Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
...@@ -182,7 +182,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s ...@@ -182,7 +182,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
NodeHandle glbNode(frag); // potential g.l.b of final node NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) { while (true) {
jam(); jam();
selectNode(signal, currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
int ret; int ret;
// compare prefix // compare prefix
unsigned start = 0; unsigned start = 0;
...@@ -256,7 +256,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s ...@@ -256,7 +256,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
* Similar to searchToAdd. * Similar to searchToAdd.
*/ */
void void
Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos) Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag); NodeHandle currNode(frag);
...@@ -271,7 +271,7 @@ Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned bo ...@@ -271,7 +271,7 @@ Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned bo
NodeHandle bottomNode(frag); NodeHandle bottomNode(frag);
while (true) { while (true) {
jam(); jam();
selectNode(signal, currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
int ret; int ret;
// compare prefix // compare prefix
ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize); ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
......
This diff is collapsed.
...@@ -42,7 +42,7 @@ struct Opt { ...@@ -42,7 +42,7 @@ struct Opt {
CHARSET_INFO* m_cs; CHARSET_INFO* m_cs;
bool m_dups; bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype; NdbDictionary::Object::FragmentType m_fragtype;
unsigned m_idxloop; unsigned m_subsubloop;
const char* m_index; const char* m_index;
unsigned m_loop; unsigned m_loop;
bool m_nologging; bool m_nologging;
...@@ -66,7 +66,7 @@ struct Opt { ...@@ -66,7 +66,7 @@ struct Opt {
m_cs(0), m_cs(0),
m_dups(false), m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined), m_fragtype(NdbDictionary::Object::FragUndefined),
m_idxloop(4), m_subsubloop(4),
m_index(0), m_index(0),
m_loop(1), m_loop(1),
m_nologging(false), m_nologging(false),
...@@ -79,7 +79,7 @@ struct Opt { ...@@ -79,7 +79,7 @@ struct Opt {
m_seed(0), m_seed(0),
m_subloop(4), m_subloop(4),
m_table(0), m_table(0),
m_threads(4), m_threads(6), // table + 5 indexes
m_v(1) { m_v(1) {
} }
}; };
...@@ -208,6 +208,8 @@ struct Par : public Opt { ...@@ -208,6 +208,8 @@ struct Par : public Opt {
Set& set() const { assert(m_set != 0); return *m_set; } Set& set() const { assert(m_set != 0); return *m_set; }
Tmr* m_tmr; Tmr* m_tmr;
Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; } Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
unsigned m_lno;
unsigned m_slno;
unsigned m_totrows; unsigned m_totrows;
// value calculation // value calculation
unsigned m_range; unsigned m_range;
...@@ -226,6 +228,8 @@ struct Par : public Opt { ...@@ -226,6 +228,8 @@ struct Par : public Opt {
m_tab(0), m_tab(0),
m_set(0), m_set(0),
m_tmr(0), m_tmr(0),
m_lno(0),
m_slno(0),
m_totrows(m_threads * m_rows), m_totrows(m_threads * m_rows),
m_range(m_rows), m_range(m_rows),
m_pctrange(0), m_pctrange(0),
...@@ -2069,7 +2073,8 @@ pkinsert(Par par) ...@@ -2069,7 +2073,8 @@ pkinsert(Par par)
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
Lst lst; Lst lst;
for (unsigned j = 0; j < par.m_rows; j++) { for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j); unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock(); set.lock();
if (set.exist(i) || set.pending(i)) { if (set.exist(i) || set.pending(i)) {
set.unlock(); set.unlock();
...@@ -2174,7 +2179,8 @@ pkdelete(Par par) ...@@ -2174,7 +2179,8 @@ pkdelete(Par par)
Lst lst; Lst lst;
bool deadlock = false; bool deadlock = false;
for (unsigned j = 0; j < par.m_rows; j++) { for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j); unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock(); set.lock();
if (! set.exist(i) || set.pending(i)) { if (! set.exist(i) || set.pending(i)) {
set.unlock(); set.unlock();
...@@ -2398,7 +2404,7 @@ static int ...@@ -2398,7 +2404,7 @@ static int
scanreadindex(Par par, const ITab& itab) scanreadindex(Par par, const ITab& itab)
{ {
const Tab& tab = par.tab(); const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_idxloop; i++) { for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows); BSet bset(tab, itab, par.m_rows);
bset.calc(par); bset.calc(par);
CHK(scanreadindex(par, itab, bset) == 0); CHK(scanreadindex(par, itab, bset) == 0);
...@@ -2645,7 +2651,7 @@ static int ...@@ -2645,7 +2651,7 @@ static int
scanupdateindex(Par par, const ITab& itab) scanupdateindex(Par par, const ITab& itab)
{ {
const Tab& tab = par.tab(); const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_idxloop; i++) { for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows); BSet bset(tab, itab, par.m_rows);
bset.calc(par); bset.calc(par);
CHK(scanupdateindex(par, itab, bset) == 0); CHK(scanupdateindex(par, itab, bset) == 0);
...@@ -2685,6 +2691,53 @@ readverify(Par par) ...@@ -2685,6 +2691,53 @@ readverify(Par par)
return 0; return 0;
} }
static int
readverifyfull(Par par)
{
par.m_verify = true;
if (par.m_no == 0)
CHK(scanreadtable(par) == 0);
else {
const Tab& tab = par.tab();
unsigned i = par.m_no;
if (i <= tab.m_itabs && useindex(i)) {
const ITab& itab = tab.m_itab[i - 1];
BSet bset(tab, itab, par.m_rows);
CHK(scanreadindex(par, itab, bset) == 0);
}
}
return 0;
}
static int
pkops(Par par)
{
par.m_randomkey = true;
for (unsigned i = 0; i < par.m_subsubloop; i++) {
unsigned sel = urandom(10);
if (par.m_slno % 2 == 0) {
// favor insert
if (sel < 8) {
CHK(pkinsert(par) == 0);
} else if (sel < 9) {
CHK(pkupdate(par) == 0);
} else {
CHK(pkdelete(par) == 0);
}
} else {
// favor delete
if (sel < 1) {
CHK(pkinsert(par) == 0);
} else if (sel < 2) {
CHK(pkupdate(par) == 0);
} else {
CHK(pkdelete(par) == 0);
}
}
}
return 0;
}
static int static int
pkupdatescanread(Par par) pkupdatescanread(Par par)
{ {
...@@ -2930,6 +2983,8 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode) ...@@ -2930,6 +2983,8 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode)
thr.m_par.m_tab = par.m_tab; thr.m_par.m_tab = par.m_tab;
thr.m_par.m_set = par.m_set; thr.m_par.m_set = par.m_set;
thr.m_par.m_tmr = par.m_tmr; thr.m_par.m_tmr = par.m_tmr;
thr.m_par.m_lno = par.m_lno;
thr.m_par.m_slno = par.m_slno;
thr.m_func = func; thr.m_func = func;
thr.start(); thr.start();
} }
...@@ -2953,8 +3008,8 @@ tbuild(Par par) ...@@ -2953,8 +3008,8 @@ tbuild(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
if (i % 2 == 0) { if (par.m_slno % 2 == 0) {
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
...@@ -2964,9 +3019,9 @@ tbuild(Par par) ...@@ -2964,9 +3019,9 @@ tbuild(Par par)
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
} }
RUNSTEP(par, pkupdate, MT); RUNSTEP(par, pkupdate, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, pkdelete, MT); RUNSTEP(par, pkdelete, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, dropindex, ST); RUNSTEP(par, dropindex, ST);
} }
return 0; return 0;
...@@ -2974,6 +3029,22 @@ tbuild(Par par) ...@@ -2974,6 +3029,22 @@ tbuild(Par par)
static int static int
tpkops(Par par) tpkops(Par par)
{
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkops, MT);
LL2("rows=" << par.set().count());
RUNSTEP(par, readverifyfull, MT);
}
return 0;
}
static int
tpkopsread(Par par)
{ {
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
...@@ -2982,7 +3053,7 @@ tpkops(Par par) ...@@ -2982,7 +3053,7 @@ tpkops(Par par)
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkupdatescanread, MT); RUNSTEP(par, pkupdatescanread, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
} }
...@@ -3001,7 +3072,7 @@ tmixedops(Par par) ...@@ -3001,7 +3072,7 @@ tmixedops(Par par)
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, mixedoperations, MT); RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
} }
...@@ -3015,7 +3086,7 @@ tbusybuild(Par par) ...@@ -3015,7 +3086,7 @@ tbusybuild(Par par)
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkupdateindexbuild, MT); RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
...@@ -3031,7 +3102,7 @@ ttimebuild(Par par) ...@@ -3031,7 +3102,7 @@ ttimebuild(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
t1.on(); t1.on();
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
...@@ -3050,7 +3121,7 @@ ttimemaint(Par par) ...@@ -3050,7 +3121,7 @@ ttimemaint(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
t1.on(); t1.on();
RUNSTEP(par, pkupdate, MT); RUNSTEP(par, pkupdate, MT);
...@@ -3075,7 +3146,7 @@ ttimescan(Par par) ...@@ -3075,7 +3146,7 @@ ttimescan(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
par.m_tmr = &t1; par.m_tmr = &t1;
...@@ -3097,7 +3168,7 @@ ttimepkread(Par par) ...@@ -3097,7 +3168,7 @@ ttimepkread(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
par.m_tmr = &t1; par.m_tmr = &t1;
...@@ -3133,9 +3204,10 @@ struct TCase { ...@@ -3133,9 +3204,10 @@ struct TCase {
static const TCase static const TCase
tcaselist[] = { tcaselist[] = {
TCase("a", tbuild, "index build"), TCase("a", tbuild, "index build"),
TCase("b", tpkops, "pk operations and scan reads"), TCase("b", tpkops, "pk operations"),
TCase("c", tmixedops, "pk operations and scan operations"), TCase("c", tpkopsread, "pk operations and scan reads"),
TCase("d", tbusybuild, "pk operations and index build"), TCase("d", tmixedops, "pk operations and scan operations"),
TCase("e", tbusybuild, "pk operations and index build"),
TCase("t", ttimebuild, "time index build"), TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"), TCase("u", ttimemaint, "time index maintenance"),
TCase("v", ttimescan, "time full scan table vs index on pk"), TCase("v", ttimescan, "time full scan table vs index on pk"),
...@@ -3193,10 +3265,10 @@ runtest(Par par) ...@@ -3193,10 +3265,10 @@ runtest(Par par)
Thr& thr = *g_thrlist[n]; Thr& thr = *g_thrlist[n];
assert(thr.m_thread != 0); assert(thr.m_thread != 0);
} }
for (unsigned l = 0; par.m_loop == 0 || l < par.m_loop; l++) { for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) {
LL1("loop " << l); LL1("loop " << par.m_lno);
if (par.m_seed == 0) if (par.m_seed == 0)
srandom(l); srandom(par.m_lno);
for (unsigned i = 0; i < tcasecount; i++) { for (unsigned i = 0; i < tcasecount; i++) {
const TCase& tcase = tcaselist[i]; const TCase& tcase = tcaselist[i];
if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0) if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment