Commit 633eb818 authored by unknown's avatar unknown

Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-4.1

into mysql.com:/space/pekka/ndb/version/my41

parents 79a9f618 235d0787
...@@ -404,8 +404,6 @@ private: ...@@ -404,8 +404,6 @@ private:
Uint32 m_savePointId; Uint32 m_savePointId;
// lock waited for or obtained and not yet passed to LQH // lock waited for or obtained and not yet passed to LQH
Uint32 m_accLockOp; Uint32 m_accLockOp;
// locks obtained and passed to LQH but not yet returned by LQH
Uint32 m_accLockOps[MaxAccLockOps];
Uint8 m_readCommitted; // no locking Uint8 m_readCommitted; // no locking
Uint8 m_lockMode; Uint8 m_lockMode;
Uint8 m_keyInfo; Uint8 m_keyInfo;
...@@ -421,6 +419,13 @@ private: ...@@ -421,6 +419,13 @@ private:
Uint32 nextList; Uint32 nextList;
}; };
Uint32 prevList; Uint32 prevList;
/*
* Locks obtained and passed to LQH but not yet returned by LQH.
* The max was increased from 16 to 992 (default 64). Record max
* ever used in this scan. TODO fix quadratic behaviour
*/
Uint32 m_maxAccLockOps;
Uint32 m_accLockOps[MaxAccLockOps];
ScanOp(ScanBoundPool& scanBoundPool); ScanOp(ScanBoundPool& scanBoundPool);
}; };
typedef Ptr<ScanOp> ScanOpPtr; typedef Ptr<ScanOp> ScanOpPtr;
...@@ -1028,15 +1033,18 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) : ...@@ -1028,15 +1033,18 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) :
m_boundMax(scanBoundPool), m_boundMax(scanBoundPool),
m_scanPos(), m_scanPos(),
m_scanEnt(), m_scanEnt(),
m_nodeScan(RNIL) m_nodeScan(RNIL),
m_maxAccLockOps(0)
{ {
m_bound[0] = &m_boundMin; m_bound[0] = &m_boundMin;
m_bound[1] = &m_boundMax; m_bound[1] = &m_boundMax;
m_boundCnt[0] = 0; m_boundCnt[0] = 0;
m_boundCnt[1] = 0; m_boundCnt[1] = 0;
#ifdef VM_TRACE
for (unsigned i = 0; i < MaxAccLockOps; i++) { for (unsigned i = 0; i < MaxAccLockOps; i++) {
m_accLockOps[i] = RNIL; m_accLockOps[i] = 0x1f1f1f1f;
} }
#endif
} }
// Dbtux::Index // Dbtux::Index
......
...@@ -917,7 +917,7 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr) ...@@ -917,7 +917,7 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL); ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL);
// unlock all not unlocked by LQH // unlock all not unlocked by LQH
for (unsigned i = 0; i < MaxAccLockOps; i++) { for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) {
if (scan.m_accLockOps[i] != RNIL) { if (scan.m_accLockOps[i] != RNIL) {
jam(); jam();
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
...@@ -947,7 +947,7 @@ Dbtux::addAccLockOp(ScanOp& scan, Uint32 accLockOp) ...@@ -947,7 +947,7 @@ Dbtux::addAccLockOp(ScanOp& scan, Uint32 accLockOp)
ndbrequire(accLockOp != RNIL); ndbrequire(accLockOp != RNIL);
Uint32* list = scan.m_accLockOps; Uint32* list = scan.m_accLockOps;
bool ok = false; bool ok = false;
for (unsigned i = 0; i < MaxAccLockOps; i++) { for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) {
ndbrequire(list[i] != accLockOp); ndbrequire(list[i] != accLockOp);
if (! ok && list[i] == RNIL) { if (! ok && list[i] == RNIL) {
list[i] = accLockOp; list[i] = accLockOp;
...@@ -955,6 +955,14 @@ Dbtux::addAccLockOp(ScanOp& scan, Uint32 accLockOp) ...@@ -955,6 +955,14 @@ Dbtux::addAccLockOp(ScanOp& scan, Uint32 accLockOp)
// continue check for duplicates // continue check for duplicates
} }
} }
if (! ok) {
unsigned i = scan.m_maxAccLockOps;
if (i < MaxAccLockOps) {
list[i] = accLockOp;
ok = true;
scan.m_maxAccLockOps = i + 1;
}
}
ndbrequire(ok); ndbrequire(ok);
} }
...@@ -964,7 +972,7 @@ Dbtux::removeAccLockOp(ScanOp& scan, Uint32 accLockOp) ...@@ -964,7 +972,7 @@ Dbtux::removeAccLockOp(ScanOp& scan, Uint32 accLockOp)
ndbrequire(accLockOp != RNIL); ndbrequire(accLockOp != RNIL);
Uint32* list = scan.m_accLockOps; Uint32* list = scan.m_accLockOps;
bool ok = false; bool ok = false;
for (unsigned i = 0; i < MaxAccLockOps; i++) { for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) {
if (list[i] == accLockOp) { if (list[i] == accLockOp) {
list[i] = RNIL; list[i] = RNIL;
ok = true; ok = true;
......
...@@ -13,7 +13,7 @@ case c: full scan: index on PK Unsigned ...@@ -13,7 +13,7 @@ case c: full scan: index on PK Unsigned
testOIBasic -case v -table 1 -index 1 -fragtype small -threads 10 -rows 100000 -subloop 1 -nologging testOIBasic -case v -table 1 -index 1 -fragtype small -threads 10 -rows 100000 -subloop 1 -nologging
case d: scan 1 tuple via EQ: index on PK Unsigned case d: scan 1 tuple via EQ: index on PK Unsigned
testOIBasic -case w -table 1 -index 1 -fragtype small -threads 10 -rows 100000 -samples 10000 -subloop 1 -nologging -v2 testOIBasic -case w -table 1 -index 1 -fragtype small -threads 10 -rows 100000 -samples 50000 -subloop 1 -nologging -v2
a, b a, b
1 million rows, pk update without index, pk update with index 1 million rows, pk update without index, pk update with index
...@@ -132,4 +132,12 @@ optim 17 mc02/a 35 ms 52 ms 49 pct ...@@ -132,4 +132,12 @@ optim 17 mc02/a 35 ms 52 ms 49 pct
wl-1942 mc02/a 35 ms 52 ms 49 pct wl-1942 mc02/a 35 ms 52 ms 49 pct
mc02/b 42 ms 75 ms 76 pct mc02/b 42 ms 75 ms 76 pct
before mc02/c 5 ms 13 ms 126 pct
mc02/d 134 ms 238 ms 78 pct
after mc02/c 5 ms 10 ms 70 pct
mc02/d 178 ms 242 ms 69 pct
[ prelim preformance fix for max batch size 16 -> 992 ]
vim: set et: vim: set et:
...@@ -54,6 +54,7 @@ struct Opt { ...@@ -54,6 +54,7 @@ struct Opt {
unsigned m_samples; unsigned m_samples;
unsigned m_scanbat; unsigned m_scanbat;
unsigned m_scanpar; unsigned m_scanpar;
unsigned m_scanstop;
unsigned m_seed; unsigned m_seed;
unsigned m_subloop; unsigned m_subloop;
const char* m_table; const char* m_table;
...@@ -80,6 +81,7 @@ struct Opt { ...@@ -80,6 +81,7 @@ struct Opt {
m_samples(0), m_samples(0),
m_scanbat(0), m_scanbat(0),
m_scanpar(0), m_scanpar(0),
m_scanstop(0),
m_seed(0), m_seed(0),
m_subloop(4), m_subloop(4),
m_table(0), m_table(0),
...@@ -708,6 +710,7 @@ struct Con { ...@@ -708,6 +710,7 @@ struct Con {
int nextScanResult(bool fetchAllowed, bool& deadlock); int nextScanResult(bool fetchAllowed, bool& deadlock);
int updateScanTuple(Con& con2); int updateScanTuple(Con& con2);
int deleteScanTuple(Con& con2); int deleteScanTuple(Con& con2);
void closeScan();
void closeTransaction(); void closeTransaction();
void printerror(NdbOut& out); void printerror(NdbOut& out);
}; };
...@@ -894,12 +897,22 @@ Con::deleteScanTuple(Con& con2) ...@@ -894,12 +897,22 @@ Con::deleteScanTuple(Con& con2)
return 0; return 0;
} }
void
Con::closeScan()
{
assert(m_resultset != 0);
m_resultset->close();
m_scanop = 0, m_indexscanop = 0, m_resultset = 0;
}
void void
Con::closeTransaction() Con::closeTransaction()
{ {
assert(m_ndb != 0 && m_tx != 0); assert(m_ndb != 0 && m_tx != 0);
m_ndb->closeTransaction(m_tx); m_ndb->closeTransaction(m_tx);
m_tx = 0, m_op = 0; m_tx = 0, m_op = 0;
m_scanop = 0, m_indexscanop = 0, m_resultset = 0;
} }
void void
...@@ -2538,6 +2551,10 @@ scanupdatetable(Par par) ...@@ -2538,6 +2551,10 @@ scanupdatetable(Par par)
LL1("scanupdatetable: stop on deadlock"); LL1("scanupdatetable: stop on deadlock");
break; break;
} }
if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) {
con.closeScan();
break;
}
do { do {
unsigned i = (unsigned)-1; unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0); CHK(set2.getkey(par, &i) == 0);
...@@ -2618,6 +2635,10 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset) ...@@ -2618,6 +2635,10 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
LL1("scanupdateindex: stop on deadlock"); LL1("scanupdateindex: stop on deadlock");
break; break;
} }
if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) {
con.closeScan();
break;
}
do { do {
unsigned i = (unsigned)-1; unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0); CHK(set2.getkey(par, &i) == 0);
...@@ -2782,6 +2803,7 @@ mixedoperations(Par par) ...@@ -2782,6 +2803,7 @@ mixedoperations(Par par)
{ {
par.m_dups = true; par.m_dups = true;
par.m_deadlock = true; par.m_deadlock = true;
par.m_scanstop = par.m_totrows; // randomly close scans
unsigned sel = urandom(10); unsigned sel = urandom(10);
if (sel < 2) { if (sel < 2) {
CHK(pkdelete(par) == 0); CHK(pkdelete(par) == 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment