Commit 26d2d453 authored by pekka@mysql.com's avatar pekka@mysql.com

Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb

into mysql.com:/space/pekka/ndb/version/my41
parents e445902b a8fa01de
...@@ -39,7 +39,7 @@ public: ...@@ -39,7 +39,7 @@ public:
SearchError = 895, // add + found or remove + not found SearchError = 895, // add + found or remove + not found
NoMemError = 827 NoMemError = 827
}; };
STATIC_CONST( SignalLength = 7 ); STATIC_CONST( SignalLength = 8 );
private: private:
/* /*
* Error code set by TUX. Zero means no error. * Error code set by TUX. Zero means no error.
...@@ -52,10 +52,11 @@ private: ...@@ -52,10 +52,11 @@ private:
Uint32 indexId; Uint32 indexId;
Uint32 fragId; Uint32 fragId;
/* /*
* Tuple version identified by logical address of "original" tuple and * Tuple version identified by physical address of "original" tuple
* version number. * and version number.
*/ */
Uint32 tupAddr; Uint32 pageId;
Uint32 pageOffset;
Uint32 tupVersion; Uint32 tupVersion;
/* /*
* Operation code and flags. * Operation code and flags.
......
...@@ -24,10 +24,10 @@ printTUX_MAINT_REQ(FILE* output, const Uint32* theData, Uint32 len, Uint16 rbn) ...@@ -24,10 +24,10 @@ printTUX_MAINT_REQ(FILE* output, const Uint32* theData, Uint32 len, Uint16 rbn)
//const bool inOut = rbn & (1 << 15); //const bool inOut = rbn & (1 << 15);
const TuxMaintReq* const sig = (const TuxMaintReq*)theData; const TuxMaintReq* const sig = (const TuxMaintReq*)theData;
fprintf(output, " errorCode=%d\n", sig->errorCode); fprintf(output, " errorCode=%d\n", sig->errorCode);
fprintf(output, " table: id=%d", sig->tableId); fprintf(output, " table: id=%u", sig->tableId);
fprintf(output, " index: id=%d", sig->indexId); fprintf(output, " index: id=%u", sig->indexId);
fprintf(output, " fragment: id=%d\n", sig->fragId); fprintf(output, " fragment: id=%u\n", sig->fragId);
fprintf(output, " tuple: addr=0x%x version=%d\n", sig->tupAddr, sig->tupVersion); fprintf(output, " tuple: loc=%u.%u version=%u\n", sig->pageId, sig->pageOffset, sig->tupVersion);
const Uint32 opCode = sig->opInfo & 0xFF; const Uint32 opCode = sig->opInfo & 0xFF;
const Uint32 opFlag = sig->opInfo >> 8; const Uint32 opFlag = sig->opInfo >> 8;
switch (opCode ) { switch (opCode ) {
......
...@@ -996,6 +996,11 @@ public: ...@@ -996,6 +996,11 @@ public:
Dbtup(const class Configuration &); Dbtup(const class Configuration &);
virtual ~Dbtup(); virtual ~Dbtup();
/*
* TUX uses logical tuple address when talking to ACC and LQH.
*/
void tuxGetTupAddr(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32& tupAddr);
/* /*
* TUX index in TUP has single Uint32 array attribute which stores an * TUX index in TUP has single Uint32 array attribute which stores an
* index node. TUX uses following methods. * index node. TUX uses following methods.
...@@ -1004,6 +1009,13 @@ public: ...@@ -1004,6 +1009,13 @@ public:
void tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node); void tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node);
void tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node); void tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node);
/*
* TUX reads primary table attributes for 1) index key 2) primary key
* when returning keyinfo. TUX uses following methods.
*/
void tuxReadAttrs(); // under construction
void tuxReadKeys(); // under construction
private: private:
BLOCK_DEFINES(Dbtup); BLOCK_DEFINES(Dbtup);
......
...@@ -30,6 +30,97 @@ ...@@ -30,6 +30,97 @@
// methods used by ordered index // methods used by ordered index
void
Dbtup::tuxGetTupAddr(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32& tupAddr)
{
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
TablerecPtr tablePtr;
tablePtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
PagePtr pagePtr;
pagePtr.i = pageId;
ptrCheckGuard(pagePtr, cnoOfPage, page);
Uint32 fragPageId = pagePtr.p->pageWord[ZPAGE_FRAG_PAGE_ID_POS];
Uint32 tupheadsize = tablePtr.p->tupheadsize;
ndbrequire(pageOffset >= ZPAGE_HEADER_SIZE);
Uint32 offset = pageOffset - ZPAGE_HEADER_SIZE;
ndbrequire(offset % tupheadsize == 0);
Uint32 pageIndex = (offset / tupheadsize) << 1;
tupAddr = (fragPageId << MAX_TUPLES_BITS) | pageIndex;
}
int
Dbtup::tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pageOffset, Uint32*& node)
{
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
TablerecPtr tablePtr;
tablePtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
PagePtr pagePtr;
terrorCode = 0;
if (! allocTh(fragPtr.p, tablePtr.p, NORMAL_PAGE, signal, pageOffset, pagePtr)) {
jam();
ndbrequire(terrorCode != 0);
return terrorCode;
}
pageId = pagePtr.i;
Uint32 attrDescIndex = tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
Uint32 attrDataOffset = AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr);
node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset;
return 0;
}
void
Dbtup::tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node)
{
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
TablerecPtr tablePtr;
tablePtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
PagePtr pagePtr;
pagePtr.i = pageId;
ptrCheckGuard(pagePtr, cnoOfPage, page);
Uint32 attrDescIndex = tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
Uint32 attrDataOffset = AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr);
ndbrequire(node == &pagePtr.p->pageWord[pageOffset] + attrDataOffset);
freeTh(fragPtr.p, tablePtr.p, signal, pagePtr.p, pageOffset);
}
void
Dbtup::tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node)
{
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
TablerecPtr tablePtr;
tablePtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
PagePtr pagePtr;
pagePtr.i = pageId;
ptrCheckGuard(pagePtr, cnoOfPage, page);
Uint32 attrDescIndex = tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
Uint32 attrDataOffset = AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr);
node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset;
}
void // under construction
Dbtup::tuxReadAttrs()
{
}
void // under construction
Dbtup::tuxReadKeys()
{
}
// deprecated signal interfaces
void void
Dbtup::execTUP_READ_ATTRS(Signal* signal) Dbtup::execTUP_READ_ATTRS(Signal* signal)
{ {
...@@ -179,64 +270,6 @@ Dbtup::execTUP_QUERY_TH(Signal* signal) ...@@ -179,64 +270,6 @@ Dbtup::execTUP_QUERY_TH(Signal* signal)
return; return;
} }
int
Dbtup::tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pageOffset, Uint32*& node)
{
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
TablerecPtr tablePtr;
tablePtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
PagePtr pagePtr;
terrorCode = 0;
if (! allocTh(fragPtr.p, tablePtr.p, NORMAL_PAGE, signal, pageOffset, pagePtr)) {
jam();
ndbrequire(terrorCode != 0);
return terrorCode;
}
pageId = pagePtr.i;
Uint32 attrDescIndex = tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
Uint32 attrDataOffset = AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr);
node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset;
return 0;
}
void
Dbtup::tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node)
{
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
TablerecPtr tablePtr;
tablePtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
PagePtr pagePtr;
pagePtr.i = pageId;
ptrCheckGuard(pagePtr, cnoOfPage, page);
Uint32 attrDescIndex = tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
Uint32 attrDataOffset = AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr);
ndbrequire(node == &pagePtr.p->pageWord[pageOffset] + attrDataOffset);
freeTh(fragPtr.p, tablePtr.p, signal, pagePtr.p, pageOffset);
}
void
Dbtup::tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node)
{
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
TablerecPtr tablePtr;
tablePtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
PagePtr pagePtr;
pagePtr.i = pageId;
ptrCheckGuard(pagePtr, cnoOfPage, page);
Uint32 attrDescIndex = tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
Uint32 attrDataOffset = AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr);
node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset;
}
void void
Dbtup::execTUP_STORE_TH(Signal* signal) Dbtup::execTUP_STORE_TH(Signal* signal)
{ {
...@@ -483,7 +516,8 @@ Dbtup::buildIndex(Signal* signal, Uint32 buildPtrI) ...@@ -483,7 +516,8 @@ Dbtup::buildIndex(Signal* signal, Uint32 buildPtrI)
buildPtr.p->m_tupleNo = 0; buildPtr.p->m_tupleNo = 0;
break; break;
} }
pagePtr.i = getRealpid(fragPtr.p, buildPtr.p->m_pageId); Uint32 realPageId = getRealpid(fragPtr.p, buildPtr.p->m_pageId);
pagePtr.i = realPageId;
ptrCheckGuard(pagePtr, cnoOfPage, page); ptrCheckGuard(pagePtr, cnoOfPage, page);
const Uint32 pageState = pagePtr.p->pageWord[ZPAGE_STATE_POS]; const Uint32 pageState = pagePtr.p->pageWord[ZPAGE_STATE_POS];
if (pageState != ZTH_MM_FREE && if (pageState != ZTH_MM_FREE &&
...@@ -497,8 +531,7 @@ Dbtup::buildIndex(Signal* signal, Uint32 buildPtrI) ...@@ -497,8 +531,7 @@ Dbtup::buildIndex(Signal* signal, Uint32 buildPtrI)
} }
// get tuple // get tuple
const Uint32 tupheadsize = tablePtr.p->tupheadsize; const Uint32 tupheadsize = tablePtr.p->tupheadsize;
const Uint32 pageOffset = ZPAGE_HEADER_SIZE + Uint32 pageOffset = ZPAGE_HEADER_SIZE + buildPtr.p->m_tupleNo * tupheadsize;
buildPtr.p->m_tupleNo * tupheadsize;
if (pageOffset + tupheadsize > ZWORDS_ON_PAGE) { if (pageOffset + tupheadsize > ZWORDS_ON_PAGE) {
ljam(); ljam();
buildPtr.p->m_pageId++; buildPtr.p->m_pageId++;
...@@ -530,15 +563,14 @@ Dbtup::buildIndex(Signal* signal, Uint32 buildPtrI) ...@@ -530,15 +563,14 @@ Dbtup::buildIndex(Signal* signal, Uint32 buildPtrI)
buildPtr.p->m_tupleNo++; buildPtr.p->m_tupleNo++;
break; break;
} }
Uint32 tupVersion = pagePtr.p->pageWord[pageOffset + 1];
OperationrecPtr pageOperPtr; OperationrecPtr pageOperPtr;
pageOperPtr.i = pagePtr.p->pageWord[pageOffset]; pageOperPtr.i = pagePtr.p->pageWord[pageOffset];
Uint32 pageId = buildPtr.p->m_pageId;
Uint32 pageIndex = buildPtr.p->m_tupleNo << 1;
if (pageOperPtr.i != RNIL) { if (pageOperPtr.i != RNIL) {
/* /*
If there is an ongoing operation on the tuple then it is either a If there is an ongoing operation on the tuple then it is either a
copy tuple or an original tuple with an ongoing transaction. In copy tuple or an original tuple with an ongoing transaction. In
both cases fragPageId and pageIndex refers to the original tuple. both cases realPageId and pageOffset refer to the original tuple.
The tuple address stored in TUX will always be the original tuple The tuple address stored in TUX will always be the original tuple
but with the tuple version of the tuple we found. but with the tuple version of the tuple we found.
...@@ -550,10 +582,9 @@ Dbtup::buildIndex(Signal* signal, Uint32 buildPtrI) ...@@ -550,10 +582,9 @@ Dbtup::buildIndex(Signal* signal, Uint32 buildPtrI)
*/ */
jam(); jam();
ptrCheckGuard(pageOperPtr, cnoOfOprec, operationrec); ptrCheckGuard(pageOperPtr, cnoOfOprec, operationrec);
pageId = pageOperPtr.p->fragPageId; realPageId = pageOperPtr.p->realPageId;
pageIndex = pageOperPtr.p->pageIndex; pageOffset = pageOperPtr.p->pageOffset;
}//if }//if
Uint32 tup_version = pagePtr.p->pageWord[pageOffset + 1];
#ifdef TIME_MEASUREMENT #ifdef TIME_MEASUREMENT
NdbTick_getMicroTimer(&start); NdbTick_getMicroTimer(&start);
#endif #endif
...@@ -563,8 +594,9 @@ Dbtup::buildIndex(Signal* signal, Uint32 buildPtrI) ...@@ -563,8 +594,9 @@ Dbtup::buildIndex(Signal* signal, Uint32 buildPtrI)
req->tableId = tablePtr.i; req->tableId = tablePtr.i;
req->indexId = triggerPtr.p->indexId; req->indexId = triggerPtr.p->indexId;
req->fragId = tablePtr.p->fragid[buildPtr.p->m_fragNo]; req->fragId = tablePtr.p->fragid[buildPtr.p->m_fragNo];
req->tupAddr = (pageId << MAX_TUPLES_BITS) | pageIndex; req->pageId = realPageId;
req->tupVersion = tup_version; req->pageOffset = pageOffset;
req->tupVersion = tupVersion;
req->opInfo = TuxMaintReq::OpAdd; req->opInfo = TuxMaintReq::OpAdd;
EXECUTE_DIRECT(DBTUX, GSN_TUX_MAINT_REQ, EXECUTE_DIRECT(DBTUX, GSN_TUX_MAINT_REQ,
signal, TuxMaintReq::SignalLength); signal, TuxMaintReq::SignalLength);
......
...@@ -962,7 +962,8 @@ Dbtup::executeTuxInsertTriggers(Signal* signal, ...@@ -962,7 +962,8 @@ Dbtup::executeTuxInsertTriggers(Signal* signal,
// fill in constant part // fill in constant part
req->tableId = regOperPtr->tableRef; req->tableId = regOperPtr->tableRef;
req->fragId = regOperPtr->fragId; req->fragId = regOperPtr->fragId;
req->tupAddr = (regOperPtr->fragPageId << MAX_TUPLES_BITS) | regOperPtr->pageIndex; req->pageId = regOperPtr->realPageId;
req->pageOffset = regOperPtr->pageOffset;
req->tupVersion = tupVersion; req->tupVersion = tupVersion;
req->opInfo = TuxMaintReq::OpAdd; req->opInfo = TuxMaintReq::OpAdd;
// loop over index list // loop over index list
...@@ -1000,7 +1001,8 @@ Dbtup::executeTuxUpdateTriggers(Signal* signal, ...@@ -1000,7 +1001,8 @@ Dbtup::executeTuxUpdateTriggers(Signal* signal,
// fill in constant part // fill in constant part
req->tableId = regOperPtr->tableRef; req->tableId = regOperPtr->tableRef;
req->fragId = regOperPtr->fragId; req->fragId = regOperPtr->fragId;
req->tupAddr = (regOperPtr->fragPageId << MAX_TUPLES_BITS) | regOperPtr->pageIndex; req->pageId = regOperPtr->realPageId;
req->pageOffset = regOperPtr->pageOffset;
req->tupVersion = tupVersion; req->tupVersion = tupVersion;
req->opInfo = TuxMaintReq::OpAdd; req->opInfo = TuxMaintReq::OpAdd;
// loop over index list // loop over index list
...@@ -1009,7 +1011,6 @@ Dbtup::executeTuxUpdateTriggers(Signal* signal, ...@@ -1009,7 +1011,6 @@ Dbtup::executeTuxUpdateTriggers(Signal* signal,
triggerList.first(triggerPtr); triggerList.first(triggerPtr);
while (triggerPtr.i != RNIL) { while (triggerPtr.i != RNIL) {
ljam(); ljam();
req->tupAddr = (regOperPtr->fragPageId << MAX_TUPLES_BITS) | regOperPtr->pageIndex;
req->indexId = triggerPtr.p->indexId; req->indexId = triggerPtr.p->indexId;
req->errorCode = RNIL; req->errorCode = RNIL;
EXECUTE_DIRECT(DBTUX, GSN_TUX_MAINT_REQ, EXECUTE_DIRECT(DBTUX, GSN_TUX_MAINT_REQ,
...@@ -1074,7 +1075,8 @@ Dbtup::executeTuxCommitTriggers(Signal* signal, ...@@ -1074,7 +1075,8 @@ Dbtup::executeTuxCommitTriggers(Signal* signal,
// fill in constant part // fill in constant part
req->tableId = regOperPtr->tableRef; req->tableId = regOperPtr->tableRef;
req->fragId = regOperPtr->fragId; req->fragId = regOperPtr->fragId;
req->tupAddr = (regOperPtr->fragPageId << MAX_TUPLES_BITS) | regOperPtr->pageIndex; req->pageId = regOperPtr->realPageId;
req->pageOffset = regOperPtr->pageOffset;
req->tupVersion = tupVersion; req->tupVersion = tupVersion;
req->opInfo = TuxMaintReq::OpRemove; req->opInfo = TuxMaintReq::OpRemove;
// loop over index list // loop over index list
...@@ -1117,7 +1119,8 @@ Dbtup::executeTuxAbortTriggers(Signal* signal, ...@@ -1117,7 +1119,8 @@ Dbtup::executeTuxAbortTriggers(Signal* signal,
// fill in constant part // fill in constant part
req->tableId = regOperPtr->tableRef; req->tableId = regOperPtr->tableRef;
req->fragId = regOperPtr->fragId; req->fragId = regOperPtr->fragId;
req->tupAddr = (regOperPtr->fragPageId << MAX_TUPLES_BITS) | regOperPtr->pageIndex; req->pageId = regOperPtr->realPageId;
req->pageOffset = regOperPtr->pageOffset;
req->tupVersion = tupVersion; req->tupVersion = tupVersion;
req->opInfo = TuxMaintReq::OpRemove; req->opInfo = TuxMaintReq::OpRemove;
// loop over index list // loop over index list
......
...@@ -183,17 +183,17 @@ private: ...@@ -183,17 +183,17 @@ private:
// tree definitions // tree definitions
/* /*
* Tree entry. Points to a tuple in primary table via logical address * Tree entry. Points to a tuple in primary table via physical
* of "original" tuple and tuple version. Uses 2 words to get correct * address of "original" tuple and tuple version.
* aligment (one byte is wasted currently). *
* ZTUP_VERSION_BITS must be 15 (or less).
*/ */
struct TreeEnt; struct TreeEnt;
friend struct TreeEnt; friend struct TreeEnt;
struct TreeEnt { struct TreeEnt {
TupAddr m_tupAddr; // address of original tuple TupLoc m_tupLoc; // address of original tuple
Uint16 m_tupVersion; // version unsigned m_tupVersion : 15; // version
Uint8 m_fragBit; // which duplicated table fragment unsigned m_fragBit : 1; // which duplicated table fragment
Uint8 pad1;
TreeEnt(); TreeEnt();
// methods // methods
int cmp(const TreeEnt ent) const; int cmp(const TreeEnt ent) const;
...@@ -318,10 +318,9 @@ private: ...@@ -318,10 +318,9 @@ private:
* Attribute metadata. Size must be multiple of word size. * Attribute metadata. Size must be multiple of word size.
*/ */
struct DescAttr { struct DescAttr {
unsigned m_primaryAttrId : 16; Uint32 m_attrDesc; // standard AttributeDescriptor
unsigned m_typeId : 8; Uint16 m_primaryAttrId;
unsigned m_nullable : 1; Uint16 m_typeId;
unsigned pad1 : 7;
}; };
static const unsigned DescAttrSize = sizeof(DescAttr) >> 2; static const unsigned DescAttrSize = sizeof(DescAttr) >> 2;
...@@ -615,8 +614,6 @@ private: ...@@ -615,8 +614,6 @@ private:
bool allocDescEnt(IndexPtr indexPtr); bool allocDescEnt(IndexPtr indexPtr);
void freeDescEnt(IndexPtr indexPtr); void freeDescEnt(IndexPtr indexPtr);
void dropIndex(Signal* signal, IndexPtr indexPtr, Uint32 senderRef, Uint32 senderData); void dropIndex(Signal* signal, IndexPtr indexPtr, Uint32 senderRef, Uint32 senderData);
// helpers
DescEnt& getDescEnt(Uint32 descPage, Uint32 descOff);
/* /*
* DbtuxMaint.cpp * DbtuxMaint.cpp
...@@ -722,7 +719,9 @@ private: ...@@ -722,7 +719,9 @@ private:
// buffers // buffers
Data c_keyBuffer; // search key or scan bound Data c_keyBuffer; // search key or scan bound
// small stuff // inlined utils
DescEnt& getDescEnt(Uint32 descPage, Uint32 descOff);
Uint32 getTupAddr(const Frag& frag, const TreeEnt ent);
static unsigned min(unsigned x, unsigned y); static unsigned min(unsigned x, unsigned y);
static unsigned max(unsigned x, unsigned y); static unsigned max(unsigned x, unsigned y);
}; };
...@@ -852,24 +851,26 @@ Dbtux::TupLoc::operator!=(const TupLoc& loc) const ...@@ -852,24 +851,26 @@ Dbtux::TupLoc::operator!=(const TupLoc& loc) const
inline inline
Dbtux::TreeEnt::TreeEnt() : Dbtux::TreeEnt::TreeEnt() :
m_tupAddr(NullTupAddr), m_tupLoc(),
m_tupVersion(0), m_tupVersion(0),
m_fragBit(255), m_fragBit(0)
pad1(0)
{ {
} }
inline int inline int
Dbtux::TreeEnt::cmp(const TreeEnt ent) const Dbtux::TreeEnt::cmp(const TreeEnt ent) const
{ {
// compare frags first (not optimal but makes easier to read logs)
if (m_fragBit < ent.m_fragBit) if (m_fragBit < ent.m_fragBit)
return -1; return -1;
if (m_fragBit > ent.m_fragBit) if (m_fragBit > ent.m_fragBit)
return +1; return +1;
if (m_tupAddr < ent.m_tupAddr) if (m_tupLoc.m_pageId < ent.m_tupLoc.m_pageId)
return -1;
if (m_tupLoc.m_pageId > ent.m_tupLoc.m_pageId)
return +1;
if (m_tupLoc.m_pageOffset < ent.m_tupLoc.m_pageOffset)
return -1; return -1;
if (m_tupAddr > ent.m_tupAddr) if (m_tupLoc.m_pageOffset > ent.m_tupLoc.m_pageOffset)
return +1; return +1;
if (m_tupVersion < ent.m_tupVersion) if (m_tupVersion < ent.m_tupVersion)
return -1; return -1;
...@@ -1252,7 +1253,7 @@ Dbtux::PrintPar::PrintPar() : ...@@ -1252,7 +1253,7 @@ Dbtux::PrintPar::PrintPar() :
} }
#endif #endif
// other methods // utils
inline Dbtux::DescEnt& inline Dbtux::DescEnt&
Dbtux::getDescEnt(Uint32 descPage, Uint32 descOff) Dbtux::getDescEnt(Uint32 descPage, Uint32 descOff)
...@@ -1265,6 +1266,16 @@ Dbtux::getDescEnt(Uint32 descPage, Uint32 descOff) ...@@ -1265,6 +1266,16 @@ Dbtux::getDescEnt(Uint32 descPage, Uint32 descOff)
return *descEnt; return *descEnt;
} }
inline Uint32
Dbtux::getTupAddr(const Frag& frag, const TreeEnt ent)
{
const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
const TupLoc tupLoc = ent.m_tupLoc;
Uint32 tupAddr = NullTupAddr;
c_tup->tuxGetTupAddr(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, tupAddr);
return tupAddr;
}
inline unsigned inline unsigned
Dbtux::min(unsigned x, unsigned y) Dbtux::min(unsigned x, unsigned y)
{ {
......
...@@ -198,7 +198,7 @@ operator<<(NdbOut& out, const Dbtux::TupLoc& loc) ...@@ -198,7 +198,7 @@ operator<<(NdbOut& out, const Dbtux::TupLoc& loc)
if (loc == Dbtux::NullTupLoc) { if (loc == Dbtux::NullTupLoc) {
out << "null"; out << "null";
} else { } else {
out << hex << loc.m_pageId; out << dec << loc.m_pageId;
out << "." << dec << loc.m_pageOffset; out << "." << dec << loc.m_pageOffset;
} }
return out; return out;
...@@ -208,7 +208,7 @@ NdbOut& ...@@ -208,7 +208,7 @@ NdbOut&
operator<<(NdbOut& out, const Dbtux::TreeEnt& ent) operator<<(NdbOut& out, const Dbtux::TreeEnt& ent)
{ {
out << dec << ent.m_fragBit; out << dec << ent.m_fragBit;
out << "-" << hex << ent.m_tupAddr; out << "-" << ent.m_tupLoc;
out << "-" << dec << ent.m_tupVersion; out << "-" << dec << ent.m_tupVersion;
return out; return out;
} }
...@@ -264,9 +264,9 @@ NdbOut& ...@@ -264,9 +264,9 @@ NdbOut&
operator<<(NdbOut& out, const Dbtux::DescAttr& descAttr) operator<<(NdbOut& out, const Dbtux::DescAttr& descAttr)
{ {
out << "[DescAttr " << hex << &descAttr; out << "[DescAttr " << hex << &descAttr;
out << " [attrDesc " << hex << descAttr.m_attrDesc;
out << " [primaryAttrId " << dec << descAttr.m_primaryAttrId << "]"; out << " [primaryAttrId " << dec << descAttr.m_primaryAttrId << "]";
out << " [typeId " << dec << descAttr.m_typeId << "]"; out << " [typeId " << dec << descAttr.m_typeId << "]";
out << " [nullable " << dec << descAttr.m_nullable << "]";
out << "]"; out << "]";
return out; return out;
} }
......
...@@ -35,10 +35,10 @@ Dbtux::Dbtux(const Configuration& conf) : ...@@ -35,10 +35,10 @@ Dbtux::Dbtux(const Configuration& conf) :
BLOCK_CONSTRUCTOR(Dbtux); BLOCK_CONSTRUCTOR(Dbtux);
// verify size assumptions (also when release-compiled) // verify size assumptions (also when release-compiled)
ndbrequire( ndbrequire(
(sizeof(DescHead) & 0x3) == 0 &&
(sizeof(DescAttr) & 0x3) == 0 &&
(sizeof(TreeEnt) & 0x3) == 0 && (sizeof(TreeEnt) & 0x3) == 0 &&
(sizeof(TreeNode) & 0x3) == 0 (sizeof(TreeNode) & 0x3) == 0 &&
(sizeof(DescHead) & 0x3) == 0 &&
(sizeof(DescAttr) & 0x3) == 0
); );
/* /*
* DbtuxGen.cpp * DbtuxGen.cpp
......
...@@ -33,11 +33,12 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -33,11 +33,12 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
jam(); jam();
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
TupLoc tupLoc(sig->pageId, sig->pageOffset);
debugOut << "opInfo=" << hex << sig->opInfo; debugOut << "opInfo=" << hex << sig->opInfo;
debugOut << " tableId=" << dec << sig->tableId; debugOut << " tableId=" << dec << sig->tableId;
debugOut << " indexId=" << dec << sig->indexId; debugOut << " indexId=" << dec << sig->indexId;
debugOut << " fragId=" << dec << sig->fragId; debugOut << " fragId=" << dec << sig->fragId;
debugOut << " tupAddr=" << hex << sig->tupAddr; debugOut << " tupLoc=" << tupLoc;
debugOut << " tupVersion=" << dec << sig->tupVersion; debugOut << " tupVersion=" << dec << sig->tupVersion;
debugOut << " -- ignored at ISP=" << dec << c_internalStartPhase; debugOut << " -- ignored at ISP=" << dec << c_internalStartPhase;
debugOut << " TOS=" << dec << c_typeOfStart; debugOut << " TOS=" << dec << c_typeOfStart;
...@@ -74,7 +75,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -74,7 +75,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
Frag& frag = *fragPtr.p; Frag& frag = *fragPtr.p;
// set up index entry // set up index entry
TreeEnt ent; TreeEnt ent;
ent.m_tupAddr = req->tupAddr; ent.m_tupLoc = TupLoc(req->pageId, req->pageOffset);
ent.m_tupVersion = req->tupVersion; ent.m_tupVersion = req->tupVersion;
ent.m_fragBit = fragBit; ent.m_fragBit = fragBit;
// read search key // read search key
...@@ -199,10 +200,10 @@ Dbtux::tupReadAttrs(Signal* signal, const Frag& frag, ReadPar& readPar) ...@@ -199,10 +200,10 @@ Dbtux::tupReadAttrs(Signal* signal, const Frag& frag, ReadPar& readPar)
req->tableId = frag.m_tableId; req->tableId = frag.m_tableId;
req->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff); req->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff);
req->fragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit]; req->fragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
req->tupAddr = ent.m_tupAddr; req->tupAddr = (Uint32)-1;
req->tupVersion = ent.m_tupVersion; req->tupVersion = ent.m_tupVersion;
req->pageId = RNIL; req->pageId = ent.m_tupLoc.m_pageId;
req->pageOffset = 0; req->pageOffset = ent.m_tupLoc.m_pageOffset;
req->bufferId = 0; req->bufferId = 0;
// add count and list of attribute ids // add count and list of attribute ids
Data data = (Uint32*)req + TupReadAttrs::SignalLength; Data data = (Uint32*)req + TupReadAttrs::SignalLength;
...@@ -246,10 +247,10 @@ Dbtux::tupReadKeys(Signal* signal, const Frag& frag, ReadPar& readPar) ...@@ -246,10 +247,10 @@ Dbtux::tupReadKeys(Signal* signal, const Frag& frag, ReadPar& readPar)
req->tableId = frag.m_tableId; req->tableId = frag.m_tableId;
req->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff); req->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff);
req->fragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit]; req->fragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
req->tupAddr = ent.m_tupAddr; req->tupAddr = (Uint32)-1;
req->tupVersion = RNIL; // not used req->tupVersion = RNIL; // not used
req->pageId = RNIL; req->pageId = ent.m_tupLoc.m_pageId;
req->pageOffset = 0; req->pageOffset = ent.m_tupLoc.m_pageOffset;
req->bufferId = 0; req->bufferId = 0;
// execute // execute
EXECUTE_DIRECT(DBTUP, GSN_TUP_READ_ATTRS, signal, TupReadAttrs::SignalLength); EXECUTE_DIRECT(DBTUP, GSN_TUP_READ_ATTRS, signal, TupReadAttrs::SignalLength);
......
...@@ -181,10 +181,9 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal) ...@@ -181,10 +181,9 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
// define the attribute // define the attribute
DescEnt& descEnt = getDescEnt(indexPtr.p->m_descPage, indexPtr.p->m_descOff); DescEnt& descEnt = getDescEnt(indexPtr.p->m_descPage, indexPtr.p->m_descOff);
DescAttr& descAttr = descEnt.m_descAttr[attrId]; DescAttr& descAttr = descEnt.m_descAttr[attrId];
descAttr.m_attrDesc = req->attrDescriptor;
descAttr.m_primaryAttrId = req->primaryAttrId; descAttr.m_primaryAttrId = req->primaryAttrId;
descAttr.m_typeId = req->extTypeInfo & 0xFF; descAttr.m_typeId = req->extTypeInfo & 0xFF;
descAttr.m_nullable = AttributeDescriptor::getNullable(req->attrDescriptor);
descAttr.pad1 = 0;
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMeta) { if (debugFlags & DebugMeta) {
debugOut << "Add frag " << fragPtr.i << " attr " << attrId << " " << descAttr << endl; debugOut << "Add frag " << fragPtr.i << " attr " << attrId << " " << descAttr << endl;
......
...@@ -406,7 +406,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -406,7 +406,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
const Uint32* const buf32 = static_cast<Uint32*>(keyPar.m_data); const Uint32* const buf32 = static_cast<Uint32*>(keyPar.m_data);
const Uint64* const buf64 = reinterpret_cast<const Uint64*>(buf32); const Uint64* const buf64 = reinterpret_cast<const Uint64*>(buf32);
lockReq->hashValue = md5_hash(buf64, keyPar.m_size); lockReq->hashValue = md5_hash(buf64, keyPar.m_size);
lockReq->tupAddr = ent.m_tupAddr; lockReq->tupAddr = getTupAddr(frag, ent);
lockReq->transId1 = scan.m_transId1; lockReq->transId1 = scan.m_transId1;
lockReq->transId2 = scan.m_transId2; lockReq->transId2 = scan.m_transId2;
// execute // execute
...@@ -503,7 +503,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -503,7 +503,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
} }
conf->accOperationPtr = accLockOp; conf->accOperationPtr = accLockOp;
conf->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff); conf->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff);
conf->localKey[0] = ent.m_tupAddr; conf->localKey[0] = getTupAddr(frag, ent);
conf->localKey[1] = 0; conf->localKey[1] = 0;
conf->localKeyLength = 1; conf->localKeyLength = 1;
unsigned signalLength = 6; unsigned signalLength = 6;
...@@ -1001,10 +1001,10 @@ Dbtux::scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent) ...@@ -1001,10 +1001,10 @@ Dbtux::scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent)
Uint32 tableId = frag.m_tableId; Uint32 tableId = frag.m_tableId;
Uint32 fragBit = ent.m_fragBit; Uint32 fragBit = ent.m_fragBit;
Uint32 fragId = frag.m_fragId | (fragBit << frag.m_fragOff); Uint32 fragId = frag.m_fragId | (fragBit << frag.m_fragOff);
Uint32 tupAddr = ent.m_tupAddr; Uint32 tupAddr = getTupAddr(frag, ent);
Uint32 tupVersion = ent.m_tupVersion; Uint32 tupVersion = ent.m_tupVersion;
/* Check for same tuple twice in row */ /* Check for same tuple twice in row */
if (scan.m_lastEnt.m_tupAddr == tupAddr && if (scan.m_lastEnt.m_tupLoc == ent.m_tupLoc &&
scan.m_lastEnt.m_fragBit == fragBit) { scan.m_lastEnt.m_fragBit == fragBit) {
jam(); jam();
return false; return false;
......
...@@ -34,4 +34,11 @@ optim 5 mc02/a 43 ms 77 ms 77 pct ...@@ -34,4 +34,11 @@ optim 5 mc02/a 43 ms 77 ms 77 pct
optim 6 mc02/a 42 ms 70 ms 66 pct optim 6 mc02/a 42 ms 70 ms 66 pct
mc02/b 53 ms 109 ms 105 pct mc02/b 53 ms 109 ms 105 pct
optim 7 mc02/a 42 ms 69 ms 61 pct
mc02/b 52 ms 106 ms 101 pct
optim 8 mc02/a 42 ms 69 ms 62 pct
mc02/b 54 ms 104 ms 92 pct
vim: set et: vim: set et:
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment