Commit 7f510bcc authored by unknown's avatar unknown

wl2126 - ndb - set correct found_p using new pseudo column NDB$RANGE_NO


ndb/include/kernel/AttributeHeader.hpp:
  Added range_no pseudo column
ndb/include/ndbapi/NdbDictionary.hpp:
  Added range_no pseudo column
ndb/include/ndbapi/NdbIndexScanOperation.hpp:
  Add support for reading range_no
ndb/include/ndbapi/NdbReceiver.hpp:
  Add support for reading range_no
ndb/include/ndbapi/NdbScanOperation.hpp:
  Add support for reading range_no
ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  Added range_no pseudo column
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Added range_no pseudo column
ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp:
  Added range_no pseudo column
ndb/src/ndbapi/NdbDictionary.cpp:
  Added range_no pseudo column
ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  Added range_no pseudo column
ndb/src/ndbapi/NdbReceiver.cpp:
  Add support for reading range_no
ndb/src/ndbapi/NdbScanOperation.cpp:
  Read range no
sql/ha_ndbcluster.cc:
  Set correct found_p using new feature read_range_no
sql/ha_ndbcluster.h:
  Set correct found_p using new feature read_range_no
parent 99880af8
...@@ -34,9 +34,10 @@ public: ...@@ -34,9 +34,10 @@ public:
* Psuedo columns * Psuedo columns
*/ */
STATIC_CONST( PSUEDO = 0x8000 ); STATIC_CONST( PSUEDO = 0x8000 );
STATIC_CONST( FRAGMENT = 0xFFFE ); STATIC_CONST( FRAGMENT = 0xFFFE ); // Read fragment no
STATIC_CONST( ROW_COUNT = 0xFFFD ); STATIC_CONST( ROW_COUNT = 0xFFFD ); // Read row count (committed)
STATIC_CONST( COMMIT_COUNT = 0xFFFC ); STATIC_CONST( COMMIT_COUNT = 0xFFFC ); // Read commit count
STATIC_CONST( RANGE_NO = 0xFFFB ); // Read range no (when batched ranges)
/** Initialize AttributeHeader at location aHeaderPtr */ /** Initialize AttributeHeader at location aHeaderPtr */
static AttributeHeader& init(void* aHeaderPtr, Uint32 anAttributeId, static AttributeHeader& init(void* aHeaderPtr, Uint32 anAttributeId,
......
...@@ -387,6 +387,7 @@ public: ...@@ -387,6 +387,7 @@ public:
static const Column * FRAGMENT; static const Column * FRAGMENT;
static const Column * ROW_COUNT; static const Column * ROW_COUNT;
static const Column * COMMIT_COUNT; static const Column * COMMIT_COUNT;
static const Column * RANGE_NO;
#endif #endif
private: private:
......
...@@ -45,7 +45,8 @@ public: ...@@ -45,7 +45,8 @@ public:
NdbResultSet* readTuples(LockMode = LM_Read, NdbResultSet* readTuples(LockMode = LM_Read,
Uint32 batch = 0, Uint32 batch = 0,
Uint32 parallel = 0, Uint32 parallel = 0,
bool order_by = false); bool order_by = false,
bool read_range_no = false);
inline NdbResultSet* readTuples(int parallell){ inline NdbResultSet* readTuples(int parallell){
return readTuples(LM_Read, 0, parallell, false); return readTuples(LM_Read, 0, parallell, false);
...@@ -119,7 +120,12 @@ public: ...@@ -119,7 +120,12 @@ public:
* Marks end of a bound, * Marks end of a bound,
* used when batching index reads (multiple ranges) * used when batching index reads (multiple ranges)
*/ */
int end_of_bound(); int end_of_bound(Uint32 range_no);
/**
* Return range no for current row
*/
Uint32 get_range_no();
bool getSorted() const { return m_ordered; } bool getSorted() const { return m_ordered; }
private: private:
......
...@@ -68,7 +68,7 @@ private: ...@@ -68,7 +68,7 @@ private:
Ndb* m_ndb; Ndb* m_ndb;
Uint32 m_id; Uint32 m_id;
Uint32 m_tcPtrI; Uint32 m_tcPtrI;
Uint32 m_key_info; Uint32 m_hidden_count;
ReceiverType m_type; ReceiverType m_type;
void* m_owner; void* m_owner;
NdbReceiver* m_next; NdbReceiver* m_next;
...@@ -77,7 +77,7 @@ private: ...@@ -77,7 +77,7 @@ private:
* At setup * At setup
*/ */
class NdbRecAttr * getValue(const class NdbColumnImpl*, char * user_dst_ptr); class NdbRecAttr * getValue(const class NdbColumnImpl*, char * user_dst_ptr);
void do_get_value(NdbReceiver*, Uint32 rows, Uint32 key_size); void do_get_value(NdbReceiver*, Uint32 rows, Uint32 key_size, Uint32 range);
void prepareSend(); void prepareSend();
void calculate_batch_size(Uint32, Uint32, Uint32&, Uint32&, Uint32&); void calculate_batch_size(Uint32, Uint32, Uint32&, Uint32&, Uint32&);
......
...@@ -156,6 +156,7 @@ protected: ...@@ -156,6 +156,7 @@ protected:
NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*); NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*);
Uint32 m_ordered; Uint32 m_ordered;
Uint32 m_read_range_no;
int restart(bool forceSend = false); int restart(bool forceSend = false);
}; };
......
...@@ -2008,8 +2008,10 @@ public: ...@@ -2008,8 +2008,10 @@ public:
BlockReference tcTuxBlockref; BlockReference tcTuxBlockref;
BlockReference tcTupBlockref; BlockReference tcTupBlockref;
Uint32 commitAckMarker; Uint32 commitAckMarker;
UintR noFiredTriggers; union {
Uint32 m_scan_curr_range_no;
UintR noFiredTriggers;
};
Uint16 errorCode; Uint16 errorCode;
Uint16 logStartPageIndex; Uint16 logStartPageIndex;
Uint16 logStartPageNo; Uint16 logStartPageNo;
......
...@@ -2619,12 +2619,20 @@ Dblqh::execREAD_PSUEDO_REQ(Signal* signal){ ...@@ -2619,12 +2619,20 @@ Dblqh::execREAD_PSUEDO_REQ(Signal* signal){
regTcPtr.i = signal->theData[0]; regTcPtr.i = signal->theData[0];
ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec); ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
FragrecordPtr regFragptr; if(signal->theData[1] != AttributeHeader::RANGE_NO)
regFragptr.i = regTcPtr.p->fragmentptr; {
ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord); jam();
FragrecordPtr regFragptr;
signal->theData[0] = regFragptr.p->accFragptr[regTcPtr.p->localFragptr]; regFragptr.i = regTcPtr.p->fragmentptr;
EXECUTE_DIRECT(DBACC, GSN_READ_PSUEDO_REQ, signal, 2); ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord);
signal->theData[0] = regFragptr.p->accFragptr[regTcPtr.p->localFragptr];
EXECUTE_DIRECT(DBACC, GSN_READ_PSUEDO_REQ, signal, 2);
}
else
{
signal->theData[0] = regTcPtr.p->m_scan_curr_range_no;
}
} }
/* ************>> */ /* ************>> */
...@@ -7860,11 +7868,12 @@ Dblqh::copy_bounds(Uint32 * dst, TcConnectionrec* tcPtrP) ...@@ -7860,11 +7868,12 @@ Dblqh::copy_bounds(Uint32 * dst, TcConnectionrec* tcPtrP)
} }
Uint32 first = (* (dst - left)); // First word in range Uint32 first = (* (dst - left)); // First word in range
(* (dst - left)) = (first & 0xFFFF); // Remove length (16 upper bits)
// Length of this range // Length of this range
Uint8 offset; Uint8 offset;
const Uint32 len = (first >> 16) ? (first >> 16) : totalLen; const Uint32 len = (first >> 16) ? (first >> 16) : totalLen;
tcPtrP->m_scan_curr_range_no = (first & 0xFFF0) >> 4;
(* (dst - left)) = (first & 0xF); // Remove length & range no
if(len < left) if(len < left)
{ {
...@@ -8740,6 +8749,7 @@ void Dblqh::initScanTc(Signal* signal, ...@@ -8740,6 +8749,7 @@ void Dblqh::initScanTc(Signal* signal,
tcConnectptr.p->listState = TcConnectionrec::NOT_IN_LIST; tcConnectptr.p->listState = TcConnectionrec::NOT_IN_LIST;
tcConnectptr.p->commitAckMarker = RNIL; tcConnectptr.p->commitAckMarker = RNIL;
tcConnectptr.p->m_offset_current_keybuf = 0; tcConnectptr.p->m_offset_current_keybuf = 0;
tcConnectptr.p->m_scan_curr_range_no = 0;
tabptr.p->usageCount++; tabptr.p->usageCount++;
}//Dblqh::initScanTc() }//Dblqh::initScanTc()
......
...@@ -998,6 +998,13 @@ Dbtup::read_psuedo(Uint32 attrId, Uint32* outBuffer){ ...@@ -998,6 +998,13 @@ Dbtup::read_psuedo(Uint32 attrId, Uint32* outBuffer){
outBuffer[0] = signal->theData[0]; outBuffer[0] = signal->theData[0];
outBuffer[1] = signal->theData[1]; outBuffer[1] = signal->theData[1];
return 2; return 2;
case AttributeHeader::RANGE_NO:
signal->theData[0] = operPtr.p->userpointer;
signal->theData[1] = attrId;
EXECUTE_DIRECT(DBLQH, GSN_READ_PSUEDO_REQ, signal, 2);
outBuffer[0] = signal->theData[0];
return 1;
default: default:
return 0; return 0;
} }
......
...@@ -942,4 +942,4 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col) ...@@ -942,4 +942,4 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col)
const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT = 0; const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT = 0;
const NdbDictionary::Column * NdbDictionary::Column::ROW_COUNT = 0; const NdbDictionary::Column * NdbDictionary::Column::ROW_COUNT = 0;
const NdbDictionary::Column * NdbDictionary::Column::COMMIT_COUNT = 0; const NdbDictionary::Column * NdbDictionary::Column::COMMIT_COUNT = 0;
const NdbDictionary::Column * NdbDictionary::Column::RANGE_NO = 0;
...@@ -218,6 +218,12 @@ NdbColumnImpl::create_psuedo(const char * name){ ...@@ -218,6 +218,12 @@ NdbColumnImpl::create_psuedo(const char * name){
col->m_impl.m_attrId = AttributeHeader::COMMIT_COUNT; col->m_impl.m_attrId = AttributeHeader::COMMIT_COUNT;
col->m_impl.m_attrSize = 8; col->m_impl.m_attrSize = 8;
col->m_impl.m_arraySize = 1; col->m_impl.m_arraySize = 1;
} else if(!strcmp(name, "NDB$RANGE_NO")){
col->setType(NdbDictionary::Column::Unsigned);
col->m_impl.m_attrId = AttributeHeader::RANGE_NO;
col->m_impl.m_attrSize = 4;
col->m_impl.m_arraySize = 1;
col->m_impl.m_extType = NdbSqlUtil::Type::Unsigned;
} else { } else {
abort(); abort();
} }
...@@ -685,6 +691,8 @@ NdbDictionaryImpl::setTransporter(class Ndb* ndb, ...@@ -685,6 +691,8 @@ NdbDictionaryImpl::setTransporter(class Ndb* ndb,
NdbColumnImpl::create_psuedo("NDB$ROW_COUNT"); NdbColumnImpl::create_psuedo("NDB$ROW_COUNT");
NdbDictionary::Column::COMMIT_COUNT= NdbDictionary::Column::COMMIT_COUNT=
NdbColumnImpl::create_psuedo("NDB$COMMIT_COUNT"); NdbColumnImpl::create_psuedo("NDB$COMMIT_COUNT");
NdbDictionary::Column::RANGE_NO=
NdbColumnImpl::create_psuedo("NDB$RANGE_NO");
} }
m_globalHash->unlock(); m_globalHash->unlock();
return true; return true;
......
...@@ -140,7 +140,10 @@ NdbReceiver::calculate_batch_size(Uint32 key_size, ...@@ -140,7 +140,10 @@ NdbReceiver::calculate_batch_size(Uint32 key_size,
} }
void void
NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){ NdbReceiver::do_get_value(NdbReceiver * org,
Uint32 rows,
Uint32 key_size,
Uint32 range_no){
if(rows > m_defined_rows){ if(rows > m_defined_rows){
delete[] m_rows; delete[] m_rows;
m_defined_rows = rows; m_defined_rows = rows;
...@@ -155,7 +158,7 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){ ...@@ -155,7 +158,7 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
key.m_attrSize = 4; key.m_attrSize = 4;
key.m_nullable = true; // So that receive works w.r.t KEYINFO20 key.m_nullable = true; // So that receive works w.r.t KEYINFO20
} }
m_key_info = key_size; m_hidden_count = (key_size ? 1 : 0) + range_no ;
for(Uint32 i = 0; i<rows; i++){ for(Uint32 i = 0; i<rows; i++){
NdbRecAttr * prev = theCurrentRecAttr; NdbRecAttr * prev = theCurrentRecAttr;
...@@ -167,6 +170,12 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){ ...@@ -167,6 +170,12 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
return ; // -1 return ; // -1
} }
if(range_no &&
!getValue(&NdbColumnImpl::getImpl(* NdbDictionary::Column::RANGE_NO),0))
{
abort();
}
NdbRecAttr* tRecAttr = org->theFirstRecAttr; NdbRecAttr* tRecAttr = org->theFirstRecAttr;
while(tRecAttr != 0){ while(tRecAttr != 0){
if(getValue(&NdbColumnImpl::getImpl(*tRecAttr->m_column), (char*)0) != 0) if(getValue(&NdbColumnImpl::getImpl(*tRecAttr->m_column), (char*)0) != 0)
...@@ -196,10 +205,9 @@ void ...@@ -196,10 +205,9 @@ void
NdbReceiver::copyout(NdbReceiver & dstRec){ NdbReceiver::copyout(NdbReceiver & dstRec){
NdbRecAttr* src = m_rows[m_current_row++]; NdbRecAttr* src = m_rows[m_current_row++];
NdbRecAttr* dst = dstRec.theFirstRecAttr; NdbRecAttr* dst = dstRec.theFirstRecAttr;
Uint32 tmp = m_key_info; Uint32 tmp = m_hidden_count;
if(tmp > 0){ while(tmp--)
src = src->next(); src = src->next();
}
while(dst){ while(dst){
Uint32 len = ((src->theAttrSize * src->theArraySize)+3)/4; Uint32 len = ((src->theAttrSize * src->theArraySize)+3)/4;
......
...@@ -121,6 +121,7 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection) ...@@ -121,6 +121,7 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection)
theOperationType = OpenScanRequest; theOperationType = OpenScanRequest;
theNdbCon->theMagicNumber = 0xFE11DF; theNdbCon->theMagicNumber = 0xFE11DF;
theNoOfTupKeyLeft = tab->m_noOfDistributionKeys; theNoOfTupKeyLeft = tab->m_noOfDistributionKeys;
m_read_range_no = 0;
return 0; return 0;
} }
...@@ -735,7 +736,9 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, ...@@ -735,7 +736,9 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
req->requestInfo = reqInfo; req->requestInfo = reqInfo;
for(Uint32 i = 0; i<theParallelism; i++){ for(Uint32 i = 0; i<theParallelism; i++){
m_receivers[i]->do_get_value(&theReceiver, batch_size, key_size); m_receivers[i]->do_get_value(&theReceiver, batch_size,
key_size,
m_read_range_no);
} }
return 0; return 0;
} }
...@@ -1197,8 +1200,17 @@ NdbResultSet* ...@@ -1197,8 +1200,17 @@ NdbResultSet*
NdbIndexScanOperation::readTuples(LockMode lm, NdbIndexScanOperation::readTuples(LockMode lm,
Uint32 batch, Uint32 batch,
Uint32 parallel, Uint32 parallel,
bool order_by){ bool order_by,
bool read_range_no){
NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0); NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0);
if(read_range_no)
{
m_read_range_no = 1;
Uint32 word = 0;
AttributeHeader::init(&word, AttributeHeader::RANGE_NO, 0);
if(insertATTRINFO(word) == -1)
rs = 0;
}
if(rs && order_by){ if(rs && order_by){
m_ordered = 1; m_ordered = 1;
Uint32 cnt = m_accessTable->getNoOfColumns() - 1; Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
...@@ -1264,7 +1276,6 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, ...@@ -1264,7 +1276,6 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
r1 = (skip ? r1->next() : r1); r1 = (skip ? r1->next() : r1);
r2 = (skip ? r2->next() : r2); r2 = (skip ? r2->next() : r2);
while(cols > 0){ while(cols > 0){
Uint32 * d1 = (Uint32*)r1->aRef(); Uint32 * d1 = (Uint32*)r1->aRef();
Uint32 * d2 = (Uint32*)r2->aRef(); Uint32 * d2 = (Uint32*)r2->aRef();
...@@ -1366,7 +1377,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed, ...@@ -1366,7 +1377,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
s_idx, s_last); s_idx, s_last);
Uint32 cols = m_sort_columns; Uint32 cols = m_sort_columns + m_read_range_no;
Uint32 skip = m_keyInfo; Uint32 skip = m_keyInfo;
while(u_idx < u_last){ while(u_idx < u_last){
u_last--; u_last--;
...@@ -1640,13 +1651,41 @@ NdbIndexScanOperation::reset_bounds(bool forceSend){ ...@@ -1640,13 +1651,41 @@ NdbIndexScanOperation::reset_bounds(bool forceSend){
} }
int int
NdbIndexScanOperation::end_of_bound() NdbIndexScanOperation::end_of_bound(Uint32 no)
{ {
Uint32 bound_head = * m_first_bound_word; if(no < (1 << 13)) // Only 12-bits no of ranges
bound_head |= (theTupKeyLen - m_this_bound_start) << 16; {
* m_first_bound_word = bound_head; Uint32 bound_head = * m_first_bound_word;
bound_head |= (theTupKeyLen - m_this_bound_start) << 16 | (no << 4);
m_first_bound_word = theKEYINFOptr + theTotalNrOfKeyWordInSignal;; * m_first_bound_word = bound_head;
m_this_bound_start = theTupKeyLen;
m_first_bound_word = theKEYINFOptr + theTotalNrOfKeyWordInSignal;;
m_this_bound_start = theTupKeyLen;
return 0;
}
return -1;
}
Uint32
NdbIndexScanOperation::get_range_no()
{
if(m_read_range_no)
{
Uint32 idx = m_current_api_receiver;
Uint32 last = m_api_receivers_count;
Uint32 row;
NdbReceiver * tRec;
NdbRecAttr * tRecAttr;
if(idx < last && (tRec = m_api_receivers[idx])
&& ((row = tRec->m_current_row) <= tRec->m_defined_rows)
&& (tRecAttr = tRec->m_rows[row-1])){
if(m_keyInfo)
tRecAttr = tRecAttr->next();
Uint32 ret = *(Uint32*)tRecAttr->aRef();
return ret;
}
}
return 0; return 0;
} }
...@@ -33,7 +33,7 @@ ...@@ -33,7 +33,7 @@
#include <ndbapi/NdbScanFilter.hpp> #include <ndbapi/NdbScanFilter.hpp>
// Default value for parallelism // Default value for parallelism
static const int parallelism= 240; static const int parallelism= 0;
// Default value for max number of transactions // Default value for max number of transactions
// createable against NDB from this handler // createable against NDB from this handler
...@@ -1316,7 +1316,8 @@ inline int ha_ndbcluster::next_result(byte *buf) ...@@ -1316,7 +1316,8 @@ inline int ha_ndbcluster::next_result(byte *buf)
*/ */
int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op, int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
const key_range *keys[2]) const key_range *keys[2],
uint range_no)
{ {
const KEY *const key_info= table->key_info + active_index; const KEY *const key_info= table->key_info + active_index;
const uint key_parts= key_info->key_parts; const uint key_parts= key_info->key_parts;
...@@ -1418,7 +1419,7 @@ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op, ...@@ -1418,7 +1419,7 @@ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
DBUG_PRINT("error", ("key %d unknown flag %d", j, p.key->flag)); DBUG_PRINT("error", ("key %d unknown flag %d", j, p.key->flag));
DBUG_ASSERT(false); DBUG_ASSERT(false);
// Stop setting bounds but continue with what we have // Stop setting bounds but continue with what we have
op->end_of_bound(); op->end_of_bound(range_no);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
} }
...@@ -1466,7 +1467,7 @@ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op, ...@@ -1466,7 +1467,7 @@ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
tot_len+= part_store_len; tot_len+= part_store_len;
} }
op->end_of_bound(); op->end_of_bound(range_no);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -4862,20 +4863,12 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p, ...@@ -4862,20 +4863,12 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
case ORDERED_INDEX: case ORDERED_INDEX:
range: range:
ranges[i].range_flag &= ~(uint)UNIQUE_RANGE; ranges[i].range_flag &= ~(uint)UNIQUE_RANGE;
if (sorted && scanOp != 0)
{
/**
* We currently don't support batching of ordered range scans
*/
i--;
curr= (byte*)buffer->buffer_end;
break;
}
if (scanOp == 0) if (scanOp == 0)
{ {
if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab)) && if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab)) &&
(m_active_cursor= scanOp->readTuples(lm, 0, parallelism, sorted)) && (m_active_cursor= scanOp->readTuples(lm, 0,
!define_read_attrs(curr, scanOp)) parallelism, sorted, true))&&
!define_read_attrs(curr, scanOp))
curr += reclength; curr += reclength;
else else
ERR_RETURN(scanOp ? ERR_RETURN(scanOp ?
...@@ -4883,7 +4876,7 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p, ...@@ -4883,7 +4876,7 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
m_active_trans->getNdbError()); m_active_trans->getNdbError());
} }
const key_range *keys[2]= { &ranges[i].start_key, &ranges[i].end_key }; const key_range *keys[2]= { &ranges[i].start_key, &ranges[i].end_key };
if ((res= set_bounds(scanOp, keys))) if ((res= set_bounds(scanOp, keys, i)))
DBUG_RETURN(res); DBUG_RETURN(res);
break; break;
} }
...@@ -5012,7 +5005,9 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p) ...@@ -5012,7 +5005,9 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p)
/** /**
* Found a record belonging to a scan * Found a record belonging to a scan
*/ */
* multi_range_found_p= multi_ranges + multi_range_curr; Uint32 range_no = ((NdbIndexScanOperation*)m_active_cursor->getOperation())
->get_range_no();
* multi_range_found_p= multi_ranges + range_no;
memcpy(table->record[0], m_multi_range_result_ptr, reclength); memcpy(table->record[0], m_multi_range_result_ptr, reclength);
setup_recattr(m_active_cursor->getOperation()->getFirstRecAttr()); setup_recattr(m_active_cursor->getOperation()->getFirstRecAttr());
unpack_record(table->record[0]); unpack_record(table->record[0]);
......
...@@ -203,7 +203,7 @@ class ha_ndbcluster: public handler ...@@ -203,7 +203,7 @@ class ha_ndbcluster: public handler
int set_primary_key(NdbOperation *op, const byte *key); int set_primary_key(NdbOperation *op, const byte *key);
int set_primary_key(NdbOperation *op); int set_primary_key(NdbOperation *op);
int set_primary_key_from_old_data(NdbOperation *op, const byte *old_data); int set_primary_key_from_old_data(NdbOperation *op, const byte *old_data);
int set_bounds(NdbIndexScanOperation *ndb_op, const key_range *keys[2]); int set_bounds(NdbIndexScanOperation*, const key_range *keys[2], uint= 0);
int key_cmp(uint keynr, const byte * old_row, const byte * new_row); int key_cmp(uint keynr, const byte * old_row, const byte * new_row);
int set_index_key(NdbOperation *, const KEY *key_info, const byte *key_ptr); int set_index_key(NdbOperation *, const KEY *key_info, const byte *key_ptr);
void print_results(); void print_results();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment