Commit c74e8a62 authored by unknown's avatar unknown

Fix simple/dirty read handling


ndb/include/kernel/signaldata/TcKeyConf.hpp:
  A bit for differenciating between "real" tckey conf and simple read conf
ndb/include/ndbapi/Ndb.hpp:
  Removed/ifdef's unused stuff
ndb/include/ndbapi/NdbConnection.hpp:
  Added bitmask that keeps track of used nodes when
    a transaction can be dependant on several nodes
ndb/include/ndbapi/NdbOperation.hpp:
  removed unused methods
ndb/include/ndbapi/NdbReceiver.hpp:
  Tween execTCOPCONF to handle TcKeyConf::SimpleReadBit
ndb/src/common/debugger/signaldata/TcKeyConf.cpp:
  Update printer
ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  removed unused define
ndb/src/ndbapi/NdbApiSignal.cpp:
  Fix length of TC_COMMITREQ
ndb/test/ndbapi/testOperations.cpp:
  fix simple/dirty read
parent 809d52d1
......@@ -47,7 +47,8 @@ public:
*/
STATIC_CONST( StaticLength = 5 );
STATIC_CONST( OperationLength = 2 );
STATIC_CONST( SimpleReadBit = (((Uint32)1) << 31) );
private:
/**
......
......@@ -1612,7 +1612,6 @@ private:
char prefixName[NDB_MAX_INTERNAL_TABLE_LENGTH];
char * prefixEnd;
//Table* theTable; // The table object
class NdbImpl * theImpl;
class NdbDictionaryImpl* theDictionary;
class NdbGlobalEventBufferHandle* theGlobalEventBufferHandle;
......@@ -1698,10 +1697,13 @@ private:
NdbApiSignal* theCommitAckSignal;
#ifdef POORMANSPURIFY
int cfreeSignals;
int cnewSignals;
int cgetSignals;
int creleaseSignals;
#endif
static void executeMessage(void*, NdbApiSignal *,
struct LinearSectionPtr ptr[3]);
......
......@@ -526,9 +526,8 @@ private:
int sendCOMMIT(); // Send a TC_COMMITREQ signal;
void setGCI(int GCI); // Set the global checkpoint identity
int OpCompleteFailure(); // Operation Completed with success
int OpCompleteSuccess(); // Operation Completed with success
int OpCompleteFailure(Uint8 abortoption);
int OpCompleteSuccess();
void CompletedOperations(); // Move active ops to list of completed
void OpSent(); // Operation Sent with success
......@@ -649,6 +648,16 @@ private:
Uint32 theNodeSequence; // The sequence no of the db node
bool theReleaseOnClose;
/**
* handle transaction spanning
* multiple TC/db nodes
*
* 1) Bitmask with used nodes
* 2) Bitmask with nodes failed during op
*/
Uint32 m_db_nodes[2];
Uint32 m_failed_db_nodes[2];
// Scan operations
bool m_waitForReply;
NdbIndexScanOperation* m_theFirstScanOperation;
......
......@@ -787,11 +787,6 @@ protected:
int receiveTCKEYREF(NdbApiSignal*);
int receiveTRANSID_AI(const Uint32* aDataPtr, Uint32 aDataLength);
int receiveREAD_CONF(const Uint32* aDataPtr, Uint32 aDataLength);
int checkMagicNumber(bool b = true); // Verify correct object
int checkState_TransId(NdbApiSignal* aSignal);
......@@ -815,8 +810,6 @@ protected:
int branch_col_null(Uint32 type, Uint32 col, Uint32 Label);
// Handle ATTRINFO signals
int receiveREAD_AI(Uint32* aDataPtr, Uint32 aLength);
int insertATTRINFO(Uint32 aData);
int insertATTRINFOloop(const Uint32* aDataPtr, Uint32 aLength);
......
......@@ -19,6 +19,7 @@
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL // Not part of public interface
#include <ndb_types.h>
#include <ndb_global.h>
class Ndb;
class NdbReceiver
......@@ -127,7 +128,8 @@ int
NdbReceiver::execTCOPCONF(Uint32 len){
Uint32 tmp = m_received_result_length;
m_expected_result_length = len;
return (tmp == len ? 1 : 0);
assert(!(tmp && !len));
return ((bool)len ^ (bool)tmp ? 0 : 1);
}
inline
......
......@@ -22,38 +22,48 @@ printTCKEYCONF(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receive
if (receiverBlockNo == API_PACKED) {
fprintf(output, "Signal data: ");
Uint32 i = 0;
while (i < len)
fprintf(output, "H\'%.8x ", theData[i++]);
fprintf(output,"\n");
return false;
Uint32 Theader = * theData++;
Uint32 TpacketLen = (Theader & 0x1F) + 3;
Uint32 TrecBlockNo = Theader >> 16;
do {
fprintf(output, "Block: %d %d %d\n", TrecBlockNo, len, TpacketLen);
printTCKEYCONF(output, theData, TpacketLen, TrecBlockNo);
assert(len >= (1 + TpacketLen));
len -= (1 + TpacketLen);
theData += TpacketLen;
} while(len);
return true;
}
else {
const TcKeyConf * const sig = (TcKeyConf *) theData;
fprintf(output, "Signal data: ");
Uint32 i = 0;
Uint32 confInfo = sig->confInfo;
Uint32 noOfOp = TcKeyConf::getNoOfOperations(confInfo);
if (noOfOp > 10) noOfOp = 10;
while (i < len)
fprintf(output, "H\'%.8x ", theData[i++]);
fprintf(output,"\n");
fprintf(output, "apiConnectPtr: H'%.8x, gci: %u, transId:(H'%.8x, H'%.8x)\n",
fprintf(output, " apiConnectPtr: H'%.8x, gci: %u, transId:(H'%.8x, H'%.8x)\n",
sig->apiConnectPtr, sig->gci, sig->transId1, sig->transId2);
fprintf(output, "noOfOperations: %u, commitFlag: %s, markerFlag: %s\n",
fprintf(output, " noOfOperations: %u, commitFlag: %s, markerFlag: %s\n",
noOfOp,
(TcKeyConf::getCommitFlag(confInfo) == 0)?"false":"true",
(TcKeyConf::getMarkerFlag(confInfo) == 0)?"false":"true");
fprintf(output, "Operations:\n");
for(i = 0; i < noOfOp; i++) {
fprintf(output,
"apiOperationPtr: H'%.8x, attrInfoLen: %u\n",
sig->operations[i].apiOperationPtr,
sig->operations[i].attrInfoLen);
if(sig->operations[i].attrInfoLen > TcKeyConf::SimpleReadBit)
fprintf(output,
" apiOperationPtr: H'%.8x, simplereadnode: %u\n",
sig->operations[i].apiOperationPtr,
sig->operations[i].attrInfoLen & (~TcKeyConf::SimpleReadBit));
else
fprintf(output,
" apiOperationPtr: H'%.8x, attrInfoLen: %u\n",
sig->operations[i].apiOperationPtr,
sig->operations[i].attrInfoLen);
}
}
return true;
}
......@@ -234,10 +234,6 @@
#define ZNODE_UP 0
#define ZNODE_DOWN 1
/* ------------------------------------------------------------------------- */
/* OPERATION TYPES */
/* ------------------------------------------------------------------------- */
#define ZSIMPLE_READ 1
/* ------------------------------------------------------------------------- */
/* START PHASES */
/* ------------------------------------------------------------------------- */
#define ZLAST_START_PHASE 255
......
......@@ -3299,8 +3299,8 @@ void Dblqh::execLQHKEYREQ(Signal* signal)
regTcPtr->dirtyOp = LqhKeyReq::getDirtyFlag(Treqinfo);
regTcPtr->opExec = LqhKeyReq::getInterpretedFlag(Treqinfo);
regTcPtr->opSimple = LqhKeyReq::getSimpleFlag(Treqinfo);
regTcPtr->simpleRead = ((Treqinfo >> 18) & 15);
regTcPtr->operation = LqhKeyReq::getOperation(Treqinfo);
regTcPtr->simpleRead = regTcPtr->operation == ZREAD && regTcPtr->opSimple;
regTcPtr->seqNoReplica = LqhKeyReq::getSeqNoReplica(Treqinfo);
UintR TreclenAiLqhkey = LqhKeyReq::getAIInLqhKeyReq(Treqinfo);
regTcPtr->apiVersionNo = 0;
......@@ -3431,7 +3431,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal)
if ((tfragDistKey != TdistKey) &&
(regTcPtr->seqNoReplica == 0) &&
(regTcPtr->dirtyOp == ZFALSE) &&
(regTcPtr->simpleRead != ZSIMPLE_READ)) {
(regTcPtr->simpleRead == ZFALSE)) {
/* ----------------------------------------------------------------------
* WE HAVE DIFFERENT OPINION THAN THE DIH THAT STARTED THE TRANSACTION.
* THE REASON COULD BE THAT THIS IS AN OLD DISTRIBUTION WHICH IS NO LONGER
......@@ -3439,7 +3439,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal)
* ONE IS ADDED TO THE DISTRIBUTION KEY EVERY TIME WE ADD A NEW REPLICA.
* FAILED REPLICAS DO NOT AFFECT THE DISTRIBUTION KEY. THIS MEANS THAT THE
* MAXIMUM DEVIATION CAN BE ONE BETWEEN THOSE TWO VALUES.
* ---------------------------------------------------------------------- */
* --------------------------------------------------------------------- */
Int32 tmp = TdistKey - tfragDistKey;
tmp = (tmp < 0 ? - tmp : tmp);
if ((tmp <= 1) || (tfragDistKey == 0)) {
......@@ -3879,7 +3879,7 @@ void Dblqh::tupkeyConfLab(Signal* signal)
/* ---- GET OPERATION TYPE AND CHECK WHAT KIND OF OPERATION IS REQUESTED ---- */
const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0];
TcConnectionrec * const regTcPtr = tcConnectptr.p;
if (regTcPtr->simpleRead == ZSIMPLE_READ) {
if (regTcPtr->simpleRead) {
jam();
/* ----------------------------------------------------------------------
* THE OPERATION IS A SIMPLE READ. WE WILL IMMEDIATELY COMMIT THE OPERATION.
......@@ -5467,6 +5467,8 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal)
TcConnectionrec * const regTcPtr = tcConnectptr.p;
Fragrecord * const regFragptr = fragptr.p;
Uint32 operation = regTcPtr->operation;
Uint32 simpleRead = regTcPtr->simpleRead;
Uint32 dirtyOp = regTcPtr->dirtyOp;
if (regTcPtr->activeCreat == ZFALSE) {
if ((cCommitBlocked == true) &&
(regFragptr->fragActiveStatus == ZTRUE)) {
......@@ -5504,13 +5506,18 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal)
tupCommitReq->hashValue = regTcPtr->hashValue;
EXECUTE_DIRECT(tup, GSN_TUP_COMMITREQ, signal,
TupCommitReq::SignalLength);
}//if
Uint32 acc = refToBlock(regTcPtr->tcAccBlockref);
signal->theData[0] = regTcPtr->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
Uint32 simpleRead = regTcPtr->simpleRead;
Uint32 acc = refToBlock(regTcPtr->tcAccBlockref);
signal->theData[0] = regTcPtr->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
} else {
if(!dirtyOp){
Uint32 acc = refToBlock(regTcPtr->tcAccBlockref);
signal->theData[0] = regTcPtr->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
}
}
jamEntry();
if (simpleRead == ZSIMPLE_READ) {
if (simpleRead) {
jam();
/* ------------------------------------------------------------------------- */
/*THE OPERATION WAS A SIMPLE READ THUS THE COMMIT PHASE IS ONLY NEEDED TO */
......@@ -5523,7 +5530,6 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal)
return;
}//if
}//if
Uint32 dirtyOp = regTcPtr->dirtyOp;
Uint32 seqNoReplica = regTcPtr->seqNoReplica;
if (regTcPtr->gci > regFragptr->newestGci) {
jam();
......@@ -6092,7 +6098,7 @@ void Dblqh::abortStateHandlerLab(Signal* signal)
/* ------------------------------------------------------------------------- */
return;
}//if
if (regTcPtr->simpleRead == ZSIMPLE_READ) {
if (regTcPtr->simpleRead) {
jam();
/* ------------------------------------------------------------------------- */
/*A SIMPLE READ IS CURRENTLY RELEASING THE LOCKS OR WAITING FOR ACCESS TO */
......@@ -6378,7 +6384,7 @@ void Dblqh::continueAbortLab(Signal* signal)
void Dblqh::continueAfterLogAbortWriteLab(Signal* signal)
{
TcConnectionrec * const regTcPtr = tcConnectptr.p;
if (regTcPtr->simpleRead == ZSIMPLE_READ) {
if (regTcPtr->simpleRead) {
jam();
TcKeyRef * const tcKeyRef = (TcKeyRef *) signal->getDataPtrSend();
......
......@@ -1459,7 +1459,7 @@ private:
void releaseAttrinfo();
void releaseGcp(Signal* signal);
void releaseKeys();
void releaseSimpleRead(Signal* signal);
void releaseSimpleRead(Signal*, ApiConnectRecordPtr, TcConnectRecord*);
void releaseDirtyWrite(Signal* signal);
void releaseTcCon();
void releaseTcConnectFail(Signal* signal);
......
This diff is collapsed.
......@@ -168,7 +168,7 @@ NdbApiSignal::setSignal(int aNdbSignalType)
theTrace = TestOrd::TraceAPI;
theReceiversBlockNumber = DBTC;
theVerId_signalNumber = GSN_TC_COMMITREQ;
theLength = 5;
theLength = 3;
}
break;
......
......@@ -83,6 +83,11 @@ NdbConnection::NdbConnection( Ndb* aNdb ) :
theListState = NotInList;
theError.code = 0;
theId = theNdb->theNdbObjectIdMap->map(this);
#define CHECK_SZ(mask, sz) assert((sizeof(mask)/sizeof(mask[0])) == sz)
CHECK_SZ(m_db_nodes, NdbNodeBitmask::Size);
CHECK_SZ(m_failed_db_nodes, NdbNodeBitmask::Size);
}//NdbConnection::NdbConnection()
/*****************************************************************************
......@@ -490,11 +495,6 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec,
theListState = InPreparedList;
tNdb->theNoOfPreparedTransactions = tnoOfPreparedTransactions + 1;
if(tCommitStatus == Committed){
tCommitStatus = Started;
tTransactionIsStarted = false;
}
if ((tCommitStatus != Started) ||
(aTypeOfExec == Rollback)) {
/*****************************************************************************
......@@ -503,7 +503,7 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec,
* same action.
****************************************************************************/
if (aTypeOfExec == Rollback) {
if (theTransactionIsStarted == false) {
if (theTransactionIsStarted == false || theSimpleState) {
theCommitStatus = Aborted;
theSendStatus = sendCompleted;
} else {
......@@ -529,6 +529,7 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec,
}//if
} else {
if (aTypeOfExec == Commit) {
if (aTypeOfExec == Commit && !theSimpleState) {
/**********************************************************************
* A Transaction have been started and no more operations exist.
* We will use the commit method.
......@@ -610,6 +611,8 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec,
theNoOfOpSent = 0;
theNoOfOpCompleted = 0;
theSendStatus = sendOperations;
NdbNodeBitmask::clear(m_db_nodes);
NdbNodeBitmask::clear(m_failed_db_nodes);
DBUG_VOID_RETURN;
}//NdbConnection::executeAsynchPrepare()
......@@ -1541,6 +1544,10 @@ from other transactions.
tPtr++;
if (tOp && tOp->checkMagicNumber()) {
tNoComp += tOp->execTCOPCONF(tAttrInfoLen);
if(tAttrInfoLen > TcKeyConf::SimpleReadBit){
NdbNodeBitmask::set(m_db_nodes,
tAttrInfoLen & (~TcKeyConf::SimpleReadBit));
}
} else {
return -1;
}//if
......@@ -1790,7 +1797,7 @@ Parameters: aErrorCode: The error code.
Remark: An operation was completed with failure.
*******************************************************************************/
int
NdbConnection::OpCompleteFailure()
NdbConnection::OpCompleteFailure(Uint8 abortOption)
{
Uint32 tNoComp = theNoOfOpCompleted;
Uint32 tNoSent = theNoOfOpSent;
......@@ -1804,10 +1811,7 @@ NdbConnection::OpCompleteFailure()
//decide the success of the whole transaction since a simple
//operation is not really part of that transaction.
//------------------------------------------------------------------------
if (theSimpleState == 1) {
theCommitStatus = NdbConnection::Aborted;
}//if
if (m_abortOption == IgnoreError){
if (abortOption == IgnoreError){
/**
* There's always a TCKEYCONF when using IgnoreError
*/
......@@ -1842,9 +1846,6 @@ NdbConnection::OpCompleteSuccess()
tNoComp++;
theNoOfOpCompleted = tNoComp;
if (tNoComp == tNoSent) { // Last operation completed
if (theSimpleState == 1) {
theCommitStatus = NdbConnection::Committed;
}//if
return 0;
} else if (tNoComp < tNoSent) {
return -1; // Continue waiting for more signals
......
......@@ -87,7 +87,19 @@ NdbIndexOperation::indxInit(const NdbIndexImpl * anIndex,
int NdbIndexOperation::readTuple(NdbOperation::LockMode lm)
{
return NdbOperation::readTuple(lm);
switch(lm) {
case LM_Read:
return readTuple();
break;
case LM_Exclusive:
return readTupleExclusive();
break;
case LM_CommittedRead:
return readTuple();
break;
default:
return -1;
};
}
int NdbIndexOperation::readTuple()
......@@ -108,21 +120,21 @@ int NdbIndexOperation::simpleRead()
{
// First check that index is unique
return NdbOperation::simpleRead();
return NdbOperation::readTuple();
}
int NdbIndexOperation::dirtyRead()
{
// First check that index is unique
return NdbOperation::dirtyRead();
return NdbOperation::readTuple();
}
int NdbIndexOperation::committedRead()
{
// First check that index is unique
return NdbOperation::committedRead();
return NdbOperation::readTuple();
}
int NdbIndexOperation::updateTuple()
......@@ -536,7 +548,7 @@ NdbIndexOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransactionId)
//-------------------------------------------------------------
Uint8 tReadInd = (theOperationType == ReadRequest);
Uint8 tSimpleState = tReadInd & tSimpleAlt;
theNdbCon->theSimpleState = tSimpleState;
//theNdbCon->theSimpleState = tSimpleState;
tcIndxReq->transId1 = tTransId1;
tcIndxReq->transId2 = tTransId2;
......@@ -723,23 +735,10 @@ NdbIndexOperation::receiveTCINDXREF( NdbApiSignal* aSignal)
theStatus = Finished;
theNdbCon->theReturnStatus = NdbConnection::ReturnFailure;
//--------------------------------------------------------------------------//
// If the transaction this operation belongs to consists only of simple reads
// we set the error code on the transaction object.
// If the transaction consists of other types of operations we set
// the error code only on the operation since the simple read is not really
// part of this transaction and we can not decide the status of the whole
// transaction based on this operation.
//--------------------------------------------------------------------------//
Uint32 errorCode = tcIndxRef->errorCode;
if (theNdbCon->theSimpleState == 0) {
theError.code = errorCode;
theNdbCon->setOperationErrorCodeAbort(errorCode);
return theNdbCon->OpCompleteFailure();
} else {
theError.code = errorCode;
return theNdbCon->OpCompleteSuccess();
}
theError.code = errorCode;
theNdbCon->setOperationErrorCodeAbort(errorCode);
return theNdbCon->OpCompleteFailure(theNdbCon->m_abortOption);
}//NdbIndexOperation::receiveTCINDXREF()
......
......@@ -116,7 +116,7 @@ NdbOperation::readTuple(NdbOperation::LockMode lm)
return readTupleExclusive();
break;
case LM_CommittedRead:
return readTuple();
return committedRead();
break;
default:
return -1;
......@@ -191,18 +191,24 @@ NdbOperation::readTupleExclusive()
int
NdbOperation::simpleRead()
{
/**
* Currently/still disabled
*/
return readTuple();
#if 0
int tErrorLine = theErrorLine;
if (theStatus == Init) {
theStatus = OperationDefined;
theOperationType = ReadRequest;
theSimpleIndicator = 1;
theErrorLine = tErrorLine++;
theLockMode = LM_CommittedRead;
theLockMode = LM_Read;
return 0;
} else {
setErrorCode(4200);
return -1;
}//if
#endif
}//NdbOperation::simpleRead()
/*****************************************************************************
......
......@@ -161,28 +161,17 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId)
tTransId1 = (Uint32) aTransId;
tTransId2 = (Uint32) (aTransId >> 32);
//-------------------------------------------------------------
// Simple is simple if simple or both start and commit is set.
//-------------------------------------------------------------
// Temporarily disable simple stuff
Uint8 tSimpleIndicator = 0;
// Uint8 tSimpleIndicator = theSimpleIndicator;
Uint8 tSimpleIndicator = theSimpleIndicator;
Uint8 tCommitIndicator = theCommitIndicator;
Uint8 tStartIndicator = theStartIndicator;
// if ((theNdbCon->theLastOpInList == this) && (theCommitIndicator == 0))
// abort();
// Temporarily disable simple stuff
Uint8 tSimpleAlt = 0;
// Uint8 tSimpleAlt = tStartIndicator & tCommitIndicator;
tSimpleIndicator = tSimpleIndicator | tSimpleAlt;
Uint8 tInterpretIndicator = theInterpretIndicator;
//-------------------------------------------------------------
// Simple state is set if start and commit is set and it is
// a read request. Otherwise it is set to zero.
//-------------------------------------------------------------
Uint8 tReadInd = (theOperationType == ReadRequest);
Uint8 tSimpleState = tReadInd & tSimpleAlt;
theNdbCon->theSimpleState = tSimpleState;
Uint8 tSimpleState = tReadInd & tSimpleIndicator;
tcKeyReq->transId1 = tTransId1;
tcKeyReq->transId2 = tTransId2;
......@@ -197,7 +186,6 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId)
tcKeyReq->setSimpleFlag(tReqInfo, tSimpleIndicator);
tcKeyReq->setCommitFlag(tReqInfo, tCommitIndicator);
tcKeyReq->setStartFlag(tReqInfo, tStartIndicator);
const Uint8 tInterpretIndicator = theInterpretIndicator;
tcKeyReq->setInterpretedFlag(tReqInfo, tInterpretIndicator);
Uint8 tDirtyIndicator = theDirtyIndicator;
......@@ -208,6 +196,9 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId)
tcKeyReq->setDirtyFlag(tReqInfo, tDirtyIndicator);
tcKeyReq->setOperationType(tReqInfo, tOperationType);
tcKeyReq->setKeyLength(tReqInfo, tTupKeyLen);
// A simple read is always ignore error
abortOption = tSimpleIndicator ? IgnoreError : abortOption;
tcKeyReq->setAbortOption(tReqInfo, abortOption);
Uint8 tDistrKeyIndicator = theDistrKeyIndicator;
......@@ -551,25 +542,16 @@ NdbOperation::receiveTCKEYREF( NdbApiSignal* aSignal)
}//if
theStatus = Finished;
theNdbCon->theReturnStatus = NdbConnection::ReturnFailure;
//-------------------------------------------------------------------------//
// If the transaction this operation belongs to consists only of simple reads
// we set the error code on the transaction object.
// If the transaction consists of other types of operations we set
// the error code only on the operation since the simple read is not really
// part of this transaction and we can not decide the status of the whole
// transaction based on this operation.
//-------------------------------------------------------------------------//
if (theNdbCon->theSimpleState == 0) {
theError.code = aSignal->readData(4);
theNdbCon->setOperationErrorCodeAbort(aSignal->readData(4));
return theNdbCon->OpCompleteFailure();
} else {
theError.code = aSignal->readData(4);
return theNdbCon->OpCompleteSuccess();
}
theError.code = aSignal->readData(4);
theNdbCon->setOperationErrorCodeAbort(aSignal->readData(4));
if(theOperationType != ReadRequest || !theSimpleIndicator) // not simple read
return theNdbCon->OpCompleteFailure(theNdbCon->m_abortOption);
// Simple read is always ignore error
return theNdbCon->OpCompleteFailure(IgnoreError);
}//NdbOperation::receiveTCKEYREF()
......
......@@ -22,6 +22,7 @@
#include <AttributeHeader.hpp>
#include <NdbConnection.hpp>
#include <TransporterFacade.hpp>
#include <signaldata/TcKeyConf.hpp>
NdbReceiver::NdbReceiver(Ndb *aNdb) :
theMagicNumber(0),
......@@ -249,10 +250,11 @@ NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
/**
* Update m_received_result_length
*/
Uint32 exp = m_expected_result_length;
Uint32 tmp = m_received_result_length + aLength;
m_received_result_length = tmp;
return (tmp == m_expected_result_length ? 1 : 0);
return (tmp == exp || (exp > TcKeyConf::SimpleReadBit) ? 1 : 0);
}
int
......
......@@ -908,6 +908,7 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){
if (newOp == NULL){
return NULL;
}
pTrans->theSimpleState = 0;
const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1;
......
......@@ -107,8 +107,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
theOpIdleList= NULL;
theScanOpIdleList= NULL;
theIndexOpIdleList= NULL;
// theSchemaConIdleList= NULL;
// theSchemaConToNdbList= NULL;
theTransactionList= NULL;
theConnectionArray= NULL;
theRecAttrIdleList= NULL;
......@@ -134,10 +132,13 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
fullyQualifiedNames = true;
#ifdef POORMANSPURIFY
cgetSignals =0;
cfreeSignals = 0;
cnewSignals = 0;
creleaseSignals = 0;
#endif
theError.code = 0;
theNdbObjectIdMap = new NdbObjectIdMap(1024,1024);
......
......@@ -432,11 +432,15 @@ Ndb::getSignal()
theSignalIdleList = tSignalNext;
} else {
tSignal = new NdbApiSignal(theMyRef);
#ifdef POORMANSPURIFY
cnewSignals++;
#endif
if (tSignal != NULL)
tSignal->next(NULL);
}
#ifdef POORMANSPURIFY
cgetSignals++;
#endif
return tSignal;
}
......@@ -605,7 +609,9 @@ Ndb::releaseSignal(NdbApiSignal* aSignal)
}
#endif
#endif
#ifdef POORMANSPURIFY
creleaseSignals++;
#endif
aSignal->next(theSignalIdleList);
theSignalIdleList = aSignal;
}
......@@ -769,7 +775,9 @@ Ndb::freeSignal()
NdbApiSignal* tSignal = theSignalIdleList;
theSignalIdleList = tSignal->next();
delete tSignal;
#ifdef POORMANSPURIFY
cfreeSignals++;
#endif
}
void
......
......@@ -86,7 +86,7 @@ OperationTestCase matrix[] = {
{ "DeleteRead", true, "DELETE", 0, 0, "READ", 626, 0, 0, 0 },
{ "DeleteReadEx", true, "DELETE", 0, 0, "READ-EX", 626, 0, 0, 0 },
{ "DeleteSimpleRead", true, "DELETE", 0, 0, "S-READ", 626, 0, 0, 0 },
{ "DeleteDirtyRead", true, "DELETE", 0, 0, "D-READ", 626, 0, 0, 0 },
{ "DeleteDirtyRead", true, "DELETE", 0, 0, "D-READ", 626, 0, 626, 0 },
{ "DeleteInsert", true, "DELETE", 0, 0, "INSERT", 0, 1, 0, 1 },
{ "DeleteUpdate", true, "DELETE", 0, 0, "UPDATE", 626, 1, 0, 0 },
{ "DeleteDelete", true, "DELETE", 0, 0, "DELETE", 626, 0, 0, 0 }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment