Commit 2394be4a authored by unknown's avatar unknown

ndb - bug#20334

  fix bug in tup scan wrt LCP


storage/ndb/include/kernel/signaldata/AccScan.hpp:
  Add LCP flag to AccScan and ScanFrag
storage/ndb/include/kernel/signaldata/ScanFrag.hpp:
  Add LCP flag to AccScan and ScanFrag
storage/ndb/src/kernel/blocks/backup/Backup.cpp:
  Backup sets LCP scan when lcp
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  Add LCP scan flag
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Add LCP scan flag
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp:
  Fix assignment of scan record wrt LCP and disk
parent a775c47a
...@@ -67,6 +67,9 @@ private: ...@@ -67,6 +67,9 @@ private:
static Uint32 getNRScanFlag(const Uint32 & requestInfo); static Uint32 getNRScanFlag(const Uint32 & requestInfo);
static void setNRScanFlag(Uint32 & requestInfo, Uint32 nr); static void setNRScanFlag(Uint32 & requestInfo, Uint32 nr);
static Uint32 getLcpScanFlag(const Uint32 & requestInfo);
static void setLcpScanFlag(Uint32 & requestInfo, Uint32 nr);
}; };
/** /**
...@@ -77,6 +80,7 @@ private: ...@@ -77,6 +80,7 @@ private:
* z = Descending (TUX) - 1 Bit 6 * z = Descending (TUX) - 1 Bit 6
* d = No disk scan - 1 Bit 7 * d = No disk scan - 1 Bit 7
* n = Node recovery scan - 1 Bit 8 * n = Node recovery scan - 1 Bit 8
* c = LCP scan - 1 Bit 9
* *
* 1111111111222222222233 * 1111111111222222222233
* 01234567890123456789012345678901 * 01234567890123456789012345678901
...@@ -88,6 +92,7 @@ private: ...@@ -88,6 +92,7 @@ private:
#define AS_DESCENDING_SHIFT (6) #define AS_DESCENDING_SHIFT (6)
#define AS_NO_DISK_SCAN (7) #define AS_NO_DISK_SCAN (7)
#define AS_NR_SCAN (8) #define AS_NR_SCAN (8)
#define AS_LCP_SCAN (9)
inline inline
Uint32 Uint32
...@@ -154,6 +159,19 @@ AccScanReq::setNRScanFlag(UintR & requestInfo, UintR val){ ...@@ -154,6 +159,19 @@ AccScanReq::setNRScanFlag(UintR & requestInfo, UintR val){
requestInfo |= (val << AS_NR_SCAN); requestInfo |= (val << AS_NR_SCAN);
} }
inline
Uint32
AccScanReq::getLcpScanFlag(const Uint32 & requestInfo){
return (requestInfo >> AS_LCP_SCAN) & 1;
}
inline
void
AccScanReq::setLcpScanFlag(UintR & requestInfo, UintR val){
ASSERT_BOOL(val, "AccScanReq::setNoDiskScanFlag");
requestInfo |= (val << AS_LCP_SCAN);
}
class AccScanConf { class AccScanConf {
/** /**
* Sender(s) * Sender(s)
......
...@@ -61,7 +61,8 @@ public: ...@@ -61,7 +61,8 @@ public:
static Uint32 getAttrLen(const Uint32 & requestInfo); static Uint32 getAttrLen(const Uint32 & requestInfo);
static Uint32 getScanPrio(const Uint32 & requestInfo); static Uint32 getScanPrio(const Uint32 & requestInfo);
static Uint32 getNoDiskFlag(const Uint32 & requestInfo); static Uint32 getNoDiskFlag(const Uint32 & requestInfo);
static Uint32 getLcpScanFlag(const Uint32 & requestInfo);
static void setLockMode(Uint32 & requestInfo, Uint32 lockMode); static void setLockMode(Uint32 & requestInfo, Uint32 lockMode);
static void setHoldLockFlag(Uint32 & requestInfo, Uint32 holdLock); static void setHoldLockFlag(Uint32 & requestInfo, Uint32 holdLock);
static void setKeyinfoFlag(Uint32 & requestInfo, Uint32 keyinfo); static void setKeyinfoFlag(Uint32 & requestInfo, Uint32 keyinfo);
...@@ -72,6 +73,7 @@ public: ...@@ -72,6 +73,7 @@ public:
static void setAttrLen(Uint32 & requestInfo, Uint32 attrLen); static void setAttrLen(Uint32 & requestInfo, Uint32 attrLen);
static void setScanPrio(Uint32& requestInfo, Uint32 prio); static void setScanPrio(Uint32& requestInfo, Uint32 prio);
static void setNoDiskFlag(Uint32& requestInfo, Uint32 val); static void setNoDiskFlag(Uint32& requestInfo, Uint32 val);
static void setLcpScanFlag(Uint32 & requestInfo, Uint32 val);
}; };
class KeyInfo20 { class KeyInfo20 {
...@@ -198,6 +200,7 @@ public: ...@@ -198,6 +200,7 @@ public:
* Request Info * Request Info
* *
* a = Length of attrinfo - 16 Bits (16-31) * a = Length of attrinfo - 16 Bits (16-31)
* c = LCP scan - 1 Bit 3
* d = No disk - 1 Bit 4 * d = No disk - 1 Bit 4
* l = Lock Mode - 1 Bit 5 * l = Lock Mode - 1 Bit 5
* h = Hold lock - 1 Bit 7 * h = Hold lock - 1 Bit 7
...@@ -205,7 +208,7 @@ public: ...@@ -205,7 +208,7 @@ public:
* r = read committed - 1 Bit 9 * r = read committed - 1 Bit 9
* x = range scan - 1 Bit 6 * x = range scan - 1 Bit 6
* z = descending - 1 Bit 10 * z = descending - 1 Bit 10
* t = tup scan -1 Bit 11 (implies x=z=0) * t = tup scan - 1 Bit 11 (implies x=z=0)
* p = Scan prio - 4 Bits (12-15) -> max 15 * p = Scan prio - 4 Bits (12-15) -> max 15
* *
* 1111111111222222222233 * 1111111111222222222233
...@@ -222,6 +225,7 @@ public: ...@@ -222,6 +225,7 @@ public:
#define SF_RANGE_SCAN_SHIFT (6) #define SF_RANGE_SCAN_SHIFT (6)
#define SF_DESCENDING_SHIFT (10) #define SF_DESCENDING_SHIFT (10)
#define SF_TUP_SCAN_SHIFT (11) #define SF_TUP_SCAN_SHIFT (11)
#define SF_LCP_SCAN_SHIFT (3)
#define SF_ATTR_LEN_SHIFT (16) #define SF_ATTR_LEN_SHIFT (16)
#define SF_ATTR_LEN_MASK (65535) #define SF_ATTR_LEN_MASK (65535)
...@@ -359,6 +363,19 @@ ScanFragReq::setNoDiskFlag(UintR & requestInfo, UintR val){ ...@@ -359,6 +363,19 @@ ScanFragReq::setNoDiskFlag(UintR & requestInfo, UintR val){
requestInfo |= (val << SF_NO_DISK_SHIFT); requestInfo |= (val << SF_NO_DISK_SHIFT);
} }
inline
Uint32
ScanFragReq::getLcpScanFlag(const Uint32 & requestInfo){
return (requestInfo >> SF_LCP_SCAN_SHIFT) & 1;
}
inline
void
ScanFragReq::setLcpScanFlag(UintR & requestInfo, UintR val){
ASSERT_BOOL(val, "ScanFragReq::setLcpScanFlag");
requestInfo |= (val << SF_LCP_SCAN_SHIFT);
}
inline inline
Uint32 Uint32
KeyInfo20::setScanInfo(Uint32 opNo, Uint32 scanNo){ KeyInfo20::setScanInfo(Uint32 opNo, Uint32 scanNo){
......
...@@ -2985,22 +2985,25 @@ Backup::parseTableDescription(Signal* signal, ...@@ -2985,22 +2985,25 @@ Backup::parseTableDescription(Signal* signal,
if (disk) if (disk)
{ {
/** /**
* Remove all disk attributes, but add DISK_REF (8 bytes) * Remove all disk attributes
*/ */
tabPtr.p->noOfAttributes -= (disk - 1); tabPtr.p->noOfAttributes -= disk;
AttributePtr attrPtr; {
ndbrequire(tabPtr.p->attributes.seize(attrPtr)); AttributePtr attrPtr;
ndbrequire(tabPtr.p->attributes.seize(attrPtr));
Uint32 sz32 = 2;
attrPtr.p->data.attrId = AttributeHeader::DISK_REF; Uint32 sz32 = 2;
attrPtr.p->data.m_flags = Attribute::COL_FIXED; attrPtr.p->data.attrId = AttributeHeader::DISK_REF;
attrPtr.p->data.sz32 = 2; attrPtr.p->data.m_flags = Attribute::COL_FIXED;
attrPtr.p->data.sz32 = 2;
attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
tabPtr.p->sz_FixedAttributes += sz32; attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
tabPtr.p->sz_FixedAttributes += sz32;
tabPtr.p->noOfAttributes ++;
}
} }
{ {
AttributePtr attrPtr; AttributePtr attrPtr;
ndbrequire(tabPtr.p->attributes.seize(attrPtr)); ndbrequire(tabPtr.p->attributes.seize(attrPtr));
...@@ -3309,6 +3312,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal) ...@@ -3309,6 +3312,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
ScanFragReq::setScanPrio(req->requestInfo, 1); ScanFragReq::setScanPrio(req->requestInfo, 1);
ScanFragReq::setTupScanFlag(req->requestInfo, 1); ScanFragReq::setTupScanFlag(req->requestInfo, 1);
ScanFragReq::setNoDiskFlag(req->requestInfo, 1); ScanFragReq::setNoDiskFlag(req->requestInfo, 1);
ScanFragReq::setLcpScanFlag(req->requestInfo, 1);
} }
req->transId1 = 0; req->transId1 = 0;
req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8); req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);
......
...@@ -572,6 +572,7 @@ public: ...@@ -572,6 +572,7 @@ public:
Uint8 rangeScan; Uint8 rangeScan;
Uint8 descending; Uint8 descending;
Uint8 tupScan; Uint8 tupScan;
Uint8 lcpScan;
Uint8 scanTcWaiting; Uint8 scanTcWaiting;
Uint8 scanKeyinfoFlag; Uint8 scanKeyinfoFlag;
Uint8 m_last_row; Uint8 m_last_row;
......
...@@ -8362,6 +8362,8 @@ void Dblqh::continueAfterReceivingAllAiLab(Signal* signal) ...@@ -8362,6 +8362,8 @@ void Dblqh::continueAfterReceivingAllAiLab(Signal* signal)
AccScanReq::setDescendingFlag(req->requestInfo, scanptr.p->descending); AccScanReq::setDescendingFlag(req->requestInfo, scanptr.p->descending);
AccScanReq::setNoDiskScanFlag(req->requestInfo, AccScanReq::setNoDiskScanFlag(req->requestInfo,
!tcConnectptr.p->m_disk_table); !tcConnectptr.p->m_disk_table);
AccScanReq::setLcpScanFlag(req->requestInfo, scanptr.p->lcpScan);
req->transId1 = tcConnectptr.p->transid[0]; req->transId1 = tcConnectptr.p->transid[0];
req->transId2 = tcConnectptr.p->transid[1]; req->transId2 = tcConnectptr.p->transid[1];
req->savePointId = tcConnectptr.p->savePointId; req->savePointId = tcConnectptr.p->savePointId;
...@@ -9453,6 +9455,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) ...@@ -9453,6 +9455,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->rangeScan = rangeScan; scanptr.p->rangeScan = rangeScan;
scanptr.p->descending = descending; scanptr.p->descending = descending;
scanptr.p->tupScan = tupScan; scanptr.p->tupScan = tupScan;
scanptr.p->lcpScan = ScanFragReq::getLcpScanFlag(reqinfo);
scanptr.p->scanState = ScanRecord::SCAN_FREE; scanptr.p->scanState = ScanRecord::SCAN_FREE;
scanptr.p->scanFlag = ZFALSE; scanptr.p->scanFlag = ZFALSE;
scanptr.p->m_row_id.setNull(); scanptr.p->m_row_id.setNull();
......
...@@ -53,7 +53,10 @@ Dbtup::execACC_SCANREQ(Signal* signal) ...@@ -53,7 +53,10 @@ Dbtup::execACC_SCANREQ(Signal* signal)
Fragrecord& frag = *fragPtr.p; Fragrecord& frag = *fragPtr.p;
// flags // flags
Uint32 bits = 0; Uint32 bits = 0;
if (frag.m_lcp_scan_op == RNIL) {
if (!AccScanReq::getLcpScanFlag(req->requestInfo) ||
tablePtr.p->m_no_of_disk_attributes == 0)
{
// seize from pool and link to per-fragment list // seize from pool and link to per-fragment list
LocalDLList<ScanOp> list(c_scanOpPool, frag.m_scanList); LocalDLList<ScanOp> list(c_scanOpPool, frag.m_scanList);
if (! list.seize(scanPtr)) { if (! list.seize(scanPtr)) {
...@@ -63,23 +66,25 @@ Dbtup::execACC_SCANREQ(Signal* signal) ...@@ -63,23 +66,25 @@ Dbtup::execACC_SCANREQ(Signal* signal)
if (!AccScanReq::getNoDiskScanFlag(req->requestInfo) if (!AccScanReq::getNoDiskScanFlag(req->requestInfo)
&& tablePtr.p->m_no_of_disk_attributes) && tablePtr.p->m_no_of_disk_attributes)
{ {
bits |= ScanOp::SCAN_DD; bits |= ScanOp::SCAN_DD;
} }
bool mm = (bits & ScanOp::SCAN_DD); bool mm = (bits & ScanOp::SCAN_DD);
if (tablePtr.p->m_attributes[mm].m_no_of_varsize > 0) { if (tablePtr.p->m_attributes[mm].m_no_of_varsize > 0) {
bits |= ScanOp::SCAN_VS; bits |= ScanOp::SCAN_VS;
// disk pages have fixed page format // disk pages have fixed page format
ndbrequire(! (bits & ScanOp::SCAN_DD)); ndbrequire(! (bits & ScanOp::SCAN_DD));
} }
if (! AccScanReq::getReadCommittedFlag(req->requestInfo)) { if (! AccScanReq::getReadCommittedFlag(req->requestInfo)) {
if (AccScanReq::getLockMode(req->requestInfo) == 0) if (AccScanReq::getLockMode(req->requestInfo) == 0)
bits |= ScanOp::SCAN_LOCK_SH; bits |= ScanOp::SCAN_LOCK_SH;
else else
bits |= ScanOp::SCAN_LOCK_EX; bits |= ScanOp::SCAN_LOCK_EX;
} }
} else { } else {
jam(); jam();
// LCP scan and disk
ndbrequire(frag.m_lcp_scan_op == c_lcp_scan_op); ndbrequire(frag.m_lcp_scan_op == c_lcp_scan_op);
c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op); c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
bits |= ScanOp::SCAN_LCP; bits |= ScanOp::SCAN_LCP;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment