Commit b17949ae authored by joreland@mysql.com's avatar joreland@mysql.com

Merge mysql.com:/home/jonas/src/mysql-4.1-ndb

into mysql.com:/home/jonas/src/wl1873
parents 27fc980e b6a16107
......@@ -96,6 +96,11 @@ public:
*/
void close();
/**
* Restart
*/
int restart();
/**
* Transfer scan operation to an updating transaction. Use this function
* when a scan has found a record that you want to update.
......
......@@ -157,6 +157,8 @@ protected:
NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*);
Uint32 m_ordered;
int restart();
};
inline
......
......@@ -89,3 +89,8 @@ NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){
return -1;
return 0;
}
int
NdbResultSet::restart(){
return m_operation->restart();
}
......@@ -470,6 +470,11 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
if(DEBUG_NEXT_RESULT)
ndbout_c("nextResult(%d) idx=%d last=%d", fetchAllowed, idx, last);
if(DEBUG_NEXT_RESULT)
ndbout_c("nextResult(%d) idx=%d last=%d",
fetchAllowed,
idx, last);
/**
* Check next buckets
*/
......@@ -1395,3 +1400,88 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
tSignal.setLength(4+1);
return tp->sendSignal(&tSignal, nodeId);
}
int
NdbScanOperation::restart(){
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
if(seq != tp->getNodeSequence(nodeId)){
theNdbCon->theReleaseOnClose = true;
return -1;
}
while(m_sent_receivers_count){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){
case 0:
break;
case -1:
setErrorCode(4008);
case -2:
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
return -1;
}
}
if(m_api_receivers_count+m_conf_receivers_count){
// Send close scan
if(send_next_scan(0, true) == -1) // Close scan
return -1;
}
/**
* wait for close scan conf
*/
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){
case 0:
break;
case -1:
setErrorCode(4008);
case -2:
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
return -1;
}
}
/**
* Reset receivers
*/
const Uint32 parallell = theParallelism;
for(Uint32 i = 0; i<parallell; i++){
m_receivers[i]->m_list_index = i;
m_prepared_receivers[i] = m_receivers[i]->getId();
m_sent_receivers[i] = m_receivers[i];
m_conf_receivers[i] = 0;
m_api_receivers[i] = 0;
m_receivers[i]->prepareSend();
}
m_api_receivers_count = 0;
m_current_api_receiver = 0;
m_sent_receivers_count = parallell;
m_conf_receivers_count = 0;
if(m_ordered){
m_current_api_receiver = parallell;
}
if (doSendScan(nodeId) == -1)
return -1;
return 0;
}
......@@ -881,6 +881,93 @@ int runCheckInactivityBeforeClose(NDBT_Context* ctx, NDBT_Step* step){
}
int runScanRestart(NDBT_Context* ctx, NDBT_Step* step){
int loops = ctx->getNumLoops();
int records = ctx->getNumRecords();
Ndb * pNdb = GETNDB(step);
const NdbDictionary::Table* pTab = ctx->getTab();
HugoCalculator calc(* pTab);
NDBT_ResultRow tmpRow(* pTab);
int i = 0;
while (i<loops && !ctx->isTestStopped()) {
g_info << i++ << ": ";
const int record = (rand() % records);
g_info << " row=" << record;
NdbConnection* pCon = pNdb->startTransaction();
NdbScanOperation* pOp = pCon->getNdbScanOperation(pTab->getName());
if (pOp == NULL) {
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
NdbResultSet* rs = pOp->readTuples();
if( rs == 0 ) {
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
int check = pOp->interpret_exit_ok();
if( check == -1 ) {
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
// Define attributes to read
for(int a = 0; a<pTab->getNoOfColumns(); a++){
if((tmpRow.attributeStore(a) =
pOp->getValue(pTab->getColumn(a)->getName())) == 0) {
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
}
check = pCon->execute(NoCommit);
if( check == -1 ) {
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
int res;
int row = 0;
while(row < record && (res = rs->nextResult()) == 0) {
if(calc.verifyRowValues(&tmpRow) != 0){
abort();
return NDBT_FAILED;
}
row++;
}
if(row != record){
ERR(pCon->getNdbError());
abort();
return NDBT_FAILED;
}
g_info << " restarting" << endl;
if((res = rs->restart()) != 0){
ERR(pCon->getNdbError());
abort();
return NDBT_FAILED;
}
row = 0;
while((res = rs->nextResult()) == 0) {
if(calc.verifyRowValues(&tmpRow) != 0){
abort();
return NDBT_FAILED;
}
row++;
}
if(res != 1 || row != records){
ERR(pCon->getNdbError());
abort();
return NDBT_FAILED;
}
pCon->close();
}
return NDBT_OK;
}
NDBT_TESTSUITE(testScan);
......@@ -1304,6 +1391,12 @@ TESTCASE("ScanReadWhileNodeIsDown",
STEP(runStopAndStartNode);
FINALIZER(runClearTable);
}
TESTCASE("ScanRestart",
"Verify restart functionallity"){
INITIALIZER(runLoadTable);
STEP(runScanRestart);
FINALIZER(runClearTable);
}
NDBT_TESTSUITE_END(testScan);
int main(int argc, const char** argv){
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment