Commit 35c17763 authored by unknown's avatar unknown

improved the ndb redo log reader

parent 88dedca2
......@@ -239,6 +239,17 @@ bool PageHeader::check() {
return true;
}
bool PageHeader::lastPage()
{
return m_next_page == 0xffffff00;
}
Uint32 PageHeader::lastWord()
{
return m_current_page_index;
}
NdbOut& operator<<(NdbOut& no, const PageHeader& ph) {
no << "------------PAGE HEADER------------------------" << endl << endl;
ndbout_c("%-30s%-12s%-12s\n", "", "Decimal", "Hex");
......
......@@ -132,6 +132,8 @@ class PageHeader {
public:
bool check();
Uint32 getLogRecordSize();
bool lastPage();
Uint32 lastWord();
protected:
Uint32 m_checksum;
Uint32 m_lap;
......
......@@ -35,7 +35,6 @@
#define FROM_BEGINNING 0
void usage(const char * prg);
Uint32 readRecordOverPageBoundary (Uint32 *, Uint32 , Uint32 , Uint32);
Uint32 readFromFile(FILE * f, Uint32 *toPtr, Uint32 sizeInWords);
void readArguments(int argc, const char** argv);
void doExit();
......@@ -54,8 +53,8 @@ Uint32 startAtPageIndex = 0;
Uint32 *redoLogPage;
NDB_COMMAND(redoLogFileReader, "redoLogFileReader", "redoLogFileReader", "Read a redo log file", 16384) {
Uint32 pageIndex = 0;
Uint32 oldPageIndex = 0;
int wordIndex = 0;
int oldWordIndex = 0;
Uint32 recordType = 1234567890;
PageHeader *thePageHeader;
......@@ -83,25 +82,23 @@ NDB_COMMAND(redoLogFileReader, "redoLogFileReader", "redoLogFileReader", "Read
}
redoLogPage = new Uint32[PAGESIZE*NO_PAGES_IN_MBYTE];
Uint32 words_from_previous_page = 0;
// Loop for every mbyte.
for (Uint32 j = startAtMbyte; j < NO_MBYTE_IN_FILE; j++) {
bool lastPage = false;
for (Uint32 j = startAtMbyte; j < NO_MBYTE_IN_FILE && !lastPage; j++) {
readFromFile(f, redoLogPage, PAGESIZE*NO_PAGES_IN_MBYTE);
if (firstLap) {
pageIndex = startAtPageIndex;
firstLap = false;
} else
pageIndex = 0;
words_from_previous_page = 0;
// Loop for every page.
for (int i = startAtPage; i < NO_PAGES_IN_MBYTE; i++) {
if (pageIndex == 0) {
for (int i = 0; i < NO_PAGES_IN_MBYTE; i++) {
wordIndex = 0;
thePageHeader = (PageHeader *) &redoLogPage[i*PAGESIZE];
// Print out mbyte number, page number and page index.
ndbout << j << ":" << i << ":" << pageIndex << endl
<< " " << j*32 + i << ":" << pageIndex << " ";
ndbout << j << ":" << i << ":" << wordIndex << endl
<< " " << j*32 + i << ":" << wordIndex << " ";
if (thePrintFlag) ndbout << (*thePageHeader);
if (theCheckFlag) {
if(!thePageHeader->check()) {
......@@ -122,8 +119,9 @@ NDB_COMMAND(redoLogFileReader, "redoLogFileReader", "redoLogFileReader", "Read
ndbout << "expected checksum: " << checkSum << endl;
}
pageIndex += thePageHeader->getLogRecordSize();
}
lastPage = i != 0 && thePageHeader->lastPage();
Uint32 lastWord = thePageHeader->lastWord();
if (onlyMbyteHeaders) {
// Show only the first page header in every mbyte of the file.
......@@ -132,18 +130,40 @@ NDB_COMMAND(redoLogFileReader, "redoLogFileReader", "redoLogFileReader", "Read
if (onlyPageHeaders) {
// Show only page headers. Continue with the next page in this for loop.
pageIndex = 0;
continue;
}
wordIndex = thePageHeader->getLogRecordSize() - words_from_previous_page;
Uint32 *redoLogPagePos = redoLogPage + i*PAGESIZE;
if (words_from_previous_page)
{
memmove(redoLogPagePos + wordIndex ,
redoLogPagePos - words_from_previous_page,
words_from_previous_page*4);
}
do {
// Print out mbyte number, page number and page index.
ndbout << j << ":" << i << ":" << pageIndex << endl
<< " " << j*32 + i << ":" << pageIndex << " ";
recordType = redoLogPage[i*PAGESIZE + pageIndex];
if (words_from_previous_page)
{
// Print out mbyte number, page number and word index.
ndbout << j << ":" << i-1 << ":" << PAGESIZE-words_from_previous_page << endl
<< j << ":" << i << ":" << wordIndex+words_from_previous_page << endl
<< " " << j*32 + i-1 << ":" << PAGESIZE-words_from_previous_page << " ";
words_from_previous_page = 0;
}
else
{
// Print out mbyte number, page number and word index.
ndbout << j << ":" << i << ":" << wordIndex << endl
<< " " << j*32 + i << ":" << wordIndex << " ";
}
redoLogPagePos = redoLogPage + i*PAGESIZE + wordIndex;
oldWordIndex = wordIndex;
recordType = *redoLogPagePos;
switch(recordType) {
case ZFD_TYPE:
fdRecord = (FileDescriptor *) &redoLogPage[i*PAGESIZE + pageIndex];
fdRecord = (FileDescriptor *) redoLogPagePos;
if (thePrintFlag) ndbout << (*fdRecord);
if (theCheckFlag) {
if(!fdRecord->check()) {
......@@ -155,13 +175,13 @@ NDB_COMMAND(redoLogFileReader, "redoLogFileReader", "redoLogFileReader", "Read
delete [] redoLogPage;
exit(RETURN_OK);
}
pageIndex += fdRecord->getLogRecordSize();
wordIndex += fdRecord->getLogRecordSize();
break;
case ZNEXT_LOG_RECORD_TYPE:
nlRecord = (NextLogRecord *) (&redoLogPage[i*PAGESIZE] + pageIndex);
pageIndex += nlRecord->getLogRecordSize(pageIndex);
if (pageIndex <= PAGESIZE) {
nlRecord = (NextLogRecord *) redoLogPagePos;
wordIndex += nlRecord->getLogRecordSize(wordIndex);
if (wordIndex <= PAGESIZE) {
if (thePrintFlag) ndbout << (*nlRecord);
if (theCheckFlag) {
if(!nlRecord->check()) {
......@@ -173,9 +193,9 @@ NDB_COMMAND(redoLogFileReader, "redoLogFileReader", "redoLogFileReader", "Read
break;
case ZCOMPLETED_GCI_TYPE:
cGCIrecord = (CompletedGCIRecord *) &redoLogPage[i*PAGESIZE + pageIndex];
pageIndex += cGCIrecord->getLogRecordSize();
if (pageIndex <= PAGESIZE) {
cGCIrecord = (CompletedGCIRecord *) redoLogPagePos;
wordIndex += cGCIrecord->getLogRecordSize();
if (wordIndex <= PAGESIZE) {
if (thePrintFlag) ndbout << (*cGCIrecord);
if (theCheckFlag) {
if(!cGCIrecord->check()) {
......@@ -187,9 +207,9 @@ NDB_COMMAND(redoLogFileReader, "redoLogFileReader", "redoLogFileReader", "Read
break;
case ZPREP_OP_TYPE:
poRecord = (PrepareOperationRecord *) &redoLogPage[i*PAGESIZE + pageIndex];
pageIndex += poRecord->getLogRecordSize();
if (pageIndex <= PAGESIZE) {
poRecord = (PrepareOperationRecord *) redoLogPagePos;
wordIndex += poRecord->getLogRecordSize();
if (wordIndex <= PAGESIZE) {
if (thePrintFlag) ndbout << (*poRecord);
if (theCheckFlag) {
if(!poRecord->check()) {
......@@ -198,15 +218,12 @@ NDB_COMMAND(redoLogFileReader, "redoLogFileReader", "redoLogFileReader", "Read
}
}
}
else {
oldPageIndex = pageIndex - poRecord->getLogRecordSize();
}
break;
case ZCOMMIT_TYPE:
ctRecord = (CommitTransactionRecord *) &redoLogPage[i*PAGESIZE + pageIndex];
pageIndex += ctRecord->getLogRecordSize();
if (pageIndex <= PAGESIZE) {
ctRecord = (CommitTransactionRecord *) redoLogPagePos;
wordIndex += ctRecord->getLogRecordSize();
if (wordIndex <= PAGESIZE) {
if (thePrintFlag) ndbout << (*ctRecord);
if (theCheckFlag) {
if(!ctRecord->check()) {
......@@ -215,15 +232,12 @@ NDB_COMMAND(redoLogFileReader, "redoLogFileReader", "redoLogFileReader", "Read
}
}
}
else {
oldPageIndex = pageIndex - ctRecord->getLogRecordSize();
}
break;
case ZINVALID_COMMIT_TYPE:
ictRecord = (InvalidCommitTransactionRecord *) &redoLogPage[i*PAGESIZE + pageIndex];
pageIndex += ictRecord->getLogRecordSize();
if (pageIndex <= PAGESIZE) {
ictRecord = (InvalidCommitTransactionRecord *) redoLogPagePos;
wordIndex += ictRecord->getLogRecordSize();
if (wordIndex <= PAGESIZE) {
if (thePrintFlag) ndbout << (*ictRecord);
if (theCheckFlag) {
if(!ictRecord->check()) {
......@@ -232,21 +246,18 @@ NDB_COMMAND(redoLogFileReader, "redoLogFileReader", "redoLogFileReader", "Read
}
}
}
else {
oldPageIndex = pageIndex - ictRecord->getLogRecordSize();
}
break;
case ZNEXT_MBYTE_TYPE:
nmRecord = (NextMbyteRecord *) &redoLogPage[i*PAGESIZE + pageIndex];
nmRecord = (NextMbyteRecord *) redoLogPagePos;
if (thePrintFlag) ndbout << (*nmRecord);
i = NO_PAGES_IN_MBYTE;
break;
case ZABORT_TYPE:
atRecord = (AbortTransactionRecord *) &redoLogPage[i*PAGESIZE + pageIndex];
pageIndex += atRecord->getLogRecordSize();
if (pageIndex <= PAGESIZE) {
atRecord = (AbortTransactionRecord *) redoLogPagePos;
wordIndex += atRecord->getLogRecordSize();
if (wordIndex <= PAGESIZE) {
if (thePrintFlag) ndbout << (*atRecord);
if (theCheckFlag) {
if(!atRecord->check()) {
......@@ -266,7 +277,7 @@ NDB_COMMAND(redoLogFileReader, "redoLogFileReader", "redoLogFileReader", "Read
ndbout << " ------ERROR: UNKNOWN RECORD TYPE------" << endl;
// Print out remaining data in this page
for (int j = pageIndex; j < PAGESIZE; j++){
for (int j = wordIndex; j < PAGESIZE; j++){
Uint32 unknown = redoLogPage[i*PAGESIZE + j];
ndbout_c("%-30d%-12u%-12x", j, unknown, unknown);
......@@ -274,14 +285,18 @@ NDB_COMMAND(redoLogFileReader, "redoLogFileReader", "redoLogFileReader", "Read
doExit();
}
} while(pageIndex < PAGESIZE && i < NO_PAGES_IN_MBYTE);
} while(wordIndex < lastWord && i < NO_PAGES_IN_MBYTE);
if (lastPage)
break;
if (pageIndex > PAGESIZE) {
// The last record overlapped page boundary. Must redo that record.
pageIndex = readRecordOverPageBoundary(&redoLogPage[i*PAGESIZE],
pageIndex, oldPageIndex, recordType);
if (wordIndex > PAGESIZE) {
words_from_previous_page = PAGESIZE - oldWordIndex;
ndbout << " ----------- Record continues on next page -----------" << endl;
} else {
pageIndex = 0;
wordIndex = 0;
words_from_previous_page = 0;
}
ndbout << endl;
}//for
......@@ -310,93 +325,6 @@ Uint32 readFromFile(FILE * f, Uint32 *toPtr, Uint32 sizeInWords) {
}
//----------------------------------------------------------------
//
//----------------------------------------------------------------
Uint32 readRecordOverPageBoundary(Uint32 *pagePtr, Uint32 pageIndex, Uint32 oldPageIndex, Uint32 recordType) {
Uint32 pageHeader[PAGEHEADERSIZE];
Uint32 tmpPages[PAGESIZE*10];
PageHeader *thePageHeader;
Uint32 recordSize = 0;
PrepareOperationRecord *poRecord;
CommitTransactionRecord *ctRecord;
InvalidCommitTransactionRecord *ictRecord;
memcpy(pageHeader, pagePtr + PAGESIZE, PAGEHEADERSIZE*sizeof(Uint32));
memcpy(tmpPages, pagePtr + oldPageIndex, (PAGESIZE - oldPageIndex)*sizeof(Uint32));
memcpy(tmpPages + PAGESIZE - oldPageIndex ,
(pagePtr + PAGESIZE + PAGEHEADERSIZE),
(PAGESIZE - PAGEHEADERSIZE)*sizeof(Uint32));
switch(recordType) {
case ZPREP_OP_TYPE:
poRecord = (PrepareOperationRecord *) tmpPages;
recordSize = poRecord->getLogRecordSize();
if (recordSize < (PAGESIZE - PAGEHEADERSIZE)) {
if (theCheckFlag) {
if(!poRecord->check()) {
ndbout << "Error in poRecord->check() (readRecordOverPageBoundary)" << endl;
doExit();
}
}
if (thePrintFlag) ndbout << (*poRecord);
} else {
ndbout << "Error: Record greater than a Page" << endl;
}
break;
case ZCOMMIT_TYPE:
ctRecord = (CommitTransactionRecord *) tmpPages;
recordSize = ctRecord->getLogRecordSize();
if (recordSize < (PAGESIZE - PAGEHEADERSIZE)) {
if (theCheckFlag) {
if(!ctRecord->check()) {
ndbout << "Error in ctRecord->check() (readRecordOverPageBoundary)" << endl;
doExit();
}
}
if (thePrintFlag) ndbout << (*ctRecord);
} else {
ndbout << endl << "Error: Record greater than a Page" << endl;
}
break;
case ZINVALID_COMMIT_TYPE:
ictRecord = (InvalidCommitTransactionRecord *) tmpPages;
recordSize = ictRecord->getLogRecordSize();
if (recordSize < (PAGESIZE - PAGEHEADERSIZE)) {
if (theCheckFlag) {
if(!ictRecord->check()) {
ndbout << "Error in ictRecord->check() (readRecordOverPageBoundary)" << endl;
doExit();
}
}
if (thePrintFlag) ndbout << (*ictRecord);
} else {
ndbout << endl << "Error: Record greater than a Page" << endl;
}
break;
case ZNEW_PREP_OP_TYPE:
case ZABORT_TYPE:
case ZFRAG_SPLIT_TYPE:
case ZNEXT_MBYTE_TYPE:
ndbout << endl << "Record type = " << recordType << " not implemented." << endl;
return 0;
default:
ndbout << endl << "Error: Unknown record type. Record type = " << recordType << endl;
return 0;
}
thePageHeader = (PageHeader *) (pagePtr + PAGESIZE);
if (thePrintFlag) ndbout << (*thePageHeader);
return PAGEHEADERSIZE - PAGESIZE + oldPageIndex + recordSize;
}
//----------------------------------------------------------------
//
//----------------------------------------------------------------
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment