Commit be0ab479 authored by unknown's avatar unknown

ndb - bug#19293 and family

  introduce acc per row logical mutex to fix difficult error handling cases
  


storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp:
  1) Fix per row mutex so that only 1 op at a time is running on a row
  2) Change TUP_ALLOC/DEALLOC semantic, so that a new record will be allocated if LQ = { T1(DEL) - T2(INS) }
  3) Rewrite lock queus to be O(1) in all cases but a few abort cases where we scan parallell queue
  4) Impl. a validate_lock_queue/dump_lock_queue test framework
storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp:
  1) Fix per row mutex so that only 1 op at a time is running on a row
  2) Change TUP_ALLOC/DEALLOC semantic, so that a new record will be allocated if LQ = { T1(DEL) - T2(INS) }
  3) Rewrite lock queus to be O(1) in all cases but a few abort cases where we scan parallell queue
  4) Impl. a validate_lock_queue/dump_lock_queue test framework
storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp:
  1) Fix per row mutex so that only 1 op at a time is running on a row
  2) Change TUP_ALLOC/DEALLOC semantic, so that a new record will be allocated if LQ = { T1(DEL) - T2(INS) }
  3) Rewrite lock queus to be O(1) in all cases but a few abort cases where we scan parallell queue
  4) Impl. a validate_lock_queue/dump_lock_queue test framework
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  1) impl. a new read key from operation record needed by acc
  2) expand TRACE_OP toolkit
  3) impl. ACCKEY_ORD as needed by ACC changes
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  1) impl. a new read key from operation record needed by acc
  2) expand TRACE_OP toolkit
  3) impl. ACCKEY_ORD as needed by ACC changes
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
  remove unused states/methods
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp:
  remove extremly tricky code that handles disk_insert_but_no_mem_insert
    that is no long needed with current acc changes
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp:
  remove unused states/methods
storage/ndb/test/ndbapi/testOperations.cpp:
  renable last 3 lock upgrade testcases since they now pass
parent 7d6ee98f
...@@ -17,14 +17,13 @@ ...@@ -17,14 +17,13 @@
#ifndef DBACC_H #ifndef DBACC_H
#define DBACC_H #define DBACC_H
#ifdef VM_TRACE
#define ACC_SAFE_QUEUE
#endif
#include <pc.hpp> #include <pc.hpp>
#include <SimulatedBlock.hpp> #include <SimulatedBlock.hpp>
// primary key is stored in TUP
#include "../dbtup/Dbtup.hpp"
#ifdef DBACC_C #ifdef DBACC_C
// Debug Macros // Debug Macros
#define dbgWord32(ptr, ind, val) #define dbgWord32(ptr, ind, val)
...@@ -135,7 +134,10 @@ ndbout << "Ptr: " << ptr.p->word32 << " \tIndex: " << tmp_string << " \tValue: " ...@@ -135,7 +134,10 @@ ndbout << "Ptr: " << ptr.p->word32 << " \tIndex: " << tmp_string << " \tValue: "
#define ZRIGHT 2 #define ZRIGHT 2
#define ZROOTFRAGMENTSIZE 32 #define ZROOTFRAGMENTSIZE 32
#define ZSCAN_LOCK_ALL 3 #define ZSCAN_LOCK_ALL 3
#define ZSCAN_OP 5 /**
* Check kernel_types for other operation types
*/
#define ZSCAN_OP 6
#define ZSCAN_REC_SIZE 256 #define ZSCAN_REC_SIZE 256
#define ZSTAND_BY 2 #define ZSTAND_BY 2
#define ZTABLESIZE 16 #define ZTABLESIZE 16
...@@ -477,6 +479,7 @@ struct Fragmentrec { ...@@ -477,6 +479,7 @@ struct Fragmentrec {
/* OPERATIONREC */ /* OPERATIONREC */
/* --------------------------------------------------------------------------------- */ /* --------------------------------------------------------------------------------- */
struct Operationrec { struct Operationrec {
Uint32 m_op_bits;
Uint32 localdata[2]; Uint32 localdata[2];
Uint32 elementIsforward; Uint32 elementIsforward;
Uint32 elementPage; Uint32 elementPage;
...@@ -485,36 +488,62 @@ struct Operationrec { ...@@ -485,36 +488,62 @@ struct Operationrec {
Uint32 fragptr; Uint32 fragptr;
Uint32 hashvaluePart; Uint32 hashvaluePart;
Uint32 hashValue; Uint32 hashValue;
Uint32 insertDeleteLen;
Uint32 nextLockOwnerOp; Uint32 nextLockOwnerOp;
Uint32 nextOp; Uint32 nextOp;
Uint32 nextParallelQue; Uint32 nextParallelQue;
union {
Uint32 nextSerialQue; Uint32 nextSerialQue;
Uint32 m_lock_owner_ptr_i; // if nextParallelQue = RNIL, else undefined
};
Uint32 prevOp; Uint32 prevOp;
Uint32 prevLockOwnerOp; Uint32 prevLockOwnerOp;
union {
Uint32 prevParallelQue; Uint32 prevParallelQue;
Uint32 m_lo_last_parallel_op_ptr_i;
};
union {
Uint32 prevSerialQue; Uint32 prevSerialQue;
Uint32 m_lo_last_serial_op_ptr_i;
};
Uint32 scanRecPtr; Uint32 scanRecPtr;
Uint32 transId1; Uint32 transId1;
Uint32 transId2; Uint32 transId2;
State opState;
Uint32 userptr; Uint32 userptr;
State transactionstate;
Uint16 elementContainer; Uint16 elementContainer;
Uint16 tupkeylen; Uint16 tupkeylen;
Uint32 xfrmtupkeylen; Uint32 xfrmtupkeylen;
Uint32 userblockref; Uint32 userblockref;
Uint32 scanBits; Uint32 scanBits;
Uint8 elementIsDisappeared;
Uint8 insertIsDone; enum OpBits {
Uint8 lockMode; OP_MASK = 0x0000F // 4 bits for operation type
Uint8 lockOwner; ,OP_LOCK_MODE = 0x00010 // 0 - shared lock, 1 = exclusive lock
Uint8 nodeType; ,OP_ACC_LOCK_MODE = 0x00020 // Or:de lock mode of all operation
Uint8 operation; // before me
Uint8 opSimple; ,OP_LOCK_OWNER = 0x00040
Uint8 dirtyRead; ,OP_RUN_QUEUE = 0x00080 // In parallell queue of lock owner
Uint8 commitDeleteCheckFlag; ,OP_DIRTY_READ = 0x00100
Uint8 isAccLockReq; ,OP_LOCK_REQ = 0x00200 // isAccLockReq
,OP_COMMIT_DELETE_CHECK = 0x00400
,OP_INSERT_IS_DONE = 0x00800
,OP_ELEMENT_DISAPPEARED = 0x01000
,OP_STATE_MASK = 0xF0000
,OP_STATE_IDLE = 0xF0000
,OP_STATE_WAITING = 0x00000
,OP_STATE_RUNNING = 0x10000
,OP_STATE_EXECUTED = 0x30000
,OP_STATE_RUNNING_ABORT = 0x20000
,OP_EXECUTED_DIRTY_READ = 0x3050F
,OP_INITIAL = ~(Uint32)0
};
bool is_same_trans(const Operationrec* op) const {
return
transId1 == op->transId1 && transId2 == op->transId2;
}
}; /* p2c: size = 168 bytes */ }; /* p2c: size = 168 bytes */
typedef Ptr<Operationrec> OperationrecPtr; typedef Ptr<Operationrec> OperationrecPtr;
...@@ -577,7 +606,6 @@ struct ScanRec { ...@@ -577,7 +606,6 @@ struct ScanRec {
Uint32 scanUserblockref; Uint32 scanUserblockref;
Uint32 scanMask; Uint32 scanMask;
Uint8 scanLockMode; Uint8 scanLockMode;
Uint8 scanKeyinfoFlag;
Uint8 scanTimer; Uint8 scanTimer;
Uint8 scanContinuebCounter; Uint8 scanContinuebCounter;
Uint8 scanReadCommittedFlag; Uint8 scanReadCommittedFlag;
...@@ -602,7 +630,8 @@ public: ...@@ -602,7 +630,8 @@ public:
virtual ~Dbacc(); virtual ~Dbacc();
// pointer to TUP instance in this thread // pointer to TUP instance in this thread
Dbtup* c_tup; class Dbtup* c_tup;
class Dblqh* c_lqh;
void execACCMINUPDATE(Signal* signal); void execACCMINUPDATE(Signal* signal);
...@@ -640,6 +669,7 @@ private: ...@@ -640,6 +669,7 @@ private:
void ACCKEY_error(Uint32 fromWhere); void ACCKEY_error(Uint32 fromWhere);
void commitDeleteCheck(); void commitDeleteCheck();
void report_dealloc(Signal* signal, const Operationrec* opPtrP);
typedef void * RootfragmentrecPtr; typedef void * RootfragmentrecPtr;
void initRootFragPageZero(FragmentrecPtr, Page8Ptr); void initRootFragPageZero(FragmentrecPtr, Page8Ptr);
...@@ -679,14 +709,30 @@ private: ...@@ -679,14 +709,30 @@ private:
bool addfragtotab(Signal* signal, Uint32 rootIndex, Uint32 fragId); bool addfragtotab(Signal* signal, Uint32 rootIndex, Uint32 fragId);
void initOpRec(Signal* signal); void initOpRec(Signal* signal);
void sendAcckeyconf(Signal* signal); void sendAcckeyconf(Signal* signal);
Uint32 placeReadInLockQueue(Signal* signal);
void placeSerialQueueRead(Signal* signal);
void checkOnlyReadEntry(Signal* signal);
Uint32 getNoParallelTransaction(const Operationrec*); Uint32 getNoParallelTransaction(const Operationrec*);
void moveLastParallelQueue(Signal* signal);
void moveLastParallelQueueWrite(Signal* signal); #ifdef VM_TRACE
Uint32 placeWriteInLockQueue(Signal* signal); Uint32 getNoParallelTransactionFull(const Operationrec*);
void placeSerialQueueWrite(Signal* signal); #endif
#ifdef ACC_SAFE_QUEUE
bool validate_lock_queue(OperationrecPtr opPtr);
Uint32 get_parallel_head(OperationrecPtr opPtr);
void dump_lock_queue(OperationrecPtr loPtr);
#else
bool validate_lock_queue(OperationrecPtr) { return true;}
#endif
public:
void execACCKEY_ORD(Signal* signal, Uint32 opPtrI);
void startNext(Signal* signal, OperationrecPtr lastOp);
private:
Uint32 placeReadInLockQueue(OperationrecPtr lockOwnerPtr);
Uint32 placeWriteInLockQueue(OperationrecPtr lockOwnerPtr);
void placeSerialQueue(OperationrecPtr lockOwner, OperationrecPtr op);
void abortSerieQueueOperation(Signal* signal, OperationrecPtr op);
void abortParallelQueueOperation(Signal* signal, OperationrecPtr op);
void expandcontainer(Signal* signal); void expandcontainer(Signal* signal);
void shrinkcontainer(Signal* signal); void shrinkcontainer(Signal* signal);
void nextcontainerinfoExp(Signal* signal); void nextcontainerinfoExp(Signal* signal);
...@@ -716,8 +762,8 @@ private: ...@@ -716,8 +762,8 @@ private:
void increaselistcont(Signal* signal); void increaselistcont(Signal* signal);
void seizeLeftlist(Signal* signal); void seizeLeftlist(Signal* signal);
void seizeRightlist(Signal* signal); void seizeRightlist(Signal* signal);
Uint32 readTablePk(Uint32 localkey1); Uint32 readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec*);
void getElement(Signal* signal); Uint32 getElement(Signal* signal, OperationrecPtr& lockOwner);
void getdirindex(Signal* signal); void getdirindex(Signal* signal);
void commitdelete(Signal* signal); void commitdelete(Signal* signal);
void deleteElement(Signal* signal); void deleteElement(Signal* signal);
...@@ -726,12 +772,17 @@ private: ...@@ -726,12 +772,17 @@ private:
void releaseRightlist(Signal* signal); void releaseRightlist(Signal* signal);
void checkoverfreelist(Signal* signal); void checkoverfreelist(Signal* signal);
void abortOperation(Signal* signal); void abortOperation(Signal* signal);
void accAbortReqLab(Signal* signal);
void commitOperation(Signal* signal); void commitOperation(Signal* signal);
void copyOpInfo(Signal* signal); void copyOpInfo(OperationrecPtr dst, OperationrecPtr src);
Uint32 executeNextOperation(Signal* signal); Uint32 executeNextOperation(Signal* signal);
void releaselock(Signal* signal); void releaselock(Signal* signal);
void release_lockowner(Signal* signal, OperationrecPtr, bool commit);
void startNew(Signal* signal, OperationrecPtr newOwner);
void abortWaitingOperation(Signal*, OperationrecPtr);
void abortExecutedOperation(Signal*, OperationrecPtr);
void takeOutFragWaitQue(Signal* signal); void takeOutFragWaitQue(Signal* signal);
void check_lock_upgrade(Signal* signal, OperationrecPtr release_op, bool lo);
void check_lock_upgrade(Signal* signal, OperationrecPtr lock_owner, void check_lock_upgrade(Signal* signal, OperationrecPtr lock_owner,
OperationrecPtr release_op); OperationrecPtr release_op);
void allocOverflowPage(Signal* signal); void allocOverflowPage(Signal* signal);
...@@ -780,8 +831,8 @@ private: ...@@ -780,8 +831,8 @@ private:
void senddatapagesLab(Signal* signal); void senddatapagesLab(Signal* signal);
void sttorrysignalLab(Signal* signal); void sttorrysignalLab(Signal* signal);
void sendholdconfsignalLab(Signal* signal); void sendholdconfsignalLab(Signal* signal);
void accIsLockedLab(Signal* signal); void accIsLockedLab(Signal* signal, OperationrecPtr lockOwnerPtr);
void insertExistElemLab(Signal* signal); void insertExistElemLab(Signal* signal, OperationrecPtr lockOwnerPtr);
void refaccConnectLab(Signal* signal); void refaccConnectLab(Signal* signal);
void releaseScanLab(Signal* signal); void releaseScanLab(Signal* signal);
void ndbrestart1Lab(Signal* signal); void ndbrestart1Lab(Signal* signal);
...@@ -840,8 +891,6 @@ private: ...@@ -840,8 +891,6 @@ private:
Operationrec *operationrec; Operationrec *operationrec;
OperationrecPtr operationRecPtr; OperationrecPtr operationRecPtr;
OperationrecPtr idrOperationRecPtr; OperationrecPtr idrOperationRecPtr;
OperationrecPtr copyInOperPtr;
OperationrecPtr copyOperPtr;
OperationrecPtr mlpqOperPtr; OperationrecPtr mlpqOperPtr;
OperationrecPtr queOperPtr; OperationrecPtr queOperPtr;
OperationrecPtr readWriteOpPtr; OperationrecPtr readWriteOpPtr;
...@@ -885,8 +934,6 @@ private: ...@@ -885,8 +934,6 @@ private:
Page8Ptr lcnPageptr; Page8Ptr lcnPageptr;
Page8Ptr lcnCopyPageptr; Page8Ptr lcnCopyPageptr;
Page8Ptr lupPageptr; Page8Ptr lupPageptr;
Page8Ptr priPageptr;
Page8Ptr pwiPageptr;
Page8Ptr ciPageidptr; Page8Ptr ciPageidptr;
Page8Ptr gsePageidptr; Page8Ptr gsePageidptr;
Page8Ptr isoPageptr; Page8Ptr isoPageptr;
...@@ -926,8 +973,6 @@ private: ...@@ -926,8 +973,6 @@ private:
Tabrec *tabrec; Tabrec *tabrec;
TabrecPtr tabptr; TabrecPtr tabptr;
Uint32 ctablesize; Uint32 ctablesize;
Uint32 tpwiElementptr;
Uint32 tpriElementptr;
Uint32 tgseElementptr; Uint32 tgseElementptr;
Uint32 tgseContainerptr; Uint32 tgseContainerptr;
Uint32 trlHead; Uint32 trlHead;
...@@ -961,8 +1006,6 @@ private: ...@@ -961,8 +1006,6 @@ private:
Uint32 tdelForward; Uint32 tdelForward;
Uint32 tiopPageId; Uint32 tiopPageId;
Uint32 tipPageId; Uint32 tipPageId;
Uint32 tgeLocked;
Uint32 tgeResult;
Uint32 tgeContainerptr; Uint32 tgeContainerptr;
Uint32 tgeElementptr; Uint32 tgeElementptr;
Uint32 tgeForward; Uint32 tgeForward;
......
...@@ -130,8 +130,6 @@ Dbacc::Dbacc(Block_context& ctx): ...@@ -130,8 +130,6 @@ Dbacc::Dbacc(Block_context& ctx):
&fragrecptr, &fragrecptr,
&operationRecPtr, &operationRecPtr,
&idrOperationRecPtr, &idrOperationRecPtr,
&copyInOperPtr,
&copyOperPtr,
&mlpqOperPtr, &mlpqOperPtr,
&queOperPtr, &queOperPtr,
&readWriteOpPtr, &readWriteOpPtr,
...@@ -161,8 +159,6 @@ Dbacc::Dbacc(Block_context& ctx): ...@@ -161,8 +159,6 @@ Dbacc::Dbacc(Block_context& ctx):
&lcnPageptr, &lcnPageptr,
&lcnCopyPageptr, &lcnCopyPageptr,
&lupPageptr, &lupPageptr,
&priPageptr,
&pwiPageptr,
&ciPageidptr, &ciPageidptr,
&gsePageidptr, &gsePageidptr,
&isoPageptr, &isoPageptr,
......
This source diff could not be displayed because it is too large. You can view the blob instead.
...@@ -33,8 +33,6 @@ ...@@ -33,8 +33,6 @@
// primary key is stored in TUP // primary key is stored in TUP
#include "../dbtup/Dbtup.hpp" #include "../dbtup/Dbtup.hpp"
#include "../dbacc/Dbacc.hpp"
#ifdef DBLQH_C #ifdef DBLQH_C
// Constants // Constants
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
...@@ -2556,8 +2554,19 @@ private: ...@@ -2556,8 +2554,19 @@ private:
Dbtup* c_tup; Dbtup* c_tup;
Dbacc* c_acc; Dbacc* c_acc;
/**
* Read primary key from tup
*/
Uint32 readPrimaryKeys(ScanRecord*, TcConnectionrec*, Uint32 * dst); Uint32 readPrimaryKeys(ScanRecord*, TcConnectionrec*, Uint32 * dst);
/**
* Read primary key from operation
*/
public:
Uint32 readPrimaryKeys(Uint32 opPtrI, Uint32 * dst, bool xfrm);
private:
void acckeyconf_tupkeyreq(Signal*, TcConnectionrec*, Fragrecord*, Uint32, Uint32); void acckeyconf_tupkeyreq(Signal*, TcConnectionrec*, Fragrecord*, Uint32, Uint32);
void acckeyconf_load_diskpage(Signal*,TcConnectionrecPtr,Fragrecord*,Uint32); void acckeyconf_load_diskpage(Signal*,TcConnectionrecPtr,Fragrecord*,Uint32);
...@@ -2924,6 +2933,11 @@ public: ...@@ -2924,6 +2933,11 @@ public:
} }
DLHashTable<ScanRecord> c_scanTakeOverHash; DLHashTable<ScanRecord> c_scanTakeOverHash;
#ifdef ERROR_INSERT
inline bool TRACE_OP_CHECK(const TcConnectionrec* regTcPtr);
void TRACE_OP_DUMP(const TcConnectionrec* regTcPtr, const char * pos);
#endif
}; };
inline inline
...@@ -2991,10 +3005,19 @@ Dblqh::accminupdate(Signal* signal, Uint32 opId, const Local_key* key) ...@@ -2991,10 +3005,19 @@ Dblqh::accminupdate(Signal* signal, Uint32 opId, const Local_key* key)
signal->theData[1] = key->m_page_no << MAX_TUPLES_BITS | key->m_page_idx; signal->theData[1] = key->m_page_no << MAX_TUPLES_BITS | key->m_page_idx;
c_acc->execACCMINUPDATE(signal); c_acc->execACCMINUPDATE(signal);
if (ERROR_INSERTED(5712)) if (ERROR_INSERTED(5712) || ERROR_INSERTED(5713))
ndbout << " LK: " << *key; ndbout << " LK: " << *key;
regTcPtr.p->m_row_id = *key; regTcPtr.p->m_row_id = *key;
} }
inline
bool
Dblqh::TRACE_OP_CHECK(const TcConnectionrec* regTcPtr)
{
return (ERROR_INSERTED(5712) &&
(regTcPtr->operation == ZINSERT ||
regTcPtr->operation == ZDELETE)) ||
ERROR_INSERTED(5713);
}
#endif #endif
...@@ -67,48 +67,70 @@ ...@@ -67,48 +67,70 @@
// seen only when we debug the product // seen only when we debug the product
#ifdef VM_TRACE #ifdef VM_TRACE
#define DEBUG(x) ndbout << "DBLQH: "<< x << endl; #define DEBUG(x) ndbout << "DBLQH: "<< x << endl;
static
NdbOut & NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::TransactionState state){ operator<<(NdbOut& out, Dblqh::TcConnectionrec::TransactionState state){
out << (int)state; out << (int)state;
return out; return out;
} }
static
NdbOut & NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::LogWriteState state){ operator<<(NdbOut& out, Dblqh::TcConnectionrec::LogWriteState state){
out << (int)state; out << (int)state;
return out; return out;
} }
static
NdbOut & NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::ListState state){ operator<<(NdbOut& out, Dblqh::TcConnectionrec::ListState state){
out << (int)state; out << (int)state;
return out; return out;
} }
static
NdbOut & NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::AbortState state){ operator<<(NdbOut& out, Dblqh::TcConnectionrec::AbortState state){
out << (int)state; out << (int)state;
return out; return out;
} }
static
NdbOut & NdbOut &
operator<<(NdbOut& out, Dblqh::ScanRecord::ScanState state){ operator<<(NdbOut& out, Dblqh::ScanRecord::ScanState state){
out << (int)state; out << (int)state;
return out; return out;
} }
static
NdbOut & NdbOut &
operator<<(NdbOut& out, Dblqh::LogFileOperationRecord::LfoState state){ operator<<(NdbOut& out, Dblqh::LogFileOperationRecord::LfoState state){
out << (int)state; out << (int)state;
return out; return out;
} }
static
NdbOut & NdbOut &
operator<<(NdbOut& out, Dblqh::ScanRecord::ScanType state){ operator<<(NdbOut& out, Dblqh::ScanRecord::ScanType state){
out << (int)state; out << (int)state;
return out; return out;
} }
static
NdbOut &
operator<<(NdbOut& out, Operation_t op)
{
switch(op){
case ZREAD: out << "READ"; break;
case ZREAD_EX: out << "READ-EX"; break;
case ZINSERT: out << "INSERT"; break;
case ZUPDATE: out << "UPDATE"; break;
case ZDELETE: out << "DELETE"; break;
case ZWRITE: out << "WRITE"; break;
}
return out;
}
#else #else
#define DEBUG(x) #define DEBUG(x)
#endif #endif
...@@ -120,7 +142,7 @@ const Uint32 NR_ScanNo = 0; ...@@ -120,7 +142,7 @@ const Uint32 NR_ScanNo = 0;
#if defined VM_TRACE || defined ERROR_INSERT || defined NDBD_TRACENR #if defined VM_TRACE || defined ERROR_INSERT || defined NDBD_TRACENR
#include <NdbConfig.h> #include <NdbConfig.h>
NdbOut * tracenrout = 0; static NdbOut * tracenrout = 0;
static int TRACENR_FLAG = 0; static int TRACENR_FLAG = 0;
#define TRACENR(x) (* tracenrout) << x #define TRACENR(x) (* tracenrout) << x
#define SET_TRACENR_FLAG TRACENR_FLAG = 1 #define SET_TRACENR_FLAG TRACENR_FLAG = 1
...@@ -132,6 +154,13 @@ static int TRACENR_FLAG = 0; ...@@ -132,6 +154,13 @@ static int TRACENR_FLAG = 0;
#define CLEAR_TRACENR_FLAG #define CLEAR_TRACENR_FLAG
#endif #endif
#ifdef ERROR_INSERT
static NdbOut * traceopout = 0;
#define TRACE_OP(regTcPtr, place) do { if (TRACE_OP_CHECK(regTcPtr)) TRACE_OP_DUMP(regTcPtr, place); } while(0)
#else
#define TRACE_OP(x) {}
#endif
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
/* ------- SEND SYSTEM ERROR ------- */ /* ------- SEND SYSTEM ERROR ------- */
/* */ /* */
...@@ -455,6 +484,10 @@ void Dblqh::execSTTOR(Signal* signal) ...@@ -455,6 +484,10 @@ void Dblqh::execSTTOR(Signal* signal)
tracenrout = new NdbOut(* new FileOutputStream(fopen(name, "w+"))); tracenrout = new NdbOut(* new FileOutputStream(fopen(name, "w+")));
#endif #endif
#ifdef ERROR_INSERT
traceopout = &ndbout;
#endif
return; return;
break; break;
case 4: case 4:
...@@ -2531,14 +2564,15 @@ void Dblqh::execTUPKEYCONF(Signal* signal) ...@@ -2531,14 +2564,15 @@ void Dblqh::execTUPKEYCONF(Signal* signal)
case TcConnectionrec::WAIT_ACC_ABORT: case TcConnectionrec::WAIT_ACC_ABORT:
case TcConnectionrec::ABORT_QUEUED: case TcConnectionrec::ABORT_QUEUED:
jam(); jam();
/* -------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
/* IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */ /* IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */
/* -------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
break; break;
default: default:
ndbrequire(false); ndbrequire(false);
break; break;
}//switch }//switch
}//Dblqh::execTUPKEYCONF() }//Dblqh::execTUPKEYCONF()
/* ************> */ /* ************> */
...@@ -2560,6 +2594,8 @@ void Dblqh::execTUPKEYREF(Signal* signal) ...@@ -2560,6 +2594,8 @@ void Dblqh::execTUPKEYREF(Signal* signal)
c_fragment_pool.getPtr(regFragptr); c_fragment_pool.getPtr(regFragptr);
fragptr = regFragptr; fragptr = regFragptr;
TRACE_OP(regTcPtr, "TUPKEYREF");
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
{ {
jam(); jam();
...@@ -2568,6 +2604,11 @@ void Dblqh::execTUPKEYREF(Signal* signal) ...@@ -2568,6 +2604,11 @@ void Dblqh::execTUPKEYREF(Signal* signal)
ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP || ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP ||
regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT); regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT);
} }
else if (getNodeState().startLevel == NodeState::SL_STARTED)
{
if (terrorCode == 899)
ndbout << "899: " << regTcPtr->m_row_id << endl;
}
switch (tcConnectptr.p->transactionState) { switch (tcConnectptr.p->transactionState) {
case TcConnectionrec::WAIT_TUP: case TcConnectionrec::WAIT_TUP:
...@@ -3606,57 +3647,7 @@ void Dblqh::endgettupkeyLab(Signal* signal) ...@@ -3606,57 +3647,7 @@ void Dblqh::endgettupkeyLab(Signal* signal)
regTcPtr->transactionState = TcConnectionrec::WAIT_ATTR; regTcPtr->transactionState = TcConnectionrec::WAIT_ATTR;
return; return;
}//if }//if
//#define TRACE_LQHKEYREQ
#ifdef TRACE_LQHKEYREQ
{
ndbout << (regTcPtr->operation == ZREAD ? "READ" :
regTcPtr->operation == ZUPDATE ? "UPDATE" :
regTcPtr->operation == ZINSERT ? "INSERT" :
regTcPtr->operation == ZDELETE ? "DELETE" : "<Other>")
<< "(" << (int)regTcPtr->operation << ")"
<< " from=(" << getBlockName(refToBlock(regTcPtr->clientBlockref))
<< ", " << refToNode(regTcPtr->clientBlockref) << ")"
<< " table=" << regTcPtr->tableref << " ";
ndbout << "hash: " << hex << regTcPtr->hashValue << endl;
ndbout << "key=[" << hex;
Uint32 i;
for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
ndbout << hex << regTcPtr->tupkeyData[i] << " ";
}
DatabufPtr regDatabufptr;
regDatabufptr.i = regTcPtr->firstTupkeybuf;
while(i < regTcPtr->primKeyLen)
{
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
ndbout << hex << regDatabufptr.p->data[j] << " ";
}
ndbout << "]" << endl;
ndbout << "attr=[" << hex;
for(i = 0; i<regTcPtr->reclenAiLqhkey && i < 5; i++)
ndbout << hex << regTcPtr->firstAttrinfo[i] << " ";
AttrbufPtr regAttrinbufptr;
regAttrinbufptr.i= regTcPtr->firstAttrinbuf;
while(i < regTcPtr->totReclenAi)
{
ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
ndbrequire(dataLen != 0);
ndbrequire(i + dataLen <= regTcPtr->totReclenAi);
for(Uint32 j= 0; j<dataLen; j++, i++)
ndbout << hex << regAttrinbufptr.p->attrbuf[j] << " ";
regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
}
ndbout << "]" << endl;
}
#endif
/* ---------------------------------------------------------------------- */ /* ---------------------------------------------------------------------- */
/* NOW RECEPTION OF LQHKEYREQ IS COMPLETED THE NEXT STEP IS TO START*/ /* NOW RECEPTION OF LQHKEYREQ IS COMPLETED THE NEXT STEP IS TO START*/
/* PROCESSING THE MESSAGE. IF THE MESSAGE IS TO A STAND-BY NODE */ /* PROCESSING THE MESSAGE. IF THE MESSAGE IS TO A STAND-BY NODE */
...@@ -3763,6 +3754,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal) ...@@ -3763,6 +3754,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
/* ----------------------------------------------------------------- */ /* ----------------------------------------------------------------- */
if (TRACENR_FLAG) if (TRACENR_FLAG)
{ {
TRACE_OP(regTcPtr, "RECEIVED");
switch (regTcPtr->operation) { switch (regTcPtr->operation) {
case ZREAD: TRACENR("READ"); break; case ZREAD: TRACENR("READ"); break;
case ZUPDATE: TRACENR("UPDATE"); break; case ZUPDATE: TRACENR("UPDATE"); break;
...@@ -3847,6 +3839,9 @@ Dblqh::exec_acckeyreq(Signal* signal, TcConnectionrecPtr regTcPtr) ...@@ -3847,6 +3839,9 @@ Dblqh::exec_acckeyreq(Signal* signal, TcConnectionrecPtr regTcPtr)
signal->theData[8] = sig2; signal->theData[8] = sig2;
signal->theData[9] = sig3; signal->theData[9] = sig3;
signal->theData[10] = sig4; signal->theData[10] = sig4;
TRACE_OP(regTcPtr.p, "ACC");
if (regTcPtr.p->primKeyLen > 4) { if (regTcPtr.p->primKeyLen > 4) {
sendKeyinfoAcc(signal, 11); sendKeyinfoAcc(signal, 11);
}//if }//if
...@@ -4133,7 +4128,7 @@ Dblqh::nr_copy_delete_row(Signal* signal, ...@@ -4133,7 +4128,7 @@ Dblqh::nr_copy_delete_row(Signal* signal,
jam(); jam();
ndbrequire(rowid == 0); ndbrequire(rowid == 0);
signal->theData[0] = accPtr; signal->theData[0] = accPtr;
signal->theData[1] = false; signal->theData[1] = 0;
EXECUTE_DIRECT(ref, GSN_ACC_ABORTREQ, signal, 2); EXECUTE_DIRECT(ref, GSN_ACC_ABORTREQ, signal, 2);
jamEntry(); jamEntry();
return; return;
...@@ -4144,7 +4139,9 @@ Dblqh::nr_copy_delete_row(Signal* signal, ...@@ -4144,7 +4139,9 @@ Dblqh::nr_copy_delete_row(Signal* signal,
*/ */
ndbrequire(regTcPtr.p->m_dealloc == 0); ndbrequire(regTcPtr.p->m_dealloc == 0);
Local_key save = regTcPtr.p->m_row_id; Local_key save = regTcPtr.p->m_row_id;
signal->theData[0] = regTcPtr.p->accConnectrec;
c_acc->execACCKEY_ORD(signal, accPtr);
signal->theData[0] = accPtr;
EXECUTE_DIRECT(ref, GSN_ACC_COMMITREQ, signal, 1); EXECUTE_DIRECT(ref, GSN_ACC_COMMITREQ, signal, 1);
jamEntry(); jamEntry();
...@@ -4274,6 +4271,45 @@ Dblqh::nr_delete_complete(Signal* signal, Nr_op_info* op) ...@@ -4274,6 +4271,45 @@ Dblqh::nr_delete_complete(Signal* signal, Nr_op_info* op)
} }
} }
Uint32
Dblqh::readPrimaryKeys(Uint32 opPtrI, Uint32 * dst, bool xfrm)
{
TcConnectionrecPtr regTcPtr;
DatabufPtr regDatabufptr;
Uint64 Tmp[MAX_KEY_SIZE_IN_WORDS >> 1];
jamEntry();
regTcPtr.i = opPtrI;
ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
Uint32 tableId = regTcPtr.p->tableref;
Uint32 keyLen = regTcPtr.p->primKeyLen;
regDatabufptr.i = regTcPtr.p->firstTupkeybuf;
Uint32 * tmp = xfrm ? (Uint32*)Tmp : dst;
memcpy(tmp, regTcPtr.p->tupkeyData, sizeof(regTcPtr.p->tupkeyData));
if (keyLen > 4)
{
tmp += 4;
Uint32 pos = 4;
do {
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
memcpy(tmp, regDatabufptr.p->data, sizeof(regDatabufptr.p->data));
regDatabufptr.i = regDatabufptr.p->nextDatabuf;
tmp += sizeof(regDatabufptr.p->data) >> 2;
pos += sizeof(regDatabufptr.p->data) >> 2;
} while(pos < keyLen);
}
if (xfrm)
{
jam();
Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
return xfrm_key(tableId, (Uint32*)Tmp, dst, ~0, keyPartLen);
}
return keyLen;
}
/* =*======================================================================= */ /* =*======================================================================= */
/* ======= SEND KEYINFO TO ACC ======= */ /* ======= SEND KEYINFO TO ACC ======= */
...@@ -4447,10 +4483,6 @@ Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr, ...@@ -4447,10 +4483,6 @@ Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr,
* ----------------------------------------------------------------------- */ * ----------------------------------------------------------------------- */
Uint32 page_idx = local_key & MAX_TUPLES_PER_PAGE; Uint32 page_idx = local_key & MAX_TUPLES_PER_PAGE;
Uint32 page_no = local_key >> MAX_TUPLES_BITS; Uint32 page_no = local_key >> MAX_TUPLES_BITS;
#ifdef TRACE_LQHKEYREQ
ndbout << "localkey: [ " << hex << page_no << " " << page_idx << "]"
<< endl;
#endif
Uint32 Ttupreq = regTcPtr->dirtyOp; Uint32 Ttupreq = regTcPtr->dirtyOp;
Ttupreq = Ttupreq + (regTcPtr->opSimple << 1); Ttupreq = Ttupreq + (regTcPtr->opSimple << 1);
Ttupreq = Ttupreq + (op << 6); Ttupreq = Ttupreq + (op << 6);
...@@ -4506,70 +4538,13 @@ Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr, ...@@ -4506,70 +4538,13 @@ Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr,
tupKeyReq->m_row_id_page_no = sig0; tupKeyReq->m_row_id_page_no = sig0;
tupKeyReq->m_row_id_page_idx = sig1; tupKeyReq->m_row_id_page_idx = sig1;
if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT) TRACE_OP(regTcPtr, "TUPKEYREQ");
{
ndbout << "INSERT " << regFragptrP->tabRef
<< "(" << regFragptrP->fragId << ")";
{
ndbout << "key=[" << hex;
Uint32 i;
for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
ndbout << hex << regTcPtr->tupkeyData[i] << " ";
}
DatabufPtr regDatabufptr;
regDatabufptr.i = regTcPtr->firstTupkeybuf;
while(i < regTcPtr->primKeyLen)
{
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
ndbout << hex << regDatabufptr.p->data[j] << " ";
}
ndbout << "] ";
}
if(regTcPtr->m_use_rowid)
ndbout << " " << regTcPtr->m_row_id;
}
if (ERROR_INSERTED(5712) && regTcPtr->operation == ZDELETE)
{
Local_key lk; lk.assref(local_key);
ndbout << "DELETE " << regFragptrP->tabRef
<< "(" << regFragptrP->fragId << ") " << lk;
{
ndbout << "key=[" << hex;
Uint32 i;
for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
ndbout << hex << regTcPtr->tupkeyData[i] << " ";
}
DatabufPtr regDatabufptr;
regDatabufptr.i = regTcPtr->firstTupkeybuf;
while(i < regTcPtr->primKeyLen)
{
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
ndbout << hex << regDatabufptr.p->data[j] << " ";
}
ndbout << "]" << endl;
}
}
regTcPtr->m_use_rowid |= (op == ZINSERT); regTcPtr->m_use_rowid |= (op == ZINSERT);
regTcPtr->m_row_id.m_page_no = page_no; regTcPtr->m_row_id.m_page_no = page_no;
regTcPtr->m_row_id.m_page_idx = page_idx; regTcPtr->m_row_id.m_page_idx = page_idx;
EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength); EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength);
if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT)
{
ndbout << endl;
}
}//Dblqh::execACCKEYCONF() }//Dblqh::execACCKEYCONF()
void void
...@@ -4654,27 +4629,37 @@ void Dblqh::tupkeyConfLab(Signal* signal) ...@@ -4654,27 +4629,37 @@ void Dblqh::tupkeyConfLab(Signal* signal)
const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0]; const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0];
TcConnectionrec * const regTcPtr = tcConnectptr.p; TcConnectionrec * const regTcPtr = tcConnectptr.p;
Uint32 activeCreat = regTcPtr->activeCreat; Uint32 activeCreat = regTcPtr->activeCreat;
Uint32 readLen = tupKeyConf->readLength;
Uint32 writeLen = tupKeyConf->writeLength;
Uint32 accOp = regTcPtr->accConnectrec;
c_acc->execACCKEY_ORD(signal, accOp);
TRACE_OP(regTcPtr, "TUPKEYCONF");
if (regTcPtr->simpleRead) { if (regTcPtr->simpleRead) {
jam(); jam();
/* ---------------------------------------------------------------------- /* ----------------------------------------------------------------------
* THE OPERATION IS A SIMPLE READ. WE WILL IMMEDIATELY COMMIT THE OPERATION. * THE OPERATION IS A SIMPLE READ.
* SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK (FOR LOCAL CHECKPOINTS) YET * WE WILL IMMEDIATELY COMMIT THE OPERATION.
* SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK
* (FOR LOCAL CHECKPOINTS) YET
* WE CAN GO IMMEDIATELY TO COMMIT_CONTINUE_AFTER_BLOCKED. * WE CAN GO IMMEDIATELY TO COMMIT_CONTINUE_AFTER_BLOCKED.
* WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN READ LENGTH * WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN
* ---------------------------------------------------------------------- */ * READ LENGTH
* --------------------------------------------------------------------- */
commitContinueAfterBlockedLab(signal); commitContinueAfterBlockedLab(signal);
return; return;
}//if }//if
if (tupKeyConf->readLength != 0) { if (readLen != 0)
{
jam(); jam();
/* SET BIT 15 IN REQINFO */ /* SET BIT 15 IN REQINFO */
LqhKeyReq::setApplicationAddressFlag(regTcPtr->reqinfo, 1); LqhKeyReq::setApplicationAddressFlag(regTcPtr->reqinfo, 1);
regTcPtr->readlenAi = readLen;
regTcPtr->readlenAi = tupKeyConf->readLength;
}//if }//if
regTcPtr->totSendlenAi = tupKeyConf->writeLength; regTcPtr->totSendlenAi = writeLen;
ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen); ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen);
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
...@@ -5598,6 +5583,8 @@ void Dblqh::releaseOprec(Signal* signal) ...@@ -5598,6 +5583,8 @@ void Dblqh::releaseOprec(Signal* signal)
if (TRACENR_FLAG) if (TRACENR_FLAG)
TRACENR("DELETED: " << regTcPtr->m_row_id << endl); TRACENR("DELETED: " << regTcPtr->m_row_id << endl);
TRACE_OP(regTcPtr, "DEALLOC");
signal->theData[0] = regTcPtr->fragmentid; signal->theData[0] = regTcPtr->fragmentid;
signal->theData[1] = regTcPtr->tableref; signal->theData[1] = regTcPtr->tableref;
signal->theData[2] = regTcPtr->m_row_id.m_page_no; signal->theData[2] = regTcPtr->m_row_id.m_page_no;
...@@ -5818,6 +5805,10 @@ void Dblqh::execCOMMIT(Signal* signal) ...@@ -5818,6 +5805,10 @@ void Dblqh::execCOMMIT(Signal* signal)
ptrAss(tcConnectptr, regTcConnectionrec); ptrAss(tcConnectptr, regTcConnectionrec);
if ((tcConnectptr.p->transid[0] == transid1) && if ((tcConnectptr.p->transid[0] == transid1) &&
(tcConnectptr.p->transid[1] == transid2)) { (tcConnectptr.p->transid[1] == transid2)) {
TcConnectionrec * const regTcPtr = tcConnectptr.p;
TRACE_OP(regTcPtr, "COMMIT");
commitReqLab(signal, gci); commitReqLab(signal, gci);
return; return;
}//if }//if
...@@ -5937,6 +5928,10 @@ void Dblqh::execCOMPLETE(Signal* signal) ...@@ -5937,6 +5928,10 @@ void Dblqh::execCOMPLETE(Signal* signal)
if ((tcConnectptr.p->transactionState == TcConnectionrec::COMMITTED) && if ((tcConnectptr.p->transactionState == TcConnectionrec::COMMITTED) &&
(tcConnectptr.p->transid[0] == transid1) && (tcConnectptr.p->transid[0] == transid1) &&
(tcConnectptr.p->transid[1] == transid2)) { (tcConnectptr.p->transid[1] == transid2)) {
TcConnectionrec * const regTcPtr = tcConnectptr.p;
TRACE_OP(regTcPtr, "COMPLETE");
if (tcConnectptr.p->seqNoReplica != 0 && if (tcConnectptr.p->seqNoReplica != 0 &&
tcConnectptr.p->activeCreat == Fragrecord::AC_NORMAL) { tcConnectptr.p->activeCreat == Fragrecord::AC_NORMAL) {
jam(); jam();
...@@ -6313,12 +6308,16 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal) ...@@ -6313,12 +6308,16 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal)
TRACENR(endl); TRACENR(endl);
} }
TRACE_OP(regTcPtr.p, "ACC_COMMITREQ");
Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref); Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
signal->theData[0] = regTcPtr.p->accConnectrec; signal->theData[0] = regTcPtr.p->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1); EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
} else { } else {
if(!dirtyOp){ if(!dirtyOp){
TRACE_OP(regTcPtr.p, "ACC_COMMITREQ");
Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref); Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
signal->theData[0] = regTcPtr.p->accConnectrec; signal->theData[0] = regTcPtr.p->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1); EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
...@@ -6362,6 +6361,8 @@ Dblqh::tupcommit_conf_callback(Signal* signal, Uint32 tcPtrI) ...@@ -6362,6 +6361,8 @@ Dblqh::tupcommit_conf_callback(Signal* signal, Uint32 tcPtrI)
c_fragment_pool.getPtr(regFragptr); c_fragment_pool.getPtr(regFragptr);
fragptr = regFragptr; fragptr = regFragptr;
TRACE_OP(tcPtr, "ACC_COMMITREQ");
Uint32 acc = refToBlock(tcPtr->tcAccBlockref); Uint32 acc = refToBlock(tcPtr->tcAccBlockref);
signal->theData[0] = tcPtr->accConnectrec; signal->theData[0] = tcPtr->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1); EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
...@@ -6670,6 +6671,8 @@ void Dblqh::execABORT(Signal* signal) ...@@ -6670,6 +6671,8 @@ void Dblqh::execABORT(Signal* signal)
regTcPtr->commitAckMarker = RNIL; regTcPtr->commitAckMarker = RNIL;
} }
TRACE_OP(regTcPtr, "ABORT");
abortStateHandlerLab(signal); abortStateHandlerLab(signal);
return; return;
...@@ -7087,23 +7090,30 @@ void Dblqh::abortContinueAfterBlockedLab(Signal* signal, bool canBlock) ...@@ -7087,23 +7090,30 @@ void Dblqh::abortContinueAfterBlockedLab(Signal* signal, bool canBlock)
* ALSO AS PART OF A NORMAL ABORT WITHOUT BLOCKING. * ALSO AS PART OF A NORMAL ABORT WITHOUT BLOCKING.
* WE MUST ABORT TUP BEFORE ACC TO ENSURE THAT NO ONE RACES IN * WE MUST ABORT TUP BEFORE ACC TO ENSURE THAT NO ONE RACES IN
* AND SEES A STATE IN TUP. * AND SEES A STATE IN TUP.
* ------------------------------------------------------------------------ */ * ----------------------------------------------------------------------- */
TcConnectionrec * const regTcPtr = tcConnectptr.p; TcConnectionrec * const regTcPtr = tcConnectptr.p;
fragptr.i = regTcPtr->fragmentptr;
c_fragment_pool.getPtr(fragptr); TRACE_OP(regTcPtr, "ACC ABORT");
signal->theData[0] = regTcPtr->tupConnectrec;
EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
regTcPtr->transactionState = TcConnectionrec::WAIT_ACC_ABORT; regTcPtr->transactionState = TcConnectionrec::WAIT_ACC_ABORT;
signal->theData[0] = regTcPtr->accConnectrec; signal->theData[0] = regTcPtr->accConnectrec;
signal->theData[1] = true; signal->theData[1] = 2; // JOB BUFFER IF NEEDED
EXECUTE_DIRECT(DBACC, GSN_ACC_ABORTREQ, signal, 2); EXECUTE_DIRECT(DBACC, GSN_ACC_ABORTREQ, signal, 2);
if (signal->theData[1] == RNIL)
{
jam();
/* ------------------------------------------------------------------------ /* ------------------------------------------------------------------------
* We need to insert a real-time break by sending ACC_ABORTCONF through the * We need to insert a real-time break by sending ACC_ABORTCONF through the
* job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or * job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or
* TUPKEYREF that are in the job buffer but not yet processed. Doing * TUPKEYREF that are in the job buffer but not yet processed. Doing
* everything without that would race and create a state error when they * everything without that would race and create a state error when they
* are executed. * are executed.
* ----------------------------------------------------------------------- */ * --------------------------------------------------------------------- */
return;
}
execACC_ABORTCONF(signal);
return; return;
}//Dblqh::abortContinueAfterBlockedLab() }//Dblqh::abortContinueAfterBlockedLab()
...@@ -7117,6 +7127,11 @@ void Dblqh::execACC_ABORTCONF(Signal* signal) ...@@ -7117,6 +7127,11 @@ void Dblqh::execACC_ABORTCONF(Signal* signal)
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
TcConnectionrec * const regTcPtr = tcConnectptr.p; TcConnectionrec * const regTcPtr = tcConnectptr.p;
ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT); ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT);
TRACE_OP(regTcPtr, "ACC_ABORTCONF");
signal->theData[0] = regTcPtr->tupConnectrec;
EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
continueAbortLab(signal); continueAbortLab(signal);
return; return;
}//Dblqh::execACC_ABORTCONF() }//Dblqh::execACC_ABORTCONF()
...@@ -8837,7 +8852,9 @@ void Dblqh::nextScanConfScanLab(Signal* signal) ...@@ -8837,7 +8852,9 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
}//if }//if
// If accOperationPtr == RNIL no record was returned by ACC // If accOperationPtr == RNIL no record was returned by ACC
if (nextScanConf->accOperationPtr == RNIL) { Uint32 accOpPtr = nextScanConf->accOperationPtr;
if (accOpPtr == RNIL)
{
jam(); jam();
/************************************************************* /*************************************************************
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
...@@ -8871,7 +8888,8 @@ void Dblqh::nextScanConfScanLab(Signal* signal) ...@@ -8871,7 +8888,8 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
jam(); jam();
set_acc_ptr_in_scan_record(scanptr.p, set_acc_ptr_in_scan_record(scanptr.p,
scanptr.p->m_curr_batch_size_rows, scanptr.p->m_curr_batch_size_rows,
nextScanConf->accOperationPtr); accOpPtr);
jam(); jam();
nextScanConfLoopLab(signal); nextScanConfLoopLab(signal);
}//Dblqh::nextScanConfScanLab() }//Dblqh::nextScanConfScanLab()
...@@ -9071,12 +9089,24 @@ void Dblqh::scanTupkeyConfLab(Signal* signal) ...@@ -9071,12 +9089,24 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED; tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
scanptr.i = tcConnectptr.p->tcScanRec; scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr); c_scanRecordPool.getPtr(scanptr);
Uint32 rows = scanptr.p->m_curr_batch_size_rows;
Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, rows, false);
if (accOpPtr != (Uint32)-1)
{
c_acc->execACCKEY_ORD(signal, accOpPtr);
}
else
{
ndbassert(refToBlock(scanptr.p->scanBlockref) != DBACC);
}
if (scanptr.p->scanCompletedStatus == ZTRUE) { if (scanptr.p->scanCompletedStatus == ZTRUE) {
/* --------------------------------------------------------------------- /* ---------------------------------------------------------------------
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
* --------------------------------------------------------------------- */ * --------------------------------------------------------------------- */
if ((scanptr.p->scanLockHold == ZTRUE) && if ((scanptr.p->scanLockHold == ZTRUE) && rows)
(scanptr.p->m_curr_batch_size_rows > 0)) { {
jam(); jam();
scanptr.p->scanReleaseCounter = 1; scanptr.p->scanReleaseCounter = 1;
scanReleaseLocksLab(signal); scanReleaseLocksLab(signal);
...@@ -9093,7 +9123,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal) ...@@ -9093,7 +9123,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
}//if }//if
ndbrequire(scanptr.p->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN); ndbrequire(scanptr.p->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN);
scanptr.p->m_curr_batch_size_bytes+= tdata4; scanptr.p->m_curr_batch_size_bytes+= tdata4;
scanptr.p->m_curr_batch_size_rows++; scanptr.p->m_curr_batch_size_rows = rows + 1;
scanptr.p->m_last_row = tdata5; scanptr.p->m_last_row = tdata5;
if (scanptr.p->check_scan_batch_completed() | tdata5){ if (scanptr.p->check_scan_batch_completed() | tdata5){
if (scanptr.p->scanLockHold == ZTRUE) { if (scanptr.p->scanLockHold == ZTRUE) {
...@@ -9103,7 +9133,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal) ...@@ -9103,7 +9133,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
return; return;
} else { } else {
jam(); jam();
scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows; scanptr.p->scanReleaseCounter = rows + 1;
scanReleaseLocksLab(signal); scanReleaseLocksLab(signal);
return; return;
} }
...@@ -9187,12 +9217,24 @@ void Dblqh::scanTupkeyRefLab(Signal* signal) ...@@ -9187,12 +9217,24 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED; tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
scanptr.i = tcConnectptr.p->tcScanRec; scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr); c_scanRecordPool.getPtr(scanptr);
Uint32 rows = scanptr.p->m_curr_batch_size_rows;
Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, rows, false);
if (accOpPtr != (Uint32)-1)
{
c_acc->execACCKEY_ORD(signal, accOpPtr);
}
else
{
ndbassert(refToBlock(scanptr.p->scanBlockref) != DBACC);
}
if (scanptr.p->scanCompletedStatus == ZTRUE) { if (scanptr.p->scanCompletedStatus == ZTRUE) {
/* --------------------------------------------------------------------- /* ---------------------------------------------------------------------
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
* --------------------------------------------------------------------- */ * --------------------------------------------------------------------- */
if ((scanptr.p->scanLockHold == ZTRUE) && if ((scanptr.p->scanLockHold == ZTRUE) && rows)
(scanptr.p->m_curr_batch_size_rows > 0)) { {
jam(); jam();
scanptr.p->scanReleaseCounter = 1; scanptr.p->scanReleaseCounter = 1;
scanReleaseLocksLab(signal); scanReleaseLocksLab(signal);
...@@ -9213,8 +9255,8 @@ void Dblqh::scanTupkeyRefLab(Signal* signal) ...@@ -9213,8 +9255,8 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
scanptr.p->scanReleaseCounter = 1; scanptr.p->scanReleaseCounter = 1;
} else { } else {
jam(); jam();
scanptr.p->m_curr_batch_size_rows++; scanptr.p->m_curr_batch_size_rows = rows + 1;
scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows; scanptr.p->scanReleaseCounter = rows + 1;
}//if }//if
/* -------------------------------------------------------------------- /* --------------------------------------------------------------------
* WE NEED TO RELEASE ALL LOCKS CURRENTLY * WE NEED TO RELEASE ALL LOCKS CURRENTLY
...@@ -9224,7 +9266,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal) ...@@ -9224,7 +9266,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
return; return;
}//if }//if
Uint32 time_passed= tcConnectptr.p->tcTimer - cLqhTimeOutCount; Uint32 time_passed= tcConnectptr.p->tcTimer - cLqhTimeOutCount;
if (scanptr.p->m_curr_batch_size_rows > 0) { if (rows) {
if (time_passed > 1) { if (time_passed > 1) {
/* ----------------------------------------------------------------------- /* -----------------------------------------------------------------------
* WE NEED TO ENSURE THAT WE DO NOT SEARCH FOR THE NEXT TUPLE FOR A * WE NEED TO ENSURE THAT WE DO NOT SEARCH FOR THE NEXT TUPLE FOR A
...@@ -9232,7 +9274,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal) ...@@ -9232,7 +9274,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
* THE FOUND TUPLE IF FOUND TUPLES ARE RARE. If more than 10 ms passed we * THE FOUND TUPLE IF FOUND TUPLES ARE RARE. If more than 10 ms passed we
* send the found tuples to the API. * send the found tuples to the API.
* ----------------------------------------------------------------------- */ * ----------------------------------------------------------------------- */
scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows + 1; scanptr.p->scanReleaseCounter = rows + 1;
scanReleaseLocksLab(signal); scanReleaseLocksLab(signal);
return; return;
} }
...@@ -9872,6 +9914,8 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal) ...@@ -9872,6 +9914,8 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
fragptr.p->m_scanNumberMask.clear(NR_ScanNo); fragptr.p->m_scanNumberMask.clear(NR_ScanNo);
scanptr.p->scanBlockref = DBTUP_REF; scanptr.p->scanBlockref = DBTUP_REF;
scanptr.p->scanLockHold = ZFALSE; scanptr.p->scanLockHold = ZFALSE;
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes= 0;
initScanTc(0, initScanTc(0,
0, 0,
...@@ -10197,6 +10241,12 @@ void Dblqh::copyTupkeyConfLab(Signal* signal) ...@@ -10197,6 +10241,12 @@ void Dblqh::copyTupkeyConfLab(Signal* signal)
scanptr.i = tcConnectptr.p->tcScanRec; scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr); c_scanRecordPool.getPtr(scanptr);
ScanRecord* scanP = scanptr.p; ScanRecord* scanP = scanptr.p;
Uint32 rows = scanP->m_curr_batch_size_rows;
Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanP, rows, false);
ndbassert(accOpPtr != (Uint32)-1);
c_acc->execACCKEY_ORD(signal, accOpPtr);
if (tcConnectptr.p->errorCode != 0) { if (tcConnectptr.p->errorCode != 0) {
jam(); jam();
closeCopyLab(signal); closeCopyLab(signal);
...@@ -18539,6 +18589,21 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal) ...@@ -18539,6 +18589,21 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
ndbrequire(arg != 2308); ndbrequire(arg != 2308);
} }
#ifdef ERROR_INSERT
if (arg == 5712 || arg == 5713)
{
if (arg == 5712)
{
traceopout = &ndbout;
}
else if (arg == 5713)
{
traceopout = tracenrout;
}
SET_ERROR_INSERT_VALUE(arg);
}
#endif
}//Dblqh::execDUMP_STATE_ORD() }//Dblqh::execDUMP_STATE_ORD()
void Dblqh::execSET_VAR_REQ(Signal* signal) void Dblqh::execSET_VAR_REQ(Signal* signal)
...@@ -18702,3 +18767,39 @@ void Dblqh::writeDbgInfoPageHeader(LogPageRecordPtr logP, Uint32 place, ...@@ -18702,3 +18767,39 @@ void Dblqh::writeDbgInfoPageHeader(LogPageRecordPtr logP, Uint32 place,
logP.p->logPageWord[ZPOS_IN_WRITING]= 1; logP.p->logPageWord[ZPOS_IN_WRITING]= 1;
} }
#if defined ERROR_INSERT
void
Dblqh::TRACE_OP_DUMP(const Dblqh::TcConnectionrec* regTcPtr, const char * pos)
{
(* traceopout)
<< "[ " << hex << regTcPtr->transid[0]
<< " " << hex << regTcPtr->transid[1] << " ] " << dec
<< pos
<< " " << (Operation_t)regTcPtr->operation
<< " " << regTcPtr->tableref
<< "(" << regTcPtr->fragmentid << ")"
<< "(" << (regTcPtr->seqNoReplica == 0 ? "P" : "B") << ")" ;
{
(* traceopout) << "key=[" << hex;
Uint32 i;
for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
(* traceopout) << hex << regTcPtr->tupkeyData[i] << " ";
}
DatabufPtr regDatabufptr;
regDatabufptr.i = regTcPtr->firstTupkeybuf;
while(i < regTcPtr->primKeyLen)
{
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
(* traceopout) << hex << regDatabufptr.p->data[j] << " ";
}
(* traceopout) << "] ";
}
if (regTcPtr->m_use_rowid)
(* traceopout) << " " << regTcPtr->m_row_id;
(* traceopout) << endl;
}
#endif
...@@ -297,7 +297,6 @@ enum TransState { ...@@ -297,7 +297,6 @@ enum TransState {
}; };
enum TupleState { enum TupleState {
TUPLE_INITIAL_INSERT = 0,
TUPLE_PREPARED = 1, TUPLE_PREPARED = 1,
TUPLE_ALREADY_ABORTED = 2, TUPLE_ALREADY_ABORTED = 2,
TUPLE_TO_BE_COMMITTED = 3 TUPLE_TO_BE_COMMITTED = 3
...@@ -305,7 +304,6 @@ enum TupleState { ...@@ -305,7 +304,6 @@ enum TupleState {
enum State { enum State {
NOT_INITIALIZED = 0, NOT_INITIALIZED = 0,
COMMON_AREA_PAGES = 1,
IDLE = 17, IDLE = 17,
ACTIVE = 18, ACTIVE = 18,
SYSTEM_RESTART = 19, SYSTEM_RESTART = 19,
...@@ -1441,7 +1439,6 @@ private: ...@@ -1441,7 +1439,6 @@ private:
void execSET_VAR_REQ(Signal* signal); void execSET_VAR_REQ(Signal* signal);
void execDROP_TAB_REQ(Signal* signal); void execDROP_TAB_REQ(Signal* signal);
void execALTER_TAB_REQ(Signal* signal); void execALTER_TAB_REQ(Signal* signal);
void execTUP_ALLOCREQ(Signal* signal);
void execTUP_DEALLOCREQ(Signal* signal); void execTUP_DEALLOCREQ(Signal* signal);
void execTUP_WRITELOG_REQ(Signal* signal); void execTUP_WRITELOG_REQ(Signal* signal);
......
...@@ -202,29 +202,6 @@ Dbtup::receive_attrinfo(Signal* signal, Uint32 op, ...@@ -202,29 +202,6 @@ Dbtup::receive_attrinfo(Signal* signal, Uint32 op,
} }
} }
void Dbtup::execTUP_ALLOCREQ(Signal* signal)
{
OperationrecPtr regOperPtr;
jamEntry();
regOperPtr.i= signal->theData[0];
c_operation_pool.getPtr(regOperPtr);
regOperPtr.p->op_struct.tuple_state= TUPLE_INITIAL_INSERT;
//ndbout_c("execTUP_ALLOCREQ");
signal->theData[0]= 0;
signal->theData[1]= ~0 >> MAX_TUPLES_BITS;
signal->theData[2]= (1 << MAX_TUPLES_BITS) - 1;
return;
mem_error:
jam();
signal->theData[0]= ZMEM_NOMEM_ERROR;
return;
}
void void
Dbtup::setChecksum(Tuple_header* tuple_ptr, Dbtup::setChecksum(Tuple_header* tuple_ptr,
Tablerec* regTabPtr) Tablerec* regTabPtr)
...@@ -455,7 +432,7 @@ Dbtup::load_diskpage(Signal* signal, ...@@ -455,7 +432,7 @@ Dbtup::load_diskpage(Signal* signal,
ptrCheckGuard(tabptr, cnoOfTablerec, tablerec); ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
Tablerec* regTabPtr = tabptr.p; Tablerec* regTabPtr = tabptr.p;
if(regOperPtr->op_struct.tuple_state == TUPLE_INITIAL_INSERT) if(local_key == ~(Uint32)0)
{ {
jam(); jam();
regOperPtr->op_struct.m_wait_log_buffer= 1; regOperPtr->op_struct.m_wait_log_buffer= 1;
...@@ -663,7 +640,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal) ...@@ -663,7 +640,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
regOperPtr->savepointId= sig1; regOperPtr->savepointId= sig1;
regOperPtr->op_struct.primary_replica= sig2; regOperPtr->op_struct.primary_replica= sig2;
regOperPtr->m_tuple_location.m_page_idx= sig3; Uint32 pageidx = regOperPtr->m_tuple_location.m_page_idx= sig3;
sig1= tupKeyReq->opRef; sig1= tupKeyReq->opRef;
sig2= tupKeyReq->tcOpIndex; sig2= tupKeyReq->tcOpIndex;
...@@ -673,7 +650,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal) ...@@ -673,7 +650,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
req_struct.tc_operation_ptr= sig1; req_struct.tc_operation_ptr= sig1;
req_struct.TC_index= sig2; req_struct.TC_index= sig2;
req_struct.TC_ref= sig3; req_struct.TC_ref= sig3;
req_struct.frag_page_id= sig4; Uint32 pageid = req_struct.frag_page_id= sig4;
req_struct.m_use_rowid = (TrequestInfo >> 11) & 1; req_struct.m_use_rowid = (TrequestInfo >> 11) & 1;
sig1= tupKeyReq->attrBufLen; sig1= tupKeyReq->attrBufLen;
...@@ -706,7 +683,8 @@ void Dbtup::execTUPKEYREQ(Signal* signal) ...@@ -706,7 +683,8 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
copyAttrinfo(regOperPtr, &cinBuffer[0]); copyAttrinfo(regOperPtr, &cinBuffer[0]);
if(Roptype == ZINSERT && get_tuple_state(regOperPtr)== TUPLE_INITIAL_INSERT) Uint32 localkey = (pageid << MAX_TUPLES_BITS) + pageidx;
if(Roptype == ZINSERT && localkey == ~0)
{ {
// No tuple allocatated yet // No tuple allocatated yet
goto do_insert; goto do_insert;
...@@ -1159,49 +1137,6 @@ Dbtup::prepare_initial_insert(KeyReqStruct *req_struct, ...@@ -1159,49 +1137,6 @@ Dbtup::prepare_initial_insert(KeyReqStruct *req_struct,
disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0; disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0;
} }
void
Dbtup::fix_disk_insert_no_mem_insert(KeyReqStruct *req_struct,
Operationrec* regOperPtr,
Tablerec* regTabPtr)
{
regOperPtr->m_undo_buffer_space= sizeof(Dbtup::Disk_undo::Alloc);
req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr);
if(cnt1)
{
// Disk part is 32-bit aligned
char *varptr = req_struct->m_var_data[MM].m_data_ptr;
ptr= ALIGN_WORD(varptr + regTabPtr->m_offsets[MM].m_max_var_offset);
}
else
{
ptr -= Tuple_header::HeaderSize;
}
req_struct->m_disk_ptr= (Tuple_header*)ptr;
if(cnt2)
{
KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD];
ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset;
dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1);
dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1);
dst->m_var_len_offset= cnt2;
dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset;
}
// Set all null bits
memset(req_struct->m_disk_ptr->m_null_bits+
regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
4*regTabPtr->m_offsets[DD].m_null_words);
req_struct->m_tuple_ptr->m_header_bits =
(Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE);
}
int Dbtup::handleInsertReq(Signal* signal, int Dbtup::handleInsertReq(Signal* signal,
Ptr<Operationrec> regOperPtr, Ptr<Operationrec> regOperPtr,
Ptr<Fragrecord> fragPtr, Ptr<Fragrecord> fragPtr,
...@@ -1215,8 +1150,8 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1215,8 +1150,8 @@ int Dbtup::handleInsertReq(Signal* signal,
Tuple_header *tuple_ptr; Tuple_header *tuple_ptr;
bool disk = regTabPtr->m_no_of_disk_attributes > 0; bool disk = regTabPtr->m_no_of_disk_attributes > 0;
bool mem_insert = get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT; bool mem_insert = regOperPtr.p->is_first_operation();
bool disk_insert = regOperPtr.p->is_first_operation() && disk; bool disk_insert = mem_insert && disk;
bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize; bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize;
bool rowid = req_struct->m_use_rowid; bool rowid = req_struct->m_use_rowid;
Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no; Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
...@@ -1244,12 +1179,9 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1244,12 +1179,9 @@ int Dbtup::handleInsertReq(Signal* signal,
if(mem_insert) if(mem_insert)
{ {
jam(); jam();
ndbassert(regOperPtr.p->is_first_operation()); // disk insert
prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr); prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
} }
else else
{
if (!regOperPtr.p->is_first_operation())
{ {
Operationrec* prevOp= req_struct->prevOpPtr.p; Operationrec* prevOp= req_struct->prevOpPtr.p;
ndbassert(prevOp->op_struct.op_type == ZDELETE); ndbassert(prevOp->op_struct.op_type == ZDELETE);
...@@ -1257,8 +1189,6 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1257,8 +1189,6 @@ int Dbtup::handleInsertReq(Signal* signal,
if(!prevOp->is_first_operation()) if(!prevOp->is_first_operation())
org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location); org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
}
if (regTabPtr->need_expand()) if (regTabPtr->need_expand())
expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert); expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
else else
...@@ -1268,11 +1198,6 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1268,11 +1198,6 @@ int Dbtup::handleInsertReq(Signal* signal,
if (disk_insert) if (disk_insert)
{ {
int res; int res;
if (unlikely(!mem_insert))
{
sizes[DD] = sizes[DD+2] = regTabPtr->m_offsets[DD].m_fix_header_size;
fix_disk_insert_no_mem_insert(req_struct, regOperPtr.p, regTabPtr);
}
if (ERROR_INSERTED(4015)) if (ERROR_INSERTED(4015))
{ {
...@@ -1381,6 +1306,7 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1381,6 +1306,7 @@ int Dbtup::handleInsertReq(Signal* signal,
} }
if (unlikely(ptr == 0)) if (unlikely(ptr == 0))
{ {
jam();
goto alloc_rowid_error; goto alloc_rowid_error;
} }
} }
...@@ -1396,7 +1322,7 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1396,7 +1322,7 @@ int Dbtup::handleInsertReq(Signal* signal,
(varsize ? Tuple_header::CHAINED_ROW : 0); (varsize ? Tuple_header::CHAINED_ROW : 0);
regOperPtr.p->m_tuple_location.m_page_no = real_page_id; regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
} }
else if(!rowid || !regOperPtr.p->is_first_operation()) else
{ {
int ret; int ret;
if (ERROR_INSERTED(4020)) if (ERROR_INSERTED(4020))
...@@ -1417,20 +1343,6 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1417,20 +1343,6 @@ int Dbtup::handleInsertReq(Signal* signal,
req_struct->m_use_rowid = false; req_struct->m_use_rowid = false;
base->m_header_bits &= ~(Uint32)Tuple_header::FREE; base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
} }
else
{
if ((req_struct->m_row_id.m_page_no == frag_page_id &&
req_struct->m_row_id.m_page_idx == regOperPtr.p->m_tuple_location.m_page_idx))
{
ndbout_c("no mem insert but rowid (same)");
base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
}
else
{
// no mem insert, but rowid
ndbrequire(false);
}
}
base->m_header_bits |= Tuple_header::ALLOC & base->m_header_bits |= Tuple_header::ALLOC &
(regOperPtr.p->is_first_operation() ? ~0 : 1); (regOperPtr.p->is_first_operation() ? ~0 : 1);
......
...@@ -89,7 +89,6 @@ Dbtup::Dbtup(Block_context& ctx, Pgman* pgman) ...@@ -89,7 +89,6 @@ Dbtup::Dbtup(Block_context& ctx, Pgman* pgman)
addRecSignal(GSN_DROP_TAB_REQ, &Dbtup::execDROP_TAB_REQ); addRecSignal(GSN_DROP_TAB_REQ, &Dbtup::execDROP_TAB_REQ);
addRecSignal(GSN_TUP_ALLOCREQ, &Dbtup::execTUP_ALLOCREQ);
addRecSignal(GSN_TUP_DEALLOCREQ, &Dbtup::execTUP_DEALLOCREQ); addRecSignal(GSN_TUP_DEALLOCREQ, &Dbtup::execTUP_DEALLOCREQ);
addRecSignal(GSN_TUP_WRITELOG_REQ, &Dbtup::execTUP_WRITELOG_REQ); addRecSignal(GSN_TUP_WRITELOG_REQ, &Dbtup::execTUP_WRITELOG_REQ);
......
...@@ -665,7 +665,7 @@ main(int argc, const char** argv){ ...@@ -665,7 +665,7 @@ main(int argc, const char** argv){
for(Uint32 i = 0; i < 12; i++) for(Uint32 i = 0; i < 12; i++)
{ {
if(i == 6 || i == 8 || i == 10) if(false && (i == 6 || i == 8 || i == 10))
continue; continue;
BaseString name("bug_9749"); BaseString name("bug_9749");
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment