Commit ab198e52 authored by unknown's avatar unknown

Update error handling of new scan

Still known bugs :-(


ndb/include/kernel/signaldata/ScanTab.hpp:
  Add close flag
ndb/include/ndbapi/NdbConnection.hpp:
  Moved mehtod outside
ndb/include/ndbapi/NdbScanOperation.hpp:
  Removed err code from
ndb/src/common/debugger/signaldata/ScanTab.cpp:
  Updated printer
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  New error inserts for SCAN
ndb/src/kernel/blocks/dbtc/Dbtc.hpp:
  Update handling of frag timeouts
ndb/src/kernel/blocks/dbtc/DbtcMain.cpp:
  Update handling of frag timeouts
ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp:
  Don't send empty TRANSID_AI's
ndb/src/ndbapi/NdbConnectionScan.cpp:
  Update error handling of scan
ndb/src/ndbapi/NdbScanOperation.cpp:
  Update error handling of scan
ndb/src/ndbapi/Ndbif.cpp:
  Update error handling of scan
parent 9ff4d240
......@@ -367,7 +367,7 @@ public:
/**
* Length of signal
*/
STATIC_CONST( SignalLength = 4 );
STATIC_CONST( SignalLength = 5 );
private:
......@@ -380,7 +380,7 @@ private:
UintR transId1; // DATA 1
UintR transId2; // DATA 2
UintR errorCode; // DATA 3
// UintR sendScanNextReqWithClose; // DATA 4
UintR closeNeeded; // DATA 4
};
......
......@@ -633,17 +633,7 @@ private:
#ifdef VM_TRACE
void printState();
#endif
bool checkState_TransId(const Uint32 * transId) const {
const Uint32 tTmp1 = transId[0];
const Uint32 tTmp2 = transId[1];
Uint64 tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32);
bool b = theStatus == Connected && theTransactionId == tRecTransId;
#ifdef NDB_NO_DROPPED_SIGNAL
if(!b) abort();
#endif
return b;
}
bool checkState_TransId(const Uint32 * transId) const;
};
inline
......@@ -678,6 +668,19 @@ NdbConnection::checkMagicNumber()
}
}
inline
bool
NdbConnection::checkState_TransId(const Uint32 * transId) const {
const Uint32 tTmp1 = transId[0];
const Uint32 tTmp2 = transId[1];
Uint64 tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32);
bool b = theStatus == Connected && theTransactionId == tRecTransId;
#ifdef NDB_NO_DROPPED_SIGNAL
if(!b) abort();
#endif
return b;
}
/************************************************************************************************
void setTransactionId(Uint64 aTransactionId);
......
......@@ -146,7 +146,7 @@ protected:
int send_next_scan(Uint32 cnt, bool close);
void receiver_delivered(NdbReceiver*);
void receiver_completed(NdbReceiver*);
void execCLOSE_SCAN_REP(Uint32 errCode);
void execCLOSE_SCAN_REP();
NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*);
......
......@@ -120,7 +120,7 @@ printSCANTABREF(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
fprintf(output, " Errorcode: %u\n", sig->errorCode);
// fprintf(output, " sendScanNextReqWithClose: %u\n", sig->sendScanNextReqWithClose);
fprintf(output, " closeNeeded: %u\n", sig->closeNeeded);
return false;
}
......
......@@ -6785,7 +6785,8 @@ void Dblqh::execSCAN_NEXTREQ(Signal* signal)
if (findTransaction(transid1, transid2, senderData) != ZOK){
jam();
DEBUG("Received SCAN_NEXTREQ in LQH with close flag when closed");
DEBUG(senderData <<
" Received SCAN_NEXTREQ in LQH with close flag when closed");
ndbrequire(nextReq->closeFlag == ZTRUE);
return;
}
......@@ -6825,6 +6826,10 @@ void Dblqh::execSCAN_NEXTREQ(Signal* signal)
return;
}//if
if(ERROR_INSERTED(5036)){
return;
}
scanptr.i = tcConnectptr.p->tcScanRec;
ndbrequire(scanptr.i != RNIL);
c_scanRecordPool.getPtr(scanptr);
......@@ -6841,6 +6846,10 @@ void Dblqh::execSCAN_NEXTREQ(Signal* signal)
if(ERROR_INSERTED(5034)){
CLEAR_ERROR_INSERT_VALUE;
}
if(ERROR_INSERTED(5036)){
CLEAR_ERROR_INSERT_VALUE;
return;
}
closeScanRequestLab(signal);
return;
}//if
......@@ -8517,6 +8526,11 @@ void Dblqh::sendKeyinfo20(Signal* signal,
* ------------------------------------------------------------------------ */
void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
{
if(ERROR_INSERTED(5037)){
CLEAR_ERROR_INSERT_VALUE;
return;
}
scanptr.p->scanTcWaiting = ZFALSE;
ScanFragConf * conf = (ScanFragConf*)&signal->theData[0];
......
......@@ -1194,6 +1194,9 @@ public:
// Scan is on ordered index
Uint8 rangeScan;
// Close is ordered
bool m_close_scan_req;
};
typedef Ptr<ScanRecord> ScanRecordPtr;
......@@ -1414,15 +1417,15 @@ private:
Uint32 buddyPtr,
UintR transid1,
UintR transid2);
void initScanrec(Signal* signal,
void initScanrec(ScanRecordPtr, const class ScanTabReq*,
const UintR scanParallel,
const UintR noOprecPerFrag);
void initScanfragrec(Signal* signal);
void releaseScanResources(ScanRecordPtr);
void seizeScanrec(Signal* signal);
void sendScanFragReq(Signal* signal);
void sendScanTabConf(Signal* signal);
void close_scan_req(Signal*, ScanRecordPtr);
ScanRecordPtr seizeScanrec(Signal* signal);
void sendScanFragReq(Signal* signal, ScanRecord*, ScanFragRec*);
void sendScanTabConf(Signal* signal, ScanRecord*);
void close_scan_req(Signal*, ScanRecordPtr, bool received_req);
void close_scan_req_send_conf(Signal*, ScanRecordPtr);
void checkGcp(Signal* signal);
......@@ -1557,11 +1560,11 @@ private:
void systemErrorLab(Signal* signal);
void sendSignalErrorRefuseLab(Signal* signal);
void scanTabRefLab(Signal* signal, Uint32 errCode);
void diFcountReqLab(Signal* signal);
void diFcountReqLab(Signal* signal, ScanRecordPtr);
void signalErrorRefuseLab(Signal* signal);
void abort080Lab(Signal* signal);
void packKeyData000Lab(Signal* signal, BlockReference TBRef);
void abortScanLab(Signal* signal, Uint32 errCode);
void abortScanLab(Signal* signal, ScanRecordPtr, Uint32 errCode);
void sendAbortedAfterTimeout(Signal* signal, int Tcheck);
void abort010Lab(Signal* signal);
void abort015Lab(Signal* signal);
......@@ -1589,7 +1592,7 @@ private:
void attrinfo020Lab(Signal* signal);
void scanReleaseResourcesLab(Signal* signal);
void scanCompletedLab(Signal* signal);
void scanFragError(Signal* signal, Uint32 errorCode);
void scanError(Signal* signal, ScanRecordPtr, Uint32 errorCode);
void diverify010Lab(Signal* signal);
void intstartphase2x010Lab(Signal* signal);
void intstartphase3x010Lab(Signal* signal);
......@@ -1699,7 +1702,6 @@ private:
ApiConnectRecordPtr timeOutptr;
ScanRecord *scanRecord;
ScanRecordPtr scanptr;
UintR cscanrecFileSize;
UnsafeArrayPool<ScanFragRec> c_scan_frag_pool;
......
......@@ -76,6 +76,39 @@
#define INTERNAL_TRIGGER_TCKEYREQ_JBA 0
#ifdef VM_TRACE
NdbOut &
operator<<(NdbOut& out, Dbtc::ConnectionState state){
out << (int)state;
return out;
}
NdbOut &
operator<<(NdbOut& out, Dbtc::OperationState state){
out << (int)state;
return out;
}
NdbOut &
operator<<(NdbOut& out, Dbtc::AbortState state){
out << (int)state;
return out;
}
NdbOut &
operator<<(NdbOut& out, Dbtc::ReturnSignal state){
out << (int)state;
return out;
}
NdbOut &
operator<<(NdbOut& out, Dbtc::ScanRecord::ScanState state){
out << (int)state;
return out;
}
NdbOut &
operator<<(NdbOut& out, Dbtc::ScanFragRec::ScanFragState state){
out << (int)state;
return out;
}
#endif
void
Dbtc::updateBuddyTimer(ApiConnectRecordPtr apiPtr)
{
......@@ -915,7 +948,7 @@ Dbtc::handleFailedApiNode(Signal* signal,
ScanRecordPtr scanPtr;
scanPtr.i = apiConnectptr.p->apiScanRec;
ptrCheckGuard(scanPtr, cscanrecFileSize, scanRecord);
close_scan_req(signal, scanPtr);
close_scan_req(signal, scanPtr, true);
TloopCount += 64;
break;
......@@ -1095,138 +1128,6 @@ void Dbtc::handleApiFailState(Signal* signal, UintR TapiConnectptr)
}//if
}//Dbtc::handleApiFailState()
/**
* Dbtc::handleScanStop
* This function is called when an entire scan should be stopped
* Check state of the scan and take appropriate action.
* The parameter TapiFailedNode indicates if the scan is stopped
* because an API node has failed or if it has been stopped because
* the scan has timed out.
*
*/
void Dbtc::handleScanStop(Signal* signal, UintR TapiFailedNode)
{
#if JONAS_NOT_DONE
arrGuard(TapiFailedNode, MAX_NODES);
scanptr.i = apiConnectptr.p->apiScanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
// If api has failed we must release all resources
bool apiNodeHasFailed = (TapiFailedNode != 0);
DEBUG("handleScanStop: scanState = "<< scanptr.p->scanState);
switch (scanptr.p->scanState) {
case ScanRecord::WAIT_SCAN_TAB_INFO:
case ScanRecord::WAIT_AI:
jam();
/**
* The scan process is still in the definition phase.
* We will release the resources and then release the connection
* to the failed API.
*/
releaseScanResources(scanptr);
if (apiNodeHasFailed) {
jam();
releaseApiCon(signal, apiConnectptr.i);
}//if
break;
case ScanRecord::WAIT_FRAGMENT_COUNT:
jam();
if (!apiNodeHasFailed) {
jam();
/**
* Time-out waiting for a local signal can only happen
* if we have a serious problem.
*/
systemErrorLab(signal);
}//if
capiConnectClosing[TapiFailedNode]++;
apiConnectptr.p->apiFailState = ZTRUE;
scanptr.p->apiIsClosed = true;
break;
case ScanRecord::CLOSING_SCAN:
jam();
/**
* With CLOSING_SCAN it is enough to set the
* fail state such that the connection is released at the end of the
* closing process. The close process is already ongoing.
* Set apiIsClosed to true to indicate that resources should be released
* at the end of the close process.
**/
if (apiNodeHasFailed) {
jam();
capiConnectClosing[TapiFailedNode]++;
apiConnectptr.p->apiFailState = ZTRUE;
scanptr.p->apiIsClosed = true;
}//if
if (apiConnectptr.p->apiFailState == ZTRUE) {
jam();
handleApiFailState(signal, apiConnectptr.i);
return;
}//if
break;
case ScanRecord::SCAN_NEXT_ORDERED:
/**
* In the SCAN_NEXT_ORDERED state we will wait for the next natural place
* to receive some action from the API and instead of waiting for the
* API here we will start the abort process.
* After the abort process is completed we will release the connection.
*/
if (apiNodeHasFailed) {
jam();
capiConnectClosing[TapiFailedNode]++;
apiConnectptr.p->apiFailState = ZTRUE;
}//if
// Release resources and send a response to API
scanptr.p->apiIsClosed = true;
scanCompletedLab(signal);
break;
case ScanRecord::DELIVERED:
case ScanRecord::QUEUED_DELIVERED:
/**
* A response has been sent to the api but it has not responded
*/
if (apiNodeHasFailed) {
jam();
capiConnectClosing[TapiFailedNode]++;
apiConnectptr.p->apiFailState = ZTRUE;
scanptr.p->apiIsClosed = true;
} else {
jam();
/*
In this case we have received a time-out caused by the application
waiting too long to continue the scan. We will check the application
time-out instead of the deadlock detetection time-out. If the
application time-out hasn't fired we will simply ignore the condition.
*/
if ((ctcTimer - getApiConTimer(apiConnectptr.i)) <= c_appl_timeout_value) {
jam();
return;
}//if
// Dont' release, wait until api responds or fails
scanptr.p->apiIsClosed = false;
}
scanCompletedLab(signal);
break;
default:
jam();
systemErrorLab(signal);
break;
}//switch
#endif
}//Dbtc::handleScanStop()
/****************************************************************************
* T C S E I Z E R E Q
* THE APPLICATION SENDS A REQUEST TO SEIZE A CONNECT RECORD TO CARRY OUT A
......@@ -1409,7 +1310,7 @@ void Dbtc::printState(Signal* signal, int place)
<< " counter = " << apiConnectptr.p->counter
<< " lqhkeyconfrec = " << apiConnectptr.p->lqhkeyconfrec
<< " lqhkeyreqrec = " << apiConnectptr.p->lqhkeyreqrec << endl;
ndbout << "abortState = " << (int)apiConnectptr.p->abortState
ndbout << "abortState = " << apiConnectptr.p->abortState
<< " apiScanRec = " << apiConnectptr.p->apiScanRec
<< " returncode = " << apiConnectptr.p->returncode << endl;
ndbout << "tckeyrec = " << apiConnectptr.p->tckeyrec
......@@ -6155,11 +6056,14 @@ void Dbtc::timeOutFoundLab(Signal* signal, Uint32 TapiConPtr)
tcConnectptr.i = apiConnectptr.p->firstTcConnect;
sendAbortedAfterTimeout(signal, 0);
break;
case CS_START_SCAN:
case CS_START_SCAN:{
jam();
apiConnectptr.p->returncode = ZSCANTIME_OUT_ERROR;
handleScanStop(signal, 0);
ScanRecordPtr scanPtr;
scanPtr.i = apiConnectptr.p->apiScanRec;
ptrCheckGuard(scanPtr, cscanrecFileSize, scanRecord);
scanError(signal, scanPtr, ZSCANTIME_OUT_ERROR);
break;
}
case CS_WAIT_ABORT_CONF:
jam();
tcConnectptr.i = apiConnectptr.p->currentTcConnect;
......@@ -6529,14 +6433,15 @@ void Dbtc::execSCAN_HBREP(Signal* signal)
c_scan_frag_pool.getPtr(scanFragptr);
switch (scanFragptr.p->scanFragState){
case ScanFragRec::LQH_ACTIVE:
//case ScanFragRec::LQH_ACTIVE_CLOSE:
break;
default:
DEBUG("execSCAN_HBREP: scanFragState="<<scanFragptr.p->scanFragState);
systemErrorLab(signal);
break;
}
ScanRecordPtr scanptr;
scanptr.i = scanFragptr.p->scanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
......@@ -6567,6 +6472,7 @@ void Dbtc::execSCAN_HBREP(Signal* signal)
updateBuddyTimer(apiConnectptr);
scanFragptr.p->startFragTimer(ctcTimer);
} else {
ndbassert(false);
DEBUG("SCAN_HBREP when scanFragTimer was turned off");
}
}//execSCAN_HBREP()
......@@ -6575,34 +6481,56 @@ void Dbtc::execSCAN_HBREP(Signal* signal)
/* Timeout has occured on a fragment which means a scan has timed out. */
/* If this is true we have an error in LQH/ACC. */
/*--------------------------------------------------------------------------*/
static int kalle = 0;
void Dbtc::timeOutFoundFragLab(Signal* signal, UintR TscanConPtr)
{
scanFragptr.i = TscanConPtr;
c_scan_frag_pool.getPtr(scanFragptr);
DEBUG("timeOutFoundFragLab: scanFragState = "<<scanFragptr.p->scanFragState);
ScanFragRecPtr ptr;
c_scan_frag_pool.getPtr(ptr, TscanConPtr);
DEBUG(TscanConPtr << " timeOutFoundFragLab: scanFragState = "<< ptr.p->scanFragState);
/*-------------------------------------------------------------------------*/
// The scan fragment has expired its timeout. Check its state to decide
// what to do.
/*-------------------------------------------------------------------------*/
switch (scanFragptr.p->scanFragState) {
switch (ptr.p->scanFragState) {
case ScanFragRec::WAIT_GET_PRIMCONF:
jam();
// Crash the system if we do not return from DIGETPRIMREQ in time.
systemErrorLab(signal);
ndbrequire(false);
break;
case ScanFragRec::LQH_ACTIVE:
case ScanFragRec::LQH_ACTIVE:{
jam();
/**
* The LQH expired it's timeout, try to close it
*/
scanFragError(signal, ZSCAN_FRAG_LQH_ERROR);
DEBUG(" LQH_ACTIVE - closing the fragment scan in node "
<< refToNode(scanFragptr.p->lqhBlockref));
break;
Uint32 nodeId = refToNode(ptr.p->lqhBlockref);
Uint32 connectCount = getNodeInfo(nodeId).m_connectCount;
ScanRecordPtr scanptr;
scanptr.i = ptr.p->scanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
if(connectCount != ptr.p->m_connectCount){
jam();
/**
* The node has died
*/
ndbout_c("Node %d has died", nodeId);
ptr.p->scanFragState = ScanFragRec::COMPLETED;
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(ptr);
comp.add(ptr);
ptr.p->stopFragTimer();
} else {
kalle++;
if(kalle > 5)
ndbassert(scanptr.p->scanState != ScanRecord::CLOSING_SCAN);
}
scanError(signal, scanptr, ZSCAN_FRAG_LQH_ERROR);
break;
}
case ScanFragRec::DELIVERED:
jam();
case ScanFragRec::IDLE:
......@@ -6863,12 +6791,36 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal,
Uint32 scanPtrI,
Uint32 failedNodeId){
ScanRecordPtr scanptr;
for (scanptr.i = scanPtrI; scanptr.i < cscanrecFileSize; scanptr.i++) {
jam();
ptrAss(scanptr, scanRecord);
if (scanptr.p->scanState != ScanRecord::IDLE){
checkScanFragList(signal, failedNodeId,
scanptr.p, scanptr.p->m_running_scan_frags);
jam();
ScanFragRecPtr ptr;
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
bool found = false;
for(run.first(ptr); !ptr.isNull(); ){
jam();
ScanFragRecPtr curr = ptr;
run.next(ptr);
if (curr.p->scanFragState == ScanFragRec::LQH_ACTIVE &&
refToNode(curr.p->lqhBlockref) == failedNodeId){
jam();
run.remove(curr);
comp.add(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer();
found = true;
}
}
if(found){
jam();
scanError(signal, scanptr, ZSCAN_LQH_ERROR);
}
}
// Send CONTINUEB to continue later
......@@ -6886,29 +6838,7 @@ Dbtc::checkScanFragList(Signal* signal,
ScanRecord * scanP,
ScanFragList::Head & head){
ScanFragRecPtr ptr;
ScanFragList list(c_scan_frag_pool, head);
for(list.first(ptr); !ptr.isNull(); list.next(ptr)){
if (refToNode(ptr.p->lqhBlockref) == failedNodeId){
switch (ptr.p->scanFragState){
case ScanFragRec::LQH_ACTIVE:
jam();
apiConnectptr.i = scanptr.p->scanApiRec;
ptrCheckGuard(apiConnectptr, capiConnectFilesize,
apiConnectRecord);
DEBUG("checkScanActiveInFailedLqh: scanFragError");
scanFragError(signal, ZSCAN_LQH_ERROR);
break;
default:
/* empty */
jam();
break;
}
}
}
}
void Dbtc::execTAKE_OVERTCCONF(Signal* signal)
......@@ -8421,6 +8351,7 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
Uint32 noOprecPerFrag = ScanTabReq::getScanBatch(reqinfo);
Uint32 scanParallel = scanConcurrency;
Uint32 errCode;
ScanRecordPtr scanptr;
if(noOprecPerFrag == 0){
jam();
......@@ -8445,12 +8376,13 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
return;
}//if
ptrAss(apiConnectptr, apiConnectRecord);
ApiConnectRecord * transP = apiConnectptr.p;
if (apiConnectptr.p->apiConnectstate != CS_CONNECTED) {
if (transP->apiConnectstate != CS_CONNECTED) {
jam();
// could be left over from TCKEYREQ rollback
if (apiConnectptr.p->apiConnectstate == CS_ABORTING &&
apiConnectptr.p->abortState == AS_IDLE) {
if (transP->apiConnectstate == CS_ABORTING &&
transP->abortState == AS_IDLE) {
jam();
} else {
jam();
......@@ -8515,15 +8447,26 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
}
seizeTcConnect(signal);
seizeCacheRecord(signal);
seizeScanrec(signal);
initScanrec(signal, scanParallel, noOprecPerFrag);
tcConnectptr.p->apiConnect = apiConnectptr.i;
initScanApirec(signal, buddyPtr, transid1, transid2);
seizeCacheRecord(signal);
scanptr = seizeScanrec(signal);
ndbrequire(transP->apiScanRec == RNIL);
ndbrequire(scanptr.p->scanApiRec == RNIL);
initScanrec(scanptr, scanTabReq, scanParallel, noOprecPerFrag);
//initScanApirec(signal, buddyPtr, transid1, transid2);
transP->apiScanRec = scanptr.i;
transP->returncode = 0;
transP->transid[0] = transid1;
transP->transid[1] = transid2;
transP->buddyPtr = buddyPtr;
// The scan is started
apiConnectptr.p->apiConnectstate = CS_START_SCAN;
apiConnectptr.p->currSavePointId = currSavePointId;
transP->apiConnectstate = CS_START_SCAN;
transP->currSavePointId = currSavePointId;
/**********************************************************
* We start the timer on scanRec to be able to discover a
......@@ -8546,12 +8489,14 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
SCAN_TAB_error:
jam();
ndbrequire(false);
ScanTabRef * ref = (ScanTabRef*)&signal->theData[0];
ref->apiConnectPtr = apiConnectptr.p->ndbapiConnect;
ref->apiConnectPtr = transP->ndbapiConnect;
ref->transId1 = transid1;
ref->transId2 = transid2;
ref->errorCode = errCode;
sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABREF,
ref->closeNeeded = 0;
sendSignal(transP->ndbapiBlockref, GSN_SCAN_TABREF,
signal, ScanTabRef::SignalLength, JBB);
return;
......@@ -8561,20 +8506,13 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
void Dbtc::initScanApirec(Signal* signal,
Uint32 buddyPtr, UintR transid1, UintR transid2)
{
ApiConnectRecord * apiPtr = apiConnectptr.p;
apiPtr->apiScanRec = scanptr.i;
apiPtr->returncode = 0;
apiPtr->transid[0] = transid1;
apiPtr->transid[1] = transid2;
apiPtr->buddyPtr = buddyPtr;
}//Dbtc::initScanApirec()
void Dbtc::initScanrec(Signal* signal,
void Dbtc::initScanrec(ScanRecordPtr scanptr,
const ScanTabReq * scanTabReq,
UintR scanParallel,
UintR noOprecPerFrag)
{
const ScanTabReq * const scanTabReq = (ScanTabReq *)&signal->theData[0];
const UintR reqinfo = scanTabReq->requestInfo;
ndbrequire(scanParallel < 16);
......@@ -8613,6 +8551,7 @@ void Dbtc::scanTabRefLab(Signal* signal, Uint32 errCode)
ref->transId1 = apiConnectptr.p->transid[0];
ref->transId2 = apiConnectptr.p->transid[1];
ref->errorCode = errCode;
ref->closeNeeded = 0;
sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABREF,
signal, ScanTabRef::SignalLength, JBB);
}//Dbtc::scanTabRefLab()
......@@ -8623,6 +8562,7 @@ void Dbtc::scanTabRefLab(Signal* signal, Uint32 errCode)
/*---------------------------------------------------------------------------*/
void Dbtc::scanAttrinfoLab(Signal* signal, UintR Tlen)
{
ScanRecordPtr scanptr;
scanptr.i = apiConnectptr.p->apiScanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
tcConnectptr.i = scanptr.p->scanTcrec;
......@@ -8652,7 +8592,7 @@ void Dbtc::scanAttrinfoLab(Signal* signal, UintR Tlen)
* THIS SCAN. WE ARE READY TO START THE ACTUAL
* EXECUTION OF THE SCAN QUERY
**************************************************/
diFcountReqLab(signal);
diFcountReqLab(signal, scanptr);
return;
}//if
}//if
......@@ -8660,21 +8600,21 @@ void Dbtc::scanAttrinfoLab(Signal* signal, UintR Tlen)
scanAttrinfo_attrbuf_error:
jam();
abortScanLab(signal, ZGET_ATTRBUF_ERROR);
abortScanLab(signal, scanptr, ZGET_ATTRBUF_ERROR);
return;
scanAttrinfo_attrbuf2_error:
jam();
abortScanLab(signal, ZGET_ATTRBUF_ERROR);
abortScanLab(signal, scanptr, ZGET_ATTRBUF_ERROR);
return;
scanAttrinfo_len_error:
jam();
abortScanLab(signal, ZLENGTH_ERROR);
abortScanLab(signal, scanptr, ZLENGTH_ERROR);
return;
}//Dbtc::scanAttrinfoLab()
void Dbtc::diFcountReqLab(Signal* signal)
void Dbtc::diFcountReqLab(Signal* signal, ScanRecordPtr scanptr)
{
/**
* Check so that the table is not being dropped
......@@ -8685,7 +8625,8 @@ void Dbtc::diFcountReqLab(Signal* signal)
if (tabPtr.p->checkTable(scanptr.p->scanSchemaVersion)){
;
} else {
abortScanLab(signal, tabPtr.p->getErrorCode(scanptr.p->scanSchemaVersion));
abortScanLab(signal, scanptr,
tabPtr.p->getErrorCode(scanptr.p->scanSchemaVersion));
return;
}
......@@ -8717,6 +8658,7 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal)
ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
apiConnectptr.i = tcConnectptr.p->apiConnect;
ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
ScanRecordPtr scanptr;
scanptr.i = apiConnectptr.p->apiScanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
ndbrequire(scanptr.p->scanState == ScanRecord::WAIT_FRAGMENT_COUNT);
......@@ -8728,7 +8670,7 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal)
}//if
if (tfragCount == 0) {
jam();
abortScanLab(signal, ZNO_FRAGMENT_ERROR);
abortScanLab(signal, scanptr, ZNO_FRAGMENT_ERROR);
return;
}//if
......@@ -8741,19 +8683,22 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal)
if (tabPtr.p->checkTable(scanptr.p->scanSchemaVersion)){
;
} else {
abortScanLab(signal, tabPtr.p->getErrorCode(scanptr.p->scanSchemaVersion));
abortScanLab(signal, scanptr,
tabPtr.p->getErrorCode(scanptr.p->scanSchemaVersion));
return;
}
if(scanptr.p->scanParallel > tfragCount){
jam();
abortScanLab(signal, ZTOO_HIGH_CONCURRENCY_ERROR);
abortScanLab(signal, scanptr, ZTOO_HIGH_CONCURRENCY_ERROR);
return;
}
scanptr.p->scanParallel = tfragCount;
scanptr.p->scanNoFrag = tfragCount;
scanptr.p->scanNextFragId = 0;
scanptr.p->scanState = ScanRecord::RUNNING;
setApiConTimer(apiConnectptr.i, 0, __LINE__);
updateBuddyTimer(apiConnectptr);
......@@ -8768,6 +8713,7 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal)
scanptr.p->scanTableref, scanptr.p->scanNextFragId);
#endif
ptr.p->lqhBlockref = 0;
ptr.p->startFragTimer(ctcTimer);
ptr.p->scanFragId = scanptr.p->scanNextFragId++;
ptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
......@@ -8792,6 +8738,7 @@ void Dbtc::execDI_FCOUNTREF(Signal* signal)
const Uint32 errCode = signal->theData[1];
apiConnectptr.i = tcConnectptr.p->apiConnect;
ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
ScanRecordPtr scanptr;
scanptr.i = apiConnectptr.p->apiScanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
ndbrequire(scanptr.p->scanState == ScanRecord::WAIT_FRAGMENT_COUNT);
......@@ -8801,10 +8748,10 @@ void Dbtc::execDI_FCOUNTREF(Signal* signal)
handleApiFailState(signal, apiConnectptr.i);
return;
}//if
abortScanLab(signal, errCode);
abortScanLab(signal, scanptr, errCode);
}//Dbtc::execDI_FCOUNTREF()
void Dbtc::abortScanLab(Signal* signal, Uint32 errCode)
void Dbtc::abortScanLab(Signal* signal, ScanRecordPtr scanptr, Uint32 errCode)
{
scanTabRefLab(signal, errCode);
releaseScanResources(scanptr);
......@@ -8835,6 +8782,7 @@ void Dbtc::releaseScanResources(ScanRecordPtr scanPtr)
scanPtr.p->nextScan = cfirstfreeScanrec;
scanPtr.p->scanState = ScanRecord::IDLE;
scanPtr.p->scanTcrec = RNIL;
scanPtr.p->scanApiRec = RNIL;
cfirstfreeScanrec = scanPtr.i;
apiConnectptr.p->apiScanRec = RNIL;
......@@ -8863,6 +8811,7 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal)
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF);
scanFragptr.p->stopFragTimer();
ScanRecordPtr scanptr;
scanptr.i = scanFragptr.p->scanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
......@@ -8876,7 +8825,12 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal)
Uint32 schemaVersion = scanptr.p->scanSchemaVersion;
if(tabPtr.p->checkTable(schemaVersion) == false){
jam();
scanFragError(signal, tabPtr.p->getErrorCode(schemaVersion));
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(scanFragptr);
comp.add(scanFragptr);
scanError(signal, scanptr, tabPtr.p->getErrorCode(schemaVersion));
return;
}
}
......@@ -8908,7 +8862,7 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal)
Uint32 ref = calcLqhBlockRef(tnodeid);
scanFragptr.p->lqhBlockref = ref;
scanFragptr.p->m_connectCount = getNodeInfo(tnodeid).m_connectCount;
sendScanFragReq(signal);
sendScanFragReq(signal, scanptr.p, scanFragptr.p);
attrbufptr.i = cachePtr.p->firstAttrbuf;
while (attrbufptr.i != RNIL) {
jam();
......@@ -8943,7 +8897,18 @@ void Dbtc::execDIGETPRIMREF(Signal* signal)
const Uint32 errCode = signal->theData[2];
c_scan_frag_pool.getPtr(scanFragptr);
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF);
scanFragError(signal, errCode);
ScanRecordPtr scanptr;
scanptr.i = scanFragptr.p->scanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(scanFragptr);
comp.add(scanFragptr);
scanError(signal, scanptr, errCode);
}//Dbtc::execDIGETPRIMREF()
/**
......@@ -8962,6 +8927,7 @@ void Dbtc::execSCAN_FRAGREF(Signal* signal)
scanFragptr.i = ref->senderData;
c_scan_frag_pool.getPtr(scanFragptr);
ScanRecordPtr scanptr;
scanptr.i = scanFragptr.p->scanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
......@@ -8981,42 +8947,64 @@ void Dbtc::execSCAN_FRAGREF(Signal* signal)
* stop fragment timer and call scanFragError to start
* close of the other fragment scans
*/
scanFragError(signal, errCode);
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE);
{
scanFragptr.p->scanFragState = ScanFragRec::COMPLETED;
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(scanFragptr);
comp.add(scanFragptr);
scanFragptr.p->stopFragTimer();
}
scanError(signal, scanptr, errCode);
}//Dbtc::execSCAN_FRAGREF()
/**
* Dbtc::scanFragError
* Dbtc::scanError
*
* Called when an error occurs during
* a scan of a fragment.
* NOTE that one scan may consist of several fragment scans.
*
*/
void Dbtc::scanFragError(Signal* signal, Uint32 errorCode)
void Dbtc::scanError(Signal* signal, ScanRecordPtr scanptr, Uint32 errorCode)
{
jam();
scanptr.i = scanFragptr.p->scanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
DEBUG("scanFragError, errorCode = "<< errorCode
<< ", scanState = " << scanptr.p->scanState);
ScanRecord* scanP = scanptr.p;
scanFragptr.p->stopFragTimer();
#if JONAS_NOT_DONE
DEBUG("scanError, errorCode = "<< errorCode <<
", scanState = " << scanptr.p->scanState);
apiConnectptr.i = scanptr.p->scanApiRec;
if(scanP->scanState == ScanRecord::CLOSING_SCAN){
jam();
close_scan_req_send_conf(signal, scanptr);
return;
}
ndbrequire(scanP->scanState == ScanRecord::RUNNING);
apiConnectptr.i = scanP->scanApiRec;
ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
ndbrequire(apiConnectptr.p->apiScanRec == scanptr.i);
// If close of the scan is not already started
if (scanptr.p->scanState != ScanRecord::CLOSING_SCAN) {
jam();
apiConnectptr.p->returncode = errorCode;
/**
* Close scan wo/ having received an order to do so
*/
close_scan_req(signal, scanptr, false);
scanCompletedLab(signal);
const bool apiFail = (apiConnectptr.p->apiFailState == ZTRUE);
if(apiFail){
jam();
return;
}//if
#endif
}//Dbtc::scanFragError()
}
ScanTabRef * ref = (ScanTabRef*)&signal->theData[0];
ref->apiConnectPtr = apiConnectptr.p->ndbapiConnect;
ref->transId1 = apiConnectptr.p->transid[0];
ref->transId2 = apiConnectptr.p->transid[1];
ref->errorCode = errorCode;
ref->closeNeeded = 1;
sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABREF,
signal, ScanTabRef::SignalLength, JBB);
}//Dbtc::scanError()
/************************************************************
* execSCAN_FRAGCONF
......@@ -9034,6 +9022,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
scanFragptr.i = conf->senderData;
c_scan_frag_pool.getPtr(scanFragptr);
ScanRecordPtr scanptr;
scanptr.i = scanFragptr.p->scanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
......@@ -9051,32 +9040,34 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE);
const Uint32 status = conf->fragmentCompleted;
scanFragptr.p->stopFragTimer();
DEBUG(apiConnectptr.i << " " << scanFragptr.i <<
" execSCAN_FRAGCONF() status: " << status
<< " ops: " << noCompletedOps << " from: " << refToNode(signal->getSendersBlockRef()));
if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){
jam();
if(status == ZFALSE){
/**
* Dont deliver to api, but instead close in LQH
* Dont need to mess with queues
* We have started closing = we sent a close -> ignore this
*/
ndbout_c("running -> running(close)");
jam();
ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0];
nextReq->senderData = scanFragptr.i;
nextReq->closeFlag = ZTRUE;
nextReq->transId1 = apiConnectptr.p->transid[0];
nextReq->transId2 = apiConnectptr.p->transid[1];
sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB);
DEBUG(apiConnectptr.i << " " << scanFragptr.i <<
" Received SCANFRAG_CONF wo/ close when in "
" CLOSING_SCAN:" << status << " " << noCompletedOps);
return;
} else {
jam();
DEBUG(apiConnectptr.i << " " << scanFragptr.i
<< " Received SCANFRAG_CONF w/ close when in "
" CLOSING_SCAN:" << status << " " << noCompletedOps);
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(scanFragptr);
comp.add(scanFragptr);
scanFragptr.p->stopFragTimer();
scanFragptr.p->scanFragState = ScanFragRec::COMPLETED;
}
close_scan_req_send_conf(signal, scanptr);
return;
......@@ -9126,7 +9117,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
if(scanptr.p->m_queued_count > /** Min */ 0){
jam();
sendScanTabConf(signal);
sendScanTabConf(signal, scanptr.p);
}
}//Dbtc::execSCAN_FRAGCONF()
......@@ -9164,6 +9155,7 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
ref->transId1 = transid1;
ref->transId2 = transid2;
ref->errorCode = ZSTATE_ERROR;
ref->closeNeeded = 0;
sendSignal(signal->senderBlockRef(), GSN_SCAN_TABREF,
signal, ScanTabRef::SignalLength, JBB);
DEBUG("Wrong transid");
......@@ -9188,6 +9180,7 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
}
DEBUG("scanTabRefLab: ZSTATE_ERROR");
DEBUG(" apiConnectstate="<<apiConnectptr.p->apiConnectstate);
ndbrequire(false); //B2 indication of strange things going on
scanTabRefLab(signal, ZSTATE_ERROR);
return;
}//if
......@@ -9197,6 +9190,7 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
********************************************************/
// Stop the timer that is used to check for timeout in the API
setApiConTimer(apiConnectptr.i, 0, __LINE__);
ScanRecordPtr scanptr;
scanptr.i = apiConnectptr.p->apiScanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
ScanRecord* scanP = scanptr.p;
......@@ -9209,10 +9203,21 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
* APPLICATION IS CLOSING THE SCAN.
**********************************************************************/
ndbrequire(len == 0);
close_scan_req(signal, scanptr);
close_scan_req(signal, scanptr, true);
return;
}//if
if (scanptr.p->scanState == ScanRecord::CLOSING_SCAN){
jam();
/**
* The scan is closing (typically due to error)
* but the API hasn't understood it yet
*
* Wait for API close request
*/
return;
}
// Copy op ptrs so I dont overwrite them when sending...
memcpy(signal->getDataPtrSend()+25, signal->getDataPtr()+4, 4 * len);
......@@ -9243,26 +9248,25 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
}//Dbtc::execSCAN_NEXTREQ()
void
Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr){
Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
#ifdef VM_TRACE
ndbout_c("%d close_scan_req", apiConnectptr.i);
#endif
ScanRecord* scanP = scanPtr.p;
scanPtr.p->scanState = ScanRecord::CLOSING_SCAN;
scanPtr.p->m_close_scan_req = req_received;
/**
* Queue : Action
* ========== : =================
* ============= : =================
* completed : -
* running : -
* delivered : close -> LQH
* running : close -> LQH
* delivered w/ : close -> LQH
* delivered wo/ : move to completed
* queued w/ : close -> LQH
* queued wo/ : move to completed
*/
/**
* All delivered should to be closed
*/
ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0];
nextReq->closeFlag = ZTRUE;
nextReq->transId1 = apiConnectptr.p->transid[0];
......@@ -9273,6 +9277,28 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr){
ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags);
ScanFragList completed(c_scan_frag_pool, scanP->m_completed_scan_frags);
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags);
// Close running
for(running.first(ptr); !ptr.isNull(); ){
ScanFragRecPtr curr = ptr; // Remove while iterating...
running.next(ptr);
if(curr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF){
jam();
continue;
}
ndbrequire(curr.p->scanFragState == ScanFragRec::LQH_ACTIVE);
curr.p->startFragTimer(ctcTimer);
curr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
nextReq->senderData = curr.i;
sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB);
ndbout_c("%d running -> closing", curr.i);
}
// Close delivered
for(delivered.first(ptr); !ptr.isNull(); ){
jam();
ScanFragRecPtr curr = ptr; // Remove while iterating...
......@@ -9290,20 +9316,19 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr){
sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB);
ndbout_c("delivered -> running");
ndbout_c("%d delivered -> closing (%d)", curr.i, curr.p->m_ops);
} else {
jam();
completed.add(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer();
ndbout_c("delivered -> completed");
ndbout_c("%d delivered -> completed", curr.i);
}
}//for
/**
* All queued with data should be closed
*/
ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags);
for(queued.first(ptr); !ptr.isNull(); ){
jam();
ndbrequire(ptr.p->scanFragState == ScanFragRec::QUEUED_FOR_DELIVERY);
......@@ -9322,32 +9347,59 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr){
sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB);
ndbout_c("queued -> running");
ndbout_c("%d queued -> closing", curr.i);
} else {
jam();
completed.add(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer();
ndbout_c("queued -> completed");
ndbout_c("%d queued -> completed", curr.i);
}
}
}
close_scan_req_send_conf(signal, scanptr);
close_scan_req_send_conf(signal, scanPtr);
}
void
Dbtc::close_scan_req_send_conf(Signal* signal, ScanRecordPtr scanPtr){
jam();
ndbrequire(scanPtr.p->m_queued_scan_frags.isEmpty());
ndbrequire(scanPtr.p->m_delivered_scan_frags.isEmpty());
//ndbrequire(scanPtr.p->m_running_scan_frags.isEmpty());
#if 1
{
ScanFragList comp(c_scan_frag_pool, scanPtr.p->m_completed_scan_frags);
ScanFragRecPtr ptr;
for(comp.first(ptr); !ptr.isNull(); comp.next(ptr)){
ndbrequire(ptr.p->scanFragTimer == 0);
ndbrequire(ptr.p->scanFragState == ScanFragRec::COMPLETED);
}
}
#endif
if(!scanPtr.p->m_running_scan_frags.isEmpty()){
jam();
ndbout_c("%d close_scan_req_send_conf: not ready", apiConnectptr.i);
return;
}
const bool apiFail = (apiConnectptr.p->apiFailState == ZTRUE);
if(!scanPtr.p->m_close_scan_req){
jam();
/**
* The API hasn't order closing yet
*/
ndbout_c("%d close_scan_req_send_conf: api not ready", apiConnectptr.i);
return;
}
ndbout_c("%d close_scan_req_send_conf: ready", apiConnectptr.i);
if(!apiFail){
jam();
Uint32 ref = apiConnectptr.p->ndbapiBlockref;
......@@ -9370,53 +9422,58 @@ Dbtc::close_scan_req_send_conf(Signal* signal, ScanRecordPtr scanPtr){
}
}
void Dbtc::seizeScanrec(Signal* signal) {
Dbtc::ScanRecordPtr
Dbtc::seizeScanrec(Signal* signal) {
ScanRecordPtr scanptr;
scanptr.i = cfirstfreeScanrec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
cfirstfreeScanrec = scanptr.p->nextScan;
scanptr.p->nextScan = RNIL;
ndbrequire(scanptr.p->scanState == ScanRecord::IDLE);
return scanptr;
}//Dbtc::seizeScanrec()
void Dbtc::sendScanFragReq(Signal* signal) {
void Dbtc::sendScanFragReq(Signal* signal,
ScanRecord* scanP,
ScanFragRec* scanFragP){
Uint32 requestInfo = 0;
ScanFragReq::setConcurrency(requestInfo, scanFragptr.p->scanFragConcurrency);
ScanFragReq::setLockMode(requestInfo, scanptr.p->scanLockMode);
ScanFragReq::setHoldLockFlag(requestInfo, scanptr.p->scanLockHold);
if(scanptr.p->scanLockMode == 1){ // Not read -> keyinfo
ScanFragReq::setConcurrency(requestInfo, scanFragP->scanFragConcurrency);
ScanFragReq::setLockMode(requestInfo, scanP->scanLockMode);
ScanFragReq::setHoldLockFlag(requestInfo, scanP->scanLockHold);
if(scanP->scanLockMode == 1){ // Not read -> keyinfo
jam();
ScanFragReq::setKeyinfoFlag(requestInfo, 1);
}
ScanFragReq::setReadCommittedFlag(requestInfo, scanptr.p->readCommitted);
ScanFragReq::setRangeScanFlag(requestInfo, scanptr.p->rangeScan);
ScanFragReq::setAttrLen(requestInfo, scanptr.p->scanAiLength);
ScanFragReq::setReadCommittedFlag(requestInfo, scanP->readCommitted);
ScanFragReq::setRangeScanFlag(requestInfo, scanP->rangeScan);
ScanFragReq::setAttrLen(requestInfo, scanP->scanAiLength);
ScanFragReq::setScanPrio(requestInfo, 1);
apiConnectptr.i = scanptr.p->scanApiRec;
apiConnectptr.i = scanP->scanApiRec;
ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
ScanFragReq * const req = (ScanFragReq *)&signal->theData[0];
req->senderData = scanFragptr.i;
req->resultRef = apiConnectptr.p->ndbapiBlockref;
req->requestInfo = requestInfo;
req->savePointId = apiConnectptr.p->currSavePointId;
req->tableId = scanptr.p->scanTableref;
req->fragmentNo = scanFragptr.p->scanFragId;
req->schemaVersion = scanptr.p->scanSchemaVersion;
req->tableId = scanP->scanTableref;
req->fragmentNo = scanFragP->scanFragId;
req->schemaVersion = scanP->scanSchemaVersion;
req->transId1 = apiConnectptr.p->transid[0];
req->transId2 = apiConnectptr.p->transid[1];
for(int i = 0; i<16; i++){
req->clientOpPtr[i] = scanFragptr.p->m_apiPtr;
req->clientOpPtr[i] = scanFragP->m_apiPtr;
}
sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_FRAGREQ, signal, 25, JBB);
sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal, 25, JBB);
updateBuddyTimer(apiConnectptr);
scanFragptr.p->startFragTimer(ctcTimer);
scanFragP->startFragTimer(ctcTimer);
}//Dbtc::sendScanFragReq()
void Dbtc::sendScanTabConf(Signal* signal) {
void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) {
jam();
Uint32* ops = signal->getDataPtrSend()+4;
Uint32 op_count = scanptr.p->m_queued_count;
Uint32 op_count = scanP->m_queued_count;
if(4 + 3 * op_count > 25){
jam();
ops += 21;
......@@ -9428,7 +9485,6 @@ void Dbtc::sendScanTabConf(Signal* signal) {
conf->transId1 = apiConnectptr.p->transid[0];
conf->transId2 = apiConnectptr.p->transid[1];
ScanFragRecPtr ptr;
ScanRecord* scanP = scanptr.p;
ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags);
ScanFragList completed(c_scan_frag_pool, scanP->m_completed_scan_frags);
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
......@@ -9466,7 +9522,7 @@ void Dbtc::sendScanTabConf(Signal* signal) {
sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABCONF, signal,
ScanTabConf::SignalLength + 3 * op_count, JBB);
}
scanptr.p->m_queued_count = 0;
scanP->m_queued_count = 0;
}//Dbtc::sendScanTabConf()
......@@ -9715,12 +9771,14 @@ void Dbtc::initialiseRecordsLab(Signal* signal, UintR Tdata0,
/* ========================================================================= */
void Dbtc::initialiseScanrec(Signal* signal)
{
ScanRecordPtr scanptr;
ndbrequire(cscanrecFileSize > 0);
for (scanptr.i = 0; scanptr.i < cscanrecFileSize; scanptr.i++) {
jam();
ptrAss(scanptr, scanRecord);
new (scanptr.p) ScanRecord();
scanptr.p->scanState = ScanRecord::IDLE;
scanptr.p->scanApiRec = RNIL;
scanptr.p->nextScan = scanptr.i + 1;
}//for
scanptr.i = cscanrecFileSize - 1;
......@@ -11496,7 +11554,8 @@ void Dbtc::readIndexTable(Signal* signal,
Uint32 transId1 = indexOp->tcIndxReq->transId1;
Uint32 transId2 = indexOp->tcIndxReq->transId2;
const Uint8 opType = TcKeyReq::getOperationType(tcKeyRequestInfo);
const Operation_t opType =
(Operation_t)TcKeyReq::getOperationType(tcKeyRequestInfo);
// Find index table
if ((indexData = c_theIndexes.getPtr(indexOp->tcIndxReq->indexId)) == NULL) {
......
......@@ -54,6 +54,9 @@ void Dbtup::execSEND_PACKED(Signal* signal)
void Dbtup::bufferTRANSID_AI(Signal* signal, BlockReference aRef,
Uint32 Tlen)
{
if(Tlen == 3)
return;
Uint32 hostId = refToNode(aRef);
Uint32 Theader = ((refToBlock(aRef) << 16)+(Tlen-3));
......
......@@ -61,7 +61,14 @@ NdbConnection::receiveSCAN_TABREF(NdbApiSignal* aSignal){
const ScanTabRef * ref = CAST_CONSTPTR(ScanTabRef, aSignal->getDataPtr());
if(checkState_TransId(&ref->transId1)){
theScanningOp->execCLOSE_SCAN_REP(ref->errorCode);
theScanningOp->theError.code = ref->errorCode;
if(!ref->closeNeeded){
theScanningOp->execCLOSE_SCAN_REP();
return 0;
}
assert(theScanningOp->m_sent_receivers_count);
theScanningOp->m_sent_receivers_count--;
theScanningOp->m_conf_receivers_count++;
return 0;
}
return -1;
......@@ -88,11 +95,10 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
if(checkState_TransId(&conf->transId1)){
if (conf->requestInfo == ScanTabConf::EndOfData) {
theScanningOp->execCLOSE_SCAN_REP(0);
theScanningOp->execCLOSE_SCAN_REP();
return 0;
}
int noComp = -1;
for(Uint32 i = 0; i<len; i += 3){
Uint32 ptrI = * ops++;
Uint32 tcPtrI = * ops++;
......@@ -108,15 +114,13 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
/**
*
*/
noComp++;
theScanningOp->receiver_delivered(tOp);
} else if(info == ScanTabConf::EndOfData){
noComp++;
theScanningOp->receiver_completed(tOp);
}
}
}
return noComp;
return 0;
}
return -1;
......
......@@ -135,6 +135,8 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
m_ordered = 0;
Uint32 fragCount = m_currentTable->m_fragmentCount;
ndbout_c("batch: %d parallell: %d fragCount: %d",
batch, parallell, fragCount);
if(batch + parallell == 0){ // Max speed
batch = 16;
......@@ -154,6 +156,9 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
else if(parallell == 0)
parallell = fragCount;
ndbout_c("batch: %d parallell: %d fragCount: %d",
batch, parallell, fragCount);
assert(parallell > 0);
// It is only possible to call openScan if
......@@ -486,6 +491,11 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
last = m_api_receivers_count;
do {
if(theError.code){
setErrorCode(theError.code);
return -1;
}
Uint32 cnt = m_conf_receivers_count;
Uint32 sent = m_sent_receivers_count;
......@@ -502,12 +512,17 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
*/
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
ndbout_c("%d : api: %d conf: %d sent: %d",
__LINE__,
m_api_receivers_count,
m_conf_receivers_count,
m_sent_receivers_count);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
} else {
idx = last;
retVal = -1; //return_code;
retVal = -2; //return_code;
}
} else if(retVal == 2){
/**
......@@ -516,6 +531,11 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
if(send_next_scan(0, true) == 0){ // Close scan
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
ndbout_c("%d : api: %d conf: %d sent: %d",
__LINE__,
m_api_receivers_count,
m_conf_receivers_count,
m_sent_receivers_count);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
return 1;
......@@ -633,6 +653,12 @@ NdbScanOperation::doSend(int ProcessorId)
void NdbScanOperation::closeScan()
{
ndbout_c("closeScan %d : api: %d conf: %d sent: %d",
__LINE__,
m_api_receivers_count,
m_conf_receivers_count,
m_sent_receivers_count);
do {
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
......@@ -651,6 +677,11 @@ void NdbScanOperation::closeScan()
while(m_sent_receivers_count){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
ndbout_c("%d : api: %d conf: %d sent: %d",
__LINE__,
m_api_receivers_count,
m_conf_receivers_count,
m_sent_receivers_count);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){
case 0:
......@@ -679,6 +710,11 @@ void NdbScanOperation::closeScan()
do {
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
ndbout_c("%d : api: %d conf: %d sent: %d",
__LINE__,
m_api_receivers_count,
m_conf_receivers_count,
m_sent_receivers_count);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){
case 0:
......@@ -701,22 +737,7 @@ void NdbScanOperation::closeScan()
}
void
NdbScanOperation::execCLOSE_SCAN_REP(Uint32 errCode){
/**
* We will receive no further signals from this scan
*/
if(!errCode){
/**
* Normal termination
*/
theNdbCon->theCommitStatus = NdbConnection::Committed;
theNdbCon->theCompletionStatus = NdbConnection::CompletedSuccess;
} else {
/**
* Something is fishy
*/
abort();
}
NdbScanOperation::execCLOSE_SCAN_REP(){
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
......@@ -1206,7 +1227,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Uint32 nodeId = theNdbCon->theDBnode;
if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(u_idx)){
Uint32 tmp = m_sent_receivers_count;
while(m_sent_receivers_count > 0){
while(m_sent_receivers_count > 0 && !theError.code){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
......@@ -1223,6 +1244,10 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
memcpy(arr, m_conf_receivers, u_last * sizeof(char*));
if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last);
if(theError.code){
setErrorCode(theError.code);
return -1;
}
}
} else {
return 2;
......@@ -1279,9 +1304,12 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
if(seq == tp->getNodeSequence(nodeId) && send_next_scan(0, true) == 0){
if(seq == tp->getNodeSequence(nodeId) &&
send_next_scan(0, true) == 0 &&
theError.code == 0){
return 1;
}
setErrorCode(theError.code);
return -1;
}
......
......@@ -706,8 +706,11 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
if (tWaitState == WAIT_SCAN){
tCon = void2con(tFirstDataPtr);
assert(tFirstDataPtr != 0 &&
void2con(tFirstDataPtr)->checkMagicNumber() == 0);
if (tCon->checkMagicNumber() == 0){
tReturnCode = tCon->receiveSCAN_TABREF(aSignal);
if (tReturnCode != -1){
......@@ -715,7 +718,6 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}
break;
}
}
goto InvalidSignal;
}
case GSN_SCAN_TABINFO:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment