Commit da25388d authored by marko's avatar marko

branches/zip: Fetch externally stored columns only when the clustered index

record is protected by a latch or a lock.

dtuple_copy(): New function: Copy a data tuple.

row_upd_replace(): New function: Apply on a row an update vector that
was built for the clustered index.  Set up a cache of externally stored
column prefixes if needed.

undo_node_t: Add the fields undo_row, undo_ext.

row_undo_search_clust_to_pcur(): Initialize undo_row and undo_ext.

row_undo_mod_upd_exist_sec(): Instead of fetching prefixes of
externally stored columns, use the undo_row and undo_ext that were
initialized in row_undo_search_clust_to_pcur().

upd_node_t: Remove the field n_ext.  Add the fields upd_row and upd_ext.

row_upd_store_row(): Initialize the upd_row and upd_ext fields of upd_node_t.

row_upd_sec_index_entry(), row_upd_clust_rec_by_insert(): Instead of
fetching prefixes of externally stored columns, use the upd_row
and upd_ext that were initialized in row_upd_store_row().
parent 9188a049
......@@ -242,6 +242,17 @@ dtuple_set_n_fields(
/*================*/
dtuple_t* tuple, /* in: tuple */
ulint n_fields); /* in: number of fields */
/*************************************************************************
Copies a data tuple to another. This is a shallow copy; if a deep copy
is desired, dfield_dup() will have to be invoked on each field. */
UNIV_INLINE
dtuple_t*
dtuple_copy(
/*========*/
/* out, own: copy of tuple */
const dtuple_t* tuple, /* in: tuple to copy from */
mem_heap_t* heap); /* in: memory heap
where the tuple is created */
/**************************************************************
The following function returns the sum of data lengths of a tuple. The space
occupied by the field structs or the tuple struct is not counted. */
......
......@@ -384,6 +384,30 @@ dtuple_from_fields(
return(tuple);
}
/*************************************************************************
Copies a data tuple to another. This is a shallow copy; if a deep copy
is desired, dfield_dup() will have to be invoked on each field. */
UNIV_INLINE
dtuple_t*
dtuple_copy(
/*========*/
/* out, own: copy of tuple */
const dtuple_t* tuple, /* in: tuple to copy from */
mem_heap_t* heap) /* in: memory heap
where the tuple is created */
{
ulint n_fields = dtuple_get_n_fields(tuple);
dtuple_t* new_tuple = dtuple_create(heap, n_fields);
ulint i;
for (i = 0; i < n_fields; i++) {
dfield_copy(dtuple_get_nth_field(new_tuple, i),
dtuple_get_nth_field(tuple, i));
}
return(new_tuple);
}
/**************************************************************
The following function returns the sum of data lengths of a tuple. The space
occupied by the field structs or the tuple struct is not counted. Neither
......
......@@ -93,6 +93,9 @@ struct undo_node_struct{
row to handle */
row_ext_t* ext; /* NULL, or prefixes of the externally
stored columns of the row */
dtuple_t* undo_row;/* NULL, or the row after undo */
row_ext_t* undo_ext;/* NULL, or prefixes of the externally
stored columns of undo_row */
dict_index_t* index; /* the next index whose record should be
handled */
mem_heap_t* heap; /* memory heap used as auxiliary storage for
......
......@@ -231,6 +231,23 @@ row_upd_index_replace_new_col_vals(
record does not contain externally
stored columns or column prefixes */
/***************************************************************
Replaces the new column values stored in the update vector. */
void
row_upd_replace(
/*============*/
dtuple_t* row, /* in/out: row where replaced,
indexed by col_no;
the clustered index record must be
covered by a lock or a page latch to
prevent deletion (rollback or purge) */
row_ext_t** ext, /* out, own: NULL, or externally
stored column prefixes */
const dict_index_t* index, /* in: clustered index */
const upd_t* update, /* in: an update vector built for the
clustered index */
mem_heap_t* heap); /* in: memory heap */
/***************************************************************
Checks if an update vector changes an ordering field of an index record.
This function is fast if the update vector is short or the number of ordering
fields in the index is small. Otherwise, this can be quadratic.
......@@ -405,8 +422,10 @@ struct upd_node_struct{
heap) of the row to update; this must be reset
to NULL after a successful update */
row_ext_t* ext; /* NULL, or prefixes of the externally
stored columns of the row */
ulint n_ext; /* number of fields in ext_vec */
stored columns in the old row */
dtuple_t* upd_row;/* NULL, or a copy of the updated row */
row_ext_t* upd_ext;/* NULL, or prefixes of the externally
stored columns in upd_row */
mem_heap_t* heap; /* memory heap used as auxiliary storage;
this must be emptied after a successful
update */
......
......@@ -636,12 +636,12 @@ row_undo_mod_upd_exist_sec(
the secondary index record if we updated its fields
but alphabetically they stayed the same, e.g.,
'abc' -> 'aBc'. */
mem_heap_empty(heap);
entry = row_build_index_entry(node->undo_row,
node->undo_ext,
index, heap);
ut_a(entry);
/* TODO: lock the clustered index record
before fetching BLOBs */
row_upd_index_replace_new_col_vals(entry, index,
node->update,
NULL, heap);
err = row_undo_mod_del_unmark_sec_and_undo_update(
BTR_MODIFY_LEAF, thr, index, entry);
if (err == DB_FAIL) {
......
......@@ -24,9 +24,9 @@ Created 1/8/1997 Heikki Tuuri
#include "row0row.h"
#include "row0uins.h"
#include "row0umod.h"
#include "row0upd.h"
#include "row0mysql.h"
#include "srv0srv.h"
#include "row0merge.h"
/* How to undo row operations?
(1) For an insert, we have stored a prefix of the clustered index record
......@@ -185,6 +185,15 @@ row_undo_search_clust_to_pcur(
} else {
node->row = row_build(ROW_COPY_DATA, clust_index, rec,
offsets, NULL, &node->ext, node->heap);
if (node->update) {
node->undo_row = dtuple_copy(node->row, node->heap);
row_upd_replace(node->undo_row, &node->undo_ext,
clust_index, node->update, node->heap);
} else {
node->undo_row = NULL;
node->undo_ext = NULL;
}
btr_pcur_store_position(&(node->pcur), &mtr);
ret = TRUE;
......
......@@ -280,7 +280,8 @@ upd_node_create(
node->row = NULL;
node->ext = NULL;
node->n_ext = 0;
node->upd_row = NULL;
node->upd_ext = NULL;
node->index = NULL;
node->update = NULL;
......@@ -1097,6 +1098,89 @@ row_upd_index_replace_new_col_vals(
}
}
/***************************************************************
Replaces the new column values stored in the update vector. */
void
row_upd_replace(
/*============*/
dtuple_t* row, /* in/out: row where replaced,
indexed by col_no;
the clustered index record must be
covered by a lock or a page latch to
prevent deletion (rollback or purge) */
row_ext_t** ext, /* out, own: NULL, or externally
stored column prefixes */
const dict_index_t* index, /* in: clustered index */
const upd_t* update, /* in: an update vector built for the
clustered index */
mem_heap_t* heap) /* in: memory heap */
{
ulint col_no;
ulint i;
ulint n_cols;
ulint n_ext_cols;
ulint* ext_cols;
const dict_table_t* table;
ut_ad(row);
ut_ad(ext);
ut_ad(index);
ut_ad(dict_index_is_clust(index));
ut_ad(update);
ut_ad(heap);
n_cols = dtuple_get_n_fields(row);
table = index->table;
ut_ad(n_cols == dict_table_get_n_cols(table));
ext_cols = mem_heap_alloc(heap, n_cols * sizeof *ext_cols);
n_ext_cols = 0;
dtuple_set_info_bits(row, update->info_bits);
for (col_no = 0; col_no < n_cols; col_no++) {
const dict_col_t* col
= dict_table_get_nth_col(table, col_no);
const ulint clust_pos
= dict_col_get_clust_pos(col, index);
dfield_t* dfield;
if (UNIV_UNLIKELY(clust_pos == ULINT_UNDEFINED)) {
continue;
}
dfield = dtuple_get_nth_field(row, col_no);
for (i = 0; i < upd_get_n_fields(update); i++) {
const upd_field_t* upd_field
= upd_get_nth_field(update, i);
if (upd_field->field_no != clust_pos) {
continue;
}
dfield_copy_data(dfield, &upd_field->new_val);
break;
}
if (dfield_is_ext(dfield) && col->ord_part) {
ext_cols[n_ext_cols++] = col_no;
}
}
if (n_ext_cols) {
*ext = row_ext_create(n_ext_cols, ext_cols, row,
dict_table_zip_size(table), heap);
} else {
*ext = NULL;
}
}
/***************************************************************
Checks if an update vector changes an ordering field of an index record.
This function is fast if the update vector is short or the number of ordering
......@@ -1339,15 +1423,13 @@ row_upd_store_row(
ULINT_UNDEFINED, &heap);
node->row = row_build(ROW_COPY_DATA, clust_index, rec, offsets,
NULL, &node->ext, node->heap);
if (UNIV_LIKELY_NULL(node->ext)) {
node->n_ext = node->ext->n_ext;
if (node->is_delete) {
node->upd_row = NULL;
node->upd_ext = NULL;
} else {
node->n_ext = 0;
}
if (!node->is_delete) {
node->n_ext += btr_push_update_extern_fields(node->row,
node->update);
node->upd_row = dtuple_copy(node->row, node->heap);
row_upd_replace(node->upd_row, &node->upd_ext,
clust_index, node->update, node->heap);
}
if (UNIV_LIKELY_NULL(heap)) {
......@@ -1446,9 +1528,9 @@ row_upd_sec_index_entry(
}
/* Build a new index entry */
/* TODO: lock the clustered index record before fetching BLOBs */
row_upd_index_replace_new_col_vals(entry, index, node->update,
NULL, heap);
entry = row_build_index_entry(node->upd_row, node->upd_ext,
index, heap);
ut_a(entry);
/* Insert new index entry */
err = row_ins_index_entry(index, entry, 0, TRUE, thr);
......@@ -1559,23 +1641,20 @@ row_upd_clust_rec_by_insert(
}
}
mtr_commit(mtr);
if (!heap) {
heap = mem_heap_create(500);
}
node->state = UPD_NODE_INSERT_CLUSTERED;
entry = row_build_index_entry(node->row, node->ext, index, heap);
entry = row_build_index_entry(node->upd_row, node->upd_ext,
index, heap);
ut_a(entry);
/* The page containing the clustered index record is latched until
mtr_commit(mtr) below. Thus the following call is safe. */
row_upd_index_replace_new_col_vals(entry, index, node->update,
NULL, heap);
mtr_commit(mtr);
row_upd_index_entry_sys_field(entry, index, DATA_TRX_ID, trx->id);
if (node->n_ext) {
if (node->upd_ext) {
/* If we return from a lock wait, for example, we may have
extern fields marked as not-owned in entry (marked in the
if-branch above). We must unmark them. */
......@@ -1588,7 +1667,9 @@ row_upd_clust_rec_by_insert(
btr_cur_mark_dtuple_inherited_extern(entry, node->update);
}
err = row_ins_index_entry(index, entry, node->n_ext, TRUE, thr);
err = row_ins_index_entry(index, entry,
node->upd_ext ? node->upd_ext->n_ext : 0,
TRUE, thr);
mem_heap_free(heap);
return(err);
......@@ -1984,7 +2065,8 @@ row_upd(
if (node->row != NULL) {
node->row = NULL;
node->ext = NULL;
node->n_ext = 0;
node->upd_row = NULL;
node->upd_ext = NULL;
mem_heap_empty(node->heap);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment