Commit b05d1a3a authored by pekka@mysql.com's avatar pekka@mysql.com

Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb

into mysql.com:/space/pekka/ndb/version/my41-tux
parents aed03906 11bfc5ec
......@@ -110,7 +110,7 @@
*/
#define MAX_TTREE_NODE_SIZE 64 // total words in node
#define MAX_TTREE_PREF_SIZE 4 // words in min prefix
#define MAX_TTREE_NODE_SLACK 3 // diff between max and min occupancy
#define MAX_TTREE_NODE_SLACK 2 // diff between max and min occupancy
/*
* Blobs.
......
......@@ -32,7 +32,6 @@
// signal classes
#include <signaldata/DictTabInfo.hpp>
#include <signaldata/TuxContinueB.hpp>
#include <signaldata/BuildIndx.hpp>
#include <signaldata/TupFrag.hpp>
#include <signaldata/AlterIndx.hpp>
#include <signaldata/DropTab.hpp>
......@@ -478,7 +477,7 @@ private:
Uint16 m_numAttrs;
bool m_storeNullKey;
TreeHead m_tree;
TupLoc m_freeLoc; // one node pre-allocated for insert
TupLoc m_freeLoc; // list of free index nodes
DLList<ScanOp> m_scanList; // current scans on this fragment
Uint32 m_tupIndexFragPtrI;
Uint32 m_tupTableFragPtrI[2];
......@@ -582,17 +581,24 @@ private:
* DbtuxNode.cpp
*/
int allocNode(Signal* signal, NodeHandle& node);
void selectNode(Signal* signal, NodeHandle& node, TupLoc loc);
void insertNode(Signal* signal, NodeHandle& node);
void deleteNode(Signal* signal, NodeHandle& node);
void setNodePref(Signal* signal, NodeHandle& node);
void selectNode(NodeHandle& node, TupLoc loc);
void insertNode(NodeHandle& node);
void deleteNode(NodeHandle& node);
void setNodePref(NodeHandle& node);
// node operations
void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent);
void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i);
void nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList);
void nodePushUpScans(NodeHandle& node, unsigned pos);
void nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& en, Uint32* scanList);
void nodePopDownScans(NodeHandle& node, unsigned pos);
void nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList);
void nodePushDownScans(NodeHandle& node, unsigned pos);
void nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList);
void nodePopUpScans(NodeHandle& node, unsigned pos);
void nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i);
// scans linked to node
void addScanList(NodeHandle& node, unsigned pos, Uint32 scanList);
void removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList);
void moveScanList(NodeHandle& node, unsigned pos);
void linkScan(NodeHandle& node, ScanOpPtr scanPtr);
void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr);
bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr);
......@@ -600,10 +606,21 @@ private:
/*
* DbtuxTree.cpp
*/
void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent);
void treeRemove(Signal* signal, Frag& frag, TreePos treePos);
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
void treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
// add entry
void treeAdd(Frag& frag, TreePos treePos, TreeEnt ent);
void treeAddFull(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent);
void treeAddNode(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i);
void treeAddRebalance(Frag& frag, NodeHandle node, unsigned i);
// remove entry
void treeRemove(Frag& frag, TreePos treePos);
void treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos);
void treeRemoveSemi(Frag& frag, NodeHandle node, unsigned i);
void treeRemoveLeaf(Frag& frag, NodeHandle node);
void treeRemoveNode(Frag& frag, NodeHandle node);
void treeRemoveRebalance(Frag& frag, NodeHandle node, unsigned i);
// rotate
void treeRotateSingle(Frag& frag, NodeHandle& node, unsigned i);
void treeRotateDouble(Frag& frag, NodeHandle& node, unsigned i);
/*
* DbtuxScan.cpp
......@@ -615,9 +632,9 @@ private:
void execACCKEYCONF(Signal* signal);
void execACCKEYREF(Signal* signal);
void execACC_ABORTCONF(Signal* signal);
void scanFirst(Signal* signal, ScanOpPtr scanPtr);
void scanNext(Signal* signal, ScanOpPtr scanPtr);
bool scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent);
void scanFirst(ScanOpPtr scanPtr);
void scanNext(ScanOpPtr scanPtr);
bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent);
void scanClose(Signal* signal, ScanOpPtr scanPtr);
void addAccLockOp(ScanOp& scan, Uint32 accLockOp);
void removeAccLockOp(ScanOp& scan, Uint32 accLockOp);
......@@ -626,9 +643,9 @@ private:
/*
* DbtuxSearch.cpp
*/
void searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/*
* DbtuxCmp.cpp
......@@ -652,7 +669,7 @@ private:
PrintPar();
};
void printTree(Signal* signal, Frag& frag, NdbOut& out);
void printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par);
void printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par);
friend class NdbOut& operator<<(NdbOut&, const TupLoc&);
friend class NdbOut& operator<<(NdbOut&, const TreeEnt&);
friend class NdbOut& operator<<(NdbOut&, const TreeNode&);
......
......@@ -98,7 +98,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
strcpy(par.m_path, ".");
par.m_side = 2;
par.m_parent = NullTupLoc;
printNode(signal, frag, out, tree.m_root, par);
printNode(frag, out, tree.m_root, par);
out.m_out->flush();
if (! par.m_ok) {
if (debugFile == 0) {
......@@ -114,7 +114,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
}
void
Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par)
Dbtux::printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par)
{
if (loc == NullTupLoc) {
par.m_depth = 0;
......@@ -122,7 +122,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
}
TreeHead& tree = frag.m_tree;
NodeHandle node(frag);
selectNode(signal, node, loc);
selectNode(node, loc);
out << par.m_path << " " << node << endl;
// check children
PrintPar cpar[2];
......@@ -132,7 +132,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
cpar[i].m_side = i;
cpar[i].m_depth = 0;
cpar[i].m_parent = loc;
printNode(signal, frag, out, node.getLink(i), cpar[i]);
printNode(frag, out, node.getLink(i), cpar[i]);
if (! cpar[i].m_ok) {
par.m_ok = false;
}
......@@ -178,16 +178,19 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
out << "occupancy " << node.getOccup() << " of interior node";
out << " less than min " << tree.m_minOccup << endl;
}
// check missed half-leaf/leaf merge
#ifdef dbtux_totally_groks_t_trees
// check missed semi-leaf/leaf merge
for (unsigned i = 0; i <= 1; i++) {
if (node.getLink(i) != NullTupLoc &&
node.getLink(1 - i) == NullTupLoc &&
node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) {
// our semi-leaf seems to satify interior minOccup condition
node.getOccup() < tree.m_minOccup) {
par.m_ok = false;
out << par.m_path << sep;
out << "missed merge with child " << i << endl;
}
}
#endif
// check inline prefix
{ ConstData data1 = node.getPref();
Uint32 data2[MaxPrefSize];
......
......@@ -117,7 +117,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
switch (opCode) {
case TuxMaintReq::OpAdd:
jam();
searchToAdd(signal, frag, c_searchKey, ent, treePos);
searchToAdd(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << (treePos.m_match ? " - error" : "") << endl;
......@@ -133,8 +133,8 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break;
}
/*
* At most one new node is inserted in the operation. We keep one
* free node pre-allocated so the operation cannot fail.
* At most one new node is inserted in the operation. Pre-allocate
* it so that the operation cannot fail.
*/
if (frag.m_freeLoc == NullTupLoc) {
jam();
......@@ -144,14 +144,16 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
jam();
break;
}
// link to freelist
node.setLink(0, frag.m_freeLoc);
frag.m_freeLoc = node.m_loc;
ndbrequire(frag.m_freeLoc != NullTupLoc);
}
treeAdd(signal, frag, treePos, ent);
treeAdd(frag, treePos, ent);
break;
case TuxMaintReq::OpRemove:
jam();
searchToRemove(signal, frag, c_searchKey, ent, treePos);
searchToRemove(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl;
......@@ -166,7 +168,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
}
break;
}
treeRemove(signal, frag, treePos);
treeRemove(frag, treePos);
break;
default:
ndbrequire(false);
......
......@@ -211,11 +211,7 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
// make these configurable later
tree.m_nodeSize = MAX_TTREE_NODE_SIZE;
tree.m_prefSize = MAX_TTREE_PREF_SIZE;
#ifdef dbtux_min_occup_less_max_occup
const unsigned maxSlack = MAX_TTREE_NODE_SLACK;
#else
const unsigned maxSlack = 0;
#endif
// size up to and including first 2 entries
const unsigned pref = tree.getSize(AccPref);
if (! (pref <= tree.m_nodeSize)) {
......
......@@ -42,7 +42,7 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node)
* Set handle to point to existing node.
*/
void
Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc)
Dbtux::selectNode(NodeHandle& node, TupLoc loc)
{
Frag& frag = node.m_frag;
ndbrequire(loc != NullTupLoc);
......@@ -57,15 +57,15 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc)
}
/*
* Set handle to point to new node. Uses the pre-allocated node.
* Set handle to point to new node. Uses a pre-allocated node.
*/
void
Dbtux::insertNode(Signal* signal, NodeHandle& node)
Dbtux::insertNode(NodeHandle& node)
{
Frag& frag = node.m_frag;
TupLoc loc = frag.m_freeLoc;
frag.m_freeLoc = NullTupLoc;
selectNode(signal, node, loc);
// unlink from freelist
selectNode(node, frag.m_freeLoc);
frag.m_freeLoc = node.getLink(0);
new (node.m_node) TreeNode();
#ifdef VM_TRACE
TreeHead& tree = frag.m_tree;
......@@ -76,20 +76,17 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node)
}
/*
* Delete existing node.
* Delete existing node. Simply put it on the freelist.
*/
void
Dbtux::deleteNode(Signal* signal, NodeHandle& node)
Dbtux::deleteNode(NodeHandle& node)
{
Frag& frag = node.m_frag;
ndbrequire(node.getOccup() == 0);
TupLoc loc = node.m_loc;
Uint32 pageId = loc.getPageId();
Uint32 pageOffset = loc.getPageOffset();
Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node);
c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
jamEntry();
// invalidate handle and storage
// link to freelist
node.setLink(0, frag.m_freeLoc);
frag.m_freeLoc = node.m_loc;
// invalidate the handle
node.m_loc = NullTupLoc;
node.m_node = 0;
}
......@@ -99,7 +96,7 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node)
* attribute headers for now. XXX use null mask instead
*/
void
Dbtux::setNodePref(Signal* signal, NodeHandle& node)
Dbtux::setNodePref(NodeHandle& node)
{
const Frag& frag = node.m_frag;
const TreeHead& tree = frag.m_tree;
......@@ -117,18 +114,45 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node)
* v
* A B C D E _ _ => A B C X D E _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Add list of scans at the new entry.
*/
void
Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent)
Dbtux::nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup < tree.m_maxOccup && pos <= occup);
// fix scans
// fix old scans
if (node.getNodeScan() != RNIL)
nodePushUpScans(node, pos);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
for (unsigned i = occup; i > pos; i--) {
jam();
tmpList[i] = tmpList[i - 1];
}
tmpList[pos] = ent;
entList[0] = entList[occup + 1];
node.setOccup(occup + 1);
// add new scans
if (scanList != RNIL)
addScanList(node, pos, scanList);
// fix prefix
if (occup == 0 || pos == 0)
setNodePref(node);
}
void
Dbtux::nodePushUpScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
......@@ -144,21 +168,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
scanPos.m_pos++;
}
scanPtr.i = scanPtr.p->m_nodeScan;
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
for (unsigned i = occup; i > pos; i--) {
jam();
tmpList[i] = tmpList[i - 1];
}
tmpList[pos] = ent;
entList[0] = entList[occup + 1];
node.setOccup(occup + 1);
// fix prefix
if (occup == 0 || pos == 0)
setNodePref(signal, node);
} while (scanPtr.i != RNIL);
}
/*
......@@ -169,42 +179,55 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
* ^ ^
* A B C D E F _ => A B C E F _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Scans at removed entry are returned if non-zero location is passed or
* else moved forward.
*/
void
Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
Dbtux::nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr;
// move scans whose entry disappears
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
if (node.getNodeScan() != RNIL) {
// remove or move scans at this position
if (scanList == 0)
moveScanList(node, pos);
else
removeScanList(node, pos, *scanList);
// fix other scans
if (node.getNodeScan() != RNIL)
nodePopDownScans(node, pos);
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
ent = tmpList[pos];
for (unsigned i = pos; i < occup - 1; i++) {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At popDown pos=" << pos << " " << node << endl;
}
#endif
scanNext(signal, scanPtr);
}
scanPtr.i = nextPtrI;
tmpList[i] = tmpList[i + 1];
}
// fix other scans
entList[0] = entList[occup - 1];
node.setOccup(occup - 1);
// fix prefix
if (occup != 1 && pos == 0)
setNodePref(node);
}
void
Dbtux::nodePopDownScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
// handled before
ndbrequire(scanPos.m_pos != pos);
if (scanPos.m_pos > pos) {
jam();
......@@ -217,21 +240,7 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
scanPos.m_pos--;
}
scanPtr.i = scanPtr.p->m_nodeScan;
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
ent = tmpList[pos];
for (unsigned i = pos; i < occup - 1; i++) {
jam();
tmpList[i] = tmpList[i + 1];
}
entList[0] = entList[occup - 1];
node.setOccup(occup - 1);
// fix prefix
if (occup != 1 && pos == 0)
setNodePref(signal, node);
} while (scanPtr.i != RNIL);
}
/*
......@@ -242,43 +251,52 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
* ^ v ^
* A B C D E _ _ => B C D X E _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Return list of scans at the removed position 0.
*/
void
Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
Dbtux::nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr;
// move scans whose entry disappears
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
if (node.getNodeScan() != RNIL) {
// remove scans at 0
removeScanList(node, 0, scanList);
// fix other scans
if (node.getNodeScan() != RNIL)
nodePushDownScans(node, pos);
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt oldMin = tmpList[0];
for (unsigned i = 0; i < pos; i++) {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == 0) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At pushDown pos=" << pos << " " << node << endl;
}
#endif
// here we may miss a valid entry "X" XXX known bug
scanNext(signal, scanPtr);
}
scanPtr.i = nextPtrI;
tmpList[i] = tmpList[i + 1];
}
// fix other scans
tmpList[pos] = ent;
ent = oldMin;
entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(node);
}
void
Dbtux::nodePushDownScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
// handled before
ndbrequire(scanPos.m_pos != 0);
if (scanPos.m_pos <= pos) {
jam();
......@@ -291,22 +309,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
scanPos.m_pos--;
}
scanPtr.i = scanPtr.p->m_nodeScan;
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt oldMin = tmpList[0];
for (unsigned i = 0; i < pos; i++) {
jam();
tmpList[i] = tmpList[i + 1];
}
tmpList[pos] = ent;
ent = oldMin;
entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(signal, node);
} while (scanPtr.i != RNIL);
}
/*
......@@ -318,39 +321,50 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
* v ^ ^
* A B C D E _ _ => X A B C E _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Move scans at removed entry and add scans at the new entry.
*/
void
Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
Dbtux::nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr;
// move scans whose entry disappears
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
if (node.getNodeScan() != RNIL) {
// move scans whose entry disappears
moveScanList(node, pos);
// fix other scans
if (node.getNodeScan() != RNIL)
nodePopUpScans(node, pos);
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt newMin = ent;
ent = tmpList[pos];
for (unsigned i = pos; i > 0; i--) {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At popUp pos=" << pos << " " << node << endl;
}
#endif
// here we may miss a valid entry "X" XXX known bug
scanNext(signal, scanPtr);
}
scanPtr.i = nextPtrI;
tmpList[i] = tmpList[i - 1];
}
// fix other scans
tmpList[0] = newMin;
entList[0] = entList[occup];
// add scans
if (scanList != RNIL)
addScanList(node, 0, scanList);
// fix prefix
if (true)
setNodePref(node);
}
void
Dbtux::nodePopUpScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
......@@ -367,41 +381,123 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
scanPos.m_pos++;
}
scanPtr.i = scanPtr.p->m_nodeScan;
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt newMin = ent;
ent = tmpList[pos];
for (unsigned i = pos; i > 0; i--) {
jam();
tmpList[i] = tmpList[i - 1];
}
tmpList[0] = newMin;
entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(signal, node);
} while (scanPtr.i != RNIL);
}
/*
* Move all possible entries from another node before the min (i=0) or
* after the max (i=1). XXX can be optimized
* Move number of entries from another node to this node before the min
* (i=0) or after the max (i=1). Expensive but not often used.
*/
void
Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i)
Dbtux::nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i)
{
Frag& frag = dstNode.m_frag;
TreeHead& tree = frag.m_tree;
ndbrequire(i <= 1);
while (dstNode.getOccup() < tree.m_maxOccup && srcNode.getOccup() != 0) {
while (cnt != 0) {
TreeEnt ent;
nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent);
nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent);
Uint32 scanList = RNIL;
nodePopDown(srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList);
nodePushUp(dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList);
cnt--;
}
}
// scans linked to node
/*
* Add list of scans to node at given position.
*/
void
Dbtux::addScanList(NodeHandle& node, unsigned pos, Uint32 scanList)
{
ScanOpPtr scanPtr;
scanPtr.i = scanList;
do {
jam();
c_scanOpPool.getPtr(scanPtr);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Add scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "To pos=" << pos << " " << node << endl;
}
#endif
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
scanPtr.p->m_nodeScan = RNIL;
linkScan(node, scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
// set position but leave direction alone
scanPos.m_loc = node.m_loc;
scanPos.m_pos = pos;
scanPtr.i = nextPtrI;
} while (scanPtr.i != RNIL);
}
/*
* Remove list of scans from node at given position. The return
* location must point to existing list (in fact RNIL always).
*/
void
Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList)
{
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
do {
jam();
c_scanOpPool.getPtr(scanPtr);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc);
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Remove scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "Fron pos=" << pos << " " << node << endl;
}
#endif
unlinkScan(node, scanPtr);
scanPtr.p->m_nodeScan = scanList;
scanList = scanPtr.i;
// unset position but leave direction alone
scanPos.m_loc = NullTupLoc;
scanPos.m_pos = ZNIL;
}
scanPtr.i = nextPtrI;
} while (scanPtr.i != RNIL);
}
/*
* Move list of scans away from entry about to be removed. Uses scan
* method scanNext().
*/
void
Dbtux::moveScanList(NodeHandle& node, unsigned pos)
{
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
ndbrequire(scanPos.m_loc == node.m_loc);
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At pos=" << pos << " " << node << endl;
}
#endif
scanNext(scanPtr);
ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos));
}
scanPtr.i = nextPtrI;
} while (scanPtr.i != RNIL);
}
/*
* Link scan to the list under the node. The list is single-linked and
* ordering does not matter.
......
......@@ -275,7 +275,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
jam();
const TupLoc loc = scan.m_scanPos.m_loc;
NodeHandle node(frag);
selectNode(signal, node, loc);
selectNode(node, loc);
unlinkScan(node, scanPtr);
scan.m_scanPos.m_loc = NullTupLoc;
}
......@@ -364,7 +364,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::First) {
jam();
// search is done only once in single range scan
scanFirst(signal, scanPtr);
scanFirst(scanPtr);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "First scan " << scanPtr.i << " " << scan << endl;
......@@ -374,7 +374,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::Next) {
jam();
// look for next
scanNext(signal, scanPtr);
scanNext(scanPtr);
}
// for reading tuple key in Current or Locked state
Data pkData = c_dataBuffer;
......@@ -680,7 +680,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal)
* by scanNext.
*/
void
Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
Dbtux::scanFirst(ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
......@@ -698,7 +698,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
}
// search for scan start position
TreePos treePos;
searchToScan(signal, frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
searchToScan(frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
if (treePos.m_loc == NullTupLoc) {
// empty tree
jam();
......@@ -710,13 +710,13 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
scan.m_state = ScanOp::Next;
// link the scan to node found
NodeHandle node(frag);
selectNode(signal, node, treePos.m_loc);
selectNode(node, treePos.m_loc);
linkScan(node, scanPtr);
}
/*
* Move to next entry. The scan is already linked to some node. When
* we leave, if any entry was found, it will be linked to a possibly
* we leave, if an entry was found, it will be linked to a possibly
* different node. The scan has a position, and a direction which tells
* from where we came to this position. This is one of:
*
......@@ -725,9 +725,12 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
* 2 - up from root (the scan ends)
* 3 - left to right within node (at end proceed to right child)
* 4 - down from parent (proceed to left child)
*
* If an entry was found, scan direction is 3. Therefore tree
* re-organizations need not worry about scan direction.
*/
void
Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
Dbtux::scanNext(ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
......@@ -736,22 +739,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
debugOut << "Next in scan " << scanPtr.i << " " << scan << endl;
}
#endif
if (scan.m_state == ScanOp::Locked) {
jam();
// version of a tuple locked by us cannot disappear (assert only)
#ifdef dbtux_wl_1942_is_done
ndbassert(false);
#endif
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_accLockOp = RNIL;
scan.m_state = ScanOp::Current;
}
// cannot be moved away from tuple we have locked
ndbrequire(scan.m_state != ScanOp::Locked);
// set up index keys for this operation
setKeyAttrs(frag);
// unpack upper bound into c_dataBuffer
......@@ -767,7 +756,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
TreePos pos = scan.m_scanPos;
// get and remember original node
NodeHandle origNode(frag);
selectNode(signal, origNode, pos.m_loc);
selectNode(origNode, pos.m_loc);
ndbrequire(islinkScan(origNode, scanPtr));
// current node in loop
NodeHandle node = origNode;
......@@ -784,7 +773,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
}
if (node.m_loc != pos.m_loc) {
jam();
selectNode(signal, node, pos.m_loc);
selectNode(node, pos.m_loc);
}
if (pos.m_dir == 4) {
// coming down from parent proceed to left child
......@@ -832,7 +821,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
break;
}
// can we see it
if (! scanVisible(signal, scanPtr, ent)) {
if (! scanVisible(scanPtr, ent)) {
jam();
continue;
}
......@@ -864,6 +853,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_scanPos = pos;
// relink
if (scan.m_state == ScanOp::Current) {
ndbrequire(pos.m_match == true && pos.m_dir == 3);
ndbrequire(pos.m_loc == node.m_loc);
if (origNode.m_loc != node.m_loc) {
jam();
......@@ -894,7 +884,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
* which are not analyzed or handled yet.
*/
bool
Dbtux::scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent)
Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
{
const ScanOp& scan = *scanPtr.p;
const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
......
......@@ -25,7 +25,7 @@
* TODO optimize for initial equal attrs in node min/max
*/
void
Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
......@@ -46,7 +46,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc);
selectNode(currNode, currNode.m_loc);
int ret;
// compare prefix
unsigned start = 0;
......@@ -159,12 +159,12 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
*
* Compares search key to each node min. A move to right subtree can
* overshoot target node. The last such node is saved. The final node
* is a half-leaf or leaf. If search key is less than final node min
* is a semi-leaf or leaf. If search key is less than final node min
* then the saved node is the g.l.b of the final node and we move back
* to it.
*/
void
Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
......@@ -182,7 +182,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc);
selectNode(currNode, currNode.m_loc);
int ret;
// compare prefix
unsigned start = 0;
......@@ -256,7 +256,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
* Similar to searchToAdd.
*/
void
Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
......@@ -271,7 +271,7 @@ Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned bo
NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc);
selectNode(currNode, currNode.m_loc);
int ret;
// compare prefix
ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
......
......@@ -18,71 +18,105 @@
#include "Dbtux.hpp"
/*
* Add entry.
* Add entry. Handle the case when there is room for one more. This
* is the common case given slack in nodes.
*/
void
Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
Dbtux::treeAdd(Frag& frag, TreePos treePos, TreeEnt ent)
{
TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos;
NodeHandle node(frag);
// check for empty tree
if (treePos.m_loc == NullTupLoc) {
jam();
insertNode(signal, node);
nodePushUp(signal, node, 0, ent);
node.setSide(2);
tree.m_root = node.m_loc;
return;
}
selectNode(signal, node, treePos.m_loc);
// check if it is bounding node
if (pos != 0 && pos != node.getOccup()) {
if (treePos.m_loc != NullTupLoc) {
// non-empty tree
jam();
// check if room for one more
selectNode(node, treePos.m_loc);
unsigned pos = treePos.m_pos;
if (node.getOccup() < tree.m_maxOccup) {
// node has room
jam();
nodePushUp(signal, node, pos, ent);
nodePushUp(node, pos, ent, RNIL);
return;
}
// returns min entry
nodePushDown(signal, node, pos - 1, ent);
// find position to add the removed min entry
TupLoc childLoc = node.getLink(0);
if (childLoc == NullTupLoc) {
treeAddFull(frag, node, pos, ent);
return;
}
jam();
insertNode(node);
nodePushUp(node, 0, ent, RNIL);
node.setSide(2);
tree.m_root = node.m_loc;
}
/*
* Add entry when node is full. Handle the case when there is g.l.b
* node in left subtree with room for one more. It will receive the min
* entry of this node. The min entry could be the entry to add.
*/
void
Dbtux::treeAddFull(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent)
{
TreeHead& tree = frag.m_tree;
TupLoc loc = lubNode.getLink(0);
if (loc != NullTupLoc) {
// find g.l.b node
NodeHandle glbNode(frag);
do {
jam();
// left child will be added
pos = 0;
} else {
selectNode(glbNode, loc);
loc = glbNode.getLink(1);
} while (loc != NullTupLoc);
if (glbNode.getOccup() < tree.m_maxOccup) {
// g.l.b node has room
jam();
// find glb node
while (childLoc != NullTupLoc) {
Uint32 scanList = RNIL;
if (pos != 0) {
jam();
selectNode(signal, node, childLoc);
childLoc = node.getLink(1);
// add the new entry and return min entry
nodePushDown(lubNode, pos - 1, ent, scanList);
}
pos = node.getOccup();
// g.l.b node receives min entry from l.u.b node
nodePushUp(glbNode, glbNode.getOccup(), ent, scanList);
return;
}
// fall thru to next case
}
// adding new min or max
unsigned i = (pos == 0 ? 0 : 1);
ndbrequire(node.getLink(i) == NullTupLoc);
// check if the half-leaf/leaf has room for one more
if (node.getOccup() < tree.m_maxOccup) {
jam();
nodePushUp(signal, node, pos, ent);
treeAddNode(frag, lubNode, pos, ent, glbNode, 1);
return;
}
// add a new node
NodeHandle childNode(frag);
insertNode(signal, childNode);
nodePushUp(signal, childNode, 0, ent);
treeAddNode(frag, lubNode, pos, ent, lubNode, 0);
}
/*
* Add entry when there is no g.l.b node in left subtree or the g.l.b
* node is full. We must add a new left or right child node which
* becomes the new g.l.b node.
*/
void
Dbtux::treeAddNode(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i)
{
NodeHandle glbNode(frag);
insertNode(glbNode);
// connect parent and child
node.setLink(i, childNode.m_loc);
childNode.setLink(2, node.m_loc);
childNode.setSide(i);
// re-balance tree at each node
parentNode.setLink(i, glbNode.m_loc);
glbNode.setLink(2, parentNode.m_loc);
glbNode.setSide(i);
Uint32 scanList = RNIL;
if (pos != 0) {
jam();
// add the new entry and return min entry
nodePushDown(lubNode, pos - 1, ent, scanList);
}
// g.l.b node receives min entry from l.u.b node
nodePushUp(glbNode, 0, ent, scanList);
// re-balance the tree
treeAddRebalance(frag, parentNode, i);
}
/*
* Re-balance tree after adding a node. The process starts with the
* parent of the added node.
*/
void
Dbtux::treeAddRebalance(Frag& frag, NodeHandle node, unsigned i)
{
while (true) {
// height of subtree i has increased by 1
int j = (i == 0 ? -1 : +1);
......@@ -102,14 +136,14 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
// height of longer subtree increased
jam();
NodeHandle childNode(frag);
selectNode(signal, childNode, node.getLink(i));
selectNode(childNode, node.getLink(i));
int b2 = childNode.getBalance();
if (b2 == b) {
jam();
treeRotateSingle(signal, frag, node, i);
treeRotateSingle(frag, node, i);
} else if (b2 == -b) {
jam();
treeRotateDouble(signal, frag, node, i);
treeRotateDouble(frag, node, i);
} else {
// height of subtree increased so it cannot be perfectly balanced
ndbrequire(false);
......@@ -126,115 +160,169 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
break;
}
i = node.getSide();
selectNode(signal, node, parentLoc);
selectNode(node, parentLoc);
}
}
/*
* Remove entry.
* Remove entry. Optimize for nodes with slack. Handle the case when
* there is no underflow i.e. occupancy remains at least minOccup. For
* interior nodes this is a requirement. For others it means that we do
* not need to consider merge of semi-leaf and leaf.
*/
void
Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
Dbtux::treeRemove(Frag& frag, TreePos treePos)
{
TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos;
NodeHandle node(frag);
selectNode(signal, node, treePos.m_loc);
selectNode(node, treePos.m_loc);
TreeEnt ent;
// check interior node first
if (node.getChilds() == 2) {
if (node.getOccup() > tree.m_minOccup) {
// no underflow in any node type
jam();
ndbrequire(node.getOccup() >= tree.m_minOccup);
// check if no underflow
if (node.getOccup() > tree.m_minOccup) {
jam();
nodePopDown(signal, node, pos, ent);
return;
}
// save current handle
NodeHandle parentNode = node;
// find glb node
TupLoc childLoc = node.getLink(0);
while (childLoc != NullTupLoc) {
jam();
selectNode(signal, node, childLoc);
childLoc = node.getLink(1);
}
// use glb max as new parent min
ent = node.getEnt(node.getOccup() - 1);
nodePopUp(signal, parentNode, pos, ent);
// set up to remove glb max
pos = node.getOccup() - 1;
// fall thru to next case
nodePopDown(node, pos, ent, 0);
return;
}
// remove the element
nodePopDown(signal, node, pos, ent);
ndbrequire(node.getChilds() <= 1);
// handle half-leaf
unsigned i;
for (i = 0; i <= 1; i++) {
if (node.getChilds() == 2) {
// underflow in interior node
jam();
TupLoc childLoc = node.getLink(i);
if (childLoc != NullTupLoc) {
// move to child
selectNode(signal, node, childLoc);
// balance of half-leaf parent requires child to be leaf
break;
}
treeRemoveInner(frag, node, pos);
return;
}
ndbrequire(node.getChilds() == 0);
// get parent if any
TupLoc parentLoc = node.getLink(2);
NodeHandle parentNode(frag);
i = node.getSide();
// move all that fits into parent
if (parentLoc != NullTupLoc) {
// remove entry in semi/leaf
nodePopDown(node, pos, ent, 0);
if (node.getLink(0) != NullTupLoc) {
jam();
selectNode(signal, parentNode, node.getLink(2));
nodeSlide(signal, parentNode, node, i);
// fall thru to next case
treeRemoveSemi(frag, node, 0);
return;
}
// non-empty leaf
if (node.getOccup() >= 1) {
if (node.getLink(1) != NullTupLoc) {
jam();
treeRemoveSemi(frag, node, 1);
return;
}
// remove empty leaf
deleteNode(signal, node);
if (parentLoc == NullTupLoc) {
treeRemoveLeaf(frag, node);
}
/*
* Remove entry when interior node underflows. There is g.l.b node in
* left subtree to borrow an entry from. The max entry of the g.l.b
* node becomes the min entry of this node.
*/
void
Dbtux::treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos)
{
TreeHead& tree = frag.m_tree;
TreeEnt ent;
// find g.l.b node
NodeHandle glbNode(frag);
TupLoc loc = lubNode.getLink(0);
do {
jam();
selectNode(glbNode, loc);
loc = glbNode.getLink(1);
} while (loc != NullTupLoc);
// borrow max entry from semi/leaf
Uint32 scanList = RNIL;
nodePopDown(glbNode, glbNode.getOccup() - 1, ent, &scanList);
nodePopUp(lubNode, pos, ent, scanList);
if (glbNode.getLink(0) != NullTupLoc) {
jam();
// tree is now empty
tree.m_root = NullTupLoc;
treeRemoveSemi(frag, glbNode, 0);
return;
}
node = parentNode;
node.setLink(i, NullTupLoc);
#ifdef dbtux_min_occup_less_max_occup
// check if we created a half-leaf
if (node.getBalance() == 0) {
treeRemoveLeaf(frag, glbNode);
}
/*
* Handle semi-leaf after removing an entry. Move entries from leaf to
* semi-leaf to bring semi-leaf occupancy above minOccup, if possible.
* The leaf may become empty.
*/
void
Dbtux::treeRemoveSemi(Frag& frag, NodeHandle semiNode, unsigned i)
{
TreeHead& tree = frag.m_tree;
ndbrequire(semiNode.getChilds() < 2);
TupLoc leafLoc = semiNode.getLink(i);
NodeHandle leafNode(frag);
selectNode(leafNode, leafLoc);
if (semiNode.getOccup() < tree.m_minOccup) {
jam();
unsigned cnt = min(leafNode.getOccup(), tree.m_minOccup - semiNode.getOccup());
nodeSlide(semiNode, leafNode, cnt, i);
if (leafNode.getOccup() == 0) {
// remove empty leaf
jam();
treeRemoveNode(frag, leafNode);
}
}
}
/*
* Handle leaf after removing an entry. If parent is semi-leaf, move
* entries to it as in the semi-leaf case. If parent is interior node,
* do nothing.
*/
void
Dbtux::treeRemoveLeaf(Frag& frag, NodeHandle leafNode)
{
TreeHead& tree = frag.m_tree;
TupLoc parentLoc = leafNode.getLink(2);
if (parentLoc != NullTupLoc) {
jam();
// move entries from the other child
TupLoc childLoc = node.getLink(1 - i);
NodeHandle childNode(frag);
selectNode(signal, childNode, childLoc);
nodeSlide(signal, node, childNode, 1 - i);
if (childNode.getOccup() == 0) {
NodeHandle parentNode(frag);
selectNode(parentNode, parentLoc);
unsigned i = leafNode.getSide();
if (parentNode.getLink(1 - i) == NullTupLoc) {
// parent is semi-leaf
jam();
deleteNode(signal, childNode);
node.setLink(1 - i, NullTupLoc);
// we are balanced again but our parent balance changes by -1
parentLoc = node.getLink(2);
if (parentLoc == NullTupLoc) {
if (parentNode.getOccup() < tree.m_minOccup) {
jam();
return;
unsigned cnt = min(leafNode.getOccup(), tree.m_minOccup - parentNode.getOccup());
nodeSlide(parentNode, leafNode, cnt, i);
}
// fix side and become parent
i = node.getSide();
selectNode(signal, node, parentLoc);
}
}
#endif
// re-balance tree at each node
if (leafNode.getOccup() == 0) {
jam();
// remove empty leaf
treeRemoveNode(frag, leafNode);
}
}
/*
* Remove empty leaf.
*/
void
Dbtux::treeRemoveNode(Frag& frag, NodeHandle leafNode)
{
TreeHead& tree = frag.m_tree;
ndbrequire(leafNode.getChilds() == 0);
TupLoc parentLoc = leafNode.getLink(2);
unsigned i = leafNode.getSide();
deleteNode(leafNode);
if (parentLoc != NullTupLoc) {
jam();
NodeHandle parentNode(frag);
selectNode(parentNode, parentLoc);
parentNode.setLink(i, NullTupLoc);
// re-balance the tree
treeRemoveRebalance(frag, parentNode, i);
return;
}
// tree is now empty
tree.m_root = NullTupLoc;
}
/*
* Re-balance tree after removing a node. The process starts with the
* parent of the removed node.
*/
void
Dbtux::treeRemoveRebalance(Frag& frag, NodeHandle node, unsigned i)
{
while (true) {
// height of subtree i has decreased by 1
int j = (i == 0 ? -1 : +1);
......@@ -255,19 +343,19 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
jam();
// child on the other side
NodeHandle childNode(frag);
selectNode(signal, childNode, node.getLink(1 - i));
selectNode(childNode, node.getLink(1 - i));
int b2 = childNode.getBalance();
if (b2 == b) {
jam();
treeRotateSingle(signal, frag, node, 1 - i);
treeRotateSingle(frag, node, 1 - i);
// height of tree decreased and propagates up
} else if (b2 == -b) {
jam();
treeRotateDouble(signal, frag, node, 1 - i);
treeRotateDouble(frag, node, 1 - i);
// height of tree decreased and propagates up
} else {
jam();
treeRotateSingle(signal, frag, node, 1 - i);
treeRotateSingle(frag, node, 1 - i);
// height of tree did not change - done
return;
}
......@@ -281,7 +369,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
return;
}
i = node.getSide();
selectNode(signal, node, parentLoc);
selectNode(node, parentLoc);
}
}
......@@ -302,10 +390,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
* all optional. If 4 are there it changes side.
*/
void
Dbtux::treeRotateSingle(Signal* signal,
Frag& frag,
NodeHandle& node,
unsigned i)
Dbtux::treeRotateSingle(Frag& frag, NodeHandle& node, unsigned i)
{
ndbrequire(i <= 1);
/*
......@@ -325,7 +410,7 @@ Dbtux::treeRotateSingle(Signal* signal,
*/
TupLoc loc3 = node5.getLink(i);
NodeHandle node3(frag);
selectNode(signal, node3, loc3);
selectNode(node3, loc3);
const int bal3 = node3.getBalance();
/*
2 must always be there but is not changed. Thus we mereley check that it
......@@ -342,7 +427,7 @@ Dbtux::treeRotateSingle(Signal* signal,
NodeHandle node4(frag);
if (loc4 != NullTupLoc) {
jam();
selectNode(signal, node4, loc4);
selectNode(node4, loc4);
ndbrequire(node4.getSide() == (1 - i) &&
node4.getLink(2) == loc3);
node4.setSide(i);
......@@ -377,7 +462,7 @@ Dbtux::treeRotateSingle(Signal* signal,
if (loc0 != NullTupLoc) {
jam();
NodeHandle node0(frag);
selectNode(signal, node0, loc0);
selectNode(node0, loc0);
node0.setLink(side5, loc3);
} else {
jam();
......@@ -514,8 +599,10 @@ Dbtux::treeRotateSingle(Signal* signal,
*
*/
void
Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i)
Dbtux::treeRotateDouble(Frag& frag, NodeHandle& node, unsigned i)
{
TreeHead& tree = frag.m_tree;
// old top node
NodeHandle node6 = node;
const TupLoc loc6 = node6.m_loc;
......@@ -526,13 +613,13 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
// level 1
TupLoc loc2 = node6.getLink(i);
NodeHandle node2(frag);
selectNode(signal, node2, loc2);
selectNode(node2, loc2);
const int bal2 = node2.getBalance();
// level 2
TupLoc loc4 = node2.getLink(1 - i);
NodeHandle node4(frag);
selectNode(signal, node4, loc4);
selectNode(node4, loc4);
const int bal4 = node4.getBalance();
ndbrequire(i <= 1);
......@@ -549,23 +636,26 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
// fill up leaf before it becomes internal
if (loc3 == NullTupLoc && loc5 == NullTupLoc) {
jam();
TreeHead& tree = frag.m_tree;
nodeSlide(signal, node4, node2, i);
// implied by rule of merging half-leaves with leaves
ndbrequire(node4.getOccup() >= tree.m_minOccup);
ndbrequire(node2.getOccup() != 0);
if (node4.getOccup() < tree.m_minOccup) {
jam();
unsigned cnt = tree.m_minOccup - node4.getOccup();
ndbrequire(cnt < node2.getOccup());
nodeSlide(node4, node2, cnt, i);
ndbrequire(node4.getOccup() >= tree.m_minOccup);
ndbrequire(node2.getOccup() != 0);
}
} else {
if (loc3 != NullTupLoc) {
jam();
NodeHandle node3(frag);
selectNode(signal, node3, loc3);
selectNode(node3, loc3);
node3.setLink(2, loc2);
node3.setSide(1 - i);
}
if (loc5 != NullTupLoc) {
jam();
NodeHandle node5(frag);
selectNode(signal, node5, loc5);
selectNode(node5, loc5);
node5.setLink(2, node6.m_loc);
node5.setSide(i);
}
......@@ -588,7 +678,7 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
if (loc0 != NullTupLoc) {
jam();
selectNode(signal, node0, loc0);
selectNode(node0, loc0);
node0.setLink(side6, loc4);
} else {
jam();
......
......@@ -29,6 +29,7 @@ shows ms / 1000 rows for each and index time overhead
samples 10% of all PKs (100,000 pk reads, 100,000 scans)
the "pct" values are from more accurate total times (not shown)
comments [ ... ] are after the case
040616 mc02/a 40 ms 87 ms 114 pct
mc02/b 51 ms 128 ms 148 pct
......@@ -76,13 +77,12 @@ optim 13 mc02/a 40 ms 57 ms 42 pct
mc02/c 9 ms 13 ms 50 pct
mc02/d 170 ms 256 ms 50 pct
after wl-1884 store all-NULL keys (the tests have pctnull=10 per column)
optim 13 mc02/a 39 ms 59 ms 50 pct
mc02/b 47 ms 77 ms 61 pct
mc02/c 9 ms 12 ms 44 pct
mc02/d 246 ms 289 ms 17 pct
[ after wl-1884 store all-NULL keys (the tests have pctnull=10 per column) ]
[ case d: bug in testOIBasic killed PK read performance ]
optim 14 mc02/a 41 ms 60 ms 44 pct
......@@ -98,8 +98,7 @@ none mc02/a 35 ms 60 ms 71 pct
mc02/c 5 ms 12 ms 106 pct
mc02/d 165 ms 238 ms 44 pct
[ johan re-installed mc02 as fedora gcc-3.3.2 ]
[ case c: table scan has improved... ]
[ johan re-installed mc02 as fedora gcc-3.3.2, tux uses more C++ stuff than tup]
charsets mc02/a 35 ms 60 ms 71 pct
mc02/b 42 ms 84 ms 97 pct
......@@ -118,6 +117,19 @@ optim 15 mc02/a 34 ms 60 ms 72 pct
optim 16 mc02/a 34 ms 53 ms 53 pct
mc02/b 42 ms 75 ms 75 pct
[ case a, b: binary search of bounding node when adding entry ]
[ binary search of bounding node when adding entry ]
none mc02/a 35 ms 53 ms 51 pct
mc02/b 42 ms 75 ms 76 pct
[ rewrote treeAdd / treeRemove ]
optim 17 mc02/a 35 ms 52 ms 49 pct
mc02/b 43 ms 75 ms 75 pct
[ allow slack (2) in interior nodes - almost no effect?? ]
wl-1942 mc02/a 35 ms 52 ms 49 pct
mc02/b 42 ms 75 ms 76 pct
vim: set et:
......@@ -42,7 +42,7 @@ struct Opt {
CHARSET_INFO* m_cs;
bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype;
unsigned m_idxloop;
unsigned m_subsubloop;
const char* m_index;
unsigned m_loop;
bool m_nologging;
......@@ -66,7 +66,7 @@ struct Opt {
m_cs(0),
m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined),
m_idxloop(4),
m_subsubloop(4),
m_index(0),
m_loop(1),
m_nologging(false),
......@@ -79,7 +79,7 @@ struct Opt {
m_seed(0),
m_subloop(4),
m_table(0),
m_threads(4),
m_threads(6), // table + 5 indexes
m_v(1) {
}
};
......@@ -208,6 +208,8 @@ struct Par : public Opt {
Set& set() const { assert(m_set != 0); return *m_set; }
Tmr* m_tmr;
Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
unsigned m_lno;
unsigned m_slno;
unsigned m_totrows;
// value calculation
unsigned m_range;
......@@ -226,6 +228,8 @@ struct Par : public Opt {
m_tab(0),
m_set(0),
m_tmr(0),
m_lno(0),
m_slno(0),
m_totrows(m_threads * m_rows),
m_range(m_rows),
m_pctrange(0),
......@@ -2069,7 +2073,8 @@ pkinsert(Par par)
CHK(con.startTransaction() == 0);
Lst lst;
for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j);
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock();
if (set.exist(i) || set.pending(i)) {
set.unlock();
......@@ -2174,7 +2179,8 @@ pkdelete(Par par)
Lst lst;
bool deadlock = false;
for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j);
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock();
if (! set.exist(i) || set.pending(i)) {
set.unlock();
......@@ -2398,7 +2404,7 @@ static int
scanreadindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_idxloop; i++) {
for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows);
bset.calc(par);
CHK(scanreadindex(par, itab, bset) == 0);
......@@ -2645,7 +2651,7 @@ static int
scanupdateindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_idxloop; i++) {
for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows);
bset.calc(par);
CHK(scanupdateindex(par, itab, bset) == 0);
......@@ -2685,6 +2691,53 @@ readverify(Par par)
return 0;
}
static int
readverifyfull(Par par)
{
par.m_verify = true;
if (par.m_no == 0)
CHK(scanreadtable(par) == 0);
else {
const Tab& tab = par.tab();
unsigned i = par.m_no;
if (i <= tab.m_itabs && useindex(i)) {
const ITab& itab = tab.m_itab[i - 1];
BSet bset(tab, itab, par.m_rows);
CHK(scanreadindex(par, itab, bset) == 0);
}
}
return 0;
}
static int
pkops(Par par)
{
par.m_randomkey = true;
for (unsigned i = 0; i < par.m_subsubloop; i++) {
unsigned sel = urandom(10);
if (par.m_slno % 2 == 0) {
// favor insert
if (sel < 8) {
CHK(pkinsert(par) == 0);
} else if (sel < 9) {
CHK(pkupdate(par) == 0);
} else {
CHK(pkdelete(par) == 0);
}
} else {
// favor delete
if (sel < 1) {
CHK(pkinsert(par) == 0);
} else if (sel < 2) {
CHK(pkupdate(par) == 0);
} else {
CHK(pkdelete(par) == 0);
}
}
}
return 0;
}
static int
pkupdatescanread(Par par)
{
......@@ -2930,6 +2983,8 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode)
thr.m_par.m_tab = par.m_tab;
thr.m_par.m_set = par.m_set;
thr.m_par.m_tmr = par.m_tmr;
thr.m_par.m_lno = par.m_lno;
thr.m_par.m_slno = par.m_slno;
thr.m_func = func;
thr.start();
}
......@@ -2953,8 +3008,8 @@ tbuild(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
if (i % 2 == 0) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
if (par.m_slno % 2 == 0) {
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, pkinsert, MT);
......@@ -2963,9 +3018,10 @@ tbuild(Par par)
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
}
RUNSTEP(par, readverify, MT);
RUNSTEP(par, pkupdate, MT);
RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, pkdelete, MT);
RUNSTEP(par, readverify, MT);
RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, dropindex, ST);
}
return 0;
......@@ -2973,6 +3029,22 @@ tbuild(Par par)
static int
tpkops(Par par)
{
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkops, MT);
LL2("rows=" << par.set().count());
RUNSTEP(par, readverifyfull, MT);
}
return 0;
}
static int
tpkopsread(Par par)
{
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
......@@ -2981,7 +3053,7 @@ tpkops(Par par)
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkupdatescanread, MT);
RUNSTEP(par, readverify, ST);
}
......@@ -3000,7 +3072,7 @@ tmixedops(Par par)
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverify, ST);
}
......@@ -3014,7 +3086,7 @@ tbusybuild(Par par)
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
......@@ -3030,7 +3102,7 @@ ttimebuild(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT);
t1.on();
RUNSTEP(par, createindex, ST);
......@@ -3049,7 +3121,7 @@ ttimemaint(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT);
t1.on();
RUNSTEP(par, pkupdate, MT);
......@@ -3074,7 +3146,7 @@ ttimescan(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
par.m_tmr = &t1;
......@@ -3096,7 +3168,7 @@ ttimepkread(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
par.m_tmr = &t1;
......@@ -3132,9 +3204,10 @@ struct TCase {
static const TCase
tcaselist[] = {
TCase("a", tbuild, "index build"),
TCase("b", tpkops, "pk operations and scan reads"),
TCase("c", tmixedops, "pk operations and scan operations"),
TCase("d", tbusybuild, "pk operations and index build"),
TCase("b", tpkops, "pk operations"),
TCase("c", tpkopsread, "pk operations and scan reads"),
TCase("d", tmixedops, "pk operations and scan operations"),
TCase("e", tbusybuild, "pk operations and index build"),
TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"),
TCase("v", ttimescan, "time full scan table vs index on pk"),
......@@ -3192,10 +3265,10 @@ runtest(Par par)
Thr& thr = *g_thrlist[n];
assert(thr.m_thread != 0);
}
for (unsigned l = 0; par.m_loop == 0 || l < par.m_loop; l++) {
LL1("loop " << l);
for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) {
LL1("loop " << par.m_lno);
if (par.m_seed == 0)
srandom(l);
srandom(par.m_lno);
for (unsigned i = 0; i < tcasecount; i++) {
const TCase& tcase = tcaselist[i];
if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment