Commit f73bdb68 authored by Sergei Golubchik's avatar Sergei Golubchik

bugfix: UPDATE and virtual BLOBs

When updating a table with virtual BLOB columns, the following might happen:
- an old record is read from the table, it has no virtual blob values
- update_virtual_fields() is run, vcol blob gets its value into the
  record. But only a pointer to the value is in the table->record[0],
  the value is in Field_blob::value String (but it doesn't have to be!
  it can be in the record, if the column is just a copy of another
  columns: ... b VARCHAR, c BLOB AS (b) ...)
- store_record(table,record[1]), old record now is in record[1]
- fill_record() prepares new values in record[0], vcol blob is updated,
  new value replaces the old one in the Field_blob::value
- now both record[1] and record[0] have a pointer that points to the
  *new* vcol blob value. Or record[1] has a pointer to nowhere if
  Field_blob::value had to realloc.

To resolve this we unlink vcol blobs from the pointer to the
data (in the record[1]). Because the value is not *always* in
the Field_blob::value String, we need to remember what blobs
were unlinked. The orphan memory must be freed manually.

To complicate the matter, ha_update_row() is also used in
multi-update, in REPLACE, in INSERT ... ON DUPLICATE KEY UPDATE,
also on REPLACE ... SELECT, REPLACE DELAYED, and LOAD DATA REPLACE, etc
parent aebb1038
......@@ -54,3 +54,103 @@ select * from t;
a b c
9 10 10
drop table t, t2;
create table t1 (a int, b int, c int, d int, e int);
insert t1 values (1,2,3,4,5), (1,2,3,4,5);
create table t (a int primary key,
b int, c blob as (b), index (c(57)),
d blob, e blob as (d), index (e(57)))
replace select * from t1;
Warnings:
Warning 1906 The value specified for computed column 'c' in table 't' ignored
Warning 1906 The value specified for computed column 'e' in table 't' ignored
Warning 1906 The value specified for computed column 'c' in table 't' ignored
Warning 1906 The value specified for computed column 'e' in table 't' ignored
check table t;
Table Op Msg_type Msg_text
test.t check status OK
select * from t;
a b c d e
1 2 2 4 4
update t set a=10, b=1, d=1;
check table t;
Table Op Msg_type Msg_text
test.t check status OK
select * from t;
a b c d e
10 1 1 1 1
replace t (a,b,d) values (10,2,2);
check table t;
Table Op Msg_type Msg_text
test.t check status OK
select * from t;
a b c d e
10 2 2 2 2
insert t(a,b,d) values (10) on duplicate key update b=3;
ERROR 21S01: Column count doesn't match value count at row 1
insert t(a,b,d) values (10,2,2) on duplicate key update b=3, d=3;
check table t;
Table Op Msg_type Msg_text
test.t check status OK
select * from t;
a b c d e
10 3 3 3 3
replace t (a,b,d) select 10,4,4;
check table t;
Table Op Msg_type Msg_text
test.t check status OK
select * from t;
a b c d e
10 4 4 4 4
insert t(a,b,d) select 10,4,4 on duplicate key update b=5, d=5;
check table t;
Table Op Msg_type Msg_text
test.t check status OK
select * from t;
a b c d e
10 5 5 5 5
replace delayed t (a,b,d) values (10,6,6);
flush tables;
check table t;
Table Op Msg_type Msg_text
test.t check status OK
select * from t;
a b c d e
10 6 6 6 6
insert delayed t(a,b,d) values (10,6,6) on duplicate key update b=7, d=7;
flush tables;
check table t;
Table Op Msg_type Msg_text
test.t check status OK
select * from t;
a b c d e
10 7 7 7 7
load data infile 'MYSQLTEST_VARDIR/tmp/vblobs.txt' replace into table t;
check table t;
Table Op Msg_type Msg_text
test.t check status OK
select * from t;
a b c d e
10 8 8 8 8
update t set a=11, b=9, d=9 where a>5;
check table t;
Table Op Msg_type Msg_text
test.t check status OK
select * from t;
a b c d e
11 9 9 9 9
create table t2 select * from t;
update t, t2 set t.b=10, t.d=10 where t.a=t2.a;
check table t;
Table Op Msg_type Msg_text
test.t check status OK
select * from t;
a b c d e
11 10 10 10 10
update t, t tt set t.b=11, tt.d=11 where t.a=tt.a;
check table t;
Table Op Msg_type Msg_text
test.t check status OK
select * from t;
a b c d e
11 11 11 11 11
drop table t, t1, t2;
......@@ -65,3 +65,48 @@ create table t2 select * from t;
update t, t2 set t.b=10 where t.a=t2.a;
check table t; select * from t;
drop table t, t2;
#
# blobs
# This tests BLOB_VALUE_ORPHANAGE
#
create table t1 (a int, b int, c int, d int, e int);
insert t1 values (1,2,3,4,5), (1,2,3,4,5);
create table t (a int primary key,
b int, c blob as (b), index (c(57)),
d blob, e blob as (d), index (e(57)))
replace select * from t1;
check table t; select * from t;
update t set a=10, b=1, d=1;
check table t; select * from t;
replace t (a,b,d) values (10,2,2);
check table t; select * from t;
--error ER_WRONG_VALUE_COUNT_ON_ROW
insert t(a,b,d) values (10) on duplicate key update b=3;
insert t(a,b,d) values (10,2,2) on duplicate key update b=3, d=3;
check table t; select * from t;
replace t (a,b,d) select 10,4,4;
check table t; select * from t;
insert t(a,b,d) select 10,4,4 on duplicate key update b=5, d=5;
check table t; select * from t;
replace delayed t (a,b,d) values (10,6,6);
flush tables;
check table t; select * from t;
insert delayed t(a,b,d) values (10,6,6) on duplicate key update b=7, d=7;
flush tables;
check table t; select * from t;
--write_file $MYSQLTEST_VARDIR/tmp/vblobs.txt
10 8 foo 8 foo
EOF
--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
--eval load data infile '$MYSQLTEST_VARDIR/tmp/vblobs.txt' replace into table t
--remove_file $MYSQLTEST_VARDIR/tmp/vblobs.txt
check table t; select * from t;
update t set a=11, b=9, d=9 where a>5;
check table t; select * from t;
create table t2 select * from t;
update t, t2 set t.b=10, t.d=10 where t.a=t2.a;
check table t; select * from t;
update t, t tt set t.b=11, tt.d=11 where t.a=tt.a;
check table t; select * from t;
drop table t, t1, t2;
......@@ -3324,6 +3324,9 @@ class Field_blob :public Field_longstr {
uint max_packed_col_length(uint max_length);
void free() { value.free(); }
inline void clear_temporary() { bzero((uchar*) &value, sizeof(value)); }
inline bool owns_ptr(uchar* p) const { return p == (uchar*)value.ptr(); }
inline void own_value_ptr()
{ value.reset((char*)get_ptr(), get_length(), get_length(), value.charset()); }
uint size_of() const { return sizeof(*this); }
bool has_charset(void) const
{ return charset() == &my_charset_bin ? FALSE : TRUE; }
......
......@@ -201,6 +201,99 @@ typedef struct st_user_var_events
bool unsigned_flag;
} BINLOG_USER_VAR_EVENT;
/*
When updating a table with virtual BLOB columns, the following might happen:
- an old record is read from the table, it has no vcol blob.
- update_virtual_fields() is run, vcol blob gets its value into the
record. But only a pointer to the value is in the table->record[0],
the value is in Field_blob::value String (or, it can be elsewhere!)
- store_record(table,record[1]), old record now is in record[1]
- fill_record() prepares new values in record[0], vcol blob is updated,
new value replaces the old one in the Field_blob::value
- now both record[1] and record[0] have a pointer that points to the
*new* vcol blob value. Or record[1] has a pointer to nowhere if
Field_blob::value had to realloc.
To resolve this we unlink vcol blobs from the pointer to the
data (in the record[1]). The orphan memory must be freed manually
(but, again, only if it was owned by Field_blob::value String).
With REPLACE and INSERT ... ON DUP KEY UPATE it's even more complex.
There is no store_record(table,record[1]), instead the row is read
directly into record[1].
*/
struct BLOB_VALUE_ORPHANAGE {
MY_BITMAP map;
TABLE *table;
BLOB_VALUE_ORPHANAGE() { map.bitmap= NULL; }
~BLOB_VALUE_ORPHANAGE() { free(); }
bool init(TABLE *table_arg)
{
table= table_arg;
if (table->s->virtual_fields && table->s->blob_fields)
return bitmap_init(&map, NULL, table->s->virtual_fields, FALSE);
map.bitmap= NULL;
return 0;
}
void free() { bitmap_free(&map); }
/** Remove blob's ownership from blob value memory
@note the memory becomes orphaned, it needs to be freed using
free_orphans() or re-attached back to blobs using adopt_orphans()
*/
void make_orphans()
{
DBUG_ASSERT(!table || !table->s->virtual_fields || !table->s->blob_fields || map.bitmap);
if (!map.bitmap)
return;
for (Field **ptr=table->vfield; *ptr; ptr++)
{
Field_blob *vb= (Field_blob*)(*ptr);
if (!(vb->flags & BLOB_FLAG) || !vb->owns_ptr(vb->get_ptr()))
continue;
bitmap_set_bit(&map, ptr - table->vfield);
vb->clear_temporary();
}
}
/** Frees orphaned blob values
@note It is assumed that value pointers are in table->record[1], while
Field_blob::ptr's point to table->record[0] as usual
*/
void free_orphans()
{
DBUG_ASSERT(!table || !table->s->virtual_fields || !table->s->blob_fields || map.bitmap);
if (!map.bitmap)
return;
for (Field **ptr=table->vfield; *ptr; ptr++)
{
Field_blob *vb= (Field_blob*)(*ptr);
if (vb->flags & BLOB_FLAG && bitmap_fast_test_and_clear(&map, ptr - table->vfield))
my_free(vb->get_ptr(table->s->rec_buff_length));
}
DBUG_ASSERT(bitmap_is_clear_all(&map));
}
/** Restores blob's ownership over previously orphaned values */
void adopt_orphans()
{
DBUG_ASSERT(!table || !table->s->virtual_fields || !table->s->blob_fields || map.bitmap);
if (!map.bitmap)
return;
for (Field **ptr=table->vfield; *ptr; ptr++)
{
Field_blob *vb= (Field_blob*)(*ptr);
if (vb->flags & BLOB_FLAG && bitmap_fast_test_and_clear(&map, ptr - table->vfield))
vb->own_value_ptr();
}
DBUG_ASSERT(bitmap_is_clear_all(&map));
}
};
/*
The COPY_INFO structure is used by INSERT/REPLACE code.
The schema of the row counting by the INSERT/INSERT ... ON DUPLICATE KEY
......@@ -213,7 +306,7 @@ typedef struct st_user_var_events
of the INSERT ... ON DUPLICATE KEY UPDATE no matter whether the row
was actually changed or not.
*/
typedef struct st_copy_info {
struct COPY_INFO {
ha_rows records; /**< Number of processed records */
ha_rows deleted; /**< Number of deleted records */
ha_rows updated; /**< Number of updated records */
......@@ -229,7 +322,8 @@ typedef struct st_copy_info {
/* for VIEW ... WITH CHECK OPTION */
TABLE_LIST *view;
TABLE_LIST *table_list; /* Normal table */
} COPY_INFO;
BLOB_VALUE_ORPHANAGE vblobs0, vblobs1; // vcol blobs of record[0] and record[1]
};
class Key_part_spec :public Sql_alloc {
......@@ -5317,6 +5411,7 @@ class multi_update :public select_result_interceptor
TABLE_LIST *update_tables, *table_being_updated;
TABLE **tmp_tables, *main_table, *table_to_update;
TMP_TABLE_PARAM *tmp_table_param;
BLOB_VALUE_ORPHANAGE *vblobs;
ha_rows updated, found;
List <Item> *fields, *values;
List <Item> **fields_for_table, **values_for_table;
......
......@@ -812,6 +812,11 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
info.update_values= &update_values;
info.view= (table_list->view ? table_list : 0);
info.table_list= table_list;
if (duplic != DUP_ERROR)
{
info.vblobs0.init(table);
info.vblobs1.init(table);
}
/*
Count warnings for all inserts.
......@@ -1184,7 +1189,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
thd->lex->current_select->save_leaf_tables(thd);
thd->lex->current_select->first_cond_optimization= 0;
}
DBUG_RETURN(FALSE);
abort:
......@@ -1698,9 +1702,12 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
}
if (table->vfield)
{
info->vblobs0.make_orphans();
table->move_fields(table->field, table->record[1], table->record[0]);
table->update_virtual_fields(VCOL_UPDATE_INDEXED);
info->vblobs1.make_orphans();
table->move_fields(table->field, table->record[0], table->record[1]);
info->vblobs0.adopt_orphans();
}
if (info->handle_duplicates == DUP_UPDATE)
{
......@@ -1861,6 +1868,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
trg_error= 1;
goto ok_or_after_trg_err;
}
info->vblobs1.free_orphans();
/* Let us attempt do write_row() once more */
}
}
......@@ -1911,6 +1919,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
my_safe_afree(key,table->s->max_unique_length);
if (!table->file->has_transactions())
thd->transaction.stmt.modified_non_trans_table= TRUE;
info->vblobs1.free_orphans();
DBUG_RETURN(trg_error);
err:
......@@ -1922,6 +1931,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
if (key)
my_safe_afree(key, table->s->max_unique_length);
table->column_bitmaps_set(save_read_set, save_write_set);
info->vblobs1.free_orphans();
DBUG_RETURN(1);
}
......@@ -3130,10 +3140,14 @@ static void free_delayed_insert_blobs(register TABLE *table)
{
for (Field **ptr=table->field ; *ptr ; ptr++)
{
if ((*ptr)->flags & BLOB_FLAG)
Field_blob *f= (Field_blob*)(*ptr);
if (f->flags & BLOB_FLAG)
{
my_free(((Field_blob *) (*ptr))->get_ptr());
((Field_blob *) (*ptr))->reset();
if (f->vcol_info)
f->free();
else
my_free(f->get_ptr());
f->reset();
}
}
}
......@@ -3155,6 +3169,9 @@ bool Delayed_insert::handle_inserts(void)
table->next_number_field=table->found_next_number_field;
table->use_all_columns();
info.vblobs0.init(table);
info.vblobs1.init(table);
THD_STAGE_INFO(&thd, stage_upgrading_lock);
if (thr_upgrade_write_delay_lock(*thd.lock->locks, delayed_lock,
thd.variables.lock_wait_timeout))
......@@ -3261,6 +3278,8 @@ bool Delayed_insert::handle_inserts(void)
if (info.handle_duplicates == DUP_UPDATE)
table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
thd.clear_error(); // reset error for binlog
if (table->vfield)
table->update_virtual_fields(VCOL_UPDATE_FOR_WRITE);
if (write_record(&thd, table, &info))
{
info.error_count++; // Ignore errors
......@@ -3367,6 +3386,8 @@ bool Delayed_insert::handle_inserts(void)
DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed after loop"));
goto err;
}
info.vblobs0.free();
info.vblobs1.free();
query_cache_invalidate3(&thd, table, 1);
mysql_mutex_lock(&mutex);
DBUG_RETURN(0);
......@@ -3375,6 +3396,8 @@ bool Delayed_insert::handle_inserts(void)
#ifndef DBUG_OFF
max_rows= 0; // For DBUG output
#endif
info.vblobs0.free();
info.vblobs1.free();
/* Remove all not used rows */
mysql_mutex_lock(&mutex);
while ((row=rows.get()))
......@@ -3622,6 +3645,11 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
restore_record(table,s->default_values); // Get empty record
table->reset_default_fields();
table->next_number_field=table->found_next_number_field;
if (info.handle_duplicates != DUP_ERROR)
{
info.vblobs0.init(table);
info.vblobs1.init(table);
}
#ifdef HAVE_REPLICATION
if (thd->rgi_slave &&
......
......@@ -21,7 +21,6 @@
/* Instead of including sql_lex.h we add this typedef here */
typedef List<Item> List_item;
typedef struct st_copy_info COPY_INFO;
bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, TABLE *table,
List<Item> &fields, List_item *values,
......
......@@ -539,6 +539,12 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
!(thd->variables.sql_mode & MODE_NO_BACKSLASH_ESCAPES)))
? (*escaped)[0] : INT_MAX;
if (handle_duplicates != DUP_ERROR)
{
info.vblobs0.init(table);
info.vblobs1.init(table);
}
READ_INFO read_info(thd, file, tot_length,
ex->cs ? ex->cs : thd->variables.collation_database,
*field_term,*ex->line_start, *ex->line_term, *enclosed,
......
......@@ -273,6 +273,7 @@ int mysql_update(THD *thd,
SORT_INFO *file_sort= 0;
READ_RECORD info;
SELECT_LEX *select_lex= &thd->lex->select_lex;
BLOB_VALUE_ORPHANAGE vblobs;
ulonglong id;
List<Item> all_fields;
killed_state killed_status= NOT_KILLED;
......@@ -724,6 +725,8 @@ int mysql_update(THD *thd,
table->reset_default_fields();
vblobs.init(table);
/*
We can use compare_record() to optimize away updates if
the table handler is returning all columns OR if
......@@ -745,6 +748,9 @@ int mysql_update(THD *thd,
explain->tracker.on_record_after_where();
store_record(table,record[1]);
vblobs.make_orphans();
if (fill_record_n_invoke_before_triggers(thd, table, fields, values, 0,
TRG_EVENT_UPDATE))
break; /* purecov: inspected */
......@@ -904,7 +910,9 @@ int mysql_update(THD *thd,
error= 1;
break;
}
vblobs.free_orphans();
}
vblobs.free_orphans();
ANALYZE_STOP_TRACKING(&explain->command_tracker);
table->auto_increment_field_not_null= FALSE;
dup_key_found= 0;
......@@ -1752,6 +1760,8 @@ int multi_update::prepare(List<Item> &not_used_values,
table_count);
values_for_table= (List_item **) thd->alloc(sizeof(List_item *) *
table_count);
vblobs= (BLOB_VALUE_ORPHANAGE *)thd->calloc(sizeof(*vblobs) * table_count);
if (thd->is_fatal_error)
DBUG_RETURN(1);
for (i=0 ; i < table_count ; i++)
......@@ -1784,6 +1794,7 @@ int multi_update::prepare(List<Item> &not_used_values,
TABLE *table= ((Item_field*)(fields_for_table[i]->head()))->field->table;
switch_to_nullable_trigger_fields(*fields_for_table[i], table);
switch_to_nullable_trigger_fields(*values_for_table[i], table);
vblobs[i].init(table);
}
}
copy_field= new Copy_field[max_fields];
......@@ -2065,6 +2076,8 @@ multi_update::~multi_update()
free_tmp_table(thd, tmp_tables[cnt]);
tmp_table_param[cnt].cleanup();
}
vblobs[cnt].free_orphans();
vblobs[cnt].free();
}
}
if (copy_field)
......@@ -2110,7 +2123,9 @@ int multi_update::send_data(List<Item> &not_used_values)
can_compare_record= records_are_comparable(table);
table->status|= STATUS_UPDATED;
vblobs[offset].free_orphans();
store_record(table,record[1]);
vblobs[offset].make_orphans();
if (fill_record_n_invoke_before_triggers(thd, table,
*fields_for_table[offset],
*values_for_table[offset], 0,
......@@ -2327,6 +2342,7 @@ int multi_update::do_updates()
goto err;
}
table->file->extra(HA_EXTRA_NO_CACHE);
empty_record(table);
check_opt_it.rewind();
while(TABLE *tbl= check_opt_it++)
......@@ -2400,7 +2416,9 @@ int multi_update::do_updates()
goto err2;
table->status|= STATUS_UPDATED;
vblobs[offset].free_orphans();
store_record(table,record[1]);
vblobs[offset].make_orphans();
/* Copy data from temporary table to current table */
for (copy_field_ptr=copy_field;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment