ndb - bug#25587

  fix not aligned or non 32-bit values in "smart" scan
parent a4570b09
...@@ -203,3 +203,51 @@ NODEGROUP PARTITION_NAME ...@@ -203,3 +203,51 @@ NODEGROUP PARTITION_NAME
0 p0 0 p0
0 p1 0 p1
DROP TABLE t1; DROP TABLE t1;
CREATE TABLE t1 (
a tinyint unsigned NOT NULL,
b bigint(20) unsigned NOT NULL,
c char(12),
PRIMARY KEY (a,b)
) ENGINE ndb DEFAULT CHARSET=latin1 PARTITION BY KEY (a);
insert into t1 values(1,1,'1'), (2,2,'2'), (3,3,'3'), (4,4,'4'), (5,5,'5');
select * from t1 where a = 1;
a b c
1 1 1
select * from t1 where a = 2;
a b c
2 2 2
select * from t1 where a = 3;
a b c
3 3 3
select * from t1 where a = 4;
a b c
4 4 4
select * from t1 where a = 5;
a b c
5 5 5
delete from t1 where a = 1;
select * from t1 order by 1;
a b c
2 2 2
3 3 3
4 4 4
5 5 5
delete from t1 where a = 2;
select * from t1 order by 1;
a b c
3 3 3
4 4 4
5 5 5
delete from t1 where a = 3;
select * from t1 order by 1;
a b c
4 4 4
5 5 5
delete from t1 where a = 4;
select * from t1 order by 1;
a b c
5 5 5
delete from t1 where a = 5;
select * from t1 order by 1;
a b c
drop table t1;
...@@ -199,3 +199,31 @@ ALTER TABLE t1 ADD COLUMN c4 INT AFTER c1; ...@@ -199,3 +199,31 @@ ALTER TABLE t1 ADD COLUMN c4 INT AFTER c1;
SELECT NODEGROUP,PARTITION_NAME FROM information_schema.partitions WHERE SELECT NODEGROUP,PARTITION_NAME FROM information_schema.partitions WHERE
table_name = "t1"; table_name = "t1";
DROP TABLE t1; DROP TABLE t1;
# bug#25587
CREATE TABLE t1 (
a tinyint unsigned NOT NULL,
b bigint(20) unsigned NOT NULL,
c char(12),
PRIMARY KEY (a,b)
) ENGINE ndb DEFAULT CHARSET=latin1 PARTITION BY KEY (a);
insert into t1 values(1,1,'1'), (2,2,'2'), (3,3,'3'), (4,4,'4'), (5,5,'5');
select * from t1 where a = 1;
select * from t1 where a = 2;
select * from t1 where a = 3;
select * from t1 where a = 4;
select * from t1 where a = 5;
delete from t1 where a = 1;
select * from t1 order by 1;
delete from t1 where a = 2;
select * from t1 order by 1;
delete from t1 where a = 3;
select * from t1 order by 1;
delete from t1 where a = 4;
select * from t1 order by 1;
delete from t1 where a = 5;
select * from t1 order by 1;
drop table t1;
...@@ -1188,25 +1188,31 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, ...@@ -1188,25 +1188,31 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
const bool nobytes = (len & 0x3) == 0; const bool nobytes = (len & 0x3) == 0;
const Uint32 totalLen = 2 + sizeInWords; const Uint32 totalLen = 2 + sizeInWords;
Uint32 tupKeyLen = theTupKeyLen; Uint32 tupKeyLen = theTupKeyLen;
union {
Uint32 tempData[2000];
Uint64 __align;
};
Uint64 *valPtr;
if(remaining > totalLen && aligned && nobytes){ if(remaining > totalLen && aligned && nobytes){
Uint32 * dst = theKEYINFOptr + currLen; Uint32 * dst = theKEYINFOptr + currLen;
* dst ++ = type; * dst ++ = type;
* dst ++ = ahValue; * dst ++ = ahValue;
memcpy(dst, aValue, 4 * sizeInWords); memcpy(dst, aValue, 4 * sizeInWords);
theTotalNrOfKeyWordInSignal = currLen + totalLen; theTotalNrOfKeyWordInSignal = currLen + totalLen;
valPtr = (Uint64*)aValue;
} else { } else {
if(!aligned || !nobytes){ if(!aligned || !nobytes){
Uint32 tempData[2000];
tempData[0] = type; tempData[0] = type;
tempData[1] = ahValue; tempData[1] = ahValue;
tempData[2 + (len >> 2)] = 0; tempData[2 + (len >> 2)] = 0;
memcpy(tempData+2, aValue, len); memcpy(tempData+2, aValue, len);
insertBOUNDS(tempData, 2+sizeInWords); insertBOUNDS(tempData, 2+sizeInWords);
valPtr = (Uint64*)(tempData+2);
} else { } else {
Uint32 buf[2] = { type, ahValue }; Uint32 buf[2] = { type, ahValue };
insertBOUNDS(buf, 2); insertBOUNDS(buf, 2);
insertBOUNDS((Uint32*)aValue, sizeInWords); insertBOUNDS((Uint32*)aValue, sizeInWords);
valPtr = (Uint64*)aValue;
} }
} }
theTupKeyLen = tupKeyLen + totalLen; theTupKeyLen = tupKeyLen + totalLen;
...@@ -1223,7 +1229,7 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, ...@@ -1223,7 +1229,7 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
if(type == BoundEQ && tDistrKey) if(type == BoundEQ && tDistrKey)
{ {
theNoOfTupKeyLeft--; theNoOfTupKeyLeft--;
return handle_distribution_key((Uint64*)aValue, sizeInWords); return handle_distribution_key(valPtr, sizeInWords);
} }
return 0; return 0;
} else { } else {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment