ndb - Fix problematic handling of TUP_ALLOC/DEALLOC REQ wrt varsize

    same row -
      prepare delete, prepare insert (with different size), commit delete, commit insert would lead to assertion
        as prepare insert didnt call handle_size_change_after_update
parent 88fbfc48
...@@ -2103,6 +2103,7 @@ private: ...@@ -2103,6 +2103,7 @@ private:
} }
void prepare_initial_insert(KeyReqStruct*, Operationrec*, Tablerec*); void prepare_initial_insert(KeyReqStruct*, Operationrec*, Tablerec*);
void fix_disk_insert_no_mem_insert(KeyReqStruct*, Operationrec*, Tablerec*);
void setup_fixed_part(KeyReqStruct* req_struct, void setup_fixed_part(KeyReqStruct* req_struct,
Operationrec* const regOperPtr, Operationrec* const regOperPtr,
Tablerec* const regTabPtr); Tablerec* const regTabPtr);
......
...@@ -1298,12 +1298,56 @@ Dbtup::prepare_initial_insert(KeyReqStruct *req_struct, ...@@ -1298,12 +1298,56 @@ Dbtup::prepare_initial_insert(KeyReqStruct *req_struct,
disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0; disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0;
} }
void
Dbtup::fix_disk_insert_no_mem_insert(KeyReqStruct *req_struct,
Operationrec* regOperPtr,
Tablerec* regTabPtr)
{
regOperPtr->m_undo_buffer_space= sizeof(Dbtup::Disk_undo::Alloc);
req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr);
if(cnt1)
{
// Disk part is 32-bit aligned
char *varptr = req_struct->m_var_data[MM].m_data_ptr;
ptr= ALIGN_WORD(varptr + regTabPtr->m_offsets[MM].m_max_var_offset);
}
else
{
ptr -= Tuple_header::HeaderSize;
}
req_struct->m_disk_ptr= (Tuple_header*)ptr;
if(cnt2)
{
KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD];
ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset;
dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1);
dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1);
dst->m_var_len_offset= cnt2;
dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset;
}
// Set all null bits
memset(req_struct->m_disk_ptr->m_null_bits+
regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
4*regTabPtr->m_offsets[DD].m_null_words);
req_struct->m_tuple_ptr->m_header_bits =
(Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE);
}
int Dbtup::handleInsertReq(Signal* signal, int Dbtup::handleInsertReq(Signal* signal,
Ptr<Operationrec> regOperPtr, Ptr<Operationrec> regOperPtr,
Ptr<Fragrecord> fragPtr, Ptr<Fragrecord> fragPtr,
Tablerec* const regTabPtr, Tablerec* const regTabPtr,
KeyReqStruct *req_struct) KeyReqStruct *req_struct)
{ {
Uint32 tup_version = 1;
Fragrecord* regFragPtr = fragPtr.p; Fragrecord* regFragPtr = fragPtr.p;
Uint32 *dst, *ptr= 0; Uint32 *dst, *ptr= 0;
Tuple_header *base= req_struct->m_tuple_ptr, *org= base; Tuple_header *base= req_struct->m_tuple_ptr, *org= base;
...@@ -1320,29 +1364,24 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1320,29 +1364,24 @@ int Dbtup::handleInsertReq(Signal* signal,
ndbout << "dst: " << hex << UintPtr(dst) << " - " ndbout << "dst: " << hex << UintPtr(dst) << " - "
<< regOperPtr.p->m_copy_tuple_location << endl; << regOperPtr.p->m_copy_tuple_location << endl;
bool disk = regTabPtr->m_no_of_disk_attributes > 0;
bool mem_insert = get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT;
bool disk_insert = regOperPtr.p->is_first_operation() && disk;
Uint32 tup_version;
union { union {
Uint32 sizes[4]; Uint32 sizes[4];
Uint64 cmp[2]; Uint64 cmp[2];
}; };
if(regOperPtr.p->is_first_operation())
if(mem_insert)
{ {
tup_version= 1; jam();
ndbassert(regOperPtr.p->is_first_operation()); // disk insert
prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr); prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
if(regTabPtr->m_no_of_disk_attributes)
{
int res;
if((res= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
regOperPtr.p->m_undo_buffer_space)))
{
terrorCode= res;
regOperPtr.p->m_undo_buffer_space= 0;
goto log_space_error;
}
}
} }
else else
{
if (!regOperPtr.p->is_first_operation())
{ {
Operationrec* prevOp= req_struct->prevOpPtr.p; Operationrec* prevOp= req_struct->prevOpPtr.p;
ndbassert(prevOp->op_struct.op_type == ZDELETE); ndbassert(prevOp->op_struct.op_type == ZDELETE);
...@@ -1350,12 +1389,31 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1350,12 +1389,31 @@ int Dbtup::handleInsertReq(Signal* signal,
if(!prevOp->is_first_operation()) if(!prevOp->is_first_operation())
org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location); org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
}
if (regTabPtr->need_expand()) if (regTabPtr->need_expand())
expand_tuple(req_struct, sizes, org, regTabPtr, true); expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
else else
memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size); memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
} }
if (disk_insert)
{
int res;
if (unlikely(!mem_insert))
{
sizes[DD] = sizes[DD+2] = regTabPtr->m_offsets[DD].m_fix_header_size;
fix_disk_insert_no_mem_insert(req_struct, regOperPtr.p, regTabPtr);
}
if((res= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
regOperPtr.p->m_undo_buffer_space)))
{
terrorCode= res;
regOperPtr.p->m_undo_buffer_space= 0;
goto log_space_error;
}
}
regOperPtr.p->tupVersion= tup_version & ZTUP_VERSION_MASK; regOperPtr.p->tupVersion= tup_version & ZTUP_VERSION_MASK;
tuple_ptr->set_tuple_version(tup_version); tuple_ptr->set_tuple_version(tup_version);
if(updateAttributes(req_struct, &cinBuffer[0], if(updateAttributes(req_struct, &cinBuffer[0],
...@@ -1364,7 +1422,6 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1364,7 +1422,6 @@ int Dbtup::handleInsertReq(Signal* signal,
if (checkNullAttributes(req_struct, regTabPtr) == false) if (checkNullAttributes(req_struct, regTabPtr) == false)
{ {
goto null_check_error; goto null_check_error;
} }
...@@ -1376,12 +1433,9 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1376,12 +1433,9 @@ int Dbtup::handleInsertReq(Signal* signal,
/** /**
* Alloc memory * Alloc memory
*/ */
if(regOperPtr.p->is_first_operation())
{
Uint32 frag_page_id = req_struct->frag_page_id; Uint32 frag_page_id = req_struct->frag_page_id;
Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no; Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
regOperPtr.p->m_tuple_location.m_page_no = frag_page_id; if(mem_insert)
if (likely(get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT))
{ {
if (!regTabPtr->m_attributes[MM].m_no_of_varsize) if (!regTabPtr->m_attributes[MM].m_no_of_varsize)
{ {
...@@ -1413,9 +1467,24 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1413,9 +1467,24 @@ int Dbtup::handleInsertReq(Signal* signal,
((Tuple_header*)ptr)->m_operation_ptr_i= regOperPtr.i; ((Tuple_header*)ptr)->m_operation_ptr_i= regOperPtr.i;
((Tuple_header*)ptr)->m_header_bits= Tuple_header::ALLOC; ((Tuple_header*)ptr)->m_header_bits= Tuple_header::ALLOC;
regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
}
else
{
int ret;
if (regTabPtr->need_shrink() && cmp[0] != cmp[1] &&
(ret = handle_size_change_after_update(req_struct,
base,
regOperPtr.p,
regFragPtr,
regTabPtr,
sizes)))
{
return ret;
}
} }
if (regTabPtr->m_no_of_disk_attributes) if (disk_insert)
{ {
Local_key tmp; Local_key tmp;
Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ? Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ?
...@@ -1431,41 +1500,21 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1431,41 +1500,21 @@ int Dbtup::handleInsertReq(Signal* signal,
/** /**
* Set ref from disk to mm * Set ref from disk to mm
*/ */
Local_key ref = regOperPtr.p->m_tuple_location;
ref.m_page_no = frag_page_id;
Tuple_header* disk_ptr= req_struct->m_disk_ptr; Tuple_header* disk_ptr= req_struct->m_disk_ptr;
disk_ptr->m_header_bits = 0; disk_ptr->m_header_bits = 0;
disk_ptr->m_base_record_ref= regOperPtr.p->m_tuple_location.ref(); disk_ptr->m_base_record_ref= ref.ref();
} }
regOperPtr.p->m_tuple_location.m_page_no = real_page_id; if (regTabPtr->checksumIndicator)
tuple_ptr->m_header_bits |= Tuple_header::ALLOC;
if (regTabPtr->checksumIndicator) {
jam();
setChecksum(req_struct->m_tuple_ptr, regTabPtr);
}
return 0;
}
else
{ {
if (regTabPtr->checksumIndicator) {
jam(); jam();
setChecksum(req_struct->m_tuple_ptr, regTabPtr); setChecksum(req_struct->m_tuple_ptr, regTabPtr);
} }
if (!regTabPtr->need_shrink() || cmp[0] == cmp[1])
return 0; return 0;
return handle_size_change_after_update(req_struct,
base,
regOperPtr.p,
regFragPtr,
regTabPtr,
sizes);
}
mem_error: mem_error:
terrorCode= ZMEM_NOMEM_ERROR; terrorCode= ZMEM_NOMEM_ERROR;
goto error; goto error;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment