Commit 1fb4dfce authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

refs #5727 dont allow upserts with secondary keys and other issues raised during a code review

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@51076 c7de825b-a66e-492c-adef-691d508d4ae1
parent 4e0a48bf
#if TOKU_INCLUDE_UPSERT #if TOKU_INCLUDE_UPSERT
// Point updates and upserts // Point updates and upserts
//
// Restrictions: // Restrictions:
// No triggers // No triggers
// No binary logging // No binary logging
...@@ -18,8 +18,7 @@ ...@@ -18,8 +18,7 @@
// x = if(x=0,0,x-1) // x = if(x=0,0,x-1)
// Session variable enables fast updates and fast upserts // Session variable enables fast updates and fast upserts
// Session variable disables slow updates and slow upserts // Session variable disables slow updates and slow upserts
// Bugs:
// Does this work with hot indexing? Probably not.
// Future features: // Future features:
// Support more primary key types // Support more primary key types
// Allow statement based binary logging // Allow statement based binary logging
...@@ -29,9 +28,6 @@ ...@@ -29,9 +28,6 @@
// Support more complicated update expressions // Support more complicated update expressions
// Replace field_offset // Replace field_offset
int tokudb_fast_update_debug = 0;
int tokudb_upsert_debug = 0;
// Debug function to dump an Item // Debug function to dump an Item
static void dump_item(Item *item) { static void dump_item(Item *item) {
fprintf(stderr, "%u", item->type()); fprintf(stderr, "%u", item->type());
...@@ -101,7 +97,6 @@ static Field *find_field_by_name(TABLE *table, Item *item) { ...@@ -101,7 +97,6 @@ static Field *find_field_by_name(TABLE *table, Item *item) {
if (strcmp(table->s->db.str, field_item->db_name) != 0 || if (strcmp(table->s->db.str, field_item->db_name) != 0 ||
strcmp(table->s->table_name.str, field_item->table_name) != 0) strcmp(table->s->table_name.str, field_item->table_name) != 0)
return NULL; return NULL;
// TODO: item->field may be a shortcut instead of this table lookup
Field *found_field = NULL; Field *found_field = NULL;
for (uint i = 0; i < table->s->fields; i++) { for (uint i = 0; i < table->s->fields; i++) {
Field *test_field = table->s->field[i]; Field *test_field = table->s->field[i];
...@@ -112,6 +107,7 @@ static Field *find_field_by_name(TABLE *table, Item *item) { ...@@ -112,6 +107,7 @@ static Field *find_field_by_name(TABLE *table, Item *item) {
} }
return found_field; return found_field;
#else #else
// item->field may be a shortcut instead of the above table lookup
return field_item->field; return field_item->field;
#endif #endif
} }
...@@ -134,9 +130,11 @@ static uint32_t update_field_offset(uint32_t null_bytes, KEY_AND_COL_INFO *kc_in ...@@ -134,9 +130,11 @@ static uint32_t update_field_offset(uint32_t null_bytes, KEY_AND_COL_INFO *kc_in
// of where conditions (conds). The function returns 0 if the update is handled in the storage engine. // of where conditions (conds). The function returns 0 if the update is handled in the storage engine.
// Otherwise, an error is returned. // Otherwise, an error is returned.
int ha_tokudb::fast_update(THD *thd, List<Item> &update_fields, List<Item> &update_values, Item *conds) { int ha_tokudb::fast_update(THD *thd, List<Item> &update_fields, List<Item> &update_values, Item *conds) {
TOKUDB_DBUG_ENTER("ha_tokudb::fast_update");
int error = 0; int error = 0;
if (tokudb_fast_update_debug) { if (tokudb_debug & TOKUDB_DEBUG_UPSERT) {
dump_item_list("fields", update_fields); dump_item_list("fields", update_fields);
dump_item_list("values", update_values); dump_item_list("values", update_values);
if (conds) { if (conds) {
...@@ -144,15 +142,19 @@ int ha_tokudb::fast_update(THD *thd, List<Item> &update_fields, List<Item> &upda ...@@ -144,15 +142,19 @@ int ha_tokudb::fast_update(THD *thd, List<Item> &update_fields, List<Item> &upda
} }
} }
if (update_fields.elements < 1 || update_fields.elements != update_values.elements) if (update_fields.elements < 1 || update_fields.elements != update_values.elements) {
return ENOTSUP; // something is fishy with the parameters error = ENOTSUP; // something is fishy with the parameters
goto return_error;
}
if (error == 0 && !check_fast_update(thd, update_fields, update_values, conds)) if (!check_fast_update(thd, update_fields, update_values, conds)) {
error = ENOTSUP; error = ENOTSUP;
goto check_error;
}
if (error == 0)
error = send_update_message(update_fields, update_values, conds, transaction); error = send_update_message(update_fields, update_values, conds, transaction);
check_error:
if (error != 0) { if (error != 0) {
if (get_disable_slow_update(thd)) if (get_disable_slow_update(thd))
error = HA_ERR_UNSUPPORTED; error = HA_ERR_UNSUPPORTED;
...@@ -160,7 +162,8 @@ int ha_tokudb::fast_update(THD *thd, List<Item> &update_fields, List<Item> &upda ...@@ -160,7 +162,8 @@ int ha_tokudb::fast_update(THD *thd, List<Item> &update_fields, List<Item> &upda
print_error(error, MYF(0)); print_error(error, MYF(0));
} }
return error; return_error:
TOKUDB_DBUG_RETURN(error);
} }
// Return true if an expression is a simple int expression or a simple function of +- int expression. // Return true if an expression is a simple int expression or a simple function of +- int expression.
...@@ -297,25 +300,21 @@ static bool check_all_update_expressions(List<Item> &fields, List<Item> &values, ...@@ -297,25 +300,21 @@ static bool check_all_update_expressions(List<Item> &fields, List<Item> &values,
return true; return true;
} }
#if 0 static bool full_field_in_key(TABLE *table, Field *field) {
static bool field_name_in_primary_key(TABLE *table, const char *field_name) { assert(table->s->primary_key < table->s->keys);
if (table->s->primary_key >= table->s->keys)
return false;
KEY *key = &table->s->key_info[table->s->primary_key]; KEY *key = &table->s->key_info[table->s->primary_key];
if (key->key_parts != 1) for (uint i = 0; i < key->key_parts; i++) {
return false; KEY_PART_INFO *key_part = &key->key_part[i];
KEY_PART_INFO *key_part = &key->key_part[0]; if (strcmp(field->field_name, key_part->field->field_name) == 0) {
if (key->key_length != key_part->store_length) return key_part->length == field->field_length;
return false; }
if (strcmp(field_name, key_part->field->field_name) != 0) }
return false; return false;
return true;
} }
#endif
// Check that an expression looks like fieldname = constant, fieldname is part of the // Check that an expression looks like fieldname = constant, fieldname is part of the
// primary key, and the named field is an int, char or varchar type. Return true if it does. // primary key, and the named field is an int, char or varchar type. Return true if it does.
static bool check_pk_field_equal_constant(Item *item, TABLE *table, MY_BITMAP &pk_fields) { static bool check_pk_field_equal_constant(Item *item, TABLE *table, MY_BITMAP *pk_fields) {
if (item->type() != Item::FUNC_ITEM) if (item->type() != Item::FUNC_ITEM)
return false; return false;
Item_func *func = static_cast<Item_func*>(item); Item_func *func = static_cast<Item_func*>(item);
...@@ -328,7 +327,7 @@ static bool check_pk_field_equal_constant(Item *item, TABLE *table, MY_BITMAP &p ...@@ -328,7 +327,7 @@ static bool check_pk_field_equal_constant(Item *item, TABLE *table, MY_BITMAP &p
Field *field = find_field_by_name(table, arguments[0]); Field *field = find_field_by_name(table, arguments[0]);
if (field == NULL) if (field == NULL)
return false; return false;
if (!bitmap_test_and_clear(&pk_fields, field->field_index)) if (!bitmap_test_and_clear(pk_fields, field->field_index))
return false; return false;
switch (field->type()) { switch (field->type()) {
case MYSQL_TYPE_TINY: case MYSQL_TYPE_TINY:
...@@ -336,9 +335,11 @@ static bool check_pk_field_equal_constant(Item *item, TABLE *table, MY_BITMAP &p ...@@ -336,9 +335,11 @@ static bool check_pk_field_equal_constant(Item *item, TABLE *table, MY_BITMAP &p
case MYSQL_TYPE_INT24: case MYSQL_TYPE_INT24:
case MYSQL_TYPE_LONG: case MYSQL_TYPE_LONG:
case MYSQL_TYPE_LONGLONG: case MYSQL_TYPE_LONGLONG:
return arguments[1]->type() == Item::INT_ITEM || arguments[1]->type() == Item::STRING_ITEM;
case MYSQL_TYPE_STRING: case MYSQL_TYPE_STRING:
case MYSQL_TYPE_VARCHAR: case MYSQL_TYPE_VARCHAR:
return arguments[1]->type() == Item::INT_ITEM || arguments[1]->type() == Item::STRING_ITEM; return full_field_in_key(table, field) &&
(arguments[1]->type() == Item::INT_ITEM || arguments[1]->type() == Item::STRING_ITEM);
default: default:
return false; return false;
} }
...@@ -366,7 +367,7 @@ static bool check_point_update(Item *conds, TABLE *table) { ...@@ -366,7 +367,7 @@ static bool check_point_update(Item *conds, TABLE *table) {
switch (conds->type()) { switch (conds->type()) {
case Item::FUNC_ITEM: case Item::FUNC_ITEM:
result = check_pk_field_equal_constant(conds, table, pk_fields); result = check_pk_field_equal_constant(conds, table, &pk_fields);
break; break;
case Item::COND_ITEM: { case Item::COND_ITEM: {
Item_cond *cond_item = static_cast<Item_cond*>(conds); Item_cond *cond_item = static_cast<Item_cond*>(conds);
...@@ -376,7 +377,7 @@ static bool check_point_update(Item *conds, TABLE *table) { ...@@ -376,7 +377,7 @@ static bool check_point_update(Item *conds, TABLE *table) {
Item *list_item; Item *list_item;
result = true; result = true;
while (result == true && (list_item = li++)) { while (result == true && (list_item = li++)) {
result = check_pk_field_equal_constant(list_item, table, pk_fields); result = check_pk_field_equal_constant(list_item, table, &pk_fields);
} }
break; break;
} }
...@@ -393,9 +394,10 @@ static bool check_point_update(Item *conds, TABLE *table) { ...@@ -393,9 +394,10 @@ static bool check_point_update(Item *conds, TABLE *table) {
// Return true if there are any clustering keys (except the primary). // Return true if there are any clustering keys (except the primary).
// Precompute this when the table is opened. // Precompute this when the table is opened.
static bool clustering_keys_exist(TABLE *table) { static bool clustering_keys_exist(TABLE *table) {
for (uint i = 0; i < table->s->keys; i++) for (uint keynr = 0; keynr < table->s->keys; keynr++) {
if (i != table->s->primary_key && (table->s->key_info[i].flags & HA_CLUSTERING)) if (keynr != table->s->primary_key && (table->s->key_info[keynr].flags & HA_CLUSTERING))
return true; return true;
}
return false; return false;
} }
...@@ -579,7 +581,8 @@ int ha_tokudb::send_update_message(List<Item> &update_fields, List<Item> &update ...@@ -579,7 +581,8 @@ int ha_tokudb::send_update_message(List<Item> &update_fields, List<Item> &update
Item_cond *cond_item = static_cast<Item_cond*>(conds); Item_cond *cond_item = static_cast<Item_cond*>(conds);
List_iterator<Item> li(*cond_item->argument_list()); List_iterator<Item> li(*cond_item->argument_list());
Item *list_item; Item *list_item;
for (error = 0; error == 0 && (list_item = li++); ) { error = 0;
while (error == 0 && (list_item = li++)) {
error = save_in_field(list_item, table); error = save_in_field(list_item, table);
} }
} else } else
...@@ -638,30 +641,38 @@ int ha_tokudb::send_update_message(List<Item> &update_fields, List<Item> &update ...@@ -638,30 +641,38 @@ int ha_tokudb::send_update_message(List<Item> &update_fields, List<Item> &update
// An upsert consists of a row and a list of update expressions (update_fields[i] = update_values[i]). // An upsert consists of a row and a list of update expressions (update_fields[i] = update_values[i]).
// The function returns 0 if the upsert is handled in the storage engine. Otherwise, an error code is returned. // The function returns 0 if the upsert is handled in the storage engine. Otherwise, an error code is returned.
int ha_tokudb::upsert(THD *thd, List<Item> &update_fields, List<Item> &update_values) { int ha_tokudb::upsert(THD *thd, List<Item> &update_fields, List<Item> &update_values) {
TOKUDB_DBUG_ENTER("ha_tokudb::upsert");
int error = 0; int error = 0;
if (tokudb_upsert_debug) { if (tokudb_debug & TOKUDB_DEBUG_UPSERT) {
fprintf(stderr, "upsert\n"); fprintf(stderr, "upsert\n");
dump_item_list("update_fields", update_fields); dump_item_list("update_fields", update_fields);
dump_item_list("update_values", update_values); dump_item_list("update_values", update_values);
} }
if (update_fields.elements < 1 || update_fields.elements != update_values.elements) if (update_fields.elements < 1 || update_fields.elements != update_values.elements) {
return ENOTSUP; // not an upsert or something is fishy with the parameters error = ENOTSUP; // not an upsert or something is fishy with the parameters
goto return_error;
}
if (error == 0 && !check_upsert(thd, update_fields, update_values)) if (!check_upsert(thd, update_fields, update_values)) {
error = ENOTSUP; error = ENOTSUP;
goto check_error;
}
if (error == 0)
error = send_upsert_message(thd, update_fields, update_values, transaction); error = send_upsert_message(thd, update_fields, update_values, transaction);
check_error:
if (error != 0) { if (error != 0) {
if (get_disable_slow_upsert(thd)) if (get_disable_slow_upsert(thd))
error = HA_ERR_UNSUPPORTED; error = HA_ERR_UNSUPPORTED;
if (error != ENOTSUP) if (error != ENOTSUP)
print_error(error, MYF(0)); print_error(error, MYF(0));
} }
return error;
return_error:
TOKUDB_DBUG_RETURN(error);
} }
// Check if an upsert can be handled by this storage engine. Return trus if it can. // Check if an upsert can be handled by this storage engine. Return trus if it can.
...@@ -689,8 +700,8 @@ bool ha_tokudb::check_upsert(THD *thd, List<Item> &update_fields, List<Item> &up ...@@ -689,8 +700,8 @@ bool ha_tokudb::check_upsert(THD *thd, List<Item> &update_fields, List<Item> &up
if (table->s->primary_key >= table->s->keys) if (table->s->primary_key >= table->s->keys)
return false; return false;
// no clustering keys (need to broadcast an increment into the clustering keys since we are selecting with the primary key) // no secondary keys
if (clustering_keys_exist(table)) if (table->s->keys > 1)
return false; return false;
if (!check_all_update_expressions(update_fields, update_values, table)) if (!check_all_update_expressions(update_fields, update_values, table))
......
...@@ -118,6 +118,7 @@ extern ulong tokudb_debug; ...@@ -118,6 +118,7 @@ extern ulong tokudb_debug;
#define TOKUDB_DEBUG_CHECK_KEY 1024 #define TOKUDB_DEBUG_CHECK_KEY 1024
#define TOKUDB_DEBUG_HIDE_DDL_LOCK_ERRORS 2048 #define TOKUDB_DEBUG_HIDE_DDL_LOCK_ERRORS 2048
#define TOKUDB_DEBUG_ALTER_TABLE_INFO 4096 #define TOKUDB_DEBUG_ALTER_TABLE_INFO 4096
#define TOKUDB_DEBUG_UPSERT 8192
#define TOKUDB_TRACE(f, ...) \ #define TOKUDB_TRACE(f, ...) \
printf("%d:%s:%d:" f, my_tid(), __FILE__, __LINE__, ##__VA_ARGS__); printf("%d:%s:%d:" f, my_tid(), __FILE__, __LINE__, ##__VA_ARGS__);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment