Commit 35c2846c authored by unknown's avatar unknown

ndb - wl#2972 (5.1, related) detached trigger fixes for multiops


storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
  detached trigger fixes for multiops
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp:
  detached trigger fixes for multiops
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp:
  detached trigger fixes for multiops
storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp:
  detached trigger fixes for multiops
storage/ndb/test/ndbapi/test_event_merge.cpp:
  detached trigger fixes for multiops
parent f59bbb4f
...@@ -34,6 +34,15 @@ ...@@ -34,6 +34,15 @@
#include <../pgman.hpp> #include <../pgman.hpp>
#include <../tsman.hpp> #include <../tsman.hpp>
#ifdef VM_TRACE
inline const char* dbgmask(const Bitmask<MAXNROFATTRIBUTESINWORDS>& bm) {
static int i=0; static char buf[5][200];
bm.getText(buf[i%5]); return buf[i++%5]; }
inline const char* dbgmask(const Uint32 bm[2]) {
static int i=0; static char buf[5][200];
sprintf(buf[i%5],"%08x%08x",bm[1],bm[0]); return buf[i++%5]; }
#endif
#define ZWORDS_ON_PAGE 8192 /* NUMBER OF WORDS ON A PAGE. */ #define ZWORDS_ON_PAGE 8192 /* NUMBER OF WORDS ON A PAGE. */
#define ZATTRBUF_SIZE 32 /* SIZE OF ATTRIBUTE RECORD BUFFER */ #define ZATTRBUF_SIZE 32 /* SIZE OF ATTRIBUTE RECORD BUFFER */
#define ZMIN_PAGE_LIMIT_TUPKEYREQ 5 #define ZMIN_PAGE_LIMIT_TUPKEYREQ 5
...@@ -693,10 +702,7 @@ struct Operationrec { ...@@ -693,10 +702,7 @@ struct Operationrec {
/* /*
* We use 64 bits to save change mask for the most common cases. * We use 64 bits to save change mask for the most common cases.
*/ */
union { Uint32 saved_change_mask[2];
Uint32 saved_change_mask[2];
Uint64 m_mask;
};
/* /*
* State variables on connection. * State variables on connection.
...@@ -1291,6 +1297,7 @@ struct KeyReqStruct { ...@@ -1291,6 +1297,7 @@ struct KeyReqStruct {
Uint32 trans_id1; Uint32 trans_id1;
Uint32 trans_id2; Uint32 trans_id2;
Uint32 TC_index; Uint32 TC_index;
// next 2 apply only to attrids >= 64 (zero otherwise)
Uint32 max_attr_id_updated; Uint32 max_attr_id_updated;
Uint32 no_changed_attrs; Uint32 no_changed_attrs;
BlockReference TC_ref; BlockReference TC_ref;
...@@ -2749,11 +2756,12 @@ void ...@@ -2749,11 +2756,12 @@ void
Dbtup::update_change_mask_info(KeyReqStruct * req_struct, Dbtup::update_change_mask_info(KeyReqStruct * req_struct,
Operationrec * regOperPtr) Operationrec * regOperPtr)
{ {
//Save change mask
if (req_struct->max_attr_id_updated == 0) { if (req_struct->max_attr_id_updated == 0) {
set_change_mask_state(regOperPtr, USE_SAVED_CHANGE_MASK); if (get_change_mask_state(regOperPtr) == USE_SAVED_CHANGE_MASK) {
memcpy(regOperPtr->saved_change_mask, &req_struct->changeMask, // add new changes
sizeof(regOperPtr->saved_change_mask)); regOperPtr->saved_change_mask[0] |= req_struct->changeMask.getWord(0);
regOperPtr->saved_change_mask[1] |= req_struct->changeMask.getWord(1);
}
} else { } else {
if (req_struct->no_changed_attrs < 16) { if (req_struct->no_changed_attrs < 16) {
set_change_mask_state(regOperPtr, RECALCULATE_CHANGE_MASK); set_change_mask_state(regOperPtr, RECALCULATE_CHANGE_MASK);
......
...@@ -584,22 +584,22 @@ void ...@@ -584,22 +584,22 @@ void
Dbtup::set_change_mask_info(KeyReqStruct * const req_struct, Dbtup::set_change_mask_info(KeyReqStruct * const req_struct,
Operationrec * const regOperPtr) Operationrec * const regOperPtr)
{ {
ChangeMaskState change_mask= get_change_mask_state(regOperPtr); ChangeMaskState state = get_change_mask_state(regOperPtr);
if (change_mask == USE_SAVED_CHANGE_MASK) { if (state == USE_SAVED_CHANGE_MASK) {
ljam(); ljam();
req_struct->changeMask.setWord(0, regOperPtr->saved_change_mask[0]); req_struct->changeMask.setWord(0, regOperPtr->saved_change_mask[0]);
req_struct->changeMask.setWord(1, regOperPtr->saved_change_mask[1]); req_struct->changeMask.setWord(1, regOperPtr->saved_change_mask[1]);
//get saved state } else if (state == RECALCULATE_CHANGE_MASK) {
} else if (change_mask == RECALCULATE_CHANGE_MASK) {
ljam(); ljam();
//Recompute change mask, for now set all bits // Recompute change mask, for now set all bits
req_struct->changeMask.set(); req_struct->changeMask.set();
} else if (change_mask == SET_ALL_MASK) { } else if (state == SET_ALL_MASK) {
ljam(); ljam();
req_struct->changeMask.set(); req_struct->changeMask.set();
} else { } else {
ljam(); ljam();
ndbrequire(change_mask == DELETE_CHANGES); ndbrequire(state == DELETE_CHANGES);
req_struct->changeMask.set();
} }
} }
......
...@@ -293,6 +293,9 @@ Dbtup::insertActiveOpList(OperationrecPtr regOperPtr, ...@@ -293,6 +293,9 @@ Dbtup::insertActiveOpList(OperationrecPtr regOperPtr,
regOperPtr.p->m_undo_buffer_space= 0; regOperPtr.p->m_undo_buffer_space= 0;
req_struct->m_tuple_ptr->m_operation_ptr_i= regOperPtr.i; req_struct->m_tuple_ptr->m_operation_ptr_i= regOperPtr.i;
if (prevOpPtr.i == RNIL) { if (prevOpPtr.i == RNIL) {
set_change_mask_state(regOperPtr.p, USE_SAVED_CHANGE_MASK);
regOperPtr.p->saved_change_mask[0] = 0;
regOperPtr.p->saved_change_mask[1] = 0;
return true; return true;
} else { } else {
req_struct->prevOpPtr.p= prevOpPtr.p= c_operation_pool.getPtr(prevOpPtr.i); req_struct->prevOpPtr.p= prevOpPtr.p= c_operation_pool.getPtr(prevOpPtr.i);
...@@ -303,7 +306,10 @@ Dbtup::insertActiveOpList(OperationrecPtr regOperPtr, ...@@ -303,7 +306,10 @@ Dbtup::insertActiveOpList(OperationrecPtr regOperPtr,
regOperPtr.p->op_struct.m_load_diskpage_on_commit= regOperPtr.p->op_struct.m_load_diskpage_on_commit=
prevOpPtr.p->op_struct.m_load_diskpage_on_commit; prevOpPtr.p->op_struct.m_load_diskpage_on_commit;
regOperPtr.p->m_undo_buffer_space= prevOpPtr.p->m_undo_buffer_space; regOperPtr.p->m_undo_buffer_space= prevOpPtr.p->m_undo_buffer_space;
regOperPtr.p->m_mask = prevOpPtr.p->m_mask; // start with prev mask (matters only for UPD o UPD)
set_change_mask_state(regOperPtr.p, get_change_mask_state(prevOpPtr.p));
regOperPtr.p->saved_change_mask[0] = prevOpPtr.p->saved_change_mask[0];
regOperPtr.p->saved_change_mask[1] = prevOpPtr.p->saved_change_mask[1];
prevOpPtr.p->op_struct.m_wait_log_buffer= 0; prevOpPtr.p->op_struct.m_wait_log_buffer= 0;
prevOpPtr.p->op_struct.m_load_diskpage_on_commit= 0; prevOpPtr.p->op_struct.m_load_diskpage_on_commit= 0;
......
...@@ -447,6 +447,37 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct, ...@@ -447,6 +447,37 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct,
Operationrec* const regOperPtr, Operationrec* const regOperPtr,
Tablerec* const regTablePtr) Tablerec* const regTablePtr)
{ {
Uint32 save_type = regOperPtr->op_struct.op_type;
Tuple_header *save_ptr = req_struct->m_tuple_ptr;
switch (save_type) {
case ZUPDATE:
case ZINSERT:
req_struct->m_tuple_ptr = (Tuple_header*)
c_undo_buffer.get_ptr(&regOperPtr->m_copy_tuple_location);
break;
}
/**
* Set correct operation type and fix change mask
* Note ALLOC is set in "orig" tuple
*/
if (save_ptr->m_header_bits & Tuple_header::ALLOC) {
if (save_type == ZDELETE) {
// insert + delete = nothing
ljam();
return;
goto end;
}
regOperPtr->op_struct.op_type = ZINSERT;
}
else if (save_type == ZINSERT) {
/**
* Tuple was not created but last op is INSERT.
* This is possible only on DELETE + INSERT
*/
regOperPtr->op_struct.op_type = ZUPDATE;
}
switch(regOperPtr->op_struct.op_type) { switch(regOperPtr->op_struct.op_type) {
case(ZINSERT): case(ZINSERT):
...@@ -454,7 +485,7 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct, ...@@ -454,7 +485,7 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct,
if (regTablePtr->subscriptionInsertTriggers.isEmpty()) { if (regTablePtr->subscriptionInsertTriggers.isEmpty()) {
// Table has no active triggers monitoring inserts at commit // Table has no active triggers monitoring inserts at commit
ljam(); ljam();
return; goto end;
} }
// If any fired immediate insert trigger then fetch after tuple // If any fired immediate insert trigger then fetch after tuple
...@@ -467,7 +498,7 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct, ...@@ -467,7 +498,7 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct,
if (regTablePtr->subscriptionDeleteTriggers.isEmpty()) { if (regTablePtr->subscriptionDeleteTriggers.isEmpty()) {
// Table has no active triggers monitoring deletes at commit // Table has no active triggers monitoring deletes at commit
ljam(); ljam();
return; goto end;
} }
// Execute any after delete triggers by sending // Execute any after delete triggers by sending
...@@ -481,7 +512,7 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct, ...@@ -481,7 +512,7 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct,
if (regTablePtr->subscriptionUpdateTriggers.isEmpty()) { if (regTablePtr->subscriptionUpdateTriggers.isEmpty()) {
// Table has no active triggers monitoring updates at commit // Table has no active triggers monitoring updates at commit
ljam(); ljam();
return; goto end;
} }
// If any fired immediate update trigger then fetch after tuple // If any fired immediate update trigger then fetch after tuple
...@@ -494,6 +525,10 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct, ...@@ -494,6 +525,10 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct,
ndbrequire(false); ndbrequire(false);
break; break;
} }
end:
regOperPtr->op_struct.op_type = save_type;
req_struct->m_tuple_ptr = save_ptr;
} }
void void
...@@ -546,40 +581,6 @@ Dbtup::fireDetachedTriggers(KeyReqStruct *req_struct, ...@@ -546,40 +581,6 @@ Dbtup::fireDetachedTriggers(KeyReqStruct *req_struct,
{ {
TriggerPtr trigPtr; TriggerPtr trigPtr;
Uint32 save= regOperPtr->op_struct.op_type;
Tuple_header *save_ptr = req_struct->m_tuple_ptr;
switch(save){
case ZUPDATE:
case ZINSERT:
req_struct->m_tuple_ptr = (Tuple_header*)
c_undo_buffer.get_ptr(&regOperPtr->m_copy_tuple_location);
break;
}
/**
* Set correct operation type and fix change mask
* Note ALLOC is set in "orig" tuple
*/
if(save_ptr->m_header_bits & Tuple_header::ALLOC)
{
if(save == ZDELETE)
{
// insert + delete = nothing
ljam();
return;
goto end;
}
regOperPtr->op_struct.op_type = ZINSERT;
}
else if (save == ZINSERT)
/**
* Tuple was not created but last op is INSERT.
* This is possible only on DELETE + INSERT
*/
{
regOperPtr->op_struct.op_type = ZUPDATE;
}
/** /**
* Set disk page * Set disk page
...@@ -601,10 +602,6 @@ Dbtup::fireDetachedTriggers(KeyReqStruct *req_struct, ...@@ -601,10 +602,6 @@ Dbtup::fireDetachedTriggers(KeyReqStruct *req_struct,
} }
triggerList.next(trigPtr); triggerList.next(trigPtr);
} }
end:
regOperPtr->op_struct.op_type = save;
req_struct->m_tuple_ptr = save_ptr;
} }
void Dbtup::executeTriggers(KeyReqStruct *req_struct, void Dbtup::executeTriggers(KeyReqStruct *req_struct,
...@@ -627,7 +624,6 @@ void Dbtup::executeTrigger(KeyReqStruct *req_struct, ...@@ -627,7 +624,6 @@ void Dbtup::executeTrigger(KeyReqStruct *req_struct,
TupTriggerData* const trigPtr, TupTriggerData* const trigPtr,
Operationrec* const regOperPtr) Operationrec* const regOperPtr)
{ {
/** /**
* The block below does not work together with GREP. * The block below does not work together with GREP.
* I have 2 db nodes (2 replicas) -> one node group. * I have 2 db nodes (2 replicas) -> one node group.
...@@ -833,7 +829,6 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr, ...@@ -833,7 +829,6 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
false); false);
ndbrequire(ret != -1); ndbrequire(ret != -1);
noPrimKey= ret; noPrimKey= ret;
Uint32 numAttrsToRead; Uint32 numAttrsToRead;
if ((regOperPtr->op_struct.op_type == ZUPDATE) && if ((regOperPtr->op_struct.op_type == ZUPDATE) &&
(trigPtr->sendOnlyChangedAttributes)) { (trigPtr->sendOnlyChangedAttributes)) {
......
...@@ -54,6 +54,8 @@ ...@@ -54,6 +54,8 @@
* Option --no-multiops allows 1 operation per commit. This avoids TUP * Option --no-multiops allows 1 operation per commit. This avoids TUP
* and blob multi-operation bugs. * and blob multi-operation bugs.
* *
* There are other -no-* options, each added to isolate a specific bug.
*
* There are 5 ways (ignoring NUL operand) to compose 2 ops: * There are 5 ways (ignoring NUL operand) to compose 2 ops:
* 5.0 bugs 5.1 bugs * 5.0 bugs 5.1 bugs
* INS o DEL = NUL * INS o DEL = NUL
...@@ -70,7 +72,9 @@ struct Opts { ...@@ -70,7 +72,9 @@ struct Opts {
uint maxops; uint maxops;
uint maxpk; uint maxpk;
my_bool no_blobs; my_bool no_blobs;
my_bool no_implicit_nulls;
my_bool no_multiops; my_bool no_multiops;
my_bool no_nulls;
my_bool one_blob; my_bool one_blob;
const char* opstring; const char* opstring;
uint seed; uint seed;
...@@ -1033,13 +1037,13 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t) ...@@ -1033,13 +1037,13 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t)
} else if (i == getcol("seq").no) { } else if (i == getcol("seq").no) {
d.seq = g_seq++; d.seq = g_seq++;
d.ind[i] = 0; d.ind[i] = 0;
} else if (t == Op::INS && c.nullable && urandom(10, 100)) { } else if (t == Op::INS && ! g_opts.no_implicit_nulls && c.nullable && urandom(10, 100)) {
d.noop |= (1 << i); d.noop |= (1 << i);
d.ind[i] = 1; // implicit NULL value is known d.ind[i] = 1; // implicit NULL value is known
} else if (t == Op::UPD && urandom(10, 100)) { } else if (t == Op::UPD && urandom(10, 100)) {
d.noop |= (1 << i); d.noop |= (1 << i);
d.ind[i] = -1; // fixed up in caller d.ind[i] = -1; // fixed up in caller
} else if (c.nullable && urandom(10, 100)) { } else if (! g_opts.no_nulls && c.nullable && urandom(10, 100)) {
d.ind[i] = 1; d.ind[i] = 1;
} else { } else {
switch (c.type) { switch (c.type) {
...@@ -1695,23 +1699,29 @@ my_long_options[] = ...@@ -1695,23 +1699,29 @@ my_long_options[] =
{ "no-blobs", 1006, "Omit blob attributes (5.0: true)", { "no-blobs", 1006, "Omit blob attributes (5.0: true)",
(gptr*)&g_opts.no_blobs, (gptr*)&g_opts.no_blobs, 0, (gptr*)&g_opts.no_blobs, (gptr*)&g_opts.no_blobs, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "no-multiops", 1007, "Allow only 1 operation per commit", { "no-implicit-nulls", 1007, "Insert must include NULL values explicitly",
(gptr*)&g_opts.no_implicit_nulls, (gptr*)&g_opts.no_implicit_nulls, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "no-multiops", 1008, "Allow only 1 operation per commit",
(gptr*)&g_opts.no_multiops, (gptr*)&g_opts.no_multiops, 0, (gptr*)&g_opts.no_multiops, (gptr*)&g_opts.no_multiops, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "one-blob", 1008, "Only one blob attribute (defautt 2)", { "no-nulls", 1009, "Create no NULL values",
(gptr*)&g_opts.no_nulls, (gptr*)&g_opts.no_nulls, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "one-blob", 1010, "Only one blob attribute (defautt 2)",
(gptr*)&g_opts.one_blob, (gptr*)&g_opts.one_blob, 0, (gptr*)&g_opts.one_blob, (gptr*)&g_opts.one_blob, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "opstring", 1009, "Operations to run e.g. idiucdc (c is commit) or" { "opstring", 1011, "Operations to run e.g. idiucdc (c is commit) or"
" iuuc:uudc (the : separates loops)", " iuuc:uudc (the : separates loops)",
(gptr*)&g_opts.opstring, (gptr*)&g_opts.opstring, 0, (gptr*)&g_opts.opstring, (gptr*)&g_opts.opstring, 0,
GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "seed", 1010, "Random seed (0=loop number, default -1=random)", { "seed", 1012, "Random seed (0=loop number, default -1=random)",
(gptr*)&g_opts.seed, (gptr*)&g_opts.seed, 0, (gptr*)&g_opts.seed, (gptr*)&g_opts.seed, 0,
GET_INT, REQUIRED_ARG, -1, 0, 0, 0, 0, 0 }, GET_INT, REQUIRED_ARG, -1, 0, 0, 0, 0, 0 },
{ "separate-events", 1011, "Do not combine events per GCI (5.0: true)", { "separate-events", 1013, "Do not combine events per GCI (5.0: true)",
(gptr*)&g_opts.separate_events, (gptr*)&g_opts.separate_events, 0, (gptr*)&g_opts.separate_events, (gptr*)&g_opts.separate_events, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "use-table", 1012, "Use existing table 'tem1'", { "use-table", 1014, "Use existing table 'tem1'",
(gptr*)&g_opts.use_table, (gptr*)&g_opts.use_table, 0, (gptr*)&g_opts.use_table, (gptr*)&g_opts.use_table, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ 0, 0, 0, { 0, 0, 0,
...@@ -1759,6 +1769,9 @@ checkopts() ...@@ -1759,6 +1769,9 @@ checkopts()
return -1; return -1;
} }
} }
if (g_opts.no_nulls) {
g_opts.no_implicit_nulls = true;
}
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment