Commit 91cf4853 authored by mskold@mysql.com's avatar mskold@mysql.com

Merge mskold@bk-internal.mysql.com:/home/bk/mysql-5.0-ndb

into mysql.com:/usr/local/home/marty/MySQL/test/mysql-5.0-ndb
parents bf2e97ca e1c2e85f
......@@ -1011,6 +1011,24 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col)
out << "Type" << (Uint32)col.getType();
break;
}
// show unusual (non-MySQL) array size
if (col.getLength() != 1) {
switch (col.getType()) {
case NdbDictionary::Column::Char:
case NdbDictionary::Column::Varchar:
case NdbDictionary::Column::Binary:
case NdbDictionary::Column::Varbinary:
case NdbDictionary::Column::Blob:
case NdbDictionary::Column::Text:
case NdbDictionary::Column::Bit:
case NdbDictionary::Column::Longvarchar:
case NdbDictionary::Column::Longvarbinary:
break;
default:
out << " [" << col.getLength() << "]";
break;
}
}
if (col.getPrimaryKey())
out << " PRIMARY KEY";
else if (! col.getNullable())
......
......@@ -1026,16 +1026,20 @@ NdbOperation::branch_col(Uint32 type,
abort();
}
Uint32 sizeInBytes = col->m_attrSize * col->m_arraySize;
if (val == NULL)
len = 0;
else {
if (! col->getCharType()) {
// prevent assert in NdbSqlUtil on length error
if(len != 0 && len != sizeInBytes)
Uint32 sizeInBytes = col->m_attrSize * col->m_arraySize;
if (len != 0 && len != sizeInBytes)
{
setErrorCodeAbort(4209);
return -1;
}
len = sizeInBytes;
}
}
if (insertATTRINFO(Interpreter::BranchCol(c, 0, 0, false)) == -1)
return -1;
......
......@@ -139,8 +139,9 @@ NdbRecAttr::receive_data(const Uint32 * data, Uint32 sz){
static void
ndbrecattr_print_string(NdbOut& out, const char *type,
const char *ref, unsigned sz)
const char *aref, unsigned sz)
{
const unsigned char* ref = (const unsigned char*)aref;
int i, len, printable= 1;
// trailing zeroes are not printed
for (i=sz-1; i >= 0; i--)
......@@ -166,7 +167,7 @@ ndbrecattr_print_string(NdbOut& out, const char *type,
for (i= len+1; ref[i] != 0; i++)
out.print("%u]",len-i);
assert((int)sz > i);
ndbrecattr_print_string(out,type,ref+i,sz-i);
ndbrecattr_print_string(out,type,aref+i,sz-i);
}
}
......
......@@ -612,7 +612,10 @@ struct Col {
bool m_pk;
Type m_type;
unsigned m_length;
unsigned m_bytelength;
unsigned m_bytelength; // multiplied by char width
unsigned m_attrsize; // base type size
unsigned m_headsize; // length bytes
unsigned m_bytesize; // full value size
bool m_nullable;
const Chs* m_chs;
Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs);
......@@ -629,12 +632,26 @@ Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type typ
m_type(type),
m_length(length),
m_bytelength(length * (chs == 0 ? 1 : chs->m_cs->mbmaxlen)),
m_attrsize(
type == Unsigned ? sizeof(Uint32) :
type == Char ? sizeof(char) :
type == Varchar ? sizeof(char) :
type == Longvarchar ? sizeof(char) : ~0),
m_headsize(
type == Unsigned ? 0 :
type == Char ? 0 :
type == Varchar ? 1 :
type == Longvarchar ? 2 : ~0),
m_bytesize(m_headsize + m_attrsize * m_bytelength),
m_nullable(nullable),
m_chs(chs)
{
// fix long varchar
if (type == Varchar && m_bytelength > 255)
if (type == Varchar && m_bytelength > 255) {
m_type = Longvarchar;
m_headsize += 1;
m_bytesize += 1;
}
}
Col::~Col()
......@@ -1119,13 +1136,15 @@ struct Con {
NdbIndexOperation* m_indexop;
NdbScanOperation* m_scanop;
NdbIndexScanOperation* m_indexscanop;
NdbScanFilter* m_scanfilter;
enum ScanMode { ScanNo = 0, Committed, Latest, Exclusive };
ScanMode m_scanmode;
enum ErrType { ErrNone = 0, ErrDeadlock, ErrOther };
ErrType m_errtype;
Con() :
m_ndb(0), m_dic(0), m_tx(0), m_op(0), m_indexop(0),
m_scanop(0), m_indexscanop(0), m_scanmode(ScanNo), m_errtype(ErrNone) {}
m_scanop(0), m_indexscanop(0), m_scanfilter(0),
m_scanmode(ScanNo), m_errtype(ErrNone) {}
~Con() {
if (m_tx != 0)
closeTransaction();
......@@ -1140,10 +1159,14 @@ struct Con {
int getNdbScanOperation(const Tab& tab);
int getNdbIndexScanOperation1(const ITab& itab, const Tab& tab);
int getNdbIndexScanOperation(const ITab& itab, const Tab& tab);
int getNdbScanFilter();
int equal(int num, const char* addr);
int getValue(int num, NdbRecAttr*& rec);
int setValue(int num, const char* addr);
int setBound(int num, int type, const void* value);
int beginFilter(int group);
int endFilter();
int setFilter(int num, int cond, const void* value, unsigned len);
int execute(ExecType t);
int execute(ExecType t, bool& deadlock);
int readTuples(Par par);
......@@ -1253,6 +1276,15 @@ Con::getNdbIndexScanOperation(const ITab& itab, const Tab& tab)
return 0;
}
int
Con::getNdbScanFilter()
{
assert(m_tx != 0 && m_scanop != 0);
delete m_scanfilter;
m_scanfilter = new NdbScanFilter(m_scanop);
return 0;
}
int
Con::equal(int num, const char* addr)
{
......@@ -1280,11 +1312,35 @@ Con::setValue(int num, const char* addr)
int
Con::setBound(int num, int type, const void* value)
{
assert(m_tx != 0 && m_op != 0);
assert(m_tx != 0 && m_indexscanop != 0);
CHKCON(m_indexscanop->setBound(num, type, value) == 0, *this);
return 0;
}
int
Con::beginFilter(int group)
{
assert(m_tx != 0 && m_scanfilter != 0);
CHKCON(m_scanfilter->begin((NdbScanFilter::Group)group) == 0, *this);
return 0;
}
int
Con::endFilter()
{
assert(m_tx != 0 && m_scanfilter != 0);
CHKCON(m_scanfilter->end() == 0, *this);
return 0;
}
int
Con::setFilter(int num, int cond, const void* value, unsigned len)
{
assert(m_tx != 0 && m_scanfilter != 0);
CHKCON(m_scanfilter->cmp((NdbScanFilter::BinaryCondition)cond, num, value, len) == 0, *this);
return 0;
}
int
Con::execute(ExecType t)
{
......@@ -1502,7 +1558,7 @@ createtable(Par par)
const Col& col = *tab.m_col[k];
NdbDictionary::Column c(col.m_name);
c.setType((NdbDictionary::Column::Type)col.m_type);
c.setLength(col.m_bytelength); // NDB API uses length in bytes
c.setLength(col.m_bytelength); // for char NDB API uses length in bytes
c.setPrimaryKey(col.m_pk);
c.setNullable(col.m_nullable);
if (col.m_chs != 0)
......@@ -2836,6 +2892,7 @@ struct BVal : public Val {
int m_type;
BVal(const ICol& icol);
int setbnd(Par par) const;
int setflt(Par par) const;
};
BVal::BVal(const ICol& icol) :
......@@ -2855,6 +2912,27 @@ BVal::setbnd(Par par) const
return 0;
}
int
BVal::setflt(Par par) const
{
static unsigned index_bound_to_filter_bound[5] = {
NdbScanFilter::COND_GE,
NdbScanFilter::COND_GT,
NdbScanFilter::COND_LE,
NdbScanFilter::COND_LT,
NdbScanFilter::COND_EQ
};
Con& con = par.con();
assert(g_compare_null || ! m_null);
const char* addr = ! m_null ? (const char*)dataaddr() : 0;
const ICol& icol = m_icol;
const Col& col = icol.m_col;
unsigned length = col.m_bytesize;
unsigned cond = index_bound_to_filter_bound[m_type];
CHK(con.setFilter(col.m_num, cond, addr, length) == 0);
return 0;
}
static NdbOut&
operator<<(NdbOut& out, const BVal& bval)
{
......@@ -2882,6 +2960,7 @@ struct BSet {
void calc(Par par);
void calcpk(Par par, unsigned i);
int setbnd(Par par) const;
int setflt(Par par) const;
void filter(Par par, const Set& set, Set& set2) const;
};
......@@ -3005,6 +3084,33 @@ BSet::setbnd(Par par) const
return 0;
}
int
BSet::setflt(Par par) const
{
Con& con = par.con();
CHK(con.getNdbScanFilter() == 0);
CHK(con.beginFilter(NdbScanFilter::AND) == 0);
if (m_bvals != 0) {
unsigned p1 = urandom(m_bvals);
unsigned p2 = 10009; // prime
const unsigned extras = 5;
// random order
for (unsigned j = 0; j < m_bvals + extras; j++) {
unsigned k = p1 + p2 * j;
const BVal& bval = *m_bval[k % m_bvals];
CHK(bval.setflt(par) == 0);
}
// duplicate
if (urandom(5) == 0) {
unsigned k = urandom(m_bvals);
const BVal& bval = *m_bval[k];
CHK(bval.setflt(par) == 0);
}
}
CHK(con.endFilter() == 0);
return 0;
}
void
BSet::filter(Par par, const Set& set, Set& set2) const
{
......@@ -3593,6 +3699,62 @@ scanreadindexfast(Par par, const ITab& itab, const BSet& bset, unsigned countche
return 0;
}
static int
scanreadfilter(Par par, const ITab& itab, BSet& bset, bool calc)
{
Con& con = par.con();
const Tab& tab = par.tab();
const Set& set = par.set();
Set set1(tab, set.m_rows);
if (calc) {
while (true) {
bset.calc(par);
bset.filter(par, set, set1);
unsigned n = set1.count();
// prefer proper subset
if (0 < n && n < set.m_rows)
break;
if (urandom(3) == 0)
break;
set1.reset();
}
} else {
bset.filter(par, set, set1);
}
LL3("scanfilter " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " verify=" << par.m_verify);
Set set2(tab, set.m_rows);
CHK(con.startTransaction() == 0);
CHK(con.getNdbScanOperation(tab) == 0);
CHK(con.readTuples(par) == 0);
CHK(bset.setflt(par) == 0);
set2.getval(par);
CHK(con.executeScan() == 0);
unsigned n = 0;
bool deadlock = false;
while (1) {
int ret;
deadlock = par.m_deadlock;
CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1);
if (ret == 1)
break;
if (deadlock) {
LL1("scanfilter: stop on deadlock");
break;
}
unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0);
CHK(set2.putval(i, par.m_dups, n) == 0);
LL4("key " << i << " row " << n << ": " << *set2.m_row[i]);
n++;
}
con.closeTransaction();
if (par.m_verify) {
CHK(set1.verify(par, set2) == 0);
}
LL3("scanfilter " << itab.m_name << " done rows=" << n);
return 0;
}
static int
scanreadindex(Par par, const ITab& itab)
{
......@@ -3600,6 +3762,7 @@ scanreadindex(Par par, const ITab& itab)
for (unsigned i = 0; i < par.m_subsubloop; i++) {
if (itab.m_type == ITab::OrderedIndex) {
BSet bset(tab, itab, par.m_rows);
CHK(scanreadfilter(par, itab, bset, true) == 0);
CHK(scanreadindex(par, itab, bset, true) == 0);
}
}
......@@ -3626,7 +3789,6 @@ scanreadindex(Par par)
static int
scanreadall(Par par)
{
if (par.m_no < 11)
CHK(scanreadtable(par) == 0);
CHK(scanreadindex(par) == 0);
return 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment