Commit 74d2ec5e authored by Zardosht Kasheff's avatar Zardosht Kasheff

refs #94, for keys with strings, add a memcmp at the end of the comparison

function if we are doing comparisons in the fractal tree, so that case-insensitivities
get resolved. Comparisons done inside the handlerton are unaffected.
parent fad930a2
SET DEFAULT_STORAGE_ENGINE = 'tokudb';
DROP TABLE IF EXISTS foo;
create table foo (a varchar (100), primary key (a));
insert into foo values ("a"),("B"),("c"),("D"),("e"),("F"),("A");
ERROR 23000: Duplicate entry 'A' for key 'PRIMARY'
insert into foo values ("a"),("B"),("c"),("D"),("e"),("F");
insert into foo values ("C");
ERROR 23000: Duplicate entry 'C' for key 'PRIMARY'
insert into foo values ("d");
ERROR 23000: Duplicate entry 'd' for key 'PRIMARY'
begin;
update foo set a="C" where a="c";
select * from foo;
a
a
B
C
D
e
F
rollback;
select * from foo;
a
a
B
c
D
e
F
select * from foo where a="c";
a
c
select * from foo where a="C";
a
c
select * from foo where a > "c";
a
D
e
F
select * from foo where a > "C";
a
D
e
F
select * from foo where a >= "c";
a
c
D
e
F
select * from foo where a >= "C";
a
c
D
e
F
select * from foo where a < "c";
a
a
B
select * from foo where a < "C";
a
a
B
select * from foo where a <= "c";
a
a
B
c
select * from foo where a <= "C";
a
a
B
c
update foo set a = "d" where a="a";
ERROR 23000: Duplicate entry 'd' for key 'PRIMARY'
update foo set a = "C" where a="a";
ERROR 23000: Duplicate entry 'C' for key 'PRIMARY'
drop table foo;
create table foo (a varchar (100), b int, primary key (a), key(b));
insert into foo values ("a",1000),("B",1),("c",10000),("D",10),("e",109),("F",1),("A",1);
ERROR 23000: Duplicate entry 'A' for key 'PRIMARY'
insert into foo values ("a",3),("B",1),("c",4),("D",2),("e",11),("F",8);
insert into foo values ("C",1);
ERROR 23000: Duplicate entry 'C' for key 'PRIMARY'
insert into foo values ("d",1);
ERROR 23000: Duplicate entry 'd' for key 'PRIMARY'
begin;
update foo set a="C" where a="c";
select * from foo;
a b
B 1
D 2
a 3
C 4
F 8
e 11
rollback;
select * from foo;
a b
B 1
D 2
a 3
c 4
F 8
e 11
update foo set a = "d" where a="a";
ERROR 23000: Duplicate entry 'd' for key 'PRIMARY'
update foo set a = "C" where a="a";
ERROR 23000: Duplicate entry 'C' for key 'PRIMARY'
drop table foo;
create table foo (a varchar (100), b int, unique key (a), primary key(b));
insert into foo values ("a",1000),("B",1),("c",10000),("D",10),("e",109),("F",1),("A",22);
ERROR 23000: Duplicate entry '1' for key 'PRIMARY'
insert into foo values ("a",3),("B",1),("c",4),("D",2),("e",11),("F",8);
insert into foo values ("C",100);
ERROR 23000: Duplicate entry 'C' for key 'a'
insert into foo values ("d",100);
ERROR 23000: Duplicate entry 'd' for key 'a'
begin;
update foo set a="C" where a="c";
select * from foo;
a b
a 3
B 1
C 4
D 2
e 11
F 8
rollback;
select * from foo;
a b
a 3
B 1
c 4
D 2
e 11
F 8
update foo set a = "d" where a="a";
ERROR 23000: Duplicate entry 'd' for key 'a'
update foo set a = "C" where a="a";
ERROR 23000: Duplicate entry 'C' for key 'a'
select * from foo where a="c";
a b
c 4
select * from foo where a="C";
a b
c 4
select * from foo where a > "c";
a b
D 2
e 11
F 8
select * from foo where a > "C";
a b
D 2
e 11
F 8
select * from foo where a >= "c";
a b
c 4
D 2
e 11
F 8
select * from foo where a >= "C";
a b
c 4
D 2
e 11
F 8
select * from foo where a < "c";
a b
a 3
B 1
select * from foo where a < "C";
a b
a 3
B 1
select * from foo where a <= "c";
a b
a 3
B 1
c 4
select * from foo where a <= "C";
a b
a 3
B 1
c 4
drop table foo;
--source include/have_tokudb.inc
#
# Record inconsistency.
#
#
SET DEFAULT_STORAGE_ENGINE = 'tokudb';
--disable_warnings
DROP TABLE IF EXISTS foo;
--enable_warnings
# first test pk as a varchar
create table foo (a varchar (100), primary key (a));
# test loader
--error ER_DUP_ENTRY
insert into foo values ("a"),("B"),("c"),("D"),("e"),("F"),("A");
insert into foo values ("a"),("B"),("c"),("D"),("e"),("F");
--error ER_DUP_ENTRY
insert into foo values ("C");
--error ER_DUP_ENTRY
insert into foo values ("d");
begin;
# test an update works
update foo set a="C" where a="c";
select * from foo;
# test a rollback works
rollback;
select * from foo;
# now test some queries
select * from foo where a="c";
select * from foo where a="C";
select * from foo where a > "c";
select * from foo where a > "C";
select * from foo where a >= "c";
select * from foo where a >= "C";
select * from foo where a < "c";
select * from foo where a < "C";
select * from foo where a <= "c";
select * from foo where a <= "C";
--error ER_DUP_ENTRY
update foo set a = "d" where a="a";
--error ER_DUP_ENTRY
update foo set a = "C" where a="a";
drop table foo;
#Now repeat all that when we have a second column and key
# first test pk as a varchar
create table foo (a varchar (100), b int, primary key (a), key(b));
# test loader
--error ER_DUP_ENTRY
insert into foo values ("a",1000),("B",1),("c",10000),("D",10),("e",109),("F",1),("A",1);
insert into foo values ("a",3),("B",1),("c",4),("D",2),("e",11),("F",8);
--error ER_DUP_ENTRY
insert into foo values ("C",1);
--error ER_DUP_ENTRY
insert into foo values ("d",1);
begin;
# test an update works
update foo set a="C" where a="c";
select * from foo;
# test a rollback works
rollback;
select * from foo;
--error ER_DUP_ENTRY
update foo set a = "d" where a="a";
--error ER_DUP_ENTRY
update foo set a = "C" where a="a";
drop table foo;
#Now repeat all that when we have a second column and key
# first test pk as a varchar
create table foo (a varchar (100), b int, unique key (a), primary key(b));
# test loader
--error ER_DUP_ENTRY
insert into foo values ("a",1000),("B",1),("c",10000),("D",10),("e",109),("F",1),("A",22);
insert into foo values ("a",3),("B",1),("c",4),("D",2),("e",11),("F",8);
--error ER_DUP_ENTRY
insert into foo values ("C",100);
--error ER_DUP_ENTRY
insert into foo values ("d",100);
begin;
# test an update works
update foo set a="C" where a="c";
select * from foo;
# test a rollback works
rollback;
select * from foo;
--error ER_DUP_ENTRY
update foo set a = "d" where a="a";
--error ER_DUP_ENTRY
update foo set a = "C" where a="a";
# now test some queries
select * from foo where a="c";
select * from foo where a="C";
select * from foo where a > "c";
select * from foo where a > "C";
select * from foo where a >= "c";
select * from foo where a >= "C";
select * from foo where a < "c";
select * from foo where a < "C";
select * from foo where a <= "c";
select * from foo where a <= "C";
drop table foo;
...@@ -1800,6 +1800,7 @@ int ha_tokudb::initialize_share( ...@@ -1800,6 +1800,7 @@ int ha_tokudb::initialize_share(
primary_key primary_key
); );
share->pk_has_string = false;
if (!hidden_primary_key) { if (!hidden_primary_key) {
// //
// We need to set the ref_length to start at 5, to account for // We need to set the ref_length to start at 5, to account for
...@@ -1810,6 +1811,14 @@ int ha_tokudb::initialize_share( ...@@ -1810,6 +1811,14 @@ int ha_tokudb::initialize_share(
KEY_PART_INFO *end = key_part + get_key_parts(&table->key_info[primary_key]); KEY_PART_INFO *end = key_part + get_key_parts(&table->key_info[primary_key]);
for (; key_part != end; key_part++) { for (; key_part != end; key_part++) {
ref_length += key_part->field->max_packed_col_length(key_part->length); ref_length += key_part->field->max_packed_col_length(key_part->length);
TOKU_TYPE toku_type = mysql_to_toku_type(key_part->field);
if (toku_type == toku_type_fixstring ||
toku_type == toku_type_varstring ||
toku_type == toku_type_blob
)
{
share->pk_has_string = true;
}
} }
share->status |= STATUS_PRIMARY_KEY_INIT; share->status |= STATUS_PRIMARY_KEY_INIT;
} }
...@@ -2882,7 +2891,14 @@ DBT* ha_tokudb::create_dbt_key_for_lookup( ...@@ -2882,7 +2891,14 @@ DBT* ha_tokudb::create_dbt_key_for_lookup(
) )
{ {
TOKUDB_DBUG_ENTER("ha_tokudb::create_dbt_key_from_lookup"); TOKUDB_DBUG_ENTER("ha_tokudb::create_dbt_key_from_lookup");
DBUG_RETURN(create_dbt_key_from_key(key, key_info, buff, record, has_null, true, key_length)); DBT* ret = create_dbt_key_from_key(key, key_info, buff, record, has_null, true, key_length);
// override the infinity byte, needed in case the pk is a string
// to make sure that the cursor that uses this key properly positions
// it at the right location. If the table stores "D", but we look up for "d",
// and the infinity byte is 0, then we will skip the "D", because
// in bytes, "d" > "D".
buff[0] = COL_NEG_INF;
DBUG_RETURN(ret);
} }
// //
...@@ -3236,6 +3252,7 @@ ha_rows ha_tokudb::estimate_rows_upper_bound() { ...@@ -3236,6 +3252,7 @@ ha_rows ha_tokudb::estimate_rows_upper_bound() {
// //
int ha_tokudb::cmp_ref(const uchar * ref1, const uchar * ref2) { int ha_tokudb::cmp_ref(const uchar * ref1, const uchar * ref2) {
int ret_val = 0; int ret_val = 0;
bool read_string = false;
ret_val = tokudb_compare_two_keys( ret_val = tokudb_compare_two_keys(
ref1 + sizeof(uint32_t), ref1 + sizeof(uint32_t),
*(uint32_t *)ref1, *(uint32_t *)ref1,
...@@ -3243,7 +3260,8 @@ int ha_tokudb::cmp_ref(const uchar * ref1, const uchar * ref2) { ...@@ -3243,7 +3260,8 @@ int ha_tokudb::cmp_ref(const uchar * ref1, const uchar * ref2) {
*(uint32_t *)ref2, *(uint32_t *)ref2,
(uchar *)share->file->descriptor->dbt.data + 4, (uchar *)share->file->descriptor->dbt.data + 4,
*(uint32_t *)share->file->descriptor->dbt.data - 4, *(uint32_t *)share->file->descriptor->dbt.data - 4,
false false,
&read_string
); );
return ret_val; return ret_val;
} }
...@@ -3418,7 +3436,7 @@ int ha_tokudb::end_bulk_insert(bool abort) { ...@@ -3418,7 +3436,7 @@ int ha_tokudb::end_bulk_insert(bool abort) {
for (uint i = 0; i < table_share->keys; i++) { for (uint i = 0; i < table_share->keys; i++) {
if (table_share->key_info[i].flags & HA_NOSAME) { if (table_share->key_info[i].flags & HA_NOSAME) {
bool is_unique; bool is_unique;
if (i == primary_key) { if (i == primary_key && !share->pk_has_string) {
continue; continue;
} }
error = is_index_unique( error = is_index_unique(
...@@ -3707,12 +3725,12 @@ int ha_tokudb::do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd) { ...@@ -3707,12 +3725,12 @@ int ha_tokudb::do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd) {
// //
if (share->has_unique_keys && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) { if (share->has_unique_keys && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
for (uint keynr = 0; keynr < table_share->keys; keynr++) { for (uint keynr = 0; keynr < table_share->keys; keynr++) {
bool is_unique_key = table->key_info[keynr].flags & HA_NOSAME; bool is_unique_key = (table->key_info[keynr].flags & HA_NOSAME) || (keynr == primary_key);
bool is_unique = false; bool is_unique = false;
// //
// don't need to do check for primary key // don't need to do check for primary key that don't have strings
// //
if (keynr == primary_key) { if (keynr == primary_key && !share->pk_has_string) {
continue; continue;
} }
if (!is_unique_key) { if (!is_unique_key) {
...@@ -4093,18 +4111,13 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -4093,18 +4111,13 @@ int ha_tokudb::write_row(uchar * record) {
goto cleanup; goto cleanup;
} }
} }
else {
if (curr_num_DBs == 1) {
error = insert_row_to_main_dictionary(record,&prim_key, &row, txn);
if (error) { goto cleanup; }
}
else { else {
error = do_uniqueness_checks(record, txn, thd); error = do_uniqueness_checks(record, txn, thd);
if (error) { if (error) {
// for #4633 // for #4633
// if we have a duplicate key error, let's check the primary key to see // if we have a duplicate key error, let's check the primary key to see
// if there is a duplicate there. If so, set last_dup_key to the pk // if there is a duplicate there. If so, set last_dup_key to the pk
if (error == DB_KEYEXIST && !test(hidden_primary_key)) { if (error == DB_KEYEXIST && !test(hidden_primary_key) && last_dup_key != primary_key) {
int r = share->file->getf_set( int r = share->file->getf_set(
share->file, share->file,
txn, txn,
...@@ -4126,7 +4139,11 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -4126,7 +4139,11 @@ int ha_tokudb::write_row(uchar * record) {
} }
goto cleanup; goto cleanup;
} }
if (curr_num_DBs == 1) {
error = insert_row_to_main_dictionary(record,&prim_key, &row, txn);
if (error) { goto cleanup; }
}
else {
error = insert_rows_to_dictionaries_mult(&prim_key, &row, txn, thd); error = insert_rows_to_dictionaries_mult(&prim_key, &row, txn, thd);
if (error) { goto cleanup; } if (error) { goto cleanup; }
} }
...@@ -4262,8 +4279,8 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -4262,8 +4279,8 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
// //
if (share->has_unique_keys && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) { if (share->has_unique_keys && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
for (uint keynr = 0; keynr < table_share->keys; keynr++) { for (uint keynr = 0; keynr < table_share->keys; keynr++) {
bool is_unique_key = table->key_info[keynr].flags & HA_NOSAME; bool is_unique_key = (table->key_info[keynr].flags & HA_NOSAME) || (keynr == primary_key);
if (keynr == primary_key) { if (keynr == primary_key && !share->pk_has_string) {
continue; continue;
} }
if (is_unique_key) { if (is_unique_key) {
......
...@@ -164,6 +164,10 @@ class TOKUDB_SHARE { ...@@ -164,6 +164,10 @@ class TOKUDB_SHARE {
// index of auto increment column in table->field, if auto_inc exists // index of auto increment column in table->field, if auto_inc exists
// //
uint ai_field_index; uint ai_field_index;
//
// whether the primary key has a string
//
bool pk_has_string;
KEY_AND_COL_INFO kc_info; KEY_AND_COL_INFO kc_info;
......
...@@ -1250,7 +1250,8 @@ static inline int compare_toku_field( ...@@ -1250,7 +1250,8 @@ static inline int compare_toku_field(
uchar* row_desc, uchar* row_desc,
uint32_t* a_bytes_read, uint32_t* a_bytes_read,
uint32_t* b_bytes_read, uint32_t* b_bytes_read,
uint32_t* row_desc_bytes_read uint32_t* row_desc_bytes_read,
bool* read_string
) )
{ {
int ret_val = 0; int ret_val = 0;
...@@ -1333,6 +1334,7 @@ static inline int compare_toku_field( ...@@ -1333,6 +1334,7 @@ static inline int compare_toku_field(
a_bytes_read, a_bytes_read,
b_bytes_read b_bytes_read
); );
*read_string = true;
break; break;
default: default:
assert(false); assert(false);
...@@ -1570,7 +1572,8 @@ int tokudb_compare_two_keys( ...@@ -1570,7 +1572,8 @@ int tokudb_compare_two_keys(
const uint32_t saved_key_size, const uint32_t saved_key_size,
const void* row_desc, const void* row_desc,
const uint32_t row_desc_size, const uint32_t row_desc_size,
bool cmp_prefix bool cmp_prefix,
bool* read_string
) )
{ {
int ret_val = 0; int ret_val = 0;
...@@ -1639,7 +1642,8 @@ int tokudb_compare_two_keys( ...@@ -1639,7 +1642,8 @@ int tokudb_compare_two_keys(
row_desc_ptr, row_desc_ptr,
&new_key_field_length, &new_key_field_length,
&saved_key_field_length, &saved_key_field_length,
&row_desc_field_length &row_desc_field_length,
read_string
); );
new_key_ptr += new_key_field_length; new_key_ptr += new_key_field_length;
saved_key_ptr += saved_key_field_length; saved_key_ptr += saved_key_field_length;
...@@ -1683,17 +1687,25 @@ int tokudb_compare_two_keys( ...@@ -1683,17 +1687,25 @@ int tokudb_compare_two_keys(
return ret_val; return ret_val;
} }
int tokudb_cmp_dbt_key(DB* file, const DBT *keya, const DBT *keyb) { static int simple_memcmp(const DBT *keya, const DBT *keyb) {
int cmp; int cmp;
if (file->cmp_descriptor->dbt.size == 0) {
int num_bytes_cmp = keya->size < keyb->size ? int num_bytes_cmp = keya->size < keyb->size ?
keya->size : keyb->size; keya->size : keyb->size;
cmp = memcmp(keya->data,keyb->data,num_bytes_cmp); cmp = memcmp(keya->data,keyb->data,num_bytes_cmp);
if (cmp == 0 && (keya->size != keyb->size)) { if (cmp == 0 && (keya->size != keyb->size)) {
cmp = keya->size < keyb->size ? -1 : 1; cmp = keya->size < keyb->size ? -1 : 1;
} }
return cmp;
}
// comparison function to be used by the fractal trees.
int tokudb_cmp_dbt_key(DB* file, const DBT *keya, const DBT *keyb) {
int cmp;
if (file->cmp_descriptor->dbt.size == 0) {
cmp = simple_memcmp(keya, keyb);
} }
else { else {
bool read_string = false;
cmp = tokudb_compare_two_keys( cmp = tokudb_compare_two_keys(
keya->data, keya->data,
keya->size, keya->size,
...@@ -1701,14 +1713,24 @@ int tokudb_cmp_dbt_key(DB* file, const DBT *keya, const DBT *keyb) { ...@@ -1701,14 +1713,24 @@ int tokudb_cmp_dbt_key(DB* file, const DBT *keya, const DBT *keyb) {
keyb->size, keyb->size,
(uchar *)file->cmp_descriptor->dbt.data + 4, (uchar *)file->cmp_descriptor->dbt.data + 4,
(*(uint32_t *)file->cmp_descriptor->dbt.data) - 4, (*(uint32_t *)file->cmp_descriptor->dbt.data) - 4,
false false,
&read_string
); );
// comparison above may be case-insensitive, but fractal tree
// needs to distinguish between different data, so we do this
// additional check here
if (read_string && (cmp == 0)) {
cmp = simple_memcmp(keya, keyb);
}
} }
return cmp; return cmp;
} }
//TODO: QQQ Only do one direction for prefix. //TODO: QQQ Only do one direction for prefix.
int tokudb_prefix_cmp_dbt_key(DB *file, const DBT *keya, const DBT *keyb) { int tokudb_prefix_cmp_dbt_key(DB *file, const DBT *keya, const DBT *keyb) {
// calls to this function are done by the handlerton, and are
// comparing just the keys as MySQL would compare them.
bool read_string = false;
int cmp = tokudb_compare_two_keys( int cmp = tokudb_compare_two_keys(
keya->data, keya->data,
keya->size, keya->size,
...@@ -1716,7 +1738,8 @@ int tokudb_prefix_cmp_dbt_key(DB *file, const DBT *keya, const DBT *keyb) { ...@@ -1716,7 +1738,8 @@ int tokudb_prefix_cmp_dbt_key(DB *file, const DBT *keya, const DBT *keyb) {
keyb->size, keyb->size,
(uchar *)file->cmp_descriptor->dbt.data + 4, (uchar *)file->cmp_descriptor->dbt.data + 4,
*(uint32_t *)file->cmp_descriptor->dbt.data - 4, *(uint32_t *)file->cmp_descriptor->dbt.data - 4,
true true,
&read_string
); );
return cmp; return cmp;
} }
...@@ -1785,14 +1808,15 @@ static int tokudb_compare_two_key_parts( ...@@ -1785,14 +1808,15 @@ static int tokudb_compare_two_key_parts(
} }
} }
row_desc_ptr++; row_desc_ptr++;
bool read_string = false;
ret_val = compare_toku_field( ret_val = compare_toku_field(
new_key_ptr, new_key_ptr,
saved_key_ptr, saved_key_ptr,
row_desc_ptr, row_desc_ptr,
&new_key_field_length, &new_key_field_length,
&saved_key_field_length, &saved_key_field_length,
&row_desc_field_length &row_desc_field_length,
&read_string
); );
new_key_ptr += new_key_field_length; new_key_ptr += new_key_field_length;
saved_key_ptr += saved_key_field_length; saved_key_ptr += saved_key_field_length;
......
...@@ -362,7 +362,8 @@ int tokudb_compare_two_keys( ...@@ -362,7 +362,8 @@ int tokudb_compare_two_keys(
const uint32_t saved_key_size, const uint32_t saved_key_size,
const void* row_desc, const void* row_desc,
const uint32_t row_desc_size, const uint32_t row_desc_size,
bool cmp_prefix bool cmp_prefix,
bool* read_string
); );
int tokudb_cmp_dbt_key(DB* db, const DBT *keya, const DBT *keyb); int tokudb_cmp_dbt_key(DB* db, const DBT *keya, const DBT *keyb);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment