Commit abb87914 authored by unknown's avatar unknown

Group commit for maria engine.

mysql-test/suite/maria/r/group_commit.result:
  Test of gruoup commit.
mysql-test/suite/maria/r/maria3.result:
  The new variables added.
mysql-test/suite/maria/t/group_commit.test:
  Test of group commit.
randgen:
  Randon query generator tests.
randgen/conf:
  Random query generator tests.
randgen/conf/maria_group_commit.yy:
  Random query generator test for group commit.
storage/maria/ha_maria.cc:
  New variables and support procedures for group commit added.
storage/maria/ma_init.c:
  Correct shutdown of group commit service thread and group commit.
storage/maria/ma_loghandler.c:
  Group commit added.
  Initialization of variables for embeded server edded.
storage/maria/ma_loghandler.h:
  Group commit types and routines.
parent b6d030ed
drop table if exists t1;
create table t1 (a int);
SET GLOBAL maria_group_commit="NONE";
SET GLOBAL maria_group_commit_interval= 0;
SET GLOBAL maria_group_commit="NONE";
SET GLOBAL maria_group_commit_interval= 100;
SET GLOBAL maria_group_commit="HARD";
SET GLOBAL maria_group_commit_interval= 0;
SET GLOBAL maria_group_commit="HARD";
SET GLOBAL maria_group_commit_interval= 100;
SET GLOBAL maria_group_commit="SOFT";
SET GLOBAL maria_group_commit_interval= 0;
SET GLOBAL maria_group_commit="SOFT";
SET GLOBAL maria_group_commit_interval= 100;
SET GLOBAL maria_group_commit="NONE";
SET GLOBAL maria_group_commit_interval= 0;
drop table t1;
......@@ -306,6 +306,8 @@ Variable_name Value
maria_block_size 8192
maria_checkpoint_interval 30
maria_force_start_after_recovery_failures 0
maria_group_commit none
maria_group_commit_interval 0
maria_log_file_size 4294959104
maria_log_purge_type immediate
maria_max_sort_file_size 9223372036853727232
......@@ -328,6 +330,7 @@ Maria_pagecache_read_requests #
Maria_pagecache_reads #
Maria_pagecache_write_requests #
Maria_pagecache_writes #
Maria_transaction_log_syncs #
create table t1 (b char(0));
insert into t1 values(NULL),("");
select length(b) from t1;
......
# Test different ways of syncing (mostly syntax)
--disable_warnings
drop table if exists t1;
--enable_warnings
create table t1 (a int);
SET GLOBAL maria_group_commit="NONE";
SET GLOBAL maria_group_commit_interval= 0;
--disable_query_log
let $num = 5000;
while ($num)
{
insert into t1 values (1);
dec $num;
}
--enable_query_log
SET GLOBAL maria_group_commit="NONE";
SET GLOBAL maria_group_commit_interval= 100;
--disable_query_log
let $num = 5000;
while ($num)
{
insert into t1 values (1);
dec $num;
}
--enable_query_log
SET GLOBAL maria_group_commit="HARD";
SET GLOBAL maria_group_commit_interval= 0;
--disable_query_log
let $num = 5000;
while ($num)
{
insert into t1 values (1);
dec $num;
}
--enable_query_log
SET GLOBAL maria_group_commit="HARD";
SET GLOBAL maria_group_commit_interval= 100;
--disable_query_log
let $num = 5000;
while ($num)
{
insert into t1 values (1);
dec $num;
}
--enable_query_log
SET GLOBAL maria_group_commit="SOFT";
SET GLOBAL maria_group_commit_interval= 0;
--disable_query_log
let $num = 5000;
while ($num)
{
insert into t1 values (1);
dec $num;
}
--enable_query_log
SET GLOBAL maria_group_commit="SOFT";
SET GLOBAL maria_group_commit_interval= 100;
--disable_query_log
let $num = 5000;
while ($num)
{
insert into t1 values (1);
dec $num;
}
--enable_query_log
SET GLOBAL maria_group_commit="NONE";
SET GLOBAL maria_group_commit_interval= 0;
drop table t1;
# test of group commit switching
query:
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
select | insert | update| delete |
change_group_commit | change_interval;
select:
SELECT select_item FROM join where order_by limit;
select_item:
* | X . _field ;
join:
_table AS X |
_table AS X LEFT JOIN _table AS Y ON ( X . _field = Y . _field ) ;
where:
|
WHERE X . _field < value |
WHERE X . _field > value |
WHERE X . _field = value ;
where_delete:
|
WHERE _field < value |
WHERE _field > value |
WHERE _field = value ;
order_by:
| ORDER BY X . _field ;
limit:
| LIMIT _digit ;
insert:
INSERT INTO _table ( _field , _field ) VALUES ( value , value ) ;
update:
UPDATE _table AS X SET _field = value where order_by limit ;
delete:
DELETE FROM _table where_delete LIMIT _digit ;
value:
' _letter ' | _digit | _date | _datetime | _time | _english ;
change_group_commit:
SET GLOBAL MARIA_GROUP_COMMIT=none_soft_hard;
none_soft_hard:
NONE | SOFT | HARD;
change_interval:
set_interval | set_interval | set_interval | set_interval |
drop_interval;
set_interval:
SET GLOBAL MARIA_GROUP_COMMIT_INTERVAL=_tinyint_unsigned;
drop_interval:
SET GLOBAL MARIA_GROUP_COMMIT_INTERVAL=0;
......@@ -102,22 +102,40 @@ TYPELIB maria_translog_purge_type_typelib=
array_elements(maria_translog_purge_type_names) - 1, "",
maria_translog_purge_type_names, NULL
};
/* transactional log directory sync */
const char *maria_sync_log_dir_names[]=
{
"NEVER", "NEWFILE", "ALWAYS", NullS
};
TYPELIB maria_sync_log_dir_typelib=
{
array_elements(maria_sync_log_dir_names) - 1, "",
maria_sync_log_dir_names, NULL
};
/* transactional log group commit */
const char *maria_group_commit_names[]=
{
"none", "hard", "soft", NullS
};
TYPELIB maria_group_commit_typelib=
{
array_elements(maria_group_commit_names) - 1, "",
maria_group_commit_names, NULL
};
/** Interval between background checkpoints in seconds */
static ulong checkpoint_interval;
static void update_checkpoint_interval(MYSQL_THD thd,
struct st_mysql_sys_var *var,
void *var_ptr, const void *save);
static void update_maria_group_commit(MYSQL_THD thd,
struct st_mysql_sys_var *var,
void *var_ptr, const void *save);
static void update_maria_group_commit_interval(MYSQL_THD thd,
struct st_mysql_sys_var *var,
void *var_ptr, const void *save);
/** After that many consecutive recovery failures, remove logs */
static ulong force_start_after_recovery_failures;
static void update_log_file_size(MYSQL_THD thd,
......@@ -164,6 +182,24 @@ static MYSQL_SYSVAR_ULONG(log_file_size, log_file_size,
NULL, update_log_file_size, TRANSLOG_FILE_SIZE,
TRANSLOG_MIN_FILE_SIZE, 0xffffffffL, TRANSLOG_PAGE_SIZE);
static MYSQL_SYSVAR_ENUM(group_commit, maria_group_commit,
PLUGIN_VAR_RQCMDARG,
"Specifies maria group commit mode. "
"Possible values are \"none\" (no group commit), "
"\"hard\" (with waiting to actual commit), "
"\"soft\" (no wait for commit (DANGEROUS!!!))",
NULL, update_maria_group_commit,
TRANSLOG_GCOMMIT_NONE, &maria_group_commit_typelib);
static MYSQL_SYSVAR_ULONG(group_commit_interval, maria_group_commit_interval,
PLUGIN_VAR_RQCMDARG,
"Interval between commite in microseconds (1/1000000c)."
" 0 stands for no waiting"
" for other threads to come and do a commit in \"hard\" mode and no"
" sync()/commit at all in \"soft\" mode. Option has only an effect"
" if maria_group_commit is used",
NULL, update_maria_group_commit_interval, 0, 0, UINT_MAX, 1);
static MYSQL_SYSVAR_ENUM(log_purge_type, log_purge_type,
PLUGIN_VAR_RQCMDARG,
"Specifies how maria transactional log will be purged. "
......@@ -3278,6 +3314,8 @@ static struct st_mysql_sys_var* system_variables[]= {
MYSQL_SYSVAR(block_size),
MYSQL_SYSVAR(checkpoint_interval),
MYSQL_SYSVAR(force_start_after_recovery_failures),
MYSQL_SYSVAR(group_commit),
MYSQL_SYSVAR(group_commit_interval),
MYSQL_SYSVAR(page_checksum),
MYSQL_SYSVAR(log_dir_path),
MYSQL_SYSVAR(log_file_size),
......@@ -3308,6 +3346,92 @@ static void update_checkpoint_interval(MYSQL_THD thd,
ma_checkpoint_init(*(ulong *)var_ptr= (ulong)(*(long *)save));
}
/**
@brief Updates group commit mode
*/
static void update_maria_group_commit(MYSQL_THD thd,
struct st_mysql_sys_var *var,
void *var_ptr, const void *save)
{
ulong value= (ulong)*((long *)var_ptr);
DBUG_ENTER("update_maria_group_commit");
DBUG_PRINT("enter", ("old value: %lu new value %lu rate %lu",
value, (ulong)(*(long *)save),
maria_group_commit_interval));
/* old value */
switch (value) {
case TRANSLOG_GCOMMIT_NONE:
break;
case TRANSLOG_GCOMMIT_HARD:
translog_hard_group_commit(FALSE);
break;
case TRANSLOG_GCOMMIT_SOFT:
translog_soft_sync(FALSE);
if (maria_group_commit_interval)
translog_soft_sync_end();
break;
default:
DBUG_ASSERT(0); /* impossible */
}
value= *(ulong *)var_ptr= (ulong)(*(long *)save);
translog_sync();
/* new value */
switch (value) {
case TRANSLOG_GCOMMIT_NONE:
break;
case TRANSLOG_GCOMMIT_HARD:
translog_hard_group_commit(TRUE);
break;
case TRANSLOG_GCOMMIT_SOFT:
translog_soft_sync(TRUE);
/* variable change made under global lock so we can just read it */
if (maria_group_commit_interval)
translog_soft_sync_start();
break;
default:
DBUG_ASSERT(0); /* impossible */
}
DBUG_VOID_RETURN;
}
/**
@brief Updates group commit interval
*/
static void update_maria_group_commit_interval(MYSQL_THD thd,
struct st_mysql_sys_var *var,
void *var_ptr, const void *save)
{
ulong new_value= (ulong)*((long *)save);
ulong *value_ptr= (ulong*) var_ptr;
DBUG_ENTER("update_maria_group_commit_interval");
DBUG_PRINT("enter", ("old value: %lu new value %lu group commit %lu",
*value_ptr, new_value, maria_group_commit));
/* variable change made under global lock so we can just read it */
switch (maria_group_commit) {
case TRANSLOG_GCOMMIT_NONE:
*value_ptr= new_value;
translog_set_group_commit_interval(new_value);
break;
case TRANSLOG_GCOMMIT_HARD:
*value_ptr= new_value;
translog_set_group_commit_interval(new_value);
break;
case TRANSLOG_GCOMMIT_SOFT:
if (*value_ptr)
translog_soft_sync_end();
translog_set_group_commit_interval(new_value);
if ((*value_ptr= new_value))
translog_soft_sync_start();
break;
default:
DBUG_ASSERT(0); /* impossible */
}
DBUG_VOID_RETURN;
}
/**
@brief Updates the transaction log file limit.
*/
......@@ -3330,6 +3454,7 @@ static SHOW_VAR status_variables[]= {
{"Maria_pagecache_reads", (char*) &maria_pagecache_var.global_cache_read, SHOW_LONGLONG},
{"Maria_pagecache_write_requests", (char*) &maria_pagecache_var.global_cache_w_requests, SHOW_LONGLONG},
{"Maria_pagecache_writes", (char*) &maria_pagecache_var.global_cache_write, SHOW_LONGLONG},
{"Maria_transaction_log_syncs", (char*) &translog_syncs, SHOW_LONGLONG},
{NullS, NullS, SHOW_LONG}
};
......
......@@ -82,6 +82,11 @@ void maria_end(void)
maria_inited= maria_multi_threaded= FALSE;
ft_free_stopwords();
ma_checkpoint_end();
if (translog_status == TRANSLOG_OK)
{
translog_soft_sync_end();
translog_sync();
}
if ((trid= trnman_get_max_trid()) > max_trid_in_control_file)
{
/*
......
......@@ -18,6 +18,7 @@
#include "ma_blockrec.h" /* for some constants and in-write hooks */
#include "ma_key_recover.h" /* For some in-write hooks */
#include "ma_checkpoint.h"
#include "ma_servicethread.h"
/*
On Windows, neither my_open() nor my_sync() work for directories.
......@@ -47,6 +48,15 @@
#include <m_ctype.h>
#endif
/** @brief protects checkpoint_in_progress */
static pthread_mutex_t LOCK_soft_sync;
/** @brief for killing the background checkpoint thread */
static pthread_cond_t COND_soft_sync;
/** @brief control structure for checkpoint background thread */
static MA_SERVICE_THREAD_CONTROL soft_sync_control=
{THREAD_DEAD, FALSE, &LOCK_soft_sync, &COND_soft_sync};
/* transaction log file descriptor */
typedef struct st_translog_file
{
......@@ -123,11 +133,25 @@ struct st_translog_buffer
TRANSLOG_ADDRESS next_buffer_offset;
/* Previous buffer offset to detect it flush finish */
TRANSLOG_ADDRESS prev_buffer_offset;
/*
If the buffer was forced to close it save value of its horizon
otherwise LSN_IMPOSSIBLE
*/
TRANSLOG_ADDRESS pre_force_close_horizon;
/*
How much is written (or will be written when copy_to_buffer_in_progress
become 0) to this buffer
*/
translog_size_t size;
/*
When moving from one log buffer to another, we write the last of the
previous buffer to file and then move to start using the new log
buffer. In the case of a part filed last page, this page is not moved
to the start of the new buffer but instead we set the 'skip_data'
variable to tell us how much data at the beginning of the buffer is not
relevant.
*/
uint skipped_data;
/* File handler for this buffer */
TRANSLOG_FILE *file;
/* Threads which are waiting for buffer filling/freeing */
......@@ -304,6 +328,7 @@ struct st_translog_descriptor
*/
pthread_mutex_t log_flush_lock;
pthread_cond_t log_flush_cond;
pthread_cond_t new_goal_cond;
/* Protects changing of headers of finished files (max_lsn) */
pthread_mutex_t file_header_lock;
......@@ -344,13 +369,39 @@ static struct st_translog_descriptor log_descriptor;
ulong log_purge_type= TRANSLOG_PURGE_IMMIDIATE;
ulong log_file_size= TRANSLOG_FILE_SIZE;
/* sync() of log files directory mode */
ulong sync_log_dir= TRANSLOG_SYNC_DIR_NEWFILE;
ulong maria_group_commit= TRANSLOG_GCOMMIT_NONE;
ulong maria_group_commit_interval= 0;
/* Marker for end of log */
static uchar end_of_log= 0;
#define END_OF_LOG &end_of_log
/**
Switch for "soft" sync (no real sync() but periodical sync by service
thread)
*/
static volatile my_bool soft_sync= FALSE;
/**
Switch for "hard" group commit mode
*/
static volatile my_bool hard_group_commit= FALSE;
/**
File numbers interval which have to be sync()
*/
static uint32 soft_sync_min= 0;
static uint32 soft_sync_max= 0;
static uint32 soft_need_sync= 1;
/**
stores interval in microseconds
*/
static uint32 group_commit_wait= 0;
enum enum_translog_status translog_status= TRANSLOG_UNINITED;
ulonglong translog_syncs= 0; /* Number of sync()s */
/* time of last flush */
static ulonglong flush_start= 0;
/* chunk types */
#define TRANSLOG_CHUNK_LSN 0x00 /* 0 chunk refer as LSN (head or tail */
......@@ -980,12 +1031,17 @@ static TRANSLOG_FILE *get_logfile_by_number(uint32 file_no)
static TRANSLOG_FILE *get_current_logfile()
{
TRANSLOG_FILE *file;
DBUG_ENTER("get_current_logfile");
rw_rdlock(&log_descriptor.open_files_lock);
DBUG_PRINT("info", ("max_file: %lu min_file: %lu open_files: %lu",
(ulong) log_descriptor.max_file,
(ulong) log_descriptor.min_file,
(ulong) log_descriptor.open_files.elements));
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
file= *dynamic_element(&log_descriptor.open_files, 0, TRANSLOG_FILE **);
rw_unlock(&log_descriptor.open_files_lock);
return (file);
DBUG_RETURN(file);
}
uchar NEAR maria_trans_file_magic[]=
......@@ -1069,6 +1125,7 @@ static my_bool translog_write_file_header()
static my_bool translog_max_lsn_to_header(File file, LSN lsn)
{
uchar lsn_buff[LSN_STORE_SIZE];
my_bool rc;
DBUG_ENTER("translog_max_lsn_to_header");
DBUG_PRINT("enter", ("File descriptor: %ld "
"lsn: (%lu,0x%lx)",
......@@ -1077,11 +1134,17 @@ static my_bool translog_max_lsn_to_header(File file, LSN lsn)
lsn_store(lsn_buff, lsn);
DBUG_RETURN(my_pwrite(file, lsn_buff,
rc= (my_pwrite(file, lsn_buff,
LSN_STORE_SIZE,
(LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE),
log_write_flags) != 0 ||
my_sync(file, MYF(MY_WME)) != 0);
/*
We should not increase counter in case of error above, but it is so
unlikely that we can ignore this case
*/
translog_syncs++;
DBUG_RETURN(rc);
}
......@@ -1423,7 +1486,9 @@ LSN translog_get_file_max_lsn_stored(uint32 file)
static my_bool translog_buffer_init(struct st_translog_buffer *buffer, int num)
{
DBUG_ENTER("translog_buffer_init");
buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
buffer->pre_force_close_horizon=
buffer->prev_last_lsn= buffer->last_lsn=
LSN_IMPOSSIBLE;
DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx",
(ulong) buffer));
......@@ -1435,6 +1500,7 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer, int num)
memset(buffer->buffer, TRANSLOG_FILLER, TRANSLOG_WRITE_BUFFER);
/* Buffer size */
buffer->size= 0;
buffer->skipped_data= 0;
/* cond of thread which is waiting for buffer filling */
if (pthread_cond_init(&buffer->waiting_filling_buffer, 0))
DBUG_RETURN(1);
......@@ -1489,7 +1555,10 @@ static my_bool translog_close_log_file(TRANSLOG_FILE *file)
TODO: sync only we have changed the log
*/
if (!file->is_sync)
{
rc= my_sync(file->handler.file, MYF(MY_WME));
translog_syncs++;
}
rc|= my_close(file->handler.file, MYF(MY_WME));
my_free(file, MYF(0));
return test(rc);
......@@ -2044,6 +2113,7 @@ static void translog_start_buffer(struct st_translog_buffer *buffer,
(ulong) LSN_OFFSET(log_descriptor.horizon),
(ulong) LSN_OFFSET(log_descriptor.horizon)));
DBUG_ASSERT(buffer_no == buffer->buffer_no);
buffer->pre_force_close_horizon=
buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx",
(ulong) buffer));
......@@ -2052,6 +2122,7 @@ static void translog_start_buffer(struct st_translog_buffer *buffer,
buffer->file= get_current_logfile();
buffer->overlay= 0;
buffer->size= 0;
buffer->skipped_data= 0;
translog_cursor_init(cursor, buffer, buffer_no);
DBUG_PRINT("info", ("file: #%ld (%d) init cursor #%u: 0x%lx "
"chaser: %d Size: %lu (%lu)",
......@@ -2523,6 +2594,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
TRANSLOG_ADDRESS offset= buffer->offset;
TRANSLOG_FILE *file= buffer->file;
uint8 ver= buffer->ver;
uint skipped_data;
DBUG_ENTER("translog_buffer_flush");
DBUG_PRINT("enter",
("Buffer: #%u 0x%lx file: %d offset: (%lu,0x%lx) size: %lu",
......@@ -2557,6 +2629,8 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
disk
*/
file= buffer->file;
skipped_data= buffer->skipped_data;
DBUG_ASSERT(skipped_data < TRANSLOG_PAGE_SIZE);
for (i= 0, pg= LSN_OFFSET(buffer->offset) / TRANSLOG_PAGE_SIZE;
i < buffer->size;
i+= TRANSLOG_PAGE_SIZE, pg++)
......@@ -2573,13 +2647,16 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size);
if (translog_status != TRANSLOG_OK && translog_status != TRANSLOG_SHUTDOWN)
DBUG_RETURN(1);
if (pagecache_inject(log_descriptor.pagecache,
if (pagecache_write_part(log_descriptor.pagecache,
&file->handler, pg, 3,
buffer->buffer + i,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED,
PAGECACHE_PIN_LEFT_UNPINNED, 0,
LSN_IMPOSSIBLE))
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_WRITE_DONE, 0,
LSN_IMPOSSIBLE,
skipped_data,
TRANSLOG_PAGE_SIZE - skipped_data))
{
DBUG_PRINT("error",
("Can't write page (%lu,0x%lx) to pagecache, error: %d",
......@@ -2589,10 +2666,12 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
translog_stop_writing();
DBUG_RETURN(1);
}
skipped_data= 0;
}
file->is_sync= 0;
if (my_pwrite(file->handler.file, buffer->buffer,
buffer->size, LSN_OFFSET(buffer->offset),
if (my_pwrite(file->handler.file, buffer->buffer + buffer->skipped_data,
buffer->size - buffer->skipped_data,
LSN_OFFSET(buffer->offset) + buffer->skipped_data,
log_write_flags))
{
DBUG_PRINT("error", ("Can't write buffer (%lu,0x%lx) size %lu "
......@@ -2985,6 +3064,7 @@ restart:
uchar *from, *table= NULL;
int is_last_unfinished_page;
uint last_protected_sector= 0;
uint skipped_data= curr_buffer->skipped_data;
TRANSLOG_FILE file_copy;
uint8 ver= curr_buffer->ver;
translog_wait_for_writers(curr_buffer);
......@@ -2997,7 +3077,38 @@ restart:
}
DBUG_ASSERT(LSN_FILE_NO(addr) == LSN_FILE_NO(curr_buffer->offset));
from= curr_buffer->buffer + (addr - curr_buffer->offset);
memcpy(buffer, from, TRANSLOG_PAGE_SIZE);
if (skipped_data && addr == curr_buffer->offset)
{
/*
We read page part of which is not present in buffer,
so we should read absent part from file (page cache actually)
*/
file= get_logfile_by_number(file_no);
DBUG_ASSERT(file != NULL);
/*
it's ok to not lock the page because:
- The log handler has it's own page cache.
- There is only one thread that can access the log
cache at a time
*/
if (!(buffer= pagecache_read(log_descriptor.pagecache,
&file->handler,
LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
3, buffer,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED,
NULL)))
DBUG_RETURN(NULL);
}
else
skipped_data= 0; /* Read after skipped in buffer data */
/*
Now we have correct data in buffer up to 'skipped_data'. The
following memcpy() will move the data from the internal buffer
that was not yet on disk.
*/
memcpy(buffer + skipped_data, from + skipped_data,
TRANSLOG_PAGE_SIZE - skipped_data);
/*
We can use copy then in translog_page_validator() because it
do not put it permanently somewhere.
......@@ -3291,6 +3402,7 @@ static my_bool translog_truncate_log(TRANSLOG_ADDRESS addr)
uint32 next_page_offset, page_rest;
uint32 i;
File fd;
int rc;
TRANSLOG_VALIDATOR_DATA data;
char path[FN_REFLEN];
uchar page_buff[TRANSLOG_PAGE_SIZE];
......@@ -3316,14 +3428,19 @@ static my_bool translog_truncate_log(TRANSLOG_ADDRESS addr)
TRANSLOG_PAGE_SIZE);
page_rest= next_page_offset - LSN_OFFSET(addr);
memset(page_buff, TRANSLOG_FILLER, page_rest);
if ((fd= open_logfile_by_number_no_cache(LSN_FILE_NO(addr))) < 0 ||
rc= ((fd= open_logfile_by_number_no_cache(LSN_FILE_NO(addr))) < 0 ||
((my_chsize(fd, next_page_offset, TRANSLOG_FILLER, MYF(MY_WME)) ||
(page_rest && my_pwrite(fd, page_buff, page_rest, LSN_OFFSET(addr),
log_write_flags)) ||
my_sync(fd, MYF(MY_WME))) |
my_close(fd, MYF(MY_WME))) ||
(sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD))))
my_sync(fd, MYF(MY_WME)))));
translog_syncs++;
rc|= (fd > 0 && my_close(fd, MYF(MY_WME)));
if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS)
{
rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD));
translog_syncs++;
}
if (rc)
DBUG_RETURN(1);
/* fix the horizon */
......@@ -3483,7 +3600,10 @@ my_bool translog_init_with_table(const char *directory,
my_bool version_changed= 0;
DBUG_ENTER("translog_init_with_table");
translog_syncs= 0;
flush_start= 0;
id_to_share= NULL;
log_descriptor.directory_fd= -1;
log_descriptor.is_everything_flushed= 1;
log_descriptor.flush_in_progress= 0;
......@@ -3511,6 +3631,7 @@ my_bool translog_init_with_table(const char *directory,
pthread_mutex_init(&log_descriptor.dirty_buffer_mask_lock,
MY_MUTEX_INIT_FAST) ||
pthread_cond_init(&log_descriptor.log_flush_cond, 0) ||
pthread_cond_init(&log_descriptor.new_goal_cond, 0) ||
my_rwlock_init(&log_descriptor.open_files_lock,
NULL) ||
my_init_dynamic_array(&log_descriptor.open_files,
......@@ -3912,7 +4033,6 @@ my_bool translog_init_with_table(const char *directory,
log_descriptor.flushed= log_descriptor.horizon;
log_descriptor.in_buffers_only= log_descriptor.bc.buffer->offset;
log_descriptor.max_lsn= LSN_IMPOSSIBLE; /* set to 0 */
log_descriptor.previous_flush_horizon= log_descriptor.horizon;
/*
Now 'flushed' is set to 'horizon' value, but 'horizon' is (potentially)
address of the next LSN and we want indicate that all LSNs that are
......@@ -3995,6 +4115,10 @@ my_bool translog_init_with_table(const char *directory,
It is beginning of the log => there is no LSNs in the log =>
There is no harm in leaving it "as-is".
*/
log_descriptor.previous_flush_horizon= log_descriptor.horizon;
DBUG_PRINT("info", ("previous_flush_horizon: (%lu,0x%lx)",
LSN_IN_PARTS(log_descriptor.
previous_flush_horizon)));
DBUG_RETURN(0);
}
file_no--;
......@@ -4070,6 +4194,9 @@ my_bool translog_init_with_table(const char *directory,
translog_free_record_header(&rec);
}
}
log_descriptor.previous_flush_horizon= log_descriptor.horizon;
DBUG_PRINT("info", ("previous_flush_horizon: (%lu,0x%lx)",
LSN_IN_PARTS(log_descriptor.previous_flush_horizon)));
DBUG_RETURN(0);
err:
ma_message_no_user(0, "log initialization failed");
......@@ -4157,6 +4284,7 @@ void translog_destroy()
pthread_mutex_destroy(&log_descriptor.log_flush_lock);
pthread_mutex_destroy(&log_descriptor.dirty_buffer_mask_lock);
pthread_cond_destroy(&log_descriptor.log_flush_cond);
pthread_cond_destroy(&log_descriptor.new_goal_cond);
rwlock_destroy(&log_descriptor.open_files_lock);
delete_dynamic(&log_descriptor.open_files);
delete_dynamic(&log_descriptor.unfinished_files);
......@@ -6885,11 +7013,11 @@ int translog_read_record_header_from_buffer(uchar *page,
{
translog_size_t res;
DBUG_ENTER("translog_read_record_header_from_buffer");
DBUG_PRINT("info", ("page byte: 0x%x offset: %u",
(uint) page[page_offset], (uint) page_offset));
DBUG_ASSERT(translog_is_LSN_chunk(page[page_offset]));
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
DBUG_PRINT("info", ("page byte: 0x%x offset: %u",
(uint) page[page_offset], (uint) page_offset));
buff->type= (page[page_offset] & TRANSLOG_REC_TYPE);
buff->short_trid= uint2korr(page + page_offset + 1);
DBUG_PRINT("info", ("Type %u, Short TrID %u, LSN (%lu,0x%lx)",
......@@ -7356,27 +7484,27 @@ static void translog_force_current_buffer_to_finish()
"Buffer addr: (%lu,0x%lx) "
"Page addr: (%lu,0x%lx) "
"size: %lu (%lu) Pg: %u left: %u in progress %u",
(uint) log_descriptor.bc.buffer_no,
(ulong) log_descriptor.bc.buffer,
LSN_IN_PARTS(log_descriptor.bc.buffer->offset),
(uint) old_buffer_no,
(ulong) old_buffer,
LSN_IN_PARTS(old_buffer->offset),
(ulong) LSN_FILE_NO(log_descriptor.horizon),
(ulong) (LSN_OFFSET(log_descriptor.horizon) -
log_descriptor.bc.current_page_fill),
(ulong) log_descriptor.bc.buffer->size,
(ulong) old_buffer->size,
(ulong) (log_descriptor.bc.ptr -log_descriptor.bc.
buffer->buffer),
(uint) log_descriptor.bc.current_page_fill,
(uint) left,
(uint) log_descriptor.bc.buffer->
(uint) old_buffer->
copy_to_buffer_in_progress));
translog_lock_assert_owner();
LINT_INIT(current_page_fill);
new_buff_beginning= log_descriptor.bc.buffer->offset;
new_buff_beginning+= log_descriptor.bc.buffer->size; /* increase offset */
new_buff_beginning= old_buffer->offset;
new_buff_beginning+= old_buffer->size; /* increase offset */
DBUG_ASSERT(log_descriptor.bc.ptr !=NULL);
DBUG_ASSERT(LSN_FILE_NO(log_descriptor.horizon) ==
LSN_FILE_NO(log_descriptor.bc.buffer->offset));
LSN_FILE_NO(old_buffer->offset));
translog_check_cursor(&log_descriptor.bc);
DBUG_ASSERT(left < TRANSLOG_PAGE_SIZE);
if (left)
......@@ -7387,18 +7515,20 @@ static void translog_force_current_buffer_to_finish()
*/
DBUG_PRINT("info", ("left: %u", (uint) left));
old_buffer->pre_force_close_horizon=
old_buffer->offset + old_buffer->size;
/* decrease offset */
new_buff_beginning-= log_descriptor.bc.current_page_fill;
current_page_fill= log_descriptor.bc.current_page_fill;
memset(log_descriptor.bc.ptr, TRANSLOG_FILLER, left);
log_descriptor.bc.buffer->size+= left;
old_buffer->size+= left;
DBUG_PRINT("info", ("Finish Page buffer #%u: 0x%lx "
"Size: %lu",
(uint) log_descriptor.bc.buffer->buffer_no,
(ulong) log_descriptor.bc.buffer,
(ulong) log_descriptor.bc.buffer->size));
DBUG_ASSERT(log_descriptor.bc.buffer->buffer_no ==
(uint) old_buffer->buffer_no,
(ulong) old_buffer,
(ulong) old_buffer->size));
DBUG_ASSERT(old_buffer->buffer_no ==
log_descriptor.bc.buffer_no);
}
else
......@@ -7508,12 +7638,22 @@ static void translog_force_current_buffer_to_finish()
pthread_cond_broadcast(&old_buffer->waiting_filling_buffer);
if (left)
{
if (log_descriptor.flags &
(TRANSLOG_PAGE_CRC | TRANSLOG_SECTOR_PROTECTION))
memcpy(new_buffer->buffer, data, current_page_fill);
else
{
/*
TODO: do not copy beginning of the page if we have no CRC or sector
checks on
This page header does not change if we add more data to the page so
we can not copy it and will not overwrite later
*/
memcpy(new_buffer->buffer, data, current_page_fill);
new_buffer->skipped_data= current_page_fill;
#ifndef DBUG_OFF
memset(new_buffer->buffer, 0xa5, current_page_fill);
#endif
DBUG_ASSERT(new_buffer->skipped_data < TRANSLOG_PAGE_SIZE);
}
}
old_buffer->next_buffer_offset= new_buffer->offset;
translog_buffer_lock(new_buffer);
......@@ -7561,6 +7701,7 @@ void translog_flush_set_new_goal_and_wait(TRANSLOG_ADDRESS lsn)
{
log_descriptor.next_pass_max_lsn= lsn;
log_descriptor.max_lsn_requester= pthread_self();
pthread_cond_broadcast(&log_descriptor.new_goal_cond);
}
while (flush_no == log_descriptor.flush_no)
{
......@@ -7572,67 +7713,79 @@ void translog_flush_set_new_goal_and_wait(TRANSLOG_ADDRESS lsn)
/**
@brief Flush the log up to given LSN (included)
@brief sync() range of files (inclusive) and directory (by request)
@param lsn log record serial number up to which (inclusive)
the log has to be flushed
@param min min internal file number to flush
@param max max internal file number to flush
@param sync_dir need sync directory
@return Operation status
return Operation status
@retval 0 OK
@retval 1 Error
*/
my_bool translog_flush(TRANSLOG_ADDRESS lsn)
static my_bool translog_sync_files(uint32 min, uint32 max,
my_bool sync_dir)
{
LSN sent_to_disk= LSN_IMPOSSIBLE;
TRANSLOG_ADDRESS flush_horizon;
uint fn, i;
dirty_buffer_mask_t dirty_buffer_mask;
uint8 last_buffer_no, start_buffer_no;
uint fn;
my_bool rc= 0;
DBUG_ENTER("translog_flush");
DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
LINT_INIT(sent_to_disk);
ulonglong flush_interval;
DBUG_ENTER("translog_sync_files");
DBUG_PRINT("info", ("min: %lu max: %lu sync dir: %d",
(ulong) min, (ulong) max, (int) sync_dir));
DBUG_ASSERT(min <= max);
pthread_mutex_lock(&log_descriptor.log_flush_lock);
DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)",
LSN_IN_PARTS(log_descriptor.flushed)));
if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0)
flush_interval= group_commit_wait;
if (flush_interval)
flush_start= my_micro_time();
for (fn= min; fn <= max; fn++)
{
pthread_mutex_unlock(&log_descriptor.log_flush_lock);
DBUG_RETURN(0);
}
if (log_descriptor.flush_in_progress)
TRANSLOG_FILE *file= get_logfile_by_number(fn);
DBUG_ASSERT(file != NULL);
if (!file->is_sync)
{
translog_flush_set_new_goal_and_wait(lsn);
if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self()))
if (my_sync(file->handler.file, MYF(MY_WME)))
{
/* fix lsn if it was horizon */
if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0)
lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer);
translog_flush_wait_for_end(lsn);
pthread_mutex_unlock(&log_descriptor.log_flush_lock);
DBUG_RETURN(0);
rc= 1;
translog_stop_writing();
DBUG_RETURN(rc);
}
translog_syncs++;
file->is_sync= 1;
}
log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
}
log_descriptor.flush_in_progress= 1;
flush_horizon= log_descriptor.previous_flush_horizon;
DBUG_PRINT("info", ("flush_in_progress is set"));
pthread_mutex_unlock(&log_descriptor.log_flush_lock);
translog_lock();
if (log_descriptor.is_everything_flushed)
if (sync_dir)
{
DBUG_PRINT("info", ("everything is flushed"));
rc= (translog_status == TRANSLOG_READONLY);
translog_unlock();
goto out;
if (!(rc= sync_dir(log_descriptor.directory_fd,
MYF(MY_WME | MY_IGNORE_BADFD))))
translog_syncs++;
}
DBUG_RETURN(rc);
}
/*
@brief Flushes buffers with LSNs in them less or equal address <lsn>
@param lsn address up to which all LSNs should be flushed,
can be reset to real last LSN address
@parem sent_to_disk returns 'sent to disk' position
@param flush_horizon returns horizon of the flush
@note About terminology see comment to translog_flush().
*/
void translog_flush_buffers(TRANSLOG_ADDRESS *lsn,
TRANSLOG_ADDRESS *sent_to_disk,
TRANSLOG_ADDRESS *flush_horizon)
{
dirty_buffer_mask_t dirty_buffer_mask;
uint i;
uint8 last_buffer_no, start_buffer_no;
DBUG_ENTER("translog_flush_buffers");
/*
We will recheck information when will lock buffers one by
one so we can use unprotected read here (this is just for
......@@ -7656,13 +7809,13 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
/*
if LSN up to which we have to flush bigger then maximum LSN of previous
buffer and at least one LSN was saved in the current buffer (last_lsn !=
LSN_IMPOSSIBLE) then we better finish the current buffer.
LSN_IMPOSSIBLE) then we have to close the current buffer.
*/
if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 &&
if (cmp_translog_addr(*lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 &&
log_descriptor.bc.buffer->last_lsn != LSN_IMPOSSIBLE)
{
struct st_translog_buffer *buffer= log_descriptor.bc.buffer;
lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */
*lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */
DBUG_PRINT("info", ("LSN to flush fixed to last lsn: (%lu,0x%lx)",
LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn)));
last_buffer_no= log_descriptor.bc.buffer_no;
......@@ -7676,8 +7829,10 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
TRANSLOG_BUFFERS_NO);
translog_unlock();
}
sent_to_disk= translog_get_sent_to_disk();
if (cmp_translog_addr(lsn, sent_to_disk) > 0)
/* flush buffers */
*sent_to_disk= translog_get_sent_to_disk();
if (cmp_translog_addr(*lsn, *sent_to_disk) > 0)
{
DBUG_PRINT("info", ("Start buffer #: %u last buffer #: %u",
......@@ -7697,53 +7852,238 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
LSN_IN_PARTS(buffer->last_lsn),
(buffer->file ?
"dirty" : "closed")));
if (buffer->prev_last_lsn <= lsn &&
if (buffer->prev_last_lsn <= *lsn &&
buffer->file != NULL)
{
DBUG_ASSERT(flush_horizon <= buffer->offset + buffer->size);
flush_horizon= buffer->offset + buffer->size;
DBUG_ASSERT(*flush_horizon <= buffer->offset + buffer->size);
*flush_horizon= (buffer->pre_force_close_horizon != LSN_IMPOSSIBLE ?
buffer->pre_force_close_horizon :
buffer->offset + buffer->size);
/* pre_force_close_horizon is reset during new buffer start */
DBUG_PRINT("info", ("flush_horizon: (%lu,0x%lx)",
LSN_IN_PARTS(*flush_horizon)));
DBUG_ASSERT(*flush_horizon <= log_descriptor.horizon);
translog_buffer_flush(buffer);
}
translog_buffer_unlock(buffer);
i= (i + 1) % TRANSLOG_BUFFERS_NO;
} while (i != last_buffer_no);
sent_to_disk= translog_get_sent_to_disk();
*sent_to_disk= translog_get_sent_to_disk();
}
/* sync files from previous flush till current one */
for (fn= LSN_FILE_NO(log_descriptor.flushed); fn <= LSN_FILE_NO(lsn); fn++)
DBUG_VOID_RETURN;
}
/**
@brief Flush the log up to given LSN (included)
@param lsn log record serial number up to which (inclusive)
the log has to be flushed
@return Operation status
@retval 0 OK
@retval 1 Error
@note
- Non group commit logic: Commits made in passes. Thread which started
flush first is performing actual flush, other threads sets new goal (LSN)
of the next pass (if it is maximum) and waits for the pass end or just
wait for the pass end.
- If hard group commit enabled and rate set to zero:
The first thread sends all changed buffers to disk. This is repeated
as long as there are new LSNs added. The process can not loop
forever because we have limited number of threads and they will wait
for the data to be synced.
Pseudo code:
do
send changed buffers to disk
while new_goal
sync
- If hard group commit switched ON and less than rate microseconds has
passed from last sync, then after buffers have been sent to disk
wait until rate microseconds has passed since last sync, do sync and return.
This ensures that if we call sync infrequently we don't do any waits.
- If soft group commit enabled everything works as with 'non group commit'
but the thread doesn't do any real sync(). If rate is not zero the
sync() will be performed by a service thread with the given rate
when needed (new LSN appears).
@note Terminology:
'sent to disk' means written to disk but not sync()ed,
'flushed' mean sent to disk and synced().
*/
my_bool translog_flush(TRANSLOG_ADDRESS lsn)
{
struct timespec abstime;
ulonglong flush_interval;
ulonglong time_spent;
LSN sent_to_disk= LSN_IMPOSSIBLE;
TRANSLOG_ADDRESS flush_horizon;
my_bool rc= 0;
my_bool hgroup_commit_at_start;
DBUG_ENTER("translog_flush");
DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
LINT_INIT(sent_to_disk);
pthread_mutex_lock(&log_descriptor.log_flush_lock);
DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)",
LSN_IN_PARTS(log_descriptor.flushed)));
if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0)
{
TRANSLOG_FILE *file= get_logfile_by_number(fn);
DBUG_ASSERT(file != NULL);
if (!file->is_sync)
pthread_mutex_unlock(&log_descriptor.log_flush_lock);
DBUG_RETURN(0);
}
if (log_descriptor.flush_in_progress)
{
if (my_sync(file->handler.file, MYF(MY_WME)))
translog_lock();
/* fix lsn if it was horizon */
if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0)
lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer);
translog_unlock();
translog_flush_set_new_goal_and_wait(lsn);
if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self()))
{
rc= 1;
translog_stop_writing();
sent_to_disk= LSN_IMPOSSIBLE;
/*
translog_flush_wait_for_end() release log_flush_lock while is
waiting then acquire it again
*/
translog_flush_wait_for_end(lsn);
pthread_mutex_unlock(&log_descriptor.log_flush_lock);
DBUG_RETURN(0);
}
log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
}
log_descriptor.flush_in_progress= 1;
flush_horizon= log_descriptor.previous_flush_horizon;
DBUG_PRINT("info", ("flush_in_progress is set, flush_horizon: (%lu,0x%lx)",
LSN_IN_PARTS(flush_horizon)));
pthread_mutex_unlock(&log_descriptor.log_flush_lock);
hgroup_commit_at_start= hard_group_commit;
if (hgroup_commit_at_start)
flush_interval= group_commit_wait;
translog_lock();
if (log_descriptor.is_everything_flushed)
{
DBUG_PRINT("info", ("everything is flushed"));
translog_unlock();
pthread_mutex_lock(&log_descriptor.log_flush_lock);
goto out;
}
file->is_sync= 1;
for (;;)
{
/* Following function flushes buffers and makes translog_unlock() */
translog_flush_buffers(&lsn, &sent_to_disk, &flush_horizon);
if (!hgroup_commit_at_start)
break; /* flush pass is ended */
retest:
/*
We do not check time here because pthread_mutex_lock rarely takes
a lot of time so we can sacrifice a bit precision to performance
(taking into account that my_micro_time() might be expensive call).
*/
if (flush_interval == 0)
break; /* flush pass is ended */
pthread_mutex_lock(&log_descriptor.log_flush_lock);
if (log_descriptor.next_pass_max_lsn == LSN_IMPOSSIBLE)
{
if (flush_interval == 0 ||
(time_spent= (my_micro_time() - flush_start)) >= flush_interval)
{
pthread_mutex_unlock(&log_descriptor.log_flush_lock);
break;
}
DBUG_PRINT("info", ("flush waits: %llu interval: %llu spent: %llu",
flush_interval - time_spent,
flush_interval, time_spent));
/* wait time or next goal */
set_timespec_nsec(abstime, flush_interval - time_spent);
pthread_cond_timedwait(&log_descriptor.new_goal_cond,
&log_descriptor.log_flush_lock,
&abstime);
pthread_mutex_unlock(&log_descriptor.log_flush_lock);
DBUG_PRINT("info", ("retest conditions"));
goto retest;
}
/* take next goal */
lsn= log_descriptor.next_pass_max_lsn;
log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
/* prevent other thread from continue */
log_descriptor.max_lsn_requester= pthread_self();
DBUG_PRINT("info", ("flush took next goal: (%lu,0x%lx)",
LSN_IN_PARTS(lsn)));
pthread_mutex_unlock(&log_descriptor.log_flush_lock);
/* next flush pass */
DBUG_PRINT("info", ("next flush pass"));
translog_lock();
}
if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
(LSN_FILE_NO(log_descriptor.previous_flush_horizon) !=
/*
sync() files from previous flush till current one
*/
if (!soft_sync || hgroup_commit_at_start)
{
if ((rc=
translog_sync_files(LSN_FILE_NO(log_descriptor.flushed),
LSN_FILE_NO(lsn),
sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
(LSN_FILE_NO(log_descriptor.
previous_flush_horizon) !=
LSN_FILE_NO(flush_horizon) ||
((LSN_OFFSET(log_descriptor.previous_flush_horizon) - 1) /
(LSN_OFFSET(log_descriptor.
previous_flush_horizon) /
TRANSLOG_PAGE_SIZE) !=
((LSN_OFFSET(flush_horizon) - 1) / TRANSLOG_PAGE_SIZE)))
rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD));
(LSN_OFFSET(flush_horizon) /
TRANSLOG_PAGE_SIZE)))))
{
sent_to_disk= LSN_IMPOSSIBLE;
pthread_mutex_lock(&log_descriptor.log_flush_lock);
goto out;
}
/* keep values for soft sync() and forced sync() actual */
{
uint32 fileno= LSN_FILE_NO(lsn);
my_atomic_rwlock_wrlock(&soft_sync_rwl);
my_atomic_store32(&soft_sync_min, fileno);
my_atomic_store32(&soft_sync_max, fileno);
my_atomic_rwlock_wrunlock(&soft_sync_rwl);
}
}
else
{
my_atomic_rwlock_wrlock(&soft_sync_rwl);
my_atomic_store32(&soft_sync_max, LSN_FILE_NO(lsn));
my_atomic_store32(&soft_need_sync, 1);
my_atomic_rwlock_wrunlock(&soft_sync_rwl);
}
DBUG_ASSERT(flush_horizon <= log_descriptor.horizon);
pthread_mutex_lock(&log_descriptor.log_flush_lock);
log_descriptor.previous_flush_horizon= flush_horizon;
out:
pthread_mutex_lock(&log_descriptor.log_flush_lock);
if (sent_to_disk != LSN_IMPOSSIBLE)
log_descriptor.flushed= sent_to_disk;
log_descriptor.flush_in_progress= 0;
log_descriptor.flush_no++;
DBUG_PRINT("info", ("flush_in_progress is dropped"));
pthread_mutex_unlock(&log_descriptor.log_flush_lock);\
pthread_mutex_unlock(&log_descriptor.log_flush_lock);
pthread_cond_broadcast(&log_descriptor.log_flush_cond);
DBUG_RETURN(rc);
}
......@@ -8113,6 +8453,8 @@ LSN translog_first_theoretical_lsn()
my_bool translog_purge(TRANSLOG_ADDRESS low)
{
uint32 last_need_file= LSN_FILE_NO(low);
uint32 min_unsync;
int soft;
TRANSLOG_ADDRESS horizon= translog_get_horizon();
int rc= 0;
DBUG_ENTER("translog_purge");
......@@ -8120,12 +8462,26 @@ my_bool translog_purge(TRANSLOG_ADDRESS low)
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
soft= soft_sync;
my_atomic_rwlock_wrlock(&soft_sync_rwl);
min_unsync= my_atomic_load32(&soft_sync_min);
my_atomic_rwlock_wrunlock(&soft_sync_rwl);
DBUG_PRINT("info", ("min_unsync: %lu", (ulong) min_unsync));
if (soft && min_unsync < last_need_file)
{
last_need_file= min_unsync;
DBUG_PRINT("info", ("last_need_file set to %lu", (ulong)last_need_file));
}
pthread_mutex_lock(&log_descriptor.purger_lock);
DBUG_PRINT("info", ("last_lsn_checked file: %lu:",
(ulong) log_descriptor.last_lsn_checked));
if (LSN_FILE_NO(log_descriptor.last_lsn_checked) < last_need_file)
{
uint32 i;
uint32 min_file= translog_first_file(horizon, 1);
DBUG_ASSERT(min_file != 0); /* log is already started */
DBUG_PRINT("info", ("min_file: %lu:",(ulong) min_file));
for(i= min_file; i < last_need_file && rc == 0; i++)
{
LSN lsn= translog_get_file_max_lsn_stored(i);
......@@ -8356,6 +8712,159 @@ my_bool translog_log_debug_info(TRN *trn __attribute__((unused)),
}
/**
Sets soft sync mode
@param mode TRUE if we need switch soft sync on else off
*/
void translog_soft_sync(my_bool mode)
{
soft_sync= mode;
}
/**
Sets hard group commit
@param mode TRUE if we need switch hard group commit on else off
*/
void translog_hard_group_commit(my_bool mode)
{
hard_group_commit= mode;
}
/**
@brief forced log sync (used when we are switching modes)
*/
void translog_sync()
{
uint32 max= get_current_logfile()->number;
uint32 min;
DBUG_ENTER("ma_translog_sync");
my_atomic_rwlock_rdlock(&soft_sync_rwl);
min= my_atomic_load32(&soft_sync_min);
my_atomic_rwlock_rdunlock(&soft_sync_rwl);
if (!min)
min= max;
translog_sync_files(min, max, sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS);
DBUG_VOID_RETURN;
}
/**
@brief set rate for group commit
@param interval interval to set.
@note We use this function with additional variable because have to
restart service thread with new value which we can't make inside changing
variable routine (update_maria_group_commit_interval)
*/
void translog_set_group_commit_interval(uint32 interval)
{
DBUG_ENTER("translog_set_group_commit_interval");
group_commit_wait= interval;
DBUG_PRINT("info", ("wait: %llu",
(ulonglong)group_commit_wait));
DBUG_VOID_RETURN;
}
/**
@brief syncing service thread
*/
static pthread_handler_t
ma_soft_sync_background( void *arg __attribute__((unused)))
{
my_thread_init();
{
DBUG_ENTER("ma_soft_sync_background");
for(;;)
{
ulonglong prev_loop= my_micro_time();
ulonglong time, sleep;
uint32 min, max, sync_request;
my_atomic_rwlock_rdlock(&soft_sync_rwl);
min= my_atomic_load32(&soft_sync_min);
max= my_atomic_load32(&soft_sync_max);
sync_request= my_atomic_load32(&soft_need_sync);
my_atomic_store32(&soft_sync_min, max);
my_atomic_store32(&soft_need_sync, 0);
my_atomic_rwlock_rdunlock(&soft_sync_rwl);
sleep= group_commit_wait;
if (sync_request)
translog_sync_files(min, max, FALSE);
time= my_micro_time() - prev_loop;
if (time > sleep)
sleep= 0;
else
sleep-= time;
if (my_service_thread_sleep(&soft_sync_control, sleep))
break;
}
my_service_thread_signal_end(&soft_sync_control);
my_thread_end();
DBUG_RETURN(0);
}
}
/**
@brief Starts syncing thread
*/
int translog_soft_sync_start(void)
{
pthread_t th;
int res= 0;
uint32 min, max;
DBUG_ENTER("translog_soft_sync_start");
/* check and init variables */
my_atomic_rwlock_rdlock(&soft_sync_rwl);
min= my_atomic_load32(&soft_sync_min);
max= my_atomic_load32(&soft_sync_max);
if (!max)
my_atomic_store32(&soft_sync_max, (max= get_current_logfile()->number));
if (!min)
my_atomic_store32(&soft_sync_min, max);
my_atomic_store32(&soft_need_sync, 1);
my_atomic_rwlock_rdunlock(&soft_sync_rwl);
if (!(res= ma_service_thread_control_init(&soft_sync_control)))
if (!(res= pthread_create(&th, NULL, ma_soft_sync_background, NULL)))
soft_sync_control.status= THREAD_RUNNING;
DBUG_RETURN(res);
}
/**
@brief Stops syncing thread
*/
void translog_soft_sync_end(void)
{
DBUG_ENTER("translog_soft_sync_end");
if (soft_sync_control.inited)
{
ma_service_thread_control_end(&soft_sync_control);
}
DBUG_VOID_RETURN;
}
#ifdef MARIA_DUMP_LOG
#include <my_getopt.h>
extern void translog_example_table_init();
......
......@@ -342,6 +342,14 @@ enum enum_translog_status
TRANSLOG_SHUTDOWN /* going to shutdown the loghandler */
};
extern enum enum_translog_status translog_status;
extern ulonglong translog_syncs; /* Number of sync()s */
void translog_soft_sync(my_bool mode);
void translog_hard_group_commit(my_bool mode);
int translog_soft_sync_start(void);
void translog_soft_sync_end(void);
void translog_sync();
void translog_set_group_commit_interval(uint32 interval);
/*
all the rest added because of recovery; should we make
......@@ -439,6 +447,14 @@ typedef struct st_log_record_type_descriptor
extern LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES];
typedef enum
{
TRANSLOG_GCOMMIT_NONE,
TRANSLOG_GCOMMIT_HARD,
TRANSLOG_GCOMMIT_SOFT
} enum_maria_group_commit;
extern ulong maria_group_commit;
extern ulong maria_group_commit_interval;
typedef enum
{
TRANSLOG_PURGE_IMMIDIATE,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment