Commit fd566261 authored by unknown's avatar unknown

First step for WL 2025

Not yet fully working
Scan reads work fine, not scan updates


ndb/include/kernel/ndb_limits.h:
  Introducing a new parameter plus increasing the max no of parallel
  operations per scan in LQH, first step in WL 2025
ndb/include/kernel/signaldata/ScanFrag.hpp:
  Only need one clientOpPtr
  Concurrency is batch_size to use in this scan
  batch_byte_size is max no of bytes sent in a batch
  first_batch_size is the batch size in the first batch
ndb/include/kernel/signaldata/ScanTab.hpp:
  apiOperationPtr is sent as long signal data
  batch_byte_size and first_batch_size is needed for further transport
  to LQH
  batch size can now be bigger than before
ndb/include/kernel/signaldata/TcKeyReq.hpp:
  More concurrency means more size for scanInfo also in TCKEYREQ
ndb/include/ndbapi/NdbReceiver.hpp:
  New subroutine to caclculate batch size and similar parameters
ndb/include/ndbapi/NdbScanOperation.hpp:
  batch size calculated before sending, not necessary to store anymore
ndb/src/common/debugger/signaldata/ScanTab.cpp:
  Updated signal printer for SCAN_TABREQ
ndb/src/kernel/blocks/backup/Backup.cpp:
  Fixes to make it compile, not fixed for BACKUP being useful yet
ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  Removed parameters no longer needed and added some new ones.
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Fix for cmaxAccOps that was using the wrong constant
  Removed old code
  New SCAN_FRAGREQ signal
ndb/src/kernel/blocks/dbtc/Dbtc.hpp:
  New variables
  Removed dead code
ndb/src/kernel/blocks/dbtc/DbtcMain.cpp:
  New SCAN_TABREQ, SCAN_FRAGREQ, SCAN_FRAGCONF and SCAN_TABCONF
  Fixed some error handling to be more efficient
ndb/src/kernel/blocks/suma/Suma.cpp:
  Fixes to make it compile, not yet usable for SUMA features
ndb/src/kernel/vm/Configuration.cpp:
  Fix for wrong constant
ndb/src/ndbapi/NdbApiSignal.cpp:
  Fix for not using constants
ndb/src/ndbapi/NdbApiSignal.hpp:
  Added possibility to get signal sending node from signal
ndb/src/ndbapi/NdbConnectionScan.cpp:
  Moved declaration
ndb/src/ndbapi/NdbReceiver.cpp:
  New routine to calculate batch_size etc.
ndb/src/ndbapi/NdbScanOperation.cpp:
  Various fixes for sending SCAN_TABREQ and other stuff
parent 01e1451a
......@@ -68,7 +68,22 @@
* API can order a multiple of this number of records at a time since
* fragments can be scanned in parallel.
*/
#define MAX_PARALLEL_OP_PER_SCAN 16
#define MAX_PARALLEL_OP_PER_SCAN 64
/*
* When calculating the number of records sent from LQH in each batch
* one uses SCAN_BATCH_SIZE divided by the expected size of signals
* per row. This gives the batch size used for the scan. The NDB API
* will receive one batch from each node at a time so there has to be
* some care taken also so that the NDB API is not overloaded with
* signals.
*/
#define SCAN_BATCH_SIZE 32768
/*
* To protect the NDB API from overload we also define a maximum total
* batch size from all nodes. This parameter should most likely be
* configurable, or dependent on sendBufferSize.
*/
#define MAX_SCAN_BATCH_SIZE 196608
/*
* Maximum number of Parallel Scan queries on one hash index fragment
*/
......
......@@ -33,7 +33,7 @@ class ScanFragReq {
*/
friend class Dblqh;
public:
STATIC_CONST( SignalLength = 25 );
STATIC_CONST( SignalLength = 13 );
public:
Uint32 senderData;
......@@ -45,9 +45,11 @@ public:
Uint32 schemaVersion;
Uint32 transId1;
Uint32 transId2;
Uint32 clientOpPtr[MAX_PARALLEL_OP_PER_SCAN];
Uint32 clientOpPtr;
Uint32 concurrency;
Uint32 batch_byte_size;
Uint32 first_batch_size;
static Uint32 getConcurrency(const Uint32 & requestInfo);
static Uint32 getLockMode(const Uint32 & requestInfo);
static Uint32 getHoldLockFlag(const Uint32 & requestInfo);
static Uint32 getKeyinfoFlag(const Uint32 & requestInfo);
......@@ -56,7 +58,6 @@ public:
static Uint32 getAttrLen(const Uint32 & requestInfo);
static Uint32 getScanPrio(const Uint32 & requestInfo);
static void setConcurrency(Uint32 & requestInfo, Uint32 concurrency);
static void setLockMode(Uint32 & requestInfo, Uint32 lockMode);
static void setHoldLockFlag(Uint32 & requestInfo, Uint32 holdLock);
static void setKeyinfoFlag(Uint32 & requestInfo, Uint32 keyinfo);
......@@ -79,7 +80,6 @@ class KeyInfo20 {
friend class NdbOperation;
friend class NdbScanReceiver;
public:
//STATIC_CONST( SignalLength = 21 );
STATIC_CONST( HeaderLength = 5);
STATIC_CONST( DataLength = 20 );
......@@ -110,15 +110,15 @@ class ScanFragConf {
friend class Backup;
friend class Suma;
public:
STATIC_CONST( SignalLength = 21 );
STATIC_CONST( SignalLength = 6 );
public:
Uint32 senderData;
Uint32 completedOps;
Uint32 fragmentCompleted;
Uint32 opReturnDataLen[16];
Uint32 transId1;
Uint32 transId2;
Uint32 total_len;
};
class ScanFragRef {
......@@ -188,7 +188,6 @@ public:
* Request Info
*
* a = Length of attrinfo - 16 Bits (16-31)
* c = Concurrency - 5 Bits (0-4) -> Max 31
* l = Lock Mode - 1 Bit 5
* h = Hold lock - 1 Bit 7
* k = Keyinfo - 1 Bit 8
......@@ -198,11 +197,8 @@ public:
*
* 1111111111222222222233
* 01234567890123456789012345678901
* ccccclxhkr ppppaaaaaaaaaaaaaaaa
* lxhkr ppppaaaaaaaaaaaaaaaa
*/
#define SF_CONCURRENCY_SHIFT (0)
#define SF_CONCURRENCY_MASK (31)
#define SF_LOCK_MODE_SHIFT (5)
#define SF_LOCK_MODE_MASK (1)
......@@ -217,12 +213,6 @@ public:
#define SF_PRIO_SHIFT 12
#define SF_PRIO_MASK 15
inline
Uint32
ScanFragReq::getConcurrency(const Uint32 & requestInfo){
return (requestInfo >> SF_CONCURRENCY_SHIFT) & SF_CONCURRENCY_MASK;
}
inline
Uint32
ScanFragReq::getLockMode(const Uint32 & requestInfo){
......@@ -272,13 +262,6 @@ ScanFragReq::setScanPrio(UintR & requestInfo, UintR val){
requestInfo |= (val << SF_PRIO_SHIFT);
}
inline
void
ScanFragReq::setConcurrency(UintR & requestInfo, UintR val){
ASSERT_MAX(val, SF_CONCURRENCY_MASK, "ScanFragReq::setConcurrency");
requestInfo |= (val << SF_CONCURRENCY_SHIFT);
}
inline
void
ScanFragReq::setLockMode(UintR & requestInfo, UintR val){
......@@ -324,7 +307,7 @@ ScanFragReq::setAttrLen(UintR & requestInfo, UintR val){
inline
Uint32
KeyInfo20::setScanInfo(Uint32 opNo, Uint32 scanNo){
ASSERT_MAX(opNo, 15, "KeyInfo20::setScanInfo");
ASSERT_MAX(opNo, 1023, "KeyInfo20::setScanInfo");
ASSERT_MAX(scanNo, 255, "KeyInfo20::setScanInfo");
return (opNo << 8) + scanNo;
}
......@@ -338,7 +321,7 @@ KeyInfo20::getScanNo(Uint32 scanInfo){
inline
Uint32
KeyInfo20::getScanOp(Uint32 scanInfo){
return (scanInfo >> 8) & 0xF;
return (scanInfo >> 8) & 0x1023;
}
#endif
......@@ -45,7 +45,7 @@ public:
/**
* Length of signal
*/
STATIC_CONST( SignalLength = 25 );
STATIC_CONST( StaticLength = 11 );
private:
......@@ -63,7 +63,8 @@ private:
UintR transId1; // DATA 6
UintR transId2; // DATA 7
UintR buddyConPtr; // DATA 8
UintR apiOperationPtr[16]; // DATA 9-25
UintR batch_byte_size; // DATA 9
UintR first_batch_size; // DATA 10
/**
* Get:ers for requestInfo
......@@ -95,11 +96,11 @@ private:
h = Hold lock mode - 1 Bit 10
c = Read Committed - 1 Bit 11
x = Range Scan (TUX) - 1 Bit 15
b = Scan batch - 5 Bit 16-19 (max 15)
b = Scan batch - 10 Bit 16-25 (max 1023)
1111111111222222222233
01234567890123456789012345678901
ppppppppl hc xbbbbb
ppppppppl hc xbbbbbbbbbb
*/
#define PARALLELL_SHIFT (0)
......@@ -118,7 +119,7 @@ private:
#define RANGE_SCAN_MASK (1)
#define SCAN_BATCH_SHIFT (16)
#define SCAN_BATCH_MASK (31)
#define SCAN_BATCH_MASK (1023)
inline
Uint8
......@@ -201,6 +202,7 @@ inline
void
ScanTabReq::setScanBatch(Uint32 & requestInfo, Uint32 flag){
ASSERT_MAX(flag, SCAN_BATCH_MASK, "ScanTabReq::setScanBatch");
requestInfo &= ~(SCAN_BATCH_MASK << SCAN_BATCH_SHIFT);
requestInfo |= (flag << SCAN_BATCH_SHIFT);
}
......@@ -250,8 +252,8 @@ private:
Uint32 info;
};
static Uint32 getLength(Uint32 opDataInfo) { return opDataInfo >> 5; };
static Uint32 getRows(Uint32 opDataInfo) { return opDataInfo & 31;}
static Uint32 getLength(Uint32 opDataInfo) { return opDataInfo >> 10; };
static Uint32 getRows(Uint32 opDataInfo) { return opDataInfo & 1023;}
};
/**
......
......@@ -228,21 +228,21 @@ private:
* Scan Info
*
t = Scan take over indicator - 1 Bit
n = Take over node - 16 Bits -> max 65535
p = Scan Info - 12 Bits -> max 4095
n = Take over node - 12 Bits -> max 65535
p = Scan Info - 18 Bits -> max 4095
1111111111222222222233
01234567890123456789012345678901
tpppppppppppp nnnnnnnnnnnnnnnn
tpppppppppppppppppp nnnnnnnnnnnn
*/
#define TAKE_OVER_SHIFT (0)
#define TAKE_OVER_NODE_SHIFT (16)
#define TAKE_OVER_NODE_MASK (65535)
#define TAKE_OVER_NODE_SHIFT (20)
#define TAKE_OVER_NODE_MASK (4095)
#define SCAN_INFO_SHIFT (1)
#define SCAN_INFO_MASK (4095)
#define SCAN_INFO_MASK (262143)
/**
* Attr Len
......
......@@ -75,6 +75,7 @@ private:
class NdbRecAttr * getValue(const class NdbColumnImpl*, char * user_dst_ptr);
void do_get_value(NdbReceiver*, Uint32 rows, Uint32 key_size);
void prepareSend();
void calculate_batch_size(Uint32, Uint32, Uint32&, Uint32&, Uint32&);
int execKEYINFO20(Uint32 info, const Uint32* ptr, Uint32 len);
int execTRANSID_AI(const Uint32* ptr, Uint32 len);
......
......@@ -122,7 +122,6 @@ protected:
NdbConnection *m_transConnection;
// Scan related variables
Uint32 theBatchSize;
Uint32 theParallelism;
Uint32 m_keyInfo;
NdbApiSignal* theSCAN_TABREQ;
......
......@@ -27,7 +27,7 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
const UintR requestInfo = sig->requestInfo;
fprintf(output, " apiConnectPtr: H\'%.8x\n",
fprintf(output, " apiConnectPtr: H\'%.8x",
sig->apiConnectPtr);
fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo);
fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Holdlock: %u, RangeScan: %u\n",
......@@ -42,23 +42,8 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
fprintf(output, " transId(1, 2): (H\'%.8x, H\'%.8x) storedProcId: H\'%.8x\n",
sig->transId1, sig->transId2, sig->storedProcId);
fprintf(output, " OperationPtr(s):\n ");
Uint32 restLen = (len - 9);
const Uint32 * rest = &sig->apiOperationPtr[0];
while(restLen >= 7){
fprintf(output,
" H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x\n",
rest[0], rest[1], rest[2], rest[3],
rest[4], rest[5], rest[6]);
restLen -= 7;
rest += 7;
}
if(restLen > 0){
for(Uint32 i = 0; i<restLen; i++)
fprintf(output, " H\'%.8x", rest[i]);
fprintf(output, "\n");
}
fprintf(output, " batch_byte_size: %d, first_batch_size: %d\n",
sig->batch_byte_size, sig->first_batch_size);
return false;
}
......
......@@ -3324,20 +3324,16 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
req->requestInfo = 0;
req->savePointId = 0;
req->tableId = table.tableId;
ScanFragReq::setConcurrency(req->requestInfo, parallelism);
//ScanFragReq::setConcurrency(req->requestInfo, parallelism);
ScanFragReq::setLockMode(req->requestInfo, 0);
ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
ScanFragReq::setKeyinfoFlag(req->requestInfo, 1);
ScanFragReq::setAttrLen(req->requestInfo,attrLen);
req->transId1 = 0;
req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);
Uint32 i;
for(i = 0; i<parallelism; i++) {
jam();
req->clientOpPtr[i] = filePtr.i;
}//for
sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 25, JBB);
req->clientOpPtr= filePtr.i;
sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal,
ScanFragReq::SignalLength, JBB);
signal->theData[0] = filePtr.i;
signal->theData[1] = 0;
......@@ -3351,6 +3347,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
signal->theData[7] = 0;
Uint32 dataPos = 8;
Uint32 i;
for(i = 0; i<table.noOfAttributes; i++) {
jam();
AttributePtr attr;
......@@ -3655,7 +3652,7 @@ Backup::execSCAN_FRAGCONF(Signal* signal)
c_backupFilePool.getPtr(filePtr, filePtrI);
OperationRecord & op = filePtr.p->operation;
op.scanConf(conf->completedOps, conf->opReturnDataLen);
//op.scanConf(conf->completedOps, conf->opReturnDataLen);
const Uint32 completed = conf->fragmentCompleted;
if(completed != 2) {
......
......@@ -533,9 +533,11 @@ public:
COPY = 2
};
UintR scanAccOpPtr[MAX_PARALLEL_OP_PER_SCAN];
UintR scanApiOpPtr[MAX_PARALLEL_OP_PER_SCAN];
UintR scanOpLength[MAX_PARALLEL_OP_PER_SCAN];
UintR scanApiOpPtr;
UintR scanLocalref[2];
Uint32 scan_batch_len;
Uint32 first_batch_size;
Uint32 batch_byte_size;
UintR copyPtr;
union {
Uint32 nextPool;
......
......@@ -890,7 +890,7 @@ void Dblqh::execREAD_CONFIG_REQ(Signal* signal)
&ctcConnectrecFileSize));
clogFileFileSize = 4 * cnoLogFiles;
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_SCAN, &cscanrecFileSize));
cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_SCANS_PER_FRAG;
cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_OP_PER_SCAN;
initRecords();
initialiseRecordsLab(signal, 0, ref, senderData);
......@@ -2099,8 +2099,6 @@ void Dblqh::execTIME_SIGNAL(Signal* signal)
c_scanRecordPool.getPtr(TscanPtr, tTcConptr.p->tcScanRec);
ndbout << " scanState = " << TscanPtr.p->scanState << endl;
//TscanPtr.p->scanAccOpPtr[16];
//TscanPtr.p->scanApiOpPtr[16];
//TscanPtr.p->scanOpLength[16];
//TscanPtr.p->scanLocalref[2];
ndbout << " copyPtr="<<TscanPtr.p->copyPtr
<< " scanAccPtr="<<TscanPtr.p->scanAccPtr
......@@ -6988,6 +6986,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal)
initScanAccOp(signal);
scanptr.p->scanCompletedOperations = 0;
scanptr.p->scan_batch_len= 0;
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
scanNextLoopLab(signal);
}//Dblqh::continueScanNextReqLab()
......@@ -7142,6 +7141,7 @@ void Dblqh::closeScanRequestLab(Signal* signal)
}//if
tcConnectptr.p->abortState = TcConnectionrec::ABORT_ACTIVE;
scanptr.p->scanCompletedOperations = 0;
scanptr.p->scan_batch_len= 0;
sendScanFragConf(signal, ZTRUE);
break;
case TcConnectionrec::SCAN_TUPKEY:
......@@ -7225,7 +7225,7 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
* ------------------------------------------------------------------------- */
void Dblqh::execSCAN_FRAGREQ(Signal* signal)
{
const ScanFragReq * const scanFragReq = (ScanFragReq *)&signal->theData[0];
ScanFragReq * const scanFragReq = (ScanFragReq *)&signal->theData[0];
ScanFragRef * ref;
const Uint32 transid1 = scanFragReq->transId1;
const Uint32 transid2 = scanFragReq->transId2;
......@@ -7238,7 +7238,7 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
const Uint32 reqinfo = scanFragReq->requestInfo;
const Uint32 fragId = scanFragReq->fragmentNo;
tabptr.i = scanFragReq->tableId;
const Uint32 scanConcurrentOperations = ScanFragReq::getConcurrency(reqinfo);
const Uint32 scanConcurrentOperations = scanFragReq->concurrency;
const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo);
const Uint8 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo);
const Uint8 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo);
......@@ -7256,9 +7256,9 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
tcConnectptr.p->savePointId = scanFragReq->savePointId;
} else {
jam();
/* ---------------------------------------------------------------------
* NO FREE TC RECORD AVAILABLE, THUS WE CANNOT HANDLE THE REQUEST.
* --------------------------------------------------------------------- */
/* --------------------------------------------------------------------
* NO FREE TC RECORD AVAILABLE, THUS WE CANNOT HANDLE THE REQUEST.
* -------------------------------------------------------------------- */
errorCode = ZNO_TC_CONNECT_ERROR;
senderData = scanFragReq->senderData;
goto error_handler_early;
......@@ -7871,8 +7871,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
tupKeyReq->keyRef2 = scanptr.p->scanLocalref[1];
tupKeyReq->attrBufLen = 0;
ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN);
tupKeyReq->opRef =
scanptr.p->scanApiOpPtr[scanptr.p->scanCompletedOperations];
tupKeyReq->opRef = scanptr.p->scanApiOpPtr;
tupKeyReq->applRef = scanptr.p->scanApiBlockref;
tupKeyReq->schemaVersion = scanptr.p->scanSchemaVersion;
tupKeyReq->storedProcedure = scanptr.p->scanStoredProcId;
......@@ -7963,7 +7962,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
tdata4 += tcConnectptr.p->primKeyLen;// Inform API about keyinfo len aswell
}//if
ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN);
scanptr.p->scanOpLength[scanptr.p->scanCompletedOperations] = tdata4;
scanptr.p->scan_batch_len+= tdata4;
scanptr.p->scanCompletedOperations++;
if ((scanptr.p->scanCompletedOperations ==
scanptr.p->scanConcurrentOperations) &&
......@@ -8217,6 +8216,7 @@ void Dblqh::tupScanCloseConfLab(Signal* signal)
} else {
jam();
scanptr.p->scanCompletedOperations = 0;
scanptr.p->scan_batch_len= 0;
sendScanFragConf(signal, ZSCAN_FRAG_CLOSED);
}//if
finishScanrec(signal);
......@@ -8249,7 +8249,7 @@ void Dblqh::initScanAccOp(Signal* signal)
Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
{
const Uint32 reqinfo = scanFragReq->requestInfo;
const Uint32 scanConcurrentOperations = ScanFragReq::getConcurrency(reqinfo);
const Uint32 scanConcurrentOperations = scanFragReq->concurrency;
const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo);
const Uint32 scanLockHold = ScanFragReq::getHoldLockFlag(reqinfo);
const Uint32 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo);
......@@ -8267,7 +8267,10 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanTcrec = tcConnectptr.i;
scanptr.p->scanSchemaVersion = scanFragReq->schemaVersion;
scanptr.p->scanCompletedOperations = 0;
scanptr.p->scan_batch_len= 0;
scanptr.p->scanConcurrentOperations = scanConcurrentOperations;
scanptr.p->batch_byte_size= scanFragReq->batch_byte_size;
scanptr.p->first_batch_size= scanFragReq->first_batch_size;
scanptr.p->scanErrorCounter = 0;
scanptr.p->scanLockMode = scanLockMode;
scanptr.p->readCommitted = readCommitted;
......@@ -8279,11 +8282,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanLocalFragid = 0;
scanptr.p->scanTcWaiting = ZTRUE;
scanptr.p->scanNumber = ~0;
scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
for (Uint32 i = 0; i < scanConcurrentOperations; i++) {
jam();
scanptr.p->scanApiOpPtr[i] = scanFragReq->clientOpPtr[i];
scanptr.p->scanOpLength[i] = 0;
scanptr.p->scanAccOpPtr[i] = 0;
}//for
......@@ -8547,11 +8547,11 @@ void Dblqh::sendKeyinfo20(Signal* signal,
TdataBuf.i = TdataBuf.p->nextDatabuf;
}
keyInfo->clientOpPtr = scanP->scanApiOpPtr[scanOp];
keyInfo->clientOpPtr = scanP->scanApiOpPtr;
keyInfo->keyLen = keyLen;
keyInfo->scanInfo_Node = KeyInfo20::setScanInfo(scanOp,
scanP->scanNumber)+
(getOwnNodeId() << 16);
(getOwnNodeId() << 20);
keyInfo->transId1 = tcConP->transid[0];
keyInfo->transId2 = tcConP->transid[1];
......@@ -8632,23 +8632,27 @@ void Dblqh::sendKeyinfo20(Signal* signal,
* ------------------------------------------------------------------------ */
void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
{
Uint32 completed_ops= scanptr.p->scanCompletedOperations;
Uint32 total_len= scanptr.p->scan_batch_len;
scanptr.p->scanTcWaiting = ZFALSE;
if(ERROR_INSERTED(5037)){
CLEAR_ERROR_INSERT_VALUE;
return;
}
scanptr.p->scanTcWaiting = ZFALSE;
ScanFragConf * conf = (ScanFragConf*)&signal->theData[0];
NodeId tc_node_id= refToNode(tcConnectptr.p->clientBlockref);
Uint32 trans_id1= tcConnectptr.p->transid[0];
Uint32 trans_id2= tcConnectptr.p->transid[1];
conf->senderData = tcConnectptr.p->clientConnectrec;
conf->completedOps = scanptr.p->scanCompletedOperations;
conf->completedOps = completed_ops;
conf->fragmentCompleted = scanCompleted;
for(Uint32 i = 0; i<MAX_PARALLEL_OP_PER_SCAN; i++)
conf->opReturnDataLen[i] = scanptr.p->scanOpLength[i];
conf->transId1 = tcConnectptr.p->transid[0];
conf->transId2 = tcConnectptr.p->transid[1];
conf->transId1 = trans_id1;
conf->transId2 = trans_id2;
conf->total_len= total_len;
sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF,
signal, ScanFragConf::SignalLength, JBB);
signal, ScanFragConf::SignalLength, JBB);
}//Dblqh::sendScanFragConf()
/* ######################################################################### */
......
......@@ -1184,7 +1184,11 @@ public:
Uint32 scanTableref;
// Number of operation records per scanned fragment
// Number of operations in first batch
// Max number of bytes per batch
Uint16 noOprecPerFrag;
Uint16 first_batch_size;
Uint32 batch_byte_size;
// Shall the locks be held until the application have read the
// records
......@@ -1417,17 +1421,13 @@ private:
UintR anApiConnectPtr);
void handleScanStop(Signal* signal, UintR aFailedNode);
void initScanTcrec(Signal* signal);
void initScanApirec(Signal* signal,
Uint32 buddyPtr,
UintR transid1,
UintR transid2);
void initScanrec(ScanRecordPtr, const class ScanTabReq*,
void initScanrec(ScanRecordPtr, const class ScanTabReq*,
const UintR scanParallel,
const UintR noOprecPerFrag);
void initScanfragrec(Signal* signal);
void releaseScanResources(ScanRecordPtr);
ScanRecordPtr seizeScanrec(Signal* signal);
void sendScanFragReq(Signal* signal, ScanRecord*, ScanFragRec*);
void sendScanFragReq(Signal*, ScanRecord*, ScanFragRec*);
void sendScanTabConf(Signal* signal, ScanRecord*);
void close_scan_req(Signal*, ScanRecordPtr, bool received_req);
void close_scan_req_send_conf(Signal*, ScanRecordPtr);
......
This diff is collapsed.
......@@ -1844,7 +1844,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
req->tableId = tabPtr.p->m_tableId;
req->requestInfo = 0;
req->savePointId = 0;
ScanFragReq::setConcurrency(req->requestInfo, parallelism);
//ScanFragReq::setConcurrency(req->requestInfo, parallelism);
ScanFragReq::setLockMode(req->requestInfo, 0);
ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
......@@ -1853,9 +1853,10 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
req->schemaVersion = tabPtr.p->m_schemaVersion;
req->transId1 = 0;
req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);
for(unsigned int i = 0; i<parallelism; i++){
req->clientOpPtr[i] = (ptrI << 16) + (i + 1);
//req->clientOpPtr[i] = (ptrI << 16) + (i + 1);
req->clientOpPtr = (ptrI << 16) + (i + 1);
}
suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 25, JBB);
......
......@@ -511,7 +511,7 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){
/*-----------------------------------------------------------------------*/
cfg.put(CFG_ACC_OP_RECS,
((11 * noOfOperations) / 10 + 50) +
(noOfLocalScanRecords * MAX_PARALLEL_SCANS_PER_FRAG) +
(noOfLocalScanRecords * MAX_PARALLEL_OP_PER_SCAN) +
NODE_RECOVERY_SCAN_OP_RECORDS);
cfg.put(CFG_ACC_OVERFLOW_RECS,
......
......@@ -177,7 +177,7 @@ NdbApiSignal::setSignal(int aNdbSignalType)
theTrace = TestOrd::TraceAPI;
theReceiversBlockNumber = DBTC;
theVerId_signalNumber = GSN_SCAN_TABREQ;
theLength = 9; // ScanTabReq::SignalLength;
theLength = ScanTabReq::StaticLength;
}
break;
......@@ -186,7 +186,7 @@ NdbApiSignal::setSignal(int aNdbSignalType)
theTrace = TestOrd::TraceAPI;
theReceiversBlockNumber = DBTC;
theVerId_signalNumber = GSN_SCAN_NEXTREQ;
theLength = 4;
theLength = ScanNextReq::SignalLength;
}
break;
......
......@@ -71,6 +71,8 @@ public:
const Uint32 * getDataPtr() const;
Uint32 * getDataPtrSend();
NodeId get_sender_node();
/**
* Fragmentation
*/
......@@ -103,6 +105,17 @@ private:
NdbApiSignal *theNextSignal;
Uint32 *theRealData;
};
/**********************************************************************
NodeId get_sender_node
Remark: Get the node id of the sender
***********************************************************************/
inline
NodeId
NdbApiSignal::get_sender_node()
{
return refToNode(theSendersBlockRef);
}
/**********************************************************************
void getLength
Remark: Get the length of the signal.
......
......@@ -99,11 +99,12 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
}
for(Uint32 i = 0; i<len; i += 3){
Uint32 opCount, totalLen;
Uint32 ptrI = * ops++;
Uint32 tcPtrI = * ops++;
Uint32 info = * ops++;
Uint32 opCount = ScanTabConf::getRows(info);
Uint32 totalLen = ScanTabConf::getLength(info);
opCount = ScanTabConf::getRows(info);
totalLen = ScanTabConf::getLength(info);
void * tPtr = theNdb->int2void(ptrI);
assert(tPtr); // For now
......
......@@ -89,6 +89,47 @@ NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
#define KEY_ATTR_ID (~0)
void
NdbReceiver::calculate_batch_size(Uint32 key_size,
Uint32 parallelism,
Uint32& batch_size,
Uint32& batch_byte_size,
Uint32& first_batch_size)
{
Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
NdbRecAttr *rec_attr= theFirstRecAttr;
while (rec_attr != NULL) {
Uint32 attr_size= rec_attr->attrSize() * rec_attr->arraySize();
attr_size= ((attr_size + 7) >> 2) << 2; //Even to word + overhead
tot_size+= attr_size;
rec_attr= rec_attr->next();
}
tot_size+= 32; //include signal overhead
/**
* Now we calculate the batch size by trying to get upto SCAN_BATCH_SIZE
* bytes sent for each batch from each node. We do however ensure that
* no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
* batch.
*/
batch_byte_size= SCAN_BATCH_SIZE;
if (SCAN_BATCH_SIZE * parallelism > MAX_SCAN_BATCH_SIZE) {
batch_byte_size= MAX_SCAN_BATCH_SIZE / parallelism;
}
batch_size= batch_byte_size / tot_size;
#ifdef VM_TRACE
ndbout << "batch_byte_size = " << batch_byte_size << " batch_size = ";
ndbout << batch_size << "tot_size = " << tot_size << endl;
#endif
if (batch_size == 0) {
batch_size= 1;
} else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
batch_size= MAX_PARALLEL_OP_PER_SCAN;
}
first_batch_size= batch_size;
return;
}
void
NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
if(rows > m_defined_rows){
......@@ -139,7 +180,7 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
}
prepareSend();
return ; //0;
return;
}
void
......
......@@ -140,17 +140,9 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
Uint32 fragCount = m_currentTable->m_fragmentCount;
if (batch + parallel == 0) {
batch = 16;
parallel= fragCount;
} else {
if (batch == 0 && parallel > 0) { // Backward
batch = (parallel >= 16 ? 16 : parallel);
parallel = (parallel + 15) / 16;
}
if (parallel > fragCount || parallel == 0)
if (parallel > fragCount || parallel == 0) {
parallel = fragCount;
}
}
// It is only possible to call openScan if
// 1. this transcation don't already contain another scan operation
......@@ -201,7 +193,6 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
}
theParallelism = parallel;
theBatchSize = batch;
if(fix_receivers(parallel) == -1){
setErrorCodeAbort(4000);
......@@ -223,7 +214,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
Uint32 reqInfo = 0;
ScanTabReq::setParallelism(reqInfo, parallel);
ScanTabReq::setScanBatch(reqInfo, batch);
ScanTabReq::setScanBatch(reqInfo, 0);
ScanTabReq::setLockMode(reqInfo, lockExcl);
ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted);
......@@ -815,8 +806,23 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
theReceiver.prepareSend();
bool keyInfo = m_keyInfo;
Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0;
/**
* The number of records sent by each LQH is calculated and the kernel
* is informed of this number by updating the SCAN_TABREQ signal
*/
Uint32 batch_size, batch_byte_size, first_batch_size;
theReceiver.calculate_batch_size(key_size,
theParallelism,
batch_size,
batch_byte_size,
first_batch_size);
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
ScanTabReq::setScanBatch(req->requestInfo, batch_size);
req->batch_byte_size= batch_byte_size;
req->first_batch_size= first_batch_size;
for(Uint32 i = 0; i<theParallelism; i++){
m_receivers[i]->do_get_value(&theReceiver, theBatchSize, key_size);
m_receivers[i]->do_get_value(&theReceiver, batch_size, key_size);
}
return 0;
}
......@@ -856,23 +862,13 @@ NdbScanOperation::doSendScan(int aProcessorId)
if (theOperationType == OpenRangeScanRequest)
req->attrLen += theTotalBoundAI_Len;
TransporterFacade *tp = TransporterFacade::instance();
if(theParallelism > 16){
LinearSectionPtr ptr[3];
ptr[0].p = m_prepared_receivers;
ptr[0].sz = theParallelism;
if (tp->sendFragmentedSignal(tSignal, aProcessorId, ptr, 1) == -1) {
setErrorCode(4002);
return -1;
}
} else {
tSignal->setLength(9+theParallelism);
memcpy(tSignal->getDataPtrSend()+9, m_prepared_receivers, 4*theParallelism);
if (tp->sendSignal(tSignal, aProcessorId) == -1) {
setErrorCode(4002);
return -1;
}
}
LinearSectionPtr ptr[3];
ptr[0].p = m_prepared_receivers;
ptr[0].sz = theParallelism;
if (tp->sendFragmentedSignal(tSignal, aProcessorId, ptr, 1) == -1) {
setErrorCode(4002);
return -1;
}
if (theOperationType == OpenRangeScanRequest) {
// must have at least one signal since it contains attrLen for bounds
assert(theBoundATTRINFO != NULL);
......@@ -969,8 +965,8 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){
}
const Uint32 * src = (Uint32*)tRecAttr->aRef();
const Uint32 tScanInfo = src[len] & 0xFFFF;
const Uint32 tTakeOverNode = src[len] >> 16;
const Uint32 tScanInfo = src[len] & 0x3FFFF;
const Uint32 tTakeOverNode = src[len] >> 20;
{
UintR scanInfo = 0;
TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment