Commit 8bfc2d9c authored by unknown's avatar unknown

ndb -

  bug#19928 and bug#19929
  fix to critical bugs in tup scan
  that affected lcp,backup and opt. nr


storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp:
  1) dont let dirty read scan find uncommitted inserts
  2) force opt. nr scan to wait for locked rows
  3) when finding LCP keep record, use accOpPtr -1, so that it will not be committed towards ACC
parent be0ab479
......@@ -582,12 +582,15 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
Fragrecord& frag = *fragPtr.p;
// tuple found
Tuple_header* th = 0;
Uint32 thbits = 0;
Uint32 loop_count = 0;
Uint32 scanGCI = scanPtr.p->m_scanGCI;
Uint32 foundGCI;
bool mm = (bits & ScanOp::SCAN_DD);
bool lcp = (bits & ScanOp::SCAN_LCP);
const bool mm = (bits & ScanOp::SCAN_DD);
const bool lcp = (bits & ScanOp::SCAN_LCP);
const bool dirty = (bits & ScanOp::SCAN_LOCK) == 0;
Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
Uint32 size = table.m_offsets[mm].m_fix_header_size +
(bits & ScanOp::SCAN_VS ? Tuple_header::HeaderSize + 1: 0);
......@@ -750,22 +753,22 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
{
pos.m_get = ScanPos::Get_next_tuple_fs;
th = (Tuple_header*)&page->m_data[key.m_page_idx];
thbits = th->m_header_bits;
if (likely(! (bits & ScanOp::SCAN_NR)))
{
if (! (th->m_header_bits & Tuple_header::FREE)) {
goto found_tuple;
}
else
jam();
if (! (thbits & Tuple_header::FREE))
{
jam();
// skip free tuple
}
if (! ((thbits & Tuple_header::ALLOC) && dirty))
goto found_tuple;
}
}
else
{
if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI)
{
if (! (th->m_header_bits & Tuple_header::FREE))
if (! (thbits & Tuple_header::FREE))
{
jam();
goto found_tuple;
......@@ -775,9 +778,11 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
goto found_deleted_rowid;
}
}
else
else if (thbits != Fix_page::FREE_RECORD &&
th->m_operation_ptr_i != RNIL)
{
jam();
goto found_tuple; // Locked tuple...
// skip free tuple
}
}
......@@ -793,8 +798,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
jam();
{
// caller has already set pos.m_get to next tuple
if (! (bits & ScanOp::SCAN_LCP &&
th->m_header_bits & Tuple_header::LCP_SKIP)) {
if (! (bits & ScanOp::SCAN_LCP && thbits & Tuple_header::LCP_SKIP)) {
Local_key& key_mm = pos.m_key_mm;
if (! (bits & ScanOp::SCAN_DD)) {
key_mm = pos.m_key;
......@@ -810,7 +814,11 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
} else {
jam();
// clear it so that it will show up in next LCP
th->m_header_bits &= ~(Uint32)Tuple_header::LCP_SKIP;
th->m_header_bits = thbits & ~(Uint32)Tuple_header::LCP_SKIP;
if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
jam();
setChecksum(th, tablePtr.p);
}
}
}
break;
......@@ -833,7 +841,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
th = (Tuple_header*)(mmpage->m_data + key_mm.m_page_idx);
if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI)
{
if (! (th->m_header_bits & Tuple_header::FREE))
if (! (thbits & Tuple_header::FREE))
break;
}
}
......@@ -893,7 +901,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
conf->scanPtr = scan.m_userPtr;
conf->accOperationPtr = RNIL + 1;
conf->accOperationPtr = (Uint32)-1;
conf->fragId = frag.fragmentId;
conf->localKey[0] = lcp_list;
conf->localKey[1] = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment