Commit 5ec5cde2 authored by joreland@mysql.com's avatar joreland@mysql.com

bug#11133 - ndb write handling

  fix handling of write's in lock queue
  add test case
  add support for pkWrite+async exec in HugoOperations
parent 544a8aca
...@@ -6198,7 +6198,24 @@ Uint32 Dbacc::executeNextOperation(Signal* signal) ...@@ -6198,7 +6198,24 @@ Uint32 Dbacc::executeNextOperation(Signal* signal)
sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal, 2, JBB); sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal, 2, JBB);
return operationRecPtr.p->elementIsDisappeared; return operationRecPtr.p->elementIsDisappeared;
}//if }//if
}//if }
else if(operationRecPtr.p->operation == ZWRITE)
{
jam();
operationRecPtr.p->operation = ZINSERT;
if (operationRecPtr.p->prevParallelQue != RNIL) {
OperationrecPtr prevOpPtr;
jam();
prevOpPtr.i = operationRecPtr.p->prevParallelQue;
ptrCheckGuard(prevOpPtr, coprecsize, operationrec);
if (prevOpPtr.p->operation != ZDELETE)
{
jam();
operationRecPtr.p->operation = ZUPDATE;
}
}
}
if (operationRecPtr.p->operation == ZSCAN_OP && if (operationRecPtr.p->operation == ZSCAN_OP &&
! operationRecPtr.p->isAccLockReq) { ! operationRecPtr.p->isAccLockReq) {
jam(); jam();
......
...@@ -38,6 +38,11 @@ public: ...@@ -38,6 +38,11 @@ public:
int numRecords = 1, int numRecords = 1,
int updatesValue = 0); int updatesValue = 0);
int pkWriteRecord(Ndb*,
int recordNo,
int numRecords = 1,
int updatesValue = 0);
int pkReadRecord(Ndb*, int pkReadRecord(Ndb*,
int recordNo, int recordNo,
int numRecords = 1, int numRecords = 1,
...@@ -88,6 +93,10 @@ public: ...@@ -88,6 +93,10 @@ public:
NdbScanOperation::LM_CommittedRead, NdbScanOperation::LM_CommittedRead,
int numRecords = 1); int numRecords = 1);
int execute_async(Ndb*, ExecType, AbortOption = AbortOnError);
int wait_async(Ndb*, int timeout = -1);
protected: protected:
void allocRows(int rows); void allocRows(int rows);
void deallocRows(); void deallocRows();
...@@ -102,6 +111,11 @@ protected: ...@@ -102,6 +111,11 @@ protected:
Vector<RsPair> m_executed_result_sets; Vector<RsPair> m_executed_result_sets;
NdbConnection* pTrans; NdbConnection* pTrans;
int m_async_reply;
int m_async_return;
friend void HugoOperations_async_callback(int, NdbConnection*, void*);
void callback(int res, NdbConnection*);
}; };
#endif #endif
...@@ -1049,6 +1049,8 @@ int runCheckGetNdbErrorOperation(NDBT_Context* ctx, NDBT_Step* step){ ...@@ -1049,6 +1049,8 @@ int runCheckGetNdbErrorOperation(NDBT_Context* ctx, NDBT_Step* step){
return result; return result;
} }
#define C2(x) { int _x= (x); if(_x == 0) return NDBT_FAILED; }
int runBug_11133(NDBT_Context* ctx, NDBT_Step* step){ int runBug_11133(NDBT_Context* ctx, NDBT_Step* step){
int result = NDBT_OK; int result = NDBT_OK;
const NdbDictionary::Table* pTab = ctx->getTab(); const NdbDictionary::Table* pTab = ctx->getTab();
...@@ -1056,227 +1058,75 @@ int runBug_11133(NDBT_Context* ctx, NDBT_Step* step){ ...@@ -1056,227 +1058,75 @@ int runBug_11133(NDBT_Context* ctx, NDBT_Step* step){
HugoOperations hugoOps(*pTab); HugoOperations hugoOps(*pTab);
Ndb* pNdb = GETNDB(step); Ndb* pNdb = GETNDB(step);
Uint32 lm;
NdbConnection* pCon = pNdb->startTransaction();
if (pCon == NULL){
pNdb->closeTransaction(pCon);
return NDBT_FAILED;
}
NdbOperation* pOp = pCon->getNdbOperation(pTab->getName());
if (pOp == NULL){
ERR(pCon->getNdbError());
pNdb->closeTransaction(pCon);
return NDBT_FAILED;
}
if (pOp->readTuple(NdbOperation::LM_Exclusive) != 0){
pNdb->closeTransaction(pCon);
ERR(pOp->getNdbError());
return NDBT_FAILED;
}
for(int a = 0; a<pTab->getNoOfColumns(); a++){
if (pTab->getColumn(a)->getPrimaryKey() == true){
if(hugoOps.equalForAttr(pOp, a, 1) != 0){
ERR(pCon->getNdbError());
pNdb->closeTransaction(pCon);
return NDBT_FAILED;
}
}
}
for(int a = 0; a<pTab->getNoOfColumns(); a++){
if (pTab->getColumn(a)->getPrimaryKey() != true){
if (pOp->getValue(pTab->getColumn(a)->getName()) == NULL) {
ERR(pCon->getNdbError());
pNdb->closeTransaction(pCon);
return NDBT_FAILED;
}
}
}
int check = pCon->execute(NoCommit);
if (check == 0){
ndbout << "execute worked" << endl;
} else {
ERR(pCon->getNdbError());
result = NDBT_FAILED;
}
pOp = pCon->getNdbOperation(pTab->getName());
if (pOp == NULL){
ERR(pCon->getNdbError());
pNdb->closeTransaction(pCon);
return NDBT_FAILED;
}
if (pOp->deleteTuple() != 0){
pNdb->closeTransaction(pCon);
ERR(pOp->getNdbError());
return NDBT_FAILED;
}
for(int a = 0; a<pTab->getNoOfColumns(); a++){
if (pTab->getColumn(a)->getPrimaryKey() == true){
if(hugoOps.equalForAttr(pOp, a, 1) != 0){
ERR(pCon->getNdbError());
pNdb->closeTransaction(pCon);
return NDBT_FAILED;
}
}
}
check = pCon->execute(NoCommit);
if (check == 0){
ndbout << "execute worked" << endl;
} else {
ERR(pCon->getNdbError());
result = NDBT_FAILED;
}
pOp = pCon->getNdbOperation(pTab->getName());
if (pOp == NULL){
ERR(pCon->getNdbError());
pNdb->closeTransaction(pCon);
return NDBT_FAILED;
}
if (pOp->writeTuple() != 0){ C2(hugoOps.startTransaction(pNdb) == 0);
pNdb->closeTransaction(pCon); C2(hugoOps.pkInsertRecord(pNdb, 0, 1) == 0);
ERR(pOp->getNdbError()); C2(hugoOps.execute_NoCommit(pNdb) == 0);
return NDBT_FAILED; C2(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0);
} C2(hugoOps.execute_NoCommit(pNdb) == 0);
C2(hugoOps.pkWriteRecord(pNdb, 0, 1) == 0);
for(int a = 0; a<pTab->getNoOfColumns(); a++){ C2(hugoOps.execute_NoCommit(pNdb) == 0);
if (pTab->getColumn(a)->getPrimaryKey() == true){ C2(hugoOps.pkWriteRecord(pNdb, 0, 1) == 0);
if(hugoOps.equalForAttr(pOp, a, 1) != 0){ C2(hugoOps.execute_NoCommit(pNdb) == 0);
ERR(pCon->getNdbError()); C2(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0);
pNdb->closeTransaction(pCon); C2(hugoOps.execute_Commit(pNdb) == 0);
return NDBT_FAILED; C2(hugoOps.closeTransaction(pNdb) == 0);
}
} C2(hugoOps.startTransaction(pNdb) == 0);
} C2(hugoOps.pkInsertRecord(pNdb, 0, 1) == 0);
C2(hugoOps.execute_NoCommit(pNdb) == 0);
for(int a = 0; a<pTab->getNoOfColumns(); a++){ C2(hugoOps.pkWriteRecord(pNdb, 0, 1) == 0);
if (pTab->getColumn(a)->getPrimaryKey() != true){ C2(hugoOps.execute_NoCommit(pNdb) == 0);
if(hugoOps.setValueForAttr(pOp, a, 1, 1) != 0) C2(hugoOps.pkWriteRecord(pNdb, 0, 1) == 0);
{ C2(hugoOps.execute_NoCommit(pNdb) == 0);
ERR(pCon->getNdbError()); C2(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0);
pNdb->closeTransaction(pCon); C2(hugoOps.execute_Commit(pNdb) == 0);
return NDBT_FAILED; C2(hugoOps.closeTransaction(pNdb) == 0);
}
} C2(hugoOps.startTransaction(pNdb) == 0);
} C2(hugoOps.pkInsertRecord(pNdb, 0, 1) == 0);
C2(hugoOps.execute_Commit(pNdb) == 0);
check = pCon->execute(NoCommit); C2(hugoOps.closeTransaction(pNdb) == 0);
if (check == 0){
ndbout << "execute worked" << endl; C2(hugoOps.startTransaction(pNdb) == 0);
} else { C2(hugoOps.pkReadRecord(pNdb, 0, 1, NdbOperation::LM_Exclusive) == 0);
ERR(pCon->getNdbError()); C2(hugoOps.execute_NoCommit(pNdb) == 0);
result = NDBT_FAILED; C2(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0);
} C2(hugoOps.execute_NoCommit(pNdb) == 0);
C2(hugoOps.pkWriteRecord(pNdb, 0, 1) == 0);
pOp = pCon->getNdbOperation(pTab->getName()); C2(hugoOps.execute_NoCommit(pNdb) == 0);
if (pOp == NULL){ C2(hugoOps.pkWriteRecord(pNdb, 0, 1) == 0);
ERR(pCon->getNdbError()); C2(hugoOps.execute_NoCommit(pNdb) == 0);
pNdb->closeTransaction(pCon); C2(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0);
return NDBT_FAILED; C2(hugoOps.execute_Commit(pNdb) == 0);
} C2(hugoOps.closeTransaction(pNdb) == 0);
if (pOp->writeTuple() != 0){ Ndb ndb2("TEST_DB");
pNdb->closeTransaction(pCon); C2(ndb2.init() == 0);
ERR(pOp->getNdbError()); C2(ndb2.waitUntilReady() == 0);
return NDBT_FAILED; HugoOperations hugoOps2(*pTab);
}
C2(hugoOps.startTransaction(pNdb) == 0);
for(int a = 0; a<pTab->getNoOfColumns(); a++){ C2(hugoOps.pkInsertRecord(pNdb, 0, 1) == 0);
if (pTab->getColumn(a)->getPrimaryKey() == true){ C2(hugoOps.execute_NoCommit(pNdb) == 0);
if(hugoOps.equalForAttr(pOp, a, 1) != 0){ C2(hugoOps2.startTransaction(&ndb2) == 0);
ERR(pCon->getNdbError()); C2(hugoOps2.pkWriteRecord(&ndb2, 0, 1) == 0);
pNdb->closeTransaction(pCon); C2(hugoOps2.execute_async(&ndb2, NoCommit) == 0);
return NDBT_FAILED; C2(hugoOps.execute_Commit(pNdb) == 0);
} C2(hugoOps2.wait_async(&ndb2) == 0);
} C2(hugoOps.closeTransaction(pNdb) == 0);
} C2(hugoOps2.closeTransaction(&ndb2) == 0);
for(int a = 0; a<pTab->getNoOfColumns(); a++){ C2(hugoOps.startTransaction(pNdb) == 0);
if (pTab->getColumn(a)->getPrimaryKey() != true){ C2(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0);
if(hugoOps.setValueForAttr(pOp, a, 1, 1) != 0) C2(hugoOps.execute_NoCommit(pNdb) == 0);
{ C2(hugoOps2.startTransaction(&ndb2) == 0);
ERR(pCon->getNdbError()); C2(hugoOps2.pkWriteRecord(&ndb2, 0, 1) == 0);
pNdb->closeTransaction(pCon); C2(hugoOps2.execute_async(&ndb2, NoCommit) == 0);
return NDBT_FAILED; C2(hugoOps.execute_Commit(pNdb) == 0);
} C2(hugoOps2.wait_async(&ndb2) == 0);
} C2(hugoOps.closeTransaction(pNdb) == 0);
} C2(hugoOps2.closeTransaction(&ndb2) == 0);
check = pCon->execute(NoCommit);
if (check == 0){
ndbout << "execute worked" << endl;
} else {
ERR(pCon->getNdbError());
result = NDBT_FAILED;
}
check = pCon->execute(Rollback);
if (check == 0){
ndbout << "execute worked" << endl;
} else {
ERR(pCon->getNdbError());
result = NDBT_FAILED;
}
pCon->close();
pCon = pNdb->startTransaction();
if (pCon == NULL){
pNdb->closeTransaction(pCon);
return NDBT_FAILED;
}
pOp = pCon->getNdbOperation(pTab->getName());
if (pOp == NULL){
ERR(pCon->getNdbError());
pNdb->closeTransaction(pCon);
return NDBT_FAILED;
}
if (pOp->writeTuple() != 0){
pNdb->closeTransaction(pCon);
ERR(pOp->getNdbError());
return NDBT_FAILED;
}
for(int a = 0; a<pTab->getNoOfColumns(); a++){
if (pTab->getColumn(a)->getPrimaryKey() == true){
if(hugoOps.equalForAttr(pOp, a, 1) != 0){
ERR(pCon->getNdbError());
pNdb->closeTransaction(pCon);
return NDBT_FAILED;
}
}
}
for(int a = 0; a<pTab->getNoOfColumns(); a++){
if (pTab->getColumn(a)->getPrimaryKey() != true){
if(hugoOps.setValueForAttr(pOp, a, 1, 1) != 0)
{
ERR(pCon->getNdbError());
pNdb->closeTransaction(pCon);
return NDBT_FAILED;
}
}
}
check = pCon->execute(Commit);
if (check == 0){
ndbout << "execute worked" << endl;
} else {
ERR(pCon->getNdbError());
result = NDBT_FAILED;
}
return result; return result;
} }
...@@ -1359,7 +1209,6 @@ TESTCASE("ReadWithoutGetValue", ...@@ -1359,7 +1209,6 @@ TESTCASE("ReadWithoutGetValue",
} }
TESTCASE("Bug_11133", TESTCASE("Bug_11133",
"Test ReadEx-Delete-Write\n"){ "Test ReadEx-Delete-Write\n"){
INITIALIZER(runLoadTable);
INITIALIZER(runBug_11133); INITIALIZER(runBug_11133);
FINALIZER(runClearTable); FINALIZER(runClearTable);
} }
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
#include <HugoOperations.hpp> #include <HugoOperations.hpp>
int HugoOperations::startTransaction(Ndb* pNdb){ int HugoOperations::startTransaction(Ndb* pNdb){
if (pTrans != NULL){ if (pTrans != NULL){
...@@ -199,6 +198,48 @@ int HugoOperations::pkInsertRecord(Ndb* pNdb, ...@@ -199,6 +198,48 @@ int HugoOperations::pkInsertRecord(Ndb* pNdb,
return NDBT_OK; return NDBT_OK;
} }
int HugoOperations::pkWriteRecord(Ndb* pNdb,
int recordNo,
int numRecords,
int updatesValue){
int a, check;
for(int r=0; r < numRecords; r++){
NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
return NDBT_FAILED;
}
check = pOp->writeTuple();
if( check == -1 ) {
ERR(pTrans->getNdbError());
return NDBT_FAILED;
}
// Define primary keys
for(a = 0; a<tab.getNoOfColumns(); a++){
if (tab.getColumn(a)->getPrimaryKey() == true){
if(equalForAttr(pOp, a, r+recordNo) != 0){
ERR(pTrans->getNdbError());
return NDBT_FAILED;
}
}
}
// Define attributes to update
for(a = 0; a<tab.getNoOfColumns(); a++){
if (tab.getColumn(a)->getPrimaryKey() == false){
if(setValueForAttr(pOp, a, recordNo+r, updatesValue ) != 0){
ERR(pTrans->getNdbError());
return NDBT_FAILED;
}
}
}
}
return NDBT_OK;
}
int HugoOperations::pkDeleteRecord(Ndb* pNdb, int HugoOperations::pkDeleteRecord(Ndb* pNdb,
int recordNo, int recordNo,
int numRecords){ int numRecords){
...@@ -399,16 +440,58 @@ int HugoOperations::execute_Rollback(Ndb* pNdb){ ...@@ -399,16 +440,58 @@ int HugoOperations::execute_Rollback(Ndb* pNdb){
return NDBT_OK; return NDBT_OK;
} }
void
HugoOperations_async_callback(int res, NdbConnection* pCon, void* ho)
{
((HugoOperations*)ho)->callback(res, pCon);
}
void
HugoOperations::callback(int res, NdbConnection* pCon)
{
assert(pCon == pTrans);
m_async_reply= 1;
m_async_return= res;
}
int
HugoOperations::execute_async(Ndb* pNdb, ExecType et, AbortOption eao){
m_async_reply= 0;
pTrans->executeAsynchPrepare(et,
HugoOperations_async_callback,
this,
eao);
pNdb->sendPreparedTransactions();
return NDBT_OK;
}
int
HugoOperations::wait_async(Ndb* pNdb, int timeout)
{
pNdb->pollNdb(1000);
if(m_async_reply)
{
return m_async_return;
}
ndbout_c("wait returned nothing...");
return -1;
}
HugoOperations::HugoOperations(const NdbDictionary::Table& _tab): HugoOperations::HugoOperations(const NdbDictionary::Table& _tab):
UtilTransactions(_tab), UtilTransactions(_tab),
calc(_tab), calc(_tab),
pTrans(NULL){ pTrans(NULL)
{
} }
HugoOperations::~HugoOperations(){ HugoOperations::~HugoOperations(){
deallocRows(); deallocRows();
if (pTrans != NULL){ if (pTrans != NULL)
{
pTrans->close(); pTrans->close();
pTrans = NULL; pTrans = NULL;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment