ndb - bug#24655

  Handle events "differently" so that dict doesnt get lock too often
parent fc09c8a6
...@@ -3618,7 +3618,11 @@ void Dbdict::execNODE_FAILREP(Signal* signal) ...@@ -3618,7 +3618,11 @@ void Dbdict::execNODE_FAILREP(Signal* signal)
case BS_IDLE: case BS_IDLE:
jam(); jam();
ok = true; ok = true;
if(c_opRecordPool.getSize() != c_opRecordPool.getNoOfFree()){ if(c_opRecordPool.getSize() !=
(c_opRecordPool.getNoOfFree() +
c_opSubEvent.get_count() + c_opCreateEvent.get_count() +
c_opDropEvent.get_count() + c_opSignalUtil.get_count()))
{
jam(); jam();
c_blockState = BS_NODE_FAILURE; c_blockState = BS_NODE_FAILURE;
} }
...@@ -9840,6 +9844,8 @@ Dbdict::createEvent_RT_DICT_AFTER_GET(Signal* signal, OpCreateEventPtr evntRecPt ...@@ -9840,6 +9844,8 @@ Dbdict::createEvent_RT_DICT_AFTER_GET(Signal* signal, OpCreateEventPtr evntRecPt
// Seize a Create Event record, the Coordinator will now have two seized // Seize a Create Event record, the Coordinator will now have two seized
// but that's ok, it's like a recursion // but that's ok, it's like a recursion
CRASH_INSERTION2(6009, getOwnNodeId() != c_masterNodeId);
SubCreateReq * sumaReq = (SubCreateReq *)signal->getDataPtrSend(); SubCreateReq * sumaReq = (SubCreateReq *)signal->getDataPtrSend();
sumaReq->senderRef = reference(); // reference to DICT sumaReq->senderRef = reference(); // reference to DICT
...@@ -10099,6 +10105,8 @@ busy: ...@@ -10099,6 +10105,8 @@ busy:
*/ */
ndbrequire(refToBlock(origSenderRef) == DBDICT); ndbrequire(refToBlock(origSenderRef) == DBDICT);
CRASH_INSERTION(6007);
{ {
SubStartReq* req = (SubStartReq*) signal->getDataPtrSend(); SubStartReq* req = (SubStartReq*) signal->getDataPtrSend();
...@@ -10328,6 +10336,9 @@ busy: ...@@ -10328,6 +10336,9 @@ busy:
ndbout_c("SUB_STOP_REQ 2"); ndbout_c("SUB_STOP_REQ 2");
#endif #endif
ndbrequire(refToBlock(origSenderRef) == DBDICT); ndbrequire(refToBlock(origSenderRef) == DBDICT);
CRASH_INSERTION(6008);
{ {
SubStopReq* req = (SubStopReq*) signal->getDataPtrSend(); SubStopReq* req = (SubStopReq*) signal->getDataPtrSend();
...@@ -10653,6 +10664,8 @@ Dbdict::execSUB_REMOVE_REQ(Signal* signal) ...@@ -10653,6 +10664,8 @@ Dbdict::execSUB_REMOVE_REQ(Signal* signal)
subbPtr.p->m_errorCode = 0; subbPtr.p->m_errorCode = 0;
} }
CRASH_INSERTION2(6010, getOwnNodeId() != c_masterNodeId);
SubRemoveReq* req = (SubRemoveReq*) signal->getDataPtrSend(); SubRemoveReq* req = (SubRemoveReq*) signal->getDataPtrSend();
req->senderRef = reference(); req->senderRef = reference();
req->senderData = subbPtr.i; req->senderData = subbPtr.i;
......
...@@ -2041,10 +2041,10 @@ private: ...@@ -2041,10 +2041,10 @@ private:
KeyTable2<OpDropIndex, OpRecordUnion> c_opDropIndex; KeyTable2<OpDropIndex, OpRecordUnion> c_opDropIndex;
KeyTable2<OpAlterIndex, OpRecordUnion> c_opAlterIndex; KeyTable2<OpAlterIndex, OpRecordUnion> c_opAlterIndex;
KeyTable2<OpBuildIndex, OpRecordUnion> c_opBuildIndex; KeyTable2<OpBuildIndex, OpRecordUnion> c_opBuildIndex;
KeyTable2<OpCreateEvent, OpRecordUnion> c_opCreateEvent; KeyTable2C<OpCreateEvent, OpRecordUnion> c_opCreateEvent;
KeyTable2<OpSubEvent, OpRecordUnion> c_opSubEvent; KeyTable2C<OpSubEvent, OpRecordUnion> c_opSubEvent;
KeyTable2<OpDropEvent, OpRecordUnion> c_opDropEvent; KeyTable2C<OpDropEvent, OpRecordUnion> c_opDropEvent;
KeyTable2<OpSignalUtil, OpRecordUnion> c_opSignalUtil; KeyTable2C<OpSignalUtil, OpRecordUnion> c_opSignalUtil;
KeyTable2<OpCreateTrigger, OpRecordUnion> c_opCreateTrigger; KeyTable2<OpCreateTrigger, OpRecordUnion> c_opCreateTrigger;
KeyTable2<OpDropTrigger, OpRecordUnion> c_opDropTrigger; KeyTable2<OpDropTrigger, OpRecordUnion> c_opDropTrigger;
KeyTable2<OpAlterTrigger, OpRecordUnion> c_opAlterTrigger; KeyTable2<OpAlterTrigger, OpRecordUnion> c_opAlterTrigger;
......
...@@ -40,4 +40,76 @@ public: ...@@ -40,4 +40,76 @@ public:
} }
}; };
template <class T, class U>
class KeyTable2C : public KeyTable2<T, U> {
Uint32 m_count;
public:
KeyTable2C(ArrayPool<U>& pool) :
KeyTable2<T, U>(pool), m_count(0) {
}
Uint32 get_count() const { return m_count; }
bool seize(Ptr<T> & ptr) {
if (KeyTable2<T, U>::seize(ptr))
{
m_count ++;
return true;
}
return false;
}
void add(Ptr<T> & ptr) {
KeyTable2<T, U>::add(ptr);
m_count ++;
}
void remove(Ptr<T> & ptr, const T & key) {
KeyTable2<T, U>::remove(ptr, key);
if (ptr.i != RNIL)
{
assert(m_count);
m_count --;
}
}
void remove(Uint32 i) {
KeyTable2<T, U>::remove(i);
assert(m_count);
m_count --;
}
void remove(Ptr<T> & ptr) {
KeyTable2<T, U>::remove(ptr);
assert(m_count);
m_count --;
}
void removeAll() {
KeyTable2<T, U>::removeAll();
m_count = 0;
}
void release(Ptr<T> & ptr, const T & key) {
KeyTable2<T, U>::release(ptr, key);
if (ptr.i != RNIL)
{
assert(m_count);
m_count --;
}
}
void release(Uint32 i) {
KeyTable2<T, U>::release(i);
assert(m_count);
m_count --;
}
void release(Ptr<T> & ptr) {
KeyTable2<T, U>::release(ptr);
assert(m_count);
m_count --;
}
};
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment