Commit b6a16107 authored by joreland@mysql.com's avatar joreland@mysql.com

wl1873 impl + test prg

parent fcc8d934
......@@ -96,6 +96,11 @@ public:
*/
void close();
/**
* Restart
*/
int restart();
/**
* Transfer scan operation to an updating transaction. Use this function
* when a scan has found a record that you want to update.
......
......@@ -151,6 +151,8 @@ protected:
NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*);
Uint32 m_ordered;
int restart();
};
inline
......
......@@ -77,3 +77,8 @@ NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){
return -1;
return 0;
}
int
NdbResultSet::restart(){
return m_operation->restart();
}
......@@ -448,6 +448,8 @@ NdbScanOperation::executeCursor(int nodeId){
return -1;
}
#define DEBUG_NEXT_RESULT 0
int NdbScanOperation::nextResult(bool fetchAllowed)
{
if(m_ordered)
......@@ -460,6 +462,11 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
Uint32 idx = m_current_api_receiver;
Uint32 last = m_api_receivers_count;
if(DEBUG_NEXT_RESULT)
ndbout_c("nextResult(%d) idx=%d last=%d",
fetchAllowed,
idx, last);
/**
* Check next buckets
*/
......@@ -1147,8 +1154,6 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
return 0;
}
#define DEBUG_NEXT_RESULT 0
int
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
......@@ -1299,3 +1304,88 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
tSignal.setLength(4+1);
return tp->sendSignal(&tSignal, nodeId);
}
int
NdbScanOperation::restart(){
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
if(seq != tp->getNodeSequence(nodeId)){
theNdbCon->theReleaseOnClose = true;
return -1;
}
while(m_sent_receivers_count){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){
case 0:
break;
case -1:
setErrorCode(4008);
case -2:
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
return -1;
}
}
if(m_api_receivers_count+m_conf_receivers_count){
// Send close scan
if(send_next_scan(0, true) == -1) // Close scan
return -1;
}
/**
* wait for close scan conf
*/
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){
case 0:
break;
case -1:
setErrorCode(4008);
case -2:
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
return -1;
}
}
/**
* Reset receivers
*/
const Uint32 parallell = theParallelism;
for(Uint32 i = 0; i<parallell; i++){
m_receivers[i]->m_list_index = i;
m_prepared_receivers[i] = m_receivers[i]->getId();
m_sent_receivers[i] = m_receivers[i];
m_conf_receivers[i] = 0;
m_api_receivers[i] = 0;
m_receivers[i]->prepareSend();
}
m_api_receivers_count = 0;
m_current_api_receiver = 0;
m_sent_receivers_count = parallell;
m_conf_receivers_count = 0;
if(m_ordered){
m_current_api_receiver = parallell;
}
if (doSendScan(nodeId) == -1)
return -1;
return 0;
}
......@@ -970,6 +970,93 @@ int runCheckInactivityBeforeClose(NDBT_Context* ctx, NDBT_Step* step){
}
int runScanRestart(NDBT_Context* ctx, NDBT_Step* step){
int loops = ctx->getNumLoops();
int records = ctx->getNumRecords();
Ndb * pNdb = GETNDB(step);
const NdbDictionary::Table* pTab = ctx->getTab();
HugoCalculator calc(* pTab);
NDBT_ResultRow tmpRow(* pTab);
int i = 0;
while (i<loops && !ctx->isTestStopped()) {
g_info << i++ << ": ";
const int record = (rand() % records);
g_info << " row=" << record;
NdbConnection* pCon = pNdb->startTransaction();
NdbScanOperation* pOp = pCon->getNdbScanOperation(pTab->getName());
if (pOp == NULL) {
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
NdbResultSet* rs = pOp->readTuples();
if( rs == 0 ) {
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
int check = pOp->interpret_exit_ok();
if( check == -1 ) {
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
// Define attributes to read
for(int a = 0; a<pTab->getNoOfColumns(); a++){
if((tmpRow.attributeStore(a) =
pOp->getValue(pTab->getColumn(a)->getName())) == 0) {
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
}
check = pCon->execute(NoCommit);
if( check == -1 ) {
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
int res;
int row = 0;
while(row < record && (res = rs->nextResult()) == 0) {
if(calc.verifyRowValues(&tmpRow) != 0){
abort();
return NDBT_FAILED;
}
row++;
}
if(row != record){
ERR(pCon->getNdbError());
abort();
return NDBT_FAILED;
}
g_info << " restarting" << endl;
if((res = rs->restart()) != 0){
ERR(pCon->getNdbError());
abort();
return NDBT_FAILED;
}
row = 0;
while((res = rs->nextResult()) == 0) {
if(calc.verifyRowValues(&tmpRow) != 0){
abort();
return NDBT_FAILED;
}
row++;
}
if(res != 1 || row != records){
ERR(pCon->getNdbError());
abort();
return NDBT_FAILED;
}
pCon->close();
}
return NDBT_OK;
}
NDBT_TESTSUITE(testScan);
......@@ -1393,6 +1480,12 @@ TESTCASE("ScanReadWhileNodeIsDown",
STEP(runStopAndStartNode);
FINALIZER(runClearTable);
}
TESTCASE("ScanRestart",
"Verify restart functionallity"){
INITIALIZER(runLoadTable);
STEP(runScanRestart);
FINALIZER(runClearTable);
}
NDBT_TESTSUITE_END(testScan);
int main(int argc, const char** argv){
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment