Bug #27512 Inconsistent tuples when using variable size and >16Gb datamemory

- increase the var part reference from 32 to 64 bits on 64-bit systems
- note: the current patch does not allow online upgrade from 32-bit to 64-bit systems or vice versa
parent a39fe6c1
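
Background on the 16 GB figure, as a rough sketch rather than a statement of the exact kernel constants: the old Var_part_ref packs the var-part page number and the in-page index into a single 32-bit word, so the page number only gets whatever bits are left over after MAX_TUPLES_BITS. Assuming 32 KB pages and 13 index bits (illustrative values, not quoted from the headers), 19 bits of page number address 2^19 * 32 KB = 16 GB of var-part pages, which is where the inconsistency starts. The compilable sketch below only demonstrates that arithmetic and the before/after layouts; all names and constants in it are placeholders.

#include <cstdint>
#include <cassert>

// Illustrative constants only -- assumed, not taken from the NDB headers.
static const uint32_t kPageSizeBytes = 32 * 1024;  // assumed 32 KB var pages
static const uint32_t kTupleIdxBits  = 13;         // assumed MAX_TUPLES_BITS

// Old packed form: page number and page index share one 32-bit word,
// leaving 32 - kTupleIdxBits = 19 bits for the page number.
struct PackedRef32 {
  uint32_t m_ref;
};

// New form on 64-bit builds: a full word each for page number and index
// (SZ32 == 2), which removes the addressing ceiling at the cost of one
// extra word in the fixed part of every var-size row.
struct WideRef64 {
  uint32_t m_page_no;
  uint32_t m_page_idx;
};

int main() {
  const uint64_t max_pages = 1ULL << (32 - kTupleIdxBits);   // 2^19 pages
  const uint64_t max_bytes = max_pages * kPageSizeBytes;     // 2^34 bytes
  assert(max_bytes == (16ULL << 30));  // 16 GB with the assumed constants
  return 0;
}
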
@@ -1206,9 +1206,35 @@ typedef Ptr<HostBuffer> HostBufferPtr;
  */
 struct Var_part_ref 
 {
+#if NDB_SIZEOF_CHARP == 4
   Uint32 m_ref;
-};
+  STATIC_CONST( SZ32 = 1 );
+
+  void copyout(Local_key* dst) const {
+    dst->m_page_no = m_ref >> MAX_TUPLES_BITS;
+    dst->m_page_idx = m_ref & MAX_TUPLES_PER_PAGE;
+  }
+
+  void assign(const Local_key* src) {
+    m_ref = (src->m_page_no << MAX_TUPLES_BITS) | src->m_page_idx;
+  }
+#else
+  Uint32 m_page_no;
+  Uint32 m_page_idx;
+  STATIC_CONST( SZ32 = 2 );
+
+  void copyout(Local_key* dst) const {
+    dst->m_page_no = m_page_no;
+    dst->m_page_idx = m_page_idx;
+  }
+
+  void assign(const Local_key* src) {
+    m_page_no = src->m_page_no;
+    m_page_idx = src->m_page_idx;
+  }
+#endif
+};
 
 struct Tuple_header
 {
   union {
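
The call-site changes below all follow from this one struct: instead of shifting and masking a raw Uint32, callers go through assign() and copyout(), so the same code compiles for both the packed 32-bit layout and the two-word 64-bit layout. A small self-contained sketch of that caller pattern follows; Local_key and the two-word layout here are simplified stand-ins, not the real kernel declarations.

#include <cstdio>

// Simplified stand-ins; the real declarations live in the NDB kernel headers.
struct Local_key {
  unsigned m_page_no;
  unsigned m_page_idx;
};

struct Var_part_ref {
  unsigned m_page_no;    // two-word layout, i.e. SZ32 == 2
  unsigned m_page_idx;

  void copyout(Local_key* dst) const {
    dst->m_page_no  = m_page_no;
    dst->m_page_idx = m_page_idx;
  }
  void assign(const Local_key* src) {
    m_page_no  = src->m_page_no;
    m_page_idx = src->m_page_idx;
  }
};

// Caller style after the patch: no shifting or masking of a packed word,
// just assign() when storing a reference and copyout() before following it.
int main() {
  Local_key allocated = { 7, 3 };   // pretend alloc_var_part returned this
  Var_part_ref stored;
  stored.assign(&allocated);        // write the reference into the row

  Local_key key;
  stored.copyout(&key);             // read it back before dereferencing
  std::printf("page_no=%u page_idx=%u\n", key.m_page_no, key.m_page_idx);
  return 0;
}
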
@@ -2847,12 +2873,13 @@ Uint32*
 Dbtup::get_ptr(Ptr<Page>* pagePtr, Var_part_ref ref)
 {
   PagePtr tmp;
-  Uint32 page_idx= ref.m_ref & MAX_TUPLES_PER_PAGE;
-  tmp.i= ref.m_ref >> MAX_TUPLES_BITS;
+  Local_key key;
+  ref.copyout(&key);
+  tmp.i = key.m_page_no;
 
   c_page_pool.getPtr(tmp);
   memcpy(pagePtr, &tmp, sizeof(tmp));
-  return ((Var_page*)tmp.p)->get_ptr(page_idx);
+  return ((Var_page*)tmp.p)->get_ptr(key.m_page_idx);
 }
 
 inline
...
@@ -153,12 +153,14 @@ void Dbtup::do_tup_abortreq(Signal* signal, Uint32 flags)
       ndbassert(tuple_ptr->m_header_bits & Tuple_header::CHAINED_ROW);
-      Uint32 ref= * tuple_ptr->get_var_part_ptr(regTabPtr.p);
+      Var_part_ref *ref =
+        (Var_part_ref*)tuple_ptr->get_var_part_ptr(regTabPtr.p);
       Local_key tmp;
-      tmp.assref(ref);
+      ref->copyout(&tmp);
       idx= tmp.m_page_idx;
-      var_part= get_ptr(&vpage, *(Var_part_ref*)&ref);
+      var_part= get_ptr(&vpage, *ref);
       Var_page* pageP = (Var_page*)vpage.p;
       Uint32 len= pageP->get_entry_len(idx) & ~Var_page::CHAIN;
       Uint32 sz = ((((mm_vars + 1) << 1) + (((Uint16*)var_part)[mm_vars]) + 3)>> 2);
...
@@ -236,13 +236,14 @@ Dbtup::commit_operation(Signal* signal,
   }
   else
   {
-    Uint32 *ref= tuple_ptr->get_var_part_ptr(regTabPtr);
+    Var_part_ref *ref= (Var_part_ref*)tuple_ptr->get_var_part_ptr(regTabPtr);
     memcpy(tuple_ptr, copy, 4*(Tuple_header::HeaderSize+fixsize));
-    Local_key tmp; tmp.assref(*ref);
+    Local_key tmp;
+    ref->copyout(&tmp);
 
     PagePtr vpagePtr;
-    Uint32 *dst= get_ptr(&vpagePtr, *(Var_part_ref*)ref);
+    Uint32 *dst= get_ptr(&vpagePtr, *ref);
     Var_page* vpagePtrP = (Var_page*)vpagePtr.p;
     Uint32 *src= copy->get_var_part_ptr(regTabPtr);
     Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)src)[mm_vars]);
...
@@ -2851,11 +2851,11 @@ Dbtup::handle_size_change_after_update(KeyReqStruct* req_struct,
   Ptr<Page> pagePtr = req_struct->m_varpart_page_ptr;
   Var_page* pageP= (Var_page*)pagePtr.p;
   Uint32 idx, alloc, needed;
-  Uint32 *refptr = org->get_var_part_ptr(regTabPtr);
+  Var_part_ref *refptr = (Var_part_ref*)org->get_var_part_ptr(regTabPtr);
   ndbassert(bits & Tuple_header::CHAINED_ROW);
 
   Local_key ref;
-  ref.assref(*refptr);
+  refptr->copyout(&ref);
   idx= ref.m_page_idx;
   if (! (copy_bits & Tuple_header::CHAINED_ROW))
   {
@@ -2878,7 +2878,7 @@ Dbtup::handle_size_change_after_update(KeyReqStruct* req_struct,
     }
     copy_bits |= Tuple_header::MM_GROWN;
     if (unlikely(realloc_var_part(regFragPtr, regTabPtr, pagePtr,
-                                  (Var_part_ref*)refptr, alloc, needed)))
+                                  refptr, alloc, needed)))
       return -1;
   }
   req_struct->m_tuple_ptr->m_header_bits = copy_bits;
@@ -2945,9 +2945,10 @@ Dbtup::nr_read_pk(Uint32 fragPtrI,
   PagePtr page_ptr;
   if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
   {
-    tablePtr.p->m_offsets[MM].m_fix_header_size += Tuple_header::HeaderSize+1;
+    const Uint32 XXX = Tuple_header::HeaderSize+Var_part_ref::SZ32;
+    tablePtr.p->m_offsets[MM].m_fix_header_size += XXX;
     ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
-    tablePtr.p->m_offsets[MM].m_fix_header_size -= Tuple_header::HeaderSize+1;
+    tablePtr.p->m_offsets[MM].m_fix_header_size -= XXX;
   }
   else
   {
...
@@ -479,7 +479,7 @@ Dbtup::buildIndex(Signal* signal, Uint32 buildPtrI)
   const Uint32 firstTupleNo = 0;
   const Uint32 tupheadsize = tablePtr.p->m_offsets[MM].m_fix_header_size +
-    (buildPtr.p->m_build_vs ? Tuple_header::HeaderSize + 1: 0);
+    (buildPtr.p->m_build_vs? Tuple_header::HeaderSize + Var_part_ref::SZ32: 0);
 
 #ifdef TIME_MEASUREMENT
   MicroSecondTimer start;
...
@@ -542,7 +542,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
   {
     Uint32 fix_tupheader = regTabPtr.p->m_offsets[MM].m_fix_header_size;
     if(regTabPtr.p->m_attributes[MM].m_no_of_varsize != 0)
-      fix_tupheader += Tuple_header::HeaderSize + 1;
+      fix_tupheader += Tuple_header::HeaderSize + Var_part_ref::SZ32;
     ndbassert(fix_tupheader > 0);
     Uint32 noRowsPerPage = ZWORDS_ON_PAGE / fix_tupheader;
     Uint32 noAllocatedPages =
...
@@ -597,7 +597,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
   Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
   Uint32 size = table.m_offsets[mm].m_fix_header_size +
-    (bits & ScanOp::SCAN_VS ? Tuple_header::HeaderSize + 1: 0);
+    (bits&ScanOp::SCAN_VS ? Tuple_header::HeaderSize + Var_part_ref::SZ32 : 0);
 
   if (lcp && lcp_list != RNIL)
     goto found_lcp_keep;
...
@@ -71,9 +71,10 @@ Uint32* Dbtup::alloc_var_rec(Fragrecord* fragPtr,
   /**
    * TODO alloc fix+var part
    */
-  tabPtr->m_offsets[MM].m_fix_header_size += Tuple_header::HeaderSize + 1;
+  const Uint32 XXX = Tuple_header::HeaderSize + Var_part_ref::SZ32;
+  tabPtr->m_offsets[MM].m_fix_header_size += XXX;
   Uint32 *ptr = alloc_fix_rec(fragPtr, tabPtr, key, out_frag_page_id);
-  tabPtr->m_offsets[MM].m_fix_header_size -= Tuple_header::HeaderSize + 1;
+  tabPtr->m_offsets[MM].m_fix_header_size -= XXX;
   if (unlikely(ptr == 0))
   {
     return 0;
@@ -90,7 +91,8 @@ Uint32* Dbtup::alloc_var_rec(Fragrecord* fragPtr,
   if (likely(alloc_var_part(fragPtr, tabPtr, alloc_size, &varref) != 0))
   {
     Tuple_header* tuple = (Tuple_header*)ptr;
-    * tuple->get_var_part_ptr(tabPtr) = varref.ref();
+    Var_part_ref* dst = (Var_part_ref*)tuple->get_var_part_ptr(tabPtr);
+    dst->assign(&varref);
     return ptr;
   }
@@ -168,7 +170,8 @@ void Dbtup::free_var_rec(Fragrecord* fragPtr,
   Tuple_header* tuple = (Tuple_header*)ptr;
   Local_key ref;
-  ref.assref(* tuple->get_var_part_ptr(tabPtr));
+  Var_part_ref * varref = (Var_part_ref*)tuple->get_var_part_ptr(tabPtr);
+  varref->copyout(&ref);
 
   free_fix_rec(fragPtr, tabPtr, key, (Fix_page*)pagePtr.p);
@@ -194,12 +197,12 @@ void Dbtup::free_var_rec(Fragrecord* fragPtr,
 int
 Dbtup::realloc_var_part(Fragrecord* fragPtr, Tablerec* tabPtr, PagePtr pagePtr,
-                        Var_part_ref* ref, Uint32 oldsz, Uint32 newsz)
+                        Var_part_ref* refptr, Uint32 oldsz, Uint32 newsz)
 {
   Uint32 add = newsz - oldsz;
   Var_page* pageP = (Var_page*)pagePtr.p;
   Local_key oldref;
-  oldref.assref(*(Uint32*)ref);
+  refptr->copyout(&oldref);
 
   if (pageP->free_space >= add)
   {
@@ -238,7 +241,7 @@ Dbtup::realloc_var_part(Fragrecord* fragPtr, Tablerec* tabPtr, PagePtr pagePtr,
     ndbassert(oldref.m_page_no != newref.m_page_no);
     ndbassert(pageP->get_entry_len(oldref.m_page_idx) == oldsz);
     memcpy(dst, src, 4*oldsz);
-    * ((Uint32*)ref) = newref.ref();
+    refptr->assign(&newref);
 
     pageP->free_record(oldref.m_page_idx, Var_page::CHAIN);
     update_free_page_list(fragPtr, pagePtr);
@@ -399,9 +402,10 @@ Dbtup::alloc_var_rowid(Fragrecord* fragPtr,
                        Local_key* key,
                        Uint32 * out_frag_page_id)
 {
-  tabPtr->m_offsets[MM].m_fix_header_size += Tuple_header::HeaderSize + 1;
+  const Uint32 XXX = Tuple_header::HeaderSize + Var_part_ref::SZ32;
+  tabPtr->m_offsets[MM].m_fix_header_size += XXX;
   Uint32 *ptr = alloc_fix_rowid(fragPtr, tabPtr, key, out_frag_page_id);
-  tabPtr->m_offsets[MM].m_fix_header_size -= Tuple_header::HeaderSize + 1;
+  tabPtr->m_offsets[MM].m_fix_header_size -= XXX;
   if (unlikely(ptr == 0))
   {
     return 0;
@@ -417,7 +421,8 @@ Dbtup::alloc_var_rowid(Fragrecord* fragPtr,
   if (likely(alloc_var_part(fragPtr, tabPtr, alloc_size, &varref) != 0))
   {
     Tuple_header* tuple = (Tuple_header*)ptr;
-    * tuple->get_var_part_ptr(tabPtr) = varref.ref();
+    Var_part_ref* dst = (Var_part_ref*)tuple->get_var_part_ptr(tabPtr);
+    dst->assign(&varref);
     return ptr;
   }
...
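
A side effect of the wider reference is that the fixed part of a var-size row now occupies Tuple_header::HeaderSize + Var_part_ref::SZ32 words instead of HeaderSize + 1, which is why the allocation paths above temporarily bump m_fix_header_size by that amount around the fixed-size allocator and then restore it. The sketch below only illustrates that bump-and-restore pattern; the sizes and member names are placeholders, not the real Dbtup definitions.

#include <cstdint>
#include <cstdio>

// Placeholder sizes; in the real code these come from Tuple_header and
// Var_part_ref in the NDB kernel headers.
static const uint32_t HeaderSize = 2;   // assumed tuple header words
static const uint32_t RefSZ32    = 2;   // Var_part_ref::SZ32 on 64-bit builds

struct Offsets { uint32_t m_fix_header_size; };

// Stand-in for alloc_fix_rec(): only reports the size it was asked for.
static uint32_t* alloc_fixed(Offsets* off) {
  std::printf("allocating fixed part of %u words\n", off->m_fix_header_size);
  static uint32_t dummy[64];
  return dummy;
}

// Pattern used by alloc_var_rec()/alloc_var_rowid(): temporarily grow the
// fixed header size by header + reference words, allocate, then restore.
static uint32_t* alloc_var_rec_sketch(Offsets* off) {
  const uint32_t extra = HeaderSize + RefSZ32;
  off->m_fix_header_size += extra;
  uint32_t* ptr = alloc_fixed(off);
  off->m_fix_header_size -= extra;
  return ptr;
}

int main() {
  Offsets off = { 10 };         // pretend the fixed columns need 10 words
  alloc_var_rec_sketch(&off);   // prints 14 with the assumed sizes
  return 0;
}
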