Commit e274047c authored by pekka@mysql.com's avatar pekka@mysql.com

Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb

into mysql.com:/space/pekka/ndb/version/my41-cc
parents f21e6d81 d02998e2
drop table if exists t1;
create table t1 (
a char(3) character set latin1 collate latin1_bin primary key
) engine=ndb;
insert into t1 values('aAa');
insert into t1 values('aaa');
insert into t1 values('AAA');
select * from t1 order by a;
a
AAA
aAa
aaa
select * from t1 where a = 'aAa';
a
aAa
select * from t1 where a = 'aaa';
a
aaa
select * from t1 where a = 'AaA';
a
select * from t1 where a = 'AAA';
a
AAA
drop table t1;
create table t1 (
a char(3) character set latin1 collate latin1_swedish_ci primary key
) engine=ndb;
insert into t1 values('aAa');
insert into t1 values('aaa');
ERROR 23000: Duplicate entry 'aaa' for key 1
insert into t1 values('AAA');
ERROR 23000: Duplicate entry 'AAA' for key 1
select * from t1 order by a;
a
aAa
select * from t1 where a = 'aAa';
a
aAa
select * from t1 where a = 'aaa';
a
aAa
select * from t1 where a = 'AaA';
a
aAa
select * from t1 where a = 'AAA';
a
aAa
drop table t1;
create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_bin not null,
unique key(a)
) engine=ndb;
insert into t1 values(1, 'aAa');
insert into t1 values(2, 'aaa');
insert into t1 values(3, 'AAA');
select * from t1 order by p;
p a
1 aAa
2 aaa
3 AAA
select * from t1 where a = 'aAa';
p a
1 aAa
select * from t1 where a = 'aaa';
p a
2 aaa
select * from t1 where a = 'AaA';
p a
select * from t1 where a = 'AAA';
p a
3 AAA
drop table t1;
create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_swedish_ci not null,
unique key(a)
) engine=ndb;
insert into t1 values(1, 'aAa');
insert into t1 values(2, 'aaa');
ERROR 23000: Can't write, because of unique constraint, to table 't1'
insert into t1 values(3, 'AAA');
ERROR 23000: Can't write, because of unique constraint, to table 't1'
select * from t1 order by p;
p a
1 aAa
select * from t1 where a = 'aAa';
p a
1 aAa
select * from t1 where a = 'aaa';
p a
1 aAa
select * from t1 where a = 'AaA';
p a
1 aAa
select * from t1 where a = 'AAA';
p a
1 aAa
drop table t1;
create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_bin not null,
index(a)
) engine=ndb;
insert into t1 values(1, 'aAa');
insert into t1 values(2, 'aaa');
insert into t1 values(3, 'AAA');
insert into t1 values(4, 'aAa');
insert into t1 values(5, 'aaa');
insert into t1 values(6, 'AAA');
select * from t1 order by p;
p a
1 aAa
2 aaa
3 AAA
4 aAa
5 aaa
6 AAA
explain select * from t1 where a = 'zZz' order by p;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ref a a 3 const 10 Using where; Using filesort
select * from t1 where a = 'aAa' order by p;
p a
1 aAa
4 aAa
select * from t1 where a = 'aaa' order by p;
p a
2 aaa
5 aaa
select * from t1 where a = 'AaA' order by p;
p a
select * from t1 where a = 'AAA' order by p;
p a
3 AAA
6 AAA
drop table t1;
create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_swedish_ci not null,
index(a)
) engine=ndb;
insert into t1 values(1, 'aAa');
insert into t1 values(2, 'aaa');
insert into t1 values(3, 'AAA');
insert into t1 values(4, 'aAa');
insert into t1 values(5, 'aaa');
insert into t1 values(6, 'AAA');
select * from t1 order by p;
p a
1 aAa
2 aaa
3 AAA
4 aAa
5 aaa
6 AAA
explain select * from t1 where a = 'zZz' order by p;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ref a a 3 const 10 Using where; Using filesort
select * from t1 where a = 'aAa' order by p;
p a
1 aAa
2 aaa
3 AAA
4 aAa
5 aaa
6 AAA
select * from t1 where a = 'aaa' order by p;
p a
1 aAa
2 aaa
3 AAA
4 aAa
5 aaa
6 AAA
select * from t1 where a = 'AaA' order by p;
p a
1 aAa
2 aaa
3 AAA
4 aAa
5 aaa
6 AAA
select * from t1 where a = 'AAA' order by p;
p a
1 aAa
2 aaa
3 AAA
4 aAa
5 aaa
6 AAA
drop table t1;
......@@ -4,7 +4,7 @@ PORT varchar(16) NOT NULL,
ACCESSNODE varchar(16) NOT NULL,
POP varchar(48) NOT NULL,
ACCESSTYPE int unsigned NOT NULL,
CUSTOMER_ID varchar(20) NOT NULL,
CUSTOMER_ID varchar(20) collate latin1_bin NOT NULL,
PROVIDER varchar(16),
TEXPIRE int unsigned,
NUM_IP int unsigned,
......
--source include/have_ndb.inc
--disable_warnings
drop table if exists t1;
--enable_warnings
#
# Minimal NDB charset test.
#
# pk - binary
create table t1 (
a char(3) character set latin1 collate latin1_bin primary key
) engine=ndb;
# ok
insert into t1 values('aAa');
insert into t1 values('aaa');
insert into t1 values('AAA');
# 3
select * from t1 order by a;
# 1
select * from t1 where a = 'aAa';
# 1
select * from t1 where a = 'aaa';
# 0
select * from t1 where a = 'AaA';
# 1
select * from t1 where a = 'AAA';
drop table t1;
# pk - case insensitive
create table t1 (
a char(3) character set latin1 collate latin1_swedish_ci primary key
) engine=ndb;
# ok
insert into t1 values('aAa');
# fail
--error 1062
insert into t1 values('aaa');
--error 1062
insert into t1 values('AAA');
# 1
select * from t1 order by a;
# 1
select * from t1 where a = 'aAa';
# 1
select * from t1 where a = 'aaa';
# 1
select * from t1 where a = 'AaA';
# 1
select * from t1 where a = 'AAA';
drop table t1;
# unique hash index - binary
create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_bin not null,
unique key(a)
) engine=ndb;
# ok
insert into t1 values(1, 'aAa');
insert into t1 values(2, 'aaa');
insert into t1 values(3, 'AAA');
# 3
select * from t1 order by p;
# 1
select * from t1 where a = 'aAa';
# 1
select * from t1 where a = 'aaa';
# 0
select * from t1 where a = 'AaA';
# 1
select * from t1 where a = 'AAA';
drop table t1;
# unique hash index - case insensitive
create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_swedish_ci not null,
unique key(a)
) engine=ndb;
# ok
insert into t1 values(1, 'aAa');
# fail
--error 1169
insert into t1 values(2, 'aaa');
--error 1169
insert into t1 values(3, 'AAA');
# 1
select * from t1 order by p;
# 1
select * from t1 where a = 'aAa';
# 1
select * from t1 where a = 'aaa';
# 1
select * from t1 where a = 'AaA';
# 1
select * from t1 where a = 'AAA';
drop table t1;
# ordered index - binary
create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_bin not null,
index(a)
) engine=ndb;
# ok
insert into t1 values(1, 'aAa');
insert into t1 values(2, 'aaa');
insert into t1 values(3, 'AAA');
insert into t1 values(4, 'aAa');
insert into t1 values(5, 'aaa');
insert into t1 values(6, 'AAA');
# 6
select * from t1 order by p;
# plan
explain select * from t1 where a = 'zZz' order by p;
# 2
select * from t1 where a = 'aAa' order by p;
# 2
select * from t1 where a = 'aaa' order by p;
# 0
select * from t1 where a = 'AaA' order by p;
# 2
select * from t1 where a = 'AAA' order by p;
drop table t1;
# ordered index - case insensitive
create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_swedish_ci not null,
index(a)
) engine=ndb;
# ok
insert into t1 values(1, 'aAa');
insert into t1 values(2, 'aaa');
insert into t1 values(3, 'AAA');
insert into t1 values(4, 'aAa');
insert into t1 values(5, 'aaa');
insert into t1 values(6, 'AAA');
# 6
select * from t1 order by p;
# plan
explain select * from t1 where a = 'zZz' order by p;
# 6
select * from t1 where a = 'aAa' order by p;
# 6
select * from t1 where a = 'aaa' order by p;
# 6
select * from t1 where a = 'AaA' order by p;
# 6
select * from t1 where a = 'AAA' order by p;
drop table t1;
......@@ -9,7 +9,7 @@ CREATE TABLE t1 (
ACCESSNODE varchar(16) NOT NULL,
POP varchar(48) NOT NULL,
ACCESSTYPE int unsigned NOT NULL,
CUSTOMER_ID varchar(20) NOT NULL,
CUSTOMER_ID varchar(20) collate latin1_bin NOT NULL,
PROVIDER varchar(16),
TEXPIRE int unsigned,
NUM_IP int unsigned,
......
......@@ -89,7 +89,8 @@ public:
ArraySizeTooBig = 737,
RecordTooBig = 738,
InvalidPrimaryKeySize = 739,
NullablePrimaryKey = 740
NullablePrimaryKey = 740,
InvalidCharset = 743
};
private:
......
......@@ -130,7 +130,7 @@ private:
Uint32 keyLength;
Uint32 nextLCP;
Uint32 noOfKeyAttr;
Uint32 noOfNewAttr;
Uint32 noOfNewAttr; // noOfCharsets in upper half
Uint32 checksumIndicator;
Uint32 noOfAttributeGroups;
Uint32 GCPIndicator;
......
......@@ -119,12 +119,13 @@ class TupAddAttrReq {
friend class Dblqh;
friend class Dbtux;
public:
STATIC_CONST( SignalLength = 4 );
STATIC_CONST( SignalLength = 5 );
private:
Uint32 tupConnectPtr;
Uint32 notused1;
Uint32 attrId;
Uint32 attrDescriptor;
Uint32 extTypeInfo;
};
class TupAddAttrConf {
......@@ -141,6 +142,10 @@ class TupAddAttrRef {
friend class Dbtup;
public:
STATIC_CONST( SignalLength = 2 );
enum ErrorCode {
NoError = 0,
InvalidCharset = 743
};
private:
Uint32 userPtr;
Uint32 errorCode;
......@@ -178,7 +183,8 @@ public:
STATIC_CONST( SignalLength = 2 );
enum ErrorCode {
NoError = 0,
InvalidAttributeType = 831,
InvalidAttributeType = 742,
InvalidCharset = 743,
InvalidNodeSize = 832
};
private:
......
......@@ -40,11 +40,14 @@ public:
* Compare kernel attribute values. Returns -1, 0, +1 for less,
* equal, greater, respectively. Parameters are pointers to values,
* full attribute size in words, and size of available data in words.
* There is also pointer to type specific extra info. Char types
* receive CHARSET_INFO in it.
*
* If available size is less than full size, CmpUnknown may be
* returned. If a value cannot be parsed, it compares like NULL i.e.
* less than any valid value.
*/
typedef int Cmp(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size);
typedef int Cmp(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size);
enum CmpResult {
CmpLess = -1,
......@@ -55,6 +58,7 @@ public:
/**
* Kernel data types. Must match m_typeList in NdbSqlUtil.cpp.
* Now also must match types in NdbDictionary.
*/
struct Type {
enum Enum {
......@@ -90,6 +94,11 @@ public:
*/
static const Type& getType(Uint32 typeId);
/**
* Get type by id but replace char type by corresponding binary type.
*/
static const Type& getTypeBinary(Uint32 typeId);
/**
* Check character set.
*/
......
This diff is collapsed.
......@@ -15,6 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <my_sys.h>
#define DBDICT_C
#include "Dbdict.hpp"
......@@ -4100,6 +4101,8 @@ Dbdict::execADD_FRAGREQ(Signal* signal) {
req->noOfKeyAttr = tabPtr.p->noOfPrimkey;
req->noOfNewAttr = 0;
// noOfCharsets passed to TUP in upper half
req->noOfNewAttr |= (tabPtr.p->noOfCharsets << 16);
req->checksumIndicator = 1;
req->noOfAttributeGroups = 1;
req->GCPIndicator = 0;
......@@ -4161,6 +4164,8 @@ Dbdict::sendLQHADDATTRREQ(Signal* signal,
entry.attrId = attrPtr.p->attributeId;
entry.attrDescriptor = attrPtr.p->attributeDescriptor;
entry.extTypeInfo = attrPtr.p->extType;
// charset number passed to TUP, TUX in upper half
entry.extTypeInfo |= (attrPtr.p->extPrecision & ~0xFFFF);
if (tabPtr.p->isIndex()) {
Uint32 primaryAttrId;
if (attrPtr.p->nextAttrInTable != RNIL) {
......@@ -4697,6 +4702,8 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it,
Uint32 keyLength = 0;
Uint32 attrCount = tablePtr.p->noOfAttributes;
Uint32 nullCount = 0;
Uint32 noOfCharsets = 0;
Uint16 charsets[128];
Uint32 recordLength = 0;
AttributeRecordPtr attrPtr;
c_attributeRecordHash.removeAll();
......@@ -4751,6 +4758,31 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it,
attrPtr.p->extPrecision = attrDesc.AttributeExtPrecision;
attrPtr.p->extScale = attrDesc.AttributeExtScale;
attrPtr.p->extLength = attrDesc.AttributeExtLength;
// charset in upper half of precision
unsigned csNumber = (attrPtr.p->extPrecision >> 16);
if (csNumber != 0) {
CHARSET_INFO* cs = get_charset(csNumber, MYF(0));
if (cs == NULL) {
parseP->errorCode = CreateTableRef::InvalidCharset;
parseP->errorLine = __LINE__;
return;
}
unsigned i = 0;
while (i < noOfCharsets) {
if (charsets[i] == csNumber)
break;
i++;
}
if (i == noOfCharsets) {
noOfCharsets++;
if (noOfCharsets > sizeof(charsets)/sizeof(charsets[0])) {
parseP->errorCode = CreateTableRef::InvalidFormat;
parseP->errorLine = __LINE__;
return;
}
charsets[i] = csNumber;
}
}
/**
* Ignore incoming old-style type and recompute it.
......@@ -4814,6 +4846,7 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it,
tablePtr.p->noOfPrimkey = keyCount;
tablePtr.p->noOfNullAttr = nullCount;
tablePtr.p->noOfCharsets = noOfCharsets;
tablePtr.p->tupKeyLength = keyLength;
tabRequire(recordLength<= MAX_TUPLE_SIZE_IN_WORDS,
......
......@@ -455,7 +455,7 @@ public:
Uint16 totalAttrReceived;
Uint16 fragCopyCreation;
Uint16 noOfKeyAttr;
Uint16 noOfNewAttr;
Uint32 noOfNewAttr; // noOfCharsets in upper half
Uint16 noOfAttributeGroups;
Uint16 lh3DistrBits;
Uint16 tableType;
......
......@@ -1444,6 +1444,7 @@ Dblqh::sendAddAttrReq(Signal* signal)
tupreq->notused1 = 0;
tupreq->attrId = attrId;
tupreq->attrDescriptor = entry.attrDescriptor;
tupreq->extTypeInfo = entry.extTypeInfo;
sendSignal(fragptr.p->tupBlockref, GSN_TUP_ADD_ATTRREQ,
signal, TupAddAttrReq::SignalLength, JBB);
return;
......@@ -7699,6 +7700,7 @@ void Dblqh::accScanConfScanLab(Signal* signal)
ndbrequire(sz == boundAiLength);
EXECUTE_DIRECT(DBTUX, GSN_TUX_BOUND_INFO,
signal, TuxBoundInfo::SignalLength + boundAiLength);
jamEntry();
if (req->errorCode != 0) {
jam();
/*
......
......@@ -22,26 +22,59 @@ class AttributeOffset {
private:
static void setOffset(Uint32 & desc, Uint32 offset);
static void setCharsetPos(Uint32 & desc, Uint32 offset);
static void setNullFlagPos(Uint32 & desc, Uint32 offset);
static Uint32 getOffset(const Uint32 &);
static bool getCharsetFlag(const Uint32 &);
static Uint32 getCharsetPos(const Uint32 &);
static Uint32 getNullFlagPos(const Uint32 &);
static Uint32 getNullFlagOffset(const Uint32 &);
static Uint32 getNullFlagBitOffset(const Uint32 &);
static bool isNULL(const Uint32 &, const Uint32 &);
};
#define AO_ATTRIBUTE_OFFSET_MASK (0xffff)
#define AO_NULL_FLAG_POS_MASK (0x7ff)
#define AO_NULL_FLAG_POS_SHIFT (21)
#define AO_NULL_FLAG_WORD_MASK (31)
#define AO_NULL_FLAG_OFFSET_SHIFT (5)
/**
* Allow for 4096 attributes, all nullable, and for 128 different
* character sets.
*
* a = Attribute offset - 11 bits 0-10 ( addr word in 8 kb )
* c = Has charset flag 1 bits 11-11
* s = Charset pointer position - 7 bits 12-18 ( in table descriptor )
* f = Null flag offset in word - 5 bits 20-24 ( address 32 bits )
* w = Null word offset - 7 bits 25-32 ( f+w addr 4096 attrs )
*
* 1111111111222222222233
* 01234567890123456789012345678901
* aaaaaaaaaaacsssssss fffffwwwwwww
*/
#define AO_ATTRIBUTE_OFFSET_SHIFT 0
#define AO_ATTRIBUTE_OFFSET_MASK 0x7ff
#define AO_CHARSET_FLAG_SHIFT 11
#define AO_CHARSET_POS_SHIFT 12
#define AO_CHARSET_POS_MASK 127
#define AO_NULL_FLAG_POS_MASK 0xfff // f+w
#define AO_NULL_FLAG_POS_SHIFT 20
#define AO_NULL_FLAG_WORD_MASK 31 // f
#define AO_NULL_FLAG_OFFSET_SHIFT 5
inline
void
AttributeOffset::setOffset(Uint32 & desc, Uint32 offset){
ASSERT_MAX(offset, AO_ATTRIBUTE_OFFSET_MASK, "AttributeOffset::setOffset");
desc |= offset;
desc |= (offset << AO_ATTRIBUTE_OFFSET_SHIFT);
}
inline
void
AttributeOffset::setCharsetPos(Uint32 & desc, Uint32 offset) {
ASSERT_MAX(offset, AO_CHARSET_POS_MASK, "AttributeOffset::setCharsetPos");
desc |= (1 << AO_CHARSET_FLAG_SHIFT);
desc |= (offset << AO_CHARSET_POS_SHIFT);
}
inline
......@@ -55,7 +88,21 @@ inline
Uint32
AttributeOffset::getOffset(const Uint32 & desc)
{
return desc & AO_ATTRIBUTE_OFFSET_MASK;
return (desc >> AO_ATTRIBUTE_OFFSET_SHIFT) & AO_ATTRIBUTE_OFFSET_MASK;
}
inline
bool
AttributeOffset::getCharsetFlag(const Uint32 & desc)
{
return (desc >> AO_CHARSET_FLAG_SHIFT) & 1;
}
inline
Uint32
AttributeOffset::getCharsetPos(const Uint32 & desc)
{
return (desc >> AO_CHARSET_POS_SHIFT) & AO_CHARSET_POS_MASK;
}
inline
......
......@@ -502,6 +502,7 @@ struct Fragoperrec {
Uint32 attributeCount;
Uint32 freeNullBit;
Uint32 noOfNewAttrCount;
Uint32 charsetIndex;
BlockReference lqhBlockrefFrag;
};
typedef Ptr<Fragoperrec> FragoperrecPtr;
......@@ -785,6 +786,7 @@ struct Tablerec {
ReadFunction* readFunctionArray;
UpdateFunction* updateFunctionArray;
CHARSET_INFO** charsetArray;
Uint32 readKeyArray;
Uint32 tabDescriptor;
......@@ -796,6 +798,7 @@ struct Tablerec {
Uint16 tupheadsize;
Uint16 noOfAttr;
Uint16 noOfKeyAttr;
Uint16 noOfCharsets;
Uint16 noOfNewAttr;
Uint16 noOfNullAttr;
Uint16 noOfAttributeGroups;
......@@ -1001,17 +1004,20 @@ public:
void tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node);
/*
* TUX reads primary table attributes for index keys. Input is
* attribute ids in AttributeHeader format. Output is pointers to
* attribute data within tuple or 0 for NULL value.
* TUX reads primary table attributes for index keys. Tuple is
* specified by location of original tuple and version number. Input
* is attribute ids in AttributeHeader format. Output is attribute
* data with headers. Uses readAttributes with xfrm option set.
* Returns number of words or negative (-terrorCode) on error.
*/
void tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, Uint32 numAttrs, const Uint32* attrIds, const Uint32** attrData);
int tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, const Uint32* attrIds, Uint32 numAttrs, Uint32* dataOut);
/*
* TUX reads primary key without headers into an array of words. Used
* for md5 summing and when returning keyinfo.
* for md5 summing and when returning keyinfo. Returns number of
* words or negative (-terrorCode) on error.
*/
void tuxReadKeys(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* pkSize, Uint32* pkData);
int tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut);
/*
* TUX checks if tuple is visible to scan.
......@@ -1365,10 +1371,11 @@ private:
//------------------------------------------------------------------
int readAttributes(Page* const pagePtr,
Uint32 TupHeadOffset,
Uint32* inBuffer,
const Uint32* inBuffer,
Uint32 inBufLen,
Uint32* outBuffer,
Uint32 TmaxRead);
Uint32 TmaxRead,
bool xfrmFlag);
//------------------------------------------------------------------
//------------------------------------------------------------------
......@@ -1614,6 +1621,20 @@ private:
Uint32 attrDescriptor,
Uint32 attrDes2);
// *****************************************************************
// Read char routines optionally (tXfrmFlag) apply strxfrm
// *****************************************************************
bool readCharNotNULL(Uint32* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2);
bool readCharNULLable(Uint32* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2);
//------------------------------------------------------------------
//------------------------------------------------------------------
bool nullFlagCheck(Uint32 attrDes2);
......@@ -1909,7 +1930,8 @@ private:
void updatePackedList(Signal* signal, Uint16 ahostIndex);
void setUpDescriptorReferences(Uint32 descriptorReference,
Tablerec* const regTabPtr);
Tablerec* const regTabPtr,
const Uint32* offset);
void setUpKeyArray(Tablerec* const regTabPtr);
bool addfragtotab(Tablerec* const regTabPtr, Uint32 fragId, Uint32 fragIndex);
void deleteFragTab(Tablerec* const regTabPtr, Uint32 fragId);
......@@ -2098,7 +2120,8 @@ private:
//-----------------------------------------------------------------------------
// Public methods
Uint32 allocTabDescr(Uint32 noOfAttributes, Uint32 noOfKeyAttr, Uint32 noOfAttributeGroups);
Uint32 getTabDescrOffsets(const Tablerec* regTabPtr, Uint32* offset);
Uint32 allocTabDescr(const Tablerec* regTabPtr, Uint32* offset);
void freeTabDescr(Uint32 retRef, Uint32 retNo);
Uint32 getTabDescrWord(Uint32 index);
void setTabDescrWord(Uint32 index, Uint32 word);
......@@ -2217,6 +2240,7 @@ private:
Uint32 tMaxRead;
Uint32 tOutBufIndex;
Uint32* tTupleHeader;
bool tXfrmFlag;
// updateAttributes module
Uint32 tInBufIndex;
......
......@@ -903,7 +903,8 @@ int Dbtup::handleReadReq(Signal* signal,
&cinBuffer[0],
regOperPtr->attrinbufLen,
dst,
dstLen);
dstLen,
false);
if (TnoOfDataRead != (Uint32)-1) {
/* ------------------------------------------------------------------------- */
// We have read all data into coutBuffer. Now send it to the API.
......@@ -1274,7 +1275,8 @@ int Dbtup::interpreterStartLab(Signal* signal,
&cinBuffer[5],
RinitReadLen,
&dst[0],
dstLen);
dstLen,
false);
if (TnoDataRW != (Uint32)-1) {
RattroutCounter = TnoDataRW;
RinstructionCounter += RinitReadLen;
......@@ -1347,7 +1349,8 @@ int Dbtup::interpreterStartLab(Signal* signal,
&cinBuffer[RinstructionCounter],
RfinalRLen,
&dst[RattroutCounter],
(dstLen - RattroutCounter));
(dstLen - RattroutCounter),
false);
if (TnoDataRW != (Uint32)-1) {
RattroutCounter += TnoDataRW;
} else {
......@@ -1487,7 +1490,8 @@ int Dbtup::interpreterNextLab(Signal* signal,
&theAttrinfo,
(Uint32)1,
&TregMemBuffer[theRegister],
(Uint32)3);
(Uint32)3,
false);
if (TnoDataRW == 2) {
/* ------------------------------------------------------------- */
// Two words read means that we get the instruction plus one 32
......@@ -1833,7 +1837,8 @@ int Dbtup::interpreterNextLab(Signal* signal,
Int32 TnoDataR = readAttributes(pagePtr,
TupHeadOffset,
&attrId, 1,
tmpArea, tmpAreaSz);
tmpArea, tmpAreaSz,
false);
if (TnoDataR == -1) {
jam();
......@@ -1929,7 +1934,8 @@ int Dbtup::interpreterNextLab(Signal* signal,
Int32 TnoDataR = readAttributes(pagePtr,
TupHeadOffset,
&attrId, 1,
tmpArea, tmpAreaSz);
tmpArea, tmpAreaSz,
false);
if (TnoDataR == -1) {
jam();
......@@ -1957,7 +1963,8 @@ int Dbtup::interpreterNextLab(Signal* signal,
Int32 TnoDataR = readAttributes(pagePtr,
TupHeadOffset,
&attrId, 1,
tmpArea, tmpAreaSz);
tmpArea, tmpAreaSz,
false);
if (TnoDataR == -1) {
jam();
......
......@@ -1067,6 +1067,7 @@ Dbtup::initTab(Tablerec* const regTabPtr)
}//for
regTabPtr->readFunctionArray = NULL;
regTabPtr->updateFunctionArray = NULL;
regTabPtr->charsetArray = NULL;
regTabPtr->tabDescriptor = RNIL;
regTabPtr->attributeGroupDescriptor = RNIL;
......
......@@ -112,10 +112,11 @@ Dbtup::tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& no
node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset;
}
void
Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, Uint32 numAttrs, const Uint32* attrIds, const Uint32** attrData)
int
Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, const Uint32* attrIds, Uint32 numAttrs, Uint32* dataOut)
{
ljamEntry();
// use own variables instead of globals
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
......@@ -134,6 +135,7 @@ Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tu
while (true) {
ptrCheckGuard(opPtr, cnoOfOprec, operationrec);
if (opPtr.p->realPageIdC != RNIL) {
// update page and offset
pagePtr.i = opPtr.p->realPageIdC;
pageOffset = opPtr.p->pageOffsetC;
ptrCheckGuard(pagePtr, cnoOfPage, page);
......@@ -147,33 +149,34 @@ Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tu
ndbrequire(++loopGuard < (1 << ZTUP_VERSION_BITS));
}
}
const Uint32 tabDescriptor = tablePtr.p->tabDescriptor;
const Uint32* tupleHeader = &pagePtr.p->pageWord[pageOffset];
for (Uint32 i = 0; i < numAttrs; i++) {
AttributeHeader ah(attrIds[i]);
const Uint32 attrId = ah.getAttributeId();
const Uint32 index = tabDescriptor + (attrId << ZAD_LOG_SIZE);
const Uint32 desc1 = tableDescriptor[index].tabDescr;
const Uint32 desc2 = tableDescriptor[index + 1].tabDescr;
if (AttributeDescriptor::getNullable(desc1)) {
Uint32 offset = AttributeOffset::getNullFlagOffset(desc2);
ndbrequire(offset < tablePtr.p->tupNullWords);
offset += tablePtr.p->tupNullIndex;
ndbrequire(offset < tablePtr.p->tupheadsize);
if (AttributeOffset::isNULL(tupleHeader[offset], desc2)) {
ljam();
attrData[i] = 0;
continue;
}
}
attrData[i] = tupleHeader + AttributeOffset::getOffset(desc2);
// read key attributes from found tuple version
// save globals
TablerecPtr tabptr_old = tabptr;
FragrecordPtr fragptr_old = fragptr;
OperationrecPtr operPtr_old = operPtr;
// new globals
tabptr = tablePtr;
fragptr = fragPtr;
operPtr.i = RNIL;
operPtr.p = NULL;
// do it
int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL, true);
// restore globals
tabptr = tabptr_old;
fragptr = fragptr_old;
operPtr = operPtr_old;
// done
if (ret == (Uint32)-1) {
ret = terrorCode ? (-(int)terrorCode) : -1;
}
return ret;
}
void
Dbtup::tuxReadKeys(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* pkSize, Uint32* pkData)
int
Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut)
{
ljamEntry();
// use own variables instead of globals
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
......@@ -184,25 +187,45 @@ Dbtup::tuxReadKeys(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* pk
pagePtr.i = pageId;
ptrCheckGuard(pagePtr, cnoOfPage, page);
const Uint32 tabDescriptor = tablePtr.p->tabDescriptor;
const Uint32 numAttrs = tablePtr.p->noOfKeyAttr;
const Uint32* attrIds = &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
const Uint32* tupleHeader = &pagePtr.p->pageWord[pageOffset];
Uint32 size = 0;
for (Uint32 i = 0; i < numAttrs; i++) {
AttributeHeader ah(attrIds[i]);
const Uint32 attrId = ah.getAttributeId();
const Uint32 index = tabDescriptor + (attrId << ZAD_LOG_SIZE);
const Uint32 desc1 = tableDescriptor[index].tabDescr;
const Uint32 desc2 = tableDescriptor[index + 1].tabDescr;
ndbrequire(! AttributeDescriptor::getNullable(desc1));
const Uint32 attrSize = AttributeDescriptor::getSizeInWords(desc1);
const Uint32* attrData = tupleHeader + AttributeOffset::getOffset(desc2);
for (Uint32 j = 0; j < attrSize; j++) {
pkData[size + j] = attrData[j];
const Uint32 numAttrs = tablePtr.p->noOfKeyAttr;
// read pk attributes from original tuple
// save globals
TablerecPtr tabptr_old = tabptr;
FragrecordPtr fragptr_old = fragptr;
OperationrecPtr operPtr_old = operPtr;
// new globals
tabptr = tablePtr;
fragptr = fragPtr;
operPtr.i = RNIL;
operPtr.p = NULL;
// do it
int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL, true);
// restore globals
tabptr = tabptr_old;
fragptr = fragptr_old;
operPtr = operPtr_old;
// done
if (ret != (Uint32)-1) {
// remove headers
Uint32 n = 0;
Uint32 i = 0;
while (n < numAttrs) {
const AttributeHeader ah(dataOut[i]);
Uint32 size = ah.getDataSize();
ndbrequire(size != 0);
for (Uint32 j = 0; j < size; j++) {
dataOut[i + j - n] = dataOut[i + j + 1];
}
n += 1;
i += 1 + size;
}
size += attrSize;
ndbrequire(i == ret);
ret -= numAttrs;
} else {
ret = terrorCode ? (-(int)terrorCode) : -1;
}
*pkSize = size;
return ret;
}
bool
......
......@@ -20,12 +20,14 @@
#include <RefConvert.hpp>
#include <ndb_limits.h>
#include <pc.hpp>
#include <signaldata/TupFrag.hpp>
#include <signaldata/FsConf.hpp>
#include <signaldata/FsRemoveReq.hpp>
#include <signaldata/DropTab.hpp>
#include <signaldata/AlterTab.hpp>
#include <AttributeDescriptor.hpp>
#include "AttributeOffset.hpp"
#include <my_sys.h>
#define ljam() { jamLine(20000 + __LINE__); }
#define ljamEntry() { jamEntryLine(20000 + __LINE__); }
......@@ -52,7 +54,10 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
/* Uint32 schemaVersion = signal->theData[8];*/
Uint32 noOfKeyAttr = signal->theData[9];
Uint32 noOfNewAttr = signal->theData[10];
Uint32 noOfNewAttr = (signal->theData[10] & 0xFFFF);
/* DICT sends number of character sets in upper half */
Uint32 noOfCharsets = (signal->theData[10] >> 16);
Uint32 checksumIndicator = signal->theData[11];
Uint32 noOfAttributeGroups = signal->theData[12];
Uint32 globalCheckpointIdIndicator = signal->theData[13];
......@@ -75,6 +80,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
fragOperPtr.p->attributeCount = noOfAttributes;
fragOperPtr.p->freeNullBit = noOfNullAttr;
fragOperPtr.p->noOfNewAttrCount = noOfNewAttr;
fragOperPtr.p->charsetIndex = 0;
ndbrequire(reqinfo == ZADDFRAG);
......@@ -156,6 +162,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
regTabPtr.p->tupheadsize = regTabPtr.p->tupGCPIndex;
regTabPtr.p->noOfKeyAttr = noOfKeyAttr;
regTabPtr.p->noOfCharsets = noOfCharsets;
regTabPtr.p->noOfAttr = noOfAttributes;
regTabPtr.p->noOfNewAttr = noOfNewAttr;
regTabPtr.p->noOfNullAttr = noOfNullAttr;
......@@ -163,13 +170,14 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
regTabPtr.p->notNullAttributeMask.clear();
Uint32 tableDescriptorRef = allocTabDescr(noOfAttributes, noOfKeyAttr, noOfAttributeGroups);
Uint32 offset[10];
Uint32 tableDescriptorRef = allocTabDescr(regTabPtr.p, offset);
if (tableDescriptorRef == RNIL) {
ljam();
fragrefuse4Lab(signal, fragOperPtr, regFragPtr, regTabPtr.p, fragId);
return;
}//if
setUpDescriptorReferences(tableDescriptorRef, regTabPtr.p);
setUpDescriptorReferences(tableDescriptorRef, regTabPtr.p, offset);
} else {
ljam();
fragOperPtr.p->definingFragment = false;
......@@ -251,6 +259,9 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
ptrCheckGuard(fragOperPtr, cnoOfFragoprec, fragoperrec);
Uint32 attrId = signal->theData[2];
Uint32 attrDescriptor = signal->theData[3];
// DICT sends extended type (ignored) and charset number
Uint32 extType = (signal->theData[4] & 0xFF);
Uint32 csNumber = (signal->theData[4] >> 16);
regTabPtr.i = fragOperPtr.p->tableidFrag;
ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);
......@@ -304,6 +315,29 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
} else {
ndbrequire(false);
}//if
if (csNumber != 0) {
CHARSET_INFO* cs = get_charset(csNumber, MYF(0));
if (cs == NULL) {
ljam();
terrorCode = TupAddAttrRef::InvalidCharset;
addattrrefuseLab(signal, regFragPtr, fragOperPtr, regTabPtr.p, fragId);
return;
}
Uint32 i = 0;
while (i < fragOperPtr.p->charsetIndex) {
ljam();
if (regTabPtr.p->charsetArray[i] == cs)
break;
i++;
}
if (i == fragOperPtr.p->charsetIndex) {
ljam();
fragOperPtr.p->charsetIndex++;
}
ndbrequire(i < regTabPtr.p->noOfCharsets);
regTabPtr.p->charsetArray[i] = cs;
AttributeOffset::setCharsetPos(attrDes2, i);
}
setTabDescrWord(firstTabDesIndex + 1, attrDes2);
if (regTabPtr.p->tupheadsize > MAX_TUPLE_SIZE_IN_WORDS) {
......@@ -340,20 +374,28 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
return;
}//Dbtup::execTUP_ADD_ATTRREQ()
/*
* Descriptor has these parts:
*
* 0 readFunctionArray ( one for each attribute )
* 1 updateFunctionArray ( ditto )
* 2 charsetArray ( pointers to distinct CHARSET_INFO )
* 3 readKeyArray ( attribute ids of keys )
* 4 attributeGroupDescriptor ( currently size 1 but unused )
* 5 tabDescriptor ( attribute descriptors, each ZAD_SIZE )
*/
void Dbtup::setUpDescriptorReferences(Uint32 descriptorReference,
Tablerec* const regTabPtr)
Tablerec* const regTabPtr,
const Uint32* offset)
{
Uint32 noOfAttributes = regTabPtr->noOfAttr;
descriptorReference += ZTD_SIZE;
ReadFunction * tmp = (ReadFunction*)&tableDescriptor[descriptorReference].tabDescr;
regTabPtr->readFunctionArray = tmp;
regTabPtr->updateFunctionArray = (UpdateFunction*)(tmp + noOfAttributes);
TableDescriptor * start = &tableDescriptor[descriptorReference];
TableDescriptor * end = (TableDescriptor*)(tmp + 2 * noOfAttributes);
regTabPtr->readKeyArray = descriptorReference + (end - start);
regTabPtr->attributeGroupDescriptor = regTabPtr->readKeyArray + regTabPtr->noOfKeyAttr;
regTabPtr->tabDescriptor = regTabPtr->attributeGroupDescriptor + regTabPtr->noOfAttributeGroups;
Uint32* desc = &tableDescriptor[descriptorReference].tabDescr;
regTabPtr->readFunctionArray = (ReadFunction*)(desc + offset[0]);
regTabPtr->updateFunctionArray = (UpdateFunction*)(desc + offset[1]);
regTabPtr->charsetArray = (CHARSET_INFO**)(desc + offset[2]);
regTabPtr->readKeyArray = descriptorReference + offset[3];
regTabPtr->attributeGroupDescriptor = descriptorReference + offset[4];
regTabPtr->tabDescriptor = descriptorReference + offset[5];
}//Dbtup::setUpDescriptorReferences()
Uint32
......@@ -491,14 +533,18 @@ void Dbtup::releaseTabDescr(Tablerec* const regTabPtr)
Uint32 descriptor = regTabPtr->readKeyArray;
if (descriptor != RNIL) {
ljam();
Uint32 offset[10];
getTabDescrOffsets(regTabPtr, offset);
regTabPtr->tabDescriptor = RNIL;
regTabPtr->readKeyArray = RNIL;
regTabPtr->readFunctionArray = NULL;
regTabPtr->updateFunctionArray = NULL;
regTabPtr->charsetArray = NULL;
regTabPtr->attributeGroupDescriptor= RNIL;
Uint32 sizeFunctionArrays = 2 * (regTabPtr->noOfAttr * sizeOfReadFunction());
descriptor -= (sizeFunctionArrays + ZTD_SIZE);
// move to start of descriptor
descriptor -= offset[3];
Uint32 retNo = getTabDescrWord(descriptor + ZTD_DATASIZE);
ndbrequire(getTabDescrWord(descriptor + ZTD_HEADER) == ZTD_TYPE_NORMAL);
ndbrequire(retNo == getTabDescrWord((descriptor + retNo) - ZTD_TR_SIZE));
......
......@@ -35,6 +35,7 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
for (Uint32 i = 0; i < regTabPtr->noOfAttr; i++) {
Uint32 attrDescriptorStart = startDescriptor + (i << ZAD_LOG_SIZE);
Uint32 attrDescriptor = tableDescriptor[attrDescriptorStart].tabDescr;
Uint32 attrOffset = tableDescriptor[attrDescriptorStart + 1].tabDescr;
if (!AttributeDescriptor::getDynamic(attrDescriptor)) {
if ((AttributeDescriptor::getArrayType(attrDescriptor) == ZNON_ARRAY) ||
(AttributeDescriptor::getArrayType(attrDescriptor) == ZFIXED_ARRAY)) {
......@@ -54,6 +55,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
} else {
ndbrequire(false);
}//if
// replace read function of char attribute
if (AttributeOffset::getCharsetFlag(attrOffset)) {
ljam();
regTabPtr->readFunctionArray[i] = &Dbtup::readCharNotNULL;
}
} else {
if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1) {
ljam();
......@@ -72,6 +78,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHZeroWordNULLable;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable;
}//if
// replace read function of char attribute
if (AttributeOffset::getCharsetFlag(attrOffset)) {
ljam();
regTabPtr->readFunctionArray[i] = &Dbtup::readCharNULLable;
}
}//if
} else if (AttributeDescriptor::getArrayType(attrDescriptor) == ZVAR_ARRAY) {
if (!AttributeDescriptor::getNullable(attrDescriptor)) {
......@@ -146,10 +157,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
/* ---------------------------------------------------------------- */
int Dbtup::readAttributes(Page* const pagePtr,
Uint32 tupHeadOffset,
Uint32* inBuffer,
const Uint32* inBuffer,
Uint32 inBufLen,
Uint32* outBuffer,
Uint32 maxRead)
Uint32 maxRead,
bool xfrmFlag)
{
Tablerec* const regTabPtr = tabptr.p;
Uint32 numAttributes = regTabPtr->noOfAttr;
......@@ -162,6 +174,7 @@ int Dbtup::readAttributes(Page* const pagePtr,
tCheckOffset = regTabPtr->tupheadsize;
tMaxRead = maxRead;
tTupleHeader = &pagePtr->pageWord[tupHeadOffset];
tXfrmFlag = xfrmFlag;
ndbrequire(tupHeadOffset + tCheckOffset <= ZWORDS_ON_PAGE);
while (inBufIndex < inBufLen) {
......@@ -542,6 +555,74 @@ Dbtup::readDynSmallVarSize(Uint32* outBuffer,
return false;
}//Dbtup::readDynSmallVarSize()
bool
Dbtup::readCharNotNULL(Uint32* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
Uint32 indexBuf = tOutBufIndex;
Uint32 readOffset = AttributeOffset::getOffset(attrDes2);
Uint32 attrNoOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor);
Uint32 newIndexBuf = indexBuf + attrNoOfWords;
Uint32 maxRead = tMaxRead;
ndbrequire((readOffset + attrNoOfWords - 1) < tCheckOffset);
if (newIndexBuf <= maxRead) {
ljam();
ahOut->setDataSize(attrNoOfWords);
if (! tXfrmFlag) {
MEMCOPY_NO_WORDS(&outBuffer[indexBuf],
&tTupleHeader[readOffset],
attrNoOfWords);
} else {
ljam();
Tablerec* regTabPtr = tabptr.p;
Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
ndbrequire(i < tabptr.p->noOfCharsets);
// not const in MySQL
CHARSET_INFO* cs = tabptr.p->charsetArray[i];
// XXX should strip Uint32 null padding
const unsigned nBytes = attrNoOfWords << 2;
unsigned n =
(*cs->coll->strnxfrm)(cs,
(uchar*)&outBuffer[indexBuf],
nBytes,
(const uchar*)&tTupleHeader[readOffset],
nBytes);
// pad with ascii spaces
while (n < nBytes)
((uchar*)&outBuffer[indexBuf])[n++] = 0x20;
}
tOutBufIndex = newIndexBuf;
return true;
} else {
ljam();
terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
return false;
}
}
bool
Dbtup::readCharNULLable(Uint32* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
if (!nullFlagCheck(attrDes2)) {
ljam();
return readCharNotNULL(outBuffer,
ahOut,
attrDescriptor,
attrDes2);
} else {
ljam();
ahOut->setNULL();
return true;
}
}
/* ---------------------------------------------------------------------- */
/* THIS ROUTINE IS USED TO UPDATE A NUMBER OF ATTRIBUTES. IT IS */
/* USED BY THE INSERT ROUTINE, THE UPDATE ROUTINE AND IT CAN BE */
......
......@@ -31,12 +31,33 @@
/* memory attached to fragments (could be allocated per table */
/* instead. Performs its task by a buddy algorithm. */
/* **************************************************************** */
Uint32 Dbtup::allocTabDescr(Uint32 noOfAttributes, Uint32 noOfKeyAttr, Uint32 noOfAttributeGroups)
Uint32
Dbtup::getTabDescrOffsets(const Tablerec* regTabPtr, Uint32* offset)
{
// belongs to configure.in
unsigned sizeOfPointer = sizeof(CHARSET_INFO*);
ndbrequire((sizeOfPointer & 0x3) == 0);
sizeOfPointer = (sizeOfPointer >> 2);
// do in layout order and return offsets (see DbtupMeta.cpp)
Uint32 allocSize = 0;
// magically aligned to 8 bytes
offset[0] = allocSize += ZTD_SIZE;
offset[1] = allocSize += regTabPtr->noOfAttr * sizeOfReadFunction();
offset[2] = allocSize += regTabPtr->noOfAttr * sizeOfReadFunction();
offset[3] = allocSize += regTabPtr->noOfCharsets * sizeOfPointer;
offset[4] = allocSize += regTabPtr->noOfKeyAttr;
offset[5] = allocSize += regTabPtr->noOfAttributeGroups;
allocSize += regTabPtr->noOfAttr * ZAD_SIZE;
allocSize += ZTD_TRAILER_SIZE;
// return number of words
return allocSize;
}
Uint32 Dbtup::allocTabDescr(const Tablerec* regTabPtr, Uint32* offset)
{
Uint32 reference = RNIL;
Uint32 allocSize = (ZTD_SIZE + ZTD_TRAILER_SIZE) + (noOfAttributes * ZAD_SIZE);
allocSize += noOfAttributeGroups;
allocSize += ((2 * noOfAttributes * sizeOfReadFunction()) + noOfKeyAttr);
Uint32 allocSize = getTabDescrOffsets(regTabPtr, offset);
/* ---------------------------------------------------------------- */
/* ALWAYS ALLOCATE A MULTIPLE OF 16 BYTES */
/* ---------------------------------------------------------------- */
......
......@@ -751,7 +751,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
&tableDescriptor[regTabPtr->readKeyArray].tabDescr,
regTabPtr->noOfKeyAttr,
keyBuffer,
ZATTR_BUFFER_SIZE);
ZATTR_BUFFER_SIZE,
true);
ndbrequire(noPrimKey != (Uint32)-1);
Uint32 numAttrsToRead;
......@@ -792,7 +793,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
&readBuffer[0],
numAttrsToRead,
mainBuffer,
ZATTR_BUFFER_SIZE);
ZATTR_BUFFER_SIZE,
true);
ndbrequire(noMainWords != (Uint32)-1);
} else {
ljam();
......@@ -816,7 +818,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
&readBuffer[0],
numAttrsToRead,
copyBuffer,
ZATTR_BUFFER_SIZE);
ZATTR_BUFFER_SIZE,
true);
ndbrequire(noCopyWords != (Uint32)-1);
if ((noMainWords == noCopyWords) &&
......
......@@ -162,11 +162,6 @@ private:
// AttributeHeader size is assumed to be 1 word
static const unsigned AttributeHeaderSize = 1;
/*
* Array of pointers to TUP table attributes. Always read-on|y.
*/
typedef const Uint32** TableData;
/*
* Logical tuple address, "local key". Identifies table tuples.
*/
......@@ -330,11 +325,15 @@ private:
/*
* Attribute metadata. Size must be multiple of word size.
*
* Prefix comparison of char data must use strxfrm and binary
* comparison. The charset is currently unused.
*/
struct DescAttr {
Uint32 m_attrDesc; // standard AttributeDescriptor
Uint16 m_primaryAttrId;
Uint16 m_typeId;
unsigned m_typeId : 6;
unsigned m_charset : 10;
};
static const unsigned DescAttrSize = sizeof(DescAttr) >> 2;
......@@ -553,9 +552,9 @@ private:
void execREAD_CONFIG_REQ(Signal* signal);
// utils
void setKeyAttrs(const Frag& frag);
void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, TableData keyData);
void readTablePk(const Frag& frag, TreeEnt ent, unsigned& pkSize, Data pkData);
void copyAttrs(const Frag& frag, TableData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData);
void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize);
void copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
/*
* DbtuxMeta.cpp
......@@ -622,17 +621,15 @@ private:
/*
* DbtuxSearch.cpp
*/
void searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/*
* DbtuxCmp.cpp
*/
int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey);
int cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey);
/*
* DbtuxDebug.cpp
......@@ -679,17 +676,27 @@ private:
Uint32 c_typeOfStart;
/*
* Array of index key attribute ids in AttributeHeader format.
* Includes fixed attribute sizes. This is global data set at
* operation start and is not passed as a parameter.
* Global data set at operation start. Unpacked from index metadata.
* Not passed as parameter to methods. Invalid across timeslices.
*
* TODO inline all into index metadata
*/
// index key attr ids with sizes in AttributeHeader format
Data c_keyAttrs;
// buffer for search key data as pointers to TUP storage
TableData c_searchKey;
// pointers to index key comparison functions
NdbSqlUtil::Cmp** c_sqlCmp;
/*
* Other buffers used during the operation.
*/
// buffer for search key data with headers
Data c_searchKey;
// buffer for current entry key data as pointers to TUP storage
TableData c_entryKey;
// buffer for current entry key data with headers
Data c_entryKey;
// buffer for scan bounds and keyinfo (primary key)
Data c_dataBuffer;
......
......@@ -18,21 +18,24 @@
#include "Dbtux.hpp"
/*
* Search key vs node prefix.
* Search key vs node prefix or entry
*
* The comparison starts at given attribute position (in fact 0). The
* position is updated by number of equal initial attributes found. The
* prefix may be partial in which case CmpUnknown may be returned.
* The comparison starts at given attribute position. The position is
* updated by number of equal initial attributes found. The entry data
* may be partial in which case CmpUnknown may be returned.
*/
int
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen)
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen)
{
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// number of words of attribute data left
unsigned len2 = maxlen;
// skip to right position in search key
searchKey += start;
// skip to right position in search key only
for (unsigned i = 0; i < start; i++) {
jam();
searchKey += AttributeHeaderSize + searchKey.ah().getDataSize();
}
int ret = 0;
while (start < numAttrs) {
if (len2 <= AttributeHeaderSize) {
......@@ -41,22 +44,21 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
break;
}
len2 -= AttributeHeaderSize;
if (*searchKey != 0) {
if (! searchKey.ah().isNULL()) {
if (! entryData.ah().isNULL()) {
jam();
// current attribute
const DescAttr& descAttr = descEnt.m_descAttr[start];
const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined);
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
const unsigned size2 = min(size1, len2);
len2 -= size2;
// compare
const Uint32* const p1 = *searchKey;
NdbSqlUtil::Cmp* const cmp = c_sqlCmp[start];
const Uint32* const p1 = &searchKey[AttributeHeaderSize];
const Uint32* const p2 = &entryData[AttributeHeaderSize];
ret = (*type.m_cmp)(p1, p2, size1, size2);
ret = (*cmp)(0, p1, p2, size1, size2);
if (ret != 0) {
jam();
break;
......@@ -75,7 +77,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
break;
}
}
searchKey += 1;
searchKey += AttributeHeaderSize + searchKey.ah().getDataSize();
entryData += AttributeHeaderSize + entryData.ah().getDataSize();
start++;
}
......@@ -83,60 +85,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
}
/*
* Search key vs tree entry.
*
* Start position is updated as in previous routine.
*/
int
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey)
{
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// skip to right position
searchKey += start;
entryKey += start;
int ret = 0;
while (start < numAttrs) {
if (*searchKey != 0) {
if (*entryKey != 0) {
jam();
// current attribute
const DescAttr& descAttr = descEnt.m_descAttr[start];
const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined);
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// compare
const Uint32* const p1 = *searchKey;
const Uint32* const p2 = *entryKey;
ret = (*type.m_cmp)(p1, p2, size1, size1);
if (ret != 0) {
jam();
break;
}
} else {
jam();
// not NULL > NULL
ret = +1;
break;
}
} else {
if (*entryKey != 0) {
jam();
// NULL < not NULL
ret = -1;
break;
}
}
searchKey += 1;
entryKey += 1;
start++;
}
return ret;
}
/*
* Scan bound vs node prefix.
* Scan bound vs node prefix or entry.
*
* Compare lower or upper bound and index attribute data. The attribute
* data may be partial in which case CmpUnknown may be returned.
......@@ -183,9 +132,8 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
jam();
// current attribute
const unsigned index = boundInfo.ah().getAttributeId();
ndbrequire(index < frag.m_numAttrs);
const DescAttr& descAttr = descEnt.m_descAttr[index];
const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined);
ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
// full data size
const unsigned size1 = boundInfo.ah().getDataSize();
......@@ -193,9 +141,10 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
const unsigned size2 = min(size1, len2);
len2 -= size2;
// compare
NdbSqlUtil::Cmp* const cmp = c_sqlCmp[index];
const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
const Uint32* const p2 = &entryData[AttributeHeaderSize];
int ret = (*type.m_cmp)(p1, p2, size1, size2);
int ret = (*cmp)(0, p1, p2, size1, size2);
if (ret != 0) {
jam();
return ret;
......@@ -244,72 +193,3 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
return +1;
}
}
/*
* Scan bound vs tree entry.
*/
int
Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey)
{
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// direction 0-lower 1-upper
ndbrequire(dir <= 1);
// initialize type to equality
unsigned type = 4;
while (boundCount != 0) {
// get and skip bound type
type = boundInfo[0];
boundInfo += 1;
if (! boundInfo.ah().isNULL()) {
if (*entryKey != 0) {
jam();
// current attribute
const unsigned index = boundInfo.ah().getAttributeId();
const DescAttr& descAttr = descEnt.m_descAttr[index];
const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined);
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// compare
const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
const Uint32* const p2 = *entryKey;
int ret = (*type.m_cmp)(p1, p2, size1, size1);
if (ret != 0) {
jam();
return ret;
}
} else {
jam();
// not NULL > NULL
return +1;
}
} else {
jam();
if (*entryKey != 0) {
jam();
// NULL < not NULL
return -1;
}
}
boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
entryKey += 1;
boundCount -= 1;
}
if (dir == 0) {
// lower bound
jam();
if (type == 1) {
jam();
return +1;
}
return -1;
} else {
// upper bound
jam();
if (type == 3) {
jam();
return -1;
}
return +1;
}
}
......@@ -207,14 +207,10 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
}
// check ordering within node
for (unsigned j = 1; j < node.getOccup(); j++) {
unsigned start = 0;
const TreeEnt ent1 = node.getEnt(j - 1);
const TreeEnt ent2 = node.getEnt(j);
if (j == 1) {
readKeyAttrs(frag, ent1, start, c_searchKey);
} else {
memcpy(c_searchKey, c_entryKey, frag.m_numAttrs << 2);
}
unsigned start = 0;
readKeyAttrs(frag, ent1, start, c_searchKey);
readKeyAttrs(frag, ent2, start, c_entryKey);
int ret = cmpSearchKey(frag, start, c_searchKey, c_entryKey);
if (ret == 0)
......
......@@ -16,8 +16,6 @@
#define DBTUX_GEN_CPP
#include "Dbtux.hpp"
#include <signaldata/TuxContinueB.hpp>
#include <signaldata/TuxContinueB.hpp>
Dbtux::Dbtux(const Configuration& conf) :
SimulatedBlock(DBTUX, conf),
......@@ -202,8 +200,9 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
}
// allocate buffers
c_keyAttrs = (Uint32*)allocRecord("c_keyAttrs", sizeof(Uint32), MaxIndexAttributes);
c_searchKey = (TableData)allocRecord("c_searchKey", sizeof(Uint32*), MaxIndexAttributes);
c_entryKey = (TableData)allocRecord("c_entryKey", sizeof(Uint32*), MaxIndexAttributes);
c_sqlCmp = (NdbSqlUtil::Cmp**)allocRecord("c_sqlCmp", sizeof(NdbSqlUtil::Cmp*), MaxIndexAttributes);
c_searchKey = (Uint32*)allocRecord("c_searchKey", sizeof(Uint32), MaxAttrDataSize);
c_entryKey = (Uint32*)allocRecord("c_entryKey", sizeof(Uint32), MaxAttrDataSize);
c_dataBuffer = (Uint32*)allocRecord("c_dataBuffer", sizeof(Uint64), (MaxAttrDataSize + 1) >> 1);
// ack
ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
......@@ -218,7 +217,8 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
void
Dbtux::setKeyAttrs(const Frag& frag)
{
Data keyAttrs = c_keyAttrs; // global
Data keyAttrs = c_keyAttrs; // global
NdbSqlUtil::Cmp** sqlCmp = c_sqlCmp; // global
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
for (unsigned i = 0; i < numAttrs; i++) {
......@@ -227,75 +227,71 @@ Dbtux::setKeyAttrs(const Frag& frag)
// set attr id and fixed size
keyAttrs.ah() = AttributeHeader(descAttr.m_primaryAttrId, size);
keyAttrs += 1;
// set comparison method pointer
const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getTypeBinary(descAttr.m_typeId);
ndbrequire(sqlType.m_cmp != 0);
*(sqlCmp++) = sqlType.m_cmp;
}
}
void
Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, TableData keyData)
Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData)
{
ConstData keyAttrs = c_keyAttrs; // global
const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
const TupLoc tupLoc = ent.m_tupLoc;
const Uint32 tupVersion = ent.m_tupVersion;
ndbrequire(start < frag.m_numAttrs);
const unsigned numAttrs = frag.m_numAttrs - start;
// start applies to both keys and output data
const Uint32 numAttrs = frag.m_numAttrs - start;
// skip to start position in keyAttrs only
keyAttrs += start;
keyData += start;
c_tup->tuxReadAttrs(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, tupVersion, numAttrs, keyAttrs, keyData);
int ret = c_tup->tuxReadAttrs(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, tupVersion, keyAttrs, numAttrs, keyData);
jamEntry();
// TODO handle error
ndbrequire(ret > 0);
}
void
Dbtux::readTablePk(const Frag& frag, TreeEnt ent, unsigned& pkSize, Data pkData)
Dbtux::readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize)
{
const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
const TupLoc tupLoc = ent.m_tupLoc;
Uint32 size = 0;
c_tup->tuxReadKeys(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, &size, pkData);
ndbrequire(size != 0);
pkSize = size;
int ret = c_tup->tuxReadPk(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, pkData);
jamEntry();
// TODO handle error
ndbrequire(ret > 0);
pkSize = ret;
}
/*
* Input is pointers to table attributes. Output is array of attribute
* data with headers. Copies whatever fits.
* Copy attribute data with headers. Input is all index key data.
* Copies whatever fits.
*/
void
Dbtux::copyAttrs(const Frag& frag, TableData data1, Data data2, unsigned maxlen2)
Dbtux::copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2)
{
ConstData keyAttrs = c_keyAttrs; // global
const unsigned numAttrs = frag.m_numAttrs;
unsigned n = frag.m_numAttrs;
unsigned len2 = maxlen2;
for (unsigned n = 0; n < numAttrs; n++) {
while (n != 0) {
jam();
const unsigned attrId = keyAttrs.ah().getAttributeId();
const unsigned dataSize = keyAttrs.ah().getDataSize();
const Uint32* const p1 = *data1;
if (p1 != 0) {
if (len2 == 0)
return;
data2.ah() = AttributeHeader(attrId, dataSize);
data2 += 1;
len2 -= 1;
unsigned n = dataSize;
for (unsigned i = 0; i < dataSize; i++) {
if (len2 == 0)
return;
*data2 = p1[i];
data2 += 1;
len2 -= 1;
}
} else {
const unsigned dataSize = data1.ah().getDataSize();
// copy header
if (len2 == 0)
return;
data2[0] = data1[0];
data1 += 1;
data2 += 1;
len2 -= 1;
// copy data
for (unsigned i = 0; i < dataSize; i++) {
if (len2 == 0)
return;
data2.ah() = AttributeHeader(attrId, 0);
data2.ah().setNULL();
data2 += 1;
data2[i] = data1[i];
len2 -= 1;
}
keyAttrs += 1;
data1 += 1;
data1 += dataSize;
data2 += dataSize;
n -= 1;
}
#ifdef VM_TRACE
memset(data2, DataFillByte, len2 << 2);
......
......@@ -178,19 +178,31 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
descAttr.m_attrDesc = req->attrDescriptor;
descAttr.m_primaryAttrId = req->primaryAttrId;
descAttr.m_typeId = req->extTypeInfo & 0xFF;
descAttr.m_charset = (req->extTypeInfo >> 16);
#ifdef VM_TRACE
if (debugFlags & DebugMeta) {
debugOut << "Add frag " << fragPtr.i << " attr " << attrId << " " << descAttr << endl;
}
#endif
// check if type is valid and has a comparison method
const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
// check that type is valid and has a binary comparison method
const NdbSqlUtil::Type& type = NdbSqlUtil::getTypeBinary(descAttr.m_typeId);
if (type.m_typeId == NdbSqlUtil::Type::Undefined ||
type.m_cmp == 0) {
jam();
errorCode = TuxAddAttrRef::InvalidAttributeType;
break;
}
#ifdef dbtux_uses_charset
if (descAttr.m_charset != 0) {
CHARSET_INFO *cs = get_charset(descAttr.m_charset, MYF(0));
// here use the non-binary type
if (! NdbSqlUtil::usable_in_ordered_index(descAttr.m_typeId, cs)) {
jam();
errorCode = TuxAddAttrRef::InvalidCharset;
break;
}
}
#endif
if (indexPtr.p->m_numAttrs == fragOpPtr.p->m_numAttrsRecvd) {
jam();
// initialize tree header
......
......@@ -112,6 +112,7 @@ Dbtux::execACC_SCANREQ(Signal* signal)
void
Dbtux::execTUX_BOUND_INFO(Signal* signal)
{
jamEntry();
struct BoundInfo {
unsigned offset;
unsigned size;
......@@ -389,7 +390,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
jam();
const TreeEnt ent = scan.m_scanPos.m_ent;
// read tuple key
readTablePk(frag, ent, pkSize, pkData);
readTablePk(frag, ent, pkData, pkSize);
// get read lock or exclusive lock
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
......@@ -480,7 +481,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
jam();
if (pkSize == 0) {
jam();
readTablePk(frag, ent, pkSize, pkData);
readTablePk(frag, ent, pkData, pkSize);
}
}
// conf signal
......
......@@ -25,7 +25,7 @@
* TODO optimize for initial equal attrs in node min/max
*/
void
Dbtux::searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
......@@ -144,7 +144,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt sear
* to it.
*/
void
Dbtux::searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
......
......@@ -83,7 +83,7 @@ optim 13 mc02/a 39 ms 59 ms 50 pct
mc02/c 9 ms 12 ms 44 pct
mc02/d 246 ms 289 ms 17 pct
[ case d: what happened to PK read performance? ]
[ case d: bug in testOIBasic killed PK read performance ]
optim 14 mc02/a 41 ms 60 ms 44 pct
mc02/b 46 ms 81 ms 73 pct
......@@ -91,5 +91,21 @@ optim 14 mc02/a 41 ms 60 ms 44 pct
mc02/d 242 ms 285 ms 17 pct
[ case b: do long keys suffer from many subroutine calls? ]
[ case d: bug in testOIBasic killed PK read performance ]
none mc02/a 35 ms 60 ms 71 pct
mc02/b 42 ms 75 ms 76 pct
mc02/c 5 ms 12 ms 106 pct
mc02/d 165 ms 238 ms 44 pct
[ johan re-installed mc02 as fedora gcc-3.3.2 ]
[ case c: table scan has improved... ]
charsets mc02/a 35 ms 60 ms 71 pct
mc02/b 42 ms 84 ms 97 pct
mc02/c 5 ms 12 ms 109 pct
mc02/d 190 ms 236 ms 23 pct
[ case b: TUX can no longer use pointers to TUP data ]
vim: set et:
......@@ -107,6 +107,9 @@ public:
/* Number of primary key attributes (should be computed) */
Uint16 noOfPrimkey;
/* Number of distinct character sets (computed) */
Uint16 noOfCharsets;
/* Length of primary key in words (should be computed) */
/* For ordered index this is tree node size in words */
Uint16 tupKeyLength;
......
......@@ -164,6 +164,7 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
Uint32 tData;
Uint32 tKeyInfoPosition;
const char* aValue = aValuePassed;
Uint32 xfrmData[1024];
Uint32 tempData[1024];
if ((theStatus == OperationDefined) &&
......@@ -224,6 +225,21 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
m_theIndexDefined[i][2] = true;
Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
const char* aValueToWrite = aValue;
CHARSET_INFO* cs = tAttrInfo->m_cs;
if (cs != 0) {
// current limitation: strxfrm does not increase length
assert(cs->strxfrm_multiply == 1);
unsigned n =
(*cs->coll->strnxfrm)(cs,
(uchar*)xfrmData, sizeof(xfrmData),
(const uchar*)aValue, sizeInBytes);
while (n < sizeInBytes)
((uchar*)xfrmData)[n++] = 0x20;
aValue = (char*)xfrmData;
}
Uint32 bitsInLastWord = 8 * (sizeInBytes & 3) ;
Uint32 totalSizeInWords = (sizeInBytes + 3)/4;// Inc. bits in last word
Uint32 sizeInWords = sizeInBytes / 4; // Exc. bits in last word
......@@ -314,13 +330,20 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
if ((tOpType == InsertRequest) ||
(tOpType == WriteRequest)) {
if (!tAttrInfo->m_indexOnly){
// invalid data can crash kernel
if (cs != NULL &&
(*cs->cset->well_formed_len)(cs,
aValueToWrite,
aValueToWrite + sizeInBytes,
sizeInBytes) != sizeInBytes)
goto equal_error4;
Uint32 ahValue;
Uint32 sz = totalSizeInWords;
AttributeHeader::init(&ahValue, tAttrId, sz);
insertATTRINFO( ahValue );
insertATTRINFOloop((Uint32*)aValue, sizeInWords);
insertATTRINFOloop((Uint32*)aValueToWrite, sizeInWords);
if (bitsInLastWord != 0) {
tData = *(Uint32*)(aValue + (sizeInWords << 2));
tData = *(Uint32*)(aValueToWrite + (sizeInWords << 2));
tData = convertEndian(tData);
tData = tData & ((1 << bitsInLastWord) - 1);
tData = convertEndian(tData);
......@@ -411,7 +434,10 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
equal_error3:
setErrorCodeAbort(4209);
return -1;
equal_error4:
setErrorCodeAbort(744);
return -1;
}
......
......@@ -492,6 +492,17 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
// Insert Attribute Id into ATTRINFO part.
const Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
CHARSET_INFO* cs = tAttrInfo->m_cs;
// invalid data can crash kernel
if (cs != NULL &&
(*cs->cset->well_formed_len)(cs,
aValue,
aValue + sizeInBytes,
sizeInBytes) != sizeInBytes) {
setErrorCodeAbort(744);
return -1;
}
#if 0
tAttrSize = tAttrInfo->theAttrSize;
tArraySize = tAttrInfo->theArraySize;
......
......@@ -60,6 +60,7 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
Uint32 tData;
Uint32 tKeyInfoPosition;
const char* aValue = aValuePassed;
Uint32 xfrmData[1024];
Uint32 tempData[1024];
if ((theStatus == OperationDefined) &&
......@@ -117,6 +118,21 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
theTupleKeyDefined[i][2] = true;
Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
const char* aValueToWrite = aValue;
CHARSET_INFO* cs = tAttrInfo->m_cs;
if (cs != 0) {
// current limitation: strxfrm does not increase length
assert(cs->strxfrm_multiply == 1);
unsigned n =
(*cs->coll->strnxfrm)(cs,
(uchar*)xfrmData, sizeof(xfrmData),
(const uchar*)aValue, sizeInBytes);
while (n < sizeInBytes)
((uchar*)xfrmData)[n++] = 0x20;
aValue = (char*)xfrmData;
}
Uint32 bitsInLastWord = 8 * (sizeInBytes & 3) ;
Uint32 totalSizeInWords = (sizeInBytes + 3)/4; // Inc. bits in last word
Uint32 sizeInWords = sizeInBytes / 4; // Exc. bits in last word
......@@ -206,13 +222,20 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
if ((tOpType == InsertRequest) ||
(tOpType == WriteRequest)) {
if (!tAttrInfo->m_indexOnly){
// invalid data can crash kernel
if (cs != NULL &&
(*cs->cset->well_formed_len)(cs,
aValueToWrite,
aValueToWrite + sizeInBytes,
sizeInBytes) != sizeInBytes)
goto equal_error4;
Uint32 ahValue;
const Uint32 sz = totalSizeInWords;
AttributeHeader::init(&ahValue, tAttrId, sz);
insertATTRINFO( ahValue );
insertATTRINFOloop((Uint32*)aValue, sizeInWords);
insertATTRINFOloop((Uint32*)aValueToWrite, sizeInWords);
if (bitsInLastWord != 0) {
tData = *(Uint32*)(aValue + (sizeInWords << 2));
tData = *(Uint32*)(aValueToWrite + (sizeInWords << 2));
tData = convertEndian(tData);
tData = tData & ((1 << bitsInLastWord) - 1);
tData = convertEndian(tData);
......@@ -311,6 +334,10 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
equal_error3:
setErrorCodeAbort(4209);
return -1;
equal_error4:
setErrorCodeAbort(744);
return -1;
}
/******************************************************************************
......
......@@ -1096,30 +1096,43 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
theStatus == SetBound &&
(0 <= type && type <= 4) &&
len <= 8000) {
// bound type
// insert bound type
insertATTRINFO(type);
// attribute header
Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
// normalize char bound
CHARSET_INFO* cs = tAttrInfo->m_cs;
Uint32 xfrmData[2000];
if (cs != NULL && aValue != NULL) {
// current limitation: strxfrm does not increase length
assert(cs->strxfrm_multiply == 1);
unsigned n =
(*cs->coll->strnxfrm)(cs,
(uchar*)xfrmData, sizeof(xfrmData),
(const uchar*)aValue, sizeInBytes);
while (n < sizeInBytes)
((uchar*)xfrmData)[n++] = 0x20;
aValue = (char*)xfrmData;
}
if (len != sizeInBytes && (len != 0)) {
setErrorCodeAbort(4209);
return -1;
}
// insert attribute header
len = aValue != NULL ? sizeInBytes : 0;
Uint32 tIndexAttrId = tAttrInfo->m_attrId;
Uint32 sizeInWords = (len + 3) / 4;
AttributeHeader ah(tIndexAttrId, sizeInWords);
insertATTRINFO(ah.m_value);
if (len != 0) {
// attribute data
// insert attribute data
if ((UintPtr(aValue) & 0x3) == 0 && (len & 0x3) == 0)
insertATTRINFOloop((const Uint32*)aValue, sizeInWords);
else {
Uint32 temp[2000];
memcpy(temp, aValue, len);
Uint32 tempData[2000];
memcpy(tempData, aValue, len);
while ((len & 0x3) != 0)
((char*)temp)[len++] = 0;
insertATTRINFOloop(temp, sizeInWords);
((char*)tempData)[len++] = 0;
insertATTRINFOloop(tempData, sizeInWords);
}
}
......@@ -1206,11 +1219,11 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
if((r1_null ^ (unsigned)r2->isNULL())){
return (r1_null ? -1 : 1);
}
Uint32 type = NdbColumnImpl::getImpl(* r1->m_column).m_extType;
const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);
Uint32 size = (r1->theAttrSize * r1->theArraySize + 3) / 4;
if(!r1_null){
const NdbSqlUtil::Type& t = NdbSqlUtil::getType(type);
int r = (*t.m_cmp)(d1, d2, size, size);
const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_extType);
int r = (*sqlType.m_cmp)(col.m_cs, d1, d2, size, size);
if(r){
assert(r != NdbSqlUtil::CmpUnknown);
return r;
......
......@@ -282,7 +282,7 @@ ErrorBundle ErrorCodes[] = {
{ 741, SE, "Unsupported alter table" },
{ 742, SE, "Unsupported attribute type in index" },
{ 743, SE, "Unsupported character set in table or index" },
{ 744, SE, "Character conversion error" },
{ 744, SE, "Character string is invalid for given character set" },
{ 241, SE, "Invalid schema object version" },
{ 283, SE, "Table is being dropped" },
{ 284, SE, "Table not defined in transaction coordinator" },
......
......@@ -28,6 +28,7 @@
#include <NdbCondition.h>
#include <NdbThread.h>
#include <NdbTick.h>
#include <my_sys.h>
// options
......@@ -37,6 +38,8 @@ struct Opt {
const char* m_bound;
const char* m_case;
bool m_core;
const char* m_csname;
CHARSET_INFO* m_cs;
bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype;
unsigned m_idxloop;
......@@ -59,6 +62,8 @@ struct Opt {
m_bound("01234"),
m_case(0),
m_core(false),
m_csname("latin1_bin"),
m_cs(0),
m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined),
m_idxloop(4),
......@@ -94,6 +99,7 @@ printhelp()
<< " -bound xyz use only these bound types 0-4 [" << d.m_bound << "]" << endl
<< " -case abc only given test cases (letters a-z)" << endl
<< " -core core dump on error [" << d.m_core << "]" << endl
<< " -csname S charset (collation) of non-pk char column [" << d.m_csname << "]" << endl
<< " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl
<< " -fragtype T fragment type single/small/medium/large" << endl
<< " -index xyz only given index numbers (digits 1-9)" << endl
......@@ -983,6 +989,10 @@ createtable(Par par)
c.setLength(col.m_length);
c.setPrimaryKey(col.m_pk);
c.setNullable(col.m_nullable);
if (c.getCharset()) { // test if char type
if (! col.m_pk)
c.setCharset(par.m_cs);
}
t.addColumn(c);
}
con.m_dic = con.m_ndb->getDictionary();
......@@ -3149,6 +3159,10 @@ runtest(Par par)
LL1("start");
if (par.m_seed != 0)
srandom(par.m_seed);
assert(par.m_csname != 0);
CHARSET_INFO* cs;
CHK((cs = get_charset_by_name(par.m_csname, MYF(0))) != 0 || (cs = get_charset_by_csname(par.m_csname, MY_CS_PRIMARY, MYF(0))) != 0);
par.m_cs = cs;
Con con;
CHK(con.connect() == 0);
par.m_con = &con;
......@@ -3232,6 +3246,12 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
g_opt.m_core = true;
continue;
}
if (strcmp(arg, "-csname") == 0) {
if (++argv, --argc > 0) {
g_opt.m_csname = strdup(argv[0]);
continue;
}
}
if (strcmp(arg, "-dups") == 0) {
g_opt.m_dups = true;
continue;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment