Commit 6778b029 authored by pekka@mysql.com's avatar pekka@mysql.com

ndb - bug#10029 fix

parent c27c54f4
...@@ -36,8 +36,8 @@ public: ...@@ -36,8 +36,8 @@ public:
}; };
enum ErrorCode { enum ErrorCode {
NoError = 0, // must be zero NoError = 0, // must be zero
SearchError = 895, // add + found or remove + not found SearchError = 901, // add + found or remove + not found
NoMemError = 827 NoMemError = 902
}; };
STATIC_CONST( SignalLength = 8 ); STATIC_CONST( SignalLength = 8 );
private: private:
......
...@@ -1777,6 +1777,10 @@ private: ...@@ -1777,6 +1777,10 @@ private:
Operationrec* const regOperPtr, Operationrec* const regOperPtr,
Tablerec* const regTabPtr); Tablerec* const regTabPtr);
int addTuxEntries(Signal* signal,
Operationrec* regOperPtr,
Tablerec* regTabPtr);
// these crash the node on error // these crash the node on error
void executeTuxCommitTriggers(Signal* signal, void executeTuxCommitTriggers(Signal* signal,
...@@ -1787,6 +1791,10 @@ private: ...@@ -1787,6 +1791,10 @@ private:
Operationrec* regOperPtr, Operationrec* regOperPtr,
Tablerec* const regTabPtr); Tablerec* const regTabPtr);
void removeTuxEntries(Signal* signal,
Operationrec* regOperPtr,
Tablerec* regTabPtr);
// ***************************************************************** // *****************************************************************
// Error Handling routines. // Error Handling routines.
// ***************************************************************** // *****************************************************************
......
...@@ -973,25 +973,7 @@ Dbtup::executeTuxInsertTriggers(Signal* signal, ...@@ -973,25 +973,7 @@ Dbtup::executeTuxInsertTriggers(Signal* signal,
req->pageOffset = regOperPtr->pageOffset; req->pageOffset = regOperPtr->pageOffset;
req->tupVersion = tupVersion; req->tupVersion = tupVersion;
req->opInfo = TuxMaintReq::OpAdd; req->opInfo = TuxMaintReq::OpAdd;
// loop over index list return addTuxEntries(signal, regOperPtr, regTabPtr);
const ArrayList<TupTriggerData>& triggerList = regTabPtr->tuxCustomTriggers;
TriggerPtr triggerPtr;
triggerList.first(triggerPtr);
while (triggerPtr.i != RNIL) {
ljam();
req->indexId = triggerPtr.p->indexId;
req->errorCode = RNIL;
EXECUTE_DIRECT(DBTUX, GSN_TUX_MAINT_REQ,
signal, TuxMaintReq::SignalLength);
ljamEntry();
if (req->errorCode != 0) {
ljam();
terrorCode = req->errorCode;
return -1;
}
triggerList.next(triggerPtr);
}
return 0;
} }
int int
...@@ -1012,9 +994,18 @@ Dbtup::executeTuxUpdateTriggers(Signal* signal, ...@@ -1012,9 +994,18 @@ Dbtup::executeTuxUpdateTriggers(Signal* signal,
req->pageOffset = regOperPtr->pageOffset; req->pageOffset = regOperPtr->pageOffset;
req->tupVersion = tupVersion; req->tupVersion = tupVersion;
req->opInfo = TuxMaintReq::OpAdd; req->opInfo = TuxMaintReq::OpAdd;
// loop over index list return addTuxEntries(signal, regOperPtr, regTabPtr);
}
int
Dbtup::addTuxEntries(Signal* signal,
Operationrec* regOperPtr,
Tablerec* regTabPtr)
{
TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend();
const ArrayList<TupTriggerData>& triggerList = regTabPtr->tuxCustomTriggers; const ArrayList<TupTriggerData>& triggerList = regTabPtr->tuxCustomTriggers;
TriggerPtr triggerPtr; TriggerPtr triggerPtr;
Uint32 failPtrI;
triggerList.first(triggerPtr); triggerList.first(triggerPtr);
while (triggerPtr.i != RNIL) { while (triggerPtr.i != RNIL) {
ljam(); ljam();
...@@ -1026,11 +1017,29 @@ Dbtup::executeTuxUpdateTriggers(Signal* signal, ...@@ -1026,11 +1017,29 @@ Dbtup::executeTuxUpdateTriggers(Signal* signal,
if (req->errorCode != 0) { if (req->errorCode != 0) {
ljam(); ljam();
terrorCode = req->errorCode; terrorCode = req->errorCode;
return -1; failPtrI = triggerPtr.i;
goto fail;
} }
triggerList.next(triggerPtr); triggerList.next(triggerPtr);
} }
return 0; return 0;
fail:
req->opInfo = TuxMaintReq::OpRemove;
triggerList.first(triggerPtr);
while (triggerPtr.i != failPtrI) {
ljam();
req->indexId = triggerPtr.p->indexId;
req->errorCode = RNIL;
EXECUTE_DIRECT(DBTUX, GSN_TUX_MAINT_REQ,
signal, TuxMaintReq::SignalLength);
ljamEntry();
ndbrequire(req->errorCode == 0);
triggerList.next(triggerPtr);
}
#ifdef VM_TRACE
ndbout << "aborted partial tux update: op " << hex << regOperPtr << endl;
#endif
return -1;
} }
int int
...@@ -1049,7 +1058,6 @@ Dbtup::executeTuxCommitTriggers(Signal* signal, ...@@ -1049,7 +1058,6 @@ Dbtup::executeTuxCommitTriggers(Signal* signal,
{ {
TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend(); TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend();
// get version // get version
// XXX could add prevTupVersion to Operationrec
Uint32 tupVersion; Uint32 tupVersion;
if (regOperPtr->optype == ZINSERT) { if (regOperPtr->optype == ZINSERT) {
if (! regOperPtr->deleteInsertFlag) if (! regOperPtr->deleteInsertFlag)
...@@ -1087,21 +1095,7 @@ Dbtup::executeTuxCommitTriggers(Signal* signal, ...@@ -1087,21 +1095,7 @@ Dbtup::executeTuxCommitTriggers(Signal* signal,
req->pageOffset = regOperPtr->pageOffset; req->pageOffset = regOperPtr->pageOffset;
req->tupVersion = tupVersion; req->tupVersion = tupVersion;
req->opInfo = TuxMaintReq::OpRemove; req->opInfo = TuxMaintReq::OpRemove;
// loop over index list removeTuxEntries(signal, regOperPtr, regTabPtr);
const ArrayList<TupTriggerData>& triggerList = regTabPtr->tuxCustomTriggers;
TriggerPtr triggerPtr;
triggerList.first(triggerPtr);
while (triggerPtr.i != RNIL) {
ljam();
req->indexId = triggerPtr.p->indexId;
req->errorCode = RNIL;
EXECUTE_DIRECT(DBTUX, GSN_TUX_MAINT_REQ,
signal, TuxMaintReq::SignalLength);
ljamEntry();
// commit must succeed
ndbrequire(req->errorCode == 0);
triggerList.next(triggerPtr);
}
} }
void void
...@@ -1132,7 +1126,15 @@ Dbtup::executeTuxAbortTriggers(Signal* signal, ...@@ -1132,7 +1126,15 @@ Dbtup::executeTuxAbortTriggers(Signal* signal,
req->pageOffset = regOperPtr->pageOffset; req->pageOffset = regOperPtr->pageOffset;
req->tupVersion = tupVersion; req->tupVersion = tupVersion;
req->opInfo = TuxMaintReq::OpRemove; req->opInfo = TuxMaintReq::OpRemove;
// loop over index list removeTuxEntries(signal, regOperPtr, regTabPtr);
}
void
Dbtup::removeTuxEntries(Signal* signal,
Operationrec* regOperPtr,
Tablerec* regTabPtr)
{
TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend();
const ArrayList<TupTriggerData>& triggerList = regTabPtr->tuxCustomTriggers; const ArrayList<TupTriggerData>& triggerList = regTabPtr->tuxCustomTriggers;
TriggerPtr triggerPtr; TriggerPtr triggerPtr;
triggerList.first(triggerPtr); triggerList.first(triggerPtr);
...@@ -1143,7 +1145,7 @@ Dbtup::executeTuxAbortTriggers(Signal* signal, ...@@ -1143,7 +1145,7 @@ Dbtup::executeTuxAbortTriggers(Signal* signal,
EXECUTE_DIRECT(DBTUX, GSN_TUX_MAINT_REQ, EXECUTE_DIRECT(DBTUX, GSN_TUX_MAINT_REQ,
signal, TuxMaintReq::SignalLength); signal, TuxMaintReq::SignalLength);
ljamEntry(); ljamEntry();
// abort must succeed // must succeed
ndbrequire(req->errorCode == 0); ndbrequire(req->errorCode == 0);
triggerList.next(triggerPtr); triggerList.next(triggerPtr);
} }
......
...@@ -135,6 +135,24 @@ abort DELETE none - ...@@ -135,6 +135,24 @@ abort DELETE none -
1) alternatively, store prevTupVersion in operation record. 1) alternatively, store prevTupVersion in operation record.
Abort from ordered index error
------------------------------
Obviously, index update failure causes operation failure.
The operation is then aborted later by TC.
The problem here is with multiple indexes. Some may have been
updated successfully before the one that failed. Therefore
the trigger code aborts the successful ones already in
the prepare phase.
In other words, multiple indexes are treated as one.
Abort from any cause
--------------------
[ hairy stuff ]
Read attributes, query status Read attributes, query status
----------------------------- -----------------------------
...@@ -170,14 +188,11 @@ used to decide if the scan can see the tuple. ...@@ -170,14 +188,11 @@ used to decide if the scan can see the tuple.
This signal may also be called during any phase since commit/abort This signal may also be called during any phase since commit/abort
of all operations is not done in one time-slice. of all operations is not done in one time-slice.
Commit and abort
----------------
[ hairy stuff ]
Problems Problems
-------- --------
Current abort code can destroy a tuple version too early. This Current abort code can destroy a tuple version too early. This
happens in test case "ticuur" (insert-commit-update-update-rollback), happens in test case "ticuur" (insert-commit-update-update-rollback),
if abort of first update arrives before abort of second update. if abort of first update arrives before abort of second update.
vim: set textwidth=68:
...@@ -23,6 +23,11 @@ ...@@ -23,6 +23,11 @@
int int
Dbtux::allocNode(Signal* signal, NodeHandle& node) Dbtux::allocNode(Signal* signal, NodeHandle& node)
{ {
if (ERROR_INSERTED(12007)) {
jam();
CLEAR_ERROR_INSERT_VALUE;
return TuxMaintReq::NoMemError;
}
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
Uint32 pageId = NullTupLoc.getPageId(); Uint32 pageId = NullTupLoc.getPageId();
Uint32 pageOffset = NullTupLoc.getPageOffset(); Uint32 pageOffset = NullTupLoc.getPageOffset();
...@@ -34,6 +39,12 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node) ...@@ -34,6 +39,12 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node)
node.m_loc = TupLoc(pageId, pageOffset); node.m_loc = TupLoc(pageId, pageOffset);
node.m_node = reinterpret_cast<TreeNode*>(node32); node.m_node = reinterpret_cast<TreeNode*>(node32);
ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0); ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
} else {
switch (errorCode) {
case 827:
errorCode = TuxMaintReq::NoMemError;
break;
}
} }
return errorCode; return errorCode;
} }
......
...@@ -175,10 +175,11 @@ ErrorBundle ErrorCodes[] = { ...@@ -175,10 +175,11 @@ ErrorBundle ErrorCodes[] = {
*/ */
{ 623, IS, "623" }, { 623, IS, "623" },
{ 624, IS, "624" }, { 624, IS, "624" },
{ 625, IS, "Out of memory in Ndb Kernel, index part (increase IndexMemory)" }, { 625, IS, "Out of memory in Ndb Kernel, hash index part (increase IndexMemory)" },
{ 800, IS, "Too many ordered indexes (increase MaxNoOfOrderedIndexes)" }, { 800, IS, "Too many ordered indexes (increase MaxNoOfOrderedIndexes)" },
{ 826, IS, "Too many tables and attributes (increase MaxNoOfAttributes or MaxNoOfTables)" }, { 826, IS, "Too many tables and attributes (increase MaxNoOfAttributes or MaxNoOfTables)" },
{ 827, IS, "Out of memory in Ndb Kernel, data part (increase DataMemory)" }, { 827, IS, "Out of memory in Ndb Kernel, table data (increase DataMemory)" },
{ 902, IS, "Out of memory in Ndb Kernel, ordered index data (increase DataMemory)" },
{ 832, IS, "832" }, { 832, IS, "832" },
/** /**
...@@ -205,7 +206,7 @@ ErrorBundle ErrorCodes[] = { ...@@ -205,7 +206,7 @@ ErrorBundle ErrorCodes[] = {
* Internal errors * Internal errors
*/ */
{ 892, IE, "Inconsistent hash index. The index needs to be dropped and recreated" }, { 892, IE, "Inconsistent hash index. The index needs to be dropped and recreated" },
{ 895, IE, "Inconsistent ordered index. The index needs to be dropped and recreated" }, { 901, IE, "Inconsistent ordered index. The index needs to be dropped and recreated" },
{ 202, IE, "202" }, { 202, IE, "202" },
{ 203, IE, "203" }, { 203, IE, "203" },
{ 207, IE, "207" }, { 207, IE, "207" },
......
...@@ -228,6 +228,8 @@ struct Par : public Opt { ...@@ -228,6 +228,8 @@ struct Par : public Opt {
bool m_verify; bool m_verify;
// deadlock possible // deadlock possible
bool m_deadlock; bool m_deadlock;
// abort percentabge
unsigned m_abortpct;
// timer location // timer location
Par(const Opt& opt) : Par(const Opt& opt) :
Opt(opt), Opt(opt),
...@@ -243,7 +245,8 @@ struct Par : public Opt { ...@@ -243,7 +245,8 @@ struct Par : public Opt {
m_pctrange(0), m_pctrange(0),
m_randomkey(false), m_randomkey(false),
m_verify(false), m_verify(false),
m_deadlock(false) { m_deadlock(false),
m_abortpct(0) {
} }
}; };
...@@ -684,7 +687,7 @@ struct Con { ...@@ -684,7 +687,7 @@ struct Con {
NdbResultSet* m_resultset; NdbResultSet* m_resultset;
enum ScanMode { ScanNo = 0, Committed, Latest, Exclusive }; enum ScanMode { ScanNo = 0, Committed, Latest, Exclusive };
ScanMode m_scanmode; ScanMode m_scanmode;
enum ErrType { ErrNone = 0, ErrDeadlock, ErrOther }; enum ErrType { ErrNone = 0, ErrDeadlock, ErrNospace, ErrOther };
ErrType m_errtype; ErrType m_errtype;
Con() : Con() :
m_ndb(0), m_dic(0), m_tx(0), m_op(0), m_ndb(0), m_dic(0), m_tx(0), m_op(0),
...@@ -705,7 +708,7 @@ struct Con { ...@@ -705,7 +708,7 @@ struct Con {
int setValue(int num, const char* addr); int setValue(int num, const char* addr);
int setBound(int num, int type, const void* value); int setBound(int num, int type, const void* value);
int execute(ExecType t); int execute(ExecType t);
int execute(ExecType t, bool& deadlock); int execute(ExecType t, bool& deadlock, bool& nospace);
int openScanRead(unsigned scanbat, unsigned scanpar); int openScanRead(unsigned scanbat, unsigned scanpar);
int openScanExclusive(unsigned scanbat, unsigned scanpar); int openScanExclusive(unsigned scanbat, unsigned scanpar);
int executeScan(); int executeScan();
...@@ -818,17 +821,21 @@ Con::execute(ExecType t) ...@@ -818,17 +821,21 @@ Con::execute(ExecType t)
} }
int int
Con::execute(ExecType t, bool& deadlock) Con::execute(ExecType t, bool& deadlock, bool& nospace)
{ {
int ret = execute(t); int ret = execute(t);
if (ret != 0) { if (ret != 0 && deadlock && m_errtype == ErrDeadlock) {
if (deadlock && m_errtype == ErrDeadlock) {
LL3("caught deadlock"); LL3("caught deadlock");
ret = 0; ret = 0;
}
} else { } else {
deadlock = false; deadlock = false;
} }
if (ret != 0 && nospace && m_errtype == ErrNospace) {
LL3("caught nospace");
ret = 0;
} else {
nospace = false;
}
CHK(ret == 0); CHK(ret == 0);
return 0; return 0;
} }
...@@ -940,6 +947,8 @@ Con::printerror(NdbOut& out) ...@@ -940,6 +947,8 @@ Con::printerror(NdbOut& out)
die += (code == g_opt.m_die); die += (code == g_opt.m_die);
if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499) if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499)
m_errtype = ErrDeadlock; m_errtype = ErrDeadlock;
if (code == 826 || code == 827 || code == 902)
m_errtype = ErrNospace;
} }
if (m_op && m_op->getNdbError().code != 0) { if (m_op && m_op->getNdbError().code != 0) {
LL0(++any << " op : error " << m_op->getNdbError()); LL0(++any << " op : error " << m_op->getNdbError());
...@@ -1128,6 +1137,16 @@ irandom(unsigned n) ...@@ -1128,6 +1137,16 @@ irandom(unsigned n)
return i; return i;
} }
static bool
randompct(unsigned pct)
{
if (pct == 0)
return false;
if (pct >= 100)
return true;
return urandom(100) < pct;
}
// Val - typed column value // Val - typed column value
struct Val { struct Val {
...@@ -1565,8 +1584,8 @@ struct Set { ...@@ -1565,8 +1584,8 @@ struct Set {
// row methods // row methods
bool exist(unsigned i) const; bool exist(unsigned i) const;
Row::Op pending(unsigned i) const; Row::Op pending(unsigned i) const;
void notpending(unsigned i); void notpending(unsigned i, ExecType et = Commit);
void notpending(const Lst& lst); void notpending(const Lst& lst, ExecType et = Commit);
void calc(Par par, unsigned i); void calc(Par par, unsigned i);
int insrow(Par par, unsigned i); int insrow(Par par, unsigned i);
int updrow(Par par, unsigned i); int updrow(Par par, unsigned i);
...@@ -1775,23 +1794,30 @@ Set::putval(unsigned i, bool force) ...@@ -1775,23 +1794,30 @@ Set::putval(unsigned i, bool force)
} }
void void
Set::notpending(unsigned i) Set::notpending(unsigned i, ExecType et)
{ {
assert(m_row[i] != 0); assert(m_row[i] != 0);
Row& row = *m_row[i]; Row& row = *m_row[i];
if (et == Commit) {
if (row.m_pending == Row::InsOp) if (row.m_pending == Row::InsOp)
row.m_exist = true; row.m_exist = true;
if (row.m_pending == Row::DelOp) if (row.m_pending == Row::DelOp)
row.m_exist = false; row.m_exist = false;
} else {
if (row.m_pending == Row::InsOp)
row.m_exist = false;
if (row.m_pending == Row::DelOp)
row.m_exist = true;
}
row.m_pending = Row::NoOp; row.m_pending = Row::NoOp;
} }
void void
Set::notpending(const Lst& lst) Set::notpending(const Lst& lst, ExecType et)
{ {
for (unsigned j = 0; j < lst.m_cnt; j++) { for (unsigned j = 0; j < lst.m_cnt; j++) {
unsigned i = lst.m_arr[j]; unsigned i = lst.m_arr[j];
notpending(i); notpending(i, et);
} }
} }
...@@ -2121,14 +2147,20 @@ pkinsert(Par par) ...@@ -2121,14 +2147,20 @@ pkinsert(Par par)
lst.push(i); lst.push(i);
if (lst.cnt() == par.m_batch) { if (lst.cnt() == par.m_batch) {
bool deadlock = par.m_deadlock; bool deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0); bool nospace = true;
ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
CHK(con.execute(et, deadlock, nospace) == 0);
con.closeTransaction(); con.closeTransaction();
if (deadlock) { if (deadlock) {
LL1("pkinsert: stop on deadlock"); LL1("pkinsert: stop on deadlock");
return 0; return 0;
} }
if (nospace) {
LL1("pkinsert: cnt=" << j << " stop on nospace");
return 0;
}
set.lock(); set.lock();
set.notpending(lst); set.notpending(lst, et);
set.unlock(); set.unlock();
lst.reset(); lst.reset();
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
...@@ -2136,14 +2168,20 @@ pkinsert(Par par) ...@@ -2136,14 +2168,20 @@ pkinsert(Par par)
} }
if (lst.cnt() != 0) { if (lst.cnt() != 0) {
bool deadlock = par.m_deadlock; bool deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0); bool nospace = true;
ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
CHK(con.execute(et, deadlock, nospace) == 0);
con.closeTransaction(); con.closeTransaction();
if (deadlock) { if (deadlock) {
LL1("pkinsert: stop on deadlock"); LL1("pkinsert: stop on deadlock");
return 0; return 0;
} }
if (nospace) {
LL1("pkinsert: end: stop on nospace");
return 0;
}
set.lock(); set.lock();
set.notpending(lst); set.notpending(lst, et);
set.unlock(); set.unlock();
return 0; return 0;
} }
...@@ -2160,6 +2198,7 @@ pkupdate(Par par) ...@@ -2160,6 +2198,7 @@ pkupdate(Par par)
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
Lst lst; Lst lst;
bool deadlock = false; bool deadlock = false;
bool nospace = false;
for (unsigned j = 0; j < par.m_rows; j++) { for (unsigned j = 0; j < par.m_rows; j++) {
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows); unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2); unsigned i = thrrow(par, j2);
...@@ -2175,27 +2214,37 @@ pkupdate(Par par) ...@@ -2175,27 +2214,37 @@ pkupdate(Par par)
lst.push(i); lst.push(i);
if (lst.cnt() == par.m_batch) { if (lst.cnt() == par.m_batch) {
deadlock = par.m_deadlock; deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0); nospace = true;
ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
CHK(con.execute(et, deadlock, nospace) == 0);
if (deadlock) { if (deadlock) {
LL1("pkupdate: stop on deadlock"); LL1("pkupdate: stop on deadlock");
break; break;
} }
if (nospace) {
LL1("pkupdate: cnt=" << j << " stop on nospace");
break;
}
con.closeTransaction(); con.closeTransaction();
set.lock(); set.lock();
set.notpending(lst); set.notpending(lst, et);
set.unlock(); set.unlock();
lst.reset(); lst.reset();
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
} }
} }
if (! deadlock && lst.cnt() != 0) { if (! deadlock && ! nospace && lst.cnt() != 0) {
deadlock = par.m_deadlock; deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0); nospace = true;
ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
CHK(con.execute(et, deadlock, nospace) == 0);
if (deadlock) { if (deadlock) {
LL1("pkupdate: stop on deadlock"); LL1("pkupdate: stop on deadlock");
} else if (nospace) {
LL1("pkupdate: end: stop on nospace");
} else { } else {
set.lock(); set.lock();
set.notpending(lst); set.notpending(lst, et);
set.unlock(); set.unlock();
} }
} }
...@@ -2212,6 +2261,7 @@ pkdelete(Par par) ...@@ -2212,6 +2261,7 @@ pkdelete(Par par)
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
Lst lst; Lst lst;
bool deadlock = false; bool deadlock = false;
bool nospace = false;
for (unsigned j = 0; j < par.m_rows; j++) { for (unsigned j = 0; j < par.m_rows; j++) {
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows); unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2); unsigned i = thrrow(par, j2);
...@@ -2226,27 +2276,31 @@ pkdelete(Par par) ...@@ -2226,27 +2276,31 @@ pkdelete(Par par)
lst.push(i); lst.push(i);
if (lst.cnt() == par.m_batch) { if (lst.cnt() == par.m_batch) {
deadlock = par.m_deadlock; deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0); nospace = true;
ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
CHK(con.execute(et, deadlock, nospace) == 0);
if (deadlock) { if (deadlock) {
LL1("pkdelete: stop on deadlock"); LL1("pkdelete: stop on deadlock");
break; break;
} }
con.closeTransaction(); con.closeTransaction();
set.lock(); set.lock();
set.notpending(lst); set.notpending(lst, et);
set.unlock(); set.unlock();
lst.reset(); lst.reset();
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
} }
} }
if (! deadlock && lst.cnt() != 0) { if (! deadlock && ! nospace && lst.cnt() != 0) {
deadlock = par.m_deadlock; deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0); nospace = true;
ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
CHK(con.execute(et, deadlock, nospace) == 0);
if (deadlock) { if (deadlock) {
LL1("pkdelete: stop on deadlock"); LL1("pkdelete: stop on deadlock");
} else { } else {
set.lock(); set.lock();
set.notpending(lst); set.notpending(lst, et);
set.unlock(); set.unlock();
} }
} }
...@@ -2730,6 +2784,10 @@ readverify(Par par) ...@@ -2730,6 +2784,10 @@ readverify(Par par)
if (par.m_noverify) if (par.m_noverify)
return 0; return 0;
par.m_verify = true; par.m_verify = true;
if (par.m_abortpct != 0) {
LL2("skip verify in this version"); // implement in 5.0 version
par.m_verify = false;
}
CHK(pkread(par) == 0); CHK(pkread(par) == 0);
CHK(scanreadall(par) == 0); CHK(scanreadall(par) == 0);
return 0; return 0;
...@@ -3028,11 +3086,11 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode) ...@@ -3028,11 +3086,11 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode)
for (n = 0; n < threads; n++) { for (n = 0; n < threads; n++) {
LL4("start " << n); LL4("start " << n);
Thr& thr = *g_thrlist[n]; Thr& thr = *g_thrlist[n];
thr.m_par.m_tab = par.m_tab; Par oldpar = thr.m_par;
thr.m_par.m_set = par.m_set; // update parameters
thr.m_par.m_tmr = par.m_tmr; thr.m_par = par;
thr.m_par.m_lno = par.m_lno; thr.m_par.m_no = oldpar.m_no;
thr.m_par.m_slno = par.m_slno; thr.m_par.m_con = oldpar.m_con;
thr.m_func = func; thr.m_func = func;
thr.start(); thr.start();
} }
...@@ -3143,6 +3201,24 @@ tbusybuild(Par par) ...@@ -3143,6 +3201,24 @@ tbusybuild(Par par)
return 0; return 0;
} }
static int
trollback(Par par)
{
par.m_abortpct = 50;
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverify, ST);
}
return 0;
}
static int static int
ttimebuild(Par par) ttimebuild(Par par)
{ {
...@@ -3252,10 +3328,12 @@ struct TCase { ...@@ -3252,10 +3328,12 @@ struct TCase {
static const TCase static const TCase
tcaselist[] = { tcaselist[] = {
TCase("a", tbuild, "index build"), TCase("a", tbuild, "index build"),
TCase("b", tpkops, "pk operations"), // "b" in 5.0
TCase("c", tpkopsread, "pk operations and scan reads"), TCase("c", tpkops, "pk operations"),
TCase("d", tmixedops, "pk operations and scan operations"), TCase("d", tpkopsread, "pk operations and scan reads"),
TCase("e", tbusybuild, "pk operations and index build"), TCase("e", tmixedops, "pk operations and scan operations"),
TCase("f", tbusybuild, "pk operations and index build"),
TCase("g", trollback, "operations with random rollbacks"),
TCase("t", ttimebuild, "time index build"), TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"), TCase("u", ttimemaint, "time index maintenance"),
TCase("v", ttimescan, "time full scan table vs index on pk"), TCase("v", ttimescan, "time full scan table vs index on pk"),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment