Commit 060d2d73 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses #1058

get ha_tokudb to properly support keys with NULL values in unique indexes
still need to write an automated test

git-svn-id: file:///svn/mysql/tokudb-engine/src@5589 c7de825b-a66e-492c-adef-691d508d4ae1
parent c38e0d78
......@@ -1050,14 +1050,15 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i
(*ptr)->set_bt_compare(*ptr, tokudb_cmp_packed_key);
my_free(cmp_byte_stream.data, MYF(0));
}
else
else {
(*ptr)->set_bt_compare(*ptr, tokudb_cmp_packed_key);
if (!(key_info->flags & HA_NOSAME)) {
DBUG_PRINT("info", ("Setting DB_DUP+DB_DUPSORT for key %s\n", key_info->name));
(*ptr)->set_flags(*ptr, DB_DUP + DB_DUPSORT);
(*ptr)->api_internal = share->file->app_private;
(*ptr)->set_dup_compare(*ptr, hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_primary_key);
}
DBUG_PRINT("info", ("Setting DB_DUP+DB_DUPSORT for key %s\n", key_info->name));
(*ptr)->set_flags(*ptr, DB_DUP + DB_DUPSORT);
(*ptr)->api_internal = share->file->app_private;
(*ptr)->set_dup_compare(*ptr, hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_primary_key);
if ((error = (*ptr)->open(*ptr, 0, name_buff, NULL, DB_BTREE, open_flags, 0))) {
my_errno = error;
goto cleanup;
......@@ -1653,12 +1654,13 @@ void ha_tokudb::unpack_key(uchar * record, DBT const *key, uint index) {
// the parameter key
//
DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, int key_length) {
DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length) {
KEY_PART_INFO *key_part = key_info->key_part;
KEY_PART_INFO *end = key_part + key_info->key_parts;
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
key->data = buff;
*has_null = false;
for (; key_part != end && key_length > 0; key_part++) {
//
// accessing key_part->field->null_bit instead off key_part->null_bit
......@@ -1669,6 +1671,7 @@ DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff,
/* Store 0 if the key part is a NULL part */
if (record[key_part->null_offset] & key_part->field->null_bit) {
*buff++ = 0;
*has_null = true;
//
// fractal tree does not handle this falg at the moment
// so commenting out for now
......@@ -1711,15 +1714,16 @@ DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff,
// Returns:
// the parameter key
//
DBT *ha_tokudb::create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, int key_length) {
DBT *ha_tokudb::create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, bool* has_null, int key_length) {
TOKUDB_DBUG_ENTER("ha_tokudb::create_dbt_key_from_table");
bzero((void *) key, sizeof(*key));
if (hidden_primary_key && keynr == primary_key) {
key->data = current_ident;
key->size = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH;
*has_null = false;
DBUG_RETURN(key);
}
DBUG_RETURN(create_dbt_key_from_key(key, &table->key_info[keynr],buff,record,key_length));
DBUG_RETURN(create_dbt_key_from_key(key, &table->key_info[keynr],buff,record, has_null, key_length));
}
......@@ -1977,6 +1981,7 @@ int ha_tokudb::write_row(uchar * record) {
int error;
THD *thd = NULL;
u_int32_t put_flags;
bool has_null;
//
// some crap that needs to be done because MySQL does not properly abstract
......@@ -2005,7 +2010,13 @@ int ha_tokudb::write_row(uchar * record) {
if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
put_flags = DB_YESOVERWRITE;
}
error = share->file->put(share->file, transaction, create_dbt_key_from_table(&prim_key, primary_key, key_buff, record), &row, put_flags);
error = share->file->put(
share->file,
transaction,
create_dbt_key_from_table(&prim_key, primary_key, key_buff, record, &has_null),
&row,
put_flags
);
if (error) {
last_dup_key = primary_key;
goto cleanup;
......@@ -2018,13 +2029,15 @@ int ha_tokudb::write_row(uchar * record) {
continue;
}
put_flags = share->key_type[keynr];
if (put_flags == DB_NOOVERWRITE && thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
create_dbt_key_from_table(&key, keynr, key_buff2, record, &has_null);
if (put_flags == DB_NOOVERWRITE && (has_null || thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))) {
put_flags = DB_YESOVERWRITE;
}
error = share->key_file[keynr]->put(
share->key_file[keynr],
transaction,
create_dbt_key_from_table(&key, keynr, key_buff2, record),
transaction,
&key,
&prim_key,
put_flags
);
......@@ -2124,6 +2137,8 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
DBT prim_key, key, old_prim_key;
int error;
bool primary_key_changed;
bool has_null;
THD* thd = ha_thd();
LINT_INIT(error);
statistic_increment(table->in_use->status_var.ha_update_count, &LOCK_status);
......@@ -2139,9 +2154,9 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
old_prim_key = prim_key;
}
else {
create_dbt_key_from_table(&prim_key, primary_key, key_buff, new_row);
create_dbt_key_from_table(&prim_key, primary_key, key_buff, new_row, &has_null);
if ((primary_key_changed = key_cmp(primary_key, old_row, new_row))) {
create_dbt_key_from_table(&old_prim_key, primary_key, primary_key_buff, old_row);
create_dbt_key_from_table(&old_prim_key, primary_key, primary_key_buff, old_row, &has_null);
}
else {
old_prim_key = prim_key;
......@@ -2160,15 +2175,20 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
continue;
}
if (key_cmp(keynr, old_row, new_row) || primary_key_changed) {
u_int32_t put_flags;
if ((error = remove_key(transaction, keynr, old_row, &old_prim_key))) {
goto cleanup;
}
put_flags = share->key_type[keynr];
if (put_flags == DB_NOOVERWRITE && (has_null || thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))) {
put_flags = DB_YESOVERWRITE;
}
error = share->key_file[keynr]->put(
share->key_file[keynr],
transaction,
create_dbt_key_from_table(&key, keynr, key_buff2, new_row),
create_dbt_key_from_table(&key, keynr, key_buff2, new_row, &has_null),
&prim_key,
share->key_type[keynr]
put_flags
);
//
// We break if we hit an error, unless it is a dup key error
......@@ -2211,16 +2231,17 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT
TOKUDB_DBUG_ENTER("ha_tokudb::remove_key");
int error;
DBT key;
bool has_null;
DBUG_PRINT("enter", ("index: %d", keynr));
DBUG_PRINT("primary", ("index: %d", primary_key));
DBUG_DUMP("prim_key", (uchar *) prim_key->data, prim_key->size);
if (keynr == active_index && cursor)
error = cursor->c_del(cursor, 0);
else if (keynr == primary_key || ((table->key_info[keynr].flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME)) { // Unique key
else if (keynr == primary_key) { // Unique key
DBUG_PRINT("Unique key", ("index: %d", keynr));
DBUG_ASSERT(keynr == primary_key || prim_key->data != key_buff2);
error = share->key_file[keynr]->del(share->key_file[keynr], trans, keynr == primary_key ? prim_key : create_dbt_key_from_table(&key, keynr, key_buff2, record), 0);
error = share->key_file[keynr]->del(share->key_file[keynr], trans, keynr == primary_key ? prim_key : create_dbt_key_from_table(&key, keynr, key_buff2, record, &has_null), 0);
} else {
/* QQQ use toku_db_delboth(key_file[keynr], key, val, trans);
To delete the not duplicated key, we need to open an cursor on the
......@@ -2230,7 +2251,7 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT
DBUG_ASSERT(keynr != primary_key && prim_key->data != key_buff2);
DBC *tmp_cursor;
if (!(error = share->key_file[keynr]->cursor(share->key_file[keynr], trans, &tmp_cursor, 0))) {
if (!(error = tmp_cursor->c_get(tmp_cursor, create_dbt_key_from_table(&key, keynr, key_buff2, record), prim_key, DB_GET_BOTH))) {
if (!(error = tmp_cursor->c_get(tmp_cursor, create_dbt_key_from_table(&key, keynr, key_buff2, record, &has_null), prim_key, DB_GET_BOTH))) {
DBUG_DUMP("cget key", (uchar *) key.data, key.size);
error = tmp_cursor->c_del(tmp_cursor, 0);
}
......@@ -2281,9 +2302,10 @@ int ha_tokudb::delete_row(const uchar * record) {
int error = ENOSYS;
DBT prim_key;
key_map keys = table_share->keys_in_use;
bool has_null;
statistic_increment(table->in_use->status_var.ha_delete_count, &LOCK_status);
create_dbt_key_from_table(&prim_key, primary_key, key_buff, record);
create_dbt_key_from_table(&prim_key, primary_key, key_buff, record, &has_null);
if (hidden_primary_key) {
keys.set_bit(primary_key);
}
......@@ -3205,8 +3227,10 @@ void ha_tokudb::position(const uchar * record) {
if (hidden_primary_key) {
DBUG_ASSERT(ref_length == TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
memcpy_fixed(ref, (char *) current_ident, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
} else {
create_dbt_key_from_table(&key, primary_key, ref, record);
}
else {
bool has_null;
create_dbt_key_from_table(&key, primary_key, ref, record, &has_null);
if (key.size < ref_length)
bzero(ref + key.size, ref_length - key.size);
}
......@@ -3743,7 +3767,7 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
sprintf(part, "key-%s", form->s->key_info[i].name);
make_name(newname, name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
error = create_sub_table(name_buff, NULL, DB_BTREE, (form->key_info[i].flags & HA_NOSAME) ? 0 : DB_DUP + DB_DUPSORT);
error = create_sub_table(name_buff, NULL, DB_BTREE, DB_DUP + DB_DUPSORT);
if (tokudb_debug & TOKUDB_DEBUG_OPEN)
TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, form->key_info[i].flags, error);
if (error) {
......@@ -4076,6 +4100,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
DB_TXN* txn = NULL;
uchar tmp_key_buff[2*table_arg->s->rec_buff_length];
uchar tmp_prim_key_buff[2*table_arg->s->rec_buff_length];
THD* thd = ha_thd();
//
// number of DB files we have open currently, before add_index is executed
......@@ -4118,7 +4143,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
sprintf(part, "key-%s", key_info[i].name);
make_name(newname, share->table_name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
error = create_sub_table(name_buff, NULL, DB_BTREE, (key_info[i].flags & HA_NOSAME) ? 0 : DB_DUP + DB_DUPSORT);
error = create_sub_table(name_buff, NULL, DB_BTREE, DB_DUP + DB_DUPSORT);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, key_info[i].flags, error);
}
......@@ -4203,9 +4228,13 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
for (uint i = 0; i < num_of_keys; i++) {
DBT secondary_key;
create_dbt_key_from_key(&secondary_key,&key_info[i], tmp_key_buff, tmp_record);
bool has_null = false;
create_dbt_key_from_key(&secondary_key,&key_info[i], tmp_key_buff, tmp_record, &has_null);
uint curr_index = i + curr_num_DBs;
u_int32_t put_flags = share->key_type[curr_index];
if (put_flags == DB_NOOVERWRITE && (has_null || thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))) {
put_flags = DB_YESOVERWRITE;
}
error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &current_primary_key, put_flags);
if (error) {
......
......@@ -161,8 +161,8 @@ private:
ulong max_row_length(const uchar * buf);
int pack_row(DBT * row, const uchar * record);
void unpack_key(uchar * record, DBT const *key, uint index);
DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH);
DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH);
DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length);
int remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key);
int remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key, key_map * keys);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment