post merge

parent a937ab0a
...@@ -477,186 +477,6 @@ restart: ...@@ -477,186 +477,6 @@ restart:
return NDBT_FAILED; return NDBT_FAILED;
} }
#define RESTART_SCAN 99
int
HugoTransactions::scanUpdateRecords(Ndb* pNdb,
int records,
int abortPercent,
int parallelism){
if(m_defaultScanUpdateMethod == 1){
return scanUpdateRecords1(pNdb, records, abortPercent, parallelism);
} else if(m_defaultScanUpdateMethod == 2){
return scanUpdateRecords2(pNdb, records, abortPercent, parallelism);
} else {
return scanUpdateRecords3(pNdb, records, abortPercent, parallelism);
}
}
// Scan all records exclusive and update
// them one by one
int
HugoTransactions::scanUpdateRecords1(Ndb* pNdb,
int records,
int abortPercent,
int parallelism){
return scanUpdateRecords3(pNdb, records, abortPercent, 1);
}
// Scan all records exclusive and update
// them batched by asking nextScanResult to
// give us all cached records before fetching new
// records from db
int
HugoTransactions::scanUpdateRecords2(Ndb* pNdb,
int records,
int abortPercent,
int parallelism){
return scanUpdateRecords3(pNdb, records, abortPercent, parallelism);
}
int
HugoTransactions::scanUpdateRecords3(Ndb* pNdb,
int records,
int abortPercent,
int parallelism){
int retryAttempt = 0;
int check, a;
NdbScanOperation *pOp;
while (true){
restart:
if (retryAttempt++ >= m_retryMax){
g_info << "ERROR: has retried this operation " << retryAttempt
<< " times, failing!" << endl;
return NDBT_FAILED;
}
pTrans = pNdb->startTransaction();
if (pTrans == NULL) {
const NdbError err = pNdb->getNdbError();
ERR(err);
if (err.status == NdbError::TemporaryError){
NdbSleep_MilliSleep(50);
continue;
}
return NDBT_FAILED;
}
pOp = getScanOperation(pTrans);
if (pOp == NULL) {
ERR(pTrans->getNdbError());
closeTransaction(pNdb);
return NDBT_FAILED;
}
if( pOp->readTuplesExclusive(parallelism) ) {
ERR(pTrans->getNdbError());
closeTransaction(pNdb);
return NDBT_FAILED;
}
// Read all attributes from this table
for(a=0; a<tab.getNoOfColumns(); a++){
if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == NULL){
ERR(pTrans->getNdbError());
closeTransaction(pNdb);
return NDBT_FAILED;
}
}
check = pTrans->execute(NoCommit, AbortOnError);
if( check == -1 ) {
const NdbError err = pTrans->getNdbError();
ERR(err);
closeTransaction(pNdb);
if (err.status == NdbError::TemporaryError){
NdbSleep_MilliSleep(50);
continue;
}
return NDBT_FAILED;
}
// Abort after 1-100 or 1-records rows
int ranVal = rand();
int abortCount = ranVal % (records == 0 ? 100 : records);
bool abortTrans = false;
if (abort > 0){
// Abort if abortCount is less then abortPercent
if (abortCount < abortPercent)
abortTrans = true;
}
int rows = 0;
while((check = pOp->nextResult(true)) == 0){
do {
rows++;
NdbOperation* pUp = pOp->updateCurrentTuple();
if(pUp == 0){
ERR(pTrans->getNdbError());
closeTransaction(pNdb);
return NDBT_FAILED;
}
const int updates = calc.getUpdatesValue(&row) + 1;
const int r = calc.getIdValue(&row);
for(a = 0; a<tab.getNoOfColumns(); a++){
if (tab.getColumn(a)->getPrimaryKey() == false){
if(setValueForAttr(pUp, a, r, updates ) != 0){
ERR(pTrans->getNdbError());
closeTransaction(pNdb);
return NDBT_FAILED;
}
}
}
if (rows == abortCount && abortTrans == true){
g_info << "Scan is aborted" << endl;
// This scan should be aborted
closeTransaction(pNdb);
return NDBT_OK;
}
} while((check = pOp->nextResult(false)) == 0);
if(check != -1){
check = pTrans->execute(Commit, AbortOnError);
if(check != -1)
m_latest_gci = pTrans->getGCI();
pTrans->restart();
}
const NdbError err = pTrans->getNdbError();
if( check == -1 ) {
closeTransaction(pNdb);
ERR(err);
if (err.status == NdbError::TemporaryError){
NdbSleep_MilliSleep(50);
goto restart;
}
return NDBT_FAILED;
}
}
const NdbError err = pTrans->getNdbError();
if( check == -1 ) {
closeTransaction(pNdb);
ERR(err);
if (err.status == NdbError::TemporaryError){
NdbSleep_MilliSleep(50);
goto restart;
}
return NDBT_FAILED;
}
closeTransaction(pNdb);
g_info << rows << " rows have been updated" << endl;
return NDBT_OK;
}
return NDBT_FAILED;
}
int int
HugoTransactions::scanUpdateRecords(Ndb* pNdb, HugoTransactions::scanUpdateRecords(Ndb* pNdb,
int records, int records,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment