NdbScanOperation.cpp 46.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

17
#include <ndb_global.h>
18 19
#include <Ndb.hpp>
#include <NdbScanOperation.hpp>
unknown's avatar
unknown committed
20
#include <NdbIndexScanOperation.hpp>
21
#include <NdbTransaction.hpp>
22 23 24
#include "NdbApiSignal.hpp"
#include <NdbOut.hpp>
#include "NdbDictionaryImpl.hpp"
25
#include <NdbBlob.hpp>
26

unknown's avatar
unknown committed
27 28 29 30 31 32 33 34
#include <NdbRecAttr.hpp>
#include <NdbReceiver.hpp>

#include <stdlib.h>
#include <NdbSqlUtil.hpp>

#include <signaldata/ScanTab.hpp>
#include <signaldata/KeyInfo.hpp>
35
#include <signaldata/AttrInfo.hpp>
unknown's avatar
unknown committed
36 37
#include <signaldata/TcKeyReq.hpp>

unknown's avatar
unknown committed
38 39
#define DEBUG_NEXT_RESULT 0

unknown's avatar
unknown committed
40 41
NdbScanOperation::NdbScanOperation(Ndb* aNdb, NdbOperation::Type aType) :
  NdbOperation(aNdb, aType),
unknown's avatar
unknown committed
42
  m_transConnection(NULL)
43
{
unknown's avatar
unknown committed
44 45 46 47 48 49 50
  theParallelism = 0;
  m_allocated_receivers = 0;
  m_prepared_receivers = 0;
  m_api_receivers = 0;
  m_conf_receivers = 0;
  m_sent_receivers = 0;
  m_receivers = 0;
unknown's avatar
unknown committed
51
  m_array = new Uint32[1]; // skip if on delete in fix_receivers
unknown's avatar
unknown committed
52
  theSCAN_TABREQ = 0;
53 54 55 56
}

NdbScanOperation::~NdbScanOperation()
{
unknown's avatar
unknown committed
57
  for(Uint32 i = 0; i<m_allocated_receivers; i++){
unknown's avatar
unknown committed
58
    m_receivers[i]->release();
unknown's avatar
unknown committed
59 60 61
    theNdb->releaseNdbScanRec(m_receivers[i]);
  }
  delete[] m_array;
62 63 64 65
}

void
NdbScanOperation::setErrorCode(int aErrorCode){
66
  NdbTransaction* tmp = theNdbCon;
67 68 69 70 71 72 73
  theNdbCon = m_transConnection;
  NdbOperation::setErrorCode(aErrorCode);
  theNdbCon = tmp;
}

void
NdbScanOperation::setErrorCodeAbort(int aErrorCode){
74
  NdbTransaction* tmp = theNdbCon;
75 76 77 78 79 80 81 82 83 84 85 86 87 88
  theNdbCon = m_transConnection;
  NdbOperation::setErrorCodeAbort(aErrorCode);
  theNdbCon = tmp;
}

  
/*****************************************************************************
 * int init();
 *
 * Return Value:  Return 0 : init was successful.
 *                Return -1: In all other case.  
 * Remark:        Initiates operation record after allocation.
 *****************************************************************************/
int
89
NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection)
90 91 92
{
  m_transConnection = myConnection;
  //NdbConnection* aScanConnection = theNdb->startTransaction(myConnection);
93
  theNdb->theRemainingStartTransactions++; // will be checked in hupp...
94
  NdbTransaction* aScanConnection = theNdb->hupp(myConnection);
95
  if (!aScanConnection){
96
    theNdb->theRemainingStartTransactions--;
97
    setErrorCodeAbort(theNdb->getNdbError().code);
98
    return -1;
99
  }
100

unknown's avatar
unknown committed
101 102
  // NOTE! The hupped trans becomes the owner of the operation
  if(NdbOperation::init(tab, aScanConnection) != 0){
103
    theNdb->theRemainingStartTransactions--;
104 105
    return -1;
  }
unknown's avatar
unknown committed
106 107 108 109 110
  
  initInterpreter();
  
  theStatus = GetValue;
  theOperationType = OpenScanRequest;
unknown's avatar
unknown committed
111
  theNdbCon->theMagicNumber = 0xFE11DF;
112
  theNoOfTupKeyLeft = tab->m_noOfDistributionKeys;
113
  m_read_range_no = 0;
114 115 116
  return 0;
}

117 118
int 
NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
119
			     Uint32 scan_flags, 
120
			     Uint32 parallel)
121
{
122
  m_ordered = m_descending = false;
unknown's avatar
unknown committed
123
  Uint32 fragCount = m_currentTable->m_fragmentCount;
124

unknown's avatar
unknown committed
125
  if (parallel > fragCount || parallel == 0) {
unknown's avatar
unknown committed
126
     parallel = fragCount;
unknown's avatar
unknown committed
127
  }
unknown's avatar
unknown committed
128 129 130 131 132 133 134

  // It is only possible to call openScan if 
  //  1. this transcation don't already  contain another scan operation
  //  2. this transaction don't already contain other operations
  //  3. theScanOp contains a NdbScanOperation
  if (theNdbCon->theScanningOp != NULL){
    setErrorCode(4605);
135
    return -1;
unknown's avatar
unknown committed
136
  }
137

unknown's avatar
unknown committed
138
  theNdbCon->theScanningOp = this;
unknown's avatar
unknown committed
139
  theLockMode = lm;
140

unknown's avatar
unknown committed
141 142 143 144 145 146 147 148 149 150 151 152
  bool lockExcl, lockHoldMode, readCommitted;
  switch(lm){
  case NdbScanOperation::LM_Read:
    lockExcl = false;
    lockHoldMode = true;
    readCommitted = false;
    break;
  case NdbScanOperation::LM_Exclusive:
    lockExcl = true;
    lockHoldMode = true;
    readCommitted = false;
    break;
unknown's avatar
unknown committed
153
  case NdbScanOperation::LM_CommittedRead:
unknown's avatar
unknown committed
154 155 156 157 158 159
    lockExcl = false;
    lockHoldMode = false;
    readCommitted = true;
    break;
  default:
    setErrorCode(4003);
160
    return -1;
unknown's avatar
unknown committed
161
  }
162

163
  m_keyInfo = ((scan_flags & SF_KeyInfo) || lockExcl) ? 1 : 0;
unknown's avatar
unknown committed
164

165
  bool rangeScan = false;
166 167
  if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex)
  {
168 169 170 171 172 173 174
    if (m_currentTable == m_accessTable){
      // Old way of scanning indexes, should not be allowed
      m_currentTable = theNdb->theDictionary->
	getTable(m_currentTable->m_primaryTable.c_str());
      assert(m_currentTable != NULL);
    }
    assert (m_currentTable != m_accessTable);
unknown's avatar
unknown committed
175
    // Modify operation state
176
    theStatus = GetValue;
unknown's avatar
unknown committed
177
    theOperationType  = OpenRangeScanRequest;
178
    rangeScan = true;
unknown's avatar
unknown committed
179
  }
180 181 182 183

  bool tupScan = (scan_flags & SF_TupScan);
  if (tupScan && rangeScan)
    tupScan = false;
unknown's avatar
unknown committed
184
  
unknown's avatar
unknown committed
185
  theParallelism = parallel;
186

unknown's avatar
unknown committed
187
  if(fix_receivers(parallel) == -1){
unknown's avatar
unknown committed
188
    setErrorCodeAbort(4000);
189
    return -1;
unknown's avatar
unknown committed
190 191
  }
  
unknown's avatar
unknown committed
192
  theSCAN_TABREQ = (!theSCAN_TABREQ ? theNdb->getSignal() : theSCAN_TABREQ);
unknown's avatar
unknown committed
193 194
  if (theSCAN_TABREQ == NULL) {
    setErrorCodeAbort(4000);
195
    return -1;
unknown's avatar
unknown committed
196 197
  }//if
  
198
  theSCAN_TABREQ->setSignal(GSN_SCAN_TABREQ);
unknown's avatar
unknown committed
199 200 201 202 203 204 205 206
  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
  req->apiConnectPtr = theNdbCon->theTCConPtr;
  req->tableId = m_accessTable->m_tableId;
  req->tableSchemaVersion = m_accessTable->m_version;
  req->storedProcId = 0xFFFF;
  req->buddyConPtr = theNdbCon->theBuddyConPtr;
  
  Uint32 reqInfo = 0;
unknown's avatar
unknown committed
207
  ScanTabReq::setParallelism(reqInfo, parallel);
unknown's avatar
unknown committed
208
  ScanTabReq::setScanBatch(reqInfo, 0);
unknown's avatar
unknown committed
209 210 211
  ScanTabReq::setLockMode(reqInfo, lockExcl);
  ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
  ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted);
212 213
  ScanTabReq::setRangeScanFlag(reqInfo, rangeScan);
  ScanTabReq::setTupScanFlag(reqInfo, tupScan);
unknown's avatar
unknown committed
214
  req->requestInfo = reqInfo;
215

unknown's avatar
unknown committed
216 217 218
  Uint64 transId = theNdbCon->getTransactionId();
  req->transId1 = (Uint32) transId;
  req->transId2 = (Uint32) (transId >> 32);
219

220 221 222 223 224
  NdbApiSignal* tSignal = theSCAN_TABREQ->next();
  if(!tSignal)
  {
    theSCAN_TABREQ->next(tSignal = theNdb->getSignal());
  }
unknown's avatar
unknown committed
225
  theLastKEYINFO = tSignal;
226 227 228 229
  
  tSignal->setSignal(GSN_KEYINFO);
  theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
  theTotalNrOfKeyWordInSignal= 0;
230

231
  getFirstATTRINFOScan();
232
  return 0;
233 234
}

unknown's avatar
unknown committed
235
int
unknown's avatar
unknown committed
236 237 238 239 240
NdbScanOperation::fix_receivers(Uint32 parallel){
  assert(parallel > 0);
  if(parallel > m_allocated_receivers){
    const Uint32 sz = parallel * (4*sizeof(char*)+sizeof(Uint32));

241
    Uint64 * tmp = new Uint64[(sz+7)/8];
unknown's avatar
unknown committed
242
    // Save old receivers
243
    memcpy(tmp, m_receivers, m_allocated_receivers*sizeof(char*));
unknown's avatar
unknown committed
244
    delete[] m_array;
245
    m_array = (Uint32*)tmp;
unknown's avatar
unknown committed
246
    
247
    m_receivers = (NdbReceiver**)tmp;
unknown's avatar
unknown committed
248 249 250
    m_api_receivers = m_receivers + parallel;
    m_conf_receivers = m_api_receivers + parallel;
    m_sent_receivers = m_conf_receivers + parallel;
251
    m_prepared_receivers = (Uint32*)(m_sent_receivers + parallel);
unknown's avatar
unknown committed
252

unknown's avatar
unknown committed
253
    // Only get/init "new" receivers
unknown's avatar
unknown committed
254
    NdbReceiver* tScanRec;
unknown's avatar
unknown committed
255
    for (Uint32 i = m_allocated_receivers; i < parallel; i ++) {
unknown's avatar
unknown committed
256 257 258 259 260 261
      tScanRec = theNdb->getNdbScanRec();
      if (tScanRec == NULL) {
	setErrorCodeAbort(4000);
	return -1;
      }//if
      m_receivers[i] = tScanRec;
unknown's avatar
unknown committed
262
      tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this);
unknown's avatar
unknown committed
263
    }
unknown's avatar
unknown committed
264
    m_allocated_receivers = parallel;
unknown's avatar
unknown committed
265
  }
unknown's avatar
unknown committed
266
  
unknown's avatar
unknown committed
267
  reset_receivers(parallel, 0);
268 269 270
  return 0;
}

unknown's avatar
unknown committed
271 272 273 274 275
/**
 * Move receiver from send array to conf:ed array
 */
void
NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
unknown's avatar
unknown committed
276
  if(theError.code == 0){
277 278 279
    if(DEBUG_NEXT_RESULT)
      ndbout_c("receiver_delivered");
    
unknown's avatar
unknown committed
280 281 282 283 284 285 286 287 288 289 290 291 292 293
    Uint32 idx = tRec->m_list_index;
    Uint32 last = m_sent_receivers_count - 1;
    if(idx != last){
      NdbReceiver * move = m_sent_receivers[last];
      m_sent_receivers[idx] = move;
      move->m_list_index = idx;
    }
    m_sent_receivers_count = last;
    
    last = m_conf_receivers_count;
    m_conf_receivers[last] = tRec;
    m_conf_receivers_count = last + 1;
    tRec->m_list_index = last;
    tRec->m_current_row = 0;
unknown's avatar
unknown committed
294
  }
295 296
}

unknown's avatar
unknown committed
297 298 299 300 301
/**
 * Remove receiver as it's completed
 */
void
NdbScanOperation::receiver_completed(NdbReceiver* tRec){
unknown's avatar
unknown committed
302
  if(theError.code == 0){
unknown's avatar
unknown committed
303 304 305
    if(DEBUG_NEXT_RESULT)
      ndbout_c("receiver_completed");
    
unknown's avatar
unknown committed
306 307 308 309 310 311 312 313
    Uint32 idx = tRec->m_list_index;
    Uint32 last = m_sent_receivers_count - 1;
    if(idx != last){
      NdbReceiver * move = m_sent_receivers[last];
      m_sent_receivers[idx] = move;
      move->m_list_index = idx;
    }
    m_sent_receivers_count = last;
unknown's avatar
unknown committed
314
  }
315 316
}

unknown's avatar
unknown committed
317 318 319 320 321 322 323 324 325 326 327 328 329 330
/*****************************************************************************
 * int getFirstATTRINFOScan( U_int32 aData )
 *
 * Return Value:  Return 0:   Successful
 *      	  Return -1:  All other cases
 * Parameters:    None: 	   Only allocate the first signal.
 * Remark:        When a scan is defined we need to use this method instead 
 *                of insertATTRINFO for the first signal. 
 *                This is because we need not to mess up the code in 
 *                insertATTRINFO with if statements since we are not 
 *                interested in the TCKEYREQ signal.
 *****************************************************************************/
int
NdbScanOperation::getFirstATTRINFOScan()
331
{
unknown's avatar
unknown committed
332
  NdbApiSignal* tSignal;
333

unknown's avatar
unknown committed
334 335 336 337 338 339 340 341 342 343 344
  tSignal = theNdb->getSignal();
  if (tSignal == NULL){
    setErrorCodeAbort(4000);      
    return -1;    
  }
  tSignal->setSignal(m_attrInfoGSN);
  theAI_LenInCurrAI = 8;
  theATTRINFOptr = &tSignal->getDataPtrSend()[8];
  theFirstATTRINFO = tSignal;
  theCurrentATTRINFO = tSignal;
  theCurrentATTRINFO->next(NULL);
345 346 347 348

  return 0;
}

unknown's avatar
unknown committed
349 350 351 352 353 354
/**
 * Constats for theTupleKeyDefined[][0]
 */
#define SETBOUND_EQ 1
#define FAKE_PTR 2
#define API_PTR 3
355

unknown's avatar
unknown committed
356
#define WAITFOR_SCAN_TIMEOUT 120000
357

unknown's avatar
unknown committed
358 359
int
NdbScanOperation::executeCursor(int nodeId){
360
  NdbTransaction * tCon = theNdbCon;
unknown's avatar
unknown committed
361 362
  TransporterFacade* tp = TransporterFacade::instance();
  Guard guard(tp->theMutexPtr);
unknown's avatar
unknown committed
363 364

  Uint32 magic = tCon->theMagicNumber;
unknown's avatar
unknown committed
365
  Uint32 seq = tCon->theNodeSequence;
unknown's avatar
unknown committed
366

unknown's avatar
unknown committed
367 368 369
  if (tp->get_node_alive(nodeId) &&
      (tp->getNodeSequence(nodeId) == seq)) {

unknown's avatar
unknown committed
370 371 372 373
    /**
     * Only call prepareSendScan first time (incase of restarts)
     *   - check with theMagicNumber
     */
unknown's avatar
unknown committed
374
    tCon->theMagicNumber = 0x37412619;
unknown's avatar
unknown committed
375 376 377 378
    if(magic != 0x37412619 && 
       prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1)
      return -1;
    
unknown's avatar
unknown committed
379 380 381 382 383 384 385 386 387 388 389 390 391 392 393
    
    if (doSendScan(nodeId) == -1)
      return -1;

    return 0;
  } else {
    if (!(tp->get_node_stopping(nodeId) &&
	  (tp->getNodeSequence(nodeId) == seq))){
      TRACE_DEBUG("The node is hard dead when attempting to start a scan");
      setErrorCode(4029);
      tCon->theReleaseOnClose = true;
    } else {
      TRACE_DEBUG("The node is stopping when attempting to start a scan");
      setErrorCode(4030);
    }//if
394
    tCon->theCommitStatus = NdbTransaction::Aborted;
unknown's avatar
unknown committed
395 396
  }//if
  return -1;
397 398
}

399

unknown's avatar
unknown committed
400
int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423
{
  int res;
  if ((res = nextResultImpl(fetchAllowed, forceSend)) == 0) {
    // handle blobs
    NdbBlob* tBlob = theBlobList;
    while (tBlob != 0) {
      if (tBlob->atNextResult() == -1)
        return -1;
      tBlob = tBlob->theNext;
    }
    /*
     * Flush blob part ops on behalf of user because
     * - nextResult is analogous to execute(NoCommit)
     * - user is likely to want blob value before next execute
     */
    if (m_transConnection->executePendingBlobOps() == -1)
      return -1;
    return 0;
  }
  return res;
}

int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
424
{
unknown's avatar
unknown committed
425
  if(m_ordered)
unknown's avatar
unknown committed
426 427
    return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed,
							       forceSend);
unknown's avatar
unknown committed
428 429 430 431 432 433 434
  
  /**
   * Check current receiver
   */
  int retVal = 2;
  Uint32 idx = m_current_api_receiver;
  Uint32 last = m_api_receivers_count;
435
  m_curr_row = 0;
436 437 438

  if(DEBUG_NEXT_RESULT)
    ndbout_c("nextResult(%d) idx=%d last=%d", fetchAllowed, idx, last);
unknown's avatar
unknown committed
439 440 441 442 443 444 445
  
  /**
   * Check next buckets
   */
  for(; idx < last; idx++){
    NdbReceiver* tRec = m_api_receivers[idx];
    if(tRec->nextResult()){
446
      m_curr_row = tRec->copyout(theReceiver);
unknown's avatar
unknown committed
447 448 449 450 451 452 453 454
      retVal = 0;
      break;
    }
  }
    
  /**
   * We have advanced atleast one bucket
   */
unknown's avatar
unknown committed
455
  if(!fetchAllowed || !retVal){
unknown's avatar
unknown committed
456
    m_current_api_receiver = idx;
unknown's avatar
unknown committed
457
    if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
unknown's avatar
unknown committed
458 459
    return retVal;
  }
unknown's avatar
unknown committed
460
  
unknown's avatar
unknown committed
461 462 463
  Uint32 nodeId = theNdbCon->theDBnode;
  TransporterFacade* tp = TransporterFacade::instance();
  Guard guard(tp->theMutexPtr);
464 465 466
  if(theError.code)
    return -1;

unknown's avatar
unknown committed
467
  Uint32 seq = theNdbCon->theNodeSequence;
unknown's avatar
unknown committed
468 469
  if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false,
							  forceSend) == 0){
unknown's avatar
unknown committed
470 471 472 473 474
      
    idx = m_current_api_receiver;
    last = m_api_receivers_count;
      
    do {
unknown's avatar
unknown committed
475 476
      if(theError.code){
	setErrorCode(theError.code);
unknown's avatar
unknown committed
477
	if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
unknown's avatar
unknown committed
478 479 480
	return -1;
      }
      
unknown's avatar
unknown committed
481 482
      Uint32 cnt = m_conf_receivers_count;
      Uint32 sent = m_sent_receivers_count;
483 484 485

      if(DEBUG_NEXT_RESULT)
	ndbout_c("idx=%d last=%d cnt=%d sent=%d", idx, last, cnt, sent);
unknown's avatar
unknown committed
486 487 488 489 490 491 492 493 494 495 496 497
	
      if(cnt > 0){
	/**
	 * Just move completed receivers
	 */
	memcpy(m_api_receivers+last, m_conf_receivers, cnt * sizeof(char*));
	last += cnt;
	m_conf_receivers_count = 0;
      } else if(retVal == 2 && sent > 0){
	/**
	 * No completed...
	 */
498 499
	theNdb->theImpl->theWaiter.m_node = nodeId;
	theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
unknown's avatar
unknown committed
500 501 502 503 504
	int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
	if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
	  continue;
	} else {
	  idx = last;
unknown's avatar
unknown committed
505
	  retVal = -2; //return_code;
unknown's avatar
unknown committed
506 507 508 509 510
	}
      } else if(retVal == 2){
	/**
	 * No completed & no sent -> EndOfData
	 */
unknown's avatar
unknown committed
511 512 513
	theError.code = -1; // make sure user gets error if he tries again
	if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
	return 1;
514
      }
unknown's avatar
unknown committed
515 516 517 518 519 520 521
	
      if(retVal == 0)
	break;
	
      for(; idx < last; idx++){
	NdbReceiver* tRec = m_api_receivers[idx];
	if(tRec->nextResult()){
522
	  m_curr_row = tRec->copyout(theReceiver);      
unknown's avatar
unknown committed
523 524 525
	  retVal = 0;
	  break;
	}
526
      }
unknown's avatar
unknown committed
527 528 529
    } while(retVal == 2);
  } else {
    retVal = -3;
530
  }
unknown's avatar
unknown committed
531 532 533 534 535 536 537 538
    
  m_api_receivers_count = last;
  m_current_api_receiver = idx;
    
  switch(retVal){
  case 0:
  case 1:
  case 2:
unknown's avatar
unknown committed
539
    if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
unknown's avatar
unknown committed
540 541 542 543 544 545 546 547
    return retVal;
  case -1:
    setErrorCode(4008); // Timeout
    break;
  case -2:
    setErrorCode(4028); // Node fail
    break;
  case -3: // send_next_scan -> return fail (set error-code self)
unknown's avatar
unknown committed
548 549
    if(theError.code == 0)
      setErrorCode(4028); // seq changed = Node fail
unknown's avatar
unknown committed
550 551 552 553 554
    break;
  }
    
  theNdbCon->theTransactionIsStarted = false;
  theNdbCon->theReleaseOnClose = true;
unknown's avatar
unknown committed
555
  if(DEBUG_NEXT_RESULT) ndbout_c("return -1", retVal);
unknown's avatar
unknown committed
556
  return -1;
557 558
}

unknown's avatar
unknown committed
559
int
unknown's avatar
unknown committed
560 561
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
				 bool forceSend){  
unknown's avatar
Merge  
unknown committed
562
  if(cnt > 0){
unknown's avatar
unknown committed
563 564 565 566 567 568 569 570 571 572 573 574 575 576 577
    NdbApiSignal tSignal(theNdb->theMyRef);
    tSignal.setSignal(GSN_SCAN_NEXTREQ);
    
    Uint32* theData = tSignal.getDataPtrSend();
    theData[0] = theNdbCon->theTCConPtr;
    theData[1] = stopScanFlag == true ? 1 : 0;
    Uint64 transId = theNdbCon->theTransactionId;
    theData[2] = transId;
    theData[3] = (Uint32) (transId >> 32);
    
    /**
     * Prepare ops
     */
    Uint32 last = m_sent_receivers_count;
    Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);
unknown's avatar
unknown committed
578
    Uint32 sent = 0;
unknown's avatar
unknown committed
579 580
    for(Uint32 i = 0; i<cnt; i++){
      NdbReceiver * tRec = m_api_receivers[i];
unknown's avatar
unknown committed
581 582 583 584 585 586 587
      if((prep_array[sent] = tRec->m_tcPtrI) != RNIL)
      {
	m_sent_receivers[last+sent] = tRec;
	tRec->m_list_index = last+sent;
	tRec->prepareSend();
	sent++;
      }
unknown's avatar
unknown committed
588
    }
589 590
    memmove(m_api_receivers, m_api_receivers+cnt, 
	    (theParallelism-cnt) * sizeof(char*));
unknown's avatar
unknown committed
591
    
unknown's avatar
unknown committed
592 593 594 595 596
    int ret = 0;
    if(sent)
    {
      Uint32 nodeId = theNdbCon->theDBnode;
      TransporterFacade * tp = TransporterFacade::instance();
597
      if(cnt > 21){
unknown's avatar
unknown committed
598 599 600 601 602 603
	tSignal.setLength(4);
	LinearSectionPtr ptr[3];
	ptr[0].p = prep_array;
	ptr[0].sz = sent;
	ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
      } else {
604
	tSignal.setLength(4+sent);
unknown's avatar
unknown committed
605 606
	ret = tp->sendSignal(&tSignal, nodeId);
      }
unknown's avatar
unknown committed
607
    }
unknown's avatar
unknown committed
608
    
unknown's avatar
unknown committed
609 610
    if (!ret) checkForceSend(forceSend);

unknown's avatar
unknown committed
611
    m_sent_receivers_count = last + sent;
unknown's avatar
unknown committed
612 613
    m_api_receivers_count -= cnt;
    m_current_api_receiver = 0;
unknown's avatar
unknown committed
614
    
unknown's avatar
unknown committed
615
    return ret;
616
  }
unknown's avatar
unknown committed
617
  return 0;
618 619
}

unknown's avatar
unknown committed
620 621 622 623 624 625 626 627 628
void NdbScanOperation::checkForceSend(bool forceSend)
{
  if (forceSend) {
    TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);
  } else {
    TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);
  }//if
}

629 630 631 632
int 
NdbScanOperation::prepareSend(Uint32  TC_ConnectPtr, Uint64  TransactionId)
{
  printf("NdbScanOperation::prepareSend\n");
unknown's avatar
unknown committed
633
  abort();
634 635 636 637 638 639 640 641 642 643
  return 0;
}

int 
NdbScanOperation::doSend(int ProcessorId)
{
  printf("NdbScanOperation::doSend\n");
  return 0;
}

644
void NdbScanOperation::close(bool forceSend, bool releaseOp)
645
{
646 647 648 649 650 651
  DBUG_ENTER("NdbScanOperation::close");
  DBUG_PRINT("enter", ("this=%x tcon=%x con=%x force=%d release=%d",
                       (UintPtr)this,
                       (UintPtr)m_transConnection, (UintPtr)theNdbCon,
                       forceSend, releaseOp));

652
  if(m_transConnection){
unknown's avatar
unknown committed
653
    if(DEBUG_NEXT_RESULT)
654
      ndbout_c("close() theError.code = %d "
unknown's avatar
unknown committed
655 656 657 658 659 660 661 662
	       "m_api_receivers_count = %d "
	       "m_conf_receivers_count = %d "
	       "m_sent_receivers_count = %d",
	       theError.code, 
	       m_api_receivers_count,
	       m_conf_receivers_count,
	       m_sent_receivers_count);
    
unknown's avatar
unknown committed
663 664
    TransporterFacade* tp = TransporterFacade::instance();
    Guard guard(tp->theMutexPtr);
unknown's avatar
unknown committed
665
    close_impl(tp, forceSend);
unknown's avatar
unknown committed
666
    
667 668 669 670 671
  }

  NdbConnection* tCon = theNdbCon;
  NdbConnection* tTransCon = m_transConnection;
  theNdbCon = NULL;
unknown's avatar
unknown committed
672
  m_transConnection = NULL;
673 674 675 676 677 678 679 680

  if (releaseOp && tTransCon) {
    NdbIndexScanOperation* tOp = (NdbIndexScanOperation*)this;
    tTransCon->releaseExecutedScanOperation(tOp);
  }
  
  tCon->theScanningOp = 0;
  theNdb->closeTransaction(tCon);
681
  theNdb->theRemainingStartTransactions--;
682
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
683
}
684

unknown's avatar
unknown committed
685
void
unknown's avatar
unknown committed
686
NdbScanOperation::execCLOSE_SCAN_REP(){
unknown's avatar
unknown committed
687 688
  m_conf_receivers_count = 0;
  m_sent_receivers_count = 0;
689 690
}

unknown's avatar
unknown committed
691
void NdbScanOperation::release()
692
{
unknown's avatar
unknown committed
693
  if(theNdbCon != 0 || m_transConnection != 0){
694
    close();
unknown's avatar
unknown committed
695 696 697
  }
  for(Uint32 i = 0; i<m_allocated_receivers; i++){
    m_receivers[i]->release();
698
  }
699 700 701

  NdbOperation::release();
  
unknown's avatar
unknown committed
702 703 704 705 706
  if(theSCAN_TABREQ)
  {
    theNdb->releaseSignal(theSCAN_TABREQ);
    theSCAN_TABREQ = 0;
  }
707 708
}

unknown's avatar
unknown committed
709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728
/***************************************************************************
int prepareSendScan(Uint32 aTC_ConnectPtr,
                    Uint64 aTransactionId)

Return Value:   Return 0 : preparation of send was succesful.
                Return -1: In all other case.   
Parameters:     aTC_ConnectPtr: the Connect pointer to TC.
		aTransactionId:	the Transaction identity of the transaction.
Remark:         Puts the the final data into ATTRINFO signal(s)  after this 
                we know the how many signal to send and their sizes
***************************************************************************/
int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
				      Uint64 aTransactionId){

  if (theInterpretIndicator != 1 ||
      (theOperationType != OpenScanRequest &&
       theOperationType != OpenRangeScanRequest)) {
    setErrorCodeAbort(4005);
    return -1;
  }
729

unknown's avatar
unknown committed
730
  theErrorLine = 0;
731

unknown's avatar
unknown committed
732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748
  // In preapareSendInterpreted we set the sizes (word 4-8) in the
  // first ATTRINFO signal.
  if (prepareSendInterpreted() == -1)
    return -1;
  
  if(m_ordered){
    ((NdbIndexScanOperation*)this)->fix_get_values();
  }
  
  theCurrentATTRINFO->setLength(theAI_LenInCurrAI);

  /**
   * Prepare all receivers
   */
  theReceiver.prepareSend();
  bool keyInfo = m_keyInfo;
  Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0;
unknown's avatar
unknown committed
749 750 751 752 753 754 755 756 757 758 759 760 761 762 763
  /**
   * The number of records sent by each LQH is calculated and the kernel
   * is informed of this number by updating the SCAN_TABREQ signal
   */
  Uint32 batch_size, batch_byte_size, first_batch_size;
  theReceiver.calculate_batch_size(key_size,
                                   theParallelism,
                                   batch_size,
                                   batch_byte_size,
                                   first_batch_size);
  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
  ScanTabReq::setScanBatch(req->requestInfo, batch_size);
  req->batch_byte_size= batch_byte_size;
  req->first_batch_size= first_batch_size;

unknown's avatar
unknown committed
764 765 766 767 768 769 770 771
  /**
   * Set keyinfo flag
   *  (Always keyinfo when using blobs)
   */
  Uint32 reqInfo = req->requestInfo;
  ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);
  req->requestInfo = reqInfo;
  
unknown's avatar
unknown committed
772
  for(Uint32 i = 0; i<theParallelism; i++){
773 774 775
    m_receivers[i]->do_get_value(&theReceiver, batch_size, 
				 key_size, 
				 m_read_range_no);
776
  }
unknown's avatar
unknown committed
777
  return 0;
778 779
}

unknown's avatar
unknown committed
780
/*****************************************************************************
unknown's avatar
unknown committed
781 782 783 784 785 786
int doSend()

Return Value:   Return >0 : send was succesful, returns number of signals sent
                Return -1: In all other case.   
Parameters:     aProcessorId: Receiving processor node
Remark:         Sends the ATTRINFO signal(s)
unknown's avatar
unknown committed
787
*****************************************************************************/
unknown's avatar
unknown committed
788 789
int
NdbScanOperation::doSendScan(int aProcessorId)
790
{
unknown's avatar
unknown committed
791 792 793 794 795 796 797 798 799 800 801 802
  Uint32 tSignalCount = 0;
  NdbApiSignal* tSignal;
 
  if (theInterpretIndicator != 1 ||
      (theOperationType != OpenScanRequest &&
       theOperationType != OpenRangeScanRequest)) {
      setErrorCodeAbort(4005);
      return -1;
  }
  
  assert(theSCAN_TABREQ != NULL);
  tSignal = theSCAN_TABREQ;
803 804 805 806 807 808
  
  Uint32 tupKeyLen = theTupKeyLen;
  Uint32 len = theTotalNrOfKeyWordInSignal;
  Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr;
  Uint64 transId = theNdbCon->theTransactionId;
  
unknown's avatar
unknown committed
809 810 811 812
  // Update the "attribute info length in words" in SCAN_TABREQ before 
  // sending it. This could not be done in openScan because 
  // we created the ATTRINFO signals after the SCAN_TABREQ signal.
  ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend());
813
  req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len;
814 815 816
  Uint32 tmp = req->requestInfo;
  ScanTabReq::setDistributionKeyFlag(tmp, theDistrKeyIndicator_);
  req->distributionKey = theDistributionKey;
817
  req->requestInfo = tmp;
818
  tSignal->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_);
819

unknown's avatar
unknown committed
820
  TransporterFacade *tp = TransporterFacade::instance();
unknown's avatar
unknown committed
821 822 823
  LinearSectionPtr ptr[3];
  ptr[0].p = m_prepared_receivers;
  ptr[0].sz = theParallelism;
824
  if (tp->sendSignal(tSignal, aProcessorId, ptr, 1) == -1) {
unknown's avatar
unknown committed
825 826 827
    setErrorCode(4002);
    return -1;
  } 
828 829

  if (tupKeyLen > 0){
unknown's avatar
unknown committed
830
    // must have at least one signal since it contains attrLen for bounds
831 832 833 834
    assert(theLastKEYINFO != NULL);
    tSignal = theLastKEYINFO;
    tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal);
    
835 836
    assert(theSCAN_TABREQ->next() != NULL);
    tSignal = theSCAN_TABREQ->next();
837 838 839 840 841 842 843 844
    
    NdbApiSignal* last;
    do {
      KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
      keyInfo->connectPtr = aTC_ConnectPtr;
      keyInfo->transId[0] = Uint32(transId);
      keyInfo->transId[1] = Uint32(transId >> 32);
      
unknown's avatar
unknown committed
845
      if (tp->sendSignal(tSignal,aProcessorId) == -1){
846 847
	setErrorCode(4002);
	return -1;
unknown's avatar
unknown committed
848
      }
849
      
unknown's avatar
unknown committed
850
      tSignalCount++;
851
      last = tSignal;
unknown's avatar
unknown committed
852
      tSignal = tSignal->next();
853
    } while(last != theLastKEYINFO);
854
  }
unknown's avatar
unknown committed
855 856 857
  
  tSignal = theFirstATTRINFO;
  while (tSignal != NULL) {
858 859 860 861 862
    AttrInfo * attrInfo = CAST_PTR(AttrInfo, tSignal->getDataPtrSend());
    attrInfo->connectPtr = aTC_ConnectPtr;
    attrInfo->transId[0] = Uint32(transId);
    attrInfo->transId[1] = Uint32(transId >> 32);
    
unknown's avatar
unknown committed
863 864 865 866 867 868 869 870
    if (tp->sendSignal(tSignal,aProcessorId) == -1){
      setErrorCode(4002);
      return -1;
    }
    tSignalCount++;
    tSignal = tSignal->next();
  }    
  theStatus = WaitResponse;  
unknown's avatar
unknown committed
871

872
  m_curr_row = 0;
unknown's avatar
unknown committed
873 874 875 876 877 878 879
  m_sent_receivers_count = theParallelism;
  if(m_ordered)
  {
    m_current_api_receiver = theParallelism;
    m_api_receivers_count = theParallelism;
  }
  
unknown's avatar
unknown committed
880 881 882
  return tSignalCount;
}//NdbOperation::doSendScan()

unknown's avatar
unknown committed
883
/*****************************************************************************
884
 * NdbOperation* takeOverScanOp(NdbTransaction* updateTrans);
unknown's avatar
unknown committed
885
 *
886
 * Parameters:     The update transactions NdbTransaction pointer.
unknown's avatar
unknown committed
887 888 889 890 891 892 893 894 895
 * Return Value:   A reference to the transferred operation object 
 *                   or NULL if no success.
 * Remark:         Take over the scanning transactions NdbOperation 
 *                 object for a tuple to an update transaction, 
 *                 which is the last operation read in nextScanResult()
 *		   (theNdbCon->thePreviousScanRec)
 *
 *     FUTURE IMPLEMENTATION:   (This note was moved from header file.)
 *     In the future, it will even be possible to transfer 
896 897
 *     to a NdbTransaction on another Ndb-object.  
 *     In this case the receiving NdbTransaction-object must call 
unknown's avatar
unknown committed
898 899 900 901
 *     a method receiveOpFromScan to actually receive the information.  
 *     This means that the updating transactions can be placed
 *     in separate threads and thus increasing the parallelism during
 *     the scan process. 
unknown's avatar
unknown committed
902
 ****************************************************************************/
unknown's avatar
unknown committed
903 904 905
int
NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size)
{
906 907 908
  NdbRecAttr * tRecAttr = m_curr_row;
  if(tRecAttr)
  {
unknown's avatar
unknown committed
909 910 911 912 913 914 915
    const Uint32 * src = (Uint32*)tRecAttr->aRef();
    memcpy(data, src, 4*size);
    return 0;
  }
  return -1;
}

unknown's avatar
unknown committed
916
NdbOperation*
917 918
NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans)
{
unknown's avatar
unknown committed
919
  
920 921 922
  NdbRecAttr * tRecAttr = m_curr_row;
  if(tRecAttr)
  {
unknown's avatar
unknown committed
923 924 925 926
    NdbOperation * newOp = pTrans->getNdbOperation(m_currentTable);
    if (newOp == NULL){
      return NULL;
    }
927 928 929 930 931 932
    if (!m_keyInfo)
    {
      // Cannot take over lock if no keyinfo was requested
      setErrorCodeAbort(4604);
      return NULL;
    }
unknown's avatar
unknown committed
933
    pTrans->theSimpleState = 0;
unknown's avatar
unknown committed
934 935 936 937 938
    
    const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1;

    newOp->theTupKeyLen = len;
    newOp->theOperationType = opType;
939 940 941 942 943 944 945 946 947
    switch (opType) {
    case (ReadRequest):
      newOp->theLockMode = theLockMode;
      // Fall through
    case (DeleteRequest):
      newOp->theStatus = GetValue;
      break;
    default:
      newOp->theStatus = SetValue;
unknown's avatar
unknown committed
948 949
    }
    const Uint32 * src = (Uint32*)tRecAttr->aRef();
unknown's avatar
unknown committed
950
    const Uint32 tScanInfo = src[len] & 0x3FFFF;
unknown's avatar
unknown committed
951
    const Uint32 tTakeOverFragment = src[len] >> 20;
unknown's avatar
unknown committed
952 953 954
    {
      UintR scanInfo = 0;
      TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
unknown's avatar
unknown committed
955
      TcKeyReq::setTakeOverScanFragment(scanInfo, tTakeOverFragment);
unknown's avatar
unknown committed
956 957
      TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo);
      newOp->theScanInfo = scanInfo;
unknown's avatar
unknown committed
958 959
      newOp->theDistrKeyIndicator_ = 1;
      newOp->theDistributionKey = tTakeOverFragment;
unknown's avatar
unknown committed
960
    }
961

unknown's avatar
unknown committed
962 963 964 965 966 967 968 969 970
    // Copy the first 8 words of key info from KEYINF20 into TCKEYREQ
    TcKeyReq * tcKeyReq = CAST_PTR(TcKeyReq,newOp->theTCREQ->getDataPtrSend());
    Uint32 i = 0;
    for (i = 0; i < TcKeyReq::MaxKeyInfo && i < len; i++) {
      tcKeyReq->keyInfo[i] = * src++;
    }
    
    if(i < len){
      NdbApiSignal* tSignal = theNdb->getSignal();
971
      newOp->theTCREQ->next(tSignal); 
unknown's avatar
unknown committed
972 973 974 975 976 977 978 979 980 981 982 983
      
      Uint32 left = len - i;
      while(tSignal && left > KeyInfo::DataLength){
	tSignal->setSignal(GSN_KEYINFO);
	KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
	memcpy(keyInfo->keyData, src, 4 * KeyInfo::DataLength);
	src += KeyInfo::DataLength;
	left -= KeyInfo::DataLength;

	tSignal->next(theNdb->getSignal());
	tSignal = tSignal->next();
      }
984

unknown's avatar
unknown committed
985 986 987 988
      if(tSignal && left > 0){
	tSignal->setSignal(GSN_KEYINFO);
	KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
	memcpy(keyInfo->keyData, src, 4 * left);
unknown's avatar
unknown committed
989 990 991 992 993 994 995 996 997 998 999
      }      
    }
    // create blob handles automatically
    if (opType == DeleteRequest && m_currentTable->m_noOfBlobs != 0) {
      for (unsigned i = 0; i < m_currentTable->m_columns.size(); i++) {
	NdbColumnImpl* c = m_currentTable->m_columns[i];
	assert(c != 0);
	if (c->getBlobType()) {
	  if (newOp->getBlobHandle(pTrans, c) == NULL)
	    return NULL;
	}
unknown's avatar
unknown committed
1000 1001
      }
    }
unknown's avatar
unknown committed
1002
    
unknown's avatar
unknown committed
1003
    return newOp;
1004
  }
unknown's avatar
unknown committed
1005
  return 0;
1006 1007
}

unknown's avatar
unknown committed
1008 1009 1010
NdbBlob*
NdbScanOperation::getBlobHandle(const char* anAttrName)
{
unknown's avatar
unknown committed
1011
  m_keyInfo = 1;
unknown's avatar
unknown committed
1012 1013 1014 1015 1016 1017 1018
  return NdbOperation::getBlobHandle(m_transConnection, 
				     m_currentTable->getColumn(anAttrName));
}

NdbBlob*
NdbScanOperation::getBlobHandle(Uint32 anAttrId)
{
unknown's avatar
unknown committed
1019
  m_keyInfo = 1;
unknown's avatar
unknown committed
1020 1021 1022 1023
  return NdbOperation::getBlobHandle(m_transConnection, 
				     m_currentTable->getColumn(anAttrId));
}

unknown's avatar
unknown committed
1024
NdbIndexScanOperation::NdbIndexScanOperation(Ndb* aNdb)
unknown's avatar
unknown committed
1025
  : NdbScanOperation(aNdb, NdbOperation::OrderedIndexScan)
1026
{
unknown's avatar
unknown committed
1027
}
1028

unknown's avatar
unknown committed
1029
NdbIndexScanOperation::~NdbIndexScanOperation(){
1030 1031
}

unknown's avatar
unknown committed
1032
int
unknown's avatar
unknown committed
1033 1034
NdbIndexScanOperation::setBound(const char* anAttrName, int type, 
				const void* aValue, Uint32 len)
1035
{
unknown's avatar
unknown committed
1036
  return setBound(m_accessTable->getColumn(anAttrName), type, aValue, len);
1037 1038
}

unknown's avatar
unknown committed
1039
int
unknown's avatar
unknown committed
1040 1041
NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, 
				const void* aValue, Uint32 len)
1042
{
unknown's avatar
unknown committed
1043 1044
  return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len);
}
1045

unknown's avatar
unknown committed
1046 1047 1048 1049 1050
int
NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject, 
				  const char* aValue, 
				  Uint32 len){
  return setBound(anAttrObject, BoundEQ, aValue, len);
1051 1052
}

unknown's avatar
unknown committed
1053 1054 1055
NdbRecAttr*
NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo, 
				     char* aValue){
1056
  if(!m_ordered){
unknown's avatar
unknown committed
1057 1058
    return NdbScanOperation::getValue_impl(attrInfo, aValue);
  }
1059 1060 1061 1062 1063 1064 1065 1066 1067
  
  int id = attrInfo->m_attrId;                // In "real" table
  assert(m_accessTable->m_index);
  int sz = (int)m_accessTable->m_index->m_key_ids.size();
  if(id >= sz || (id = m_accessTable->m_index->m_key_ids[id]) == -1){
    return NdbScanOperation::getValue_impl(attrInfo, aValue);
  }
  
  assert(id < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
unknown's avatar
unknown committed
1068
  Uint32 marker = theTupleKeyDefined[id][0];
1069
  
unknown's avatar
unknown committed
1070 1071 1072 1073
  if(marker == SETBOUND_EQ){
    return NdbScanOperation::getValue_impl(attrInfo, aValue);
  } else if(marker == API_PTR){
    return NdbScanOperation::getValue_impl(attrInfo, aValue);
1074
  }
unknown's avatar
unknown committed
1075
  
1076 1077
  assert(marker == FAKE_PTR);
  
unknown's avatar
unknown committed
1078 1079 1080 1081 1082 1083 1084 1085 1086
  UintPtr oldVal;
  oldVal = theTupleKeyDefined[id][1];
#if (SIZEOF_CHARP == 8)
  oldVal = oldVal | (((UintPtr)theTupleKeyDefined[id][2]) << 32);
#endif
  theTupleKeyDefined[id][0] = API_PTR;

  NdbRecAttr* tmp = (NdbRecAttr*)oldVal;
  tmp->setup(attrInfo, aValue);
1087

unknown's avatar
unknown committed
1088
  return tmp;
1089 1090
}

unknown's avatar
unknown committed
1091 1092 1093 1094 1095 1096 1097
#include <AttributeHeader.hpp>
/*
 * Define bound on index column in range scan.
 */
int
NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, 
				int type, const void* aValue, Uint32 len)
1098
{
1099 1100 1101 1102 1103
  if (!tAttrInfo)
  {
    setErrorCodeAbort(4318);    // Invalid attribute
    return -1;
  }
unknown's avatar
unknown committed
1104 1105 1106
  if (theOperationType == OpenRangeScanRequest &&
      (0 <= type && type <= 4) &&
      len <= 8000) {
1107
    // insert bound type
1108 1109
    Uint32 currLen = theTotalNrOfKeyWordInSignal;
    Uint32 remaining = KeyInfo::DataLength - currLen;
unknown's avatar
unknown committed
1110
    Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
1111
    bool tDistrKey = tAttrInfo->m_distributionKey;
1112

unknown's avatar
unknown committed
1113 1114 1115 1116 1117 1118
    len = aValue != NULL ? sizeInBytes : 0;
    if (len != sizeInBytes && (len != 0)) {
      setErrorCodeAbort(4209);
      return -1;
    }

1119
    // insert attribute header
unknown's avatar
unknown committed
1120 1121 1122
    Uint32 tIndexAttrId = tAttrInfo->m_attrId;
    Uint32 sizeInWords = (len + 3) / 4;
    AttributeHeader ah(tIndexAttrId, sizeInWords);
1123 1124
    const Uint32 ahValue = ah.m_value;

1125
    const Uint32 align = (UintPtr(aValue) & 7);
unknown's avatar
unknown committed
1126 1127 1128
    const bool aligned = (tDistrKey && type == BoundEQ) ? 
      (align == 0) : (align & 3) == 0;

1129 1130 1131
    const bool nobytes = (len & 0x3) == 0;
    const Uint32 totalLen = 2 + sizeInWords;
    Uint32 tupKeyLen = theTupKeyLen;
1132
    if(remaining > totalLen && aligned && nobytes){
1133 1134 1135 1136 1137 1138 1139
      Uint32 * dst = theKEYINFOptr + currLen;
      * dst ++ = type;
      * dst ++ = ahValue;
      memcpy(dst, aValue, 4 * sizeInWords);
      theTotalNrOfKeyWordInSignal = currLen + totalLen;
    } else {
      if(!aligned || !nobytes){
unknown's avatar
unknown committed
1140
        Uint32 tempData[2000];
1141 1142
	tempData[0] = type;
	tempData[1] = ahValue;
unknown's avatar
unknown committed
1143
	tempData[2 + (len >> 2)] = 0;
1144
        memcpy(tempData+2, aValue, len);
unknown's avatar
unknown committed
1145
	
1146 1147 1148 1149 1150
	insertBOUNDS(tempData, 2+sizeInWords);
      } else {
	Uint32 buf[2] = { type, ahValue };
	insertBOUNDS(buf, 2);
	insertBOUNDS((Uint32*)aValue, sizeInWords);
1151
      }
unknown's avatar
unknown committed
1152
    }
1153
    theTupKeyLen = tupKeyLen + totalLen;
1154

unknown's avatar
unknown committed
1155 1156 1157 1158 1159 1160 1161 1162 1163
    /**
     * Do sorted stuff
     */

    /**
     * The primary keys for an ordered index is defined in the beginning
     * so it's safe to use [tIndexAttrId] 
     * (instead of looping as is NdbOperation::equal_impl)
     */
1164 1165 1166 1167
    if(type == BoundEQ && tDistrKey)
    {
      theNoOfTupKeyLeft--;
      return handle_distribution_key((Uint64*)aValue, sizeInWords);
unknown's avatar
unknown committed
1168 1169 1170 1171 1172
    }
    return 0;
  } else {
    setErrorCodeAbort(4228);    // XXX wrong code
    return -1;
1173 1174 1175
  }
}

1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206
int
NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){
  Uint32 len;
  Uint32 remaining = KeyInfo::DataLength - theTotalNrOfKeyWordInSignal;
  Uint32 * dst = theKEYINFOptr + theTotalNrOfKeyWordInSignal;
  do {
    len = (sz < remaining ? sz : remaining);
    memcpy(dst, data, 4 * len);
    
    if(sz >= remaining){
      NdbApiSignal* tCurr = theLastKEYINFO;
      tCurr->setLength(KeyInfo::MaxSignalLength);
      NdbApiSignal* tSignal = tCurr->next();
      if(tSignal)
	;
      else if((tSignal = theNdb->getSignal()) != 0)
      {
	tCurr->next(tSignal);
	tSignal->setSignal(GSN_KEYINFO);
      } else {
	goto error;
      }
      theLastKEYINFO = tSignal;
      theKEYINFOptr = dst = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
      remaining = KeyInfo::DataLength;
      sz -= len;
      data += len;
    } else {
      len = (KeyInfo::DataLength - remaining) + len;
      break;
    }
unknown's avatar
unknown committed
1207
  } while(true);   
1208 1209 1210 1211 1212 1213 1214 1215
  theTotalNrOfKeyWordInSignal = len;
  return 0;

error:
  setErrorCodeAbort(4228);    // XXX wrong code
  return -1;
}

1216
int
unknown's avatar
unknown committed
1217
NdbIndexScanOperation::readTuples(LockMode lm,
1218 1219 1220 1221 1222 1223 1224 1225
				  Uint32 scan_flags,
				  Uint32 parallel)
{
  const bool order_by = scan_flags & SF_OrderBy;
  const bool order_desc = scan_flags & SF_Descending;
  const bool read_range_no = scan_flags & SF_ReadRangeNo;

  int res = NdbScanOperation::readTuples(lm, scan_flags, 0);
1226
  if(!res && read_range_no)
1227 1228 1229 1230 1231
  {
    m_read_range_no = 1;
    Uint32 word = 0;
    AttributeHeader::init(&word, AttributeHeader::RANGE_NO, 0);
    if(insertATTRINFO(word) == -1)
1232
      res = -1;
1233
  }
1234
  if(!res && order_by){
1235 1236 1237 1238 1239 1240
    m_ordered = true;
    if (order_desc) {
      m_descending = true;
      ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
      ScanTabReq::setDescendingFlag(req->requestInfo, true);
    }
1241 1242
    Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
    m_sort_columns = cnt; // -1 for NDB$NODE
unknown's avatar
unknown committed
1243
    m_current_api_receiver = m_sent_receivers_count;
1244
    m_api_receivers_count = m_sent_receivers_count;
1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257
    
    m_sort_columns = cnt;
    for(Uint32 i = 0; i<cnt; i++){
      const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
      const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
      NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
      UintPtr newVal = UintPtr(tmp);
      theTupleKeyDefined[i][0] = FAKE_PTR;
      theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
#if (SIZEOF_CHARP == 8)
      theTupleKeyDefined[i][2] = (newVal >> 32);
#endif
    }
unknown's avatar
unknown committed
1258
  }
1259 1260 1261
  m_this_bound_start = 0;
  m_first_bound_word = theKEYINFOptr;
  
1262
  return res;
unknown's avatar
unknown committed
1263
}
1264

unknown's avatar
unknown committed
1265 1266 1267 1268 1269 1270
void
NdbIndexScanOperation::fix_get_values(){
  /**
   * Loop through all getValues and set buffer pointer to "API" pointer
   */
  NdbRecAttr * curr = theReceiver.theFirstRecAttr;
1271
  Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
unknown's avatar
unknown committed
1272
  assert(cnt <  NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
1273
  
unknown's avatar
unknown committed
1274 1275
  const NdbIndexImpl * idx = m_accessTable->m_index;
  const NdbTableImpl * tab = m_currentTable;
1276 1277 1278
  for(Uint32 i = 0; i<cnt; i++){
    Uint32 val = theTupleKeyDefined[i][0];
    switch(val){
1279 1280
    case FAKE_PTR:
      curr->setup(curr->m_column, 0);
1281
    case API_PTR:
1282 1283
      curr = curr->next();
      break;
1284 1285
    case SETBOUND_EQ:
      break;
unknown's avatar
unknown committed
1286
#ifdef VM_TRACE
1287 1288
    default:
      abort();
unknown's avatar
unknown committed
1289 1290
#endif
    }
1291 1292 1293
  }
}

unknown's avatar
unknown committed
1294 1295 1296 1297 1298 1299 1300
int
NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, 
			       const NdbReceiver* t1, 
			       const NdbReceiver* t2){

  NdbRecAttr * r1 = t1->m_rows[t1->m_current_row];
  NdbRecAttr * r2 = t2->m_rows[t2->m_current_row];
1301

unknown's avatar
unknown committed
1302 1303
  r1 = (skip ? r1->next() : r1);
  r2 = (skip ? r2->next() : r2);
1304 1305
  const int jdir = 1 - 2 * (int)m_descending;
  assert(jdir == 1 || jdir == -1);
unknown's avatar
unknown committed
1306 1307 1308 1309 1310
  while(cols > 0){
    Uint32 * d1 = (Uint32*)r1->aRef();
    Uint32 * d2 = (Uint32*)r2->aRef();
    unsigned r1_null = r1->isNULL();
    if((r1_null ^ (unsigned)r2->isNULL())){
1311
      return (r1_null ? -1 : 1) * jdir;
unknown's avatar
unknown committed
1312
    }
1313
    const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);
unknown's avatar
unknown committed
1314
    Uint32 len = r1->theAttrSize * r1->theArraySize;
unknown's avatar
unknown committed
1315
    if(!r1_null){
1316
      const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_type);
unknown's avatar
unknown committed
1317
      int r = (*sqlType.m_cmp)(col.m_cs, d1, len, d2, len, true);
unknown's avatar
unknown committed
1318 1319
      if(r){
	assert(r != NdbSqlUtil::CmpUnknown);
1320
	return r * jdir;
unknown's avatar
unknown committed
1321 1322 1323 1324 1325
      }
    }
    cols--;
    r1 = r1->next();
    r2 = r2->next();
1326
  }
unknown's avatar
unknown committed
1327
  return 0;
1328 1329
}

unknown's avatar
unknown committed
1330
int
unknown's avatar
unknown committed
1331 1332
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
					   bool forceSend){
unknown's avatar
unknown committed
1333
  
1334
  m_curr_row = 0;
unknown's avatar
unknown committed
1335
  Uint32 u_idx = 0, u_last = 0;
1336
  Uint32 s_idx   = m_current_api_receiver; // first sorted
unknown's avatar
unknown committed
1337 1338 1339
  Uint32 s_last  = theParallelism;         // last sorted

  NdbReceiver** arr = m_api_receivers;
1340
  NdbReceiver* tRec = arr[s_idx];
unknown's avatar
unknown committed
1341 1342 1343
  
  if(DEBUG_NEXT_RESULT) ndbout_c("nextOrderedResult(%d) nextResult: %d",
				 fetchAllowed, 
1344
				 (s_idx < s_last ? tRec->nextResult() : 0));
unknown's avatar
unknown committed
1345 1346 1347 1348
  
  if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 
				 u_idx, u_last,
				 s_idx, s_last);
1349 1350 1351
  
  bool fetchNeeded = (s_idx == s_last) || !tRec->nextResult();
  
unknown's avatar
unknown committed
1352 1353 1354 1355 1356
  if(fetchNeeded){
    if(fetchAllowed){
      if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
      TransporterFacade* tp = TransporterFacade::instance();
      Guard guard(tp->theMutexPtr);
1357 1358
      if(theError.code)
	return -1;
unknown's avatar
unknown committed
1359 1360
      Uint32 seq = theNdbCon->theNodeSequence;
      Uint32 nodeId = theNdbCon->theDBnode;
unknown's avatar
unknown committed
1361 1362
      if(seq == tp->getNodeSequence(nodeId) &&
	 !send_next_scan_ordered(s_idx, forceSend)){
unknown's avatar
unknown committed
1363
	Uint32 tmp = m_sent_receivers_count;
1364
	s_idx = m_current_api_receiver; 
unknown's avatar
unknown committed
1365
	while(m_sent_receivers_count > 0 && !theError.code){
1366 1367
	  theNdb->theImpl->theWaiter.m_node = nodeId;
	  theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
unknown's avatar
unknown committed
1368 1369 1370 1371
	  int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
	  if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
	    continue;
	  }
1372
	  if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
1373 1374 1375 1376 1377 1378 1379
	  setErrorCode(4028);
	  return -1;
	}
	
	if(theError.code){
	  setErrorCode(theError.code);
	  if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
unknown's avatar
unknown committed
1380 1381 1382 1383 1384 1385 1386 1387 1388
	  return -1;
	}
	
	u_idx = 0;
	u_last = m_conf_receivers_count;
	m_conf_receivers_count = 0;
	memcpy(arr, m_conf_receivers, u_last * sizeof(char*));
	
	if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last);
1389 1390 1391
      } else {
	setErrorCode(4028);
	return -1;
unknown's avatar
unknown committed
1392 1393
      }
    } else {
1394
      if(DEBUG_NEXT_RESULT) ndbout_c("return 2");
unknown's avatar
unknown committed
1395 1396
      return 2;
    }
1397 1398 1399 1400
  } else {
    u_idx = s_idx;
    u_last = s_idx + 1;
    s_idx++;
unknown's avatar
unknown committed
1401
  }
unknown's avatar
unknown committed
1402
  
unknown's avatar
unknown committed
1403 1404 1405 1406 1407
  if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 
				 u_idx, u_last,
				 s_idx, s_last);


1408
  Uint32 cols = m_sort_columns + m_read_range_no;
unknown's avatar
unknown committed
1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430
  Uint32 skip = m_keyInfo;
  while(u_idx < u_last){
    u_last--;
    tRec = arr[u_last];
    
    // Do binary search instead to find place
    Uint32 place = s_idx;
    for(; place < s_last; place++){
      if(compare(skip, cols, tRec, arr[place]) <= 0){
	break;
      }
    }
    
    if(place != s_idx){
      if(DEBUG_NEXT_RESULT) 
	ndbout_c("memmove(%d, %d, %d)", s_idx-1, s_idx, (place - s_idx));
      memmove(arr+s_idx-1, arr+s_idx, sizeof(char*)*(place - s_idx));
    }
    
    if(DEBUG_NEXT_RESULT) ndbout_c("putting %d @ %d", u_last, place - 1);
    m_api_receivers[place-1] = tRec;
    s_idx--;
1431 1432
  }

unknown's avatar
unknown committed
1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444
  if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 
				 u_idx, u_last,
				 s_idx, s_last);
  
  m_current_api_receiver = s_idx;
  
  if(DEBUG_NEXT_RESULT)
    for(Uint32 i = s_idx; i<s_last; i++)
      ndbout_c("%p", arr[i]);
  
  tRec = m_api_receivers[s_idx];    
  if(s_idx < s_last && tRec->nextResult()){
1445
    m_curr_row = tRec->copyout(theReceiver);      
1446
    if(DEBUG_NEXT_RESULT) ndbout_c("return 0");
unknown's avatar
unknown committed
1447
    return 0;
1448 1449
  }

unknown's avatar
unknown committed
1450 1451 1452
  theError.code = -1;
  if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
  return 1;
1453 1454
}

1455
int
unknown's avatar
unknown committed
1456
NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){  
unknown's avatar
unknown committed
1457 1458 1459
  if(idx == theParallelism)
    return 0;
  
unknown's avatar
unknown committed
1460
  NdbReceiver* tRec = m_api_receivers[idx];
unknown's avatar
unknown committed
1461 1462 1463
  NdbApiSignal tSignal(theNdb->theMyRef);
  tSignal.setSignal(GSN_SCAN_NEXTREQ);
  
unknown's avatar
unknown committed
1464
  Uint32 last = m_sent_receivers_count;
unknown's avatar
unknown committed
1465
  Uint32* theData = tSignal.getDataPtrSend();
unknown's avatar
unknown committed
1466 1467 1468 1469 1470 1471 1472 1473 1474 1475
  Uint32* prep_array = theData + 4;
  
  m_current_api_receiver = idx + 1;
  if((prep_array[0] = tRec->m_tcPtrI) == RNIL)
  {
    if(DEBUG_NEXT_RESULT)
      ndbout_c("receiver completed, don't send");
    return 0;
  }
  
unknown's avatar
unknown committed
1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488
  theData[0] = theNdbCon->theTCConPtr;
  theData[1] = 0;
  Uint64 transId = theNdbCon->theTransactionId;
  theData[2] = transId;
  theData[3] = (Uint32) (transId >> 32);
  
  /**
   * Prepare ops
   */
  m_sent_receivers[last] = tRec;
  tRec->m_list_index = last;
  tRec->prepareSend();
  m_sent_receivers_count = last + 1;
1489
  
unknown's avatar
unknown committed
1490 1491 1492
  Uint32 nodeId = theNdbCon->theDBnode;
  TransporterFacade * tp = TransporterFacade::instance();
  tSignal.setLength(4+1);
unknown's avatar
unknown committed
1493 1494 1495
  int ret= tp->sendSignal(&tSignal, nodeId);
  if (!ret) checkForceSend(forceSend);
  return ret;
1496
}
unknown's avatar
unknown committed
1497 1498

int
unknown's avatar
unknown committed
1499
NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
unknown's avatar
unknown committed
1500 1501 1502
  Uint32 seq = theNdbCon->theNodeSequence;
  Uint32 nodeId = theNdbCon->theDBnode;
  
unknown's avatar
unknown committed
1503 1504
  if(seq != tp->getNodeSequence(nodeId))
  {
unknown's avatar
unknown committed
1505 1506 1507
    theNdbCon->theReleaseOnClose = true;
    return -1;
  }
1508
  
unknown's avatar
unknown committed
1509 1510 1511 1512 1513
  /**
   * Wait for outstanding
   */
  while(theError.code == 0 && m_sent_receivers_count)
  {
1514 1515
    theNdb->theImpl->theWaiter.m_node = nodeId;
    theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
unknown's avatar
unknown committed
1516 1517 1518 1519 1520 1521 1522 1523 1524 1525
    int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
    switch(return_code){
    case 0:
      break;
    case -1:
      setErrorCode(4008);
    case -2:
      m_api_receivers_count = 0;
      m_conf_receivers_count = 0;
      m_sent_receivers_count = 0;
1526
      theNdbCon->theReleaseOnClose = true;
unknown's avatar
unknown committed
1527 1528 1529 1530
      return -1;
    }
  }

1531 1532 1533 1534 1535 1536 1537
  if(theError.code)
  {
    m_api_receivers_count = 0;
    m_current_api_receiver = m_ordered ? theParallelism : 0;
  }


unknown's avatar
unknown committed
1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572
  /**
   * move all conf'ed into api
   *   so that send_next_scan can check if they needs to be closed
   */
  Uint32 api = m_api_receivers_count;
  Uint32 conf = m_conf_receivers_count;

  if(m_ordered)
  {
    /**
     * Ordered scan, keep the m_api_receivers "to the right"
     */
    memmove(m_api_receivers, m_api_receivers+m_current_api_receiver, 
	    (theParallelism - m_current_api_receiver) * sizeof(char*));
    api = (theParallelism - m_current_api_receiver);
    m_api_receivers_count = api;
  }
  
  if(DEBUG_NEXT_RESULT)
    ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d",
	     m_ordered, api, conf, 
	     m_sent_receivers_count, m_current_api_receiver, theParallelism);
  
  if(api+conf)
  {
    /**
     * There's something to close
     *   setup m_api_receivers (for send_next_scan)
     */
    memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*));
    m_api_receivers_count = api + conf;
    m_conf_receivers_count = 0;
  }
  
  // Send close scan
unknown's avatar
Merge  
unknown committed
1573
  if(send_next_scan(api+conf, true, forceSend) == -1)
unknown's avatar
unknown committed
1574 1575 1576
  {
    theNdbCon->theReleaseOnClose = true;
    return -1;
unknown's avatar
unknown committed
1577 1578 1579 1580 1581
  }
  
  /**
   * wait for close scan conf
   */
unknown's avatar
unknown committed
1582 1583
  while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
  {
1584 1585
    theNdb->theImpl->theWaiter.m_node = nodeId;
    theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
unknown's avatar
unknown committed
1586 1587 1588 1589 1590 1591 1592 1593 1594 1595
    int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
    switch(return_code){
    case 0:
      break;
    case -1:
      setErrorCode(4008);
    case -2:
      m_api_receivers_count = 0;
      m_conf_receivers_count = 0;
      m_sent_receivers_count = 0;
1596
      theNdbCon->theReleaseOnClose = true;
unknown's avatar
unknown committed
1597 1598 1599
      return -1;
    }
  }
unknown's avatar
unknown committed
1600
  
1601 1602
  return 0;
}
unknown's avatar
unknown committed
1603

1604 1605
void
NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
unknown's avatar
unknown committed
1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616
  for(Uint32 i = 0; i<parallell; i++){
    m_receivers[i]->m_list_index = i;
    m_prepared_receivers[i] = m_receivers[i]->getId();
    m_sent_receivers[i] = m_receivers[i];
    m_conf_receivers[i] = 0;
    m_api_receivers[i] = 0;
    m_receivers[i]->prepareSend();
  }
  
  m_api_receivers_count = 0;
  m_current_api_receiver = 0;
unknown's avatar
unknown committed
1617
  m_sent_receivers_count = 0;
unknown's avatar
unknown committed
1618
  m_conf_receivers_count = 0;
1619 1620 1621
}

int
unknown's avatar
unknown committed
1622
NdbScanOperation::restart(bool forceSend)
1623 1624 1625 1626 1627 1628 1629 1630
{
  
  TransporterFacade* tp = TransporterFacade::instance();
  Guard guard(tp->theMutexPtr);
  Uint32 nodeId = theNdbCon->theDBnode;
  
  {
    int res;
unknown's avatar
unknown committed
1631
    if((res= close_impl(tp, forceSend)))
1632 1633 1634 1635 1636 1637 1638 1639 1640
    {
      return res;
    }
  }
  
  /**
   * Reset receivers
   */
  reset_receivers(theParallelism, m_ordered);
unknown's avatar
unknown committed
1641
  
unknown's avatar
unknown committed
1642
  theError.code = 0;
unknown's avatar
unknown committed
1643 1644 1645 1646 1647
  if (doSendScan(nodeId) == -1)
    return -1;
  
  return 0;
}
1648 1649

int
unknown's avatar
unknown committed
1650
NdbIndexScanOperation::reset_bounds(bool forceSend){
1651 1652 1653 1654 1655
  int res;
  
  {
    TransporterFacade* tp = TransporterFacade::instance();
    Guard guard(tp->theMutexPtr);
unknown's avatar
unknown committed
1656
    res= close_impl(tp, forceSend);
1657 1658 1659 1660
  }

  if(!res)
  {
unknown's avatar
unknown committed
1661
    theError.code = 0;
1662 1663
    reset_receivers(theParallelism, m_ordered);
    
1664 1665
    theLastKEYINFO = theSCAN_TABREQ->next();
    theKEYINFOptr = ((KeyInfo*)theLastKEYINFO->getDataPtrSend())->keyData;
unknown's avatar
unknown committed
1666 1667
    theTupKeyLen = 0;
    theTotalNrOfKeyWordInSignal = 0;
1668 1669
    theNoOfTupKeyLeft = m_accessTable->m_noOfDistributionKeys;
    theDistrKeyIndicator_ = 0;
1670 1671
    m_this_bound_start = 0;
    m_first_bound_word = theKEYINFOptr;
unknown's avatar
unknown committed
1672
    m_transConnection
unknown's avatar
unknown committed
1673
      ->remove_list((NdbOperation*&)m_transConnection->m_firstExecutedScanOp,
unknown's avatar
unknown committed
1674
		    this);
1675 1676 1677 1678 1679
    m_transConnection->define_scan_op(this);
    return 0;
  }
  return res;
}
1680 1681

int
1682
NdbIndexScanOperation::end_of_bound(Uint32 no)
1683
{
1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696
  if(no < (1 << 13)) // Only 12-bits no of ranges
  {
    Uint32 bound_head = * m_first_bound_word;
    bound_head |= (theTupKeyLen - m_this_bound_start) << 16 | (no << 4);
    * m_first_bound_word = bound_head;
    
    m_first_bound_word = theKEYINFOptr + theTotalNrOfKeyWordInSignal;;
    m_this_bound_start = theTupKeyLen;
    return 0;
  }
  return -1;
}

1697
int
1698 1699
NdbIndexScanOperation::get_range_no()
{
1700 1701
  NdbRecAttr* tRecAttr = m_curr_row;
  if(m_read_range_no && tRecAttr)
1702
  {
1703 1704 1705 1706
    if(m_keyInfo)
      tRecAttr = tRecAttr->next();
    Uint32 ret = *(Uint32*)tRecAttr->aRef();
    return ret;
1707
  }
1708
  return -1;
1709
}