Commit 809ee8cf authored by unknown's avatar unknown

Postreview fix + some improvements of the loop.

Fixed overlaying block waiting during flushing block.
Sync directory only is file size really changed.
Fixed bug in the flush function when we access data
 in file cache and horizon value without mutex protection.

parent 74b8abc6
...@@ -222,6 +222,7 @@ struct st_translog_descriptor ...@@ -222,6 +222,7 @@ struct st_translog_descriptor
LSN flushed; LSN flushed;
/* Last LSN sent to the disk (but maybe not written yet) */ /* Last LSN sent to the disk (but maybe not written yet) */
LSN sent_to_disk; LSN sent_to_disk;
TRANSLOG_ADDRESS previous_flush_horizon;
/* All what is after this address is not sent to disk yet */ /* All what is after this address is not sent to disk yet */
TRANSLOG_ADDRESS in_buffers_only; TRANSLOG_ADDRESS in_buffers_only;
pthread_mutex_t sent_to_disk_lock; pthread_mutex_t sent_to_disk_lock;
...@@ -2070,7 +2071,7 @@ static uint16 translog_get_total_chunk_length(uchar *page, uint16 offset) ...@@ -2070,7 +2071,7 @@ static uint16 translog_get_total_chunk_length(uchar *page, uint16 offset)
static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
{ {
uint32 i; uint32 i, pg;
PAGECACHE_FILE file; PAGECACHE_FILE file;
DBUG_ENTER("translog_buffer_flush"); DBUG_ENTER("translog_buffer_flush");
DBUG_PRINT("enter", DBUG_PRINT("enter",
...@@ -2085,18 +2086,46 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2085,18 +2086,46 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
DBUG_ASSERT(buffer->file != -1); DBUG_ASSERT(buffer->file != -1);
translog_wait_for_writers(buffer); translog_wait_for_writers(buffer);
if (buffer->overlay && buffer->overlay->file != -1)
if (buffer->overlay && buffer->overlay->file == buffer->file &&
cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size,
buffer->offset) > 0)
{ {
/*
This can't happen for normal translog_flush,
only during destroying the loghandler
*/
struct st_translog_buffer *overlay= buffer->overlay; struct st_translog_buffer *overlay= buffer->overlay;
TRANSLOG_ADDRESS buffer_offset= buffer->offset;
File file= buffer->file;
translog_buffer_unlock(buffer); translog_buffer_unlock(buffer);
translog_buffer_lock(overlay); translog_buffer_lock(overlay);
translog_wait_for_buffer_free(overlay); /* rechecks under mutex protection that overlay is still our overlay */
if (buffer->overlay->file == file &&
cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size,
buffer_offset) > 0)
{
translog_wait_for_buffer_free(overlay);
}
translog_buffer_unlock(overlay); translog_buffer_unlock(overlay);
translog_buffer_lock(buffer); translog_buffer_lock(buffer);
if (buffer->file != -1 && buffer_offset == buffer->offset)
{
/*
This means that somebody else flushed the buffer while we was
waiting for overlay then for locking buffer again.
It is possible for single request for flush and destroying the
loghandler.
*/
translog_buffer_unlock(buffer);
DBUG_RETURN(0);
}
} }
file.file= buffer->file; file.file= buffer->file;
for (i= 0; i < buffer->size; i+= TRANSLOG_PAGE_SIZE) for (i= 0, pg= buffer->offset / TRANSLOG_PAGE_SIZE;
i < buffer->size;
i+= TRANSLOG_PAGE_SIZE, pg++)
{ {
TRANSLOG_ADDRESS addr= (buffer->offset + i); TRANSLOG_ADDRESS addr= (buffer->offset + i);
TRANSLOG_VALIDATOR_DATA data; TRANSLOG_VALIDATOR_DATA data;
...@@ -2104,9 +2133,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2104,9 +2133,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
DBUG_ASSERT(log_descriptor.pagecache->block_size == TRANSLOG_PAGE_SIZE); DBUG_ASSERT(log_descriptor.pagecache->block_size == TRANSLOG_PAGE_SIZE);
DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size); DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size);
if (pagecache_inject(log_descriptor.pagecache, if (pagecache_inject(log_descriptor.pagecache,
&file, &file, pg, 3,
(LSN_OFFSET(buffer->offset) + i) / TRANSLOG_PAGE_SIZE,
3,
buffer->buffer + i, buffer->buffer + i,
PAGECACHE_PLAIN_PAGE, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, PAGECACHE_LOCK_LEFT_UNLOCKED,
...@@ -3121,6 +3148,7 @@ my_bool translog_init(const char *directory, ...@@ -3121,6 +3148,7 @@ my_bool translog_init(const char *directory,
log_descriptor.flushed= log_descriptor.horizon; log_descriptor.flushed= log_descriptor.horizon;
log_descriptor.in_buffers_only= log_descriptor.bc.buffer->offset; log_descriptor.in_buffers_only= log_descriptor.bc.buffer->offset;
log_descriptor.max_lsn= LSN_IMPOSSIBLE; /* set to 0 */ log_descriptor.max_lsn= LSN_IMPOSSIBLE; /* set to 0 */
log_descriptor.previous_flush_horizon= log_descriptor.horizon;
/* /*
horizon is (potentially) address of the next LSN we need decrease horizon is (potentially) address of the next LSN we need decrease
it to signal that all LSNs before it are flushed it to signal that all LSNs before it are flushed
...@@ -6580,8 +6608,12 @@ static void translog_force_current_buffer_to_finish() ...@@ -6580,8 +6608,12 @@ static void translog_force_current_buffer_to_finish()
my_bool translog_flush(LSN lsn) my_bool translog_flush(LSN lsn)
{ {
LSN old_flushed, sent_to_disk; LSN old_flushed, sent_to_disk;
TRANSLOG_ADDRESS flush_horizon;
int rc= 0; int rc= 0;
uint i; /* We can't have more different files then buffers */
File file_handlers[TRANSLOG_BUFFERS_NO];
int current_file_handler= -1;
uint32 prev_file= 0;
my_bool full_circle= 0; my_bool full_circle= 0;
DBUG_ENTER("translog_flush"); DBUG_ENTER("translog_flush");
DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn))); DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
...@@ -6590,6 +6622,7 @@ my_bool translog_flush(LSN lsn) ...@@ -6590,6 +6622,7 @@ my_bool translog_flush(LSN lsn)
translog_mutex_lock(&log_descriptor.log_flush_lock); translog_mutex_lock(&log_descriptor.log_flush_lock);
translog_lock(); translog_lock();
flush_horizon= LSN_IMPOSSIBLE;
old_flushed= log_descriptor.flushed; old_flushed= log_descriptor.flushed;
for (;;) for (;;)
{ {
...@@ -6597,8 +6630,6 @@ my_bool translog_flush(LSN lsn) ...@@ -6597,8 +6630,6 @@ my_bool translog_flush(LSN lsn)
uint16 buffer_start= buffer_no; uint16 buffer_start= buffer_no;
struct st_translog_buffer *buffer_unlock= log_descriptor.bc.buffer; struct st_translog_buffer *buffer_unlock= log_descriptor.bc.buffer;
struct st_translog_buffer *buffer= log_descriptor.bc.buffer; struct st_translog_buffer *buffer= log_descriptor.bc.buffer;
/* we can't flush in future */
DBUG_ASSERT(cmp_translog_addr(log_descriptor.horizon, lsn) >= 0);
if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0 || if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0 ||
full_circle) full_circle)
{ {
...@@ -6633,6 +6664,33 @@ my_bool translog_flush(LSN lsn) ...@@ -6633,6 +6664,33 @@ my_bool translog_flush(LSN lsn)
cmp_translog_addr(log_descriptor.flushed, lsn) < 0); cmp_translog_addr(log_descriptor.flushed, lsn) < 0);
if (buffer_unlock != NULL && buffer_unlock != buffer) if (buffer_unlock != NULL && buffer_unlock != buffer)
translog_buffer_unlock(buffer_unlock); translog_buffer_unlock(buffer_unlock);
if (prev_file != LSN_FILE_NO(buffer->offset))
{
uint cache_index;
uint32 fn= LSN_FILE_NO(buffer->offset);
prev_file= fn;
if ((cache_index= LSN_FILE_NO(log_descriptor.horizon) - fn) <
OPENED_FILES_NUM)
{
/* file in the cache */
if (log_descriptor.log_file_num[cache_index] == -1)
{
if ((log_descriptor.log_file_num[cache_index]=
open_logfile_by_number_no_cache(fn)) == -1)
{
rc= 1;
goto out;
}
}
current_file_handler++;
file_handlers[current_file_handler]=
log_descriptor.log_file_num[cache_index];
}
/* We sync file when we are closing it => do nothing if file closed */
}
DBUG_ASSERT(flush_horizon <= buffer->offset + buffer->size);
flush_horizon= buffer->offset + buffer->size;
rc= translog_buffer_flush(buffer); rc= translog_buffer_flush(buffer);
translog_buffer_unlock(buffer); translog_buffer_unlock(buffer);
if (rc) if (rc)
...@@ -6645,35 +6703,25 @@ my_bool translog_flush(LSN lsn) ...@@ -6645,35 +6703,25 @@ my_bool translog_flush(LSN lsn)
sync: sync:
translog_unlock(); translog_unlock();
for (i= (LSN_FILE_NO(old_flushed) ? LSN_FILE_NO(old_flushed) : 1);
i <= LSN_FILE_NO(lsn);
i++)
{ {
uint cache_index; File *handler= file_handlers;
File file; File *end= file_handlers + current_file_handler;
for (; handler < end; handler++)
if ((cache_index= LSN_FILE_NO(log_descriptor.horizon) - i) < rc|= my_sync(*handler, MYF(MY_WME));
OPENED_FILES_NUM)
{
/* file in the cache */
if (log_descriptor.log_file_num[cache_index] == -1)
{
if ((log_descriptor.log_file_num[cache_index]=
open_logfile_by_number_no_cache(i)) == -1)
{
rc= 1;
goto out;
}
}
file= log_descriptor.log_file_num[cache_index];
rc|= my_sync(file, MYF(MY_WME));
}
/* We sync file when we are closing it => do nothing if file closed */
} }
log_descriptor.flushed= sent_to_disk; log_descriptor.flushed= sent_to_disk;
if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS) /*
If we should flush (due to directory flush mode) and
previous flush horizon was not within one page border with this one.
*/
if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
(LSN_FILE_NO(log_descriptor.previous_flush_horizon) !=
LSN_FILE_NO(flush_horizon) ||
((LSN_OFFSET(log_descriptor.previous_flush_horizon) - 1) /
TRANSLOG_PAGE_SIZE) !=
((LSN_OFFSET(flush_horizon) - 1) / TRANSLOG_PAGE_SIZE)))
rc|= my_sync(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD)); rc|= my_sync(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD));
log_descriptor.previous_flush_horizon= flush_horizon;
out: out:
translog_mutex_unlock(&log_descriptor.log_flush_lock); translog_mutex_unlock(&log_descriptor.log_flush_lock);
DBUG_RETURN(rc); DBUG_RETURN(rc);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment