ndb - make size of redo log files (fragment log files) configurable

  using new config variable FragmentLogFileSize (4M - 1G)
parent 8a9bf139
......@@ -10,7 +10,8 @@ DataDir= CHOOSE_FILESYSTEM
MaxNoOfOrderedIndexes= CHOOSE_MaxNoOfOrderedIndexes
MaxNoOfAttributes= CHOOSE_MaxNoOfAttributes
TimeBetweenGlobalCheckpoints= 500
NoOfFragmentLogFiles= 3
NoOfFragmentLogFiles= 8
FragmentLogFileSize= 6M
DiskPageBufferMemory= CHOOSE_DiskPageBufferMemory
#
......
......@@ -10,7 +10,8 @@ DataDir= CHOOSE_FILESYSTEM
MaxNoOfOrderedIndexes= CHOOSE_MaxNoOfOrderedIndexes
MaxNoOfAttributes= CHOOSE_MaxNoOfAttributes
TimeBetweenGlobalCheckpoints= 500
NoOfFragmentLogFiles= 3
NoOfFragmentLogFiles= 4
FragmentLogFileSize=12M
DiskPageBufferMemory= CHOOSE_DiskPageBufferMemory
# the following parametes just function as a small regression
# test that the parameter exists
......
......@@ -64,6 +64,7 @@
#define CFG_DB_FILESYSTEM_PATH 125
#define CFG_DB_NO_REDOLOG_FILES 126
#define CFG_DB_REDOLOG_FILE_SIZE 140
#define CFG_DB_LCP_DISC_PAGES_TUP 127
#define CFG_DB_LCP_DISC_PAGES_TUP_SR 128
......
......@@ -71,7 +71,6 @@ class Dbtup;
/* CONSTANTS OF THE LOG PAGES */
/* ------------------------------------------------------------------------- */
#define ZPAGE_HEADER_SIZE 32
#define ZNO_MBYTES_IN_FILE 16
#define ZPAGE_SIZE 8192
#define ZPAGES_IN_MBYTE 32
#define ZTWOLOG_NO_PAGES_IN_MBYTE 5
......@@ -142,7 +141,7 @@ class Dbtup;
/* IN THE MBYTE. */
/* ------------------------------------------------------------------------- */
#define ZFD_HEADER_SIZE 3
#define ZFD_PART_SIZE 48
#define ZFD_MBYTE_SIZE 3
#define ZLOG_HEAD_SIZE 8
#define ZNEXT_LOG_SIZE 2
#define ZABORT_LOG_SIZE 3
......@@ -169,7 +168,6 @@ class Dbtup;
#define ZPOS_LOG_TYPE 0
#define ZPOS_NO_FD 1
#define ZPOS_FILE_NO 2
#define ZMAX_LOG_FILES_IN_PAGE_ZERO 40
/* ------------------------------------------------------------------------- */
/* THE POSITIONS WITHIN A PREPARE LOG RECORD AND A NEW PREPARE */
/* LOG RECORD. */
......@@ -1436,17 +1434,17 @@ public:
* header of each log file. That information is used during
* system restart to find the tail of the log.
*/
UintR logLastPrepRef[16];
UintR *logLastPrepRef;
/**
* The max global checkpoint completed before the mbyte in the
* log file was started. One variable per mbyte.
*/
UintR logMaxGciCompleted[16];
UintR *logMaxGciCompleted;
/**
* The max global checkpoint started before the mbyte in the log
* file was started. One variable per mbyte.
*/
UintR logMaxGciStarted[16];
UintR *logMaxGciStarted;
/**
* This variable contains the file name as needed by the file
* system when opening the file.
......@@ -2162,6 +2160,7 @@ private:
void execSTART_RECREF(Signal* signal);
void execGCP_SAVEREQ(Signal* signal);
void execFSOPENREF(Signal* signal);
void execFSOPENCONF(Signal* signal);
void execFSCLOSECONF(Signal* signal);
void execFSWRITECONF(Signal* signal);
......@@ -2671,6 +2670,8 @@ private:
LogPartRecord *logPartRecord;
LogPartRecordPtr logPartPtr;
UintR clogPartFileSize;
Uint32 clogFileSize; // In MBYTE
Uint32 cmaxLogFilesInPageZero; //
// Configurable
LogFileRecord *logFileRecord;
......
......@@ -60,6 +60,8 @@ void Dblqh::initData()
cLqhTimeOutCheckCount = 0;
cbookedAccOps = 0;
m_backup_ptr = RNIL;
clogFileSize = 16;
cmaxLogFilesInPageZero = 40;
}//Dblqh::initData()
void Dblqh::initRecords()
......@@ -260,6 +262,7 @@ Dblqh::Dblqh(Block_context& ctx):
addRecSignal(GSN_START_FRAGREQ, &Dblqh::execSTART_FRAGREQ);
addRecSignal(GSN_START_RECREF, &Dblqh::execSTART_RECREF);
addRecSignal(GSN_GCP_SAVEREQ, &Dblqh::execGCP_SAVEREQ);
addRecSignal(GSN_FSOPENREF, &Dblqh::execFSOPENREF, true);
addRecSignal(GSN_FSOPENCONF, &Dblqh::execFSOPENCONF);
addRecSignal(GSN_FSCLOSECONF, &Dblqh::execFSCLOSECONF);
addRecSignal(GSN_FSWRITECONF, &Dblqh::execFSWRITECONF);
......
......@@ -62,6 +62,7 @@
#include <signaldata/AttrInfo.hpp>
#include <KeyDescriptor.hpp>
#include <signaldata/RouteOrd.hpp>
#include <signaldata/FsRef.hpp>
// Use DEBUG to print messages that should be
// seen only when we debug the product
......@@ -1020,6 +1021,34 @@ void Dblqh::execREAD_CONFIG_REQ(Signal* signal)
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_FRAG, &tmp));
c_fragment_pool.setSize(tmp);
if (!ndb_mgm_get_int_parameter(p, CFG_DB_REDOLOG_FILE_SIZE,
&clogFileSize))
{
// convert to mbyte
clogFileSize = (clogFileSize + 1024*1024 - 1) / (1024 * 1024);
ndbrequire(clogFileSize >= 4 && clogFileSize <= 1024);
}
cmaxLogFilesInPageZero = (ZPAGE_SIZE - ZPAGE_HEADER_SIZE - 128) /
(ZFD_MBYTE_SIZE * clogFileSize);
/**
* "Old" cmaxLogFilesInPageZero was 40
* Each FD need 3 words per mb, require that they can fit into 1 page
* (atleast 1 FD)
* Is also checked in ConfigInfo.cpp (max FragmentLogFileSize = 1Gb)
* 1Gb = 1024Mb => 3(ZFD_MBYTE_SIZE) * 1024 < 8192 (ZPAGE_SIZE)
*/
if (cmaxLogFilesInPageZero > 40)
{
jam();
cmaxLogFilesInPageZero = 40;
}
else
{
ndbrequire(cmaxLogFilesInPageZero);
}
initRecords();
initialiseRecordsLab(signal, 0, ref, senderData);
......@@ -11750,9 +11779,9 @@ void Dblqh::sendStartLcp(Signal* signal)
Uint32 Dblqh::remainingLogSize(const LogFileRecordPtr &sltCurrLogFilePtr,
const LogPartRecordPtr &sltLogPartPtr)
{
Uint32 hf = sltCurrLogFilePtr.p->fileNo*ZNO_MBYTES_IN_FILE+sltCurrLogFilePtr.p->currentMbyte;
Uint32 tf = sltLogPartPtr.p->logTailFileNo*ZNO_MBYTES_IN_FILE+sltLogPartPtr.p->logTailMbyte;
Uint32 sz = sltLogPartPtr.p->noLogFiles*ZNO_MBYTES_IN_FILE;
Uint32 hf = sltCurrLogFilePtr.p->fileNo*clogFileSize+sltCurrLogFilePtr.p->currentMbyte;
Uint32 tf = sltLogPartPtr.p->logTailFileNo*clogFileSize+sltLogPartPtr.p->logTailMbyte;
Uint32 sz = sltLogPartPtr.p->noLogFiles*clogFileSize;
if (tf > hf) hf += sz;
return sz-(hf-tf);
}
......@@ -11810,7 +11839,7 @@ void Dblqh::setLogTail(Signal* signal, Uint32 keepGci)
/* ------------------------------------------------------------------------- */
SLT_LOOP:
for (tsltIndex = tsltStartMbyte;
tsltIndex <= ZNO_MBYTES_IN_FILE - 1;
tsltIndex <= clogFileSize - 1;
tsltIndex++) {
if (sltLogFilePtr.p->logMaxGciStarted[tsltIndex] >= keepGci) {
/* ------------------------------------------------------------------------- */
......@@ -11826,7 +11855,7 @@ void Dblqh::setLogTail(Signal* signal, Uint32 keepGci)
/* ------------------------------------------------------------------------- */
/*STEPPING BACK INCLUDES ALSO STEPPING BACK TO THE PREVIOUS LOG FILE. */
/* ------------------------------------------------------------------------- */
tsltMbyte = ZNO_MBYTES_IN_FILE - 1;
tsltMbyte = clogFileSize - 1;
sltLogFilePtr.i = sltLogFilePtr.p->prevLogFile;
ptrCheckGuard(sltLogFilePtr, clogFileFileSize, logFileRecord);
}//if
......@@ -11864,7 +11893,7 @@ void Dblqh::setLogTail(Signal* signal, Uint32 keepGci)
UintR ToldTailFileNo = sltLogPartPtr.p->logTailFileNo;
UintR ToldTailMByte = sltLogPartPtr.p->logTailMbyte;
arrGuard(tsltMbyte, 16);
arrGuard(tsltMbyte, clogFileSize);
sltLogPartPtr.p->logTailFileNo =
sltLogFilePtr.p->logLastPrepRef[tsltMbyte] >> 16;
/* ------------------------------------------------------------------------- */
......@@ -12364,6 +12393,26 @@ void Dblqh::execFSOPENCONF(Signal* signal)
}//switch
}//Dblqh::execFSOPENCONF()
void
Dblqh::execFSOPENREF(Signal* signal)
{
jamEntry();
FsRef* ref = (FsRef*)signal->getDataPtr();
Uint32 err = ref->errorCode;
if (err == FsRef::fsErrInvalidFileSize)
{
char buf[256];
BaseString::snprintf(buf, sizeof(buf),
"Invalid file size for redo logfile, "
" size only changable with --initial");
progError(__LINE__,
NDBD_EXIT_INVALID_CONFIG,
buf);
return;
}
SimulatedBlock::execFSOPENREF(signal);
}
/* ************>> */
/* FSREADCONF > */
......@@ -13009,7 +13058,7 @@ void Dblqh::openFileInitLab(Signal* signal)
{
logFilePtr.p->logFileStatus = LogFileRecord::OPEN_INIT;
seizeLogpage(signal);
writeSinglePage(signal, (ZNO_MBYTES_IN_FILE * ZPAGES_IN_MBYTE) - 1,
writeSinglePage(signal, (clogFileSize * ZPAGES_IN_MBYTE) - 1,
ZPAGE_SIZE - 1, __LINE__);
lfoPtr.p->lfoState = LogFileOperationRecord::INIT_WRITE_AT_END;
return;
......@@ -13072,7 +13121,7 @@ void Dblqh::writeInitMbyteLab(Signal* signal)
{
releaseLfo(signal);
logFilePtr.p->currentMbyte = logFilePtr.p->currentMbyte + 1;
if (logFilePtr.p->currentMbyte == ZNO_MBYTES_IN_FILE) {
if (logFilePtr.p->currentMbyte == clogFileSize) {
jam();
releaseLogpage(signal);
logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_INIT;
......@@ -13192,7 +13241,7 @@ void Dblqh::initLogfile(Signal* signal, Uint32 fileNo)
logFilePtr.p->lastPageWritten = 0;
logFilePtr.p->logPageZero = RNIL;
logFilePtr.p->currentMbyte = 0;
for (tilIndex = 0; tilIndex <= 15; tilIndex++) {
for (tilIndex = 0; tilIndex < clogFileSize; tilIndex++) {
logFilePtr.p->logMaxGciCompleted[tilIndex] = (UintR)-1;
logFilePtr.p->logMaxGciStarted[tilIndex] = (UintR)-1;
logFilePtr.p->logLastPrepRef[tilIndex] = 0;
......@@ -13243,8 +13292,12 @@ void Dblqh::openFileRw(Signal* signal, LogFileRecordPtr olfLogFilePtr)
signal->theData[3] = olfLogFilePtr.p->fileName[1];
signal->theData[4] = olfLogFilePtr.p->fileName[2];
signal->theData[5] = olfLogFilePtr.p->fileName[3];
signal->theData[6] = ZOPEN_READ_WRITE | FsOpenReq::OM_AUTOSYNC;
signal->theData[6] = ZOPEN_READ_WRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE;
req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord);
Uint64 sz = clogFileSize;
sz *= 1024; sz *= 1024;
req->file_size_hi = sz >> 32;
req->file_size_lo = sz & 0xFFFFFFFF;
sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA);
}//Dblqh::openFileRw()
......@@ -13299,8 +13352,12 @@ void Dblqh::openNextLogfile(Signal* signal)
signal->theData[3] = onlLogFilePtr.p->fileName[1];
signal->theData[4] = onlLogFilePtr.p->fileName[2];
signal->theData[5] = onlLogFilePtr.p->fileName[3];
signal->theData[6] = 2 | FsOpenReq::OM_AUTOSYNC;
signal->theData[6] = 2 | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE;
req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord);
Uint64 sz = clogFileSize;
sz *= 1024; sz *= 1024;
req->file_size_hi = sz >> 32;
req->file_size_lo = sz & 0xFFFFFFFF;
sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA);
}//if
}//Dblqh::openNextLogfile()
......@@ -13431,7 +13488,7 @@ void Dblqh::writeFileDescriptor(Signal* signal)
/* -------------------------------------------------- */
/* START BY WRITING TO LOG FILE RECORD */
/* -------------------------------------------------- */
arrGuard(logFilePtr.p->currentMbyte, 16);
arrGuard(logFilePtr.p->currentMbyte, clogFileSize);
logFilePtr.p->logMaxGciCompleted[logFilePtr.p->currentMbyte] =
logPartPtr.p->logPartNewestCompletedGCI;
logFilePtr.p->logMaxGciStarted[logFilePtr.p->currentMbyte] = cnewestGci;
......@@ -13457,10 +13514,7 @@ void Dblqh::writeFileDescriptor(Signal* signal)
/* ------------------------------------------------------------------------- */
void Dblqh::writeFileHeaderOpen(Signal* signal, Uint32 wmoType)
{
LogFileRecordPtr wmoLogFilePtr;
UintR twmoNoLogDescriptors;
UintR twmoLoop;
UintR twmoIndex;
/* -------------------------------------------------- */
/* WRITE HEADER INFORMATION IN THE NEW FILE. */
......@@ -13468,52 +13522,44 @@ void Dblqh::writeFileHeaderOpen(Signal* signal, Uint32 wmoType)
logPagePtr.p->logPageWord[ZPAGE_HEADER_SIZE + ZPOS_LOG_TYPE] = ZFD_TYPE;
logPagePtr.p->logPageWord[ZPAGE_HEADER_SIZE + ZPOS_FILE_NO] =
logFilePtr.p->fileNo;
if (logPartPtr.p->noLogFiles > ZMAX_LOG_FILES_IN_PAGE_ZERO) {
if (logPartPtr.p->noLogFiles > cmaxLogFilesInPageZero) {
jam();
twmoNoLogDescriptors = ZMAX_LOG_FILES_IN_PAGE_ZERO;
twmoNoLogDescriptors = cmaxLogFilesInPageZero;
} else {
jam();
twmoNoLogDescriptors = logPartPtr.p->noLogFiles;
}//if
logPagePtr.p->logPageWord[ZPAGE_HEADER_SIZE + ZPOS_NO_FD] =
twmoNoLogDescriptors;
wmoLogFilePtr.i = logFilePtr.i;
twmoLoop = 0;
WMO_LOOP:
jam();
if (twmoLoop < twmoNoLogDescriptors) {
jam();
ptrCheckGuard(wmoLogFilePtr, clogFileFileSize, logFileRecord);
for (twmoIndex = 0; twmoIndex <= ZNO_MBYTES_IN_FILE - 1; twmoIndex++) {
jam();
arrGuard(((ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE) +
(twmoLoop * ZFD_PART_SIZE)) + twmoIndex, ZPAGE_SIZE);
logPagePtr.p->logPageWord[((ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE) +
(twmoLoop * ZFD_PART_SIZE)) + twmoIndex] =
wmoLogFilePtr.p->logMaxGciCompleted[twmoIndex];
arrGuard((((ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE) +
(twmoLoop * ZFD_PART_SIZE)) + ZNO_MBYTES_IN_FILE) +
twmoIndex, ZPAGE_SIZE);
logPagePtr.p->logPageWord[(((ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE) +
(twmoLoop * ZFD_PART_SIZE)) + ZNO_MBYTES_IN_FILE) + twmoIndex] =
wmoLogFilePtr.p->logMaxGciStarted[twmoIndex];
arrGuard((((ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE) +
(twmoLoop * ZFD_PART_SIZE)) + (2 * ZNO_MBYTES_IN_FILE)) +
twmoIndex, ZPAGE_SIZE);
logPagePtr.p->logPageWord[(((ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE) +
(twmoLoop * ZFD_PART_SIZE)) + (2 * ZNO_MBYTES_IN_FILE)) + twmoIndex] =
wmoLogFilePtr.p->logLastPrepRef[twmoIndex];
}//for
wmoLogFilePtr.i = wmoLogFilePtr.p->prevLogFile;
twmoLoop = twmoLoop + 1;
goto WMO_LOOP;
}//if
logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] =
(ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE) +
(ZFD_PART_SIZE * twmoNoLogDescriptors);
arrGuard(logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX], ZPAGE_SIZE);
logPagePtr.p->logPageWord[logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX]] =
ZNEXT_LOG_RECORD_TYPE;
{
Uint32 pos = ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE;
LogFileRecordPtr filePtr = logFilePtr;
for (Uint32 fd = 0; fd < twmoNoLogDescriptors; fd++)
{
jam();
ptrCheckGuard(filePtr, clogFileFileSize, logFileRecord);
for (Uint32 mb = 0; mb < clogFileSize; mb ++)
{
jam();
Uint32 pos0 = pos + fd * (ZFD_MBYTE_SIZE * clogFileSize) + mb;
Uint32 pos1 = pos0 + clogFileSize;
Uint32 pos2 = pos1 + clogFileSize;
arrGuard(pos0, ZPAGE_SIZE);
arrGuard(pos1, ZPAGE_SIZE);
arrGuard(pos2, ZPAGE_SIZE);
logPagePtr.p->logPageWord[pos0] = filePtr.p->logMaxGciCompleted[mb];
logPagePtr.p->logPageWord[pos1] = filePtr.p->logMaxGciStarted[mb];
logPagePtr.p->logPageWord[pos2] = filePtr.p->logLastPrepRef[mb];
}
filePtr.i = filePtr.p->prevLogFile;
}
pos += (twmoNoLogDescriptors * ZFD_MBYTE_SIZE * clogFileSize);
arrGuard(pos, ZPAGE_SIZE);
logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = pos;
logPagePtr.p->logPageWord[pos] = ZNEXT_LOG_RECORD_TYPE;
}
/* ------------------------------------------------------- */
/* THIS IS A SPECIAL WRITE OF THE FIRST PAGE IN THE */
/* LOG FILE. THIS HAS SPECIAL SIGNIFANCE TO FIND */
......@@ -13658,9 +13704,9 @@ void Dblqh::openSrLastFileLab(Signal* signal)
void Dblqh::readSrLastFileLab(Signal* signal)
{
logPartPtr.p->logLap = logPagePtr.p->logPageWord[ZPOS_LOG_LAP];
if (logPartPtr.p->noLogFiles > ZMAX_LOG_FILES_IN_PAGE_ZERO) {
if (logPartPtr.p->noLogFiles > cmaxLogFilesInPageZero) {
jam();
initGciInLogFileRec(signal, ZMAX_LOG_FILES_IN_PAGE_ZERO);
initGciInLogFileRec(signal, cmaxLogFilesInPageZero);
} else {
jam();
initGciInLogFileRec(signal, logPartPtr.p->noLogFiles);
......@@ -13685,7 +13731,7 @@ void Dblqh::readSrLastMbyteLab(Signal* signal)
logPartPtr.p->lastMbyte = logFilePtr.p->currentMbyte - 1;
}//if
}//if
arrGuard(logFilePtr.p->currentMbyte, 16);
arrGuard(logFilePtr.p->currentMbyte, clogFileSize);
logFilePtr.p->logMaxGciCompleted[logFilePtr.p->currentMbyte] =
logPagePtr.p->logPageWord[ZPOS_MAX_GCI_COMPLETED];
logFilePtr.p->logMaxGciStarted[logFilePtr.p->currentMbyte] =
......@@ -13693,7 +13739,7 @@ void Dblqh::readSrLastMbyteLab(Signal* signal)
logFilePtr.p->logLastPrepRef[logFilePtr.p->currentMbyte] =
logPagePtr.p->logPageWord[ZLAST_LOG_PREP_REF];
releaseLogpage(signal);
if (logFilePtr.p->currentMbyte < (ZNO_MBYTES_IN_FILE - 1)) {
if (logFilePtr.p->currentMbyte < (clogFileSize - 1)) {
jam();
logFilePtr.p->currentMbyte++;
readSinglePage(signal, ZPAGES_IN_MBYTE * logFilePtr.p->currentMbyte);
......@@ -13707,21 +13753,21 @@ void Dblqh::readSrLastMbyteLab(Signal* signal)
* ---------------------------------------------------------------------- */
if (logPartPtr.p->lastMbyte == ZNIL) {
jam();
logPartPtr.p->lastMbyte = ZNO_MBYTES_IN_FILE - 1;
logPartPtr.p->lastMbyte = clogFileSize - 1;
}//if
}//if
logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_SR;
closeFile(signal, logFilePtr, __LINE__);
if (logPartPtr.p->noLogFiles > ZMAX_LOG_FILES_IN_PAGE_ZERO) {
if (logPartPtr.p->noLogFiles > cmaxLogFilesInPageZero) {
Uint32 fileNo;
if (logFilePtr.p->fileNo >= ZMAX_LOG_FILES_IN_PAGE_ZERO) {
if (logFilePtr.p->fileNo >= cmaxLogFilesInPageZero) {
jam();
fileNo = logFilePtr.p->fileNo - ZMAX_LOG_FILES_IN_PAGE_ZERO;
fileNo = logFilePtr.p->fileNo - cmaxLogFilesInPageZero;
} else {
jam();
fileNo =
(logPartPtr.p->noLogFiles + logFilePtr.p->fileNo) -
ZMAX_LOG_FILES_IN_PAGE_ZERO;
cmaxLogFilesInPageZero;
}//if
if (fileNo == 0) {
jam();
......@@ -13731,11 +13777,11 @@ void Dblqh::readSrLastMbyteLab(Signal* signal)
* -------------------------------------------------------------------- */
fileNo = 1;
logPartPtr.p->srRemainingFiles =
logPartPtr.p->noLogFiles - (ZMAX_LOG_FILES_IN_PAGE_ZERO - 1);
logPartPtr.p->noLogFiles - (cmaxLogFilesInPageZero - 1);
} else {
jam();
logPartPtr.p->srRemainingFiles =
logPartPtr.p->noLogFiles - ZMAX_LOG_FILES_IN_PAGE_ZERO;
logPartPtr.p->noLogFiles - cmaxLogFilesInPageZero;
}//if
LogFileRecordPtr locLogFilePtr;
findLogfile(signal, fileNo, logPartPtr, &locLogFilePtr);
......@@ -13760,9 +13806,9 @@ void Dblqh::openSrNextFileLab(Signal* signal)
void Dblqh::readSrNextFileLab(Signal* signal)
{
if (logPartPtr.p->srRemainingFiles > ZMAX_LOG_FILES_IN_PAGE_ZERO) {
if (logPartPtr.p->srRemainingFiles > cmaxLogFilesInPageZero) {
jam();
initGciInLogFileRec(signal, ZMAX_LOG_FILES_IN_PAGE_ZERO);
initGciInLogFileRec(signal, cmaxLogFilesInPageZero);
} else {
jam();
initGciInLogFileRec(signal, logPartPtr.p->srRemainingFiles);
......@@ -13770,16 +13816,16 @@ void Dblqh::readSrNextFileLab(Signal* signal)
releaseLogpage(signal);
logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_SR;
closeFile(signal, logFilePtr, __LINE__);
if (logPartPtr.p->srRemainingFiles > ZMAX_LOG_FILES_IN_PAGE_ZERO) {
if (logPartPtr.p->srRemainingFiles > cmaxLogFilesInPageZero) {
Uint32 fileNo;
if (logFilePtr.p->fileNo >= ZMAX_LOG_FILES_IN_PAGE_ZERO) {
if (logFilePtr.p->fileNo >= cmaxLogFilesInPageZero) {
jam();
fileNo = logFilePtr.p->fileNo - ZMAX_LOG_FILES_IN_PAGE_ZERO;
fileNo = logFilePtr.p->fileNo - cmaxLogFilesInPageZero;
} else {
jam();
fileNo =
(logPartPtr.p->noLogFiles + logFilePtr.p->fileNo) -
ZMAX_LOG_FILES_IN_PAGE_ZERO;
cmaxLogFilesInPageZero;
}//if
if (fileNo == 0) {
jam();
......@@ -13788,11 +13834,11 @@ void Dblqh::readSrNextFileLab(Signal* signal)
* -------------------------------------------------------------------- */
fileNo = 1;
logPartPtr.p->srRemainingFiles =
logPartPtr.p->srRemainingFiles - (ZMAX_LOG_FILES_IN_PAGE_ZERO - 1);
logPartPtr.p->srRemainingFiles - (cmaxLogFilesInPageZero - 1);
} else {
jam();
logPartPtr.p->srRemainingFiles =
logPartPtr.p->srRemainingFiles - ZMAX_LOG_FILES_IN_PAGE_ZERO;
logPartPtr.p->srRemainingFiles - cmaxLogFilesInPageZero;
}//if
LogFileRecordPtr locLogFilePtr;
findLogfile(signal, fileNo, logPartPtr, &locLogFilePtr);
......@@ -14663,7 +14709,7 @@ void Dblqh::srLogLimits(Signal* signal)
* EXECUTED.
* ----------------------------------------------------------------------- */
while(true) {
ndbrequire(tmbyte < 16);
ndbrequire(tmbyte < clogFileSize);
if (logPartPtr.p->logExecState == LogPartRecord::LES_SEARCH_STOP) {
if (logFilePtr.p->logMaxGciCompleted[tmbyte] < logPartPtr.p->logLastGci) {
jam();
......@@ -14704,7 +14750,7 @@ void Dblqh::srLogLimits(Signal* signal)
if (logPartPtr.p->logExecState != LogPartRecord::LES_EXEC_LOG) {
if (tmbyte == 0) {
jam();
tmbyte = ZNO_MBYTES_IN_FILE - 1;
tmbyte = clogFileSize - 1;
logFilePtr.i = logFilePtr.p->prevLogFile;
ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
} else {
......@@ -15098,7 +15144,7 @@ void Dblqh::execSr(Signal* signal)
logPagePtr.p->logPageWord[ZPAGE_HEADER_SIZE + ZPOS_NO_FD];
logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] =
(ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE) +
(noFdDescriptors * ZFD_PART_SIZE);
(noFdDescriptors * ZFD_MBYTE_SIZE * clogFileSize);
}
break;
/* ========================================================================= */
......@@ -15138,11 +15184,11 @@ void Dblqh::execSr(Signal* signal)
/*---------------------------------------------------------------------------*/
/* START EXECUTION OF A NEW MBYTE IN THE LOG. */
/*---------------------------------------------------------------------------*/
if (logFilePtr.p->currentMbyte < (ZNO_MBYTES_IN_FILE - 1)) {
if (logFilePtr.p->currentMbyte < (clogFileSize - 1)) {
jam();
logPartPtr.p->logExecState = LogPartRecord::LES_EXEC_LOG_NEW_MBYTE;
} else {
ndbrequire(logFilePtr.p->currentMbyte == (ZNO_MBYTES_IN_FILE - 1));
ndbrequire(logFilePtr.p->currentMbyte == (clogFileSize - 1));
jam();
/*---------------------------------------------------------------------------*/
/* WE HAVE TO CHANGE FILE. CLOSE THIS ONE AND THEN OPEN THE NEXT. */
......@@ -15339,7 +15385,7 @@ void Dblqh::invalidateLogAfterLastGCI(Signal* signal) {
jam();
releaseLfo(signal);
releaseLogpage(signal);
if (logPartPtr.p->invalidatePageNo < (ZNO_MBYTES_IN_FILE * ZPAGES_IN_MBYTE - 1)) {
if (logPartPtr.p->invalidatePageNo < (clogFileSize * ZPAGES_IN_MBYTE - 1)) {
// We continue in this file.
logPartPtr.p->invalidatePageNo++;
} else {
......@@ -16680,6 +16726,22 @@ void Dblqh::initialiseLogFile(Signal* signal)
ptrAss(logFilePtr, logFileRecord);
logFilePtr.p->nextLogFile = logFilePtr.i + 1;
logFilePtr.p->logFileStatus = LogFileRecord::LFS_IDLE;
logFilePtr.p->logLastPrepRef = new Uint32[clogFileSize];
logFilePtr.p->logMaxGciCompleted = new Uint32[clogFileSize];
logFilePtr.p->logMaxGciStarted = new Uint32[clogFileSize];
if (logFilePtr.p->logLastPrepRef == 0 ||
logFilePtr.p->logMaxGciCompleted == 0 ||
logFilePtr.p->logMaxGciStarted == 0)
{
char buf[256];
BaseString::snprintf(buf, sizeof(buf),
"Failed to alloc mbyte(%u) arrays for logfile %u",
clogFileSize, logFilePtr.i);
progError(__LINE__, NDBD_EXIT_MEMALLOC, buf);
}
}//for
logFilePtr.i = clogFileFileSize - 1;
ptrAss(logFilePtr, logFileRecord);
......@@ -17008,41 +17070,31 @@ void Dblqh::initFragrec(Signal* signal,
* ========================================================================= */
void Dblqh::initGciInLogFileRec(Signal* signal, Uint32 noFdDescriptors)
{
LogFileRecordPtr iglLogFilePtr;
UintR tiglLoop;
UintR tiglIndex;
tiglLoop = 0;
iglLogFilePtr.i = logFilePtr.i;
iglLogFilePtr.p = logFilePtr.p;
IGL_LOOP:
for (tiglIndex = 0; tiglIndex <= ZNO_MBYTES_IN_FILE - 1; tiglIndex++) {
arrGuard(((ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE) +
(tiglLoop * ZFD_PART_SIZE)) + tiglIndex, ZPAGE_SIZE);
iglLogFilePtr.p->logMaxGciCompleted[tiglIndex] =
logPagePtr.p->logPageWord[((ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE) +
(tiglLoop * ZFD_PART_SIZE)) + tiglIndex];
arrGuard((((ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE) + ZNO_MBYTES_IN_FILE) +
(tiglLoop * ZFD_PART_SIZE)) + tiglIndex, ZPAGE_SIZE);
iglLogFilePtr.p->logMaxGciStarted[tiglIndex] =
logPagePtr.p->logPageWord[(((ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE) +
ZNO_MBYTES_IN_FILE) +
(tiglLoop * ZFD_PART_SIZE)) + tiglIndex];
arrGuard((((ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE) +
(2 * ZNO_MBYTES_IN_FILE)) + (tiglLoop * ZFD_PART_SIZE)) +
tiglIndex, ZPAGE_SIZE);
iglLogFilePtr.p->logLastPrepRef[tiglIndex] =
logPagePtr.p->logPageWord[(((ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE) +
(2 * ZNO_MBYTES_IN_FILE)) +
(tiglLoop * ZFD_PART_SIZE)) + tiglIndex];
}//for
tiglLoop = tiglLoop + 1;
if (tiglLoop < noFdDescriptors) {
LogFileRecordPtr filePtr = logFilePtr;
Uint32 pos = ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE;
for (Uint32 fd = 0; fd < noFdDescriptors; fd++)
{
jam();
iglLogFilePtr.i = iglLogFilePtr.p->prevLogFile;
ptrCheckGuard(iglLogFilePtr, clogFileFileSize, logFileRecord);
goto IGL_LOOP;
}//if
for (Uint32 mb = 0; mb < clogFileSize; mb++)
{
jam();
Uint32 pos0 = pos + fd * (ZFD_MBYTE_SIZE * clogFileSize) + mb;
Uint32 pos1 = pos0 + clogFileSize;
Uint32 pos2 = pos1 + clogFileSize;
arrGuard(pos0, ZPAGE_SIZE);
arrGuard(pos1, ZPAGE_SIZE);
arrGuard(pos2, ZPAGE_SIZE);
filePtr.p->logMaxGciCompleted[mb] = logPagePtr.p->logPageWord[pos0];
filePtr.p->logMaxGciStarted[mb] = logPagePtr.p->logPageWord[pos1];
filePtr.p->logLastPrepRef[mb] = logPagePtr.p->logPageWord[pos2];
}
if (fd + 1 < noFdDescriptors)
{
jam();
filePtr.i = filePtr.p->prevLogFile;
ptrCheckGuard(filePtr, clogFileFileSize, logFileRecord);
}
}
}//Dblqh::initGciInLogFileRec()
/* ==========================================================================
......@@ -18295,7 +18347,7 @@ void Dblqh::writeNextLog(Signal* signal)
ndbrequire(logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] < ZPAGE_SIZE);
logPagePtr.p->logPageWord[logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX]] =
ZNEXT_MBYTE_TYPE;
if (logFilePtr.p->currentMbyte == (ZNO_MBYTES_IN_FILE - 1)) {
if (logFilePtr.p->currentMbyte == (clogFileSize - 1)) {
jam();
/* -------------------------------------------------- */
/* CALCULATE THE NEW REMAINING WORDS WHEN */
......@@ -18384,7 +18436,7 @@ void Dblqh::writeNextLog(Signal* signal)
systemError(signal, __LINE__);
}//if
}//if
if (logFilePtr.p->currentMbyte == (ZNO_MBYTES_IN_FILE - 1)) {
if (logFilePtr.p->currentMbyte == (clogFileSize - 1)) {
jam();
twnlNextMbyte = 0;
if (logFilePtr.p->fileChangeState != LogFileRecord::NOT_ONGOING) {
......
......@@ -871,6 +871,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
"3",
STR_VALUE(MAX_INT_RNIL) },
{
CFG_DB_REDOLOG_FILE_SIZE,
"FragmentLogFileSize",
DB_TOKEN,
"Size of each Redo log file",
ConfigInfo::CI_USED,
false,
ConfigInfo::CI_INT,
"16M",
"4M",
"1G" },
{
CFG_DB_MAX_OPEN_FILES,
"MaxNoOfOpenFiles",
......
......@@ -179,7 +179,7 @@ ErrorBundle ErrorCodes[] = {
{ 873, DMEC, TR, "Out of attrinfo records for scan in tuple manager" },
{ 899, DMEC, TR, "Rowid already allocated" },
{ 1217, DMEC, TR, "Out of operation records in local data manager (increase MaxNoOfLocalOperations)" },
{ 1220, DMEC, TR, "REDO log files overloaded, consult online manual (decrease TimeBetweenLocalCheckpoints, and|or increase NoOfFragmentLogFiles)" },
{ 1220, DMEC, TR, "REDO log files overloaded, consult online manual (increase FragmentLogFileSize)" },
{ 1222, DMEC, TR, "Out of transaction markers in LQH" },
{ 4021, DMEC, TR, "Out of Send Buffer space in NDB API" },
{ 4022, DMEC, TR, "Out of Send Buffer space in NDB API" },
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment