Commit d2c8f1a7 authored by unknown's avatar unknown

Implemented general purpose psuedo columns: row_count & fragment

Implemented new interpreter instruction: exit_ok_last

This two new features combined can be used to make fast select count
 


ndb/include/kernel/AttributeHeader.hpp:
  Psuedo columns fragment & row_count
ndb/include/kernel/GlobalSignalNumbers.h:
  Impl. READ_ROWCOUNT
ndb/include/kernel/signaldata/TupKey.hpp:
  Remove unused pageId pageIndex and replace with lastRow flag
ndb/include/ndbapi/NdbOperation.hpp:
  New instruction for last_row
ndb/src/kernel/blocks/dbacc/Dbacc.hpp:
  REQ_ROWCOUNT
ndb/src/kernel/blocks/dbacc/DbaccInit.cpp:
  REQ_ROWCOUNT
ndb/src/kernel/blocks/dbacc/DbaccMain.cpp:
  REQ_ROWCOUNT
ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  REQ_ROWCOUNT + last row
ndb/src/kernel/blocks/dblqh/DblqhInit.cpp:
  REQ_ROWCOUNT
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  REQ_ROWCOUNT + last row
ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
  Add readers for new psuedo columns
ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp:
  Add readers for new psuedo columns
ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp:
  Add readers for new psuedo columns
ndb/src/ndbapi/NdbOperationDefine.cpp:
  Use exit ok (not ok_last)
ndb/src/ndbapi/NdbOperationExec.cpp:
  Use exit ok (not ok_last)
ndb/src/ndbapi/NdbOperationInt.cpp:
  New instruction
parent 1e91f015
......@@ -30,6 +30,12 @@ class AttributeHeader {
friend class Suma;
public:
/**
* Psuedo columns
*/
STATIC_CONST( FRAGMENT = 0xFFFE );
STATIC_CONST( ROW_COUNT = 0xFFFD );
/** Initialize AttributeHeader at location aHeaderPtr */
static AttributeHeader& init(void* aHeaderPtr, Uint32 anAttributeId,
Uint32 aDataSize);
......
......@@ -23,14 +23,8 @@
*
* When adding a new signal, remember to update MAX_GSN and SignalNames.cpp
*/
const GlobalSignalNumber MAX_GSN = 712;
struct GsnName {
GlobalSignalNumber gsn;
const char * name;
......@@ -898,7 +892,6 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES;
#define GSN_TUX_MAINT_REF 679
// not used 680
// not used 712
// not used 681
/**
......@@ -951,6 +944,6 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES;
#define GSN_TUX_BOUND_INFO 710
#define GSN_ACC_LOCKREQ 711
#define GSN_READ_ROWCOUNT_REQ 712
#endif
......@@ -80,7 +80,7 @@ class TupKeyConf {
friend bool printTUPKEYCONF(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiverBlockNo);
public:
STATIC_CONST( SignalLength = 6 );
STATIC_CONST( SignalLength = 5 );
private:
......@@ -88,11 +88,10 @@ private:
* DATA VARIABLES
*/
Uint32 userPtr;
Uint32 pageId;
Uint32 pageIndex;
Uint32 readLength;
Uint32 writeLength;
Uint32 noFiredTriggers;
Uint32 lastRow;
};
class TupKeyRef {
......
......@@ -609,6 +609,20 @@ public:
int interpret_exit_nok(Uint32 ErrorCode);
int interpret_exit_nok();
/**
* Interpreted program instruction:
*
* For scanning transactions,
* return this row, but no more from this fragment
*
* For non-scanning transactions,
* abort the whole transaction.
*
* @return -1 if unsuccessful.
*/
int interpret_exit_last_row();
/**
* Interpreted program instruction:
* Define a subroutine in an interpreted operation.
......
......@@ -926,6 +926,7 @@ private:
void execACC_OVER_REC(Signal* signal);
void execACC_SAVE_PAGES(Signal* signal);
void execNEXTOPERATION(Signal* signal);
void execREAD_ROWCOUNTREQ(Signal* signal);
// Received signals
void execSTTOR(Signal* signal);
......
......@@ -148,6 +148,7 @@ Dbacc::Dbacc(const class Configuration & conf):
addRecSignal(GSN_ACC_OVER_REC, &Dbacc::execACC_OVER_REC);
addRecSignal(GSN_ACC_SAVE_PAGES, &Dbacc::execACC_SAVE_PAGES);
addRecSignal(GSN_NEXTOPERATION, &Dbacc::execNEXTOPERATION);
addRecSignal(GSN_READ_ROWCOUNT_REQ, &Dbacc::execREAD_ROWCOUNTREQ);
// Received signals
addRecSignal(GSN_STTOR, &Dbacc::execSTTOR);
......
......@@ -13384,3 +13384,17 @@ void Dbacc::execSET_VAR_REQ(Signal* signal)
#endif
}//execSET_VAR_REQ()
void
Dbacc::execREAD_ROWCOUNTREQ(Signal* signal){
jamEntry();
fragrecptr.i = signal->theData[0];
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
rootfragrecptr.i = fragrecptr.p->myroot;
ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
Uint64 tmp = rootfragrecptr.p->noOfElements;
Uint32 * src = (Uint32*)&tmp;
signal->theData[0] = src[0];
signal->theData[1] = src[1];
}
......@@ -575,6 +575,7 @@ public:
Uint8 scanReleaseCounter;
Uint8 scanTcWaiting;
Uint8 scanKeyinfoFlag;
Uint8 m_last_row;
}; // Size 272 bytes
typedef Ptr<ScanRecord> ScanRecordPtr;
......@@ -2097,7 +2098,8 @@ private:
void execSTART_EXEC_SR(Signal* signal);
void execEXEC_SRREQ(Signal* signal);
void execEXEC_SRCONF(Signal* signal);
void execREAD_ROWCOUNTREQ(Signal* signal);
void execDUMP_STATE_ORD(Signal* signal);
void execACC_COM_BLOCK(Signal* signal);
void execACC_COM_UNBLOCK(Signal* signal);
......
......@@ -323,6 +323,8 @@ Dblqh::Dblqh(const class Configuration & conf):
addRecSignal(GSN_TUX_ADD_ATTRCONF, &Dblqh::execTUX_ADD_ATTRCONF);
addRecSignal(GSN_TUX_ADD_ATTRREF, &Dblqh::execTUX_ADD_ATTRREF);
addRecSignal(GSN_READ_ROWCOUNT_REQ, &Dblqh::execREAD_ROWCOUNTREQ);
initData();
#ifdef VM_TRACE
......
......@@ -2511,6 +2511,21 @@ Dblqh::updatePackedList(Signal* signal, HostRecord * ahostptr, Uint16 hostId)
}//if
}//Dblqh::updatePackedList()
void
Dblqh::execREAD_ROWCOUNTREQ(Signal* signal){
jamEntry();
TcConnectionrecPtr regTcPtr;
regTcPtr.i = signal->theData[0];
ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
FragrecordPtr regFragptr;
regFragptr.i = regTcPtr.p->fragmentptr;
ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord);
signal->theData[0] = regFragptr.p->accFragptr[regTcPtr.p->localFragptr];
EXECUTE_DIRECT(DBACC, GSN_READ_ROWCOUNT_REQ, signal, 1);
}
/* ************>> */
/* TUPKEYCONF > */
/* ************>> */
......@@ -7014,6 +7029,14 @@ void Dblqh::continueScanNextReqLab(Signal* signal)
return;
}//if
if(scanptr.p->m_last_row){
jam();
scanptr.p->scanCompletedStatus = ZTRUE;
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
sendScanFragConf(signal, ZFALSE);
return;
}
// Update timer on tcConnectRecord
tcConnectptr.p->tcTimer = cLqhTimeOutCount;
......@@ -7959,13 +7982,10 @@ bool Dblqh::keyinfoLab(Signal* signal, Uint32* dataPtr, Uint32 length)
* ------------------------------------------------------------------------- */
void Dblqh::scanTupkeyConfLab(Signal* signal)
{
UintR tdata3;
UintR tdata4;
UintR tdata5;
const TupKeyConf * conf = (TupKeyConf *)signal->getDataPtr();
UintR tdata4 = conf->readLength;
UintR tdata5 = conf->lastRow;
tdata3 = signal->theData[2];
tdata4 = signal->theData[3];
tdata5 = signal->theData[4];
tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
scanptr.i = tcConnectptr.p->tcScanRec;
releaseActiveFrag(signal);
......@@ -7996,15 +8016,15 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN);
scanptr.p->scanOpLength[scanptr.p->scanCompletedOperations] = tdata4;
scanptr.p->scanCompletedOperations++;
if ((scanptr.p->scanCompletedOperations ==
scanptr.p->scanConcurrentOperations) &&
(scanptr.p->scanLockHold == ZTRUE)) {
scanptr.p->m_last_row = conf->lastRow;
const bool done = (scanptr.p->scanCompletedOperations == scanptr.p->scanConcurrentOperations) | conf->lastRow;
if (done && (scanptr.p->scanLockHold == ZTRUE)) {
jam();
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
sendScanFragConf(signal, ZFALSE);
return;
} else if (scanptr.p->scanCompletedOperations ==
scanptr.p->scanConcurrentOperations) {
} else if (done){
jam();
scanptr.p->scanReleaseCounter = scanptr.p->scanCompletedOperations;
scanReleaseLocksLab(signal);
......@@ -8310,6 +8330,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanLocalFragid = 0;
scanptr.p->scanTcWaiting = ZTRUE;
scanptr.p->scanNumber = ~0;
scanptr.p->m_last_row = 0;
for (Uint32 i = 0; i < scanConcurrentOperations; i++) {
jam();
......
......@@ -622,7 +622,10 @@ struct Operationrec {
Uint32 tcOpIndex;
Uint32 gci;
Uint32 noFiredTriggers;
Uint32 hashValue; // only used in TUP_COMMITREQ
union {
Uint32 hashValue; // only used in TUP_COMMITREQ
Uint32 lastRow;
};
Bitmask<MAXNROFATTRIBUTESINWORDS> changeMask;
};
typedef Ptr<Operationrec> OperationrecPtr;
......@@ -1623,6 +1626,7 @@ private:
//------------------------------------------------------------------
//------------------------------------------------------------------
bool nullFlagCheck(Uint32 attrDes2);
bool readRowcount(Uint32 userPtr, Uint32* outBuffer);
//------------------------------------------------------------------
//------------------------------------------------------------------
......
......@@ -708,7 +708,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
regOperPtr->tupleState = TUPLE_BLOCKED;
regOperPtr->changeMask.clear();
if (Rstoredid != ZNIL) {
ndbrequire(initStoredOperationrec(regOperPtr, Rstoredid) == ZOK);
}//if
......@@ -844,20 +844,18 @@ void Dbtup::sendTUPKEYCONF(Signal* signal,
TupKeyConf * const tupKeyConf = (TupKeyConf *)signal->getDataPtrSend();
Uint32 RuserPointer = regOperPtr->userpointer;
Uint32 RfragPageId = regOperPtr->fragPageId;
Uint32 RpageIndex = regOperPtr->pageIndex;
Uint32 RattroutbufLen = regOperPtr->attroutbufLen;
Uint32 RnoFiredTriggers = regOperPtr->noFiredTriggers;
BlockReference Ruserblockref = regOperPtr->userblockref;
Uint32 lastRow = regOperPtr->lastRow;
regOperPtr->transstate = STARTED;
regOperPtr->tupleState = NO_OTHER_OP;
tupKeyConf->userPtr = RuserPointer;
tupKeyConf->pageId = RfragPageId;
tupKeyConf->pageIndex = RpageIndex;
tupKeyConf->readLength = RattroutbufLen;
tupKeyConf->writeLength = TlogSize;
tupKeyConf->noFiredTriggers = RnoFiredTriggers;
tupKeyConf->lastRow = lastRow;
EXECUTE_DIRECT(refToBlock(Ruserblockref), GSN_TUPKEYCONF, signal,
TupKeyConf::SignalLength);
......@@ -920,6 +918,7 @@ int Dbtup::handleReadReq(Signal* signal,
return -1;
} else {
jam();
regOperPtr->lastRow = 0;
if (interpreterStartLab(signal, pagePtr, Ttupheadoffset) != -1) {
return 0;
}//if
......@@ -1978,12 +1977,19 @@ int Dbtup::interpreterNextLab(Signal* signal,
}
case Interpreter::EXIT_OK:
case Interpreter::EXIT_OK_LAST:
jam();
#ifdef TRACE_INTERPRETER
ndbout_c(" - exit_ok");
#endif
return TdataWritten;
case Interpreter::EXIT_OK_LAST:
jam();
#if 1
ndbout_c(" - exit_ok_last");
#endif
operPtr.p->lastRow = 1;
return TdataWritten;
case Interpreter::EXIT_REFUSE:
jam();
......
......@@ -187,6 +187,14 @@ int Dbtup::readAttributes(Page* const pagePtr,
} else {
return (Uint32)-1;
}//if
} else if(attributeId == AttributeHeader::FRAGMENT){
AttributeHeader::init(&outBuffer[tmpAttrBufIndex], attributeId, 1);
outBuffer[tmpAttrBufIndex+1] = fragptr.p->fragmentId;
tOutBufIndex = tmpAttrBufIndex + 2;
} else if(attributeId == AttributeHeader::ROW_COUNT){
AttributeHeader::init(&outBuffer[tmpAttrBufIndex], attributeId, 2);
readRowcount(operPtr.p->userpointer, outBuffer+tmpAttrBufIndex+1);
tOutBufIndex = tmpAttrBufIndex + 3;
} else {
terrorCode = ZATTRIBUTE_ID_ERROR;
return (Uint32)-1;
......@@ -195,6 +203,7 @@ int Dbtup::readAttributes(Page* const pagePtr,
return tOutBufIndex;
}//Dbtup::readAttributes()
#if 0
int Dbtup::readAttributesWithoutHeader(Page* const pagePtr,
Uint32 tupHeadOffset,
Uint32* inBuffer,
......@@ -247,6 +256,7 @@ int Dbtup::readAttributesWithoutHeader(Page* const pagePtr,
ndbrequire(attrBufIndex == inBufLen);
return tOutBufIndex;
}//Dbtup::readAttributes()
#endif
bool
Dbtup::readFixedSizeTHOneWordNotNULL(Uint32* outBuffer,
......@@ -893,4 +903,13 @@ Dbtup::updateDynSmallVarSize(Uint32* inBuffer,
return false;
}//Dbtup::updateDynSmallVarSize()
bool
Dbtup::readRowcount(Uint32 userPtr, Uint32* outBuffer){
Uint32 tmp[sizeof(SignalHeader)+25];
Signal * signal = (Signal*)&tmp;
signal->theData[0] = userPtr;
EXECUTE_DIRECT(DBLQH, GSN_READ_ROWCOUNT_REQ, signal, 1);
outBuffer[0] = signal->theData[0];
outBuffer[1] = signal->theData[1];
}
......@@ -325,7 +325,7 @@ NdbOperation::getValue_impl(const NdbColumnImpl* tAttrInfo, char* aValue)
if (theStatus == FinalGetValue) {
; // Simply continue with getValue
} else if (theStatus == ExecInterpretedValue) {
if (insertATTRINFO(Interpreter::EXIT_OK_LAST) == -1)
if (insertATTRINFO(Interpreter::EXIT_OK) == -1)
return NULL;
theInterpretedSize = theTotalCurrAI_Len -
(theInitialReadSize + 5);
......@@ -415,7 +415,7 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
// We insert an exit from interpretation since we are now starting
// to set values in the tuple by setValue.
//--------------------------------------------------------------------
if (insertATTRINFO(Interpreter::EXIT_OK_LAST) == -1){
if (insertATTRINFO(Interpreter::EXIT_OK) == -1){
return -1;
}
theInterpretedSize = theTotalCurrAI_Len -
......
......@@ -354,7 +354,7 @@ NdbOperation::prepareSendInterpreted()
Uint32 tTotalCurrAI_Len = theTotalCurrAI_Len;
Uint32 tInitReadSize = theInitialReadSize;
if (theStatus == ExecInterpretedValue) {
if (insertATTRINFO(Interpreter::EXIT_OK_LAST) != -1) {
if (insertATTRINFO(Interpreter::EXIT_OK) != -1) {
//-------------------------------------------------------------------------
// Since we read the total length before inserting the last entry in the
// signals we need to add one to the total length.
......
......@@ -888,6 +888,18 @@ NdbOperation::interpret_exit_ok()
return 0;
}
int
NdbOperation::interpret_exit_last_row()
{
INT_DEBUG(("interpret_exit_last_row"));
if (initial_interpreterCheck() == -1)
return -1;
if (insertATTRINFO(Interpreter::EXIT_OK_LAST) == -1)
return -1;
theErrorLine++;
return 0;
}
/************************************************************************************************
int NdbOperation::interpret_exit_nok(Uint32 ErrorCode)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment