Commit cd9706b2 authored by Michael Widenius's avatar Michael Widenius

Fixes bug when we run bcmp() on row when the storage engine hasn't filled in all fields in the row.

This was triggered by innodb.innodb_multi_update, where we had a static length row without nulls and xtradb didn't fill in the delete-marker byte


include/my_bitmap.h:
  Added prototype for bitmap_union_is_set_all()
mysys/my_bitmap.c:
  Added function to check if union of two bit maps covers all bits.
sql/mysql_priv.h:
  Updated protype for compare_record()
sql/sql_insert.cc:
  Send to compare_record() flag if all fields are used.
sql/sql_select.cc:
  Set share->null_bytes_for_compare.
sql/sql_update.cc:
  In compare_record() don't use the fast cmp_record() (which is basically memcmp) if we don't know that all fields exists.
  Don't compare the null_bytes if there is no data there.
sql/table.cc:
  Store in share->null_bytes_for_compare the number of bytes that has null or bit fields (but not delete marker)
  Store in can_cmp_whole_record if we can use memcmp() (assuming all rows are read) to compare rows in compare_record()
sql/table.h:
  Added two elements in table->share to speed up checking how updated rows can be compared.
parent 34e0c8f4
...@@ -53,6 +53,8 @@ extern my_bool bitmap_is_overlapping(const MY_BITMAP *map1, ...@@ -53,6 +53,8 @@ extern my_bool bitmap_is_overlapping(const MY_BITMAP *map1,
extern my_bool bitmap_test_and_set(MY_BITMAP *map, uint bitmap_bit); extern my_bool bitmap_test_and_set(MY_BITMAP *map, uint bitmap_bit);
extern my_bool bitmap_test_and_clear(MY_BITMAP *map, uint bitmap_bit); extern my_bool bitmap_test_and_clear(MY_BITMAP *map, uint bitmap_bit);
extern my_bool bitmap_fast_test_and_set(MY_BITMAP *map, uint bitmap_bit); extern my_bool bitmap_fast_test_and_set(MY_BITMAP *map, uint bitmap_bit);
extern my_bool bitmap_union_is_set_all(const MY_BITMAP *map1,
const MY_BITMAP *map2);
extern uint bitmap_set_next(MY_BITMAP *map); extern uint bitmap_set_next(MY_BITMAP *map);
extern uint bitmap_get_first(const MY_BITMAP *map); extern uint bitmap_get_first(const MY_BITMAP *map);
extern uint bitmap_get_first_set(const MY_BITMAP *map); extern uint bitmap_get_first_set(const MY_BITMAP *map);
......
...@@ -378,6 +378,24 @@ void bitmap_intersect(MY_BITMAP *map, const MY_BITMAP *map2) ...@@ -378,6 +378,24 @@ void bitmap_intersect(MY_BITMAP *map, const MY_BITMAP *map2)
} }
} }
/* True if union of bitmaps have all bits set */
my_bool bitmap_union_is_set_all(const MY_BITMAP *map1, const MY_BITMAP *map2)
{
my_bitmap_map *m1= map1->bitmap, *m2= map2->bitmap, *end;
DBUG_ASSERT(map1->bitmap && map2->bitmap &&
map1->n_bits==map2->n_bits);
*map1->last_word_ptr|= map1->last_word_mask;
end= map1->last_word_ptr;
while ( m1 <= end)
if ((*m1++ | *m2++) != 0xFFFFFFFF)
return FALSE;
return TRUE;
}
/* /*
Set/clear all bits above a bit. Set/clear all bits above a bit.
......
...@@ -1064,7 +1064,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -1064,7 +1064,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
char* packet, uint packet_length); char* packet, uint packet_length);
void log_slow_statement(THD *thd); void log_slow_statement(THD *thd);
bool check_dup(const char *db, const char *name, TABLE_LIST *tables); bool check_dup(const char *db, const char *name, TABLE_LIST *tables);
bool compare_record(TABLE *table); bool compare_record(TABLE *table, bool all_columns_exists);
bool append_file_to_dir(THD *thd, const char **filename_ptr, bool append_file_to_dir(THD *thd, const char **filename_ptr,
const char *table_name); const char *table_name);
void wait_while_table_is_used(THD *thd, TABLE *table, void wait_while_table_is_used(THD *thd, TABLE *table,
......
...@@ -1509,9 +1509,10 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) ...@@ -1509,9 +1509,10 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
table->file->adjust_next_insert_id_after_explicit_value( table->file->adjust_next_insert_id_after_explicit_value(
table->next_number_field->val_int()); table->next_number_field->val_int());
info->touched++; info->touched++;
if ((table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ && if (((table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ) &&
!bitmap_is_subset(table->write_set, table->read_set)) || !bitmap_is_subset(table->write_set, table->read_set)) ||
compare_record(table)) compare_record(table, bitmap_union_is_set_all(table->write_set,
table->read_set)))
{ {
if ((error=table->file->ha_update_row(table->record[1], if ((error=table->file->ha_update_row(table->record[1],
table->record[0])) && table->record[0])) &&
......
...@@ -10396,7 +10396,7 @@ create_tmp_table(THD *thd,TMP_TABLE_PARAM *param,List<Item> &fields, ...@@ -10396,7 +10396,7 @@ create_tmp_table(THD *thd,TMP_TABLE_PARAM *param,List<Item> &fields,
table->null_flags= (uchar*) table->record[0]; table->null_flags= (uchar*) table->record[0];
share->null_fields= null_count+ hidden_null_count; share->null_fields= null_count+ hidden_null_count;
share->null_bytes= null_pack_length; share->null_bytes= share->null_bytes_for_compare= null_pack_length;
} }
null_count= (blob_count == 0) ? 1 : 0; null_count= (blob_count == 0) ? 1 : 0;
hidden_field_count=param->hidden_field_count; hidden_field_count=param->hidden_field_count;
...@@ -10762,7 +10762,7 @@ TABLE *create_virtual_tmp_table(THD *thd, List<Create_field> &field_list) ...@@ -10762,7 +10762,7 @@ TABLE *create_virtual_tmp_table(THD *thd, List<Create_field> &field_list)
{ {
table->null_flags= (uchar*) table->record[0]; table->null_flags= (uchar*) table->record[0];
share->null_fields= null_count; share->null_fields= null_count;
share->null_bytes= null_pack_length; share->null_bytes= share->null_bytes_for_compare= null_pack_length;
} }
table->in_use= thd; /* field->reset() may access table->in_use */ table->in_use= thd; /* field->reset() may access table->in_use */
......
...@@ -27,14 +27,14 @@ ...@@ -27,14 +27,14 @@
/* Return 0 if row hasn't changed */ /* Return 0 if row hasn't changed */
bool compare_record(TABLE *table) bool compare_record(TABLE *table, bool all_columns_exists)
{ {
if (table->s->blob_fields + table->s->varchar_fields == 0) if (table->s->can_cmp_whole_record && all_columns_exists)
return cmp_record(table,record[1]); return cmp_record(table,record[1]);
/* Compare null bits */ /* Compare null bits */
if (memcmp(table->null_flags, if (memcmp(table->null_flags,
table->null_flags+table->s->rec_buff_length, table->null_flags+table->s->rec_buff_length,
table->s->null_bytes)) table->s->null_bytes_for_compare))
return TRUE; // Diff in NULL value return TRUE; // Diff in NULL value
/* Compare updated fields */ /* Compare updated fields */
for (Field **ptr= table->field ; *ptr ; ptr++) for (Field **ptr= table->field ; *ptr ; ptr++)
...@@ -186,7 +186,7 @@ int mysql_update(THD *thd, ...@@ -186,7 +186,7 @@ int mysql_update(THD *thd,
bool using_limit= limit != HA_POS_ERROR; bool using_limit= limit != HA_POS_ERROR;
bool safe_update= test(thd->options & OPTION_SAFE_UPDATES); bool safe_update= test(thd->options & OPTION_SAFE_UPDATES);
bool used_key_is_modified, transactional_table, will_batch; bool used_key_is_modified, transactional_table, will_batch;
bool can_compare_record; bool can_compare_record, all_columns_exists;
int res; int res;
int error, loc_error; int error, loc_error;
uint used_index= MAX_KEY, dup_key_found; uint used_index= MAX_KEY, dup_key_found;
...@@ -574,6 +574,9 @@ int mysql_update(THD *thd, ...@@ -574,6 +574,9 @@ int mysql_update(THD *thd,
can_compare_record= (!(table->file->ha_table_flags() & can_compare_record= (!(table->file->ha_table_flags() &
HA_PARTIAL_COLUMN_READ) || HA_PARTIAL_COLUMN_READ) ||
bitmap_is_subset(table->write_set, table->read_set)); bitmap_is_subset(table->write_set, table->read_set));
all_columns_exists= (can_compare_record &&
bitmap_union_is_set_all(table->write_set,
table->read_set));
while (!(error=info.read_record(&info)) && !thd->killed) while (!(error=info.read_record(&info)) && !thd->killed)
{ {
...@@ -591,7 +594,7 @@ int mysql_update(THD *thd, ...@@ -591,7 +594,7 @@ int mysql_update(THD *thd,
found++; found++;
if (!can_compare_record || compare_record(table)) if (!can_compare_record || compare_record(table, all_columns_exists))
{ {
if ((res= table_list->view_check_option(thd, ignore)) != if ((res= table_list->view_check_option(thd, ignore)) !=
VIEW_CHECK_OK) VIEW_CHECK_OK)
...@@ -1758,11 +1761,14 @@ bool multi_update::send_data(List<Item> &not_used_values) ...@@ -1758,11 +1761,14 @@ bool multi_update::send_data(List<Item> &not_used_values)
*/ */
if (table == table_to_update) if (table == table_to_update)
{ {
bool can_compare_record; bool can_compare_record, all_columns_exists;
can_compare_record= (!(table->file->ha_table_flags() & can_compare_record= (!(table->file->ha_table_flags() &
HA_PARTIAL_COLUMN_READ) || HA_PARTIAL_COLUMN_READ) ||
bitmap_is_subset(table->write_set, bitmap_is_subset(table->write_set,
table->read_set)); table->read_set));
all_columns_exists= (can_compare_record &&
bitmap_union_is_set_all(table->write_set,
table->read_set));
table->status|= STATUS_UPDATED; table->status|= STATUS_UPDATED;
store_record(table,record[1]); store_record(table,record[1]);
if (fill_record_n_invoke_before_triggers(thd, *fields_for_table[offset], if (fill_record_n_invoke_before_triggers(thd, *fields_for_table[offset],
...@@ -1777,7 +1783,7 @@ bool multi_update::send_data(List<Item> &not_used_values) ...@@ -1777,7 +1783,7 @@ bool multi_update::send_data(List<Item> &not_used_values)
*/ */
table->auto_increment_field_not_null= FALSE; table->auto_increment_field_not_null= FALSE;
found++; found++;
if (!can_compare_record || compare_record(table)) if (!can_compare_record || compare_record(table, all_columns_exists))
{ {
int error; int error;
if ((error= cur_table->view_check_option(thd, ignore)) != if ((error= cur_table->view_check_option(thd, ignore)) !=
...@@ -1964,7 +1970,7 @@ int multi_update::do_updates() ...@@ -1964,7 +1970,7 @@ int multi_update::do_updates()
DBUG_RETURN(0); DBUG_RETURN(0);
for (cur_table= update_tables; cur_table; cur_table= cur_table->next_local) for (cur_table= update_tables; cur_table; cur_table= cur_table->next_local)
{ {
bool can_compare_record; bool can_compare_record, all_columns_exists;
uint offset= cur_table->shared; uint offset= cur_table->shared;
table = cur_table->table; table = cur_table->table;
...@@ -1988,9 +1994,11 @@ int multi_update::do_updates() ...@@ -1988,9 +1994,11 @@ int multi_update::do_updates()
Setup copy functions to copy fields from temporary table Setup copy functions to copy fields from temporary table
*/ */
List_iterator_fast<Item> field_it(*fields_for_table[offset]); List_iterator_fast<Item> field_it(*fields_for_table[offset]);
Field **field= tmp_table->field + Field **field;
1 + unupdated_check_opt_tables.elements; // Skip row pointers
Copy_field *copy_field_ptr= copy_field, *copy_field_end; Copy_field *copy_field_ptr= copy_field, *copy_field_end;
/* Skip row pointers */
field= tmp_table->field + 1 + unupdated_check_opt_tables.elements;
for ( ; *field ; field++) for ( ; *field ; field++)
{ {
Item_field *item= (Item_field* ) field_it++; Item_field *item= (Item_field* ) field_it++;
...@@ -2005,6 +2013,9 @@ int multi_update::do_updates() ...@@ -2005,6 +2013,9 @@ int multi_update::do_updates()
HA_PARTIAL_COLUMN_READ) || HA_PARTIAL_COLUMN_READ) ||
bitmap_is_subset(table->write_set, bitmap_is_subset(table->write_set,
table->read_set)); table->read_set));
all_columns_exists= (can_compare_record &&
bitmap_union_is_set_all(table->write_set,
table->read_set));
for (;;) for (;;)
{ {
...@@ -2046,7 +2057,7 @@ int multi_update::do_updates() ...@@ -2046,7 +2057,7 @@ int multi_update::do_updates()
TRG_ACTION_BEFORE, TRUE)) TRG_ACTION_BEFORE, TRUE))
goto err2; goto err2;
if (!can_compare_record || compare_record(table)) if (!can_compare_record || compare_record(table, all_columns_exists))
{ {
int error; int error;
if ((error= cur_table->view_check_option(thd, ignore)) != if ((error= cur_table->view_check_option(thd, ignore)) !=
......
...@@ -687,6 +687,7 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, ...@@ -687,6 +687,7 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head,
const char **interval_array; const char **interval_array;
enum legacy_db_type legacy_db_type; enum legacy_db_type legacy_db_type;
my_bitmap_map *bitmaps; my_bitmap_map *bitmaps;
bool null_bits_are_used;
DBUG_ENTER("open_binary_frm"); DBUG_ENTER("open_binary_frm");
new_field_pack_flag= head[27]; new_field_pack_flag= head[27];
...@@ -1134,6 +1135,7 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, ...@@ -1134,6 +1135,7 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head,
goto err; goto err;
record= share->default_values-1; /* Fieldstart = 1 */ record= share->default_values-1; /* Fieldstart = 1 */
null_bits_are_used= share->null_fields != 0;
if (share->null_field_first) if (share->null_field_first)
{ {
null_flags= null_pos= (uchar*) record+1; null_flags= null_pos= (uchar*) record+1;
...@@ -1306,6 +1308,7 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, ...@@ -1306,6 +1308,7 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head,
reg_field->comment=comment; reg_field->comment=comment;
if (field_type == MYSQL_TYPE_BIT && !f_bit_as_char(pack_flag)) if (field_type == MYSQL_TYPE_BIT && !f_bit_as_char(pack_flag))
{ {
null_bits_are_used= 1;
if ((null_bit_pos+= field_length & 7) > 7) if ((null_bit_pos+= field_length & 7) > 7)
{ {
null_pos++; null_pos++;
...@@ -1594,6 +1597,13 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, ...@@ -1594,6 +1597,13 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head,
share->null_bytes= (null_pos - (uchar*) null_flags + share->null_bytes= (null_pos - (uchar*) null_flags +
(null_bit_pos + 7) / 8); (null_bit_pos + 7) / 8);
share->last_null_bit_pos= null_bit_pos; share->last_null_bit_pos= null_bit_pos;
share->null_bytes_for_compare= null_bits_are_used ? share->null_bytes : 0;
share->can_cmp_whole_record= (share->blob_fields == 0 &&
share->varchar_fields == 0 &&
(share->null_bytes_for_compare ==
share->null_bytes ||
!(handler_file->ha_table_flags() &
HA_PARTIAL_COLUMN_READ)));
share->db_low_byte_first= handler_file->low_byte_first(); share->db_low_byte_first= handler_file->low_byte_first();
share->column_bitmap_size= bitmap_buffer_size(share->fields); share->column_bitmap_size= bitmap_buffer_size(share->fields);
......
...@@ -401,6 +401,11 @@ typedef struct st_table_share ...@@ -401,6 +401,11 @@ typedef struct st_table_share
uint blob_ptr_size; /* 4 or 8 */ uint blob_ptr_size; /* 4 or 8 */
uint key_block_size; /* create key_block_size, if used */ uint key_block_size; /* create key_block_size, if used */
uint null_bytes, last_null_bit_pos; uint null_bytes, last_null_bit_pos;
/*
Same as null_bytes, except that if there is only a 'delete-marker' in
the record then this value is 0.
*/
uint null_bytes_for_compare;
uint fields; /* Number of fields */ uint fields; /* Number of fields */
uint rec_buff_length; /* Size of table->record[] buffer */ uint rec_buff_length; /* Size of table->record[] buffer */
uint keys, key_parts; uint keys, key_parts;
...@@ -432,6 +437,7 @@ typedef struct st_table_share ...@@ -432,6 +437,7 @@ typedef struct st_table_share
bool name_lock, replace_with_name_lock; bool name_lock, replace_with_name_lock;
bool waiting_on_cond; /* Protection against free */ bool waiting_on_cond; /* Protection against free */
bool deleting; /* going to delete this table */ bool deleting; /* going to delete this table */
bool can_cmp_whole_record;
ulong table_map_id; /* for row-based replication */ ulong table_map_id; /* for row-based replication */
/* /*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment