Commit 18ba5696 authored by pekka@mysql.com's avatar pekka@mysql.com

Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-5.0-ndb

into mysql.com:/space/pekka/ndb/version/my50-cc
parents 7549ca56 1bb31494
......@@ -22,6 +22,9 @@
#include <pc.hpp>
#include <SimulatedBlock.hpp>
// primary key is stored in TUP
#include <Dbtup.hpp>
#ifdef DBACC_C
// Debug Macros
#define dbgWord32(ptr, ind, val)
......@@ -661,9 +664,10 @@ struct Fragmentrec {
//-----------------------------------------------------------------------------
// elementLength: Length of element in bucket and overflow pages
// keyLength: Length of key (== 0 if long key or variable key length)
// wl-2066 always Length of key
//-----------------------------------------------------------------------------
Uint8 elementLength;
Uint8 keyLength;
Uint16 keyLength;
//-----------------------------------------------------------------------------
// This flag is used to avoid sending a big number of expand or shrink signals
......@@ -783,6 +787,7 @@ struct Operationrec {
Uint8 dirtyRead;
Uint8 commitDeleteCheckFlag;
Uint8 isAccLockReq;
Uint8 isUndoLogReq;
Uint32 nextOpList;
}; /* p2c: size = 168 bytes */
......@@ -914,6 +919,9 @@ public:
Dbacc(const class Configuration &);
virtual ~Dbacc();
// pointer to TUP instance in this thread
Dbtup* c_tup;
private:
BLOCK_DEFINES(Dbacc);
......@@ -1075,6 +1083,7 @@ private:
void storeLongKeys(Signal* signal);
void storeLongKeysAtPos(Signal* signal);
void reorgLongPage(Signal* signal);
void readTablePk(Uint32 localkey1);
void getElement(Signal* signal);
void searchLongKey(Signal* signal);
void getdirindex(Signal* signal);
......@@ -1562,7 +1571,10 @@ private:
Uint32 cexcPrevpageindex;
Uint32 cexcPrevforward;
Uint32 clocalkey[32];
union {
Uint32 ckeys[2048];
Uint64 ckeys_align;
};
Uint32 c_errorInsert3000_TableId;
Uint32 cSrUndoRecords[5];
......
......@@ -133,7 +133,8 @@ void Dbacc::initRecords()
}//Dbacc::initRecords()
Dbacc::Dbacc(const class Configuration & conf):
SimulatedBlock(DBACC, conf)
SimulatedBlock(DBACC, conf),
c_tup(0)
{
Uint32 log_page_size= 0;
BLOCK_CONSTRUCTOR(Dbacc);
......
......@@ -570,7 +570,14 @@ void Dbacc::execNDB_STTOR(Signal* signal)
void Dbacc::execSTTOR(Signal* signal)
{
jamEntry();
// tstartphase = signal->theData[1];
Uint32 tstartphase = signal->theData[1];
switch (tstartphase) {
case 1:
jam();
c_tup = (Dbtup*)globalData.getBlock(DBTUP);
ndbrequire(c_tup != 0);
break;
}
tuserblockref = signal->theData[3];
csignalkey = signal->theData[6];
sttorrysignalLab(signal);
......@@ -1567,6 +1574,9 @@ void Dbacc::initOpRec(Signal* signal)
// bit to mark lock operation
operationRecPtr.p->isAccLockReq = (Treqinfo >> 31) & 0x1;
// undo log is not run via ACCKEYREQ
operationRecPtr.p->isUndoLogReq = 0;
}//Dbacc::initOpRec()
/* --------------------------------------------------------------------------------- */
......@@ -1807,7 +1817,7 @@ void Dbacc::insertExistElemLab(Signal* signal)
/* --------------------------------------------------------------------------------- */
void Dbacc::insertelementLab(Signal* signal)
{
Uint32 tinsKeyLen;
Uint32 tinsKeyLen; // wl-2066 remove
if (fragrecptr.p->createLcp == ZTRUE) {
if (remainingUndoPages() < ZMIN_UNDO_PAGES_AT_OPERATION) {
......@@ -1828,14 +1838,17 @@ void Dbacc::insertelementLab(Signal* signal)
if (fragrecptr.p->keyLength != operationRecPtr.p->tupkeylen) {
ndbrequire(fragrecptr.p->keyLength == 0);
}//if
if (fragrecptr.p->keyLength != 0) {
if (true)
tinsKeyLen = 0;
else if (fragrecptr.p->keyLength != 0) { // wl-2066 remove
ndbrequire(operationRecPtr.p->tupkeylen <= 8);
for (Uint32 i = 0; i < operationRecPtr.p->tupkeylen; i++) {
jam();
ckeys[i] = signal->theData[i + 7];
}//for
tinsKeyLen = operationRecPtr.p->tupkeylen;
} else {
tinsKeyLen = 0;
} else { // wl-2066 remove
jam();
seizePage(signal);
if (tresult > ZLIMIT_OF_ERROR) {
......@@ -2484,6 +2497,7 @@ void Dbacc::execACC_LOCKREQ(Signal* signal)
signal->theData[4] = 1; // fake primKeyLen
signal->theData[5] = req->transId1;
signal->theData[6] = req->transId2;
// enter local key in place of PK
signal->theData[7] = req->tupAddr;
EXECUTE_DIRECT(DBACC, GSN_ACCKEYREQ, signal, 8);
// translate the result
......@@ -2861,6 +2875,8 @@ void Dbacc::insertContainer(Signal* signal)
idrPageptr.p->word32[tidrIndex] = clocalkey[tidrInputIndex]; /* INSERTS LOCALKEY */
tidrIndex += tidrForward;
}//for
ndbrequire(tidrKeyLen == 0);
#if 0 // wl-2066 remove
guard26 = tidrKeyLen - 1;
arrGuard(guard26, 8);
for (tidrInputIndex = 0; tidrInputIndex <= guard26; tidrInputIndex++) {
......@@ -2869,6 +2885,7 @@ void Dbacc::insertContainer(Signal* signal)
idrPageptr.p->word32[tidrIndex] = ckeys[tidrInputIndex]; /* INSERTS TUPLE KEY */
tidrIndex += tidrForward;
}//for
#endif
tidrContLen = idrPageptr.p->word32[tidrContainerptr] << 6;
tidrContLen = tidrContLen >> 6;
dbgWord32(idrPageptr, tidrContainerptr, (tidrContainerlen << 26) | tidrContLen);
......@@ -4423,7 +4440,7 @@ void Dbacc::printoutInfoAndShutdown(LongKeyPage *page) {
/* --------------------------------------------------------------------------------- */
/* --------------------------------------------------------------------------------- */
/* */
/* MODULE: READ */
/* MODULE: GET_ELEMENT */
/* THE FOLLOWING SUBROUTINES ARE ONLY USED BY GET_ELEMENT AND */
/* GETDIRINDEX. THIS ROUTINE IS THE SOLE INTERFACE TO GET ELEMENTS */
/* FROM THE INDEX. CURRENT USERS ARE ALL REQUESTS AND EXECUTE UNDO LOG */
......@@ -4484,6 +4501,20 @@ void Dbacc::getdirindex(Signal* signal)
ptrCheckGuard(gdiPageptr, cpagesize, page8);
}//Dbacc::getdirindex()
void
Dbacc::readTablePk(Uint32 localkey1)
{
Uint32 tableId = fragrecptr.p->myTableId;
Uint32 fragId = fragrecptr.p->myfid;
Uint32 fragPageId = localkey1 >> MAX_TUPLES_BITS;
Uint32 pageIndex = localkey1 & ((1 << MAX_TUPLES_BITS ) - 1);
#ifdef VM_TRACE
memset(ckeys, 0x1f, fragrecptr.p->keyLength << 2);
#endif
int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, ckeys);
ndbrequire(ret == fragrecptr.p->keyLength);
}
/* --------------------------------------------------------------------------------- */
/* GET_ELEMENT */
/* INPUT: */
......@@ -4534,19 +4565,16 @@ void Dbacc::getElement(Signal* signal)
gePageptr = gdiPageptr;
tgeResult = ZFALSE;
tgeCompareLen = fragrecptr.p->keyLength;
const Uint32 isAccLockReq = operationRecPtr.p->isAccLockReq;
if (isAccLockReq) {
jam();
tgeCompareLen = 0;
}
const Uint32 isAccLockReq = operationRecPtr.p->isAccLockReq; // wl-2066 remove
/*
* The value seached is
* - table key for ACCKEYREQ, stored in TUP
* - local key (1 word) for ACC_LOCKREQ and UNDO, stored in ACC
*/
const bool compareLocalKey =
operationRecPtr.p->isAccLockReq || operationRecPtr.p->isUndoLogReq;
// We can handle keylength up to 8, but not more (0 means dynamic)
if (tgeCompareLen >= 9) {
ACCKEY_error(2); return;
}//if
if (TelemLen < 3) {
ACCKEY_error(3); return;
}//if
ndbrequire(TelemLen == ZELEM_HEAD_SIZE + fragrecptr.p->localkeylen);
tgeNextptrtype = ZLEFT;
tgeLocked = 0;
......@@ -4577,7 +4605,7 @@ void Dbacc::getElement(Signal* signal)
} else {
ACCKEY_error(6); return;
}//if
if (tgeRemLen >= TelemLen) {
if (tgeRemLen >= ZCON_HEAD_SIZE + TelemLen) {
if (tgeRemLen > ZBUF_SIZE) {
ACCKEY_error(7); return;
}//if
......@@ -4585,7 +4613,45 @@ void Dbacc::getElement(Signal* signal)
// There is at least one element in this container. Check if it is the element
// searched for.
/* --------------------------------------------------------------------------------- */
if (tgeCompareLen != 0) {
if (true) {
do {
tgeElementHeader = gePageptr.p->word32[tgeElementptr];
tgeRemLen = tgeRemLen - TelemLen;
Uint32 hashValuePart;
if (ElementHeader::getLocked(tgeElementHeader)) {
jam();
geTmpOperationRecPtr.i = ElementHeader::getOpPtrI(tgeElementHeader);
ptrCheckGuard(geTmpOperationRecPtr, coprecsize, operationrec);
hashValuePart = geTmpOperationRecPtr.p->hashvaluePart;
} else {
jam();
hashValuePart = ElementHeader::getHashValuePart(tgeElementHeader);
}
if (hashValuePart == opHashValuePart) {
jam();
Uint32 localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward];
Uint32 localkey2 = 0;
bool found;
if (! compareLocalKey) {
readTablePk(localkey1);
found = (memcmp(Tkeydata, ckeys, fragrecptr.p->keyLength) == 0);
} else {
found = (localkey1 == Tkeydata[0]);
}
if (found) {
tgeLocked = ElementHeader::getLocked(tgeElementHeader);
tgeResult = ZTRUE;
operationRecPtr.p->localdata[0] = localkey1;
operationRecPtr.p->localdata[1] = localkey2;
return;
}
}
if (tgeRemLen <= ZCON_HEAD_SIZE) {
break;
}
tgeElementptr = tgeElementptr + tgeElemStep;
} while (true);
} else if (tgeCompareLen != 0) { // wl-2066 remove
/* --------------------------------------------------------------------------------- */
/* THIS PART IS USED TO SEARCH FOR KEYS WITH FIXED SIZE. THE LOOP TAKES CARE */
/* OF SEARCHING THROUGH ALL ELEMENTS IN ONE CONTAINER. */
......@@ -4629,7 +4695,7 @@ void Dbacc::getElement(Signal* signal)
tgeKeyptr = tgeKeyptr + tgeElemStep;
tgeElementptr = tgeElementptr + tgeElemStep;
} while (1);
} else if (! isAccLockReq) {
} else if (! isAccLockReq) { // wl-2066 remove
jam();
/* --------------------------------------------------------------------------------- */
/* THIS PART IS USED TO SEARCH FOR KEYS WITH VARIABLE LENGTH OR FIXED LENGTH */
......@@ -4696,7 +4762,7 @@ void Dbacc::getElement(Signal* signal)
tgeKeyptr = tgeKeyptr + tgeElemStep;
tgeElementptr = tgeElementptr + tgeElemStep;
} while (1);
} else {
} else { // wl-2066 remove
jam();
/* --------------------------------------------------------------------------------- */
/* Search for local key in a lock request */
......@@ -4855,7 +4921,7 @@ void Dbacc::commitdelete(Signal* signal, bool systemRestart)
EXECUTE_DIRECT(DBTUP, GSN_TUP_DEALLOCREQ, signal, 4);
jamEntry();
}//if
if (fragrecptr.p->keyLength == 0) {
if (fragrecptr.p->keyLength == 0) { // wl-2066 remove
jam();
tdlkLogicalPageIndex = operationRecPtr.p->longKeyPageIndex;
dlkPageptr.i = operationRecPtr.p->longPagePtr;
......@@ -6607,14 +6673,17 @@ void Dbacc::expandcontainer(Signal* signal)
Uint32 texcHashvalue;
Uint32 texcTmp;
Uint32 texcIndex;
Uint32 texpKeyLen;
Uint32 texpKeyLen; // wl-2066 remove
Uint32 guard20;
texpKeyLen = 0;
#if 0 // wl-2066 remove
texpKeyLen = fragrecptr.p->keyLength;
if (texpKeyLen == 0) {
jam();
texpKeyLen = ZACTIVE_LONG_KEY_LEN;
}//if
#endif
cexcPrevpageptr = RNIL;
cexcPrevconptr = 0;
cexcForward = ZTRUE;
......@@ -6701,6 +6770,7 @@ void Dbacc::expandcontainer(Signal* signal)
clocalkey[texcIndex] = excPageptr.p->word32[texcTmp];
texcTmp = texcTmp + cexcForward;
}//for
#if 0 // wl-2066 remove
guard20 = texpKeyLen - 1;
for (texcIndex = 0; texcIndex <= guard20; texcIndex++) {
arrGuard(texcIndex, 2048);
......@@ -6708,6 +6778,7 @@ void Dbacc::expandcontainer(Signal* signal)
ckeys[texcIndex] = excPageptr.p->word32[texcTmp];
texcTmp = texcTmp + cexcForward;
}//for
#endif
tidrPageindex = fragrecptr.p->expReceiveIndex;
idrPageptr.i = fragrecptr.p->expReceivePageptr;
ptrCheckGuard(idrPageptr, cpagesize, page8);
......@@ -6794,12 +6865,14 @@ void Dbacc::expandcontainer(Signal* signal)
clocalkey[texcIndex] = lastPageptr.p->word32[texcTmp];
texcTmp = texcTmp + tlastForward;
}//for
#if 0 // wl-2066 remove
for (texcIndex = 0; texcIndex < texpKeyLen; texcIndex++) {
arrGuard(texcIndex, 2048);
arrGuard(texcTmp, 2048);
ckeys[texcIndex] = lastPageptr.p->word32[texcTmp];
texcTmp = texcTmp + tlastForward;
}//for
#endif
tidrPageindex = fragrecptr.p->expReceiveIndex;
idrPageptr.i = fragrecptr.p->expReceivePageptr;
ptrCheckGuard(idrPageptr, cpagesize, page8);
......@@ -7285,17 +7358,19 @@ void Dbacc::shrinkcontainer(Signal* signal)
Uint32 tshrElementptr;
Uint32 tshrRemLen;
Uint32 tshrInc;
Uint32 tshrKeyLen;
Uint32 tshrKeyLen = 0; // wl-2066 remove
Uint32 tshrTmp;
Uint32 tshrIndex;
Uint32 guard21;
tshrRemLen = cexcContainerlen - ZCON_HEAD_SIZE;
#if 0 // wl-2066 remove
tshrKeyLen = fragrecptr.p->keyLength;
if (tshrKeyLen == 0) {
jam();
tshrKeyLen = ZACTIVE_LONG_KEY_LEN;
}//if
#endif
tshrInc = (ZELEM_HEAD_SIZE + tshrKeyLen) + fragrecptr.p->localkeylen;
if (cexcForward == ZTRUE) {
jam();
......@@ -7345,6 +7420,7 @@ void Dbacc::shrinkcontainer(Signal* signal)
clocalkey[tshrIndex] = excPageptr.p->word32[tshrTmp];
tshrTmp = tshrTmp + cexcForward;
}//for
#if 0 // wl-2066 remove
guard21 = tshrKeyLen - 1;
for (tshrIndex = 0; tshrIndex <= guard21; tshrIndex++) {
arrGuard(tshrIndex, 2048);
......@@ -7352,6 +7428,7 @@ void Dbacc::shrinkcontainer(Signal* signal)
ckeys[tshrIndex] = excPageptr.p->word32[tshrTmp];
tshrTmp = tshrTmp + cexcForward;
}//for
#endif
tidrPageindex = fragrecptr.p->expReceiveIndex;
idrPageptr.i = fragrecptr.p->expReceivePageptr;
ptrCheckGuard(idrPageptr, cpagesize, page8);
......@@ -9043,15 +9120,27 @@ void Dbacc::writeUndoOpInfo(Signal* signal)
undopageptr.p->undoword[tundoindex + 1] = operationRecPtr.p->hashValue;
undopageptr.p->undoword[tundoindex + 2] = operationRecPtr.p->tupkeylen;
tundoindex = tundoindex + 3;
if (fragrecptr.p->keyLength != 0) {
if (true) {
// log localkey1
jam();
locPageptr.i = operationRecPtr.p->elementPage;
ptrCheckGuard(locPageptr, cpagesize, page8);
Uint32 Tforward = operationRecPtr.p->elementIsforward;
Uint32 TelemPtr = operationRecPtr.p->elementPointer;
TelemPtr += Tforward; // ZELEM_HEAD_SIZE
arrGuard(tundoindex+1, 8192);
undopageptr.p->undoword[tundoindex] = locPageptr.p->word32[TelemPtr];
tundoindex++;
cundoinfolength = ZOP_HEAD_INFO_LN + 1;
} else if (fragrecptr.p->keyLength != 0) { // wl-2066 remove
// Fixed size keys
jam();
locPageptr.i = operationRecPtr.p->elementPage;
ptrCheckGuard(locPageptr, cpagesize, page8);
Uint32 Tforward = operationRecPtr.p->elementIsforward;
Uint32 TelemPtr = operationRecPtr.p->elementPointer;
TelemPtr += Tforward;
TelemPtr += Tforward;
TelemPtr += Tforward; // ZELEM_HEAD_SIZE
TelemPtr += Tforward; // localkey1
//---------------------------------------------------------------------------------
// Now the pointer is at the start of the key part of the element. Now copy from there
// to the UNDO log.
......@@ -9067,7 +9156,7 @@ void Dbacc::writeUndoOpInfo(Signal* signal)
TelemPtr += Tforward;
}//for
cundoinfolength = ZOP_HEAD_INFO_LN + operationRecPtr.p->tupkeylen;
} else {
} else { // wl-2066 remove
// Long keys
jam();
......@@ -9300,10 +9389,13 @@ void Dbacc::initFragAdd(Signal* signal,
regFragPtr.p->dirsize = 1;
regFragPtr.p->loadingFlag = ZFALSE;
regFragPtr.p->keyLength = req->keyLength;
if (req->keyLength == 0) {
ndbrequire(req->keyLength != 0);
if (true) {
regFragPtr.p->elementLength = ZELEM_HEAD_SIZE + regFragPtr.p->localkeylen;
} else if (req->keyLength == 0) { // wl-2066 remove
jam();
regFragPtr.p->elementLength = (1 + ZELEM_HEAD_SIZE) + regFragPtr.p->localkeylen;
} else {
} else { // wl-2066 remove
jam();
regFragPtr.p->elementLength = (ZELEM_HEAD_SIZE + regFragPtr.p->localkeylen) + regFragPtr.p->keyLength;
}//if
......@@ -10362,6 +10454,7 @@ void Dbacc::srDoUndoLab(Signal* signal)
operationRecPtr.p->longKeyPageIndex = RNIL;
operationRecPtr.p->scanRecPtr = RNIL;
operationRecPtr.p->isAccLockReq = ZFALSE;
operationRecPtr.p->isUndoLogReq = ZTRUE;
// Read operation values from undo page
operationRecPtr.p->operation = undopageptr.p->undoword[tmpindex];
......@@ -10373,15 +10466,19 @@ void Dbacc::srDoUndoLab(Signal* signal)
operationRecPtr.p->tupkeylen = tkeylen;
operationRecPtr.p->fragptr = fragrecptr.i;
ndbrequire((fragrecptr.p->keyLength == 0) ||
ndbrequire((fragrecptr.p->keyLength == 0) || // wl-2066 change
((fragrecptr.p->keyLength != 0) &&
(fragrecptr.p->keyLength == tkeylen)));
// Read keydata from undo page
// Read localkey1 from undo page
signal->theData[7 + 0] = undopageptr.p->undoword[tmpindex];
tmpindex = tmpindex + 1;
#if 0 // wl-2066 remove
for (Uint32 tmp = 0; tmp < tkeylen; tmp++) {
signal->theData[7+tmp] = undopageptr.p->undoword[tmpindex];
tmpindex = tmpindex + 1;
}//for
#endif
arrGuard((tmpindex - 1), 8192);
getElement(signal);
if (tgeResult != ZTRUE) {
......@@ -11456,6 +11553,7 @@ void Dbacc::initScanOpRec(Signal* signal)
operationRecPtr.p->elementPointer = tisoElementptr;
operationRecPtr.p->elementPage = isoPageptr.i;
operationRecPtr.p->isAccLockReq = ZFALSE;
operationRecPtr.p->isUndoLogReq = ZFALSE;
tisoLocalPtr = tisoElementptr + tisoIsforward;
guard24 = fragrecptr.p->localkeylen - 1;
for (tisoTmp = 0; tisoTmp <= guard24; tisoTmp++) {
......@@ -11466,7 +11564,10 @@ void Dbacc::initScanOpRec(Signal* signal)
}//for
arrGuard(tisoLocalPtr, 2048);
operationRecPtr.p->keydata[0] = isoPageptr.p->word32[tisoLocalPtr];
if (fragrecptr.p->keyLength != 0) {
if (true) {
jam();
operationRecPtr.p->tupkeylen = fragrecptr.p->keyLength;
} else if (fragrecptr.p->keyLength != 0) { // wl-2066 remove
jam();
operationRecPtr.p->tupkeylen = fragrecptr.p->keyLength;
guard24 = fragrecptr.p->keyLength - 1;
......@@ -11476,7 +11577,7 @@ void Dbacc::initScanOpRec(Signal* signal)
operationRecPtr.p->keydata[tisoTmp] = isoPageptr.p->word32[tisoLocalPtr];
tisoLocalPtr = tisoLocalPtr + tisoIsforward;
}//for
} else {
} else { // wl-2066 remove
// Long key handling. Put the long key reference in the operation records.
tisoPageIndex = operationRecPtr.p->keydata[0] & 0x3ff;
arrGuard(ZWORDS_IN_PAGE - tisoPageIndex, 2048);
......@@ -11694,18 +11795,20 @@ void Dbacc::releaseScanContainer(Signal* signal)
Uint32 trscElemlens;
Uint32 trscElemlen;
if (trscContainerlen < 5) {
if (trscContainerlen < 4) { // wl-2066 do it like in getElement
if (trscContainerlen != ZCON_HEAD_SIZE) {
jam();
sendSystemerror(signal);
}//if
return; /* 3 IS THE MINIMUM SIZE OF THE ELEMENT */
return; /* 2 IS THE MINIMUM SIZE OF THE ELEMENT */
}//if
trscElemlens = trscContainerlen - 2;
if (fragrecptr.p->keyLength != 0) {
trscElemlens = trscContainerlen - 2; // wl-2066 use ZCON_HEAD_SIZE
if (true) { // wl-2066 use fragrecptr.p->elementLength
trscElemlen = (1 + 0) + fragrecptr.p->localkeylen;
} else if (fragrecptr.p->keyLength != 0) { // wl-2066 remove
jam();
trscElemlen = (1 + fragrecptr.p->keyLength) + fragrecptr.p->localkeylen; /* LENGTH OF THE ELEMENT */
} else {
} else { // wl-2066 remove
jam();
trscElemlen = (1 + ZACTIVE_LONG_KEY_LEN) + fragrecptr.p->localkeylen; /* LENGTH OF THE ELEMENT */
}//if
......@@ -11735,7 +11838,7 @@ void Dbacc::releaseScanContainer(Signal* signal)
}//if
trscElemlens = trscElemlens - trscElemlen;
trscElementptr = trscElementptr + trscElemStep;
} while (trscElemlens > 2);
} while (trscElemlens > 1);
if (trscElemlens != 0) {
jam();
sendSystemerror(signal);
......@@ -11794,15 +11897,17 @@ bool Dbacc::searchScanContainer(Signal* signal)
Uint32 tsscElemlen;
Uint32 tsscElemStep;
if (tsscContainerlen < 5) {
if (tsscContainerlen < 4) { // wl-2066 check exact size
jam();
return false; /* 3 IS THE MINIMUM SIZE OF THE ELEMENT */
return false; /* 2 IS THE MINIMUM SIZE OF THE ELEMENT */
}//if
tsscElemlens = tsscContainerlen - ZCON_HEAD_SIZE;
if (fragrecptr.p->keyLength == 0) {
if (true) { // wl-2066 use fragrecptr.p->elementLength
tsscElemlen = (ZELEM_HEAD_SIZE + 0) + fragrecptr.p->localkeylen;
} else if (fragrecptr.p->keyLength == 0) { // wl-2066 remove
jam();
tsscElemlen = (ZELEM_HEAD_SIZE + ZACTIVE_LONG_KEY_LEN) + fragrecptr.p->localkeylen;
} else {
} else { // wl-2066 remove
jam();
/* LENGTH OF THE ELEMENT */
tsscElemlen = (ZELEM_HEAD_SIZE + fragrecptr.p->keyLength) + fragrecptr.p->localkeylen;
......@@ -11844,7 +11949,7 @@ bool Dbacc::searchScanContainer(Signal* signal)
/* THE ELEMENT IS ALREADY SENT. */
/* SEARCH FOR NEXT ONE */
tsscElemlens = tsscElemlens - tsscElemlen;
if (tsscElemlens > 2) {
if (tsscElemlens > 1) { // wl-2066 loop as in getElement
jam();
tsscElementptr = tsscElementptr + tsscElemStep;
goto SCANELEMENTLOOP001;
......@@ -11877,7 +11982,8 @@ void Dbacc::sendNextScanConf(Signal* signal)
fragrecptr.i = operationRecPtr.p->fragptr;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
if (fragrecptr.p->keyLength != 0) {
readTablePk(operationRecPtr.p->localdata[0]);
if (fragrecptr.p->keyLength != 0) { // wl-2066 remove if
jam();
signal->theData[0] = scanPtr.p->scanUserptr;
signal->theData[1] = operationRecPtr.i;
......@@ -11886,26 +11992,28 @@ void Dbacc::sendNextScanConf(Signal* signal)
signal->theData[4] = operationRecPtr.p->localdata[1];
signal->theData[5] = fragrecptr.p->localkeylen;
signal->theData[6] = fragrecptr.p->keyLength;
signal->theData[7] = operationRecPtr.p->keydata[0];
signal->theData[8] = operationRecPtr.p->keydata[1];
signal->theData[9] = operationRecPtr.p->keydata[2];
signal->theData[10] = operationRecPtr.p->keydata[3];
signal->theData[7] = ckeys[0];
signal->theData[8] = ckeys[1];
signal->theData[9] = ckeys[2];
signal->theData[10] = ckeys[3];
EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 11);
if (fragrecptr.p->keyLength > ZKEYINKEYREQ) {
const Uint32 keyLength = fragrecptr.p->keyLength;
Uint32 total = 4;
while (total < keyLength) {
jam();
/* = 4 */
Uint32 length = keyLength - total;
if (length > 20)
length = 20;
signal->theData[0] = scanPtr.p->scanUserptr;
signal->theData[1] = operationRecPtr.i;
signal->theData[2] = operationRecPtr.p->fid;
signal->theData[3] = fragrecptr.p->keyLength - ZKEYINKEYREQ;
signal->theData[4] = operationRecPtr.p->keydata[4];
signal->theData[5] = operationRecPtr.p->keydata[5];
signal->theData[6] = operationRecPtr.p->keydata[6];
signal->theData[7] = operationRecPtr.p->keydata[7];
EXECUTE_DIRECT(blockNo, GSN_ACC_SCAN_INFO, signal, 8);
return;
}//if
} else {
signal->theData[1] = operationRecPtr.i; // not used by LQH
signal->theData[2] = operationRecPtr.p->fid; // not used by LQH
signal->theData[3] = length;
memcpy(&signal->theData[4], &ckeys[total], length << 2);
EXECUTE_DIRECT(blockNo, GSN_ACC_SCAN_INFO24, signal, 4 + length);
// wl-2066 remove GSN_ACC_SCAN_INFO
total += length;
}//if
} else { // wl-2066 remove
jam();
sendScaninfo(signal);
return;
......
......@@ -3,6 +3,8 @@ noinst_LIBRARIES = libdbacc.a
libdbacc_a_SOURCES = DbaccInit.cpp DbaccMain.cpp
INCLUDES_LOC = -I$(top_srcdir)/ndb/src/kernel/blocks/dbtup
include $(top_srcdir)/ndb/config/common.mk.am
include $(top_srcdir)/ndb/config/type_kernel.mk.am
......
......@@ -4103,7 +4103,7 @@ Dbdict::execADD_FRAGREQ(Signal* signal) {
req->noOfPagesToPreAllocate = 0;
req->schemaVersion = tabPtr.p->tableVersion;
Uint32 keyLen = tabPtr.p->tupKeyLength;
req->keyLength = keyLen > 8 ? 0 : keyLen; // Put this into ACC instead
req->keyLength = keyLen; // wl-2066 no more "long keys"
req->nextLCP = lcpNo;
req->noOfKeyAttr = tabPtr.p->noOfPrimkey;
......
......@@ -1020,6 +1020,13 @@ public:
*/
int tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut);
/*
* ACC reads primary key without headers into an array of words. At
* this point in ACC deconstruction, ACC still uses logical references
* to fragment and tuple.
*/
int accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut);
/*
* TUX checks if tuple is visible to scan.
*/
......
......@@ -200,8 +200,7 @@ Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* data
operPtr.i = RNIL;
operPtr.p = NULL;
// do it
int ret = readAttributes(pagePtr.p, pageOffset, attrIds,
numAttrs, dataOut, ZNIL, true);
int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL, true);
// restore globals
tabptr = tabptr_old;
fragptr = fragptr_old;
......@@ -229,6 +228,27 @@ Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* data
return ret;
}
int
Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut)
{
ljamEntry();
// get table
TablerecPtr tablePtr;
tablePtr.i = tableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
// get fragment
FragrecordPtr fragPtr;
getFragmentrec(fragPtr, fragId, tablePtr.p);
// get real page id and tuple offset
PagePtr pagePtr;
Uint32 pageId = getRealpid(fragPtr.p, fragPageId);
ndbrequire((pageIndex & 0x1) == 0);
Uint32 pageOffset = ZPAGE_HEADER_SIZE + (pageIndex >> 1) * tablePtr.p->tupheadsize;
// use TUX routine - optimize later
int ret = tuxReadPk(fragPtr.i, pageId, pageOffset, dataOut);
return ret;
}
bool
Dbtup::tuxQueryTh(Uint32 fragPtrI, Uint32 tupAddr, Uint32 tupVersion, Uint32 transId1, Uint32 transId2, Uint32 savePointId)
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment