Commit fd8acbc2 authored by pekka@mysql.com's avatar pekka@mysql.com

Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb

into mysql.com:/space/pekka/ndb/version/my41
parents 64b18197 7d1f0333
...@@ -129,6 +129,8 @@ private: ...@@ -129,6 +129,8 @@ private:
/* /*
* Operate on entire tuple. Used by TUX where the table has a single * Operate on entire tuple. Used by TUX where the table has a single
* Uint32 array attribute representing an index tree node. * Uint32 array attribute representing an index tree node.
*
* XXX this signal will be replaced by method in TUP
*/ */
class TupStoreTh { class TupStoreTh {
friend class Dbtup; friend class Dbtup;
...@@ -153,8 +155,8 @@ private: ...@@ -153,8 +155,8 @@ private:
Uint32 tableId; Uint32 tableId;
Uint32 fragId; Uint32 fragId;
Uint32 fragPtrI; Uint32 fragPtrI;
Uint32 tupAddr; Uint32 tupAddr; // no longer used
Uint32 tupVersion; Uint32 tupVersion; // no longer used
Uint32 pageId; Uint32 pageId;
Uint32 pageOffset; Uint32 pageOffset;
Uint32 bufferId; Uint32 bufferId;
......
...@@ -69,7 +69,7 @@ class TuxFragReq { ...@@ -69,7 +69,7 @@ class TuxFragReq {
friend class Dblqh; friend class Dblqh;
friend class Dbtux; friend class Dbtux;
public: public:
STATIC_CONST( SignalLength = 9 ); STATIC_CONST( SignalLength = 14 );
private: private:
Uint32 userPtr; Uint32 userPtr;
Uint32 userRef; Uint32 userRef;
...@@ -80,6 +80,9 @@ private: ...@@ -80,6 +80,9 @@ private:
Uint32 fragOff; Uint32 fragOff;
Uint32 tableType; Uint32 tableType;
Uint32 primaryTableId; Uint32 primaryTableId;
Uint32 tupIndexFragPtrI;
Uint32 tupTableFragPtrI[2];
Uint32 accTableFragPtrI[2];
}; };
class TuxFragConf { class TuxFragConf {
......
...@@ -2432,6 +2432,7 @@ void Dbacc::execACC_LOCKREQ(Signal* signal) ...@@ -2432,6 +2432,7 @@ void Dbacc::execACC_LOCKREQ(Signal* signal)
} }
fragrecptr.i = req->fragPtrI; fragrecptr.i = req->fragPtrI;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
ndbrequire(req->fragId == fragrecptr.p->myfid);
// caller must be explicit here // caller must be explicit here
ndbrequire(req->accOpPtr == RNIL); ndbrequire(req->accOpPtr == RNIL);
// seize operation to hold the lock // seize operation to hold the lock
......
...@@ -1225,6 +1225,18 @@ Dblqh::sendAddFragReq(Signal* signal) ...@@ -1225,6 +1225,18 @@ Dblqh::sendAddFragReq(Signal* signal)
tuxreq->fragOff = addfragptr.p->lh3DistrBits; tuxreq->fragOff = addfragptr.p->lh3DistrBits;
tuxreq->tableType = addfragptr.p->tableType; tuxreq->tableType = addfragptr.p->tableType;
tuxreq->primaryTableId = addfragptr.p->primaryTableId; tuxreq->primaryTableId = addfragptr.p->primaryTableId;
// pointer to index fragment in TUP
tuxreq->tupIndexFragPtrI =
addfragptr.p->addfragStatus == AddFragRecord::WAIT_TWO_TUX ?
fragptr.p->tupFragptr[0] : fragptr.p->tupFragptr[1];
// pointers to table fragments in TUP and ACC
FragrecordPtr tFragPtr;
tFragPtr.i = fragptr.p->tableFragptr;
ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
tuxreq->tupTableFragPtrI[0] = tFragPtr.p->tupFragptr[0];
tuxreq->tupTableFragPtrI[1] = tFragPtr.p->tupFragptr[1];
tuxreq->accTableFragPtrI[0] = tFragPtr.p->accFragptr[0];
tuxreq->accTableFragPtrI[1] = tFragPtr.p->accFragptr[1];
sendSignal(fragptr.p->tuxBlockref, GSN_TUXFRAGREQ, sendSignal(fragptr.p->tuxBlockref, GSN_TUXFRAGREQ,
signal, TuxFragReq::SignalLength, JBB); signal, TuxFragReq::SignalLength, JBB);
return; return;
......
...@@ -153,8 +153,7 @@ private: ...@@ -153,8 +153,7 @@ private:
static const unsigned AttributeHeaderSize = 1; static const unsigned AttributeHeaderSize = 1;
/* /*
* Logical tuple address, "local key". Identifies both table tuples * Logical tuple address, "local key". Identifies table tuples.
* and index tuples. The code assumes it is one word.
*/ */
typedef Uint32 TupAddr; typedef Uint32 TupAddr;
static const unsigned NullTupAddr = (Uint32)-1; static const unsigned NullTupAddr = (Uint32)-1;
...@@ -168,8 +167,18 @@ private: ...@@ -168,8 +167,18 @@ private:
Uint32 m_pageId; // page i-value Uint32 m_pageId; // page i-value
Uint16 m_pageOffset; // page offset in words Uint16 m_pageOffset; // page offset in words
TupLoc(); TupLoc();
TupLoc(Uint32 pageId, Uint16 pageOffset);
bool operator==(const TupLoc& loc) const;
bool operator!=(const TupLoc& loc) const;
}; };
/*
* There is no const variable NullTupLoc since the compiler may not be
* able to optimize it to TupLoc() constants. Instead null values are
* constructed on the stack with TupLoc().
*/
#define NullTupLoc TupLoc()
// tree definitions // tree definitions
/* /*
...@@ -183,7 +192,7 @@ private: ...@@ -183,7 +192,7 @@ private:
TupAddr m_tupAddr; // address of original tuple TupAddr m_tupAddr; // address of original tuple
Uint16 m_tupVersion; // version Uint16 m_tupVersion; // version
Uint8 m_fragBit; // which duplicated table fragment Uint8 m_fragBit; // which duplicated table fragment
Uint8 unused1; Uint8 pad1;
TreeEnt(); TreeEnt();
// methods // methods
int cmp(const TreeEnt ent) const; int cmp(const TreeEnt ent) const;
...@@ -196,7 +205,7 @@ private: ...@@ -196,7 +205,7 @@ private:
* prefix 3) max and min entries 4) rest of entries 5) one extra entry * prefix 3) max and min entries 4) rest of entries 5) one extra entry
* used as work space. * used as work space.
* *
* struct TreeNode part 1 * struct TreeNode part 1, size 6 words
* min prefix part 2, size TreeHead::m_prefSize * min prefix part 2, size TreeHead::m_prefSize
* max prefix part 2, size TreeHead::m_prefSize * max prefix part 2, size TreeHead::m_prefSize
* max entry part 3 * max entry part 3
...@@ -204,6 +213,10 @@ private: ...@@ -204,6 +213,10 @@ private:
* rest of entries part 4 * rest of entries part 4
* work entry part 5 * work entry part 5
* *
* There are 3 links to other nodes: left child, right child, parent.
* These are in TupLoc format but the pageIds and pageOffsets are
* stored in separate arrays (saves 1 word).
*
* Occupancy (number of entries) is at least 1 except temporarily when * Occupancy (number of entries) is at least 1 except temporarily when
* a node is about to be removed. If occupancy is 1, only max entry * a node is about to be removed. If occupancy is 1, only max entry
* is present but both min and max prefixes are set. * is present but both min and max prefixes are set.
...@@ -211,11 +224,12 @@ private: ...@@ -211,11 +224,12 @@ private:
struct TreeNode; struct TreeNode;
friend struct TreeNode; friend struct TreeNode;
struct TreeNode { struct TreeNode {
TupAddr m_link[3]; // link to 0-left child 1-right child 2-parent Uint32 m_linkPI[3]; // link to 0-left child 1-right child 2-parent
Uint8 m_side; // we are 0-left child 1-right child 2-root Uint16 m_linkPO[3]; // page offsets for above real page ids
unsigned m_side : 2; // we are 0-left child 1-right child 2-root
int m_balance : 2; // balance -1, 0, +1
unsigned pad1 : 4;
Uint8 m_occup; // current number of entries Uint8 m_occup; // current number of entries
Int8 m_balance; // balance -1, 0, +1
Uint8 unused1;
Uint32 m_nodeScan; // list of scans at this node Uint32 m_nodeScan; // list of scans at this node
TreeNode(); TreeNode();
}; };
...@@ -243,7 +257,7 @@ private: ...@@ -243,7 +257,7 @@ private:
Uint8 m_prefSize; // words in min/max prefix each Uint8 m_prefSize; // words in min/max prefix each
Uint8 m_minOccup; // min entries in internal node Uint8 m_minOccup; // min entries in internal node
Uint8 m_maxOccup; // max entries in node Uint8 m_maxOccup; // max entries in node
TupAddr m_root; // root node TupLoc m_root; // root node
TreeHead(); TreeHead();
// methods // methods
unsigned getSize(AccSize acc) const; unsigned getSize(AccSize acc) const;
...@@ -261,8 +275,7 @@ private: ...@@ -261,8 +275,7 @@ private:
struct TreePos; struct TreePos;
friend struct TreePos; friend struct TreePos;
struct TreePos { struct TreePos {
TupAddr m_addr; // logical node address TupLoc m_loc; // physical node address
TupLoc m_loc; // physical address
Uint16 m_pos; // position 0 to m_occup Uint16 m_pos; // position 0 to m_occup
Uint8 m_match; // at an existing entry Uint8 m_match; // at an existing entry
Uint8 m_dir; // from link (0-2) or within node (3) Uint8 m_dir; // from link (0-2) or within node (3)
...@@ -446,6 +459,9 @@ private: ...@@ -446,6 +459,9 @@ private:
Uint32 m_nodeList; // node cache of current operation Uint32 m_nodeList; // node cache of current operation
Uint32 m_nodeFree; // one node pre-allocated for insert Uint32 m_nodeFree; // one node pre-allocated for insert
DLList<ScanOp> m_scanList; // current scans on this fragment DLList<ScanOp> m_scanList; // current scans on this fragment
Uint32 m_tupIndexFragPtrI;
Uint32 m_tupTableFragPtrI[2];
Uint32 m_accTableFragPtrI[2];
union { union {
Uint32 nextPool; Uint32 nextPool;
}; };
...@@ -491,7 +507,6 @@ private: ...@@ -491,7 +507,6 @@ private:
}; };
Dbtux& m_tux; // this block Dbtux& m_tux; // this block
Frag& m_frag; // fragment using the node Frag& m_frag; // fragment using the node
TupAddr m_addr; // logical node address
TupLoc m_loc; // physical node address TupLoc m_loc; // physical node address
AccSize m_acc; // accessed size AccSize m_acc; // accessed size
unsigned m_flags; // flags unsigned m_flags; // flags
...@@ -503,7 +518,7 @@ private: ...@@ -503,7 +518,7 @@ private:
Uint32 m_cache[MaxTreeNodeSize]; Uint32 m_cache[MaxTreeNodeSize];
NodeHandle(Dbtux& tux, Frag& frag); NodeHandle(Dbtux& tux, Frag& frag);
// getters // getters
TupAddr getLink(unsigned i); TupLoc getLink(unsigned i);
unsigned getChilds(); // cannot spell unsigned getChilds(); // cannot spell
unsigned getSide(); unsigned getSide();
unsigned getOccup(); unsigned getOccup();
...@@ -513,7 +528,7 @@ private: ...@@ -513,7 +528,7 @@ private:
TreeEnt getEnt(unsigned pos); TreeEnt getEnt(unsigned pos);
TreeEnt getMinMax(unsigned i); TreeEnt getMinMax(unsigned i);
// setters // setters
void setLink(unsigned i, TupAddr addr); void setLink(unsigned i, TupLoc loc);
void setSide(unsigned i); void setSide(unsigned i);
void setOccup(unsigned n); void setOccup(unsigned n);
void setBalance(int b); void setBalance(int b);
...@@ -649,8 +664,8 @@ private: ...@@ -649,8 +664,8 @@ private:
*/ */
void seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr); void seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr);
void preallocNode(Signal* signal, Frag& frag, Uint32& errorCode); void preallocNode(Signal* signal, Frag& frag, Uint32& errorCode);
void findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupAddr addr); void findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc);
void selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupAddr addr, AccSize acc); void selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc, AccSize acc);
void insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc); void insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc);
void deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr); void deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr);
void accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc); void accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc);
...@@ -698,23 +713,24 @@ private: ...@@ -698,23 +713,24 @@ private:
struct PrintPar { struct PrintPar {
char m_path[100]; // LR prefix char m_path[100]; // LR prefix
unsigned m_side; // expected side unsigned m_side; // expected side
TupAddr m_parent; // expected parent address TupLoc m_parent; // expected parent address
int m_depth; // returned depth int m_depth; // returned depth
unsigned m_occup; // returned occupancy unsigned m_occup; // returned occupancy
bool m_ok; // returned status bool m_ok; // returned status
PrintPar(); PrintPar();
}; };
void printTree(Signal* signal, Frag& frag, NdbOut& out); void printTree(Signal* signal, Frag& frag, NdbOut& out);
void printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar& par); void printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par);
friend class NdbOut& operator<<(NdbOut&, const TupLoc&);
friend class NdbOut& operator<<(NdbOut&, const TreeEnt&); friend class NdbOut& operator<<(NdbOut&, const TreeEnt&);
friend class NdbOut& operator<<(NdbOut&, const TreeNode&); friend class NdbOut& operator<<(NdbOut&, const TreeNode&);
friend class NdbOut& operator<<(NdbOut&, const TreeHead&); friend class NdbOut& operator<<(NdbOut&, const TreeHead&);
friend class NdbOut& operator<<(NdbOut&, const TreePos&); friend class NdbOut& operator<<(NdbOut&, const TreePos&);
friend class NdbOut& operator<<(NdbOut&, const DescAttr&); friend class NdbOut& operator<<(NdbOut&, const DescAttr&);
friend class NdbOut& operator<<(NdbOut&, const ScanOp&);
friend class NdbOut& operator<<(NdbOut&, const Index&); friend class NdbOut& operator<<(NdbOut&, const Index&);
friend class NdbOut& operator<<(NdbOut&, const Frag&); friend class NdbOut& operator<<(NdbOut&, const Frag&);
friend class NdbOut& operator<<(NdbOut&, const NodeHandle&); friend class NdbOut& operator<<(NdbOut&, const NodeHandle&);
friend class NdbOut& operator<<(NdbOut&, const ScanOp&);
FILE* debugFile; FILE* debugFile;
NdbOut debugOut; NdbOut debugOut;
unsigned debugFlags; unsigned debugFlags;
...@@ -831,8 +847,45 @@ Dbtux::ConstData::operator=(Data data) ...@@ -831,8 +847,45 @@ Dbtux::ConstData::operator=(Data data)
return *this; return *this;
} }
// Dbtux::TupLoc
inline
Dbtux::TupLoc::TupLoc() :
m_pageId(RNIL),
m_pageOffset(0)
{
}
inline
Dbtux::TupLoc::TupLoc(Uint32 pageId, Uint16 pageOffset) :
m_pageId(pageId),
m_pageOffset(pageOffset)
{
}
inline bool
Dbtux::TupLoc::operator==(const TupLoc& loc) const
{
return m_pageId == loc.m_pageId && m_pageOffset == loc.m_pageOffset;
}
inline bool
Dbtux::TupLoc::operator!=(const TupLoc& loc) const
{
return ! (*this == loc);
}
// Dbtux::TreeEnt // Dbtux::TreeEnt
inline
Dbtux::TreeEnt::TreeEnt() :
m_tupAddr(NullTupAddr),
m_tupVersion(0),
m_fragBit(255),
pad1(0)
{
}
inline int inline int
Dbtux::TreeEnt::cmp(const TreeEnt ent) const Dbtux::TreeEnt::cmp(const TreeEnt ent) const
{ {
...@@ -852,8 +905,36 @@ Dbtux::TreeEnt::cmp(const TreeEnt ent) const ...@@ -852,8 +905,36 @@ Dbtux::TreeEnt::cmp(const TreeEnt ent) const
return 0; return 0;
} }
// Dbtux::TreeNode
inline
Dbtux::TreeNode::TreeNode() :
m_side(2),
m_balance(0),
pad1(0),
m_occup(0),
m_nodeScan(RNIL)
{
m_linkPI[0] = NullTupLoc.m_pageId;
m_linkPO[0] = NullTupLoc.m_pageOffset;
m_linkPI[1] = NullTupLoc.m_pageId;
m_linkPO[1] = NullTupLoc.m_pageOffset;
m_linkPI[2] = NullTupLoc.m_pageId;
m_linkPO[2] = NullTupLoc.m_pageOffset;
}
// Dbtux::TreeHead // Dbtux::TreeHead
inline
Dbtux::TreeHead::TreeHead() :
m_nodeSize(0),
m_prefSize(0),
m_minOccup(0),
m_maxOccup(0),
m_root()
{
}
inline unsigned inline unsigned
Dbtux::TreeHead::getSize(AccSize acc) const Dbtux::TreeHead::getSize(AccSize acc) const
{ {
...@@ -885,52 +966,10 @@ Dbtux::TreeHead::getEntList(TreeNode* node) const ...@@ -885,52 +966,10 @@ Dbtux::TreeHead::getEntList(TreeNode* node) const
return (TreeEnt*)ptr; return (TreeEnt*)ptr;
} }
// Dbtux // Dbtux::TreePos
// constructors
inline
Dbtux::TupLoc::TupLoc() :
m_pageId(RNIL),
m_pageOffset(0)
{
}
inline
Dbtux::TreeEnt::TreeEnt() :
m_tupAddr(NullTupAddr),
m_tupVersion(0),
m_fragBit(255),
unused1(0)
{
}
inline
Dbtux::TreeNode::TreeNode() :
m_side(255),
m_occup(0),
m_balance(0),
unused1(0xa1),
m_nodeScan(RNIL)
{
m_link[0] = NullTupAddr;
m_link[1] = NullTupAddr;
m_link[2] = NullTupAddr;
}
inline
Dbtux::TreeHead::TreeHead() :
m_nodeSize(0),
m_prefSize(0),
m_minOccup(0),
m_maxOccup(0),
m_root(0)
{
}
inline inline
Dbtux::TreePos::TreePos() : Dbtux::TreePos::TreePos() :
m_addr(NullTupAddr),
m_loc(), m_loc(),
m_pos(ZNIL), m_pos(ZNIL),
m_match(false), m_match(false),
...@@ -939,6 +978,8 @@ Dbtux::TreePos::TreePos() : ...@@ -939,6 +978,8 @@ Dbtux::TreePos::TreePos() :
{ {
} }
// Dbtux::DescPage
inline inline
Dbtux::DescPage::DescPage() : Dbtux::DescPage::DescPage() :
m_nextPage(RNIL), m_nextPage(RNIL),
...@@ -953,6 +994,41 @@ Dbtux::DescPage::DescPage() : ...@@ -953,6 +994,41 @@ Dbtux::DescPage::DescPage() :
} }
} }
// Dbtux::ScanOp
inline
Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) :
m_state(Undef),
m_lockwait(false),
m_userPtr(RNIL),
m_userRef(RNIL),
m_tableId(RNIL),
m_indexId(RNIL),
m_fragPtrI(RNIL),
m_transId1(0),
m_transId2(0),
m_savePointId(0),
m_accLockOp(RNIL),
m_readCommitted(0),
m_lockMode(0),
m_keyInfo(0),
m_boundMin(scanBoundPool),
m_boundMax(scanBoundPool),
m_scanPos(),
m_lastEnt(),
m_nodeScan(RNIL)
{
m_bound[0] = &m_boundMin;
m_bound[1] = &m_boundMax;
m_boundCnt[0] = 0;
m_boundCnt[1] = 0;
for (unsigned i = 0; i < MaxAccLockOps; i++) {
m_accLockOps[i] = RNIL;
}
}
// Dbtux::Index
inline inline
Dbtux::Index::Index() : Dbtux::Index::Index() :
m_state(NotDefined), m_state(NotDefined),
...@@ -969,6 +1045,8 @@ Dbtux::Index::Index() : ...@@ -969,6 +1045,8 @@ Dbtux::Index::Index() :
}; };
}; };
// Dbtux::Frag
inline inline
Dbtux::Frag::Frag(ArrayPool<ScanOp>& scanOpPool) : Dbtux::Frag::Frag(ArrayPool<ScanOp>& scanOpPool) :
m_tableId(RNIL), m_tableId(RNIL),
...@@ -981,10 +1059,17 @@ Dbtux::Frag::Frag(ArrayPool<ScanOp>& scanOpPool) : ...@@ -981,10 +1059,17 @@ Dbtux::Frag::Frag(ArrayPool<ScanOp>& scanOpPool) :
m_tree(), m_tree(),
m_nodeList(RNIL), m_nodeList(RNIL),
m_nodeFree(RNIL), m_nodeFree(RNIL),
m_scanList(scanOpPool) m_scanList(scanOpPool),
m_tupIndexFragPtrI(RNIL)
{ {
m_tupTableFragPtrI[0] = RNIL;
m_tupTableFragPtrI[1] = RNIL;
m_accTableFragPtrI[0] = RNIL;
m_accTableFragPtrI[1] = RNIL;
} }
// Dbtux::FragOp
inline inline
Dbtux::FragOp::FragOp() : Dbtux::FragOp::FragOp() :
m_userPtr(RNIL), m_userPtr(RNIL),
...@@ -997,11 +1082,12 @@ Dbtux::FragOp::FragOp() : ...@@ -997,11 +1082,12 @@ Dbtux::FragOp::FragOp() :
{ {
}; };
// Dbtux::NodeHandle
inline inline
Dbtux::NodeHandle::NodeHandle(Dbtux& tux, Frag& frag) : Dbtux::NodeHandle::NodeHandle(Dbtux& tux, Frag& frag) :
m_tux(tux), m_tux(tux),
m_frag(frag), m_frag(frag),
m_addr(NullTupAddr),
m_loc(), m_loc(),
m_acc(AccNone), m_acc(AccNone),
m_flags(0), m_flags(0),
...@@ -1010,126 +1096,20 @@ Dbtux::NodeHandle::NodeHandle(Dbtux& tux, Frag& frag) : ...@@ -1010,126 +1096,20 @@ Dbtux::NodeHandle::NodeHandle(Dbtux& tux, Frag& frag) :
{ {
} }
inline inline Dbtux::TupLoc
Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) :
m_state(Undef),
m_lockwait(false),
m_userPtr(RNIL),
m_userRef(RNIL),
m_tableId(RNIL),
m_indexId(RNIL),
m_fragPtrI(RNIL),
m_transId1(0),
m_transId2(0),
m_savePointId(0),
m_accLockOp(RNIL),
m_readCommitted(0),
m_lockMode(0),
m_keyInfo(0),
m_boundMin(scanBoundPool),
m_boundMax(scanBoundPool),
m_scanPos(),
m_lastEnt(),
m_nodeScan(RNIL)
{
m_bound[0] = &m_boundMin;
m_bound[1] = &m_boundMax;
m_boundCnt[0] = 0;
m_boundCnt[1] = 0;
for (unsigned i = 0; i < MaxAccLockOps; i++) {
m_accLockOps[i] = RNIL;
}
}
inline
Dbtux::CopyPar::CopyPar() :
m_items(0),
m_headers(true),
m_maxwords(~0), // max unsigned
// output
m_numitems(0),
m_numwords(0)
{
}
inline
Dbtux::ReadPar::ReadPar() :
m_first(0),
m_count(0),
m_data(0),
m_size(0)
{
}
inline
Dbtux::StorePar::StorePar() :
m_opCode(TupStoreTh::OpUndefined),
m_offset(0),
m_size(0),
m_errorCode(0)
{
}
inline
Dbtux::SearchPar::SearchPar() :
m_data(0),
m_ent()
{
}
inline
Dbtux::CmpPar::CmpPar() :
m_data1(0),
m_data2(0),
m_len2(0),
m_first(0),
m_numEq(0)
{
}
inline
Dbtux::BoundPar::BoundPar() :
m_data1(0),
m_data2(0),
m_count1(0),
m_len2(0),
m_dir(255)
{
}
#ifdef VM_TRACE
inline
Dbtux::PrintPar::PrintPar() :
// caller fills in
m_path(),
m_side(255),
m_parent(NullTupAddr),
// default return values
m_depth(0),
m_occup(0),
m_ok(true)
{
}
#endif
// node handles
inline Dbtux::TupAddr
Dbtux::NodeHandle::getLink(unsigned i) Dbtux::NodeHandle::getLink(unsigned i)
{ {
ndbrequire(i <= 2); ndbrequire(i <= 2);
return m_node->m_link[i]; return TupLoc(m_node->m_linkPI[i], m_node->m_linkPO[i]);
} }
inline unsigned inline unsigned
Dbtux::NodeHandle::getChilds() Dbtux::NodeHandle::getChilds()
{ {
return return (getLink(0) != NullTupLoc) + (getLink(1) != NullTupLoc);
(m_node->m_link[0] != NullTupAddr) +
(m_node->m_link[1] != NullTupAddr);
} }
inline Dbtux::TupAddr inline unsigned
Dbtux::NodeHandle::getSide() Dbtux::NodeHandle::getSide()
{ {
return m_node->m_side; return m_node->m_side;
...@@ -1185,17 +1165,18 @@ Dbtux::NodeHandle::getMinMax(unsigned i) ...@@ -1185,17 +1165,18 @@ Dbtux::NodeHandle::getMinMax(unsigned i)
} }
inline void inline void
Dbtux::NodeHandle::setLink(unsigned i, TupAddr addr) Dbtux::NodeHandle::setLink(unsigned i, TupLoc loc)
{ {
ndbrequire(i <= 2); ndbrequire(i <= 2);
m_node->m_link[i] = addr; m_node->m_linkPI[i] = loc.m_pageId;
m_node->m_linkPO[i] = loc.m_pageOffset;
m_flags |= DoUpdate; m_flags |= DoUpdate;
} }
inline void inline void
Dbtux::NodeHandle::setSide(unsigned i) Dbtux::NodeHandle::setSide(unsigned i)
{ {
// ndbrequire(i <= 1); ndbrequire(i <= 2);
m_node->m_side = i; m_node->m_side = i;
m_flags |= DoUpdate; m_flags |= DoUpdate;
} }
...@@ -1224,6 +1205,79 @@ Dbtux::NodeHandle::setNodeScan(Uint32 scanPtrI) ...@@ -1224,6 +1205,79 @@ Dbtux::NodeHandle::setNodeScan(Uint32 scanPtrI)
m_flags |= DoUpdate; m_flags |= DoUpdate;
} }
// parameters for methods
inline
Dbtux::CopyPar::CopyPar() :
m_items(0),
m_headers(true),
m_maxwords(~0), // max unsigned
// output
m_numitems(0),
m_numwords(0)
{
}
inline
Dbtux::ReadPar::ReadPar() :
m_first(0),
m_count(0),
m_data(0),
m_size(0)
{
}
inline
Dbtux::StorePar::StorePar() :
m_opCode(TupStoreTh::OpUndefined),
m_offset(0),
m_size(0),
m_errorCode(0)
{
}
inline
Dbtux::SearchPar::SearchPar() :
m_data(0),
m_ent()
{
}
inline
Dbtux::CmpPar::CmpPar() :
m_data1(0),
m_data2(0),
m_len2(0),
m_first(0),
m_numEq(0)
{
}
inline
Dbtux::BoundPar::BoundPar() :
m_data1(0),
m_data2(0),
m_count1(0),
m_len2(0),
m_dir(255)
{
}
#ifdef VM_TRACE
inline
Dbtux::PrintPar::PrintPar() :
// caller fills in
m_path(),
m_side(255),
m_parent(),
// default return values
m_depth(0),
m_occup(0),
m_ok(true)
{
}
#endif
// other methods // other methods
inline Dbtux::DescEnt& inline Dbtux::DescEnt&
......
...@@ -97,7 +97,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out) ...@@ -97,7 +97,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
PrintPar par; PrintPar par;
strcpy(par.m_path, "."); strcpy(par.m_path, ".");
par.m_side = 2; par.m_side = 2;
par.m_parent = NullTupAddr; par.m_parent = NullTupLoc;
printNode(signal, frag, out, tree.m_root, par); printNode(signal, frag, out, tree.m_root, par);
out.m_out->flush(); out.m_out->flush();
if (! par.m_ok) { if (! par.m_ok) {
...@@ -116,15 +116,15 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out) ...@@ -116,15 +116,15 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
} }
void void
Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar& par) Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par)
{ {
if (addr == NullTupAddr) { if (loc == NullTupLoc) {
par.m_depth = 0; par.m_depth = 0;
return; return;
} }
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
NodeHandlePtr nodePtr; NodeHandlePtr nodePtr;
selectNode(signal, frag, nodePtr, addr, AccFull); selectNode(signal, frag, nodePtr, loc, AccFull);
out << par.m_path << " " << *nodePtr.p << endl; out << par.m_path << " " << *nodePtr.p << endl;
// check children // check children
PrintPar cpar[2]; PrintPar cpar[2];
...@@ -133,7 +133,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar ...@@ -133,7 +133,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar
sprintf(cpar[i].m_path, "%s%c", par.m_path, "LR"[i]); sprintf(cpar[i].m_path, "%s%c", par.m_path, "LR"[i]);
cpar[i].m_side = i; cpar[i].m_side = i;
cpar[i].m_depth = 0; cpar[i].m_depth = 0;
cpar[i].m_parent = addr; cpar[i].m_parent = loc;
printNode(signal, frag, out, nodePtr.p->getLink(i), cpar[i]); printNode(signal, frag, out, nodePtr.p->getLink(i), cpar[i]);
if (! cpar[i].m_ok) { if (! cpar[i].m_ok) {
par.m_ok = false; par.m_ok = false;
...@@ -143,7 +143,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar ...@@ -143,7 +143,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar
if (nodePtr.p->getLink(2) != par.m_parent) { if (nodePtr.p->getLink(2) != par.m_parent) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << " *** ";
out << "parent addr " << hex << nodePtr.p->getLink(2); out << "parent loc " << hex << nodePtr.p->getLink(2);
out << " should be " << hex << par.m_parent << endl; out << " should be " << hex << par.m_parent << endl;
} }
if (nodePtr.p->getSide() != par.m_side) { if (nodePtr.p->getSide() != par.m_side) {
...@@ -181,8 +181,8 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar ...@@ -181,8 +181,8 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar
} }
// check missed half-leaf/leaf merge // check missed half-leaf/leaf merge
for (unsigned i = 0; i <= 1; i++) { for (unsigned i = 0; i <= 1; i++) {
if (nodePtr.p->getLink(i) != NullTupAddr && if (nodePtr.p->getLink(i) != NullTupLoc &&
nodePtr.p->getLink(1 - i) == NullTupAddr && nodePtr.p->getLink(1 - i) == NullTupLoc &&
nodePtr.p->getOccup() + cpar[i].m_occup <= tree.m_maxOccup) { nodePtr.p->getOccup() + cpar[i].m_occup <= tree.m_maxOccup) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << " *** ";
...@@ -194,6 +194,18 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar ...@@ -194,6 +194,18 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar
par.m_occup = nodePtr.p->getOccup(); par.m_occup = nodePtr.p->getOccup();
} }
NdbOut&
operator<<(NdbOut& out, const Dbtux::TupLoc& loc)
{
if (loc == Dbtux::NullTupLoc) {
out << "null";
} else {
out << hex << loc.m_pageId;
out << "." << dec << loc.m_pageOffset;
}
return out;
}
NdbOut& NdbOut&
operator<<(NdbOut& out, const Dbtux::TreeEnt& ent) operator<<(NdbOut& out, const Dbtux::TreeEnt& ent)
{ {
...@@ -206,10 +218,13 @@ operator<<(NdbOut& out, const Dbtux::TreeEnt& ent) ...@@ -206,10 +218,13 @@ operator<<(NdbOut& out, const Dbtux::TreeEnt& ent)
NdbOut& NdbOut&
operator<<(NdbOut& out, const Dbtux::TreeNode& node) operator<<(NdbOut& out, const Dbtux::TreeNode& node)
{ {
Dbtux::TupLoc link0(node.m_linkPI[0], node.m_linkPO[0]);
Dbtux::TupLoc link1(node.m_linkPI[1], node.m_linkPO[1]);
Dbtux::TupLoc link2(node.m_linkPI[2], node.m_linkPO[2]);
out << "[TreeNode " << hex << &node; out << "[TreeNode " << hex << &node;
out << " [left " << hex << node.m_link[0] << "]"; out << " [left " << link0 << "]";
out << " [right " << hex << node.m_link[1] << "]"; out << " [right " << link1 << "]";
out << " [up " << hex << node.m_link[2] << "]"; out << " [up " << link2 << "]";
out << " [side " << dec << node.m_side << "]"; out << " [side " << dec << node.m_side << "]";
out << " [occup " << dec << node.m_occup << "]"; out << " [occup " << dec << node.m_occup << "]";
out << " [balance " << dec << (int)node.m_balance << "]"; out << " [balance " << dec << (int)node.m_balance << "]";
...@@ -238,7 +253,7 @@ NdbOut& ...@@ -238,7 +253,7 @@ NdbOut&
operator<<(NdbOut& out, const Dbtux::TreePos& pos) operator<<(NdbOut& out, const Dbtux::TreePos& pos)
{ {
out << "[TreePos " << hex << &pos; out << "[TreePos " << hex << &pos;
out << " [addr " << hex << pos.m_addr << "]"; out << " [loc " << pos.m_loc << "]";
out << " [pos " << dec << pos.m_pos << "]"; out << " [pos " << dec << pos.m_pos << "]";
out << " [match " << dec << pos.m_match << "]"; out << " [match " << dec << pos.m_match << "]";
out << " [dir " << dec << pos.m_dir << "]"; out << " [dir " << dec << pos.m_dir << "]";
...@@ -338,7 +353,7 @@ operator<<(NdbOut& out, const Dbtux::NodeHandle& node) ...@@ -338,7 +353,7 @@ operator<<(NdbOut& out, const Dbtux::NodeHandle& node)
const Dbtux::Frag& frag = node.m_frag; const Dbtux::Frag& frag = node.m_frag;
const Dbtux::TreeHead& tree = frag.m_tree; const Dbtux::TreeHead& tree = frag.m_tree;
out << "[NodeHandle " << hex << &node; out << "[NodeHandle " << hex << &node;
out << " [addr " << hex << node.m_addr << "]"; out << " [loc " << node.m_loc << "]";
out << " [acc " << dec << node.m_acc << "]"; out << " [acc " << dec << node.m_acc << "]";
out << " [flags " << hex << node.m_flags << "]"; out << " [flags " << hex << node.m_flags << "]";
out << " [node " << *node.m_node << "]"; out << " [node " << *node.m_node << "]";
......
...@@ -199,7 +199,7 @@ Dbtux::tupReadAttrs(Signal* signal, const Frag& frag, ReadPar& readPar) ...@@ -199,7 +199,7 @@ Dbtux::tupReadAttrs(Signal* signal, const Frag& frag, ReadPar& readPar)
req->requestInfo = 0; req->requestInfo = 0;
req->tableId = frag.m_tableId; req->tableId = frag.m_tableId;
req->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff); req->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff);
req->fragPtrI = RNIL; req->fragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
req->tupAddr = ent.m_tupAddr; req->tupAddr = ent.m_tupAddr;
req->tupVersion = ent.m_tupVersion; req->tupVersion = ent.m_tupVersion;
req->pageId = RNIL; req->pageId = RNIL;
...@@ -246,7 +246,7 @@ Dbtux::tupReadKeys(Signal* signal, const Frag& frag, ReadPar& readPar) ...@@ -246,7 +246,7 @@ Dbtux::tupReadKeys(Signal* signal, const Frag& frag, ReadPar& readPar)
req->requestInfo = TupReadAttrs::ReadKeys; req->requestInfo = TupReadAttrs::ReadKeys;
req->tableId = frag.m_tableId; req->tableId = frag.m_tableId;
req->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff); req->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff);
req->fragPtrI = RNIL; req->fragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
req->tupAddr = ent.m_tupAddr; req->tupAddr = ent.m_tupAddr;
req->tupVersion = RNIL; // not used req->tupVersion = RNIL; // not used
req->pageId = RNIL; req->pageId = RNIL;
...@@ -284,9 +284,9 @@ Dbtux::tupStoreTh(Signal* signal, const Frag& frag, NodeHandlePtr nodePtr, Store ...@@ -284,9 +284,9 @@ Dbtux::tupStoreTh(Signal* signal, const Frag& frag, NodeHandlePtr nodePtr, Store
req->errorCode = RNIL; req->errorCode = RNIL;
req->tableId = frag.m_indexId; req->tableId = frag.m_indexId;
req->fragId = frag.m_fragId; req->fragId = frag.m_fragId;
req->fragPtrI = RNIL; req->fragPtrI = frag.m_tupIndexFragPtrI;
req->tupAddr = nodePtr.p->m_addr; req->tupAddr = RNIL; // no longer used
req->tupVersion = 0; req->tupVersion = 0; // no longer used
req->pageId = nodePtr.p->m_loc.m_pageId; req->pageId = nodePtr.p->m_loc.m_pageId;
req->pageOffset = nodePtr.p->m_loc.m_pageOffset; req->pageOffset = nodePtr.p->m_loc.m_pageOffset;
req->bufferId = 0; req->bufferId = 0;
...@@ -346,21 +346,18 @@ Dbtux::tupStoreTh(Signal* signal, const Frag& frag, NodeHandlePtr nodePtr, Store ...@@ -346,21 +346,18 @@ Dbtux::tupStoreTh(Signal* signal, const Frag& frag, NodeHandlePtr nodePtr, Store
const Uint32* src = (const Uint32*)buffer + storePar.m_offset; const Uint32* src = (const Uint32*)buffer + storePar.m_offset;
memcpy(dst, src, storePar.m_size << 2); memcpy(dst, src, storePar.m_size << 2);
} }
// fallthru break;
case TupStoreTh::OpInsert: case TupStoreTh::OpInsert:
jam(); jam();
// fallthru
case TupStoreTh::OpUpdate:
jam();
nodePtr.p->m_addr = req->tupAddr;
nodePtr.p->m_loc.m_pageId = req->pageId; nodePtr.p->m_loc.m_pageId = req->pageId;
nodePtr.p->m_loc.m_pageOffset = req->pageOffset; nodePtr.p->m_loc.m_pageOffset = req->pageOffset;
break; break;
case TupStoreTh::OpUpdate:
jam();
break;
case TupStoreTh::OpDelete: case TupStoreTh::OpDelete:
jam(); jam();
nodePtr.p->m_addr = NullTupAddr; nodePtr.p->m_loc = NullTupLoc;
nodePtr.p->m_loc.m_pageId = RNIL;
nodePtr.p->m_loc.m_pageOffset = 0;
break; break;
default: default:
ndbrequire(false); ndbrequire(false);
......
...@@ -85,6 +85,11 @@ Dbtux::execTUXFRAGREQ(Signal* signal) ...@@ -85,6 +85,11 @@ Dbtux::execTUXFRAGREQ(Signal* signal)
fragPtr.p->m_fragOff = req->fragOff; fragPtr.p->m_fragOff = req->fragOff;
fragPtr.p->m_fragId = req->fragId; fragPtr.p->m_fragId = req->fragId;
fragPtr.p->m_numAttrs = req->noOfAttr; fragPtr.p->m_numAttrs = req->noOfAttr;
fragPtr.p->m_tupIndexFragPtrI = req->tupIndexFragPtrI;
fragPtr.p->m_tupTableFragPtrI[0] = req->tupTableFragPtrI[0];
fragPtr.p->m_tupTableFragPtrI[1] = req->tupTableFragPtrI[1];
fragPtr.p->m_accTableFragPtrI[0] = req->accTableFragPtrI[0];
fragPtr.p->m_accTableFragPtrI[1] = req->accTableFragPtrI[1];
// add the fragment to the index // add the fragment to the index
indexPtr.p->m_fragId[indexPtr.p->m_numFrags] = req->fragId; indexPtr.p->m_fragId[indexPtr.p->m_numFrags] = req->fragId;
indexPtr.p->m_fragPtrI[indexPtr.p->m_numFrags] = fragPtr.i; indexPtr.p->m_fragPtrI[indexPtr.p->m_numFrags] = fragPtr.i;
...@@ -197,6 +202,7 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal) ...@@ -197,6 +202,7 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
jam(); jam();
// initialize tree header // initialize tree header
TreeHead& tree = fragPtr.p->m_tree; TreeHead& tree = fragPtr.p->m_tree;
new (&tree) TreeHead();
// make these configurable later // make these configurable later
tree.m_nodeSize = MAX_TTREE_NODE_SIZE; tree.m_nodeSize = MAX_TTREE_NODE_SIZE;
tree.m_prefSize = MAX_TTREE_PREF_SIZE; tree.m_prefSize = MAX_TTREE_PREF_SIZE;
...@@ -222,8 +228,8 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal) ...@@ -222,8 +228,8 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
break; break;
} }
tree.m_minOccup = tree.m_maxOccup - maxSlack; tree.m_minOccup = tree.m_maxOccup - maxSlack;
// root node does not exist // root node does not exist (also set by ctor)
tree.m_root = NullTupAddr; tree.m_root = NullTupLoc;
// fragment is defined // fragment is defined
c_fragOpPool.release(fragOpPtr); c_fragOpPool.release(fragOpPtr);
} }
......
...@@ -80,14 +80,14 @@ Dbtux::preallocNode(Signal* signal, Frag& frag, Uint32& errorCode) ...@@ -80,14 +80,14 @@ Dbtux::preallocNode(Signal* signal, Frag& frag, Uint32& errorCode)
* Find node in the cache. XXX too slow, use direct links instead * Find node in the cache. XXX too slow, use direct links instead
*/ */
void void
Dbtux::findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupAddr addr) Dbtux::findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc)
{ {
NodeHandlePtr tmpPtr; NodeHandlePtr tmpPtr;
tmpPtr.i = frag.m_nodeList; tmpPtr.i = frag.m_nodeList;
while (tmpPtr.i != RNIL) { while (tmpPtr.i != RNIL) {
jam(); jam();
c_nodeHandlePool.getPtr(tmpPtr); c_nodeHandlePool.getPtr(tmpPtr);
if (tmpPtr.p->m_addr == addr) { if (tmpPtr.p->m_loc == loc) {
jam(); jam();
nodePtr = tmpPtr; nodePtr = tmpPtr;
return; return;
...@@ -102,18 +102,18 @@ Dbtux::findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupAddr addr ...@@ -102,18 +102,18 @@ Dbtux::findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupAddr addr
* Get handle for existing node. * Get handle for existing node.
*/ */
void void
Dbtux::selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupAddr addr, AccSize acc) Dbtux::selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc, AccSize acc)
{ {
ndbrequire(addr != NullTupAddr && acc > AccNone); ndbrequire(loc != NullTupLoc && acc > AccNone);
NodeHandlePtr tmpPtr; NodeHandlePtr tmpPtr;
// search in cache // search in cache
findNode(signal, frag, tmpPtr, addr); findNode(signal, frag, tmpPtr, loc);
if (tmpPtr.i == RNIL) { if (tmpPtr.i == RNIL) {
jam(); jam();
// add new node // add new node
seizeNode(signal, frag, tmpPtr); seizeNode(signal, frag, tmpPtr);
ndbrequire(tmpPtr.i != RNIL); ndbrequire(tmpPtr.i != RNIL);
tmpPtr.p->m_addr = addr; tmpPtr.p->m_loc = loc;
} }
if (tmpPtr.p->m_acc < acc) { if (tmpPtr.p->m_acc < acc) {
jam(); jam();
...@@ -276,7 +276,7 @@ Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent) ...@@ -276,7 +276,7 @@ Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent)
jam(); jam();
m_tux.c_scanOpPool.getPtr(scanPtr); m_tux.c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
if (scanPos.m_pos >= pos) { if (scanPos.m_pos >= pos) {
jam(); jam();
#ifdef VM_TRACE #ifdef VM_TRACE
...@@ -329,7 +329,7 @@ Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent) ...@@ -329,7 +329,7 @@ Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent)
jam(); jam();
m_tux.c_scanOpPool.getPtr(scanPtr); m_tux.c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan; const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == pos) { if (scanPos.m_pos == pos) {
jam(); jam();
...@@ -349,7 +349,7 @@ Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent) ...@@ -349,7 +349,7 @@ Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent)
jam(); jam();
m_tux.c_scanOpPool.getPtr(scanPtr); m_tux.c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
ndbrequire(scanPos.m_pos != pos); ndbrequire(scanPos.m_pos != pos);
if (scanPos.m_pos > pos) { if (scanPos.m_pos > pos) {
jam(); jam();
...@@ -403,7 +403,7 @@ Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent) ...@@ -403,7 +403,7 @@ Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent)
jam(); jam();
m_tux.c_scanOpPool.getPtr(scanPtr); m_tux.c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan; const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == 0) { if (scanPos.m_pos == 0) {
jam(); jam();
...@@ -424,7 +424,7 @@ Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent) ...@@ -424,7 +424,7 @@ Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent)
jam(); jam();
m_tux.c_scanOpPool.getPtr(scanPtr); m_tux.c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
ndbrequire(scanPos.m_pos != 0); ndbrequire(scanPos.m_pos != 0);
if (scanPos.m_pos <= pos) { if (scanPos.m_pos <= pos) {
jam(); jam();
...@@ -480,7 +480,7 @@ Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent) ...@@ -480,7 +480,7 @@ Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent)
jam(); jam();
m_tux.c_scanOpPool.getPtr(scanPtr); m_tux.c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan; const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == pos) { if (scanPos.m_pos == pos) {
jam(); jam();
...@@ -501,7 +501,7 @@ Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent) ...@@ -501,7 +501,7 @@ Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent)
jam(); jam();
m_tux.c_scanOpPool.getPtr(scanPtr); m_tux.c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
ndbrequire(scanPos.m_pos != pos); ndbrequire(scanPos.m_pos != pos);
if (scanPos.m_pos < pos) { if (scanPos.m_pos < pos) {
jam(); jam();
......
...@@ -47,7 +47,7 @@ Dbtux::execACC_SCANREQ(Signal* signal) ...@@ -47,7 +47,7 @@ Dbtux::execACC_SCANREQ(Signal* signal)
ndbrequire(frag.m_fragId < (1 << frag.m_fragOff)); ndbrequire(frag.m_fragId < (1 << frag.m_fragOff));
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
// check for empty fragment // check for empty fragment
if (tree.m_root == NullTupAddr) { if (tree.m_root == NullTupLoc) {
jam(); jam();
AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend(); AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
conf->scanPtr = req->senderData; conf->scanPtr = req->senderData;
...@@ -275,13 +275,13 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) ...@@ -275,13 +275,13 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
case NextScanReq::ZSCAN_CLOSE: case NextScanReq::ZSCAN_CLOSE:
jam(); jam();
// unlink from tree node first to avoid state changes // unlink from tree node first to avoid state changes
if (scan.m_scanPos.m_addr != NullTupAddr) { if (scan.m_scanPos.m_loc != NullTupLoc) {
jam(); jam();
const TupAddr addr = scan.m_scanPos.m_addr; const TupLoc loc = scan.m_scanPos.m_loc;
NodeHandlePtr nodePtr; NodeHandlePtr nodePtr;
selectNode(signal, frag, nodePtr, addr, AccHead); selectNode(signal, frag, nodePtr, loc, AccHead);
nodePtr.p->unlinkScan(scanPtr); nodePtr.p->unlinkScan(scanPtr);
scan.m_scanPos.m_addr = NullTupAddr; scan.m_scanPos.m_loc = NullTupLoc;
} }
if (scan.m_lockwait) { if (scan.m_lockwait) {
jam(); jam();
...@@ -407,8 +407,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -407,8 +407,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
lockReq->userRef = reference(); lockReq->userRef = reference();
lockReq->tableId = scan.m_tableId; lockReq->tableId = scan.m_tableId;
lockReq->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff); lockReq->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff);
// should cache this at fragment create lockReq->fragPtrI = frag.m_accTableFragPtrI[ent.m_fragBit];
lockReq->fragPtrI = RNIL;
const Uint32* const buf32 = static_cast<Uint32*>(keyPar.m_data); const Uint32* const buf32 = static_cast<Uint32*>(keyPar.m_data);
const Uint64* const buf64 = reinterpret_cast<const Uint64*>(buf32); const Uint64* const buf64 = reinterpret_cast<const Uint64*>(buf32);
lockReq->hashValue = md5_hash(buf64, keyPar.m_size); lockReq->hashValue = md5_hash(buf64, keyPar.m_size);
...@@ -700,14 +699,14 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -700,14 +699,14 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
if (tree.m_root == NullTupAddr) { if (tree.m_root == NullTupLoc) {
// tree may have become empty // tree may have become empty
jam(); jam();
scan.m_state = ScanOp::Last; scan.m_state = ScanOp::Last;
return; return;
} }
TreePos pos; TreePos pos;
pos.m_addr = tree.m_root; pos.m_loc = tree.m_root;
NodeHandlePtr nodePtr; NodeHandlePtr nodePtr;
// unpack lower bound // unpack lower bound
const ScanBound& bound = *scan.m_bound[0]; const ScanBound& bound = *scan.m_bound[0];
...@@ -725,7 +724,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -725,7 +724,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
boundPar.m_dir = 0; boundPar.m_dir = 0;
loop: { loop: {
jam(); jam();
selectNode(signal, frag, nodePtr, pos.m_addr, AccPref); selectNode(signal, frag, nodePtr, pos.m_loc, AccPref);
const unsigned occup = nodePtr.p->getOccup(); const unsigned occup = nodePtr.p->getOccup();
ndbrequire(occup != 0); ndbrequire(occup != 0);
for (unsigned i = 0; i <= 1; i++) { for (unsigned i = 0; i <= 1; i++) {
...@@ -751,11 +750,11 @@ loop: { ...@@ -751,11 +750,11 @@ loop: {
} }
if (i == 0 && ret < 0) { if (i == 0 && ret < 0) {
jam(); jam();
const TupAddr tupAddr = nodePtr.p->getLink(i); const TupLoc loc = nodePtr.p->getLink(i);
if (tupAddr != NullTupAddr) { if (loc != NullTupLoc) {
jam(); jam();
// continue to left subtree // continue to left subtree
pos.m_addr = tupAddr; pos.m_loc = loc;
goto loop; goto loop;
} }
// start scanning this node // start scanning this node
...@@ -769,11 +768,11 @@ loop: { ...@@ -769,11 +768,11 @@ loop: {
} }
if (i == 1 && ret > 0) { if (i == 1 && ret > 0) {
jam(); jam();
const TupAddr tupAddr = nodePtr.p->getLink(i); const TupLoc loc = nodePtr.p->getLink(i);
if (tupAddr != NullTupAddr) { if (loc != NullTupLoc) {
jam(); jam();
// continue to right subtree // continue to right subtree
pos.m_addr = tupAddr; pos.m_loc = loc;
goto loop; goto loop;
} }
// start scanning upwards // start scanning upwards
...@@ -870,7 +869,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -870,7 +869,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
TreePos pos = scan.m_scanPos; TreePos pos = scan.m_scanPos;
// get and remember original node // get and remember original node
NodeHandlePtr origNodePtr; NodeHandlePtr origNodePtr;
selectNode(signal, frag, origNodePtr, pos.m_addr, AccHead); selectNode(signal, frag, origNodePtr, pos.m_loc, AccHead);
ndbrequire(origNodePtr.p->islinkScan(scanPtr)); ndbrequire(origNodePtr.p->islinkScan(scanPtr));
// current node in loop // current node in loop
NodeHandlePtr nodePtr = origNodePtr; NodeHandlePtr nodePtr = origNodePtr;
...@@ -879,21 +878,21 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -879,21 +878,21 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
if (pos.m_dir == 2) { if (pos.m_dir == 2) {
// coming up from root ends the scan // coming up from root ends the scan
jam(); jam();
pos.m_addr = NullTupAddr; pos.m_loc = NullTupLoc;
scan.m_state = ScanOp::Last; scan.m_state = ScanOp::Last;
break; break;
} }
if (nodePtr.p->m_addr != pos.m_addr) { if (nodePtr.p->m_loc != pos.m_loc) {
jam(); jam();
selectNode(signal, frag, nodePtr, pos.m_addr, AccHead); selectNode(signal, frag, nodePtr, pos.m_loc, AccHead);
} }
if (pos.m_dir == 4) { if (pos.m_dir == 4) {
// coming down from parent proceed to left child // coming down from parent proceed to left child
jam(); jam();
TupAddr addr = nodePtr.p->getLink(0); TupLoc loc = nodePtr.p->getLink(0);
if (addr != NullTupAddr) { if (loc != NullTupLoc) {
jam(); jam();
pos.m_addr = addr; pos.m_loc = loc;
pos.m_dir = 4; // unchanged pos.m_dir = 4; // unchanged
continue; continue;
} }
...@@ -938,7 +937,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -938,7 +937,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
if (ret < 0) { if (ret < 0) {
jam(); jam();
// hit upper bound of single range scan // hit upper bound of single range scan
pos.m_addr = NullTupAddr; pos.m_loc = NullTupLoc;
scan.m_state = ScanOp::Last; scan.m_state = ScanOp::Last;
break; break;
} }
...@@ -952,10 +951,10 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -952,10 +951,10 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
break; break;
} }
// after node proceed to right child // after node proceed to right child
TupAddr addr = nodePtr.p->getLink(1); TupLoc loc = nodePtr.p->getLink(1);
if (addr != NullTupAddr) { if (loc != NullTupLoc) {
jam(); jam();
pos.m_addr = addr; pos.m_loc = loc;
pos.m_dir = 4; pos.m_dir = 4;
continue; continue;
} }
...@@ -965,7 +964,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -965,7 +964,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
if (pos.m_dir == 1) { if (pos.m_dir == 1) {
// coming from right child proceed to parent // coming from right child proceed to parent
jam(); jam();
pos.m_addr = nodePtr.p->getLink(2); pos.m_loc = nodePtr.p->getLink(2);
pos.m_dir = nodePtr.p->getSide(); pos.m_dir = nodePtr.p->getSide();
continue; continue;
} }
...@@ -975,7 +974,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -975,7 +974,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_scanPos = pos; scan.m_scanPos = pos;
// relink // relink
if (scan.m_state == ScanOp::Current) { if (scan.m_state == ScanOp::Current) {
ndbrequire(pos.m_addr == nodePtr.p->m_addr); ndbrequire(pos.m_loc == nodePtr.p->m_loc);
if (origNodePtr.i != nodePtr.i) { if (origNodePtr.i != nodePtr.i) {
jam(); jam();
origNodePtr.p->unlinkScan(scanPtr); origNodePtr.p->unlinkScan(scanPtr);
...@@ -983,7 +982,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -983,7 +982,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
} }
} else if (scan.m_state == ScanOp::Last) { } else if (scan.m_state == ScanOp::Last) {
jam(); jam();
ndbrequire(pos.m_addr == NullTupAddr); ndbrequire(pos.m_loc == NullTupLoc);
origNodePtr.p->unlinkScan(scanPtr); origNodePtr.p->unlinkScan(scanPtr);
} else { } else {
ndbrequire(false); ndbrequire(false);
......
...@@ -30,9 +30,9 @@ Dbtux::treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& tree ...@@ -30,9 +30,9 @@ Dbtux::treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& tree
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
treePos.m_addr = tree.m_root; treePos.m_loc = tree.m_root;
NodeHandlePtr nodePtr; NodeHandlePtr nodePtr;
if (treePos.m_addr == NullTupAddr) { if (treePos.m_loc == NullTupLoc) {
// empty tree // empty tree
jam(); jam();
treePos.m_pos = 0; treePos.m_pos = 0;
...@@ -41,7 +41,7 @@ Dbtux::treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& tree ...@@ -41,7 +41,7 @@ Dbtux::treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& tree
} }
loop: { loop: {
jam(); jam();
selectNode(signal, frag, nodePtr, treePos.m_addr, AccPref); selectNode(signal, frag, nodePtr, treePos.m_loc, AccPref);
const unsigned occup = nodePtr.p->getOccup(); const unsigned occup = nodePtr.p->getOccup();
ndbrequire(occup != 0); ndbrequire(occup != 0);
// number of equal initial attributes in bounding node // number of equal initial attributes in bounding node
...@@ -82,15 +82,14 @@ loop: { ...@@ -82,15 +82,14 @@ loop: {
} }
if (i == 0 ? (ret < 0) : (ret > 0)) { if (i == 0 ? (ret < 0) : (ret > 0)) {
jam(); jam();
const TupAddr tupAddr = nodePtr.p->getLink(i); const TupLoc loc = nodePtr.p->getLink(i);
if (tupAddr != NullTupAddr) { if (loc != NullTupLoc) {
jam(); jam();
// continue to left/right subtree // continue to left/right subtree
treePos.m_addr = tupAddr; treePos.m_loc = loc;
goto loop; goto loop;
} }
// position is immediately before/after this node // position is immediately before/after this node
// XXX disallow second case
treePos.m_pos = (i == 0 ? 0 : occup); treePos.m_pos = (i == 0 ? 0 : occup);
treePos.m_match = false; treePos.m_match = false;
return; return;
...@@ -159,16 +158,16 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) ...@@ -159,16 +158,16 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
unsigned pos = treePos.m_pos; unsigned pos = treePos.m_pos;
NodeHandlePtr nodePtr; NodeHandlePtr nodePtr;
// check for empty tree // check for empty tree
if (treePos.m_addr == NullTupAddr) { if (treePos.m_loc == NullTupLoc) {
jam(); jam();
insertNode(signal, frag, nodePtr, AccPref); insertNode(signal, frag, nodePtr, AccPref);
nodePtr.p->pushUp(signal, 0, ent); nodePtr.p->pushUp(signal, 0, ent);
nodePtr.p->setSide(2); nodePtr.p->setSide(2);
tree.m_root = nodePtr.p->m_addr; tree.m_root = nodePtr.p->m_loc;
return; return;
} }
// access full node // access full node
selectNode(signal, frag, nodePtr, treePos.m_addr, AccFull); selectNode(signal, frag, nodePtr, treePos.m_loc, AccFull);
// check if it is bounding node // check if it is bounding node
if (pos != 0 && pos != nodePtr.p->getOccup()) { if (pos != 0 && pos != nodePtr.p->getOccup()) {
jam(); jam();
...@@ -181,18 +180,18 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) ...@@ -181,18 +180,18 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
// returns min entry // returns min entry
nodePtr.p->pushDown(signal, pos - 1, ent); nodePtr.p->pushDown(signal, pos - 1, ent);
// find position to add the removed min entry // find position to add the removed min entry
TupAddr childAddr = nodePtr.p->getLink(0); TupLoc childLoc = nodePtr.p->getLink(0);
if (childAddr == NullTupAddr) { if (childLoc == NullTupLoc) {
jam(); jam();
// left child will be added // left child will be added
pos = 0; pos = 0;
} else { } else {
jam(); jam();
// find glb node // find glb node
while (childAddr != NullTupAddr) { while (childLoc != NullTupLoc) {
jam(); jam();
selectNode(signal, frag, nodePtr, childAddr, AccHead); selectNode(signal, frag, nodePtr, childLoc, AccHead);
childAddr = nodePtr.p->getLink(1); childLoc = nodePtr.p->getLink(1);
} }
// access full node again // access full node again
accessNode(signal, frag, nodePtr, AccFull); accessNode(signal, frag, nodePtr, AccFull);
...@@ -202,7 +201,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) ...@@ -202,7 +201,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
} }
// adding new min or max // adding new min or max
unsigned i = (pos == 0 ? 0 : 1); unsigned i = (pos == 0 ? 0 : 1);
ndbrequire(nodePtr.p->getLink(i) == NullTupAddr); ndbrequire(nodePtr.p->getLink(i) == NullTupLoc);
// check if the half-leaf/leaf has room for one more // check if the half-leaf/leaf has room for one more
if (nodePtr.p->getOccup() < tree.m_maxOccup) { if (nodePtr.p->getOccup() < tree.m_maxOccup) {
jam(); jam();
...@@ -214,8 +213,8 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) ...@@ -214,8 +213,8 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
insertNode(signal, frag, childPtr, AccPref); insertNode(signal, frag, childPtr, AccPref);
childPtr.p->pushUp(signal, 0, ent); childPtr.p->pushUp(signal, 0, ent);
// connect parent and child // connect parent and child
nodePtr.p->setLink(i, childPtr.p->m_addr); nodePtr.p->setLink(i, childPtr.p->m_loc);
childPtr.p->setLink(2, nodePtr.p->m_addr); childPtr.p->setLink(2, nodePtr.p->m_loc);
childPtr.p->setSide(i); childPtr.p->setSide(i);
// re-balance tree at each node // re-balance tree at each node
while (true) { while (true) {
...@@ -254,14 +253,14 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) ...@@ -254,14 +253,14 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
} else { } else {
ndbrequire(false); ndbrequire(false);
} }
TupAddr parentAddr = nodePtr.p->getLink(2); TupLoc parentLoc = nodePtr.p->getLink(2);
if (parentAddr == NullTupAddr) { if (parentLoc == NullTupLoc) {
jam(); jam();
// root node - done // root node - done
break; break;
} }
i = nodePtr.p->getSide(); i = nodePtr.p->getSide();
selectNode(signal, frag, nodePtr, parentAddr, AccHead); selectNode(signal, frag, nodePtr, parentLoc, AccHead);
} }
} }
...@@ -275,7 +274,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -275,7 +274,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
unsigned pos = treePos.m_pos; unsigned pos = treePos.m_pos;
NodeHandlePtr nodePtr; NodeHandlePtr nodePtr;
// access full node // access full node
selectNode(signal, frag, nodePtr, treePos.m_addr, AccFull); selectNode(signal, frag, nodePtr, treePos.m_loc, AccFull);
TreeEnt ent; TreeEnt ent;
// check interior node first // check interior node first
if (nodePtr.p->getChilds() == 2) { if (nodePtr.p->getChilds() == 2) {
...@@ -290,11 +289,11 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -290,11 +289,11 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
// save current handle // save current handle
NodeHandlePtr parentPtr = nodePtr; NodeHandlePtr parentPtr = nodePtr;
// find glb node // find glb node
TupAddr childAddr = nodePtr.p->getLink(0); TupLoc childLoc = nodePtr.p->getLink(0);
while (childAddr != NullTupAddr) { while (childLoc != NullTupLoc) {
jam(); jam();
selectNode(signal, frag, nodePtr, childAddr, AccHead); selectNode(signal, frag, nodePtr, childLoc, AccHead);
childAddr = nodePtr.p->getLink(1); childLoc = nodePtr.p->getLink(1);
} }
// access full node again // access full node again
accessNode(signal, frag, nodePtr, AccFull); accessNode(signal, frag, nodePtr, AccFull);
...@@ -311,21 +310,21 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -311,21 +310,21 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
// handle half-leaf // handle half-leaf
for (unsigned i = 0; i <= 1; i++) { for (unsigned i = 0; i <= 1; i++) {
jam(); jam();
TupAddr childAddr = nodePtr.p->getLink(i); TupLoc childLoc = nodePtr.p->getLink(i);
if (childAddr != NullTupAddr) { if (childLoc != NullTupLoc) {
// move to child // move to child
selectNode(signal, frag, nodePtr, childAddr, AccFull); selectNode(signal, frag, nodePtr, childLoc, AccFull);
// balance of half-leaf parent requires child to be leaf // balance of half-leaf parent requires child to be leaf
break; break;
} }
} }
ndbrequire(nodePtr.p->getChilds() == 0); ndbrequire(nodePtr.p->getChilds() == 0);
// get parent if any // get parent if any
TupAddr parentAddr = nodePtr.p->getLink(2); TupLoc parentLoc = nodePtr.p->getLink(2);
NodeHandlePtr parentPtr; NodeHandlePtr parentPtr;
unsigned i = nodePtr.p->getSide(); unsigned i = nodePtr.p->getSide();
// move all that fits into parent // move all that fits into parent
if (parentAddr != NullTupAddr) { if (parentLoc != NullTupLoc) {
jam(); jam();
selectNode(signal, frag, parentPtr, nodePtr.p->getLink(2), AccFull); selectNode(signal, frag, parentPtr, nodePtr.p->getLink(2), AccFull);
parentPtr.p->slide(signal, nodePtr, i); parentPtr.p->slide(signal, nodePtr, i);
...@@ -338,36 +337,36 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -338,36 +337,36 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
} }
// remove empty leaf // remove empty leaf
deleteNode(signal, frag, nodePtr); deleteNode(signal, frag, nodePtr);
if (parentAddr == NullTupAddr) { if (parentLoc == NullTupLoc) {
jam(); jam();
// tree is now empty // tree is now empty
tree.m_root = NullTupAddr; tree.m_root = NullTupLoc;
return; return;
} }
nodePtr = parentPtr; nodePtr = parentPtr;
nodePtr.p->setLink(i, NullTupAddr); nodePtr.p->setLink(i, NullTupLoc);
#ifdef dbtux_min_occup_less_max_occup #ifdef dbtux_min_occup_less_max_occup
// check if we created a half-leaf // check if we created a half-leaf
if (nodePtr.p->getBalance() == 0) { if (nodePtr.p->getBalance() == 0) {
jam(); jam();
// move entries from the other child // move entries from the other child
TupAddr childAddr = nodePtr.p->getLink(1 - i); TupLoc childLoc = nodePtr.p->getLink(1 - i);
NodeHandlePtr childPtr; NodeHandlePtr childPtr;
selectNode(signal, frag, childPtr, childAddr, AccFull); selectNode(signal, frag, childPtr, childLoc, AccFull);
nodePtr.p->slide(signal, childPtr, 1 - i); nodePtr.p->slide(signal, childPtr, 1 - i);
if (childPtr.p->getOccup() == 0) { if (childPtr.p->getOccup() == 0) {
jam(); jam();
deleteNode(signal, frag, childPtr); deleteNode(signal, frag, childPtr);
nodePtr.p->setLink(1 - i, NullTupAddr); nodePtr.p->setLink(1 - i, NullTupLoc);
// we are balanced again but our parent balance changes by -1 // we are balanced again but our parent balance changes by -1
parentAddr = nodePtr.p->getLink(2); parentLoc = nodePtr.p->getLink(2);
if (parentAddr == NullTupAddr) { if (parentLoc == NullTupLoc) {
jam(); jam();
return; return;
} }
// fix side and become parent // fix side and become parent
i = nodePtr.p->getSide(); i = nodePtr.p->getSide();
selectNode(signal, frag, nodePtr, parentAddr, AccHead); selectNode(signal, frag, nodePtr, parentLoc, AccHead);
} }
} }
#endif #endif
...@@ -411,14 +410,14 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -411,14 +410,14 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
} else { } else {
ndbrequire(false); ndbrequire(false);
} }
TupAddr parentAddr = nodePtr.p->getLink(2); TupLoc parentLoc = nodePtr.p->getLink(2);
if (parentAddr == NullTupAddr) { if (parentLoc == NullTupLoc) {
jam(); jam();
// root node - done // root node - done
return; return;
} }
i = nodePtr.p->getSide(); i = nodePtr.p->getSide();
selectNode(signal, frag, nodePtr, parentAddr, AccHead); selectNode(signal, frag, nodePtr, parentLoc, AccHead);
} }
} }
...@@ -451,7 +450,7 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -451,7 +450,7 @@ Dbtux::treeRotateSingle(Signal* signal,
Verify that n5Bal is 1 if RR rotate and -1 if LL rotate. Verify that n5Bal is 1 if RR rotate and -1 if LL rotate.
*/ */
NodeHandlePtr n5Ptr = nodePtr; NodeHandlePtr n5Ptr = nodePtr;
const TupAddr n5Addr = n5Ptr.p->m_addr; const TupLoc n5Loc = n5Ptr.p->m_loc;
const int n5Bal = n5Ptr.p->getBalance(); const int n5Bal = n5Ptr.p->getBalance();
const int n5side = n5Ptr.p->getSide(); const int n5side = n5Ptr.p->getSide();
ndbrequire(n5Bal + (1 - i) == i); ndbrequire(n5Bal + (1 - i) == i);
...@@ -460,36 +459,36 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -460,36 +459,36 @@ Dbtux::treeRotateSingle(Signal* signal,
node 5. For an insert to cause this it must have the same balance as 5. node 5. For an insert to cause this it must have the same balance as 5.
For deletes it can have the balance 0. For deletes it can have the balance 0.
*/ */
TupAddr n3Addr = n5Ptr.p->getLink(i); TupLoc n3Loc = n5Ptr.p->getLink(i);
NodeHandlePtr n3Ptr; NodeHandlePtr n3Ptr;
selectNode(signal, frag, n3Ptr, n3Addr, AccHead); selectNode(signal, frag, n3Ptr, n3Loc, AccHead);
const int n3Bal = n3Ptr.p->getBalance(); const int n3Bal = n3Ptr.p->getBalance();
/* /*
2 must always be there but is not changed. Thus we mereley check that it 2 must always be there but is not changed. Thus we mereley check that it
exists. exists.
*/ */
ndbrequire(n3Ptr.p->getLink(i) != NullTupAddr); ndbrequire(n3Ptr.p->getLink(i) != NullTupLoc);
/* /*
4 is not necessarily there but if it is there it will move from one 4 is not necessarily there but if it is there it will move from one
side of 3 to the other side of 5. For LL it moves from the right side side of 3 to the other side of 5. For LL it moves from the right side
to the left side and for RR it moves from the left side to the right to the left side and for RR it moves from the left side to the right
side. This means that it also changes parent from 3 to 5. side. This means that it also changes parent from 3 to 5.
*/ */
TupAddr n4Addr = n3Ptr.p->getLink(1 - i); TupLoc n4Loc = n3Ptr.p->getLink(1 - i);
NodeHandlePtr n4Ptr; NodeHandlePtr n4Ptr;
if (n4Addr != NullTupAddr) { if (n4Loc != NullTupLoc) {
jam(); jam();
selectNode(signal, frag, n4Ptr, n4Addr, AccHead); selectNode(signal, frag, n4Ptr, n4Loc, AccHead);
ndbrequire(n4Ptr.p->getSide() == (1 - i) && ndbrequire(n4Ptr.p->getSide() == (1 - i) &&
n4Ptr.p->getLink(2) == n3Addr); n4Ptr.p->getLink(2) == n3Loc);
n4Ptr.p->setSide(i); n4Ptr.p->setSide(i);
n4Ptr.p->setLink(2, n5Addr); n4Ptr.p->setLink(2, n5Loc);
}//if }//if
/* /*
Retrieve the address of 5's parent before it is destroyed Retrieve the address of 5's parent before it is destroyed
*/ */
TupAddr n0Addr = n5Ptr.p->getLink(2); TupLoc n0Loc = n5Ptr.p->getLink(2);
/* /*
The next step is to perform the rotation. 3 will inherit 5's parent The next step is to perform the rotation. 3 will inherit 5's parent
...@@ -503,22 +502,22 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -503,22 +502,22 @@ Dbtux::treeRotateSingle(Signal* signal,
1. 3 must have had 5 as parent before the change. 1. 3 must have had 5 as parent before the change.
2. 3's side is left for LL and right for RR before change. 2. 3's side is left for LL and right for RR before change.
*/ */
ndbrequire(n3Ptr.p->getLink(2) == n5Addr); ndbrequire(n3Ptr.p->getLink(2) == n5Loc);
ndbrequire(n3Ptr.p->getSide() == i); ndbrequire(n3Ptr.p->getSide() == i);
n3Ptr.p->setLink(1 - i, n5Addr); n3Ptr.p->setLink(1 - i, n5Loc);
n3Ptr.p->setLink(2, n0Addr); n3Ptr.p->setLink(2, n0Loc);
n3Ptr.p->setSide(n5side); n3Ptr.p->setSide(n5side);
n5Ptr.p->setLink(i, n4Addr); n5Ptr.p->setLink(i, n4Loc);
n5Ptr.p->setLink(2, n3Addr); n5Ptr.p->setLink(2, n3Loc);
n5Ptr.p->setSide(1 - i); n5Ptr.p->setSide(1 - i);
if (n0Addr != NullTupAddr) { if (n0Loc != NullTupLoc) {
jam(); jam();
NodeHandlePtr n0Ptr; NodeHandlePtr n0Ptr;
selectNode(signal, frag, n0Ptr, n0Addr, AccHead); selectNode(signal, frag, n0Ptr, n0Loc, AccHead);
n0Ptr.p->setLink(n5side, n3Addr); n0Ptr.p->setLink(n5side, n3Loc);
} else { } else {
jam(); jam();
frag.m_tree.m_root = n3Addr; frag.m_tree.m_root = n3Loc;
}//if }//if
/* The final step of the change is to update the balance of 3 and /* The final step of the change is to update the balance of 3 and
5 that changed places. There are two cases here. The first case is 5 that changed places. There are two cases here. The first case is
...@@ -655,36 +654,36 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsi ...@@ -655,36 +654,36 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsi
{ {
// old top node // old top node
NodeHandlePtr n6Ptr = nodePtr; NodeHandlePtr n6Ptr = nodePtr;
const TupAddr n6Addr = n6Ptr.p->m_addr; const TupLoc n6Loc = n6Ptr.p->m_loc;
// the un-updated balance // the un-updated balance
const int n6Bal = n6Ptr.p->getBalance(); const int n6Bal = n6Ptr.p->getBalance();
const unsigned n6Side = n6Ptr.p->getSide(); const unsigned n6Side = n6Ptr.p->getSide();
// level 1 // level 1
TupAddr n2Addr = n6Ptr.p->getLink(i); TupLoc n2Loc = n6Ptr.p->getLink(i);
NodeHandlePtr n2Ptr; NodeHandlePtr n2Ptr;
selectNode(signal, frag, n2Ptr, n2Addr, AccHead); selectNode(signal, frag, n2Ptr, n2Loc, AccHead);
const int n2Bal = n2Ptr.p->getBalance(); const int n2Bal = n2Ptr.p->getBalance();
// level 2 // level 2
TupAddr n4Addr = n2Ptr.p->getLink(1 - i); TupLoc n4Loc = n2Ptr.p->getLink(1 - i);
NodeHandlePtr n4Ptr; NodeHandlePtr n4Ptr;
selectNode(signal, frag, n4Ptr, n4Addr, AccHead); selectNode(signal, frag, n4Ptr, n4Loc, AccHead);
const int n4Bal = n4Ptr.p->getBalance(); const int n4Bal = n4Ptr.p->getBalance();
ndbrequire(i <= 1); ndbrequire(i <= 1);
ndbrequire(n6Bal + (1 - i) == i); ndbrequire(n6Bal + (1 - i) == i);
ndbrequire(n2Bal == -n6Bal); ndbrequire(n2Bal == -n6Bal);
ndbrequire(n2Ptr.p->getLink(2) == n6Addr); ndbrequire(n2Ptr.p->getLink(2) == n6Loc);
ndbrequire(n2Ptr.p->getSide() == i); ndbrequire(n2Ptr.p->getSide() == i);
ndbrequire(n4Ptr.p->getLink(2) == n2Addr); ndbrequire(n4Ptr.p->getLink(2) == n2Loc);
// level 3 // level 3
TupAddr n3Addr = n4Ptr.p->getLink(i); TupLoc n3Loc = n4Ptr.p->getLink(i);
TupAddr n5Addr = n4Ptr.p->getLink(1 - i); TupLoc n5Loc = n4Ptr.p->getLink(1 - i);
// fill up leaf before it becomes internal // fill up leaf before it becomes internal
if (n3Addr == NullTupAddr && n5Addr == NullTupAddr) { if (n3Loc == NullTupLoc && n5Loc == NullTupLoc) {
jam(); jam();
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
accessNode(signal, frag, n2Ptr, AccFull); accessNode(signal, frag, n2Ptr, AccFull);
...@@ -694,44 +693,44 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsi ...@@ -694,44 +693,44 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsi
ndbrequire(n4Ptr.p->getOccup() >= tree.m_minOccup); ndbrequire(n4Ptr.p->getOccup() >= tree.m_minOccup);
ndbrequire(n2Ptr.p->getOccup() != 0); ndbrequire(n2Ptr.p->getOccup() != 0);
} else { } else {
if (n3Addr != NullTupAddr) { if (n3Loc != NullTupLoc) {
jam(); jam();
NodeHandlePtr n3Ptr; NodeHandlePtr n3Ptr;
selectNode(signal, frag, n3Ptr, n3Addr, AccHead); selectNode(signal, frag, n3Ptr, n3Loc, AccHead);
n3Ptr.p->setLink(2, n2Addr); n3Ptr.p->setLink(2, n2Loc);
n3Ptr.p->setSide(1 - i); n3Ptr.p->setSide(1 - i);
} }
if (n5Addr != NullTupAddr) { if (n5Loc != NullTupLoc) {
jam(); jam();
NodeHandlePtr n5Ptr; NodeHandlePtr n5Ptr;
selectNode(signal, frag, n5Ptr, n5Addr, AccHead); selectNode(signal, frag, n5Ptr, n5Loc, AccHead);
n5Ptr.p->setLink(2, n6Ptr.p->m_addr); n5Ptr.p->setLink(2, n6Ptr.p->m_loc);
n5Ptr.p->setSide(i); n5Ptr.p->setSide(i);
} }
} }
// parent // parent
TupAddr n0Addr = n6Ptr.p->getLink(2); TupLoc n0Loc = n6Ptr.p->getLink(2);
NodeHandlePtr n0Ptr; NodeHandlePtr n0Ptr;
// perform the rotation // perform the rotation
n6Ptr.p->setLink(i, n5Addr); n6Ptr.p->setLink(i, n5Loc);
n6Ptr.p->setLink(2, n4Addr); n6Ptr.p->setLink(2, n4Loc);
n6Ptr.p->setSide(1 - i); n6Ptr.p->setSide(1 - i);
n2Ptr.p->setLink(1 - i, n3Addr); n2Ptr.p->setLink(1 - i, n3Loc);
n2Ptr.p->setLink(2, n4Addr); n2Ptr.p->setLink(2, n4Loc);
n4Ptr.p->setLink(i, n2Addr); n4Ptr.p->setLink(i, n2Loc);
n4Ptr.p->setLink(1 - i, n6Addr); n4Ptr.p->setLink(1 - i, n6Loc);
n4Ptr.p->setLink(2, n0Addr); n4Ptr.p->setLink(2, n0Loc);
n4Ptr.p->setSide(n6Side); n4Ptr.p->setSide(n6Side);
if (n0Addr != NullTupAddr) { if (n0Loc != NullTupLoc) {
jam(); jam();
selectNode(signal, frag, n0Ptr, n0Addr, AccHead); selectNode(signal, frag, n0Ptr, n0Loc, AccHead);
n0Ptr.p->setLink(n6Side, n4Addr); n0Ptr.p->setLink(n6Side, n4Loc);
} else { } else {
jam(); jam();
frag.m_tree.m_root = n4Addr; frag.m_tree.m_root = n4Loc;
} }
// set balance of changed nodes // set balance of changed nodes
n4Ptr.p->setBalance(0); n4Ptr.p->setBalance(0);
......
index maintenance overhead
==========================
"mc02" 2x1700 MHz linux-2.4.9 gcc-2.96 -O3 one db-node
case a: index on Unsigned
testOIBasic -case u -table 1 -index 1 -fragtype small -threads 10 -rows 100000 -subloop 1 -nologging
case b: index on Varchar(5) + Varchar(5) + Varchar(20) + Unsigned
testOIBasic -case u -table 2 -index 4 -fragtype small -threads 10 -rows 100000 -subloop 1 -nologging
1 million rows, pk update without index, pk update with index
shows ms / 1000 rows for each and pct overhead
040616 mc02/a 40 ms 87 ms 114 pct
mc02/b 51 ms 128 ms 148 pct
optim 1 mc02/a 38 ms 85 ms 124 pct
mc02/b 51 ms 123 ms 140 pct
optim 2 mc02/a 41 ms 80 ms 96 pct
mc02/b 51 ms 117 ms 128 pct
vim: set et:
...@@ -39,6 +39,7 @@ struct Opt { ...@@ -39,6 +39,7 @@ struct Opt {
NdbDictionary::Object::FragmentType m_fragtype; NdbDictionary::Object::FragmentType m_fragtype;
const char* m_index; const char* m_index;
unsigned m_loop; unsigned m_loop;
bool m_nologging;
unsigned m_rows; unsigned m_rows;
unsigned m_scanrd; unsigned m_scanrd;
unsigned m_scanex; unsigned m_scanex;
...@@ -54,6 +55,7 @@ struct Opt { ...@@ -54,6 +55,7 @@ struct Opt {
m_fragtype(NdbDictionary::Object::FragUndefined), m_fragtype(NdbDictionary::Object::FragUndefined),
m_index(0), m_index(0),
m_loop(1), m_loop(1),
m_nologging(false),
m_rows(1000), m_rows(1000),
m_scanrd(240), m_scanrd(240),
m_scanex(240), m_scanex(240),
...@@ -82,6 +84,7 @@ printhelp() ...@@ -82,6 +84,7 @@ printhelp()
<< " -fragtype T fragment type single/small/medium/large" << endl << " -fragtype T fragment type single/small/medium/large" << endl
<< " -index xyz only given index numbers (digits 1-9)" << endl << " -index xyz only given index numbers (digits 1-9)" << endl
<< " -loop N loop count full suite forever=0 [" << d.m_loop << "]" << endl << " -loop N loop count full suite forever=0 [" << d.m_loop << "]" << endl
<< " -nologging create tables in no-logging mode" << endl
<< " -rows N rows per thread [" << d.m_rows << "]" << endl << " -rows N rows per thread [" << d.m_rows << "]" << endl
<< " -scanrd N scan read parallelism [" << d.m_scanrd << "]" << endl << " -scanrd N scan read parallelism [" << d.m_scanrd << "]" << endl
<< " -scanex N scan exclusive parallelism [" << d.m_scanex << "]" << endl << " -scanex N scan exclusive parallelism [" << d.m_scanex << "]" << endl
...@@ -476,7 +479,7 @@ tt1 = { ...@@ -476,7 +479,7 @@ tt1 = {
"TT1", 5, tt1col, 4, tt1itab "TT1", 5, tt1col, 4, tt1itab
}; };
// tt2 + tt2x1 tt2x2 tt2x3 // tt2 + tt2x1 tt2x2 tt2x3 tt2x4
static const Col static const Col
tt2col[] = { tt2col[] = {
...@@ -505,6 +508,14 @@ tt2x3col[] = { ...@@ -505,6 +508,14 @@ tt2x3col[] = {
{ 1, tt2col[4] } { 1, tt2col[4] }
}; };
static const ICol
tt2x4col[] = {
{ 0, tt2col[4] },
{ 1, tt2col[3] },
{ 2, tt2col[2] },
{ 3, tt2col[1] }
};
static const ITab static const ITab
tt2x1 = { tt2x1 = {
"TT2X1", 2, tt2x1col "TT2X1", 2, tt2x1col
...@@ -520,16 +531,22 @@ tt2x3 = { ...@@ -520,16 +531,22 @@ tt2x3 = {
"TT2X3", 2, tt2x3col "TT2X3", 2, tt2x3col
}; };
static const ITab
tt2x4 = {
"TT2X4", 4, tt2x4col
};
static const ITab static const ITab
tt2itab[] = { tt2itab[] = {
tt2x1, tt2x1,
tt2x2, tt2x2,
tt2x3 tt2x3,
tt2x4
}; };
static const Tab static const Tab
tt2 = { tt2 = {
"TT2", 5, tt2col, 3, tt2itab "TT2", 5, tt2col, 4, tt2itab
}; };
// all tables // all tables
...@@ -823,6 +840,9 @@ createtable(Par par) ...@@ -823,6 +840,9 @@ createtable(Par par)
if (par.m_fragtype != NdbDictionary::Object::FragUndefined) { if (par.m_fragtype != NdbDictionary::Object::FragUndefined) {
t.setFragmentType(par.m_fragtype); t.setFragmentType(par.m_fragtype);
} }
if (par.m_nologging) {
t.setLogging(false);
}
for (unsigned k = 0; k < tab.m_cols; k++) { for (unsigned k = 0; k < tab.m_cols; k++) {
const Col& col = tab.m_col[k]; const Col& col = tab.m_col[k];
NdbDictionary::Column c(col.m_name); NdbDictionary::Column c(col.m_name);
...@@ -2500,9 +2520,28 @@ tbusybuild(Par par) ...@@ -2500,9 +2520,28 @@ tbusybuild(Par par)
} }
static int static int
ttiming(Par par) ttimebuild(Par par)
{ {
Tmr t0, t1, t2; Tmr t1;
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
RUNSTEP(par, pkinsert, MT);
t1.on();
RUNSTEP(par, createindex, ST);
t1.off(par.m_totrows);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, dropindex, ST);
}
LL1("build index - " << t1.time());
return 0;
}
static int
ttimemaint(Par par)
{
Tmr t1, t2;
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
...@@ -2511,16 +2550,13 @@ ttiming(Par par) ...@@ -2511,16 +2550,13 @@ ttiming(Par par)
t1.on(); t1.on();
RUNSTEP(par, pkupdate, MT); RUNSTEP(par, pkupdate, MT);
t1.off(par.m_totrows); t1.off(par.m_totrows);
t0.on();
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
t0.off(par.m_totrows);
t2.on(); t2.on();
RUNSTEP(par, pkupdate, MT); RUNSTEP(par, pkupdate, MT);
t2.off(par.m_totrows); t2.off(par.m_totrows);
RUNSTEP(par, dropindex, ST); RUNSTEP(par, dropindex, ST);
} }
LL1("build index - " << t0.time());
LL1("update - " << t1.time()); LL1("update - " << t1.time());
LL1("update indexed - " << t2.time()); LL1("update indexed - " << t2.time());
LL1("overhead - " << t2.over(t1)); LL1("overhead - " << t2.over(t1));
...@@ -2551,7 +2587,8 @@ tcaselist[] = { ...@@ -2551,7 +2587,8 @@ tcaselist[] = {
TCase("b", tpkops, "pk operations and scan reads"), TCase("b", tpkops, "pk operations and scan reads"),
TCase("c", tmixedops, "pk operations and scan operations"), TCase("c", tmixedops, "pk operations and scan operations"),
TCase("d", tbusybuild, "pk operations and index build"), TCase("d", tbusybuild, "pk operations and index build"),
TCase("t", ttiming, "time index build and maintenance"), TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"),
TCase("z", tdrop, "drop test tables") TCase("z", tdrop, "drop test tables")
}; };
...@@ -2689,6 +2726,10 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535) ...@@ -2689,6 +2726,10 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
continue; continue;
} }
} }
if (strcmp(arg, "-nologging") == 0) {
g_opt.m_nologging = true;
continue;
}
if (strcmp(arg, "-rows") == 0) { if (strcmp(arg, "-rows") == 0) {
if (++argv, --argc > 0) { if (++argv, --argc > 0) {
g_opt.m_rows = atoi(argv[0]); g_opt.m_rows = atoi(argv[0]);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment