Commit 577470e0 authored by Zardosht Kasheff's avatar Zardosht Kasheff

refs #94, for keys with strings, add a memcmp at the end of the comparison

function if we are doing comparisons in the fractal tree, so that case-insensitivities
get resolved. Comparisons done inside the handlerton are unaffected.
parent 9c47f41e
SET DEFAULT_STORAGE_ENGINE = 'tokudb';
DROP TABLE IF EXISTS foo;
create table foo (a varchar (100), primary key (a));
insert into foo values ("a"),("B"),("c"),("D"),("e"),("F"),("A");
ERROR 23000: Duplicate entry 'A' for key 'PRIMARY'
insert into foo values ("a"),("B"),("c"),("D"),("e"),("F");
insert into foo values ("C");
ERROR 23000: Duplicate entry 'C' for key 'PRIMARY'
insert into foo values ("d");
ERROR 23000: Duplicate entry 'd' for key 'PRIMARY'
begin;
update foo set a="C" where a="c";
select * from foo;
a
a
B
C
D
e
F
rollback;
select * from foo;
a
a
B
c
D
e
F
select * from foo where a="c";
a
c
select * from foo where a="C";
a
c
select * from foo where a > "c";
a
D
e
F
select * from foo where a > "C";
a
D
e
F
select * from foo where a >= "c";
a
c
D
e
F
select * from foo where a >= "C";
a
c
D
e
F
select * from foo where a < "c";
a
a
B
select * from foo where a < "C";
a
a
B
select * from foo where a <= "c";
a
a
B
c
select * from foo where a <= "C";
a
a
B
c
update foo set a = "d" where a="a";
ERROR 23000: Duplicate entry 'd' for key 'PRIMARY'
update foo set a = "C" where a="a";
ERROR 23000: Duplicate entry 'C' for key 'PRIMARY'
drop table foo;
create table foo (a varchar (100), b int, primary key (a), key(b));
insert into foo values ("a",1000),("B",1),("c",10000),("D",10),("e",109),("F",1),("A",1);
ERROR 23000: Duplicate entry 'A' for key 'PRIMARY'
insert into foo values ("a",3),("B",1),("c",4),("D",2),("e",11),("F",8);
insert into foo values ("C",1);
ERROR 23000: Duplicate entry 'C' for key 'PRIMARY'
insert into foo values ("d",1);
ERROR 23000: Duplicate entry 'd' for key 'PRIMARY'
begin;
update foo set a="C" where a="c";
select * from foo;
a b
B 1
D 2
a 3
C 4
F 8
e 11
rollback;
select * from foo;
a b
B 1
D 2
a 3
c 4
F 8
e 11
update foo set a = "d" where a="a";
ERROR 23000: Duplicate entry 'd' for key 'PRIMARY'
update foo set a = "C" where a="a";
ERROR 23000: Duplicate entry 'C' for key 'PRIMARY'
drop table foo;
create table foo (a varchar (100), b int, unique key (a), primary key(b));
insert into foo values ("a",1000),("B",1),("c",10000),("D",10),("e",109),("F",1),("A",22);
ERROR 23000: Duplicate entry '1' for key 'PRIMARY'
insert into foo values ("a",3),("B",1),("c",4),("D",2),("e",11),("F",8);
insert into foo values ("C",100);
ERROR 23000: Duplicate entry 'C' for key 'a'
insert into foo values ("d",100);
ERROR 23000: Duplicate entry 'd' for key 'a'
begin;
update foo set a="C" where a="c";
select * from foo;
a b
a 3
B 1
C 4
D 2
e 11
F 8
rollback;
select * from foo;
a b
a 3
B 1
c 4
D 2
e 11
F 8
update foo set a = "d" where a="a";
ERROR 23000: Duplicate entry 'd' for key 'a'
update foo set a = "C" where a="a";
ERROR 23000: Duplicate entry 'C' for key 'a'
select * from foo where a="c";
a b
c 4
select * from foo where a="C";
a b
c 4
select * from foo where a > "c";
a b
D 2
e 11
F 8
select * from foo where a > "C";
a b
D 2
e 11
F 8
select * from foo where a >= "c";
a b
c 4
D 2
e 11
F 8
select * from foo where a >= "C";
a b
c 4
D 2
e 11
F 8
select * from foo where a < "c";
a b
a 3
B 1
select * from foo where a < "C";
a b
a 3
B 1
select * from foo where a <= "c";
a b
a 3
B 1
c 4
select * from foo where a <= "C";
a b
a 3
B 1
c 4
drop table foo;
--source include/have_tokudb.inc
#
# Record inconsistency test for #94: keys containing strings with a
# case-insensitive collation must still resolve to distinct entries in
# the fractal tree (e.g. "c" vs "C"), while comparisons done inside the
# handlerton remain case-insensitive.
#
SET DEFAULT_STORAGE_ENGINE = 'tokudb';
--disable_warnings
DROP TABLE IF EXISTS foo;
--enable_warnings
# first test pk as a varchar
create table foo (a varchar (100), primary key (a));
# test loader
--error ER_DUP_ENTRY
insert into foo values ("a"),("B"),("c"),("D"),("e"),("F"),("A");
insert into foo values ("a"),("B"),("c"),("D"),("e"),("F");
--error ER_DUP_ENTRY
insert into foo values ("C");
--error ER_DUP_ENTRY
insert into foo values ("d");
begin;
# test an update works
update foo set a="C" where a="c";
select * from foo;
# test a rollback works
rollback;
select * from foo;
# now test some queries
select * from foo where a="c";
select * from foo where a="C";
select * from foo where a > "c";
select * from foo where a > "C";
select * from foo where a >= "c";
select * from foo where a >= "C";
select * from foo where a < "c";
select * from foo where a < "C";
select * from foo where a <= "c";
select * from foo where a <= "C";
--error ER_DUP_ENTRY
update foo set a = "d" where a="a";
--error ER_DUP_ENTRY
update foo set a = "C" where a="a";
drop table foo;
#Now repeat all that when we have a second column and key
# first test pk as a varchar
create table foo (a varchar (100), b int, primary key (a), key(b));
# test loader
--error ER_DUP_ENTRY
insert into foo values ("a",1000),("B",1),("c",10000),("D",10),("e",109),("F",1),("A",1);
insert into foo values ("a",3),("B",1),("c",4),("D",2),("e",11),("F",8);
--error ER_DUP_ENTRY
insert into foo values ("C",1);
--error ER_DUP_ENTRY
insert into foo values ("d",1);
begin;
# test an update works
update foo set a="C" where a="c";
select * from foo;
# test a rollback works
rollback;
select * from foo;
--error ER_DUP_ENTRY
update foo set a = "d" where a="a";
--error ER_DUP_ENTRY
update foo set a = "C" where a="a";
drop table foo;
#Now repeat all that when the varchar is a unique secondary key
# and the primary key is an int
create table foo (a varchar (100), b int, unique key (a), primary key(b));
# test loader
--error ER_DUP_ENTRY
insert into foo values ("a",1000),("B",1),("c",10000),("D",10),("e",109),("F",1),("A",22);
insert into foo values ("a",3),("B",1),("c",4),("D",2),("e",11),("F",8);
--error ER_DUP_ENTRY
insert into foo values ("C",100);
--error ER_DUP_ENTRY
insert into foo values ("d",100);
begin;
# test an update works
update foo set a="C" where a="c";
select * from foo;
# test a rollback works
rollback;
select * from foo;
--error ER_DUP_ENTRY
update foo set a = "d" where a="a";
--error ER_DUP_ENTRY
update foo set a = "C" where a="a";
# now test some queries
select * from foo where a="c";
select * from foo where a="C";
select * from foo where a > "c";
select * from foo where a > "C";
select * from foo where a >= "c";
select * from foo where a >= "C";
select * from foo where a < "c";
select * from foo where a < "C";
select * from foo where a <= "c";
select * from foo where a <= "C";
drop table foo;
......@@ -1800,6 +1800,7 @@ int ha_tokudb::initialize_share(
primary_key
);
share->pk_has_string = false;
if (!hidden_primary_key) {
//
// We need to set the ref_length to start at 5, to account for
......@@ -1810,6 +1811,14 @@ int ha_tokudb::initialize_share(
KEY_PART_INFO *end = key_part + get_key_parts(&table->key_info[primary_key]);
for (; key_part != end; key_part++) {
ref_length += key_part->field->max_packed_col_length(key_part->length);
TOKU_TYPE toku_type = mysql_to_toku_type(key_part->field);
if (toku_type == toku_type_fixstring ||
toku_type == toku_type_varstring ||
toku_type == toku_type_blob
)
{
share->pk_has_string = true;
}
}
share->status |= STATUS_PRIMARY_KEY_INIT;
}
......@@ -2882,7 +2891,14 @@ DBT* ha_tokudb::create_dbt_key_for_lookup(
)
{
TOKUDB_DBUG_ENTER("ha_tokudb::create_dbt_key_from_lookup");
DBUG_RETURN(create_dbt_key_from_key(key, key_info, buff, record, has_null, true, key_length));
DBT* ret = create_dbt_key_from_key(key, key_info, buff, record, has_null, true, key_length);
// override the infinity byte, needed in case the pk is a string
// to make sure that the cursor that uses this key properly positions
// it at the right location. If the table stores "D", but we look up for "d",
// and the infinity byte is 0, then we will skip the "D", because
// in bytes, "d" > "D".
buff[0] = COL_NEG_INF;
DBUG_RETURN(ret);
}
//
......@@ -3236,6 +3252,7 @@ ha_rows ha_tokudb::estimate_rows_upper_bound() {
//
int ha_tokudb::cmp_ref(const uchar * ref1, const uchar * ref2) {
int ret_val = 0;
bool read_string = false;
ret_val = tokudb_compare_two_keys(
ref1 + sizeof(uint32_t),
*(uint32_t *)ref1,
......@@ -3243,7 +3260,8 @@ int ha_tokudb::cmp_ref(const uchar * ref1, const uchar * ref2) {
*(uint32_t *)ref2,
(uchar *)share->file->descriptor->dbt.data + 4,
*(uint32_t *)share->file->descriptor->dbt.data - 4,
false
false,
&read_string
);
return ret_val;
}
......@@ -3418,7 +3436,7 @@ int ha_tokudb::end_bulk_insert(bool abort) {
for (uint i = 0; i < table_share->keys; i++) {
if (table_share->key_info[i].flags & HA_NOSAME) {
bool is_unique;
if (i == primary_key) {
if (i == primary_key && !share->pk_has_string) {
continue;
}
error = is_index_unique(
......@@ -3707,12 +3725,12 @@ int ha_tokudb::do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd) {
//
if (share->has_unique_keys && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
for (uint keynr = 0; keynr < table_share->keys; keynr++) {
bool is_unique_key = table->key_info[keynr].flags & HA_NOSAME;
bool is_unique_key = (table->key_info[keynr].flags & HA_NOSAME) || (keynr == primary_key);
bool is_unique = false;
//
// don't need to do check for primary key
// don't need to do check for primary key that don't have strings
//
if (keynr == primary_key) {
if (keynr == primary_key && !share->pk_has_string) {
continue;
}
if (!is_unique_key) {
......@@ -4094,39 +4112,38 @@ int ha_tokudb::write_row(uchar * record) {
}
}
else {
error = do_uniqueness_checks(record, txn, thd);
if (error) {
// for #4633
// if we have a duplicate key error, let's check the primary key to see
// if there is a duplicate there. If so, set last_dup_key to the pk
if (error == DB_KEYEXIST && !test(hidden_primary_key) && last_dup_key != primary_key) {
int r = share->file->getf_set(
share->file,
txn,
0,
&prim_key,
smart_dbt_do_nothing,
NULL
);
if (r == 0) {
// if we get no error, that means the row
// was found and this is a duplicate key,
// so we set last_dup_key
last_dup_key = primary_key;
}
else if (r != DB_NOTFOUND) {
// if some other error is returned, return that to the user.
error = r;
}
}
goto cleanup;
}
if (curr_num_DBs == 1) {
error = insert_row_to_main_dictionary(record,&prim_key, &row, txn);
if (error) { goto cleanup; }
}
else {
error = do_uniqueness_checks(record, txn, thd);
if (error) {
// for #4633
// if we have a duplicate key error, let's check the primary key to see
// if there is a duplicate there. If so, set last_dup_key to the pk
if (error == DB_KEYEXIST && !test(hidden_primary_key)) {
int r = share->file->getf_set(
share->file,
txn,
0,
&prim_key,
smart_dbt_do_nothing,
NULL
);
if (r == 0) {
// if we get no error, that means the row
// was found and this is a duplicate key,
// so we set last_dup_key
last_dup_key = primary_key;
}
else if (r != DB_NOTFOUND) {
// if some other error is returned, return that to the user.
error = r;
}
}
goto cleanup;
}
error = insert_rows_to_dictionaries_mult(&prim_key, &row, txn, thd);
if (error) { goto cleanup; }
}
......@@ -4262,8 +4279,8 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
//
if (share->has_unique_keys && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
for (uint keynr = 0; keynr < table_share->keys; keynr++) {
bool is_unique_key = table->key_info[keynr].flags & HA_NOSAME;
if (keynr == primary_key) {
bool is_unique_key = (table->key_info[keynr].flags & HA_NOSAME) || (keynr == primary_key);
if (keynr == primary_key && !share->pk_has_string) {
continue;
}
if (is_unique_key) {
......
......@@ -164,6 +164,10 @@ class TOKUDB_SHARE {
// index of auto increment column in table->field, if auto_inc exists
//
uint ai_field_index;
//
// whether the primary key has a string
//
bool pk_has_string;
KEY_AND_COL_INFO kc_info;
......
......@@ -1250,7 +1250,8 @@ static inline int compare_toku_field(
uchar* row_desc,
uint32_t* a_bytes_read,
uint32_t* b_bytes_read,
uint32_t* row_desc_bytes_read
uint32_t* row_desc_bytes_read,
bool* read_string
)
{
int ret_val = 0;
......@@ -1333,6 +1334,7 @@ static inline int compare_toku_field(
a_bytes_read,
b_bytes_read
);
*read_string = true;
break;
default:
assert(false);
......@@ -1570,7 +1572,8 @@ int tokudb_compare_two_keys(
const uint32_t saved_key_size,
const void* row_desc,
const uint32_t row_desc_size,
bool cmp_prefix
bool cmp_prefix,
bool* read_string
)
{
int ret_val = 0;
......@@ -1639,7 +1642,8 @@ int tokudb_compare_two_keys(
row_desc_ptr,
&new_key_field_length,
&saved_key_field_length,
&row_desc_field_length
&row_desc_field_length,
read_string
);
new_key_ptr += new_key_field_length;
saved_key_ptr += saved_key_field_length;
......@@ -1683,17 +1687,25 @@ int tokudb_compare_two_keys(
return ret_val;
}
//
// Compares two DBT keys byte-by-byte, exactly as memcmp would, with a
// shorter key ordering before a longer key whose prefix it matches.
// Used as a tie-breaker after a (possibly case-insensitive) collation
// comparison reports equality, so the fractal tree can still
// distinguish byte-distinct keys such as "c" and "C".
// Returns <0, 0, or >0 in the usual comparator convention.
//
static int simple_memcmp(const DBT *keya, const DBT *keyb) {
    // compare only the bytes the two keys have in common;
    // size_t avoids narrowing the unsigned DBT sizes into a signed int
    // and matches the length parameter type of memcmp
    size_t num_bytes_cmp = keya->size < keyb->size ?
        keya->size : keyb->size;
    int cmp = memcmp(keya->data, keyb->data, num_bytes_cmp);
    if (cmp == 0 && (keya->size != keyb->size)) {
        // common prefix equal: the shorter key sorts first
        cmp = keya->size < keyb->size ? -1 : 1;
    }
    return cmp;
}
// comparison function to be used by the fractal trees.
int tokudb_cmp_dbt_key(DB* file, const DBT *keya, const DBT *keyb) {
int cmp;
if (file->cmp_descriptor->dbt.size == 0) {
int num_bytes_cmp = keya->size < keyb->size ?
keya->size : keyb->size;
cmp = memcmp(keya->data,keyb->data,num_bytes_cmp);
if (cmp == 0 && (keya->size != keyb->size)) {
cmp = keya->size < keyb->size ? -1 : 1;
}
cmp = simple_memcmp(keya, keyb);
}
else {
bool read_string = false;
cmp = tokudb_compare_two_keys(
keya->data,
keya->size,
......@@ -1701,14 +1713,24 @@ int tokudb_cmp_dbt_key(DB* file, const DBT *keya, const DBT *keyb) {
keyb->size,
(uchar *)file->cmp_descriptor->dbt.data + 4,
(*(uint32_t *)file->cmp_descriptor->dbt.data) - 4,
false
false,
&read_string
);
// comparison above may be case-insensitive, but fractal tree
// needs to distinguish between different data, so we do this
// additional check here
if (read_string && (cmp == 0)) {
cmp = simple_memcmp(keya, keyb);
}
}
return cmp;
}
//TODO: QQQ Only do one direction for prefix.
int tokudb_prefix_cmp_dbt_key(DB *file, const DBT *keya, const DBT *keyb) {
// calls to this function are done by the handlerton, and are
// comparing just the keys as MySQL would compare them.
bool read_string = false;
int cmp = tokudb_compare_two_keys(
keya->data,
keya->size,
......@@ -1716,7 +1738,8 @@ int tokudb_prefix_cmp_dbt_key(DB *file, const DBT *keya, const DBT *keyb) {
keyb->size,
(uchar *)file->cmp_descriptor->dbt.data + 4,
*(uint32_t *)file->cmp_descriptor->dbt.data - 4,
true
true,
&read_string
);
return cmp;
}
......@@ -1785,14 +1808,15 @@ static int tokudb_compare_two_key_parts(
}
}
row_desc_ptr++;
bool read_string = false;
ret_val = compare_toku_field(
new_key_ptr,
saved_key_ptr,
row_desc_ptr,
&new_key_field_length,
&saved_key_field_length,
&row_desc_field_length
&row_desc_field_length,
&read_string
);
new_key_ptr += new_key_field_length;
saved_key_ptr += saved_key_field_length;
......
......@@ -362,7 +362,8 @@ int tokudb_compare_two_keys(
const uint32_t saved_key_size,
const void* row_desc,
const uint32_t row_desc_size,
bool cmp_prefix
bool cmp_prefix,
bool* read_string
);
int tokudb_cmp_dbt_key(DB* db, const DBT *keya, const DBT *keyb);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment