Commit f6d4f624 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-12041: innodb_encrypt_log key rotation

This will change the InnoDB encrypted redo log format only.
Unencrypted redo log will keep using the MariaDB 10.3 format.
In the new encrypted redo log format, 4 additional bytes will
be reserved in the redo log block trailer for storing the
encryption key version.

For performance reasons, the encryption key rotation
(checking if the latest encryption key version is being used)
is only done at log_checkpoint().

LOG_HEADER_FORMAT_CURRENT: Remove.

LOG_HEADER_FORMAT_ENC_10_4: The encrypted 10.4 format.

LOG_BLOCK_KEY: The encryption key version field.

LOG_BLOCK_TRL_SIZE: Remove.

log_t: Add accessors framing_size(), payload_size(), trailer_offset(),
to be used instead of referring to LOG_BLOCK_TRL_SIZE.

log_crypt_t: An operation passed to log_crypt().

log_crypt(): Perform decryption, encryption, or encryption with key
rotation. Return an error if key rotation at decryption fails.
On encryption, keep using the previous key if the rotation fails.
At startup, old-format encrypted redo log may be written before
the redo log is upgraded (rebuilt) to the latest format.

log_write_up_to(): Add the parameter rotate_key=false.

log_checkpoint(): Invoke log_write_up_to() with rotate_key=true.
parent befc09f0
...@@ -2500,8 +2500,7 @@ static lsn_t xtrabackup_copy_log(lsn_t start_lsn, lsn_t end_lsn, bool last) ...@@ -2500,8 +2500,7 @@ static lsn_t xtrabackup_copy_log(lsn_t start_lsn, lsn_t end_lsn, bool last)
if (data_len == OS_FILE_LOG_BLOCK_SIZE) { if (data_len == OS_FILE_LOG_BLOCK_SIZE) {
/* We got a full log block. */ /* We got a full log block. */
scanned_lsn += data_len; scanned_lsn += data_len;
} else if (data_len } else if (data_len >= log_sys.trailer_offset()
>= OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE
|| data_len <= LOG_BLOCK_HDR_SIZE) { || data_len <= LOG_BLOCK_HDR_SIZE) {
/* We got a garbage block (abrupt end of the log). */ /* We got a garbage block (abrupt end of the log). */
msg("mariabackup: garbage block: " LSN_PF ",%zu\n", msg("mariabackup: garbage block: " LSN_PF ",%zu\n",
...@@ -3946,8 +3945,8 @@ xtrabackup_backup_func() ...@@ -3946,8 +3945,8 @@ xtrabackup_backup_func()
goto log_fail; goto log_fail;
} }
ut_ad(!((log_sys.log.format ^ LOG_HEADER_FORMAT_CURRENT) ut_ad(log_sys.log.format == LOG_HEADER_FORMAT_10_3
& ~LOG_HEADER_FORMAT_ENCRYPTED)); || log_sys.log.format == LOG_HEADER_FORMAT_ENC_10_4);
const byte* buf = log_sys.checkpoint_buf; const byte* buf = log_sys.checkpoint_buf;
...@@ -3965,8 +3964,8 @@ xtrabackup_backup_func() ...@@ -3965,8 +3964,8 @@ xtrabackup_backup_func()
goto old_format; goto old_format;
} }
ut_ad(!((log_sys.log.format ^ LOG_HEADER_FORMAT_CURRENT) ut_ad(log_sys.log.format == LOG_HEADER_FORMAT_10_3
& ~LOG_HEADER_FORMAT_ENCRYPTED)); || log_sys.log.format == LOG_HEADER_FORMAT_ENC_10_4);
log_header_read(max_cp_field); log_header_read(max_cp_field);
......
create table t1(a serial) engine=innoDB;
set global innodb_encrypt_tables=ON; set global innodb_encrypt_tables=ON;
show variables like 'innodb_encrypt%'; show variables like 'innodb_encrypt%';
Variable_name Value Variable_name Value
...@@ -13,5 +14,13 @@ set global debug_key_management_version=10; ...@@ -13,5 +14,13 @@ set global debug_key_management_version=10;
select count(*) from information_schema.innodb_tablespaces_encryption where current_key_version <> 10; select count(*) from information_schema.innodb_tablespaces_encryption where current_key_version <> 10;
count(*) count(*)
0 0
SET GLOBAL debug_dbug = '+d,ib_log';
SET GLOBAL innodb_log_checkpoint_now = 1;
SET GLOBAL innodb_flush_log_at_trx_commit = 1;
INSERT INTO t1 VALUES(NULL);
set global innodb_encrypt_tables=OFF; set global innodb_encrypt_tables=OFF;
set global debug_key_management_version=1; set global debug_key_management_version=1;
select * from t1;
a
1
drop table t1;
-- source include/have_innodb.inc -- source include/have_innodb.inc
-- source include/have_debug.inc
-- source include/not_embedded.inc
if (`select count(*) = 0 from information_schema.plugins if (`select count(*) = 0 from information_schema.plugins
where plugin_name = 'debug_key_management' and plugin_status='active'`) where plugin_name = 'debug_key_management' and plugin_status='active'`)
{ {
--skip Needs debug_key_management --skip Needs debug_key_management
} }
create table t1(a serial) engine=innoDB;
set global innodb_encrypt_tables=ON; set global innodb_encrypt_tables=ON;
show variables like 'innodb_encrypt%'; show variables like 'innodb_encrypt%';
...@@ -17,10 +22,21 @@ set global debug_key_management_version=10; ...@@ -17,10 +22,21 @@ set global debug_key_management_version=10;
let $wait_condition= select count(*) = $tables_count from information_schema.innodb_tablespaces_encryption where current_key_version=10; let $wait_condition= select count(*) = $tables_count from information_schema.innodb_tablespaces_encryption where current_key_version=10;
--source include/wait_condition.inc --source include/wait_condition.inc
select count(*) from information_schema.innodb_tablespaces_encryption where current_key_version <> 10; select count(*) from information_schema.innodb_tablespaces_encryption where current_key_version <> 10;
# Test redo log key rotation and crash recovery.
SET GLOBAL debug_dbug = '+d,ib_log';
SET GLOBAL innodb_log_checkpoint_now = 1;
SET GLOBAL innodb_flush_log_at_trx_commit = 1;
INSERT INTO t1 VALUES(NULL);
let $shutdown_timeout = 0;
-- source include/restart_mysqld.inc
# Note that we expect that key_version is increasing so disable encryption before reset # Note that we expect that key_version is increasing so disable encryption before reset
set global innodb_encrypt_tables=OFF; set global innodb_encrypt_tables=OFF;
set global debug_key_management_version=1; set global debug_key_management_version=1;
select * from t1;
drop table t1;
/***************************************************************************** /*****************************************************************************
Copyright (C) 2013, 2015, Google Inc. All Rights Reserved. Copyright (C) 2013, 2015, Google Inc. All Rights Reserved.
Copyright (C) 2014, 2017, MariaDB Corporation. All Rights Reserved. Copyright (C) 2014, 2018, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
...@@ -73,14 +73,23 @@ UNIV_INTERN ...@@ -73,14 +73,23 @@ UNIV_INTERN
bool bool
log_crypt_read_checkpoint_buf(const byte* buf); log_crypt_read_checkpoint_buf(const byte* buf);
/** log_crypt() operation code */
enum log_crypt_t {
/** encrypt a log block without rotating key */
LOG_ENCRYPT,
/** decrypt a log block */
LOG_DECRYPT,
/** attempt to rotate the key, and encrypt a log block */
LOG_ENCRYPT_ROTATE_KEY
};
/** Encrypt or decrypt log blocks. /** Encrypt or decrypt log blocks.
@param[in,out] buf log blocks to encrypt or decrypt @param[in,out] buf log blocks to encrypt or decrypt
@param[in] lsn log sequence number of the start of the buffer @param[in] lsn log sequence number of the start of the buffer
@param[in] size size of the buffer, in bytes @param[in] size size of the buffer, in bytes
@param[in] decrypt whether to decrypt instead of encrypting */ @param[in] op whether to decrypt, encrypt, or rotate key and encrypt
UNIV_INTERN @return whether the operation succeeded (encrypt always does) */
void bool log_crypt(byte* buf, lsn_t lsn, ulint size, log_crypt_t op = LOG_ENCRYPT);
log_crypt(byte* buf, lsn_t lsn, ulint size, bool decrypt = false);
/** Encrypt or decrypt a temporary file block. /** Encrypt or decrypt a temporary file block.
@param[in] src block to encrypt or decrypt @param[in] src block to encrypt or decrypt
......
...@@ -161,19 +161,16 @@ bool ...@@ -161,19 +161,16 @@ bool
log_set_capacity(ulonglong file_size) log_set_capacity(ulonglong file_size)
MY_ATTRIBUTE((warn_unused_result)); MY_ATTRIBUTE((warn_unused_result));
/******************************************************//** /** Ensure that the log has been written to the log file up to a given
This function is called, e.g., when a transaction wants to commit. It checks log entry (such as that of a transaction commit). Start a new write, or
that the log has been written to the log file up to the last log entry written wait and check if an already running write is covering the request.
by the transaction. If there is a flush running, it waits and checks if the @param[in] lsn log sequence number that should be
flush flushed enough. If not, starts a new flush. */ included in the redo log file write
void @param[in] flush_to_disk whether the written log should also
log_write_up_to( be flushed to the file system
/*============*/ @param[in] rotate_key whether to rotate the encryption key */
lsn_t lsn, /*!< in: log sequence number up to which void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key = false);
the log should be written, LSN_MAX if not specified */
bool flush_to_disk);
/*!< in: true if we want the written log
also to be flushed to disk */
/** write to the log file up to the last log entry. /** write to the log file up to the last log entry.
@param[in] sync whether we want the written log @param[in] sync whether we want the written log
also to be flushed to disk. */ also to be flushed to disk. */
...@@ -415,13 +412,14 @@ extern my_bool innodb_log_checksums; ...@@ -415,13 +412,14 @@ extern my_bool innodb_log_checksums;
#define LOG_BLOCK_HDR_SIZE 12 /* size of the log block header in #define LOG_BLOCK_HDR_SIZE 12 /* size of the log block header in
bytes */ bytes */
/* Offsets of a log block trailer from the end of the block */ #define LOG_BLOCK_KEY 4 /* encryption key version
before LOG_BLOCK_CHECKSUM;
in LOG_HEADER_FORMAT_ENC_10_4 only */
#define LOG_BLOCK_CHECKSUM 4 /* 4 byte checksum of the log block #define LOG_BLOCK_CHECKSUM 4 /* 4 byte checksum of the log block
contents; in InnoDB versions contents; in InnoDB versions
< 3.23.52 this did not contain the < 3.23.52 this did not contain the
checksum but the same value as checksum but the same value as
.._HDR_NO */ LOG_BLOCK_HDR_NO */
#define LOG_BLOCK_TRL_SIZE 4 /* trailer size in bytes */
/** Offsets inside the checkpoint pages (redo log format version 1) @{ */ /** Offsets inside the checkpoint pages (redo log format version 1) @{ */
/** Checkpoint number */ /** Checkpoint number */
...@@ -476,9 +474,8 @@ or the MySQL version that created the redo log file. */ ...@@ -476,9 +474,8 @@ or the MySQL version that created the redo log file. */
#define LOG_HEADER_FORMAT_10_2 1 #define LOG_HEADER_FORMAT_10_2 1
/** The MariaDB 10.3.2 log format */ /** The MariaDB 10.3.2 log format */
#define LOG_HEADER_FORMAT_10_3 103 #define LOG_HEADER_FORMAT_10_3 103
/** The redo log format identifier corresponding to the current format version. /** The MariaDB 10.4.0 log format (only with innodb_encrypt_log=ON) */
Stored in LOG_HEADER_FORMAT. */ #define LOG_HEADER_FORMAT_ENC_10_4 (104U | 1U << 31)
#define LOG_HEADER_FORMAT_CURRENT LOG_HEADER_FORMAT_10_3
/** Encrypted MariaDB redo log */ /** Encrypted MariaDB redo log */
#define LOG_HEADER_FORMAT_ENCRYPTED (1U<<31) #define LOG_HEADER_FORMAT_ENCRYPTED (1U<<31)
...@@ -556,7 +553,7 @@ struct log_t{ ...@@ -556,7 +553,7 @@ struct log_t{
struct files { struct files {
/** number of files */ /** number of files */
ulint n_files; ulint n_files;
/** format of the redo log: e.g., LOG_HEADER_FORMAT_CURRENT */ /** format of the redo log: e.g., LOG_HEADER_FORMAT_10_3 */
ulint format; ulint format;
/** individual log file size in bytes, including the header */ /** individual log file size in bytes, including the header */
lsn_t file_size; lsn_t file_size;
...@@ -712,11 +709,34 @@ struct log_t{ ...@@ -712,11 +709,34 @@ struct log_t{
/** @return whether the redo log is encrypted */ /** @return whether the redo log is encrypted */
bool is_encrypted() const { return(log.is_encrypted()); } bool is_encrypted() const { return(log.is_encrypted()); }
bool is_initialised() { return m_initialised; } bool is_initialised() const { return m_initialised; }
/** Complete an asynchronous checkpoint write. */ /** Complete an asynchronous checkpoint write. */
void complete_checkpoint(); void complete_checkpoint();
/** @return the log block header + trailer size */
unsigned framing_size() const
{
return log.format == LOG_HEADER_FORMAT_ENC_10_4
? LOG_BLOCK_HDR_SIZE + LOG_BLOCK_KEY + LOG_BLOCK_CHECKSUM
: LOG_BLOCK_HDR_SIZE + LOG_BLOCK_CHECKSUM;
}
/** @return the log block payload size */
unsigned payload_size() const
{
return log.format == LOG_HEADER_FORMAT_ENC_10_4
? OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE - LOG_BLOCK_CHECKSUM -
LOG_BLOCK_KEY
: OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE - LOG_BLOCK_CHECKSUM;
}
/** @return the log block trailer offset */
unsigned trailer_offset() const
{
return log.format == LOG_HEADER_FORMAT_ENC_10_4
? OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_CHECKSUM - LOG_BLOCK_KEY
: OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_CHECKSUM;
}
/** Initialise the redo log subsystem. */ /** Initialise the redo log subsystem. */
void create(); void create();
......
...@@ -215,7 +215,7 @@ log_block_calc_checksum_format_0( ...@@ -215,7 +215,7 @@ log_block_calc_checksum_format_0(
sum = 1; sum = 1;
sh = 0; sh = 0;
for (i = 0; i < OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE; i++) { for (i = 0; i < OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_CHECKSUM; i++) {
ulint b = (ulint) block[i]; ulint b = (ulint) block[i];
sum &= 0x7FFFFFFFUL; sum &= 0x7FFFFFFFUL;
sum += b; sum += b;
...@@ -237,7 +237,7 @@ ulint ...@@ -237,7 +237,7 @@ ulint
log_block_calc_checksum_crc32( log_block_calc_checksum_crc32(
const byte* block) const byte* block)
{ {
return(ut_crc32(block, OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE)); return ut_crc32(block, OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_CHECKSUM);
} }
/** Calculates the checksum for a log block using the "no-op" algorithm. /** Calculates the checksum for a log block using the "no-op" algorithm.
...@@ -338,7 +338,7 @@ log_reserve_and_write_fast( ...@@ -338,7 +338,7 @@ log_reserve_and_write_fast(
#endif /* UNIV_LOG_LSN_DEBUG */ #endif /* UNIV_LOG_LSN_DEBUG */
+ log_sys.buf_free % OS_FILE_LOG_BLOCK_SIZE; + log_sys.buf_free % OS_FILE_LOG_BLOCK_SIZE;
if (data_len >= OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) { if (data_len >= log_sys.trailer_offset()) {
/* The string does not fit within the current log block /* The string does not fit within the current log block
or the log block would become full */ or the log block would become full */
......
...@@ -82,19 +82,62 @@ log_block_get_start_lsn( ...@@ -82,19 +82,62 @@ log_block_get_start_lsn(
return start_lsn; return start_lsn;
} }
/** Generate crypt key from crypt msg.
@param[in,out] info encryption key
@param[in] upgrade whether to use the key in MariaDB 10.1 format
@return whether the operation was successful */
static bool init_crypt_key(crypt_info_t* info, bool upgrade = false)
{
byte mysqld_key[MY_AES_MAX_KEY_LENGTH];
uint keylen = sizeof mysqld_key;
compile_time_assert(16 == sizeof info->crypt_key);
if (uint rc = encryption_key_get(LOG_DEFAULT_ENCRYPTION_KEY,
info->key_version, mysqld_key,
&keylen)) {
ib::error()
<< "Obtaining redo log encryption key version "
<< info->key_version << " failed (" << rc
<< "). Maybe the key or the required encryption "
"key management plugin was not found.";
return false;
}
if (upgrade) {
while (keylen < sizeof mysqld_key) {
mysqld_key[keylen++] = 0;
}
}
uint dst_len;
int err= my_aes_crypt(MY_AES_ECB,
ENCRYPTION_FLAG_NOPAD | ENCRYPTION_FLAG_ENCRYPT,
info->crypt_msg.bytes, sizeof info->crypt_msg,
info->crypt_key.bytes, &dst_len,
mysqld_key, keylen, NULL, 0);
if (err != MY_AES_OK || dst_len != MY_AES_BLOCK_SIZE) {
ib::error() << "Getting redo log crypto key failed: err = "
<< err << ", len = " << dst_len;
return false;
}
return true;
}
/** Encrypt or decrypt log blocks. /** Encrypt or decrypt log blocks.
@param[in,out] buf log blocks to encrypt or decrypt @param[in,out] buf log blocks to encrypt or decrypt
@param[in] lsn log sequence number of the start of the buffer @param[in] lsn log sequence number of the start of the buffer
@param[in] size size of the buffer, in bytes @param[in] size size of the buffer, in bytes
@param[in] decrypt whether to decrypt instead of encrypting */ @param[in] op whether to decrypt, encrypt, or rotate key and encrypt
UNIV_INTERN @return whether the operation succeeded (encrypt always does) */
void bool log_crypt(byte* buf, lsn_t lsn, ulint size, log_crypt_t op)
log_crypt(byte* buf, lsn_t lsn, ulint size, bool decrypt)
{ {
ut_ad(size % OS_FILE_LOG_BLOCK_SIZE == 0); ut_ad(size % OS_FILE_LOG_BLOCK_SIZE == 0);
ut_ad(ulint(buf) % OS_FILE_LOG_BLOCK_SIZE == 0);
ut_a(info.key_version); ut_a(info.key_version);
uint dst_len;
uint32_t aes_ctr_iv[MY_AES_BLOCK_SIZE / sizeof(uint32_t)]; uint32_t aes_ctr_iv[MY_AES_BLOCK_SIZE / sizeof(uint32_t)];
compile_time_assert(sizeof(uint32_t) == 4); compile_time_assert(sizeof(uint32_t) == 4);
...@@ -103,7 +146,8 @@ log_crypt(byte* buf, lsn_t lsn, ulint size, bool decrypt) ...@@ -103,7 +146,8 @@ log_crypt(byte* buf, lsn_t lsn, ulint size, bool decrypt)
for (const byte* const end = buf + size; buf != end; for (const byte* const end = buf + size; buf != end;
buf += OS_FILE_LOG_BLOCK_SIZE, lsn += OS_FILE_LOG_BLOCK_SIZE) { buf += OS_FILE_LOG_BLOCK_SIZE, lsn += OS_FILE_LOG_BLOCK_SIZE) {
uint32_t dst[(OS_FILE_LOG_BLOCK_SIZE - LOG_CRYPT_HDR_SIZE) uint32_t dst[(OS_FILE_LOG_BLOCK_SIZE - LOG_CRYPT_HDR_SIZE
- LOG_BLOCK_CHECKSUM)
/ sizeof(uint32_t)]; / sizeof(uint32_t)];
/* The log block number is not encrypted. */ /* The log block number is not encrypted. */
...@@ -123,64 +167,61 @@ log_crypt(byte* buf, lsn_t lsn, ulint size, bool decrypt) ...@@ -123,64 +167,61 @@ log_crypt(byte* buf, lsn_t lsn, ulint size, bool decrypt)
ut_ad(log_block_get_start_lsn(lsn, ut_ad(log_block_get_start_lsn(lsn,
log_block_get_hdr_no(buf)) log_block_get_hdr_no(buf))
== lsn); == lsn);
byte* key_ver = &buf[OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_KEY
- LOG_BLOCK_CHECKSUM];
const uint dst_size
= log_sys.log.format == LOG_HEADER_FORMAT_ENC_10_4
? sizeof dst - LOG_BLOCK_KEY
: sizeof dst;
if (log_sys.log.format == LOG_HEADER_FORMAT_ENC_10_4) {
const uint key_version = info.key_version;
switch (op) {
case LOG_ENCRYPT_ROTATE_KEY:
info.key_version
= encryption_key_get_latest_version(
LOG_DEFAULT_ENCRYPTION_KEY);
if (key_version != info.key_version
&& !init_crypt_key(&info)) {
info.key_version = key_version;
}
/* fall through */
case LOG_ENCRYPT:
mach_write_to_4(key_ver, info.key_version);
break;
case LOG_DECRYPT:
info.key_version = mach_read_from_4(key_ver);
if (key_version != info.key_version
&& !init_crypt_key(&info)) {
return false;
}
}
#ifndef DBUG_OFF
if (key_version != info.key_version) {
DBUG_PRINT("ib_log", ("key_version: %x -> %x",
key_version,
info.key_version));
}
#endif /* !DBUG_OFF */
}
ut_ad(LOG_CRYPT_HDR_SIZE + dst_size
== log_sys.trailer_offset());
uint dst_len;
int rc = encryption_crypt( int rc = encryption_crypt(
buf + LOG_CRYPT_HDR_SIZE, sizeof dst, buf + LOG_CRYPT_HDR_SIZE, dst_size,
reinterpret_cast<byte*>(dst), &dst_len, reinterpret_cast<byte*>(dst), &dst_len,
const_cast<byte*>(info.crypt_key.bytes), const_cast<byte*>(info.crypt_key.bytes),
sizeof info.crypt_key, sizeof info.crypt_key,
reinterpret_cast<byte*>(aes_ctr_iv), sizeof aes_ctr_iv, reinterpret_cast<byte*>(aes_ctr_iv), sizeof aes_ctr_iv,
decrypt op == LOG_DECRYPT
? ENCRYPTION_FLAG_DECRYPT | ENCRYPTION_FLAG_NOPAD ? ENCRYPTION_FLAG_DECRYPT | ENCRYPTION_FLAG_NOPAD
: ENCRYPTION_FLAG_ENCRYPT | ENCRYPTION_FLAG_NOPAD, : ENCRYPTION_FLAG_ENCRYPT | ENCRYPTION_FLAG_NOPAD,
LOG_DEFAULT_ENCRYPTION_KEY, LOG_DEFAULT_ENCRYPTION_KEY,
info.key_version); info.key_version);
ut_a(rc == MY_AES_OK); ut_a(rc == MY_AES_OK);
ut_a(dst_len == sizeof dst); ut_a(dst_len == dst_size);
memcpy(buf + LOG_CRYPT_HDR_SIZE, dst, sizeof dst); memcpy(buf + LOG_CRYPT_HDR_SIZE, dst, dst_size);
}
}
/** Generate crypt key from crypt msg.
@param[in,out] info encryption key
@param[in] upgrade whether to use the key in MariaDB 10.1 format
@return whether the operation was successful */
static bool init_crypt_key(crypt_info_t* info, bool upgrade = false)
{
byte mysqld_key[MY_AES_MAX_KEY_LENGTH];
uint keylen = sizeof mysqld_key;
compile_time_assert(16 == sizeof info->crypt_key);
if (uint rc = encryption_key_get(LOG_DEFAULT_ENCRYPTION_KEY,
info->key_version, mysqld_key,
&keylen)) {
ib::error()
<< "Obtaining redo log encryption key version "
<< info->key_version << " failed (" << rc
<< "). Maybe the key or the required encryption "
"key management plugin was not found.";
return false;
}
if (upgrade) {
while (keylen < sizeof mysqld_key) {
mysqld_key[keylen++] = 0;
}
}
uint dst_len;
int err= my_aes_crypt(MY_AES_ECB,
ENCRYPTION_FLAG_NOPAD | ENCRYPTION_FLAG_ENCRYPT,
info->crypt_msg.bytes, sizeof info->crypt_msg,
info->crypt_key.bytes, &dst_len,
mysqld_key, keylen, NULL, 0);
if (err != MY_AES_OK || dst_len != MY_AES_BLOCK_SIZE) {
ib::error() << "Getting redo log crypto key failed: err = "
<< err << ", len = " << dst_len;
return false;
} }
return true; return true;
......
...@@ -258,9 +258,9 @@ log_calculate_actual_len( ...@@ -258,9 +258,9 @@ log_calculate_actual_len(
{ {
ut_ad(log_mutex_own()); ut_ad(log_mutex_own());
const ulint framing_size = log_sys.framing_size();
/* actual length stored per block */ /* actual length stored per block */
const ulint len_per_blk = OS_FILE_LOG_BLOCK_SIZE const ulint len_per_blk = OS_FILE_LOG_BLOCK_SIZE - framing_size;
- (LOG_BLOCK_HDR_SIZE + LOG_BLOCK_TRL_SIZE);
/* actual data length in last block already written */ /* actual data length in last block already written */
ulint extra_len = (log_sys.buf_free % OS_FILE_LOG_BLOCK_SIZE); ulint extra_len = (log_sys.buf_free % OS_FILE_LOG_BLOCK_SIZE);
...@@ -269,8 +269,7 @@ log_calculate_actual_len( ...@@ -269,8 +269,7 @@ log_calculate_actual_len(
extra_len -= LOG_BLOCK_HDR_SIZE; extra_len -= LOG_BLOCK_HDR_SIZE;
/* total extra length for block header and trailer */ /* total extra length for block header and trailer */
extra_len = ((len + extra_len) / len_per_blk) extra_len = ((len + extra_len) / len_per_blk) * framing_size;
* (LOG_BLOCK_HDR_SIZE + LOG_BLOCK_TRL_SIZE);
return(len + extra_len); return(len + extra_len);
} }
...@@ -402,26 +401,24 @@ log_write_low( ...@@ -402,26 +401,24 @@ log_write_low(
ulint str_len) /*!< in: string length */ ulint str_len) /*!< in: string length */
{ {
ulint len; ulint len;
ulint data_len;
byte* log_block;
ut_ad(log_mutex_own()); ut_ad(log_mutex_own());
const ulint trailer_offset = log_sys.trailer_offset();
part_loop: part_loop:
/* Calculate a part length */ /* Calculate a part length */
data_len = (log_sys.buf_free % OS_FILE_LOG_BLOCK_SIZE) + str_len; ulint data_len = (log_sys.buf_free % OS_FILE_LOG_BLOCK_SIZE) + str_len;
if (data_len <= OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) { if (data_len <= trailer_offset) {
/* The string fits within the current log block */ /* The string fits within the current log block */
len = str_len; len = str_len;
} else { } else {
data_len = OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE; data_len = trailer_offset;
len = OS_FILE_LOG_BLOCK_SIZE len = trailer_offset
- (log_sys.buf_free % OS_FILE_LOG_BLOCK_SIZE) - log_sys.buf_free % OS_FILE_LOG_BLOCK_SIZE;
- LOG_BLOCK_TRL_SIZE;
} }
memcpy(log_sys.buf + log_sys.buf_free, str, len); memcpy(log_sys.buf + log_sys.buf_free, str, len);
...@@ -429,18 +426,18 @@ log_write_low( ...@@ -429,18 +426,18 @@ log_write_low(
str_len -= len; str_len -= len;
str = str + len; str = str + len;
log_block = static_cast<byte*>( byte* log_block = static_cast<byte*>(
ut_align_down(log_sys.buf + log_sys.buf_free, ut_align_down(log_sys.buf + log_sys.buf_free,
OS_FILE_LOG_BLOCK_SIZE)); OS_FILE_LOG_BLOCK_SIZE));
log_block_set_data_len(log_block, data_len); log_block_set_data_len(log_block, data_len);
if (data_len == OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) { if (data_len == trailer_offset) {
/* This block became full */ /* This block became full */
log_block_set_data_len(log_block, OS_FILE_LOG_BLOCK_SIZE); log_block_set_data_len(log_block, OS_FILE_LOG_BLOCK_SIZE);
log_block_set_checkpoint_no(log_block, log_block_set_checkpoint_no(log_block,
log_sys.next_checkpoint_no); log_sys.next_checkpoint_no);
len += LOG_BLOCK_HDR_SIZE + LOG_BLOCK_TRL_SIZE; len += log_sys.framing_size();
log_sys.lsn += len; log_sys.lsn += len;
...@@ -668,8 +665,7 @@ void log_t::files::create(ulint n_files) ...@@ -668,8 +665,7 @@ void log_t::files::create(ulint n_files)
this->n_files= n_files; this->n_files= n_files;
format= srv_encrypt_log format= srv_encrypt_log
? LOG_HEADER_FORMAT_CURRENT | LOG_HEADER_FORMAT_ENCRYPTED ? LOG_HEADER_FORMAT_ENC_10_4 : LOG_HEADER_FORMAT_10_3;
: LOG_HEADER_FORMAT_CURRENT;
file_size= srv_log_file_size; file_size= srv_log_file_size;
state= LOG_GROUP_OK; state= LOG_GROUP_OK;
lsn= LOG_START_LSN; lsn= LOG_START_LSN;
...@@ -702,8 +698,8 @@ log_file_header_flush( ...@@ -702,8 +698,8 @@ log_file_header_flush(
ut_ad(log_write_mutex_own()); ut_ad(log_write_mutex_own());
ut_ad(!recv_no_log_write); ut_ad(!recv_no_log_write);
ut_a(nth_file < log_sys.log.n_files); ut_a(nth_file < log_sys.log.n_files);
ut_ad((log_sys.log.format & ~LOG_HEADER_FORMAT_ENCRYPTED) ut_ad(log_sys.log.format == LOG_HEADER_FORMAT_10_3
== LOG_HEADER_FORMAT_CURRENT); || log_sys.log.format == LOG_HEADER_FORMAT_ENC_10_4);
buf = log_sys.log.file_header_bufs[nth_file]; buf = log_sys.log.file_header_bufs[nth_file];
...@@ -939,11 +935,9 @@ wait and check if an already running write is covering the request. ...@@ -939,11 +935,9 @@ wait and check if an already running write is covering the request.
@param[in] lsn log sequence number that should be @param[in] lsn log sequence number that should be
included in the redo log file write included in the redo log file write
@param[in] flush_to_disk whether the written log should also @param[in] flush_to_disk whether the written log should also
be flushed to the file system */ be flushed to the file system
void @param[in] rotate_key whether to rotate the encryption key */
log_write_up_to( void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
lsn_t lsn,
bool flush_to_disk)
{ {
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
ulint loop_count = 0; ulint loop_count = 0;
...@@ -952,6 +946,7 @@ log_write_up_to( ...@@ -952,6 +946,7 @@ log_write_up_to(
lsn_t write_lsn; lsn_t write_lsn;
ut_ad(!srv_read_only_mode); ut_ad(!srv_read_only_mode);
ut_ad(!rotate_key || flush_to_disk);
if (recv_no_ibuf_operations) { if (recv_no_ibuf_operations) {
/* Recovery is running and no operations on the log files are /* Recovery is running and no operations on the log files are
...@@ -1095,7 +1090,8 @@ log_write_up_to( ...@@ -1095,7 +1090,8 @@ log_write_up_to(
if (log_sys.is_encrypted()) { if (log_sys.is_encrypted()) {
log_crypt(write_buf + area_start, log_sys.write_lsn, log_crypt(write_buf + area_start, log_sys.write_lsn,
area_end - area_start); area_end - area_start,
rotate_key ? LOG_ENCRYPT_ROTATE_KEY : LOG_ENCRYPT);
} }
/* Do the write to the log files */ /* Do the write to the log files */
...@@ -1503,7 +1499,7 @@ log_checkpoint( ...@@ -1503,7 +1499,7 @@ log_checkpoint(
log_mutex_exit(); log_mutex_exit();
log_write_up_to(flush_lsn, true); log_write_up_to(flush_lsn, true, true);
DBUG_EXECUTE_IF( DBUG_EXECUTE_IF(
"using_wa_checkpoint_middle", "using_wa_checkpoint_middle",
...@@ -2078,13 +2074,9 @@ log_pad_current_log_block(void) ...@@ -2078,13 +2074,9 @@ log_pad_current_log_block(void)
/* We retrieve lsn only because otherwise gcc crashed on HP-UX */ /* We retrieve lsn only because otherwise gcc crashed on HP-UX */
lsn = log_reserve_and_open(OS_FILE_LOG_BLOCK_SIZE); lsn = log_reserve_and_open(OS_FILE_LOG_BLOCK_SIZE);
pad_length = OS_FILE_LOG_BLOCK_SIZE pad_length = log_sys.trailer_offset()
- (log_sys.buf_free % OS_FILE_LOG_BLOCK_SIZE) - log_sys.buf_free % OS_FILE_LOG_BLOCK_SIZE;
- LOG_BLOCK_TRL_SIZE; if (pad_length == log_sys.payload_size()) {
if (pad_length
== (OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE
- LOG_BLOCK_TRL_SIZE)) {
pad_length = 0; pad_length = 0;
} }
......
...@@ -706,14 +706,17 @@ bool log_t::files::read_log_seg(lsn_t* start_lsn, lsn_t end_lsn) ...@@ -706,14 +706,17 @@ bool log_t::files::read_log_seg(lsn_t* start_lsn, lsn_t end_lsn)
<< log_block_get_checkpoint_no(buf) << log_block_get_checkpoint_no(buf)
<< " expected: " << crc << " expected: " << crc
<< " found: " << cksum; << " found: " << cksum;
fail:
end_lsn = *start_lsn; end_lsn = *start_lsn;
success = false; success = false;
break; break;
} }
if (is_encrypted()) { if (is_encrypted()
log_crypt(buf, *start_lsn, && !log_crypt(buf, *start_lsn,
OS_FILE_LOG_BLOCK_SIZE, true); OS_FILE_LOG_BLOCK_SIZE,
LOG_DECRYPT)) {
goto fail;
} }
} }
} }
...@@ -953,8 +956,9 @@ recv_find_max_checkpoint(ulint* max_field) ...@@ -953,8 +956,9 @@ recv_find_max_checkpoint(ulint* max_field)
return(recv_find_max_checkpoint_0(max_field)); return(recv_find_max_checkpoint_0(max_field));
case LOG_HEADER_FORMAT_10_2: case LOG_HEADER_FORMAT_10_2:
case LOG_HEADER_FORMAT_10_2 | LOG_HEADER_FORMAT_ENCRYPTED: case LOG_HEADER_FORMAT_10_2 | LOG_HEADER_FORMAT_ENCRYPTED:
case LOG_HEADER_FORMAT_CURRENT: case LOG_HEADER_FORMAT_10_3:
case LOG_HEADER_FORMAT_CURRENT | LOG_HEADER_FORMAT_ENCRYPTED: case LOG_HEADER_FORMAT_10_3 | LOG_HEADER_FORMAT_ENCRYPTED:
case LOG_HEADER_FORMAT_ENC_10_4:
break; break;
default: default:
ib::error() << "Unsupported redo log format." ib::error() << "Unsupported redo log format."
...@@ -2173,17 +2177,12 @@ recv_calc_lsn_on_data_add( ...@@ -2173,17 +2177,12 @@ recv_calc_lsn_on_data_add(
ib_uint64_t len) /*!< in: this many bytes of data is ib_uint64_t len) /*!< in: this many bytes of data is
added, log block headers not included */ added, log block headers not included */
{ {
ulint frag_len; unsigned frag_len = (lsn % OS_FILE_LOG_BLOCK_SIZE) - LOG_BLOCK_HDR_SIZE;
ib_uint64_t lsn_len; unsigned payload_size = log_sys.payload_size();
ut_ad(frag_len < payload_size);
frag_len = (lsn % OS_FILE_LOG_BLOCK_SIZE) - LOG_BLOCK_HDR_SIZE; lsn_t lsn_len = len;
ut_ad(frag_len < OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE lsn_len += (lsn_len + frag_len) / payload_size
- LOG_BLOCK_TRL_SIZE); * (OS_FILE_LOG_BLOCK_SIZE - payload_size);
lsn_len = len;
lsn_len += (lsn_len + frag_len)
/ (OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE
- LOG_BLOCK_TRL_SIZE)
* (LOG_BLOCK_HDR_SIZE + LOG_BLOCK_TRL_SIZE);
return(lsn + lsn_len); return(lsn + lsn_len);
} }
...@@ -2645,11 +2644,7 @@ bool recv_sys_add_to_parsing_buf(const byte* log_block, lsn_t scanned_lsn) ...@@ -2645,11 +2644,7 @@ bool recv_sys_add_to_parsing_buf(const byte* log_block, lsn_t scanned_lsn)
start_offset = LOG_BLOCK_HDR_SIZE; start_offset = LOG_BLOCK_HDR_SIZE;
} }
end_offset = data_len; end_offset = std::min<ulint>(data_len, log_sys.trailer_offset());
if (end_offset > OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) {
end_offset = OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE;
}
ut_ad(start_offset <= end_offset); ut_ad(start_offset <= end_offset);
......
...@@ -1346,9 +1346,9 @@ srv_prepare_to_delete_redo_log_files( ...@@ -1346,9 +1346,9 @@ srv_prepare_to_delete_redo_log_files(
{ {
ib::info info; ib::info info;
if (srv_log_file_size == 0 if (srv_log_file_size == 0
|| (log_sys.log.format || (log_sys.log.format != LOG_HEADER_FORMAT_10_3
& ~LOG_HEADER_FORMAT_ENCRYPTED) && log_sys.log.format
!= LOG_HEADER_FORMAT_CURRENT) { != LOG_HEADER_FORMAT_ENC_10_4)) {
info << "Upgrading redo log: "; info << "Upgrading redo log: ";
} else if (n_files != srv_n_log_files } else if (n_files != srv_n_log_files
|| srv_log_file_size || srv_log_file_size
...@@ -2174,9 +2174,8 @@ dberr_t srv_start(bool create_new_db) ...@@ -2174,9 +2174,8 @@ dberr_t srv_start(bool create_new_db)
&& srv_n_log_files_found == srv_n_log_files && srv_n_log_files_found == srv_n_log_files
&& log_sys.log.format && log_sys.log.format
== (srv_encrypt_log == (srv_encrypt_log
? LOG_HEADER_FORMAT_CURRENT ? LOG_HEADER_FORMAT_ENC_10_4
| LOG_HEADER_FORMAT_ENCRYPTED : LOG_HEADER_FORMAT_10_3)) {
: LOG_HEADER_FORMAT_CURRENT)) {
/* No need to upgrade or resize the redo log. */ /* No need to upgrade or resize the redo log. */
} else { } else {
/* Prepare to delete the old redo log files */ /* Prepare to delete the old redo log files */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment