Commit d02998e2 authored by pekka@mysql.com's avatar pekka@mysql.com

ndb charsets (wl-1732) final part: use strxfrm + strcoll

parent 23dd2e15
drop table if exists t1;
create table t1 (
a char(3) character set latin1 collate latin1_bin primary key
) engine=ndb;
insert into t1 values('aAa');
insert into t1 values('aaa');
insert into t1 values('AAA');
select * from t1 order by a;
a
AAA
aAa
aaa
select * from t1 where a = 'aAa';
a
aAa
select * from t1 where a = 'aaa';
a
aaa
select * from t1 where a = 'AaA';
a
select * from t1 where a = 'AAA';
a
AAA
drop table t1;
create table t1 (
a char(3) character set latin1 collate latin1_swedish_ci primary key
) engine=ndb;
insert into t1 values('aAa');
insert into t1 values('aaa');
ERROR 23000: Duplicate entry 'aaa' for key 1
insert into t1 values('AAA');
ERROR 23000: Duplicate entry 'AAA' for key 1
select * from t1 order by a;
a
aAa
select * from t1 where a = 'aAa';
a
aAa
select * from t1 where a = 'aaa';
a
aAa
select * from t1 where a = 'AaA';
a
aAa
select * from t1 where a = 'AAA';
a
aAa
drop table t1;
create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_bin not null,
unique key(a)
) engine=ndb;
insert into t1 values(1, 'aAa');
insert into t1 values(2, 'aaa');
insert into t1 values(3, 'AAA');
select * from t1 order by p;
p a
1 aAa
2 aaa
3 AAA
select * from t1 where a = 'aAa';
p a
1 aAa
select * from t1 where a = 'aaa';
p a
2 aaa
select * from t1 where a = 'AaA';
p a
select * from t1 where a = 'AAA';
p a
3 AAA
drop table t1;
create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_swedish_ci not null,
unique key(a)
) engine=ndb;
insert into t1 values(1, 'aAa');
insert into t1 values(2, 'aaa');
ERROR 23000: Can't write, because of unique constraint, to table 't1'
insert into t1 values(3, 'AAA');
ERROR 23000: Can't write, because of unique constraint, to table 't1'
select * from t1 order by p;
p a
1 aAa
select * from t1 where a = 'aAa';
p a
1 aAa
select * from t1 where a = 'aaa';
p a
1 aAa
select * from t1 where a = 'AaA';
p a
1 aAa
select * from t1 where a = 'AAA';
p a
1 aAa
drop table t1;
create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_bin not null,
index(a)
) engine=ndb;
insert into t1 values(1, 'aAa');
insert into t1 values(2, 'aaa');
insert into t1 values(3, 'AAA');
insert into t1 values(4, 'aAa');
insert into t1 values(5, 'aaa');
insert into t1 values(6, 'AAA');
select * from t1 order by p;
p a
1 aAa
2 aaa
3 AAA
4 aAa
5 aaa
6 AAA
explain select * from t1 where a = 'zZz' order by p;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ref a a 3 const 10 Using where; Using filesort
select * from t1 where a = 'aAa' order by p;
p a
1 aAa
4 aAa
select * from t1 where a = 'aaa' order by p;
p a
2 aaa
5 aaa
select * from t1 where a = 'AaA' order by p;
p a
select * from t1 where a = 'AAA' order by p;
p a
3 AAA
6 AAA
drop table t1;
create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_swedish_ci not null,
index(a)
) engine=ndb;
insert into t1 values(1, 'aAa');
insert into t1 values(2, 'aaa');
insert into t1 values(3, 'AAA');
insert into t1 values(4, 'aAa');
insert into t1 values(5, 'aaa');
insert into t1 values(6, 'AAA');
select * from t1 order by p;
p a
1 aAa
2 aaa
3 AAA
4 aAa
5 aaa
6 AAA
explain select * from t1 where a = 'zZz' order by p;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ref a a 3 const 10 Using where; Using filesort
select * from t1 where a = 'aAa' order by p;
p a
1 aAa
2 aaa
3 AAA
4 aAa
5 aaa
6 AAA
select * from t1 where a = 'aaa' order by p;
p a
1 aAa
2 aaa
3 AAA
4 aAa
5 aaa
6 AAA
select * from t1 where a = 'AaA' order by p;
p a
1 aAa
2 aaa
3 AAA
4 aAa
5 aaa
6 AAA
select * from t1 where a = 'AAA' order by p;
p a
1 aAa
2 aaa
3 AAA
4 aAa
5 aaa
6 AAA
drop table t1;
...@@ -4,7 +4,7 @@ PORT varchar(16) NOT NULL, ...@@ -4,7 +4,7 @@ PORT varchar(16) NOT NULL,
ACCESSNODE varchar(16) NOT NULL, ACCESSNODE varchar(16) NOT NULL,
POP varchar(48) NOT NULL, POP varchar(48) NOT NULL,
ACCESSTYPE int unsigned NOT NULL, ACCESSTYPE int unsigned NOT NULL,
CUSTOMER_ID varchar(20) NOT NULL, CUSTOMER_ID varchar(20) collate latin1_bin NOT NULL,
PROVIDER varchar(16), PROVIDER varchar(16),
TEXPIRE int unsigned, TEXPIRE int unsigned,
NUM_IP int unsigned, NUM_IP int unsigned,
......
--source include/have_ndb.inc
--disable_warnings
drop table if exists t1;
--enable_warnings
#
# Minimal NDB charset test.
#
# pk - binary
create table t1 (
a char(3) character set latin1 collate latin1_bin primary key
) engine=ndb;
# ok
insert into t1 values('aAa');
insert into t1 values('aaa');
insert into t1 values('AAA');
# 3
select * from t1 order by a;
# 1
select * from t1 where a = 'aAa';
# 1
select * from t1 where a = 'aaa';
# 0
select * from t1 where a = 'AaA';
# 1
select * from t1 where a = 'AAA';
drop table t1;
# pk - case insensitive
create table t1 (
a char(3) character set latin1 collate latin1_swedish_ci primary key
) engine=ndb;
# ok
insert into t1 values('aAa');
# fail
--error 1062
insert into t1 values('aaa');
--error 1062
insert into t1 values('AAA');
# 1
select * from t1 order by a;
# 1
select * from t1 where a = 'aAa';
# 1
select * from t1 where a = 'aaa';
# 1
select * from t1 where a = 'AaA';
# 1
select * from t1 where a = 'AAA';
drop table t1;
# unique hash index - binary
create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_bin not null,
unique key(a)
) engine=ndb;
# ok
insert into t1 values(1, 'aAa');
insert into t1 values(2, 'aaa');
insert into t1 values(3, 'AAA');
# 3
select * from t1 order by p;
# 1
select * from t1 where a = 'aAa';
# 1
select * from t1 where a = 'aaa';
# 0
select * from t1 where a = 'AaA';
# 1
select * from t1 where a = 'AAA';
drop table t1;
# unique hash index - case insensitive
create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_swedish_ci not null,
unique key(a)
) engine=ndb;
# ok
insert into t1 values(1, 'aAa');
# fail
--error 1169
insert into t1 values(2, 'aaa');
--error 1169
insert into t1 values(3, 'AAA');
# 1
select * from t1 order by p;
# 1
select * from t1 where a = 'aAa';
# 1
select * from t1 where a = 'aaa';
# 1
select * from t1 where a = 'AaA';
# 1
select * from t1 where a = 'AAA';
drop table t1;
# ordered index - binary
create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_bin not null,
index(a)
) engine=ndb;
# ok
insert into t1 values(1, 'aAa');
insert into t1 values(2, 'aaa');
insert into t1 values(3, 'AAA');
insert into t1 values(4, 'aAa');
insert into t1 values(5, 'aaa');
insert into t1 values(6, 'AAA');
# 6
select * from t1 order by p;
# plan
explain select * from t1 where a = 'zZz' order by p;
# 2
select * from t1 where a = 'aAa' order by p;
# 2
select * from t1 where a = 'aaa' order by p;
# 0
select * from t1 where a = 'AaA' order by p;
# 2
select * from t1 where a = 'AAA' order by p;
drop table t1;
# ordered index - case insensitive
create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_swedish_ci not null,
index(a)
) engine=ndb;
# ok
insert into t1 values(1, 'aAa');
insert into t1 values(2, 'aaa');
insert into t1 values(3, 'AAA');
insert into t1 values(4, 'aAa');
insert into t1 values(5, 'aaa');
insert into t1 values(6, 'AAA');
# 6
select * from t1 order by p;
# plan
explain select * from t1 where a = 'zZz' order by p;
# 6
select * from t1 where a = 'aAa' order by p;
# 6
select * from t1 where a = 'aaa' order by p;
# 6
select * from t1 where a = 'AaA' order by p;
# 6
select * from t1 where a = 'AAA' order by p;
drop table t1;
...@@ -9,7 +9,7 @@ CREATE TABLE t1 ( ...@@ -9,7 +9,7 @@ CREATE TABLE t1 (
ACCESSNODE varchar(16) NOT NULL, ACCESSNODE varchar(16) NOT NULL,
POP varchar(48) NOT NULL, POP varchar(48) NOT NULL,
ACCESSTYPE int unsigned NOT NULL, ACCESSTYPE int unsigned NOT NULL,
CUSTOMER_ID varchar(20) NOT NULL, CUSTOMER_ID varchar(20) collate latin1_bin NOT NULL,
PROVIDER varchar(16), PROVIDER varchar(16),
TEXPIRE int unsigned, TEXPIRE int unsigned,
NUM_IP int unsigned, NUM_IP int unsigned,
......
...@@ -40,11 +40,14 @@ public: ...@@ -40,11 +40,14 @@ public:
* Compare kernel attribute values. Returns -1, 0, +1 for less, * Compare kernel attribute values. Returns -1, 0, +1 for less,
* equal, greater, respectively. Parameters are pointers to values, * equal, greater, respectively. Parameters are pointers to values,
* full attribute size in words, and size of available data in words. * full attribute size in words, and size of available data in words.
* There is also pointer to type specific extra info. Char types
* receive CHARSET_INFO in it.
*
* If available size is less than full size, CmpUnknown may be * If available size is less than full size, CmpUnknown may be
* returned. If a value cannot be parsed, it compares like NULL i.e. * returned. If a value cannot be parsed, it compares like NULL i.e.
* less than any valid value. * less than any valid value.
*/ */
typedef int Cmp(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size); typedef int Cmp(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size);
enum CmpResult { enum CmpResult {
CmpLess = -1, CmpLess = -1,
...@@ -55,6 +58,7 @@ public: ...@@ -55,6 +58,7 @@ public:
/** /**
* Kernel data types. Must match m_typeList in NdbSqlUtil.cpp. * Kernel data types. Must match m_typeList in NdbSqlUtil.cpp.
* Now also must match types in NdbDictionary.
*/ */
struct Type { struct Type {
enum Enum { enum Enum {
......
This diff is collapsed.
...@@ -7700,6 +7700,7 @@ void Dblqh::accScanConfScanLab(Signal* signal) ...@@ -7700,6 +7700,7 @@ void Dblqh::accScanConfScanLab(Signal* signal)
ndbrequire(sz == boundAiLength); ndbrequire(sz == boundAiLength);
EXECUTE_DIRECT(DBTUX, GSN_TUX_BOUND_INFO, EXECUTE_DIRECT(DBTUX, GSN_TUX_BOUND_INFO,
signal, TuxBoundInfo::SignalLength + boundAiLength); signal, TuxBoundInfo::SignalLength + boundAiLength);
jamEntry();
if (req->errorCode != 0) { if (req->errorCode != 0) {
jam(); jam();
/* /*
......
...@@ -1374,7 +1374,8 @@ private: ...@@ -1374,7 +1374,8 @@ private:
const Uint32* inBuffer, const Uint32* inBuffer,
Uint32 inBufLen, Uint32 inBufLen,
Uint32* outBuffer, Uint32* outBuffer,
Uint32 TmaxRead); Uint32 TmaxRead,
bool xfrmFlag);
//------------------------------------------------------------------ //------------------------------------------------------------------
//------------------------------------------------------------------ //------------------------------------------------------------------
...@@ -1620,6 +1621,20 @@ private: ...@@ -1620,6 +1621,20 @@ private:
Uint32 attrDescriptor, Uint32 attrDescriptor,
Uint32 attrDes2); Uint32 attrDes2);
// *****************************************************************
// Read char routines optionally (tXfrmFlag) apply strxfrm
// *****************************************************************
bool readCharNotNULL(Uint32* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2);
bool readCharNULLable(Uint32* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2);
//------------------------------------------------------------------ //------------------------------------------------------------------
//------------------------------------------------------------------ //------------------------------------------------------------------
bool nullFlagCheck(Uint32 attrDes2); bool nullFlagCheck(Uint32 attrDes2);
...@@ -2225,6 +2240,7 @@ private: ...@@ -2225,6 +2240,7 @@ private:
Uint32 tMaxRead; Uint32 tMaxRead;
Uint32 tOutBufIndex; Uint32 tOutBufIndex;
Uint32* tTupleHeader; Uint32* tTupleHeader;
bool tXfrmFlag;
// updateAttributes module // updateAttributes module
Uint32 tInBufIndex; Uint32 tInBufIndex;
......
...@@ -903,7 +903,8 @@ int Dbtup::handleReadReq(Signal* signal, ...@@ -903,7 +903,8 @@ int Dbtup::handleReadReq(Signal* signal,
&cinBuffer[0], &cinBuffer[0],
regOperPtr->attrinbufLen, regOperPtr->attrinbufLen,
dst, dst,
dstLen); dstLen,
false);
if (TnoOfDataRead != (Uint32)-1) { if (TnoOfDataRead != (Uint32)-1) {
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
// We have read all data into coutBuffer. Now send it to the API. // We have read all data into coutBuffer. Now send it to the API.
...@@ -1274,7 +1275,8 @@ int Dbtup::interpreterStartLab(Signal* signal, ...@@ -1274,7 +1275,8 @@ int Dbtup::interpreterStartLab(Signal* signal,
&cinBuffer[5], &cinBuffer[5],
RinitReadLen, RinitReadLen,
&dst[0], &dst[0],
dstLen); dstLen,
false);
if (TnoDataRW != (Uint32)-1) { if (TnoDataRW != (Uint32)-1) {
RattroutCounter = TnoDataRW; RattroutCounter = TnoDataRW;
RinstructionCounter += RinitReadLen; RinstructionCounter += RinitReadLen;
...@@ -1347,7 +1349,8 @@ int Dbtup::interpreterStartLab(Signal* signal, ...@@ -1347,7 +1349,8 @@ int Dbtup::interpreterStartLab(Signal* signal,
&cinBuffer[RinstructionCounter], &cinBuffer[RinstructionCounter],
RfinalRLen, RfinalRLen,
&dst[RattroutCounter], &dst[RattroutCounter],
(dstLen - RattroutCounter)); (dstLen - RattroutCounter),
false);
if (TnoDataRW != (Uint32)-1) { if (TnoDataRW != (Uint32)-1) {
RattroutCounter += TnoDataRW; RattroutCounter += TnoDataRW;
} else { } else {
...@@ -1487,7 +1490,8 @@ int Dbtup::interpreterNextLab(Signal* signal, ...@@ -1487,7 +1490,8 @@ int Dbtup::interpreterNextLab(Signal* signal,
&theAttrinfo, &theAttrinfo,
(Uint32)1, (Uint32)1,
&TregMemBuffer[theRegister], &TregMemBuffer[theRegister],
(Uint32)3); (Uint32)3,
false);
if (TnoDataRW == 2) { if (TnoDataRW == 2) {
/* ------------------------------------------------------------- */ /* ------------------------------------------------------------- */
// Two words read means that we get the instruction plus one 32 // Two words read means that we get the instruction plus one 32
...@@ -1833,7 +1837,8 @@ int Dbtup::interpreterNextLab(Signal* signal, ...@@ -1833,7 +1837,8 @@ int Dbtup::interpreterNextLab(Signal* signal,
Int32 TnoDataR = readAttributes(pagePtr, Int32 TnoDataR = readAttributes(pagePtr,
TupHeadOffset, TupHeadOffset,
&attrId, 1, &attrId, 1,
tmpArea, tmpAreaSz); tmpArea, tmpAreaSz,
false);
if (TnoDataR == -1) { if (TnoDataR == -1) {
jam(); jam();
...@@ -1929,7 +1934,8 @@ int Dbtup::interpreterNextLab(Signal* signal, ...@@ -1929,7 +1934,8 @@ int Dbtup::interpreterNextLab(Signal* signal,
Int32 TnoDataR = readAttributes(pagePtr, Int32 TnoDataR = readAttributes(pagePtr,
TupHeadOffset, TupHeadOffset,
&attrId, 1, &attrId, 1,
tmpArea, tmpAreaSz); tmpArea, tmpAreaSz,
false);
if (TnoDataR == -1) { if (TnoDataR == -1) {
jam(); jam();
...@@ -1957,7 +1963,8 @@ int Dbtup::interpreterNextLab(Signal* signal, ...@@ -1957,7 +1963,8 @@ int Dbtup::interpreterNextLab(Signal* signal,
Int32 TnoDataR = readAttributes(pagePtr, Int32 TnoDataR = readAttributes(pagePtr,
TupHeadOffset, TupHeadOffset,
&attrId, 1, &attrId, 1,
tmpArea, tmpAreaSz); tmpArea, tmpAreaSz,
false);
if (TnoDataR == -1) { if (TnoDataR == -1) {
jam(); jam();
......
...@@ -160,7 +160,7 @@ Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tu ...@@ -160,7 +160,7 @@ Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tu
operPtr.i = RNIL; operPtr.i = RNIL;
operPtr.p = NULL; operPtr.p = NULL;
// do it // do it
int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL); int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL, true);
// restore globals // restore globals
tabptr = tabptr_old; tabptr = tabptr_old;
fragptr = fragptr_old; fragptr = fragptr_old;
...@@ -200,7 +200,7 @@ Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* data ...@@ -200,7 +200,7 @@ Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* data
operPtr.i = RNIL; operPtr.i = RNIL;
operPtr.p = NULL; operPtr.p = NULL;
// do it // do it
int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL); int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL, true);
// restore globals // restore globals
tabptr = tabptr_old; tabptr = tabptr_old;
fragptr = fragptr_old; fragptr = fragptr_old;
......
...@@ -332,11 +332,11 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) ...@@ -332,11 +332,11 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
} }
if (i == fragOperPtr.p->charsetIndex) { if (i == fragOperPtr.p->charsetIndex) {
ljam(); ljam();
fragOperPtr.p->charsetIndex++;
}
ndbrequire(i < regTabPtr.p->noOfCharsets); ndbrequire(i < regTabPtr.p->noOfCharsets);
regTabPtr.p->charsetArray[i] = cs; regTabPtr.p->charsetArray[i] = cs;
AttributeOffset::setCharsetPos(attrDes2, i); AttributeOffset::setCharsetPos(attrDes2, i);
fragOperPtr.p->charsetIndex++;
}
} }
setTabDescrWord(firstTabDesIndex + 1, attrDes2); setTabDescrWord(firstTabDesIndex + 1, attrDes2);
......
...@@ -35,6 +35,7 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr) ...@@ -35,6 +35,7 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
for (Uint32 i = 0; i < regTabPtr->noOfAttr; i++) { for (Uint32 i = 0; i < regTabPtr->noOfAttr; i++) {
Uint32 attrDescriptorStart = startDescriptor + (i << ZAD_LOG_SIZE); Uint32 attrDescriptorStart = startDescriptor + (i << ZAD_LOG_SIZE);
Uint32 attrDescriptor = tableDescriptor[attrDescriptorStart].tabDescr; Uint32 attrDescriptor = tableDescriptor[attrDescriptorStart].tabDescr;
Uint32 attrOffset = tableDescriptor[attrDescriptorStart + 1].tabDescr;
if (!AttributeDescriptor::getDynamic(attrDescriptor)) { if (!AttributeDescriptor::getDynamic(attrDescriptor)) {
if ((AttributeDescriptor::getArrayType(attrDescriptor) == ZNON_ARRAY) || if ((AttributeDescriptor::getArrayType(attrDescriptor) == ZNON_ARRAY) ||
(AttributeDescriptor::getArrayType(attrDescriptor) == ZFIXED_ARRAY)) { (AttributeDescriptor::getArrayType(attrDescriptor) == ZFIXED_ARRAY)) {
...@@ -54,6 +55,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr) ...@@ -54,6 +55,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
} else { } else {
ndbrequire(false); ndbrequire(false);
}//if }//if
// replace read function of char attribute
if (AttributeOffset::getCharsetFlag(attrOffset)) {
ljam();
regTabPtr->readFunctionArray[i] = &Dbtup::readCharNotNULL;
}
} else { } else {
if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1) { if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1) {
ljam(); ljam();
...@@ -72,6 +78,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr) ...@@ -72,6 +78,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHZeroWordNULLable; regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHZeroWordNULLable;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable; regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable;
}//if }//if
// replace read function of char attribute
if (AttributeOffset::getCharsetFlag(attrOffset)) {
ljam();
regTabPtr->readFunctionArray[i] = &Dbtup::readCharNULLable;
}
}//if }//if
} else if (AttributeDescriptor::getArrayType(attrDescriptor) == ZVAR_ARRAY) { } else if (AttributeDescriptor::getArrayType(attrDescriptor) == ZVAR_ARRAY) {
if (!AttributeDescriptor::getNullable(attrDescriptor)) { if (!AttributeDescriptor::getNullable(attrDescriptor)) {
...@@ -149,7 +160,8 @@ int Dbtup::readAttributes(Page* const pagePtr, ...@@ -149,7 +160,8 @@ int Dbtup::readAttributes(Page* const pagePtr,
const Uint32* inBuffer, const Uint32* inBuffer,
Uint32 inBufLen, Uint32 inBufLen,
Uint32* outBuffer, Uint32* outBuffer,
Uint32 maxRead) Uint32 maxRead,
bool xfrmFlag)
{ {
Tablerec* const regTabPtr = tabptr.p; Tablerec* const regTabPtr = tabptr.p;
Uint32 numAttributes = regTabPtr->noOfAttr; Uint32 numAttributes = regTabPtr->noOfAttr;
...@@ -162,6 +174,7 @@ int Dbtup::readAttributes(Page* const pagePtr, ...@@ -162,6 +174,7 @@ int Dbtup::readAttributes(Page* const pagePtr,
tCheckOffset = regTabPtr->tupheadsize; tCheckOffset = regTabPtr->tupheadsize;
tMaxRead = maxRead; tMaxRead = maxRead;
tTupleHeader = &pagePtr->pageWord[tupHeadOffset]; tTupleHeader = &pagePtr->pageWord[tupHeadOffset];
tXfrmFlag = xfrmFlag;
ndbrequire(tupHeadOffset + tCheckOffset <= ZWORDS_ON_PAGE); ndbrequire(tupHeadOffset + tCheckOffset <= ZWORDS_ON_PAGE);
while (inBufIndex < inBufLen) { while (inBufIndex < inBufLen) {
...@@ -542,6 +555,74 @@ Dbtup::readDynSmallVarSize(Uint32* outBuffer, ...@@ -542,6 +555,74 @@ Dbtup::readDynSmallVarSize(Uint32* outBuffer,
return false; return false;
}//Dbtup::readDynSmallVarSize() }//Dbtup::readDynSmallVarSize()
bool
Dbtup::readCharNotNULL(Uint32* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
Uint32 indexBuf = tOutBufIndex;
Uint32 readOffset = AttributeOffset::getOffset(attrDes2);
Uint32 attrNoOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor);
Uint32 newIndexBuf = indexBuf + attrNoOfWords;
Uint32 maxRead = tMaxRead;
ndbrequire((readOffset + attrNoOfWords - 1) < tCheckOffset);
if (newIndexBuf <= maxRead) {
ljam();
ahOut->setDataSize(attrNoOfWords);
if (! tXfrmFlag) {
MEMCOPY_NO_WORDS(&outBuffer[indexBuf],
&tTupleHeader[readOffset],
attrNoOfWords);
} else {
ljam();
Tablerec* regTabPtr = tabptr.p;
Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
ndbrequire(i < tabptr.p->noOfCharsets);
// not const in MySQL
CHARSET_INFO* cs = tabptr.p->charsetArray[i];
// XXX should strip Uint32 null padding
const unsigned nBytes = attrNoOfWords << 2;
unsigned n =
(*cs->coll->strnxfrm)(cs,
(uchar*)&outBuffer[indexBuf],
nBytes,
(const uchar*)&tTupleHeader[readOffset],
nBytes);
// pad with ascii spaces
while (n < nBytes)
((uchar*)&outBuffer[indexBuf])[n++] = 0x20;
}
tOutBufIndex = newIndexBuf;
return true;
} else {
ljam();
terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
return false;
}
}
bool
Dbtup::readCharNULLable(Uint32* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
if (!nullFlagCheck(attrDes2)) {
ljam();
return readCharNotNULL(outBuffer,
ahOut,
attrDescriptor,
attrDes2);
} else {
ljam();
ahOut->setNULL();
return true;
}
}
/* ---------------------------------------------------------------------- */ /* ---------------------------------------------------------------------- */
/* THIS ROUTINE IS USED TO UPDATE A NUMBER OF ATTRIBUTES. IT IS */ /* THIS ROUTINE IS USED TO UPDATE A NUMBER OF ATTRIBUTES. IT IS */
/* USED BY THE INSERT ROUTINE, THE UPDATE ROUTINE AND IT CAN BE */ /* USED BY THE INSERT ROUTINE, THE UPDATE ROUTINE AND IT CAN BE */
......
...@@ -751,7 +751,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr, ...@@ -751,7 +751,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
&tableDescriptor[regTabPtr->readKeyArray].tabDescr, &tableDescriptor[regTabPtr->readKeyArray].tabDescr,
regTabPtr->noOfKeyAttr, regTabPtr->noOfKeyAttr,
keyBuffer, keyBuffer,
ZATTR_BUFFER_SIZE); ZATTR_BUFFER_SIZE,
true);
ndbrequire(noPrimKey != (Uint32)-1); ndbrequire(noPrimKey != (Uint32)-1);
Uint32 numAttrsToRead; Uint32 numAttrsToRead;
...@@ -792,7 +793,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr, ...@@ -792,7 +793,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
&readBuffer[0], &readBuffer[0],
numAttrsToRead, numAttrsToRead,
mainBuffer, mainBuffer,
ZATTR_BUFFER_SIZE); ZATTR_BUFFER_SIZE,
true);
ndbrequire(noMainWords != (Uint32)-1); ndbrequire(noMainWords != (Uint32)-1);
} else { } else {
ljam(); ljam();
...@@ -816,7 +818,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr, ...@@ -816,7 +818,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
&readBuffer[0], &readBuffer[0],
numAttrsToRead, numAttrsToRead,
copyBuffer, copyBuffer,
ZATTR_BUFFER_SIZE); ZATTR_BUFFER_SIZE,
true);
ndbrequire(noCopyWords != (Uint32)-1); ndbrequire(noCopyWords != (Uint32)-1);
if ((noMainWords == noCopyWords) && if ((noMainWords == noCopyWords) &&
......
...@@ -58,7 +58,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons ...@@ -58,7 +58,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
NdbSqlUtil::Cmp* const cmp = c_sqlCmp[start]; NdbSqlUtil::Cmp* const cmp = c_sqlCmp[start];
const Uint32* const p1 = &searchKey[AttributeHeaderSize]; const Uint32* const p1 = &searchKey[AttributeHeaderSize];
const Uint32* const p2 = &entryData[AttributeHeaderSize]; const Uint32* const p2 = &entryData[AttributeHeaderSize];
ret = (*cmp)(p1, p2, size1, size2); ret = (*cmp)(0, p1, p2, size1, size2);
if (ret != 0) { if (ret != 0) {
jam(); jam();
break; break;
...@@ -132,6 +132,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne ...@@ -132,6 +132,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
jam(); jam();
// current attribute // current attribute
const unsigned index = boundInfo.ah().getAttributeId(); const unsigned index = boundInfo.ah().getAttributeId();
ndbrequire(index < frag.m_numAttrs);
const DescAttr& descAttr = descEnt.m_descAttr[index]; const DescAttr& descAttr = descEnt.m_descAttr[index];
ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId); ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
// full data size // full data size
...@@ -143,7 +144,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne ...@@ -143,7 +144,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
NdbSqlUtil::Cmp* const cmp = c_sqlCmp[index]; NdbSqlUtil::Cmp* const cmp = c_sqlCmp[index];
const Uint32* const p1 = &boundInfo[AttributeHeaderSize]; const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
const Uint32* const p2 = &entryData[AttributeHeaderSize]; const Uint32* const p2 = &entryData[AttributeHeaderSize];
int ret = (*cmp)(p1, p2, size1, size2); int ret = (*cmp)(0, p1, p2, size1, size2);
if (ret != 0) { if (ret != 0) {
jam(); jam();
return ret; return ret;
......
...@@ -201,8 +201,8 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) ...@@ -201,8 +201,8 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
// allocate buffers // allocate buffers
c_keyAttrs = (Uint32*)allocRecord("c_keyAttrs", sizeof(Uint32), MaxIndexAttributes); c_keyAttrs = (Uint32*)allocRecord("c_keyAttrs", sizeof(Uint32), MaxIndexAttributes);
c_sqlCmp = (NdbSqlUtil::Cmp**)allocRecord("c_sqlCmp", sizeof(NdbSqlUtil::Cmp*), MaxIndexAttributes); c_sqlCmp = (NdbSqlUtil::Cmp**)allocRecord("c_sqlCmp", sizeof(NdbSqlUtil::Cmp*), MaxIndexAttributes);
c_searchKey = (Uint32*)allocRecord("c_searchKey", sizeof(Uint32*), MaxIndexAttributes); c_searchKey = (Uint32*)allocRecord("c_searchKey", sizeof(Uint32), MaxAttrDataSize);
c_entryKey = (Uint32*)allocRecord("c_entryKey", sizeof(Uint32*), MaxIndexAttributes); c_entryKey = (Uint32*)allocRecord("c_entryKey", sizeof(Uint32), MaxAttrDataSize);
c_dataBuffer = (Uint32*)allocRecord("c_dataBuffer", sizeof(Uint64), (MaxAttrDataSize + 1) >> 1); c_dataBuffer = (Uint32*)allocRecord("c_dataBuffer", sizeof(Uint64), (MaxAttrDataSize + 1) >> 1);
// ack // ack
ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend(); ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
......
...@@ -112,6 +112,7 @@ Dbtux::execACC_SCANREQ(Signal* signal) ...@@ -112,6 +112,7 @@ Dbtux::execACC_SCANREQ(Signal* signal)
void void
Dbtux::execTUX_BOUND_INFO(Signal* signal) Dbtux::execTUX_BOUND_INFO(Signal* signal)
{ {
jamEntry();
struct BoundInfo { struct BoundInfo {
unsigned offset; unsigned offset;
unsigned size; unsigned size;
......
...@@ -83,7 +83,7 @@ optim 13 mc02/a 39 ms 59 ms 50 pct ...@@ -83,7 +83,7 @@ optim 13 mc02/a 39 ms 59 ms 50 pct
mc02/c 9 ms 12 ms 44 pct mc02/c 9 ms 12 ms 44 pct
mc02/d 246 ms 289 ms 17 pct mc02/d 246 ms 289 ms 17 pct
[ case d: what happened to PK read performance? ] [ case d: bug in testOIBasic killed PK read performance ]
optim 14 mc02/a 41 ms 60 ms 44 pct optim 14 mc02/a 41 ms 60 ms 44 pct
mc02/b 46 ms 81 ms 73 pct mc02/b 46 ms 81 ms 73 pct
...@@ -91,5 +91,21 @@ optim 14 mc02/a 41 ms 60 ms 44 pct ...@@ -91,5 +91,21 @@ optim 14 mc02/a 41 ms 60 ms 44 pct
mc02/d 242 ms 285 ms 17 pct mc02/d 242 ms 285 ms 17 pct
[ case b: do long keys suffer from many subroutine calls? ] [ case b: do long keys suffer from many subroutine calls? ]
[ case d: bug in testOIBasic killed PK read performance ]
none mc02/a 35 ms 60 ms 71 pct
mc02/b 42 ms 75 ms 76 pct
mc02/c 5 ms 12 ms 106 pct
mc02/d 165 ms 238 ms 44 pct
[ johan re-installed mc02 as fedora gcc-3.3.2 ]
[ case c: table scan has improved... ]
charsets mc02/a 35 ms 60 ms 71 pct
mc02/b 42 ms 84 ms 97 pct
mc02/c 5 ms 12 ms 109 pct
mc02/d 190 ms 236 ms 23 pct
[ case b: TUX can no longer use pointers to TUP data ]
vim: set et: vim: set et:
...@@ -164,6 +164,7 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo, ...@@ -164,6 +164,7 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
Uint32 tData; Uint32 tData;
Uint32 tKeyInfoPosition; Uint32 tKeyInfoPosition;
const char* aValue = aValuePassed; const char* aValue = aValuePassed;
Uint32 xfrmData[1024];
Uint32 tempData[1024]; Uint32 tempData[1024];
if ((theStatus == OperationDefined) && if ((theStatus == OperationDefined) &&
...@@ -224,6 +225,21 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo, ...@@ -224,6 +225,21 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
m_theIndexDefined[i][2] = true; m_theIndexDefined[i][2] = true;
Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
const char* aValueToWrite = aValue;
CHARSET_INFO* cs = tAttrInfo->m_cs;
if (cs != 0) {
// current limitation: strxfrm does not increase length
assert(cs->strxfrm_multiply == 1);
unsigned n =
(*cs->coll->strnxfrm)(cs,
(uchar*)xfrmData, sizeof(xfrmData),
(const uchar*)aValue, sizeInBytes);
while (n < sizeInBytes)
((uchar*)xfrmData)[n++] = 0x20;
aValue = (char*)xfrmData;
}
Uint32 bitsInLastWord = 8 * (sizeInBytes & 3) ; Uint32 bitsInLastWord = 8 * (sizeInBytes & 3) ;
Uint32 totalSizeInWords = (sizeInBytes + 3)/4;// Inc. bits in last word Uint32 totalSizeInWords = (sizeInBytes + 3)/4;// Inc. bits in last word
Uint32 sizeInWords = sizeInBytes / 4; // Exc. bits in last word Uint32 sizeInWords = sizeInBytes / 4; // Exc. bits in last word
...@@ -314,13 +330,20 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo, ...@@ -314,13 +330,20 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
if ((tOpType == InsertRequest) || if ((tOpType == InsertRequest) ||
(tOpType == WriteRequest)) { (tOpType == WriteRequest)) {
if (!tAttrInfo->m_indexOnly){ if (!tAttrInfo->m_indexOnly){
// invalid data can crash kernel
if (cs != NULL &&
(*cs->cset->well_formed_len)(cs,
aValueToWrite,
aValueToWrite + sizeInBytes,
sizeInBytes) != sizeInBytes)
goto equal_error4;
Uint32 ahValue; Uint32 ahValue;
Uint32 sz = totalSizeInWords; Uint32 sz = totalSizeInWords;
AttributeHeader::init(&ahValue, tAttrId, sz); AttributeHeader::init(&ahValue, tAttrId, sz);
insertATTRINFO( ahValue ); insertATTRINFO( ahValue );
insertATTRINFOloop((Uint32*)aValue, sizeInWords); insertATTRINFOloop((Uint32*)aValueToWrite, sizeInWords);
if (bitsInLastWord != 0) { if (bitsInLastWord != 0) {
tData = *(Uint32*)(aValue + (sizeInWords << 2)); tData = *(Uint32*)(aValueToWrite + (sizeInWords << 2));
tData = convertEndian(tData); tData = convertEndian(tData);
tData = tData & ((1 << bitsInLastWord) - 1); tData = tData & ((1 << bitsInLastWord) - 1);
tData = convertEndian(tData); tData = convertEndian(tData);
...@@ -411,7 +434,10 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo, ...@@ -411,7 +434,10 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
equal_error3: equal_error3:
setErrorCodeAbort(4209); setErrorCodeAbort(4209);
return -1;
equal_error4:
setErrorCodeAbort(744);
return -1; return -1;
} }
......
...@@ -492,6 +492,17 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, ...@@ -492,6 +492,17 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
// Insert Attribute Id into ATTRINFO part. // Insert Attribute Id into ATTRINFO part.
const Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; const Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
CHARSET_INFO* cs = tAttrInfo->m_cs;
// invalid data can crash kernel
if (cs != NULL &&
(*cs->cset->well_formed_len)(cs,
aValue,
aValue + sizeInBytes,
sizeInBytes) != sizeInBytes) {
setErrorCodeAbort(744);
return -1;
}
#if 0 #if 0
tAttrSize = tAttrInfo->theAttrSize; tAttrSize = tAttrInfo->theAttrSize;
tArraySize = tAttrInfo->theArraySize; tArraySize = tAttrInfo->theArraySize;
......
...@@ -60,6 +60,7 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, ...@@ -60,6 +60,7 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
Uint32 tData; Uint32 tData;
Uint32 tKeyInfoPosition; Uint32 tKeyInfoPosition;
const char* aValue = aValuePassed; const char* aValue = aValuePassed;
Uint32 xfrmData[1024];
Uint32 tempData[1024]; Uint32 tempData[1024];
if ((theStatus == OperationDefined) && if ((theStatus == OperationDefined) &&
...@@ -117,6 +118,21 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, ...@@ -117,6 +118,21 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
theTupleKeyDefined[i][2] = true; theTupleKeyDefined[i][2] = true;
Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
const char* aValueToWrite = aValue;
CHARSET_INFO* cs = tAttrInfo->m_cs;
if (cs != 0) {
// current limitation: strxfrm does not increase length
assert(cs->strxfrm_multiply == 1);
unsigned n =
(*cs->coll->strnxfrm)(cs,
(uchar*)xfrmData, sizeof(xfrmData),
(const uchar*)aValue, sizeInBytes);
while (n < sizeInBytes)
((uchar*)xfrmData)[n++] = 0x20;
aValue = (char*)xfrmData;
}
Uint32 bitsInLastWord = 8 * (sizeInBytes & 3) ; Uint32 bitsInLastWord = 8 * (sizeInBytes & 3) ;
Uint32 totalSizeInWords = (sizeInBytes + 3)/4; // Inc. bits in last word Uint32 totalSizeInWords = (sizeInBytes + 3)/4; // Inc. bits in last word
Uint32 sizeInWords = sizeInBytes / 4; // Exc. bits in last word Uint32 sizeInWords = sizeInBytes / 4; // Exc. bits in last word
...@@ -206,13 +222,20 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, ...@@ -206,13 +222,20 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
if ((tOpType == InsertRequest) || if ((tOpType == InsertRequest) ||
(tOpType == WriteRequest)) { (tOpType == WriteRequest)) {
if (!tAttrInfo->m_indexOnly){ if (!tAttrInfo->m_indexOnly){
// invalid data can crash kernel
if (cs != NULL &&
(*cs->cset->well_formed_len)(cs,
aValueToWrite,
aValueToWrite + sizeInBytes,
sizeInBytes) != sizeInBytes)
goto equal_error4;
Uint32 ahValue; Uint32 ahValue;
const Uint32 sz = totalSizeInWords; const Uint32 sz = totalSizeInWords;
AttributeHeader::init(&ahValue, tAttrId, sz); AttributeHeader::init(&ahValue, tAttrId, sz);
insertATTRINFO( ahValue ); insertATTRINFO( ahValue );
insertATTRINFOloop((Uint32*)aValue, sizeInWords); insertATTRINFOloop((Uint32*)aValueToWrite, sizeInWords);
if (bitsInLastWord != 0) { if (bitsInLastWord != 0) {
tData = *(Uint32*)(aValue + (sizeInWords << 2)); tData = *(Uint32*)(aValueToWrite + (sizeInWords << 2));
tData = convertEndian(tData); tData = convertEndian(tData);
tData = tData & ((1 << bitsInLastWord) - 1); tData = tData & ((1 << bitsInLastWord) - 1);
tData = convertEndian(tData); tData = convertEndian(tData);
...@@ -311,6 +334,10 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, ...@@ -311,6 +334,10 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
equal_error3: equal_error3:
setErrorCodeAbort(4209); setErrorCodeAbort(4209);
return -1; return -1;
equal_error4:
setErrorCodeAbort(744);
return -1;
} }
/****************************************************************************** /******************************************************************************
......
...@@ -1096,30 +1096,43 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, ...@@ -1096,30 +1096,43 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
theStatus == SetBound && theStatus == SetBound &&
(0 <= type && type <= 4) && (0 <= type && type <= 4) &&
len <= 8000) { len <= 8000) {
// bound type // insert bound type
insertATTRINFO(type); insertATTRINFO(type);
// attribute header
Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
// normalize char bound
CHARSET_INFO* cs = tAttrInfo->m_cs;
Uint32 xfrmData[2000];
if (cs != NULL && aValue != NULL) {
// current limitation: strxfrm does not increase length
assert(cs->strxfrm_multiply == 1);
unsigned n =
(*cs->coll->strnxfrm)(cs,
(uchar*)xfrmData, sizeof(xfrmData),
(const uchar*)aValue, sizeInBytes);
while (n < sizeInBytes)
((uchar*)xfrmData)[n++] = 0x20;
aValue = (char*)xfrmData;
}
if (len != sizeInBytes && (len != 0)) { if (len != sizeInBytes && (len != 0)) {
setErrorCodeAbort(4209); setErrorCodeAbort(4209);
return -1; return -1;
} }
// insert attribute header
len = aValue != NULL ? sizeInBytes : 0; len = aValue != NULL ? sizeInBytes : 0;
Uint32 tIndexAttrId = tAttrInfo->m_attrId; Uint32 tIndexAttrId = tAttrInfo->m_attrId;
Uint32 sizeInWords = (len + 3) / 4; Uint32 sizeInWords = (len + 3) / 4;
AttributeHeader ah(tIndexAttrId, sizeInWords); AttributeHeader ah(tIndexAttrId, sizeInWords);
insertATTRINFO(ah.m_value); insertATTRINFO(ah.m_value);
if (len != 0) { if (len != 0) {
// attribute data // insert attribute data
if ((UintPtr(aValue) & 0x3) == 0 && (len & 0x3) == 0) if ((UintPtr(aValue) & 0x3) == 0 && (len & 0x3) == 0)
insertATTRINFOloop((const Uint32*)aValue, sizeInWords); insertATTRINFOloop((const Uint32*)aValue, sizeInWords);
else { else {
Uint32 temp[2000]; Uint32 tempData[2000];
memcpy(temp, aValue, len); memcpy(tempData, aValue, len);
while ((len & 0x3) != 0) while ((len & 0x3) != 0)
((char*)temp)[len++] = 0; ((char*)tempData)[len++] = 0;
insertATTRINFOloop(temp, sizeInWords); insertATTRINFOloop(tempData, sizeInWords);
} }
} }
...@@ -1206,11 +1219,11 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, ...@@ -1206,11 +1219,11 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
if((r1_null ^ (unsigned)r2->isNULL())){ if((r1_null ^ (unsigned)r2->isNULL())){
return (r1_null ? -1 : 1); return (r1_null ? -1 : 1);
} }
Uint32 type = NdbColumnImpl::getImpl(* r1->m_column).m_extType; const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);
Uint32 size = (r1->theAttrSize * r1->theArraySize + 3) / 4; Uint32 size = (r1->theAttrSize * r1->theArraySize + 3) / 4;
if(!r1_null){ if(!r1_null){
const NdbSqlUtil::Type& t = NdbSqlUtil::getType(type); const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_extType);
int r = (*t.m_cmp)(d1, d2, size, size); int r = (*sqlType.m_cmp)(col.m_cs, d1, d2, size, size);
if(r){ if(r){
assert(r != NdbSqlUtil::CmpUnknown); assert(r != NdbSqlUtil::CmpUnknown);
return r; return r;
......
...@@ -282,7 +282,7 @@ ErrorBundle ErrorCodes[] = { ...@@ -282,7 +282,7 @@ ErrorBundle ErrorCodes[] = {
{ 741, SE, "Unsupported alter table" }, { 741, SE, "Unsupported alter table" },
{ 742, SE, "Unsupported attribute type in index" }, { 742, SE, "Unsupported attribute type in index" },
{ 743, SE, "Unsupported character set in table or index" }, { 743, SE, "Unsupported character set in table or index" },
{ 744, SE, "Character conversion error" }, { 744, SE, "Character string is invalid for given character set" },
{ 241, SE, "Invalid schema object version" }, { 241, SE, "Invalid schema object version" },
{ 283, SE, "Table is being dropped" }, { 283, SE, "Table is being dropped" },
{ 284, SE, "Table not defined in transaction coordinator" }, { 284, SE, "Table not defined in transaction coordinator" },
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include <NdbCondition.h> #include <NdbCondition.h>
#include <NdbThread.h> #include <NdbThread.h>
#include <NdbTick.h> #include <NdbTick.h>
#include <my_sys.h>
// options // options
...@@ -37,6 +38,8 @@ struct Opt { ...@@ -37,6 +38,8 @@ struct Opt {
const char* m_bound; const char* m_bound;
const char* m_case; const char* m_case;
bool m_core; bool m_core;
const char* m_csname;
CHARSET_INFO* m_cs;
bool m_dups; bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype; NdbDictionary::Object::FragmentType m_fragtype;
unsigned m_idxloop; unsigned m_idxloop;
...@@ -59,6 +62,8 @@ struct Opt { ...@@ -59,6 +62,8 @@ struct Opt {
m_bound("01234"), m_bound("01234"),
m_case(0), m_case(0),
m_core(false), m_core(false),
m_csname("latin1_bin"),
m_cs(0),
m_dups(false), m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined), m_fragtype(NdbDictionary::Object::FragUndefined),
m_idxloop(4), m_idxloop(4),
...@@ -94,6 +99,7 @@ printhelp() ...@@ -94,6 +99,7 @@ printhelp()
<< " -bound xyz use only these bound types 0-4 [" << d.m_bound << "]" << endl << " -bound xyz use only these bound types 0-4 [" << d.m_bound << "]" << endl
<< " -case abc only given test cases (letters a-z)" << endl << " -case abc only given test cases (letters a-z)" << endl
<< " -core core dump on error [" << d.m_core << "]" << endl << " -core core dump on error [" << d.m_core << "]" << endl
<< " -csname S charset (collation) of non-pk char column [" << d.m_csname << "]" << endl
<< " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl << " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl
<< " -fragtype T fragment type single/small/medium/large" << endl << " -fragtype T fragment type single/small/medium/large" << endl
<< " -index xyz only given index numbers (digits 1-9)" << endl << " -index xyz only given index numbers (digits 1-9)" << endl
...@@ -983,6 +989,10 @@ createtable(Par par) ...@@ -983,6 +989,10 @@ createtable(Par par)
c.setLength(col.m_length); c.setLength(col.m_length);
c.setPrimaryKey(col.m_pk); c.setPrimaryKey(col.m_pk);
c.setNullable(col.m_nullable); c.setNullable(col.m_nullable);
if (c.getCharset()) { // test if char type
if (! col.m_pk)
c.setCharset(par.m_cs);
}
t.addColumn(c); t.addColumn(c);
} }
con.m_dic = con.m_ndb->getDictionary(); con.m_dic = con.m_ndb->getDictionary();
...@@ -3149,6 +3159,10 @@ runtest(Par par) ...@@ -3149,6 +3159,10 @@ runtest(Par par)
LL1("start"); LL1("start");
if (par.m_seed != 0) if (par.m_seed != 0)
srandom(par.m_seed); srandom(par.m_seed);
assert(par.m_csname != 0);
CHARSET_INFO* cs;
CHK((cs = get_charset_by_name(par.m_csname, MYF(0))) != 0 || (cs = get_charset_by_csname(par.m_csname, MY_CS_PRIMARY, MYF(0))) != 0);
par.m_cs = cs;
Con con; Con con;
CHK(con.connect() == 0); CHK(con.connect() == 0);
par.m_con = &con; par.m_con = &con;
...@@ -3232,6 +3246,12 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535) ...@@ -3232,6 +3246,12 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
g_opt.m_core = true; g_opt.m_core = true;
continue; continue;
} }
if (strcmp(arg, "-csname") == 0) {
if (++argv, --argc > 0) {
g_opt.m_csname = strdup(argv[0]);
continue;
}
}
if (strcmp(arg, "-dups") == 0) { if (strcmp(arg, "-dups") == 0) {
g_opt.m_dups = true; g_opt.m_dups = true;
continue; continue;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment