Commit 2f4d0e1e authored by unknown's avatar unknown

wl-1884 storing NULL in ordered index

parent 0c6d41a8
...@@ -86,26 +86,25 @@ public: ...@@ -86,26 +86,25 @@ public:
/** /**
* Define bound on index key in range scan. * Define bound on index key in range scan.
* *
* Each index key can have not null lower and/or upper bound, or can * Each index key can have lower and/or upper bound, or can be set
* be set equal to not null value. The bounds can be defined in any * equal to a value. The bounds can be defined in any order but
* order but a duplicate definition is an error. * a duplicate definition is an error.
* *
* The scan is most effective when bounds are given for an initial * The bounds must specify a single range i.e. they are on an initial
* sequence of non-nullable index keys, and all but the last one is an * sequence of index keys and the condition is equality for all but
* equality. In this case the scan returns a contiguous range from * (at most) the last key which has a lower and/or upper bound.
* each ordered index fragment.
* *
* @note This release implements only the case described above, * NULL is treated like a normal value which is less than any not-NULL
* except for the non-nullable limitation. Other sets of * value and equal to another NULL value. To search for NULL use
* bounds return error or empty result set. * setBound with null pointer (0).
* *
* @note In this release a null key value satisfies any lower * An index stores also all-NULL keys (this may become optional).
* bound and no upper bound. This may change. * Doing index scan with empty bound set returns all table tuples.
* *
* @param attrName Attribute name, alternatively: * @param attrName Attribute name, alternatively:
* @param anAttrId Index column id (starting from 0). * @param anAttrId Index column id (starting from 0)
* @param type Type of bound * @param type Type of bound
* @param value Pointer to bound value * @param value Pointer to bound value, 0 for NULL
* @param len Value length in bytes. * @param len Value length in bytes.
* Fixed per datatype and can be omitted * Fixed per datatype and can be omitted
* @return 0 if successful otherwise -1 * @return 0 if successful otherwise -1
......
...@@ -446,6 +446,7 @@ private: ...@@ -446,6 +446,7 @@ private:
Uint32 m_descPage; // descriptor page Uint32 m_descPage; // descriptor page
Uint16 m_descOff; // offset within the page Uint16 m_descOff; // offset within the page
Uint16 m_numAttrs; Uint16 m_numAttrs;
bool m_storeNullKey;
union { union {
Uint32 nextPool; Uint32 nextPool;
}; };
...@@ -469,6 +470,7 @@ private: ...@@ -469,6 +470,7 @@ private:
Uint32 m_descPage; // copy from index level Uint32 m_descPage; // copy from index level
Uint16 m_descOff; Uint16 m_descOff;
Uint16 m_numAttrs; Uint16 m_numAttrs;
bool m_storeNullKey;
TreeHead m_tree; TreeHead m_tree;
TupLoc m_freeLoc; // one node pre-allocated for insert TupLoc m_freeLoc; // one node pre-allocated for insert
DLList<ScanOp> m_scanList; // current scans on this fragment DLList<ScanOp> m_scanList; // current scans on this fragment
...@@ -993,7 +995,8 @@ Dbtux::Index::Index() : ...@@ -993,7 +995,8 @@ Dbtux::Index::Index() :
m_numFrags(0), m_numFrags(0),
m_descPage(RNIL), m_descPage(RNIL),
m_descOff(0), m_descOff(0),
m_numAttrs(0) m_numAttrs(0),
m_storeNullKey(false)
{ {
for (unsigned i = 0; i < MaxIndexFragments; i++) { for (unsigned i = 0; i < MaxIndexFragments; i++) {
m_fragId[i] = ZNIL; m_fragId[i] = ZNIL;
...@@ -1012,6 +1015,7 @@ Dbtux::Frag::Frag(ArrayPool<ScanOp>& scanOpPool) : ...@@ -1012,6 +1015,7 @@ Dbtux::Frag::Frag(ArrayPool<ScanOp>& scanOpPool) :
m_descPage(RNIL), m_descPage(RNIL),
m_descOff(0), m_descOff(0),
m_numAttrs(ZNIL), m_numAttrs(ZNIL),
m_storeNullKey(false),
m_tree(), m_tree(),
m_freeLoc(), m_freeLoc(),
m_scanList(scanOpPool), m_scanList(scanOpPool),
......
...@@ -62,15 +62,15 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons ...@@ -62,15 +62,15 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
} }
} else { } else {
jam(); jam();
// not NULL < NULL // not NULL > NULL
ret = -1; ret = +1;
break; break;
} }
} else { } else {
if (! entryData.ah().isNULL()) { if (! entryData.ah().isNULL()) {
jam(); jam();
// NULL > not NULL // NULL < not NULL
ret = +1; ret = -1;
break; break;
} }
} }
...@@ -116,15 +116,15 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Tabl ...@@ -116,15 +116,15 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Tabl
} }
} else { } else {
jam(); jam();
// not NULL < NULL // not NULL > NULL
ret = -1; ret = +1;
break; break;
} }
} else { } else {
if (*entryKey != 0) { if (*entryKey != 0) {
jam(); jam();
// NULL > not NULL // NULL < not NULL
ret = +1; ret = -1;
break; break;
} }
} }
...@@ -180,36 +180,41 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne ...@@ -180,36 +180,41 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
// get and skip bound type // get and skip bound type
type = boundInfo[0]; type = boundInfo[0];
boundInfo += 1; boundInfo += 1;
ndbrequire(! boundInfo.ah().isNULL()); if (! boundInfo.ah().isNULL()) {
if (! entryData.ah().isNULL()) { if (! entryData.ah().isNULL()) {
jam(); jam();
// current attribute // current attribute
const unsigned index = boundInfo.ah().getAttributeId(); const unsigned index = boundInfo.ah().getAttributeId();
const DescAttr& descAttr = descEnt.m_descAttr[index]; const DescAttr& descAttr = descEnt.m_descAttr[index];
const unsigned typeId = descAttr.m_typeId; const unsigned typeId = descAttr.m_typeId;
ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId); ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
// full data size // full data size
const unsigned size1 = boundInfo.ah().getDataSize(); const unsigned size1 = boundInfo.ah().getDataSize();
ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize()); ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
const unsigned size2 = min(size1, len2); const unsigned size2 = min(size1, len2);
len2 -= size2; len2 -= size2;
// compare // compare
const Uint32* const p1 = &boundInfo[AttributeHeaderSize]; const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
const Uint32* const p2 = &entryData[AttributeHeaderSize]; const Uint32* const p2 = &entryData[AttributeHeaderSize];
int ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2); int ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2);
// XXX until data format errors are handled // XXX until data format errors are handled
ndbrequire(ret != NdbSqlUtil::CmpError); ndbrequire(ret != NdbSqlUtil::CmpError);
if (ret != 0) { if (ret != 0) {
jam();
return ret;
}
} else {
jam(); jam();
return ret; // not NULL > NULL
return +1;
} }
} else { } else {
jam(); jam();
/* if (! entryData.ah().isNULL()) {
* NULL is bigger than any bound, thus the boundary is always to jam();
* the left of NULL. // NULL < not NULL
*/ return -1;
return -1; }
} }
boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize(); boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
entryData += AttributeHeaderSize + entryData.ah().getDataSize(); entryData += AttributeHeaderSize + entryData.ah().getDataSize();
...@@ -258,32 +263,37 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne ...@@ -258,32 +263,37 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
// get and skip bound type // get and skip bound type
type = boundInfo[0]; type = boundInfo[0];
boundInfo += 1; boundInfo += 1;
ndbrequire(! boundInfo.ah().isNULL()); if (! boundInfo.ah().isNULL()) {
if (*entryKey != 0) { if (*entryKey != 0) {
jam(); jam();
// current attribute // current attribute
const unsigned index = boundInfo.ah().getAttributeId(); const unsigned index = boundInfo.ah().getAttributeId();
const DescAttr& descAttr = descEnt.m_descAttr[index]; const DescAttr& descAttr = descEnt.m_descAttr[index];
const unsigned typeId = descAttr.m_typeId; const unsigned typeId = descAttr.m_typeId;
// full data size // full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc); const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// compare // compare
const Uint32* const p1 = &boundInfo[AttributeHeaderSize]; const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
const Uint32* const p2 = *entryKey; const Uint32* const p2 = *entryKey;
int ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1); int ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1);
// XXX until data format errors are handled // XXX until data format errors are handled
ndbrequire(ret != NdbSqlUtil::CmpError); ndbrequire(ret != NdbSqlUtil::CmpError);
if (ret != 0) { if (ret != 0) {
jam();
return ret;
}
} else {
jam(); jam();
return ret; // not NULL > NULL
return +1;
} }
} else { } else {
jam(); jam();
/* if (*entryKey != 0) {
* NULL is bigger than any bound, thus the boundary is always to jam();
* the left of NULL. // NULL < not NULL
*/ return -1;
return -1; }
} }
boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize(); boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
entryKey += 1; entryKey += 1;
......
...@@ -82,8 +82,8 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -82,8 +82,8 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
ent.m_fragBit = fragBit; ent.m_fragBit = fragBit;
// read search key // read search key
readKeyAttrs(frag, ent, 0, c_searchKey); readKeyAttrs(frag, ent, 0, c_searchKey);
// check if all keys are null if (! frag.m_storeNullKey) {
{ // check if all keys are null
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
bool allNull = true; bool allNull = true;
for (unsigned i = 0; i < numAttrs; i++) { for (unsigned i = 0; i < numAttrs; i++) {
......
...@@ -85,6 +85,7 @@ Dbtux::execTUXFRAGREQ(Signal* signal) ...@@ -85,6 +85,7 @@ Dbtux::execTUXFRAGREQ(Signal* signal)
fragPtr.p->m_fragOff = req->fragOff; fragPtr.p->m_fragOff = req->fragOff;
fragPtr.p->m_fragId = req->fragId; fragPtr.p->m_fragId = req->fragId;
fragPtr.p->m_numAttrs = req->noOfAttr; fragPtr.p->m_numAttrs = req->noOfAttr;
fragPtr.p->m_storeNullKey = true; // not yet configurable
fragPtr.p->m_tupIndexFragPtrI = req->tupIndexFragPtrI; fragPtr.p->m_tupIndexFragPtrI = req->tupIndexFragPtrI;
fragPtr.p->m_tupTableFragPtrI[0] = req->tupTableFragPtrI[0]; fragPtr.p->m_tupTableFragPtrI[0] = req->tupTableFragPtrI[0];
fragPtr.p->m_tupTableFragPtrI[1] = req->tupTableFragPtrI[1]; fragPtr.p->m_tupTableFragPtrI[1] = req->tupTableFragPtrI[1];
...@@ -111,6 +112,7 @@ Dbtux::execTUXFRAGREQ(Signal* signal) ...@@ -111,6 +112,7 @@ Dbtux::execTUXFRAGREQ(Signal* signal)
indexPtr.p->m_tableId = req->primaryTableId; indexPtr.p->m_tableId = req->primaryTableId;
indexPtr.p->m_fragOff = req->fragOff; indexPtr.p->m_fragOff = req->fragOff;
indexPtr.p->m_numAttrs = req->noOfAttr; indexPtr.p->m_numAttrs = req->noOfAttr;
indexPtr.p->m_storeNullKey = true; // not yet configurable
// allocate attribute descriptors // allocate attribute descriptors
if (! allocDescEnt(indexPtr)) { if (! allocDescEnt(indexPtr)) {
jam(); jam();
......
...@@ -137,7 +137,7 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal) ...@@ -137,7 +137,7 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength; const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength;
unsigned offset = 5; unsigned offset = 5;
// walk through entries // walk through entries
while (offset + 2 < req->boundAiLength) { while (offset + 2 <= req->boundAiLength) {
jam(); jam();
const unsigned type = data[offset]; const unsigned type = data[offset];
if (type > 4) { if (type > 4) {
......
...@@ -21,11 +21,11 @@ shows ms / 1000 rows for each and pct overhead ...@@ -21,11 +21,11 @@ shows ms / 1000 rows for each and pct overhead
c c
1 million rows, index on PK, full table scan, full index scan 1 million rows, index on PK, full table scan, full index scan
shows ms / 1000 rows for each and index time pct shows ms / 1000 rows for each and index time overhead
d d
1 million rows, index on PK, read table via each pk, scan index for each pk 1 million rows, index on PK, read table via each pk, scan index for each pk
shows ms / 1000 rows for each and index time pct shows ms / 1000 rows for each and index time overhead
samples 10% of all PKs (100,000 pk reads, 100,000 scans) samples 10% of all PKs (100,000 pk reads, 100,000 scans)
040616 mc02/a 40 ms 87 ms 114 pct 040616 mc02/a 40 ms 87 ms 114 pct
...@@ -66,12 +66,20 @@ optim 11 mc02/a 43 ms 63 ms 46 pct ...@@ -66,12 +66,20 @@ optim 11 mc02/a 43 ms 63 ms 46 pct
optim 12 mc02/a 38 ms 55 ms 43 pct optim 12 mc02/a 38 ms 55 ms 43 pct
mc02/b 47 ms 77 ms 63 pct mc02/b 47 ms 77 ms 63 pct
mc02/c 10 ms 14 ms 147 pct mc02/c 10 ms 14 ms 47 pct
mc02/d 176 ms 281 ms 159 pct mc02/d 176 ms 281 ms 59 pct
optim 13 mc02/a 40 ms 57 ms 42 pct optim 13 mc02/a 40 ms 57 ms 42 pct
mc02/b 47 ms 77 ms 61 pct mc02/b 47 ms 77 ms 61 pct
mc02/c 9 ms 13 ms 150 pct mc02/c 9 ms 13 ms 50 pct
mc02/d 170 ms 256 ms 150 pct mc02/d 170 ms 256 ms 50 pct
after wl-1884 store all-NULL keys (the tests have pctnull=10 per column)
[ what happened to PK read performance? ]
optim 13 mc02/a 39 ms 59 ms 50 pct
mc02/b 47 ms 77 ms 61 pct
mc02/c 9 ms 12 ms 44 pct
mc02/d 246 ms 289 ms 17 pct
vim: set et: vim: set et:
...@@ -1125,7 +1125,6 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, ...@@ -1125,7 +1125,6 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
if (theOperationType == OpenRangeScanRequest && if (theOperationType == OpenRangeScanRequest &&
theStatus == SetBound && theStatus == SetBound &&
(0 <= type && type <= 4) && (0 <= type && type <= 4) &&
aValue != NULL &&
len <= 8000) { len <= 8000) {
// bound type // bound type
...@@ -1136,20 +1135,22 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, ...@@ -1136,20 +1135,22 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
setErrorCodeAbort(4209); setErrorCodeAbort(4209);
return -1; return -1;
} }
len = sizeInBytes; len = aValue != NULL ? sizeInBytes : 0;
Uint32 tIndexAttrId = tAttrInfo->m_attrId; Uint32 tIndexAttrId = tAttrInfo->m_attrId;
Uint32 sizeInWords = (len + 3) / 4; Uint32 sizeInWords = (len + 3) / 4;
AttributeHeader ah(tIndexAttrId, sizeInWords); AttributeHeader ah(tIndexAttrId, sizeInWords);
insertATTRINFO(ah.m_value); insertATTRINFO(ah.m_value);
// attribute data if (len != 0) {
if ((UintPtr(aValue) & 0x3) == 0 && (len & 0x3) == 0) // attribute data
insertATTRINFOloop((const Uint32*)aValue, sizeInWords); if ((UintPtr(aValue) & 0x3) == 0 && (len & 0x3) == 0)
else { insertATTRINFOloop((const Uint32*)aValue, sizeInWords);
Uint32 temp[2000]; else {
memcpy(temp, aValue, len); Uint32 temp[2000];
while ((len & 0x3) != 0) memcpy(temp, aValue, len);
((char*)temp)[len++] = 0; while ((len & 0x3) != 0)
insertATTRINFOloop(temp, sizeInWords); ((char*)temp)[len++] = 0;
insertATTRINFOloop(temp, sizeInWords);
}
} }
/** /**
...@@ -1236,7 +1237,7 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, ...@@ -1236,7 +1237,7 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
Uint32 * d2 = (Uint32*)r2->aRef(); Uint32 * d2 = (Uint32*)r2->aRef();
unsigned r1_null = r1->isNULL(); unsigned r1_null = r1->isNULL();
if((r1_null ^ (unsigned)r2->isNULL())){ if((r1_null ^ (unsigned)r2->isNULL())){
return (r1_null ? 1 : -1); return (r1_null ? -1 : 1);
} }
Uint32 type = NdbColumnImpl::getImpl(* r1->m_column).m_extType; Uint32 type = NdbColumnImpl::getImpl(* r1->m_column).m_extType;
Uint32 size = (r1->theAttrSize * r1->theArraySize + 3) / 4; Uint32 size = (r1->theAttrSize * r1->theArraySize + 3) / 4;
......
...@@ -85,7 +85,7 @@ printhelp() ...@@ -85,7 +85,7 @@ printhelp()
<< " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl << " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl
<< " -fragtype T fragment type single/small/medium/large" << endl << " -fragtype T fragment type single/small/medium/large" << endl
<< " -index xyz only given index numbers (digits 1-9)" << endl << " -index xyz only given index numbers (digits 1-9)" << endl
<< " -loop N loop count full suite forever=0 [" << d.m_loop << "]" << endl << " -loop N loop count full suite 0=forever [" << d.m_loop << "]" << endl
<< " -nologging create tables in no-logging mode" << endl << " -nologging create tables in no-logging mode" << endl
<< " -rows N rows per thread [" << d.m_rows << "]" << endl << " -rows N rows per thread [" << d.m_rows << "]" << endl
<< " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl << " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl
...@@ -102,6 +102,12 @@ printhelp() ...@@ -102,6 +102,12 @@ printhelp()
printtables(); printtables();
} }
// not yet configurable
static const bool g_store_null_key = true;
// compare NULL like normal value (NULL < not NULL, NULL == NULL)
static const bool g_compare_null = true;
// log and error macros // log and error macros
static NdbMutex ndbout_mutex = NDB_MUTEX_INITIALIZER; static NdbMutex ndbout_mutex = NDB_MUTEX_INITIALIZER;
...@@ -306,8 +312,8 @@ Tmr::pct(const Tmr& t1) ...@@ -306,8 +312,8 @@ Tmr::pct(const Tmr& t1)
const char* const char*
Tmr::over(const Tmr& t1) Tmr::over(const Tmr& t1)
{ {
if (0 < t1.m_ms && t1.m_ms < m_ms) { if (0 < t1.m_ms) {
sprintf(m_text, "%u pct", (100 * (m_ms - t1.m_ms)) / t1.m_ms); sprintf(m_text, "%d pct", (100 * (m_ms - t1.m_ms)) / t1.m_ms);
} else { } else {
sprintf(m_text, "[cannot measure]"); sprintf(m_text, "[cannot measure]");
} }
...@@ -1168,9 +1174,9 @@ Val::cmp(const Val& val2) const ...@@ -1168,9 +1174,9 @@ Val::cmp(const Val& val2) const
assert(col.m_type == col2.m_type && col.m_length == col2.m_length); assert(col.m_type == col2.m_type && col.m_length == col2.m_length);
if (m_null || val2.m_null) { if (m_null || val2.m_null) {
if (! m_null) if (! m_null)
return -1;
if (! val2.m_null)
return +1; return +1;
if (! val2.m_null)
return -1;
return 0; return 0;
} }
// verify data formats // verify data formats
...@@ -1695,8 +1701,8 @@ int ...@@ -1695,8 +1701,8 @@ int
BVal::setbnd(Par par) const BVal::setbnd(Par par) const
{ {
Con& con = par.con(); Con& con = par.con();
const char* addr = (const char*)dataaddr(); assert(g_compare_null || ! m_null);
assert(! m_null); const char* addr = ! m_null ? (const char*)dataaddr() : 0;
const ICol& icol = m_icol; const ICol& icol = m_icol;
CHK(con.setBound(icol.m_num, m_type, addr) == 0); CHK(con.setBound(icol.m_num, m_type, addr) == 0);
return 0; return 0;
...@@ -1785,7 +1791,8 @@ BSet::calc(Par par) ...@@ -1785,7 +1791,8 @@ BSet::calc(Par par)
if (k + 1 < itab.m_icols) if (k + 1 < itab.m_icols)
bval.m_type = 4; bval.m_type = 4;
// value generation parammeters // value generation parammeters
par.m_pctnull = 0; if (! g_compare_null)
par.m_pctnull = 0;
par.m_pctrange = 50; // bit higher par.m_pctrange = 50; // bit higher
do { do {
bval.calc(par, 0); bval.calc(par, 0);
...@@ -1842,18 +1849,20 @@ BSet::filter(const Set& set, Set& set2) const ...@@ -1842,18 +1849,20 @@ BSet::filter(const Set& set, Set& set2) const
if (! set.exist(i)) if (! set.exist(i))
continue; continue;
const Row& row = *set.m_row[i]; const Row& row = *set.m_row[i];
bool ok1 = false; if (! g_store_null_key) {
for (unsigned k = 0; k < itab.m_icols; k++) { bool ok1 = false;
const ICol& icol = itab.m_icol[k]; for (unsigned k = 0; k < itab.m_icols; k++) {
const Col& col = icol.m_col; const ICol& icol = itab.m_icol[k];
const Val& val = *row.m_val[col.m_num]; const Col& col = icol.m_col;
if (! val.m_null) { const Val& val = *row.m_val[col.m_num];
ok1 = true; if (! val.m_null) {
break; ok1 = true;
break;
}
} }
if (! ok1)
continue;
} }
if (! ok1)
continue;
bool ok2 = true; bool ok2 = true;
for (unsigned j = 0; j < m_bvals; j++) { for (unsigned j = 0; j < m_bvals; j++) {
const BVal& bval = *m_bval[j]; const BVal& bval = *m_bval[j];
...@@ -2727,13 +2736,13 @@ tpkops(Par par) ...@@ -2727,13 +2736,13 @@ tpkops(Par par)
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, MT); RUNSTEP(par, readverify, ST);
for (unsigned i = 0; i < par.m_subloop; i++) { for (unsigned i = 0; i < par.m_subloop; i++) {
RUNSTEP(par, pkupdatescanread, MT); RUNSTEP(par, pkupdatescanread, MT);
RUNSTEP(par, readverify, MT); RUNSTEP(par, readverify, ST);
} }
RUNSTEP(par, pkdelete, MT); RUNSTEP(par, pkdelete, MT);
RUNSTEP(par, readverify, MT); RUNSTEP(par, readverify, ST);
return 0; return 0;
} }
...@@ -2746,10 +2755,10 @@ tmixedops(Par par) ...@@ -2746,10 +2755,10 @@ tmixedops(Par par)
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, MT); RUNSTEP(par, readverify, ST);
for (unsigned i = 0; i < par.m_subloop; i++) { for (unsigned i = 0; i < par.m_subloop; i++) {
RUNSTEP(par, mixedoperations, MT); RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverify, MT); RUNSTEP(par, readverify, ST);
} }
return 0; return 0;
} }
...@@ -2832,7 +2841,7 @@ ttimescan(Par par) ...@@ -2832,7 +2841,7 @@ ttimescan(Par par)
} }
LL1("full scan table - " << t1.time()); LL1("full scan table - " << t1.time());
LL1("full scan PK index - " << t2.time()); LL1("full scan PK index - " << t2.time());
LL1("index time pct - " << t2.pct(t1)); LL1("overhead - " << t2.over(t1));
return 0; return 0;
} }
...@@ -2854,7 +2863,7 @@ ttimepkread(Par par) ...@@ -2854,7 +2863,7 @@ ttimepkread(Par par)
} }
LL1("pk read table - " << t1.time()); LL1("pk read table - " << t1.time());
LL1("pk read PK index - " << t2.time()); LL1("pk read PK index - " << t2.time());
LL1("index time pct - " << t2.pct(t1)); LL1("overhead - " << t2.over(t1));
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment