Commit 3967b6cf authored by unknown's avatar unknown

WL#4401 "Maria - deserialize translog_flush() calls"

storage/maria/ma_loghandler.c:
  New flush procedure added.
storage/maria/unittest/Makefile.am:
  Test of many multi-thread flush added
storage/maria/unittest/ma_test_loghandler_multithread-t.c:
  Test of many multi-thread flush added
parent 39049add
...@@ -77,9 +77,12 @@ typedef union ...@@ -77,9 +77,12 @@ typedef union
normal circumstances (less then half of one and full other, or just normal circumstances (less then half of one and full other, or just
switched one and other), But if we met end of the file in the middle and switched one and other), But if we met end of the file in the middle and
have to switch buffer it will be 3. + 1 buffer for flushing/writing. have to switch buffer it will be 3. + 1 buffer for flushing/writing.
We have a bigger number here for higher concurrency. We have a bigger number here for higher concurrency and to make division
faster.
The number should be power of 2 to be fast.
*/ */
#define TRANSLOG_BUFFERS_NO 5 #define TRANSLOG_BUFFERS_NO 8
/* number of bytes (+ header) which can be unused on first page in sequence */ /* number of bytes (+ header) which can be unused on first page in sequence */
#define TRANSLOG_MINCHUNK_CONTENT 1 #define TRANSLOG_MINCHUNK_CONTENT 1
/* version of log file */ /* version of log file */
...@@ -100,7 +103,13 @@ struct st_translog_buffer ...@@ -100,7 +103,13 @@ struct st_translog_buffer
pagecache_inject() pagecache_inject()
*/ */
uchar buffer[TRANSLOG_WRITE_BUFFER]; uchar buffer[TRANSLOG_WRITE_BUFFER];
/*
Maximum LSN of records which ends in this buffer (or IMPOSSIBLE_LSN
if no LSNs ends here)
*/
LSN last_lsn; LSN last_lsn;
/* last_lsn of previous buffer or IMPOSSIBLE_LSN if it is very first one */
LSN prev_last_lsn;
/* This buffer offset in the file */ /* This buffer offset in the file */
TRANSLOG_ADDRESS offset; TRANSLOG_ADDRESS offset;
/* /*
...@@ -197,6 +206,8 @@ struct st_buffer_cursor ...@@ -197,6 +206,8 @@ struct st_buffer_cursor
}; };
typedef uint8 dirty_buffer_mask_t;
struct st_translog_descriptor struct st_translog_descriptor
{ {
/* *** Parameters of the log handler *** */ /* *** Parameters of the log handler *** */
...@@ -245,6 +256,10 @@ struct st_translog_descriptor ...@@ -245,6 +256,10 @@ struct st_translog_descriptor
File directory_fd; File directory_fd;
/* buffers for log writing */ /* buffers for log writing */
struct st_translog_buffer buffers[TRANSLOG_BUFFERS_NO]; struct st_translog_buffer buffers[TRANSLOG_BUFFERS_NO];
/* Mask where 1 in position N mean that buffer N is not flushed */
dirty_buffer_mask_t dirty_buffer_mask;
/* The above variable protection */
pthread_mutex_t dirty_buffer_mask_lock;
/* /*
horizon - visible end of the log (here is absolute end of the log: horizon - visible end of the log (here is absolute end of the log:
position where next chunk can start position where next chunk can start
...@@ -276,6 +291,7 @@ struct st_translog_descriptor ...@@ -276,6 +291,7 @@ struct st_translog_descriptor
be removed in v1.5 be removed in v1.5
*/ */
pthread_mutex_t log_flush_lock; pthread_mutex_t log_flush_lock;
pthread_cond_t log_flush_cond;
/* Protects changing of headers of finished files (max_lsn) */ /* Protects changing of headers of finished files (max_lsn) */
pthread_mutex_t file_header_lock; pthread_mutex_t file_header_lock;
...@@ -303,6 +319,11 @@ struct st_translog_descriptor ...@@ -303,6 +319,11 @@ struct st_translog_descriptor
is generated. is generated.
*/ */
my_bool is_everything_flushed; my_bool is_everything_flushed;
/* True when flush pass is in progress */
my_bool flush_in_progress;
/* Next flush pass variables */
TRANSLOG_ADDRESS next_pass_max_lsn;
pthread_t max_lsn_requester;
}; };
static struct st_translog_descriptor log_descriptor; static struct st_translog_descriptor log_descriptor;
...@@ -785,6 +806,7 @@ void translog_stop_writing() ...@@ -785,6 +806,7 @@ void translog_stop_writing()
translog_status= (translog_status == TRANSLOG_SHUTDOWN ? translog_status= (translog_status == TRANSLOG_SHUTDOWN ?
TRANSLOG_UNINITED : TRANSLOG_UNINITED :
TRANSLOG_READONLY); TRANSLOG_READONLY);
log_descriptor.is_everything_flushed= 1;
log_descriptor.open_flags= O_BINARY | O_RDONLY; log_descriptor.open_flags= O_BINARY | O_RDONLY;
DBUG_ASSERT(0); DBUG_ASSERT(0);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
...@@ -1373,7 +1395,9 @@ LSN translog_get_file_max_lsn_stored(uint32 file) ...@@ -1373,7 +1395,9 @@ LSN translog_get_file_max_lsn_stored(uint32 file)
static my_bool translog_buffer_init(struct st_translog_buffer *buffer) static my_bool translog_buffer_init(struct st_translog_buffer *buffer)
{ {
DBUG_ENTER("translog_buffer_init"); DBUG_ENTER("translog_buffer_init");
buffer->last_lsn= LSN_IMPOSSIBLE; buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx",
(ulong) buffer));
/* This Buffer File */ /* This Buffer File */
buffer->file= NULL; buffer->file= NULL;
buffer->overlay= 0; buffer->overlay= 0;
...@@ -1972,7 +1996,9 @@ static void translog_start_buffer(struct st_translog_buffer *buffer, ...@@ -1972,7 +1996,9 @@ static void translog_start_buffer(struct st_translog_buffer *buffer,
(ulong) LSN_OFFSET(log_descriptor.horizon), (ulong) LSN_OFFSET(log_descriptor.horizon),
(ulong) LSN_OFFSET(log_descriptor.horizon))); (ulong) LSN_OFFSET(log_descriptor.horizon)));
DBUG_ASSERT(buffer_no == buffer->buffer_no); DBUG_ASSERT(buffer_no == buffer->buffer_no);
buffer->last_lsn= LSN_IMPOSSIBLE; buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx",
(ulong) buffer));
buffer->offset= log_descriptor.horizon; buffer->offset= log_descriptor.horizon;
buffer->next_buffer_offset= LSN_IMPOSSIBLE; buffer->next_buffer_offset= LSN_IMPOSSIBLE;
buffer->file= get_current_logfile(); buffer->file= get_current_logfile();
...@@ -1987,6 +2013,10 @@ static void translog_start_buffer(struct st_translog_buffer *buffer, ...@@ -1987,6 +2013,10 @@ static void translog_start_buffer(struct st_translog_buffer *buffer,
cursor->chaser, (ulong) cursor->buffer->size, cursor->chaser, (ulong) cursor->buffer->size,
(ulong) (cursor->ptr - cursor->buffer->buffer))); (ulong) (cursor->ptr - cursor->buffer->buffer)));
translog_check_cursor(cursor); translog_check_cursor(cursor);
pthread_mutex_lock(&log_descriptor.dirty_buffer_mask_lock);
log_descriptor.dirty_buffer_mask|= (1 << buffer->buffer_no);
pthread_mutex_unlock(&log_descriptor.dirty_buffer_mask_lock);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -2046,7 +2076,6 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon, ...@@ -2046,7 +2076,6 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
if (new_file) if (new_file)
{ {
/* move the horizon to the next file and its header page */ /* move the horizon to the next file and its header page */
(*horizon)+= LSN_ONE_FILE; (*horizon)+= LSN_ONE_FILE;
(*horizon)= LSN_REPLACE_OFFSET(*horizon, TRANSLOG_PAGE_SIZE); (*horizon)= LSN_REPLACE_OFFSET(*horizon, TRANSLOG_PAGE_SIZE);
...@@ -2065,6 +2094,13 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon, ...@@ -2065,6 +2094,13 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
translog_start_buffer(new_buffer, cursor, new_buffer_no); translog_start_buffer(new_buffer, cursor, new_buffer_no);
} }
log_descriptor.buffers[old_buffer_no].next_buffer_offset= new_buffer->offset; log_descriptor.buffers[old_buffer_no].next_buffer_offset= new_buffer->offset;
new_buffer->prev_last_lsn=
((log_descriptor.buffers[old_buffer_no].last_lsn != LSN_IMPOSSIBLE) ?
log_descriptor.buffers[old_buffer_no].last_lsn :
log_descriptor.buffers[old_buffer_no].prev_last_lsn);
DBUG_PRINT("info", ("prev_last_lsn set to (%lu,0x%lx) buffer: 0x%lx",
LSN_IN_PARTS(new_buffer->prev_last_lsn),
(ulong) new_buffer));
translog_new_page_header(horizon, cursor); translog_new_page_header(horizon, cursor);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -2179,6 +2215,7 @@ static LSN translog_get_sent_to_disk() ...@@ -2179,6 +2215,7 @@ static LSN translog_get_sent_to_disk()
DBUG_ENTER("translog_get_sent_to_disk"); DBUG_ENTER("translog_get_sent_to_disk");
pthread_mutex_lock(&log_descriptor.sent_to_disk_lock); pthread_mutex_lock(&log_descriptor.sent_to_disk_lock);
lsn= log_descriptor.sent_to_disk; lsn= log_descriptor.sent_to_disk;
DBUG_PRINT("info", ("sent to disk up to (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
pthread_mutex_unlock(&log_descriptor.sent_to_disk_lock); pthread_mutex_unlock(&log_descriptor.sent_to_disk_lock);
DBUG_RETURN(lsn); DBUG_RETURN(lsn);
} }
...@@ -2392,7 +2429,6 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2392,7 +2429,6 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
TRANSLOG_FILE *file= buffer->file; TRANSLOG_FILE *file= buffer->file;
uint8 ver= buffer->ver; uint8 ver= buffer->ver;
DBUG_ENTER("translog_buffer_flush"); DBUG_ENTER("translog_buffer_flush");
DBUG_ASSERT(buffer->file != NULL);
DBUG_PRINT("enter", DBUG_PRINT("enter",
("Buffer: #%u 0x%lx file: %d offset: (%lu,0x%lx) size: %lu", ("Buffer: #%u 0x%lx file: %d offset: (%lu,0x%lx) size: %lu",
(uint) buffer->buffer_no, (ulong) buffer, (uint) buffer->buffer_no, (ulong) buffer,
...@@ -2401,6 +2437,9 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2401,6 +2437,9 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
(ulong) buffer->size)); (ulong) buffer->size));
translog_buffer_lock_assert_owner(buffer); translog_buffer_lock_assert_owner(buffer);
if (buffer->file == NULL)
DBUG_RETURN(0);
translog_wait_for_writers(buffer); translog_wait_for_writers(buffer);
if (buffer->file != file || buffer->offset != offset || buffer->ver != ver) if (buffer->file != file || buffer->offset != offset || buffer->ver != ver)
...@@ -2460,6 +2499,11 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2460,6 +2499,11 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
{ {
TRANSLOG_ADDRESS addr= (buffer->offset + i); TRANSLOG_ADDRESS addr= (buffer->offset + i);
TRANSLOG_VALIDATOR_DATA data; TRANSLOG_VALIDATOR_DATA data;
DBUG_PRINT("info", ("send log form %lu till %lu address: (%lu,0x%lx) "
"page #: %lu buffer size: %lu buffer: 0x%lx",
(ulong) i, (ulong) (i + TRANSLOG_PAGE_SIZE),
LSN_IN_PARTS(addr), (ulong) pg, (ulong) buffer->size,
(ulong) buffer));
data.addr= &addr; data.addr= &addr;
DBUG_ASSERT(log_descriptor.pagecache->block_size == TRANSLOG_PAGE_SIZE); DBUG_ASSERT(log_descriptor.pagecache->block_size == TRANSLOG_PAGE_SIZE);
DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size); DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size);
...@@ -2511,6 +2555,9 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2511,6 +2555,9 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
buffer->file= NULL; buffer->file= NULL;
buffer->overlay= 0; buffer->overlay= 0;
buffer->ver++; buffer->ver++;
pthread_mutex_lock(&log_descriptor.dirty_buffer_mask_lock);
log_descriptor.dirty_buffer_mask&= ~(1 << buffer->buffer_no);
pthread_mutex_unlock(&log_descriptor.dirty_buffer_mask_lock);
pthread_cond_broadcast(&buffer->waiting_filling_buffer); pthread_cond_broadcast(&buffer->waiting_filling_buffer);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -3352,9 +3399,12 @@ my_bool translog_init_with_table(const char *directory, ...@@ -3352,9 +3399,12 @@ my_bool translog_init_with_table(const char *directory,
id_to_share= NULL; id_to_share= NULL;
log_descriptor.directory_fd= -1; log_descriptor.directory_fd= -1;
log_descriptor.is_everything_flushed= 1; log_descriptor.is_everything_flushed= 1;
log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
(*init_table_func)(); (*init_table_func)();
compile_time_assert(sizeof(log_descriptor.dirty_buffer_mask) * 8 >=
TRANSLOG_BUFFERS_NO);
log_descriptor.dirty_buffer_mask= 0;
if (readonly) if (readonly)
log_descriptor.open_flags= O_BINARY | O_RDONLY; log_descriptor.open_flags= O_BINARY | O_RDONLY;
else else
...@@ -3369,6 +3419,9 @@ my_bool translog_init_with_table(const char *directory, ...@@ -3369,6 +3419,9 @@ my_bool translog_init_with_table(const char *directory,
MY_MUTEX_INIT_FAST) || MY_MUTEX_INIT_FAST) ||
pthread_mutex_init(&log_descriptor.log_flush_lock, pthread_mutex_init(&log_descriptor.log_flush_lock,
MY_MUTEX_INIT_FAST) || MY_MUTEX_INIT_FAST) ||
pthread_mutex_init(&log_descriptor.dirty_buffer_mask_lock,
MY_MUTEX_INIT_FAST) ||
pthread_cond_init(&log_descriptor.log_flush_cond, 0) ||
my_rwlock_init(&log_descriptor.open_files_lock, my_rwlock_init(&log_descriptor.open_files_lock,
NULL) || NULL) ||
my_init_dynamic_array(&log_descriptor.open_files, my_init_dynamic_array(&log_descriptor.open_files,
...@@ -4016,6 +4069,8 @@ void translog_destroy() ...@@ -4016,6 +4069,8 @@ void translog_destroy()
pthread_mutex_destroy(&log_descriptor.unfinished_files_lock); pthread_mutex_destroy(&log_descriptor.unfinished_files_lock);
pthread_mutex_destroy(&log_descriptor.purger_lock); pthread_mutex_destroy(&log_descriptor.purger_lock);
pthread_mutex_destroy(&log_descriptor.log_flush_lock); pthread_mutex_destroy(&log_descriptor.log_flush_lock);
pthread_mutex_destroy(&log_descriptor.dirty_buffer_mask_lock);
pthread_cond_destroy(&log_descriptor.log_flush_cond);
rwlock_destroy(&log_descriptor.open_files_lock); rwlock_destroy(&log_descriptor.open_files_lock);
delete_dynamic(&log_descriptor.open_files); delete_dynamic(&log_descriptor.open_files);
delete_dynamic(&log_descriptor.unfinished_files); delete_dynamic(&log_descriptor.unfinished_files);
...@@ -4686,10 +4741,13 @@ static translog_size_t translog_get_current_group_size() ...@@ -4686,10 +4741,13 @@ static translog_size_t translog_get_current_group_size()
static inline void set_lsn(LSN *lsn, LSN value) static inline void set_lsn(LSN *lsn, LSN value)
{ {
DBUG_ENTER("set_lsn");
translog_lock_assert_owner(); translog_lock_assert_owner();
*lsn= value; *lsn= value;
/* we generate LSN so something is not flushed in log */ /* we generate LSN so something is not flushed in log */
log_descriptor.is_everything_flushed= 0; log_descriptor.is_everything_flushed= 0;
DBUG_PRINT("info", ("new LSN appeared: (%lu,0x%lx)", LSN_IN_PARTS(value)));
DBUG_VOID_RETURN;
} }
...@@ -4779,6 +4837,9 @@ translog_write_variable_record_1group(LSN *lsn, ...@@ -4779,6 +4837,9 @@ translog_write_variable_record_1group(LSN *lsn,
rc|= translog_advance_pointer((int)(full_pages + additional_chunk3_page), rc|= translog_advance_pointer((int)(full_pages + additional_chunk3_page),
(record_rest ? record_rest + 3 : 0)); (record_rest ? record_rest + 3 : 0));
log_descriptor.bc.buffer->last_lsn= *lsn; log_descriptor.bc.buffer->last_lsn= *lsn;
DBUG_PRINT("info", ("last_lsn set to (%lu,0x%lx) buffer: 0x%lx",
LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn),
(ulong) log_descriptor.bc.buffer));
translog_unlock(); translog_unlock();
...@@ -4902,6 +4963,9 @@ translog_write_variable_record_1chunk(LSN *lsn, ...@@ -4902,6 +4963,9 @@ translog_write_variable_record_1chunk(LSN *lsn,
&log_descriptor.bc, &log_descriptor.bc,
parts->total_record_length, parts); parts->total_record_length, parts);
log_descriptor.bc.buffer->last_lsn= *lsn; log_descriptor.bc.buffer->last_lsn= *lsn;
DBUG_PRINT("info", ("last_lsn set to (%lu,0x%lx) buffer: 0x%lx",
LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn),
(ulong) log_descriptor.bc.buffer));
translog_unlock(); translog_unlock();
/* /*
...@@ -5246,6 +5310,7 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5246,6 +5310,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
uint groups_per_page= (page_capacity - header_fixed_part) / (7 + 1); uint groups_per_page= (page_capacity - header_fixed_part) / (7 + 1);
uint file_of_the_first_group; uint file_of_the_first_group;
int pages_to_skip; int pages_to_skip;
struct st_translog_buffer *buffer_of_last_lsn;
DBUG_ENTER("translog_write_variable_record_mgroup"); DBUG_ENTER("translog_write_variable_record_mgroup");
translog_lock_assert_owner(); translog_lock_assert_owner();
...@@ -5480,6 +5545,7 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5480,6 +5545,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
((page_capacity - ((page_capacity -
header_fixed_part) / (7 + 1)) * header_fixed_part) / (7 + 1)) *
(chunk0_pages - 1)) * (7 + 1)); (chunk0_pages - 1)) * (7 + 1));
buffer_of_last_lsn= log_descriptor.bc.buffer;
translog_unlock(); translog_unlock();
if (buffer_to_flush != NULL) if (buffer_to_flush != NULL)
...@@ -5587,6 +5653,10 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5587,6 +5653,10 @@ translog_write_variable_record_mgroup(LSN *lsn,
*/ */
translog_lock(); translog_lock();
set_lsn(lsn, horizon); set_lsn(lsn, horizon);
buffer_of_last_lsn->last_lsn= *lsn;
DBUG_PRINT("info", ("last_lsn set to (%lu,0x%lx) buffer: 0x%lx",
LSN_IN_PARTS(buffer_of_last_lsn->last_lsn),
(ulong) buffer_of_last_lsn));
if (log_record_type_descriptor[type].inwrite_hook && if (log_record_type_descriptor[type].inwrite_hook &&
(*log_record_type_descriptor[type].inwrite_hook) (type, trn, (*log_record_type_descriptor[type].inwrite_hook) (type, trn,
tbl_info, tbl_info,
...@@ -5642,8 +5712,6 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5642,8 +5712,6 @@ translog_write_variable_record_mgroup(LSN *lsn,
*chunk0_header= (uchar) (TRANSLOG_CHUNK_LSN | TRANSLOG_CHUNK_0_CONT); *chunk0_header= (uchar) (TRANSLOG_CHUNK_LSN | TRANSLOG_CHUNK_0_CONT);
} while (chunk0_pages != 0); } while (chunk0_pages != 0);
translog_buffer_lock(cursor.buffer); translog_buffer_lock(cursor.buffer);
if (cmp_translog_addr(cursor.buffer->last_lsn, *lsn) < 0)
cursor.buffer->last_lsn= *lsn;
translog_buffer_decrease_writers(cursor.buffer); translog_buffer_decrease_writers(cursor.buffer);
translog_buffer_unlock(cursor.buffer); translog_buffer_unlock(cursor.buffer);
rc= 0; rc= 0;
...@@ -5896,6 +5964,9 @@ static my_bool translog_write_fixed_record(LSN *lsn, ...@@ -5896,6 +5964,9 @@ static my_bool translog_write_fixed_record(LSN *lsn,
parts->total_record_length, parts); parts->total_record_length, parts);
log_descriptor.bc.buffer->last_lsn= *lsn; log_descriptor.bc.buffer->last_lsn= *lsn;
DBUG_PRINT("info", ("last_lsn set to (%lu,0x%lx) buffer: 0x%lx",
LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn),
(ulong) log_descriptor.bc.buffer));
err: err:
translog_unlock(); translog_unlock();
...@@ -7301,7 +7372,8 @@ static void translog_force_current_buffer_to_finish() ...@@ -7301,7 +7372,8 @@ static void translog_force_current_buffer_to_finish()
#endif #endif
/* /*
Now only one thread can flush log (buffer can flush many threads but Now only one thread can flush log (buffer can flush many threads but
log flush is serialized) so no other thread can set is_closing_buffer log flush log flush where this function is used can do only one thread)
so no other thread can set is_closing_buffer.
*/ */
DBUG_ASSERT(!old_buffer->is_closing_buffer); DBUG_ASSERT(!old_buffer->is_closing_buffer);
old_buffer->is_closing_buffer= 1; /* Other flushes will wait */ old_buffer->is_closing_buffer= 1; /* Other flushes will wait */
...@@ -7362,6 +7434,51 @@ static void translog_force_current_buffer_to_finish() ...@@ -7362,6 +7434,51 @@ static void translog_force_current_buffer_to_finish()
} }
/**
@brief Waits while given lsn will be flushed
@param lsn log record serial number up to which (inclusive)
the log has to be flushed
*/
void translog_flush_wait_for_end(LSN lsn)
{
DBUG_ENTER("translog_flush_wait_for_end");
DBUG_PRINT("enter", ("LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
safe_mutex_assert_owner(&log_descriptor.log_flush_lock);
while (cmp_translog_addr(log_descriptor.flushed, lsn) < 0)
pthread_cond_wait(&log_descriptor.log_flush_cond,
&log_descriptor.log_flush_lock);
DBUG_VOID_RETURN;
}
/**
@brief Sets goal for the next flush pass and waits for this pass end.
@param lsn log record serial number up to which (inclusive)
the log has to be flushed
*/
void translog_flush_set_new_goal_and_wait(TRANSLOG_ADDRESS lsn)
{
DBUG_ENTER("translog_flush_set_new_goal_and_wait");
DBUG_PRINT("enter", ("LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
safe_mutex_assert_owner(&log_descriptor.log_flush_lock);
if (cmp_translog_addr(lsn, log_descriptor.next_pass_max_lsn) > 0)
{
log_descriptor.next_pass_max_lsn= lsn;
log_descriptor.max_lsn_requester= pthread_self();
}
while (log_descriptor.flush_in_progress)
{
pthread_cond_wait(&log_descriptor.log_flush_cond,
&log_descriptor.log_flush_lock);
}
DBUG_VOID_RETURN;
}
/** /**
@brief Flush the log up to given LSN (included) @brief Flush the log up to given LSN (included)
...@@ -7372,161 +7489,141 @@ static void translog_force_current_buffer_to_finish() ...@@ -7372,161 +7489,141 @@ static void translog_force_current_buffer_to_finish()
@retval 0 OK @retval 0 OK
@retval 1 Error @retval 1 Error
@todo LOG: when a log write fails, we should not write to this log anymore
(if we add more log records to this log they will be unreadable: we will hit
the broken log record): all translog_flush() should be made to fail (because
translog_flush() is when a a transaction wants something durable and we
cannot make anything durable as log is corrupted). For that, a "my_bool
st_translog_descriptor::write_error" could be set to 1 when a
translog_write_record() or translog_flush() fails, and translog_flush()
would test this var (and translog_write_record() could also test this var if
it wants, though it's not absolutely needed).
Then, either shut Maria down immediately, or switch to a new log (but if we
get write error after write error, that would create too many logs).
A popular open-source transactional engine intentionally crashes as soon as
a log flush fails (we however don't want to crash the entire mysqld, but
stopping all engine's operations immediately would make sense).
Same applies to translog_write_record().
@todo: remove serialization and make group commit.
*/ */
my_bool translog_flush(TRANSLOG_ADDRESS lsn) my_bool translog_flush(TRANSLOG_ADDRESS lsn)
{ {
LSN old_flushed, sent_to_disk; LSN sent_to_disk= LSN_IMPOSSIBLE;
TRANSLOG_ADDRESS flush_horizon; TRANSLOG_ADDRESS flush_horizon;
int rc= 0; uint fn, i;
/* We can't have more different files then buffers */ dirty_buffer_mask_t dirty_buffer_mask;
TRANSLOG_FILE *file_handlers[TRANSLOG_BUFFERS_NO]; uint8 last_buffer_no, start_buffer_no;
int current_file_handler= -1; my_bool rc= 0;
uint32 prev_file= 0;
my_bool full_circle= 0;
DBUG_ENTER("translog_flush"); DBUG_ENTER("translog_flush");
DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn))); DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
DBUG_ASSERT(translog_status == TRANSLOG_OK || DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY); translog_status == TRANSLOG_READONLY);
LINT_INIT(sent_to_disk); LINT_INIT(sent_to_disk);
LINT_INIT(flush_horizon);
pthread_mutex_lock(&log_descriptor.log_flush_lock); pthread_mutex_lock(&log_descriptor.log_flush_lock);
translog_lock(); DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)",
if (log_descriptor.is_everything_flushed) LSN_IN_PARTS(log_descriptor.flushed)));
if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0)
{ {
DBUG_PRINT("info", ("everything is flushed")); pthread_mutex_unlock(&log_descriptor.log_flush_lock);
translog_unlock(); DBUG_RETURN(0);
goto out;
} }
flush_horizon= LSN_IMPOSSIBLE; if (log_descriptor.flush_in_progress)
old_flushed= log_descriptor.flushed;
for (;;)
{ {
uint16 buffer_no= log_descriptor.bc.buffer_no; translog_flush_set_new_goal_and_wait(lsn);
uint16 buffer_start= buffer_no; if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self()))
struct st_translog_buffer *buffer_unlock= log_descriptor.bc.buffer;
struct st_translog_buffer *buffer= log_descriptor.bc.buffer;
if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0)
{ {
DBUG_PRINT("info", ("already flushed: (%lu,0x%lx)", /* fix lsn if it was horizon */
LSN_IN_PARTS(log_descriptor.flushed))); if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0)
translog_unlock(); lsn= log_descriptor.bc.buffer->last_lsn;
goto out; translog_flush_wait_for_end(lsn);
pthread_mutex_unlock(&log_descriptor.log_flush_lock);
DBUG_RETURN(0);
} }
/* send to the file if it is not sent */ log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
if (translog_status != TRANSLOG_OK) }
log_descriptor.flush_in_progress= 1;
DBUG_PRINT("info", ("flush_in_progress is set"));
pthread_mutex_unlock(&log_descriptor.log_flush_lock);
translog_lock();
if (log_descriptor.is_everything_flushed)
{ {
rc= 1; DBUG_PRINT("info", ("everything is flushed"));
rc= (translog_status == TRANSLOG_READONLY);
translog_unlock();
goto out; goto out;
} }
sent_to_disk= translog_get_sent_to_disk();
if (cmp_translog_addr(sent_to_disk, lsn) >= 0 || full_circle)
break;
do
{
buffer_no= (buffer_no + 1) % TRANSLOG_BUFFERS_NO;
buffer= log_descriptor.buffers + buffer_no;
translog_buffer_lock(buffer);
translog_buffer_unlock(buffer_unlock);
buffer_unlock= buffer;
if (buffer->file != NULL)
{
buffer_unlock= NULL;
if (buffer_start == buffer_no)
{
/* we made a circle */
full_circle= 1;
/* /*
If buffer from which we started still current we have to We will recheck information when will lock buffers one by
finish it (we will not flush intentionally more records one so we can use unprotected read here (this is just for
then was at the moment of start flushing); speed up buffers processing)
*/ */
if (buffer_start == log_descriptor.bc.buffer_no) dirty_buffer_mask= log_descriptor.dirty_buffer_mask;
DBUG_PRINT("info", ("Dirty buffer mask: %lx current buffer: %u",
(ulong) dirty_buffer_mask,
(uint) log_descriptor.bc.buffer_no));
for (i= (log_descriptor.bc.buffer_no + 1) % TRANSLOG_BUFFERS_NO;
i != log_descriptor.bc.buffer_no && !(dirty_buffer_mask & (1 << i));
i= (i + 1) % TRANSLOG_BUFFERS_NO) {}
start_buffer_no= i;
/* if we have to flush last buffer then we will finish it */
if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0)
{ {
translog_lock_assert_owner(); struct st_translog_buffer *buffer= log_descriptor.bc.buffer;
/* lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */
Here we have loghandler locked. last_buffer_no= log_descriptor.bc.buffer_no;
We are going to flush last buffer, and will not release
log_flush_lock until it happened, so we can set the flag here
and other process which going to flush will not be able read
it and return earlier then we finish the flush process. (But
other process can drop the flag if new LSN appeared (only
after translog_force_current_buffer_to_finish() call and
transaction log unlock of course))
*/
log_descriptor.is_everything_flushed= 1; log_descriptor.is_everything_flushed= 1;
translog_force_current_buffer_to_finish(); translog_force_current_buffer_to_finish();
translog_buffer_unlock(buffer);
} }
else
{
last_buffer_no= ((log_descriptor.bc.buffer_no + TRANSLOG_BUFFERS_NO -1) %
TRANSLOG_BUFFERS_NO);
translog_unlock();
} }
break; sent_to_disk= translog_get_sent_to_disk();
} if (cmp_translog_addr(lsn, sent_to_disk) > 0)
} while ((buffer_start != buffer_no) && {
cmp_translog_addr(log_descriptor.flushed, lsn) < 0);
if (buffer_unlock != NULL && buffer_unlock != buffer)
translog_buffer_unlock(buffer_unlock);
if (prev_file != LSN_FILE_NO(buffer->offset)) DBUG_PRINT("info", ("Start buffer #: %u last buffer #: %u",
(uint) start_buffer_no, (uint) last_buffer_no));
last_buffer_no= (last_buffer_no + 1) % TRANSLOG_BUFFERS_NO;
i= start_buffer_no;
do
{ {
TRANSLOG_FILE *file; struct st_translog_buffer *buffer= log_descriptor.buffers + i;
uint32 fn= LSN_FILE_NO(buffer->offset); translog_buffer_lock(buffer);
prev_file= fn; DBUG_PRINT("info", ("Check buffer: 0x%lx #: %u "
file= get_logfile_by_number(fn); "prev last LSN: (%lu,0x%lx) "
DBUG_ASSERT(file != NULL); "last LSN: (%lu,0x%lx) status: %s",
if (!file->is_sync) (ulong)(buffer),
(uint) i,
LSN_IN_PARTS(buffer->prev_last_lsn),
LSN_IN_PARTS(buffer->last_lsn),
(buffer->file ?
"dirty" : "closed")));
if (buffer->prev_last_lsn <= lsn &&
buffer->file != NULL)
{ {
current_file_handler++;
file_handlers[current_file_handler]= file;
}
/* We sync file when we are closing it => do nothing if file closed */
}
DBUG_ASSERT(flush_horizon <= buffer->offset + buffer->size); DBUG_ASSERT(flush_horizon <= buffer->offset + buffer->size);
flush_horizon= buffer->offset + buffer->size; flush_horizon= buffer->offset + buffer->size;
rc= translog_buffer_flush(buffer); translog_buffer_flush(buffer);
}
translog_buffer_unlock(buffer); translog_buffer_unlock(buffer);
if (rc) i= (i + 1) % TRANSLOG_BUFFERS_NO;
goto out; /* rc is 1 */ } while (i != last_buffer_no);
translog_lock(); sent_to_disk= translog_get_sent_to_disk();
} }
translog_unlock();
/* sync files from previous flush till current one */
for (fn= LSN_FILE_NO(log_descriptor.flushed); fn <= LSN_FILE_NO(lsn); fn++)
{ {
TRANSLOG_FILE **cur= file_handlers; TRANSLOG_FILE *file= get_logfile_by_number(fn);
TRANSLOG_FILE **end= file_handlers + current_file_handler; DBUG_ASSERT(file != NULL);
for (; cur <= end; cur++) if (!file->is_sync)
{ {
(*cur)->is_sync= 1; if (my_sync(file->handler.file, MYF(MY_WME)))
if (my_sync((*cur)->handler.file, MYF(MY_WME)))
{ {
rc= 1; rc= 1;
translog_stop_writing(); translog_stop_writing();
sent_to_disk= LSN_IMPOSSIBLE;
goto out; goto out;
} }
file->is_sync= 1;
} }
} }
log_descriptor.flushed= sent_to_disk;
/*
If we should flush (due to directory flush mode) and
previous flush horizon was not within one page border with this one.
*/
if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS && if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
(LSN_FILE_NO(log_descriptor.previous_flush_horizon) != (LSN_FILE_NO(log_descriptor.previous_flush_horizon) !=
LSN_FILE_NO(flush_horizon) || LSN_FILE_NO(flush_horizon) ||
...@@ -7536,7 +7633,13 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn) ...@@ -7536,7 +7633,13 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD)); rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD));
log_descriptor.previous_flush_horizon= flush_horizon; log_descriptor.previous_flush_horizon= flush_horizon;
out: out:
pthread_mutex_unlock(&log_descriptor.log_flush_lock); pthread_mutex_lock(&log_descriptor.log_flush_lock);
if (sent_to_disk != LSN_IMPOSSIBLE)
log_descriptor.flushed= sent_to_disk;
log_descriptor.flush_in_progress= 0;
DBUG_PRINT("info", ("flush_in_progress is dropped"));
pthread_mutex_unlock(&log_descriptor.log_flush_lock);\
pthread_cond_broadcast(&log_descriptor.log_flush_cond);
DBUG_RETURN(rc); DBUG_RETURN(rc);
} }
......
...@@ -42,6 +42,7 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \ ...@@ -42,6 +42,7 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \
ma_test_loghandler-t \ ma_test_loghandler-t \
ma_test_loghandler_multigroup-t \ ma_test_loghandler_multigroup-t \
ma_test_loghandler_multithread-t \ ma_test_loghandler_multithread-t \
ma_test_loghandler_multiflush-t \
ma_test_loghandler_pagecache-t \ ma_test_loghandler_pagecache-t \
ma_test_loghandler_long-t \ ma_test_loghandler_long-t \
ma_test_loghandler_noflush-t \ ma_test_loghandler_noflush-t \
...@@ -54,6 +55,8 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \ ...@@ -54,6 +55,8 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \
ma_test_loghandler_t_SOURCES = ma_test_loghandler-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c ma_test_loghandler_t_SOURCES = ma_test_loghandler-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_multigroup_t_SOURCES = ma_test_loghandler_multigroup-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c sequence_storage.c sequence_storage.h ma_test_loghandler_multigroup_t_SOURCES = ma_test_loghandler_multigroup-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c sequence_storage.c sequence_storage.h
ma_test_loghandler_multithread_t_SOURCES = ma_test_loghandler_multithread-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c ma_test_loghandler_multithread_t_SOURCES = ma_test_loghandler_multithread-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_multiflush_t_SOURCES = ma_test_loghandler_multithread-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_multiflush_t_CPPFLAGS = -DMULTIFLUSH_TEST
ma_test_loghandler_pagecache_t_SOURCES = ma_test_loghandler_pagecache-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c ma_test_loghandler_pagecache_t_SOURCES = ma_test_loghandler_pagecache-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_long_t_SOURCES = ma_test_loghandler-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c ma_test_loghandler_long_t_SOURCES = ma_test_loghandler-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_long_t_CPPFLAGS = -DLONG_LOG_TEST ma_test_loghandler_long_t_CPPFLAGS = -DLONG_LOG_TEST
......
...@@ -28,16 +28,35 @@ static const char *default_dbug_option; ...@@ -28,16 +28,35 @@ static const char *default_dbug_option;
#define PCACHE_SIZE (1024*1024*10) #define PCACHE_SIZE (1024*1024*10)
#define LOG_FILE_SIZE (1024L*1024L*1024L + 1024L*1024L*512)
/*#define LOG_FLAGS TRANSLOG_SECTOR_PROTECTION | TRANSLOG_PAGE_CRC */ /*#define LOG_FLAGS TRANSLOG_SECTOR_PROTECTION | TRANSLOG_PAGE_CRC */
#define LOG_FLAGS 0 #define LOG_FLAGS 0
/*#define LONG_BUFFER_SIZE (1024L*1024L*1024L + 1024L*1024L*512)*/ /*#define LONG_BUFFER_SIZE (1024L*1024L*1024L + 1024L*1024L*512)*/
#ifdef MULTIFLUSH_TEST
#define LONG_BUFFER_SIZE (16384L)
#define MIN_REC_LENGTH 10
#define SHOW_DIVIDER 20
#define ITERATIONS 10000
#define FLUSH_ITERATIONS 1000
#define WRITERS 2
#define FLUSHERS 10
#else
#define LONG_BUFFER_SIZE (512L*1024L*1024L) #define LONG_BUFFER_SIZE (512L*1024L*1024L)
#define MIN_REC_LENGTH 30 #define MIN_REC_LENGTH 30
#define SHOW_DIVIDER 10 #define SHOW_DIVIDER 10
#define LOG_FILE_SIZE (1024L*1024L*1024L + 1024L*1024L*512)
#define ITERATIONS 3 #define ITERATIONS 3
#define FLUSH_ITERATIONS 0
#define WRITERS 3 #define WRITERS 3
#define FLUSHERS 0
#endif
static uint number_of_writers= WRITERS; static uint number_of_writers= WRITERS;
static uint number_of_flushers= FLUSHERS;
static pthread_cond_t COND_thread_count; static pthread_cond_t COND_thread_count;
static pthread_mutex_t LOCK_thread_count; static pthread_mutex_t LOCK_thread_count;
...@@ -48,6 +67,9 @@ static LSN lsns1[WRITERS][ITERATIONS]; ...@@ -48,6 +67,9 @@ static LSN lsns1[WRITERS][ITERATIONS];
static LSN lsns2[WRITERS][ITERATIONS]; static LSN lsns2[WRITERS][ITERATIONS];
static uchar *long_buffer; static uchar *long_buffer;
static LSN last_lsn; /* For test purposes the variable allow dirty read/write */
/* /*
Get pseudo-random length of the field in Get pseudo-random length of the field in
limits [MIN_REC_LENGTH..LONG_BUFFER_SIZE] limits [MIN_REC_LENGTH..LONG_BUFFER_SIZE]
...@@ -177,6 +199,7 @@ void writer(int num) ...@@ -177,6 +199,7 @@ void writer(int num)
return; return;
} }
lsns2[num][i]= lsn; lsns2[num][i]= lsn;
last_lsn= lsn;
pthread_mutex_lock(&LOCK_thread_count); pthread_mutex_lock(&LOCK_thread_count);
ok(1, "write records"); ok(1, "write records");
pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count);
...@@ -205,6 +228,33 @@ static void *test_thread_writer(void *arg) ...@@ -205,6 +228,33 @@ static void *test_thread_writer(void *arg)
} }
static void *test_thread_flusher(void *arg)
{
int param= *((int*) arg);
int i;
my_thread_init();
for(i= 0; i < FLUSH_ITERATIONS; i++)
{
translog_flush(last_lsn);
pthread_mutex_lock(&LOCK_thread_count);
ok(1, "-- flush %d", param);
pthread_mutex_unlock(&LOCK_thread_count);
}
pthread_mutex_lock(&LOCK_thread_count);
thread_count--;
ok(1, "flusher finished"); /* just to show progress */
VOID(pthread_cond_signal(&COND_thread_count)); /* Tell main we are
ready */
pthread_mutex_unlock(&LOCK_thread_count);
free((uchar*) arg);
my_thread_end();
return(0);
}
int main(int argc __attribute__((unused)), int main(int argc __attribute__((unused)),
char **argv __attribute__ ((unused))) char **argv __attribute__ ((unused)))
{ {
...@@ -219,7 +269,8 @@ int main(int argc __attribute__((unused)), ...@@ -219,7 +269,8 @@ int main(int argc __attribute__((unused)),
int *param, error; int *param, error;
int rc; int rc;
plan(WRITERS + ITERATIONS * WRITERS * 3); plan(WRITERS + FLUSHERS +
ITERATIONS * WRITERS * 3 + FLUSH_ITERATIONS * FLUSHERS );
bzero(&pagecache, sizeof(pagecache)); bzero(&pagecache, sizeof(pagecache));
maria_data_root= (char *)"."; maria_data_root= (char *)".";
...@@ -329,7 +380,9 @@ int main(int argc __attribute__((unused)), ...@@ -329,7 +380,9 @@ int main(int argc __attribute__((unused)),
pthread_mutex_lock(&LOCK_thread_count); pthread_mutex_lock(&LOCK_thread_count);
while (number_of_writers != 0) while (number_of_writers != 0 || number_of_flushers != 0)
{
if (number_of_writers)
{ {
param= (int*) malloc(sizeof(int)); param= (int*) malloc(sizeof(int));
*param= number_of_writers - 1; *param= number_of_writers - 1;
...@@ -343,6 +396,21 @@ int main(int argc __attribute__((unused)), ...@@ -343,6 +396,21 @@ int main(int argc __attribute__((unused)),
thread_count++; thread_count++;
number_of_writers--; number_of_writers--;
} }
if (number_of_flushers)
{
param= (int*) malloc(sizeof(int));
*param= number_of_flushers - 1;
if ((error= pthread_create(&tid, &thr_attr, test_thread_flusher,
(void*) param)))
{
fprintf(stderr, "Got error: %d from pthread_create (errno: %d)\n",
error, errno);
exit(1);
}
thread_count++;
number_of_flushers--;
}
}
pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count);
pthread_attr_destroy(&thr_attr); pthread_attr_destroy(&thr_attr);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment