Commit 7537854b authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-34705: Binlog in Engine: Pre-allocate binlog tablespaces in background thread

Signed-off-by: default avatarKristian Nielsen <knielsen@knielsen-hq.org>
parent 8a1e1f61
...@@ -1530,6 +1530,7 @@ struct handlerton ...@@ -1530,6 +1530,7 @@ struct handlerton
void *optimizer_costs; /* Costs are stored here */ void *optimizer_costs; /* Costs are stored here */
/* Optional implementation of binlog in the engine. */ /* Optional implementation of binlog in the engine. */
bool (*binlog_init)(size_t binlog_size);
bool (*binlog_write_direct)(IO_CACHE *cache, size_t main_size); bool (*binlog_write_direct)(IO_CACHE *cache, size_t main_size);
handler_binlog_reader * (*get_binlog_reader)(); handler_binlog_reader * (*get_binlog_reader)();
......
...@@ -5470,6 +5470,13 @@ static int init_server_components() ...@@ -5470,6 +5470,13 @@ static int init_server_components()
if (opt_bin_log) if (opt_bin_log)
{ {
int error; int error;
if (opt_binlog_engine_hton)
{
if ((*opt_binlog_engine_hton->binlog_init)((size_t)max_binlog_size))
error= 1;
}
if (true) /* ToDo: `else` branch (don't open legacy binlog if using engine implementation). */
{
mysql_mutex_t *log_lock= mysql_bin_log.get_log_lock(); mysql_mutex_t *log_lock= mysql_bin_log.get_log_lock();
mysql_mutex_lock(log_lock); mysql_mutex_lock(log_lock);
error= mysql_bin_log.open(opt_bin_logname, 0, 0, error= mysql_bin_log.open(opt_bin_logname, 0, 0,
...@@ -5478,6 +5485,7 @@ static int init_server_components() ...@@ -5478,6 +5485,7 @@ static int init_server_components()
if (unlikely(error)) if (unlikely(error))
unireg_abort(1); unireg_abort(1);
} }
}
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
binlog_space_limit= internal_binlog_space_limit; binlog_space_limit= internal_binlog_space_limit;
......
...@@ -24,6 +24,8 @@ File space management ...@@ -24,6 +24,8 @@ File space management
Created 11/29/1995 Heikki Tuuri Created 11/29/1995 Heikki Tuuri
***********************************************************************/ ***********************************************************************/
#include <thread>
#include "fsp0fsp.h" #include "fsp0fsp.h"
#include "buf0buf.h" #include "buf0buf.h"
#include "buf0flu.h" #include "buf0flu.h"
...@@ -3768,25 +3770,52 @@ void fsp_shrink_temp_space() ...@@ -3768,25 +3770,52 @@ void fsp_shrink_temp_space()
} }
static uint32_t binlog_size_in_pages;
fil_space_t* binlog_space;
buf_block_t *binlog_cur_block; buf_block_t *binlog_cur_block;
uint32_t binlog_cur_page_no; uint32_t binlog_cur_page_no;
/* ToDo: This needs to be initialized to the current point in any existing binlog. */ uint32_t binlog_cur_page_offset;
uint32_t binlog_cur_page_offset= FIL_PAGE_DATA; /*
Mutex protecting active_binlog_file_no and active_binlog_space.
*/
mysql_mutex_t active_binlog_mutex;
pthread_cond_t active_binlog_cond;
static std::thread binlog_prealloc_thr_obj;
static bool prealloc_thread_end= false;
/* The currently being written binlog tablespace. */
/* ToDo: This needs to discover existing binlogs and start from the next free index. */ /* ToDo: This needs to discover existing binlogs and start from the next free index. */
std::atomic<uint64_t> binlog_file_no; std::atomic<uint64_t> active_binlog_file_no;
fil_space_t* active_binlog_space;
/*
The first binlog tablespace that is still open.
This can be equal to active_binlog_file_no, if the tablespace prior to the
active one has been fully flushed out to disk and closed.
Or it can be one less, if the prior tablespace is still being written out and
closed.
*/
std::atomic<uint64_t> first_open_binlog_file_no;
/*
The most recent created and open tablespace.
This can be equal to active_binlog_file_no+1, if the next tablespace to be
used has already been pre-allocated and opened.
Or it can be the same as active_binlog_file_no, if the pre-allocation of the
next tablespace is still pending.
*/
uint64_t last_created_binlog_file_no;
fil_space_t *last_created_binlog_space;
/* /*
Point at which it is guaranteed that all data has been written out to the Point at which it is guaranteed that all data has been written out to the
binlog file (on the OS level; not necessarily fsync()'ed yet). binlog file (on the OS level; not necessarily fsync()'ed yet).
Stores the most recent two values, each corresponding to binlog_file_no&1. Stores the most recent two values, each corresponding to active_binlog_file_no&1.
*/ */
/* ToDo: maintain this offset value as up to where data has been written out to the OS. Needs to be binary-searched in current binlog file at server restart; which is also a reason why it might not be a multiple of the page size. */ /* ToDo: maintain this offset value as up to where data has been written out to the OS. Needs to be binary-searched in current binlog file at server restart; which is also a reason why it might not be a multiple of the page size. */
std::atomic<uint64_t> binlog_cur_written_offset[2]; std::atomic<uint64_t> binlog_cur_written_offset[2];
/* Offset of last valid byte of data in most recent 2 binlog files. */ /* Offset of last valid byte of data in most recent 2 binlog files. */
std::atomic<uint64_t> binlog_cur_end_offset[2]; std::atomic<uint64_t> binlog_cur_end_offset[2];
static void fsp_binlog_prealloc_thread();
/* '.' + '/' + "binlog" + '-' + (<=20 digits) + '.' + "ibb" + '\0'. */ /* '.' + '/' + "binlog" + '-' + (<=20 digits) + '.' + "ibb" + '\0'. */
#define BINLOG_NAME_LEN 1 + 1 + 6 + 1 + 20 + 1 + 3 + 1 #define BINLOG_NAME_LEN 1 + 1 + 6 + 1 + 20 + 1 + 3 + 1
...@@ -3832,14 +3861,82 @@ fsp_binlog_tablespace_close(uint64_t file_no) ...@@ -3832,14 +3861,82 @@ fsp_binlog_tablespace_close(uint64_t file_no)
} }
/*
Initialize the InnoDB implementation of binlog.
Note that we do not create or open any binlog tablespaces here.
This is only done if InnoDB binlog is enabled on the server leve.
*/
void
fsp_binlog_init()
{
mysql_mutex_init(fsp_active_binlog_mutex_key, &active_binlog_mutex, nullptr);
pthread_cond_init(&active_binlog_cond, nullptr);
}
/*
Open the InnoDB binlog implementation.
This is called from server binlog layer if the user configured the binlog to
use the innodb implementation (with --binlog-storage-engine=innodb).
*/
bool
innodb_binlog_init(size_t binlog_size)
{
uint64_t pages= binlog_size >> srv_page_size_shift;
if (UNIV_LIKELY(pages > (uint64_t)UINT32_MAX)) {
pages= UINT32_MAX;
ib::warn() << "Requested max_binlog_size is larger than the maximum " <<
"InnoDB tablespace size, truncated to " <<
(pages << srv_page_size_shift) << ".";
} else if (pages < 2) { /* Minimum one data page and one index page. */
pages= 2;
ib::warn() << "Requested max_binlog_size is smaller than the minimum " <<
"size supported by InnoDB, truncated to " <<
(pages << srv_page_size_shift) << ".";
}
binlog_size_in_pages= (uint32_t)pages;
first_open_binlog_file_no.store(~(uint64_t)0, std::memory_order_relaxed);
last_created_binlog_file_no= ~(uint64_t)0;
active_binlog_file_no.store(~(uint64_t)0, std::memory_order_release);
active_binlog_space= nullptr;
/* ToDo: This needs to be initialized to the current point in any existing binlog. */
binlog_cur_page_offset= FIL_PAGE_DATA;
/* Start pre-allocating new binlog files. */
// ToDo: Eventually, open existing binlog files and find where to continue writing.
binlog_prealloc_thr_obj= std::thread{fsp_binlog_prealloc_thread};
mysql_mutex_lock(&active_binlog_mutex);
while (last_created_binlog_file_no == ~(uint64_t)0) {
/* Wait for the first binlog file to be available. */
my_cond_wait(&active_binlog_cond, &active_binlog_mutex.m_mutex);
}
mysql_mutex_unlock(&active_binlog_mutex);
ut_ad(active_binlog_file_no == 0);
return false;
}
void fsp_binlog_close() void fsp_binlog_close()
{ {
uint64_t file_no= binlog_file_no.load(std::memory_order_relaxed); if (binlog_prealloc_thr_obj.joinable()) {
// ToDo: this check needs to find another way to determine how many tablespaces are still open, once we no longer always start from 1. mysql_mutex_lock(&active_binlog_mutex);
if (file_no >= 2) prealloc_thread_end= true;
fsp_binlog_tablespace_close(file_no - 1); pthread_cond_signal(&active_binlog_cond);
if (file_no >= 1) mysql_mutex_unlock(&active_binlog_mutex);
binlog_prealloc_thr_obj.join();
}
uint64_t file_no= first_open_binlog_file_no.load(std::memory_order_relaxed);
if (file_no != ~(uint64_t)0) {
if (file_no <= last_created_binlog_file_no) {
fsp_binlog_tablespace_close(file_no); fsp_binlog_tablespace_close(file_no);
if (file_no + 1 <= last_created_binlog_file_no) {
fsp_binlog_tablespace_close(file_no + 1);
}
}
}
/* /*
ToDo: This doesn't seem to free all memory. I'm still getting leaks in eg. --valgrind. Find out why and fix. Example: ToDo: This doesn't seem to free all memory. I'm still getting leaks in eg. --valgrind. Find out why and fix. Example:
==3464576== at 0x48407B4: malloc (vg_replace_malloc.c:381) ==3464576== at 0x48407B4: malloc (vg_replace_malloc.c:381)
...@@ -3848,18 +3945,22 @@ void fsp_binlog_close() ...@@ -3848,18 +3945,22 @@ void fsp_binlog_close()
==3464576== by 0x1558445: fsp_binlog_tablespace_create(unsigned long) (fsp0fsp.cc:3900) ==3464576== by 0x1558445: fsp_binlog_tablespace_create(unsigned long) (fsp0fsp.cc:3900)
==3464576== by 0x1558C70: fsp_binlog_write_cache(st_io_cache*, unsigned long, mtr_t*) (fsp0fsp.cc:4013) ==3464576== by 0x1558C70: fsp_binlog_write_cache(st_io_cache*, unsigned long, mtr_t*) (fsp0fsp.cc:4013)
*/ */
pthread_cond_destroy(&active_binlog_cond);
mysql_mutex_destroy(&active_binlog_mutex);
} }
/** Create a binlog tablespace file /** Create a binlog tablespace file
@param[in] file_no Index of the binlog tablespace @param[in] file_no Index of the binlog tablespace
@param[out] new_space The newly created tablespace
@return DB_SUCCESS or error code */ @return DB_SUCCESS or error code */
dberr_t fsp_binlog_tablespace_create(uint64_t file_no) dberr_t fsp_binlog_tablespace_create(uint64_t file_no, fil_space_t **new_space)
{ {
pfs_os_file_t fh; pfs_os_file_t fh;
bool ret; bool ret;
uint32_t size= (1<<20) >> srv_page_size_shift /* ToDo --max-binlog-size */; *new_space= nullptr;
uint32_t size= binlog_size_in_pages;
if(srv_read_only_mode) if(srv_read_only_mode)
return DB_ERROR; return DB_ERROR;
...@@ -3896,7 +3997,7 @@ dberr_t fsp_binlog_tablespace_create(uint64_t file_no) ...@@ -3896,7 +3997,7 @@ dberr_t fsp_binlog_tablespace_create(uint64_t file_no)
mysql_mutex_lock(&fil_system.mutex); mysql_mutex_lock(&fil_system.mutex);
/* ToDo: Need to ensure file (N-2) is no longer active before creating (N). */ /* ToDo: Need to ensure file (N-2) is no longer active before creating (N). */
uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (file_no & 1); uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (file_no & 1);
if (!(binlog_space= fil_space_t::create(space_id, if (!(*new_space= fil_space_t::create(space_id,
( FSP_FLAGS_FCRC32_MASK_MARKER | ( FSP_FLAGS_FCRC32_MASK_MARKER |
FSP_FLAGS_FCRC32_PAGE_SSIZE()), FSP_FLAGS_FCRC32_PAGE_SSIZE()),
FIL_TYPE_TABLESPACE, crypt_data, FIL_TYPE_TABLESPACE, crypt_data,
...@@ -3905,19 +4006,87 @@ dberr_t fsp_binlog_tablespace_create(uint64_t file_no) ...@@ -3905,19 +4006,87 @@ dberr_t fsp_binlog_tablespace_create(uint64_t file_no)
return DB_ERROR; return DB_ERROR;
} }
fil_node_t* node = binlog_space->add(name, fh, size, false, true); fil_node_t* node = (*new_space)->add(name, fh, size, false, true);
IF_WIN(node->find_metadata(), node->find_metadata(fh, true)); IF_WIN(node->find_metadata(), node->find_metadata(fh, true));
mysql_mutex_unlock(&fil_system.mutex); mysql_mutex_unlock(&fil_system.mutex);
binlog_cur_page_no= 0;
binlog_cur_page_offset= FIL_PAGE_DATA;
return DB_SUCCESS; return DB_SUCCESS;
} }
/*
Background thread to close old binlog tablespaces and pre-allocate new ones.
*/
static void
fsp_binlog_prealloc_thread()
{
mysql_mutex_lock(&active_binlog_mutex);
while (1)
{
uint64_t active= active_binlog_file_no.load(std::memory_order_relaxed);
uint64_t first_open= first_open_binlog_file_no.load(std::memory_order_relaxed);
/* Pre-allocate the next tablespace (if not done already). */
uint64_t last_created= last_created_binlog_file_no;
if (last_created <= active && last_created <= first_open) {
fil_space_t *new_space;
ut_ad(last_created == active);
ut_ad(last_created == first_open || first_open == ~(uint64_t)0);
/*
Note: `last_created` is initialized to ~0, so incrementing it here
makes us start from binlog file 0.
*/
++last_created;
mysql_mutex_unlock(&active_binlog_mutex);
dberr_t res2= fsp_binlog_tablespace_create(last_created, &new_space);
mysql_mutex_lock(&active_binlog_mutex);
ut_a(res2 == DB_SUCCESS /* ToDo: Error handling. */);
ut_a(new_space);
last_created_binlog_file_no= last_created;
last_created_binlog_space= new_space;
/* If we created the initial tablespace file, make it the active one. */
ut_ad(active < ~(uint64_t)0 || last_created == 0);
if (active == ~(uint64_t)0) {
active_binlog_file_no.store(last_created, std::memory_order_relaxed);
active_binlog_space= last_created_binlog_space;
}
if (first_open == ~(uint64_t)0)
first_open_binlog_file_no.store(first_open= last_created,
std::memory_order_relaxed);
pthread_cond_signal(&active_binlog_cond);
continue; /* Re-start loop after releasing/reacquiring mutex. */
}
/*
Flush out to disk and close any binlog tablespace that has been
completely written.
*/
if (first_open < active) {
ut_ad(first_open == active - 1);
mysql_mutex_unlock(&active_binlog_mutex);
fsp_binlog_tablespace_close(active - 1);
mysql_mutex_lock(&active_binlog_mutex);
first_open_binlog_file_no.store(first_open + 1, std::memory_order_relaxed);
continue; /* Re-start loop after releasing/reacquiring mutex. */
}
/* Exit thread at server shutdown. */
if (prealloc_thread_end)
break;
my_cond_wait(&active_binlog_cond, &active_binlog_mutex.m_mutex);
}
mysql_mutex_unlock(&active_binlog_mutex);
}
void fsp_binlog_write_start(uint32_t page_no, void fsp_binlog_write_start(uint32_t page_no,
const uchar *data, uint32_t len, mtr_t *mtr) const uchar *data, uint32_t len, mtr_t *mtr)
{ {
buf_block_t *block= fsp_page_create(binlog_space, page_no, mtr); buf_block_t *block= fsp_page_create(active_binlog_space, page_no, mtr);
mtr->memcpy<mtr_t::MAYBE_NOP>(*block, FIL_PAGE_DATA + block->page.frame, mtr->memcpy<mtr_t::MAYBE_NOP>(*block, FIL_PAGE_DATA + block->page.frame,
data, len); data, len);
binlog_cur_block= block; binlog_cur_block= block;
...@@ -3928,7 +4097,7 @@ void fsp_binlog_write_offset(uint32_t page_no, uint32_t offset, ...@@ -3928,7 +4097,7 @@ void fsp_binlog_write_offset(uint32_t page_no, uint32_t offset,
{ {
dberr_t err; dberr_t err;
/* ToDo: Is RW_SX_LATCH appropriate here? */ /* ToDo: Is RW_SX_LATCH appropriate here? */
buf_block_t *block= buf_page_get_gen(page_id_t{binlog_space->id, page_no}, buf_block_t *block= buf_page_get_gen(page_id_t{active_binlog_space->id, page_no},
0, RW_SX_LATCH, binlog_cur_block, 0, RW_SX_LATCH, binlog_cur_block,
BUF_GET, mtr, &err); BUF_GET, mtr, &err);
ut_a(err == DB_SUCCESS); ut_a(err == DB_SUCCESS);
...@@ -3966,14 +4135,13 @@ void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr) ...@@ -3966,14 +4135,13 @@ void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr)
{ {
uint32_t page_size= (uint32_t)srv_page_size; uint32_t page_size= (uint32_t)srv_page_size;
uint32_t page_size_shift= srv_page_size_shift; uint32_t page_size_shift= srv_page_size_shift;
fil_space_t *space= binlog_space; fil_space_t *space= active_binlog_space;
const uint32_t page_end= page_size - FIL_PAGE_DATA_END; const uint32_t page_end= page_size - FIL_PAGE_DATA_END;
uint32_t page_no= binlog_cur_page_no; uint32_t page_no= binlog_cur_page_no;
uint32_t page_offset= binlog_cur_page_offset; uint32_t page_offset= binlog_cur_page_offset;
/* ToDo: What is the lifetime of what's pointed to by binlog_cur_block, is there some locking needed around it or something? */ /* ToDo: What is the lifetime of what's pointed to by binlog_cur_block, is there some locking needed around it or something? */
buf_block_t *block= binlog_cur_block; buf_block_t *block= binlog_cur_block;
uint64_t file_no= binlog_file_no.load(std::memory_order_relaxed); uint64_t file_no= active_binlog_file_no.load(std::memory_order_relaxed);
uint32_t idx= file_no & 1;
uint64_t pending_prev_end_offset= 0; uint64_t pending_prev_end_offset= 0;
/* /*
...@@ -3988,13 +4156,17 @@ void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr) ...@@ -3988,13 +4156,17 @@ void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr)
size_t gtid_remain= remain - main_size; size_t gtid_remain= remain - main_size;
while (remain > 0) { while (remain > 0) {
if (page_offset == FIL_PAGE_DATA) { if (page_offset == FIL_PAGE_DATA) {
if (UNIV_UNLIKELY(!space) || page_no >= space->size) { if (UNIV_UNLIKELY(page_no >= space->size)) {
/* /*
Flush out to disk and close the old (N-2) tablespace so that we can Signal to the pre-allocation thread that this tablespace has been
re-use its tablespace id for the new one. written full, so that it can be closed and a new one pre-allocated
in its place. Then wait for a new tablespace to be pre-allocated that
we can use.
ToDo: Start this operation in the background as soon as the (N-2) The normal case is that the next tablespace is already pre-allocated
tablespace has been filled, to avoid stalling here. and available; binlog tablespace N is active while (N+1) is being
pre-allocated. Only under extreme I/O pressure should be need to
stall here.
ToDo: Handle recovery. Idea: write the current LSN at the start of ToDo: Handle recovery. Idea: write the current LSN at the start of
the binlog tablespace when we create it. At recovery, we should open the binlog tablespace when we create it. At recovery, we should open
...@@ -4005,22 +4177,21 @@ void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr) ...@@ -4005,22 +4177,21 @@ void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr)
SRV_SPACE_ID_BINLOG1. SRV_SPACE_ID_BINLOG1.
*/ */
pending_prev_end_offset= page_no << page_size_shift; pending_prev_end_offset= page_no << page_size_shift;
if (file_no >= 2) mysql_mutex_lock(&active_binlog_mutex);
fsp_binlog_tablespace_close(file_no - 1); /* ToDo: Make this wait killable?. */
/* ToDo2: Handle not stalling infinitely if the new tablespace cannot be created due to eg. I/O error. Or should we in this case loop and repeatedly retry the create? */
while (last_created_binlog_file_no <= file_no) {
my_cond_wait(&active_binlog_cond, &active_binlog_mutex.m_mutex);
}
// ToDo: assert that a single write doesn't span more than two binlog files.
++file_no; ++file_no;
idx= idx ^ 1; binlog_cur_written_offset[file_no & 1].store(0, std::memory_order_relaxed);
/* binlog_cur_end_offset[file_no & 1].store(0, std::memory_order_relaxed);
Create a new binlog tablespace. active_binlog_file_no.store(file_no, std::memory_order_release);
ToDo: pre-create the next tablespace in a background thread, avoiding active_binlog_space= space= last_created_binlog_space;
a large stall here in the common case where pre-allocation is fast pthread_cond_signal(&active_binlog_cond);
enough. mysql_mutex_unlock(&active_binlog_mutex);
*/
binlog_cur_written_offset[idx].store(0, std::memory_order_relaxed);
binlog_cur_end_offset[idx].store(0, std::memory_order_relaxed);
binlog_file_no.store(file_no, std::memory_order_release);
dberr_t res2= fsp_binlog_tablespace_create(file_no);
ut_a(res2 == DB_SUCCESS /* ToDo: Error handling. */);
space= binlog_space;
binlog_cur_page_no= page_no= 0; binlog_cur_page_no= page_no= 0;
} }
block= fsp_page_create(space, page_no, mtr); block= fsp_page_create(space, page_no, mtr);
...@@ -4028,7 +4199,7 @@ void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr) ...@@ -4028,7 +4199,7 @@ void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr)
dberr_t err; dberr_t err;
/* ToDo: Is RW_SX_LATCH appropriate here? */ /* ToDo: Is RW_SX_LATCH appropriate here? */
block= buf_page_get_gen(page_id_t{space->id, page_no}, block= buf_page_get_gen(page_id_t{space->id, page_no},
0, RW_SX_LATCH, binlog_cur_block, 0, RW_SX_LATCH, block,
BUF_GET, mtr, &err); BUF_GET, mtr, &err);
ut_a(err == DB_SUCCESS); ut_a(err == DB_SUCCESS);
} }
...@@ -4087,9 +4258,9 @@ void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr) ...@@ -4087,9 +4258,9 @@ void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr)
binlog_cur_page_no= page_no; binlog_cur_page_no= page_no;
binlog_cur_page_offset= page_offset; binlog_cur_page_offset= page_offset;
if (UNIV_UNLIKELY(pending_prev_end_offset)) if (UNIV_UNLIKELY(pending_prev_end_offset))
binlog_cur_end_offset[idx ^ 1].store(pending_prev_end_offset, binlog_cur_end_offset[(file_no-1) & 1].store(pending_prev_end_offset,
std::memory_order_relaxed); std::memory_order_relaxed);
binlog_cur_end_offset[idx].store((page_no << page_size_shift) + page_offset, binlog_cur_end_offset[file_no & 1].store((page_no << page_size_shift) + page_offset,
std::memory_order_relaxed); std::memory_order_relaxed);
} }
...@@ -4114,8 +4285,8 @@ void fsp_binlog_test(const uchar *data, uint32_t len) ...@@ -4114,8 +4285,8 @@ void fsp_binlog_test(const uchar *data, uint32_t len)
{ {
mtr_t mtr; mtr_t mtr;
mtr.start(); mtr.start();
if (!binlog_space) if (!active_binlog_space)
fsp_binlog_tablespace_create(0); fsp_binlog_tablespace_create(0, &active_binlog_space);
fsp_binlog_append(data, len, &mtr); fsp_binlog_append(data, len, &mtr);
mtr.commit(); mtr.commit();
} }
...@@ -4148,8 +4319,7 @@ ha_innodb_binlog_reader::ha_innodb_binlog_reader() ...@@ -4148,8 +4319,7 @@ ha_innodb_binlog_reader::ha_innodb_binlog_reader()
{ {
page_buf= (uchar *)my_malloc(PSI_NOT_INSTRUMENTED, srv_page_size, MYF(0)); /* ToDo: InnoDB alloc function? */ page_buf= (uchar *)my_malloc(PSI_NOT_INSTRUMENTED, srv_page_size, MYF(0)); /* ToDo: InnoDB alloc function? */
// ToDo: Need some mechanism to find where to start reading. This is just "start from 0" for early testing. // ToDo: Need some mechanism to find where to start reading. This is just "start from 0" for early testing.
// And ToDo 2: we should be starting from binlog 000000, not 000001 as we do now. cur_file_no= 0;
cur_file_no= 1;
} }
...@@ -4185,13 +4355,13 @@ int ha_innodb_binlog_reader::read_binlog_data(uchar *buf, uint32_t len) ...@@ -4185,13 +4355,13 @@ int ha_innodb_binlog_reader::read_binlog_data(uchar *buf, uint32_t len)
int res; int res;
uint64_t file_no= cur_file_no; uint64_t file_no= cur_file_no;
uint64_t offset= cur_file_offset; uint64_t offset= cur_file_offset;
uint64_t active_file_no= binlog_file_no.load(std::memory_order_acquire); uint64_t active_file_no= active_binlog_file_no.load(std::memory_order_acquire);
ut_ad(active_file_no >= file_no); if (first_open_binlog_file_no.load(std::memory_order_relaxed) > file_no + /* Temporary hack to work-around the next line ToDo: comment */ (offset >= cur_file_length)) {
if (active_file_no > file_no + 1) {
// ToDo: I think there is a bug here, if we're at the end of active_file_no-2, we will be reading directly from active_file_no-1 without checking properly if buffer pool is needed instead. */ // ToDo: I think there is a bug here, if we're at the end of active_file_no-2, we will be reading directly from active_file_no-1 without checking properly if buffer pool is needed instead. */
return read_from_file(~(uint64_t)0, buf, len); return read_from_file(~(uint64_t)0, buf, len);
} }
ut_ad(active_file_no >= file_no);
uint32_t idx= file_no & 1; uint32_t idx= file_no & 1;
uint64_t write_offset= uint64_t write_offset=
binlog_cur_written_offset[idx].load(std::memory_order_relaxed); binlog_cur_written_offset[idx].load(std::memory_order_relaxed);
...@@ -4253,7 +4423,7 @@ int ha_innodb_binlog_reader::read_binlog_data(uchar *buf, uint32_t len) ...@@ -4253,7 +4423,7 @@ int ha_innodb_binlog_reader::read_binlog_data(uchar *buf, uint32_t len)
has, then the page is invalid (it is from a newer binlog file tablespace has, then the page is invalid (it is from a newer binlog file tablespace
file_no + 2), and we should just read from the real file directly. file_no + 2), and we should just read from the real file directly.
*/ */
active_file_no= binlog_file_no.load(std::memory_order_acquire); active_file_no= active_binlog_file_no.load(std::memory_order_acquire);
if (active_file_no > file_no + 1) if (active_file_no > file_no + 1)
res= read_from_file(~(uint64_t)0, buf, len); res= read_from_file(~(uint64_t)0, buf, len);
else else
......
...@@ -527,6 +527,7 @@ mysql_pfs_key_t lock_wait_mutex_key; ...@@ -527,6 +527,7 @@ mysql_pfs_key_t lock_wait_mutex_key;
mysql_pfs_key_t trx_sys_mutex_key; mysql_pfs_key_t trx_sys_mutex_key;
mysql_pfs_key_t srv_threads_mutex_key; mysql_pfs_key_t srv_threads_mutex_key;
mysql_pfs_key_t tpool_cache_mutex_key; mysql_pfs_key_t tpool_cache_mutex_key;
mysql_pfs_key_t fsp_active_binlog_mutex_key;
/* all_innodb_mutexes array contains mutexes that are /* all_innodb_mutexes array contains mutexes that are
performance schema instrumented if "UNIV_PFS_MUTEX" performance schema instrumented if "UNIV_PFS_MUTEX"
...@@ -4093,6 +4094,7 @@ static int innodb_init(void* p) ...@@ -4093,6 +4094,7 @@ static int innodb_init(void* p)
= innodb_prepare_commit_versioned; = innodb_prepare_commit_versioned;
innobase_hton->update_optimizer_costs= innobase_update_optimizer_costs; innobase_hton->update_optimizer_costs= innobase_update_optimizer_costs;
innobase_hton->binlog_init= innodb_binlog_init;
innobase_hton->binlog_write_direct= innobase_binlog_write_direct; innobase_hton->binlog_write_direct= innobase_binlog_write_direct;
innobase_hton->get_binlog_reader= innodb_get_binlog_reader; innobase_hton->get_binlog_reader= innodb_get_binlog_reader;
......
...@@ -584,6 +584,8 @@ extern void fsp_binlog_trx(trx_t *trx, mtr_t *mtr); ...@@ -584,6 +584,8 @@ extern void fsp_binlog_trx(trx_t *trx, mtr_t *mtr);
class handler_binlog_reader; class handler_binlog_reader;
extern bool innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size); extern bool innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size);
extern handler_binlog_reader *innodb_get_binlog_reader(); extern handler_binlog_reader *innodb_get_binlog_reader();
extern void fsp_binlog_init();
extern bool innodb_binlog_init(size_t binlog_size);
extern void fsp_binlog_close(); extern void fsp_binlog_close();
#ifndef UNIV_DEBUG #ifndef UNIV_DEBUG
......
...@@ -482,6 +482,7 @@ extern mysql_pfs_key_t trx_pool_mutex_key; ...@@ -482,6 +482,7 @@ extern mysql_pfs_key_t trx_pool_mutex_key;
extern mysql_pfs_key_t trx_pool_manager_mutex_key; extern mysql_pfs_key_t trx_pool_manager_mutex_key;
extern mysql_pfs_key_t lock_wait_mutex_key; extern mysql_pfs_key_t lock_wait_mutex_key;
extern mysql_pfs_key_t srv_threads_mutex_key; extern mysql_pfs_key_t srv_threads_mutex_key;
extern mysql_pfs_key_t fsp_active_binlog_mutex_key;
# endif /* UNIV_PFS_MUTEX */ # endif /* UNIV_PFS_MUTEX */
# ifdef UNIV_PFS_RWLOCK # ifdef UNIV_PFS_RWLOCK
......
...@@ -1904,6 +1904,8 @@ dberr_t srv_start(bool create_new_db) ...@@ -1904,6 +1904,8 @@ dberr_t srv_start(bool create_new_db)
return(srv_init_abort(err)); return(srv_init_abort(err));
} }
fsp_binlog_init();
if (!srv_read_only_mode if (!srv_read_only_mode
&& srv_operation <= SRV_OPERATION_EXPORT_RESTORED) { && srv_operation <= SRV_OPERATION_EXPORT_RESTORED) {
/* Initialize the innodb_temporary tablespace and keep /* Initialize the innodb_temporary tablespace and keep
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment