Commit 2082e8b9 authored by joreland@mysql.com's avatar joreland@mysql.com

bug#4909 + testSystemRestart -n SR_FULLDB

1) Fix so that scan takeover is possible after SR
2) Reserve two pages for SR "zero pages"
parent a622fdc9
......@@ -746,8 +746,6 @@ protected:
int prepareSendInterpreted(); // Help routine to prepare*
void TCOPCONF(Uint32 anNdbColumnImplLen); // Handle TC[KEY/INDX]CONF signal
int receiveTCKEYREF(NdbApiSignal*);
......
......@@ -107,6 +107,7 @@ operator<<(NdbOut& out, Dblqh::ScanRecord::ScanType state){
#endif
//#define MARKER_TRACE 1
//#define TRACE_SCAN_TAKEOVER 1
const Uint32 NR_ScanNo = 0;
......@@ -3574,6 +3575,10 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
key.scanNumber = KeyInfo20::getScanNo(regTcPtr->tcScanInfo);
key.fragPtrI = fragptr.i;
c_scanTakeOverHash.find(scanptr, key);
#ifdef TRACE_SCAN_TAKEOVER
if(scanptr.i == RNIL)
ndbout_c("not finding (%d %d)", key.scanNumber, key.fragPtrI);
#endif
}
if (scanptr.i == RNIL) {
jam();
......@@ -8272,7 +8277,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanLocalref[1] = 0;
scanptr.p->scanLocalFragid = 0;
scanptr.p->scanTcWaiting = ZTRUE;
scanptr.p->scanNumber = ZNIL;
scanptr.p->scanNumber = ~0;
for (Uint32 i = 0; i < scanConcurrentOperations; i++) {
jam();
......@@ -8327,6 +8332,11 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
#ifdef VM_TRACE
ScanRecordPtr tmp;
ndbrequire(!c_scanTakeOverHash.find(tmp, * scanptr.p));
#endif
#ifdef TRACE_SCAN_TAKEOVER
ndbout_c("adding (%d %d) table: %d fragId: %d frag.i: %d tableFragptr: %d",
scanptr.p->scanNumber, scanptr.p->fragPtrI,
tabptr.i, scanFragReq->fragmentNo, fragptr.i, fragptr.p->tableFragptr);
#endif
c_scanTakeOverHash.add(scanptr);
}
......@@ -8418,6 +8428,9 @@ void Dblqh::finishScanrec(Signal* signal)
if(scanptr.p->scanKeyinfoFlag){
jam();
ScanRecordPtr tmp;
#ifdef TRACE_SCAN_TAKEOVER
ndbout_c("removing (%d %d)", scanptr.p->scanNumber, scanptr.p->fragPtrI);
#endif
c_scanTakeOverHash.remove(tmp, * scanptr.p);
ndbrequire(tmp.p == scanptr.p);
}
......@@ -8461,6 +8474,9 @@ void Dblqh::finishScanrec(Signal* signal)
ndbrequire(!c_scanTakeOverHash.find(tmp, * restart.p));
#endif
c_scanTakeOverHash.add(restart);
#ifdef TRACE_SCAN_TAKEOVER
ndbout_c("adding-r (%d %d)", restart.p->scanNumber, restart.p->fragPtrI);
#endif
}
scanptr = restart;
......@@ -12034,18 +12050,18 @@ void Dblqh::writeLogfileLab(Signal* signal)
/* WRITE. */
/*---------------------------------------------------------------------------*/
switch (logFilePtr.p->fileChangeState) {
#if 0
case LogFileRecord::BOTH_WRITES_ONGOING:
jam();
ndbout_c("not crashing!!");
// Fall-through
#endif
case LogFileRecord::NOT_ONGOING:
jam();
checkGcpCompleted(signal,
((lfoPtr.p->lfoPageNo + lfoPtr.p->noPagesRw) - 1),
lfoPtr.p->lfoWordWritten);
break;
#if 0
case LogFileRecord::BOTH_WRITES_ONGOING:
jam();
ndbout_c("not crashing!!");
// Fall-through
#endif
case LogFileRecord::WRITE_PAGE_ZERO_ONGOING:
case LogFileRecord::LAST_WRITE_ONGOING:
jam();
......@@ -13133,20 +13149,11 @@ void Dblqh::execSTART_FRAGREQ(Signal* signal)
ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
if (!getFragmentrec(signal, fragId)) {
jam();
/* ----------------------------------------------------------------------
* FRAGMENT WAS NOT DEFINED YET. PUT IT IN. IF NO LOCAL CHECKPOINT EXISTED
* THEN THE FRAGMENT HAS ALREADY BEEN ADDED.
* ---------------------------------------------------------------------- */
if (!insertFragrec(signal, fragId)) {
jam();
startFragRefLab(signal);
return;
}//if
}//if
tabptr.p->tableStatus = Tablerec::TABLE_DEFINED;
initFragrec(signal, tabptr.i, fragId, ZPRIMARY_NODE);
initFragrecSr(signal);
if (startFragReq->lcpNo == ZNIL) {
jam();
......@@ -16414,6 +16421,7 @@ void Dblqh::initFragrec(Signal* signal,
fragptr.p->execSrNoReplicas = 0;
fragptr.p->fragDistributionKey = 0;
fragptr.p->activeTcCounter = 0;
fragptr.p->tableFragptr = RNIL;
}//Dblqh::initFragrec()
/* ==========================================================================
......
......@@ -5280,8 +5280,9 @@ void Dbtc::execTCROLLBACKREQ(Signal* signal)
signal->theData[1] = apiConnectptr.p->transid[0];
signal->theData[2] = apiConnectptr.p->transid[1];
signal->theData[3] = ZROLLBACKNOTALLOWED;
signal->theData[4] = apiConnectptr.p->apiConnectstate;
sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_TCROLLBACKREF,
signal, 4, JBB);
signal, 5, JBB);
break;
/* SEND A REFUSAL SIGNAL*/
case CS_ABORTING:
......
......@@ -2322,10 +2322,15 @@ private:
// Counters for num UNDO log records executed
Uint32 cSrUndoRecords[9];
STATIC_CONST(MAX_PARALLELL_TUP_SRREQ = 2);
Uint32 c_sr_free_page_0;
Uint32 c_errorInsert4000TableId;
void initGlobalTemporaryVars();
void reportMemoryUsage(Signal* signal, int incDec);
#ifdef VM_TRACE
struct Th {
Uint32 data[1];
......
......@@ -201,6 +201,10 @@ Dbtup::execDUMP_STATE_ORD(Signal* signal)
ndbrequire(chunk.pageCount <= alloc);
if(chunk.pageCount != 0){
chunks.push_back(chunk);
if(chunk.pageCount != alloc) {
ndbout_c(" Tried to allocate %d - only allocated %d - free: %d",
alloc, chunk.pageCount, free);
}
} else {
ndbout_c(" Failed to alloc %d pages with %d pages free",
alloc, free);
......@@ -212,6 +216,9 @@ Dbtup::execDUMP_STATE_ORD(Signal* signal)
ptrCheckGuard(pagePtr, cnoOfPage, page);
pagePtr.p->pageWord[ZPAGE_STATE_POS] = ~ZFREE_COMMON;
}
if(alloc == 1 && free > 0)
ndbrequire(chunk.pageCount == alloc);
}
break;
}
......
......@@ -139,19 +139,21 @@ void Dbtup::initializePage()
ptrAss(pagePtr, page);
pagePtr.p->pageWord[ZPAGE_STATE_POS] = ~ZFREE_COMMON;
returnCommonArea(1, cnoOfPage - 1);
cnoOfAllocatedPages = 1;
cnoOfAllocatedPages = 1 + MAX_PARALLELL_TUP_SRREQ;
returnCommonArea(cnoOfAllocatedPages, cnoOfPage - cnoOfAllocatedPages);
c_sr_free_page_0 = ~0;
}//Dbtup::initializePage()
void Dbtup::allocConsPages(Uint32 noOfPagesToAllocate,
Uint32& noOfPagesAllocated,
Uint32& allocPageRef)
{
if (noOfPagesToAllocate == 0) {
if (noOfPagesToAllocate == 0){
ljam();
noOfPagesAllocated = 0;
return;
}//if
Uint32 firstListToCheck = nextHigherTwoLog(noOfPagesToAllocate - 1);
for (Uint32 i = firstListToCheck; i < 16; i++) {
ljam();
......
......@@ -92,11 +92,24 @@ void Dbtup::rfrReadRestartInfoLab(Signal* signal, RestartInfoRecordPtr riPtr)
seizeDiskBufferSegmentRecord(dbsiPtr);
riPtr.p->sriDataBufferSegmentP = dbsiPtr.i;
Uint32 retPageRef;
Uint32 retPageRef = RNIL;
Uint32 noAllocPages = 1;
Uint32 noOfPagesAllocated;
allocConsPages(noAllocPages, noOfPagesAllocated, retPageRef);
ndbrequire(noOfPagesAllocated == 1);
{
/**
* Use low pages for 0-pages during SR
* bitmask of free pages is kept in c_sr_free_page_0
*/
Uint32 tmp = c_sr_free_page_0;
for(Uint32 i = 1; i<(1+MAX_PARALLELL_TUP_SRREQ); i++){
if(tmp & (1 << i)){
retPageRef = i;
c_sr_free_page_0 = tmp & (~(1 << i));
break;
}
}
ndbrequire(retPageRef != RNIL);
}
dbsiPtr.p->pdxDataPage[0] = retPageRef;
dbsiPtr.p->pdxNumDataPages = 1;
......@@ -150,7 +163,10 @@ Dbtup::rfrInitRestartInfoLab(Signal* signal, DiskBufferSegmentInfoPtr dbsiPtr)
/* LETS REMOVE IT AND REUSE THE SEGMENT FOR REAL DATA PAGES */
/* REMOVE ONE PAGE ONLY, PAGEP IS ALREADY SET TO THE RESTART INFO PAGE */
/************************************************************************/
returnCommonArea(pagePtr.i, 1);
{
ndbrequire(pagePtr.i > 0 && pagePtr.i <= MAX_PARALLELL_TUP_SRREQ);
c_sr_free_page_0 |= (1 << pagePtr.i);
}
Uint32 undoFileVersion = TzeroDataPage[ZSRI_UNDO_FILE_VER];
lliPtr.i = (undoFileVersion << 2) + (regTabPtr.i & 0x3);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment