Commit a736a2cb authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-21724: Correctly invoke page_dir_split_slot()

In commit 138cbec5, we
computed an incorrect parameter to page_dir_split_slot(),
leading us to splitting the wrong directory slot, or
an out-of-bounds access when splitting the supremum slot.
This was once caught in the test innodb_gis.kill_server for
inserting records to a clustered index root page.

page_dir_split_slot(): Take the slot as a pointer, instead of
a numeric index.

page_apply_insert_redundant(), page_apply_insert_dynamic():
Rename slot to last_slot, and make owner_slot a pointer.
parent fae259f0
...@@ -799,12 +799,13 @@ static void page_rec_set_n_owned(rec_t *rec, ulint n_owned, bool comp) ...@@ -799,12 +799,13 @@ static void page_rec_set_n_owned(rec_t *rec, ulint n_owned, bool comp)
/** /**
Split a directory slot which owns too many records. Split a directory slot which owns too many records.
@param[in,out] block index page @param[in,out] block index page
@param[in] s the slot that needs to be split */ @param[in,out] slot the slot that needs to be split */
static void page_dir_split_slot(const buf_block_t &block, ulint s) static void page_dir_split_slot(const buf_block_t &block,
page_dir_slot_t *slot)
{ {
ut_ad(s); ut_ad(slot <= &block.frame[srv_page_size - PAGE_EMPTY_DIR_START]);
slot= my_assume_aligned<2>(slot);
page_dir_slot_t *slot= page_dir_get_nth_slot(block.frame, s);
const ulint n_owned= PAGE_DIR_SLOT_MAX_N_OWNED + 1; const ulint n_owned= PAGE_DIR_SLOT_MAX_N_OWNED + 1;
ut_ad(page_dir_slot_get_n_owned(slot) == n_owned); ut_ad(page_dir_slot_get_n_owned(slot) == n_owned);
...@@ -825,6 +826,7 @@ static void page_dir_split_slot(const buf_block_t &block, ulint s) ...@@ -825,6 +826,7 @@ static void page_dir_split_slot(const buf_block_t &block, ulint s)
page_dir_slot_t *last_slot= static_cast<page_dir_slot_t*> page_dir_slot_t *last_slot= static_cast<page_dir_slot_t*>
(block.frame + srv_page_size - (PAGE_DIR + PAGE_DIR_SLOT_SIZE) - (block.frame + srv_page_size - (PAGE_DIR + PAGE_DIR_SLOT_SIZE) -
n_slots * PAGE_DIR_SLOT_SIZE); n_slots * PAGE_DIR_SLOT_SIZE);
ut_ad(slot >= last_slot);
memmove_aligned<2>(last_slot, last_slot + PAGE_DIR_SLOT_SIZE, memmove_aligned<2>(last_slot, last_slot + PAGE_DIR_SLOT_SIZE,
slot - last_slot); slot - last_slot);
...@@ -1595,7 +1597,10 @@ page_cur_insert_rec_low( ...@@ -1595,7 +1597,10 @@ page_cur_insert_rec_low(
corresponding directory slot in two. */ corresponding directory slot in two. */
if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED))
page_dir_split_slot(*block, page_dir_find_owner_slot(next_rec)); {
const auto owner= page_dir_find_owner_slot(next_rec);
page_dir_split_slot(*block, page_dir_get_nth_slot(block->frame, owner));
}
rec_offs_make_valid(insert_buf + extra_size, index, rec_offs_make_valid(insert_buf + extra_size, index,
page_is_leaf(block->frame), offsets); page_is_leaf(block->frame), offsets);
...@@ -2305,14 +2310,14 @@ bool page_apply_insert_redundant(const buf_block_t &block, bool reuse, ...@@ -2305,14 +2310,14 @@ bool page_apply_insert_redundant(const buf_block_t &block, bool reuse,
return true; return true;
} }
const byte *const slot= page_dir_get_nth_slot(block.frame, n_slots - 1); byte * const last_slot= page_dir_get_nth_slot(block.frame, n_slots - 1);
byte * const page_heap_top= my_assume_aligned<2> byte * const page_heap_top= my_assume_aligned<2>
(PAGE_HEAP_TOP + PAGE_HEADER + block.frame); (PAGE_HEAP_TOP + PAGE_HEADER + block.frame);
const byte *const heap_bot= &block.frame[PAGE_OLD_SUPREMUM_END]; const byte *const heap_bot= &block.frame[PAGE_OLD_SUPREMUM_END];
byte *heap_top= block.frame + mach_read_from_2(page_heap_top); byte *heap_top= block.frame + mach_read_from_2(page_heap_top);
if (UNIV_UNLIKELY(heap_bot > heap_top || heap_top > slot)) if (UNIV_UNLIKELY(heap_bot > heap_top || heap_top > last_slot))
goto corrupted; goto corrupted;
if (UNIV_UNLIKELY(mach_read_from_2(slot) != PAGE_OLD_SUPREMUM)) if (UNIV_UNLIKELY(mach_read_from_2(last_slot) != PAGE_OLD_SUPREMUM))
goto corrupted; goto corrupted;
if (UNIV_UNLIKELY(mach_read_from_2(page_dir_get_nth_slot(block.frame, 0)) != if (UNIV_UNLIKELY(mach_read_from_2(page_dir_get_nth_slot(block.frame, 0)) !=
PAGE_OLD_INFIMUM)) PAGE_OLD_INFIMUM))
...@@ -2370,16 +2375,21 @@ bool page_apply_insert_redundant(const buf_block_t &block, bool reuse, ...@@ -2370,16 +2375,21 @@ bool page_apply_insert_redundant(const buf_block_t &block, bool reuse,
goto corrupted; /* Corrupted (cyclic?) next-record list */ goto corrupted; /* Corrupted (cyclic?) next-record list */
} }
page_dir_slot_t *owner_slot= last_slot;
if (n_owned > PAGE_DIR_SLOT_MAX_N_OWNED) if (n_owned > PAGE_DIR_SLOT_MAX_N_OWNED)
goto corrupted; goto corrupted;
else
{
mach_write_to_2(insert_buf, owner_rec - block.frame);
static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility");
const page_dir_slot_t * const first_slot=
page_dir_get_nth_slot(block.frame, 0);
mach_write_to_2(insert_buf, owner_rec - block.frame); while (memcmp_aligned<2>(owner_slot, insert_buf, 2))
ulint owner_slot= n_slots; if ((owner_slot+= 2) == first_slot)
static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility"); goto corrupted;
while (memcmp_aligned<2>(slot + 2 * owner_slot, insert_buf, 2)) }
if (!owner_slot--)
goto corrupted;
owner_slot= n_slots - 1 - owner_slot;
memcpy(insert_buf, data, extra_size - hdr_c); memcpy(insert_buf, data, extra_size - hdr_c);
byte *insert_rec= &insert_buf[extra_size]; byte *insert_rec= &insert_buf[extra_size];
...@@ -2437,7 +2447,7 @@ bool page_apply_insert_redundant(const buf_block_t &block, bool reuse, ...@@ -2437,7 +2447,7 @@ bool page_apply_insert_redundant(const buf_block_t &block, bool reuse,
} }
else else
{ {
if (UNIV_UNLIKELY(heap_top + extra_size + data_size > slot)) if (UNIV_UNLIKELY(heap_top + extra_size + data_size > last_slot))
goto corrupted; goto corrupted;
rec_set_bit_field_2(insert_rec, h, rec_set_bit_field_2(insert_rec, h,
REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
...@@ -2545,14 +2555,14 @@ bool page_apply_insert_dynamic(const buf_block_t &block, bool reuse, ...@@ -2545,14 +2555,14 @@ bool page_apply_insert_dynamic(const buf_block_t &block, bool reuse,
return true; return true;
} }
const byte *const slot= page_dir_get_nth_slot(block.frame, n_slots - 1); byte * const last_slot= page_dir_get_nth_slot(block.frame, n_slots - 1);
byte * const page_heap_top= my_assume_aligned<2> byte * const page_heap_top= my_assume_aligned<2>
(PAGE_HEAP_TOP + PAGE_HEADER + block.frame); (PAGE_HEAP_TOP + PAGE_HEADER + block.frame);
const byte *const heap_bot= &block.frame[PAGE_NEW_SUPREMUM_END]; const byte *const heap_bot= &block.frame[PAGE_NEW_SUPREMUM_END];
byte *heap_top= block.frame + mach_read_from_2(page_heap_top); byte *heap_top= block.frame + mach_read_from_2(page_heap_top);
if (UNIV_UNLIKELY(heap_bot > heap_top || heap_top > slot)) if (UNIV_UNLIKELY(heap_bot > heap_top || heap_top > last_slot))
goto corrupted; goto corrupted;
if (UNIV_UNLIKELY(mach_read_from_2(slot) != PAGE_NEW_SUPREMUM)) if (UNIV_UNLIKELY(mach_read_from_2(last_slot) != PAGE_NEW_SUPREMUM))
goto corrupted; goto corrupted;
if (UNIV_UNLIKELY(mach_read_from_2(page_dir_get_nth_slot(block.frame, 0)) != if (UNIV_UNLIKELY(mach_read_from_2(page_dir_get_nth_slot(block.frame, 0)) !=
PAGE_NEW_INFIMUM)) PAGE_NEW_INFIMUM))
...@@ -2589,7 +2599,7 @@ bool page_apply_insert_dynamic(const buf_block_t &block, bool reuse, ...@@ -2589,7 +2599,7 @@ bool page_apply_insert_dynamic(const buf_block_t &block, bool reuse,
goto corrupted; /* Corrupted (cyclic?) next-record list */ goto corrupted; /* Corrupted (cyclic?) next-record list */
} }
ulint owner_slot= n_slots; page_dir_slot_t* owner_slot= last_slot;
if (n_owned > PAGE_DIR_SLOT_MAX_N_OWNED) if (n_owned > PAGE_DIR_SLOT_MAX_N_OWNED)
goto corrupted; goto corrupted;
...@@ -2598,10 +2608,12 @@ bool page_apply_insert_dynamic(const buf_block_t &block, bool reuse, ...@@ -2598,10 +2608,12 @@ bool page_apply_insert_dynamic(const buf_block_t &block, bool reuse,
static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility"); static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility");
alignas(2) byte slot_buf[2]; alignas(2) byte slot_buf[2];
mach_write_to_2(slot_buf, owner_rec - block.frame); mach_write_to_2(slot_buf, owner_rec - block.frame);
while (memcmp_aligned<2>(slot + 2 * owner_slot, slot_buf, 2)) const page_dir_slot_t * const first_slot=
if (!owner_slot--) page_dir_get_nth_slot(block.frame, 0);
while (memcmp_aligned<2>(owner_slot, slot_buf, 2))
if ((owner_slot+= 2) == first_slot)
goto corrupted; goto corrupted;
owner_slot= n_slots - 1 - owner_slot;
} }
const ulint extra_size= REC_N_NEW_EXTRA_BYTES + hdr_c + (enc_hdr_l >> 3); const ulint extra_size= REC_N_NEW_EXTRA_BYTES + hdr_c + (enc_hdr_l >> 3);
...@@ -2643,7 +2655,7 @@ bool page_apply_insert_dynamic(const buf_block_t &block, bool reuse, ...@@ -2643,7 +2655,7 @@ bool page_apply_insert_dynamic(const buf_block_t &block, bool reuse,
} }
else else
{ {
if (UNIV_UNLIKELY(heap_top + extra_size + data_size > slot)) if (UNIV_UNLIKELY(heap_top + extra_size + data_size > last_slot))
goto corrupted; goto corrupted;
mach_write_to_2(page_n_heap, h + 1); mach_write_to_2(page_n_heap, h + 1);
h&= 0x7fff; h&= 0x7fff;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment