ndb - bug#20446: test case and cleanups (not fix)

parent a845df49
......@@ -121,41 +121,17 @@ private:
// forward declarations
struct DescEnt;
/*
* Pointer to array of Uint32.
*/
struct Data {
private:
Uint32* m_data;
public:
Data();
Data(Uint32* data);
Data& operator=(Uint32* data);
operator Uint32*() const;
Data& operator+=(size_t n);
AttributeHeader& ah() const;
};
friend class Data;
// Pointer to array of Uint32 represents attribute data and bounds
/*
* Pointer to array of constant Uint32.
*/
struct ConstData;
friend struct ConstData;
struct ConstData {
private:
const Uint32* m_data;
public:
ConstData();
ConstData(const Uint32* data);
ConstData& operator=(const Uint32* data);
operator const Uint32*() const;
ConstData& operator+=(size_t n);
const AttributeHeader& ah() const;
// non-const pointer can be cast to const pointer
ConstData(Data data);
ConstData& operator=(Data data);
};
typedef Uint32 *Data;
inline AttributeHeader& ah(Data data) {
return *reinterpret_cast<AttributeHeader*>(data);
}
typedef const Uint32* ConstData;
inline const AttributeHeader& ah(ConstData data) {
return *reinterpret_cast<const AttributeHeader*>(data);
}
// AttributeHeader size is assumed to be 1 word
STATIC_CONST( AttributeHeaderSize = 1 );
......@@ -737,99 +713,6 @@ private:
static unsigned max(unsigned x, unsigned y);
};
// Dbtux::Data
inline
Dbtux::Data::Data() :
m_data(0)
{
}
inline
Dbtux::Data::Data(Uint32* data) :
m_data(data)
{
}
inline Dbtux::Data&
Dbtux::Data::operator=(Uint32* data)
{
m_data = data;
return *this;
}
inline
Dbtux::Data::operator Uint32*() const
{
return m_data;
}
inline Dbtux::Data&
Dbtux::Data::operator+=(size_t n)
{
m_data += n;
return *this;
}
inline AttributeHeader&
Dbtux::Data::ah() const
{
return *reinterpret_cast<AttributeHeader*>(m_data);
}
// Dbtux::ConstData
inline
Dbtux::ConstData::ConstData() :
m_data(0)
{
}
inline
Dbtux::ConstData::ConstData(const Uint32* data) :
m_data(data)
{
}
inline Dbtux::ConstData&
Dbtux::ConstData::operator=(const Uint32* data)
{
m_data = data;
return *this;
}
inline
Dbtux::ConstData::operator const Uint32*() const
{
return m_data;
}
inline Dbtux::ConstData&
Dbtux::ConstData::operator+=(size_t n)
{
m_data += n;
return *this;
}
inline const AttributeHeader&
Dbtux::ConstData::ah() const
{
return *reinterpret_cast<const AttributeHeader*>(m_data);
}
inline
Dbtux::ConstData::ConstData(Data data) :
m_data(static_cast<Uint32*>(data))
{
}
inline Dbtux::ConstData&
Dbtux::ConstData::operator=(Data data)
{
m_data = static_cast<Uint32*>(data);
return *this;
}
// Dbtux::TupLoc
inline
......
......@@ -34,7 +34,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
// skip to right position in search key only
for (unsigned i = 0; i < start; i++) {
jam();
searchKey += AttributeHeaderSize + searchKey.ah().getDataSize();
searchKey += AttributeHeaderSize + ah(searchKey).getDataSize();
}
// number of words of entry data left
unsigned len2 = maxlen;
......@@ -46,16 +46,16 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
break;
}
len2 -= AttributeHeaderSize;
if (! searchKey.ah().isNULL()) {
if (! entryData.ah().isNULL()) {
if (! ah(searchKey).isNULL()) {
if (! ah(entryData).isNULL()) {
jam();
// verify attribute id
const DescAttr& descAttr = descEnt.m_descAttr[start];
ndbrequire(searchKey.ah().getAttributeId() == descAttr.m_primaryAttrId);
ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
ndbrequire(ah(searchKey).getAttributeId() == descAttr.m_primaryAttrId);
ndbrequire(ah(entryData).getAttributeId() == descAttr.m_primaryAttrId);
// sizes
const unsigned size1 = searchKey.ah().getDataSize();
const unsigned size2 = min(entryData.ah().getDataSize(), len2);
const unsigned size1 = ah(searchKey).getDataSize();
const unsigned size2 = min(ah(entryData).getDataSize(), len2);
len2 -= size2;
// compare
NdbSqlUtil::Cmp* const cmp = c_sqlCmp[start];
......@@ -74,15 +74,15 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
break;
}
} else {
if (! entryData.ah().isNULL()) {
if (! ah(entryData).isNULL()) {
jam();
// NULL < not NULL
ret = -1;
break;
}
}
searchKey += AttributeHeaderSize + searchKey.ah().getDataSize();
entryData += AttributeHeaderSize + entryData.ah().getDataSize();
searchKey += AttributeHeaderSize + ah(searchKey).getDataSize();
entryData += AttributeHeaderSize + ah(entryData).getDataSize();
start++;
}
return ret;
......@@ -130,17 +130,17 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned idir, ConstData boundInfo, unsign
// get and skip bound type (it is used after the loop)
type = boundInfo[0];
boundInfo += 1;
if (! boundInfo.ah().isNULL()) {
if (! entryData.ah().isNULL()) {
if (! ah(boundInfo).isNULL()) {
if (! ah(entryData).isNULL()) {
jam();
// verify attribute id
const Uint32 index = boundInfo.ah().getAttributeId();
const Uint32 index = ah(boundInfo).getAttributeId();
ndbrequire(index < frag.m_numAttrs);
const DescAttr& descAttr = descEnt.m_descAttr[index];
ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
ndbrequire(ah(entryData).getAttributeId() == descAttr.m_primaryAttrId);
// sizes
const unsigned size1 = boundInfo.ah().getDataSize();
const unsigned size2 = min(entryData.ah().getDataSize(), len2);
const unsigned size1 = ah(boundInfo).getDataSize();
const unsigned size2 = min(ah(entryData).getDataSize(), len2);
len2 -= size2;
// compare
NdbSqlUtil::Cmp* const cmp = c_sqlCmp[index];
......@@ -159,14 +159,14 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned idir, ConstData boundInfo, unsign
}
} else {
jam();
if (! entryData.ah().isNULL()) {
if (! ah(entryData).isNULL()) {
jam();
// NULL < not NULL
return -1;
}
}
boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
entryData += AttributeHeaderSize + entryData.ah().getDataSize();
boundInfo += AttributeHeaderSize + ah(boundInfo).getDataSize();
entryData += AttributeHeaderSize + ah(entryData).getDataSize();
boundCount -= 1;
}
// all attributes were equal
......
......@@ -221,7 +221,7 @@ Dbtux::setKeyAttrs(const Frag& frag)
const DescAttr& descAttr = descEnt.m_descAttr[i];
Uint32 size = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// set attr id and fixed size
keyAttrs.ah() = AttributeHeader(descAttr.m_primaryAttrId, size);
ah(keyAttrs) = AttributeHeader(descAttr.m_primaryAttrId, size);
keyAttrs += 1;
// set comparison method pointer
const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getTypeBinary(descAttr.m_typeId);
......@@ -251,8 +251,8 @@ Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData)
ConstData data = keyData;
Uint32 totalSize = 0;
for (Uint32 i = start; i < frag.m_numAttrs; i++) {
Uint32 attrId = data.ah().getAttributeId();
Uint32 dataSize = data.ah().getDataSize();
Uint32 attrId = ah(data).getAttributeId();
Uint32 dataSize = ah(data).getDataSize();
debugOut << i << " attrId=" << attrId << " size=" << dataSize;
data += 1;
for (Uint32 j = 0; j < dataSize; j++) {
......@@ -290,7 +290,7 @@ Dbtux::copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2
unsigned len2 = maxlen2;
while (n != 0) {
jam();
const unsigned dataSize = data1.ah().getDataSize();
const unsigned dataSize = ah(data1).getDataSize();
// copy header
if (len2 == 0)
return;
......
......@@ -47,7 +47,6 @@ struct Opt {
int m_die;
bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype;
unsigned m_subsubloop;
const char* m_index;
unsigned m_loop;
bool m_msglock;
......@@ -56,6 +55,7 @@ struct Opt {
unsigned m_pctnull;
unsigned m_rows;
unsigned m_samples;
unsigned m_scanbatch;
unsigned m_scanpar;
unsigned m_scanstop;
int m_seed;
......@@ -74,7 +74,6 @@ struct Opt {
m_die(0),
m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined),
m_subsubloop(4),
m_index(0),
m_loop(1),
m_msglock(true),
......@@ -83,6 +82,7 @@ struct Opt {
m_pctnull(10),
m_rows(1000),
m_samples(0),
m_scanbatch(0),
m_scanpar(0),
m_scanstop(0),
m_seed(-1),
......@@ -120,9 +120,10 @@ printhelp()
<< " -pctnull N pct NULL values in nullable column [" << d.m_pctnull << "]" << endl
<< " -rows N rows per thread [" << d.m_rows << "]" << endl
<< " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl
<< " -scanpar N scan parallelism [" << d.m_scanpar << "]" << endl
<< " -scanbatch N scan batch 0=default [" << d.m_scanbatch << "]" << endl
<< " -scanpar N scan parallel 0=default [" << d.m_scanpar << "]" << endl
<< " -seed N srandom seed 0=loop number -1=random [" << d.m_seed << "]" << endl
<< " -subloop N subtest loop count [" << d.m_subloop << "]" << endl
<< " -subloop N subtest (and subsubtest) loop count [" << d.m_subloop << "]" << endl
<< " -table xyz only given table numbers (digits 0-9)" << endl
<< " -threads N number of threads [" << d.m_threads << "]" << endl
<< " -vN verbosity [" << d.m_v << "]" << endl
......@@ -294,6 +295,7 @@ struct Par : public Opt {
Set& set() const { assert(m_set != 0); return *m_set; }
Tmr* m_tmr;
Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
char m_currcase[2];
unsigned m_lno;
unsigned m_slno;
unsigned m_totrows;
......@@ -302,6 +304,7 @@ struct Par : public Opt {
unsigned m_pctrange;
unsigned m_pctbrange;
int m_bdir;
bool m_noindexkeyupdate;
// choice of key
bool m_randomkey;
// do verify after read
......@@ -330,6 +333,7 @@ struct Par : public Opt {
m_pctrange(40),
m_pctbrange(80),
m_bdir(0),
m_noindexkeyupdate(false),
m_randomkey(false),
m_verify(false),
m_deadlock(false),
......@@ -337,7 +341,9 @@ struct Par : public Opt {
m_lockmode(NdbOperation::LM_Read),
m_tupscan(false),
m_ordered(false),
m_descending(false) {
m_descending(false)
{
m_currcase[0] = 0;
}
};
......@@ -892,6 +898,8 @@ struct Tab {
const Col** m_col;
unsigned m_itabs;
const ITab** m_itab;
unsigned m_orderedindexes;
unsigned m_hashindexes;
// pk must contain an Unsigned column
unsigned m_keycol;
void coladd(unsigned k, Col* colptr);
......@@ -906,6 +914,8 @@ Tab::Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol) :
m_col(new const Col* [cols + 1]),
m_itabs(itabs),
m_itab(new const ITab* [itabs + 1]),
m_orderedindexes(0),
m_hashindexes(0),
m_keycol(keycol)
{
for (unsigned k = 0; k <= cols; k++)
......@@ -935,8 +945,12 @@ Tab::coladd(unsigned k, Col* colptr)
void
Tab::itabadd(unsigned j, ITab* itabptr)
{
assert(j < m_itabs && m_itab[j] == 0);
assert(j < m_itabs && m_itab[j] == 0 && itabptr != 0);
m_itab[j] = itabptr;
if (itabptr->m_type == ITab::OrderedIndex)
m_orderedindexes++;
else
m_hashindexes++;
}
static NdbOut&
......@@ -1434,7 +1448,7 @@ Con::readTuples(Par par)
int scan_flags = 0;
if (par.m_tupscan)
scan_flags |= NdbScanOperation::SF_TupScan;
CHKCON(m_scanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar) == 0, *this);
CHKCON(m_scanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar, par.m_scanbatch) == 0, *this);
return 0;
}
......@@ -1442,7 +1456,12 @@ int
Con::readIndexTuples(Par par)
{
assert(m_tx != 0 && m_indexscanop != 0);
CHKCON(m_indexscanop->readTuples(par.m_lockmode, 0, par.m_scanpar, par.m_ordered, par.m_descending) == 0, *this);
int scan_flags = 0;
if (par.m_ordered)
scan_flags |= NdbScanOperation::SF_OrderBy;
if (par.m_descending)
scan_flags |= NdbScanOperation::SF_Descending;
CHKCON(m_indexscanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar, par.m_scanbatch) == 0, *this);
return 0;
}
......@@ -2193,7 +2212,7 @@ struct Row {
void copy(const Row& row2);
void calc(Par par, unsigned i, unsigned mask = 0);
const Row& dbrow() const;
int verify(Par par, const Row& row2) const;
int verify(Par par, const Row& row2, bool pkonly) const;
int insrow(Par par);
int updrow(Par par);
int updrow(Par par, const ITab& itab);
......@@ -2275,15 +2294,18 @@ Row::dbrow() const
}
int
Row::verify(Par par, const Row& row2) const
Row::verify(Par par, const Row& row2, bool pkonly) const
{
const Tab& tab = m_tab;
const Row& row1 = *this;
assert(&row1.m_tab == &row2.m_tab && row1.m_exist && row2.m_exist);
for (unsigned k = 0; k < tab.m_cols; k++) {
const Val& val1 = *row1.m_val[k];
const Val& val2 = *row2.m_val[k];
CHK(val1.verify(par, val2) == 0);
const Col& col = row1.m_val[k]->m_col;
if (! pkonly || col.m_pk) {
const Val& val1 = *row1.m_val[k];
const Val& val2 = *row2.m_val[k];
CHK(val1.verify(par, val2) == 0);
}
}
return 0;
}
......@@ -2585,8 +2607,11 @@ struct Set {
int getval(Par par);
int getkey(Par par, unsigned* i);
int putval(unsigned i, bool force, unsigned n = ~0);
// sort rows in-place according to ordered index
void sort(Par par, const ITab& itab);
void sort(Par par, const ITab& itab, unsigned lo, unsigned hi);
// verify
int verify(Par par, const Set& set2) const;
int verify(Par par, const Set& set2, bool pkonly) const;
int verifyorder(Par par, const ITab& itab, bool descending) const;
// protect structure
NdbMutex* m_mutex;
......@@ -2890,6 +2915,7 @@ Set::getkey(Par par, unsigned* i)
assert(m_rec[k] != 0);
const char* aRef = m_rec[k]->aRef();
Uint32 key = *(const Uint32*)aRef;
LL5("getkey: " << key);
CHK(key < m_rows);
*i = key;
return 0;
......@@ -2922,8 +2948,43 @@ Set::putval(unsigned i, bool force, unsigned n)
return 0;
}
void
Set::sort(Par par, const ITab& itab)
{
if (m_rows != 0)
sort(par, itab, 0, m_rows - 1);
}
void
Set::sort(Par par, const ITab& itab, unsigned lo, unsigned hi)
{
assert(lo < m_rows && hi < m_rows && lo <= hi);
Row* const p = m_row[lo];
unsigned i = lo;
unsigned j = hi;
while (i < j) {
while (i < j && m_row[j]->cmp(par, *p, itab) >= 0)
j--;
if (i < j) {
m_row[i] = m_row[j];
i++;
}
while (i < j && m_row[i]->cmp(par, *p, itab) <= 0)
i++;
if (i < j) {
m_row[j] = m_row[i];
j--;
}
}
m_row[i] = p;
if (lo < i)
sort(par, itab, lo, i - 1);
if (hi > i)
sort(par, itab, i + 1, hi);
}
int
Set::verify(Par par, const Set& set2) const
Set::verify(Par par, const Set& set2, bool pkonly) const
{
assert(&m_tab == &set2.m_tab && m_rows == set2.m_rows);
LL4("verify set1 count=" << count() << " vs set2 count=" << set2.count());
......@@ -2932,7 +2993,7 @@ Set::verify(Par par, const Set& set2) const
if (exist(i) != set2.exist(i)) {
ok = false;
} else if (exist(i)) {
if (dbrow(i).verify(par, set2.dbrow(i)) != 0)
if (dbrow(i).verify(par, set2.dbrow(i), pkonly) != 0)
ok = false;
}
if (! ok) {
......@@ -3490,7 +3551,7 @@ pkread(Par par)
con.closeTransaction();
}
if (par.m_verify)
CHK(set1.verify(par, set2) == 0);
CHK(set1.verify(par, set2, false) == 0);
return 0;
}
......@@ -3657,7 +3718,7 @@ hashindexread(Par par, const ITab& itab)
con.closeTransaction();
}
if (par.m_verify)
CHK(set1.verify(par, set2) == 0);
CHK(set1.verify(par, set2, false) == 0);
return 0;
}
......@@ -3698,7 +3759,7 @@ scanreadtable(Par par)
}
con.closeTransaction();
if (par.m_verify)
CHK(set1.verify(par, set2) == 0);
CHK(set1.verify(par, set2, false) == 0);
LL3("scanread " << tab.m_name << " done rows=" << n);
return 0;
}
......@@ -3730,6 +3791,23 @@ scanreadtablefast(Par par, unsigned countcheck)
return 0;
}
// try to get interesting bounds
static void
calcscanbounds(Par par, const ITab& itab, BSet& bset, const Set& set, Set& set1)
{
while (true) {
bset.calc(par);
bset.filter(par, set, set1);
unsigned n = set1.count();
// prefer proper subset
if (0 < n && n < set.m_rows)
break;
if (urandom(5) == 0)
break;
set1.reset();
}
}
static int
scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
{
......@@ -3738,21 +3816,11 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
const Set& set = par.set();
Set set1(tab, set.m_rows);
if (calc) {
while (true) {
bset.calc(par);
bset.filter(par, set, set1);
unsigned n = set1.count();
// prefer proper subset
if (0 < n && n < set.m_rows)
break;
if (urandom(3) == 0)
break;
set1.reset();
}
calcscanbounds(par, itab, bset, set, set1);
} else {
bset.filter(par, set, set1);
}
LL3("scanread " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " verify=" << par.m_verify << " ordered=" << par.m_ordered << " descending=" << par.m_descending);
LL3("scanread " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
Set set2(tab, set.m_rows);
CHK(con.startTransaction() == 0);
CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
......@@ -3780,7 +3848,7 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
}
con.closeTransaction();
if (par.m_verify) {
CHK(set1.verify(par, set2) == 0);
CHK(set1.verify(par, set2, false) == 0);
if (par.m_ordered)
CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
}
......@@ -3825,17 +3893,7 @@ scanreadfilter(Par par, const ITab& itab, BSet& bset, bool calc)
const Set& set = par.set();
Set set1(tab, set.m_rows);
if (calc) {
while (true) {
bset.calc(par);
bset.filter(par, set, set1);
unsigned n = set1.count();
// prefer proper subset
if (0 < n && n < set.m_rows)
break;
if (urandom(3) == 0)
break;
set1.reset();
}
calcscanbounds(par, itab, bset, set, set1);
} else {
bset.filter(par, set, set1);
}
......@@ -3867,7 +3925,7 @@ scanreadfilter(Par par, const ITab& itab, BSet& bset, bool calc)
}
con.closeTransaction();
if (par.m_verify) {
CHK(set1.verify(par, set2) == 0);
CHK(set1.verify(par, set2, false) == 0);
}
LL3("scanfilter " << itab.m_name << " done rows=" << n);
return 0;
......@@ -3877,7 +3935,7 @@ static int
scanreadindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_subsubloop; i++) {
for (unsigned i = 0; i < par.m_subloop; i++) {
if (itab.m_type == ITab::OrderedIndex) {
BSet bset(tab, itab, par.m_rows);
CHK(scanreadfilter(par, itab, bset, true) == 0);
......@@ -4068,12 +4126,19 @@ scanupdatetable(Par par)
}
static int
scanupdateindex(Par par, const ITab& itab, const BSet& bset)
scanupdateindex(Par par, const ITab& itab, BSet& bset, bool calc)
{
Con& con = par.con();
const Tab& tab = par.tab();
Set& set = par.set();
LL3("scan update " << itab.m_name);
// expected
Set set1(tab, set.m_rows);
if (calc) {
calcscanbounds(par, itab, bset, set, set1);
} else {
bset.filter(par, set, set1);
}
LL3("scan update " << itab.m_name << " " << bset << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
Set set2(tab, set.m_rows);
par.m_lockmode = NdbOperation::LM_Exclusive;
CHK(con.startTransaction() == 0);
......@@ -4117,7 +4182,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
Par par2 = par;
par2.m_con = &con2;
set.dbsave(i);
set.calc(par, i);
set.calc(par, i, ! par.m_noindexkeyupdate ? 0 : itab.m_colmask);
CHKTRY(set.setrow(par2, i) == 0, set.unlock());
LL4("scan update " << itab.m_name << ": " << row);
lst.push(i);
......@@ -4131,6 +4196,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
goto out;
}
con2.closeTransaction();
LL4("scanupdateindex: committed batch [at 1]");
set.lock();
set.notpending(lst);
set.dbdiscard(lst);
......@@ -4148,6 +4214,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
goto out;
}
con2.closeTransaction();
LL4("scanupdateindex: committed batch [at 2]");
set.lock();
set.notpending(lst);
set.dbdiscard(lst);
......@@ -4160,6 +4227,11 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
}
out:
con2.closeTransaction();
if (par.m_verify) {
CHK(set1.verify(par, set2, true) == 0);
if (par.m_ordered)
CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
}
LL3("scan update " << itab.m_name << " rows updated=" << count);
con.closeTransaction();
return 0;
......@@ -4169,11 +4241,10 @@ static int
scanupdateindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_subsubloop; i++) {
for (unsigned i = 0; i < par.m_subloop; i++) {
if (itab.m_type == ITab::OrderedIndex) {
BSet bset(tab, itab, par.m_rows);
bset.calc(par);
CHK(scanupdateindex(par, itab, bset) == 0);
CHK(scanupdateindex(par, itab, bset, true) == 0);
} else {
CHK(hashindexupdate(par, itab) == 0);
}
......@@ -4204,22 +4275,6 @@ scanupdateall(Par par)
// medium level routines
static int
readverify(Par par)
{
if (par.m_noverify)
return 0;
par.m_verify = true;
if (par.m_abortpct != 0) {
LL2("skip verify in this version"); // implement in 5.0 version
par.m_verify = false;
}
par.m_lockmode = NdbOperation::LM_CommittedRead;
CHK(pkread(par) == 0);
CHK(scanreadall(par) == 0);
return 0;
}
static int
readverifyfull(Par par)
{
......@@ -4237,8 +4292,7 @@ readverifyfull(Par par)
CHK(scanreadtable(par) == 0);
// once more via tup scan
par.m_tupscan = true;
if (NDB_VERSION < MAKE_VERSION(5, 1, 0)) //TODO
CHK(scanreadtable(par) == 0);
CHK(scanreadtable(par) == 0);
}
// each thread scans different indexes
for (unsigned i = 0; i < tab.m_itabs; i++) {
......@@ -4278,7 +4332,7 @@ pkops(Par par)
{
const Tab& tab = par.tab();
par.m_randomkey = true;
for (unsigned i = 0; i < par.m_subsubloop; i++) {
for (unsigned i = 0; i < par.m_subloop; i++) {
unsigned j = 0;
while (j < tab.m_itabs) {
if (tab.m_itab[j] != 0) {
......@@ -4377,6 +4431,33 @@ mixedoperations(Par par)
return 0;
}
static int
parallelorderedupdate(Par par)
{
const Tab& tab = par.tab();
unsigned k = 0;
for (unsigned i = 0; i < tab.m_itabs; i++) {
if (tab.m_itab[i] == 0)
continue;
const ITab& itab = *tab.m_itab[i];
if (itab.m_type != ITab::OrderedIndex)
continue;
// cannot sync threads yet except via subloop
if (k++ == par.m_slno % tab.m_orderedindexes) {
LL3("parallelorderedupdate: " << itab.m_name);
par.m_noindexkeyupdate = true;
par.m_ordered = true;
par.m_descending = (par.m_slno != 0);
par.m_verify = true;
BSet bset(tab, itab, par.m_rows); // empty bounds
// prefer empty bounds
unsigned sel = urandom(10);
CHK(scanupdateindex(par, itab, bset, sel < 2) == 0);
}
}
return 0;
}
static int
pkupdateindexbuild(Par par)
{
......@@ -4579,7 +4660,7 @@ getthrno()
static int
runstep(Par par, const char* fname, TFunc func, unsigned mode)
{
LL2(fname);
LL2("step: " << fname);
const int threads = (mode & ST ? 1 : par.m_threads);
int n;
for (n = 0; n < threads; n++) {
......@@ -4605,7 +4686,12 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode)
return 0;
}
#define RUNSTEP(par, func, mode) CHK(runstep(par, #func, func, mode) == 0)
#define RUNSTEP(par, func, mode) \
CHK(runstep(par, #func, func, mode) == 0)
#define SUBLOOP(par) \
"subloop: " << par.m_lno << "/" << par.m_currcase << "/" << \
par.m_tab->m_name << "/" << par.m_slno
static int
tbuild(Par par)
......@@ -4614,20 +4700,30 @@ tbuild(Par par)
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
if (par.m_slno % 2 == 0) {
LL1(SUBLOOP(par));
if (par.m_slno % 3 == 0) {
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, pkupdate, MT);
} else if (par.m_slno % 3 == 1) {
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, pkupdate, MT);
} else {
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, pkupdate, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
}
RUNSTEP(par, pkupdate, MT);
RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, pkdelete, MT);
RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, dropindex, ST);
// leave last one
if (par.m_slno + 1 < par.m_subloop) {
RUNSTEP(par, pkdelete, MT);
RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, dropindex, ST);
}
}
return 0;
}
......@@ -4643,7 +4739,7 @@ tindexscan(Par par)
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, readverifyfull, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL4("subloop " << par.m_slno);
LL1(SUBLOOP(par));
RUNSTEP(par, readverifyindex, MT);
}
return 0;
......@@ -4659,6 +4755,7 @@ tpkops(Par par)
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkops, MT);
LL2("rows=" << par.set().count());
RUNSTEP(par, readverifyfull, MT);
......@@ -4675,13 +4772,14 @@ tpkopsread(Par par)
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
RUNSTEP(par, readverifyfull, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkupdatescanread, MT);
RUNSTEP(par, readverify, ST);
RUNSTEP(par, readverifyfull, MT);
}
RUNSTEP(par, pkdelete, MT);
RUNSTEP(par, readverify, ST);
RUNSTEP(par, readverifyfull, MT);
return 0;
}
......@@ -4694,10 +4792,11 @@ tmixedops(Par par)
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
RUNSTEP(par, readverifyfull, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverify, ST);
RUNSTEP(par, readverifyfull, MT);
}
return 0;
}
......@@ -4710,9 +4809,10 @@ tbusybuild(Par par)
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, dropindex, ST);
}
return 0;
......@@ -4728,10 +4828,29 @@ trollback(Par par)
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
RUNSTEP(par, readverifyfull, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverify, ST);
RUNSTEP(par, readverifyfull, MT);
}
return 0;
}
static int
tparupdate(Par par)
{
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverifyfull, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, parallelorderedupdate, MT);
RUNSTEP(par, readverifyfull, MT);
}
return 0;
}
......@@ -4744,6 +4863,7 @@ ttimebuild(Par par)
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkinsert, MT);
t1.on();
RUNSTEP(par, createindex, ST);
......@@ -4763,6 +4883,7 @@ ttimemaint(Par par)
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkinsert, MT);
t1.on();
RUNSTEP(par, pkupdate, MT);
......@@ -4792,6 +4913,7 @@ ttimescan(Par par)
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
par.m_tmr = &t1;
......@@ -4818,6 +4940,7 @@ ttimepkread(Par par)
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
par.m_tmr = &t1;
......@@ -4859,6 +4982,7 @@ tcaselist[] = {
TCase("e", tmixedops, "pk operations and scan operations"),
TCase("f", tbusybuild, "pk operations and index build"),
TCase("g", trollback, "operations with random rollbacks"),
TCase("h", tparupdate, "parallel ordered update (bug20446)"),
TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"),
TCase("v", ttimescan, "time full scan table vs index on pk"),
......@@ -4916,17 +5040,16 @@ printtables()
static int
runtest(Par par)
{
LL1("start");
if (par.m_seed == -1) {
// good enough for daily run
unsigned short seed = (getpid() ^ time(0));
LL1("random seed: " << seed);
unsigned short seed = (unsigned short)getpid();
LL0("random seed: " << seed);
srandom((unsigned)seed);
} else if (par.m_seed != 0) {
LL1("random seed: " << par.m_seed);
LL0("random seed: " << par.m_seed);
srandom(par.m_seed);
} else {
LL1("random seed: loop number");
LL0("random seed: loop number");
}
// cs
assert(par.m_csname != 0);
......@@ -4951,22 +5074,25 @@ runtest(Par par)
assert(thr.m_thread != 0);
}
for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) {
LL1("loop " << par.m_lno);
if (par.m_seed == 0)
LL1("loop: " << par.m_lno);
if (par.m_seed == 0) {
LL1("random seed: " << par.m_lno);
srandom(par.m_lno);
}
for (unsigned i = 0; i < tcasecount; i++) {
const TCase& tcase = tcaselist[i];
if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0)
continue;
sprintf(par.m_currcase, "%c", tcase.m_name[0]);
makebuiltintables(par);
LL1("case " << tcase.m_name << " - " << tcase.m_desc);
LL1("case: " << par.m_lno << "/" << tcase.m_name << " - " << tcase.m_desc);
for (unsigned j = 0; j < tabcount; j++) {
if (tablist[j] == 0)
continue;
const Tab& tab = *tablist[j];
par.m_tab = &tab;
par.m_set = new Set(tab, par.m_totrows);
LL1("table " << tab.m_name);
LL1("table: " << par.m_lno << "/" << tcase.m_name << "/" << tab.m_name);
CHK(tcase.m_func(par) == 0);
delete par.m_set;
par.m_set = 0;
......@@ -4985,15 +5111,21 @@ runtest(Par par)
delete [] g_thrlist;
g_thrlist = 0;
con.disconnect();
LL1("done");
return 0;
}
NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
static const char* g_progname = "testOIBasic";
int
main(int argc, char** argv)
{
ndb_init();
if (ndbout_mutex == NULL)
ndbout_mutex = NdbMutex_Create();
unsigned i;
ndbout << g_progname;
for (i = 1; i < argc; i++)
ndbout << " " << argv[i];
ndbout << endl;
ndbout_mutex = NdbMutex_Create();
while (++argv, --argc > 0) {
const char* arg = argv[0];
if (*arg != '-') {
......@@ -5103,6 +5235,12 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
continue;
}
}
if (strcmp(arg, "-scanbatch") == 0) {
if (++argv, --argc > 0) {
g_opt.m_scanbatch = atoi(argv[0]);
continue;
}
}
if (strcmp(arg, "-scanpar") == 0) {
if (++argv, --argc > 0) {
g_opt.m_scanpar = atoi(argv[0]);
......
......@@ -607,7 +607,15 @@ args:
max-time: 5000
cmd: testOIBasic
args:
args: -case abcdefz
max-time: 2000
cmd: testOIBasic
args: -case gz
max-time: 2000
cmd: testOIBasic
args: -case hz
max-time: 2500
cmd: testBitfield
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment