Commit 044c51ab authored by jonas@eel.(none)'s avatar jonas@eel.(none)

ndb dd - fix bug with deletes during LCP, that link between mm and dd could get lost

parent bcca33e9
......@@ -604,6 +604,7 @@ struct Fragrecord {
SLList<Page>::Head m_empty_pages; // Empty pages not in logical/physical map
Uint32 m_lcp_scan_op;
Uint32 m_lcp_keep_list;
State fragStatus;
Uint32 fragTableId;
......@@ -1194,8 +1195,10 @@ typedef Ptr<HostBuffer> HostBufferPtr;
STATIC_CONST( ALLOC = 0x00100000 ); // Is record allocated now
STATIC_CONST( MM_SHRINK = 0x00200000 ); // Has MM part shrunk
STATIC_CONST( MM_GROWN = 0x00400000 ); // Has MM part grown
STATIC_CONST( FREE = 0x00800000 ); // On free list of page
STATIC_CONST( FREED = 0x00800000 ); // Is freed
STATIC_CONST( LCP_SKIP = 0x01000000 ); // Should not be returned in LCP
STATIC_CONST( LCP_KEEP = 0x02000000 ); // Should be returned in LCP
STATIC_CONST( FREE = 0x02800000 ); // Is free
Uint32 get_tuple_version() const {
return m_header_bits & TUP_VERSION_MASK;
......
......@@ -172,7 +172,7 @@ void Dbtup::execTUP_ABORTREQ(Signal* signal)
*/
ndbout_c("clearing ALLOC");
tuple_ptr->m_header_bits &= ~(Uint32)Tuple_header::ALLOC;
tuple_ptr->m_header_bits |= Tuple_header::FREE;
tuple_ptr->m_header_bits |= Tuple_header::FREED;
}
}
......
......@@ -51,6 +51,15 @@ void Dbtup::execTUP_DEALLOCREQ(Signal* signal)
PagePtr pagePtr;
Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, regTabPtr.p);
ndbassert(ptr->m_header_bits & Tuple_header::FREE);
if (ptr->m_header_bits & Tuple_header::LCP_KEEP)
{
ndbassert(ptr.p->m_header_bits & Tuple_header::FREED);
ptr->m_header_bits |= Tuple_header::FREED;
return;
}
if (regTabPtr.p->m_attributes[MM].m_no_of_varsize)
{
......@@ -140,6 +149,15 @@ void Dbtup::initOpConnection(Operationrec* regOperPtr)
regOperPtr->m_undo_buffer_space= 0;
}
static
inline
bool
operator>=(const Local_key& key1, const Local_key& key2)
{
return key1.m_page_no > key2.m_page_no ||
(key1.m_page_no == key2.m_page_no && key1.m_page_idx >= key2.m_page_idx);
}
void
Dbtup::dealloc_tuple(Signal* signal,
Uint32 gci,
......@@ -149,15 +167,36 @@ Dbtup::dealloc_tuple(Signal* signal,
Fragrecord* regFragPtr,
Tablerec* regTabPtr)
{
ptr->m_header_bits |= Tuple_header::FREE;
if (ptr->m_header_bits & Tuple_header::DISK_PART)
Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
Uint32 lcp_keep_list = regFragPtr->m_lcp_keep_list;
Uint32 bits = ptr->m_header_bits;
Uint32 extra_bits = Tuple_header::FREED;
if (bits & Tuple_header::DISK_PART)
{
Local_key disk;
memcpy(&disk, ptr->get_disk_ref_ptr(regTabPtr), sizeof(disk));
disk_page_free(signal, regTabPtr, regFragPtr,
&disk, *(PagePtr*)&m_pgman.m_ptr, gci);
}
if (! (bits & Tuple_header::LCP_SKIP) && lcpScan_ptr_i != RNIL)
{
ScanOpPtr scanOp;
c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
Local_key rowid = regOperPtr->m_tuple_location;
Local_key scanpos = scanOp.p->m_scanPos.m_key;
rowid.m_page_no = page->frag_page_id;
if (rowid >= scanpos)
{
extra_bits = Tuple_header::LCP_KEEP; // Note REMOVE FREE
ptr->m_operation_ptr_i = lcp_keep_list;
regFragPtr->m_lcp_keep_list = rowid.ref();
}
}
ptr->m_header_bits = bits | extra_bits;
if (regTabPtr->m_bits & Tablerec::TR_RowGCI)
{
jam();
......@@ -165,15 +204,6 @@ Dbtup::dealloc_tuple(Signal* signal,
}
}
static
inline
bool
operator>=(const Local_key& key1, const Local_key& key2)
{
return key1.m_page_no > key2.m_page_no ||
(key1.m_page_no == key2.m_page_no && key1.m_page_idx >= key2.m_page_idx);
}
void
Dbtup::commit_operation(Signal* signal,
Uint32 gci,
......
......@@ -138,8 +138,9 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
regFragPtr.p->fragmentId= fragId;
regFragPtr.p->m_tablespace_id= tablespace;
regFragPtr.p->m_undo_complete= false;
regFragPtr.p->m_lcp_scan_op= RNIL;
regFragPtr.p->m_lcp_scan_op = RNIL;
regFragPtr.p->m_lcp_keep_list = RNIL;
Uint32 noAllocatedPages= allocFragPages(regFragPtr.p, pages);
if (noAllocatedPages == 0) {
......
......@@ -587,8 +587,14 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
Uint32 foundGCI;
bool mm = (bits & ScanOp::SCAN_DD);
bool lcp = (bits & ScanOp::SCAN_LCP);
Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
Uint32 size = table.m_offsets[mm].m_fix_header_size +
(bits & ScanOp::SCAN_VS ? Tuple_header::HeaderSize + 1: 0);
if (lcp && lcp_list != RNIL)
goto found_lcp_keep;
while (true) {
switch (pos.m_get) {
case ScanPos::Get_next_page:
......@@ -864,6 +870,53 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
signal->theData[1] = scanPtr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
return false;
found_lcp_keep:
Local_key tmp;
tmp.assref(lcp_list);
tmp.m_page_no = getRealpid(fragPtr.p, tmp.m_page_no);
Ptr<Page> pagePtr;
c_page_pool.getPtr(pagePtr, tmp.m_page_no);
Tuple_header* ptr = (Tuple_header*)
((Fix_page*)pagePtr.p)->get_ptr(tmp.m_page_idx, 0);
Uint32 headerbits = ptr->m_header_bits;
ndbrequire(headerbits & Tuple_header::LCP_KEEP);
Uint32 next = ptr->m_operation_ptr_i;
ptr->m_operation_ptr_i = RNIL;
ptr->m_header_bits = headerbits & ~(Uint32)Tuple_header::FREE;
if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
jam();
setChecksum(ptr, tablePtr.p);
}
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
conf->scanPtr = scan.m_userPtr;
conf->accOperationPtr = RNIL + 1;
conf->fragId = frag.fragmentId;
conf->localKey[0] = lcp_list;
conf->localKey[1] = 0;
conf->localKeyLength = 1;
conf->gci = 0;
Uint32 blockNo = refToBlock(scan.m_userRef);
EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 7);
fragPtr.p->m_lcp_keep_list = next;
ptr->m_header_bits |= Tuple_header::FREED; // RESTORE free flag
if (headerbits & Tuple_header::FREED)
{
if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
{
jam();
free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
} else {
jam();
free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
}
}
return false;
}
void
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment