NdbScanOperation.cpp 48.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

17
#include <ndb_global.h>
18 19
#include <Ndb.hpp>
#include <NdbScanOperation.hpp>
unknown's avatar
unknown committed
20
#include <NdbIndexScanOperation.hpp>
21
#include <NdbTransaction.hpp>
22 23 24
#include "NdbApiSignal.hpp"
#include <NdbOut.hpp>
#include "NdbDictionaryImpl.hpp"
25
#include <NdbBlob.hpp>
26

unknown's avatar
unknown committed
27 28 29 30 31 32 33 34
#include <NdbRecAttr.hpp>
#include <NdbReceiver.hpp>

#include <stdlib.h>
#include <NdbSqlUtil.hpp>

#include <signaldata/ScanTab.hpp>
#include <signaldata/KeyInfo.hpp>
35
#include <signaldata/AttrInfo.hpp>
unknown's avatar
unknown committed
36 37
#include <signaldata/TcKeyReq.hpp>

unknown's avatar
unknown committed
38 39
#define DEBUG_NEXT_RESULT 0

40
NdbScanOperation::NdbScanOperation(Ndb* aNdb) :
unknown's avatar
unknown committed
41 42
  NdbOperation(aNdb),
  m_transConnection(NULL)
43
{
unknown's avatar
unknown committed
44 45 46 47 48 49 50
  theParallelism = 0;
  m_allocated_receivers = 0;
  m_prepared_receivers = 0;
  m_api_receivers = 0;
  m_conf_receivers = 0;
  m_sent_receivers = 0;
  m_receivers = 0;
unknown's avatar
unknown committed
51
  m_array = new Uint32[1]; // skip if on delete in fix_receivers
unknown's avatar
unknown committed
52
  theSCAN_TABREQ = 0;
53
  m_executed = false;
54 55 56 57
}

NdbScanOperation::~NdbScanOperation()
{
unknown's avatar
unknown committed
58
  for(Uint32 i = 0; i<m_allocated_receivers; i++){
unknown's avatar
unknown committed
59
    m_receivers[i]->release();
unknown's avatar
unknown committed
60 61 62
    theNdb->releaseNdbScanRec(m_receivers[i]);
  }
  delete[] m_array;
63 64 65 66
}

void
NdbScanOperation::setErrorCode(int aErrorCode){
67
  NdbTransaction* tmp = theNdbCon;
68 69 70 71 72 73 74
  theNdbCon = m_transConnection;
  NdbOperation::setErrorCode(aErrorCode);
  theNdbCon = tmp;
}

void
NdbScanOperation::setErrorCodeAbort(int aErrorCode){
75
  NdbTransaction* tmp = theNdbCon;
76 77 78 79 80 81 82 83 84 85 86 87 88 89
  theNdbCon = m_transConnection;
  NdbOperation::setErrorCodeAbort(aErrorCode);
  theNdbCon = tmp;
}

  
/*****************************************************************************
 * int init();
 *
 * Return Value:  Return 0 : init was successful.
 *                Return -1: In all other case.  
 * Remark:        Initiates operation record after allocation.
 *****************************************************************************/
int
90
NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection)
91 92 93
{
  m_transConnection = myConnection;
  //NdbConnection* aScanConnection = theNdb->startTransaction(myConnection);
94
  theNdb->theRemainingStartTransactions++; // will be checked in hupp...
95
  NdbTransaction* aScanConnection = theNdb->hupp(myConnection);
96
  if (!aScanConnection){
97
    theNdb->theRemainingStartTransactions--;
98
    setErrorCodeAbort(theNdb->getNdbError().code);
99
    return -1;
100
  }
101

unknown's avatar
unknown committed
102 103
  // NOTE! The hupped trans becomes the owner of the operation
  if(NdbOperation::init(tab, aScanConnection) != 0){
104
    theNdb->theRemainingStartTransactions--;
105 106
    return -1;
  }
unknown's avatar
unknown committed
107 108 109 110 111
  
  initInterpreter();
  
  theStatus = GetValue;
  theOperationType = OpenScanRequest;
unknown's avatar
unknown committed
112
  theNdbCon->theMagicNumber = 0xFE11DF;
113
  theNoOfTupKeyLeft = tab->m_noOfDistributionKeys;
114
  m_read_range_no = 0;
115
  m_executed = false;
116 117 118
  return 0;
}

119 120
int 
NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
121
			     Uint32 scan_flags, 
122
			     Uint32 parallel)
123
{
124
  m_ordered = m_descending = false;
unknown's avatar
unknown committed
125
  Uint32 fragCount = m_currentTable->m_fragmentCount;
126

unknown's avatar
unknown committed
127
  if (parallel > fragCount || parallel == 0) {
unknown's avatar
unknown committed
128
     parallel = fragCount;
unknown's avatar
unknown committed
129
  }
unknown's avatar
unknown committed
130 131 132 133 134 135 136

  // It is only possible to call openScan if 
  //  1. this transcation don't already  contain another scan operation
  //  2. this transaction don't already contain other operations
  //  3. theScanOp contains a NdbScanOperation
  if (theNdbCon->theScanningOp != NULL){
    setErrorCode(4605);
137
    return -1;
unknown's avatar
unknown committed
138
  }
139

unknown's avatar
unknown committed
140
  theNdbCon->theScanningOp = this;
unknown's avatar
unknown committed
141
  theLockMode = lm;
142

unknown's avatar
unknown committed
143 144 145 146 147 148 149 150 151 152 153 154
  bool lockExcl, lockHoldMode, readCommitted;
  switch(lm){
  case NdbScanOperation::LM_Read:
    lockExcl = false;
    lockHoldMode = true;
    readCommitted = false;
    break;
  case NdbScanOperation::LM_Exclusive:
    lockExcl = true;
    lockHoldMode = true;
    readCommitted = false;
    break;
unknown's avatar
unknown committed
155
  case NdbScanOperation::LM_CommittedRead:
unknown's avatar
unknown committed
156 157 158 159 160 161
    lockExcl = false;
    lockHoldMode = false;
    readCommitted = true;
    break;
  default:
    setErrorCode(4003);
162
    return -1;
unknown's avatar
unknown committed
163
  }
164

unknown's avatar
unknown committed
165
  m_keyInfo = lockExcl ? 1 : 0;
unknown's avatar
unknown committed
166 167 168 169 170 171 172 173 174 175
  bool tupScan = (scan_flags & SF_TupScan);

#if 1 // XXX temp for testing
  { char* p = getenv("NDB_USE_TUPSCAN");
    if (p != 0) {
      unsigned n = atoi(p); // 0-10
      if (::time(0) % 10 < n) tupScan = true;
    }
  }
#endif
unknown's avatar
unknown committed
176

177
  bool rangeScan = false;
178 179
  if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex)
  {
180 181 182 183 184 185 186
    if (m_currentTable == m_accessTable){
      // Old way of scanning indexes, should not be allowed
      m_currentTable = theNdb->theDictionary->
	getTable(m_currentTable->m_primaryTable.c_str());
      assert(m_currentTable != NULL);
    }
    assert (m_currentTable != m_accessTable);
unknown's avatar
unknown committed
187
    // Modify operation state
188
    theStatus = GetValue;
unknown's avatar
unknown committed
189
    theOperationType  = OpenRangeScanRequest;
190 191
    rangeScan = true;
    tupScan = false;
unknown's avatar
unknown committed
192
  }
unknown's avatar
unknown committed
193
  
unknown's avatar
unknown committed
194
  theParallelism = parallel;
195

unknown's avatar
unknown committed
196
  if(fix_receivers(parallel) == -1){
unknown's avatar
unknown committed
197
    setErrorCodeAbort(4000);
198
    return -1;
unknown's avatar
unknown committed
199 200
  }
  
unknown's avatar
unknown committed
201
  theSCAN_TABREQ = (!theSCAN_TABREQ ? theNdb->getSignal() : theSCAN_TABREQ);
unknown's avatar
unknown committed
202 203
  if (theSCAN_TABREQ == NULL) {
    setErrorCodeAbort(4000);
204
    return -1;
unknown's avatar
unknown committed
205 206
  }//if
  
207
  theSCAN_TABREQ->setSignal(GSN_SCAN_TABREQ);
unknown's avatar
unknown committed
208 209
  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
  req->apiConnectPtr = theNdbCon->theTCConPtr;
unknown's avatar
unknown committed
210
  req->tableId = m_accessTable->m_id;
unknown's avatar
unknown committed
211 212 213 214 215
  req->tableSchemaVersion = m_accessTable->m_version;
  req->storedProcId = 0xFFFF;
  req->buddyConPtr = theNdbCon->theBuddyConPtr;
  
  Uint32 reqInfo = 0;
unknown's avatar
unknown committed
216
  ScanTabReq::setParallelism(reqInfo, parallel);
unknown's avatar
unknown committed
217
  ScanTabReq::setScanBatch(reqInfo, 0);
unknown's avatar
unknown committed
218 219 220
  ScanTabReq::setLockMode(reqInfo, lockExcl);
  ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
  ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted);
221 222
  ScanTabReq::setRangeScanFlag(reqInfo, rangeScan);
  ScanTabReq::setTupScanFlag(reqInfo, tupScan);
unknown's avatar
unknown committed
223
  req->requestInfo = reqInfo;
224

unknown's avatar
unknown committed
225 226 227
  Uint64 transId = theNdbCon->getTransactionId();
  req->transId1 = (Uint32) transId;
  req->transId2 = (Uint32) (transId >> 32);
228

229 230 231 232 233
  NdbApiSignal* tSignal = theSCAN_TABREQ->next();
  if(!tSignal)
  {
    theSCAN_TABREQ->next(tSignal = theNdb->getSignal());
  }
unknown's avatar
unknown committed
234
  theLastKEYINFO = tSignal;
235 236 237 238
  
  tSignal->setSignal(GSN_KEYINFO);
  theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
  theTotalNrOfKeyWordInSignal= 0;
239

240
  getFirstATTRINFOScan();
241
  return 0;
242 243
}

unknown's avatar
unknown committed
244
int
unknown's avatar
unknown committed
245 246 247 248 249
NdbScanOperation::fix_receivers(Uint32 parallel){
  assert(parallel > 0);
  if(parallel > m_allocated_receivers){
    const Uint32 sz = parallel * (4*sizeof(char*)+sizeof(Uint32));

250
    Uint64 * tmp = new Uint64[(sz+7)/8];
unknown's avatar
unknown committed
251
    // Save old receivers
252
    memcpy(tmp, m_receivers, m_allocated_receivers*sizeof(char*));
unknown's avatar
unknown committed
253
    delete[] m_array;
254
    m_array = (Uint32*)tmp;
unknown's avatar
unknown committed
255
    
256
    m_receivers = (NdbReceiver**)tmp;
unknown's avatar
unknown committed
257 258 259
    m_api_receivers = m_receivers + parallel;
    m_conf_receivers = m_api_receivers + parallel;
    m_sent_receivers = m_conf_receivers + parallel;
260
    m_prepared_receivers = (Uint32*)(m_sent_receivers + parallel);
unknown's avatar
unknown committed
261

unknown's avatar
unknown committed
262
    // Only get/init "new" receivers
unknown's avatar
unknown committed
263
    NdbReceiver* tScanRec;
unknown's avatar
unknown committed
264
    for (Uint32 i = m_allocated_receivers; i < parallel; i ++) {
unknown's avatar
unknown committed
265 266 267 268 269 270
      tScanRec = theNdb->getNdbScanRec();
      if (tScanRec == NULL) {
	setErrorCodeAbort(4000);
	return -1;
      }//if
      m_receivers[i] = tScanRec;
unknown's avatar
unknown committed
271
      tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this);
unknown's avatar
unknown committed
272
    }
unknown's avatar
unknown committed
273
    m_allocated_receivers = parallel;
unknown's avatar
unknown committed
274
  }
unknown's avatar
unknown committed
275
  
unknown's avatar
unknown committed
276
  reset_receivers(parallel, 0);
277 278 279
  return 0;
}

unknown's avatar
unknown committed
280 281 282 283 284
/**
 * Move receiver from send array to conf:ed array
 */
void
NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
unknown's avatar
unknown committed
285
  if(theError.code == 0){
286 287 288
    if(DEBUG_NEXT_RESULT)
      ndbout_c("receiver_delivered");
    
unknown's avatar
unknown committed
289 290 291 292 293 294 295 296 297 298 299 300 301 302
    Uint32 idx = tRec->m_list_index;
    Uint32 last = m_sent_receivers_count - 1;
    if(idx != last){
      NdbReceiver * move = m_sent_receivers[last];
      m_sent_receivers[idx] = move;
      move->m_list_index = idx;
    }
    m_sent_receivers_count = last;
    
    last = m_conf_receivers_count;
    m_conf_receivers[last] = tRec;
    m_conf_receivers_count = last + 1;
    tRec->m_list_index = last;
    tRec->m_current_row = 0;
unknown's avatar
unknown committed
303
  }
304 305
}

unknown's avatar
unknown committed
306 307 308 309 310
/**
 * Remove receiver as it's completed
 */
void
NdbScanOperation::receiver_completed(NdbReceiver* tRec){
unknown's avatar
unknown committed
311
  if(theError.code == 0){
unknown's avatar
unknown committed
312 313 314
    if(DEBUG_NEXT_RESULT)
      ndbout_c("receiver_completed");
    
unknown's avatar
unknown committed
315 316 317 318 319 320 321 322
    Uint32 idx = tRec->m_list_index;
    Uint32 last = m_sent_receivers_count - 1;
    if(idx != last){
      NdbReceiver * move = m_sent_receivers[last];
      m_sent_receivers[idx] = move;
      move->m_list_index = idx;
    }
    m_sent_receivers_count = last;
unknown's avatar
unknown committed
323
  }
324 325
}

unknown's avatar
unknown committed
326 327 328 329 330 331 332 333 334 335 336 337 338 339
/*****************************************************************************
 * int getFirstATTRINFOScan( U_int32 aData )
 *
 * Return Value:  Return 0:   Successful
 *      	  Return -1:  All other cases
 * Parameters:    None: 	   Only allocate the first signal.
 * Remark:        When a scan is defined we need to use this method instead 
 *                of insertATTRINFO for the first signal. 
 *                This is because we need not to mess up the code in 
 *                insertATTRINFO with if statements since we are not 
 *                interested in the TCKEYREQ signal.
 *****************************************************************************/
int
NdbScanOperation::getFirstATTRINFOScan()
340
{
unknown's avatar
unknown committed
341
  NdbApiSignal* tSignal;
342

unknown's avatar
unknown committed
343 344 345 346 347 348 349 350 351 352 353
  tSignal = theNdb->getSignal();
  if (tSignal == NULL){
    setErrorCodeAbort(4000);      
    return -1;    
  }
  tSignal->setSignal(m_attrInfoGSN);
  theAI_LenInCurrAI = 8;
  theATTRINFOptr = &tSignal->getDataPtrSend()[8];
  theFirstATTRINFO = tSignal;
  theCurrentATTRINFO = tSignal;
  theCurrentATTRINFO->next(NULL);
354 355 356 357

  return 0;
}

unknown's avatar
unknown committed
358 359 360 361 362 363
/**
 * Constats for theTupleKeyDefined[][0]
 */
#define SETBOUND_EQ 1
#define FAKE_PTR 2
#define API_PTR 3
364

unknown's avatar
unknown committed
365
#define WAITFOR_SCAN_TIMEOUT 120000
366

unknown's avatar
unknown committed
367 368
int
NdbScanOperation::executeCursor(int nodeId){
369
  NdbTransaction * tCon = theNdbCon;
370
  TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
unknown's avatar
unknown committed
371
  Guard guard(tp->theMutexPtr);
unknown's avatar
unknown committed
372 373

  Uint32 magic = tCon->theMagicNumber;
unknown's avatar
unknown committed
374
  Uint32 seq = tCon->theNodeSequence;
unknown's avatar
unknown committed
375

unknown's avatar
unknown committed
376 377 378
  if (tp->get_node_alive(nodeId) &&
      (tp->getNodeSequence(nodeId) == seq)) {

unknown's avatar
unknown committed
379 380 381 382
    /**
     * Only call prepareSendScan first time (incase of restarts)
     *   - check with theMagicNumber
     */
unknown's avatar
unknown committed
383
    tCon->theMagicNumber = 0x37412619;
unknown's avatar
unknown committed
384 385 386 387
    if(magic != 0x37412619 && 
       prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1)
      return -1;
    
unknown's avatar
unknown committed
388 389 390 391
    
    if (doSendScan(nodeId) == -1)
      return -1;

392
    m_executed= true; // Mark operation as executed
unknown's avatar
unknown committed
393 394 395 396 397 398 399 400 401 402 403
    return 0;
  } else {
    if (!(tp->get_node_stopping(nodeId) &&
	  (tp->getNodeSequence(nodeId) == seq))){
      TRACE_DEBUG("The node is hard dead when attempting to start a scan");
      setErrorCode(4029);
      tCon->theReleaseOnClose = true;
    } else {
      TRACE_DEBUG("The node is stopping when attempting to start a scan");
      setErrorCode(4030);
    }//if
404
    tCon->theCommitStatus = NdbTransaction::Aborted;
unknown's avatar
unknown committed
405 406
  }//if
  return -1;
407 408
}

409

unknown's avatar
unknown committed
410
int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433
{
  int res;
  if ((res = nextResultImpl(fetchAllowed, forceSend)) == 0) {
    // handle blobs
    NdbBlob* tBlob = theBlobList;
    while (tBlob != 0) {
      if (tBlob->atNextResult() == -1)
        return -1;
      tBlob = tBlob->theNext;
    }
    /*
     * Flush blob part ops on behalf of user because
     * - nextResult is analogous to execute(NoCommit)
     * - user is likely to want blob value before next execute
     */
    if (m_transConnection->executePendingBlobOps() == -1)
      return -1;
    return 0;
  }
  return res;
}

int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
434
{
unknown's avatar
unknown committed
435
  if(m_ordered)
unknown's avatar
unknown committed
436 437
    return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed,
							       forceSend);
unknown's avatar
unknown committed
438 439 440 441 442 443 444
  
  /**
   * Check current receiver
   */
  int retVal = 2;
  Uint32 idx = m_current_api_receiver;
  Uint32 last = m_api_receivers_count;
445
  m_curr_row = 0;
446 447 448

  if(DEBUG_NEXT_RESULT)
    ndbout_c("nextResult(%d) idx=%d last=%d", fetchAllowed, idx, last);
unknown's avatar
unknown committed
449 450 451 452 453 454 455
  
  /**
   * Check next buckets
   */
  for(; idx < last; idx++){
    NdbReceiver* tRec = m_api_receivers[idx];
    if(tRec->nextResult()){
456
      m_curr_row = tRec->copyout(theReceiver);
unknown's avatar
unknown committed
457 458 459 460 461 462 463 464
      retVal = 0;
      break;
    }
  }
    
  /**
   * We have advanced atleast one bucket
   */
unknown's avatar
unknown committed
465
  if(!fetchAllowed || !retVal){
unknown's avatar
unknown committed
466
    m_current_api_receiver = idx;
unknown's avatar
unknown committed
467
    if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
unknown's avatar
unknown committed
468 469
    return retVal;
  }
unknown's avatar
unknown committed
470
  
unknown's avatar
unknown committed
471
  Uint32 nodeId = theNdbCon->theDBnode;
472
  TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
unknown's avatar
unknown committed
473 474 475 476 477 478 479 480
  /*
    The PollGuard has an implicit call of unlock_and_signal through the
    ~PollGuard method. This method is called implicitly by the compiler
    in all places where the object is out of context due to a return,
    break, continue or simply end of statement block
  */
  PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
                       theNdb->theNdbBlockNumber);
481 482 483
  if(theError.code)
    return -1;

unknown's avatar
unknown committed
484
  Uint32 seq = theNdbCon->theNodeSequence;
unknown's avatar
unknown committed
485 486
  if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0)
  {
unknown's avatar
unknown committed
487 488 489 490 491
      
    idx = m_current_api_receiver;
    last = m_api_receivers_count;
      
    do {
unknown's avatar
unknown committed
492 493
      if(theError.code){
	setErrorCode(theError.code);
unknown's avatar
unknown committed
494
	if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
unknown's avatar
unknown committed
495 496 497
	return -1;
      }
      
unknown's avatar
unknown committed
498 499
      Uint32 cnt = m_conf_receivers_count;
      Uint32 sent = m_sent_receivers_count;
500 501 502

      if(DEBUG_NEXT_RESULT)
	ndbout_c("idx=%d last=%d cnt=%d sent=%d", idx, last, cnt, sent);
unknown's avatar
unknown committed
503 504 505 506 507 508 509 510 511 512 513 514
	
      if(cnt > 0){
	/**
	 * Just move completed receivers
	 */
	memcpy(m_api_receivers+last, m_conf_receivers, cnt * sizeof(char*));
	last += cnt;
	m_conf_receivers_count = 0;
      } else if(retVal == 2 && sent > 0){
	/**
	 * No completed...
	 */
unknown's avatar
unknown committed
515 516 517
        int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
                                           forceSend);
	if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
unknown's avatar
unknown committed
518 519 520
	  continue;
	} else {
	  idx = last;
unknown's avatar
unknown committed
521
	  retVal = -2; //return_code;
unknown's avatar
unknown committed
522 523 524 525 526
	}
      } else if(retVal == 2){
	/**
	 * No completed & no sent -> EndOfData
	 */
unknown's avatar
unknown committed
527 528 529
	theError.code = -1; // make sure user gets error if he tries again
	if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
	return 1;
530
      }
unknown's avatar
unknown committed
531 532 533 534 535 536 537
	
      if(retVal == 0)
	break;
	
      for(; idx < last; idx++){
	NdbReceiver* tRec = m_api_receivers[idx];
	if(tRec->nextResult()){
538
	  m_curr_row = tRec->copyout(theReceiver);      
unknown's avatar
unknown committed
539 540 541
	  retVal = 0;
	  break;
	}
542
      }
unknown's avatar
unknown committed
543 544 545
    } while(retVal == 2);
  } else {
    retVal = -3;
546
  }
unknown's avatar
unknown committed
547 548 549 550 551 552 553 554
    
  m_api_receivers_count = last;
  m_current_api_receiver = idx;
    
  switch(retVal){
  case 0:
  case 1:
  case 2:
unknown's avatar
unknown committed
555
    if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
unknown's avatar
unknown committed
556 557 558 559 560 561 562 563
    return retVal;
  case -1:
    setErrorCode(4008); // Timeout
    break;
  case -2:
    setErrorCode(4028); // Node fail
    break;
  case -3: // send_next_scan -> return fail (set error-code self)
unknown's avatar
unknown committed
564 565
    if(theError.code == 0)
      setErrorCode(4028); // seq changed = Node fail
unknown's avatar
unknown committed
566 567 568 569 570
    break;
  }
    
  theNdbCon->theTransactionIsStarted = false;
  theNdbCon->theReleaseOnClose = true;
unknown's avatar
unknown committed
571
  if(DEBUG_NEXT_RESULT) ndbout_c("return -1", retVal);
unknown's avatar
unknown committed
572
  return -1;
573 574
}

unknown's avatar
unknown committed
575
int
unknown's avatar
unknown committed
576 577
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag)
{
unknown's avatar
Merge  
unknown committed
578
  if(cnt > 0){
unknown's avatar
unknown committed
579 580 581 582 583 584 585 586 587 588 589 590 591 592 593
    NdbApiSignal tSignal(theNdb->theMyRef);
    tSignal.setSignal(GSN_SCAN_NEXTREQ);
    
    Uint32* theData = tSignal.getDataPtrSend();
    theData[0] = theNdbCon->theTCConPtr;
    theData[1] = stopScanFlag == true ? 1 : 0;
    Uint64 transId = theNdbCon->theTransactionId;
    theData[2] = transId;
    theData[3] = (Uint32) (transId >> 32);
    
    /**
     * Prepare ops
     */
    Uint32 last = m_sent_receivers_count;
    Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);
unknown's avatar
unknown committed
594
    Uint32 sent = 0;
unknown's avatar
unknown committed
595 596
    for(Uint32 i = 0; i<cnt; i++){
      NdbReceiver * tRec = m_api_receivers[i];
unknown's avatar
unknown committed
597 598 599 600 601 602 603
      if((prep_array[sent] = tRec->m_tcPtrI) != RNIL)
      {
	m_sent_receivers[last+sent] = tRec;
	tRec->m_list_index = last+sent;
	tRec->prepareSend();
	sent++;
      }
unknown's avatar
unknown committed
604
    }
605 606
    memmove(m_api_receivers, m_api_receivers+cnt, 
	    (theParallelism-cnt) * sizeof(char*));
unknown's avatar
unknown committed
607
    
unknown's avatar
unknown committed
608 609 610 611
    int ret = 0;
    if(sent)
    {
      Uint32 nodeId = theNdbCon->theDBnode;
612
      TransporterFacade * tp = theNdb->theImpl->m_transporter_facade;
613
      if(cnt > 21){
unknown's avatar
unknown committed
614 615 616 617 618 619
	tSignal.setLength(4);
	LinearSectionPtr ptr[3];
	ptr[0].p = prep_array;
	ptr[0].sz = sent;
	ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
      } else {
620
	tSignal.setLength(4+sent);
unknown's avatar
unknown committed
621 622
	ret = tp->sendSignal(&tSignal, nodeId);
      }
unknown's avatar
unknown committed
623
    }
unknown's avatar
unknown committed
624
    m_sent_receivers_count = last + sent;
unknown's avatar
unknown committed
625 626
    m_api_receivers_count -= cnt;
    m_current_api_receiver = 0;
unknown's avatar
unknown committed
627
    
unknown's avatar
unknown committed
628
    return ret;
629
  }
unknown's avatar
unknown committed
630
  return 0;
631 632 633 634 635 636
}

int 
NdbScanOperation::prepareSend(Uint32  TC_ConnectPtr, Uint64  TransactionId)
{
  printf("NdbScanOperation::prepareSend\n");
unknown's avatar
unknown committed
637
  abort();
638 639 640 641 642 643 644 645 646 647
  return 0;
}

int 
NdbScanOperation::doSend(int ProcessorId)
{
  printf("NdbScanOperation::doSend\n");
  return 0;
}

648
void NdbScanOperation::close(bool forceSend, bool releaseOp)
649
{
650 651 652 653 654 655
  DBUG_ENTER("NdbScanOperation::close");
  DBUG_PRINT("enter", ("this=%x tcon=%x con=%x force=%d release=%d",
                       (UintPtr)this,
                       (UintPtr)m_transConnection, (UintPtr)theNdbCon,
                       forceSend, releaseOp));

656
  if(m_transConnection){
unknown's avatar
unknown committed
657
    if(DEBUG_NEXT_RESULT)
658
      ndbout_c("close() theError.code = %d "
unknown's avatar
unknown committed
659 660 661 662 663 664 665 666
	       "m_api_receivers_count = %d "
	       "m_conf_receivers_count = %d "
	       "m_sent_receivers_count = %d",
	       theError.code, 
	       m_api_receivers_count,
	       m_conf_receivers_count,
	       m_sent_receivers_count);
    
667
    TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
unknown's avatar
unknown committed
668 669 670 671 672 673 674 675 676
    /*
      The PollGuard has an implicit call of unlock_and_signal through the
      ~PollGuard method. This method is called implicitly by the compiler
      in all places where the object is out of context due to a return,
      break, continue or simply end of statement block
    */
    PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
                         theNdb->theNdbBlockNumber);
    close_impl(tp, forceSend, &poll_guard);
677 678 679 680 681
  }

  NdbConnection* tCon = theNdbCon;
  NdbConnection* tTransCon = m_transConnection;
  theNdbCon = NULL;
unknown's avatar
unknown committed
682
  m_transConnection = NULL;
683 684 685

  if (releaseOp && tTransCon) {
    NdbIndexScanOperation* tOp = (NdbIndexScanOperation*)this;
686
    tTransCon->releaseScanOperation(tOp);
687 688 689 690
  }
  
  tCon->theScanningOp = 0;
  theNdb->closeTransaction(tCon);
691
  theNdb->theRemainingStartTransactions--;
692
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
693
}
694

unknown's avatar
unknown committed
695
void
unknown's avatar
unknown committed
696
NdbScanOperation::execCLOSE_SCAN_REP(){
unknown's avatar
unknown committed
697 698
  m_conf_receivers_count = 0;
  m_sent_receivers_count = 0;
699 700
}

unknown's avatar
unknown committed
701
void NdbScanOperation::release()
702
{
unknown's avatar
unknown committed
703
  if(theNdbCon != 0 || m_transConnection != 0){
704
    close();
unknown's avatar
unknown committed
705 706 707
  }
  for(Uint32 i = 0; i<m_allocated_receivers; i++){
    m_receivers[i]->release();
708
  }
709 710 711

  NdbOperation::release();
  
unknown's avatar
unknown committed
712 713 714 715 716
  if(theSCAN_TABREQ)
  {
    theNdb->releaseSignal(theSCAN_TABREQ);
    theSCAN_TABREQ = 0;
  }
717 718
}

unknown's avatar
unknown committed
719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738
/***************************************************************************
int prepareSendScan(Uint32 aTC_ConnectPtr,
                    Uint64 aTransactionId)

Return Value:   Return 0 : preparation of send was succesful.
                Return -1: In all other case.   
Parameters:     aTC_ConnectPtr: the Connect pointer to TC.
		aTransactionId:	the Transaction identity of the transaction.
Remark:         Puts the the final data into ATTRINFO signal(s)  after this 
                we know the how many signal to send and their sizes
***************************************************************************/
int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
				      Uint64 aTransactionId){

  if (theInterpretIndicator != 1 ||
      (theOperationType != OpenScanRequest &&
       theOperationType != OpenRangeScanRequest)) {
    setErrorCodeAbort(4005);
    return -1;
  }
739

unknown's avatar
unknown committed
740
  theErrorLine = 0;
741

unknown's avatar
unknown committed
742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758
  // In preapareSendInterpreted we set the sizes (word 4-8) in the
  // first ATTRINFO signal.
  if (prepareSendInterpreted() == -1)
    return -1;
  
  if(m_ordered){
    ((NdbIndexScanOperation*)this)->fix_get_values();
  }
  
  theCurrentATTRINFO->setLength(theAI_LenInCurrAI);

  /**
   * Prepare all receivers
   */
  theReceiver.prepareSend();
  bool keyInfo = m_keyInfo;
  Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0;
unknown's avatar
unknown committed
759 760 761 762 763 764 765 766 767 768 769 770 771 772 773
  /**
   * The number of records sent by each LQH is calculated and the kernel
   * is informed of this number by updating the SCAN_TABREQ signal
   */
  Uint32 batch_size, batch_byte_size, first_batch_size;
  theReceiver.calculate_batch_size(key_size,
                                   theParallelism,
                                   batch_size,
                                   batch_byte_size,
                                   first_batch_size);
  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
  ScanTabReq::setScanBatch(req->requestInfo, batch_size);
  req->batch_byte_size= batch_byte_size;
  req->first_batch_size= first_batch_size;

unknown's avatar
unknown committed
774 775 776 777 778 779
  /**
   * Set keyinfo flag
   *  (Always keyinfo when using blobs)
   */
  Uint32 reqInfo = req->requestInfo;
  ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);
unknown's avatar
unknown committed
780
  ScanTabReq::setNoDiskFlag(reqInfo, m_no_disk_flag);
unknown's avatar
unknown committed
781 782
  req->requestInfo = reqInfo;
  
unknown's avatar
unknown committed
783
  for(Uint32 i = 0; i<theParallelism; i++){
784 785 786
    m_receivers[i]->do_get_value(&theReceiver, batch_size, 
				 key_size, 
				 m_read_range_no);
787
  }
unknown's avatar
unknown committed
788
  return 0;
789 790
}

unknown's avatar
unknown committed
791
/*****************************************************************************
unknown's avatar
unknown committed
792 793 794 795 796 797
int doSend()

Return Value:   Return >0 : send was succesful, returns number of signals sent
                Return -1: In all other case.   
Parameters:     aProcessorId: Receiving processor node
Remark:         Sends the ATTRINFO signal(s)
unknown's avatar
unknown committed
798
*****************************************************************************/
unknown's avatar
unknown committed
799 800
int
NdbScanOperation::doSendScan(int aProcessorId)
801
{
unknown's avatar
unknown committed
802 803 804 805 806 807 808 809 810 811 812 813
  Uint32 tSignalCount = 0;
  NdbApiSignal* tSignal;
 
  if (theInterpretIndicator != 1 ||
      (theOperationType != OpenScanRequest &&
       theOperationType != OpenRangeScanRequest)) {
      setErrorCodeAbort(4005);
      return -1;
  }
  
  assert(theSCAN_TABREQ != NULL);
  tSignal = theSCAN_TABREQ;
814 815 816 817 818 819
  
  Uint32 tupKeyLen = theTupKeyLen;
  Uint32 len = theTotalNrOfKeyWordInSignal;
  Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr;
  Uint64 transId = theNdbCon->theTransactionId;
  
unknown's avatar
unknown committed
820 821 822 823
  // Update the "attribute info length in words" in SCAN_TABREQ before 
  // sending it. This could not be done in openScan because 
  // we created the ATTRINFO signals after the SCAN_TABREQ signal.
  ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend());
824
  req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len;
825 826 827
  Uint32 tmp = req->requestInfo;
  ScanTabReq::setDistributionKeyFlag(tmp, theDistrKeyIndicator_);
  req->distributionKey = theDistributionKey;
828
  req->requestInfo = tmp;
829
  tSignal->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_);
830

831
  TransporterFacade *tp = theNdb->theImpl->m_transporter_facade;
unknown's avatar
unknown committed
832 833 834
  LinearSectionPtr ptr[3];
  ptr[0].p = m_prepared_receivers;
  ptr[0].sz = theParallelism;
835
  if (tp->sendSignal(tSignal, aProcessorId, ptr, 1) == -1) {
unknown's avatar
unknown committed
836 837 838
    setErrorCode(4002);
    return -1;
  } 
839 840

  if (tupKeyLen > 0){
unknown's avatar
unknown committed
841
    // must have at least one signal since it contains attrLen for bounds
842 843 844 845
    assert(theLastKEYINFO != NULL);
    tSignal = theLastKEYINFO;
    tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal);
    
846 847
    assert(theSCAN_TABREQ->next() != NULL);
    tSignal = theSCAN_TABREQ->next();
848 849 850 851 852 853 854 855
    
    NdbApiSignal* last;
    do {
      KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
      keyInfo->connectPtr = aTC_ConnectPtr;
      keyInfo->transId[0] = Uint32(transId);
      keyInfo->transId[1] = Uint32(transId >> 32);
      
unknown's avatar
unknown committed
856
      if (tp->sendSignal(tSignal,aProcessorId) == -1){
857 858
	setErrorCode(4002);
	return -1;
unknown's avatar
unknown committed
859
      }
860
      
unknown's avatar
unknown committed
861
      tSignalCount++;
862
      last = tSignal;
unknown's avatar
unknown committed
863
      tSignal = tSignal->next();
864
    } while(last != theLastKEYINFO);
865
  }
unknown's avatar
unknown committed
866 867 868
  
  tSignal = theFirstATTRINFO;
  while (tSignal != NULL) {
869 870 871 872 873
    AttrInfo * attrInfo = CAST_PTR(AttrInfo, tSignal->getDataPtrSend());
    attrInfo->connectPtr = aTC_ConnectPtr;
    attrInfo->transId[0] = Uint32(transId);
    attrInfo->transId[1] = Uint32(transId >> 32);
    
unknown's avatar
unknown committed
874 875 876 877 878 879 880 881
    if (tp->sendSignal(tSignal,aProcessorId) == -1){
      setErrorCode(4002);
      return -1;
    }
    tSignalCount++;
    tSignal = tSignal->next();
  }    
  theStatus = WaitResponse;  
unknown's avatar
unknown committed
882

883
  m_curr_row = 0;
unknown's avatar
unknown committed
884 885 886 887 888 889 890
  m_sent_receivers_count = theParallelism;
  if(m_ordered)
  {
    m_current_api_receiver = theParallelism;
    m_api_receivers_count = theParallelism;
  }
  
unknown's avatar
unknown committed
891 892 893
  return tSignalCount;
}//NdbOperation::doSendScan()

unknown's avatar
unknown committed
894
/*****************************************************************************
895
 * NdbOperation* takeOverScanOp(NdbTransaction* updateTrans);
unknown's avatar
unknown committed
896
 *
897
 * Parameters:     The update transactions NdbTransaction pointer.
unknown's avatar
unknown committed
898 899 900 901 902 903 904 905 906
 * Return Value:   A reference to the transferred operation object 
 *                   or NULL if no success.
 * Remark:         Take over the scanning transactions NdbOperation 
 *                 object for a tuple to an update transaction, 
 *                 which is the last operation read in nextScanResult()
 *		   (theNdbCon->thePreviousScanRec)
 *
 *     FUTURE IMPLEMENTATION:   (This note was moved from header file.)
 *     In the future, it will even be possible to transfer 
907 908
 *     to a NdbTransaction on another Ndb-object.  
 *     In this case the receiving NdbTransaction-object must call 
unknown's avatar
unknown committed
909 910 911 912
 *     a method receiveOpFromScan to actually receive the information.  
 *     This means that the updating transactions can be placed
 *     in separate threads and thus increasing the parallelism during
 *     the scan process. 
unknown's avatar
unknown committed
913
 ****************************************************************************/
unknown's avatar
unknown committed
914
int
915
NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, Uint32 & size)
unknown's avatar
unknown committed
916
{
917 918 919
  NdbRecAttr * tRecAttr = m_curr_row;
  if(tRecAttr)
  {
unknown's avatar
unknown committed
920
    const Uint32 * src = (Uint32*)tRecAttr->aRef();
921 922 923 924 925 926 927 928

    assert(tRecAttr->get_size_in_bytes() > 0);
    assert(tRecAttr->get_size_in_bytes() < 65536);
    const Uint32 len = (tRecAttr->get_size_in_bytes() + 3)/4-1;

    assert(size >= len);
    memcpy(data, src, 4*len);
    size = len;
unknown's avatar
unknown committed
929 930 931 932 933
    return 0;
  }
  return -1;
}

unknown's avatar
unknown committed
934
NdbOperation*
935 936
NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans)
{
unknown's avatar
unknown committed
937
  
938 939 940
  NdbRecAttr * tRecAttr = m_curr_row;
  if(tRecAttr)
  {
unknown's avatar
unknown committed
941 942 943 944
    NdbOperation * newOp = pTrans->getNdbOperation(m_currentTable);
    if (newOp == NULL){
      return NULL;
    }
unknown's avatar
unknown committed
945
    pTrans->theSimpleState = 0;
unknown's avatar
unknown committed
946
    
unknown's avatar
unknown committed
947 948 949 950
    assert(tRecAttr->get_size_in_bytes() > 0);
    assert(tRecAttr->get_size_in_bytes() < 65536);
    const Uint32 len = (tRecAttr->get_size_in_bytes() + 3)/4-1;
    
unknown's avatar
unknown committed
951 952 953 954 955 956 957 958 959
    newOp->theTupKeyLen = len;
    newOp->theOperationType = opType;
    if (opType == DeleteRequest) {
      newOp->theStatus = GetValue;  
    } else {
      newOp->theStatus = SetValue;  
    }
    
    const Uint32 * src = (Uint32*)tRecAttr->aRef();
unknown's avatar
unknown committed
960
    const Uint32 tScanInfo = src[len] & 0x3FFFF;
unknown's avatar
unknown committed
961
    const Uint32 tTakeOverFragment = src[len] >> 20;
unknown's avatar
unknown committed
962 963 964
    {
      UintR scanInfo = 0;
      TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
unknown's avatar
unknown committed
965
      TcKeyReq::setTakeOverScanFragment(scanInfo, tTakeOverFragment);
unknown's avatar
unknown committed
966 967
      TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo);
      newOp->theScanInfo = scanInfo;
unknown's avatar
unknown committed
968 969
      newOp->theDistrKeyIndicator_ = 1;
      newOp->theDistributionKey = tTakeOverFragment;
unknown's avatar
unknown committed
970
    }
971

unknown's avatar
unknown committed
972 973 974 975 976 977 978 979 980
    // Copy the first 8 words of key info from KEYINF20 into TCKEYREQ
    TcKeyReq * tcKeyReq = CAST_PTR(TcKeyReq,newOp->theTCREQ->getDataPtrSend());
    Uint32 i = 0;
    for (i = 0; i < TcKeyReq::MaxKeyInfo && i < len; i++) {
      tcKeyReq->keyInfo[i] = * src++;
    }
    
    if(i < len){
      NdbApiSignal* tSignal = theNdb->getSignal();
981
      newOp->theTCREQ->next(tSignal); 
unknown's avatar
unknown committed
982 983 984 985 986 987 988 989 990 991 992 993
      
      Uint32 left = len - i;
      while(tSignal && left > KeyInfo::DataLength){
	tSignal->setSignal(GSN_KEYINFO);
	KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
	memcpy(keyInfo->keyData, src, 4 * KeyInfo::DataLength);
	src += KeyInfo::DataLength;
	left -= KeyInfo::DataLength;

	tSignal->next(theNdb->getSignal());
	tSignal = tSignal->next();
      }
994

unknown's avatar
unknown committed
995 996 997 998
      if(tSignal && left > 0){
	tSignal->setSignal(GSN_KEYINFO);
	KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
	memcpy(keyInfo->keyData, src, 4 * left);
unknown's avatar
unknown committed
999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009
      }      
    }
    // create blob handles automatically
    if (opType == DeleteRequest && m_currentTable->m_noOfBlobs != 0) {
      for (unsigned i = 0; i < m_currentTable->m_columns.size(); i++) {
	NdbColumnImpl* c = m_currentTable->m_columns[i];
	assert(c != 0);
	if (c->getBlobType()) {
	  if (newOp->getBlobHandle(pTrans, c) == NULL)
	    return NULL;
	}
unknown's avatar
unknown committed
1010 1011
      }
    }
unknown's avatar
unknown committed
1012
    
unknown's avatar
unknown committed
1013
    return newOp;
1014
  }
unknown's avatar
unknown committed
1015
  return 0;
1016 1017
}

unknown's avatar
unknown committed
1018 1019 1020
NdbBlob*
NdbScanOperation::getBlobHandle(const char* anAttrName)
{
unknown's avatar
unknown committed
1021
  m_keyInfo = 1;
unknown's avatar
unknown committed
1022 1023 1024 1025 1026 1027 1028
  return NdbOperation::getBlobHandle(m_transConnection, 
				     m_currentTable->getColumn(anAttrName));
}

NdbBlob*
NdbScanOperation::getBlobHandle(Uint32 anAttrId)
{
unknown's avatar
unknown committed
1029
  m_keyInfo = 1;
unknown's avatar
unknown committed
1030 1031 1032 1033
  return NdbOperation::getBlobHandle(m_transConnection, 
				     m_currentTable->getColumn(anAttrId));
}

unknown's avatar
unknown committed
1034 1035
NdbIndexScanOperation::NdbIndexScanOperation(Ndb* aNdb)
  : NdbScanOperation(aNdb)
1036
{
unknown's avatar
unknown committed
1037
}
1038

unknown's avatar
unknown committed
1039
NdbIndexScanOperation::~NdbIndexScanOperation(){
1040 1041
}

unknown's avatar
unknown committed
1042
int
unknown's avatar
unknown committed
1043
NdbIndexScanOperation::setBound(const char* anAttrName, int type, 
unknown's avatar
unknown committed
1044
				const void* aValue)
1045
{
unknown's avatar
unknown committed
1046
  return setBound(m_accessTable->getColumn(anAttrName), type, aValue);
1047 1048
}

unknown's avatar
unknown committed
1049
int
unknown's avatar
unknown committed
1050
NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, 
unknown's avatar
unknown committed
1051
				const void* aValue)
1052
{
unknown's avatar
unknown committed
1053
  return setBound(m_accessTable->getColumn(anAttrId), type, aValue);
unknown's avatar
unknown committed
1054
}
1055

unknown's avatar
unknown committed
1056 1057
int
NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject, 
unknown's avatar
unknown committed
1058 1059 1060
				  const char* aValue)
{
  return setBound(anAttrObject, BoundEQ, aValue);
1061 1062
}

unknown's avatar
unknown committed
1063 1064 1065
NdbRecAttr*
NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo, 
				     char* aValue){
1066
  if(!m_ordered){
unknown's avatar
unknown committed
1067 1068
    return NdbScanOperation::getValue_impl(attrInfo, aValue);
  }
1069
  
unknown's avatar
unknown committed
1070
  int id = attrInfo->getColumnNo();                // In "real" table
1071 1072 1073 1074 1075 1076 1077
  assert(m_accessTable->m_index);
  int sz = (int)m_accessTable->m_index->m_key_ids.size();
  if(id >= sz || (id = m_accessTable->m_index->m_key_ids[id]) == -1){
    return NdbScanOperation::getValue_impl(attrInfo, aValue);
  }
  
  assert(id < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
unknown's avatar
unknown committed
1078
  Uint32 marker = theTupleKeyDefined[id][0];
1079
  
unknown's avatar
unknown committed
1080 1081 1082 1083
  if(marker == SETBOUND_EQ){
    return NdbScanOperation::getValue_impl(attrInfo, aValue);
  } else if(marker == API_PTR){
    return NdbScanOperation::getValue_impl(attrInfo, aValue);
1084
  }
unknown's avatar
unknown committed
1085
  
1086 1087
  assert(marker == FAKE_PTR);
  
unknown's avatar
unknown committed
1088 1089 1090 1091 1092 1093 1094 1095 1096
  UintPtr oldVal;
  oldVal = theTupleKeyDefined[id][1];
#if (SIZEOF_CHARP == 8)
  oldVal = oldVal | (((UintPtr)theTupleKeyDefined[id][2]) << 32);
#endif
  theTupleKeyDefined[id][0] = API_PTR;

  NdbRecAttr* tmp = (NdbRecAttr*)oldVal;
  tmp->setup(attrInfo, aValue);
1097

unknown's avatar
unknown committed
1098
  return tmp;
1099 1100
}

unknown's avatar
unknown committed
1101 1102 1103 1104 1105 1106
#include <AttributeHeader.hpp>
/*
 * Define bound on index column in range scan.
 */
int
NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, 
unknown's avatar
unknown committed
1107
				int type, const void* aValue)
1108
{
unknown's avatar
unknown committed
1109
  if (theOperationType == OpenRangeScanRequest &&
unknown's avatar
unknown committed
1110
      (0 <= type && type <= 4)) {
1111
    // insert bound type
1112 1113
    Uint32 currLen = theTotalNrOfKeyWordInSignal;
    Uint32 remaining = KeyInfo::DataLength - currLen;
1114
    bool tDistrKey = tAttrInfo->m_distributionKey;
1115

unknown's avatar
unknown committed
1116 1117 1118 1119 1120 1121
    Uint32 len = 0;
    if (aValue != NULL)
      if (! tAttrInfo->get_var_length(aValue, len)) {
        setErrorCodeAbort(4209);
        return -1;
      }
unknown's avatar
unknown committed
1122

1123
    // insert attribute header
unknown's avatar
unknown committed
1124 1125
    Uint32 tIndexAttrId = tAttrInfo->m_attrId;
    Uint32 sizeInWords = (len + 3) / 4;
1126
    AttributeHeader ah(tIndexAttrId, sizeInWords << 2);
1127 1128
    const Uint32 ahValue = ah.m_value;

1129
    const Uint32 align = (UintPtr(aValue) & 7);
unknown's avatar
unknown committed
1130 1131 1132
    const bool aligned = (tDistrKey && type == BoundEQ) ? 
      (align == 0) : (align & 3) == 0;

1133 1134 1135
    const bool nobytes = (len & 0x3) == 0;
    const Uint32 totalLen = 2 + sizeInWords;
    Uint32 tupKeyLen = theTupKeyLen;
1136
    if(remaining > totalLen && aligned && nobytes){
1137 1138 1139 1140 1141 1142 1143
      Uint32 * dst = theKEYINFOptr + currLen;
      * dst ++ = type;
      * dst ++ = ahValue;
      memcpy(dst, aValue, 4 * sizeInWords);
      theTotalNrOfKeyWordInSignal = currLen + totalLen;
    } else {
      if(!aligned || !nobytes){
unknown's avatar
unknown committed
1144
        Uint32 tempData[2000];
1145 1146
	tempData[0] = type;
	tempData[1] = ahValue;
unknown's avatar
unknown committed
1147
	tempData[2 + (len >> 2)] = 0;
1148
        memcpy(tempData+2, aValue, len);
unknown's avatar
unknown committed
1149
	
1150 1151 1152 1153 1154
	insertBOUNDS(tempData, 2+sizeInWords);
      } else {
	Uint32 buf[2] = { type, ahValue };
	insertBOUNDS(buf, 2);
	insertBOUNDS((Uint32*)aValue, sizeInWords);
1155
      }
unknown's avatar
unknown committed
1156
    }
1157
    theTupKeyLen = tupKeyLen + totalLen;
1158

unknown's avatar
unknown committed
1159 1160 1161 1162 1163 1164 1165 1166 1167
    /**
     * Do sorted stuff
     */

    /**
     * The primary keys for an ordered index is defined in the beginning
     * so it's safe to use [tIndexAttrId] 
     * (instead of looping as is NdbOperation::equal_impl)
     */
1168 1169 1170 1171
    if(type == BoundEQ && tDistrKey)
    {
      theNoOfTupKeyLeft--;
      return handle_distribution_key((Uint64*)aValue, sizeInWords);
unknown's avatar
unknown committed
1172 1173 1174 1175 1176
    }
    return 0;
  } else {
    setErrorCodeAbort(4228);    // XXX wrong code
    return -1;
1177 1178 1179
  }
}

1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210
int
NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){
  Uint32 len;
  Uint32 remaining = KeyInfo::DataLength - theTotalNrOfKeyWordInSignal;
  Uint32 * dst = theKEYINFOptr + theTotalNrOfKeyWordInSignal;
  do {
    len = (sz < remaining ? sz : remaining);
    memcpy(dst, data, 4 * len);
    
    if(sz >= remaining){
      NdbApiSignal* tCurr = theLastKEYINFO;
      tCurr->setLength(KeyInfo::MaxSignalLength);
      NdbApiSignal* tSignal = tCurr->next();
      if(tSignal)
	;
      else if((tSignal = theNdb->getSignal()) != 0)
      {
	tCurr->next(tSignal);
	tSignal->setSignal(GSN_KEYINFO);
      } else {
	goto error;
      }
      theLastKEYINFO = tSignal;
      theKEYINFOptr = dst = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
      remaining = KeyInfo::DataLength;
      sz -= len;
      data += len;
    } else {
      len = (KeyInfo::DataLength - remaining) + len;
      break;
    }
unknown's avatar
unknown committed
1211
  } while(true);   
1212 1213 1214 1215 1216 1217 1218 1219
  theTotalNrOfKeyWordInSignal = len;
  return 0;

error:
  setErrorCodeAbort(4228);    // XXX wrong code
  return -1;
}

1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244
Uint32
NdbIndexScanOperation::getKeyFromSCANTABREQ(Uint32* data, Uint32 size)
{
  DBUG_ENTER("NdbIndexScanOperation::getKeyFromSCANTABREQ");
  assert(size >= theTotalNrOfKeyWordInSignal);
  size = theTotalNrOfKeyWordInSignal;
  NdbApiSignal* tSignal = theSCAN_TABREQ->next();
  Uint32 pos = 0;
  while (pos < size) {
    assert(tSignal != NULL);
    Uint32* tData = tSignal->getDataPtrSend();
    Uint32 rem = size - pos;
    if (rem > KeyInfo::DataLength)
      rem = KeyInfo::DataLength;
    Uint32 i = 0;
    while (i < rem) {
      data[pos + i] = tData[KeyInfo::HeaderLength + i];
      i++;
    }
    pos += rem;
  }
  DBUG_DUMP("key", (char*)data, size << 2);
  DBUG_RETURN(size);
}

1245
int
unknown's avatar
unknown committed
1246
NdbIndexScanOperation::readTuples(LockMode lm,
1247 1248 1249 1250 1251 1252 1253 1254
				  Uint32 scan_flags,
				  Uint32 parallel)
{
  const bool order_by = scan_flags & SF_OrderBy;
  const bool order_desc = scan_flags & SF_Descending;
  const bool read_range_no = scan_flags & SF_ReadRangeNo;

  int res = NdbScanOperation::readTuples(lm, scan_flags, 0);
1255
  if(!res && read_range_no)
1256 1257 1258 1259 1260
  {
    m_read_range_no = 1;
    Uint32 word = 0;
    AttributeHeader::init(&word, AttributeHeader::RANGE_NO, 0);
    if(insertATTRINFO(word) == -1)
1261
      res = -1;
1262
  }
1263
  if(!res && order_by){
1264 1265 1266 1267 1268 1269
    m_ordered = true;
    if (order_desc) {
      m_descending = true;
      ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
      ScanTabReq::setDescendingFlag(req->requestInfo, true);
    }
1270 1271
    Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
    m_sort_columns = cnt; // -1 for NDB$NODE
unknown's avatar
unknown committed
1272
    m_current_api_receiver = m_sent_receivers_count;
1273
    m_api_receivers_count = m_sent_receivers_count;
1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286
    
    m_sort_columns = cnt;
    for(Uint32 i = 0; i<cnt; i++){
      const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
      const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
      NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
      UintPtr newVal = UintPtr(tmp);
      theTupleKeyDefined[i][0] = FAKE_PTR;
      theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
#if (SIZEOF_CHARP == 8)
      theTupleKeyDefined[i][2] = (newVal >> 32);
#endif
    }
unknown's avatar
unknown committed
1287
  }
1288 1289 1290
  m_this_bound_start = 0;
  m_first_bound_word = theKEYINFOptr;
  
1291
  return res;
unknown's avatar
unknown committed
1292
}
1293

unknown's avatar
unknown committed
1294 1295 1296 1297 1298 1299
void
NdbIndexScanOperation::fix_get_values(){
  /**
   * Loop through all getValues and set buffer pointer to "API" pointer
   */
  NdbRecAttr * curr = theReceiver.theFirstRecAttr;
1300
  Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
unknown's avatar
unknown committed
1301
  assert(cnt <  NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
1302
  
unknown's avatar
unknown committed
1303 1304
  const NdbIndexImpl * idx = m_accessTable->m_index;
  const NdbTableImpl * tab = m_currentTable;
1305 1306 1307
  for(Uint32 i = 0; i<cnt; i++){
    Uint32 val = theTupleKeyDefined[i][0];
    switch(val){
1308 1309
    case FAKE_PTR:
      curr->setup(curr->m_column, 0);
1310
    case API_PTR:
1311 1312
      curr = curr->next();
      break;
1313 1314
    case SETBOUND_EQ:
      break;
unknown's avatar
unknown committed
1315
#ifdef VM_TRACE
1316 1317
    default:
      abort();
unknown's avatar
unknown committed
1318 1319
#endif
    }
1320 1321 1322
  }
}

unknown's avatar
unknown committed
1323 1324 1325 1326 1327 1328 1329
int
NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, 
			       const NdbReceiver* t1, 
			       const NdbReceiver* t2){

  NdbRecAttr * r1 = t1->m_rows[t1->m_current_row];
  NdbRecAttr * r2 = t2->m_rows[t2->m_current_row];
1330

unknown's avatar
unknown committed
1331 1332
  r1 = (skip ? r1->next() : r1);
  r2 = (skip ? r2->next() : r2);
1333 1334
  const int jdir = 1 - 2 * (int)m_descending;
  assert(jdir == 1 || jdir == -1);
unknown's avatar
unknown committed
1335 1336 1337 1338 1339
  while(cols > 0){
    Uint32 * d1 = (Uint32*)r1->aRef();
    Uint32 * d2 = (Uint32*)r2->aRef();
    unsigned r1_null = r1->isNULL();
    if((r1_null ^ (unsigned)r2->isNULL())){
1340
      return (r1_null ? -1 : 1) * jdir;
unknown's avatar
unknown committed
1341
    }
1342
    const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);
unknown's avatar
unknown committed
1343 1344
    Uint32 len1 = r1->get_size_in_bytes();
    Uint32 len2 = r2->get_size_in_bytes();
unknown's avatar
unknown committed
1345
    if(!r1_null){
1346
      const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_type);
unknown's avatar
unknown committed
1347
      int r = (*sqlType.m_cmp)(col.m_cs, d1, len1, d2, len2, true);
unknown's avatar
unknown committed
1348 1349
      if(r){
	assert(r != NdbSqlUtil::CmpUnknown);
1350
	return r * jdir;
unknown's avatar
unknown committed
1351 1352 1353 1354 1355
      }
    }
    cols--;
    r1 = r1->next();
    r2 = r2->next();
1356
  }
unknown's avatar
unknown committed
1357
  return 0;
1358 1359
}

unknown's avatar
unknown committed
1360
int
unknown's avatar
unknown committed
1361 1362
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
					   bool forceSend){
unknown's avatar
unknown committed
1363
  
1364
  m_curr_row = 0;
unknown's avatar
unknown committed
1365
  Uint32 u_idx = 0, u_last = 0;
1366
  Uint32 s_idx   = m_current_api_receiver; // first sorted
unknown's avatar
unknown committed
1367 1368 1369
  Uint32 s_last  = theParallelism;         // last sorted

  NdbReceiver** arr = m_api_receivers;
1370
  NdbReceiver* tRec = arr[s_idx];
unknown's avatar
unknown committed
1371 1372 1373
  
  if(DEBUG_NEXT_RESULT) ndbout_c("nextOrderedResult(%d) nextResult: %d",
				 fetchAllowed, 
1374
				 (s_idx < s_last ? tRec->nextResult() : 0));
unknown's avatar
unknown committed
1375 1376 1377 1378
  
  if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 
				 u_idx, u_last,
				 s_idx, s_last);
1379 1380 1381
  
  bool fetchNeeded = (s_idx == s_last) || !tRec->nextResult();
  
unknown's avatar
unknown committed
1382 1383 1384
  if(fetchNeeded){
    if(fetchAllowed){
      if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
1385
      TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
unknown's avatar
unknown committed
1386 1387 1388 1389 1390 1391 1392 1393
      /*
        The PollGuard has an implicit call of unlock_and_signal through the
        ~PollGuard method. This method is called implicitly by the compiler
        in all places where the object is out of context due to a return,
        break, continue or simply end of statement block
      */
      PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
                           theNdb->theNdbBlockNumber);
1394 1395
      if(theError.code)
	return -1;
unknown's avatar
unknown committed
1396 1397
      Uint32 seq = theNdbCon->theNodeSequence;
      Uint32 nodeId = theNdbCon->theDBnode;
unknown's avatar
unknown committed
1398
      if(seq == tp->getNodeSequence(nodeId) &&
unknown's avatar
unknown committed
1399
	 !send_next_scan_ordered(s_idx)){
unknown's avatar
unknown committed
1400
	Uint32 tmp = m_sent_receivers_count;
1401
	s_idx = m_current_api_receiver; 
unknown's avatar
unknown committed
1402
	while(m_sent_receivers_count > 0 && !theError.code){
unknown's avatar
unknown committed
1403 1404 1405
          int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
                                             forceSend);
	  if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
unknown's avatar
unknown committed
1406 1407
	    continue;
	  }
1408
	  if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
1409 1410 1411 1412 1413 1414 1415
	  setErrorCode(4028);
	  return -1;
	}
	
	if(theError.code){
	  setErrorCode(theError.code);
	  if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
unknown's avatar
unknown committed
1416 1417 1418 1419 1420 1421 1422 1423 1424
	  return -1;
	}
	
	u_idx = 0;
	u_last = m_conf_receivers_count;
	m_conf_receivers_count = 0;
	memcpy(arr, m_conf_receivers, u_last * sizeof(char*));
	
	if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last);
1425 1426 1427
      } else {
	setErrorCode(4028);
	return -1;
unknown's avatar
unknown committed
1428 1429
      }
    } else {
1430
      if(DEBUG_NEXT_RESULT) ndbout_c("return 2");
unknown's avatar
unknown committed
1431 1432
      return 2;
    }
1433 1434 1435 1436
  } else {
    u_idx = s_idx;
    u_last = s_idx + 1;
    s_idx++;
unknown's avatar
unknown committed
1437
  }
unknown's avatar
unknown committed
1438
  
unknown's avatar
unknown committed
1439 1440 1441 1442 1443
  if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 
				 u_idx, u_last,
				 s_idx, s_last);


1444
  Uint32 cols = m_sort_columns + m_read_range_no;
unknown's avatar
unknown committed
1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466
  Uint32 skip = m_keyInfo;
  while(u_idx < u_last){
    u_last--;
    tRec = arr[u_last];
    
    // Do binary search instead to find place
    Uint32 place = s_idx;
    for(; place < s_last; place++){
      if(compare(skip, cols, tRec, arr[place]) <= 0){
	break;
      }
    }
    
    if(place != s_idx){
      if(DEBUG_NEXT_RESULT) 
	ndbout_c("memmove(%d, %d, %d)", s_idx-1, s_idx, (place - s_idx));
      memmove(arr+s_idx-1, arr+s_idx, sizeof(char*)*(place - s_idx));
    }
    
    if(DEBUG_NEXT_RESULT) ndbout_c("putting %d @ %d", u_last, place - 1);
    m_api_receivers[place-1] = tRec;
    s_idx--;
1467 1468
  }

unknown's avatar
unknown committed
1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480
  if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 
				 u_idx, u_last,
				 s_idx, s_last);
  
  m_current_api_receiver = s_idx;
  
  if(DEBUG_NEXT_RESULT)
    for(Uint32 i = s_idx; i<s_last; i++)
      ndbout_c("%p", arr[i]);
  
  tRec = m_api_receivers[s_idx];    
  if(s_idx < s_last && tRec->nextResult()){
1481
    m_curr_row = tRec->copyout(theReceiver);      
1482
    if(DEBUG_NEXT_RESULT) ndbout_c("return 0");
unknown's avatar
unknown committed
1483
    return 0;
1484 1485
  }

unknown's avatar
unknown committed
1486 1487 1488
  theError.code = -1;
  if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
  return 1;
1489 1490
}

1491
int
unknown's avatar
unknown committed
1492 1493
NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx)
{
unknown's avatar
unknown committed
1494 1495 1496
  if(idx == theParallelism)
    return 0;
  
unknown's avatar
unknown committed
1497
  NdbReceiver* tRec = m_api_receivers[idx];
unknown's avatar
unknown committed
1498 1499 1500
  NdbApiSignal tSignal(theNdb->theMyRef);
  tSignal.setSignal(GSN_SCAN_NEXTREQ);
  
unknown's avatar
unknown committed
1501
  Uint32 last = m_sent_receivers_count;
unknown's avatar
unknown committed
1502
  Uint32* theData = tSignal.getDataPtrSend();
unknown's avatar
unknown committed
1503 1504 1505 1506 1507 1508 1509 1510 1511 1512
  Uint32* prep_array = theData + 4;
  
  m_current_api_receiver = idx + 1;
  if((prep_array[0] = tRec->m_tcPtrI) == RNIL)
  {
    if(DEBUG_NEXT_RESULT)
      ndbout_c("receiver completed, don't send");
    return 0;
  }
  
unknown's avatar
unknown committed
1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525
  theData[0] = theNdbCon->theTCConPtr;
  theData[1] = 0;
  Uint64 transId = theNdbCon->theTransactionId;
  theData[2] = transId;
  theData[3] = (Uint32) (transId >> 32);
  
  /**
   * Prepare ops
   */
  m_sent_receivers[last] = tRec;
  tRec->m_list_index = last;
  tRec->prepareSend();
  m_sent_receivers_count = last + 1;
1526
  
unknown's avatar
unknown committed
1527
  Uint32 nodeId = theNdbCon->theDBnode;
1528
  TransporterFacade * tp = theNdb->theImpl->m_transporter_facade;
unknown's avatar
unknown committed
1529
  tSignal.setLength(4+1);
unknown's avatar
unknown committed
1530 1531
  int ret= tp->sendSignal(&tSignal, nodeId);
  return ret;
1532
}
unknown's avatar
unknown committed
1533 1534

int
unknown's avatar
unknown committed
1535 1536 1537
NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend,
                             PollGuard *poll_guard)
{
unknown's avatar
unknown committed
1538 1539 1540
  Uint32 seq = theNdbCon->theNodeSequence;
  Uint32 nodeId = theNdbCon->theDBnode;
  
unknown's avatar
unknown committed
1541 1542
  if(seq != tp->getNodeSequence(nodeId))
  {
unknown's avatar
unknown committed
1543 1544 1545
    theNdbCon->theReleaseOnClose = true;
    return -1;
  }
1546
  
unknown's avatar
unknown committed
1547 1548 1549 1550 1551
  /**
   * Wait for outstanding
   */
  while(theError.code == 0 && m_sent_receivers_count)
  {
unknown's avatar
unknown committed
1552 1553
    int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
                                           false);
unknown's avatar
unknown committed
1554 1555 1556 1557 1558 1559 1560 1561 1562
    switch(return_code){
    case 0:
      break;
    case -1:
      setErrorCode(4008);
    case -2:
      m_api_receivers_count = 0;
      m_conf_receivers_count = 0;
      m_sent_receivers_count = 0;
1563
      theNdbCon->theReleaseOnClose = true;
unknown's avatar
unknown committed
1564 1565 1566 1567
      return -1;
    }
  }

1568 1569 1570 1571 1572 1573 1574
  if(theError.code)
  {
    m_api_receivers_count = 0;
    m_current_api_receiver = m_ordered ? theParallelism : 0;
  }


unknown's avatar
unknown committed
1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609
  /**
   * move all conf'ed into api
   *   so that send_next_scan can check if they needs to be closed
   */
  Uint32 api = m_api_receivers_count;
  Uint32 conf = m_conf_receivers_count;

  if(m_ordered)
  {
    /**
     * Ordered scan, keep the m_api_receivers "to the right"
     */
    memmove(m_api_receivers, m_api_receivers+m_current_api_receiver, 
	    (theParallelism - m_current_api_receiver) * sizeof(char*));
    api = (theParallelism - m_current_api_receiver);
    m_api_receivers_count = api;
  }
  
  if(DEBUG_NEXT_RESULT)
    ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d",
	     m_ordered, api, conf, 
	     m_sent_receivers_count, m_current_api_receiver, theParallelism);
  
  if(api+conf)
  {
    /**
     * There's something to close
     *   setup m_api_receivers (for send_next_scan)
     */
    memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*));
    m_api_receivers_count = api + conf;
    m_conf_receivers_count = 0;
  }
  
  // Send close scan
unknown's avatar
unknown committed
1610
  if(send_next_scan(api+conf, true) == -1)
unknown's avatar
unknown committed
1611 1612 1613
  {
    theNdbCon->theReleaseOnClose = true;
    return -1;
unknown's avatar
unknown committed
1614 1615 1616 1617 1618
  }
  
  /**
   * wait for close scan conf
   */
unknown's avatar
unknown committed
1619 1620
  while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
  {
unknown's avatar
unknown committed
1621 1622
    int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
                                           forceSend);
unknown's avatar
unknown committed
1623 1624 1625 1626 1627 1628 1629 1630 1631
    switch(return_code){
    case 0:
      break;
    case -1:
      setErrorCode(4008);
    case -2:
      m_api_receivers_count = 0;
      m_conf_receivers_count = 0;
      m_sent_receivers_count = 0;
1632
      theNdbCon->theReleaseOnClose = true;
unknown's avatar
unknown committed
1633 1634 1635
      return -1;
    }
  }
unknown's avatar
unknown committed
1636
  
1637 1638
  return 0;
}
unknown's avatar
unknown committed
1639

1640 1641
void
NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
unknown's avatar
unknown committed
1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652
  for(Uint32 i = 0; i<parallell; i++){
    m_receivers[i]->m_list_index = i;
    m_prepared_receivers[i] = m_receivers[i]->getId();
    m_sent_receivers[i] = m_receivers[i];
    m_conf_receivers[i] = 0;
    m_api_receivers[i] = 0;
    m_receivers[i]->prepareSend();
  }
  
  m_api_receivers_count = 0;
  m_current_api_receiver = 0;
unknown's avatar
unknown committed
1653
  m_sent_receivers_count = 0;
unknown's avatar
unknown committed
1654
  m_conf_receivers_count = 0;
1655 1656 1657
}

int
unknown's avatar
unknown committed
1658
NdbScanOperation::restart(bool forceSend)
1659 1660
{
  
1661
  TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
unknown's avatar
unknown committed
1662 1663 1664 1665 1666 1667 1668 1669
  /*
    The PollGuard has an implicit call of unlock_and_signal through the
    ~PollGuard method. This method is called implicitly by the compiler
    in all places where the object is out of context due to a return,
    break, continue or simply end of statement block
  */
  PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
                       theNdb->theNdbBlockNumber);
1670 1671 1672 1673
  Uint32 nodeId = theNdbCon->theDBnode;
  
  {
    int res;
unknown's avatar
unknown committed
1674
    if((res= close_impl(tp, forceSend, &poll_guard)))
1675 1676 1677 1678 1679 1680 1681 1682 1683
    {
      return res;
    }
  }
  
  /**
   * Reset receivers
   */
  reset_receivers(theParallelism, m_ordered);
unknown's avatar
unknown committed
1684
  
unknown's avatar
unknown committed
1685
  theError.code = 0;
unknown's avatar
unknown committed
1686 1687 1688 1689
  if (doSendScan(nodeId) == -1)
    return -1;
  return 0;
}
1690 1691

int
unknown's avatar
unknown committed
1692
NdbIndexScanOperation::reset_bounds(bool forceSend){
1693 1694 1695
  int res;
  
  {
1696
    TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
unknown's avatar
unknown committed
1697 1698 1699 1700 1701 1702 1703 1704 1705
    /*
      The PollGuard has an implicit call of unlock_and_signal through the
      ~PollGuard method. This method is called implicitly by the compiler
      in all places where the object is out of context due to a return,
      break, continue or simply end of statement block
    */
    PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
                         theNdb->theNdbBlockNumber);
    res= close_impl(tp, forceSend, &poll_guard);
1706 1707 1708 1709
  }

  if(!res)
  {
unknown's avatar
unknown committed
1710
    theError.code = 0;
1711 1712
    reset_receivers(theParallelism, m_ordered);
    
1713 1714
    theLastKEYINFO = theSCAN_TABREQ->next();
    theKEYINFOptr = ((KeyInfo*)theLastKEYINFO->getDataPtrSend())->keyData;
unknown's avatar
unknown committed
1715 1716
    theTupKeyLen = 0;
    theTotalNrOfKeyWordInSignal = 0;
1717 1718
    theNoOfTupKeyLeft = m_accessTable->m_noOfDistributionKeys;
    theDistrKeyIndicator_ = 0;
1719 1720
    m_this_bound_start = 0;
    m_first_bound_word = theKEYINFOptr;
unknown's avatar
unknown committed
1721
    m_transConnection
unknown's avatar
unknown committed
1722
      ->remove_list((NdbOperation*&)m_transConnection->m_firstExecutedScanOp,
unknown's avatar
unknown committed
1723
		    this);
1724 1725 1726 1727 1728
    m_transConnection->define_scan_op(this);
    return 0;
  }
  return res;
}
1729 1730

int
1731
NdbIndexScanOperation::end_of_bound(Uint32 no)
1732
{
1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745
  if(no < (1 << 13)) // Only 12-bits no of ranges
  {
    Uint32 bound_head = * m_first_bound_word;
    bound_head |= (theTupKeyLen - m_this_bound_start) << 16 | (no << 4);
    * m_first_bound_word = bound_head;
    
    m_first_bound_word = theKEYINFOptr + theTotalNrOfKeyWordInSignal;;
    m_this_bound_start = theTupKeyLen;
    return 0;
  }
  return -1;
}

1746
int
1747 1748
NdbIndexScanOperation::get_range_no()
{
1749 1750
  NdbRecAttr* tRecAttr = m_curr_row;
  if(m_read_range_no && tRecAttr)
1751
  {
1752 1753 1754 1755
    if(m_keyInfo)
      tRecAttr = tRecAttr->next();
    Uint32 ret = *(Uint32*)tRecAttr->aRef();
    return ret;
1756
  }
1757
  return -1;
1758
}