Commit abfdb348 authored by unknown's avatar unknown

Bug #4479

Ensures that the node doesn't crash by overflowing the UNDO log buffer
at local checkpoints. Inserts a real-time break after 512 operations
and when low on UNDO log buffer.

parent 9a3c364c
......@@ -218,6 +218,7 @@ ndbout << "Ptr: " << ptr.p->word32 << " \tIndex: " << tmp_string << " \tValue: "
#define ZREL_FRAG 6
#define ZREL_DIR 7
#define ZREPORT_MEMORY_USAGE 8
#define ZLCP_OP_WRITE_RT_BREAK 9
/* ------------------------------------------------------------------------- */
/* ERROR CODES */
......@@ -1190,6 +1191,7 @@ private:
void zpagesize_error(const char* where);
void reportMemoryUsage(Signal* signal, int gth);
void lcp_write_op_to_undolog(Signal* signal);
// Initialisation
......
......@@ -46,13 +46,17 @@ Dbacc::remainingUndoPages(){
ndbrequire(HeadPage>=TailPage);
Uint32 UsedPages = HeadPage - TailPage;
Uint32 Remaining = cundopagesize - UsedPages;
Int32 Remaining = cundopagesize - UsedPages;
// There can not be more than cundopagesize remaining
ndbrequire(Remaining<=cundopagesize);
if (Remaining <= 0){
// No more undolog, crash node
progError(__LINE__,
ERR_NO_MORE_UNDOLOG,
"There are more than 1Mbyte undolog writes outstanding");
}
return Remaining;
}//Dbacc::remainingUndoPages()
}
void
Dbacc::updateLastUndoPageIdWritten(Signal* signal, Uint32 aNewValue){
......@@ -193,6 +197,17 @@ void Dbacc::execCONTINUEB(Signal* signal)
return;
}
case ZLCP_OP_WRITE_RT_BREAK:
{
operationRecPtr.i= signal->theData[1];
fragrecptr.i= signal->theData[2];
lcpConnectptr.i= signal->theData[3];
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
lcp_write_op_to_undolog(signal);
return;
}
default:
ndbrequire(false);
break;
......@@ -7697,32 +7712,70 @@ void Dbacc::execACC_LCPREQ(Signal* signal)
fragrecptr.p->lcpMaxOverDirIndex = fragrecptr.p->lastOverIndex;
fragrecptr.p->createLcp = ZTRUE;
operationRecPtr.i = fragrecptr.p->lockOwnersList;
while (operationRecPtr.i != RNIL) {
lcp_write_op_to_undolog(signal);
}
void
Dbacc::lcp_write_op_to_undolog(Signal* signal)
{
bool delay_continueb= false;
Uint32 i, j;
for (i= 0; i < 16; i++) {
jam();
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
if (remainingUndoPages() <= ZMIN_UNDO_PAGES_AT_COMMIT) {
jam();
delay_continueb= true;
break;
}
for (j= 0; j < 32; j++) {
if (operationRecPtr.i == RNIL) {
jam();
break;
}
jam();
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
if ((operationRecPtr.p->operation == ZINSERT) ||
(operationRecPtr.p->elementIsDisappeared == ZTRUE)){
if ((operationRecPtr.p->operation == ZINSERT) ||
(operationRecPtr.p->elementIsDisappeared == ZTRUE)){
/*******************************************************************
* Only log inserts and elements that are marked as dissapeared.
* All other operations update the element header and that is handled
* when pages are written to disk
********************************************************************/
undopageptr.i = (cundoposition>>ZUNDOPAGEINDEXBITS) & (cundopagesize-1);
ptrAss(undopageptr, undopage);
theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK;
tundoindex = theadundoindex + ZUNDOHEADSIZE;
writeUndoOpInfo(signal);/* THE INFORMATION ABOUT ELEMENT HEADER, STORED*/
/* IN OP REC, IS WRITTEN AT UNDO PAGES */
cundoElemIndex = 0;/* DEFAULT VALUE USED BY WRITE_UNDO_HEADER SUBROTINE */
writeUndoHeader(signal, RNIL, UndoHeader::ZOP_INFO); /* WRITE THE HEAD OF THE UNDO ELEMENT */
checkUndoPages(signal); /* SEND UNDO PAGE TO DISK WHEN A GROUP OF */
/* UNDO PAGES,CURRENTLY 8, IS FILLED */
}//if
undopageptr.i = (cundoposition>>ZUNDOPAGEINDEXBITS) & (cundopagesize-1);
ptrAss(undopageptr, undopage);
theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK;
tundoindex = theadundoindex + ZUNDOHEADSIZE;
operationRecPtr.i = operationRecPtr.p->nextLockOwnerOp;
}//while
writeUndoOpInfo(signal);/* THE INFORMATION ABOUT ELEMENT HEADER, STORED*/
/* IN OP REC, IS WRITTEN AT UNDO PAGES */
cundoElemIndex = 0;/* DEFAULT VALUE USED BY WRITE_UNDO_HEADER SUBROTINE */
writeUndoHeader(signal, RNIL, UndoHeader::ZOP_INFO); /* WRITE THE HEAD OF THE UNDO ELEMENT */
checkUndoPages(signal); /* SEND UNDO PAGE TO DISK WHEN A GROUP OF */
/* UNDO PAGES,CURRENTLY 8, IS FILLED */
}
operationRecPtr.i = operationRecPtr.p->nextLockOwnerOp;
}
if (operationRecPtr.i == RNIL) {
jam();
break;
}
}
if (operationRecPtr.i != RNIL) {
jam();
signal->theData[0]= ZLCP_OP_WRITE_RT_BREAK;
signal->theData[1]= operationRecPtr.i;
signal->theData[2]= fragrecptr.i;
signal->theData[3]= lcpConnectptr.i;
if (delay_continueb) {
jam();
sendSignalWithDelay(cownBlockref, GSN_CONTINUEB, signal, 10, 4);
} else {
jam();
sendSignal(cownBlockref, GSN_CONTINUEB, signal, 4, JBB);
}
return;
}
signal->theData[0] = fragrecptr.p->lcpLqhPtr;
sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPSTARTED,
......@@ -7735,8 +7788,7 @@ void Dbacc::execACC_LCPREQ(Signal* signal)
signal->theData[0] = lcpConnectptr.i;
signal->theData[1] = fragrecptr.i;
sendSignal(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 2, JBB);
return;
}//Dbacc::execACC_LCPREQ()
}
/* ******************--------------------------------------------------------------- */
/* ACC_SAVE_PAGES A GROUP OF PAGES IS ALLOCATED. THE PAGES AND OVERFLOW */
......@@ -8595,12 +8647,6 @@ void Dbacc::checkUndoPages(Signal* signal)
* RECORDS IN
*/
Uint16 nextUndoPageId = tundoPageId + 1;
if (nextUndoPageId > (clastUndoPageIdWritten + cundopagesize)){
// No more undolog, crash node
progError(__LINE__,
ERR_NO_MORE_UNDOLOG,
"There are more than 1Mbyte undolog writes outstanding");
}
updateUndoPositionPage(signal, nextUndoPageId << ZUNDOPAGEINDEXBITS);
if ((tundoPageId & (ZWRITE_UNDOPAGESIZE - 1)) == (ZWRITE_UNDOPAGESIZE - 1)) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment