Commit f67262fd authored by mronstrom@mysql.com's avatar mronstrom@mysql.com

Fix bugs + include check of batch_byte_size and

use of first_batch_size
parent 6f24d1eb
...@@ -68,7 +68,7 @@ ...@@ -68,7 +68,7 @@
* API can order a multiple of this number of records at a time since * API can order a multiple of this number of records at a time since
* fragments can be scanned in parallel. * fragments can be scanned in parallel.
*/ */
#define MAX_PARALLEL_OP_PER_SCAN 512 #define MAX_PARALLEL_OP_PER_SCAN 992
/* /*
* When calculating the number of records sent from LQH in each batch * When calculating the number of records sent from LQH in each batch
* one uses SCAN_BATCH_SIZE divided by the expected size of signals * one uses SCAN_BATCH_SIZE divided by the expected size of signals
......
...@@ -143,6 +143,7 @@ public: ...@@ -143,6 +143,7 @@ public:
ZSCAN_NO_FRAGMENT_ERROR = 487, ZSCAN_NO_FRAGMENT_ERROR = 487,
ZTOO_MANY_ACTIVE_SCAN_ERROR = 488, ZTOO_MANY_ACTIVE_SCAN_ERROR = 488,
ZNO_FREE_SCANREC_ERROR = 489, ZNO_FREE_SCANREC_ERROR = 489,
ZWRONG_BATCH_SIZE = 1230,
ZSTANDBY_SCAN_ERROR = 1209, ZSTANDBY_SCAN_ERROR = 1209,
ZSCAN_BOOK_ACC_OP_ERROR = 1219, ZSCAN_BOOK_ACC_OP_ERROR = 1219,
ZUNKNOWN_TRANS_ERROR = 1227 ZUNKNOWN_TRANS_ERROR = 1227
......
...@@ -74,7 +74,7 @@ private: ...@@ -74,7 +74,7 @@ private:
static Uint8 getHoldLockFlag(const UintR & requestInfo); static Uint8 getHoldLockFlag(const UintR & requestInfo);
static Uint8 getReadCommittedFlag(const UintR & requestInfo); static Uint8 getReadCommittedFlag(const UintR & requestInfo);
static Uint8 getRangeScanFlag(const UintR & requestInfo); static Uint8 getRangeScanFlag(const UintR & requestInfo);
static Uint8 getScanBatch(const UintR & requestInfo); static Uint16 getScanBatch(const UintR & requestInfo);
/** /**
* Set:ers for requestInfo * Set:ers for requestInfo
...@@ -152,9 +152,9 @@ ScanTabReq::getRangeScanFlag(const UintR & requestInfo){ ...@@ -152,9 +152,9 @@ ScanTabReq::getRangeScanFlag(const UintR & requestInfo){
} }
inline inline
Uint8 Uint16
ScanTabReq::getScanBatch(const Uint32 & requestInfo){ ScanTabReq::getScanBatch(const Uint32 & requestInfo){
return (Uint8)((requestInfo >> SCAN_BATCH_SHIFT) & SCAN_BATCH_MASK); return (Uint16)((requestInfo >> SCAN_BATCH_SHIFT) & SCAN_BATCH_MASK);
} }
inline inline
......
...@@ -538,8 +538,10 @@ public: ...@@ -538,8 +538,10 @@ public:
UintR scanApiOpPtr; UintR scanApiOpPtr;
UintR scanLocalref[2]; UintR scanLocalref[2];
Uint32 scan_batch_len; Uint32 scan_batch_len;
Uint32 batch_size;
Uint32 first_batch_size; Uint32 first_batch_size;
Uint32 batch_byte_size; Uint32 batch_byte_size;
UintR copyPtr; UintR copyPtr;
union { union {
Uint32 nextPool; Uint32 nextPool;
...@@ -569,14 +571,15 @@ public: ...@@ -569,14 +571,15 @@ public:
ScanType scanType; ScanType scanType;
BlockReference scanApiBlockref; BlockReference scanApiBlockref;
NodeId scanNodeId; NodeId scanNodeId;
Uint16 scanReleaseCounter;
Uint16 scanNumber;
Uint8 scanCompletedStatus; Uint8 scanCompletedStatus;
Uint8 scanFlag; Uint8 scanFlag;
Uint8 scanLockHold; Uint8 scanLockHold;
Uint8 scanLockMode; Uint8 scanLockMode;
Uint8 readCommitted; Uint8 readCommitted;
Uint8 rangeScan; Uint8 rangeScan;
Uint8 scanNumber;
Uint8 scanReleaseCounter;
Uint8 scanTcWaiting; Uint8 scanTcWaiting;
Uint8 scanKeyinfoFlag; Uint8 scanKeyinfoFlag;
}; // Size 272 bytes }; // Size 272 bytes
...@@ -2225,9 +2228,10 @@ private: ...@@ -2225,9 +2228,10 @@ private:
void init_acc_ptr_list(ScanRecord*); void init_acc_ptr_list(ScanRecord*);
bool seize_acc_ptr_list(ScanRecord*, Uint32); bool seize_acc_ptr_list(ScanRecord*, Uint32);
void release_acc_ptr_list(ScanRecord*); void release_acc_ptr_list(ScanRecord*);
Uint32 get_acc_ptr_from_scan_record(ScanRecord*, Uint32); Uint32 get_acc_ptr_from_scan_record(ScanRecord*, Uint32, bool);
void set_acc_ptr_in_scan_record(ScanRecord*, Uint32, Uint32); void set_acc_ptr_in_scan_record(ScanRecord*, Uint32, Uint32);
void get_acc_ptr(ScanRecord*, Uint32*, Uint32); void i_get_acc_ptr(ScanRecord*, Uint32*&, Uint32);
bool check_scan_batch_completed(ScanRecord*);
void removeTable(Uint32 tableId); void removeTable(Uint32 tableId);
void sendLCP_COMPLETE_REP(Signal* signal, Uint32 lcpId); void sendLCP_COMPLETE_REP(Signal* signal, Uint32 lcpId);
...@@ -2926,4 +2930,11 @@ public: ...@@ -2926,4 +2930,11 @@ public:
DLHashTable<ScanRecord> c_scanTakeOverHash; DLHashTable<ScanRecord> c_scanTakeOverHash;
}; };
inline
bool
Dblqh::check_scan_batch_completed(ScanRecord* scanP)
{
return (scanP->scanCompletedOperations == scanP->scanConcurrentOperations) ||
(scanP->scan_batch_len >= scanP->batch_byte_size);
}
#endif #endif
...@@ -3581,7 +3581,9 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal) ...@@ -3581,7 +3581,9 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
takeOverErrorLab(signal); takeOverErrorLab(signal);
return; return;
}//if }//if
Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, ttcScanOp); Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
ttcScanOp,
true);
if (accOpPtr == RNIL) { if (accOpPtr == RNIL) {
jam(); jam();
releaseActiveFrag(signal); releaseActiveFrag(signal);
...@@ -7023,7 +7025,9 @@ void Dblqh::continueScanReleaseAfterBlockedLab(Signal* signal) ...@@ -7023,7 +7025,9 @@ void Dblqh::continueScanReleaseAfterBlockedLab(Signal* signal)
scanptr.p->scanState = ScanRecord::WAIT_RELEASE_LOCK; scanptr.p->scanState = ScanRecord::WAIT_RELEASE_LOCK;
signal->theData[0] = scanptr.p->scanAccPtr; signal->theData[0] = scanptr.p->scanAccPtr;
signal->theData[1]= signal->theData[1]=
get_acc_ptr_from_scan_record(scanptr.p, scanptr.p->scanReleaseCounter -1); get_acc_ptr_from_scan_record(scanptr.p,
scanptr.p->scanReleaseCounter -1,
false);
signal->theData[2] = NextScanReq::ZSCAN_COMMIT; signal->theData[2] = NextScanReq::ZSCAN_COMMIT;
if (! scanptr.p->rangeScan) if (! scanptr.p->rangeScan)
sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB); sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
...@@ -7181,8 +7185,7 @@ void Dblqh::scanLockReleasedLab(Signal* signal) ...@@ -7181,8 +7185,7 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
(scanptr.p->scanCompletedStatus == ZTRUE)) { (scanptr.p->scanCompletedStatus == ZTRUE)) {
jam(); jam();
closeScanLab(signal); closeScanLab(signal);
} else if ((scanptr.p->scanConcurrentOperations == } else if (check_scan_batch_completed(scanptr.p) &&
scanptr.p->scanCompletedOperations) &&
scanptr.p->scanLockHold != ZTRUE) { scanptr.p->scanLockHold != ZTRUE) {
jam(); jam();
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ; scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
...@@ -7279,31 +7282,36 @@ Dblqh::init_acc_ptr_list(ScanRecord* scanP) ...@@ -7279,31 +7282,36 @@ Dblqh::init_acc_ptr_list(ScanRecord* scanP)
inline inline
void void
Dblqh::get_acc_ptr(ScanRecord* scanP, Uint32 *acc_ptr, Uint32 index) Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index)
{ {
if (index == 0) { if (index == 0) {
jam(); acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0];
acc_ptr= &scanP->scan_acc_op_ptr[0];
} else { } else {
Uint32 attr_buf_index, attr_buf_rec; Uint32 attr_buf_index, attr_buf_rec;
AttrbufPtr regAttrPtr; AttrbufPtr regAttrPtr;
jam(); jam();
attr_buf_rec= (index + 30) / 32; attr_buf_rec= (index + 31) / 32;
attr_buf_index= (index - 1) & 31; attr_buf_index= (index - 1) & 31;
regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec]; regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec];
ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf); ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
acc_ptr= &regAttrPtr.p->attrbuf[attr_buf_index]; acc_ptr= (Uint32*)&regAttrPtr.p->attrbuf[attr_buf_index];
} }
} }
Uint32 Uint32
Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP, Uint32 index) Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP,
Uint32 index,
bool crash_flag)
{ {
Uint32 *acc_ptr; Uint32* acc_ptr;
Uint32 attr_buf_rec, attr_buf_index; Uint32 attr_buf_rec, attr_buf_index;
ndbrequire((index < MAX_PARALLEL_OP_PER_SCAN) && if (!((index < MAX_PARALLEL_OP_PER_SCAN) &&
index < scanP->scan_acc_index); index < scanP->scan_acc_index)) {
get_acc_ptr(scanP, acc_ptr, index); ndbrequire(crash_flag);
return RNIL;
}
i_get_acc_ptr(scanP, acc_ptr, index);
return *acc_ptr; return *acc_ptr;
} }
...@@ -7315,7 +7323,7 @@ Dblqh::set_acc_ptr_in_scan_record(ScanRecord* scanP, ...@@ -7315,7 +7323,7 @@ Dblqh::set_acc_ptr_in_scan_record(ScanRecord* scanP,
ndbrequire((index == 0 || scanP->scan_acc_index == index) && ndbrequire((index == 0 || scanP->scan_acc_index == index) &&
(index < MAX_PARALLEL_OP_PER_SCAN)); (index < MAX_PARALLEL_OP_PER_SCAN));
scanP->scan_acc_index= index + 1; scanP->scan_acc_index= index + 1;
get_acc_ptr(scanP, acc_ptr, index); i_get_acc_ptr(scanP, acc_ptr, index);
*acc_ptr= acc; *acc_ptr= acc;
} }
...@@ -7882,10 +7890,11 @@ void Dblqh::nextScanConfScanLab(Signal* signal) ...@@ -7882,10 +7890,11 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
GSN_ACC_CHECK_SCAN, signal, 2, JBB); GSN_ACC_CHECK_SCAN, signal, 2, JBB);
return; return;
}//if }//if
jam();
set_acc_ptr_in_scan_record(scanptr.p, set_acc_ptr_in_scan_record(scanptr.p,
scanptr.p->scanCompletedOperations, scanptr.p->scanCompletedOperations,
nextScanConf->accOperationPtr); nextScanConf->accOperationPtr);
jam();
scanptr.p->scanLocalref[0] = nextScanConf->localKey[0]; scanptr.p->scanLocalref[0] = nextScanConf->localKey[0];
scanptr.p->scanLocalref[1] = nextScanConf->localKey[1]; scanptr.p->scanLocalref[1] = nextScanConf->localKey[1];
scanptr.p->scanLocalFragid = nextScanConf->fragId; scanptr.p->scanLocalFragid = nextScanConf->fragId;
...@@ -7904,6 +7913,7 @@ void Dblqh::nextScanConfScanLab(Signal* signal) ...@@ -7904,6 +7913,7 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
return; return;
}//if }//if
}//if }//if
jam();
nextScanConfLoopLab(signal); nextScanConfLoopLab(signal);
}//Dblqh::nextScanConfScanLab() }//Dblqh::nextScanConfScanLab()
...@@ -7926,7 +7936,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal) ...@@ -7926,7 +7936,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
closeScanLab(signal); closeScanLab(signal);
return; return;
}//if }//if
jam();
Uint32 tableRef; Uint32 tableRef;
Uint32 tupFragPtr; Uint32 tupFragPtr;
Uint32 reqinfo = (scanptr.p->scanLockHold == ZFALSE); Uint32 reqinfo = (scanptr.p->scanLockHold == ZFALSE);
...@@ -7960,6 +7970,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal) ...@@ -7960,6 +7970,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
}//if }//if
} }
{ {
jam();
TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtrSend(); TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtrSend();
tupKeyReq->connectPtr = tcConnectptr.p->tupConnectrec; tupKeyReq->connectPtr = tcConnectptr.p->tupConnectrec;
...@@ -8062,26 +8073,27 @@ void Dblqh::scanTupkeyConfLab(Signal* signal) ...@@ -8062,26 +8073,27 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN); ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN);
scanptr.p->scan_batch_len+= tdata4; scanptr.p->scan_batch_len+= tdata4;
scanptr.p->scanCompletedOperations++; scanptr.p->scanCompletedOperations++;
if ((scanptr.p->scanCompletedOperations == if (check_scan_batch_completed(scanptr.p)) {
scanptr.p->scanConcurrentOperations) && if (scanptr.p->scanLockHold == ZTRUE) {
(scanptr.p->scanLockHold == ZTRUE)) {
jam(); jam();
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ; scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
sendScanFragConf(signal, ZFALSE); sendScanFragConf(signal, ZFALSE);
return; return;
} else if (scanptr.p->scanCompletedOperations == } else {
scanptr.p->scanConcurrentOperations) {
jam(); jam();
scanptr.p->scanReleaseCounter = scanptr.p->scanCompletedOperations; scanptr.p->scanReleaseCounter = scanptr.p->scanCompletedOperations;
scanReleaseLocksLab(signal); scanReleaseLocksLab(signal);
return; return;
} else if (scanptr.p->scanLockHold == ZTRUE) { }
} else {
if (scanptr.p->scanLockHold == ZTRUE) {
jam(); jam();
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT; scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
} else { } else {
jam(); jam();
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT; scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT;
}//if }
}
scanNextLoopLab(signal); scanNextLoopLab(signal);
}//Dblqh::scanTupkeyConfLab() }//Dblqh::scanTupkeyConfLab()
...@@ -8123,11 +8135,13 @@ void Dblqh::continueScanAfterBlockedLab(Signal* signal) ...@@ -8123,11 +8135,13 @@ void Dblqh::continueScanAfterBlockedLab(Signal* signal)
jam(); jam();
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT; scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT;
accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
scanptr.p->scanCompletedOperations); scanptr.p->scanCompletedOperations,
false);
} else if (scanptr.p->scanFlag == NextScanReq::ZSCAN_NEXT_COMMIT) { } else if (scanptr.p->scanFlag == NextScanReq::ZSCAN_NEXT_COMMIT) {
jam(); jam();
accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
scanptr.p->scanCompletedOperations); scanptr.p->scanCompletedOperations,
false);
} else { } else {
jam(); jam();
accOpPtr = RNIL; // The value is not used in ACC accOpPtr = RNIL; // The value is not used in ACC
...@@ -8355,7 +8369,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) ...@@ -8355,7 +8369,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanSchemaVersion = scanFragReq->schemaVersion; scanptr.p->scanSchemaVersion = scanFragReq->schemaVersion;
scanptr.p->scanCompletedOperations = 0; scanptr.p->scanCompletedOperations = 0;
scanptr.p->scan_batch_len= 0; scanptr.p->scan_batch_len= 0;
scanptr.p->scanConcurrentOperations = scanConcurrentOperations; scanptr.p->scanConcurrentOperations = scanFragReq->first_batch_size;
scanptr.p->batch_size= scanConcurrentOperations;
scanptr.p->batch_byte_size= scanFragReq->batch_byte_size; scanptr.p->batch_byte_size= scanFragReq->batch_byte_size;
scanptr.p->first_batch_size= scanFragReq->first_batch_size; scanptr.p->first_batch_size= scanFragReq->first_batch_size;
scanptr.p->scanErrorCounter = 0; scanptr.p->scanErrorCounter = 0;
...@@ -8371,6 +8386,14 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) ...@@ -8371,6 +8386,14 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanNumber = ~0; scanptr.p->scanNumber = ~0;
scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr; scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
ndbout << "batch_size = " << scanptr.p->batch_size;
ndbout << " first_batch_size = " << scanptr.p->scanConcurrentOperations;
ndbout << endl;
if ((scanptr.p->scanConcurrentOperations == 0) ||
(scanptr.p->scanConcurrentOperations > scanptr.p->batch_size)) {
jam();
return ScanFragRef::ZWRONG_BATCH_SIZE;
}
if (!seize_acc_ptr_list(scanptr.p, scanConcurrentOperations)) { if (!seize_acc_ptr_list(scanptr.p, scanConcurrentOperations)) {
jam(); jam();
return ScanFragRef::ZTOO_MANY_ACTIVE_SCAN_ERROR; return ScanFragRef::ZTOO_MANY_ACTIVE_SCAN_ERROR;
...@@ -8693,6 +8716,7 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted) ...@@ -8693,6 +8716,7 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
{ {
Uint32 completed_ops= scanptr.p->scanCompletedOperations; Uint32 completed_ops= scanptr.p->scanCompletedOperations;
Uint32 total_len= scanptr.p->scan_batch_len; Uint32 total_len= scanptr.p->scan_batch_len;
scanptr.p->scanConcurrentOperations= scanptr.p->batch_size;
scanptr.p->scanTcWaiting = ZFALSE; scanptr.p->scanTcWaiting = ZFALSE;
if(ERROR_INSERTED(5037)){ if(ERROR_INSERTED(5037)){
...@@ -9276,7 +9300,7 @@ void Dblqh::continueCopyAfterBlockedLab(Signal* signal) ...@@ -9276,7 +9300,7 @@ void Dblqh::continueCopyAfterBlockedLab(Signal* signal)
scanptr.i = tcConnectptr.p->tcScanRec; scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr); c_scanRecordPool.getPtr(scanptr);
tcConnectptr.p->errorCode = 0; tcConnectptr.p->errorCode = 0;
Uint32 acc_op_ptr= get_acc_ptr_from_scan_record(scanptr.p, 0); Uint32 acc_op_ptr= get_acc_ptr_from_scan_record(scanptr.p, 0, false);
signal->theData[0] = scanptr.p->scanAccPtr; signal->theData[0] = scanptr.p->scanAccPtr;
signal->theData[1] = acc_op_ptr; signal->theData[1] = acc_op_ptr;
signal->theData[2] = NextScanReq::ZSCAN_NEXT_COMMIT; signal->theData[2] = NextScanReq::ZSCAN_NEXT_COMMIT;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment