Commit 66a09bd6 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-13318 Crash recovery failure after the server is killed during innodb_encrypt_log startup

This fixes several InnoDB bugs related to innodb_encrypt_log and
two Mariabackup --backup bugs.

log_crypt(): Properly derive the initialization vector from the
start LSN of each block. Add a debug assertion.

log_crypt_init(): Note that the function should only be used when
creating redo log files and that the information is persisted in
the checkpoint pages.

xtrabackup_copy_log(): Validate data_len.

xtrabackup_backup_func(): Always use the chosen checkpoint buffer.

log_group_write_buf(), log_write_up_to(): Only log_crypt() the redo
log payload, not the padding bytes.

innobase_start_or_create_for_mysql(): Do not invoke log_crypt_init()
or initiate a redo log checkpoint.

recv_find_max_checkpoint(): Return the contents of LOG_CHECKPOINT_NO
to xtrabackup_backup_func() in log_sys->next_checkpoint_no.
parent 8ee4b414
...@@ -2359,10 +2359,18 @@ xtrabackup_copy_log(copy_logfile copy, lsn_t start_lsn, lsn_t end_lsn) ...@@ -2359,10 +2359,18 @@ xtrabackup_copy_log(copy_logfile copy, lsn_t start_lsn, lsn_t end_lsn)
scanned_checkpoint = checkpoint; scanned_checkpoint = checkpoint;
ulint data_len = log_block_get_data_len(log_block); ulint data_len = log_block_get_data_len(log_block);
scanned_lsn += data_len;
if (data_len != OS_FILE_LOG_BLOCK_SIZE) { if (data_len == OS_FILE_LOG_BLOCK_SIZE) {
/* The current end of the log was reached. */ /* We got a full log block. */
scanned_lsn += data_len;
} else if (data_len
>= OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE
|| data_len <= LOG_BLOCK_HDR_SIZE) {
/* We got a garbage block (abrupt end of the log). */
break;
} else {
/* We got a partial block (abrupt end of the log). */
scanned_lsn += data_len;
break; break;
} }
} }
...@@ -2375,7 +2383,7 @@ xtrabackup_copy_log(copy_logfile copy, lsn_t start_lsn, lsn_t end_lsn) ...@@ -2375,7 +2383,7 @@ xtrabackup_copy_log(copy_logfile copy, lsn_t start_lsn, lsn_t end_lsn)
if (ulint write_size = ulint(end_lsn - start_lsn)) { if (ulint write_size = ulint(end_lsn - start_lsn)) {
if (srv_encrypt_log) { if (srv_encrypt_log) {
log_crypt(log_sys->buf, write_size); log_crypt(log_sys->buf, start_lsn, write_size);
} }
if (ds_write(dst_log_file, log_sys->buf, write_size)) { if (ds_write(dst_log_file, log_sys->buf, write_size)) {
...@@ -3757,10 +3765,10 @@ xtrabackup_backup_func() ...@@ -3757,10 +3765,10 @@ xtrabackup_backup_func()
const byte* buf = log_sys->checkpoint_buf; const byte* buf = log_sys->checkpoint_buf;
checkpoint_lsn_start = mach_read_from_8(buf + LOG_CHECKPOINT_LSN);
checkpoint_no_start = mach_read_from_8(buf + LOG_CHECKPOINT_NO);
reread_log_header: reread_log_header:
checkpoint_lsn_start = log_sys->log.lsn;
checkpoint_no_start = log_sys->next_checkpoint_no;
err = recv_find_max_checkpoint(&max_cp_field); err = recv_find_max_checkpoint(&max_cp_field);
if (err != DB_SUCCESS) { if (err != DB_SUCCESS) {
...@@ -3774,10 +3782,9 @@ xtrabackup_backup_func() ...@@ -3774,10 +3782,9 @@ xtrabackup_backup_func()
ut_ad(!((log_sys->log.format ^ LOG_HEADER_FORMAT_CURRENT) ut_ad(!((log_sys->log.format ^ LOG_HEADER_FORMAT_CURRENT)
& ~LOG_HEADER_FORMAT_ENCRYPTED)); & ~LOG_HEADER_FORMAT_ENCRYPTED));
if (checkpoint_no_start != mach_read_from_8(buf + LOG_CHECKPOINT_NO)) { log_group_header_read(&log_sys->log, max_cp_field);
checkpoint_lsn_start = mach_read_from_8(buf + LOG_CHECKPOINT_LSN); if (checkpoint_no_start != mach_read_from_8(buf + LOG_CHECKPOINT_NO)) {
checkpoint_no_start = mach_read_from_8(buf + LOG_CHECKPOINT_NO);
goto reread_log_header; goto reread_log_header;
} }
......
...@@ -32,7 +32,11 @@ MDEV-11782: Rewritten for MariaDB 10.2 by Marko Mäkelä, MariaDB Corporation. ...@@ -32,7 +32,11 @@ MDEV-11782: Rewritten for MariaDB 10.2 by Marko Mäkelä, MariaDB Corporation.
/** innodb_encrypt_log: whether to encrypt the redo log */ /** innodb_encrypt_log: whether to encrypt the redo log */
extern my_bool srv_encrypt_log; extern my_bool srv_encrypt_log;
/** Initialize the redo log encryption key. /** Initialize the redo log encryption key and random parameters
when creating a new redo log.
The random parameters will be persisted in the log checkpoint pages.
@see log_crypt_write_checkpoint_buf()
@see log_crypt_read_checkpoint_buf()
@return whether the operation succeeded */ @return whether the operation succeeded */
UNIV_INTERN UNIV_INTERN
bool bool
...@@ -71,10 +75,11 @@ log_crypt_read_checkpoint_buf(const byte* buf); ...@@ -71,10 +75,11 @@ log_crypt_read_checkpoint_buf(const byte* buf);
/** Encrypt or decrypt log blocks. /** Encrypt or decrypt log blocks.
@param[in,out] buf log blocks to encrypt or decrypt @param[in,out] buf log blocks to encrypt or decrypt
@param[in] lsn log sequence number of the start of the buffer
@param[in] size size of the buffer, in bytes @param[in] size size of the buffer, in bytes
@param[in] decrypt whether to decrypt instead of encrypting */ @param[in] decrypt whether to decrypt instead of encrypting */
UNIV_INTERN UNIV_INTERN
void void
log_crypt(byte* buf, ulint size, bool decrypt = false); log_crypt(byte* buf, lsn_t lsn, ulint size, bool decrypt = false);
#endif // log0crypt.h #endif // log0crypt.h
...@@ -103,11 +103,12 @@ get_crypt_info(ulint checkpoint_no) ...@@ -103,11 +103,12 @@ get_crypt_info(ulint checkpoint_no)
/** Encrypt or decrypt log blocks. /** Encrypt or decrypt log blocks.
@param[in,out] buf log blocks to encrypt or decrypt @param[in,out] buf log blocks to encrypt or decrypt
@param[in] lsn log sequence number of the start of the buffer
@param[in] size size of the buffer, in bytes @param[in] size size of the buffer, in bytes
@param[in] decrypt whether to decrypt instead of encrypting */ @param[in] decrypt whether to decrypt instead of encrypting */
UNIV_INTERN UNIV_INTERN
void void
log_crypt(byte* buf, ulint size, bool decrypt) log_crypt(byte* buf, lsn_t lsn, ulint size, bool decrypt)
{ {
ut_ad(size % OS_FILE_LOG_BLOCK_SIZE == 0); ut_ad(size % OS_FILE_LOG_BLOCK_SIZE == 0);
ut_a(info.key_version); ut_a(info.key_version);
...@@ -117,12 +118,12 @@ log_crypt(byte* buf, ulint size, bool decrypt) ...@@ -117,12 +118,12 @@ log_crypt(byte* buf, ulint size, bool decrypt)
compile_time_assert(sizeof(uint32_t) == 4); compile_time_assert(sizeof(uint32_t) == 4);
#define LOG_CRYPT_HDR_SIZE 4 #define LOG_CRYPT_HDR_SIZE 4
lsn &= ~lsn_t(OS_FILE_LOG_BLOCK_SIZE - 1);
for (const byte* const end = buf + size; buf != end; for (const byte* const end = buf + size; buf != end;
buf += OS_FILE_LOG_BLOCK_SIZE) { buf += OS_FILE_LOG_BLOCK_SIZE, lsn += OS_FILE_LOG_BLOCK_SIZE) {
uint32_t dst[(OS_FILE_LOG_BLOCK_SIZE - LOG_CRYPT_HDR_SIZE) uint32_t dst[(OS_FILE_LOG_BLOCK_SIZE - LOG_CRYPT_HDR_SIZE)
/ sizeof(uint32_t)]; / sizeof(uint32_t)];
const ulint log_block_no = log_block_get_hdr_no(buf);
/* The log block number is not encrypted. */ /* The log block number is not encrypted. */
*aes_ctr_iv = *aes_ctr_iv =
...@@ -137,10 +138,10 @@ log_crypt(byte* buf, ulint size, bool decrypt) ...@@ -137,10 +138,10 @@ log_crypt(byte* buf, ulint size, bool decrypt)
# error "LOG_BLOCK_HDR_NO has been moved; redo log format affected!" # error "LOG_BLOCK_HDR_NO has been moved; redo log format affected!"
#endif #endif
aes_ctr_iv[1] = info.crypt_nonce.word; aes_ctr_iv[1] = info.crypt_nonce.word;
mach_write_to_8(reinterpret_cast<byte*>(aes_ctr_iv + 2), mach_write_to_8(reinterpret_cast<byte*>(aes_ctr_iv + 2), lsn);
log_block_get_start_lsn( ut_ad(log_block_get_start_lsn(lsn,
decrypt ? srv_start_lsn : log_sys->lsn, log_block_get_hdr_no(buf))
log_block_no)); == lsn);
int rc = encryption_crypt( int rc = encryption_crypt(
buf + LOG_CRYPT_HDR_SIZE, sizeof dst, buf + LOG_CRYPT_HDR_SIZE, sizeof dst,
...@@ -206,7 +207,11 @@ init_crypt_key(crypt_info_t* info, bool upgrade = false) ...@@ -206,7 +207,11 @@ init_crypt_key(crypt_info_t* info, bool upgrade = false)
return true; return true;
} }
/** Initialize the redo log encryption key. /** Initialize the redo log encryption key and random parameters
when creating a new redo log.
The random parameters will be persisted in the log checkpoint pages.
@see log_crypt_write_checkpoint_buf()
@see log_crypt_read_checkpoint_buf()
@return whether the operation succeeded */ @return whether the operation succeeded */
UNIV_INTERN UNIV_INTERN
bool bool
......
...@@ -997,10 +997,6 @@ log_group_write_buf( ...@@ -997,10 +997,6 @@ log_group_write_buf(
|| log_block_get_hdr_no(buf) || log_block_get_hdr_no(buf)
== log_block_convert_lsn_to_no(start_lsn)); == log_block_convert_lsn_to_no(start_lsn));
if (log_sys->is_encrypted()) {
log_crypt(buf, write_len);
}
/* Calculate the checksums for each log block and write them to /* Calculate the checksums for each log block and write them to
the trailer fields of the log blocks */ the trailer fields of the log blocks */
...@@ -1264,6 +1260,12 @@ log_write_up_to( ...@@ -1264,6 +1260,12 @@ log_write_up_to(
::memset(write_buf + area_end, 0, pad_size); ::memset(write_buf + area_end, 0, pad_size);
} }
} }
if (log_sys->is_encrypted()) {
log_crypt(write_buf + area_start, log_sys->write_lsn,
area_end - area_start);
}
/* Do the write to the log files */ /* Do the write to the log files */
log_group_write_buf( log_group_write_buf(
&log_sys->log, write_buf + area_start, &log_sys->log, write_buf + area_start,
......
...@@ -715,7 +715,8 @@ log_group_read_log_seg( ...@@ -715,7 +715,8 @@ log_group_read_log_seg(
} }
if (group->is_encrypted()) { if (group->is_encrypted()) {
log_crypt(buf, OS_FILE_LOG_BLOCK_SIZE, true); log_crypt(buf, start_lsn,
OS_FILE_LOG_BLOCK_SIZE, true);
} }
} }
} }
...@@ -1016,6 +1017,7 @@ recv_find_max_checkpoint(ulint* max_field) ...@@ -1016,6 +1017,7 @@ recv_find_max_checkpoint(ulint* max_field)
buf + LOG_CHECKPOINT_LSN); buf + LOG_CHECKPOINT_LSN);
group->lsn_offset = mach_read_from_8( group->lsn_offset = mach_read_from_8(
buf + LOG_CHECKPOINT_OFFSET); buf + LOG_CHECKPOINT_OFFSET);
log_sys->next_checkpoint_no = checkpoint_no;
} }
} }
......
...@@ -2223,14 +2223,6 @@ innobase_start_or_create_for_mysql() ...@@ -2223,14 +2223,6 @@ innobase_start_or_create_for_mysql()
recv_sys->dblwr.pages.clear(); recv_sys->dblwr.pages.clear();
if (err == DB_SUCCESS && !srv_read_only_mode) {
log_mutex_enter();
if (log_sys->is_encrypted() && !log_crypt_init()) {
err = DB_ERROR;
}
log_mutex_exit();
}
if (err == DB_SUCCESS) { if (err == DB_SUCCESS) {
/* Initialize the change buffer. */ /* Initialize the change buffer. */
err = dict_boot(); err = dict_boot();
...@@ -2733,13 +2725,6 @@ innobase_start_or_create_for_mysql() ...@@ -2733,13 +2725,6 @@ innobase_start_or_create_for_mysql()
fil_crypt_threads_init(); fil_crypt_threads_init();
fil_system_exit(); fil_system_exit();
/*
Create a checkpoint before logging anything new, so that
the current encryption key in use is definitely logged
before any log blocks encrypted with that key.
*/
log_make_checkpoint_at(LSN_MAX, TRUE);
/* Initialize online defragmentation. */ /* Initialize online defragmentation. */
btr_defragment_init(); btr_defragment_init();
btr_defragment_thread_active = true; btr_defragment_thread_active = true;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment