Commit 255328d3 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-16131 Assertion failed in dict_index_t::instant_field_value()

During a table-rebuilding online ALTER TABLE, if
dict_index_t::remove_instant() was invoked on the source table
(because it became empty), we would inadvertently change the way
how log records are written and parsed. We must keep the online_log
format unchanged throughout the whole table-rebuilding operation.

dict_col_t::def_t: Name the type of dict_col_t::def_val.

rec_get_n_add_field_len(), rec_set_n_add_field(): Define globally,
because these will be needed in row_log_table_low().

rec_init_offsets_temp(), rec_init_offsets_comp_ordinary(): Add
the parameter def_val for explicitly passing the default values
of the instantly added columns of the source table, so that
dict_index_t::instant_field_value() will not be called during
row_log_table_apply(). This allows us to consistently parse the
online_log records, even if the source table was converted
to the canonical non-instant format during the rebuild operation.

row_log_t::non_core_fields[]: The default values of the
instantly added columns on the source table; copied
during ha_innobase::prepare_inplace_alter_table()
while the table is exclusively locked.

row_log_t::instant_field_value(): Accessor to non_core_fields[],
analogous to dict_index_t::instant_field_value().

row_log_table_low(): Add fake_extra_size bytes to the record
header if the source table was converted to the canonical format
during the operation.

row_log_allocate(): Initialize row_log_t::non_core_fields.
parent a97c190d
......@@ -175,10 +175,42 @@ SET DEBUG_SYNC='now WAIT_FOR copied';
BEGIN;
INSERT INTO t1 SET b=1;
ROLLBACK;
disconnect stop_purge;
connection stop_purge;
COMMIT;
connection default;
InnoDB 2 transactions not purged
SET DEBUG_SYNC='now SIGNAL logged';
disconnect ddl;
connection ddl;
connection default;
DROP TABLE t1;
SET DEBUG_SYNC='RESET';
#
# MDEV-16131 Assertion failed in dict_index_t::instant_field_value()
#
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 SET a=0;
ALTER TABLE t1 ADD COLUMN b INT NOT NULL DEFAULT 2, ADD COLUMN c INT;
connection stop_purge;
START TRANSACTION WITH CONSISTENT SNAPSHOT;
connection default;
DELETE FROM t1;
connection ddl;
SET DEBUG_SYNC='row_log_table_apply1_before SIGNAL copied WAIT_FOR logged';
ALTER TABLE t1 FORCE;
disconnect stop_purge;
connection default;
SET DEBUG_SYNC = 'now WAIT_FOR copied';
InnoDB 1 transactions not purged
INSERT INTO t1 SET a=1;
INSERT INTO t1 SET a=2,b=3,c=4;
SET DEBUG_SYNC = 'now SIGNAL logged';
connection ddl;
disconnect ddl;
connection default;
SET DEBUG_SYNC = RESET;
SELECT * FROM t1;
a b c
1 2 NULL
2 3 4
DROP TABLE t1;
SET GLOBAL innodb_purge_rseg_truncate_frequency = @save_frequency;
......@@ -189,7 +189,9 @@ SET DEBUG_SYNC='now WAIT_FOR copied';
BEGIN;
INSERT INTO t1 SET b=1;
ROLLBACK;
disconnect stop_purge;
connection stop_purge;
COMMIT;
connection default;
# Wait for purge to empty the table.
let $wait_all_purged=2;
......@@ -197,8 +199,46 @@ let $wait_all_purged=2;
let $wait_all_purged=0;
SET DEBUG_SYNC='now SIGNAL logged';
disconnect ddl;
connection ddl;
reap;
connection default;
DROP TABLE t1;
SET DEBUG_SYNC='RESET';
--echo #
--echo # MDEV-16131 Assertion failed in dict_index_t::instant_field_value()
--echo #
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 SET a=0;
ALTER TABLE t1 ADD COLUMN b INT NOT NULL DEFAULT 2, ADD COLUMN c INT;
connection stop_purge;
START TRANSACTION WITH CONSISTENT SNAPSHOT;
connection default;
DELETE FROM t1;
connection ddl;
SET DEBUG_SYNC='row_log_table_apply1_before SIGNAL copied WAIT_FOR logged';
send ALTER TABLE t1 FORCE;
disconnect stop_purge;
connection default;
SET DEBUG_SYNC = 'now WAIT_FOR copied';
let $wait_all_purged = 1;
--source include/wait_all_purged.inc
INSERT INTO t1 SET a=1;
INSERT INTO t1 SET a=2,b=3,c=4;
SET DEBUG_SYNC = 'now SIGNAL logged';
connection ddl;
reap;
disconnect ddl;
connection default;
SET DEBUG_SYNC = RESET;
SELECT * FROM t1;
DROP TABLE t1;
SET GLOBAL innodb_purge_rseg_truncate_frequency = @save_frequency;
......@@ -593,7 +593,7 @@ struct dict_col_t{
inline void detach(const dict_index_t& index);
/** Data for instantly added columns */
struct {
struct def_t {
/** original default value of instantly added column */
const void* data;
/** len of data, or UNIV_SQL_DEFAULT if unavailable */
......
/*****************************************************************************
Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, MariaDB Corporation.
Copyright (c) 2017, 2018, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -312,6 +312,31 @@ rec_set_status(rec_t* rec, byte bits)
| bits;
}
/** Get the length of added field count in a REC_STATUS_COLUMNS_ADDED record.
@param[in] n_add_field number of added fields, minus one
@return storage size of the field count, in bytes */
inline unsigned rec_get_n_add_field_len(ulint n_add_field)
{
ut_ad(n_add_field < REC_MAX_N_FIELDS);
return n_add_field < 0x80 ? 1 : 2;
}
/** Set the added field count in a REC_STATUS_COLUMNS_ADDED record.
@param[in,out] header variable header of a REC_STATUS_COLUMNS_ADDED record
@param[in] n_add number of added fields, minus 1
@return record header before the number of added fields */
inline void rec_set_n_add_field(byte*& header, ulint n_add)
{
ut_ad(n_add < REC_MAX_N_FIELDS);
if (n_add < 0x80) {
*header-- = byte(n_add);
} else {
*header-- = byte(n_add) | 0x80;
*header-- = byte(n_add >> 7);
}
}
/******************************************************//**
The following function is used to retrieve the info and status
bits of a record. (Only compact records have status bits.)
......@@ -962,6 +987,7 @@ rec_get_converted_size_temp(
@param[in] index index of that the record belongs to
@param[in,out] offsets offsets to the fields; in: rec_offs_n_fields(offsets)
@param[in] n_core number of core fields (index->n_core_fields)
@param[in] def_val default values for non-core fields
@param[in] status REC_STATUS_ORDINARY or REC_STATUS_COLUMNS_ADDED */
void
rec_init_offsets_temp(
......@@ -969,6 +995,7 @@ rec_init_offsets_temp(
const dict_index_t* index,
ulint* offsets,
ulint n_core,
const dict_col_t::def_t*def_val,
rec_comp_status_t status = REC_STATUS_ORDINARY)
MY_ATTRIBUTE((nonnull));
/** Determine the offset to each field in temporary file.
......
......@@ -237,15 +237,6 @@ rec_get_n_extern_new(
return(n_extern);
}
/** Get the length of added field count in a REC_STATUS_COLUMNS_ADDED record.
@param[in] n_add_field number of added fields, minus one
@return storage size of the field count, in bytes */
static inline unsigned rec_get_n_add_field_len(ulint n_add_field)
{
ut_ad(n_add_field < REC_MAX_N_FIELDS);
return n_add_field < 0x80 ? 1 : 2;
}
/** Get the added field count in a REC_STATUS_COLUMNS_ADDED record.
@param[in,out] header variable header of a REC_STATUS_COLUMNS_ADDED record
@return number of added fields */
......@@ -264,22 +255,6 @@ static inline unsigned rec_get_n_add_field(const byte*& header)
return n_fields_add;
}
/** Set the added field count in a REC_STATUS_COLUMNS_ADDED record.
@param[in,out] header variable header of a REC_STATUS_COLUMNS_ADDED record
@param[in] n_add number of added fields, minus 1
@return record header before the number of added fields */
static inline void rec_set_n_add_field(byte*& header, ulint n_add)
{
ut_ad(n_add < REC_MAX_N_FIELDS);
if (n_add < 0x80) {
*header-- = byte(n_add);
} else {
*header-- = byte(n_add) | 0x80;
*header-- = byte(n_add >> 7);
}
}
/** Format of a leaf-page ROW_FORMAT!=REDUNDANT record */
enum rec_leaf_format {
/** Temporary file record */
......@@ -299,6 +274,8 @@ This is a special case of rec_init_offsets() and rec_get_offsets_func().
@param[in] rec leaf-page record
@param[in] index the index that the record belongs in
@param[in] n_core number of core fields (index->n_core_fields)
@param[in] def_val default values for non-core fields, or
NULL to refer to index->fields[].col->def_val
@param[in,out] offsets offsets, with valid rec_offs_n_fields(offsets)
@param[in] format record format */
static inline
......@@ -308,6 +285,7 @@ rec_init_offsets_comp_ordinary(
const dict_index_t* index,
ulint* offsets,
ulint n_core,
const dict_col_t::def_t*def_val,
rec_leaf_format format)
{
ulint offs = 0;
......@@ -379,7 +357,19 @@ rec_init_offsets_comp_ordinary(
ulint len;
/* set default value flag */
if (i >= n_fields) {
if (i < n_fields) {
} else if (def_val) {
const dict_col_t::def_t& d = def_val[i - n_core];
if (!d.data) {
len = offs | REC_OFFS_SQL_NULL;
ut_ad(d.len == UNIV_SQL_NULL);
} else {
len = offs | REC_OFFS_DEFAULT;
any |= REC_OFFS_DEFAULT;
}
goto resolved;
} else {
ulint dlen;
if (!index->instant_field_value(i, &dlen)) {
len = offs | REC_OFFS_SQL_NULL;
......@@ -618,12 +608,14 @@ rec_init_offsets(
ut_ad(leaf);
rec_init_offsets_comp_ordinary(rec, index, offsets,
index->n_core_fields,
NULL,
REC_LEAF_COLUMNS_ADDED);
return;
case REC_STATUS_ORDINARY:
ut_ad(leaf);
rec_init_offsets_comp_ordinary(rec, index, offsets,
index->n_core_fields,
NULL,
REC_LEAF_ORDINARY);
return;
}
......@@ -1695,6 +1687,7 @@ rec_get_converted_size_temp(
@param[in] index index of that the record belongs to
@param[in,out] offsets offsets to the fields; in: rec_offs_n_fields(offsets)
@param[in] n_core number of core fields (index->n_core_fields)
@param[in] def_val default values for non-core fields
@param[in] status REC_STATUS_ORDINARY or REC_STATUS_COLUMNS_ADDED */
void
rec_init_offsets_temp(
......@@ -1702,6 +1695,7 @@ rec_init_offsets_temp(
const dict_index_t* index,
ulint* offsets,
ulint n_core,
const dict_col_t::def_t*def_val,
rec_comp_status_t status)
{
ut_ad(status == REC_STATUS_ORDINARY
......@@ -1710,7 +1704,7 @@ rec_init_offsets_temp(
if it was emptied during an ALTER TABLE operation. */
ut_ad(index->n_core_fields == n_core || !index->is_instant());
ut_ad(index->n_core_fields >= n_core);
rec_init_offsets_comp_ordinary(rec, index, offsets, n_core,
rec_init_offsets_comp_ordinary(rec, index, offsets, n_core, def_val,
status == REC_STATUS_COLUMNS_ADDED
? REC_LEAF_TEMP_COLUMNS_ADDED
: REC_LEAF_TEMP);
......@@ -1729,7 +1723,8 @@ rec_init_offsets_temp(
{
ut_ad(!index->is_instant());
rec_init_offsets_comp_ordinary(rec, index, offsets,
index->n_core_fields, REC_LEAF_TEMP);
index->n_core_fields, NULL,
REC_LEAF_TEMP);
}
/** Convert a data tuple prefix to the temporary file format.
......
......@@ -227,6 +227,8 @@ struct row_log_t {
table could be emptied, so that table->is_instant() no longer holds,
but all log records must be in the "instant" format. */
unsigned n_core_fields;
/** the default values of non-core fields when the operation started */
dict_col_t::def_t* non_core_fields;
bool allow_not_null; /*!< Whether the alter ignore is being
used or if the sql mode is non-strict mode;
if not, NULL values will not be converted to
......@@ -243,6 +245,14 @@ struct row_log_t {
ut_ad(n_core_fields <= index->n_fields);
return n_core_fields != index->n_fields;
}
const byte* instant_field_value(ulint n, ulint* len) const
{
ut_ad(n >= n_core_fields);
const dict_col_t::def_t& d= non_core_fields[n - n_core_fields];
*len = d.len;
return static_cast<const byte*>(d.data);
}
};
/** Create the file or online log if it does not exist.
......@@ -935,15 +945,16 @@ row_log_table_low(
ulint mrec_size;
ulint avail_size;
const dict_index_t* new_index;
row_log_t* log = index->online_log;
new_index = dict_table_get_first_index(index->online_log->table);
new_index = dict_table_get_first_index(log->table);
ut_ad(dict_index_is_clust(index));
ut_ad(dict_index_is_clust(new_index));
ut_ad(!dict_index_is_online_ddl(new_index));
ut_ad(rec_offs_validate(rec, index, offsets));
ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
ut_ad(rec_offs_size(offsets) <= sizeof log->tail.buf);
ut_ad(rw_lock_own_flagged(
&index->lock,
RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
......@@ -970,7 +981,7 @@ row_log_table_low(
if (index->online_status != ONLINE_INDEX_CREATION
|| (index->type & DICT_CORRUPT) || index->table->corrupted
|| index->online_log->error != DB_SUCCESS) {
|| log->error != DB_SUCCESS) {
return;
}
......@@ -987,14 +998,32 @@ row_log_table_low(
const ulint omit_size = REC_N_NEW_EXTRA_BYTES;
const ulint rec_extra_size = rec_offs_extra_size(offsets) - omit_size;
const bool is_instant = index->online_log->is_instant(index);
const bool is_instant = log->is_instant(index);
extra_size = rec_extra_size + is_instant;
unsigned fake_extra_size = 0;
byte fake_extra_buf[2];
if (is_instant && UNIV_UNLIKELY(!index->is_instant())) {
/* The source table was emptied after ALTER TABLE
started, and it was converted to non-instant format.
Because row_log_table_apply_op() expects to find
all records to be logged in the same way, we will
be unable to copy the rec_extra_size bytes from the
record header, but must convert them here. */
unsigned n_add = index->n_fields - 1 - log->n_core_fields;
fake_extra_size = rec_get_n_add_field_len(n_add);
ut_ad(fake_extra_size == 1 || fake_extra_size == 2);
extra_size += fake_extra_size;
byte* fake_extra = fake_extra_buf + fake_extra_size - 1;
rec_set_n_add_field(fake_extra, n_add);
ut_ad(fake_extra + 1 == fake_extra_buf);
}
mrec_size = ROW_LOG_HEADER_SIZE
+ (extra_size >= 0x80) + rec_offs_size(offsets) - omit_size
+ is_instant;
+ is_instant + fake_extra_size;
if (insert || index->online_log->same_pk) {
if (insert || log->same_pk) {
ut_ad(!old_pk);
old_pk_extra_size = old_pk_size = 0;
} else {
......@@ -1012,8 +1041,7 @@ row_log_table_low(
mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
}
if (byte* b = row_log_table_open(index->online_log,
mrec_size, &avail_size)) {
if (byte* b = row_log_table_open(log, mrec_size, &avail_size)) {
if (insert) {
*b++ = ROW_T_INSERT;
} else {
......@@ -1038,20 +1066,23 @@ row_log_table_low(
}
if (is_instant) {
*b++ = rec_get_status(rec);
*b++ = fake_extra_size
? REC_STATUS_COLUMNS_ADDED
: rec_get_status(rec);
} else {
ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY);
}
memcpy(b, rec - rec_extra_size - omit_size, rec_extra_size);
b += rec_extra_size;
memcpy(b, fake_extra_buf, fake_extra_size);
b += fake_extra_size;
ulint len;
ulint trx_id_offs = rec_get_nth_field_offs(
offsets, index->n_uniq, &len);
ut_ad(len == DATA_TRX_ID_LEN);
memcpy(b, rec, rec_offs_data_size(offsets));
if (trx_read_trx_id(b + trx_id_offs)
< index->online_log->min_trx) {
if (trx_read_trx_id(b + trx_id_offs) < log->min_trx) {
memcpy(b + trx_id_offs,
reset_trx_id, sizeof reset_trx_id);
}
......@@ -1576,7 +1607,7 @@ row_log_table_apply_convert_mrec(
} else {
data = rec_get_nth_field(mrec, offsets, i, &len);
if (len == UNIV_SQL_DEFAULT) {
data = index->instant_field_value(i, &len);
data = log->instant_field_value(i, &len);
}
dfield_set_data(dfield, data, len);
}
......@@ -2416,7 +2447,7 @@ row_log_table_apply_op(
rec_offs_set_n_fields(offsets, dup->index->n_fields);
rec_init_offsets_temp(mrec, dup->index, offsets,
log->n_core_fields,
log->n_core_fields, log->non_core_fields,
is_instant
? static_cast<rec_comp_status_t>(
*(mrec - extra_size))
......@@ -2497,6 +2528,7 @@ row_log_table_apply_op(
rec_offs_set_n_fields(offsets, dup->index->n_fields);
rec_init_offsets_temp(mrec, dup->index, offsets,
log->n_core_fields,
log->non_core_fields,
is_instant
? static_cast<rec_comp_status_t>(
*(mrec - extra_size))
......@@ -2598,6 +2630,7 @@ row_log_table_apply_op(
rec_offs_set_n_fields(offsets, dup->index->n_fields);
rec_init_offsets_temp(mrec, dup->index, offsets,
log->n_core_fields,
log->non_core_fields,
is_instant
? static_cast<rec_comp_status_t>(
*(mrec - extra_size))
......@@ -3174,6 +3207,18 @@ row_log_allocate(
log->old_table = old_table;
log->n_rows = 0;
if (table && index->is_instant()) {
const unsigned n = log->n_core_fields;
log->non_core_fields = UT_NEW_ARRAY_NOKEY(
dict_col_t::def_t, index->n_fields - n);
for (unsigned i = n; i < index->n_fields; i++) {
log->non_core_fields[i - n]
= index->fields[i].col->def_val;
}
} else {
log->non_core_fields = NULL;
}
dict_index_set_online_status(index, ONLINE_INDEX_CREATION);
index->online_log = log;
......@@ -3206,6 +3251,7 @@ row_log_free(
MONITOR_ATOMIC_DEC(MONITOR_ONLINE_CREATE_INDEX);
UT_DELETE(log->blobs);
UT_DELETE_ARRAY(log->non_core_fields);
row_log_block_free(log->tail);
row_log_block_free(log->head);
row_merge_file_destroy_low(log->fd);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment