Commit a7135ea3 authored by marko's avatar marko

branches/zip: Fix a bug in the updates of index records that contain a

column prefix of an externally stored column.

row_upd_ext_fetch(): New function.

row_upd_index_replace_new_col_vals(),
row_upd_index_replace_new_col_vals_index_pos(): Fetch prefixes of
externally stored columns when they are needed for column prefix
indexes.  For memory allocation, add the parameter ext_heap.  Avoid
repeating the inner loop after finding a  matching upd_field->field_no.
parent 2080273e
......@@ -1860,7 +1860,7 @@ any_extern:
ut_a(!n_ext);
row_upd_index_replace_new_col_vals_index_pos(new_entry, index, update,
FALSE, NULL);
FALSE, NULL, heap);
old_rec_size = rec_offs_size(offsets);
new_rec_size = rec_get_converted_size(index, new_entry, 0);
......@@ -2131,7 +2131,7 @@ btr_cur_pessimistic_update(
rec_offs_make_valid(rec, index, offsets);
row_upd_index_replace_new_col_vals_index_pos(new_entry, index, update,
FALSE, *heap);
FALSE, *heap, *heap);
if (!(flags & BTR_KEEP_SYS_FLAG)) {
row_upd_index_entry_sys_field(new_entry, index, DATA_ROLL_PTR,
roll_ptr);
......
......@@ -195,9 +195,14 @@ row_upd_index_replace_new_col_vals_index_pos(
/* in: if TRUE, limit the replacement to
ordering fields of index; note that this
does not work for non-clustered indexes. */
mem_heap_t* heap); /* in: memory heap to which we allocate and
mem_heap_t* heap, /* in: memory heap to which we allocate and
copy the new values, set this as NULL if you
do not want allocation */
mem_heap_t* ext_heap);/* in: memory heap where to allocate
column prefixes of externally stored
columns, may be NULL if the index
record does not contain externally
stored columns or column prefixes */
/***************************************************************
Replaces the new column values stored in the update vector to the index entry
given. */
......@@ -211,9 +216,14 @@ row_upd_index_replace_new_col_vals(
const upd_t* update, /* in: an update vector built for the
CLUSTERED index so that the field number in
an upd_field is the clustered index position */
mem_heap_t* heap); /* in: memory heap to which we allocate and
mem_heap_t* heap, /* in: memory heap to which we allocate and
copy the new values, set this as NULL if you
do not want allocation */
mem_heap_t* ext_heap);/* in: memory heap where to allocate
column prefixes of externally stored
columns, may be NULL if the index
record does not contain externally
stored columns or column prefixes */
/***************************************************************
Checks if an update vector changes an ordering field of an index record.
This function is fast if the update vector is short or the number of ordering
......
......@@ -638,7 +638,8 @@ row_undo_mod_upd_exist_sec(
'abc' -> 'aBc'. */
row_upd_index_replace_new_col_vals(entry, index,
node->update, NULL);
node->update,
NULL, heap);
err = row_undo_mod_del_unmark_sec_and_undo_update(
BTR_MODIFY_LEAF, thr, index, entry);
if (err == DB_FAIL) {
......
......@@ -832,6 +832,35 @@ skip_compare:
return(update);
}
/***************************************************************
Fetch a prefix of an externally stored column. This is similar
to row_ext_lookup(), but the row_ext_t holds the old values
of the column and must not be poisoned with the new values. */
static
byte*
row_upd_ext_fetch(
/*==============*/
/* out: BLOB prefix */
const byte* data, /* in: 'internally' stored part of the
field containing also the reference to
the external part */
ulint local_len, /* in: length of data, in bytes */
ulint zip_size, /* in: nonzero=compressed BLOB
page size, zero for uncompressed
BLOBs */
ulint* len, /* in: length of prefix to fetch;
out: fetched length of the prefix */
mem_heap_t* heap) /* in: heap where to allocate */
{
byte* buf = mem_heap_alloc(heap, *len);
*len = btr_copy_externally_stored_field_prefix(buf, *len,
zip_size,
data, local_len);
return(buf);
}
/***************************************************************
Replaces the new column values stored in the update vector to the index entry
given. */
......@@ -849,13 +878,15 @@ row_upd_index_replace_new_col_vals_index_pos(
/* in: if TRUE, limit the replacement to
ordering fields of index; note that this
does not work for non-clustered indexes. */
mem_heap_t* heap) /* in: memory heap to which we allocate and
mem_heap_t* heap, /* in: memory heap to which we allocate and
copy the new values, set this as NULL if you
do not want allocation */
mem_heap_t* ext_heap)/* in: memory heap where to allocate
column prefixes of externally stored
columns, may be NULL if the index
record does not contain externally
stored columns or column prefixes */
{
dict_field_t* field;
upd_field_t* upd_field;
dfield_t* dfield;
ulint j;
ulint i;
ulint n_fields;
......@@ -872,44 +903,78 @@ row_upd_index_replace_new_col_vals_index_pos(
for (j = 0; j < n_fields; j++) {
field = dict_index_get_nth_field(index, j);
dict_field_t* field
= dict_index_get_nth_field(index, j);
const dict_col_t* col
= dict_field_get_col(field);
for (i = 0; i < upd_get_n_fields(update); i++) {
upd_field_t* upd_field;
dfield_t* dfield;
upd_field = upd_get_nth_field(update, i);
if (upd_field->field_no == j) {
if (upd_field->field_no != j) {
continue;
}
dfield = dtuple_get_nth_field(entry, j);
dfield = dtuple_get_nth_field(entry, j);
dfield_copy_data(dfield, &upd_field->new_val);
dfield_copy_data(dfield, &upd_field->new_val);
if (dfield_is_null(dfield)) {
continue;
}
if (dfield_is_null(dfield)) {
break;
}
if (heap) {
dfield_dup(dfield, heap);
if (field->prefix_len > 0) {
ulint len
= dfield_get_len(dfield);
const char* data
= dfield_get_data(dfield);
ibool fetch_ext
= dfield_is_ext(dfield)
&& len < field->prefix_len
+ BTR_EXTERN_FIELD_REF_SIZE;
if (fetch_ext) {
ulint l
= len;
ulint zip_size
= dict_table_zip_size(
index->table);
ut_a(ext_heap);
len = field->prefix_len;
data = row_upd_ext_fetch(data, l,
zip_size,
&len,
ext_heap);
}
if (field->prefix_len > 0) {
len = dtype_get_at_most_n_mbchars(
col->prtype,
col->mbminlen,
col->mbmaxlen,
field->prefix_len,
len, data);
const dict_col_t* col
= dict_field_get_col(field);
ulint len
= dfield_get_len(dfield);
dfield_set_data(dfield, data, len);
len = dtype_get_at_most_n_mbchars(
col->prtype,
col->mbminlen,
col->mbmaxlen,
field->prefix_len,
len,
dfield_get_data(dfield));
dfield_set_len(dfield, len);
if (fetch_ext && heap && heap == ext_heap) {
/* Skip the dfield_dup() below,
as the column prefix has already
been allocated from ext_heap. */
break;
}
}
if (heap) {
dfield_dup(dfield, heap);
}
break;
}
}
}
......@@ -927,12 +992,15 @@ row_upd_index_replace_new_col_vals(
const upd_t* update, /* in: an update vector built for the
CLUSTERED index so that the field number in
an upd_field is the clustered index position */
mem_heap_t* heap) /* in: memory heap to which we allocate and
mem_heap_t* heap, /* in: memory heap to which we allocate and
copy the new values, set this as NULL if you
do not want allocation */
mem_heap_t* ext_heap)/* in: memory heap where to allocate
column prefixes of externally stored
columns, may be NULL if the index
record does not contain externally
stored columns or column prefixes */
{
upd_field_t* upd_field;
dfield_t* dfield;
ulint j;
ulint i;
dict_index_t* clust_index;
......@@ -945,47 +1013,78 @@ row_upd_index_replace_new_col_vals(
for (j = 0; j < dict_index_get_n_fields(index); j++) {
ulint clust_pos;
dict_field_t* field = dict_index_get_nth_field(index, j);
clust_pos = dict_col_get_clust_pos(field->col, clust_index);
dict_field_t* field
= dict_index_get_nth_field(index, j);
const dict_col_t* col
= dict_field_get_col(field);
const ulint clust_pos
= dict_col_get_clust_pos(col, clust_index);
for (i = 0; i < upd_get_n_fields(update); i++) {
upd_field_t* upd_field;
dfield_t* dfield;
upd_field = upd_get_nth_field(update, i);
if (upd_field->field_no == clust_pos) {
if (upd_field->field_no != clust_pos) {
continue;
}
dfield = dtuple_get_nth_field(entry, j);
dfield = dtuple_get_nth_field(entry, j);
dfield_copy_data(dfield, &upd_field->new_val);
dfield_copy_data(dfield, &upd_field->new_val);
if (dfield_is_null(dfield)) {
continue;
}
if (dfield_is_null(dfield)) {
break;
}
if (heap) {
dfield_dup(dfield, heap);
if (field->prefix_len > 0) {
ulint len
= dfield_get_len(dfield);
const char* data
= dfield_get_data(dfield);
if (dfield_is_ext(dfield)
&& len < field->prefix_len
+ BTR_EXTERN_FIELD_REF_SIZE) {
ulint l
= len;
ulint zip_size
= dict_table_zip_size(
index->table);
ut_a(ext_heap);
len = field->prefix_len;
data = row_upd_ext_fetch(data, l,
zip_size,
&len,
ext_heap);
}
if (field->prefix_len > 0) {
const dict_col_t* col
= dict_field_get_col(field);
ulint len
= dfield_get_len(dfield);
len = dtype_get_at_most_n_mbchars(
col->prtype,
col->mbminlen,
col->mbmaxlen,
field->prefix_len,
len, data);
len = dtype_get_at_most_n_mbchars(
col->prtype,
col->mbminlen,
col->mbmaxlen,
field->prefix_len,
len,
dfield_get_data(dfield));
dfield_set_data(dfield, data, len);
dfield_set_len(dfield, len);
if (fetch_ext && heap && heap == ext_heap) {
/* Skip the dfield_dup() below,
as the column prefix has already
been allocated from ext_heap. */
break;
}
}
if (heap) {
dfield_dup(dfield, heap);
}
break;
}
}
}
......@@ -1338,7 +1437,8 @@ row_upd_sec_index_entry(
}
/* Build a new index entry */
row_upd_index_replace_new_col_vals(entry, index, node->update, NULL);
row_upd_index_replace_new_col_vals(entry, index, node->update,
NULL, heap);
/* Insert new index entry */
err = row_ins_index_entry(index, entry, 0, TRUE, thr);
......@@ -1458,7 +1558,8 @@ row_upd_clust_rec_by_insert(
entry = row_build_index_entry(node->row, node->ext, index, heap);
row_upd_index_replace_new_col_vals(entry, index, node->update, NULL);
row_upd_index_replace_new_col_vals(entry, index, node->update,
NULL, heap);
row_upd_index_entry_sys_field(entry, index, DATA_TRX_ID, trx->id);
......
......@@ -1413,7 +1413,8 @@ trx_undo_prev_version_build(
entry = row_rec_to_index_entry(ROW_COPY_DATA, rec, index,
offsets, &n_ext, heap);
n_ext += btr_push_update_extern_fields(entry, update);
row_upd_index_replace_new_col_vals(entry, index, update, heap);
row_upd_index_replace_new_col_vals(entry, index, update,
heap, heap);
buf = mem_heap_alloc(heap, rec_get_converted_size(index, entry,
n_ext));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment