Commit 3465e155 authored by unknown's avatar unknown

ndb: wl-1893: range scanning backwards, ndb kernel


ndb/include/kernel/signaldata/AccScan.hpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/include/kernel/signaldata/ScanFrag.hpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/include/kernel/signaldata/ScanTab.hpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/include/ndbapi/NdbIndexScanOperation.hpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/include/ndbapi/NdbScanOperation.hpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/common/debugger/signaldata/ScanTab.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dbtc/DbtcMain.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dbtux/Dbtux.hpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/ndbapi/NdbScanOperation.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/tools/select_all.cpp:
  wl-1893: range scanning backwards, ndb kernel
sql/ha_ndbcluster.cc:
  wl-1893: range scanning backwards, ndb kernel
parent 755874c8
...@@ -52,9 +52,11 @@ private: ...@@ -52,9 +52,11 @@ private:
*/ */
static Uint32 getLockMode(const Uint32 & requestInfo); static Uint32 getLockMode(const Uint32 & requestInfo);
static Uint32 getReadCommittedFlag(const Uint32 & requestInfo); static Uint32 getReadCommittedFlag(const Uint32 & requestInfo);
static Uint32 getDescendingFlag(const Uint32 & requestInfo);
static void setLockMode(Uint32 & requestInfo, Uint32 lockMode); static void setLockMode(Uint32 & requestInfo, Uint32 lockMode);
static void setReadCommittedFlag(Uint32 & requestInfo, Uint32 readCommitted); static void setReadCommittedFlag(Uint32 & requestInfo, Uint32 readCommitted);
static void setDescendingFlag(Uint32 & requestInfo, Uint32 descending);
}; };
/** /**
...@@ -62,14 +64,16 @@ private: ...@@ -62,14 +64,16 @@ private:
* *
* l = Lock Mode - 1 Bit 2 * l = Lock Mode - 1 Bit 2
* h = Read Committed - 1 Bit 5 * h = Read Committed - 1 Bit 5
* z = Descending (TUX) - 1 Bit 6
* *
* 1111111111222222222233 * 1111111111222222222233
* 01234567890123456789012345678901 * 01234567890123456789012345678901
* l h * l hz
*/ */
#define AS_LOCK_MODE_SHIFT (2) #define AS_LOCK_MODE_SHIFT (2)
#define AS_LOCK_MODE_MASK (1) #define AS_LOCK_MODE_MASK (1)
#define AS_READ_COMMITTED_SHIFT (5) #define AS_READ_COMMITTED_SHIFT (5)
#define AS_DESCENDING_SHIFT (6)
inline inline
Uint32 Uint32
...@@ -83,6 +87,12 @@ AccScanReq::getReadCommittedFlag(const Uint32 & requestInfo){ ...@@ -83,6 +87,12 @@ AccScanReq::getReadCommittedFlag(const Uint32 & requestInfo){
return (requestInfo >> AS_READ_COMMITTED_SHIFT) & 1; return (requestInfo >> AS_READ_COMMITTED_SHIFT) & 1;
} }
inline
Uint32
AccScanReq::getDescendingFlag(const Uint32 & requestInfo){
return (requestInfo >> AS_DESCENDING_SHIFT) & 1;
}
inline inline
void void
AccScanReq::setLockMode(UintR & requestInfo, UintR val){ AccScanReq::setLockMode(UintR & requestInfo, UintR val){
...@@ -97,6 +107,13 @@ AccScanReq::setReadCommittedFlag(UintR & requestInfo, UintR val){ ...@@ -97,6 +107,13 @@ AccScanReq::setReadCommittedFlag(UintR & requestInfo, UintR val){
requestInfo |= (val << AS_READ_COMMITTED_SHIFT); requestInfo |= (val << AS_READ_COMMITTED_SHIFT);
} }
inline
void
AccScanReq::setDescendingFlag(UintR & requestInfo, UintR val){
ASSERT_BOOL(val, "AccScanReq::setDescendingFlag");
requestInfo |= (val << AS_DESCENDING_SHIFT);
}
class AccScanConf { class AccScanConf {
/** /**
* Sender(s) * Sender(s)
......
...@@ -56,6 +56,7 @@ public: ...@@ -56,6 +56,7 @@ public:
static Uint32 getKeyinfoFlag(const Uint32 & requestInfo); static Uint32 getKeyinfoFlag(const Uint32 & requestInfo);
static Uint32 getReadCommittedFlag(const Uint32 & requestInfo); static Uint32 getReadCommittedFlag(const Uint32 & requestInfo);
static Uint32 getRangeScanFlag(const Uint32 & requestInfo); static Uint32 getRangeScanFlag(const Uint32 & requestInfo);
static Uint32 getDescendingFlag(const Uint32 & requestInfo);
static Uint32 getAttrLen(const Uint32 & requestInfo); static Uint32 getAttrLen(const Uint32 & requestInfo);
static Uint32 getScanPrio(const Uint32 & requestInfo); static Uint32 getScanPrio(const Uint32 & requestInfo);
...@@ -64,6 +65,7 @@ public: ...@@ -64,6 +65,7 @@ public:
static void setKeyinfoFlag(Uint32 & requestInfo, Uint32 keyinfo); static void setKeyinfoFlag(Uint32 & requestInfo, Uint32 keyinfo);
static void setReadCommittedFlag(Uint32 & requestInfo, Uint32 readCommitted); static void setReadCommittedFlag(Uint32 & requestInfo, Uint32 readCommitted);
static void setRangeScanFlag(Uint32 & requestInfo, Uint32 rangeScan); static void setRangeScanFlag(Uint32 & requestInfo, Uint32 rangeScan);
static void setDescendingFlag(Uint32 & requestInfo, Uint32 descending);
static void setAttrLen(Uint32 & requestInfo, Uint32 attrLen); static void setAttrLen(Uint32 & requestInfo, Uint32 attrLen);
static void setScanPrio(Uint32& requestInfo, Uint32 prio); static void setScanPrio(Uint32& requestInfo, Uint32 prio);
}; };
...@@ -197,11 +199,12 @@ public: ...@@ -197,11 +199,12 @@ public:
* k = Keyinfo - 1 Bit 8 * k = Keyinfo - 1 Bit 8
* r = read committed - 1 Bit 9 * r = read committed - 1 Bit 9
* x = range scan - 1 Bit 6 * x = range scan - 1 Bit 6
* z = descending - 1 Bit 10
* p = Scan prio - 4 Bits (12-15) -> max 15 * p = Scan prio - 4 Bits (12-15) -> max 15
* *
* 1111111111222222222233 * 1111111111222222222233
* 01234567890123456789012345678901 * 01234567890123456789012345678901
* lxhkr ppppaaaaaaaaaaaaaaaa * lxhkrz ppppaaaaaaaaaaaaaaaa
*/ */
#define SF_LOCK_MODE_SHIFT (5) #define SF_LOCK_MODE_SHIFT (5)
#define SF_LOCK_MODE_MASK (1) #define SF_LOCK_MODE_MASK (1)
...@@ -210,6 +213,7 @@ public: ...@@ -210,6 +213,7 @@ public:
#define SF_KEYINFO_SHIFT (8) #define SF_KEYINFO_SHIFT (8)
#define SF_READ_COMMITTED_SHIFT (9) #define SF_READ_COMMITTED_SHIFT (9)
#define SF_RANGE_SCAN_SHIFT (6) #define SF_RANGE_SCAN_SHIFT (6)
#define SF_DESCENDING_SHIFT (10)
#define SF_ATTR_LEN_SHIFT (16) #define SF_ATTR_LEN_SHIFT (16)
#define SF_ATTR_LEN_MASK (65535) #define SF_ATTR_LEN_MASK (65535)
...@@ -241,6 +245,12 @@ ScanFragReq::getRangeScanFlag(const Uint32 & requestInfo){ ...@@ -241,6 +245,12 @@ ScanFragReq::getRangeScanFlag(const Uint32 & requestInfo){
return (requestInfo >> SF_RANGE_SCAN_SHIFT) & 1; return (requestInfo >> SF_RANGE_SCAN_SHIFT) & 1;
} }
inline
Uint32
ScanFragReq::getDescendingFlag(const Uint32 & requestInfo){
return (requestInfo >> SF_DESCENDING_SHIFT) & 1;
}
inline inline
Uint32 Uint32
ScanFragReq::getReadCommittedFlag(const Uint32 & requestInfo){ ScanFragReq::getReadCommittedFlag(const Uint32 & requestInfo){
...@@ -301,6 +311,13 @@ ScanFragReq::setRangeScanFlag(UintR & requestInfo, UintR val){ ...@@ -301,6 +311,13 @@ ScanFragReq::setRangeScanFlag(UintR & requestInfo, UintR val){
requestInfo |= (val << SF_RANGE_SCAN_SHIFT); requestInfo |= (val << SF_RANGE_SCAN_SHIFT);
} }
inline
void
ScanFragReq::setDescendingFlag(UintR & requestInfo, UintR val){
ASSERT_BOOL(val, "ScanFragReq::setDescendingFlag");
requestInfo |= (val << SF_DESCENDING_SHIFT);
}
inline inline
void void
ScanFragReq::setAttrLen(UintR & requestInfo, UintR val){ ScanFragReq::setAttrLen(UintR & requestInfo, UintR val){
......
...@@ -35,6 +35,7 @@ class ScanTabReq { ...@@ -35,6 +35,7 @@ class ScanTabReq {
*/ */
friend class NdbConnection; friend class NdbConnection;
friend class NdbScanOperation; friend class NdbScanOperation;
friend class NdbIndexScanOperation;
/** /**
* For printing * For printing
...@@ -79,6 +80,7 @@ private: ...@@ -79,6 +80,7 @@ private:
static Uint8 getHoldLockFlag(const UintR & requestInfo); static Uint8 getHoldLockFlag(const UintR & requestInfo);
static Uint8 getReadCommittedFlag(const UintR & requestInfo); static Uint8 getReadCommittedFlag(const UintR & requestInfo);
static Uint8 getRangeScanFlag(const UintR & requestInfo); static Uint8 getRangeScanFlag(const UintR & requestInfo);
static Uint8 getDescendingFlag(const UintR & requestInfo);
static Uint8 getKeyinfoFlag(const UintR & requestInfo); static Uint8 getKeyinfoFlag(const UintR & requestInfo);
static Uint16 getScanBatch(const UintR & requestInfo); static Uint16 getScanBatch(const UintR & requestInfo);
static Uint8 getDistributionKeyFlag(const UintR & requestInfo); static Uint8 getDistributionKeyFlag(const UintR & requestInfo);
...@@ -92,6 +94,7 @@ private: ...@@ -92,6 +94,7 @@ private:
static void setHoldLockFlag(UintR & requestInfo, Uint32 flag); static void setHoldLockFlag(UintR & requestInfo, Uint32 flag);
static void setReadCommittedFlag(UintR & requestInfo, Uint32 flag); static void setReadCommittedFlag(UintR & requestInfo, Uint32 flag);
static void setRangeScanFlag(UintR & requestInfo, Uint32 flag); static void setRangeScanFlag(UintR & requestInfo, Uint32 flag);
static void setDescendingFlag(UintR & requestInfo, Uint32 flag);
static void setKeyinfoFlag(UintR & requestInfo, Uint32 flag); static void setKeyinfoFlag(UintR & requestInfo, Uint32 flag);
static void setScanBatch(Uint32& requestInfo, Uint32 sz); static void setScanBatch(Uint32& requestInfo, Uint32 sz);
static void setDistributionKeyFlag(Uint32& requestInfo, Uint32 flag); static void setDistributionKeyFlag(Uint32& requestInfo, Uint32 flag);
...@@ -105,13 +108,14 @@ private: ...@@ -105,13 +108,14 @@ private:
h = Hold lock mode - 1 Bit 10 h = Hold lock mode - 1 Bit 10
c = Read Committed - 1 Bit 11 c = Read Committed - 1 Bit 11
k = Keyinfo - 1 Bit 12 k = Keyinfo - 1 Bit 12
z = Descending (TUX) - 1 Bit 14
x = Range Scan (TUX) - 1 Bit 15 x = Range Scan (TUX) - 1 Bit 15
b = Scan batch - 10 Bit 16-25 (max 1023) b = Scan batch - 10 Bit 16-25 (max 1023)
d = Distribution key flag d = Distribution key flag
1111111111222222222233 1111111111222222222233
01234567890123456789012345678901 01234567890123456789012345678901
ppppppppl hck xbbbbbbbbbb ppppppppl hck zxbbbbbbbbbb
*/ */
#define PARALLELL_SHIFT (0) #define PARALLELL_SHIFT (0)
...@@ -132,6 +136,9 @@ private: ...@@ -132,6 +136,9 @@ private:
#define RANGE_SCAN_SHIFT (15) #define RANGE_SCAN_SHIFT (15)
#define RANGE_SCAN_MASK (1) #define RANGE_SCAN_MASK (1)
#define DESCENDING_SHIFT (14)
#define DESCENDING_MASK (1)
#define SCAN_BATCH_SHIFT (16) #define SCAN_BATCH_SHIFT (16)
#define SCAN_BATCH_MASK (1023) #define SCAN_BATCH_MASK (1023)
...@@ -167,6 +174,12 @@ ScanTabReq::getRangeScanFlag(const UintR & requestInfo){ ...@@ -167,6 +174,12 @@ ScanTabReq::getRangeScanFlag(const UintR & requestInfo){
return (Uint8)((requestInfo >> RANGE_SCAN_SHIFT) & RANGE_SCAN_MASK); return (Uint8)((requestInfo >> RANGE_SCAN_SHIFT) & RANGE_SCAN_MASK);
} }
inline
Uint8
ScanTabReq::getDescendingFlag(const UintR & requestInfo){
return (Uint8)((requestInfo >> DESCENDING_SHIFT) & DESCENDING_MASK);
}
inline inline
Uint16 Uint16
ScanTabReq::getScanBatch(const Uint32 & requestInfo){ ScanTabReq::getScanBatch(const Uint32 & requestInfo){
...@@ -214,6 +227,13 @@ ScanTabReq::setRangeScanFlag(UintR & requestInfo, Uint32 flag){ ...@@ -214,6 +227,13 @@ ScanTabReq::setRangeScanFlag(UintR & requestInfo, Uint32 flag){
requestInfo |= (flag << RANGE_SCAN_SHIFT); requestInfo |= (flag << RANGE_SCAN_SHIFT);
} }
inline
void
ScanTabReq::setDescendingFlag(UintR & requestInfo, Uint32 flag){
ASSERT_BOOL(flag, "ScanTabReq::setDescendingFlag");
requestInfo |= (flag << DESCENDING_SHIFT);
}
inline inline
void void
ScanTabReq::setScanBatch(Uint32 & requestInfo, Uint32 flag){ ScanTabReq::setScanBatch(Uint32 & requestInfo, Uint32 flag){
......
...@@ -39,6 +39,7 @@ public: ...@@ -39,6 +39,7 @@ public:
* @param batch No of rows to fetch from each fragment at a time * @param batch No of rows to fetch from each fragment at a time
* @param LockMode Scan lock handling * @param LockMode Scan lock handling
* @param order_by Order result set in index order * @param order_by Order result set in index order
* @param order_desc Order descending, ignored unless order_by
* @returns NdbResultSet. * @returns NdbResultSet.
* @see NdbScanOperation::readTuples * @see NdbScanOperation::readTuples
*/ */
...@@ -46,6 +47,7 @@ public: ...@@ -46,6 +47,7 @@ public:
Uint32 batch = 0, Uint32 batch = 0,
Uint32 parallel = 0, Uint32 parallel = 0,
bool order_by = false, bool order_by = false,
bool order_desc = false,
bool read_range_no = false); bool read_range_no = false);
inline int readTuples(int parallell){ inline int readTuples(int parallell){
...@@ -128,6 +130,7 @@ public: ...@@ -128,6 +130,7 @@ public:
int get_range_no(); int get_range_no();
bool getSorted() const { return m_ordered; } bool getSorted() const { return m_ordered; }
bool getDescending() const { return m_descending; }
private: private:
NdbIndexScanOperation(Ndb* aNdb); NdbIndexScanOperation(Ndb* aNdb);
virtual ~NdbIndexScanOperation(); virtual ~NdbIndexScanOperation();
......
...@@ -232,7 +232,8 @@ protected: ...@@ -232,7 +232,8 @@ protected:
int getKeyFromKEYINFO20(Uint32* data, unsigned size); int getKeyFromKEYINFO20(Uint32* data, unsigned size);
NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*); NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*);
Uint32 m_ordered; bool m_ordered;
bool m_descending;
Uint32 m_read_range_no; Uint32 m_read_range_no;
}; };
......
...@@ -30,13 +30,14 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv ...@@ -30,13 +30,14 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
fprintf(output, " apiConnectPtr: H\'%.8x", fprintf(output, " apiConnectPtr: H\'%.8x",
sig->apiConnectPtr); sig->apiConnectPtr);
fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo); fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo);
fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Keyinfo: %u Holdlock: %u, RangeScan: %u ReadCommitted: %u\n DistributionKeyFlag: %u", fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u Keyinfo: %u Holdlock: %u RangeScan: %u Descending: %u ReadCommitted: %u\n DistributionKeyFlag: %u",
sig->getParallelism(requestInfo), sig->getParallelism(requestInfo),
sig->getScanBatch(requestInfo), sig->getScanBatch(requestInfo),
sig->getLockMode(requestInfo), sig->getLockMode(requestInfo),
sig->getKeyinfoFlag(requestInfo), sig->getKeyinfoFlag(requestInfo),
sig->getHoldLockFlag(requestInfo), sig->getHoldLockFlag(requestInfo),
sig->getRangeScanFlag(requestInfo), sig->getRangeScanFlag(requestInfo),
sig->getDescendingFlag(requestInfo),
sig->getReadCommittedFlag(requestInfo), sig->getReadCommittedFlag(requestInfo),
sig->getDistributionKeyFlag(requestInfo)); sig->getDistributionKeyFlag(requestInfo));
......
...@@ -572,6 +572,7 @@ public: ...@@ -572,6 +572,7 @@ public:
Uint8 scanLockMode; Uint8 scanLockMode;
Uint8 readCommitted; Uint8 readCommitted;
Uint8 rangeScan; Uint8 rangeScan;
Uint8 descending;
Uint8 scanTcWaiting; Uint8 scanTcWaiting;
Uint8 scanKeyinfoFlag; Uint8 scanKeyinfoFlag;
Uint8 m_last_row; Uint8 m_last_row;
......
...@@ -7637,6 +7637,7 @@ void Dblqh::continueAfterReceivingAllAiLab(Signal* signal) ...@@ -7637,6 +7637,7 @@ void Dblqh::continueAfterReceivingAllAiLab(Signal* signal)
req->requestInfo = 0; req->requestInfo = 0;
AccScanReq::setLockMode(req->requestInfo, scanptr.p->scanLockMode); AccScanReq::setLockMode(req->requestInfo, scanptr.p->scanLockMode);
AccScanReq::setReadCommittedFlag(req->requestInfo, scanptr.p->readCommitted); AccScanReq::setReadCommittedFlag(req->requestInfo, scanptr.p->readCommitted);
AccScanReq::setDescendingFlag(req->requestInfo, scanptr.p->descending);
req->transId1 = tcConnectptr.p->transid[0]; req->transId1 = tcConnectptr.p->transid[0];
req->transId2 = tcConnectptr.p->transid[1]; req->transId2 = tcConnectptr.p->transid[1];
req->savePointId = tcConnectptr.p->savePointId; req->savePointId = tcConnectptr.p->savePointId;
...@@ -8628,6 +8629,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) ...@@ -8628,6 +8629,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
const Uint32 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo); const Uint32 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo);
const Uint32 readCommitted = ScanFragReq::getReadCommittedFlag(reqinfo); const Uint32 readCommitted = ScanFragReq::getReadCommittedFlag(reqinfo);
const Uint32 idx = ScanFragReq::getRangeScanFlag(reqinfo); const Uint32 idx = ScanFragReq::getRangeScanFlag(reqinfo);
const Uint32 descending = ScanFragReq::getDescendingFlag(reqinfo);
const Uint32 attrLen = ScanFragReq::getAttrLen(reqinfo); const Uint32 attrLen = ScanFragReq::getAttrLen(reqinfo);
const Uint32 scanPrio = ScanFragReq::getScanPrio(reqinfo); const Uint32 scanPrio = ScanFragReq::getScanPrio(reqinfo);
...@@ -8649,6 +8651,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) ...@@ -8649,6 +8651,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanLockMode = scanLockMode; scanptr.p->scanLockMode = scanLockMode;
scanptr.p->readCommitted = readCommitted; scanptr.p->readCommitted = readCommitted;
scanptr.p->rangeScan = idx; scanptr.p->rangeScan = idx;
scanptr.p->descending = descending;
scanptr.p->scanState = ScanRecord::SCAN_FREE; scanptr.p->scanState = ScanRecord::SCAN_FREE;
scanptr.p->scanFlag = ZFALSE; scanptr.p->scanFlag = ZFALSE;
scanptr.p->scanLocalref[0] = 0; scanptr.p->scanLocalref[0] = 0;
......
...@@ -8791,6 +8791,7 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, ...@@ -8791,6 +8791,7 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
ScanFragReq::setKeyinfoFlag(tmp, ScanTabReq::getKeyinfoFlag(ri)); ScanFragReq::setKeyinfoFlag(tmp, ScanTabReq::getKeyinfoFlag(ri));
ScanFragReq::setReadCommittedFlag(tmp,ScanTabReq::getReadCommittedFlag(ri)); ScanFragReq::setReadCommittedFlag(tmp,ScanTabReq::getReadCommittedFlag(ri));
ScanFragReq::setRangeScanFlag(tmp, ScanTabReq::getRangeScanFlag(ri)); ScanFragReq::setRangeScanFlag(tmp, ScanTabReq::getRangeScanFlag(ri));
ScanFragReq::setDescendingFlag(tmp, ScanTabReq::getDescendingFlag(ri));
ScanFragReq::setAttrLen(tmp, scanTabReq->attrLenKeyLen & 0xFFFF); ScanFragReq::setAttrLen(tmp, scanTabReq->attrLenKeyLen & 0xFFFF);
scanptr.p->scanRequestInfo = tmp; scanptr.p->scanRequestInfo = tmp;
......
...@@ -406,6 +406,7 @@ private: ...@@ -406,6 +406,7 @@ private:
Uint32 m_accLockOp; Uint32 m_accLockOp;
Uint8 m_readCommitted; // no locking Uint8 m_readCommitted; // no locking
Uint8 m_lockMode; Uint8 m_lockMode;
Uint8 m_descending;
ScanBound m_boundMin; ScanBound m_boundMin;
ScanBound m_boundMax; ScanBound m_boundMax;
ScanBound* m_bound[2]; // pointers to above 2 ScanBound* m_bound[2]; // pointers to above 2
...@@ -638,7 +639,7 @@ private: ...@@ -638,7 +639,7 @@ private:
void execACCKEYREF(Signal* signal); void execACCKEYREF(Signal* signal);
void execACC_ABORTCONF(Signal* signal); void execACC_ABORTCONF(Signal* signal);
void scanFirst(ScanOpPtr scanPtr); void scanFirst(ScanOpPtr scanPtr);
void scanNext(ScanOpPtr scanPtr); void scanNext(ScanOpPtr scanPtr, bool fromMaintReq);
bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent); bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent);
void scanClose(Signal* signal, ScanOpPtr scanPtr); void scanClose(Signal* signal, ScanOpPtr scanPtr);
void addAccLockOp(ScanOp& scan, Uint32 accLockOp); void addAccLockOp(ScanOp& scan, Uint32 accLockOp);
...@@ -650,7 +651,9 @@ private: ...@@ -650,7 +651,9 @@ private:
*/ */
void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos); void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos);
void searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
void searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/* /*
* DbtuxCmp.cpp * DbtuxCmp.cpp
...@@ -1029,6 +1032,7 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) : ...@@ -1029,6 +1032,7 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) :
m_accLockOp(RNIL), m_accLockOp(RNIL),
m_readCommitted(0), m_readCommitted(0),
m_lockMode(0), m_lockMode(0),
m_descending(0),
m_boundMin(scanBoundPool), m_boundMin(scanBoundPool),
m_boundMax(scanBoundPool), m_boundMax(scanBoundPool),
m_scanPos(), m_scanPos(),
......
...@@ -103,7 +103,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons ...@@ -103,7 +103,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
* *
* Following example illustrates this. We are at (a=2, b=3). * Following example illustrates this. We are at (a=2, b=3).
* *
* dir bounds strict return * idir bounds strict return
* 0 a >= 2 and b >= 3 no -1 * 0 a >= 2 and b >= 3 no -1
* 0 a >= 2 and b > 3 yes +1 * 0 a >= 2 and b > 3 yes +1
* 1 a <= 2 and b <= 3 no +1 * 1 a <= 2 and b <= 3 no +1
...@@ -112,11 +112,11 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons ...@@ -112,11 +112,11 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
* The attributes are normalized and have variable size given in words. * The attributes are normalized and have variable size given in words.
*/ */
int int
Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen) Dbtux::cmpScanBound(const Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen)
{ {
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff); const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// direction 0-lower 1-upper // direction 0-lower 1-upper
ndbrequire(dir <= 1); ndbrequire(idir <= 1);
// number of words of data left // number of words of data left
unsigned len2 = maxlen; unsigned len2 = maxlen;
// in case of no bounds, init last type to something non-strict // in case of no bounds, init last type to something non-strict
...@@ -171,5 +171,5 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne ...@@ -171,5 +171,5 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
} }
// all attributes were equal // all attributes were equal
const int strict = (type & 0x1); const int strict = (type & 0x1);
return (dir == 0 ? (strict == 0 ? -1 : +1) : (strict == 0 ? +1 : -1)); return (idir == 0 ? (strict == 0 ? -1 : +1) : (strict == 0 ? +1 : -1));
} }
...@@ -347,6 +347,7 @@ operator<<(NdbOut& out, const Dbtux::ScanOp& scan) ...@@ -347,6 +347,7 @@ operator<<(NdbOut& out, const Dbtux::ScanOp& scan)
out << "]"; out << "]";
out << " [readCommitted " << dec << scan.m_readCommitted << "]"; out << " [readCommitted " << dec << scan.m_readCommitted << "]";
out << " [lockMode " << dec << scan.m_lockMode << "]"; out << " [lockMode " << dec << scan.m_lockMode << "]";
out << " [descending " << dec << scan.m_descending << "]";
out << " [pos " << scan.m_scanPos << "]"; out << " [pos " << scan.m_scanPos << "]";
out << " [ent " << scan.m_scanEnt << "]"; out << " [ent " << scan.m_scanEnt << "]";
for (unsigned i = 0; i <= 1; i++) { for (unsigned i = 0; i <= 1; i++) {
......
...@@ -491,7 +491,7 @@ Dbtux::moveScanList(NodeHandle& node, unsigned pos) ...@@ -491,7 +491,7 @@ Dbtux::moveScanList(NodeHandle& node, unsigned pos)
debugOut << "At pos=" << pos << " " << node << endl; debugOut << "At pos=" << pos << " " << node << endl;
} }
#endif #endif
scanNext(scanPtr); scanNext(scanPtr, true);
ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos)); ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos));
} }
scanPtr.i = nextPtrI; scanPtr.i = nextPtrI;
......
...@@ -74,17 +74,18 @@ Dbtux::execACC_SCANREQ(Signal* signal) ...@@ -74,17 +74,18 @@ Dbtux::execACC_SCANREQ(Signal* signal)
scanPtr.p->m_savePointId = req->savePointId; scanPtr.p->m_savePointId = req->savePointId;
scanPtr.p->m_readCommitted = AccScanReq::getReadCommittedFlag(req->requestInfo); scanPtr.p->m_readCommitted = AccScanReq::getReadCommittedFlag(req->requestInfo);
scanPtr.p->m_lockMode = AccScanReq::getLockMode(req->requestInfo); scanPtr.p->m_lockMode = AccScanReq::getLockMode(req->requestInfo);
#ifdef VM_TRACE scanPtr.p->m_descending = AccScanReq::getDescendingFlag(req->requestInfo);
if (debugFlags & DebugScan) {
debugOut << "Seize scan " << scanPtr.i << " " << *scanPtr.p << endl;
}
#endif
/* /*
* readCommitted lockMode keyInfo * readCommitted lockMode keyInfo
* 1 0 0 - read committed (no lock) * 1 0 0 - read committed (no lock)
* 0 0 0 - read latest (read lock) * 0 0 0 - read latest (read lock)
* 0 1 1 - read exclusive (write lock) * 0 1 1 - read exclusive (write lock)
*/ */
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Seize scan " << scanPtr.i << " " << *scanPtr.p << endl;
}
#endif
// conf // conf
AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend(); AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
conf->scanPtr = req->senderData; conf->scanPtr = req->senderData;
...@@ -418,7 +419,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -418,7 +419,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::Next) { if (scan.m_state == ScanOp::Next) {
jam(); jam();
// look for next // look for next
scanNext(scanPtr); scanNext(scanPtr, false);
} }
// for reading tuple key in Current or Locked state // for reading tuple key in Current or Locked state
Data pkData = c_dataBuffer; Data pkData = c_dataBuffer;
...@@ -697,8 +698,10 @@ Dbtux::scanFirst(ScanOpPtr scanPtr) ...@@ -697,8 +698,10 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
// set up index keys for this operation // set up index keys for this operation
setKeyAttrs(frag); setKeyAttrs(frag);
// unpack lower bound into c_dataBuffer // scan direction 0, 1
const ScanBound& bound = *scan.m_bound[0]; const unsigned idir = scan.m_descending;
// unpack start key into c_dataBuffer
const ScanBound& bound = *scan.m_bound[idir];
ScanBoundIterator iter; ScanBoundIterator iter;
bound.first(iter); bound.first(iter);
for (unsigned j = 0; j < bound.getSize(); j++) { for (unsigned j = 0; j < bound.getSize(); j++) {
...@@ -706,11 +709,10 @@ Dbtux::scanFirst(ScanOpPtr scanPtr) ...@@ -706,11 +709,10 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
c_dataBuffer[j] = *iter.data; c_dataBuffer[j] = *iter.data;
bound.next(iter); bound.next(iter);
} }
// search for scan start position
TreePos treePos; TreePos treePos;
searchToScan(frag, c_dataBuffer, scan.m_boundCnt[0], treePos); searchToScan(frag, c_dataBuffer, scan.m_boundCnt[idir], scan.m_descending, treePos);
if (treePos.m_loc == NullTupLoc) { if (treePos.m_loc == NullTupLoc) {
// empty tree // empty result set
jam(); jam();
scan.m_state = ScanOp::Last; scan.m_state = ScanOp::Last;
return; return;
...@@ -728,7 +730,8 @@ Dbtux::scanFirst(ScanOpPtr scanPtr) ...@@ -728,7 +730,8 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
* Move to next entry. The scan is already linked to some node. When * Move to next entry. The scan is already linked to some node. When
* we leave, if an entry was found, it will be linked to a possibly * we leave, if an entry was found, it will be linked to a possibly
* different node. The scan has a position, and a direction which tells * different node. The scan has a position, and a direction which tells
* from where we came to this position. This is one of: * from where we came to this position. This is one of (all comments
* are in terms of ascending scan):
* *
* 0 - up from left child (scan this node next) * 0 - up from left child (scan this node next)
* 1 - up from right child (proceed to parent) * 1 - up from right child (proceed to parent)
...@@ -740,7 +743,7 @@ Dbtux::scanFirst(ScanOpPtr scanPtr) ...@@ -740,7 +743,7 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
* re-organizations need not worry about scan direction. * re-organizations need not worry about scan direction.
*/ */
void void
Dbtux::scanNext(ScanOpPtr scanPtr) Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
{ {
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
...@@ -753,8 +756,11 @@ Dbtux::scanNext(ScanOpPtr scanPtr) ...@@ -753,8 +756,11 @@ Dbtux::scanNext(ScanOpPtr scanPtr)
ndbrequire(scan.m_state != ScanOp::Locked); ndbrequire(scan.m_state != ScanOp::Locked);
// set up index keys for this operation // set up index keys for this operation
setKeyAttrs(frag); setKeyAttrs(frag);
// unpack upper bound into c_dataBuffer // scan direction
const ScanBound& bound = *scan.m_bound[1]; const unsigned idir = scan.m_descending; // 0, 1
const int jdir = 1 - 2 * (int)idir; // 1, -1
// unpack end key into c_dataBuffer
const ScanBound& bound = *scan.m_bound[1 - idir];
ScanBoundIterator iter; ScanBoundIterator iter;
bound.first(iter); bound.first(iter);
for (unsigned j = 0; j < bound.getSize(); j++) { for (unsigned j = 0; j < bound.getSize(); j++) {
...@@ -774,6 +780,11 @@ Dbtux::scanNext(ScanOpPtr scanPtr) ...@@ -774,6 +780,11 @@ Dbtux::scanNext(ScanOpPtr scanPtr)
TreeEnt ent; TreeEnt ent;
while (true) { while (true) {
jam(); jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Scan next pos " << pos << " " << node << endl;
}
#endif
if (pos.m_dir == 2) { if (pos.m_dir == 2) {
// coming up from root ends the scan // coming up from root ends the scan
jam(); jam();
...@@ -788,7 +799,7 @@ Dbtux::scanNext(ScanOpPtr scanPtr) ...@@ -788,7 +799,7 @@ Dbtux::scanNext(ScanOpPtr scanPtr)
if (pos.m_dir == 4) { if (pos.m_dir == 4) {
// coming down from parent proceed to left child // coming down from parent proceed to left child
jam(); jam();
TupLoc loc = node.getLink(0); TupLoc loc = node.getLink(idir);
if (loc != NullTupLoc) { if (loc != NullTupLoc) {
jam(); jam();
pos.m_loc = loc; pos.m_loc = loc;
...@@ -796,34 +807,42 @@ Dbtux::scanNext(ScanOpPtr scanPtr) ...@@ -796,34 +807,42 @@ Dbtux::scanNext(ScanOpPtr scanPtr)
continue; continue;
} }
// pretend we came from left child // pretend we came from left child
pos.m_dir = 0; pos.m_dir = idir;
}
const unsigned occup = node.getOccup();
if (occup == 0) {
jam();
ndbrequire(fromMaintReq);
// move back to parent - see comment in treeRemoveInner
pos.m_loc = node.getLink(2);
pos.m_dir = node.getSide();
continue;
} }
if (pos.m_dir == 0) { if (pos.m_dir == idir) {
// coming up from left child scan current node // coming up from left child scan current node
jam(); jam();
pos.m_pos = 0; pos.m_pos = idir == 0 ? 0 : occup - 1;
pos.m_match = false; pos.m_match = false;
pos.m_dir = 3; pos.m_dir = 3;
} }
if (pos.m_dir == 3) { if (pos.m_dir == 3) {
// within node // within node
jam(); jam();
unsigned occup = node.getOccup();
ndbrequire(occup >= 1);
// advance position // advance position
if (! pos.m_match) if (! pos.m_match)
pos.m_match = true; pos.m_match = true;
else else
pos.m_pos++; // becomes ZNIL (which is > occup) if 0 and scan descending
pos.m_pos += jdir;
if (pos.m_pos < occup) { if (pos.m_pos < occup) {
jam(); jam();
ent = node.getEnt(pos.m_pos); ent = node.getEnt(pos.m_pos);
pos.m_dir = 3; // unchanged pos.m_dir = 3; // unchanged
// read and compare all attributes // read and compare all attributes
readKeyAttrs(frag, ent, 0, c_entryKey); readKeyAttrs(frag, ent, 0, c_entryKey);
int ret = cmpScanBound(frag, 1, c_dataBuffer, scan.m_boundCnt[1], c_entryKey); int ret = cmpScanBound(frag, 1 - idir, c_dataBuffer, scan.m_boundCnt[1 - idir], c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown); ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) { if (jdir * ret < 0) {
jam(); jam();
// hit upper bound of single range scan // hit upper bound of single range scan
pos.m_loc = NullTupLoc; pos.m_loc = NullTupLoc;
...@@ -840,7 +859,7 @@ Dbtux::scanNext(ScanOpPtr scanPtr) ...@@ -840,7 +859,7 @@ Dbtux::scanNext(ScanOpPtr scanPtr)
break; break;
} }
// after node proceed to right child // after node proceed to right child
TupLoc loc = node.getLink(1); TupLoc loc = node.getLink(1 - idir);
if (loc != NullTupLoc) { if (loc != NullTupLoc) {
jam(); jam();
pos.m_loc = loc; pos.m_loc = loc;
...@@ -848,9 +867,9 @@ Dbtux::scanNext(ScanOpPtr scanPtr) ...@@ -848,9 +867,9 @@ Dbtux::scanNext(ScanOpPtr scanPtr)
continue; continue;
} }
// pretend we came from right child // pretend we came from right child
pos.m_dir = 1; pos.m_dir = 1 - idir;
} }
if (pos.m_dir == 1) { if (pos.m_dir == 1 - idir) {
// coming up from right child proceed to parent // coming up from right child proceed to parent
jam(); jam();
pos.m_loc = node.getLink(2); pos.m_loc = node.getLink(2);
......
...@@ -253,22 +253,33 @@ Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePo ...@@ -253,22 +253,33 @@ Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePo
/* /*
* Search for scan start position. * Search for scan start position.
* *
* Similar to searchToAdd. * Similar to searchToAdd. The routines differ somewhat depending on
* scan direction and are done by separate methods.
*/ */
void void
Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos) Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag); if (tree.m_root != NullTupLoc) {
currNode.m_loc = tree.m_root; if (! descending)
if (currNode.m_loc == NullTupLoc) { searchToScanAscending(frag, boundInfo, boundCount, treePos);
// empty tree else
jam(); searchToScanDescending(frag, boundInfo, boundCount, treePos);
treePos.m_match = false;
return; return;
} }
// empty tree
}
void
Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
NodeHandle glbNode(frag); // potential g.l.b of final node NodeHandle glbNode(frag); // potential g.l.b of final node
NodeHandle bottomNode(frag); NodeHandle bottomNode(frag);
// always before entry
treePos.m_match = false;
while (true) { while (true) {
jam(); jam();
selectNode(currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
...@@ -283,6 +294,7 @@ Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePo ...@@ -283,6 +294,7 @@ Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePo
ndbrequire(ret != NdbSqlUtil::CmpUnknown); ndbrequire(ret != NdbSqlUtil::CmpUnknown);
} }
if (ret < 0) { if (ret < 0) {
// bound is left of this node
jam(); jam();
const TupLoc loc = currNode.getLink(0); const TupLoc loc = currNode.getLink(0);
if (loc != NullTupLoc) { if (loc != NullTupLoc) {
...@@ -300,11 +312,11 @@ Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePo ...@@ -300,11 +312,11 @@ Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePo
// start scanning this node // start scanning this node
treePos.m_loc = currNode.m_loc; treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0; treePos.m_pos = 0;
treePos.m_match = false;
treePos.m_dir = 3; treePos.m_dir = 3;
return; return;
} }
} else if (ret > 0) { } else if (ret > 0) {
// bound is at or right of this node
jam(); jam();
const TupLoc loc = currNode.getLink(1); const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) { if (loc != NullTupLoc) {
...@@ -316,7 +328,7 @@ Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePo ...@@ -316,7 +328,7 @@ Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePo
continue; continue;
} }
} else { } else {
ndbassert(false); ndbrequire(false);
} }
break; break;
} }
...@@ -328,20 +340,19 @@ Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePo ...@@ -328,20 +340,19 @@ Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePo
ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_entryKey); ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown); ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) { if (ret < 0) {
// start scanning from current entry // found first entry satisfying the bound
treePos.m_loc = currNode.m_loc; treePos.m_loc = currNode.m_loc;
treePos.m_pos = j; treePos.m_pos = j;
treePos.m_match = false;
treePos.m_dir = 3; treePos.m_dir = 3;
return; return;
} }
} }
// bound is to right of this node
if (! bottomNode.isNull()) { if (! bottomNode.isNull()) {
jam(); jam();
// start scanning the l.u.b // start scanning the l.u.b
treePos.m_loc = bottomNode.m_loc; treePos.m_loc = bottomNode.m_loc;
treePos.m_pos = 0; treePos.m_pos = 0;
treePos.m_match = false;
treePos.m_dir = 3; treePos.m_dir = 3;
return; return;
} }
...@@ -349,3 +360,90 @@ Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePo ...@@ -349,3 +360,90 @@ Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePo
treePos.m_loc = currNode.m_loc; treePos.m_loc = currNode.m_loc;
treePos.m_dir = 1; treePos.m_dir = 1;
} }
void
Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
NodeHandle glbNode(frag); // potential g.l.b of final node
NodeHandle bottomNode(frag);
// always before entry
treePos.m_match = false;
while (true) {
jam();
selectNode(currNode, currNode.m_loc);
int ret;
// compare prefix
ret = cmpScanBound(frag, 1, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare all attributes
readKeyAttrs(frag, currNode.getMinMax(0), 0, c_entryKey);
ret = cmpScanBound(frag, 1, boundInfo, boundCount, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret < 0) {
// bound is left of this node
jam();
const TupLoc loc = currNode.getLink(0);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
currNode.m_loc = loc;
continue;
}
if (! glbNode.isNull()) {
jam();
// move up to the g.l.b but remember the bottom node
bottomNode = currNode;
currNode = glbNode;
} else {
// empty result set
return;
}
} else if (ret > 0) {
// bound is at or right of this node
jam();
const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) {
jam();
// save potential g.l.b
glbNode = currNode;
// continue to right subtree
currNode.m_loc = loc;
continue;
}
} else {
ndbrequire(false);
}
break;
}
for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
jam();
int ret;
// read and compare attributes
readKeyAttrs(frag, currNode.getEnt(j), 0, c_entryKey);
ret = cmpScanBound(frag, 1, boundInfo, boundCount, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) {
if (j > 0) {
// start scanning from previous entry
treePos.m_loc = currNode.m_loc;
treePos.m_pos = j - 1;
treePos.m_dir = 3;
return;
}
// start scanning upwards (pretend we came from left child)
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
treePos.m_dir = 0;
return;
}
}
// start scanning this node
treePos.m_loc = currNode.m_loc;
treePos.m_pos = currNode.getOccup() - 1;
treePos.m_dir = 3;
}
...@@ -226,6 +226,9 @@ Dbtux::treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos) ...@@ -226,6 +226,9 @@ Dbtux::treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos)
// borrow max entry from semi/leaf // borrow max entry from semi/leaf
Uint32 scanList = RNIL; Uint32 scanList = RNIL;
nodePopDown(glbNode, glbNode.getOccup() - 1, ent, &scanList); nodePopDown(glbNode, glbNode.getOccup() - 1, ent, &scanList);
// g.l.b may be empty now
// a descending scan may try to enter the empty g.l.b
// we prevent this in scanNext
nodePopUp(lubNode, pos, ent, scanList); nodePopUp(lubNode, pos, ent, scanList);
if (glbNode.getLink(0) != NullTupLoc) { if (glbNode.getLink(0) != NullTupLoc) {
jam(); jam();
......
...@@ -116,7 +116,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, ...@@ -116,7 +116,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
Uint32 batch, Uint32 batch,
Uint32 parallel) Uint32 parallel)
{ {
m_ordered = 0; m_ordered = m_descending = false;
Uint32 fragCount = m_currentTable->m_fragmentCount; Uint32 fragCount = m_currentTable->m_fragmentCount;
if (parallel > fragCount || parallel == 0) { if (parallel > fragCount || parallel == 0) {
...@@ -1191,9 +1191,10 @@ NdbIndexScanOperation::readTuples(LockMode lm, ...@@ -1191,9 +1191,10 @@ NdbIndexScanOperation::readTuples(LockMode lm,
Uint32 batch, Uint32 batch,
Uint32 parallel, Uint32 parallel,
bool order_by, bool order_by,
bool order_desc,
bool read_range_no){ bool read_range_no){
int res = NdbScanOperation::readTuples(lm, batch, 0); int res = NdbScanOperation::readTuples(lm, batch, 0);
if(read_range_no) if(!res && read_range_no)
{ {
m_read_range_no = 1; m_read_range_no = 1;
Uint32 word = 0; Uint32 word = 0;
...@@ -1202,7 +1203,12 @@ NdbIndexScanOperation::readTuples(LockMode lm, ...@@ -1202,7 +1203,12 @@ NdbIndexScanOperation::readTuples(LockMode lm,
res = -1; res = -1;
} }
if(!res && order_by){ if(!res && order_by){
m_ordered = 1; m_ordered = true;
if (order_desc) {
m_descending = true;
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
ScanTabReq::setDescendingFlag(req->requestInfo, true);
}
Uint32 cnt = m_accessTable->getNoOfColumns() - 1; Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
m_sort_columns = cnt; // -1 for NDB$NODE m_sort_columns = cnt; // -1 for NDB$NODE
m_current_api_receiver = m_sent_receivers_count; m_current_api_receiver = m_sent_receivers_count;
...@@ -1266,12 +1272,14 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, ...@@ -1266,12 +1272,14 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
r1 = (skip ? r1->next() : r1); r1 = (skip ? r1->next() : r1);
r2 = (skip ? r2->next() : r2); r2 = (skip ? r2->next() : r2);
const int jdir = 1 - 2 * (int)m_descending;
assert(jdir == 1 || jdir == -1);
while(cols > 0){ while(cols > 0){
Uint32 * d1 = (Uint32*)r1->aRef(); Uint32 * d1 = (Uint32*)r1->aRef();
Uint32 * d2 = (Uint32*)r2->aRef(); Uint32 * d2 = (Uint32*)r2->aRef();
unsigned r1_null = r1->isNULL(); unsigned r1_null = r1->isNULL();
if((r1_null ^ (unsigned)r2->isNULL())){ if((r1_null ^ (unsigned)r2->isNULL())){
return (r1_null ? -1 : 1); return (r1_null ? -1 : 1) * jdir;
} }
const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column); const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);
Uint32 len = r1->theAttrSize * r1->theArraySize; Uint32 len = r1->theAttrSize * r1->theArraySize;
...@@ -1280,7 +1288,7 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, ...@@ -1280,7 +1288,7 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
int r = (*sqlType.m_cmp)(col.m_cs, d1, len, d2, len, true); int r = (*sqlType.m_cmp)(col.m_cs, d1, len, d2, len, true);
if(r){ if(r){
assert(r != NdbSqlUtil::CmpUnknown); assert(r != NdbSqlUtil::CmpUnknown);
return r; return r * jdir;
} }
} }
cols--; cols--;
......
...@@ -34,13 +34,14 @@ int scanReadRecords(Ndb*, ...@@ -34,13 +34,14 @@ int scanReadRecords(Ndb*,
bool headers, bool headers,
bool useHexFormat, bool useHexFormat,
char delim, char delim,
bool orderby); bool orderby,
bool descending);
static const char* opt_connect_str= 0; static const char* opt_connect_str= 0;
static const char* _dbname = "TEST_DB"; static const char* _dbname = "TEST_DB";
static const char* _delimiter = "\t"; static const char* _delimiter = "\t";
static int _unqualified, _header, _parallelism, _useHexFormat, _lock, static int _unqualified, _header, _parallelism, _useHexFormat, _lock,
_order; _order, _descending;
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
...@@ -57,6 +58,9 @@ static struct my_option my_long_options[] = ...@@ -57,6 +58,9 @@ static struct my_option my_long_options[] =
{ "order", 'o', "Sort resultset according to index", { "order", 'o', "Sort resultset according to index",
(gptr*) &_order, (gptr*) &_order, 0, (gptr*) &_order, (gptr*) &_order, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "descending", 'z', "Sort descending (requires order flag)",
(gptr*) &_descending, (gptr*) &_descending, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "header", 'h', "Print header", { "header", 'h', "Print header",
(gptr*) &_header, (gptr*) &_header, 0, (gptr*) &_header, (gptr*) &_header, 0,
GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 },
...@@ -151,6 +155,11 @@ int main(int argc, char** argv){ ...@@ -151,6 +155,11 @@ int main(int argc, char** argv){
return NDBT_ProgramExit(NDBT_WRONGARGS); return NDBT_ProgramExit(NDBT_WRONGARGS);
} }
if (_descending && ! _order) {
ndbout << " Descending flag given without order flag" << endl;
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
if (scanReadRecords(&MyNdb, if (scanReadRecords(&MyNdb,
pTab, pTab,
pIdx, pIdx,
...@@ -158,7 +167,7 @@ int main(int argc, char** argv){ ...@@ -158,7 +167,7 @@ int main(int argc, char** argv){
_lock, _lock,
_header, _header,
_useHexFormat, _useHexFormat,
(char)*_delimiter, _order) != 0){ (char)*_delimiter, _order, _descending) != 0){
return NDBT_ProgramExit(NDBT_FAILED); return NDBT_ProgramExit(NDBT_FAILED);
} }
...@@ -173,7 +182,7 @@ int scanReadRecords(Ndb* pNdb, ...@@ -173,7 +182,7 @@ int scanReadRecords(Ndb* pNdb,
int _lock, int _lock,
bool headers, bool headers,
bool useHexFormat, bool useHexFormat,
char delimiter, bool order){ char delimiter, bool order, bool descending){
int retryAttempt = 0; int retryAttempt = 0;
const int retryMax = 100; const int retryMax = 100;
...@@ -225,13 +234,13 @@ int scanReadRecords(Ndb* pNdb, ...@@ -225,13 +234,13 @@ int scanReadRecords(Ndb* pNdb,
break; break;
case 3: case 3:
rs = pIOp->readTuples(NdbScanOperation::LM_CommittedRead, 0, parallel, rs = pIOp->readTuples(NdbScanOperation::LM_CommittedRead, 0, parallel,
true); true, descending);
break; break;
case 4: case 4:
rs = pIOp->readTuples(NdbScanOperation::LM_Read, 0, parallel, true); rs = pIOp->readTuples(NdbScanOperation::LM_Read, 0, parallel, true, descending);
break; break;
case 5: case 5:
rs = pIOp->readTuples(NdbScanOperation::LM_Exclusive, 0, parallel, true); rs = pIOp->readTuples(NdbScanOperation::LM_Exclusive, 0, parallel, true, descending);
break; break;
case 0: case 0:
default: default:
......
...@@ -4926,7 +4926,7 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p, ...@@ -4926,7 +4926,7 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
end_of_buffer -= reclength; end_of_buffer -= reclength;
} }
else if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab)) else if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab))
&& !scanOp->readTuples(lm, 0, parallelism, sorted, true) && && !scanOp->readTuples(lm, 0, parallelism, sorted, false, true) &&
!define_read_attrs(end_of_buffer-reclength, scanOp)) !define_read_attrs(end_of_buffer-reclength, scanOp))
{ {
m_multi_cursor= scanOp; m_multi_cursor= scanOp;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment