Commit 69485c7b authored by mronstrom@mysql.com's avatar mronstrom@mysql.com

Merge mronstrom@bk-internal.mysql.com:/home/bk/mysql-4.1

into mysql.com:/Users/mikron/mysql-4.1
parents f0cd2093 2973728a
...@@ -218,6 +218,7 @@ ndbout << "Ptr: " << ptr.p->word32 << " \tIndex: " << tmp_string << " \tValue: " ...@@ -218,6 +218,7 @@ ndbout << "Ptr: " << ptr.p->word32 << " \tIndex: " << tmp_string << " \tValue: "
#define ZREL_FRAG 6 #define ZREL_FRAG 6
#define ZREL_DIR 7 #define ZREL_DIR 7
#define ZREPORT_MEMORY_USAGE 8 #define ZREPORT_MEMORY_USAGE 8
#define ZLCP_OP_WRITE_RT_BREAK 9
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
/* ERROR CODES */ /* ERROR CODES */
...@@ -1190,6 +1191,7 @@ private: ...@@ -1190,6 +1191,7 @@ private:
void zpagesize_error(const char* where); void zpagesize_error(const char* where);
void reportMemoryUsage(Signal* signal, int gth); void reportMemoryUsage(Signal* signal, int gth);
void lcp_write_op_to_undolog(Signal* signal);
// Initialisation // Initialisation
......
...@@ -46,13 +46,17 @@ Dbacc::remainingUndoPages(){ ...@@ -46,13 +46,17 @@ Dbacc::remainingUndoPages(){
ndbrequire(HeadPage>=TailPage); ndbrequire(HeadPage>=TailPage);
Uint32 UsedPages = HeadPage - TailPage; Uint32 UsedPages = HeadPage - TailPage;
Uint32 Remaining = cundopagesize - UsedPages; Int32 Remaining = cundopagesize - UsedPages;
// There can not be more than cundopagesize remaining // There can not be more than cundopagesize remaining
ndbrequire(Remaining<=cundopagesize); if (Remaining <= 0){
// No more undolog, crash node
progError(__LINE__,
ERR_NO_MORE_UNDOLOG,
"There are more than 1Mbyte undolog writes outstanding");
}
return Remaining; return Remaining;
}//Dbacc::remainingUndoPages() }
void void
Dbacc::updateLastUndoPageIdWritten(Signal* signal, Uint32 aNewValue){ Dbacc::updateLastUndoPageIdWritten(Signal* signal, Uint32 aNewValue){
...@@ -193,6 +197,17 @@ void Dbacc::execCONTINUEB(Signal* signal) ...@@ -193,6 +197,17 @@ void Dbacc::execCONTINUEB(Signal* signal)
return; return;
} }
case ZLCP_OP_WRITE_RT_BREAK:
{
operationRecPtr.i= signal->theData[1];
fragrecptr.i= signal->theData[2];
lcpConnectptr.i= signal->theData[3];
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
lcp_write_op_to_undolog(signal);
return;
}
default: default:
ndbrequire(false); ndbrequire(false);
break; break;
...@@ -7697,7 +7712,26 @@ void Dbacc::execACC_LCPREQ(Signal* signal) ...@@ -7697,7 +7712,26 @@ void Dbacc::execACC_LCPREQ(Signal* signal)
fragrecptr.p->lcpMaxOverDirIndex = fragrecptr.p->lastOverIndex; fragrecptr.p->lcpMaxOverDirIndex = fragrecptr.p->lastOverIndex;
fragrecptr.p->createLcp = ZTRUE; fragrecptr.p->createLcp = ZTRUE;
operationRecPtr.i = fragrecptr.p->lockOwnersList; operationRecPtr.i = fragrecptr.p->lockOwnersList;
while (operationRecPtr.i != RNIL) { lcp_write_op_to_undolog(signal);
}
void
Dbacc::lcp_write_op_to_undolog(Signal* signal)
{
bool delay_continueb= false;
Uint32 i, j;
for (i= 0; i < 16; i++) {
jam();
if (remainingUndoPages() <= ZMIN_UNDO_PAGES_AT_COMMIT) {
jam();
delay_continueb= true;
break;
}
for (j= 0; j < 32; j++) {
if (operationRecPtr.i == RNIL) {
jam();
break;
}
jam(); jam();
ptrCheckGuard(operationRecPtr, coprecsize, operationrec); ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
...@@ -7719,10 +7753,29 @@ void Dbacc::execACC_LCPREQ(Signal* signal) ...@@ -7719,10 +7753,29 @@ void Dbacc::execACC_LCPREQ(Signal* signal)
writeUndoHeader(signal, RNIL, UndoHeader::ZOP_INFO); /* WRITE THE HEAD OF THE UNDO ELEMENT */ writeUndoHeader(signal, RNIL, UndoHeader::ZOP_INFO); /* WRITE THE HEAD OF THE UNDO ELEMENT */
checkUndoPages(signal); /* SEND UNDO PAGE TO DISK WHEN A GROUP OF */ checkUndoPages(signal); /* SEND UNDO PAGE TO DISK WHEN A GROUP OF */
/* UNDO PAGES,CURRENTLY 8, IS FILLED */ /* UNDO PAGES,CURRENTLY 8, IS FILLED */
}//if }
operationRecPtr.i = operationRecPtr.p->nextLockOwnerOp; operationRecPtr.i = operationRecPtr.p->nextLockOwnerOp;
}//while }
if (operationRecPtr.i == RNIL) {
jam();
break;
}
}
if (operationRecPtr.i != RNIL) {
jam();
signal->theData[0]= ZLCP_OP_WRITE_RT_BREAK;
signal->theData[1]= operationRecPtr.i;
signal->theData[2]= fragrecptr.i;
signal->theData[3]= lcpConnectptr.i;
if (delay_continueb) {
jam();
sendSignalWithDelay(cownBlockref, GSN_CONTINUEB, signal, 10, 4);
} else {
jam();
sendSignal(cownBlockref, GSN_CONTINUEB, signal, 4, JBB);
}
return;
}
signal->theData[0] = fragrecptr.p->lcpLqhPtr; signal->theData[0] = fragrecptr.p->lcpLqhPtr;
sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPSTARTED, sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPSTARTED,
...@@ -7735,8 +7788,7 @@ void Dbacc::execACC_LCPREQ(Signal* signal) ...@@ -7735,8 +7788,7 @@ void Dbacc::execACC_LCPREQ(Signal* signal)
signal->theData[0] = lcpConnectptr.i; signal->theData[0] = lcpConnectptr.i;
signal->theData[1] = fragrecptr.i; signal->theData[1] = fragrecptr.i;
sendSignal(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 2, JBB); sendSignal(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 2, JBB);
return; }
}//Dbacc::execACC_LCPREQ()
/* ******************--------------------------------------------------------------- */ /* ******************--------------------------------------------------------------- */
/* ACC_SAVE_PAGES A GROUP OF PAGES IS ALLOCATED. THE PAGES AND OVERFLOW */ /* ACC_SAVE_PAGES A GROUP OF PAGES IS ALLOCATED. THE PAGES AND OVERFLOW */
...@@ -8595,12 +8647,6 @@ void Dbacc::checkUndoPages(Signal* signal) ...@@ -8595,12 +8647,6 @@ void Dbacc::checkUndoPages(Signal* signal)
* RECORDS IN * RECORDS IN
*/ */
Uint16 nextUndoPageId = tundoPageId + 1; Uint16 nextUndoPageId = tundoPageId + 1;
if (nextUndoPageId > (clastUndoPageIdWritten + cundopagesize)){
// No more undolog, crash node
progError(__LINE__,
ERR_NO_MORE_UNDOLOG,
"There are more than 1Mbyte undolog writes outstanding");
}
updateUndoPositionPage(signal, nextUndoPageId << ZUNDOPAGEINDEXBITS); updateUndoPositionPage(signal, nextUndoPageId << ZUNDOPAGEINDEXBITS);
if ((tundoPageId & (ZWRITE_UNDOPAGESIZE - 1)) == (ZWRITE_UNDOPAGESIZE - 1)) { if ((tundoPageId & (ZWRITE_UNDOPAGESIZE - 1)) == (ZWRITE_UNDOPAGESIZE - 1)) {
......
...@@ -589,13 +589,14 @@ Ndb::releaseSignal(NdbApiSignal* aSignal) ...@@ -589,13 +589,14 @@ Ndb::releaseSignal(NdbApiSignal* aSignal)
#if defined VM_TRACE #if defined VM_TRACE
// Check that signal is not null // Check that signal is not null
assert(aSignal != NULL); assert(aSignal != NULL);
#if 0
// Check that signal is not already in list // Check that signal is not already in list
NdbApiSignal* tmp = theSignalIdleList; NdbApiSignal* tmp = theSignalIdleList;
while (tmp != NULL){ while (tmp != NULL){
assert(tmp != aSignal); assert(tmp != aSignal);
tmp = tmp->next(); tmp = tmp->next();
} }
#endif
#endif #endif
creleaseSignals++; creleaseSignals++;
aSignal->next(theSignalIdleList); aSignal->next(theSignalIdleList);
......
...@@ -34,7 +34,8 @@ public: ...@@ -34,7 +34,8 @@ public:
int records, int records,
int batch = 512, int batch = 512,
bool allowConstraintViolation = true, bool allowConstraintViolation = true,
int doSleep = 0); int doSleep = 0,
bool oneTrans = false);
int scanReadRecords(Ndb*, int scanReadRecords(Ndb*,
int records, int records,
int abort = 0, int abort = 0,
......
...@@ -29,9 +29,18 @@ ...@@ -29,9 +29,18 @@
* delete should be visible to same transaction * delete should be visible to same transaction
* *
*/ */
int runLoadTable2(NDBT_Context* ctx, NDBT_Step* step)
{
int records = ctx->getNumRecords();
HugoTransactions hugoTrans(*ctx->getTab());
if (hugoTrans.loadTable(GETNDB(step), records, 512, false, 0, true) != 0){
return NDBT_FAILED;
}
return NDBT_OK;
}
int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){ int runLoadTable(NDBT_Context* ctx, NDBT_Step* step)
{
int records = ctx->getNumRecords(); int records = ctx->getNumRecords();
HugoTransactions hugoTrans(*ctx->getTab()); HugoTransactions hugoTrans(*ctx->getTab());
if (hugoTrans.loadTable(GETNDB(step), records) != 0){ if (hugoTrans.loadTable(GETNDB(step), records) != 0){
...@@ -1255,6 +1264,11 @@ TESTCASE("MassiveRollback2", ...@@ -1255,6 +1264,11 @@ TESTCASE("MassiveRollback2",
INITIALIZER(runMassiveRollback2); INITIALIZER(runMassiveRollback2);
FINALIZER(runClearTable2); FINALIZER(runClearTable2);
} }
TESTCASE("MassiveTransaction",
"Test very large insert transaction"){
INITIALIZER(runLoadTable2);
FINALIZER(runClearTable2);
}
NDBT_TESTSUITE_END(testBasic); NDBT_TESTSUITE_END(testBasic);
int main(int argc, const char** argv){ int main(int argc, const char** argv){
......
...@@ -693,12 +693,14 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -693,12 +693,14 @@ HugoTransactions::loadTable(Ndb* pNdb,
int records, int records,
int batch, int batch,
bool allowConstraintViolation, bool allowConstraintViolation,
int doSleep){ int doSleep,
bool oneTrans){
int check; int check;
int retryAttempt = 0; int retryAttempt = 0;
int retryMax = 5; int retryMax = 5;
NdbConnection *pTrans; NdbConnection *pTrans;
NdbOperation *pOp; NdbOperation *pOp;
bool first_batch = true;
const int org = batch; const int org = batch;
const int cols = tab.getNoOfColumns(); const int cols = tab.getNoOfColumns();
...@@ -715,7 +717,7 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -715,7 +717,7 @@ HugoTransactions::loadTable(Ndb* pNdb,
g_info << "|- Inserting records..." << endl; g_info << "|- Inserting records..." << endl;
for (int c=0 ; c<records ; ){ for (int c=0 ; c<records ; ){
bool closeTrans;
if (retryAttempt >= retryMax){ if (retryAttempt >= retryMax){
g_info << "Record " << c << " could not be inserted, has retried " g_info << "Record " << c << " could not be inserted, has retried "
<< retryAttempt << " times " << endl; << retryAttempt << " times " << endl;
...@@ -726,6 +728,8 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -726,6 +728,8 @@ HugoTransactions::loadTable(Ndb* pNdb,
if (doSleep > 0) if (doSleep > 0)
NdbSleep_MilliSleep(doSleep); NdbSleep_MilliSleep(doSleep);
if (first_batch || !oneTrans) {
first_batch = false;
pTrans = pNdb->startTransaction(); pTrans = pNdb->startTransaction();
if (pTrans == NULL) { if (pTrans == NULL) {
...@@ -740,6 +744,7 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -740,6 +744,7 @@ HugoTransactions::loadTable(Ndb* pNdb,
ERR(err); ERR(err);
return NDBT_FAILED; return NDBT_FAILED;
} }
}
for(int b = 0; b < batch && c+b<records; b++){ for(int b = 0; b < batch && c+b<records; b++){
...@@ -768,7 +773,13 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -768,7 +773,13 @@ HugoTransactions::loadTable(Ndb* pNdb,
} }
// Execute the transaction and insert the record // Execute the transaction and insert the record
if (!oneTrans || (c + batch) >= records) {
closeTrans = true;
check = pTrans->execute( Commit ); check = pTrans->execute( Commit );
} else {
closeTrans = false;
check = pTrans->execute( NoCommit );
}
if(check == -1 ) { if(check == -1 ) {
const NdbError err = pTrans->getNdbError(); const NdbError err = pTrans->getNdbError();
pNdb->closeTransaction(pTrans); pNdb->closeTransaction(pTrans);
...@@ -812,8 +823,10 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -812,8 +823,10 @@ HugoTransactions::loadTable(Ndb* pNdb,
} }
} }
else{ else{
if (closeTrans) {
pNdb->closeTransaction(pTrans); pNdb->closeTransaction(pTrans);
} }
}
// Step to next record // Step to next record
c = c+batch; c = c+batch;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment