Commit ef145592 authored by unknown's avatar unknown

ndb - bug#20334

  fix bug in tup scan wrt LCP


storage/ndb/include/kernel/signaldata/AccScan.hpp:
  Add LCP flag to AccScan and ScanFrag
storage/ndb/include/kernel/signaldata/ScanFrag.hpp:
  Add LCP flag to AccScan and ScanFrag
storage/ndb/src/kernel/blocks/backup/Backup.cpp:
  Backup sets LCP scan when lcp
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  Add LCP scan flag
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Add LCP scan flag
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp:
  Fix assignment of scan record wrt LCP and disk
parent 0697dac1
......@@ -67,6 +67,9 @@ private:
static Uint32 getNRScanFlag(const Uint32 & requestInfo);
static void setNRScanFlag(Uint32 & requestInfo, Uint32 nr);
static Uint32 getLcpScanFlag(const Uint32 & requestInfo);
static void setLcpScanFlag(Uint32 & requestInfo, Uint32 nr);
};
/**
......@@ -77,6 +80,7 @@ private:
* z = Descending (TUX) - 1 Bit 6
* d = No disk scan - 1 Bit 7
* n = Node recovery scan - 1 Bit 8
* c = LCP scan - 1 Bit 9
*
* 1111111111222222222233
* 01234567890123456789012345678901
......@@ -88,6 +92,7 @@ private:
#define AS_DESCENDING_SHIFT (6)
#define AS_NO_DISK_SCAN (7)
#define AS_NR_SCAN (8)
#define AS_LCP_SCAN (9)
inline
Uint32
......@@ -154,6 +159,19 @@ AccScanReq::setNRScanFlag(UintR & requestInfo, UintR val){
requestInfo |= (val << AS_NR_SCAN);
}
inline
Uint32
AccScanReq::getLcpScanFlag(const Uint32 & requestInfo){
return (requestInfo >> AS_LCP_SCAN) & 1;
}
inline
void
AccScanReq::setLcpScanFlag(UintR & requestInfo, UintR val){
ASSERT_BOOL(val, "AccScanReq::setNoDiskScanFlag");
requestInfo |= (val << AS_LCP_SCAN);
}
class AccScanConf {
/**
* Sender(s)
......
......@@ -61,7 +61,8 @@ public:
static Uint32 getAttrLen(const Uint32 & requestInfo);
static Uint32 getScanPrio(const Uint32 & requestInfo);
static Uint32 getNoDiskFlag(const Uint32 & requestInfo);
static Uint32 getLcpScanFlag(const Uint32 & requestInfo);
static void setLockMode(Uint32 & requestInfo, Uint32 lockMode);
static void setHoldLockFlag(Uint32 & requestInfo, Uint32 holdLock);
static void setKeyinfoFlag(Uint32 & requestInfo, Uint32 keyinfo);
......@@ -72,6 +73,7 @@ public:
static void setAttrLen(Uint32 & requestInfo, Uint32 attrLen);
static void setScanPrio(Uint32& requestInfo, Uint32 prio);
static void setNoDiskFlag(Uint32& requestInfo, Uint32 val);
static void setLcpScanFlag(Uint32 & requestInfo, Uint32 val);
};
class KeyInfo20 {
......@@ -198,6 +200,7 @@ public:
* Request Info
*
* a = Length of attrinfo - 16 Bits (16-31)
* c = LCP scan - 1 Bit 3
* d = No disk - 1 Bit 4
* l = Lock Mode - 1 Bit 5
* h = Hold lock - 1 Bit 7
......@@ -205,7 +208,7 @@ public:
* r = read committed - 1 Bit 9
* x = range scan - 1 Bit 6
* z = descending - 1 Bit 10
* t = tup scan -1 Bit 11 (implies x=z=0)
* t = tup scan - 1 Bit 11 (implies x=z=0)
* p = Scan prio - 4 Bits (12-15) -> max 15
*
* 1111111111222222222233
......@@ -222,6 +225,7 @@ public:
#define SF_RANGE_SCAN_SHIFT (6)
#define SF_DESCENDING_SHIFT (10)
#define SF_TUP_SCAN_SHIFT (11)
#define SF_LCP_SCAN_SHIFT (3)
#define SF_ATTR_LEN_SHIFT (16)
#define SF_ATTR_LEN_MASK (65535)
......@@ -359,6 +363,19 @@ ScanFragReq::setNoDiskFlag(UintR & requestInfo, UintR val){
requestInfo |= (val << SF_NO_DISK_SHIFT);
}
inline
Uint32
ScanFragReq::getLcpScanFlag(const Uint32 & requestInfo){
return (requestInfo >> SF_LCP_SCAN_SHIFT) & 1;
}
inline
void
ScanFragReq::setLcpScanFlag(UintR & requestInfo, UintR val){
ASSERT_BOOL(val, "ScanFragReq::setLcpScanFlag");
requestInfo |= (val << SF_LCP_SCAN_SHIFT);
}
inline
Uint32
KeyInfo20::setScanInfo(Uint32 opNo, Uint32 scanNo){
......
......@@ -2985,22 +2985,25 @@ Backup::parseTableDescription(Signal* signal,
if (disk)
{
/**
* Remove all disk attributes, but add DISK_REF (8 bytes)
* Remove all disk attributes
*/
tabPtr.p->noOfAttributes -= (disk - 1);
tabPtr.p->noOfAttributes -= disk;
AttributePtr attrPtr;
ndbrequire(tabPtr.p->attributes.seize(attrPtr));
Uint32 sz32 = 2;
attrPtr.p->data.attrId = AttributeHeader::DISK_REF;
attrPtr.p->data.m_flags = Attribute::COL_FIXED;
attrPtr.p->data.sz32 = 2;
attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
tabPtr.p->sz_FixedAttributes += sz32;
{
AttributePtr attrPtr;
ndbrequire(tabPtr.p->attributes.seize(attrPtr));
Uint32 sz32 = 2;
attrPtr.p->data.attrId = AttributeHeader::DISK_REF;
attrPtr.p->data.m_flags = Attribute::COL_FIXED;
attrPtr.p->data.sz32 = 2;
attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
tabPtr.p->sz_FixedAttributes += sz32;
tabPtr.p->noOfAttributes ++;
}
}
{
AttributePtr attrPtr;
ndbrequire(tabPtr.p->attributes.seize(attrPtr));
......@@ -3309,6 +3312,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
ScanFragReq::setScanPrio(req->requestInfo, 1);
ScanFragReq::setTupScanFlag(req->requestInfo, 1);
ScanFragReq::setNoDiskFlag(req->requestInfo, 1);
ScanFragReq::setLcpScanFlag(req->requestInfo, 1);
}
req->transId1 = 0;
req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);
......
......@@ -572,6 +572,7 @@ public:
Uint8 rangeScan;
Uint8 descending;
Uint8 tupScan;
Uint8 lcpScan;
Uint8 scanTcWaiting;
Uint8 scanKeyinfoFlag;
Uint8 m_last_row;
......
......@@ -8362,6 +8362,8 @@ void Dblqh::continueAfterReceivingAllAiLab(Signal* signal)
AccScanReq::setDescendingFlag(req->requestInfo, scanptr.p->descending);
AccScanReq::setNoDiskScanFlag(req->requestInfo,
!tcConnectptr.p->m_disk_table);
AccScanReq::setLcpScanFlag(req->requestInfo, scanptr.p->lcpScan);
req->transId1 = tcConnectptr.p->transid[0];
req->transId2 = tcConnectptr.p->transid[1];
req->savePointId = tcConnectptr.p->savePointId;
......@@ -9453,6 +9455,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->rangeScan = rangeScan;
scanptr.p->descending = descending;
scanptr.p->tupScan = tupScan;
scanptr.p->lcpScan = ScanFragReq::getLcpScanFlag(reqinfo);
scanptr.p->scanState = ScanRecord::SCAN_FREE;
scanptr.p->scanFlag = ZFALSE;
scanptr.p->m_row_id.setNull();
......
......@@ -53,7 +53,10 @@ Dbtup::execACC_SCANREQ(Signal* signal)
Fragrecord& frag = *fragPtr.p;
// flags
Uint32 bits = 0;
if (frag.m_lcp_scan_op == RNIL) {
if (!AccScanReq::getLcpScanFlag(req->requestInfo) ||
tablePtr.p->m_no_of_disk_attributes == 0)
{
// seize from pool and link to per-fragment list
LocalDLList<ScanOp> list(c_scanOpPool, frag.m_scanList);
if (! list.seize(scanPtr)) {
......@@ -63,23 +66,25 @@ Dbtup::execACC_SCANREQ(Signal* signal)
if (!AccScanReq::getNoDiskScanFlag(req->requestInfo)
&& tablePtr.p->m_no_of_disk_attributes)
{
bits |= ScanOp::SCAN_DD;
bits |= ScanOp::SCAN_DD;
}
bool mm = (bits & ScanOp::SCAN_DD);
if (tablePtr.p->m_attributes[mm].m_no_of_varsize > 0) {
bits |= ScanOp::SCAN_VS;
// disk pages have fixed page format
ndbrequire(! (bits & ScanOp::SCAN_DD));
// disk pages have fixed page format
ndbrequire(! (bits & ScanOp::SCAN_DD));
}
if (! AccScanReq::getReadCommittedFlag(req->requestInfo)) {
if (AccScanReq::getLockMode(req->requestInfo) == 0)
bits |= ScanOp::SCAN_LOCK_SH;
else
bits |= ScanOp::SCAN_LOCK_EX;
if (AccScanReq::getLockMode(req->requestInfo) == 0)
bits |= ScanOp::SCAN_LOCK_SH;
else
bits |= ScanOp::SCAN_LOCK_EX;
}
} else {
jam();
// LCP scan and disk
ndbrequire(frag.m_lcp_scan_op == c_lcp_scan_op);
c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
bits |= ScanOp::SCAN_LCP;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment