bug#15682 - ndb - invalid handling of varchar in position/rnd_pos

parent 57360b6b
...@@ -673,3 +673,19 @@ select * from atablewithareallylongandirritatingname; ...@@ -673,3 +673,19 @@ select * from atablewithareallylongandirritatingname;
a a
2 2
drop table atablewithareallylongandirritatingname; drop table atablewithareallylongandirritatingname;
create table t1 (f1 varchar(50), f2 text,f3 int, primary key(f1)) engine=NDB;
insert into t1 (f1,f2,f3)VALUES("111111","aaaaaa",1);
insert into t1 (f1,f2,f3)VALUES("222222","bbbbbb",2);
select * from t1 order by f1;
f1 f2 f3
111111 aaaaaa 1
222222 bbbbbb 2
select * from t1 order by f2;
f1 f2 f3
111111 aaaaaa 1
222222 bbbbbb 2
select * from t1 order by f3;
f1 f2 f3
111111 aaaaaa 1
222222 bbbbbb 2
drop table t1;
...@@ -615,3 +615,14 @@ create table atablewithareallylongandirritatingname (a int); ...@@ -615,3 +615,14 @@ create table atablewithareallylongandirritatingname (a int);
insert into atablewithareallylongandirritatingname values (2); insert into atablewithareallylongandirritatingname values (2);
select * from atablewithareallylongandirritatingname; select * from atablewithareallylongandirritatingname;
drop table atablewithareallylongandirritatingname; drop table atablewithareallylongandirritatingname;
#
# Bug#15682
#
create table t1 (f1 varchar(50), f2 text,f3 int, primary key(f1)) engine=NDB;
insert into t1 (f1,f2,f3)VALUES("111111","aaaaaa",1);
insert into t1 (f1,f2,f3)VALUES("222222","bbbbbb",2);
select * from t1 order by f1;
select * from t1 order by f2;
select * from t1 order by f3;
drop table t1;
...@@ -141,6 +141,8 @@ ...@@ -141,6 +141,8 @@
#define ZALREADYEXIST 630 #define ZALREADYEXIST 630
#define ZINCONSISTENTHASHINDEX 892 #define ZINCONSISTENTHASHINDEX 892
#define ZNOTUNIQUE 893 #define ZNOTUNIQUE 893
#define ZINVALID_KEY 290
#endif #endif
class Dbtc: public SimulatedBlock { class Dbtc: public SimulatedBlock {
......
...@@ -2313,7 +2313,10 @@ Dbtc::handle_special_hash(Uint32 dstHash[4], Uint32* src, Uint32 srcLen, ...@@ -2313,7 +2313,10 @@ Dbtc::handle_special_hash(Uint32 dstHash[4], Uint32* src, Uint32 srcLen,
{ {
keyPartLenPtr = keyPartLen; keyPartLenPtr = keyPartLen;
dstPos = xfrm_key(tabPtrI, src, dst, sizeof(Tmp) >> 2, keyPartLenPtr); dstPos = xfrm_key(tabPtrI, src, dst, sizeof(Tmp) >> 2, keyPartLenPtr);
ndbrequire(dstPos); if (unlikely(dstPos == 0))
{
goto error;
}
} }
else else
{ {
...@@ -2334,6 +2337,10 @@ Dbtc::handle_special_hash(Uint32 dstHash[4], Uint32* src, Uint32 srcLen, ...@@ -2334,6 +2337,10 @@ Dbtc::handle_special_hash(Uint32 dstHash[4], Uint32* src, Uint32 srcLen,
dstHash[1] = tmp[1]; dstHash[1] = tmp[1];
} }
return true; // success return true; // success
error:
terrorCode = ZINVALID_KEY;
return false;
} }
/* /*
...@@ -2941,8 +2948,16 @@ void Dbtc::tckeyreq050Lab(Signal* signal) ...@@ -2941,8 +2948,16 @@ void Dbtc::tckeyreq050Lab(Signal* signal)
UintR tnoOfStandby; UintR tnoOfStandby;
UintR tnodeinfo; UintR tnodeinfo;
terrorCode = 0;
hash(signal); /* NOW IT IS TIME TO CALCULATE THE HASH VALUE*/ hash(signal); /* NOW IT IS TIME TO CALCULATE THE HASH VALUE*/
if (unlikely(terrorCode))
{
releaseAtErrorLab(signal);
return;
}
CacheRecord * const regCachePtr = cachePtr.p; CacheRecord * const regCachePtr = cachePtr.p;
TcConnectRecord * const regTcPtr = tcConnectptr.p; TcConnectRecord * const regTcPtr = tcConnectptr.p;
ApiConnectRecord * const regApiPtr = apiConnectptr.p; ApiConnectRecord * const regApiPtr = apiConnectptr.p;
......
...@@ -1890,7 +1890,10 @@ SimulatedBlock::xfrm_key(Uint32 tab, const Uint32* src, ...@@ -1890,7 +1890,10 @@ SimulatedBlock::xfrm_key(Uint32 tab, const Uint32* src,
AttributeDescriptor::getType(keyAttr.attributeDescriptor); AttributeDescriptor::getType(keyAttr.attributeDescriptor);
Uint32 lb, len; Uint32 lb, len;
bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len); bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
ndbrequire(ok); if (unlikely(!ok))
{
return 0;
}
Uint32 xmul = cs->strxfrm_multiply; Uint32 xmul = cs->strxfrm_multiply;
if (xmul == 0) if (xmul == 0)
xmul = 1; xmul = 1;
...@@ -1902,7 +1905,10 @@ SimulatedBlock::xfrm_key(Uint32 tab, const Uint32* src, ...@@ -1902,7 +1905,10 @@ SimulatedBlock::xfrm_key(Uint32 tab, const Uint32* src,
Uint32 dstLen = xmul * (srcBytes - lb); Uint32 dstLen = xmul * (srcBytes - lb);
ndbrequire(dstLen <= ((dstSize - dstPos) << 2)); ndbrequire(dstLen <= ((dstSize - dstPos) << 2));
int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len); int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
ndbrequire(n != -1); if (unlikely(n == -1))
{
return 0;
}
while ((n & 3) != 0) while ((n & 3) != 0)
{ {
dstPtr[n++] = 0; dstPtr[n++] = 0;
......
...@@ -227,6 +227,7 @@ ErrorBundle ErrorCodes[] = { ...@@ -227,6 +227,7 @@ ErrorBundle ErrorCodes[] = {
{ 277, IE, "277" }, { 277, IE, "277" },
{ 278, IE, "278" }, { 278, IE, "278" },
{ 287, IE, "Index corrupted" }, { 287, IE, "Index corrupted" },
{ 290, IE, "Corrupt key in TC, unable to xfrm" },
{ 631, IE, "631" }, { 631, IE, "631" },
{ 632, IE, "632" }, { 632, IE, "632" },
{ 702, IE, "Request to non-master" }, { 702, IE, "Request to non-master" },
...@@ -295,7 +296,6 @@ ErrorBundle ErrorCodes[] = { ...@@ -295,7 +296,6 @@ ErrorBundle ErrorCodes[] = {
{ 4608, AE, "You can not takeOverScan unless you have used openScanExclusive"}, { 4608, AE, "You can not takeOverScan unless you have used openScanExclusive"},
{ 4609, AE, "You must call nextScanResult before trying to takeOverScan"}, { 4609, AE, "You must call nextScanResult before trying to takeOverScan"},
{ 4232, AE, "Parallelism can only be between 1 and 240" }, { 4232, AE, "Parallelism can only be between 1 and 240" },
{ 290, AE, "Scan not started or has been closed by kernel due to timeout" },
/** /**
* Event schema errors * Event schema errors
......
...@@ -2796,8 +2796,26 @@ void ha_ndbcluster::position(const byte *record) ...@@ -2796,8 +2796,26 @@ void ha_ndbcluster::position(const byte *record)
} }
*buff++= 0; *buff++= 0;
} }
memcpy(buff, record + key_part->offset, key_part->length);
buff += key_part->length; size_t len = key_part->length;
const byte * ptr = record + key_part->offset;
Field *field = key_part->field;
if ((field->type() == MYSQL_TYPE_VARCHAR) &&
((Field_varstring*)field)->length_bytes == 1)
{
/**
* Keys always use 2 bytes length
*/
buff[0] = ptr[0];
buff[1] = 0;
memcpy(buff+2, ptr + 1, len);
len += 2;
}
else
{
memcpy(buff, ptr, len);
}
buff += len;
} }
} }
else else
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment