Commit a79f15e0 authored by tomas@whalegate.ndb.mysql.com's avatar tomas@whalegate.ndb.mysql.com

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new-ndb

into  whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb
parents db577155 e4ca4661
...@@ -6,7 +6,7 @@ Next DBTUP 4029 ...@@ -6,7 +6,7 @@ Next DBTUP 4029
Next DBLQH 5045 Next DBLQH 5045
Next DBDICT 6008 Next DBDICT 6008
Next DBDIH 7186 Next DBDIH 7186
Next DBTC 8053 Next DBTC 8054
Next CMVMI 9000 Next CMVMI 9000
Next BACKUP 10038 Next BACKUP 10038
Next DBUTIL 11002 Next DBUTIL 11002
...@@ -248,6 +248,8 @@ Delay execution of ABORTCONF signal 2 seconds to generate time-out. ...@@ -248,6 +248,8 @@ Delay execution of ABORTCONF signal 2 seconds to generate time-out.
8050: Send ZABORT_TIMEOUT_BREAK delayed 8050: Send ZABORT_TIMEOUT_BREAK delayed
8053: Crash in timeOutFoundLab, state CS_WAIT_COMMIT_CONF
ERROR CODES FOR TESTING TIME-OUT HANDLING IN DBTC ERROR CODES FOR TESTING TIME-OUT HANDLING IN DBTC
------------------------------------------------- -------------------------------------------------
......
...@@ -1124,6 +1124,38 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal) ...@@ -1124,6 +1124,38 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal)
} }
#endif #endif
#endif #endif
if (arg == 9999)
{
Uint32 delay = 1000;
switch(signal->getLength()){
case 1:
break;
case 2:
delay = signal->theData[1];
break;
default:{
Uint32 dmin = signal->theData[1];
Uint32 dmax = signal->theData[2];
delay = dmin + (rand() % (dmax - dmin));
break;
}
}
signal->theData[0] = 9999;
if (delay == 0)
{
execNDB_TAMPER(signal);
}
else if (delay < 10)
{
sendSignal(reference(), GSN_NDB_TAMPER, signal, 1, JBB);
}
else
{
sendSignalWithDelay(reference(), GSN_NDB_TAMPER, signal, delay, 1);
}
}
}//Cmvmi::execDUMP_STATE_ORD() }//Cmvmi::execDUMP_STATE_ORD()
void void
......
...@@ -6483,6 +6483,7 @@ void Dbtc::timeOutFoundLab(Signal* signal, Uint32 TapiConPtr, Uint32 errCode) ...@@ -6483,6 +6483,7 @@ void Dbtc::timeOutFoundLab(Signal* signal, Uint32 TapiConPtr, Uint32 errCode)
return; return;
case CS_WAIT_COMMIT_CONF: case CS_WAIT_COMMIT_CONF:
jam(); jam();
CRASH_INSERTION(8053);
tcConnectptr.i = apiConnectptr.p->currentTcConnect; tcConnectptr.i = apiConnectptr.p->currentTcConnect;
ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord); ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
arrGuard(apiConnectptr.p->currentReplicaNo, MAX_REPLICAS); arrGuard(apiConnectptr.p->currentReplicaNo, MAX_REPLICAS);
......
...@@ -238,6 +238,13 @@ Pgman::execCONTINUEB(Signal* signal) ...@@ -238,6 +238,13 @@ Pgman::execCONTINUEB(Signal* signal)
} }
else else
{ {
if (ERROR_INSERTED(11007))
{
ndbout << "No more writes..." << endl;
SET_ERROR_INSERT_VALUE(11008);
signal->theData[0] = 9999;
sendSignalWithDelay(CMVMI_REF, GSN_NDB_TAMPER, signal, 10000, 1);
}
signal->theData[0] = m_end_lcp_req.senderData; signal->theData[0] = m_end_lcp_req.senderData;
sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB); sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
} }
...@@ -1301,6 +1308,13 @@ Pgman::process_lcp(Signal* signal) ...@@ -1301,6 +1308,13 @@ Pgman::process_lcp(Signal* signal)
} }
else else
{ {
if (ERROR_INSERTED(11007))
{
ndbout << "No more writes..." << endl;
signal->theData[0] = 9999;
sendSignalWithDelay(CMVMI_REF, GSN_NDB_TAMPER, signal, 10000, 1);
SET_ERROR_INSERT_VALUE(11008);
}
signal->theData[0] = m_end_lcp_req.senderData; signal->theData[0] = m_end_lcp_req.senderData;
sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB); sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
} }
...@@ -1588,8 +1602,11 @@ Pgman::fswritereq(Signal* signal, Ptr<Page_entry> ptr) ...@@ -1588,8 +1602,11 @@ Pgman::fswritereq(Signal* signal, Ptr<Page_entry> ptr)
} }
#endif #endif
sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, if (!ERROR_INSERTED(11008))
FsReadWriteReq::FixedLength + 1, JBA); {
sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal,
FsReadWriteReq::FixedLength + 1, JBA);
}
} }
void void
...@@ -2452,6 +2469,11 @@ Pgman::execDUMP_STATE_ORD(Signal* signal) ...@@ -2452,6 +2469,11 @@ Pgman::execDUMP_STATE_ORD(Signal* signal)
{ {
SET_ERROR_INSERT_VALUE(11006); SET_ERROR_INSERT_VALUE(11006);
} }
if (signal->theData[0] == 11007)
{
SET_ERROR_INSERT_VALUE(11007);
}
} }
// page cache client // page cache client
......
...@@ -36,6 +36,16 @@ public: ...@@ -36,6 +36,16 @@ public:
int updateValue = 0, int updateValue = 0,
bool abort = false); bool abort = false);
int loadTableStartFrom(Ndb*,
int startFrom,
int records,
int batch = 512,
bool allowConstraintViolation = true,
int doSleep = 0,
bool oneTrans = false,
int updateValue = 0,
bool abort = false);
int scanReadRecords(Ndb*, int scanReadRecords(Ndb*,
int records, int records,
int abort = 0, int abort = 0,
...@@ -56,6 +66,11 @@ public: ...@@ -56,6 +66,11 @@ public:
int batchsize = 1, int batchsize = 1,
NdbOperation::LockMode = NdbOperation::LM_Read); NdbOperation::LockMode = NdbOperation::LM_Read);
int scanUpdateRecords(Ndb*, NdbScanOperation::ScanFlag,
int records,
int abort = 0,
int parallelism = 0);
int scanUpdateRecords(Ndb*, int scanUpdateRecords(Ndb*,
int records, int records,
int abort = 0, int abort = 0,
...@@ -90,9 +105,12 @@ public: ...@@ -90,9 +105,12 @@ public:
int records, int records,
int percentToLock = 1, int percentToLock = 1,
int lockTime = 1000); int lockTime = 1000);
int fillTable(Ndb*, int fillTable(Ndb*,
int batch=512); int batch=512);
int fillTableStartFrom(Ndb*, int startFrom, int batch=512);
/** /**
* Reading using UniqHashIndex with key = pk * Reading using UniqHashIndex with key = pk
*/ */
......
...@@ -29,6 +29,11 @@ public: ...@@ -29,6 +29,11 @@ public:
int closeTransaction(Ndb*); int closeTransaction(Ndb*);
int clearTable(Ndb*,
NdbScanOperation::ScanFlag,
int records = 0,
int parallelism = 0);
int clearTable(Ndb*, int clearTable(Ndb*,
int records = 0, int records = 0,
int parallelism = 0); int parallelism = 0);
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include <NdbRestarter.hpp> #include <NdbRestarter.hpp>
#include <Vector.hpp> #include <Vector.hpp>
#include <signaldata/DumpStateOrd.hpp> #include <signaldata/DumpStateOrd.hpp>
#include <NdbBackup.hpp>
int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){ int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){
...@@ -1293,6 +1294,158 @@ runBug28770(NDBT_Context* ctx, NDBT_Step* step) { ...@@ -1293,6 +1294,158 @@ runBug28770(NDBT_Context* ctx, NDBT_Step* step) {
return result; return result;
} }
int runSR_DD_1(NDBT_Context* ctx, NDBT_Step* step)
{
Ndb* pNdb = GETNDB(step);
int result = NDBT_OK;
Uint32 loops = ctx->getNumLoops();
int count;
NdbRestarter restarter;
NdbBackup backup(GETNDB(step)->getNodeId()+1);
bool lcploop = ctx->getProperty("LCP", (unsigned)0);
Uint32 i = 1;
Uint32 backupId;
int val[] = { DumpStateOrd::CmvmiSetRestartOnErrorInsert, 1 };
int lcp = DumpStateOrd::DihMinTimeBetweenLCP;
int startFrom = 0;
HugoTransactions hugoTrans(*ctx->getTab());
while(i<=loops && result != NDBT_FAILED)
{
if (lcploop)
{
CHECK(restarter.dumpStateAllNodes(&lcp, 1) == 0);
}
int nodeId = restarter.getDbNodeId(rand() % restarter.getNumDbNodes());
//CHECK(restarter.dumpStateAllNodes(&val, 1) == 0);
ndbout << "Loop " << i << "/"<< loops <<" started" << endl;
ndbout << "Loading records..." << startFrom << endl;
CHECK(hugoTrans.loadTable(pNdb, startFrom) == 0);
ndbout << "Making " << nodeId << " crash" << endl;
int kill[] = { 9999, 1000, 3000 };
CHECK(restarter.dumpStateOneNode(nodeId, val, 2) == 0);
CHECK(restarter.dumpStateOneNode(nodeId, kill, 3) == 0);
Uint64 end = NdbTick_CurrentMillisecond() + 4000;
Uint32 row = startFrom;
do {
ndbout << "Loading from " << row << " to " << row + 1000 << endl;
if (hugoTrans.loadTableStartFrom(pNdb, row, 1000) != 0)
break;
row += 1000;
} while (NdbTick_CurrentMillisecond() < end);
ndbout << "Waiting for " << nodeId << " to restart" << endl;
CHECK(restarter.waitNodesNoStart(&nodeId, 1) == 0);
ndbout << "Restarting cluster" << endl;
CHECK(restarter.restartAll(false, true, true) == 0);
CHECK(restarter.waitClusterNoStart() == 0);
CHECK(restarter.startAll() == 0);
CHECK(restarter.waitClusterStarted() == 0);
ndbout << "Starting backup..." << flush;
CHECK(backup.start(backupId) == 0);
ndbout << "done" << endl;
int cnt = 0;
CHECK(hugoTrans.selectCount(pNdb, 0, &cnt) == 0);
ndbout << "Found " << cnt << " records..." << endl;
ndbout << "Clearing..." << endl;
CHECK(hugoTrans.clearTable(pNdb,
NdbScanOperation::SF_TupScan, cnt) == 0);
if (cnt > startFrom)
{
startFrom = cnt;
}
startFrom += 1000;
i++;
}
ndbout << "runSR_DD_1 finished" << endl;
return result;
}
int runSR_DD_2(NDBT_Context* ctx, NDBT_Step* step)
{
Ndb* pNdb = GETNDB(step);
int result = NDBT_OK;
Uint32 loops = ctx->getNumLoops();
Uint32 rows = ctx->getNumRecords();
int count;
NdbRestarter restarter;
NdbBackup backup(GETNDB(step)->getNodeId()+1);
bool lcploop = ctx->getProperty("LCP", (unsigned)0);
Uint32 i = 1;
Uint32 backupId;
int val[] = { DumpStateOrd::CmvmiSetRestartOnErrorInsert, 1 };
int lcp = DumpStateOrd::DihMinTimeBetweenLCP;
int startFrom = 0;
HugoTransactions hugoTrans(*ctx->getTab());
while(i<=loops && result != NDBT_FAILED)
{
if (lcploop)
{
CHECK(restarter.dumpStateAllNodes(&lcp, 1) == 0);
}
int nodeId = restarter.getDbNodeId(rand() % restarter.getNumDbNodes());
ndbout << "Making " << nodeId << " crash" << endl;
int kill[] = { 9999, 3000, 10000 };
CHECK(restarter.dumpStateOneNode(nodeId, val, 2) == 0);
CHECK(restarter.dumpStateOneNode(nodeId, kill, 3) == 0);
Uint64 end = NdbTick_CurrentMillisecond() + 11000;
Uint32 row = startFrom;
do {
if (hugoTrans.loadTable(pNdb, rows) != 0)
break;
if (hugoTrans.clearTable(pNdb, NdbScanOperation::SF_TupScan, rows) != 0)
break;
} while (NdbTick_CurrentMillisecond() < end);
ndbout << "Waiting for " << nodeId << " to restart" << endl;
CHECK(restarter.waitNodesNoStart(&nodeId, 1) == 0);
ndbout << "Restarting cluster" << endl;
CHECK(restarter.restartAll(false, true, true) == 0);
CHECK(restarter.waitClusterNoStart() == 0);
CHECK(restarter.startAll() == 0);
CHECK(restarter.waitClusterStarted() == 0);
ndbout << "Starting backup..." << flush;
CHECK(backup.start(backupId) == 0);
ndbout << "done" << endl;
int cnt = 0;
CHECK(hugoTrans.selectCount(pNdb, 0, &cnt) == 0);
ndbout << "Found " << cnt << " records..." << endl;
ndbout << "Clearing..." << endl;
CHECK(hugoTrans.clearTable(pNdb,
NdbScanOperation::SF_TupScan, cnt) == 0);
i++;
}
ndbout << "runSR_DD_2 finished" << endl;
return result;
}
NDBT_TESTSUITE(testSystemRestart); NDBT_TESTSUITE(testSystemRestart);
TESTCASE("SR1", TESTCASE("SR1",
...@@ -1474,6 +1627,32 @@ TESTCASE("Bug24664", ...@@ -1474,6 +1627,32 @@ TESTCASE("Bug24664",
STEP(runBug24664); STEP(runBug24664);
FINALIZER(runClearTable); FINALIZER(runClearTable);
} }
TESTCASE("SR_DD_1", "")
{
INITIALIZER(runWaitStarted);
STEP(runSR_DD_1);
FINALIZER(runClearTable);
}
TESTCASE("SR_DD_1_LCP", "")
{
TC_PROPERTY("LCP", 1);
INITIALIZER(runWaitStarted);
STEP(runSR_DD_1);
FINALIZER(runClearTable);
}
TESTCASE("SR_DD_2", "")
{
INITIALIZER(runWaitStarted);
STEP(runSR_DD_2);
FINALIZER(runClearTable);
}
TESTCASE("SR_DD_2_LCP", "")
{
TC_PROPERTY("LCP", 1);
INITIALIZER(runWaitStarted);
STEP(runSR_DD_2);
FINALIZER(runClearTable);
}
TESTCASE("Bug29167", "") TESTCASE("Bug29167", "")
{ {
INITIALIZER(runWaitStarted); INITIALIZER(runWaitStarted);
......
...@@ -945,3 +945,36 @@ args: -n Bug28804 T1 T3 ...@@ -945,3 +945,36 @@ args: -n Bug28804 T1 T3
max-time: 180 max-time: 180
cmd: testIndex cmd: testIndex
args: -n Bug28804_ATTRINFO T1 T3 args: -n Bug28804_ATTRINFO T1 T3
max-time: 1500
cmd: testSystemRestart
args: -n SR_DD_1 D1
max-time: 1500
cmd: testSystemRestart
args: -n SR_DD_1 D2
max-time: 1500
cmd: testSystemRestart
args: -n SR_DD_1_LCP D1
max-time: 1500
cmd: testSystemRestart
args: -n SR_DD_1_LCP D2
max-time: 1500
cmd: testSystemRestart
args: -n SR_DD_2 D1
max-time: 1500
cmd: testSystemRestart
args: -n SR_DD_2 D2
max-time: 1500
cmd: testSystemRestart
args: -n SR_DD_2_LCP D1
max-time: 1500
cmd: testSystemRestart
args: -n SR_DD_2_LCP D2
...@@ -341,50 +341,14 @@ HugoTransactions::scanReadRecords(Ndb* pNdb, ...@@ -341,50 +341,14 @@ HugoTransactions::scanReadRecords(Ndb* pNdb,
int int
HugoTransactions::scanUpdateRecords(Ndb* pNdb, HugoTransactions::scanUpdateRecords(Ndb* pNdb,
int records, NdbScanOperation::ScanFlag flags,
int abortPercent, int records,
int parallelism){ int abortPercent,
if(m_defaultScanUpdateMethod == 1){ int parallelism){
return scanUpdateRecords1(pNdb, records, abortPercent, parallelism); int retryAttempt = 0;
} else if(m_defaultScanUpdateMethod == 2){
return scanUpdateRecords2(pNdb, records, abortPercent, parallelism);
} else {
return scanUpdateRecords3(pNdb, records, abortPercent, parallelism);
}
}
// Scan all records exclusive and update
// them one by one
int
HugoTransactions::scanUpdateRecords1(Ndb* pNdb,
int records,
int abortPercent,
int parallelism){
return scanUpdateRecords3(pNdb, records, abortPercent, 1);
}
// Scan all records exclusive and update
// them batched by asking nextScanResult to
// give us all cached records before fetching new
// records from db
int
HugoTransactions::scanUpdateRecords2(Ndb* pNdb,
int records,
int abortPercent,
int parallelism){
return scanUpdateRecords3(pNdb, records, abortPercent, parallelism);
}
int
HugoTransactions::scanUpdateRecords3(Ndb* pNdb,
int records,
int abortPercent,
int parallelism){
int retryAttempt = 0;
int check, a; int check, a;
NdbScanOperation *pOp; NdbScanOperation *pOp;
while (true){ while (true){
restart: restart:
if (retryAttempt++ >= m_retryMax){ if (retryAttempt++ >= m_retryMax){
...@@ -411,8 +375,9 @@ restart: ...@@ -411,8 +375,9 @@ restart:
return NDBT_FAILED; return NDBT_FAILED;
} }
if( pOp->readTuplesExclusive(parallelism) ) { if( pOp->readTuples(NdbOperation::LM_Exclusive, flags,
ERR(pTrans->getNdbError()); parallelism))
{
closeTransaction(pNdb); closeTransaction(pNdb);
return NDBT_FAILED; return NDBT_FAILED;
} }
...@@ -429,15 +394,18 @@ restart: ...@@ -429,15 +394,18 @@ restart:
check = pTrans->execute(NoCommit, AbortOnError); check = pTrans->execute(NoCommit, AbortOnError);
if( check == -1 ) { if( check == -1 ) {
const NdbError err = pTrans->getNdbError(); const NdbError err = pTrans->getNdbError();
ERR(err);
closeTransaction(pNdb);
if (err.status == NdbError::TemporaryError){ if (err.status == NdbError::TemporaryError){
ERR(err);
closeTransaction(pNdb);
NdbSleep_MilliSleep(50); NdbSleep_MilliSleep(50);
retryAttempt++;
continue; continue;
} }
ERR(err);
closeTransaction(pNdb);
return NDBT_FAILED; return NDBT_FAILED;
} }
// Abort after 1-100 or 1-records rows // Abort after 1-100 or 1-records rows
int ranVal = rand(); int ranVal = rand();
int abortCount = ranVal % (records == 0 ? 100 : records); int abortCount = ranVal % (records == 0 ? 100 : records);
...@@ -448,74 +416,113 @@ restart: ...@@ -448,74 +416,113 @@ restart:
abortTrans = true; abortTrans = true;
} }
int eof;
int rows = 0; int rows = 0;
while((check = pOp->nextResult(true)) == 0){ while((eof = pOp->nextResult(true)) == 0){
do { rows++;
rows++; if (calc.verifyRowValues(&row) != 0){
NdbOperation* pUp = pOp->updateCurrentTuple(); closeTransaction(pNdb);
if(pUp == 0){ return NDBT_FAILED;
}
if (abortCount == rows && abortTrans == true){
ndbout << "Scan is aborted" << endl;
g_info << "Scan is aborted" << endl;
pOp->close();
if( check == -1 ) {
ERR(pTrans->getNdbError()); ERR(pTrans->getNdbError());
closeTransaction(pNdb); closeTransaction(pNdb);
return NDBT_FAILED; return NDBT_FAILED;
} }
const int updates = calc.getUpdatesValue(&row) + 1;
const int r = calc.getIdValue(&row);
for(a = 0; a<tab.getNoOfColumns(); a++){
if (tab.getColumn(a)->getPrimaryKey() == false){
if(setValueForAttr(pUp, a, r, updates ) != 0){
ERR(pTrans->getNdbError());
closeTransaction(pNdb);
return NDBT_FAILED;
}
}
}
if (rows == abortCount && abortTrans == true){
g_info << "Scan is aborted" << endl;
// This scan should be aborted
closeTransaction(pNdb);
return NDBT_OK;
}
} while((check = pOp->nextResult(false)) == 0);
if(check != -1){
check = pTrans->execute(Commit, AbortOnError);
if(check != -1)
m_latest_gci = pTrans->getGCI();
pTrans->restart();
}
const NdbError err = pTrans->getNdbError();
if( check == -1 ) {
closeTransaction(pNdb); closeTransaction(pNdb);
ERR(err); return NDBT_OK;
if (err.status == NdbError::TemporaryError){
NdbSleep_MilliSleep(50);
goto restart;
}
return NDBT_FAILED;
} }
} }
if (eof == -1) {
const NdbError err = pTrans->getNdbError(); const NdbError err = pTrans->getNdbError();
if( check == -1 ) {
closeTransaction(pNdb);
ERR(err);
if (err.status == NdbError::TemporaryError){ if (err.status == NdbError::TemporaryError){
ERR_INFO(err);
closeTransaction(pNdb);
NdbSleep_MilliSleep(50); NdbSleep_MilliSleep(50);
goto restart; switch (err.code){
case 488:
case 245:
case 490:
// Too many active scans, no limit on number of retry attempts
break;
default:
retryAttempt++;
}
continue;
} }
ERR(err);
closeTransaction(pNdb);
return NDBT_FAILED; return NDBT_FAILED;
} }
closeTransaction(pNdb); closeTransaction(pNdb);
g_info << rows << " rows have been read" << endl;
if (records != 0 && rows != records){
g_err << "Check expected number of records failed" << endl
<< " expected=" << records <<", " << endl
<< " read=" << rows << endl;
return NDBT_FAILED;
}
g_info << rows << " rows have been updated" << endl;
return NDBT_OK; return NDBT_OK;
} }
return NDBT_FAILED; return NDBT_FAILED;
} }
int
HugoTransactions::scanUpdateRecords(Ndb* pNdb,
int records,
int abortPercent,
int parallelism){
return scanUpdateRecords(pNdb,
(NdbScanOperation::ScanFlag)0,
records, abortPercent, parallelism);
}
// Scan all records exclusive and update
// them one by one
int
HugoTransactions::scanUpdateRecords1(Ndb* pNdb,
int records,
int abortPercent,
int parallelism){
return scanUpdateRecords(pNdb,
(NdbScanOperation::ScanFlag)0,
records, abortPercent, 1);
}
// Scan all records exclusive and update
// them batched by asking nextScanResult to
// give us all cached records before fetching new
// records from db
int
HugoTransactions::scanUpdateRecords2(Ndb* pNdb,
int records,
int abortPercent,
int parallelism){
return scanUpdateRecords(pNdb, (NdbScanOperation::ScanFlag)0,
records, abortPercent, parallelism);
}
int
HugoTransactions::scanUpdateRecords3(Ndb* pNdb,
int records,
int abortPercent,
int parallelism)
{
return scanUpdateRecords(pNdb, (NdbScanOperation::ScanFlag)0,
records, abortPercent, parallelism);
}
int int
HugoTransactions::loadTable(Ndb* pNdb, HugoTransactions::loadTable(Ndb* pNdb,
int records, int records,
...@@ -524,7 +531,22 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -524,7 +531,22 @@ HugoTransactions::loadTable(Ndb* pNdb,
int doSleep, int doSleep,
bool oneTrans, bool oneTrans,
int value, int value,
bool abort){ bool abort)
{
return loadTableStartFrom(pNdb, 0, records, batch, allowConstraintViolation,
doSleep, oneTrans, value, abort);
}
int
HugoTransactions::loadTableStartFrom(Ndb* pNdb,
int startFrom,
int records,
int batch,
bool allowConstraintViolation,
int doSleep,
bool oneTrans,
int value,
bool abort){
int check; int check;
int retryAttempt = 0; int retryAttempt = 0;
int retryMax = 5; int retryMax = 5;
...@@ -543,8 +565,9 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -543,8 +565,9 @@ HugoTransactions::loadTable(Ndb* pNdb,
<< " -> rows/commit = " << batch << endl; << " -> rows/commit = " << batch << endl;
} }
Uint32 orgbatch = batch;
g_info << "|- Inserting records..." << endl; g_info << "|- Inserting records..." << endl;
for (int c=0 ; c<records ; ){ for (int c=0 ; c<records; ){
bool closeTrans = true; bool closeTrans = true;
if(c + batch > records) if(c + batch > records)
...@@ -578,7 +601,7 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -578,7 +601,7 @@ HugoTransactions::loadTable(Ndb* pNdb,
} }
} }
if(pkInsertRecord(pNdb, c, batch, value) != NDBT_OK) if(pkInsertRecord(pNdb, c + startFrom, batch, value) != NDBT_OK)
{ {
ERR(pTrans->getNdbError()); ERR(pTrans->getNdbError());
closeTransaction(pNdb); closeTransaction(pNdb);
...@@ -625,6 +648,7 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -625,6 +648,7 @@ HugoTransactions::loadTable(Ndb* pNdb,
ERR(err); ERR(err);
NdbSleep_MilliSleep(50); NdbSleep_MilliSleep(50);
retryAttempt++; retryAttempt++;
batch = 1;
continue; continue;
break; break;
...@@ -670,7 +694,14 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -670,7 +694,14 @@ HugoTransactions::loadTable(Ndb* pNdb,
int int
HugoTransactions::fillTable(Ndb* pNdb, HugoTransactions::fillTable(Ndb* pNdb,
int batch){ int batch){
return fillTableStartFrom(pNdb, 0, batch);
}
int
HugoTransactions::fillTableStartFrom(Ndb* pNdb,
int startFrom,
int batch){
int check; int check;
int retryAttempt = 0; int retryAttempt = 0;
int retryMax = 5; int retryMax = 5;
...@@ -688,7 +719,7 @@ HugoTransactions::fillTable(Ndb* pNdb, ...@@ -688,7 +719,7 @@ HugoTransactions::fillTable(Ndb* pNdb,
<< " -> rows/commit = " << batch << endl; << " -> rows/commit = " << batch << endl;
} }
for (int c=0 ; ; ){ for (int c=startFrom ; ; ){
if (retryAttempt >= retryMax){ if (retryAttempt >= retryMax){
g_info << "Record " << c << " could not be inserted, has retried " g_info << "Record " << c << " could not be inserted, has retried "
......
...@@ -42,38 +42,9 @@ UtilTransactions::UtilTransactions(Ndb* ndb, ...@@ -42,38 +42,9 @@ UtilTransactions::UtilTransactions(Ndb* ndb,
int int
UtilTransactions::clearTable(Ndb* pNdb, UtilTransactions::clearTable(Ndb* pNdb,
int records, NdbScanOperation::ScanFlag flags,
int parallelism){ int records,
if(m_defaultClearMethod == 1){ int parallelism){
return clearTable1(pNdb, records, parallelism);
} else if(m_defaultClearMethod == 2){
return clearTable2(pNdb, records, parallelism);
} else {
return clearTable3(pNdb, records, parallelism);
}
}
int
UtilTransactions::clearTable1(Ndb* pNdb,
int records,
int parallelism)
{
return clearTable3(pNdb, records, 1);
}
int
UtilTransactions::clearTable2(Ndb* pNdb,
int records,
int parallelism)
{
return clearTable3(pNdb, records, parallelism);
}
int
UtilTransactions::clearTable3(Ndb* pNdb,
int records,
int parallelism){
// Scan all records exclusive and delete // Scan all records exclusive and delete
// them one by one // them one by one
int retryAttempt = 0; int retryAttempt = 0;
...@@ -116,7 +87,7 @@ UtilTransactions::clearTable3(Ndb* pNdb, ...@@ -116,7 +87,7 @@ UtilTransactions::clearTable3(Ndb* pNdb,
goto failed; goto failed;
} }
if( pOp->readTuplesExclusive(par) ) { if( pOp->readTuples(NdbOperation::LM_Exclusive, flags, par) ) {
err = pTrans->getNdbError(); err = pTrans->getNdbError();
goto failed; goto failed;
} }
...@@ -179,6 +150,43 @@ UtilTransactions::clearTable3(Ndb* pNdb, ...@@ -179,6 +150,43 @@ UtilTransactions::clearTable3(Ndb* pNdb,
return (err.code != 0 ? err.code : NDBT_FAILED); return (err.code != 0 ? err.code : NDBT_FAILED);
} }
int
UtilTransactions::clearTable(Ndb* pNdb,
int records,
int parallelism){
return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
records, parallelism);
}
int
UtilTransactions::clearTable1(Ndb* pNdb,
int records,
int parallelism)
{
return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
records, 1);
}
int
UtilTransactions::clearTable2(Ndb* pNdb,
int records,
int parallelism)
{
return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
records, parallelism);
}
int
UtilTransactions::clearTable3(Ndb* pNdb,
int records,
int parallelism)
{
return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
records, parallelism);
}
int int
UtilTransactions::copyTableData(Ndb* pNdb, UtilTransactions::copyTableData(Ndb* pNdb,
const char* destName){ const char* destName){
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment