diff --git a/ndb/include/kernel/signaldata/TupAccess.hpp b/ndb/include/kernel/signaldata/TupAccess.hpp index 5cfb8c0d153cac6d5bef135b954e7f82fc1cce46..ab56a73322c5e98db43f82bb9e124ec2904e3cc4 100644 --- a/ndb/include/kernel/signaldata/TupAccess.hpp +++ b/ndb/include/kernel/signaldata/TupAccess.hpp @@ -129,6 +129,8 @@ private: /* * Operate on entire tuple. Used by TUX where the table has a single * Uint32 array attribute representing an index tree node. + * + * XXX this signal is no longer used by TUX and can be removed */ class TupStoreTh { friend class Dbtup; diff --git a/ndb/include/kernel/signaldata/TupFrag.hpp b/ndb/include/kernel/signaldata/TupFrag.hpp index ffde22178936d41dd9d3a28ae7e04d64540a6159..fc88dacd48f176949dc748a888dc3aeb91f44afe 100644 --- a/ndb/include/kernel/signaldata/TupFrag.hpp +++ b/ndb/include/kernel/signaldata/TupFrag.hpp @@ -69,7 +69,7 @@ class TuxFragReq { friend class Dblqh; friend class Dbtux; public: - STATIC_CONST( SignalLength = 9 ); + STATIC_CONST( SignalLength = 14 ); private: Uint32 userPtr; Uint32 userRef; @@ -80,6 +80,9 @@ private: Uint32 fragOff; Uint32 tableType; Uint32 primaryTableId; + Uint32 tupIndexFragPtrI; + Uint32 tupTableFragPtrI[2]; + Uint32 accTableFragPtrI[2]; }; class TuxFragConf { diff --git a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 02474f6bee04d3c80dfdb63b924a1bc500291ea1..933ee2cf8e185bb4776b3096fdf57913d697eacc 100644 --- a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -2432,6 +2432,7 @@ void Dbacc::execACC_LOCKREQ(Signal* signal) } fragrecptr.i = req->fragPtrI; ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); + ndbrequire(req->fragId == fragrecptr.p->myfid); // caller must be explicit here ndbrequire(req->accOpPtr == RNIL); // seize operation to hold the lock diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 613f9ccea1187f58e82edbb4f998f0c390067092..1abf4b3a7e9beeedbb3a4b7515f3f90305ef3bfc 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -1225,6 +1225,18 @@ Dblqh::sendAddFragReq(Signal* signal) tuxreq->fragOff = addfragptr.p->lh3DistrBits; tuxreq->tableType = addfragptr.p->tableType; tuxreq->primaryTableId = addfragptr.p->primaryTableId; + // pointer to index fragment in TUP + tuxreq->tupIndexFragPtrI = + addfragptr.p->addfragStatus == AddFragRecord::WAIT_TWO_TUX ? + fragptr.p->tupFragptr[0] : fragptr.p->tupFragptr[1]; + // pointers to table fragments in TUP and ACC + FragrecordPtr tFragPtr; + tFragPtr.i = fragptr.p->tableFragptr; + ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord); + tuxreq->tupTableFragPtrI[0] = tFragPtr.p->tupFragptr[0]; + tuxreq->tupTableFragPtrI[1] = tFragPtr.p->tupFragptr[1]; + tuxreq->accTableFragPtrI[0] = tFragPtr.p->accFragptr[0]; + tuxreq->accTableFragPtrI[1] = tFragPtr.p->accFragptr[1]; sendSignal(fragptr.p->tuxBlockref, GSN_TUXFRAGREQ, signal, TuxFragReq::SignalLength, JBB); return; diff --git a/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/ndb/src/kernel/blocks/dbtup/Dbtup.hpp index c09c8984ce253ebe249206ceb2e4e2eb77081a5f..f2ce54ec96cb258154e5750d51725453a7b2b487 100644 --- a/ndb/src/kernel/blocks/dbtup/Dbtup.hpp +++ b/ndb/src/kernel/blocks/dbtup/Dbtup.hpp @@ -996,6 +996,14 @@ public: Dbtup(const class Configuration &); virtual ~Dbtup(); + /* + * TUX index in TUP has single Uint32 array attribute which stores an + * index node. TUX uses following methods. + */ + int tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pageOffset, Uint32*& node); + void tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node); + void tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node); + private: BLOCK_DEFINES(Dbtup); diff --git a/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp b/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp index 3a074f7fe5bc53f50a7b7a00c56b3eaad5c8fd94..baa738a8c4273af7fbc4614267b637c9c2cef897 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp @@ -179,6 +179,64 @@ Dbtup::execTUP_QUERY_TH(Signal* signal) return; } +int +Dbtup::tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pageOffset, Uint32*& node) +{ + FragrecordPtr fragPtr; + fragPtr.i = fragPtrI; + ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); + TablerecPtr tablePtr; + tablePtr.i = fragPtr.p->fragTableId; + ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec); + PagePtr pagePtr; + terrorCode = 0; + if (! allocTh(fragPtr.p, tablePtr.p, NORMAL_PAGE, signal, pageOffset, pagePtr)) { + jam(); + ndbrequire(terrorCode != 0); + return terrorCode; + } + pageId = pagePtr.i; + Uint32 attrDescIndex = tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE); + Uint32 attrDataOffset = AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr); + node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset; + return 0; +} + +void +Dbtup::tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node) +{ + FragrecordPtr fragPtr; + fragPtr.i = fragPtrI; + ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); + TablerecPtr tablePtr; + tablePtr.i = fragPtr.p->fragTableId; + ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec); + PagePtr pagePtr; + pagePtr.i = pageId; + ptrCheckGuard(pagePtr, cnoOfPage, page); + Uint32 attrDescIndex = tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE); + Uint32 attrDataOffset = AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr); + ndbrequire(node == &pagePtr.p->pageWord[pageOffset] + attrDataOffset); + freeTh(fragPtr.p, tablePtr.p, signal, pagePtr.p, pageOffset); +} + +void +Dbtup::tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node) +{ + FragrecordPtr fragPtr; + fragPtr.i = fragPtrI; + ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); + TablerecPtr tablePtr; + tablePtr.i = fragPtr.p->fragTableId; + ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec); + PagePtr pagePtr; + pagePtr.i = pageId; + ptrCheckGuard(pagePtr, cnoOfPage, page); + Uint32 attrDescIndex = tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE); + Uint32 attrDataOffset = AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr); + node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset; +} + void Dbtup::execTUP_STORE_TH(Signal* signal) { diff --git a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp index e871fc86dea202be34ab167f94bfa408c466000f..dba778ccebeed2bd58456ca5143425fe8002d6fc 100644 --- a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp +++ b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp @@ -25,6 +25,9 @@ #include <DataBuffer.hpp> #include <md5_hash.hpp> +// big brother +#include <Dbtup.hpp> + // signal classes #include <signaldata/DictTabInfo.hpp> #include <signaldata/TuxContinueB.hpp> @@ -92,15 +95,13 @@ public: Dbtux(const Configuration& conf); virtual ~Dbtux(); + // pointer to TUP instance in this thread + Dbtup* c_tup; + private: // sizes are in words (Uint32) static const unsigned MaxIndexFragments = 2 * NO_OF_FRAG_PER_NODE; static const unsigned MaxIndexAttributes = MAX_ATTRIBUTES_IN_INDEX; -#ifdef VM_TRACE - static const unsigned MaxNodeHandles = 10000; // More space for printTree -#else - static const unsigned MaxNodeHandles = 128; // enough for 1 operation -#endif static const unsigned MaxAttrDataSize = 2048; public: static const unsigned DescPageSize = 256; @@ -153,8 +154,7 @@ private: static const unsigned AttributeHeaderSize = 1; /* - * Logical tuple address, "local key". Identifies both table tuples - * and index tuples. The code assumes it is one word. + * Logical tuple address, "local key". Identifies table tuples. */ typedef Uint32 TupAddr; static const unsigned NullTupAddr = (Uint32)-1; @@ -168,8 +168,18 @@ private: Uint32 m_pageId; // page i-value Uint16 m_pageOffset; // page offset in words TupLoc(); + TupLoc(Uint32 pageId, Uint16 pageOffset); + bool operator==(const TupLoc& loc) const; + bool operator!=(const TupLoc& loc) const; }; + /* + * There is no const member NullTupLoc since the compiler may not be + * able to optimize it to TupLoc() constants. Instead null values are + * constructed on the stack with TupLoc(). + */ +#define NullTupLoc TupLoc() + // tree definitions /* @@ -183,7 +193,7 @@ private: TupAddr m_tupAddr; // address of original tuple Uint16 m_tupVersion; // version Uint8 m_fragBit; // which duplicated table fragment - Uint8 unused1; + Uint8 pad1; TreeEnt(); // methods int cmp(const TreeEnt ent) const; @@ -196,7 +206,7 @@ private: * prefix 3) max and min entries 4) rest of entries 5) one extra entry * used as work space. * - * struct TreeNode part 1 + * struct TreeNode part 1, size 6 words * min prefix part 2, size TreeHead::m_prefSize * max prefix part 2, size TreeHead::m_prefSize * max entry part 3 @@ -204,6 +214,10 @@ private: * rest of entries part 4 * work entry part 5 * + * There are 3 links to other nodes: left child, right child, parent. + * These are in TupLoc format but the pageIds and pageOffsets are + * stored in separate arrays (saves 1 word). + * * Occupancy (number of entries) is at least 1 except temporarily when * a node is about to be removed. If occupancy is 1, only max entry * is present but both min and max prefixes are set. @@ -211,11 +225,12 @@ private: struct TreeNode; friend struct TreeNode; struct TreeNode { - TupAddr m_link[3]; // link to 0-left child 1-right child 2-parent - Uint8 m_side; // we are 0-left child 1-right child 2-root + Uint32 m_linkPI[3]; // link to 0-left child 1-right child 2-parent + Uint16 m_linkPO[3]; // page offsets for above real page ids + unsigned m_side : 2; // we are 0-left child 1-right child 2-root + int m_balance : 2; // balance -1, 0, +1 + unsigned pad1 : 4; Uint8 m_occup; // current number of entries - Int8 m_balance; // balance -1, 0, +1 - Uint8 unused1; Uint32 m_nodeScan; // list of scans at this node TreeNode(); }; @@ -243,7 +258,7 @@ private: Uint8 m_prefSize; // words in min/max prefix each Uint8 m_minOccup; // min entries in internal node Uint8 m_maxOccup; // max entries in node - TupAddr m_root; // root node + TupLoc m_root; // root node TreeHead(); // methods unsigned getSize(AccSize acc) const; @@ -261,8 +276,7 @@ private: struct TreePos; friend struct TreePos; struct TreePos { - TupAddr m_addr; // logical node address - TupLoc m_loc; // physical address + TupLoc m_loc; // physical node address Uint16 m_pos; // position 0 to m_occup Uint8 m_match; // at an existing entry Uint8 m_dir; // from link (0-2) or within node (3) @@ -443,9 +457,11 @@ private: Uint16 m_descOff; Uint16 m_numAttrs; TreeHead m_tree; - Uint32 m_nodeList; // node cache of current operation - Uint32 m_nodeFree; // one node pre-allocated for insert + TupLoc m_freeLoc; // one node pre-allocated for insert DLList<ScanOp> m_scanList; // current scans on this fragment + Uint32 m_tupIndexFragPtrI; + Uint32 m_tupTableFragPtrI[2]; + Uint32 m_accTableFragPtrI[2]; union { Uint32 nextPool; }; @@ -476,62 +492,39 @@ private: // node handles /* - * A tree operation builds a cache of accessed nodes. This allows - * different implementations of index memory access. The cache is - * committed and released at the end of the operation. + * A node handle is a reference to a tree node in TUP. It is used to + * operate on the node. Node handles are allocated on the stack. */ struct NodeHandle; friend struct NodeHandle; struct NodeHandle { - enum Flags { - // bits 0,1 mark need for left,right prefix - DoInsert = (1 << 2), - DoDelete = (1 << 3), - DoUpdate = (1 << 4) - }; - Dbtux& m_tux; // this block Frag& m_frag; // fragment using the node - TupAddr m_addr; // logical node address TupLoc m_loc; // physical node address - AccSize m_acc; // accessed size - unsigned m_flags; // flags - union { - Uint32 m_next; // next active node under fragment - Uint32 nextPool; - }; TreeNode* m_node; // pointer to node storage - Uint32 m_cache[MaxTreeNodeSize]; - NodeHandle(Dbtux& tux, Frag& frag); + AccSize m_acc; // accessed size + NodeHandle(Frag& frag); + NodeHandle(const NodeHandle& node); + NodeHandle& operator=(const NodeHandle& node); // getters - TupAddr getLink(unsigned i); + TupLoc getLink(unsigned i); unsigned getChilds(); // cannot spell unsigned getSide(); unsigned getOccup(); int getBalance(); Uint32 getNodeScan(); - Data getPref(unsigned i); - TreeEnt getEnt(unsigned pos); - TreeEnt getMinMax(unsigned i); // setters - void setLink(unsigned i, TupAddr addr); + void setLink(unsigned i, TupLoc loc); void setSide(unsigned i); void setOccup(unsigned n); void setBalance(int b); void setNodeScan(Uint32 scanPtrI); - // operations XXX maybe these should move to Dbtux level - void pushUp(Signal* signal, unsigned pos, const TreeEnt& ent); - void popDown(Signal* signal, unsigned pos, TreeEnt& ent); - void pushDown(Signal* signal, unsigned pos, TreeEnt& ent); - void popUp(Signal* signal, unsigned pos, TreeEnt& ent); - void slide(Signal* signal, Ptr<NodeHandle> nodePtr, unsigned i); - void linkScan(Dbtux::ScanOpPtr scanPtr); - void unlinkScan(Dbtux::ScanOpPtr scanPtr); - bool islinkScan(Dbtux::ScanOpPtr scanPtr); - // for ndbrequire - void progError(int line, int cause, const char* extra); + // access other parts of the node + Data getPref(unsigned i); + TreeEnt getEnt(unsigned pos); + TreeEnt getMinMax(unsigned i); + // for ndbrequire and ndbassert + void progError(int line, int cause, const char* file); }; - typedef Ptr<NodeHandle> NodeHandlePtr; - ArrayPool<NodeHandle> c_nodeHandlePool; // parameters for methods @@ -562,17 +555,6 @@ private: ReadPar(); }; - /* - * Node storage operation. - */ - struct StorePar { - TupStoreTh::OpCode m_opCode;// operation code - unsigned m_offset; // data offset in words - unsigned m_size; // number of words - Uint32 m_errorCode; // terrorCode from TUP - StorePar(); - }; - /* * Tree search for entry. */ @@ -642,20 +624,26 @@ private: void execTUX_MAINT_REQ(Signal* signal); void tupReadAttrs(Signal* signal, const Frag& frag, ReadPar& readPar); void tupReadKeys(Signal* signal, const Frag& frag, ReadPar& readPar); - void tupStoreTh(Signal* signal, const Frag& frag, NodeHandlePtr nodePtr, StorePar storePar); /* * DbtuxNode.cpp */ - void seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr); - void preallocNode(Signal* signal, Frag& frag, Uint32& errorCode); - void findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupAddr addr); - void selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupAddr addr, AccSize acc); - void insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc); - void deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr); - void accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc); - void setNodePref(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i); - void commitNodes(Signal* signal, Frag& frag, bool updateOk); + int allocNode(Signal* signal, NodeHandle& node); + void accessNode(Signal* signal, NodeHandle& node, AccSize acc); + void selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc); + void insertNode(Signal* signal, NodeHandle& node, AccSize acc); + void deleteNode(Signal* signal, NodeHandle& node); + void setNodePref(Signal* signal, NodeHandle& node, unsigned i); + // node operations + void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent); + void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); + void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); + void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); + void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i); + // scans linked to node + void linkScan(NodeHandle& node, ScanOpPtr scanPtr); + void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr); + bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr); /* * DbtuxTree.cpp @@ -663,8 +651,8 @@ private: void treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& treePos); void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent); void treeRemove(Signal* signal, Frag& frag, TreePos treePos); - void treeRotateSingle(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i); - void treeRotateDouble(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i); + void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i); + void treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i); /* * DbtuxScan.cpp @@ -698,23 +686,24 @@ private: struct PrintPar { char m_path[100]; // LR prefix unsigned m_side; // expected side - TupAddr m_parent; // expected parent address + TupLoc m_parent; // expected parent address int m_depth; // returned depth unsigned m_occup; // returned occupancy bool m_ok; // returned status PrintPar(); }; void printTree(Signal* signal, Frag& frag, NdbOut& out); - void printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar& par); + void printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par); + friend class NdbOut& operator<<(NdbOut&, const TupLoc&); friend class NdbOut& operator<<(NdbOut&, const TreeEnt&); friend class NdbOut& operator<<(NdbOut&, const TreeNode&); friend class NdbOut& operator<<(NdbOut&, const TreeHead&); friend class NdbOut& operator<<(NdbOut&, const TreePos&); friend class NdbOut& operator<<(NdbOut&, const DescAttr&); + friend class NdbOut& operator<<(NdbOut&, const ScanOp&); friend class NdbOut& operator<<(NdbOut&, const Index&); friend class NdbOut& operator<<(NdbOut&, const Frag&); friend class NdbOut& operator<<(NdbOut&, const NodeHandle&); - friend class NdbOut& operator<<(NdbOut&, const ScanOp&); FILE* debugFile; NdbOut debugOut; unsigned debugFlags; @@ -831,8 +820,45 @@ Dbtux::ConstData::operator=(Data data) return *this; } +// Dbtux::TupLoc + +inline +Dbtux::TupLoc::TupLoc() : + m_pageId(RNIL), + m_pageOffset(0) +{ +} + +inline +Dbtux::TupLoc::TupLoc(Uint32 pageId, Uint16 pageOffset) : + m_pageId(pageId), + m_pageOffset(pageOffset) +{ +} + +inline bool +Dbtux::TupLoc::operator==(const TupLoc& loc) const +{ + return m_pageId == loc.m_pageId && m_pageOffset == loc.m_pageOffset; +} + +inline bool +Dbtux::TupLoc::operator!=(const TupLoc& loc) const +{ + return ! (*this == loc); +} + // Dbtux::TreeEnt +inline +Dbtux::TreeEnt::TreeEnt() : + m_tupAddr(NullTupAddr), + m_tupVersion(0), + m_fragBit(255), + pad1(0) +{ +} + inline int Dbtux::TreeEnt::cmp(const TreeEnt ent) const { @@ -852,8 +878,36 @@ Dbtux::TreeEnt::cmp(const TreeEnt ent) const return 0; } +// Dbtux::TreeNode + +inline +Dbtux::TreeNode::TreeNode() : + m_side(2), + m_balance(0), + pad1(0), + m_occup(0), + m_nodeScan(RNIL) +{ + m_linkPI[0] = NullTupLoc.m_pageId; + m_linkPO[0] = NullTupLoc.m_pageOffset; + m_linkPI[1] = NullTupLoc.m_pageId; + m_linkPO[1] = NullTupLoc.m_pageOffset; + m_linkPI[2] = NullTupLoc.m_pageId; + m_linkPO[2] = NullTupLoc.m_pageOffset; +} + // Dbtux::TreeHead +inline +Dbtux::TreeHead::TreeHead() : + m_nodeSize(0), + m_prefSize(0), + m_minOccup(0), + m_maxOccup(0), + m_root() +{ +} + inline unsigned Dbtux::TreeHead::getSize(AccSize acc) const { @@ -885,52 +939,10 @@ Dbtux::TreeHead::getEntList(TreeNode* node) const return (TreeEnt*)ptr; } -// Dbtux - -// constructors - -inline -Dbtux::TupLoc::TupLoc() : - m_pageId(RNIL), - m_pageOffset(0) -{ -} - -inline -Dbtux::TreeEnt::TreeEnt() : - m_tupAddr(NullTupAddr), - m_tupVersion(0), - m_fragBit(255), - unused1(0) -{ -} - -inline -Dbtux::TreeNode::TreeNode() : - m_side(255), - m_occup(0), - m_balance(0), - unused1(0xa1), - m_nodeScan(RNIL) -{ - m_link[0] = NullTupAddr; - m_link[1] = NullTupAddr; - m_link[2] = NullTupAddr; -} - -inline -Dbtux::TreeHead::TreeHead() : - m_nodeSize(0), - m_prefSize(0), - m_minOccup(0), - m_maxOccup(0), - m_root(0) -{ -} +// Dbtux::TreePos inline Dbtux::TreePos::TreePos() : - m_addr(NullTupAddr), m_loc(), m_pos(ZNIL), m_match(false), @@ -939,6 +951,8 @@ Dbtux::TreePos::TreePos() : { } +// Dbtux::DescPage + inline Dbtux::DescPage::DescPage() : m_nextPage(RNIL), @@ -953,6 +967,41 @@ Dbtux::DescPage::DescPage() : } } +// Dbtux::ScanOp + +inline +Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) : + m_state(Undef), + m_lockwait(false), + m_userPtr(RNIL), + m_userRef(RNIL), + m_tableId(RNIL), + m_indexId(RNIL), + m_fragPtrI(RNIL), + m_transId1(0), + m_transId2(0), + m_savePointId(0), + m_accLockOp(RNIL), + m_readCommitted(0), + m_lockMode(0), + m_keyInfo(0), + m_boundMin(scanBoundPool), + m_boundMax(scanBoundPool), + m_scanPos(), + m_lastEnt(), + m_nodeScan(RNIL) +{ + m_bound[0] = &m_boundMin; + m_bound[1] = &m_boundMax; + m_boundCnt[0] = 0; + m_boundCnt[1] = 0; + for (unsigned i = 0; i < MaxAccLockOps; i++) { + m_accLockOps[i] = RNIL; + } +} + +// Dbtux::Index + inline Dbtux::Index::Index() : m_state(NotDefined), @@ -969,6 +1018,8 @@ Dbtux::Index::Index() : }; }; +// Dbtux::Frag + inline Dbtux::Frag::Frag(ArrayPool<ScanOp>& scanOpPool) : m_tableId(RNIL), @@ -979,12 +1030,18 @@ Dbtux::Frag::Frag(ArrayPool<ScanOp>& scanOpPool) : m_descOff(0), m_numAttrs(ZNIL), m_tree(), - m_nodeList(RNIL), - m_nodeFree(RNIL), - m_scanList(scanOpPool) + m_freeLoc(), + m_scanList(scanOpPool), + m_tupIndexFragPtrI(RNIL) { + m_tupTableFragPtrI[0] = RNIL; + m_tupTableFragPtrI[1] = RNIL; + m_accTableFragPtrI[0] = RNIL; + m_accTableFragPtrI[1] = RNIL; } +// Dbtux::FragOp + inline Dbtux::FragOp::FragOp() : m_userPtr(RNIL), @@ -997,160 +1054,107 @@ Dbtux::FragOp::FragOp() : { }; +// Dbtux::NodeHandle + inline -Dbtux::NodeHandle::NodeHandle(Dbtux& tux, Frag& frag) : - m_tux(tux), +Dbtux::NodeHandle::NodeHandle(Frag& frag) : m_frag(frag), - m_addr(NullTupAddr), m_loc(), - m_acc(AccNone), - m_flags(0), - m_next(RNIL), - m_node(0) + m_node(0), + m_acc(AccNone) { } inline -Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) : - m_state(Undef), - m_lockwait(false), - m_userPtr(RNIL), - m_userRef(RNIL), - m_tableId(RNIL), - m_indexId(RNIL), - m_fragPtrI(RNIL), - m_transId1(0), - m_transId2(0), - m_savePointId(0), - m_accLockOp(RNIL), - m_readCommitted(0), - m_lockMode(0), - m_keyInfo(0), - m_boundMin(scanBoundPool), - m_boundMax(scanBoundPool), - m_scanPos(), - m_lastEnt(), - m_nodeScan(RNIL) +Dbtux::NodeHandle::NodeHandle(const NodeHandle& node) : + m_frag(node.m_frag), + m_loc(node.m_loc), + m_node(node.m_node), + m_acc(node.m_acc) { - m_bound[0] = &m_boundMin; - m_bound[1] = &m_boundMax; - m_boundCnt[0] = 0; - m_boundCnt[1] = 0; - for (unsigned i = 0; i < MaxAccLockOps; i++) { - m_accLockOps[i] = RNIL; - } } -inline -Dbtux::CopyPar::CopyPar() : - m_items(0), - m_headers(true), - m_maxwords(~0), // max unsigned - // output - m_numitems(0), - m_numwords(0) +inline Dbtux::NodeHandle& +Dbtux::NodeHandle::operator=(const NodeHandle& node) { + ndbassert(&m_frag == &node.m_frag); + m_loc = node.m_loc; + m_node = node.m_node; + m_acc = node.m_acc; + return *this; } -inline -Dbtux::ReadPar::ReadPar() : - m_first(0), - m_count(0), - m_data(0), - m_size(0) +inline Dbtux::TupLoc +Dbtux::NodeHandle::getLink(unsigned i) { + ndbrequire(i <= 2); + return TupLoc(m_node->m_linkPI[i], m_node->m_linkPO[i]); } -inline -Dbtux::StorePar::StorePar() : - m_opCode(TupStoreTh::OpUndefined), - m_offset(0), - m_size(0), - m_errorCode(0) +inline unsigned +Dbtux::NodeHandle::getChilds() { + return (getLink(0) != NullTupLoc) + (getLink(1) != NullTupLoc); } -inline -Dbtux::SearchPar::SearchPar() : - m_data(0), - m_ent() +inline unsigned +Dbtux::NodeHandle::getSide() { + return m_node->m_side; } -inline -Dbtux::CmpPar::CmpPar() : - m_data1(0), - m_data2(0), - m_len2(0), - m_first(0), - m_numEq(0) +inline unsigned +Dbtux::NodeHandle::getOccup() { + return m_node->m_occup; } -inline -Dbtux::BoundPar::BoundPar() : - m_data1(0), - m_data2(0), - m_count1(0), - m_len2(0), - m_dir(255) +inline int +Dbtux::NodeHandle::getBalance() { + return m_node->m_balance; } -#ifdef VM_TRACE -inline -Dbtux::PrintPar::PrintPar() : - // caller fills in - m_path(), - m_side(255), - m_parent(NullTupAddr), - // default return values - m_depth(0), - m_occup(0), - m_ok(true) +inline Uint32 +Dbtux::NodeHandle::getNodeScan() { + return m_node->m_nodeScan; } -#endif -// node handles - -inline Dbtux::TupAddr -Dbtux::NodeHandle::getLink(unsigned i) +inline void +Dbtux::NodeHandle::setLink(unsigned i, TupLoc loc) { ndbrequire(i <= 2); - return m_node->m_link[i]; -} - -inline unsigned -Dbtux::NodeHandle::getChilds() -{ - return - (m_node->m_link[0] != NullTupAddr) + - (m_node->m_link[1] != NullTupAddr); + m_node->m_linkPI[i] = loc.m_pageId; + m_node->m_linkPO[i] = loc.m_pageOffset; } -inline Dbtux::TupAddr -Dbtux::NodeHandle::getSide() +inline void +Dbtux::NodeHandle::setSide(unsigned i) { - return m_node->m_side; + ndbrequire(i <= 2); + m_node->m_side = i; } -inline unsigned -Dbtux::NodeHandle::getOccup() +inline void +Dbtux::NodeHandle::setOccup(unsigned n) { - return m_node->m_occup; + TreeHead& tree = m_frag.m_tree; + ndbrequire(n <= tree.m_maxOccup); + m_node->m_occup = n; } -inline int -Dbtux::NodeHandle::getBalance() +inline void +Dbtux::NodeHandle::setBalance(int b) { - return m_node->m_balance; + ndbrequire(abs(b) <= 1); + m_node->m_balance = b; } -inline Uint32 -Dbtux::NodeHandle::getNodeScan() +inline void +Dbtux::NodeHandle::setNodeScan(Uint32 scanPtrI) { - return m_node->m_nodeScan; + m_node->m_nodeScan = scanPtrI; } inline Dbtux::Data @@ -1184,45 +1188,69 @@ Dbtux::NodeHandle::getMinMax(unsigned i) return getEnt(i == 0 ? 0 : occup - 1); } -inline void -Dbtux::NodeHandle::setLink(unsigned i, TupAddr addr) +// parameters for methods + +inline +Dbtux::CopyPar::CopyPar() : + m_items(0), + m_headers(true), + m_maxwords(~0), // max unsigned + // output + m_numitems(0), + m_numwords(0) { - ndbrequire(i <= 2); - m_node->m_link[i] = addr; - m_flags |= DoUpdate; } -inline void -Dbtux::NodeHandle::setSide(unsigned i) +inline +Dbtux::ReadPar::ReadPar() : + m_first(0), + m_count(0), + m_data(0), + m_size(0) { - // ndbrequire(i <= 1); - m_node->m_side = i; - m_flags |= DoUpdate; } -inline void -Dbtux::NodeHandle::setOccup(unsigned n) +inline +Dbtux::SearchPar::SearchPar() : + m_data(0), + m_ent() { - TreeHead& tree = m_frag.m_tree; - ndbrequire(n <= tree.m_maxOccup); - m_node->m_occup = n; - m_flags |= DoUpdate; } -inline void -Dbtux::NodeHandle::setBalance(int b) +inline +Dbtux::CmpPar::CmpPar() : + m_data1(0), + m_data2(0), + m_len2(0), + m_first(0), + m_numEq(0) { - ndbrequire(abs(b) <= 1); - m_node->m_balance = b; - m_flags |= DoUpdate; } -inline void -Dbtux::NodeHandle::setNodeScan(Uint32 scanPtrI) +inline +Dbtux::BoundPar::BoundPar() : + m_data1(0), + m_data2(0), + m_count1(0), + m_len2(0), + m_dir(255) +{ +} + +#ifdef VM_TRACE +inline +Dbtux::PrintPar::PrintPar() : + // caller fills in + m_path(), + m_side(255), + m_parent(), + // default return values + m_depth(0), + m_occup(0), + m_ok(true) { - m_node->m_nodeScan = scanPtrI; - m_flags |= DoUpdate; } +#endif // other methods diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp index c5d24205d1ab082753869207226b78a1e3aea371..11411c886bece3ebe731fcd5fb649832a7cdefb7 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp @@ -97,7 +97,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out) PrintPar par; strcpy(par.m_path, "."); par.m_side = 2; - par.m_parent = NullTupAddr; + par.m_parent = NullTupLoc; printNode(signal, frag, out, tree.m_root, par); out.m_out->flush(); if (! par.m_ok) { @@ -106,26 +106,24 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out) signal->theData[1] = 1; execDUMP_STATE_ORD(signal); if (debugFile != 0) { - commitNodes(signal, frag, false); printTree(signal, frag, debugOut); } } ndbrequire(false); } - commitNodes(signal, frag, false); } void -Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar& par) +Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par) { - if (addr == NullTupAddr) { + if (loc == NullTupLoc) { par.m_depth = 0; return; } TreeHead& tree = frag.m_tree; - NodeHandlePtr nodePtr; - selectNode(signal, frag, nodePtr, addr, AccFull); - out << par.m_path << " " << *nodePtr.p << endl; + NodeHandle node(frag); + selectNode(signal, node, loc, AccFull); + out << par.m_path << " " << node << endl; // check children PrintPar cpar[2]; ndbrequire(strlen(par.m_path) + 1 < sizeof(par.m_path)); @@ -133,57 +131,57 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar sprintf(cpar[i].m_path, "%s%c", par.m_path, "LR"[i]); cpar[i].m_side = i; cpar[i].m_depth = 0; - cpar[i].m_parent = addr; - printNode(signal, frag, out, nodePtr.p->getLink(i), cpar[i]); + cpar[i].m_parent = loc; + printNode(signal, frag, out, node.getLink(i), cpar[i]); if (! cpar[i].m_ok) { par.m_ok = false; } } // check child-parent links - if (nodePtr.p->getLink(2) != par.m_parent) { + if (node.getLink(2) != par.m_parent) { par.m_ok = false; out << par.m_path << " *** "; - out << "parent addr " << hex << nodePtr.p->getLink(2); + out << "parent loc " << hex << node.getLink(2); out << " should be " << hex << par.m_parent << endl; } - if (nodePtr.p->getSide() != par.m_side) { + if (node.getSide() != par.m_side) { par.m_ok = false; out << par.m_path << " *** "; - out << "side " << dec << nodePtr.p->getSide(); + out << "side " << dec << node.getSide(); out << " should be " << dec << par.m_side << endl; } // check balance const int balance = -cpar[0].m_depth + cpar[1].m_depth; - if (nodePtr.p->getBalance() != balance) { + if (node.getBalance() != balance) { par.m_ok = false; out << par.m_path << " *** "; - out << "balance " << nodePtr.p->getBalance(); + out << "balance " << node.getBalance(); out << " should be " << balance << endl; } - if (abs(nodePtr.p->getBalance()) > 1) { + if (abs(node.getBalance()) > 1) { par.m_ok = false; out << par.m_path << " *** "; - out << "balance " << nodePtr.p->getBalance() << " is invalid" << endl; + out << "balance " << node.getBalance() << " is invalid" << endl; } // check occupancy - if (nodePtr.p->getOccup() > tree.m_maxOccup) { + if (node.getOccup() > tree.m_maxOccup) { par.m_ok = false; out << par.m_path << " *** "; - out << "occupancy " << nodePtr.p->getOccup(); + out << "occupancy " << node.getOccup(); out << " greater than max " << tree.m_maxOccup << endl; } // check for occupancy of interior node - if (nodePtr.p->getChilds() == 2 && nodePtr.p->getOccup() < tree.m_minOccup) { + if (node.getChilds() == 2 && node.getOccup() < tree.m_minOccup) { par.m_ok = false; out << par.m_path << " *** "; - out << "occupancy " << nodePtr.p->getOccup() << " of interior node"; + out << "occupancy " << node.getOccup() << " of interior node"; out << " less than min " << tree.m_minOccup << endl; } // check missed half-leaf/leaf merge for (unsigned i = 0; i <= 1; i++) { - if (nodePtr.p->getLink(i) != NullTupAddr && - nodePtr.p->getLink(1 - i) == NullTupAddr && - nodePtr.p->getOccup() + cpar[i].m_occup <= tree.m_maxOccup) { + if (node.getLink(i) != NullTupLoc && + node.getLink(1 - i) == NullTupLoc && + node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) { par.m_ok = false; out << par.m_path << " *** "; out << "missed merge with child " << i << endl; @@ -191,7 +189,19 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar } // return values par.m_depth = 1 + max(cpar[0].m_depth, cpar[1].m_depth); - par.m_occup = nodePtr.p->getOccup(); + par.m_occup = node.getOccup(); +} + +NdbOut& +operator<<(NdbOut& out, const Dbtux::TupLoc& loc) +{ + if (loc == Dbtux::NullTupLoc) { + out << "null"; + } else { + out << hex << loc.m_pageId; + out << "." << dec << loc.m_pageOffset; + } + return out; } NdbOut& @@ -206,10 +216,13 @@ operator<<(NdbOut& out, const Dbtux::TreeEnt& ent) NdbOut& operator<<(NdbOut& out, const Dbtux::TreeNode& node) { + Dbtux::TupLoc link0(node.m_linkPI[0], node.m_linkPO[0]); + Dbtux::TupLoc link1(node.m_linkPI[1], node.m_linkPO[1]); + Dbtux::TupLoc link2(node.m_linkPI[2], node.m_linkPO[2]); out << "[TreeNode " << hex << &node; - out << " [left " << hex << node.m_link[0] << "]"; - out << " [right " << hex << node.m_link[1] << "]"; - out << " [up " << hex << node.m_link[2] << "]"; + out << " [left " << link0 << "]"; + out << " [right " << link1 << "]"; + out << " [up " << link2 << "]"; out << " [side " << dec << node.m_side << "]"; out << " [occup " << dec << node.m_occup << "]"; out << " [balance " << dec << (int)node.m_balance << "]"; @@ -238,7 +251,7 @@ NdbOut& operator<<(NdbOut& out, const Dbtux::TreePos& pos) { out << "[TreePos " << hex << &pos; - out << " [addr " << hex << pos.m_addr << "]"; + out << " [loc " << pos.m_loc << "]"; out << " [pos " << dec << pos.m_pos << "]"; out << " [match " << dec << pos.m_match << "]"; out << " [dir " << dec << pos.m_dir << "]"; @@ -338,9 +351,8 @@ operator<<(NdbOut& out, const Dbtux::NodeHandle& node) const Dbtux::Frag& frag = node.m_frag; const Dbtux::TreeHead& tree = frag.m_tree; out << "[NodeHandle " << hex << &node; - out << " [addr " << hex << node.m_addr << "]"; + out << " [loc " << node.m_loc << "]"; out << " [acc " << dec << node.m_acc << "]"; - out << " [flags " << hex << node.m_flags << "]"; out << " [node " << *node.m_node << "]"; if (node.m_acc >= Dbtux::AccPref) { for (unsigned i = 0; i <= 1; i++) { diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp index 082b243bcb1e564804293c88f23ea26679575b1a..788d29f3bcea68a1d6657ece14a674414ed41c16 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp @@ -21,6 +21,7 @@ Dbtux::Dbtux(const Configuration& conf) : SimulatedBlock(DBTUX, conf), + c_tup(0), c_descPageList(RNIL), #ifdef VM_TRACE debugFile(0), @@ -123,6 +124,8 @@ Dbtux::execSTTOR(Signal* signal) case 1: jam(); CLEAR_ERROR_INSERT_VALUE; + c_tup = (Dbtup*)globalData.getBlock(DBTUP); + ndbrequire(c_tup != 0); break; case 3: jam(); @@ -175,12 +178,11 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) c_fragPool.setSize(nFragment); c_descPagePool.setSize(nDescPage); c_fragOpPool.setSize(MaxIndexFragments); - c_nodeHandlePool.setSize(MaxNodeHandles); c_scanOpPool.setSize(nScanOp); c_scanBoundPool.setSize(nScanBoundWords); /* * Index id is physical array index. We seize and initialize all - * index records now. This assumes ArrayPool is an array. + * index records now. */ IndexPtr indexPtr; while (1) { diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp index a6b2485067c4730511946863d2784a8b8b378b01..e674cdfca740e9d0f5941ee26559d2503de34c5f 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp @@ -72,7 +72,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) } ndbrequire(fragPtr.i != RNIL); Frag& frag = *fragPtr.p; - ndbrequire(frag.m_nodeList == RNIL); // set up index entry TreeEnt ent; ent.m_tupAddr = req->tupAddr; @@ -143,17 +142,18 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) } /* * At most one new node is inserted in the operation. We keep one - * free node pre-allocated so the operation cannot fail. This also - * gives a real TupAddr for links to the new node. + * free node pre-allocated so the operation cannot fail. */ - if (frag.m_nodeFree == RNIL) { + if (frag.m_freeLoc == NullTupLoc) { jam(); - preallocNode(signal, frag, req->errorCode); + NodeHandle node(frag); + req->errorCode = allocNode(signal, node); if (req->errorCode != 0) { jam(); break; } - ndbrequire(frag.m_nodeFree != RNIL); + frag.m_freeLoc = node.m_loc; + ndbrequire(frag.m_freeLoc != NullTupLoc); } treeAdd(signal, frag, treePos, ent); break; @@ -175,7 +175,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) break; } // commit and release nodes - commitNodes(signal, frag, req->errorCode == 0); #ifdef VM_TRACE if (debugFlags & DebugTree) { printTree(signal, frag, debugOut); @@ -199,7 +198,7 @@ Dbtux::tupReadAttrs(Signal* signal, const Frag& frag, ReadPar& readPar) req->requestInfo = 0; req->tableId = frag.m_tableId; req->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff); - req->fragPtrI = RNIL; + req->fragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit]; req->tupAddr = ent.m_tupAddr; req->tupVersion = ent.m_tupVersion; req->pageId = RNIL; @@ -246,7 +245,7 @@ Dbtux::tupReadKeys(Signal* signal, const Frag& frag, ReadPar& readPar) req->requestInfo = TupReadAttrs::ReadKeys; req->tableId = frag.m_tableId; req->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff); - req->fragPtrI = RNIL; + req->fragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit]; req->tupAddr = ent.m_tupAddr; req->tupVersion = RNIL; // not used req->pageId = RNIL; @@ -270,100 +269,3 @@ Dbtux::tupReadKeys(Signal* signal, const Frag& frag, ReadPar& readPar) readPar.m_count = numKeys; readPar.m_size = copyPar.m_numwords; } - -/* - * Operate on index node tuple in TUP. The data is copied between node - * cache and index storage via signal data. - */ -void -Dbtux::tupStoreTh(Signal* signal, const Frag& frag, NodeHandlePtr nodePtr, StorePar storePar) -{ - const TreeHead& tree = frag.m_tree; - // define the direct signal - TupStoreTh* req = (TupStoreTh*)signal->getDataPtrSend(); - req->errorCode = RNIL; - req->tableId = frag.m_indexId; - req->fragId = frag.m_fragId; - req->fragPtrI = RNIL; - req->tupAddr = nodePtr.p->m_addr; - req->tupVersion = 0; - req->pageId = nodePtr.p->m_loc.m_pageId; - req->pageOffset = nodePtr.p->m_loc.m_pageOffset; - req->bufferId = 0; - req->opCode = storePar.m_opCode; - ndbrequire(storePar.m_offset + storePar.m_size <= tree.m_nodeSize); - req->dataOffset = storePar.m_offset; - req->dataSize = storePar.m_size; - // the node cache - ndbrequire(nodePtr.p->m_node != 0); - // the buffer in signal data - Uint32* const buffer = (Uint32*)req + TupStoreTh::SignalLength; - // copy in data - switch (storePar.m_opCode) { - case TupStoreTh::OpRead: - jam(); - #ifdef VM_TRACE - { - Uint32* dst = buffer + storePar.m_offset; - memset(dst, 0xa9, storePar.m_size << 2); - } - #endif - break; - case TupStoreTh::OpInsert: - jam(); - // fallthru - case TupStoreTh::OpUpdate: - jam(); - // copy from cache to signal data - { - Uint32* dst = buffer + storePar.m_offset; - const Uint32* src = (const Uint32*)nodePtr.p->m_node + storePar.m_offset; - memcpy(dst, src, storePar.m_size << 2); - } - break; - case TupStoreTh::OpDelete: - jam(); - break; - default: - ndbrequire(false); - break; - } - // execute - EXECUTE_DIRECT(DBTUP, GSN_TUP_STORE_TH, signal, TupStoreTh::SignalLength); - jamEntry(); - if (req->errorCode != 0) { - jam(); - storePar.m_errorCode = req->errorCode; - return; - } - ndbrequire(req->errorCode == 0); - // copy out data - switch (storePar.m_opCode) { - case TupStoreTh::OpRead: - jam(); - { - Uint32* dst = (Uint32*)nodePtr.p->m_node + storePar.m_offset; - const Uint32* src = (const Uint32*)buffer + storePar.m_offset; - memcpy(dst, src, storePar.m_size << 2); - } - // fallthru - case TupStoreTh::OpInsert: - jam(); - // fallthru - case TupStoreTh::OpUpdate: - jam(); - nodePtr.p->m_addr = req->tupAddr; - nodePtr.p->m_loc.m_pageId = req->pageId; - nodePtr.p->m_loc.m_pageOffset = req->pageOffset; - break; - case TupStoreTh::OpDelete: - jam(); - nodePtr.p->m_addr = NullTupAddr; - nodePtr.p->m_loc.m_pageId = RNIL; - nodePtr.p->m_loc.m_pageOffset = 0; - break; - default: - ndbrequire(false); - break; - } -} diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp index 2ffd599429c17048fd6753300aa7369927ceae07..9e17f18279845e9430201a510ec69a0b03e20c07 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp @@ -85,6 +85,11 @@ Dbtux::execTUXFRAGREQ(Signal* signal) fragPtr.p->m_fragOff = req->fragOff; fragPtr.p->m_fragId = req->fragId; fragPtr.p->m_numAttrs = req->noOfAttr; + fragPtr.p->m_tupIndexFragPtrI = req->tupIndexFragPtrI; + fragPtr.p->m_tupTableFragPtrI[0] = req->tupTableFragPtrI[0]; + fragPtr.p->m_tupTableFragPtrI[1] = req->tupTableFragPtrI[1]; + fragPtr.p->m_accTableFragPtrI[0] = req->accTableFragPtrI[0]; + fragPtr.p->m_accTableFragPtrI[1] = req->accTableFragPtrI[1]; // add the fragment to the index indexPtr.p->m_fragId[indexPtr.p->m_numFrags] = req->fragId; indexPtr.p->m_fragPtrI[indexPtr.p->m_numFrags] = fragPtr.i; @@ -197,6 +202,7 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal) jam(); // initialize tree header TreeHead& tree = fragPtr.p->m_tree; + new (&tree) TreeHead(); // make these configurable later tree.m_nodeSize = MAX_TTREE_NODE_SIZE; tree.m_prefSize = MAX_TTREE_PREF_SIZE; @@ -222,8 +228,8 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal) break; } tree.m_minOccup = tree.m_maxOccup - maxSlack; - // root node does not exist - tree.m_root = NullTupAddr; + // root node does not exist (also set by ctor) + tree.m_root = NullTupLoc; // fragment is defined c_fragOpPool.release(fragOpPtr); } @@ -310,12 +316,6 @@ Dbtux::dropIndex(Signal* signal, IndexPtr indexPtr, Uint32 senderRef, Uint32 sen unsigned i = --indexPtr.p->m_numFrags; FragPtr fragPtr; c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]); - Frag& frag = *fragPtr.p; - ndbrequire(frag.m_nodeList == RNIL); - if (frag.m_nodeFree != RNIL) { - c_nodeHandlePool.release(frag.m_nodeFree); - frag.m_nodeFree = RNIL; - } c_fragPool.release(fragPtr); // the real time break is not used for anything currently signal->theData[0] = TuxContinueB::DropIndex; diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp index 6733a87da97a1c6ed0fd74314bd55812fe7889c5..6b3508d21c247476910e6da18888166cf2b7536e 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp @@ -18,175 +18,107 @@ #include "Dbtux.hpp" /* - * Node handles. - * - * We use the "cache" implementation. Node operations are done on - * cached copies. Index memory is updated at the end of the operation. - * At most one node is inserted and it is always pre-allocated. - * - * An alternative "pointer" implementation which writes directly into - * index memory is planned for later. + * Allocate index node in TUP. */ - -// Dbtux - -void -Dbtux::seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr) -{ - if (! c_nodeHandlePool.seize(nodePtr)) { - jam(); - return; - } - new (nodePtr.p) NodeHandle(*this, frag); - nodePtr.p->m_next = frag.m_nodeList; - frag.m_nodeList = nodePtr.i; - // node cache used always - nodePtr.p->m_node = (TreeNode*)nodePtr.p->m_cache; - new (nodePtr.p->m_node) TreeNode(); -#ifdef VM_TRACE - TreeHead& tree = frag.m_tree; - TreeNode* node = nodePtr.p->m_node; - memset(tree.getPref(node, 0), 0xa2, tree.m_prefSize << 2); - memset(tree.getPref(node, 1), 0xa2, tree.m_prefSize << 2); - TreeEnt* entList = tree.getEntList(node); - memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2)); -#endif -} - -void -Dbtux::preallocNode(Signal* signal, Frag& frag, Uint32& errorCode) +int +Dbtux::allocNode(Signal* signal, NodeHandle& node) { - ndbrequire(frag.m_nodeFree == RNIL); - NodeHandlePtr nodePtr; - seizeNode(signal, frag, nodePtr); - ndbrequire(nodePtr.i != RNIL); - // remove from cache XXX ugly - frag.m_nodeFree = frag.m_nodeList; - frag.m_nodeList = nodePtr.p->m_next; - StorePar storePar; - storePar.m_opCode = TupStoreTh::OpInsert; - storePar.m_offset = 0; - storePar.m_size = 0; - tupStoreTh(signal, frag, nodePtr, storePar); - if (storePar.m_errorCode != 0) { + Frag& frag = node.m_frag; + Uint32 pageId = NullTupLoc.m_pageId; + Uint32 pageOffset = NullTupLoc.m_pageOffset; + Uint32* node32 = 0; + int errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); + if (errorCode == 0) { jam(); - errorCode = storePar.m_errorCode; - c_nodeHandlePool.release(nodePtr); - frag.m_nodeFree = RNIL; + node.m_loc = TupLoc(pageId, pageOffset); + node.m_node = reinterpret_cast<TreeNode*>(node32); + node.m_acc = AccNone; + ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0); } + return errorCode; } /* - * Find node in the cache. XXX too slow, use direct links instead - */ -void -Dbtux::findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupAddr addr) -{ - NodeHandlePtr tmpPtr; - tmpPtr.i = frag.m_nodeList; - while (tmpPtr.i != RNIL) { - jam(); - c_nodeHandlePool.getPtr(tmpPtr); - if (tmpPtr.p->m_addr == addr) { - jam(); - nodePtr = tmpPtr; - return; - } - tmpPtr.i = tmpPtr.p->m_next; - } - nodePtr.i = RNIL; - nodePtr.p = 0; -} - -/* - * Get handle for existing node. + * Access more of the node. */ void -Dbtux::selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupAddr addr, AccSize acc) +Dbtux::accessNode(Signal* signal, NodeHandle& node, AccSize acc) { - ndbrequire(addr != NullTupAddr && acc > AccNone); - NodeHandlePtr tmpPtr; - // search in cache - findNode(signal, frag, tmpPtr, addr); - if (tmpPtr.i == RNIL) { - jam(); - // add new node - seizeNode(signal, frag, tmpPtr); - ndbrequire(tmpPtr.i != RNIL); - tmpPtr.p->m_addr = addr; - } - if (tmpPtr.p->m_acc < acc) { - jam(); - accessNode(signal, frag, tmpPtr, acc); - } - nodePtr = tmpPtr; + ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0); + if (node.m_acc >= acc) + return; + // XXX could do prefetch + node.m_acc = acc; } /* - * Create new node in the cache and mark it for insert. + * Set handle to point to existing node. */ void -Dbtux::insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc) +Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc) { - ndbrequire(acc > AccNone); - NodeHandlePtr tmpPtr; - // use the pre-allocated node - tmpPtr.i = frag.m_nodeFree; - frag.m_nodeFree = RNIL; - c_nodeHandlePool.getPtr(tmpPtr); - // move it to the cache - tmpPtr.p->m_next = frag.m_nodeList; - frag.m_nodeList = tmpPtr.i; - tmpPtr.p->m_acc = acc; - tmpPtr.p->m_flags |= NodeHandle::DoInsert; - nodePtr = tmpPtr; + Frag& frag = node.m_frag; + ndbrequire(loc != NullTupLoc); + Uint32 pageId = loc.m_pageId; + Uint32 pageOffset = loc.m_pageOffset; + Uint32* node32 = 0; + c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); + node.m_loc = loc; + node.m_node = reinterpret_cast<TreeNode*>(node32); + node.m_acc = AccNone; + ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0); + accessNode(signal, node, acc); } /* - * Mark existing node for deletion. + * Set handle to point to new node. Uses the pre-allocated node. */ void -Dbtux::deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr) +Dbtux::insertNode(Signal* signal, NodeHandle& node, AccSize acc) { - NodeHandlePtr tmpPtr = nodePtr; - ndbrequire(tmpPtr.p->getOccup() == 0); - tmpPtr.p->m_flags |= NodeHandle::DoDelete; - // scans have already been moved by popDown or popUp + Frag& frag = node.m_frag; + TupLoc loc = frag.m_freeLoc; + frag.m_freeLoc = NullTupLoc; + selectNode(signal, node, loc, acc); + new (node.m_node) TreeNode(); +#ifdef VM_TRACE + TreeHead& tree = frag.m_tree; + memset(tree.getPref(node.m_node, 0), 0xa2, tree.m_prefSize << 2); + memset(tree.getPref(node.m_node, 1), 0xa2, tree.m_prefSize << 2); + TreeEnt* entList = tree.getEntList(node.m_node); + memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2)); +#endif } /* - * Access more of the node. + * Delete existing node. */ void -Dbtux::accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc) +Dbtux::deleteNode(Signal* signal, NodeHandle& node) { - TreeHead& tree = frag.m_tree; - NodeHandlePtr tmpPtr = nodePtr; - if (tmpPtr.p->m_acc >= acc) - return; - if (! (tmpPtr.p->m_flags & NodeHandle::DoInsert)) { - jam(); - StorePar storePar; - storePar.m_opCode = TupStoreTh::OpRead; - storePar.m_offset = tree.getSize(tmpPtr.p->m_acc); - storePar.m_size = tree.getSize(acc) - tree.getSize(tmpPtr.p->m_acc); - tmpPtr.p->m_tux.tupStoreTh(signal, frag, tmpPtr, storePar); - ndbrequire(storePar.m_errorCode == 0); - } - tmpPtr.p->m_acc = acc; + Frag& frag = node.m_frag; + ndbrequire(node.getOccup() == 0); + TupLoc loc = node.m_loc; + Uint32 pageId = loc.m_pageId; + Uint32 pageOffset = loc.m_pageOffset; + Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node); + c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); + // invalidate handle and storage + node.m_loc = NullTupLoc; + node.m_node = 0; } /* * Set prefix. */ void -Dbtux::setNodePref(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i) +Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i) { + Frag& frag = node.m_frag; TreeHead& tree = frag.m_tree; - NodeHandlePtr tmpPtr = nodePtr; ReadPar readPar; ndbrequire(i <= 1); - readPar.m_ent = tmpPtr.p->getMinMax(i); + readPar.m_ent = node.getMinMax(i); readPar.m_first = 0; readPar.m_count = frag.m_numAttrs; // leave in signal data @@ -198,61 +130,11 @@ Dbtux::setNodePref(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned copyPar.m_items = readPar.m_count; copyPar.m_headers = true; copyPar.m_maxwords = tree.m_prefSize; - Data pref = tmpPtr.p->getPref(i); + Data pref = node.getPref(i); copyAttrs(pref, readPar.m_data, copyPar); - nodePtr.p->m_flags |= NodeHandle::DoUpdate; } -/* - * Commit and release nodes at the end of an operation. Used also on - * error since no changes have been made (updateOk false). - */ -void -Dbtux::commitNodes(Signal* signal, Frag& frag, bool updateOk) -{ - TreeHead& tree = frag.m_tree; - NodeHandlePtr nodePtr; - nodePtr.i = frag.m_nodeList; - frag.m_nodeList = RNIL; - while (nodePtr.i != RNIL) { - c_nodeHandlePool.getPtr(nodePtr); - const unsigned flags = nodePtr.p->m_flags; - if (flags & NodeHandle::DoDelete) { - jam(); - ndbrequire(updateOk); - // delete - StorePar storePar; - storePar.m_opCode = TupStoreTh::OpDelete; - nodePtr.p->m_tux.tupStoreTh(signal, frag, nodePtr, storePar); - ndbrequire(storePar.m_errorCode == 0); - } else if (flags & NodeHandle::DoUpdate) { - jam(); - ndbrequire(updateOk); - // set prefixes - if (flags & (1 << 0)) { - jam(); - setNodePref(signal, frag, nodePtr, 0); - } - if (flags & (1 << 1)) { - jam(); - setNodePref(signal, frag, nodePtr, 1); - } - // update - StorePar storePar; - storePar.m_opCode = TupStoreTh::OpUpdate; - storePar.m_offset = 0; - storePar.m_size = tree.getSize(nodePtr.p->m_acc); - nodePtr.p->m_tux.tupStoreTh(signal, frag, nodePtr, storePar); - ndbrequire(storePar.m_errorCode == 0); - } - // release - NodeHandlePtr tmpPtr = nodePtr; - nodePtr.i = nodePtr.p->m_next; - c_nodeHandlePool.release(tmpPtr); - } -} - -// Dbtux::NodeHandle +// node operations /* * Add entry at position. Move entries greater than or equal to the old @@ -264,25 +146,26 @@ Dbtux::commitNodes(Signal* signal, Frag& frag, bool updateOk) * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 */ void -Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent) +Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent) { - TreeHead& tree = m_frag.m_tree; - const unsigned occup = getOccup(); + Frag& frag = node.m_frag; + TreeHead& tree = frag.m_tree; + const unsigned occup = node.getOccup(); ndbrequire(occup < tree.m_maxOccup && pos <= occup); // fix scans ScanOpPtr scanPtr; - scanPtr.i = getNodeScan(); + scanPtr.i = node.getNodeScan(); while (scanPtr.i != RNIL) { jam(); - m_tux.c_scanOpPool.getPtr(scanPtr); + c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; - ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); + ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); if (scanPos.m_pos >= pos) { jam(); #ifdef VM_TRACE - if (m_tux.debugFlags & m_tux.DebugScan) { - m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; - m_tux.debugOut << "At pushUp pos=" << pos << " " << *this << endl; + if (debugFlags & DebugScan) { + debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "At pushUp pos=" << pos << " " << node << endl; } #endif scanPos.m_pos++; @@ -290,7 +173,7 @@ Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent) scanPtr.i = scanPtr.p->m_nodeScan; } // fix node - TreeEnt* const entList = tree.getEntList(m_node); + TreeEnt* const entList = tree.getEntList(node.m_node); entList[occup] = entList[0]; TreeEnt* const tmpList = entList + 1; for (unsigned i = occup; i > pos; i--) { @@ -298,18 +181,18 @@ Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent) tmpList[i] = tmpList[i - 1]; } tmpList[pos] = ent; + entList[0] = entList[occup + 1]; + node.setOccup(occup + 1); + // fix prefixes if (occup == 0 || pos == 0) - m_flags |= (1 << 0); + setNodePref(signal, node, 0); if (occup == 0 || pos == occup) - m_flags |= (1 << 1); - entList[0] = entList[occup + 1]; - setOccup(occup + 1); - m_flags |= DoUpdate; + setNodePref(signal, node, 1); } /* * Remove and return entry at position. Move entries greater than the - * removed one to the left. This is the opposite of pushUp. + * removed one to the left. This is the opposite of nodePushUp. * * D * ^ ^ @@ -317,46 +200,47 @@ Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent) * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 */ void -Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent) +Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) { - TreeHead& tree = m_frag.m_tree; - const unsigned occup = getOccup(); + Frag& frag = node.m_frag; + TreeHead& tree = frag.m_tree; + const unsigned occup = node.getOccup(); ndbrequire(occup <= tree.m_maxOccup && pos < occup); ScanOpPtr scanPtr; // move scans whose entry disappears - scanPtr.i = getNodeScan(); + scanPtr.i = node.getNodeScan(); while (scanPtr.i != RNIL) { jam(); - m_tux.c_scanOpPool.getPtr(scanPtr); + c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; - ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); + ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); const Uint32 nextPtrI = scanPtr.p->m_nodeScan; if (scanPos.m_pos == pos) { jam(); #ifdef VM_TRACE - if (m_tux.debugFlags & m_tux.DebugScan) { - m_tux.debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; - m_tux.debugOut << "At popDown pos=" << pos << " " << *this << endl; + if (debugFlags & DebugScan) { + debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "At popDown pos=" << pos << " " << node << endl; } #endif - m_tux.scanNext(signal, scanPtr); + scanNext(signal, scanPtr); } scanPtr.i = nextPtrI; } // fix other scans - scanPtr.i = getNodeScan(); + scanPtr.i = node.getNodeScan(); while (scanPtr.i != RNIL) { jam(); - m_tux.c_scanOpPool.getPtr(scanPtr); + c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; - ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); + ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); ndbrequire(scanPos.m_pos != pos); if (scanPos.m_pos > pos) { jam(); #ifdef VM_TRACE - if (m_tux.debugFlags & m_tux.DebugScan) { - m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; - m_tux.debugOut << "At popDown pos=" << pos << " " << *this << endl; + if (debugFlags & DebugScan) { + debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "At popDown pos=" << pos << " " << node << endl; } #endif scanPos.m_pos--; @@ -364,7 +248,7 @@ Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent) scanPtr.i = scanPtr.p->m_nodeScan; } // fix node - TreeEnt* const entList = tree.getEntList(m_node); + TreeEnt* const entList = tree.getEntList(node.m_node); entList[occup] = entList[0]; TreeEnt* const tmpList = entList + 1; ent = tmpList[pos]; @@ -372,13 +256,13 @@ Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent) jam(); tmpList[i] = tmpList[i + 1]; } + entList[0] = entList[occup - 1]; + node.setOccup(occup - 1); + // fix prefixes if (occup != 1 && pos == 0) - m_flags |= (1 << 0); + setNodePref(signal, node, 0); if (occup != 1 && pos == occup - 1) - m_flags |= (1 << 1); - entList[0] = entList[occup - 1]; - setOccup(occup - 1); - m_flags |= DoUpdate; + setNodePref(signal, node, 1); } /* @@ -391,47 +275,48 @@ Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent) * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 */ void -Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent) +Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) { - TreeHead& tree = m_frag.m_tree; - const unsigned occup = getOccup(); + Frag& frag = node.m_frag; + TreeHead& tree = frag.m_tree; + const unsigned occup = node.getOccup(); ndbrequire(occup <= tree.m_maxOccup && pos < occup); ScanOpPtr scanPtr; // move scans whose entry disappears - scanPtr.i = getNodeScan(); + scanPtr.i = node.getNodeScan(); while (scanPtr.i != RNIL) { jam(); - m_tux.c_scanOpPool.getPtr(scanPtr); + c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; - ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); + ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); const Uint32 nextPtrI = scanPtr.p->m_nodeScan; if (scanPos.m_pos == 0) { jam(); #ifdef VM_TRACE - if (m_tux.debugFlags & m_tux.DebugScan) { - m_tux.debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; - m_tux.debugOut << "At pushDown pos=" << pos << " " << *this << endl; + if (debugFlags & DebugScan) { + debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "At pushDown pos=" << pos << " " << node << endl; } #endif // here we may miss a valid entry "X" XXX known bug - m_tux.scanNext(signal, scanPtr); + scanNext(signal, scanPtr); } scanPtr.i = nextPtrI; } // fix other scans - scanPtr.i = getNodeScan(); + scanPtr.i = node.getNodeScan(); while (scanPtr.i != RNIL) { jam(); - m_tux.c_scanOpPool.getPtr(scanPtr); + c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; - ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); + ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); ndbrequire(scanPos.m_pos != 0); if (scanPos.m_pos <= pos) { jam(); #ifdef VM_TRACE - if (m_tux.debugFlags & m_tux.DebugScan) { - m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; - m_tux.debugOut << "At pushDown pos=" << pos << " " << *this << endl; + if (debugFlags & DebugScan) { + debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "At pushDown pos=" << pos << " " << node << endl; } #endif scanPos.m_pos--; @@ -439,7 +324,7 @@ Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent) scanPtr.i = scanPtr.p->m_nodeScan; } // fix node - TreeEnt* const entList = tree.getEntList(m_node); + TreeEnt* const entList = tree.getEntList(node.m_node); entList[occup] = entList[0]; TreeEnt* const tmpList = entList + 1; TreeEnt oldMin = tmpList[0]; @@ -449,18 +334,18 @@ Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent) } tmpList[pos] = ent; ent = oldMin; + entList[0] = entList[occup]; + // fix prefixes if (true) - m_flags |= (1 << 0); + setNodePref(signal, node, 0); if (occup == 1 || pos == occup - 1) - m_flags |= (1 << 1); - entList[0] = entList[occup]; - m_flags |= DoUpdate; + setNodePref(signal, node, 1); } /* * Remove and return entry at position. Move entries less than the * removed one to the right. Replace min entry by the input entry. - * This is the opposite of pushDown. + * This is the opposite of nodePushDown. * * X D * v ^ ^ @@ -468,47 +353,48 @@ Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent) * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 */ void -Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent) +Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) { - TreeHead& tree = m_frag.m_tree; - const unsigned occup = getOccup(); + Frag& frag = node.m_frag; + TreeHead& tree = frag.m_tree; + const unsigned occup = node.getOccup(); ndbrequire(occup <= tree.m_maxOccup && pos < occup); ScanOpPtr scanPtr; // move scans whose entry disappears - scanPtr.i = getNodeScan(); + scanPtr.i = node.getNodeScan(); while (scanPtr.i != RNIL) { jam(); - m_tux.c_scanOpPool.getPtr(scanPtr); + c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; - ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); + ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); const Uint32 nextPtrI = scanPtr.p->m_nodeScan; if (scanPos.m_pos == pos) { jam(); #ifdef VM_TRACE - if (m_tux.debugFlags & m_tux.DebugScan) { - m_tux.debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; - m_tux.debugOut << "At popUp pos=" << pos << " " << *this << endl; + if (debugFlags & DebugScan) { + debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "At popUp pos=" << pos << " " << node << endl; } #endif // here we may miss a valid entry "X" XXX known bug - m_tux.scanNext(signal, scanPtr); + scanNext(signal, scanPtr); } scanPtr.i = nextPtrI; } // fix other scans - scanPtr.i = getNodeScan(); + scanPtr.i = node.getNodeScan(); while (scanPtr.i != RNIL) { jam(); - m_tux.c_scanOpPool.getPtr(scanPtr); + c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; - ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); + ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); ndbrequire(scanPos.m_pos != pos); if (scanPos.m_pos < pos) { jam(); #ifdef VM_TRACE - if (m_tux.debugFlags & m_tux.DebugScan) { - m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; - m_tux.debugOut << "At popUp pos=" << pos << " " << *this << endl; + if (debugFlags & DebugScan) { + debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "At popUp pos=" << pos << " " << node << endl; } #endif scanPos.m_pos++; @@ -516,7 +402,7 @@ Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent) scanPtr.i = scanPtr.p->m_nodeScan; } // fix node - TreeEnt* const entList = tree.getEntList(m_node); + TreeEnt* const entList = tree.getEntList(node.m_node); entList[occup] = entList[0]; TreeEnt* const tmpList = entList + 1; TreeEnt newMin = ent; @@ -526,12 +412,12 @@ Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent) tmpList[i] = tmpList[i - 1]; } tmpList[0] = newMin; + entList[0] = entList[occup]; + // fix prefixes if (true) - m_flags |= (1 << 0); + setNodePref(signal, node, 0); if (occup == 1 || pos == occup - 1) - m_flags |= (1 << 1); - entList[0] = entList[occup]; - m_flags |= DoUpdate; + setNodePref(signal, node, 1); } /* @@ -539,14 +425,15 @@ Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent) * after the max (i=1). XXX can be optimized */ void -Dbtux::NodeHandle::slide(Signal* signal, NodeHandlePtr nodePtr, unsigned i) +Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i) { + Frag& frag = dstNode.m_frag; + TreeHead& tree = frag.m_tree; ndbrequire(i <= 1); - TreeHead& tree = m_frag.m_tree; - while (getOccup() < tree.m_maxOccup && nodePtr.p->getOccup() != 0) { + while (dstNode.getOccup() < tree.m_maxOccup && srcNode.getOccup() != 0) { TreeEnt ent; - nodePtr.p->popDown(signal, i == 0 ? nodePtr.p->getOccup() - 1 : 0, ent); - pushUp(signal, i == 0 ? 0 : getOccup(), ent); + nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent); + nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent); } } @@ -555,50 +442,50 @@ Dbtux::NodeHandle::slide(Signal* signal, NodeHandlePtr nodePtr, unsigned i) * ordering does not matter. */ void -Dbtux::NodeHandle::linkScan(Dbtux::ScanOpPtr scanPtr) +Dbtux::linkScan(NodeHandle& node, ScanOpPtr scanPtr) { #ifdef VM_TRACE - if (m_tux.debugFlags & m_tux.DebugScan) { - m_tux.debugOut << "Link scan " << scanPtr.i << " " << *scanPtr.p << endl; - m_tux.debugOut << "To node " << *this << endl; + if (debugFlags & DebugScan) { + debugOut << "Link scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "To node " << node << endl; } #endif - ndbrequire(! islinkScan(scanPtr) && scanPtr.p->m_nodeScan == RNIL); - scanPtr.p->m_nodeScan = getNodeScan(); - setNodeScan(scanPtr.i); + ndbrequire(! islinkScan(node, scanPtr) && scanPtr.p->m_nodeScan == RNIL); + scanPtr.p->m_nodeScan = node.getNodeScan(); + node.setNodeScan(scanPtr.i); } /* * Unlink a scan from the list under the node. */ void -Dbtux::NodeHandle::unlinkScan(Dbtux::ScanOpPtr scanPtr) +Dbtux::unlinkScan(NodeHandle& node, ScanOpPtr scanPtr) { #ifdef VM_TRACE - if (m_tux.debugFlags & m_tux.DebugScan) { - m_tux.debugOut << "Unlink scan " << scanPtr.i << " " << *scanPtr.p << endl; - m_tux.debugOut << "From node " << *this << endl; + if (debugFlags & DebugScan) { + debugOut << "Unlink scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "From node " << node << endl; } #endif - Dbtux::ScanOpPtr currPtr; - currPtr.i = getNodeScan(); - Dbtux::ScanOpPtr prevPtr; + ScanOpPtr currPtr; + currPtr.i = node.getNodeScan(); + ScanOpPtr prevPtr; prevPtr.i = RNIL; while (true) { jam(); - m_tux.c_scanOpPool.getPtr(currPtr); + c_scanOpPool.getPtr(currPtr); Uint32 nextPtrI = currPtr.p->m_nodeScan; if (currPtr.i == scanPtr.i) { jam(); if (prevPtr.i == RNIL) { - setNodeScan(nextPtrI); + node.setNodeScan(nextPtrI); } else { jam(); prevPtr.p->m_nodeScan = nextPtrI; } scanPtr.p->m_nodeScan = RNIL; // check for duplicates - ndbrequire(! islinkScan(scanPtr)); + ndbrequire(! islinkScan(node, scanPtr)); return; } prevPtr = currPtr; @@ -610,13 +497,13 @@ Dbtux::NodeHandle::unlinkScan(Dbtux::ScanOpPtr scanPtr) * Check if a scan is linked to this node. Only for ndbrequire. */ bool -Dbtux::NodeHandle::islinkScan(Dbtux::ScanOpPtr scanPtr) +Dbtux::islinkScan(NodeHandle& node, ScanOpPtr scanPtr) { - Dbtux::ScanOpPtr currPtr; - currPtr.i = getNodeScan(); + ScanOpPtr currPtr; + currPtr.i = node.getNodeScan(); while (currPtr.i != RNIL) { jam(); - m_tux.c_scanOpPool.getPtr(currPtr); + c_scanOpPool.getPtr(currPtr); if (currPtr.i == scanPtr.i) { jam(); return true; @@ -627,7 +514,7 @@ Dbtux::NodeHandle::islinkScan(Dbtux::ScanOpPtr scanPtr) } void -Dbtux::NodeHandle::progError(int line, int cause, const char* extra) +Dbtux::NodeHandle::progError(int line, int cause, const char* file) { - m_tux.progError(line, cause, extra); + ErrorReporter::handleAssert("Dbtux::NodeHandle: assert failed", file, line); } diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp index eaa539d9cfcaa3469c8e49ff0e58032d1ab02f49..7baea224c9333e697581fc0d5a3be6de5926085b 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp @@ -42,12 +42,11 @@ Dbtux::execACC_SCANREQ(Signal* signal) } ndbrequire(fragPtr.i != RNIL); Frag& frag = *fragPtr.p; - ndbrequire(frag.m_nodeList == RNIL); // must be normal DIH/TC fragment ndbrequire(frag.m_fragId < (1 << frag.m_fragOff)); TreeHead& tree = frag.m_tree; // check for empty fragment - if (tree.m_root == NullTupAddr) { + if (tree.m_root == NullTupLoc) { jam(); AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend(); conf->scanPtr = req->senderData; @@ -241,7 +240,6 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) debugOut << "NEXT_SCANREQ scan " << scanPtr.i << " " << scan << endl; } #endif - ndbrequire(frag.m_nodeList == RNIL); // handle unlock previous and close scan switch (req->scanFlag) { case NextScanReq::ZSCAN_NEXT: @@ -275,13 +273,13 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) case NextScanReq::ZSCAN_CLOSE: jam(); // unlink from tree node first to avoid state changes - if (scan.m_scanPos.m_addr != NullTupAddr) { + if (scan.m_scanPos.m_loc != NullTupLoc) { jam(); - const TupAddr addr = scan.m_scanPos.m_addr; - NodeHandlePtr nodePtr; - selectNode(signal, frag, nodePtr, addr, AccHead); - nodePtr.p->unlinkScan(scanPtr); - scan.m_scanPos.m_addr = NullTupAddr; + const TupLoc loc = scan.m_scanPos.m_loc; + NodeHandle node(frag); + selectNode(signal, node, loc, AccHead); + unlinkScan(node, scanPtr); + scan.m_scanPos.m_loc = NullTupLoc; } if (scan.m_lockwait) { jam(); @@ -295,7 +293,6 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); scan.m_state = ScanOp::Aborting; - commitNodes(signal, frag, true); return; } if (scan.m_state == ScanOp::Locked) { @@ -350,7 +347,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) signal->theData[1] = true; EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); jamEntry(); - commitNodes(signal, frag, true); return; // stop } if (scan.m_lockwait) { @@ -365,7 +361,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) // if TC has ordered scan close, it will be detected here sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF, signal, signalLength, JBB); - commitNodes(signal, frag, true); return; // stop } if (scan.m_state == ScanOp::First) { @@ -407,8 +402,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) lockReq->userRef = reference(); lockReq->tableId = scan.m_tableId; lockReq->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff); - // should cache this at fragment create - lockReq->fragPtrI = RNIL; + lockReq->fragPtrI = frag.m_accTableFragPtrI[ent.m_fragBit]; const Uint32* const buf32 = static_cast<Uint32*>(keyPar.m_data); const Uint64* const buf64 = reinterpret_cast<const Uint64*>(buf32); lockReq->hashValue = md5_hash(buf64, keyPar.m_size); @@ -445,7 +439,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) signal->theData[1] = true; EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); jamEntry(); - commitNodes(signal, frag, true); return; // stop break; case AccLockReq::Refused: @@ -458,7 +451,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) signal->theData[1] = true; EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); jamEntry(); - commitNodes(signal, frag, true); return; // stop break; case AccLockReq::NoFreeOp: @@ -471,7 +463,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) signal->theData[1] = true; EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); jamEntry(); - commitNodes(signal, frag, true); return; // stop break; default: @@ -555,7 +546,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) scan.m_lastEnt = ent; // next time look for next entry scan.m_state = ScanOp::Next; - commitNodes(signal, frag, true); return; } // XXX in ACC this is checked before req->checkLcpStop @@ -569,7 +559,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) unsigned signalLength = 3; sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, signal, signalLength, JBB); - commitNodes(signal, frag, true); return; } ndbrequire(false); @@ -700,15 +689,15 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ScanOp& scan = *scanPtr.p; Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); TreeHead& tree = frag.m_tree; - if (tree.m_root == NullTupAddr) { + if (tree.m_root == NullTupLoc) { // tree may have become empty jam(); scan.m_state = ScanOp::Last; return; } TreePos pos; - pos.m_addr = tree.m_root; - NodeHandlePtr nodePtr; + pos.m_loc = tree.m_root; + NodeHandle node(frag); // unpack lower bound const ScanBound& bound = *scan.m_bound[0]; ScanBoundIterator iter; @@ -725,20 +714,20 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) boundPar.m_dir = 0; loop: { jam(); - selectNode(signal, frag, nodePtr, pos.m_addr, AccPref); - const unsigned occup = nodePtr.p->getOccup(); + selectNode(signal, node, pos.m_loc, AccPref); + const unsigned occup = node.getOccup(); ndbrequire(occup != 0); for (unsigned i = 0; i <= 1; i++) { jam(); // compare prefix - boundPar.m_data2 = nodePtr.p->getPref(i); + boundPar.m_data2 = node.getPref(i); boundPar.m_len2 = tree.m_prefSize; int ret = cmpScanBound(frag, boundPar); if (ret == NdbSqlUtil::CmpUnknown) { jam(); // read full value ReadPar readPar; - readPar.m_ent = nodePtr.p->getMinMax(i); + readPar.m_ent = node.getMinMax(i); readPar.m_first = 0; readPar.m_count = frag.m_numAttrs; readPar.m_data = 0; // leave in signal data @@ -751,11 +740,11 @@ loop: { } if (i == 0 && ret < 0) { jam(); - const TupAddr tupAddr = nodePtr.p->getLink(i); - if (tupAddr != NullTupAddr) { + const TupLoc loc = node.getLink(i); + if (loc != NullTupLoc) { jam(); // continue to left subtree - pos.m_addr = tupAddr; + pos.m_loc = loc; goto loop; } // start scanning this node @@ -764,34 +753,34 @@ loop: { pos.m_dir = 3; scan.m_scanPos = pos; scan.m_state = ScanOp::Next; - nodePtr.p->linkScan(scanPtr); + linkScan(node, scanPtr); return; } if (i == 1 && ret > 0) { jam(); - const TupAddr tupAddr = nodePtr.p->getLink(i); - if (tupAddr != NullTupAddr) { + const TupLoc loc = node.getLink(i); + if (loc != NullTupLoc) { jam(); // continue to right subtree - pos.m_addr = tupAddr; + pos.m_loc = loc; goto loop; } // start scanning upwards pos.m_dir = 1; scan.m_scanPos = pos; scan.m_state = ScanOp::Next; - nodePtr.p->linkScan(scanPtr); + linkScan(node, scanPtr); return; } } // read rest of current node - accessNode(signal, frag, nodePtr, AccFull); + accessNode(signal, node, AccFull); // look for first entry ndbrequire(occup >= 2); for (unsigned j = 1; j < occup; j++) { jam(); ReadPar readPar; - readPar.m_ent = nodePtr.p->getEnt(j); + readPar.m_ent = node.getEnt(j); readPar.m_first = 0; readPar.m_count = frag.m_numAttrs; readPar.m_data = 0; // leave in signal data @@ -809,7 +798,7 @@ loop: { pos.m_dir = 3; scan.m_scanPos = pos; scan.m_state = ScanOp::Next; - nodePtr.p->linkScan(scanPtr); + linkScan(node, scanPtr); return; } } @@ -869,31 +858,31 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) // use copy of position TreePos pos = scan.m_scanPos; // get and remember original node - NodeHandlePtr origNodePtr; - selectNode(signal, frag, origNodePtr, pos.m_addr, AccHead); - ndbrequire(origNodePtr.p->islinkScan(scanPtr)); + NodeHandle origNode(frag); + selectNode(signal, origNode, pos.m_loc, AccHead); + ndbrequire(islinkScan(origNode, scanPtr)); // current node in loop - NodeHandlePtr nodePtr = origNodePtr; + NodeHandle node = origNode; while (true) { jam(); if (pos.m_dir == 2) { // coming up from root ends the scan jam(); - pos.m_addr = NullTupAddr; + pos.m_loc = NullTupLoc; scan.m_state = ScanOp::Last; break; } - if (nodePtr.p->m_addr != pos.m_addr) { + if (node.m_loc != pos.m_loc) { jam(); - selectNode(signal, frag, nodePtr, pos.m_addr, AccHead); + selectNode(signal, node, pos.m_loc, AccHead); } if (pos.m_dir == 4) { // coming down from parent proceed to left child jam(); - TupAddr addr = nodePtr.p->getLink(0); - if (addr != NullTupAddr) { + TupLoc loc = node.getLink(0); + if (loc != NullTupLoc) { jam(); - pos.m_addr = addr; + pos.m_loc = loc; pos.m_dir = 4; // unchanged continue; } @@ -910,10 +899,10 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) if (pos.m_dir == 3) { // within node jam(); - unsigned occup = nodePtr.p->getOccup(); + unsigned occup = node.getOccup(); ndbrequire(occup >= 1); // access full node - accessNode(signal, frag, nodePtr, AccFull); + accessNode(signal, node, AccFull); // advance position if (! pos.m_match) pos.m_match = true; @@ -921,7 +910,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) pos.m_pos++; if (pos.m_pos < occup) { jam(); - pos.m_ent = nodePtr.p->getEnt(pos.m_pos); + pos.m_ent = node.getEnt(pos.m_pos); pos.m_dir = 3; // unchanged // XXX implement prefix optimization ReadPar readPar; @@ -938,7 +927,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) if (ret < 0) { jam(); // hit upper bound of single range scan - pos.m_addr = NullTupAddr; + pos.m_loc = NullTupLoc; scan.m_state = ScanOp::Last; break; } @@ -952,10 +941,10 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) break; } // after node proceed to right child - TupAddr addr = nodePtr.p->getLink(1); - if (addr != NullTupAddr) { + TupLoc loc = node.getLink(1); + if (loc != NullTupLoc) { jam(); - pos.m_addr = addr; + pos.m_loc = loc; pos.m_dir = 4; continue; } @@ -965,8 +954,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) if (pos.m_dir == 1) { // coming from right child proceed to parent jam(); - pos.m_addr = nodePtr.p->getLink(2); - pos.m_dir = nodePtr.p->getSide(); + pos.m_loc = node.getLink(2); + pos.m_dir = node.getSide(); continue; } ndbrequire(false); @@ -975,16 +964,16 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) scan.m_scanPos = pos; // relink if (scan.m_state == ScanOp::Current) { - ndbrequire(pos.m_addr == nodePtr.p->m_addr); - if (origNodePtr.i != nodePtr.i) { + ndbrequire(pos.m_loc == node.m_loc); + if (origNode.m_loc != node.m_loc) { jam(); - origNodePtr.p->unlinkScan(scanPtr); - nodePtr.p->linkScan(scanPtr); + unlinkScan(origNode, scanPtr); + linkScan(node, scanPtr); } } else if (scan.m_state == ScanOp::Last) { jam(); - ndbrequire(pos.m_addr == NullTupAddr); - origNodePtr.p->unlinkScan(scanPtr); + ndbrequire(pos.m_loc == NullTupLoc); + unlinkScan(origNode, scanPtr); } else { ndbrequire(false); } @@ -1044,7 +1033,6 @@ void Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr) { ScanOp& scan = *scanPtr.p; - Frag& frag = *c_fragPool.getPtr(scanPtr.p->m_fragPtrI); ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL); // unlock all not unlocked by LQH for (unsigned i = 0; i < MaxAccLockOps; i++) { @@ -1069,7 +1057,6 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr) sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, signal, signalLength, JBB); releaseScanOp(scanPtr); - commitNodes(signal, frag, true); } void diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp index 860aa65414ffa2417737f9b7b30182a30a1232eb..ede828b5fc351eb75939175788f68f33c1adeb36 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp @@ -30,19 +30,19 @@ Dbtux::treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& tree { const TreeHead& tree = frag.m_tree; const unsigned numAttrs = frag.m_numAttrs; - treePos.m_addr = tree.m_root; - NodeHandlePtr nodePtr; - if (treePos.m_addr == NullTupAddr) { + treePos.m_loc = tree.m_root; + if (treePos.m_loc == NullTupLoc) { // empty tree jam(); treePos.m_pos = 0; treePos.m_match = false; return; } + NodeHandle node(frag); loop: { jam(); - selectNode(signal, frag, nodePtr, treePos.m_addr, AccPref); - const unsigned occup = nodePtr.p->getOccup(); + selectNode(signal, node, treePos.m_loc, AccPref); + const unsigned occup = node.getOccup(); ndbrequire(occup != 0); // number of equal initial attributes in bounding node unsigned numEq = ZNIL; @@ -51,7 +51,7 @@ loop: { // compare prefix CmpPar cmpPar; cmpPar.m_data1 = searchPar.m_data; - cmpPar.m_data2 = nodePtr.p->getPref(i); + cmpPar.m_data2 = node.getPref(i); cmpPar.m_len2 = tree.m_prefSize; cmpPar.m_first = 0; cmpPar.m_numEq = 0; @@ -60,7 +60,7 @@ loop: { jam(); // read full value ReadPar readPar; - readPar.m_ent = nodePtr.p->getMinMax(i); + readPar.m_ent = node.getMinMax(i); ndbrequire(cmpPar.m_numEq < numAttrs); readPar.m_first = cmpPar.m_numEq; readPar.m_count = numAttrs - cmpPar.m_numEq; @@ -78,19 +78,18 @@ loop: { if (ret == 0) { jam(); // keys are equal, compare entry values - ret = searchPar.m_ent.cmp(nodePtr.p->getMinMax(i)); + ret = searchPar.m_ent.cmp(node.getMinMax(i)); } if (i == 0 ? (ret < 0) : (ret > 0)) { jam(); - const TupAddr tupAddr = nodePtr.p->getLink(i); - if (tupAddr != NullTupAddr) { + const TupLoc loc = node.getLink(i); + if (loc != NullTupLoc) { jam(); // continue to left/right subtree - treePos.m_addr = tupAddr; + treePos.m_loc = loc; goto loop; } // position is immediately before/after this node - // XXX disallow second case treePos.m_pos = (i == 0 ? 0 : occup); treePos.m_match = false; return; @@ -103,8 +102,8 @@ loop: { return; } } - // read rest of the bounding node - accessNode(signal, frag, nodePtr, AccFull); + // access rest of the bounding node + accessNode(signal, node, AccFull); // position is strictly within the node ndbrequire(occup >= 2); const unsigned numWithin = occup - 2; @@ -115,7 +114,7 @@ loop: { if (numEq < numAttrs) { jam(); ReadPar readPar; - readPar.m_ent = nodePtr.p->getEnt(j); + readPar.m_ent = node.getEnt(j); readPar.m_first = numEq; readPar.m_count = numAttrs - numEq; readPar.m_data = 0; // leave in signal data @@ -132,7 +131,7 @@ loop: { if (ret == 0) { jam(); // keys are equal, compare entry values - ret = searchPar.m_ent.cmp(nodePtr.p->getEnt(j)); + ret = searchPar.m_ent.cmp(node.getEnt(j)); } if (ret <= 0) { jam(); @@ -157,94 +156,94 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) { TreeHead& tree = frag.m_tree; unsigned pos = treePos.m_pos; - NodeHandlePtr nodePtr; + NodeHandle node(frag); // check for empty tree - if (treePos.m_addr == NullTupAddr) { + if (treePos.m_loc == NullTupLoc) { jam(); - insertNode(signal, frag, nodePtr, AccPref); - nodePtr.p->pushUp(signal, 0, ent); - nodePtr.p->setSide(2); - tree.m_root = nodePtr.p->m_addr; + insertNode(signal, node, AccPref); + nodePushUp(signal, node, 0, ent); + node.setSide(2); + tree.m_root = node.m_loc; return; } // access full node - selectNode(signal, frag, nodePtr, treePos.m_addr, AccFull); + selectNode(signal, node, treePos.m_loc, AccFull); // check if it is bounding node - if (pos != 0 && pos != nodePtr.p->getOccup()) { + if (pos != 0 && pos != node.getOccup()) { jam(); // check if room for one more - if (nodePtr.p->getOccup() < tree.m_maxOccup) { + if (node.getOccup() < tree.m_maxOccup) { jam(); - nodePtr.p->pushUp(signal, pos, ent); + nodePushUp(signal, node, pos, ent); return; } // returns min entry - nodePtr.p->pushDown(signal, pos - 1, ent); + nodePushDown(signal, node, pos - 1, ent); // find position to add the removed min entry - TupAddr childAddr = nodePtr.p->getLink(0); - if (childAddr == NullTupAddr) { + TupLoc childLoc = node.getLink(0); + if (childLoc == NullTupLoc) { jam(); // left child will be added pos = 0; } else { jam(); // find glb node - while (childAddr != NullTupAddr) { + while (childLoc != NullTupLoc) { jam(); - selectNode(signal, frag, nodePtr, childAddr, AccHead); - childAddr = nodePtr.p->getLink(1); + selectNode(signal, node, childLoc, AccHead); + childLoc = node.getLink(1); } // access full node again - accessNode(signal, frag, nodePtr, AccFull); - pos = nodePtr.p->getOccup(); + accessNode(signal, node, AccFull); + pos = node.getOccup(); } // fall thru to next case } // adding new min or max unsigned i = (pos == 0 ? 0 : 1); - ndbrequire(nodePtr.p->getLink(i) == NullTupAddr); + ndbrequire(node.getLink(i) == NullTupLoc); // check if the half-leaf/leaf has room for one more - if (nodePtr.p->getOccup() < tree.m_maxOccup) { + if (node.getOccup() < tree.m_maxOccup) { jam(); - nodePtr.p->pushUp(signal, pos, ent); + nodePushUp(signal, node, pos, ent); return; } // add a new node - NodeHandlePtr childPtr; - insertNode(signal, frag, childPtr, AccPref); - childPtr.p->pushUp(signal, 0, ent); + NodeHandle childNode(frag); + insertNode(signal, childNode, AccPref); + nodePushUp(signal, childNode, 0, ent); // connect parent and child - nodePtr.p->setLink(i, childPtr.p->m_addr); - childPtr.p->setLink(2, nodePtr.p->m_addr); - childPtr.p->setSide(i); + node.setLink(i, childNode.m_loc); + childNode.setLink(2, node.m_loc); + childNode.setSide(i); // re-balance tree at each node while (true) { // height of subtree i has increased by 1 int j = (i == 0 ? -1 : +1); - int b = nodePtr.p->getBalance(); + int b = node.getBalance(); if (b == 0) { // perfectly balanced jam(); - nodePtr.p->setBalance(j); + node.setBalance(j); // height change propagates up } else if (b == -j) { // height of shorter subtree increased jam(); - nodePtr.p->setBalance(0); + node.setBalance(0); // height of tree did not change - done break; } else if (b == j) { // height of longer subtree increased jam(); - NodeHandlePtr childPtr; - selectNode(signal, frag, childPtr, nodePtr.p->getLink(i), AccHead); - int b2 = childPtr.p->getBalance(); + NodeHandle childNode(frag); + selectNode(signal, childNode, node.getLink(i), AccHead); + int b2 = childNode.getBalance(); if (b2 == b) { jam(); - treeRotateSingle(signal, frag, nodePtr, i); + treeRotateSingle(signal, frag, node, i); } else if (b2 == -b) { jam(); - treeRotateDouble(signal, frag, nodePtr, i); + treeRotateDouble(signal, frag, node, i); } else { // height of subtree increased so it cannot be perfectly balanced ndbrequire(false); @@ -254,14 +253,14 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) } else { ndbrequire(false); } - TupAddr parentAddr = nodePtr.p->getLink(2); - if (parentAddr == NullTupAddr) { + TupLoc parentLoc = node.getLink(2); + if (parentLoc == NullTupLoc) { jam(); // root node - done break; } - i = nodePtr.p->getSide(); - selectNode(signal, frag, nodePtr, parentAddr, AccHead); + i = node.getSide(); + selectNode(signal, node, parentLoc, AccHead); } } @@ -273,101 +272,101 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) { TreeHead& tree = frag.m_tree; unsigned pos = treePos.m_pos; - NodeHandlePtr nodePtr; + NodeHandle node(frag); // access full node - selectNode(signal, frag, nodePtr, treePos.m_addr, AccFull); + selectNode(signal, node, treePos.m_loc, AccFull); TreeEnt ent; // check interior node first - if (nodePtr.p->getChilds() == 2) { + if (node.getChilds() == 2) { jam(); - ndbrequire(nodePtr.p->getOccup() >= tree.m_minOccup); + ndbrequire(node.getOccup() >= tree.m_minOccup); // check if no underflow - if (nodePtr.p->getOccup() > tree.m_minOccup) { + if (node.getOccup() > tree.m_minOccup) { jam(); - nodePtr.p->popDown(signal, pos, ent); + nodePopDown(signal, node, pos, ent); return; } // save current handle - NodeHandlePtr parentPtr = nodePtr; + NodeHandle parentNode = node; // find glb node - TupAddr childAddr = nodePtr.p->getLink(0); - while (childAddr != NullTupAddr) { + TupLoc childLoc = node.getLink(0); + while (childLoc != NullTupLoc) { jam(); - selectNode(signal, frag, nodePtr, childAddr, AccHead); - childAddr = nodePtr.p->getLink(1); + selectNode(signal, node, childLoc, AccHead); + childLoc = node.getLink(1); } // access full node again - accessNode(signal, frag, nodePtr, AccFull); + accessNode(signal, node, AccFull); // use glb max as new parent min - ent = nodePtr.p->getEnt(nodePtr.p->getOccup() - 1); - parentPtr.p->popUp(signal, pos, ent); + ent = node.getEnt(node.getOccup() - 1); + nodePopUp(signal, parentNode, pos, ent); // set up to remove glb max - pos = nodePtr.p->getOccup() - 1; + pos = node.getOccup() - 1; // fall thru to next case } // remove the element - nodePtr.p->popDown(signal, pos, ent); - ndbrequire(nodePtr.p->getChilds() <= 1); + nodePopDown(signal, node, pos, ent); + ndbrequire(node.getChilds() <= 1); // handle half-leaf for (unsigned i = 0; i <= 1; i++) { jam(); - TupAddr childAddr = nodePtr.p->getLink(i); - if (childAddr != NullTupAddr) { + TupLoc childLoc = node.getLink(i); + if (childLoc != NullTupLoc) { // move to child - selectNode(signal, frag, nodePtr, childAddr, AccFull); + selectNode(signal, node, childLoc, AccFull); // balance of half-leaf parent requires child to be leaf break; } } - ndbrequire(nodePtr.p->getChilds() == 0); + ndbrequire(node.getChilds() == 0); // get parent if any - TupAddr parentAddr = nodePtr.p->getLink(2); - NodeHandlePtr parentPtr; - unsigned i = nodePtr.p->getSide(); + TupLoc parentLoc = node.getLink(2); + NodeHandle parentNode(frag); + unsigned i = node.getSide(); // move all that fits into parent - if (parentAddr != NullTupAddr) { + if (parentLoc != NullTupLoc) { jam(); - selectNode(signal, frag, parentPtr, nodePtr.p->getLink(2), AccFull); - parentPtr.p->slide(signal, nodePtr, i); + selectNode(signal, parentNode, node.getLink(2), AccFull); + nodeSlide(signal, parentNode, node, i); // fall thru to next case } // non-empty leaf - if (nodePtr.p->getOccup() >= 1) { + if (node.getOccup() >= 1) { jam(); return; } // remove empty leaf - deleteNode(signal, frag, nodePtr); - if (parentAddr == NullTupAddr) { + deleteNode(signal, node); + if (parentLoc == NullTupLoc) { jam(); // tree is now empty - tree.m_root = NullTupAddr; + tree.m_root = NullTupLoc; return; } - nodePtr = parentPtr; - nodePtr.p->setLink(i, NullTupAddr); + node = parentNode; + node.setLink(i, NullTupLoc); #ifdef dbtux_min_occup_less_max_occup // check if we created a half-leaf - if (nodePtr.p->getBalance() == 0) { + if (node.getBalance() == 0) { jam(); // move entries from the other child - TupAddr childAddr = nodePtr.p->getLink(1 - i); - NodeHandlePtr childPtr; - selectNode(signal, frag, childPtr, childAddr, AccFull); - nodePtr.p->slide(signal, childPtr, 1 - i); - if (childPtr.p->getOccup() == 0) { + TupLoc childLoc = node.getLink(1 - i); + NodeHandle childNode(frag); + selectNode(signal, childNode, childLoc, AccFull); + nodeSlide(signal, node, childNode, 1 - i); + if (childNode.getOccup() == 0) { jam(); - deleteNode(signal, frag, childPtr); - nodePtr.p->setLink(1 - i, NullTupAddr); + deleteNode(signal, childNode); + node.setLink(1 - i, NullTupLoc); // we are balanced again but our parent balance changes by -1 - parentAddr = nodePtr.p->getLink(2); - if (parentAddr == NullTupAddr) { + parentLoc = node.getLink(2); + if (parentLoc == NullTupLoc) { jam(); return; } // fix side and become parent - i = nodePtr.p->getSide(); - selectNode(signal, frag, nodePtr, parentAddr, AccHead); + i = node.getSide(); + selectNode(signal, node, parentLoc, AccHead); } } #endif @@ -375,50 +374,50 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) while (true) { // height of subtree i has decreased by 1 int j = (i == 0 ? -1 : +1); - int b = nodePtr.p->getBalance(); + int b = node.getBalance(); if (b == 0) { // perfectly balanced jam(); - nodePtr.p->setBalance(-j); + node.setBalance(-j); // height of tree did not change - done return; } else if (b == j) { // height of longer subtree has decreased jam(); - nodePtr.p->setBalance(0); + node.setBalance(0); // height change propagates up } else if (b == -j) { // height of shorter subtree has decreased jam(); - NodeHandlePtr childPtr; // child on the other side - selectNode(signal, frag, childPtr, nodePtr.p->getLink(1 - i), AccHead); - int b2 = childPtr.p->getBalance(); + NodeHandle childNode(frag); + selectNode(signal, childNode, node.getLink(1 - i), AccHead); + int b2 = childNode.getBalance(); if (b2 == b) { jam(); - treeRotateSingle(signal, frag, nodePtr, 1 - i); + treeRotateSingle(signal, frag, node, 1 - i); // height of tree decreased and propagates up } else if (b2 == -b) { jam(); - treeRotateDouble(signal, frag, nodePtr, 1 - i); + treeRotateDouble(signal, frag, node, 1 - i); // height of tree decreased and propagates up } else { jam(); - treeRotateSingle(signal, frag, nodePtr, 1 - i); + treeRotateSingle(signal, frag, node, 1 - i); // height of tree did not change - done return; } } else { ndbrequire(false); } - TupAddr parentAddr = nodePtr.p->getLink(2); - if (parentAddr == NullTupAddr) { + TupLoc parentLoc = node.getLink(2); + if (parentLoc == NullTupLoc) { jam(); // root node - done return; } - i = nodePtr.p->getSide(); - selectNode(signal, frag, nodePtr, parentAddr, AccHead); + i = node.getSide(); + selectNode(signal, node, parentLoc, AccHead); } } @@ -441,55 +440,55 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) void Dbtux::treeRotateSingle(Signal* signal, Frag& frag, - NodeHandlePtr& nodePtr, + NodeHandle& node, unsigned i) { ndbrequire(i <= 1); /* 5 is the old top node that have been unbalanced due to an insert or delete. The balance is still the old balance before the update. - Verify that n5Bal is 1 if RR rotate and -1 if LL rotate. + Verify that bal5 is 1 if RR rotate and -1 if LL rotate. */ - NodeHandlePtr n5Ptr = nodePtr; - const TupAddr n5Addr = n5Ptr.p->m_addr; - const int n5Bal = n5Ptr.p->getBalance(); - const int n5side = n5Ptr.p->getSide(); - ndbrequire(n5Bal + (1 - i) == i); + NodeHandle node5 = node; + const TupLoc loc5 = node5.m_loc; + const int bal5 = node5.getBalance(); + const int side5 = node5.getSide(); + ndbrequire(bal5 + (1 - i) == i); /* 3 is the new root of this part of the tree which is to swap place with node 5. For an insert to cause this it must have the same balance as 5. For deletes it can have the balance 0. */ - TupAddr n3Addr = n5Ptr.p->getLink(i); - NodeHandlePtr n3Ptr; - selectNode(signal, frag, n3Ptr, n3Addr, AccHead); - const int n3Bal = n3Ptr.p->getBalance(); + TupLoc loc3 = node5.getLink(i); + NodeHandle node3(frag); + selectNode(signal, node3, loc3, AccHead); + const int bal3 = node3.getBalance(); /* 2 must always be there but is not changed. Thus we mereley check that it exists. */ - ndbrequire(n3Ptr.p->getLink(i) != NullTupAddr); + ndbrequire(node3.getLink(i) != NullTupLoc); /* 4 is not necessarily there but if it is there it will move from one side of 3 to the other side of 5. For LL it moves from the right side to the left side and for RR it moves from the left side to the right side. This means that it also changes parent from 3 to 5. */ - TupAddr n4Addr = n3Ptr.p->getLink(1 - i); - NodeHandlePtr n4Ptr; - if (n4Addr != NullTupAddr) { + TupLoc loc4 = node3.getLink(1 - i); + NodeHandle node4(frag); + if (loc4 != NullTupLoc) { jam(); - selectNode(signal, frag, n4Ptr, n4Addr, AccHead); - ndbrequire(n4Ptr.p->getSide() == (1 - i) && - n4Ptr.p->getLink(2) == n3Addr); - n4Ptr.p->setSide(i); - n4Ptr.p->setLink(2, n5Addr); + selectNode(signal, node4, loc4, AccHead); + ndbrequire(node4.getSide() == (1 - i) && + node4.getLink(2) == loc3); + node4.setSide(i); + node4.setLink(2, loc5); }//if /* Retrieve the address of 5's parent before it is destroyed */ - TupAddr n0Addr = n5Ptr.p->getLink(2); + TupLoc loc0 = node5.getLink(2); /* The next step is to perform the rotation. 3 will inherit 5's parent @@ -503,22 +502,22 @@ Dbtux::treeRotateSingle(Signal* signal, 1. 3 must have had 5 as parent before the change. 2. 3's side is left for LL and right for RR before change. */ - ndbrequire(n3Ptr.p->getLink(2) == n5Addr); - ndbrequire(n3Ptr.p->getSide() == i); - n3Ptr.p->setLink(1 - i, n5Addr); - n3Ptr.p->setLink(2, n0Addr); - n3Ptr.p->setSide(n5side); - n5Ptr.p->setLink(i, n4Addr); - n5Ptr.p->setLink(2, n3Addr); - n5Ptr.p->setSide(1 - i); - if (n0Addr != NullTupAddr) { + ndbrequire(node3.getLink(2) == loc5); + ndbrequire(node3.getSide() == i); + node3.setLink(1 - i, loc5); + node3.setLink(2, loc0); + node3.setSide(side5); + node5.setLink(i, loc4); + node5.setLink(2, loc3); + node5.setSide(1 - i); + if (loc0 != NullTupLoc) { jam(); - NodeHandlePtr n0Ptr; - selectNode(signal, frag, n0Ptr, n0Addr, AccHead); - n0Ptr.p->setLink(n5side, n3Addr); + NodeHandle node0(frag); + selectNode(signal, node0, loc0, AccHead); + node0.setLink(side5, loc3); } else { jam(); - frag.m_tree.m_root = n3Addr; + frag.m_tree.m_root = loc3; }//if /* The final step of the change is to update the balance of 3 and 5 that changed places. There are two cases here. The first case is @@ -531,22 +530,22 @@ Dbtux::treeRotateSingle(Signal* signal, In this case 5 will change balance but still be unbalanced and 3 will be unbalanced in the opposite direction of 5. */ - if (n3Bal == n5Bal) { + if (bal3 == bal5) { jam(); - n3Ptr.p->setBalance(0); - n5Ptr.p->setBalance(0); - } else if (n3Bal == 0) { + node3.setBalance(0); + node5.setBalance(0); + } else if (bal3 == 0) { jam(); - n3Ptr.p->setBalance(-n5Bal); - n5Ptr.p->setBalance(n5Bal); + node3.setBalance(-bal5); + node5.setBalance(bal5); } else { ndbrequire(false); }//if /* - Set nodePtr to 3 as return parameter for enabling caller to continue + Set node to 3 as return parameter for enabling caller to continue traversing the tree. */ - nodePtr = n3Ptr; + node = node3; } /* @@ -651,105 +650,105 @@ Dbtux::treeRotateSingle(Signal* signal, * */ void -Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i) +Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i) { // old top node - NodeHandlePtr n6Ptr = nodePtr; - const TupAddr n6Addr = n6Ptr.p->m_addr; + NodeHandle node6 = node; + const TupLoc loc6 = node6.m_loc; // the un-updated balance - const int n6Bal = n6Ptr.p->getBalance(); - const unsigned n6Side = n6Ptr.p->getSide(); + const int bal6 = node6.getBalance(); + const unsigned side6 = node6.getSide(); // level 1 - TupAddr n2Addr = n6Ptr.p->getLink(i); - NodeHandlePtr n2Ptr; - selectNode(signal, frag, n2Ptr, n2Addr, AccHead); - const int n2Bal = n2Ptr.p->getBalance(); + TupLoc loc2 = node6.getLink(i); + NodeHandle node2(frag); + selectNode(signal, node2, loc2, AccHead); + const int bal2 = node2.getBalance(); // level 2 - TupAddr n4Addr = n2Ptr.p->getLink(1 - i); - NodeHandlePtr n4Ptr; - selectNode(signal, frag, n4Ptr, n4Addr, AccHead); - const int n4Bal = n4Ptr.p->getBalance(); + TupLoc loc4 = node2.getLink(1 - i); + NodeHandle node4(frag); + selectNode(signal, node4, loc4, AccHead); + const int bal4 = node4.getBalance(); ndbrequire(i <= 1); - ndbrequire(n6Bal + (1 - i) == i); - ndbrequire(n2Bal == -n6Bal); - ndbrequire(n2Ptr.p->getLink(2) == n6Addr); - ndbrequire(n2Ptr.p->getSide() == i); - ndbrequire(n4Ptr.p->getLink(2) == n2Addr); + ndbrequire(bal6 + (1 - i) == i); + ndbrequire(bal2 == -bal6); + ndbrequire(node2.getLink(2) == loc6); + ndbrequire(node2.getSide() == i); + ndbrequire(node4.getLink(2) == loc2); // level 3 - TupAddr n3Addr = n4Ptr.p->getLink(i); - TupAddr n5Addr = n4Ptr.p->getLink(1 - i); + TupLoc loc3 = node4.getLink(i); + TupLoc loc5 = node4.getLink(1 - i); // fill up leaf before it becomes internal - if (n3Addr == NullTupAddr && n5Addr == NullTupAddr) { + if (loc3 == NullTupLoc && loc5 == NullTupLoc) { jam(); TreeHead& tree = frag.m_tree; - accessNode(signal, frag, n2Ptr, AccFull); - accessNode(signal, frag, n4Ptr, AccFull); - n4Ptr.p->slide(signal, n2Ptr, i); + accessNode(signal, node2, AccFull); + accessNode(signal, node4, AccFull); + nodeSlide(signal, node4, node2, i); // implied by rule of merging half-leaves with leaves - ndbrequire(n4Ptr.p->getOccup() >= tree.m_minOccup); - ndbrequire(n2Ptr.p->getOccup() != 0); + ndbrequire(node4.getOccup() >= tree.m_minOccup); + ndbrequire(node2.getOccup() != 0); } else { - if (n3Addr != NullTupAddr) { + if (loc3 != NullTupLoc) { jam(); - NodeHandlePtr n3Ptr; - selectNode(signal, frag, n3Ptr, n3Addr, AccHead); - n3Ptr.p->setLink(2, n2Addr); - n3Ptr.p->setSide(1 - i); + NodeHandle node3(frag); + selectNode(signal, node3, loc3, AccHead); + node3.setLink(2, loc2); + node3.setSide(1 - i); } - if (n5Addr != NullTupAddr) { + if (loc5 != NullTupLoc) { jam(); - NodeHandlePtr n5Ptr; - selectNode(signal, frag, n5Ptr, n5Addr, AccHead); - n5Ptr.p->setLink(2, n6Ptr.p->m_addr); - n5Ptr.p->setSide(i); + NodeHandle node5(frag); + selectNode(signal, node5, loc5, AccHead); + node5.setLink(2, node6.m_loc); + node5.setSide(i); } } // parent - TupAddr n0Addr = n6Ptr.p->getLink(2); - NodeHandlePtr n0Ptr; + TupLoc loc0 = node6.getLink(2); + NodeHandle node0(frag); // perform the rotation - n6Ptr.p->setLink(i, n5Addr); - n6Ptr.p->setLink(2, n4Addr); - n6Ptr.p->setSide(1 - i); + node6.setLink(i, loc5); + node6.setLink(2, loc4); + node6.setSide(1 - i); - n2Ptr.p->setLink(1 - i, n3Addr); - n2Ptr.p->setLink(2, n4Addr); + node2.setLink(1 - i, loc3); + node2.setLink(2, loc4); - n4Ptr.p->setLink(i, n2Addr); - n4Ptr.p->setLink(1 - i, n6Addr); - n4Ptr.p->setLink(2, n0Addr); - n4Ptr.p->setSide(n6Side); + node4.setLink(i, loc2); + node4.setLink(1 - i, loc6); + node4.setLink(2, loc0); + node4.setSide(side6); - if (n0Addr != NullTupAddr) { + if (loc0 != NullTupLoc) { jam(); - selectNode(signal, frag, n0Ptr, n0Addr, AccHead); - n0Ptr.p->setLink(n6Side, n4Addr); + selectNode(signal, node0, loc0, AccHead); + node0.setLink(side6, loc4); } else { jam(); - frag.m_tree.m_root = n4Addr; + frag.m_tree.m_root = loc4; } // set balance of changed nodes - n4Ptr.p->setBalance(0); - if (n4Bal == 0) { + node4.setBalance(0); + if (bal4 == 0) { jam(); - n2Ptr.p->setBalance(0); - n6Ptr.p->setBalance(0); - } else if (n4Bal == -n2Bal) { + node2.setBalance(0); + node6.setBalance(0); + } else if (bal4 == -bal2) { jam(); - n2Ptr.p->setBalance(0); - n6Ptr.p->setBalance(n2Bal); - } else if (n4Bal == n2Bal) { + node2.setBalance(0); + node6.setBalance(bal2); + } else if (bal4 == bal2) { jam(); - n2Ptr.p->setBalance(-n2Bal); - n6Ptr.p->setBalance(0); + node2.setBalance(-bal2); + node6.setBalance(0); } else { ndbrequire(false); } // new top node - nodePtr = n4Ptr; + node = node4; } diff --git a/ndb/src/kernel/blocks/dbtux/Makefile.am b/ndb/src/kernel/blocks/dbtux/Makefile.am index 5ba59e8b3b785b43f8fa0a68927ed185831ee4d2..0b48ad5724f083bc33884f0c0c85355d36d51998 100644 --- a/ndb/src/kernel/blocks/dbtux/Makefile.am +++ b/ndb/src/kernel/blocks/dbtux/Makefile.am @@ -10,6 +10,8 @@ libdbtux_a_SOURCES = \ DbtuxCmp.cpp \ DbtuxDebug.cpp +INCLUDES_LOC = -I$(top_srcdir)/ndb/src/kernel/blocks/dbtup + include $(top_srcdir)/ndb/config/common.mk.am include $(top_srcdir)/ndb/config/type_kernel.mk.am diff --git a/ndb/src/kernel/blocks/dbtux/Times.txt b/ndb/src/kernel/blocks/dbtux/Times.txt new file mode 100644 index 0000000000000000000000000000000000000000..78e35804b9ddab2c3b626d445cd01ad25a50ac4a --- /dev/null +++ b/ndb/src/kernel/blocks/dbtux/Times.txt @@ -0,0 +1,37 @@ +index maintenance overhead +========================== + +"mc02" 2x1700 MHz linux-2.4.9 gcc-2.96 -O3 one db-node + +case a: index on Unsigned +testOIBasic -case u -table 1 -index 1 -fragtype small -threads 10 -rows 100000 -subloop 1 -nologging + +case b: index on Varchar(5) + Varchar(5) + Varchar(20) + Unsigned +testOIBasic -case u -table 2 -index 4 -fragtype small -threads 10 -rows 100000 -subloop 1 -nologging + +1 million rows, pk update without index, pk update with index +shows ms / 1000 rows for each and pct overhead +the figures are based on single run on idle machine + +040616 mc02/a 40 ms 87 ms 114 pct + mc02/b 51 ms 128 ms 148 pct + +optim 1 mc02/a 38 ms 85 ms 124 pct + mc02/b 51 ms 123 ms 140 pct + +optim 2 mc02/a 41 ms 80 ms 96 pct + mc02/b 51 ms 117 ms 128 pct + +optim 3 mc02/a 43 ms 80 ms 85 pct + mc02/b 54 ms 118 ms 117 pct + +optim 4 mc02/a 42 ms 80 ms 87 pct + mc02/b 51 ms 119 ms 129 pct + +optim 5 mc02/a 43 ms 77 ms 77 pct + mc02/b 54 ms 118 ms 117 pct + +optim 6 mc02/a 42 ms 70 ms 66 pct + mc02/b 53 ms 109 ms 105 pct + +vim: set et: diff --git a/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp b/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp index ac29614bc70b07e10507397fe53272a67ae7822d..f2d2edb615df6ebaf7b7e9da9b46c0baa663c2d3 100644 --- a/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp +++ b/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp @@ -525,7 +525,7 @@ void Qmgr::execCM_REGREQ(Signal* signal) cmRegConf->dynamicId = TdynId; c_clusterNodes.copyto(NdbNodeBitmask::Size, cmRegConf->allNdbNodes); sendSignal(Tblockref, GSN_CM_REGCONF, signal, - CmRegConf::SignalLength, JBB); + CmRegConf::SignalLength, JBA); DEBUG_START(GSN_CM_REGCONF, refToNode(Tblockref), ""); /** @@ -847,7 +847,7 @@ void Qmgr::execCM_NODEINFOCONF(Signal* signal) nodePtr.i = getOwnNodeId(); ptrAss(nodePtr, nodeRec); ndbrequire(nodePtr.p->phase == ZSTARTING); - ndbrequire(c_start.m_gsn = GSN_CM_NODEINFOREQ); + ndbrequire(c_start.m_gsn == GSN_CM_NODEINFOREQ); c_start.m_nodes.clearWaitingFor(nodeId); /** @@ -1019,7 +1019,7 @@ void Qmgr::execCM_ADD(Signal* signal) ndbrequire(addNodePtr.i == nodePtr.i); switch(type){ case CmAdd::Prepare: - ndbrequire(c_start.m_gsn = GSN_CM_NODEINFOREQ); + ndbrequire(c_start.m_gsn == GSN_CM_NODEINFOREQ); /** * Wait for CM_NODEINFO_CONF */ diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp index a47d9d2099ec113f9f30f1f2766e8a7f1e6205b8..0ca8ce79e2eacdaeb4258cec5f7537154cd402f2 100644 --- a/ndb/test/ndbapi/testOIBasic.cpp +++ b/ndb/test/ndbapi/testOIBasic.cpp @@ -39,6 +39,7 @@ struct Opt { NdbDictionary::Object::FragmentType m_fragtype; const char* m_index; unsigned m_loop; + bool m_nologging; unsigned m_rows; unsigned m_scanrd; unsigned m_scanex; @@ -54,6 +55,7 @@ struct Opt { m_fragtype(NdbDictionary::Object::FragUndefined), m_index(0), m_loop(1), + m_nologging(false), m_rows(1000), m_scanrd(240), m_scanex(240), @@ -82,6 +84,7 @@ printhelp() << " -fragtype T fragment type single/small/medium/large" << endl << " -index xyz only given index numbers (digits 1-9)" << endl << " -loop N loop count full suite forever=0 [" << d.m_loop << "]" << endl + << " -nologging create tables in no-logging mode" << endl << " -rows N rows per thread [" << d.m_rows << "]" << endl << " -scanrd N scan read parallelism [" << d.m_scanrd << "]" << endl << " -scanex N scan exclusive parallelism [" << d.m_scanex << "]" << endl @@ -476,7 +479,7 @@ tt1 = { "TT1", 5, tt1col, 4, tt1itab }; -// tt2 + tt2x1 tt2x2 tt2x3 +// tt2 + tt2x1 tt2x2 tt2x3 tt2x4 static const Col tt2col[] = { @@ -505,6 +508,14 @@ tt2x3col[] = { { 1, tt2col[4] } }; +static const ICol +tt2x4col[] = { + { 0, tt2col[4] }, + { 1, tt2col[3] }, + { 2, tt2col[2] }, + { 3, tt2col[1] } +}; + static const ITab tt2x1 = { "TT2X1", 2, tt2x1col @@ -520,16 +531,22 @@ tt2x3 = { "TT2X3", 2, tt2x3col }; +static const ITab +tt2x4 = { + "TT2X4", 4, tt2x4col +}; + static const ITab tt2itab[] = { tt2x1, tt2x2, - tt2x3 + tt2x3, + tt2x4 }; static const Tab tt2 = { - "TT2", 5, tt2col, 3, tt2itab + "TT2", 5, tt2col, 4, tt2itab }; // all tables @@ -823,6 +840,9 @@ createtable(Par par) if (par.m_fragtype != NdbDictionary::Object::FragUndefined) { t.setFragmentType(par.m_fragtype); } + if (par.m_nologging) { + t.setLogging(false); + } for (unsigned k = 0; k < tab.m_cols; k++) { const Col& col = tab.m_col[k]; NdbDictionary::Column c(col.m_name); @@ -2202,7 +2222,6 @@ pkupdateindexbuild(Par par) { if (par.m_no == 0) { CHK(createindex(par) == 0); - CHK(invalidateindex(par) == 0); } else { CHK(pkupdate(par) == 0); } @@ -2493,6 +2512,7 @@ tbusybuild(Par par) RUNSTEP(par, pkinsert, MT); for (unsigned i = 0; i < par.m_subloop; i++) { RUNSTEP(par, pkupdateindexbuild, MT); + RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, readverify, MT); RUNSTEP(par, dropindex, ST); } @@ -2500,9 +2520,28 @@ tbusybuild(Par par) } static int -ttiming(Par par) +ttimebuild(Par par) +{ + Tmr t1; + RUNSTEP(par, droptable, ST); + RUNSTEP(par, createtable, ST); + RUNSTEP(par, invalidatetable, MT); + for (unsigned i = 0; i < par.m_subloop; i++) { + RUNSTEP(par, pkinsert, MT); + t1.on(); + RUNSTEP(par, createindex, ST); + t1.off(par.m_totrows); + RUNSTEP(par, invalidateindex, MT); + RUNSTEP(par, dropindex, ST); + } + LL1("build index - " << t1.time()); + return 0; +} + +static int +ttimemaint(Par par) { - Tmr t0, t1, t2; + Tmr t1, t2; RUNSTEP(par, droptable, ST); RUNSTEP(par, createtable, ST); RUNSTEP(par, invalidatetable, MT); @@ -2511,16 +2550,13 @@ ttiming(Par par) t1.on(); RUNSTEP(par, pkupdate, MT); t1.off(par.m_totrows); - t0.on(); RUNSTEP(par, createindex, ST); RUNSTEP(par, invalidateindex, MT); - t0.off(par.m_totrows); t2.on(); RUNSTEP(par, pkupdate, MT); t2.off(par.m_totrows); RUNSTEP(par, dropindex, ST); } - LL1("build index - " << t0.time()); LL1("update - " << t1.time()); LL1("update indexed - " << t2.time()); LL1("overhead - " << t2.over(t1)); @@ -2551,7 +2587,8 @@ tcaselist[] = { TCase("b", tpkops, "pk operations and scan reads"), TCase("c", tmixedops, "pk operations and scan operations"), TCase("d", tbusybuild, "pk operations and index build"), - TCase("t", ttiming, "time index build and maintenance"), + TCase("t", ttimebuild, "time index build"), + TCase("u", ttimemaint, "time index maintenance"), TCase("z", tdrop, "drop test tables") }; @@ -2689,6 +2726,10 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535) continue; } } + if (strcmp(arg, "-nologging") == 0) { + g_opt.m_nologging = true; + continue; + } if (strcmp(arg, "-rows") == 0) { if (++argv, --argc > 0) { g_opt.m_rows = atoi(argv[0]); diff --git a/ndb/test/run-test/make-html-reports.sh b/ndb/test/run-test/make-html-reports.sh index 079650a729f5e6e2a7100f6d5a30c943648368ba..63fa00f95b05d90a81d90c3fb6e229e277637331 100755 --- a/ndb/test/run-test/make-html-reports.sh +++ b/ndb/test/run-test/make-html-reports.sh @@ -1,106 +1,23 @@ #!/bin/sh -# NAME -# make-html-reports.sh -# -# SYNOPSIS -# make-html-reports.sh [-q] [ -R <YYYY-MM-DD> ] [ -s <src dir> ] [ -d <dst dir> ] [ -c <conf dir> ] -# -# DESCRIPTION -# -# OPTIONS -# -# EXAMPLES -# -# -# ENVIRONMENT -# NDB_PROJ_HOME Home dir for ndb -# -# FILES -# $NDB_PROJ_HOME/lib/funcs.sh general shell script functions -# -# -# SEE ALSO -# -# DIAGNOSTICTS -# -# VERSION -# 1.0 -# -# AUTHOR -# Jonas Oreland -# - -progname=`basename $0` -synopsis="make-html-reports.sh [ -R <YYYY-MM-DD> ] [ -s <src dir> ] [ -d <dst dir> ] [ -c <conf dir> ]" - -: ${NDB_PROJ_HOME:?} # If undefined, exit with error message - -: ${NDB_LOCAL_BUILD_OPTIONS:=--} # If undef, set to --. Keeps getopts happy. - # You may have to experiment a bit - # to get quoting right (if you need it). - - -. $NDB_PROJ_HOME/lib/funcs.sh # Load some good stuff - -# defaults for options related variables -# - - -src_dir=`pwd` -dst_dir=`pwd` -conf_dir=`pwd` -report_date=`date '+%Y-%m-%d'` -uniq_id=$$.$$ -verbose=yes - -# used if error when parsing the options environment variable -# -env_opterr="options environment variable: <<$options>>" - -# Option parsing, for the options variable as well as the command line. -# -# We want to be able to set options in an environment variable, -# as well as on the command line. In order not to have to repeat -# the same getopts information twice, we loop two times over the -# getopts while loop. The first time, we process options from -# the options environment variable, the second time we process -# options from the command line. -# -# The things to change are the actual options and what they do. -# -# - -for optstring in "$options" "" # 1. options variable 2. cmd line -do - - while getopts q:s:R:d:c: i $optstring # optstring empty => no arg => cmd line - do - case $i in - - q) verbose="";; # echo important things - d) dst_dir=$OPTARG;; # Destination directory - s) src_dir=$OPTARG;; # Destination directory - c) conf_dir=$OPTARG;; # - R) report_date=$OPTARG;; # - \?) syndie $env_opterr;; # print synopsis and exit - - esac - done - - [ -n "$optstring" ] && OPTIND=1 # Reset for round 2, cmdline options - env_opterr= # Round 2 should not use the value +src_dir=$1 +run=$2 +date=$3 +src_file=$src_dir/report.txt -done -shift `expr $OPTIND - 1` - -src_dir=`abspath $src_dir` -dst_dir=`abspath $dst_dir` -conf_dir=`abspath $conf_dir` +if [ ! -f $src_dir/report.txt ] +then + echo "$src_dir/report.txt is missing" + exit 1 +fi ### # # General html functions +trim(){ + echo $* +} + header(){ cat <<EOF <html><head><title>$*</title></head> @@ -166,64 +83,7 @@ hr(){ EOF } -# --- option parsing done --- - # -- Verify -trace "Verifying arguments" -summary_file=$src_dir/reports/summary.$report_date - -if [ ! -r $summary_file ] -then - syndie "Invalid src directory or report date: $summary_file not found" -fi - -if [ ! -d $conf_dir/configurations ] -then - syndie "Invalid src directory: $conf_dir/configurations not found" -fi - -if [ ! -d $conf_dir/testcases ] -then - syndie "Invalid src directory: $conf_dir/testcases not found" -fi - -if [ ! -d $dst_dir ] -then - syndie "Invalid dst dir..." -fi - -# --- option verifying done --- - -trace "src_dir: $src_dir" -trace "dst_dir: $dst_dir" -trace "conf_dir: $conf_dir" -trace "report date: $report_date" - -### -config_spec(){ - cat <<EOF -<a href=#$1>$1</a> -EOF -} - -config_spec_include(){ - # Print the $1 file to the file we are generating - cat <<EOF -<a name=$1><pre> -EOF - if [ -r $conf_dir/configurations/$1 ] - then - cat -E $conf_dir/configurations/$1 | sed 's/\$/<BR>/g' - else - cat <<EOF - Config spec $1 not found -EOF - fi -cat <<EOF -</pre></a> -EOF -} - time_spec(){ # $1 - secs _ts_tmp=$1 @@ -232,8 +92,14 @@ time_spec(){ _ts_tmp=`expr $_ts_tmp / 60` _ts_m=`expr $_ts_tmp % 60` - _ts_tmp=`expr $_ts_tmp / 60` + if [ $_ts_tmp -ge 60 ] + then + _ts_tmp=`expr $_ts_tmp / 60` + else + _ts_tmp=0 + fi + a=3 _ts_h=$_ts_tmp if [ $_ts_h -gt 0 ] @@ -247,191 +113,77 @@ time_spec(){ echo $ret } -log_spec(){ - _ff_=$src_dir/log/$report_date/$1.$2/test.$3.out - if [ -r $_ff_ ] && [ -s $_ff_ ] - then - _f2_=$dst_dir/log.$report_date.$1.$2.$3.out.gz - if [ -r $_f2_ ] - then - rm $_f2_ - fi - cp $_ff_ $dst_dir/log.$report_date.$1.$2.$3.out - gzip $dst_dir/log.$report_date.$1.$2.$3.out - rm -f $dst_dir/log.$report_date.$1.$2.$3.out - echo "<a href=log.$report_date.$1.$2.$3.out.gz>Log file</a>" - else - echo "-" - fi -} +### Main -err_spec(){ - _ff_=$src_dir/log/$report_date/$1.$2/test.$3.err.tar - if [ -r $_ff_ ] && [ -s $_ff_ ] - then - cp $_ff_ $dst_dir/err.$report_date.$1.$2.$3.err.tar - gzip $dst_dir/err.$report_date.$1.$2.$3.err.tar - rm -f $dst_dir/err.$report_date.$1.$2.$3.err.tar - echo "<a href=err.$report_date.$1.$2.$3.err.tar.gz>Error tarball</a>" - else - echo "-" - fi -} +report_file=$src_dir/report.html +summary_file=$src_dir/summary.html -command_spec(){ - echo $* | sed 's/;/<BR>/g' -} +passed=0 +failed=0 +total=0 -### Main +pass(){ + passed=`expr $passed + 1` +} -html_summary_file=$dst_dir/summary.$report_date.html +fail(){ + failed=`expr $failed + 1` +} -trace "Creating summary" ( - eval `grep "TOTAL" $summary_file | awk -F";" '{ printf("test_file=\"%s\"; elapsed=\"%s\"; started=\"%s\"; stopped=\"%s\"", $2, $3, $4, $5); }'` - - header "Autotest summary $report_date" - heading 1 "Autotest summary $report_date" - table - row ; column `bold test file: `; column $test_file ; end_row - row ; column `bold Started:` ; column "$started "; end_row - row ; column `bold Stopped:` ; column "$stopped "; end_row - row ; column `bold Elapsed:` ; column "`time_spec $elapsed secs`" ; end_row - end_table - hr - - table "border=1" - row - c_column `bold Report` - c_column `bold Tag` - c_column `bold Version` - c_column `bold Distr-Config` - c_column `bold Db-Config` - c_column `bold Type` - c_column `bold Test file` - c_column `bold Make` - c_column `bold Config` - c_column `bold Test time` - c_column `bold Passed` - c_column `bold Failed` - end_row - - grep -v "^#" $summary_file | grep -v TOTAL | sed 's/;/ /g' | \ - while read tag version config template type test_file make_res make_time conf_res conf_time test_time passed failed - do + header Report $run $date + table "border=1" row - if [ -r $src_dir/reports/report.$tag.$version.$config.$template.$type.$test_file.$report_date ] - then - column "<a href=\"report.$tag.$version.$config.$template.$type.$test_file.$report_date.html\">report</a>" - else - column "-" - fi - - column $tag - column $version - column $config - column $template - column $type - column $test_file - column "$make_res(`time_spec $make_time`)" - column "$conf_res(`time_spec $conf_time`)" - c_column "`time_spec $test_time`" - c_column `bold $passed` - c_column `bold $failed` + column `bold Test case` + column `bold Result` + column `bold Elapsed` + column `bold Log` end_row - done - end_table +) > $report_file - footer -) > $html_summary_file - -for i in $src_dir/reports/report.*.$report_date +cat $src_file | while read line do - f=`basename $i` - trace "Creating report: $f" - eval `echo $f | awk -F"." '{printf("tag=%s;version=%s;config=%s;template=%s;type=%s;test_file=%s", $2, $3, $4, $5, $6, $7);}'` - - ( - header "Autotest report $report_date" - heading 1 "Autotest report $report_date" - table #"border=1" - row ; column `bold Tag:`; column $tag ; end_row - row ; column `bold Version:` ; column $version ; end_row - row ; column `bold Configuration:` ; column `config_spec $config`; end_row - row ; column `bold Template:` ; column `config_spec $template`; end_row - row ; column `bold Type:` ; column $type ; end_row - row ; column `bold Test file:` ; column $test_file; end_row - end_table - hr - - table "border=1" - row - c_column `bold Test case` - c_column `bold Result` - c_column `bold Test time` - c_column `bold Logfile` - c_column `bold Error tarfile` - end_row - - grep -v "^#" $i | sed 's/;/ /g' | \ - while read test_no test_res test_time cmd - do - row - column "`command_spec $cmd`" - case "$test_res" in - 0) - column "PASSED";; - 1001) - column "API error";; - 1002) - column "Max time expired";; - 1003) - column "Mgm port busy";; - *) - column "Unknown: $test_res";; - esac - - column "`time_spec $test_time`" - - column "`log_spec $tag $version $test_no`" - column "`err_spec $tag $version $test_no`" - end_row - done - end_table - - # Last on page we include spec - # of used machines and template for config - # for future reference - hr - table "border=1" - row; column `bold Configuration:` $config; end_row - row; column `config_spec_include $config`; end_row - end_table - hr - table "border=1" - row; column `bold Template:` $template; end_row - row; column `config_spec_include $template`; end_row - end_table - - footer - - ) > $dst_dir/$f.html + eval `echo $line | awk -F";" '{ printf("prg=\"%s\"; no=\"%s\"; res=\"%s\"; time=\"%s\"", $1, $2, $3, $4); }'` + + prg=`trim $prg` + no=`trim $no` + res=`trim $res` + time=`trim $time` + res_dir="<a href=\"result-$run/$date/result.$no/\">log</a>" + + ts=`time_spec $time` + res_txt="" + case $res in + 0) pass; res_txt="PASSED"; res_dir=" ";; + *) fail; res_txt="FAILED";; + esac + total=`expr $total + $time` + + ( + row + column $prg + column $res_txt + column $ts + column $res_dir + end_row + ) >> $report_file + + ( + row + column $run + column $date + column $passed + column $failed + column `time_spec $total` + column "<a href=\"result-$run/$date/report.html\">report</a>" + column "<a href=\"result-$run/$date/log.txt\">log.txt</a>" + end_row + ) > $summary_file done -# Re creating index -trace "Recreating index" ( - header "Autotest super-duper index" - heading 1 "<center>Autotest super-duper index</center>" - hr - for i in `ls $dst_dir/summary.*.html | sort -r -n` - do - f=`basename $i` - cat <<EOF -<p><a href=$f>$f</a></p> -EOF - done - footer -) > $dst_dir/index.html + end_table + footer +) >> $report_file exit 0