Commit ff81faf6 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-13654 Various crashes due to DB_TRX_ID mismatch in table-rebuilding ALTER TABLE…LOCK=NONE

After MDEV-12288 and MDEV-13536, the DB_TRX_ID of old clustered index
records for which no history is available should be reset to 0.

This caused crashes in online table-rebuilding ALTER, because the
row_log_table_apply() is built on the assumption that the PRIMARY KEY
together with DB_TRX_ID,DB_ROLL_PTR identifies the record.

Both when copying the old table and when writing log about changes to
the old table, we must map "old" DB_TRX_ID to 0. "old" here is simply
"older than the trx_id of the ALTER TABLE transaction", because
the MDL_EXCLUSIVE (and exclusive InnoDB table lock) in
ha_innobase::prepare_inplace_alter_table() forces any transactions
accessing the table to commit or rollback. So, we know that we can
safely reset any DB_TRX_ID in the table that is older than the
transaction ID of the ALTER TABLE, because the undo log history would be
lost in a table-rebuilding ALTER.

Note: After a table-rebuilding online ALTER TABLE, the rebuilt table
may end up containing some nonzero DB_TRX_ID columns. The apply logic
identifies the rows by the combination of PRIMARY KEY and DB_TRX_ID.
These nonzero DB_TRX_ID would necessarily refer to concurrent DML
operations that were started during ha_innobase::inplace_alter_table().

row_log_allocate(): Add a parameter for the ALTER TABLE transaction.

row_log_t::min_trx: The ALTER TABLE transaction ID.

trx_id_check(): A debug function to check that DB_TRX_ID makes sense
(is either 0 or bigger than the ALTER TABLE transaction ID).

reset_trx_id[]: The reset DB_TRX_ID,DB_ROLL_PTR columns.

row_log_table_delete(), row_log_table_get_pk(): Reset the
DB_TRX_ID,DB_ROLL_PTR when they precede the ALTER TABLE transaction.

row_log_table_apply_delete(), row_log_table_apply_update():
Assert trx_id_check().

row_merge_insert_index_tuples(): Remove the unused parameter trx_id.

row_merge_read_clustered_index(): In a table-rebuilding ALTER,
reset the DB_TRX_ID,DB_ROLL_PTR when they precede the ALTER TABLE
transaction. Assert trx_id_check() on clustered index records that
are being buffered.
parent fdc47792
...@@ -4911,7 +4911,8 @@ prepare_inplace_alter_table_dict( ...@@ -4911,7 +4911,8 @@ prepare_inplace_alter_table_dict(
goto error_handling;); goto error_handling;);
rw_lock_x_lock(&ctx->add_index[a]->lock); rw_lock_x_lock(&ctx->add_index[a]->lock);
bool ok = row_log_allocate(ctx->add_index[a], bool ok = row_log_allocate(ctx->prebuilt->trx,
ctx->add_index[a],
NULL, true, NULL, NULL, NULL, true, NULL, NULL,
path); path);
rw_lock_x_unlock(&ctx->add_index[a]->lock); rw_lock_x_unlock(&ctx->add_index[a]->lock);
...@@ -4961,6 +4962,7 @@ prepare_inplace_alter_table_dict( ...@@ -4961,6 +4962,7 @@ prepare_inplace_alter_table_dict(
/* Allocate a log for online table rebuild. */ /* Allocate a log for online table rebuild. */
rw_lock_x_lock(&clust_index->lock); rw_lock_x_lock(&clust_index->lock);
bool ok = row_log_allocate( bool ok = row_log_allocate(
ctx->prebuilt->trx,
clust_index, ctx->new_table, clust_index, ctx->new_table,
!(ha_alter_info->handler_flags !(ha_alter_info->handler_flags
& Alter_inplace_info::ADD_PK_INDEX), & Alter_inplace_info::ADD_PK_INDEX),
......
...@@ -48,6 +48,7 @@ for online creation. ...@@ -48,6 +48,7 @@ for online creation.
bool bool
row_log_allocate( row_log_allocate(
/*=============*/ /*=============*/
const trx_t* trx, /*!< in: the ALTER TABLE transaction */
dict_index_t* index, /*!< in/out: index */ dict_index_t* index, /*!< in/out: index */
dict_table_t* table, /*!< in/out: new table being rebuilt, dict_table_t* table, /*!< in/out: new table being rebuilt,
or NULL when creating a secondary index */ or NULL when creating a secondary index */
......
...@@ -163,16 +163,31 @@ trx_write_trx_id( ...@@ -163,16 +163,31 @@ trx_write_trx_id(
/*=============*/ /*=============*/
byte* ptr, /*!< in: pointer to memory where written */ byte* ptr, /*!< in: pointer to memory where written */
trx_id_t id); /*!< in: id */ trx_id_t id); /*!< in: id */
/*****************************************************************//**
Reads a trx id from an index page. In case that the id size changes in /** Read a transaction identifier.
some future version, this function should be used instead of
mach_read_...
@return id */ @return id */
UNIV_INLINE inline
trx_id_t trx_id_t
trx_read_trx_id( trx_read_trx_id(const byte* ptr)
/*============*/ {
const byte* ptr); /*!< in: pointer to memory from where to read */ #if DATA_TRX_ID_LEN != 6
# error "DATA_TRX_ID_LEN != 6"
#endif
return(mach_read_from_6(ptr));
}
#ifdef UNIV_DEBUG
/** Check that the DB_TRX_ID in a record is valid.
@param[in] db_trx_id the DB_TRX_ID column to validate
@param[in] trx_id the id of the ALTER TABLE transaction */
inline bool trx_id_check(const void* db_trx_id, trx_id_t trx_id)
{
trx_id_t id = trx_read_trx_id(static_cast<const byte*>(db_trx_id));
ut_ad(id == 0 || id > trx_id);
return true;
}
#endif
/****************************************************************//** /****************************************************************//**
Looks for the trx instance with the given id in the rw trx_list. Looks for the trx instance with the given id in the rw trx_list.
@return the trx handle or NULL if not found */ @return the trx handle or NULL if not found */
......
...@@ -191,23 +191,6 @@ trx_write_trx_id( ...@@ -191,23 +191,6 @@ trx_write_trx_id(
mach_write_to_6(ptr, id); mach_write_to_6(ptr, id);
} }
/*****************************************************************//**
Reads a trx id from an index page. In case that the id size changes in
some future version, this function should be used instead of
mach_read_...
@return id */
UNIV_INLINE
trx_id_t
trx_read_trx_id(
/*============*/
const byte* ptr) /*!< in: pointer to memory from where to read */
{
#if DATA_TRX_ID_LEN != 6
# error "DATA_TRX_ID_LEN != 6"
#endif
return(mach_read_from_6(ptr));
}
/****************************************************************//** /****************************************************************//**
Looks for the trx handle with the given id in rw_trx_list. Looks for the trx handle with the given id in rw_trx_list.
The caller must be holding trx_sys->mutex. The caller must be holding trx_sys->mutex.
......
...@@ -45,6 +45,12 @@ ulint onlineddl_rowlog_rows; ...@@ -45,6 +45,12 @@ ulint onlineddl_rowlog_rows;
ulint onlineddl_rowlog_pct_used; ulint onlineddl_rowlog_pct_used;
ulint onlineddl_pct_progress; ulint onlineddl_pct_progress;
/** The DB_TRX_ID,DB_ROLL_PTR values for "no history is available" */
static const byte reset_trx_id[DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN] = {
0, 0, 0, 0, 0, 0,
0x80, 0, 0, 0, 0, 0, 0
};
/** Table row modification operations during online table rebuild. /** Table row modification operations during online table rebuild.
Delete-marked records are not copied to the rebuilt table. */ Delete-marked records are not copied to the rebuilt table. */
enum row_tab_op { enum row_tab_op {
...@@ -187,6 +193,20 @@ struct row_log_t { ...@@ -187,6 +193,20 @@ struct row_log_t {
new ones, or NULL if !table */ new ones, or NULL if !table */
dberr_t error; /*!< error that occurred during online dberr_t error; /*!< error that occurred during online
table rebuild */ table rebuild */
/** The transaction ID of the ALTER TABLE transaction. Any
concurrent DML would necessarily be logged with a larger
transaction ID, because ha_innobase::prepare_inplace_alter_table()
acts as a barrier that ensures that any concurrent transaction
that operates on the table would have been started after
ha_innobase::prepare_inplace_alter_table() returns and before
ha_innobase::commit_inplace_alter_table(commit=true) is invoked.
Due to the nondeterministic nature of purge and due to the
possibility of upgrading from an earlier version of MariaDB
or MySQL, it is possible that row_log_table_low() would be
fed DB_TRX_ID that precedes than min_trx. We must normalize
such references to reset_trx_id[]. */
trx_id_t min_trx;
trx_id_t max_trx;/*!< biggest observed trx_id in trx_id_t max_trx;/*!< biggest observed trx_id in
row_log_online_op(); row_log_online_op();
protected by mutex and index->lock S-latch, protected by mutex and index->lock S-latch,
...@@ -586,6 +606,7 @@ row_log_table_delete( ...@@ -586,6 +606,7 @@ row_log_table_delete(
ut_ad(dict_index_is_clust(new_index)); ut_ad(dict_index_is_clust(new_index));
ut_ad(!dict_index_is_online_ddl(new_index)); ut_ad(!dict_index_is_online_ddl(new_index));
ut_ad(index->online_log->min_trx);
/* Create the tuple PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in new_table. */ /* Create the tuple PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in new_table. */
if (index->online_log->same_pk) { if (index->online_log->same_pk) {
...@@ -612,16 +633,27 @@ row_log_table_delete( ...@@ -612,16 +633,27 @@ row_log_table_delete(
dfield_set_data(dfield, field, len); dfield_set_data(dfield, field, len);
} }
if (sys) { dfield_t* db_trx_id = dtuple_get_nth_field(
dfield_set_data( tuple, new_index->n_uniq);
dtuple_get_nth_field(tuple,
new_index->n_uniq), const bool replace_sys_fields
sys, DATA_TRX_ID_LEN); = sys
dfield_set_data( || trx_read_trx_id(static_cast<byte*>(db_trx_id->data))
dtuple_get_nth_field(tuple, < index->online_log->min_trx;
new_index->n_uniq + 1),
sys + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN); if (replace_sys_fields) {
if (!sys || trx_read_trx_id(sys)
< index->online_log->min_trx) {
sys = reset_trx_id;
}
dfield_set_data(db_trx_id, sys, DATA_TRX_ID_LEN);
dfield_set_data(db_trx_id + 1, sys + DATA_TRX_ID_LEN,
DATA_ROLL_PTR_LEN);
} }
ut_d(trx_id_check(db_trx_id->data,
index->online_log->min_trx));
} else { } else {
/* The PRIMARY KEY has changed. Translate the tuple. */ /* The PRIMARY KEY has changed. Translate the tuple. */
old_pk = row_log_table_get_pk( old_pk = row_log_table_get_pk(
...@@ -1184,6 +1216,7 @@ row_log_table_get_pk( ...@@ -1184,6 +1216,7 @@ row_log_table_get_pk(
ut_ad(log); ut_ad(log);
ut_ad(log->table); ut_ad(log->table);
ut_ad(log->min_trx);
if (log->same_pk) { if (log->same_pk) {
/* The PRIMARY KEY columns are unchanged. */ /* The PRIMARY KEY columns are unchanged. */
...@@ -1208,8 +1241,13 @@ row_log_table_get_pk( ...@@ -1208,8 +1241,13 @@ row_log_table_get_pk(
ut_ad(len == DATA_TRX_ID_LEN); ut_ad(len == DATA_TRX_ID_LEN);
} }
memcpy(sys, rec + trx_id_offs, const byte* ptr = trx_read_trx_id(rec + trx_id_offs)
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); < log->min_trx
? reset_trx_id
: rec + trx_id_offs;
memcpy(sys, ptr, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
ut_d(trx_id_check(sys, log->min_trx));
} }
return(NULL); return(NULL);
...@@ -1326,9 +1364,15 @@ row_log_table_get_pk( ...@@ -1326,9 +1364,15 @@ row_log_table_get_pk(
/* Copy the fields, because the fields will be updated /* Copy the fields, because the fields will be updated
or the record may be moved somewhere else in the B-tree or the record may be moved somewhere else in the B-tree
as part of the upcoming operation. */ as part of the upcoming operation. */
if (trx_read_trx_id(trx_roll) < log->min_trx) {
trx_roll = reset_trx_id;
if (sys) { if (sys) {
memcpy(sys, trx_roll, memcpy(sys, trx_roll,
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
}
} else if (sys) {
memcpy(sys, trx_roll,
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
trx_roll = sys; trx_roll = sys;
} else { } else {
trx_roll = static_cast<const byte*>( trx_roll = static_cast<const byte*>(
...@@ -1337,6 +1381,8 @@ row_log_table_get_pk( ...@@ -1337,6 +1381,8 @@ row_log_table_get_pk(
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)); DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN));
} }
ut_d(trx_id_check(trx_roll, log->min_trx));
dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq), dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq),
trx_roll, DATA_TRX_ID_LEN); trx_roll, DATA_TRX_ID_LEN);
dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq + 1), dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq + 1),
...@@ -1938,6 +1984,8 @@ row_log_table_apply_delete( ...@@ -1938,6 +1984,8 @@ row_log_table_apply_delete(
= rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets, = rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
trx_id_col, &len); trx_id_col, &len);
ut_ad(len == DATA_TRX_ID_LEN); ut_ad(len == DATA_TRX_ID_LEN);
ut_d(trx_id_check(rec_trx_id, log->min_trx));
ut_d(trx_id_check(mrec_trx_id, log->min_trx));
ut_ad(rec_get_nth_field(mrec, moffsets, trx_id_col + 1, &len) ut_ad(rec_get_nth_field(mrec, moffsets, trx_id_col + 1, &len)
== mrec_trx_id + DATA_TRX_ID_LEN); == mrec_trx_id + DATA_TRX_ID_LEN);
...@@ -2132,21 +2180,21 @@ row_log_table_apply_update( ...@@ -2132,21 +2180,21 @@ row_log_table_apply_update(
/* Only update the record if DB_TRX_ID,DB_ROLL_PTR match what /* Only update the record if DB_TRX_ID,DB_ROLL_PTR match what
was buffered. */ was buffered. */
ulint len; ulint len;
const void* rec_trx_id const byte* rec_trx_id
= rec_get_nth_field(btr_pcur_get_rec(&pcur), = rec_get_nth_field(btr_pcur_get_rec(&pcur),
cur_offsets, index->n_uniq, &len); cur_offsets, index->n_uniq, &len);
const dfield_t* old_pk_trx_id
= dtuple_get_nth_field(old_pk, index->n_uniq);
ut_ad(len == DATA_TRX_ID_LEN); ut_ad(len == DATA_TRX_ID_LEN);
ut_ad(dtuple_get_nth_field(old_pk, index->n_uniq)->len ut_d(trx_id_check(rec_trx_id, log->min_trx));
== DATA_TRX_ID_LEN); ut_ad(old_pk_trx_id->len == DATA_TRX_ID_LEN);
ut_ad(dtuple_get_nth_field(old_pk, index->n_uniq + 1)->len ut_ad(old_pk_trx_id[1].len == DATA_ROLL_PTR_LEN);
== DATA_ROLL_PTR_LEN); ut_ad(DATA_TRX_ID_LEN
ut_ad(DATA_TRX_ID_LEN + static_cast<const char*>( + static_cast<const char*>(old_pk_trx_id->data)
dtuple_get_nth_field(old_pk, == old_pk_trx_id[1].data);
index->n_uniq)->data) ut_d(trx_id_check(old_pk_trx_id->data, log->min_trx));
== dtuple_get_nth_field(old_pk,
index->n_uniq + 1)->data); if (memcmp(rec_trx_id, old_pk_trx_id->data,
if (memcmp(rec_trx_id,
dtuple_get_nth_field(old_pk, index->n_uniq)->data,
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) { DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
/* The ROW_T_UPDATE was logged for a different /* The ROW_T_UPDATE was logged for a different
DB_TRX_ID,DB_ROLL_PTR. This is possible if an DB_TRX_ID,DB_ROLL_PTR. This is possible if an
...@@ -3132,6 +3180,7 @@ for online creation. ...@@ -3132,6 +3180,7 @@ for online creation.
bool bool
row_log_allocate( row_log_allocate(
/*=============*/ /*=============*/
const trx_t* trx, /*!< in: the ALTER TABLE transaction */
dict_index_t* index, /*!< in/out: index */ dict_index_t* index, /*!< in/out: index */
dict_table_t* table, /*!< in/out: new table being rebuilt, dict_table_t* table, /*!< in/out: new table being rebuilt,
or NULL when creating a secondary index */ or NULL when creating a secondary index */
...@@ -3154,6 +3203,8 @@ row_log_allocate( ...@@ -3154,6 +3203,8 @@ row_log_allocate(
ut_ad(!table || col_map); ut_ad(!table || col_map);
ut_ad(!add_cols || col_map); ut_ad(!add_cols || col_map);
ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)); ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
ut_ad(trx->id);
log = static_cast<row_log_t*>(ut_malloc_nokey(sizeof *log)); log = static_cast<row_log_t*>(ut_malloc_nokey(sizeof *log));
...@@ -3170,6 +3221,7 @@ row_log_allocate( ...@@ -3170,6 +3221,7 @@ row_log_allocate(
log->add_cols = add_cols; log->add_cols = add_cols;
log->col_map = col_map; log->col_map = col_map;
log->error = DB_SUCCESS; log->error = DB_SUCCESS;
log->min_trx = trx->id;
log->max_trx = 0; log->max_trx = 0;
log->tail.blocks = log->tail.bytes = 0; log->tail.blocks = log->tail.bytes = 0;
log->tail.total = 0; log->tail.total = 0;
......
...@@ -357,7 +357,6 @@ row_merge_decrypt_buf( ...@@ -357,7 +357,6 @@ row_merge_decrypt_buf(
#define FTS_PENDING_DOC_MEMORY_LIMIT 1000000 #define FTS_PENDING_DOC_MEMORY_LIMIT 1000000
/** Insert sorted data tuples to the index. /** Insert sorted data tuples to the index.
@param[in] trx_id transaction identifier
@param[in] index index to be inserted @param[in] index index to be inserted
@param[in] old_table old table @param[in] old_table old table
@param[in] fd file descriptor @param[in] fd file descriptor
...@@ -372,7 +371,6 @@ and then stage->inc() will be called for each record that is processed. ...@@ -372,7 +371,6 @@ and then stage->inc() will be called for each record that is processed.
static MY_ATTRIBUTE((warn_unused_result)) static MY_ATTRIBUTE((warn_unused_result))
dberr_t dberr_t
row_merge_insert_index_tuples( row_merge_insert_index_tuples(
trx_id_t trx_id,
dict_index_t* index, dict_index_t* index,
const dict_table_t* old_table, const dict_table_t* old_table,
int fd, int fd,
...@@ -1832,6 +1830,8 @@ row_merge_read_clustered_index( ...@@ -1832,6 +1830,8 @@ row_merge_read_clustered_index(
ut_ad((old_table == new_table) == !col_map); ut_ad((old_table == new_table) == !col_map);
ut_ad(!add_cols || col_map); ut_ad(!add_cols || col_map);
ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
ut_ad(trx->id);
table_total_rows = dict_table_get_n_rows(old_table); table_total_rows = dict_table_get_n_rows(old_table);
if(table_total_rows == 0) { if(table_total_rows == 0) {
...@@ -1927,6 +1927,16 @@ row_merge_read_clustered_index( ...@@ -1927,6 +1927,16 @@ row_merge_read_clustered_index(
based on that. */ based on that. */
clust_index = dict_table_get_first_index(old_table); clust_index = dict_table_get_first_index(old_table);
const ulint old_trx_id_col = old_table->n_cols + DATA_TRX_ID
- dict_table_get_n_sys_cols(table);
ut_ad(old_table->cols[old_trx_id_col].mtype == DATA_SYS);
ut_ad(old_table->cols[old_trx_id_col].prtype
== (DATA_TRX_ID | DATA_NOT_NULL));
ut_ad(old_table->cols[old_trx_id_col + 1].mtype == DATA_SYS);
ut_ad(old_table->cols[old_trx_id_col + 1].prtype
== (DATA_ROLL_PTR | DATA_NOT_NULL));
const ulint new_trx_id_col = col_map
? col_map[old_trx_id_col] : old_trx_id_col;
btr_pcur_open_at_index_side( btr_pcur_open_at_index_side(
true, clust_index, BTR_SEARCH_LEAF, &pcur, true, 0, &mtr); true, clust_index, BTR_SEARCH_LEAF, &pcur, true, 0, &mtr);
...@@ -1984,6 +1994,7 @@ row_merge_read_clustered_index( ...@@ -1984,6 +1994,7 @@ row_merge_read_clustered_index(
/* Scan the clustered index. */ /* Scan the clustered index. */
for (;;) { for (;;) {
const rec_t* rec; const rec_t* rec;
trx_id_t rec_trx_id;
ulint* offsets; ulint* offsets;
const dtuple_t* row; const dtuple_t* row;
row_ext_t* ext; row_ext_t* ext;
...@@ -2135,6 +2146,8 @@ row_merge_read_clustered_index( ...@@ -2135,6 +2146,8 @@ row_merge_read_clustered_index(
if (online) { if (online) {
offsets = rec_get_offsets(rec, clust_index, NULL, offsets = rec_get_offsets(rec, clust_index, NULL,
ULINT_UNDEFINED, &row_heap); ULINT_UNDEFINED, &row_heap);
rec_trx_id = row_get_rec_trx_id(rec, clust_index,
offsets);
/* Perform a REPEATABLE READ. /* Perform a REPEATABLE READ.
...@@ -2156,11 +2169,10 @@ row_merge_read_clustered_index( ...@@ -2156,11 +2169,10 @@ row_merge_read_clustered_index(
the DML thread has updated the clustered index the DML thread has updated the clustered index
but has not yet accessed secondary index. */ but has not yet accessed secondary index. */
ut_ad(MVCC::is_view_active(trx->read_view)); ut_ad(MVCC::is_view_active(trx->read_view));
ut_ad(rec_trx_id != trx->id);
if (!trx->read_view->changes_visible( if (!trx->read_view->changes_visible(
row_get_rec_trx_id( rec_trx_id, old_table->name)) {
rec, clust_index, offsets),
old_table->name)) {
rec_t* old_vers; rec_t* old_vers;
row_vers_build_for_consistent_read( row_vers_build_for_consistent_read(
...@@ -2168,20 +2180,33 @@ row_merge_read_clustered_index( ...@@ -2168,20 +2180,33 @@ row_merge_read_clustered_index(
trx->read_view, &row_heap, trx->read_view, &row_heap,
row_heap, &old_vers, NULL); row_heap, &old_vers, NULL);
rec = old_vers; if (!old_vers) {
if (!rec) {
continue; continue;
} }
/* The old version must necessarily be
in the "prehistory", because the
exclusive lock in
ha_innobase::prepare_inplace_alter_table()
forced the completion of any transactions
that accessed this table. */
ut_ad(row_get_rec_trx_id(old_vers, clust_index,
offsets) < trx->id);
rec = old_vers;
rec_trx_id = 0;
} }
if (rec_get_deleted_flag( if (rec_get_deleted_flag(
rec, rec,
dict_table_is_comp(old_table))) { dict_table_is_comp(old_table))) {
/* In delete-marked records, DB_TRX_ID must /* In delete-marked records, DB_TRX_ID must
always refer to an existing undo log record. */ always refer to an existing undo log record.
ut_ad(row_get_rec_trx_id(rec, clust_index, Above, we did reset rec_trx_id = 0
offsets)); for rec = old_vers.*/
ut_ad(rec == page_cur_get_rec(cur)
? rec_trx_id
: !rec_trx_id);
/* This record was deleted in the latest /* This record was deleted in the latest
committed version, or it was deleted and committed version, or it was deleted and
then reinserted-by-update before purge then reinserted-by-update before purge
...@@ -2194,19 +2219,37 @@ row_merge_read_clustered_index( ...@@ -2194,19 +2219,37 @@ row_merge_read_clustered_index(
rec, dict_table_is_comp(old_table))) { rec, dict_table_is_comp(old_table))) {
/* In delete-marked records, DB_TRX_ID must /* In delete-marked records, DB_TRX_ID must
always refer to an existing undo log record. */ always refer to an existing undo log record. */
ut_ad(rec_get_trx_id(rec, clust_index)); ut_d(rec_trx_id = rec_get_trx_id(rec, clust_index));
ut_ad(rec_trx_id);
/* This must be a purgeable delete-marked record,
and the transaction that delete-marked the record
must have been committed before this
!online ALTER TABLE transaction. */
ut_ad(rec_trx_id < trx->id);
/* Skip delete-marked records. /* Skip delete-marked records.
Skipping delete-marked records will make the Skipping delete-marked records will make the
created indexes unuseable for transactions created indexes unuseable for transactions
whose read views were created before the index whose read views were created before the index
creation completed, but preserving the history creation completed, but an attempt to preserve
would make it tricky to detect duplicate the history would make it tricky to detect
keys. */ duplicate keys. */
continue; continue;
} else { } else {
offsets = rec_get_offsets(rec, clust_index, NULL, offsets = rec_get_offsets(rec, clust_index, NULL,
ULINT_UNDEFINED, &row_heap); ULINT_UNDEFINED, &row_heap);
/* This is a locking ALTER TABLE.
If we are not rebuilding the table, the
DB_TRX_ID does not matter, as it is not being
written to any secondary indexes; see
if (old_table == new_table) below.
If we are rebuilding the table, the
DB_TRX_ID,DB_ROLL_PTR should be reset, because
there will be no history available. */
ut_ad(rec_get_trx_id(rec, clust_index) < trx->id);
rec_trx_id = 0;
} }
/* When !online, we are holding a lock on old_table, preventing /* When !online, we are holding a lock on old_table, preventing
...@@ -2241,6 +2284,33 @@ row_merge_read_clustered_index( ...@@ -2241,6 +2284,33 @@ row_merge_read_clustered_index(
doc_id = 0; doc_id = 0;
} }
ut_ad(row->fields[new_trx_id_col].type.mtype == DATA_SYS);
ut_ad(row->fields[new_trx_id_col].type.prtype
== (DATA_TRX_ID | DATA_NOT_NULL));
ut_ad(row->fields[new_trx_id_col].len == DATA_TRX_ID_LEN);
ut_ad(row->fields[new_trx_id_col + 1].type.mtype == DATA_SYS);
ut_ad(row->fields[new_trx_id_col + 1].type.prtype
== (DATA_ROLL_PTR | DATA_NOT_NULL));
ut_ad(row->fields[new_trx_id_col + 1].len == DATA_ROLL_PTR_LEN);
if (old_table == new_table) {
/* Do not bother touching DB_TRX_ID,DB_ROLL_PTR
because they are not going to be written into
secondary indexes. */
} else if (rec_trx_id < trx->id) {
/* Reset the DB_TRX_ID,DB_ROLL_PTR of old rows
for which history is not going to be
available after the rebuild operation.
This essentially mimics row_purge_reset_trx_id(). */
byte* ptr = static_cast<byte*>(
row->fields[new_trx_id_col].data);
memset(ptr, 0, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
ptr[DATA_TRX_ID_LEN] = 1U
<< (ROLL_PTR_INSERT_FLAG_POS - CHAR_BIT
* (DATA_ROLL_PTR_LEN - 1));
}
if (add_autoinc != ULINT_UNDEFINED) { if (add_autoinc != ULINT_UNDEFINED) {
ut_ad(add_autoinc ut_ad(add_autoinc
...@@ -2328,6 +2398,11 @@ row_merge_read_clustered_index( ...@@ -2328,6 +2398,11 @@ row_merge_read_clustered_index(
continue; continue;
} }
ut_ad(!row
|| !dict_index_is_clust(buf->index)
|| trx_id_check(row->fields[new_trx_id_col].data,
trx->id));
if (UNIV_LIKELY if (UNIV_LIKELY
(row && (rows_added = row_merge_buf_add( (row && (rows_added = row_merge_buf_add(
buf, fts_index, old_table, new_table, buf, fts_index, old_table, new_table,
...@@ -2485,7 +2560,7 @@ row_merge_read_clustered_index( ...@@ -2485,7 +2560,7 @@ row_merge_read_clustered_index(
} }
err = row_merge_insert_index_tuples( err = row_merge_insert_index_tuples(
trx->id, index[i], old_table, index[i], old_table,
-1, NULL, buf, clust_btr_bulk, -1, NULL, buf, clust_btr_bulk,
table_total_rows, table_total_rows,
curr_progress, curr_progress,
...@@ -2597,7 +2672,7 @@ row_merge_read_clustered_index( ...@@ -2597,7 +2672,7 @@ row_merge_read_clustered_index(
btr_bulk.init(); btr_bulk.init();
err = row_merge_insert_index_tuples( err = row_merge_insert_index_tuples(
trx->id, index[i], old_table, index[i], old_table,
-1, NULL, buf, &btr_bulk, -1, NULL, buf, &btr_bulk,
table_total_rows, table_total_rows,
curr_progress, curr_progress,
...@@ -3444,7 +3519,6 @@ row_merge_mtuple_to_dtuple( ...@@ -3444,7 +3519,6 @@ row_merge_mtuple_to_dtuple(
} }
/** Insert sorted data tuples to the index. /** Insert sorted data tuples to the index.
@param[in] trx_id transaction identifier
@param[in] index index to be inserted @param[in] index index to be inserted
@param[in] old_table old table @param[in] old_table old table
@param[in] fd file descriptor @param[in] fd file descriptor
...@@ -3459,7 +3533,6 @@ and then stage->inc() will be called for each record that is processed. ...@@ -3459,7 +3533,6 @@ and then stage->inc() will be called for each record that is processed.
static MY_ATTRIBUTE((warn_unused_result)) static MY_ATTRIBUTE((warn_unused_result))
dberr_t dberr_t
row_merge_insert_index_tuples( row_merge_insert_index_tuples(
trx_id_t trx_id,
dict_index_t* index, dict_index_t* index,
const dict_table_t* old_table, const dict_table_t* old_table,
int fd, int fd,
...@@ -3498,7 +3571,6 @@ row_merge_insert_index_tuples( ...@@ -3498,7 +3571,6 @@ row_merge_insert_index_tuples(
ut_ad(!srv_read_only_mode); ut_ad(!srv_read_only_mode);
ut_ad(!(index->type & DICT_FTS)); ut_ad(!(index->type & DICT_FTS));
ut_ad(!dict_index_is_spatial(index)); ut_ad(!dict_index_is_spatial(index));
ut_ad(trx_id);
if (stage != NULL) { if (stage != NULL) {
stage->begin_phase_insert(); stage->begin_phase_insert();
...@@ -3542,7 +3614,6 @@ row_merge_insert_index_tuples( ...@@ -3542,7 +3614,6 @@ row_merge_insert_index_tuples(
} }
} }
for (;;) { for (;;) {
if (stage != NULL) { if (stage != NULL) {
...@@ -5010,7 +5081,7 @@ row_merge_build_indexes( ...@@ -5010,7 +5081,7 @@ row_merge_build_indexes(
} }
error = row_merge_insert_index_tuples( error = row_merge_insert_index_tuples(
trx->id, sort_idx, old_table, sort_idx, old_table,
merge_files[i].fd, block, NULL, merge_files[i].fd, block, NULL,
&btr_bulk, &btr_bulk,
merge_files[i].n_rec, pct_progress, pct_cost, merge_files[i].n_rec, pct_progress, pct_cost,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment