ndb - dbacc rewamp

  fix so that getElement read localkey from lockowner instead of from page
  plus some cleanups
parent 8b8aa227
...@@ -448,7 +448,7 @@ void Dbacc::initialiseOperationRec(Signal* signal) ...@@ -448,7 +448,7 @@ void Dbacc::initialiseOperationRec(Signal* signal)
for (operationRecPtr.i = 0; operationRecPtr.i < coprecsize; operationRecPtr.i++) { for (operationRecPtr.i = 0; operationRecPtr.i < coprecsize; operationRecPtr.i++) {
refresh_watch_dog(); refresh_watch_dog();
ptrAss(operationRecPtr, operationrec); ptrAss(operationRecPtr, operationrec);
operationRecPtr.p->m_op_bits = ~0; operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
operationRecPtr.p->nextOp = operationRecPtr.i + 1; operationRecPtr.p->nextOp = operationRecPtr.i + 1;
}//for }//for
operationRecPtr.i = coprecsize - 1; operationRecPtr.i = coprecsize - 1;
...@@ -910,7 +910,7 @@ void Dbacc::execACCSEIZEREQ(Signal* signal) ...@@ -910,7 +910,7 @@ void Dbacc::execACCSEIZEREQ(Signal* signal)
ptrGuard(operationRecPtr); ptrGuard(operationRecPtr);
operationRecPtr.p->userptr = tuserptr; operationRecPtr.p->userptr = tuserptr;
operationRecPtr.p->userblockref = tuserblockref; operationRecPtr.p->userblockref = tuserblockref;
operationRecPtr.p->m_op_bits = ~0; operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
/* ******************************< */ /* ******************************< */
/* ACCSEIZECONF */ /* ACCSEIZECONF */
/* ******************************< */ /* ******************************< */
...@@ -1413,7 +1413,7 @@ conf: ...@@ -1413,7 +1413,7 @@ conf:
sendAcckeyconf(signal); sendAcckeyconf(signal);
sendSignal(nextOp.p->userblockref, GSN_ACCKEYCONF, sendSignal(nextOp.p->userblockref, GSN_ACCKEYCONF,
signal, 6, JBA); signal, 6, JBB);
} }
operationRecPtr = save; operationRecPtr = save;
...@@ -1688,6 +1688,21 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr) ...@@ -1688,6 +1688,21 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr)
} }
} }
bool exists = true;
switch (loPtr.p->m_op_bits & Operationrec::OP_MASK){
case ZREAD:
case ZINSERT:
case ZUPDATE:
case ZSCAN_OP:
exists = true;
break;
case ZDELETE:
exists = false;
break;
case ZWRITE:
vlqrequire(false);
}
// Validate parallel queue // Validate parallel queue
{ {
bool many = false; bool many = false;
...@@ -1739,6 +1754,26 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr) ...@@ -1739,6 +1754,26 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr)
{ {
vlqrequire(orlockmode == 0); vlqrequire(orlockmode == 0);
} }
if (opstate == Operationrec::OP_STATE_RUNNING ||
opstate == Operationrec::OP_STATE_EXECUTED)
{
switch (lastP.p->m_op_bits & Operationrec::OP_MASK){
case ZREAD:
case ZUPDATE:
case ZSCAN_OP:
vlqrequire(exists);
break;
case ZDELETE:
vlqrequire(exists);
exists = false;
break;
case ZINSERT:
vlqrequire(!exists);
exists = true;
break;
}
}
} }
if (lastP.i != loPtr.i) if (lastP.i != loPtr.i)
...@@ -2227,7 +2262,7 @@ void Dbacc::placeSerialQueue(OperationrecPtr lockOwnerPtr, ...@@ -2227,7 +2262,7 @@ void Dbacc::placeSerialQueue(OperationrecPtr lockOwnerPtr,
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code) void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code)
{ {
operationRecPtr.p->m_op_bits = ~0; operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
/* ************************<< */ /* ************************<< */
/* ACCKEYREF */ /* ACCKEYREF */
/* ************************<< */ /* ************************<< */
...@@ -2446,7 +2481,7 @@ void Dbacc::execACC_LOCKREQ(Signal* signal) ...@@ -2446,7 +2481,7 @@ void Dbacc::execACC_LOCKREQ(Signal* signal)
// init as in ACCSEIZEREQ // init as in ACCSEIZEREQ
operationRecPtr.p->userptr = req->userPtr; operationRecPtr.p->userptr = req->userPtr;
operationRecPtr.p->userblockref = req->userRef; operationRecPtr.p->userblockref = req->userRef;
operationRecPtr.p->m_op_bits = ~0; operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
operationRecPtr.p->scanRecPtr = RNIL; operationRecPtr.p->scanRecPtr = RNIL;
// do read with lock via ACCKEYREQ // do read with lock via ACCKEYREQ
Uint32 lockMode = (lockOp == AccLockReq::LockShared) ? 0 : 1; Uint32 lockMode = (lockOp == AccLockReq::LockShared) ? 0 : 1;
...@@ -3312,6 +3347,7 @@ Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr) ...@@ -3312,6 +3347,7 @@ Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr)
tgeElementHeader = gePageptr.p->word32[tgeElementptr]; tgeElementHeader = gePageptr.p->word32[tgeElementptr];
tgeRemLen = tgeRemLen - TelemLen; tgeRemLen = tgeRemLen - TelemLen;
Uint32 hashValuePart; Uint32 hashValuePart;
Uint32 localkey1, localkey2;
lockOwnerPtr.i = RNIL; lockOwnerPtr.i = RNIL;
lockOwnerPtr.p = NULL; lockOwnerPtr.p = NULL;
if (ElementHeader::getLocked(tgeElementHeader)) { if (ElementHeader::getLocked(tgeElementHeader)) {
...@@ -3319,14 +3355,16 @@ Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr) ...@@ -3319,14 +3355,16 @@ Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr)
lockOwnerPtr.i = ElementHeader::getOpPtrI(tgeElementHeader); lockOwnerPtr.i = ElementHeader::getOpPtrI(tgeElementHeader);
ptrCheckGuard(lockOwnerPtr, coprecsize, operationrec); ptrCheckGuard(lockOwnerPtr, coprecsize, operationrec);
hashValuePart = lockOwnerPtr.p->hashvaluePart; hashValuePart = lockOwnerPtr.p->hashvaluePart;
localkey1 = lockOwnerPtr.p->localdata[0];
localkey2 = lockOwnerPtr.p->localdata[1];
} else { } else {
jam(); jam();
hashValuePart = ElementHeader::getHashValuePart(tgeElementHeader); hashValuePart = ElementHeader::getHashValuePart(tgeElementHeader);
localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward];
localkey2 = 0;
} }
if (hashValuePart == opHashValuePart) { if (hashValuePart == opHashValuePart) {
jam(); jam();
Uint32 localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward];
Uint32 localkey2 = 0;
bool found; bool found;
if (! searchLocalKey) if (! searchLocalKey)
{ {
...@@ -4644,7 +4682,7 @@ Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit) ...@@ -4644,7 +4682,7 @@ Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit)
{ {
jam(); jam();
report_dealloc(signal, opPtr.p); report_dealloc(signal, opPtr.p);
newOwner.p->localdata[0] = ~0; newOwner.p->localdata[0] = ~(Uint32)0;
} }
else else
{ {
...@@ -4692,7 +4730,7 @@ Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit) ...@@ -4692,7 +4730,7 @@ Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit)
if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED)
{ {
report_dealloc(signal, opPtr.p); report_dealloc(signal, opPtr.p);
newOwner.p->localdata[0] = ~0; newOwner.p->localdata[0] = ~(Uint32)0;
} }
else else
{ {
...@@ -4840,7 +4878,7 @@ conf: ...@@ -4840,7 +4878,7 @@ conf:
sendAcckeyconf(signal); sendAcckeyconf(signal);
sendSignal(newOwner.p->userblockref, GSN_ACCKEYCONF, sendSignal(newOwner.p->userblockref, GSN_ACCKEYCONF,
signal, 6, JBA); signal, 6, JBB);
operationRecPtr = save; operationRecPtr = save;
return; return;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment