Commit 5dfe4d50 authored by marko's avatar marko

branches/zip: Fix a bug in the retrieval of old versions of records containing

externally stored columns.

innodb-zip.test: Correct the test case.  Without the fixes, the test
would fail, because the BLOB would be prepended with a 768-byte prefix
of the data.

row_upd_index_replace_new_col_vals_index_pos(),
row_upd_index_replace_new_col_vals(): Use only one "heap"
parameter that must be non-NULL.  When fetching externally
stored columns, use upd_field_t::orig_len.

upd_get_field_by_field_no(): New accessor function, for retrieving
an field from an update vector by field_no.

row_upd_index_replace_new_col_val(): New function, for replacing the
value from an update vector.  This used to be duplicated code in
row_upd_index_replace_new_col_vals_index_pos() and
row_upd_index_replace_new_col_vals().
parent aacaca62
......@@ -1867,7 +1867,7 @@ btr_cur_optimistic_update(
corresponding to new_entry is latched in mtr.
Thus the following call is safe. */
row_upd_index_replace_new_col_vals_index_pos(new_entry, index, update,
FALSE, NULL, heap);
FALSE, heap);
old_rec_size = rec_offs_size(offsets);
new_rec_size = rec_get_converted_size(index, new_entry, 0);
......@@ -2144,7 +2144,7 @@ btr_cur_pessimistic_update(
purge would also have removed the clustered index record
itself. Thus the following call is safe. */
row_upd_index_replace_new_col_vals_index_pos(new_entry, index, update,
FALSE, *heap, *heap);
FALSE, *heap);
if (!(flags & BTR_KEEP_SYS_FLAG)) {
row_upd_index_entry_sys_field(new_entry, index, DATA_ROLL_PTR,
roll_ptr);
......
......@@ -62,6 +62,16 @@ upd_field_set_field_no(
dict_index_t* index, /* in: index */
trx_t* trx); /* in: transaction */
/*************************************************************************
Returns a field of an update vector by field_no. */
UNIV_INLINE
const upd_field_t*
upd_get_field_by_field_no(
/*======================*/
/* out: update vector field, or NULL */
const upd_t* update, /* in: update vector */
ulint no) /* in: field_no */
__attribute__((nonnull, pure));
/*************************************************************************
Writes into the redo log the values of trx id and roll ptr and enough info
to determine their positions within a clustered index record. */
UNIV_INTERN
......@@ -198,14 +208,9 @@ row_upd_index_replace_new_col_vals_index_pos(
/* in: if TRUE, limit the replacement to
ordering fields of index; note that this
does not work for non-clustered indexes. */
mem_heap_t* heap, /* in: memory heap to which we allocate and
copy the new values, set this as NULL if you
do not want allocation */
mem_heap_t* ext_heap);/* in: memory heap where to allocate
column prefixes of externally stored
columns, may be NULL if the index
record does not contain externally
stored columns or column prefixes */
mem_heap_t* heap) /* in: memory heap for allocating and
copying the new values */
__attribute__((nonnull));
/***************************************************************
Replaces the new column values stored in the update vector to the index entry
given. */
......@@ -222,14 +227,9 @@ row_upd_index_replace_new_col_vals(
const upd_t* update, /* in: an update vector built for the
CLUSTERED index so that the field number in
an upd_field is the clustered index position */
mem_heap_t* heap, /* in: memory heap to which we allocate and
copy the new values, set this as NULL if you
do not want allocation */
mem_heap_t* ext_heap);/* in: memory heap where to allocate
column prefixes of externally stored
columns, may be NULL if the index
record does not contain externally
stored columns or column prefixes */
mem_heap_t* heap) /* in: memory heap for allocating and
copying the new values */
__attribute__((nonnull));
/***************************************************************
Replaces the new column values stored in the update vector. */
UNIV_INTERN
......
......@@ -97,6 +97,29 @@ upd_field_set_field_no(
dfield_get_type(&upd_field->new_val));
}
/*************************************************************************
Returns a field of an update vector by field_no. */
UNIV_INLINE
const upd_field_t*
upd_get_field_by_field_no(
/*======================*/
/* out: update vector field, or NULL */
const upd_t* update, /* in: update vector */
ulint no) /* in: field_no */
{
ulint i;
for (i = 0; i < upd_get_n_fields(update); i++) {
const upd_field_t* uf = upd_get_nth_field(update, i);
if (uf->field_no == no) {
return(uf);
}
}
return(NULL);
}
/*************************************************************************
Updates the trx id and roll ptr field in a clustered index record when
a row is updated or marked deleted. */
......
......@@ -49,21 +49,22 @@ t1 CREATE TABLE `t1` (
PRIMARY KEY (`a`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 KEY_BLOCK_SIZE=1
drop table t1;
create table t1(a int primary key) engine=innodb key_block_size=9;
ERROR HY000: Table storage engine 'InnoDB' does not support the create option 'KEY_BLOCK_SIZE'
create table t1(a int not null, b text, index(b(10))) engine=innodb
key_block_size=1;
insert into t1 values (1,1);
create table t2(b text)engine=innodb;
insert into t2 values(concat('1abcdefghijklmnopqrstuvwxyz', repeat('A',5000)));
insert into t1 select 1, b from t2;
commit;
begin;
update t1 set b=repeat('B',100);
select a,left(b,40),b=1 is_equal from t1;
a left(b,40) is_equal
1 1 1
select a,left(b,40) from t1 natural join t2;
a left(b,40)
1 1abcdefghijklmnopqrstuvwxyzAAAAAAAAAAAAA
rollback;
select a,left(b,40),b=1 is_equal from t1;
a left(b,40) is_equal
1 1 1
select a,left(b,40) from t1 natural join t2;
a left(b,40)
1 1abcdefghijklmnopqrstuvwxyzAAAAAAAAAAAAA
drop table t1;
drop table t2;
set global innodb_file_per_table=0;
set global innodb_file_format=0;
......@@ -37,15 +37,13 @@ key_block_size=1;
show create table t1;
drop table t1;
--error 1478
create table t1(a int primary key) engine=innodb key_block_size=9;
create table t1(a int not null, b text, index(b(10))) engine=innodb
key_block_size=1;
let $b=`select '1abcdefghijklmnopqrstuvwxyz'+repeat('A',5000)`;
create table t2(b text)engine=innodb;
insert into t2 values(concat('1abcdefghijklmnopqrstuvwxyz', repeat('A',5000)));
eval insert into t1 values (1,$b);
insert into t1 select 1, b from t2;
commit;
connect (a,localhost,root,,);
......@@ -56,18 +54,19 @@ begin;
update t1 set b=repeat('B',100);
connection b;
eval select a,left(b,40),b=$b is_equal from t1;
select a,left(b,40) from t1 natural join t2;
connection a;
rollback;
connection b;
eval select a,left(b,40),b=$b is_equal from t1;
select a,left(b,40) from t1 natural join t2;
connection default;
disconnect a;
disconnect b;
drop table t1;
drop table t2;
eval set global innodb_file_per_table=$per_table;
eval set global innodb_file_format=$format;
......@@ -865,6 +865,97 @@ row_upd_ext_fetch(
return(buf);
}
/***************************************************************
Replaces the new column value stored in the update vector in
the given index entry field. */
static
void
row_upd_index_replace_new_col_val(
/*==============================*/
dfield_t* dfield, /* in/out: data field
of the index entry */
const dict_field_t* field, /* in: index field */
const dict_col_t* col, /* in: field->col */
const upd_field_t* uf, /* in: update field */
ulint zip_size)/* in: compressed page
size of the table, or 0 */
{
ulint len;
const byte* data;
dfield_copy_data(dfield, &uf->new_val);
if (dfield_is_null(dfield)) {
return;
}
len = dfield_get_len(dfield);
data = dfield_get_data(dfield);
if (field->prefix_len > 0) {
ibool fetch_ext = dfield_is_ext(dfield)
&& len < (ulint) field->prefix_len
+ BTR_EXTERN_FIELD_REF_SIZE;
if (fetch_ext) {
ulint l = len;
len = field->prefix_len;
data = row_upd_ext_fetch(data, l, zip_size,
&len, heap);
}
len = dtype_get_at_most_n_mbchars(col->prtype,
col->mbminlen, col->mbmaxlen,
field->prefix_len, len,
(const char*) data);
dfield_set_data(dfield, data, len);
if (!fetch_ext) {
dfield_dup(dfield, heap);
}
return;
}
switch (uf->orig_len) {
byte* buf;
case BTR_EXTERN_FIELD_REF_SIZE:
/* Restore the original locally stored
part of the column. In the undo log,
InnoDB writes a longer prefix of externally
stored columns, so that column prefixes
in secondary indexes can be reconstructed. */
dfield_set_data(dfield,
data + len - BTR_EXTERN_FIELD_REF_SIZE,
BTR_EXTERN_FIELD_REF_SIZE);
dfield_set_ext(dfield);
/* fall through */
case 0:
dfield_dup(dfield, heap);
break;
default:
/* Reconstruct the original locally
stored part of the column. The data
will have to be copied. */
ut_a(uf->orig_len > BTR_EXTERN_FIELD_REF_SIZE);
buf = mem_heap_alloc(heap, uf->orig_len);
/* Copy the locally stored prefix. */
memcpy(buf, data,
uf->orig_len - BTR_EXTERN_FIELD_REF_SIZE);
/* Copy the BLOB pointer. */
memcpy(buf + uf->orig_len - BTR_EXTERN_FIELD_REF_SIZE,
data + len - BTR_EXTERN_FIELD_REF_SIZE,
BTR_EXTERN_FIELD_REF_SIZE);
dfield_set_data(dfield, buf, uf->orig_len);
dfield_set_ext(dfield);
break;
}
}
/***************************************************************
Replaces the new column values stored in the update vector to the index entry
given. */
......@@ -885,18 +976,12 @@ row_upd_index_replace_new_col_vals_index_pos(
/* in: if TRUE, limit the replacement to
ordering fields of index; note that this
does not work for non-clustered indexes. */
mem_heap_t* heap, /* in: memory heap to which we allocate and
copy the new values, set this as NULL if you
do not want allocation */
mem_heap_t* ext_heap)/* in: memory heap where to allocate
column prefixes of externally stored
columns, may be NULL if the index
record does not contain externally
stored columns or column prefixes */
mem_heap_t* heap, /* in: memory heap for allocating and
copying the new values */
{
ulint j;
ulint i;
ulint n_fields;
const ulint zip_size = dict_table_zip_size(index->table);
ut_ad(index);
......@@ -908,80 +993,19 @@ row_upd_index_replace_new_col_vals_index_pos(
n_fields = dict_index_get_n_fields(index);
}
for (j = 0; j < n_fields; j++) {
dict_field_t* field
= dict_index_get_nth_field(index, j);
const dict_col_t* col
= dict_field_get_col(field);
for (i = 0; i < upd_get_n_fields(update); i++) {
upd_field_t* upd_field;
dfield_t* dfield;
upd_field = upd_get_nth_field(update, i);
if (upd_field->field_no != j) {
continue;
}
dfield = dtuple_get_nth_field(entry, j);
dfield_copy_data(dfield, &upd_field->new_val);
if (dfield_is_null(dfield)) {
break;
}
if (field->prefix_len > 0) {
ulint len
= dfield_get_len(dfield);
const byte* data
= dfield_get_data(dfield);
ibool fetch_ext
= dfield_is_ext(dfield)
&& len < (ulint) field->prefix_len
+ BTR_EXTERN_FIELD_REF_SIZE;
if (fetch_ext) {
ulint l
= len;
ulint zip_size
= dict_table_zip_size(
index->table);
ut_a(ext_heap);
len = field->prefix_len;
data = row_upd_ext_fetch(data, l,
zip_size,
&len,
ext_heap);
}
len = dtype_get_at_most_n_mbchars(
col->prtype,
col->mbminlen,
col->mbmaxlen,
field->prefix_len,
len, (const char*) data);
dfield_set_data(dfield, data, len);
if (fetch_ext && heap && heap == ext_heap) {
/* Skip the dfield_dup() below,
as the column prefix has already
been allocated from ext_heap. */
break;
}
}
for (i = 0; i < n_fields; i++) {
const dict_field_t* field;
const dict_col_t* col;
const upd_field_t* uf;
if (heap) {
dfield_dup(dfield, heap);
}
field = dict_index_get_nth_field(index, i);
col = dict_field_get_col(field);
uf = upd_get_field_by_field_no(update, i);
break;
if (uf) {
row_upd_index_replace_new_col_val(
dtuple_get_nth_field(entry, i),
field, col, uf, zip_size);
}
}
}
......@@ -1002,101 +1026,31 @@ row_upd_index_replace_new_col_vals(
const upd_t* update, /* in: an update vector built for the
CLUSTERED index so that the field number in
an upd_field is the clustered index position */
mem_heap_t* heap, /* in: memory heap to which we allocate and
copy the new values, set this as NULL if you
do not want allocation */
mem_heap_t* ext_heap)/* in: memory heap where to allocate
column prefixes of externally stored
columns, may be NULL if the index
record does not contain externally
stored columns or column prefixes */
mem_heap_t* heap) /* in: memory heap for allocating and
copying the new values */
{
ulint j;
ulint i;
dict_index_t* clust_index;
ut_ad(index);
clust_index = dict_table_get_first_index(index->table);
ulint i;
const dict_index_t* clust_index
= dict_table_get_first_index(index->table);
const ulint zip_size
= dict_table_zip_size(index->table);
dtuple_set_info_bits(entry, update->info_bits);
for (j = 0; j < dict_index_get_n_fields(index); j++) {
dict_field_t* field
= dict_index_get_nth_field(index, j);
const dict_col_t* col
= dict_field_get_col(field);
const ulint clust_pos
= dict_col_get_clust_pos(col, clust_index);
for (i = 0; i < upd_get_n_fields(update); i++) {
upd_field_t* upd_field;
dfield_t* dfield;
upd_field = upd_get_nth_field(update, i);
if (upd_field->field_no != clust_pos) {
continue;
}
dfield = dtuple_get_nth_field(entry, j);
dfield_copy_data(dfield, &upd_field->new_val);
if (dfield_is_null(dfield)) {
break;
}
if (field->prefix_len > 0) {
ulint len
= dfield_get_len(dfield);
const byte* data
= dfield_get_data(dfield);
ibool fetch_ext
= dfield_is_ext(dfield)
&& len < (ulint) field->prefix_len
+ BTR_EXTERN_FIELD_REF_SIZE;
if (fetch_ext) {
ulint l
= len;
ulint zip_size
= dict_table_zip_size(
index->table);
ut_a(ext_heap);
len = field->prefix_len;
data = row_upd_ext_fetch(data, l,
zip_size,
&len,
ext_heap);
}
len = dtype_get_at_most_n_mbchars(
col->prtype,
col->mbminlen,
col->mbmaxlen,
field->prefix_len,
len, (const char*) data);
dfield_set_data(dfield, data, len);
if (fetch_ext && heap && heap == ext_heap) {
/* Skip the dfield_dup() below,
as the column prefix has already
been allocated from ext_heap. */
break;
}
}
for (i = 0; i < dict_index_get_n_fields(index); i++) {
const dict_field_t* field;
const dict_col_t* col;
const upd_field_t* uf;
if (heap) {
dfield_dup(dfield, heap);
}
field = dict_index_get_nth_field(index, i);
col = dict_field_get_col(field);
uf = upd_get_field_by_field_no(
update, dict_col_get_clust_pos(col, clust_index));
break;
if (uf) {
row_upd_index_replace_new_col_val(
dtuple_get_nth_field(entry, i),
field, col, uf);
}
}
}
......
......@@ -1552,8 +1552,7 @@ trx_undo_prev_version_build(
/* The page containing the clustered index record
corresponding to entry is latched in mtr. Thus the
following call is safe. */
row_upd_index_replace_new_col_vals(entry, index, update,
heap, heap);
row_upd_index_replace_new_col_vals(entry, index, update, heap);
buf = mem_heap_alloc(heap, rec_get_converted_size(index, entry,
n_ext));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment