/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <HugoOperations.hpp>

/**
 * Start a new transaction on pNdb and remember it in pTrans.
 *
 * table/keyData/keyLen are forwarded unchanged to Ndb::startTransaction().
 *
 * @return NDBT_OK on success; NDBT_FAILED if a transaction is already held
 *         or if Ndb::startTransaction() returned NULL.
 */
int HugoOperations::startTransaction(Ndb* pNdb,
                                     const NdbDictionary::Table *table,
                                     const char *keyData, Uint32 keyLen){
  if (pTrans != NULL){
    ndbout << "HugoOperations::startTransaction, pTrans != NULL" << endl;
    return NDBT_FAILED;
  }

  pTrans = pNdb->startTransaction(table, keyData, keyLen);
  if (pTrans == NULL) {
    const NdbError err = pNdb->getNdbError();
    ERR(err);
    return NDBT_FAILED;
  }
  return NDBT_OK;
}

/**
 * Adopt an externally created transaction.
 *
 * @param new_trans    transaction to store in pTrans (may be NULL).
 * @param not_null_ok  when true, an already held transaction is overwritten
 *                     without complaint.
 * @return NDBT_FAILED if a transaction is already held and not_null_ok is
 *         false, or if new_trans is NULL; NDBT_OK otherwise.
 */
int HugoOperations::setTransaction(NdbTransaction* new_trans, bool not_null_ok){
  if (pTrans != NULL && !not_null_ok){
    ndbout << "HugoOperations::setTransaction, pTrans != NULL" << endl;
    return NDBT_FAILED;
  }

  pTrans = new_trans;
  if (pTrans == NULL) {
    return NDBT_FAILED;
  }
  return NDBT_OK;
}

/** Forward id to the current transaction; no-op when none is held. */
void HugoOperations::setTransactionId(Uint64 id){
  if (pTrans != NULL){
    pTrans->setTransactionId(id);
  }
}

/**
 * Close the current transaction via UtilTransactions and drop all pending
 * and executed scan result sets.
 *
 * @return always NDBT_OK.
 */
int HugoOperations::closeTransaction(Ndb* pNdb){
  UtilTransactions::closeTransaction(pNdb);

  m_result_sets.clear();
  m_executed_result_sets.clear();

  return NDBT_OK;
}

/** @return the currently held transaction (NULL when none). */
NdbConnection* HugoOperations::getTransaction(){
  return pTrans;
}

/**
 * Define reads of numRecords rows, starting at row number recordNo, on the
 * current transaction. The read values are bound to rows[0..numRecords-1].
 *
 * When idx is an ordered index, a single index scan operation is used for
 * all rows (one bound per row) and attributes are fetched only into rows[0];
 * otherwise one read operation is defined per row.
 *
 * @param lm  lock mode; any value other than LM_Read, LM_Exclusive or
 *            LM_CommittedRead is replaced by a randomly drawn mode.
 * @return NDBT_OK, or NDBT_FAILED on the first NDB API error.
 */
int HugoOperations::pkReadRecord(Ndb* pNdb,
                                 int recordNo,
                                 int numRecords,
                                 NdbOperation::LockMode lm){
  allocRows(numRecords);

  int check;
  NdbOperation* pOp = 0;
  pIndexScanOp = 0;

  for(int r = 0; r < numRecords; r++){
    if (pOp == 0) {
      pOp = getOperation(pTrans, NdbOperation::ReadRequest);
    }
    if (pOp == NULL) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }

    // Re-draw until one of the three explicitly supported modes comes up.
    while (lm != NdbOperation::LM_Read &&
           lm != NdbOperation::LM_Exclusive &&
           lm != NdbOperation::LM_CommittedRead) {
      lm = (NdbOperation::LockMode)((rand() >> 16) & 3);
    }

    if (idx && idx->getType() == NdbDictionary::Index::OrderedIndex &&
        pIndexScanOp == 0) {
      // First row on an ordered index: turn the operation into a scan.
      pIndexScanOp = ((NdbIndexScanOperation*)pOp);
      check = pIndexScanOp->readTuples(lm);
    } else {
      check = pOp->readTuple(lm);
    }

    if (check == -1) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }

    // Define primary keys
    if (equalForRow(pOp, r+recordNo) != 0)
      return NDBT_FAILED;

    if (pIndexScanOp)
      pIndexScanOp->end_of_bound(r);

    if (r == 0 || pIndexScanOp == 0) {
      // Define attributes to read
      for(int a = 0; a < tab.getNoOfColumns(); a++){
        if ((rows[r]->attributeStore(a) =
             pOp->getValue(tab.getColumn(a)->getName())) == 0) {
          ERR(pTrans->getNdbError());
          return NDBT_FAILED;
        }
      }
    }

    // Scan mode: keep reusing the scan operation. Otherwise this resets pOp
    // to 0 so the next iteration fetches a fresh operation.
    pOp = pIndexScanOp;
  }
  return NDBT_OK;
}

/**
 * Define primary key updates for numRecords rows starting at recordNo,
 * with column values generated for update number updatesValue.
 *
 * Note: allocRows() is called first and aborts when numRecords <= 0.
 *
 * @return NDBT_OK, or NDBT_FAILED on the first NDB API error.
 */
int HugoOperations::pkUpdateRecord(Ndb* pNdb,
                                   int recordNo,
                                   int numRecords,
                                   int updatesValue){
  allocRows(numRecords);

  for(int r = 0; r < numRecords; r++){
    NdbOperation* pOp = getOperation(pTrans, NdbOperation::UpdateRequest);
    if (pOp == NULL) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }

    const int check = pOp->updateTuple();
    if (check == -1) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }

    if (setValues(pOp, r+recordNo, updatesValue) != NDBT_OK) {
      return NDBT_FAILED;
    }
  }
  return NDBT_OK;
}

/**
 * Define the primary key of row rowId on pOp and set every non primary key
 * column to the value calculated for (rowId, updateId).
 *
 * @return NDBT_OK, or NDBT_FAILED if any key or value could not be set.
 */
int HugoOperations::setValues(NdbOperation* pOp, int rowId, int updateId)
{
  // Define primary keys
  if (equalForRow(pOp, rowId) != 0)
    return NDBT_FAILED;

  // Define attributes to set
  for(int a = 0; a < tab.getNoOfColumns(); a++){
    if (tab.getColumn(a)->getPrimaryKey() == false){
      if (setValueForAttr(pOp, a, rowId, updateId) != 0){
        ERR(pTrans->getNdbError());
        return NDBT_FAILED;
      }
    }
  }

  return NDBT_OK;
}

/**
 * Define inserts of numRecords rows starting at recordNo, with column
 * values generated for update number updatesValue.
 *
 * @return NDBT_OK, or NDBT_FAILED on the first NDB API error.
 */
int HugoOperations::pkInsertRecord(Ndb* pNdb,
                                   int recordNo,
                                   int numRecords,
                                   int updatesValue){
  for(int r = 0; r < numRecords; r++){
    NdbOperation* pOp = getOperation(pTrans, NdbOperation::InsertRequest);
    if (pOp == NULL) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }

    const int check = pOp->insertTuple();
    if (check == -1) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }

    if (setValues(pOp, r+recordNo, updatesValue) != NDBT_OK) {
      return NDBT_FAILED;
    }
  }
  return NDBT_OK;
}

/**
 * Define writes (writeTuple) of numRecords full rows starting at recordNo,
 * with column values generated for update number updatesValue.
 *
 * @return NDBT_OK, or NDBT_FAILED on the first NDB API error.
 */
int HugoOperations::pkWriteRecord(Ndb* pNdb,
                                  int recordNo,
                                  int numRecords,
                                  int updatesValue){
  for(int r = 0; r < numRecords; r++){
    NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
    if (pOp == NULL) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }

    const int check = pOp->writeTuple();
    if (check == -1) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }

    // Primary keys followed by all non-key attributes
    if (setValues(pOp, r+recordNo, updatesValue) != NDBT_OK) {
      return NDBT_FAILED;
    }
  }
  return NDBT_OK;
}

/**
 * Define writes (writeTuple) of numRecords rows starting at recordNo where
 * only the primary key is supplied; no other attribute is set.
 *
 * @return NDBT_OK, or NDBT_FAILED on the first NDB API error.
 */
int HugoOperations::pkWritePartialRecord(Ndb* pNdb,
                                         int recordNo,
                                         int numRecords){
  for(int r = 0; r < numRecords; r++){
    NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
    if (pOp == NULL) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }

    const int check = pOp->writeTuple();
    if (check == -1) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }

    // Define primary keys
    if (equalForRow(pOp, r+recordNo) != 0)
      return NDBT_FAILED;
  }
  return NDBT_OK;
}

/**
 * Define deletes of numRecords rows starting at recordNo.
 *
 * @return NDBT_OK, or NDBT_FAILED on the first NDB API error.
 */
int HugoOperations::pkDeleteRecord(Ndb* pNdb,
                                   int recordNo,
                                   int numRecords){
  for(int r = 0; r < numRecords; r++){
    NdbOperation* pOp = getOperation(pTrans, NdbOperation::DeleteRequest);
    if (pOp == NULL) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }

    const int check = pOp->deleteTuple();
    if (check == -1) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }

    // Define primary keys
    if (equalForRow(pOp, r+recordNo) != 0)
      return NDBT_FAILED;
  }
  return NDBT_OK;
}

/**
 * Execute the current transaction with Commit and then probe every pending
 * scan result set once.
 *
 * @return NDBT_OK on success. On an execute error: the NDB error code, or
 *         NDBT_FAILED when execute() failed without an error code. While
 *         probing result sets: 626 when nextResult() returns 1, the NDB
 *         error code (or NDBT_FAILED) when it returns -1, and 4000 when a
 *         row was returned although the remaining expected count is 0.
 */
int HugoOperations::execute_Commit(Ndb* pNdb,
                                   AbortOption eao){
  const int check = pTrans->execute(Commit, eao);

  const NdbError err = pTrans->getNdbError();
  if (check == -1 || err.code) {
    ERR(err);
    NdbOperation* pOp = pTrans->getNdbErrorOperation();
    if (pOp != NULL){
      const NdbError err2 = pOp->getNdbError();
      ERR(err2);
    }
    if (err.code == 0)
      return NDBT_FAILED;
    return err.code;
  }

  for(unsigned i = 0; i < m_result_sets.size(); i++){
    m_executed_result_sets.push_back(m_result_sets[i]);

    const int expected = m_result_sets[i].records;
    NdbScanOperation* rs = m_result_sets[i].m_result_set;
    const int res = rs->nextResult();
    if (res == 1) {
      return 626;
    }
    if (res == -1) {
      const NdbError scanErr = pTrans->getNdbError();
      ERR(scanErr);
      return (scanErr.code > 0 ? scanErr.code : NDBT_FAILED);
    }

    // A row found
    if (expected == 0) {
      return 4000;
    }
    m_result_sets[i].records--;
  }

  m_result_sets.clear();

  return NDBT_OK;
}

/**
 * Execute the current transaction with NoCommit and then probe every
 * pending scan result set once.
 *
 * On an execute error the error operation and every completed operation
 * after it are walked, logging each one that carries an error code.
 *
 * @return same convention as execute_Commit(). Unlike execute_Commit(),
 *         the expected row counter of a result set is not decremented.
 */
int HugoOperations::execute_NoCommit(Ndb* pNdb, AbortOption eao){
  const int check = pTrans->execute(NoCommit, eao);

  const NdbError err = pTrans->getNdbError();
  if (check == -1 || err.code) {
    ERR(err);
    const NdbOperation* pOp = pTrans->getNdbErrorOperation();
    while (pOp != NULL) {
      const NdbError err2 = pOp->getNdbError();
      if (err2.code)
        ERR(err2);
      pOp = pTrans->getNextCompletedOperation(pOp);
    }
    if (err.code == 0)
      return NDBT_FAILED;
    return err.code;
  }

  for(unsigned i = 0; i < m_result_sets.size(); i++){
    m_executed_result_sets.push_back(m_result_sets[i]);

    const int expected = m_result_sets[i].records;
    NdbScanOperation* rs = m_result_sets[i].m_result_set;
    const int res = rs->nextResult();
    if (res == 1) {
      return 626;
    }
    if (res == -1) {
      const NdbError scanErr = pTrans->getNdbError();
      ERR(scanErr);
      return (scanErr.code > 0 ? scanErr.code : NDBT_FAILED);
    }

    // A row found
    if (expected == 0) {
      return 4000;
    }
  }

  m_result_sets.clear();

  return NDBT_OK;
}

/**
 * Execute the current transaction with Rollback.
 *
 * @return NDBT_OK, or NDBT_FAILED when execute() returns -1.
 */
int HugoOperations::execute_Rollback(Ndb* pNdb){
  const int check = pTrans->execute(Rollback);
  if (check == -1) {
    const NdbError err = pTrans->getNdbError();
    ERR(err);
    return NDBT_FAILED;
  }
  return NDBT_OK;
}

/**
 * Free-function trampoline handed to executeAsynchPrepare(); ho is the
 * HugoOperations instance that was registered as callback argument.
 */
void HugoOperations_async_callback(int res, NdbTransaction* pCon, void* ho)
{
  ((HugoOperations*)ho)->callback(res, pCon);
}

/**
 * Record the outcome of an asynchronous execute: sets m_async_reply and
 * stores the transaction's error code (0 when res is 0) in m_async_return.
 */
void HugoOperations::callback(int res, NdbTransaction* pCon)
{
  assert(pCon == pTrans);
  m_async_reply = 1;
  if (res) {
    m_async_return = pCon->getNdbError().code;
  } else {
    m_async_return = 0;
  }
}

/**
 * Prepare the current transaction for asynchronous execution and send it
 * right away. The result is collected with wait_async().
 *
 * @return always NDBT_OK.
 */
int HugoOperations::execute_async(Ndb* pNdb, NdbTransaction::ExecType et,
                                  NdbOperation::AbortOption eao){
  m_async_reply = 0;
  pTrans->executeAsynchPrepare(et,
                               HugoOperations_async_callback,
                               this,
                               eao);

  pNdb->sendPreparedTransactions();

  return NDBT_OK;
}

/**
 * Prepare the current transaction for asynchronous execution without
 * sending it.
 *
 * @return always NDBT_OK.
 */
int HugoOperations::execute_async_prepare(Ndb* pNdb,
                                          NdbTransaction::ExecType et,
                                          NdbOperation::AbortOption eao){
  m_async_reply = 0;
  pTrans->executeAsynchPrepare(et,
                               HugoOperations_async_callback,
                               this,
                               eao);

  return NDBT_OK;
}

/**
 * Poll pNdb until the asynchronous callback has fired.
 *
 * NOTE(review): the timeout argument is not used; each poll is issued with
 * a fixed argument of 1000 and the loop repeats until a reply arrives.
 *
 * @return m_async_return once the reply arrives during a poll; -1 when the
 *         reply flag was already set on entry.
 */
int HugoOperations::wait_async(Ndb* pNdb, int timeout)
{
  volatile int * wait = &m_async_reply;
  while (!* wait)
  {
    pNdb->sendPollNdb(1000);

    if (* wait)
    {
      if (m_async_return)
        ndbout << "ERROR: " << pNdb->getNdbError(m_async_return) << endl;
      return m_async_return;
    }
  }
  ndbout_c("wait returned nothing...");
  return -1;
}

/** Construct a helper operating on table _tab, optionally through idx. */
HugoOperations::HugoOperations(const NdbDictionary::Table& _tab,
                               const NdbDictionary::Index* idx):
  UtilTransactions(_tab, idx),
  calc(_tab)
{
}

/** Free all result rows and close a transaction that is still held. */
HugoOperations::~HugoOperations(){
  deallocRows();
  if (pTrans != NULL)
  {
    pTrans->close();
    pTrans = NULL;
  }
}

/**
 * Define every primary key column of row number row on pOp.
 *
 * @return NDBT_OK, or NDBT_FAILED (after logging pOp's error) on the first
 *         column that could not be set.
 */
int HugoOperations::equalForRow(NdbOperation* pOp, int row)
{
  for(int a = 0; a < tab.getNoOfColumns(); a++)
  {
    if (tab.getColumn(a)->getPrimaryKey() == true)
    {
      if (equalForAttr(pOp, a, row) != 0)
      {
        ERR(pOp->getNdbError());
        return NDBT_FAILED;
      }
    }
  }
  return NDBT_OK;
}

/**
 * Define the primary key column attrId of row rowId on pOp, using the
 * value calculated by calc for update number 0.
 *
 * @return the result of NdbOperation::equal(); NDBT_FAILED when the column
 *         is not part of the primary key or its byte size does not fit the
 *         local value buffer.
 */
int HugoOperations::equalForAttr(NdbOperation* pOp,
                                 int attrId,
                                 int rowId){
  const NdbDictionary::Column* attr = tab.getColumn(attrId);
  if (attr->getPrimaryKey() == false){
    g_info << "Can't call equalForAttr on non PK attribute" << endl;
    return NDBT_FAILED;
  }

  char buf[8000];
  const int len = attr->getSizeInBytes();
  // Never hand calcValue a length larger than the buffer it writes into.
  if (len < 0 || len > (int)sizeof(buf)){
    g_err << "Column " << attr->getName() << " too large for value buffer: "
          << len << endl;
    return NDBT_FAILED;
  }

  memset(buf, 0, sizeof(buf));
  Uint32 real_len;
  const char * value = calc.calcValue(rowId, attrId, 0, buf, len, &real_len);
  return pOp->equal(attr->getName(), value, real_len);
}

/**
 * Set column attrId on pOp to the value calculated by calc for
 * (rowId, updateId).
 *
 * @return the result of NdbOperation::setValue(); NDBT_FAILED when the
 *         column's byte size does not fit the local value buffer.
 */
int HugoOperations::setValueForAttr(NdbOperation* pOp,
                                    int attrId,
                                    int rowId,
                                    int updateId){
  const NdbDictionary::Column* attr = tab.getColumn(attrId);

  char buf[8000];
  const int len = attr->getSizeInBytes();
  // Never hand calcValue a length larger than the buffer it writes into.
  if (len < 0 || len > (int)sizeof(buf)){
    g_err << "Column " << attr->getName() << " too large for value buffer: "
          << len << endl;
    return NDBT_FAILED;
  }

  memset(buf, 0, sizeof(buf));
  Uint32 real_len;
  const char * value = calc.calcValue(rowId, attrId,
                                      updateId, buf, len, &real_len);
  return pOp->setValue(attr->getName(), value, real_len);
}

/**
 * Check the first _numRows result rows (all rows when _numRows is 0) for
 * internal consistency and for carrying update number updatesValue.
 * Every offending row is logged; checking continues with the next row.
 *
 * @return NDBT_OK when all checked rows pass; NDBT_FAILED when any row
 *         fails, when there are no rows to check, or when more rows are
 *         requested than have been allocated.
 */
int HugoOperations::verifyUpdatesValue(int updatesValue, int _numRows){
  const int available = (int)rows.size();
  _numRows = (_numRows == 0 ? available : _numRows);

  if (_numRows == 0){
    g_err << "No rows -> Invalid updates value" << endl;
    return NDBT_FAILED;
  }

  // Guard against indexing past the end of rows.
  if (_numRows > available){
    g_err << "Cannot verify " << _numRows << " rows, only "
          << available << " allocated" << endl;
    return NDBT_FAILED;
  }

  int result = NDBT_OK;

  for(int i = 0; i < _numRows; i++){
    if (calc.verifyRowValues(rows[i]) != NDBT_OK){
      g_err << "Inconsistent row"
            << endl << "\t" << rows[i]->c_str().c_str() << endl;
      result = NDBT_FAILED;
      continue;
    }

    if (calc.getUpdatesValue(rows[i]) != updatesValue){
      result = NDBT_FAILED;
      g_err << "Invalid updates value for row " << i << endl
            << " updatesValue: " << updatesValue << endl
            << " calc.getUpdatesValue: " << calc.getUpdatesValue(rows[i]) << endl
            << rows[i]->c_str().c_str() << endl;
      continue;
    }
  }

  return result;
}

/**
 * Grow rows so that it holds at least _numRows result rows. Existing rows
 * are kept. Aborts the process when _numRows <= 0.
 */
void HugoOperations::allocRows(int _numRows){
  if (_numRows <= 0){
    g_info << "Illegal value for num rows : " << _numRows << endl;
    abort();
  }

  for(int b = rows.size(); b < _numRows; b++){
    rows.push_back(new NDBT_ResultRow(tab));
  }
}

/** Delete every result row and empty rows. */
void HugoOperations::deallocRows(){
  while(rows.size() > 0){
    delete rows.back();
    rows.erase(rows.size() - 1);
  }
}

/**
 * Append the string form of the first numRecords result rows to
 * savedRecords.
 *
 * @return NDBT_OK, or NDBT_FAILED when fewer than numRecords rows exist.
 */
int HugoOperations::saveCopyOfRecord(int numRecords){
  if (numRecords > (int)rows.size())
    return NDBT_FAILED;

  for(int i = 0; i < numRecords; i++){
    savedRecords.push_back(rows[i]->c_str());
  }
  return NDBT_OK;
}

/**
 * @return the string form of result row recordNum, or a default-constructed
 *         BaseString when recordNum is not a valid index into rows.
 */
BaseString HugoOperations::getRecordStr(int recordNum){
  if (recordNum < 0 || recordNum >= (int)rows.size())
    return BaseString();
  return rows[recordNum]->c_str();
}

/**
 * @return the GCI reported by the current transaction. recordNum is not
 *         used. Requires a transaction to be held.
 */
int HugoOperations::getRecordGci(int recordNum){
  return pTrans->getGCI();
}

/**
 * Compare the first numRecords result rows against the copies stored by
 * saveCopyOfRecord(), printing both versions of every row.
 *
 * @return NDBT_OK when all rows match; NDBT_FAILED on any mismatch or when
 *         fewer than numRecords rows or saved copies exist.
 */
int HugoOperations::compareRecordToCopy(int numRecords){
  if (numRecords > (int)rows.size())
    return NDBT_FAILED;
  if ((unsigned)numRecords > savedRecords.size())
    return NDBT_FAILED;

  int result = NDBT_OK;
  for(int i = 0; i < numRecords; i++){
    BaseString str = rows[i]->c_str();
    ndbout << "row["<<i<<"]: " << str << endl;
    ndbout << "sav["<<i<<"]: " << savedRecords[i] << endl;
    if (!(savedRecords[i] == str)){
      result = NDBT_FAILED;
    }
  }
  return result;
}

/** Call refresh() on the current transaction, if one is held. */
void HugoOperations::refresh() {
  NdbTransaction * t = getTransaction();
  if (t)
    t->refresh();
}

/**
 * Define reads through index idxName of numRecords rows starting at
 * recordNo. The read values are bound to rows[0..numRecords-1].
 *
 * @param exclusive  use readTupleExclusive() instead of readTuple().
 * @return NDBT_OK, or NDBT_FAILED on the first NDB API error.
 */
int HugoOperations::indexReadRecords(Ndb*, const char * idxName, int recordNo,
                                     bool exclusive, int numRecords){
  allocRows(numRecords);

  for(int r = 0; r < numRecords; r++){
    NdbOperation* pOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
    if (pOp == NULL) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }

    int check;
    if (exclusive == true)
      check = pOp->readTupleExclusive();
    else
      check = pOp->readTuple();

    if (check == -1) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }

    // Define primary keys
    if (equalForRow(pOp, r+recordNo) != 0)
      return NDBT_FAILED;

    // Define attributes to read
    for(int a = 0; a < tab.getNoOfColumns(); a++){
      if ((rows[r]->attributeStore(a) =
           pOp->getValue(tab.getColumn(a)->getName())) == 0) {
        ERR(pTrans->getNdbError());
        return NDBT_FAILED;
      }
    }
  }
  return NDBT_OK;
}

/**
 * Define updates through index idxName of numRecords rows starting at
 * recordNo, with column values generated for update number updatesValue.
 *
 * Note: allocRows() is called first and aborts when numRecords <= 0.
 *
 * @return NDBT_OK, or NDBT_FAILED on the first NDB API error.
 */
int HugoOperations::indexUpdateRecord(Ndb*,
                                      const char * idxName,
                                      int recordNo,
                                      int numRecords,
                                      int updatesValue){
  allocRows(numRecords);

  for(int r = 0; r < numRecords; r++){
    NdbOperation* pOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
    if (pOp == NULL) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }

    const int check = pOp->updateTuple();
    if (check == -1) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }

    // Primary keys followed by all non-key attributes
    if (setValues(pOp, r+recordNo, updatesValue) != NDBT_OK) {
      return NDBT_FAILED;
    }
  }
  return NDBT_OK;
}

/**
 * Define a table scan with lock mode lm that reads every column into
 * rows[0], and queue it together with the expected row count so that
 * execute_Commit()/execute_NoCommit() probe it after executing.
 *
 * @param records  expected row count stored with the result set.
 * @return 0 on success; -1 when the scan operation could not be created or
 *         readTuples() failed; NDBT_FAILED when a getValue() call failed.
 */
int HugoOperations::scanReadRecords(Ndb* pNdb, NdbScanOperation::LockMode lm,
                                    int records){
  allocRows(records);

  NdbScanOperation * pOp = pTrans->getNdbScanOperation(tab.getName());
  if (!pOp) {
    ERR(pTrans->getNdbError());
    return -1;
  }

  if (pOp->readTuples(lm, 0, 1)){
    ERR(pTrans->getNdbError());
    return -1;
  }

  for(int a = 0; a < tab.getNoOfColumns(); a++){
    if ((rows[0]->attributeStore(a) =
         pOp->getValue(tab.getColumn(a)->getName())) == 0) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }
  }

  RsPair p = {pOp, records};
  m_result_sets.push_back(p);

  return 0;
}

template class Vector<HugoOperations::RsPair>;