/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "HugoTransactions.hpp" #include <NdbSleep.h> HugoTransactions::HugoTransactions(const NdbDictionary::Table& _tab): HugoOperations(_tab), row(_tab){ m_defaultScanUpdateMethod = 3; } HugoTransactions::~HugoTransactions(){ deallocRows(); } int HugoTransactions::scanReadRecords(Ndb* pNdb, int records, int abortPercent, int parallelism, NdbOperation::LockMode lm) { int retryAttempt = 0; const int retryMax = 100; int check, a; NdbConnection *pTrans; NdbScanOperation *pOp; while (true){ if (retryAttempt >= retryMax){ g_err << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } pOp = pTrans->getNdbScanOperation(tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } NdbResultSet * rs; rs = pOp ->readTuples(lm); if( rs == 0 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->interpret_exit_ok(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } for(a = 0; a<tab.getNoOfColumns(); a++){ if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == 0) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } check = pTrans->execute(NoCommit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } // Abort after 1-100 or 1-records rows int ranVal = rand(); int abortCount = ranVal % (records == 0 ? 100 : records); bool abortTrans = false; if (abort > 0){ // Abort if abortCount is less then abortPercent if (abortCount < abortPercent) abortTrans = true; } int eof; int rows = 0; while((eof = rs->nextResult(true)) == 0){ rows++; if (calc.verifyRowValues(&row) != 0){ closeTransaction(pNdb); return NDBT_FAILED; } if (abortCount == rows && abortTrans == true){ ndbout << "Scan is aborted" << endl; g_info << "Scan is aborted" << endl; rs->close(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } closeTransaction(pNdb); return NDBT_OK; } } if (eof == -1) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR_INFO(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); switch (err.code){ case 488: case 245: case 490: // Too many active scans, no limit on number of retry attempts break; default: retryAttempt++; } continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } closeTransaction(pNdb); g_info << rows << " rows have been read" << endl; if (records != 0 && rows != records){ g_err << "Check expected number of records failed" << endl << " expected=" << records <<", " << endl << " read=" << rows << endl; return NDBT_FAILED; } return NDBT_OK; } return NDBT_FAILED; } int HugoTransactions::scanReadRecords(Ndb* pNdb, const NdbDictionary::Index * pIdx, int records, int abortPercent, int parallelism, NdbOperation::LockMode lm, bool sorted) { int retryAttempt = 0; const int retryMax = 100; int check, a; NdbConnection *pTrans; NdbIndexScanOperation *pOp; while (true){ if (retryAttempt >= retryMax){ g_err << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } pOp = pTrans->getNdbIndexScanOperation(pIdx->getName(), tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } NdbResultSet * rs; rs = pOp ->readTuples(lm, 0, parallelism, sorted); if( rs == 0 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->interpret_exit_ok(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } for(a = 0; a<tab.getNoOfColumns(); a++){ if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == 0) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } check = pTrans->execute(NoCommit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } // Abort after 1-100 or 1-records rows int ranVal = rand(); int abortCount = ranVal % (records == 0 ? 100 : records); bool abortTrans = false; if (abort > 0){ // Abort if abortCount is less then abortPercent if (abortCount < abortPercent) abortTrans = true; } int eof; int rows = 0; while((eof = rs->nextResult(true)) == 0){ rows++; if (calc.verifyRowValues(&row) != 0){ closeTransaction(pNdb); return NDBT_FAILED; } if (abortCount == rows && abortTrans == true){ ndbout << "Scan is aborted" << endl; g_info << "Scan is aborted" << endl; rs->close(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } closeTransaction(pNdb); return NDBT_OK; } } if (eof == -1) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR_INFO(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); switch (err.code){ case 488: case 245: case 490: // Too many active scans, no limit on number of retry attempts break; default: retryAttempt++; } continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } closeTransaction(pNdb); g_info << rows << " rows have been read" << endl; if (records != 0 && rows != records){ g_err << "Check expected number of records failed" << endl << " expected=" << records <<", " << endl << " read=" << rows << endl; return NDBT_FAILED; } return NDBT_OK; } return NDBT_FAILED; } #define RESTART_SCAN 99 int HugoTransactions::scanUpdateRecords(Ndb* pNdb, int records, int abortPercent, int parallelism){ if(m_defaultScanUpdateMethod == 1){ return scanUpdateRecords1(pNdb, records, abortPercent, parallelism); } else if(m_defaultScanUpdateMethod == 2){ return scanUpdateRecords2(pNdb, records, abortPercent, parallelism); } else { return scanUpdateRecords3(pNdb, records, abortPercent, parallelism); } } // Scan all records exclusive and update // them one by one int HugoTransactions::scanUpdateRecords1(Ndb* pNdb, int records, int abortPercent, int parallelism){ #if 1 return scanUpdateRecords3(pNdb, records, abortPercent, 1); #else int retryAttempt = 0; const int retryMax = 100; int check, a; NdbConnection *pTrans; NdbOperation *pOp; while (true){ if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } pOp = pTrans->getNdbOperation(tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->openScanExclusive(parallelism); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->interpret_exit_ok(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // Read all attributes from this table for(a=0; a<tab.getNoOfColumns(); a++){ if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == NULL){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } check = pTrans->executeScan(); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } // Abort after 1-100 or 1-records rows int ranVal = rand(); int abortCount = ranVal % (records == 0 ? 100 : records); bool abortTrans = false; if (abort > 0){ // Abort if abortCount is less then abortPercent if (abortCount < abortPercent) abortTrans = true; } int eof; int rows = 0; eof = pTrans->nextScanResult(); while(eof == 0){ rows++; if (abortCount == rows && abortTrans == true){ g_info << "Scan is aborted" << endl; // This scan should be aborted check = pTrans->stopScan(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } closeTransaction(pNdb); return NDBT_OK; } int res = takeOverAndUpdateRecord(pNdb, pOp); if(res == RESTART_SCAN){ eof = -2; continue; } if (res != 0){ closeTransaction(pNdb); return res; } eof = pTrans->nextScanResult(); } if (eof == -1) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); switch (err.code){ case 488: case 245: case 490: // Too many active scans, no limit on number of retry attempts break; default: retryAttempt++; } continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } if(eof == -2){ closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } closeTransaction(pNdb); g_info << rows << " rows have been updated" << endl; return NDBT_OK; } return NDBT_FAILED; #endif } // Scan all records exclusive and update // them batched by asking nextScanResult to // give us all cached records before fetching new // records from db int HugoTransactions::scanUpdateRecords2(Ndb* pNdb, int records, int abortPercent, int parallelism){ #if 1 return scanUpdateRecords3(pNdb, records, abortPercent, parallelism); #else int retryAttempt = 0; const int retryMax = 100; int check, a; NdbConnection *pTrans; NdbOperation *pOp; while (true){ if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } pOp = pTrans->getNdbOperation(tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->openScanExclusive(parallelism); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->interpret_exit_ok(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // Read all attributes from this table for(a=0; a<tab.getNoOfColumns(); a++){ if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == NULL){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } check = pTrans->executeScan(); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } // Abort after 1-100 or 1-records rows int ranVal = rand(); int abortCount = ranVal % (records == 0 ? 100 : records); bool abortTrans = false; if (abort > 0){ // Abort if abortCount is less then abortPercent if (abortCount < abortPercent) abortTrans = true; } int eof; int rows = 0; NdbConnection* pUpTrans; while((eof = pTrans->nextScanResult(true)) == 0){ pUpTrans = pNdb->startTransaction(); if (pUpTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } do { rows++; if (addRowToUpdate(pNdb, pUpTrans, pOp) != 0){ pNdb->closeTransaction(pUpTrans); closeTransaction(pNdb); return NDBT_FAILED; } } while((eof = pTrans->nextScanResult(false)) == 0); if (abortCount == rows && abortTrans == true){ g_info << "Scan is aborted" << endl; // This scan should be aborted check = pTrans->stopScan(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); pNdb->closeTransaction(pUpTrans); return NDBT_FAILED; } closeTransaction(pNdb); pNdb->closeTransaction(pUpTrans); return NDBT_OK; } check = pUpTrans->execute(Commit); if( check == -1 ) { const NdbError err = pUpTrans->getNdbError(); ERR(err); pNdb->closeTransaction(pUpTrans); closeTransaction(pNdb); return NDBT_FAILED; } pNdb->closeTransaction(pUpTrans); } if (eof == -1) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } closeTransaction(pNdb); g_info << rows << " rows have been updated" << endl; return NDBT_OK; } return NDBT_FAILED; #endif } int HugoTransactions::scanUpdateRecords3(Ndb* pNdb, int records, int abortPercent, int parallelism){ int retryAttempt = 0; const int retryMax = 100; int check, a; NdbConnection *pTrans; NdbScanOperation *pOp; while (true){ restart: if (retryAttempt++ >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); ERR(err); if (err.status == NdbError::TemporaryError){ NdbSleep_MilliSleep(50); continue; } return NDBT_FAILED; } pOp = pTrans->getNdbScanOperation(tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } NdbResultSet *rs = pOp->readTuplesExclusive(parallelism); if( rs == 0 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // Read all attributes from this table for(a=0; a<tab.getNoOfColumns(); a++){ if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == NULL){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } check = pTrans->execute(NoCommit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); ERR(err); closeTransaction(pNdb); if (err.status == NdbError::TemporaryError){ NdbSleep_MilliSleep(50); continue; } return NDBT_FAILED; } // Abort after 1-100 or 1-records rows int ranVal = rand(); int abortCount = ranVal % (records == 0 ? 100 : records); bool abortTrans = false; if (abort > 0){ // Abort if abortCount is less then abortPercent if (abortCount < abortPercent) abortTrans = true; } int rows = 0; while((check = rs->nextResult(true)) == 0){ do { rows++; NdbOperation* pUp = rs->updateTuple(); if(pUp == 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } const int updates = calc.getUpdatesValue(&row) + 1; const int r = calc.getIdValue(&row); for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == false){ if(setValueForAttr(pUp, a, r, updates ) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } if (rows == abortCount && abortTrans == true){ g_info << "Scan is aborted" << endl; // This scan should be aborted closeTransaction(pNdb); return NDBT_OK; } } while((check = rs->nextResult(false)) == 0); if(check != -1){ check = pTrans->execute(Commit); pTrans->restart(); } const NdbError err = pTrans->getNdbError(); if( check == -1 ) { closeTransaction(pNdb); ERR(err); if (err.status == NdbError::TemporaryError){ NdbSleep_MilliSleep(50); goto restart; } return NDBT_FAILED; } } const NdbError err = pTrans->getNdbError(); if( check == -1 ) { closeTransaction(pNdb); ERR(err); if (err.status == NdbError::TemporaryError){ NdbSleep_MilliSleep(50); goto restart; } return NDBT_FAILED; } closeTransaction(pNdb); g_info << rows << " rows have been updated" << endl; return NDBT_OK; } return NDBT_FAILED; } int HugoTransactions::loadTable(Ndb* pNdb, int records, int batch, bool allowConstraintViolation, int doSleep, bool oneTrans){ int check, a; int retryAttempt = 0; int retryMax = 5; NdbConnection *pTrans; NdbOperation *pOp; bool first_batch = true; const int org = batch; const int cols = tab.getNoOfColumns(); const int brow = tab.getRowSizeInBytes(); const int bytes = 12 + brow + 4 * cols; batch = (batch * 256); // -> 512 -> 65536k per commit batch = batch/bytes; // batch = batch == 0 ? 1 : batch; if(batch != org){ g_info << "batch = " << org << " rowsize = " << bytes << " -> rows/commit = " << batch << endl; } g_info << "|- Inserting records..." << endl; for (int c=0 ; c<records ; ){ bool closeTrans; if (retryAttempt >= retryMax){ g_info << "Record " << c << " could not be inserted, has retried " << retryAttempt << " times " << endl; // Reset retry counters and continue with next record retryAttempt = 0; c++; } if (doSleep > 0) NdbSleep_MilliSleep(doSleep); // if (first_batch || !oneTrans) { if (first_batch || !pTrans) { first_batch = false; pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } } for(int b = 0; b < batch && c+b<records; b++){ pOp = pTrans->getNdbOperation(tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->insertTuple(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // Set a calculated value for each attribute in this table for (a = 0; a<tab.getNoOfColumns(); a++){ if(setValueForAttr(pOp, a, c+b, 0 ) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // Execute the transaction and insert the record if (!oneTrans || (c + batch) >= records) { // closeTrans = true; closeTrans = false; check = pTrans->execute( Commit ); pTrans->restart(); } else { closeTrans = false; check = pTrans->execute( NoCommit ); } if(check == -1 ) { const NdbError err = pTrans->getNdbError(); closeTransaction(pNdb); pTrans= 0; switch(err.status){ case NdbError::Success: ERR(err); g_info << "ERROR: NdbError reports success when transcaction failed" << endl; return NDBT_FAILED; break; case NdbError::TemporaryError: ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; break; case NdbError::UnknownResult: ERR(err); return NDBT_FAILED; break; case NdbError::PermanentError: if (allowConstraintViolation == true){ switch (err.classification){ case NdbError::ConstraintViolation: // Tuple already existed, OK but should be reported g_info << c << ": " << err.code << " " << err.message << endl; c++; continue; break; default: break; } } ERR(err); return err.code; break; } } else{ if (closeTrans) { closeTransaction(pNdb); pTrans= 0; } } // Step to next record c = c+batch; retryAttempt = 0; } return NDBT_OK; } int HugoTransactions::fillTable(Ndb* pNdb, int batch){ int check, a, b; int retryAttempt = 0; int retryMax = 5; NdbConnection *pTrans; NdbOperation *pOp; g_info << "|- Inserting records..." << endl; for (int c=0 ; ; ){ if (retryAttempt >= retryMax){ g_info << "Record " << c << " could not be inserted, has retried " << retryAttempt << " times " << endl; // Reset retry counters and continue with next record retryAttempt = 0; c++; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } for(b = 0; b < batch; b++){ pOp = pTrans->getNdbOperation(tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->insertTuple(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // Set a calculated value for each attribute in this table for (a = 0; a<tab.getNoOfColumns(); a++){ if(setValueForAttr(pOp, a, c+b, 0 ) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // Execute the transaction and insert the record check = pTrans->execute( Commit, CommitAsMuchAsPossible ); if(check == -1 ) { const NdbError err = pTrans->getNdbError(); closeTransaction(pNdb); switch(err.status){ case NdbError::Success: ERR(err); g_info << "ERROR: NdbError reports success when transcaction failed" << endl; return NDBT_FAILED; break; case NdbError::TemporaryError: ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; break; case NdbError::UnknownResult: ERR(err); return NDBT_FAILED; break; case NdbError::PermanentError: // if (allowConstraintViolation == true){ // switch (err.classification){ // case NdbError::ConstraintViolation: // // Tuple already existed, OK but should be reported // g_info << c << ": " << err.code << " " << err.message << endl; // c++; // continue; // break; // default: // break;es // } // } // Check if this is the "db full" error if (err.classification==NdbError::InsufficientSpace){ ERR(err); return NDBT_OK; } if (err.classification == NdbError::ConstraintViolation){ ERR(err); break; } ERR(err); return NDBT_FAILED; break; } } else{ closeTransaction(pNdb); } // Step to next record c = c+batch; retryAttempt = 0; } return NDBT_OK; } int HugoTransactions::createEvent(Ndb* pNdb){ char eventName[1024]; sprintf(eventName,"%s_EVENT",tab.getName()); NdbDictionary::Dictionary *myDict = pNdb->getDictionary(); if (!myDict) { printf("Event Creation failedDictionary not found"); return NDBT_FAILED; } NdbDictionary::Event myEvent(eventName); myEvent.setTable(tab.getName()); myEvent.addTableEvent(NdbDictionary::Event::TE_ALL); // myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT); // myEvent.addTableEvent(NdbDictionary::Event::TE_UPDATE); // myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE); // const NdbDictionary::Table *_table = myDict->getTable(tab.getName()); for(int a = 0; a < tab.getNoOfColumns(); a++){ // myEvent.addEventColumn(_table->getColumn(a)->getName()); myEvent.addEventColumn(a); } int res = myDict->createEvent(myEvent); // Add event to database if (res == 0) myEvent.print(); else { g_info << "Event creation failed\n"; g_info << "trying drop Event, maybe event exists\n"; res = myDict->dropEvent(eventName); if (res) { g_err << "failed to drop event\n"; return NDBT_FAILED; } // try again res = myDict->createEvent(myEvent); // Add event to database if (res) { g_err << "failed to create event\n"; return NDBT_FAILED; } } return NDBT_OK; } #include <NdbEventOperation.hpp> #include "TestNdbEventOperation.hpp" #include <NdbAutoPtr.hpp> struct receivedEvent { Uint32 pk; Uint32 count; Uint32 event; }; int XXXXX = 0; int HugoTransactions::eventOperation(Ndb* pNdb, void* pstats, int records) { int myXXXXX = XXXXX++; Uint32 i; const char function[] = "HugoTransactions::eventOperation: "; struct receivedEvent* recInsertEvent; NdbAutoObjArrayPtr<struct receivedEvent> p00( recInsertEvent = new struct receivedEvent[3*records] ); struct receivedEvent* recUpdateEvent = &recInsertEvent[records]; struct receivedEvent* recDeleteEvent = &recInsertEvent[2*records]; EventOperationStats &stats = *(EventOperationStats*)pstats; stats.n_inserts = 0; stats.n_deletes = 0; stats.n_updates = 0; stats.n_consecutive = 0; stats.n_duplicates = 0; stats.n_inconsistent_gcis = 0; for (i = 0; i < records; i++) { recInsertEvent[i].pk = 0xFFFFFFFF; recInsertEvent[i].count = 0; recInsertEvent[i].event = 0xFFFFFFFF; recUpdateEvent[i].pk = 0xFFFFFFFF; recUpdateEvent[i].count = 0; recUpdateEvent[i].event = 0xFFFFFFFF; recDeleteEvent[i].pk = 0xFFFFFFFF; recDeleteEvent[i].count = 0; recDeleteEvent[i].event = 0xFFFFFFFF; } NdbDictionary::Dictionary *myDict = pNdb->getDictionary(); if (!myDict) { g_err << function << "Event Creation failedDictionary not found\n"; return NDBT_FAILED; } int r = 0; NdbEventOperation *pOp; char eventName[1024]; sprintf(eventName,"%s_EVENT",tab.getName()); int noEventColumnName = tab.getNoOfColumns(); g_info << function << "create EventOperation\n"; pOp = pNdb->createEventOperation(eventName, 100); if ( pOp == NULL ) { g_err << function << "Event operation creation failed\n"; return NDBT_FAILED; } g_info << function << "get values\n"; NdbRecAttr* recAttr[1024]; NdbRecAttr* recAttrPre[1024]; const NdbDictionary::Table *_table = myDict->getTable(tab.getName()); for (int a = 0; a < noEventColumnName; a++) { recAttr[a] = pOp->getValue(_table->getColumn(a)->getName()); recAttrPre[a] = pOp->getPreValue(_table->getColumn(a)->getName()); } // set up the callbacks g_info << function << "execute\n"; if (pOp->execute()) { // This starts changes to "start flowing" g_err << function << "operation execution failed\n"; return NDBT_FAILED; } g_info << function << "ok\n"; int count = 0; Uint32 last_inconsitant_gci = 0xEFFFFFF0; while (r < records){ //printf("now waiting for event...\n"); int res = pNdb->pollEvents(1000); // wait for event or 1000 ms if (res > 0) { //printf("got data! %d\n", r); int overrun; while (pOp->next(&overrun) > 0) { r++; r += overrun; count++; Uint32 gci = pOp->getGCI(); Uint32 pk = recAttr[0]->u_32_value(); if (!pOp->isConsistent()) { if (last_inconsitant_gci != gci) { last_inconsitant_gci = gci; stats.n_inconsistent_gcis++; } g_warning << "A node failure has occured and events might be missing\n"; } g_info << function << "GCI " << gci << ": " << count; struct receivedEvent* recEvent; switch (pOp->getEventType()) { case NdbDictionary::Event::TE_INSERT: stats.n_inserts++; g_info << " INSERT: "; recEvent = recInsertEvent; break; case NdbDictionary::Event::TE_DELETE: stats.n_deletes++; g_info << " DELETE: "; recEvent = recDeleteEvent; break; case NdbDictionary::Event::TE_UPDATE: stats.n_updates++; g_info << " UPDATE: "; recEvent = recUpdateEvent; break; case NdbDictionary::Event::TE_ALL: abort(); } if ((int)pk < records) { recEvent[pk].pk = pk; recEvent[pk].count++; } g_info << "overrun " << overrun << " pk " << pk; for (i = 1; i < noEventColumnName; i++) { if (recAttr[i]->isNULL() >= 0) { // we have a value g_info << " post[" << i << "]="; if (recAttr[i]->isNULL() == 0) // we have a non-null value g_info << recAttr[i]->u_32_value(); else // we have a null value g_info << "NULL"; } if (recAttrPre[i]->isNULL() >= 0) { // we have a value g_info << " pre[" << i << "]="; if (recAttrPre[i]->isNULL() == 0) // we have a non-null value g_info << recAttrPre[i]->u_32_value(); else // we have a null value g_info << "NULL"; } } g_info << endl; } } else ;//printf("timed out\n"); } // sleep ((XXXXX-myXXXXX)*2); g_info << myXXXXX << "dropping event operation" << endl; int res = pNdb->dropEventOperation(pOp); if (res != 0) { g_err << "operation execution failed\n"; return NDBT_FAILED; } g_info << myXXXXX << " ok" << endl; if (stats.n_inserts > 0) { stats.n_consecutive++; } if (stats.n_deletes > 0) { stats.n_consecutive++; } if (stats.n_updates > 0) { stats.n_consecutive++; } for (i = 0; i < (Uint32)records/3; i++) { if (recInsertEvent[i].pk != i) { stats.n_consecutive ++; ndbout << "missing insert pk " << i << endl; } else if (recInsertEvent[i].count > 1) { ndbout << "duplicates insert pk " << i << " count " << recInsertEvent[i].count << endl; stats.n_duplicates += recInsertEvent[i].count-1; } if (recUpdateEvent[i].pk != i) { stats.n_consecutive ++; ndbout << "missing update pk " << i << endl; } else if (recUpdateEvent[i].count > 1) { ndbout << "duplicates update pk " << i << " count " << recUpdateEvent[i].count << endl; stats.n_duplicates += recUpdateEvent[i].count-1; } if (recDeleteEvent[i].pk != i) { stats.n_consecutive ++; ndbout << "missing delete pk " << i << endl; } else if (recDeleteEvent[i].count > 1) { ndbout << "duplicates delete pk " << i << " count " << recDeleteEvent[i].count << endl; stats.n_duplicates += recDeleteEvent[i].count-1; } } return NDBT_OK; } int HugoTransactions::pkReadRecords(Ndb* pNdb, int records, int batchsize, NdbOperation::LockMode lm){ int reads = 0; int r = 0; int retryAttempt = 0; const int retryMax = 100; int check, a; NdbConnection *pTrans; NdbOperation *pOp; if (batchsize == 0) { g_info << "ERROR: Argument batchsize == 0 in pkReadRecords(). Not allowed." << endl; return NDBT_FAILED; } allocRows(batchsize); while (r < records){ if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } for(int b=0; (b<batchsize) && (r+b < records); b++){ pOp = pTrans->getNdbOperation(tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } rand_lock_mode: switch(lm){ case NdbOperation::LM_Read: check = pOp->readTuple(); break; case NdbOperation::LM_Exclusive: check = pOp->readTupleExclusive(); break; case NdbOperation::LM_CommittedRead: check = pOp->dirtyRead(); break; default: lm = (NdbOperation::LockMode)((rand() >> 16) & 3); goto rand_lock_mode; } if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // Define primary keys for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == true){ if(equalForAttr(pOp, a, r+b) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // Define attributes to read for(a = 0; a<tab.getNoOfColumns(); a++){ if((rows[b]->attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == 0) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } check = pTrans->execute(Commit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } switch(err.code){ case 626: // Tuple did not exist g_info << r << ": " << err.code << " " << err.message << endl; r++; break; default: ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } } else{ for (int b=0; (b<batchsize) && (r+b<records); b++){ if (calc.verifyRowValues(rows[b]) != 0){ closeTransaction(pNdb); return NDBT_FAILED; } reads++; r++; } } closeTransaction(pNdb); } deallocRows(); g_info << reads << " records read" << endl; return NDBT_OK; } int HugoTransactions::pkUpdateRecords(Ndb* pNdb, int records, int batch, int doSleep){ int updated = 0; int r = 0; int retryAttempt = 0; const int retryMax = 100; int check, a, b; NdbConnection *pTrans; NdbOperation *pOp; allocRows(batch); g_info << "|- Updating records (batch=" << batch << ")..." << endl; while (r < records){ if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } if (doSleep > 0) NdbSleep_MilliSleep(doSleep); pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } for(b = 0; b<batch && (r+b) < records; b++){ pOp = pTrans->getNdbOperation(tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->readTupleExclusive(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // Define primary keys for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == true){ if(equalForAttr(pOp, a, r+b) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // Define attributes to read for(a = 0; a<tab.getNoOfColumns(); a++){ if((rows[b]->attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == 0) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } check = pTrans->execute(NoCommit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } for(b = 0; b<batch && (b+r)<records; b++){ if (calc.verifyRowValues(rows[b]) != 0){ closeTransaction(pNdb); return NDBT_FAILED; } int updates = calc.getUpdatesValue(rows[b]) + 1; NdbOperation* pUpdOp; pUpdOp = pTrans->getNdbOperation(tab.getName()); if (pUpdOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pUpdOp->updateTuple(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == true){ if(equalForAttr(pUpdOp, a, r+b) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == false){ if(setValueForAttr(pUpdOp, a, r+b, updates ) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } } check = pTrans->execute(Commit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); ndbout << "r = " << r << endl; closeTransaction(pNdb); return NDBT_FAILED; } else{ updated += batch; } closeTransaction(pNdb); r += batch; // Read next record } deallocRows(); g_info << "|- " << updated << " records updated" << endl; return NDBT_OK; } int HugoTransactions::pkInterpretedUpdateRecords(Ndb* pNdb, int records, int batch){ int updated = 0; int r = 0; int retryAttempt = 0; const int retryMax = 100; int check, a; NdbConnection *pTrans; while (r < records){ if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } NdbOperation* pOp = pTrans->getNdbOperation(tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->readTupleExclusive(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // Define primary keys for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == true){ if(equalForAttr(pOp, a, r) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // Read update value for(a = 0; a<tab.getNoOfColumns(); a++){ if (calc.isUpdateCol(a) == true){ if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == 0) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } check = pTrans->execute(NoCommit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } int updates = calc.getUpdatesValue(&row) + 1; NdbOperation* pUpdOp; pUpdOp = pTrans->getNdbOperation(tab.getName()); if (pUpdOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pUpdOp->interpretedUpdateTuple(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // PKs for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == true){ if(equalForAttr(pUpdOp, a, r) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // Update col for(a = 0; a<tab.getNoOfColumns(); a++){ if ((tab.getColumn(a)->getPrimaryKey() == false) && (calc.isUpdateCol(a) == true)){ // TODO switch for 32/64 bit const NdbDictionary::Column* attr = tab.getColumn(a); Uint32 valToIncWith = 1; check = pUpdOp->incValue(attr->getName(), valToIncWith); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // Remaining attributes for(a = 0; a<tab.getNoOfColumns(); a++){ if ((tab.getColumn(a)->getPrimaryKey() == false) && (calc.isUpdateCol(a) == false)){ if(setValueForAttr(pUpdOp, a, r, updates ) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } check = pTrans->execute(Commit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); ndbout << "r = " << r << endl; closeTransaction(pNdb); return NDBT_FAILED; } else{ updated++; } closeTransaction(pNdb); r++; // Read next record } g_info << "|- " << updated << " records updated" << endl; return NDBT_OK; } int HugoTransactions::pkDelRecords(Ndb* pNdb, int records, int batch, bool allowConstraintViolation, int doSleep){ // TODO Batch is not implemented int deleted = 0; int r = 0; int retryAttempt = 0; const int retryMax = 100; int check, a; NdbConnection *pTrans; NdbOperation *pOp; g_info << "|- Deleting records..." << endl; while (r < records){ if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } if (doSleep > 0) NdbSleep_MilliSleep(doSleep); pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } pOp = pTrans->getNdbOperation(tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->deleteTuple(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // Define primary keys for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == true){ if(equalForAttr(pOp, a, r) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } check = pTrans->execute(Commit); if( check == -1) { const NdbError err = pTrans->getNdbError(); switch(err.status){ case NdbError::TemporaryError: ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; break; case NdbError::PermanentError: if (allowConstraintViolation == true){ switch (err.classification){ case NdbError::ConstraintViolation: // Tuple did not exist, OK but should be reported g_info << r << ": " << err.code << " " << err.message << endl; continue; break; default: break; } } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; break; default: ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } } else { deleted++; } closeTransaction(pNdb); r++; // Read next record } g_info << "|- " << deleted << " records deleted" << endl; return NDBT_OK; } int HugoTransactions::lockRecords(Ndb* pNdb, int records, int percentToLock, int lockTime){ // Place a lock on percentToLock% of the records in the Db // Keep the locks for lockTime ms, commit operation // and lock som other records int r = 0; int retryAttempt = 0; const int retryMax = 100; int check, a, b; NdbConnection *pTrans; NdbOperation *pOp; // Calculate how many records to lock in each batch if (percentToLock <= 0) percentToLock = 1; double percentVal = (double)percentToLock / 100; int lockBatch = (int)(records * percentVal); if (lockBatch <= 0) lockBatch = 1; allocRows(lockBatch); while (r < records){ g_info << "|- Locking " << lockBatch << " records..." << endl; if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } for(b = 0; (b<lockBatch) && (r+b < records); b++){ pOp = pTrans->getNdbOperation(tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->readTupleExclusive(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // Define primary keys for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == true){ if(equalForAttr(pOp, a, r+b) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // Define attributes to read for(a = 0; a<tab.getNoOfColumns(); a++){ if((rows[b]->attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == 0) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // NoCommit lockTime times with 100 millis interval int sleepInterval = 50; int lockCount = lockTime / sleepInterval; int commitCount = 0; do { check = pTrans->execute(NoCommit); if( check == -1) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } for (int b=0; (b<lockBatch) && (r+b<records); b++){ if (calc.verifyRowValues(rows[b]) != 0){ closeTransaction(pNdb); return NDBT_FAILED; } } commitCount++; NdbSleep_MilliSleep(sleepInterval); } while (commitCount < lockCount); // Really commit the trans, puuh! check = pTrans->execute(Commit); if( check == -1) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } else{ for (int b=0; (b<lockBatch) && (r<records); b++){ if (calc.verifyRowValues(rows[b]) != 0){ closeTransaction(pNdb); return NDBT_FAILED; } r++; // Read next record } } closeTransaction(pNdb); } deallocRows(); g_info << "|- Record locking completed" << endl; return NDBT_OK; } int HugoTransactions::indexReadRecords(Ndb* pNdb, const char * idxName, int records, int batchsize){ int reads = 0; int r = 0; int retryAttempt = 0; const int retryMax = 100; int check, a; NdbConnection *pTrans; NdbOperation *pOp; NdbIndexScanOperation *sOp; NdbResultSet * rs; const NdbDictionary::Index* pIndex = pNdb->getDictionary()->getIndex(idxName, tab.getName()); const bool ordered = (pIndex->getType()==NdbDictionary::Index::OrderedIndex); if (batchsize == 0) { g_info << "ERROR: Argument batchsize == 0 in indexReadRecords(). " << "Not allowed." << endl; return NDBT_FAILED; } if (ordered) { batchsize = 1; } allocRows(batchsize); while (r < records){ if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } for(int b=0; (b<batchsize) && (r+b < records); b++){ if(!ordered){ pOp = pTrans->getNdbIndexOperation(idxName, tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->readTuple(); } else { pOp = sOp = pTrans->getNdbIndexScanOperation(idxName, tab.getName()); if (sOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = 0; rs = sOp->readTuples(); } if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } // Define primary keys for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == true){ if(equalForAttr(pOp, a, r+b) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // Define attributes to read for(a = 0; a<tab.getNoOfColumns(); a++){ if((rows[b]->attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == 0) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } check = pTrans->execute(Commit); check = (check == -1 ? -1 : !ordered ? check : rs->nextResult(true)); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); closeTransaction(pNdb); NdbSleep_MilliSleep(50); retryAttempt++; continue; } switch(err.code){ case 626: // Tuple did not exist g_info << r << ": " << err.code << " " << err.message << endl; r++; break; default: ERR(err); closeTransaction(pNdb); return NDBT_FAILED; } } else{ for (int b=0; (b<batchsize) && (r+b<records); b++){ if (calc.verifyRowValues(rows[b]) != 0){ closeTransaction(pNdb); return NDBT_FAILED; } reads++; r++; } if(ordered && rs->nextResult(true) == 0){ ndbout << "Error when comparing records " << " - index op next_result to many" << endl; closeTransaction(pNdb); return NDBT_FAILED; } } closeTransaction(pNdb); } deallocRows(); g_info << reads << " records read" << endl; return NDBT_OK; } int HugoTransactions::indexUpdateRecords(Ndb* pNdb, const char * idxName, int records, int batchsize){ int updated = 0; int r = 0; int retryAttempt = 0; const int retryMax = 100; int check, a, b; NdbConnection *pTrans; NdbOperation *pOp; NdbScanOperation * sOp; NdbResultSet * rs; const NdbDictionary::Index* pIndex = pNdb->getDictionary()->getIndex(idxName, tab.getName()); const bool ordered = (pIndex->getType()==NdbDictionary::Index::OrderedIndex); if (ordered){ batchsize = 1; } allocRows(batchsize); while (r < records){ if (retryAttempt >= retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return NDBT_FAILED; } pTrans = pNdb->startTransaction(); if (pTrans == NULL) { const NdbError err = pNdb->getNdbError(); if (err.status == NdbError::TemporaryError){ ERR(err); NdbSleep_MilliSleep(50); retryAttempt++; continue; } ERR(err); return NDBT_FAILED; } for(b = 0; b<batchsize && (b+r)<records; b++){ if(!ordered){ pOp = pTrans->getNdbIndexOperation(idxName, tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = pOp->readTupleExclusive(); if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } else { pOp = sOp = pTrans->getNdbIndexScanOperation(idxName, tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } check = 0; rs = sOp->readTuplesExclusive(); } // Define primary keys for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == true){ if(equalForAttr(pOp, a, r+b) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } // Define attributes to read for(a = 0; a<tab.getNoOfColumns(); a++){ if((rows[b]->attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == 0) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } check = pTrans->execute(NoCommit); check = (check == -1 ? -1 : !ordered ? check : rs->nextResult(true)); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); ERR(err); closeTransaction(pNdb); if (err.status == NdbError::TemporaryError){ NdbSleep_MilliSleep(50); retryAttempt++; continue; } return NDBT_FAILED; } if(ordered && check != 0){ g_err << "Row: " << r << " not found!!" << endl; closeTransaction(pNdb); return NDBT_FAILED; } for(b = 0; b<batchsize && (b+r)<records; b++){ if (calc.verifyRowValues(rows[b]) != 0){ closeTransaction(pNdb); return NDBT_FAILED; } int updates = calc.getUpdatesValue(rows[b]) + 1; NdbOperation* pUpdOp; if(!ordered){ pUpdOp = pTrans->getNdbIndexOperation(idxName, tab.getName()); check = (pUpdOp == 0 ? -1 : pUpdOp->updateTuple()); } else { pUpdOp = rs->updateTuple(); } if (pUpdOp == NULL) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } if( check == -1 ) { ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } if(!ordered){ for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == true){ if(equalForAttr(pUpdOp, a, r+b) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } } for(a = 0; a<tab.getNoOfColumns(); a++){ if (tab.getColumn(a)->getPrimaryKey() == false){ if(setValueForAttr(pUpdOp, a, r+b, updates ) != 0){ ERR(pTrans->getNdbError()); closeTransaction(pNdb); return NDBT_FAILED; } } } } check = pTrans->execute(Commit); if( check == -1 ) { const NdbError err = pTrans->getNdbError(); ERR(err); closeTransaction(pNdb); if (err.status == NdbError::TemporaryError){ NdbSleep_MilliSleep(50); retryAttempt++; continue; } ndbout << "r = " << r << endl; return NDBT_FAILED; } else { updated += batchsize; } closeTransaction(pNdb); r+= batchsize; // Read next record } g_info << "|- " << updated << " records updated" << endl; return NDBT_OK; } template class Vector<NDBT_ResultRow*>;