Commit 0b5b2f86 authored by Sergei Golubchik's avatar Sergei Golubchik

Bug #25207522: INCORRECT ORDER-BY BEHAVIOR ON A PARTITIONED TABLE WITH A COMPOSITE PREFIX INDEX

Fix prefix key comparison in partitioning. Comparions must
take into account no more than prefix_len characters.

It used to compare prefix_len*mbmaxlen bytes.
parent d5970779
...@@ -2756,5 +2756,45 @@ SELECT 1 FROM t1 WHERE a XOR 'a'; ...@@ -2756,5 +2756,45 @@ SELECT 1 FROM t1 WHERE a XOR 'a';
1 1
DROP TABLE t1; DROP TABLE t1;
# #
# Bug #25207522: INCORRECT ORDER-BY BEHAVIOR ON A PARTITIONED TABLE
# WITH A COMPOSITE PREFIX INDEX
#
create table t1(id int unsigned not null,
data varchar(2) default null,
key data_idx (data(1),id)
) default charset=utf8
partition by range (id) (
partition p10 values less than (10),
partition p20 values less than (20)
);
insert t1 values (6, 'ab'), (4, 'ab'), (5, 'ab'), (16, 'ab'), (14, 'ab'), (15, 'ab'), (5, 'ac'), (15, 'aa') ;
select id from t1 where data = 'ab' order by id;
id
4
5
6
14
15
16
drop table t1;
create table t1(id int unsigned not null,
data text default null,
key data_idx (data(1),id)
) default charset=utf8
partition by range (id) (
partition p10 values less than (10),
partition p20 values less than (20)
);
insert t1 values (6, 'ab'), (4, 'ab'), (5, 'ab'), (16, 'ab'), (14, 'ab'), (15, 'ab'), (5, 'ac'), (15, 'aa') ;
select id from t1 where data = 'ab' order by id;
id
4
5
6
14
15
16
drop table t1;
#
# End of 10.1 tests # End of 10.1 tests
# #
...@@ -2970,6 +2970,34 @@ CREATE TABLE t1(a BINARY(80)) PARTITION BY KEY(a) PARTITIONS 3; ...@@ -2970,6 +2970,34 @@ CREATE TABLE t1(a BINARY(80)) PARTITION BY KEY(a) PARTITIONS 3;
SELECT 1 FROM t1 WHERE a XOR 'a'; SELECT 1 FROM t1 WHERE a XOR 'a';
DROP TABLE t1; DROP TABLE t1;
--echo #
--echo # Bug #25207522: INCORRECT ORDER-BY BEHAVIOR ON A PARTITIONED TABLE
--echo # WITH A COMPOSITE PREFIX INDEX
--echo #
create table t1(id int unsigned not null,
data varchar(2) default null,
key data_idx (data(1),id)
) default charset=utf8
partition by range (id) (
partition p10 values less than (10),
partition p20 values less than (20)
);
insert t1 values (6, 'ab'), (4, 'ab'), (5, 'ab'), (16, 'ab'), (14, 'ab'), (15, 'ab'), (5, 'ac'), (15, 'aa') ;
select id from t1 where data = 'ab' order by id;
drop table t1;
create table t1(id int unsigned not null,
data text default null,
key data_idx (data(1),id)
) default charset=utf8
partition by range (id) (
partition p10 values less than (10),
partition p20 values less than (20)
);
insert t1 values (6, 'ab'), (4, 'ab'), (5, 'ab'), (16, 'ab'), (14, 'ab'), (15, 'ab'), (5, 'ac'), (15, 'aa') ;
select id from t1 where data = 'ab' order by id;
drop table t1;
--echo # --echo #
--echo # End of 10.1 tests --echo # End of 10.1 tests
--echo # --echo #
...@@ -7537,8 +7537,7 @@ my_decimal *Field_varstring::val_decimal(my_decimal *decimal_value) ...@@ -7537,8 +7537,7 @@ my_decimal *Field_varstring::val_decimal(my_decimal *decimal_value)
} }
int Field_varstring::cmp_max(const uchar *a_ptr, const uchar *b_ptr, int Field_varstring::cmp(const uchar *a_ptr, const uchar *b_ptr)
uint max_len)
{ {
uint a_length, b_length; uint a_length, b_length;
int diff; int diff;
...@@ -7553,8 +7552,8 @@ int Field_varstring::cmp_max(const uchar *a_ptr, const uchar *b_ptr, ...@@ -7553,8 +7552,8 @@ int Field_varstring::cmp_max(const uchar *a_ptr, const uchar *b_ptr,
a_length= uint2korr(a_ptr); a_length= uint2korr(a_ptr);
b_length= uint2korr(b_ptr); b_length= uint2korr(b_ptr);
} }
set_if_smaller(a_length, max_len); set_if_smaller(a_length, field_length);
set_if_smaller(b_length, max_len); set_if_smaller(b_length, field_length);
diff= field_charset->coll->strnncollsp(field_charset, diff= field_charset->coll->strnncollsp(field_charset,
a_ptr+ a_ptr+
length_bytes, length_bytes,
...@@ -7566,6 +7565,43 @@ int Field_varstring::cmp_max(const uchar *a_ptr, const uchar *b_ptr, ...@@ -7566,6 +7565,43 @@ int Field_varstring::cmp_max(const uchar *a_ptr, const uchar *b_ptr,
} }
static int cmp_str_prefix(const uchar *ua, size_t alen, const uchar *ub,
size_t blen, size_t prefix, CHARSET_INFO *cs)
{
const char *a= (char*)ua, *b= (char*)ub;
MY_STRCOPY_STATUS status;
prefix/= cs->mbmaxlen;
alen= cs->cset->well_formed_char_length(cs, a, a + alen, prefix, &status);
blen= cs->cset->well_formed_char_length(cs, b, b + blen, prefix, &status);
return cs->coll->strnncollsp(cs, ua, alen, ub, blen, 0);
}
int Field_varstring::cmp_prefix(const uchar *a_ptr, const uchar *b_ptr,
size_t prefix_len)
{
/* avoid expensive well_formed_char_length if possible */
if (prefix_len == table->field[field_index]->field_length)
return Field_varstring::cmp(a_ptr, b_ptr);
size_t a_length, b_length;
if (length_bytes == 1)
{
a_length= *a_ptr;
b_length= *b_ptr;
}
else
{
a_length= uint2korr(a_ptr);
b_length= uint2korr(b_ptr);
}
return cmp_str_prefix(a_ptr+length_bytes, a_length, b_ptr+length_bytes,
b_length, prefix_len, field_charset);
}
/** /**
@note @note
varstring and blob keys are ALWAYS stored with a 2 byte length prefix varstring and blob keys are ALWAYS stored with a 2 byte length prefix
...@@ -8114,16 +8150,24 @@ int Field_blob::cmp(const uchar *a,uint32 a_length, const uchar *b, ...@@ -8114,16 +8150,24 @@ int Field_blob::cmp(const uchar *a,uint32 a_length, const uchar *b,
} }
int Field_blob::cmp_max(const uchar *a_ptr, const uchar *b_ptr, int Field_blob::cmp(const uchar *a_ptr, const uchar *b_ptr)
uint max_length) {
uchar *blob1,*blob2;
memcpy(&blob1, a_ptr+packlength, sizeof(char*));
memcpy(&blob2, b_ptr+packlength, sizeof(char*));
size_t a_len= get_length(a_ptr), b_len= get_length(b_ptr);
return cmp(blob1, a_len, blob2, b_len);
}
int Field_blob::cmp_prefix(const uchar *a_ptr, const uchar *b_ptr,
size_t prefix_len)
{ {
uchar *blob1,*blob2; uchar *blob1,*blob2;
memcpy(&blob1, a_ptr+packlength, sizeof(char*)); memcpy(&blob1, a_ptr+packlength, sizeof(char*));
memcpy(&blob2, b_ptr+packlength, sizeof(char*)); memcpy(&blob2, b_ptr+packlength, sizeof(char*));
uint a_len= get_length(a_ptr), b_len= get_length(b_ptr); size_t a_len= get_length(a_ptr), b_len= get_length(b_ptr);
set_if_smaller(a_len, max_length); return cmp_str_prefix(blob1, a_len, blob2, b_len, prefix_len, field_charset);
set_if_smaller(b_len, max_length);
return Field_blob::cmp(blob1,a_len,blob2,b_len);
} }
...@@ -9407,7 +9451,7 @@ my_decimal *Field_bit::val_decimal(my_decimal *deciaml_value) ...@@ -9407,7 +9451,7 @@ my_decimal *Field_bit::val_decimal(my_decimal *deciaml_value)
The a and b pointer must be pointers to the field in a record The a and b pointer must be pointers to the field in a record
(not the table->record[0] necessarily) (not the table->record[0] necessarily)
*/ */
int Field_bit::cmp_max(const uchar *a, const uchar *b, uint max_len) int Field_bit::cmp_prefix(const uchar *a, const uchar *b, size_t prefix_len)
{ {
my_ptrdiff_t a_diff= a - ptr; my_ptrdiff_t a_diff= a - ptr;
my_ptrdiff_t b_diff= b - ptr; my_ptrdiff_t b_diff= b - ptr;
......
...@@ -954,9 +954,13 @@ class Field: public Value_source ...@@ -954,9 +954,13 @@ class Field: public Value_source
return type(); return type();
} }
inline int cmp(const uchar *str) { return cmp(ptr,str); } inline int cmp(const uchar *str) { return cmp(ptr,str); }
virtual int cmp_max(const uchar *a, const uchar *b, uint max_len)
{ return cmp(a, b); }
virtual int cmp(const uchar *,const uchar *)=0; virtual int cmp(const uchar *,const uchar *)=0;
/*
The following method is used for comparing prefix keys.
Currently it's only used in partitioning.
*/
virtual int cmp_prefix(const uchar *a, const uchar *b, size_t prefix_len)
{ return cmp(a, b); }
virtual int cmp_binary(const uchar *a,const uchar *b, uint32 max_length=~0L) virtual int cmp_binary(const uchar *a,const uchar *b, uint32 max_length=~0L)
{ return memcmp(a,b,pack_length()); } { return memcmp(a,b,pack_length()); }
virtual int cmp_offset(uint row_offset) virtual int cmp_offset(uint row_offset)
...@@ -2991,11 +2995,8 @@ class Field_varstring :public Field_longstr { ...@@ -2991,11 +2995,8 @@ class Field_varstring :public Field_longstr {
longlong val_int(void); longlong val_int(void);
String *val_str(String*,String *); String *val_str(String*,String *);
my_decimal *val_decimal(my_decimal *); my_decimal *val_decimal(my_decimal *);
int cmp_max(const uchar *, const uchar *, uint max_length); int cmp(const uchar *a,const uchar *b);
int cmp(const uchar *a,const uchar *b) int cmp_prefix(const uchar *a, const uchar *b, size_t prefix_len);
{
return cmp_max(a, b, ~0L);
}
void sort_string(uchar *buff,uint length); void sort_string(uchar *buff,uint length);
uint get_key_image(uchar *buff,uint length, imagetype type); uint get_key_image(uchar *buff,uint length, imagetype type);
void set_key_image(const uchar *buff,uint length); void set_key_image(const uchar *buff,uint length);
...@@ -3077,9 +3078,8 @@ class Field_blob :public Field_longstr { ...@@ -3077,9 +3078,8 @@ class Field_blob :public Field_longstr {
longlong val_int(void); longlong val_int(void);
String *val_str(String*,String *); String *val_str(String*,String *);
my_decimal *val_decimal(my_decimal *); my_decimal *val_decimal(my_decimal *);
int cmp_max(const uchar *, const uchar *, uint max_length); int cmp(const uchar *a,const uchar *b);
int cmp(const uchar *a,const uchar *b) int cmp_prefix(const uchar *a, const uchar *b, size_t prefix_len);
{ return cmp_max(a, b, ~0L); }
int cmp(const uchar *a, uint32 a_length, const uchar *b, uint32 b_length); int cmp(const uchar *a, uint32 a_length, const uchar *b, uint32 b_length);
int cmp_binary(const uchar *a,const uchar *b, uint32 max_length=~0L); int cmp_binary(const uchar *a,const uchar *b, uint32 max_length=~0L);
int key_cmp(const uchar *,const uchar*); int key_cmp(const uchar *,const uchar*);
...@@ -3399,7 +3399,7 @@ class Field_bit :public Field { ...@@ -3399,7 +3399,7 @@ class Field_bit :public Field {
} }
int cmp_binary_offset(uint row_offset) int cmp_binary_offset(uint row_offset)
{ return cmp_offset(row_offset); } { return cmp_offset(row_offset); }
int cmp_max(const uchar *a, const uchar *b, uint max_length); int cmp_prefix(const uchar *a, const uchar *b, size_t prefix_len);
int key_cmp(const uchar *a, const uchar *b) int key_cmp(const uchar *a, const uchar *b)
{ return cmp_binary((uchar *) a, (uchar *) b); } { return cmp_binary((uchar *) a, (uchar *) b); }
int key_cmp(const uchar *str, uint length); int key_cmp(const uchar *str, uint length);
......
...@@ -623,8 +623,8 @@ int key_rec_cmp(void *key_p, uchar *first_rec, uchar *second_rec) ...@@ -623,8 +623,8 @@ int key_rec_cmp(void *key_p, uchar *first_rec, uchar *second_rec)
max length. The exceptions are the BLOB and VARCHAR field types max length. The exceptions are the BLOB and VARCHAR field types
that take the max length into account. that take the max length into account.
*/ */
if ((result= field->cmp_max(field->ptr+first_diff, field->ptr+sec_diff, if ((result= field->cmp_prefix(field->ptr+first_diff, field->ptr+sec_diff,
key_part->length))) key_part->length)))
DBUG_RETURN(result); DBUG_RETURN(result);
next_loop: next_loop:
key_part++; key_part++;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment