Commit ec5937a7 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

refs #5727 dont allow upserts with secondary keys and other issues raised during a code review

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@51076 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3d3a9674
#if TOKU_INCLUDE_UPSERT
// Point updates and upserts
//
// Restrictions:
// No triggers
// No binary logging
......@@ -18,8 +18,7 @@
// x = if(x=0,0,x-1)
// Session variable enables fast updates and fast upserts
// Session variable disables slow updates and slow upserts
// Bugs:
// Does this work with hot indexing? Probably not.
// Future features:
// Support more primary key types
// Allow statement based binary logging
......@@ -29,9 +28,6 @@
// Support more complicated update expressions
// Replace field_offset
int tokudb_fast_update_debug = 0;
int tokudb_upsert_debug = 0;
// Debug function to dump an Item
static void dump_item(Item *item) {
fprintf(stderr, "%u", item->type());
......@@ -101,7 +97,6 @@ static Field *find_field_by_name(TABLE *table, Item *item) {
if (strcmp(table->s->db.str, field_item->db_name) != 0 ||
strcmp(table->s->table_name.str, field_item->table_name) != 0)
return NULL;
// TODO: item->field may be a shortcut instead of this table lookup
Field *found_field = NULL;
for (uint i = 0; i < table->s->fields; i++) {
Field *test_field = table->s->field[i];
......@@ -112,6 +107,7 @@ static Field *find_field_by_name(TABLE *table, Item *item) {
}
return found_field;
#else
// item->field may be a shortcut instead of the above table lookup
return field_item->field;
#endif
}
......@@ -134,9 +130,11 @@ static uint32_t update_field_offset(uint32_t null_bytes, KEY_AND_COL_INFO *kc_in
// of where conditions (conds). The function returns 0 if the update is handled in the storage engine.
// Otherwise, an error is returned.
int ha_tokudb::fast_update(THD *thd, List<Item> &update_fields, List<Item> &update_values, Item *conds) {
TOKUDB_DBUG_ENTER("ha_tokudb::fast_update");
int error = 0;
if (tokudb_fast_update_debug) {
if (tokudb_debug & TOKUDB_DEBUG_UPSERT) {
dump_item_list("fields", update_fields);
dump_item_list("values", update_values);
if (conds) {
......@@ -144,15 +142,19 @@ int ha_tokudb::fast_update(THD *thd, List<Item> &update_fields, List<Item> &upda
}
}
if (update_fields.elements < 1 || update_fields.elements != update_values.elements)
return ENOTSUP; // something is fishy with the parameters
if (update_fields.elements < 1 || update_fields.elements != update_values.elements) {
error = ENOTSUP; // something is fishy with the parameters
goto return_error;
}
if (error == 0 && !check_fast_update(thd, update_fields, update_values, conds))
if (!check_fast_update(thd, update_fields, update_values, conds)) {
error = ENOTSUP;
goto check_error;
}
if (error == 0)
error = send_update_message(update_fields, update_values, conds, transaction);
check_error:
if (error != 0) {
if (get_disable_slow_update(thd))
error = HA_ERR_UNSUPPORTED;
......@@ -160,7 +162,8 @@ int ha_tokudb::fast_update(THD *thd, List<Item> &update_fields, List<Item> &upda
print_error(error, MYF(0));
}
return error;
return_error:
TOKUDB_DBUG_RETURN(error);
}
// Return true if an expression is a simple int expression or a simple function of +- int expression.
......@@ -297,25 +300,21 @@ static bool check_all_update_expressions(List<Item> &fields, List<Item> &values,
return true;
}
#if 0
static bool field_name_in_primary_key(TABLE *table, const char *field_name) {
if (table->s->primary_key >= table->s->keys)
return false;
static bool full_field_in_key(TABLE *table, Field *field) {
assert(table->s->primary_key < table->s->keys);
KEY *key = &table->s->key_info[table->s->primary_key];
if (key->key_parts != 1)
return false;
KEY_PART_INFO *key_part = &key->key_part[0];
if (key->key_length != key_part->store_length)
return false;
if (strcmp(field_name, key_part->field->field_name) != 0)
for (uint i = 0; i < key->key_parts; i++) {
KEY_PART_INFO *key_part = &key->key_part[i];
if (strcmp(field->field_name, key_part->field->field_name) == 0) {
return key_part->length == field->field_length;
}
}
return false;
return true;
}
#endif
// Check that an expression looks like fieldname = constant, fieldname is part of the
// primary key, and the named field is an int, char or varchar type. Return true if it does.
static bool check_pk_field_equal_constant(Item *item, TABLE *table, MY_BITMAP &pk_fields) {
static bool check_pk_field_equal_constant(Item *item, TABLE *table, MY_BITMAP *pk_fields) {
if (item->type() != Item::FUNC_ITEM)
return false;
Item_func *func = static_cast<Item_func*>(item);
......@@ -328,7 +327,7 @@ static bool check_pk_field_equal_constant(Item *item, TABLE *table, MY_BITMAP &p
Field *field = find_field_by_name(table, arguments[0]);
if (field == NULL)
return false;
if (!bitmap_test_and_clear(&pk_fields, field->field_index))
if (!bitmap_test_and_clear(pk_fields, field->field_index))
return false;
switch (field->type()) {
case MYSQL_TYPE_TINY:
......@@ -336,9 +335,11 @@ static bool check_pk_field_equal_constant(Item *item, TABLE *table, MY_BITMAP &p
case MYSQL_TYPE_INT24:
case MYSQL_TYPE_LONG:
case MYSQL_TYPE_LONGLONG:
return arguments[1]->type() == Item::INT_ITEM || arguments[1]->type() == Item::STRING_ITEM;
case MYSQL_TYPE_STRING:
case MYSQL_TYPE_VARCHAR:
return arguments[1]->type() == Item::INT_ITEM || arguments[1]->type() == Item::STRING_ITEM;
return full_field_in_key(table, field) &&
(arguments[1]->type() == Item::INT_ITEM || arguments[1]->type() == Item::STRING_ITEM);
default:
return false;
}
......@@ -366,7 +367,7 @@ static bool check_point_update(Item *conds, TABLE *table) {
switch (conds->type()) {
case Item::FUNC_ITEM:
result = check_pk_field_equal_constant(conds, table, pk_fields);
result = check_pk_field_equal_constant(conds, table, &pk_fields);
break;
case Item::COND_ITEM: {
Item_cond *cond_item = static_cast<Item_cond*>(conds);
......@@ -376,7 +377,7 @@ static bool check_point_update(Item *conds, TABLE *table) {
Item *list_item;
result = true;
while (result == true && (list_item = li++)) {
result = check_pk_field_equal_constant(list_item, table, pk_fields);
result = check_pk_field_equal_constant(list_item, table, &pk_fields);
}
break;
}
......@@ -393,9 +394,10 @@ static bool check_point_update(Item *conds, TABLE *table) {
// Return true if there are any clustering keys (except the primary).
// Precompute this when the table is opened.
static bool clustering_keys_exist(TABLE *table) {
for (uint i = 0; i < table->s->keys; i++)
if (i != table->s->primary_key && (table->s->key_info[i].flags & HA_CLUSTERING))
for (uint keynr = 0; keynr < table->s->keys; keynr++) {
if (keynr != table->s->primary_key && (table->s->key_info[keynr].flags & HA_CLUSTERING))
return true;
}
return false;
}
......@@ -579,7 +581,8 @@ int ha_tokudb::send_update_message(List<Item> &update_fields, List<Item> &update
Item_cond *cond_item = static_cast<Item_cond*>(conds);
List_iterator<Item> li(*cond_item->argument_list());
Item *list_item;
for (error = 0; error == 0 && (list_item = li++); ) {
error = 0;
while (error == 0 && (list_item = li++)) {
error = save_in_field(list_item, table);
}
} else
......@@ -638,30 +641,38 @@ int ha_tokudb::send_update_message(List<Item> &update_fields, List<Item> &update
// An upsert consists of a row and a list of update expressions (update_fields[i] = update_values[i]).
// The function returns 0 if the upsert is handled in the storage engine. Otherwise, an error code is returned.
int ha_tokudb::upsert(THD *thd, List<Item> &update_fields, List<Item> &update_values) {
TOKUDB_DBUG_ENTER("ha_tokudb::upsert");
int error = 0;
if (tokudb_upsert_debug) {
if (tokudb_debug & TOKUDB_DEBUG_UPSERT) {
fprintf(stderr, "upsert\n");
dump_item_list("update_fields", update_fields);
dump_item_list("update_values", update_values);
}
if (update_fields.elements < 1 || update_fields.elements != update_values.elements)
return ENOTSUP; // not an upsert or something is fishy with the parameters
if (update_fields.elements < 1 || update_fields.elements != update_values.elements) {
error = ENOTSUP; // not an upsert or something is fishy with the parameters
goto return_error;
}
if (error == 0 && !check_upsert(thd, update_fields, update_values))
if (!check_upsert(thd, update_fields, update_values)) {
error = ENOTSUP;
goto check_error;
}
if (error == 0)
error = send_upsert_message(thd, update_fields, update_values, transaction);
check_error:
if (error != 0) {
if (get_disable_slow_upsert(thd))
error = HA_ERR_UNSUPPORTED;
if (error != ENOTSUP)
print_error(error, MYF(0));
}
return error;
return_error:
TOKUDB_DBUG_RETURN(error);
}
// Check if an upsert can be handled by this storage engine. Return trus if it can.
......@@ -689,8 +700,8 @@ bool ha_tokudb::check_upsert(THD *thd, List<Item> &update_fields, List<Item> &up
if (table->s->primary_key >= table->s->keys)
return false;
// no clustering keys (need to broadcast an increment into the clustering keys since we are selecting with the primary key)
if (clustering_keys_exist(table))
// no secondary keys
if (table->s->keys > 1)
return false;
if (!check_all_update_expressions(update_fields, update_values, table))
......
......@@ -118,6 +118,7 @@ extern ulong tokudb_debug;
#define TOKUDB_DEBUG_CHECK_KEY 1024
#define TOKUDB_DEBUG_HIDE_DDL_LOCK_ERRORS 2048
#define TOKUDB_DEBUG_ALTER_TABLE_INFO 4096
#define TOKUDB_DEBUG_UPSERT 8192
#define TOKUDB_TRACE(f, ...) \
printf("%d:%s:%d:" f, my_tid(), __FILE__, __LINE__, ##__VA_ARGS__);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment