Commit 56589afd authored by unknown's avatar unknown

ndb -

  fix scan bugs introduced by acc modifications
  add more error testcases


storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp:
  remove unused state
storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp:
  1) remove unused state
  2) Fix abort of running lock owner
  3) Fix abort of running op in parallell queue (especially scans)
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  remove some printouts
  add some jams
  fix so that close tupscan, can not acciently start acc scan in queue
    (NOTE limits #tupscans to 12 which is not really necessary...but the fix was easy)
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp:
  Use abort of locks when closing/blocked
    as Dbacc gets annoyed when committing an op with state running
storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp:
  Use abort of locks when closing/blocked
    as Dbacc gets annoyed when committing an op with state running
storage/ndb/test/include/HugoOperations.hpp:
  new method
storage/ndb/test/ndbapi/testBasic.cpp:
  add more test cases
storage/ndb/test/ndbapi/testScan.cpp:
  add more testcases
storage/ndb/test/run-test/daily-basic-tests.txt:
  add more testcases
storage/ndb/test/src/HugoOperations.cpp:
  add more testcases
parent 8bfc2d9c
......@@ -533,7 +533,6 @@ struct Operationrec {
,OP_STATE_WAITING = 0x00000
,OP_STATE_RUNNING = 0x10000
,OP_STATE_EXECUTED = 0x30000
,OP_STATE_RUNNING_ABORT = 0x20000
,OP_EXECUTED_DIRTY_READ = 0x3050F
,OP_INITIAL = ~(Uint32)0
......
......@@ -1181,6 +1181,7 @@ void Dbacc::execACCKEYREQ(Signal* signal)
void
Dbacc::execACCKEY_ORD(Signal* signal, Uint32 opPtrI)
{
jamEntry();
OperationrecPtr lastOp;
lastOp.i = opPtrI;
ptrCheckGuard(lastOp, coprecsize, operationrec);
......@@ -1200,9 +1201,6 @@ Dbacc::execACCKEY_ORD(Signal* signal, Uint32 opPtrI)
startNext(signal, lastOp);
return;
}
else if (opstate == Operationrec::OP_STATE_RUNNING_ABORT)
{
}
else
{
}
......@@ -1240,15 +1238,14 @@ Dbacc::startNext(Signal* signal, OperationrecPtr lastOp)
{
jam();
ptrCheckGuard(loPtr, coprecsize, operationrec);
nextOp.i = loPtr.p->nextSerialQue;
}
else
{
jam();
nextOp.i = loPtr.i;
loPtr = lastOp;
}
nextOp.i = loPtr.p->nextSerialQue;
ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
if (nextOp.i == RNIL)
......@@ -1411,6 +1408,9 @@ Dbacc::startNext(Signal* signal, OperationrecPtr lastOp)
else
{
jam();
fragrecptr.i = nextOp.p->fragptr;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
sendAcckeyconf(signal);
sendSignal(nextOp.p->userblockref, GSN_ACCKEYCONF,
signal, 6, JBA);
......@@ -1680,8 +1680,7 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr)
bool running = false;
{
Uint32 opstate = loPtr.p->m_op_bits & Operationrec::OP_STATE_MASK;
if (opstate == Operationrec::OP_STATE_RUNNING ||
opstate == Operationrec::OP_STATE_RUNNING_ABORT)
if (opstate == Operationrec::OP_STATE_RUNNING)
running = true;
else
{
......@@ -1719,8 +1718,7 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr)
}
else
{
if (opstate == Operationrec::OP_STATE_RUNNING ||
opstate == Operationrec::OP_STATE_RUNNING_ABORT)
if (opstate == Operationrec::OP_STATE_RUNNING)
running = true;
else
vlqrequire(opstate == Operationrec::OP_STATE_EXECUTED);
......@@ -1830,8 +1828,6 @@ operator<<(NdbOut & out, Dbacc::OperationrecPtr ptr)
out << " RUNNING "; break;
case Dbacc::Operationrec::OP_STATE_EXECUTED:
out << " EXECUTED "; break;
case Dbacc::Operationrec::OP_STATE_RUNNING_ABORT:
out << " RUNNIG_ABORT "; break;
case Dbacc::Operationrec::OP_STATE_IDLE:
out << " IDLE "; break;
default:
......@@ -1857,7 +1853,6 @@ operator<<(NdbOut & out, Dbacc::OperationrecPtr ptr)
,OP_STATE_WAITING = 0x0000
,OP_STATE_RUNNING = 0x1000
,OP_STATE_EXECUTED = 0x3000
,OP_STATE_RUNNING_ABORT = 0x2000
};
*/
if (opbits & Dbacc::Operationrec::OP_LOCK_OWNER)
......@@ -3950,6 +3945,7 @@ void Dbacc::checkoverfreelist(Signal* signal)
void
Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr)
{
jam();
OperationrecPtr nextP;
OperationrecPtr prevP;
OperationrecPtr loPtr;
......@@ -3992,6 +3988,11 @@ Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr)
else
{
jam();
/**
* P0 - P1
*
* Abort P1, check start next
*/
ndbassert(prevP.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
prevP.p->m_lo_last_parallel_op_ptr_i = RNIL;
startNext(signal, prevP);
......@@ -3999,6 +4000,9 @@ Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr)
return;
}
/**
* Abort P1/P2
*/
if (opbits & Operationrec::OP_LOCK_MODE)
{
Uint32 nextbits = nextP.p->m_op_bits;
......@@ -4024,12 +4028,23 @@ Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr)
/**
* Abort P1, P2
*/
if (opstate == Operationrec::OP_STATE_RUNNING)
{
jam();
startNext(signal, prevP);
validate_lock_queue(prevP);
return;
}
ndbassert(opstate == Operationrec::OP_STATE_EXECUTED ||
opstate == Operationrec::OP_STATE_WAITING);
/**
* Scan to last of run queue
*/
while (nextP.p->nextParallelQue != RNIL)
{
jam();
nextP.i = nextP.p->nextParallelQue;
ptrCheckGuard(nextP, coprecsize, operationrec);
}
......@@ -4049,6 +4064,7 @@ Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr)
void
Dbacc::abortSerieQueueOperation(Signal* signal, OperationrecPtr opPtr)
{
jam();
OperationrecPtr prevS, nextS;
OperationrecPtr prevP, nextP;
OperationrecPtr loPtr;
......@@ -4620,6 +4636,24 @@ Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit)
* Aborting an operation can *always* lead to lock upgrade
*/
action = CHECK_LOCK_UPGRADE;
Uint32 opstate = opbits & Operationrec::OP_STATE_MASK;
if (opstate != Operationrec::OP_STATE_EXECUTED)
{
ndbassert(opstate == Operationrec::OP_STATE_RUNNING);
if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED)
{
jam();
report_dealloc(signal, opPtr.p);
newOwner.p->localdata[0] = ~0;
}
else
{
jam();
newOwner.p->localdata[0] = opPtr.p->localdata[0];
newOwner.p->localdata[1] = opPtr.p->localdata[1];
}
action = START_NEW;
}
/**
* Update ACC_LOCK_MODE
......
......@@ -2604,11 +2604,6 @@ void Dblqh::execTUPKEYREF(Signal* signal)
ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP ||
regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT);
}
else if (getNodeState().startLevel == NodeState::SL_STARTED)
{
if (terrorCode == 899)
ndbout << "899: " << regTcPtr->m_row_id << endl;
}
switch (tcConnectptr.p->transactionState) {
case TcConnectionrec::WAIT_TUP:
......@@ -9095,6 +9090,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
if (accOpPtr != (Uint32)-1)
{
c_acc->execACCKEY_ORD(signal, accOpPtr);
jamEntry();
}
else
{
......@@ -9419,7 +9415,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
const Uint32 readCommitted = ScanFragReq::getReadCommittedFlag(reqinfo);
const Uint32 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo);
const Uint32 descending = ScanFragReq::getDescendingFlag(reqinfo);
const Uint32 tupScan = ScanFragReq::getTupScanFlag(reqinfo);
Uint32 tupScan = ScanFragReq::getTupScanFlag(reqinfo);
const Uint32 attrLen = ScanFragReq::getAttrLen(reqinfo);
const Uint32 scanPrio = ScanFragReq::getScanPrio(reqinfo);
......@@ -9458,7 +9454,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
scanptr.p->m_last_row = 0;
scanptr.p->scanStoredProcId = RNIL;
scanptr.p->copyPtr = RNIL;
if (max_rows == 0 || (max_bytes > 0 && max_rows > max_bytes)){
jam();
return ScanFragRef::ZWRONG_BATCH_SIZE;
......@@ -9479,8 +9475,10 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
* !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11
* idx uses from MAX_PARALLEL_SCANS_PER_FRAG - MAX = 12-42)
*/
Uint32 start = (rangeScan || tupScan ? MAX_PARALLEL_SCANS_PER_FRAG : 1 );
Uint32 stop = (rangeScan || tupScan ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1);
tupScan = 0; // Make sure that close tup scan does not start acc scan incorrectly
Uint32 start = (rangeScan || tupScan) ? MAX_PARALLEL_SCANS_PER_FRAG : 1 ;
Uint32 stop = (rangeScan || tupScan) ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG :
MAX_PARALLEL_SCANS_PER_FRAG - 1;
stop += start;
Uint32 free = tFragPtr.p->m_scanNumberMask.find(start);
......
......@@ -182,7 +182,7 @@ Dbtup::execNEXT_SCANREQ(Signal* signal)
ndbrequire(scan.m_accLockOp != RNIL);
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
signal, AccLockReq::UndoSignalLength);
......@@ -433,7 +433,7 @@ Dbtup::execACCKEYCONF(Signal* signal)
jam();
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry();
......
......@@ -344,7 +344,8 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::AbortWithConf;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal,
AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_state = ScanOp::Aborting;
......@@ -355,9 +356,10 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
ndbrequire(scan.m_accLockOp != RNIL);
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal,
AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_accLockOp = RNIL;
......@@ -612,7 +614,7 @@ Dbtux::execACCKEYCONF(Signal* signal)
jam();
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry();
......
......@@ -108,6 +108,8 @@ public:
NDBT_ResultRow& get_row(Uint32 idx) { return *rows[idx];}
int execute_async(Ndb*, NdbTransaction::ExecType, NdbTransaction::AbortOption = NdbTransaction::AbortOnError);
int execute_async_prepare(Ndb*, NdbTransaction::ExecType, NdbTransaction::AbortOption = NdbTransaction::AbortOnError);
int wait_async(Ndb*, int timeout = -1);
protected:
......
......@@ -1090,11 +1090,6 @@ runMassiveRollback4(NDBT_Context* ctx, NDBT_Step* step){
ok = false;
break;
}
if (hugoOps.execute_NoCommit(pNdb) != 0)
{
ok = false;
break;
}
}
hugoOps.execute_Rollback(pNdb);
CHECK(hugoOps.closeTransaction(pNdb) == 0);
......@@ -1199,6 +1194,61 @@ runTupErrors(NDBT_Context* ctx, NDBT_Step* step){
return NDBT_OK;
}
int
runInsertError(NDBT_Context* ctx, NDBT_Step* step){
int result = NDBT_OK;
HugoOperations hugoOp1(*ctx->getTab());
HugoOperations hugoOp2(*ctx->getTab());
Ndb* pNdb = GETNDB(step);
NdbRestarter restarter;
restarter.insertErrorInAllNodes(4017);
const Uint32 LOOPS = 10;
for (Uint32 i = 0; i<LOOPS; i++)
{
CHECK(hugoOp1.startTransaction(pNdb) == 0);
CHECK(hugoOp1.pkInsertRecord(pNdb, 1) == 0);
CHECK(hugoOp2.startTransaction(pNdb) == 0);
CHECK(hugoOp2.pkReadRecord(pNdb, 1, 1) == 0);
CHECK(hugoOp1.execute_async_prepare(pNdb, NdbTransaction::Commit) == 0);
CHECK(hugoOp2.execute_async_prepare(pNdb, NdbTransaction::Commit) == 0);
hugoOp1.wait_async(pNdb);
hugoOp2.wait_async(pNdb);
CHECK(hugoOp1.closeTransaction(pNdb) == 0);
CHECK(hugoOp2.closeTransaction(pNdb) == 0);
}
restarter.insertErrorInAllNodes(0);
return result;
}
int
runInsertError2(NDBT_Context* ctx, NDBT_Step* step){
int result = NDBT_OK;
HugoOperations hugoOp1(*ctx->getTab());
Ndb* pNdb = GETNDB(step);
NdbRestarter restarter;
restarter.insertErrorInAllNodes(4017);
const Uint32 LOOPS = 1;
for (Uint32 i = 0; i<LOOPS; i++)
{
CHECK(hugoOp1.startTransaction(pNdb) == 0);
CHECK(hugoOp1.pkInsertRecord(pNdb, 1) == 0);
CHECK(hugoOp1.pkDeleteRecord(pNdb, 1) == 0);
CHECK(hugoOp1.execute_NoCommit(pNdb) == 0);
CHECK(hugoOp1.closeTransaction(pNdb) == 0);
}
restarter.insertErrorInAllNodes(0);
}
NDBT_TESTSUITE(testBasic);
TESTCASE("PkInsert",
"Verify that we can insert and delete from this table using PK"
......@@ -1449,16 +1499,16 @@ TESTCASE("MassiveTransaction",
INITIALIZER(runLoadTable2);
FINALIZER(runClearTable2);
}
TESTCASE("Fill",
"Verify what happens when we fill the db" ){
INITIALIZER(runFillTable);
INITIALIZER(runPkRead);
FINALIZER(runClearTable2);
}
TESTCASE("TupError",
"Verify what happens when we fill the db" ){
INITIALIZER(runTupErrors);
}
TESTCASE("InsertError", "" ){
INITIALIZER(runInsertError);
}
TESTCASE("InsertError2", "" ){
INITIALIZER(runInsertError2);
}
NDBT_TESTSUITE_END(testBasic);
#if 0
......@@ -1469,6 +1519,12 @@ TESTCASE("ReadConsistency",
STEP(runReadOne);
FINALIZER(runClearTable2);
}
TESTCASE("Fill",
"Verify what happens when we fill the db" ){
INITIALIZER(runFillTable);
INITIALIZER(runPkRead);
FINALIZER(runClearTable2);
}
#endif
int main(int argc, const char** argv){
......
......@@ -286,15 +286,26 @@ int runRandScanRead(NDBT_Context* ctx, NDBT_Step* step){
int records = ctx->getNumRecords();
int parallelism = ctx->getProperty("Parallelism", 240);
int abort = ctx->getProperty("AbortProb", 5);
int tupscan = ctx->getProperty("TupScan", (Uint32)0);
int i = 0;
HugoTransactions hugoTrans(*ctx->getTab());
while (i<loops && !ctx->isTestStopped()) {
g_info << i << ": ";
NdbOperation::LockMode lm = (NdbOperation::LockMode)(rand() % 3);
int scan_flags = 0;
if (tupscan == 1)
scan_flags |= NdbScanOperation::SF_TupScan;
else if (tupscan == 2 && ((rand() & 0x800)))
{
scan_flags |= NdbScanOperation::SF_TupScan;
}
if (hugoTrans.scanReadRecords(GETNDB(step),
records, abort, parallelism,
lm) != 0){
lm,
scan_flags) != 0){
return NDBT_FAILED;
}
i++;
......@@ -1320,6 +1331,16 @@ TESTCASE("ScanRead488",
STEPS(runRandScanRead, 70);
FINALIZER(runClearTable);
}
TESTCASE("ScanRead488T",
"Verify scan requirement: It's only possible to have 11 concurrent "\
"scans per fragment running in Ndb kernel at the same time. "\
"When this limit is exceeded the scan will be aborted with errorcode "\
"488."){
TC_PROPERTY("TupScan", 1);
INITIALIZER(runLoadTable);
STEPS(runRandScanRead, 70);
FINALIZER(runClearTable);
}
TESTCASE("ScanRead488O",
"Verify scan requirement: It's only possible to have 11 concurrent "\
"scans per fragment running in Ndb kernel at the same time. "\
......@@ -1336,6 +1357,7 @@ TESTCASE("ScanRead488_Mixed",
"scans per fragment running in Ndb kernel at the same time. "\
"When this limit is exceeded the scan will be aborted with errorcode "\
"488."){
TC_PROPERTY("TupScan", 2);
INITIALIZER(createOrderedPkIndex);
INITIALIZER(runLoadTable);
STEPS(runRandScanRead, 50);
......
......@@ -219,6 +219,14 @@ max-time: 500
cmd: testBasic
args: -n TupError
max-time: 500
cmd: testBasic
args: -n InsertError T1
max-time: 500
cmd: testBasic
args: -n InsertError2 T1
max-time: 500
cmd: testTimeout
args: T1
......@@ -273,6 +281,10 @@ max-time: 500
cmd: testScan
args: -n ScanRead488O -l 10 T6 D1 D2
max-time: 1000
cmd: testScan
args: -n ScanRead488T -l 10 T6 D1 D2
max-time: 1000
cmd: testScan
args: -n ScanRead488_Mixed -l 10 T6 D1 D2
......
......@@ -471,17 +471,34 @@ HugoOperations::execute_async(Ndb* pNdb, NdbTransaction::ExecType et,
return NDBT_OK;
}
int
HugoOperations::execute_async_prepare(Ndb* pNdb, NdbTransaction::ExecType et,
NdbTransaction::AbortOption eao){
m_async_reply= 0;
pTrans->executeAsynchPrepare(et,
HugoOperations_async_callback,
this,
eao);
return NDBT_OK;
}
int
HugoOperations::wait_async(Ndb* pNdb, int timeout)
{
pNdb->pollNdb(1000);
volatile int * wait = &m_async_reply;
while (!* wait)
{
pNdb->sendPollNdb(1000);
if(m_async_reply)
if(* wait)
{
if(m_async_return)
ndbout << "ERROR: " << pNdb->getNdbError(m_async_return) << endl;
return m_async_return;
}
}
ndbout_c("wait returned nothing...");
return -1;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment