Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
39e68ea9
Commit
39e68ea9
authored
May 04, 2006
by
jonas@perch.ndb.mysql.com
Browse files
Options
Browse Files
Download
Plain Diff
Merge perch.ndb.mysql.com:/home/jonas/src/51-work
into perch.ndb.mysql.com:/home/jonas/src/51-jonas
parents
0b38b42f
851132fb
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
282 additions
and
115 deletions
+282
-115
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+179
-108
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
+37
-0
storage/ndb/src/kernel/blocks/pgman.cpp
storage/ndb/src/kernel/blocks/pgman.cpp
+48
-5
storage/ndb/src/kernel/blocks/pgman.hpp
storage/ndb/src/kernel/blocks/pgman.hpp
+18
-2
No files found.
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
View file @
39e68ea9
...
@@ -1394,6 +1394,7 @@ void Dblqh::execTUP_ADD_ATTCONF(Signal* signal)
...
@@ -1394,6 +1394,7 @@ void Dblqh::execTUP_ADD_ATTCONF(Signal* signal)
if (! DictTabInfo::isOrderedIndex(addfragptr.p->tableType))
if (! DictTabInfo::isOrderedIndex(addfragptr.p->tableType))
{
{
fragptr.p->m_copy_started_state = Fragrecord::AC_IGNORED;
fragptr.p->m_copy_started_state = Fragrecord::AC_IGNORED;
//fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY;
fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;
fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;
}
}
else
else
...
@@ -2470,6 +2471,8 @@ void Dblqh::execTUPKEYCONF(Signal* signal)
...
@@ -2470,6 +2471,8 @@ void Dblqh::execTUPKEYCONF(Signal* signal)
jamEntry();
jamEntry();
tcConnectptr.i = tcIndex;
tcConnectptr.i = tcIndex;
ptrCheckGuard(tcConnectptr, ttcConnectrecFileSize, regTcConnectionrec);
ptrCheckGuard(tcConnectptr, ttcConnectrecFileSize, regTcConnectionrec);
TcConnectionrec * regTcPtr = tcConnectptr.p;
Uint32 activeCreat = regTcPtr->activeCreat;
FragrecordPtr regFragptr;
FragrecordPtr regFragptr;
regFragptr.i = tcConnectptr.p->fragmentptr;
regFragptr.i = tcConnectptr.p->fragmentptr;
...
@@ -2497,6 +2500,32 @@ void Dblqh::execTUPKEYCONF(Signal* signal)
...
@@ -2497,6 +2500,32 @@ void Dblqh::execTUPKEYCONF(Signal* signal)
// Abort was not ready to start until this signal came back. Now we are ready
// Abort was not ready to start until this signal came back. Now we are ready
// to start the abort.
// to start the abort.
/* ------------------------------------------------------------------------- */
/* ------------------------------------------------------------------------- */
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
{
jam();
ndbrequire(regTcPtr->m_nr_delete.m_cnt);
regTcPtr->m_nr_delete.m_cnt--;
if (regTcPtr->m_nr_delete.m_cnt)
{
jam();
/**
* Let operation wait for pending NR operations
* even for before writing log...(as it's simpler)
*/
#ifdef VM_TRACE
/**
* Only disk table can have pending ops...
*/
TablerecPtr tablePtr;
tablePtr.i = regTcPtr->tableref;
ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec);
ndbrequire(tablePtr.p->m_disk_table);
#endif
return;
}
}
abortCommonLab(signal);
abortCommonLab(signal);
break;
break;
case TcConnectionrec::WAIT_ACC_ABORT:
case TcConnectionrec::WAIT_ACC_ABORT:
...
@@ -2523,13 +2552,23 @@ void Dblqh::execTUPKEYREF(Signal* signal)
...
@@ -2523,13 +2552,23 @@ void Dblqh::execTUPKEYREF(Signal* signal)
tcConnectptr.i = tupKeyRef->userRef;
tcConnectptr.i = tupKeyRef->userRef;
terrorCode = tupKeyRef->errorCode;
terrorCode = tupKeyRef->errorCode;
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
TcConnectionrec* regTcPtr = tcConnectptr.p;
Uint32 activeCreat = regTcPtr->activeCreat;
FragrecordPtr regFragptr;
FragrecordPtr regFragptr;
regFragptr
.
i
=
tcConnectptr
.
p
->
fragmentptr
;
regFragptr.i =
regTcPtr
->fragmentptr;
c_fragment_pool.getPtr(regFragptr);
c_fragment_pool.getPtr(regFragptr);
fragptr = regFragptr;
fragptr = regFragptr;
TcConnectionrec
*
regTcPtr
=
tcConnectptr
.
p
;
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
{
jam();
ndbrequire(regTcPtr->m_nr_delete.m_cnt);
regTcPtr->m_nr_delete.m_cnt--;
ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP ||
regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT);
}
switch (tcConnectptr.p->transactionState) {
switch (tcConnectptr.p->transactionState) {
case TcConnectionrec::WAIT_TUP:
case TcConnectionrec::WAIT_TUP:
jam();
jam();
...
@@ -3767,7 +3806,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
...
@@ -3767,7 +3806,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
jamEntry();
jamEntry();
execACC_ABORTCONF
(
signal
);
packLqhkeyreqLab
(signal);
}
}
}
}
...
@@ -3890,16 +3929,17 @@ Dblqh::handle_nr_copy(Signal* signal, Ptr<TcConnectionrec> regTcPtr)
...
@@ -3890,16 +3929,17 @@ Dblqh::handle_nr_copy(Signal* signal, Ptr<TcConnectionrec> regTcPtr)
if (TRACENR_FLAG)
if (TRACENR_FLAG)
TRACENR(" performing DELETE key: "
TRACENR(" performing DELETE key: "
<< dst[0] << endl);
<< dst[0] << endl);
regTcPtr
.
p
->
tupkeyData
[
0
]
=
regTcPtr
.
p
->
m_row_id
.
ref
();
if
(
g_key_descriptor_pool
.
getPtr
(
tableId
)
->
hasCharAttr
)
nr_copy_delete_row(signal, regTcPtr, ®TcPtr.p->m_row_id, len);
{
ndbassert(regTcPtr.p->m_nr_delete.m_cnt);
regTcPtr
.
p
->
hashValue
=
calculateHash
(
tableId
,
dst
);
regTcPtr.p->m_nr_delete.m_cnt--; // No real op is run
}
if (regTcPtr.p->m_nr_delete.m_cnt)
else
{
{
regTcPtr
.
p
->
hashValue
=
md5_hash
((
Uint64
*
)
dst
,
len
);
jam();
return;
}
}
goto
run
;
packLqhkeyreqLab(signal);
return;
}
}
else if (len == 0 && op == ZDELETE)
else if (len == 0 && op == ZDELETE)
{
{
...
@@ -3993,9 +4033,7 @@ update_gci_ignore:
...
@@ -3993,9 +4033,7 @@ update_gci_ignore:
signal->theData[0] = regTcPtr.p->tupConnectrec;
signal->theData[0] = regTcPtr.p->tupConnectrec;
EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
regTcPtr
.
p
->
transactionState
=
TcConnectionrec
::
WAIT_ACC_ABORT
;
packLqhkeyreqLab(signal);
signal
->
theData
[
0
]
=
regTcPtr
.
i
;
execACC_ABORTCONF
(
signal
);
}
}
int
int
...
@@ -4149,7 +4187,6 @@ Dblqh::get_nr_op_info(Nr_op_info* op, Uint32 page_id)
...
@@ -4149,7 +4187,6 @@ Dblqh::get_nr_op_info(Nr_op_info* op, Uint32 page_id)
op->m_gci = tcPtr.p->gci;
op->m_gci = tcPtr.p->gci;
op->m_tup_frag_ptr_i = fragPtr.p->tupFragptr;
op->m_tup_frag_ptr_i = fragPtr.p->tupFragptr;
ndbrequire
(
tcPtr
.
p
->
transactionState
==
TcConnectionrec
::
WAIT_TUP_COMMIT
);
ndbrequire(tcPtr.p->activeCreat == Fragrecord::AC_NR_COPY);
ndbrequire(tcPtr.p->activeCreat == Fragrecord::AC_NR_COPY);
ndbrequire(tcPtr.p->m_nr_delete.m_cnt);
ndbrequire(tcPtr.p->m_nr_delete.m_cnt);
...
@@ -4194,16 +4231,36 @@ Dblqh::nr_delete_complete(Signal* signal, Nr_op_info* op)
...
@@ -4194,16 +4231,36 @@ Dblqh::nr_delete_complete(Signal* signal, Nr_op_info* op)
tcPtr.i = op->m_ptr_i;
tcPtr.i = op->m_ptr_i;
ptrCheckGuard(tcPtr, ctcConnectrecFileSize, tcConnectionrec);
ptrCheckGuard(tcPtr, ctcConnectrecFileSize, tcConnectionrec);
ndbrequire
(
tcPtr
.
p
->
transactionState
==
TcConnectionrec
::
WAIT_TUP_COMMIT
);
ndbrequire(tcPtr.p->activeCreat == Fragrecord::AC_NR_COPY);
ndbrequire(tcPtr.p->activeCreat == Fragrecord::AC_NR_COPY);
ndbrequire(tcPtr.p->m_nr_delete.m_cnt);
ndbrequire(tcPtr.p->m_nr_delete.m_cnt);
tcPtr.p->m_nr_delete.m_cnt--;
tcPtr.p->m_nr_delete.m_cnt--;
if (tcPtr.p->m_nr_delete.m_cnt == 0)
if (tcPtr.p->m_nr_delete.m_cnt == 0)
{
{
jam();
tcConnectptr = tcPtr;
tcConnectptr = tcPtr;
c_fragment_pool.getPtr(fragptr, tcPtr.p->fragmentptr);
c_fragment_pool.getPtr(fragptr, tcPtr.p->fragmentptr);
if (tcPtr.p->abortState != TcConnectionrec::ABORT_IDLE)
{
jam();
tcPtr.p->activeCreat = Fragrecord::AC_NORMAL;
abortCommonLab(signal);
}
else if (tcPtr.p->operation == ZDELETE &&
LqhKeyReq::getNrCopyFlag(tcPtr.p->reqinfo))
{
/**
* This is run directly in handle_nr_copy
*/
jam();
packLqhkeyreqLab(signal);
packLqhkeyreqLab(signal);
}
else
{
jam();
rwConcludedLab(signal);
}
return;
return;
}
}
...
@@ -4319,7 +4376,6 @@ void Dblqh::execACCKEYCONF(Signal* signal)
...
@@ -4319,7 +4376,6 @@ void Dblqh::execACCKEYCONF(Signal* signal)
return;
return;
}//if
}//if
// reset the activeCreat since that is only valid in cases where the record was not present.
/* ------------------------------------------------------------------------
/* ------------------------------------------------------------------------
* IT IS NOW TIME TO CONTACT THE TUPLE MANAGER. THE TUPLE MANAGER NEEDS THE
* IT IS NOW TIME TO CONTACT THE TUPLE MANAGER. THE TUPLE MANAGER NEEDS THE
* INFORMATION ON WHICH TABLE AND FRAGMENT, THE LOCAL KEY AND IT NEEDS TO
* INFORMATION ON WHICH TABLE AND FRAGMENT, THE LOCAL KEY AND IT NEEDS TO
...
@@ -4536,6 +4592,7 @@ Dblqh::acckeyconf_load_diskpage(Signal* signal, TcConnectionrecPtr regTcPtr,
...
@@ -4536,6 +4592,7 @@ Dblqh::acckeyconf_load_diskpage(Signal* signal, TcConnectionrecPtr regTcPtr,
}
}
else
else
{
{
regTcPtr.p->transactionState = TcConnectionrec::WAIT_TUP;
TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr();
TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr();
ref->userRef= regTcPtr.i;
ref->userRef= regTcPtr.i;
ref->errorCode= ~0;
ref->errorCode= ~0;
...
@@ -4571,6 +4628,7 @@ Dblqh::acckeyconf_load_diskpage_callback(Signal* signal,
...
@@ -4571,6 +4628,7 @@ Dblqh::acckeyconf_load_diskpage_callback(Signal* signal,
}
}
else
else
{
{
regTcPtr->transactionState = TcConnectionrec::WAIT_TUP;
TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr();
TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr();
ref->userRef= callbackData;
ref->userRef= callbackData;
ref->errorCode= disk_page;
ref->errorCode= disk_page;
...
@@ -4592,9 +4650,11 @@ Dblqh::acckeyconf_load_diskpage_callback(Signal* signal,
...
@@ -4592,9 +4650,11 @@ Dblqh::acckeyconf_load_diskpage_callback(Signal* signal,
* -------------------------------------------------------------------------- */
* -------------------------------------------------------------------------- */
void Dblqh::tupkeyConfLab(Signal* signal)
void Dblqh::tupkeyConfLab(Signal* signal)
{
{
/* ---- GET OPERATION TYPE AND CHECK WHAT KIND OF OPERATION IS REQUESTED ---
-
*/
/* ---- GET OPERATION TYPE AND CHECK WHAT KIND OF OPERATION IS REQUESTED --- */
const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0];
const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0];
TcConnectionrec * const regTcPtr = tcConnectptr.p;
TcConnectionrec * const regTcPtr = tcConnectptr.p;
Uint32 activeCreat = regTcPtr->activeCreat;
if (regTcPtr->simpleRead) {
if (regTcPtr->simpleRead) {
jam();
jam();
/* ----------------------------------------------------------------------
/* ----------------------------------------------------------------------
...
@@ -4616,6 +4676,34 @@ void Dblqh::tupkeyConfLab(Signal* signal)
...
@@ -4616,6 +4676,34 @@ void Dblqh::tupkeyConfLab(Signal* signal)
}//if
}//if
regTcPtr->totSendlenAi = tupKeyConf->writeLength;
regTcPtr->totSendlenAi = tupKeyConf->writeLength;
ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen);
ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen);
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
{
jam();
ndbrequire(regTcPtr->m_nr_delete.m_cnt);
regTcPtr->m_nr_delete.m_cnt--;
if (regTcPtr->m_nr_delete.m_cnt)
{
jam();
/**
* Let operation wait for pending NR operations
* even for before writing log...(as it's simpler)
*/
#ifdef VM_TRACE
/**
* Only disk table can have pending ops...
*/
TablerecPtr tablePtr;
tablePtr.i = regTcPtr->tableref;
ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec);
ndbrequire(tablePtr.p->m_disk_table);
#endif
return;
}
}
rwConcludedLab(signal);
rwConcludedLab(signal);
return;
return;
}//Dblqh::tupkeyConfLab()
}//Dblqh::tupkeyConfLab()
...
@@ -6325,27 +6413,19 @@ Dblqh::tupcommit_conf(Signal* signal,
...
@@ -6325,27 +6413,19 @@ Dblqh::tupcommit_conf(Signal* signal,
/*SEND ANY COMMIT OR COMPLETE MESSAGES TO OTHER NODES. THEY WILL MERELY SEND */
/*SEND ANY COMMIT OR COMPLETE MESSAGES TO OTHER NODES. THEY WILL MERELY SEND */
/*THOSE SIGNALS INTERNALLY. */
/*THOSE SIGNALS INTERNALLY. */
/* ------------------------------------------------------------------------- */
/* ------------------------------------------------------------------------- */
if
(
tcPtrP
->
abortState
==
TcConnectionrec
::
ABORT_IDLE
)
{
if (tcPtrP->abortState == TcConnectionrec::ABORT_IDLE)
{
jam();
jam();
if
(
activeCreat
==
Fragrecord
::
AC_NR_COPY
&&
if (activeCreat == Fragrecord::AC_NR_COPY)
tcPtrP
->
m_nr_delete
.
m_cnt
>
1
)
{
{
jam();
jam();
/**
ndbrequire(LqhKeyReq::getNrCopyFlag(tcPtrP->reqinfo));
* Nr delete waiting for disk delete to complete...
ndbrequire(tcPtrP->m_nr_delete.m_cnt == 0);
*/
#ifdef VM_TRACE
TablerecPtr
tablePtr
;
tablePtr
.
i
=
tcPtrP
->
tableref
;
ptrCheckGuard
(
tablePtr
,
ctabrecFileSize
,
tablerec
);
ndbrequire
(
tablePtr
.
p
->
m_disk_table
);
#endif
tcPtrP
->
m_nr_delete
.
m_cnt
--
;
tcPtrP
->
transactionState
=
TcConnectionrec
::
WAIT_TUP_COMMIT
;
return
;
}
}
packLqhkeyreqLab(signal);
packLqhkeyreqLab(signal);
}
else
{
}
else
{
ndbrequire(tcPtrP->abortState != TcConnectionrec::NEW_FROM_TC);
ndbrequire(tcPtrP->abortState != TcConnectionrec::NEW_FROM_TC);
jam();
jam();
sendLqhTransconf(signal, LqhTransConf::Committed);
sendLqhTransconf(signal, LqhTransConf::Committed);
...
@@ -6549,7 +6629,7 @@ void Dblqh::execABORT(Signal* signal)
...
@@ -6549,7 +6629,7 @@ void Dblqh::execABORT(Signal* signal)
}//if
}//if
TcConnectionrec * const regTcPtr = tcConnectptr.p;
TcConnectionrec * const regTcPtr = tcConnectptr.p;
Uint32 activeCreat = regTcPtr->activeCreat;
if (ERROR_INSERTED(5100))
if (ERROR_INSERTED(5100))
{
{
SET_ERROR_INSERT_VALUE(5101);
SET_ERROR_INSERT_VALUE(5101);
...
@@ -6574,10 +6654,10 @@ void Dblqh::execABORT(Signal* signal)
...
@@ -6574,10 +6654,10 @@ void Dblqh::execABORT(Signal* signal)
sendSignal(TLqhRef, GSN_ABORT, signal, 4, JBB);
sendSignal(TLqhRef, GSN_ABORT, signal, 4, JBB);
}//if
}//if
regTcPtr->abortState = TcConnectionrec::ABORT_FROM_TC;
regTcPtr->abortState = TcConnectionrec::ABORT_FROM_TC;
regTcPtr
->
activeCreat
=
Fragrecord
::
AC_NORMAL
;
const Uint32 commitAckMarker = regTcPtr->commitAckMarker;
const Uint32 commitAckMarker = regTcPtr->commitAckMarker;
if
(
commitAckMarker
!=
RNIL
){
if(commitAckMarker != RNIL)
{
jam();
jam();
#ifdef MARKER_TRACE
#ifdef MARKER_TRACE
{
{
...
@@ -6627,6 +6707,7 @@ void Dblqh::execABORTREQ(Signal* signal)
...
@@ -6627,6 +6707,7 @@ void Dblqh::execABORTREQ(Signal* signal)
return;
return;
}//if
}//if
TcConnectionrec * const regTcPtr = tcConnectptr.p;
TcConnectionrec * const regTcPtr = tcConnectptr.p;
Uint32 activeCreat = regTcPtr->activeCreat;
if (regTcPtr->transactionState != TcConnectionrec::PREPARED) {
if (regTcPtr->transactionState != TcConnectionrec::PREPARED) {
warningReport(signal, 10);
warningReport(signal, 10);
return;
return;
...
@@ -6634,7 +6715,7 @@ void Dblqh::execABORTREQ(Signal* signal)
...
@@ -6634,7 +6715,7 @@ void Dblqh::execABORTREQ(Signal* signal)
regTcPtr->reqBlockref = reqBlockref;
regTcPtr->reqBlockref = reqBlockref;
regTcPtr->reqRef = reqPtr;
regTcPtr->reqRef = reqPtr;
regTcPtr->abortState = TcConnectionrec::REQ_FROM_TC;
regTcPtr->abortState = TcConnectionrec::REQ_FROM_TC;
regTcPtr
->
activeCreat
=
Fragrecord
::
AC_NORMAL
;
abortCommonLab(signal);
abortCommonLab(signal);
return;
return;
}//Dblqh::execABORTREQ()
}//Dblqh::execABORTREQ()
...
@@ -6704,22 +6785,7 @@ void Dblqh::execACCKEYREF(Signal* signal)
...
@@ -6704,22 +6785,7 @@ void Dblqh::execACCKEYREF(Signal* signal)
}
}
if
(
tcPtr
->
activeCreat
==
Fragrecord
::
AC_NR_COPY
)
ndbrequire(tcPtr->activeCreat == Fragrecord::AC_NORMAL);
{
jam
();
Uint32
op
=
tcPtr
->
operation
;
switch
(
errCode
){
case
ZNO_TUPLE_FOUND
:
ndbrequire
(
op
==
ZDELETE
);
break
;
break
;
default:
ndbrequire
(
false
);
}
tcPtr
->
activeCreat
=
Fragrecord
::
AC_IGNORED
;
}
else
{
ndbrequire(!LqhKeyReq::getNrCopyFlag(tcPtr->reqinfo));
ndbrequire(!LqhKeyReq::getNrCopyFlag(tcPtr->reqinfo));
/**
/**
...
@@ -6739,7 +6805,6 @@ void Dblqh::execACCKEYREF(Signal* signal)
...
@@ -6739,7 +6805,6 @@ void Dblqh::execACCKEYREF(Signal* signal)
(tcPtr->seqNoReplica == 0 ||
(tcPtr->seqNoReplica == 0 ||
errCode != ZTUPLE_ALREADY_EXIST ||
errCode != ZTUPLE_ALREADY_EXIST ||
(tcPtr->operation == ZREAD && (tcPtr->dirtyOp || tcPtr->opSimple)));
(tcPtr->operation == ZREAD && (tcPtr->dirtyOp || tcPtr->opSimple)));
}
tcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH;
tcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH;
abortCommonLab(signal);
abortCommonLab(signal);
...
@@ -6753,7 +6818,6 @@ void Dblqh::localAbortStateHandlerLab(Signal* signal)
...
@@ -6753,7 +6818,6 @@ void Dblqh::localAbortStateHandlerLab(Signal* signal)
jam();
jam();
return;
return;
}//if
}//if
regTcPtr
->
activeCreat
=
Fragrecord
::
AC_NORMAL
;
regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH;
regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH;
regTcPtr->errorCode = terrorCode;
regTcPtr->errorCode = terrorCode;
abortStateHandlerLab(signal);
abortStateHandlerLab(signal);
...
@@ -6929,11 +6993,6 @@ void Dblqh::abortErrorLab(Signal* signal)
...
@@ -6929,11 +6993,6 @@ void Dblqh::abortErrorLab(Signal* signal)
regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH;
regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH;
regTcPtr->errorCode = terrorCode;
regTcPtr->errorCode = terrorCode;
}//if
}//if
/* -----------------------------------------------------------------------
* ACTIVE CREATION IS RESET FOR ALL ERRORS WHICH SHOULD BE HANDLED
* WITH NORMAL ABORT HANDLING.
* ----------------------------------------------------------------------- */
regTcPtr
->
activeCreat
=
Fragrecord
::
AC_NORMAL
;
abortCommonLab(signal);
abortCommonLab(signal);
return;
return;
}//Dblqh::abortErrorLab()
}//Dblqh::abortErrorLab()
...
@@ -6942,8 +7001,9 @@ void Dblqh::abortCommonLab(Signal* signal)
...
@@ -6942,8 +7001,9 @@ void Dblqh::abortCommonLab(Signal* signal)
{
{
TcConnectionrec * const regTcPtr = tcConnectptr.p;
TcConnectionrec * const regTcPtr = tcConnectptr.p;
const Uint32 commitAckMarker = regTcPtr->commitAckMarker;
const Uint32 commitAckMarker = regTcPtr->commitAckMarker;
if
(
regTcPtr
->
activeCreat
!=
Fragrecord
::
AC_IGNORED
&&
const Uint32 activeCreat = regTcPtr->activeCreat;
commitAckMarker
!=
RNIL
){
if (commitAckMarker != RNIL)
{
/**
/**
* There is no NR ongoing and we have a marker
* There is no NR ongoing and we have a marker
*/
*/
...
@@ -6959,6 +7019,29 @@ void Dblqh::abortCommonLab(Signal* signal)
...
@@ -6959,6 +7019,29 @@ void Dblqh::abortCommonLab(Signal* signal)
regTcPtr->commitAckMarker = RNIL;
regTcPtr->commitAckMarker = RNIL;
}
}
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
{
jam();
if (regTcPtr->m_nr_delete.m_cnt)
{
jam();
/**
* Let operation wait for pending NR operations
*/
#ifdef VM_TRACE
/**
* Only disk table can have pending ops...
*/
TablerecPtr tablePtr;
tablePtr.i = regTcPtr->tableref;
ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec);
ndbrequire(tablePtr.p->m_disk_table);
#endif
return;
}
}
fragptr.i = regTcPtr->fragmentptr;
fragptr.i = regTcPtr->fragmentptr;
if (fragptr.i != RNIL) {
if (fragptr.i != RNIL) {
jam();
jam();
...
@@ -7034,25 +7117,6 @@ void Dblqh::execACC_ABORTCONF(Signal* signal)
...
@@ -7034,25 +7117,6 @@ void Dblqh::execACC_ABORTCONF(Signal* signal)
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
TcConnectionrec * const regTcPtr = tcConnectptr.p;
TcConnectionrec * const regTcPtr = tcConnectptr.p;
ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT);
ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT);
if
(
regTcPtr
->
activeCreat
==
Fragrecord
::
AC_IGNORED
)
{
/* ----------------------------------------------------------------------
* A NORMAL EVENT DURING CREATION OF A FRAGMENT. WE NOW NEED TO CONTINUE
* WITH NORMAL COMMIT PROCESSING.
* --------------------------------------------------------------------- */
if
(
regTcPtr
->
currTupAiLen
==
regTcPtr
->
totReclenAi
)
{
jam
();
regTcPtr
->
abortState
=
TcConnectionrec
::
ABORT_IDLE
;
fragptr
.
i
=
regTcPtr
->
fragmentptr
;
c_fragment_pool
.
getPtr
(
fragptr
);
rwConcludedLab
(
signal
);
return
;
}
else
{
ndbrequire
(
regTcPtr
->
currTupAiLen
<
regTcPtr
->
totReclenAi
);
jam
();
regTcPtr
->
transactionState
=
TcConnectionrec
::
WAIT_AI_AFTER_ABORT
;
return
;
}
//if
}
//if
continueAbortLab(signal);
continueAbortLab(signal);
return;
return;
}//Dblqh::execACC_ABORTCONF()
}//Dblqh::execACC_ABORTCONF()
...
@@ -9450,7 +9514,7 @@ void Dblqh::initScanTc(const ScanFragReq* req,
...
@@ -9450,7 +9514,7 @@ void Dblqh::initScanTc(const ScanFragReq* req,
tcConnectptr.p->m_offset_current_keybuf = 0;
tcConnectptr.p->m_offset_current_keybuf = 0;
tcConnectptr.p->m_scan_curr_range_no = 0;
tcConnectptr.p->m_scan_curr_range_no = 0;
tcConnectptr.p->m_dealloc = 0;
tcConnectptr.p->m_dealloc = 0;
tcConnectptr.p->activeCreat = Fragrecord::AC_NORMAL;
TablerecPtr tTablePtr;
TablerecPtr tTablePtr;
tTablePtr.i = tabptr.p->primaryTableId;
tTablePtr.i = tabptr.p->primaryTableId;
ptrCheckGuard(tTablePtr, ctabrecFileSize, tablerec);
ptrCheckGuard(tTablePtr, ctabrecFileSize, tablerec);
...
@@ -9929,16 +9993,21 @@ void Dblqh::continueFirstCopyAfterBlockedLab(Signal* signal)
...
@@ -9929,16 +9993,21 @@ void Dblqh::continueFirstCopyAfterBlockedLab(Signal* signal)
*/
*/
fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY;
fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY;
if
(
0
)
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
if (false && fragptr.p->tabRef > 4)
{
{
ndbout_c
(
"STOPPING COPY
(%d -> %d %d %d)
"
,
ndbout_c("STOPPING COPY
X = [ %d %d %d %d ]
",
scanptr
.
p
->
scanBlockref
,
refToBlock(scanptr.p->scanBlockref)
,
scanptr.p->scanAccPtr, RNIL, NextScanReq::ZSCAN_NEXT);
scanptr.p->scanAccPtr, RNIL, NextScanReq::ZSCAN_NEXT);
/**
* RESTART: > DUMP 7020 332 X
*/
return;
return;
}
}
scanptr
.
i
=
tcConnectptr
.
p
->
tcScanRec
;
c_scanRecordPool
.
getPtr
(
scanptr
);
signal->theData[0] = scanptr.p->scanAccPtr;
signal->theData[0] = scanptr.p->scanAccPtr;
signal->theData[1] = RNIL;
signal->theData[1] = RNIL;
signal->theData[2] = NextScanReq::ZSCAN_NEXT;
signal->theData[2] = NextScanReq::ZSCAN_NEXT;
...
@@ -18351,6 +18420,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
...
@@ -18351,6 +18420,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
<< " tcBlockref = " << hex << tcRec.p->tcBlockref
<< " tcBlockref = " << hex << tcRec.p->tcBlockref
<< " reqBlockref = " << hex << tcRec.p->reqBlockref
<< " reqBlockref = " << hex << tcRec.p->reqBlockref
<< " primKeyLen = " << tcRec.p->primKeyLen
<< " primKeyLen = " << tcRec.p->primKeyLen
<< " nrcopyflag = " << LqhKeyReq::getNrCopyFlag(tcRec.p->reqinfo)
<< endl;
<< endl;
ndbout << " nextReplica = " << tcRec.p->nextReplica
ndbout << " nextReplica = " << tcRec.p->nextReplica
<< " tcBlockref = " << hex << tcRec.p->tcBlockref
<< " tcBlockref = " << hex << tcRec.p->tcBlockref
...
@@ -18421,6 +18491,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
...
@@ -18421,6 +18491,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
<< endl;
<< endl;
ndbout << " tupkeyData2 = " << tcRec.p->tupkeyData[2]
ndbout << " tupkeyData2 = " << tcRec.p->tupkeyData[2]
<< " tupkeyData3 = " << tcRec.p->tupkeyData[3]
<< " tupkeyData3 = " << tcRec.p->tupkeyData[3]
<< " m_nr_delete.m_cnt = " << tcRec.p->m_nr_delete.m_cnt
<< endl;
<< endl;
switch (tcRec.p->transactionState) {
switch (tcRec.p->transactionState) {
...
...
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
View file @
39e68ea9
...
@@ -484,6 +484,14 @@ Dbtup::load_diskpage(Signal* signal,
...
@@ -484,6 +484,14 @@ Dbtup::load_diskpage(Signal* signal,
req
.
m_callback
.
m_callbackFunction
=
req
.
m_callback
.
m_callbackFunction
=
safe_cast
(
&
Dbtup
::
disk_page_load_callback
);
safe_cast
(
&
Dbtup
::
disk_page_load_callback
);
#ifdef ERROR_INSERTED
if
(
ERROR_INSERTED
(
4022
))
{
flags
|=
Page_cache_client
::
DELAY_REQ
;
req
.
m_delay_until_time
=
NdbTick_CurrentMillisecond
()
+
(
Uint64
)
3000
;
}
#endif
if
((
res
=
m_pgman
.
get_page
(
signal
,
req
,
flags
))
>
0
)
if
((
res
=
m_pgman
.
get_page
(
signal
,
req
,
flags
))
>
0
)
{
{
//ndbout_c("in cache");
//ndbout_c("in cache");
...
@@ -3119,6 +3127,35 @@ Dbtup::nr_delete(Signal* signal, Uint32 senderData,
...
@@ -3119,6 +3127,35 @@ Dbtup::nr_delete(Signal* signal, Uint32 senderData,
preq
.
m_callback
.
m_callbackFunction
=
preq
.
m_callback
.
m_callbackFunction
=
safe_cast
(
&
Dbtup
::
nr_delete_page_callback
);
safe_cast
(
&
Dbtup
::
nr_delete_page_callback
);
int
flags
=
Page_cache_client
::
COMMIT_REQ
;
int
flags
=
Page_cache_client
::
COMMIT_REQ
;
#ifdef ERROR_INSERT
if
(
ERROR_INSERTED
(
4023
)
||
ERROR_INSERTED
(
4024
))
{
int
rnd
=
rand
()
%
100
;
int
slp
=
0
;
if
(
ERROR_INSERTED
(
4024
))
{
slp
=
3000
;
}
else
if
(
rnd
>
90
)
{
slp
=
3000
;
}
else
if
(
rnd
>
70
)
{
slp
=
100
;
}
ndbout_c
(
"rnd: %d slp: %d"
,
rnd
,
slp
);
if
(
slp
)
{
flags
|=
Page_cache_client
::
DELAY_REQ
;
preq
.
m_delay_until_time
=
NdbTick_CurrentMillisecond
()
+
(
Uint64
)
slp
;
}
}
#endif
res
=
m_pgman
.
get_page
(
signal
,
preq
,
flags
);
res
=
m_pgman
.
get_page
(
signal
,
preq
,
flags
);
if
(
res
==
0
)
if
(
res
==
0
)
{
{
...
...
storage/ndb/src/kernel/blocks/pgman.cpp
View file @
39e68ea9
...
@@ -944,12 +944,16 @@ Pgman::process_callback(Signal* signal)
...
@@ -944,12 +944,16 @@ Pgman::process_callback(Signal* signal)
int
max_count
=
1
;
int
max_count
=
1
;
Page_sublist
&
pl_callback
=
*
m_page_sublist
[
Page_entry
::
SL_CALLBACK
];
Page_sublist
&
pl_callback
=
*
m_page_sublist
[
Page_entry
::
SL_CALLBACK
];
while
(
!
pl_callback
.
isEmpty
()
&&
--
max_count
>=
0
)
{
jam
();
Ptr
<
Page_entry
>
ptr
;
Ptr
<
Page_entry
>
ptr
;
pl_callback
.
first
(
ptr
);
pl_callback
.
first
(
ptr
);
if
(
!
process_callback
(
signal
,
ptr
))
while
(
!
ptr
.
isNull
()
&&
--
max_count
>=
0
)
{
jam
();
Ptr
<
Page_entry
>
curr
=
ptr
;
pl_callback
.
next
(
ptr
);
if
(
!
process_callback
(
signal
,
curr
))
{
{
jam
();
jam
();
break
;
break
;
...
@@ -987,6 +991,18 @@ Pgman::process_callback(Signal* signal, Ptr<Page_entry> ptr)
...
@@ -987,6 +991,18 @@ Pgman::process_callback(Signal* signal, Ptr<Page_entry> ptr)
#ifdef VM_TRACE
#ifdef VM_TRACE
debugOut
<<
"PGMAN: "
<<
req_ptr
<<
" : process_callback"
<<
endl
;
debugOut
<<
"PGMAN: "
<<
req_ptr
<<
" : process_callback"
<<
endl
;
#endif
#endif
#ifdef ERROR_INSERT
if
(
req_ptr
.
p
->
m_flags
&
Page_request
::
DELAY_REQ
)
{
Uint64
now
=
NdbTick_CurrentMillisecond
();
if
(
now
<
req_ptr
.
p
->
m_delay_until_time
)
{
break
;
}
}
#endif
b
=
globalData
.
getBlock
(
req_ptr
.
p
->
m_block
);
b
=
globalData
.
getBlock
(
req_ptr
.
p
->
m_block
);
callback
=
req_ptr
.
p
->
m_callback
;
callback
=
req_ptr
.
p
->
m_callback
;
...
@@ -1314,6 +1330,24 @@ Pgman::fsreadconf(Signal* signal, Ptr<Page_entry> ptr)
...
@@ -1314,6 +1330,24 @@ Pgman::fsreadconf(Signal* signal, Ptr<Page_entry> ptr)
state
|=
Page_entry
::
MAPPED
;
state
|=
Page_entry
::
MAPPED
;
set_page_state
(
ptr
,
state
);
set_page_state
(
ptr
,
state
);
{
/**
* Update lsn record on page
* as it can be modified/flushed wo/ update_lsn has been called
* (e.g. prealloc) and it then would get lsn 0, which is bad
* when running undo and following SR
*/
Ptr
<
GlobalPage
>
pagePtr
;
m_global_page_pool
.
getPtr
(
pagePtr
,
ptr
.
p
->
m_real_page_i
);
File_formats
::
Datafile
::
Data_page
*
page
=
(
File_formats
::
Datafile
::
Data_page
*
)
pagePtr
.
p
;
Uint64
lsn
=
0
;
lsn
+=
page
->
m_page_header
.
m_page_lsn_hi
;
lsn
<<=
32
;
lsn
+=
page
->
m_page_header
.
m_page_lsn_lo
;
ptr
.
p
->
m_lsn
=
lsn
;
}
ndbrequire
(
m_stats
.
m_current_io_waits
>
0
);
ndbrequire
(
m_stats
.
m_current_io_waits
>
0
);
m_stats
.
m_current_io_waits
--
;
m_stats
.
m_current_io_waits
--
;
...
@@ -1576,6 +1610,12 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
...
@@ -1576,6 +1610,12 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
bool
only_request
=
ptr
.
p
->
m_requests
.
isEmpty
();
bool
only_request
=
ptr
.
p
->
m_requests
.
isEmpty
();
if
(
req_flags
&
Page_request
::
DELAY_REQ
)
{
jam
();
only_request
=
false
;
}
if
(
only_request
&&
if
(
only_request
&&
state
&
Page_entry
::
MAPPED
)
state
&
Page_entry
::
MAPPED
)
{
{
...
@@ -1623,6 +1663,9 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
...
@@ -1623,6 +1663,9 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
req_ptr
.
p
->
m_block
=
page_req
.
m_block
;
req_ptr
.
p
->
m_block
=
page_req
.
m_block
;
req_ptr
.
p
->
m_flags
=
page_req
.
m_flags
;
req_ptr
.
p
->
m_flags
=
page_req
.
m_flags
;
req_ptr
.
p
->
m_callback
=
page_req
.
m_callback
;
req_ptr
.
p
->
m_callback
=
page_req
.
m_callback
;
#ifdef ERROR_INSERT
req_ptr
.
p
->
m_delay_until_time
=
page_req
.
m_delay_until_time
;
#endif
state
|=
Page_entry
::
REQUEST
;
state
|=
Page_entry
::
REQUEST
;
if
(
only_request
&&
req_flags
&
Page_request
::
EMPTY_PAGE
)
if
(
only_request
&&
req_flags
&
Page_request
::
EMPTY_PAGE
)
...
...
storage/ndb/src/kernel/blocks/pgman.hpp
View file @
39e68ea9
...
@@ -256,12 +256,18 @@ private:
...
@@ -256,12 +256,18 @@ private:
,
DIRTY_REQ
=
0x0200
// make page dirty wo/ update_lsn
,
DIRTY_REQ
=
0x0200
// make page dirty wo/ update_lsn
,
UNLOCK_PAGE
=
0x0400
,
UNLOCK_PAGE
=
0x0400
,
CORR_REQ
=
0x0800
// correlated request (no LIRS update)
,
CORR_REQ
=
0x0800
// correlated request (no LIRS update)
#ifdef ERROR_INSERT
,
DELAY_REQ
=
0x1000
// Force request to be delayed
#endif
};
};
Uint16
m_block
;
Uint16
m_block
;
Uint16
m_flags
;
Uint16
m_flags
;
SimulatedBlock
::
Callback
m_callback
;
SimulatedBlock
::
Callback
m_callback
;
#ifdef ERROR_INSERT
Uint64
m_delay_until_time
;
#endif
Uint32
nextList
;
Uint32
nextList
;
Uint32
m_magic
;
Uint32
m_magic
;
};
};
...
@@ -508,6 +514,10 @@ public:
...
@@ -508,6 +514,10 @@ public:
struct
Request
{
struct
Request
{
Local_key
m_page
;
Local_key
m_page
;
SimulatedBlock
::
Callback
m_callback
;
SimulatedBlock
::
Callback
m_callback
;
#ifdef ERROR_INSERT
Uint64
m_delay_until_time
;
#endif
};
};
Ptr
<
GlobalPage
>
m_ptr
;
// TODO remove
Ptr
<
GlobalPage
>
m_ptr
;
// TODO remove
...
@@ -520,6 +530,9 @@ public:
...
@@ -520,6 +530,9 @@ public:
,
DIRTY_REQ
=
Pgman
::
Page_request
::
DIRTY_REQ
,
DIRTY_REQ
=
Pgman
::
Page_request
::
DIRTY_REQ
,
UNLOCK_PAGE
=
Pgman
::
Page_request
::
UNLOCK_PAGE
,
UNLOCK_PAGE
=
Pgman
::
Page_request
::
UNLOCK_PAGE
,
CORR_REQ
=
Pgman
::
Page_request
::
CORR_REQ
,
CORR_REQ
=
Pgman
::
Page_request
::
CORR_REQ
#ifdef ERROR_INSERT
,
DELAY_REQ
=
Pgman
::
Page_request
::
DELAY_REQ
#endif
};
};
/**
/**
...
@@ -588,6 +601,9 @@ Page_cache_client::get_page(Signal* signal, Request& req, Uint32 flags)
...
@@ -588,6 +601,9 @@ Page_cache_client::get_page(Signal* signal, Request& req, Uint32 flags)
page_req
.
m_block
=
m_block
;
page_req
.
m_block
=
m_block
;
page_req
.
m_flags
=
flags
;
page_req
.
m_flags
=
flags
;
page_req
.
m_callback
=
req
.
m_callback
;
page_req
.
m_callback
=
req
.
m_callback
;
#ifdef ERROR_INSERT
page_req
.
m_delay_until_time
=
req
.
m_delay_until_time
;
#endif
int
i
=
m_pgman
->
get_page
(
signal
,
entry_ptr
,
page_req
);
int
i
=
m_pgman
->
get_page
(
signal
,
entry_ptr
,
page_req
);
if
(
i
>
0
)
if
(
i
>
0
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment