Commit d0398f72 authored by unknown's avatar unknown

Storing/getting maximum LSN of the record which parts written

in the file to the file header.


storage/maria/ma_loghandler.h:
  Getting maximum LSN of the record which parts written
  in the file to the file header.
storage/maria/unittest/Makefile.am:
  Test suite for getting max LSN added.
storage/maria/unittest/ma_test_loghandler_first_lsn-t.c:
  Spelling fixed.
  Cleanup fixed.
storage/maria/unittest/ma_test_loghandler_noflush-t.c:
  Cleanup fixed.
storage/maria/unittest/ma_test_loghandler_max_lsn-t.c:
  New BitKeeper file ``storage/maria/unittest/ma_test_loghandler_max_lsn-t.c''
parent 4cf6756e
...@@ -152,6 +152,8 @@ struct st_translog_descriptor ...@@ -152,6 +152,8 @@ struct st_translog_descriptor
TRANSLOG_ADDRESS horizon; TRANSLOG_ADDRESS horizon;
/* horizon buffer cursor */ /* horizon buffer cursor */
struct st_buffer_cursor bc; struct st_buffer_cursor bc;
/* maximum LSN of the current (not finished) file */
LSN max_lsn;
/* Last flushed LSN */ /* Last flushed LSN */
LSN flushed; LSN flushed;
...@@ -160,6 +162,16 @@ struct st_translog_descriptor ...@@ -160,6 +162,16 @@ struct st_translog_descriptor
/* All what is after this addess is not sent to disk yet */ /* All what is after this addess is not sent to disk yet */
TRANSLOG_ADDRESS in_buffers_only; TRANSLOG_ADDRESS in_buffers_only;
pthread_mutex_t sent_to_file_lock; pthread_mutex_t sent_to_file_lock;
/* Protects changing of headers of finished files (max_lsn) */
pthread_mutex_t file_header_lock;
/*
Sorted array (with protection) of files where we started writing process
and so we can't give last LSN yet
*/
pthread_mutex_t unfinished_files_lock;
DYNAMIC_ARRAY unfinished_files;
}; };
static struct st_translog_descriptor log_descriptor; static struct st_translog_descriptor log_descriptor;
...@@ -524,7 +536,7 @@ static void translog_check_cursor(struct st_buffer_cursor *cursor) ...@@ -524,7 +536,7 @@ static void translog_check_cursor(struct st_buffer_cursor *cursor)
static char *translog_filename_by_fileno(uint32 file_no, char *path) static char *translog_filename_by_fileno(uint32 file_no, char *path)
{ {
char file_name[10 + 8 + 1]; /* See my_sprintf */ char file_name[10 + 8 + 1]; /* See fallowing my_sprintf() call */
char *res; char *res;
DBUG_ENTER("translog_filename_by_fileno"); DBUG_ENTER("translog_filename_by_fileno");
DBUG_ASSERT(file_no <= 0xfffffff); DBUG_ASSERT(file_no <= 0xfffffff);
...@@ -614,12 +626,47 @@ static my_bool translog_write_file_header() ...@@ -614,12 +626,47 @@ static my_bool translog_write_file_header()
/* file number */ /* file number */
int3store(page, LSN_FILE_NO(log_descriptor.horizon)); int3store(page, LSN_FILE_NO(log_descriptor.horizon));
page+= 3; page+= 3;
/*
Here should be max lsn storing for current file (which is LSN_IPOSSIBLE):
lsn_store(page, LSN_IPOSSIBLE);
page+= LSN_STORE_SIZE;
But it is zeros so we can rely on bzero() in this case
*/
bzero(page, sizeof(page_buff) - (page- page_buff)); bzero(page, sizeof(page_buff) - (page- page_buff));
DBUG_RETURN(my_pwrite(log_descriptor.log_file_num[0], page_buff, DBUG_RETURN(my_pwrite(log_descriptor.log_file_num[0], page_buff,
sizeof(page_buff), 0, log_write_flags) != 0); sizeof(page_buff), 0, log_write_flags) != 0);
} }
/*
@brief write the new LSN on the given file header
@param file The file descriptor
@param lsn That LSN which should be written
@retval 0 OK
@retval 1 Error
*/
static my_bool translog_max_lsn_to_header(File file, LSN lsn)
{
uchar lsn_buff[LSN_STORE_SIZE];
DBUG_ENTER("translog_max_lsn_to_header");
DBUG_PRINT("enter", ("File descriptor: %ld "
"lsn: (%lu,0x%lx)",
(long) file,
(ulong) LSN_FILE_NO(lsn),(ulong) LSN_OFFSET(lsn)));
lsn_store(lsn_buff, lsn);
DBUG_RETURN(my_pwrite(file, lsn_buff,
LSN_STORE_SIZE,
(sizeof(maria_trans_file_magic) +
8 + 4 + 4 + 4 + 2 + 3),
log_write_flags) != 0 ||
my_sync(file, MYF(MY_WME)) != 0);
}
/* /*
Information from transaction log file header Information from transaction log file header
...@@ -627,6 +674,11 @@ static my_bool translog_write_file_header() ...@@ -627,6 +674,11 @@ static my_bool translog_write_file_header()
typedef struct st_loghandler_file_info typedef struct st_loghandler_file_info
{ {
/*
LSN_IPOSSIBLE for current file and max LSN which parts stored in the
file for all other (finished) files.
*/
LSN max_lsn;
ulonglong timestamp; /* Time stamp */ ulonglong timestamp; /* Time stamp */
ulong maria_version; /* Version of maria loghandler */ ulong maria_version; /* Version of maria loghandler */
ulong mysql_versiob; /* Version of mysql server */ ulong mysql_versiob; /* Version of mysql server */
...@@ -636,20 +688,25 @@ typedef struct st_loghandler_file_info ...@@ -636,20 +688,25 @@ typedef struct st_loghandler_file_info
} LOGHANDLER_FILE_INFO; } LOGHANDLER_FILE_INFO;
/* /*
@brief Read hander file information from last opened loghandler file @brief Read hander file information from loghandler file
@param desc header information descriptor to be filled with information @param desc header information descriptor to be filled with information
@param file file descriptor to read
@retval 0 OK @retval 0 OK
@retval 1 Error @retval 1 Error
*/ */
my_bool translog_read_file_header(LOGHANDLER_FILE_INFO *desc) #define LOG_HEADER_DATA_SIZE (sizeof(maria_trans_file_magic) + \
8 + 4 + 4 + 4 + 2 + 3 + \
LSN_STORE_SIZE)
my_bool translog_read_file_header(LOGHANDLER_FILE_INFO *desc, File file)
{ {
uchar page_buff[TRANSLOG_PAGE_SIZE], *ptr; uchar page_buff[LOG_HEADER_DATA_SIZE], *ptr;
DBUG_ENTER("translog_read_file_header"); DBUG_ENTER("translog_read_file_header");
if (my_pread(log_descriptor.log_file_num[0], page_buff, if (my_pread(file, page_buff,
sizeof(page_buff), 0, MYF(MY_FNABP | MY_WME))) sizeof(page_buff), 0, MYF(MY_FNABP | MY_WME)))
{ {
DBUG_PRINT("info", ("log read fail error: %d", my_errno)); DBUG_PRINT("info", ("log read fail error: %d", my_errno));
...@@ -663,14 +720,249 @@ my_bool translog_read_file_header(LOGHANDLER_FILE_INFO *desc) ...@@ -663,14 +720,249 @@ my_bool translog_read_file_header(LOGHANDLER_FILE_INFO *desc)
desc->mysql_versiob= uint4korr(ptr); desc->mysql_versiob= uint4korr(ptr);
ptr+= 4; ptr+= 4;
desc->server_id= uint4korr(ptr); desc->server_id= uint4korr(ptr);
ptr+= 2; ptr+= 4;
desc->page_size= uint2korr(ptr); desc->page_size= uint2korr(ptr);
ptr+= 2; ptr+= 2;
desc->file_number= uint3korr(ptr); desc->file_number= uint3korr(ptr);
ptr+=3;
desc->max_lsn= lsn_korr(ptr);
DBUG_RETURN(0);
}
/*
@brief set the lsn to the files from_file - to_file if it is greater
then written in the file
@param from_file first file number (min)
@param to_file last file number (max)
@param lsn the lsn for writing
@param is_locked true if current thread locked the log handler
@retval 0 OK
@retval 1 Error
*/
static my_bool translog_set_lsn_for_files(ulong from_file, ulong to_file,
LSN lsn, my_bool is_locked)
{
ulong file;
DBUG_ENTER("translog_set_lsn_for_files");
DBUG_PRINT("enter", ("From: %lu to: %lu lsn: (%lu,0x%lx) locked: %d",
from_file, to_file,
(ulong) LSN_FILE_NO(lsn), (ulong) LSN_OFFSET(lsn),
is_locked));
DBUG_ASSERT(from_file <= to_file);
DBUG_ASSERT(from_file > 0); /* we have not file 0 */
/* Checks the current file (not finished yet file) */
if (!is_locked)
translog_lock();
if (to_file == (ulong) LSN_FILE_NO(log_descriptor.horizon))
{
if (likely(cmp_translog_addr(lsn, log_descriptor.max_lsn) > 0))
log_descriptor.max_lsn= lsn;
to_file--;
}
if (!is_locked)
translog_unlock();
/* Checks finished files if they are */
pthread_mutex_lock(&log_descriptor.file_header_lock);
for (file= from_file; file <= to_file; file++)
{
LOGHANDLER_FILE_INFO info;
File fd= open_logfile_by_number_no_cache(file);
if (fd < 0 ||
translog_read_file_header(&info, fd) ||
(cmp_translog_addr(lsn, info.max_lsn) > 0 &&
translog_max_lsn_to_header(fd, lsn)))
DBUG_RETURN(1);
}
pthread_mutex_unlock(&log_descriptor.file_header_lock);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/* descriptor of file in unfinished_files */
struct st_file_counter
{
ulong file; /* file number */
ulong counter; /* counter for started writes */
};
/*
@brief mark file "in progress" (for multi-group records)
@param file log file number
*/
static void translog_mark_file_unfinished(ulong file)
{
int place, i;
struct st_file_counter fc, *fc_ptr;
fc.file= file; fc.counter= 1;
DBUG_ENTER("translog_mark_file_unfinished");
DBUG_PRINT("enter", ("file: %lu", file));
pthread_mutex_lock(&log_descriptor.unfinished_files_lock);
if (log_descriptor.unfinished_files.elements == 0)
{
insert_dynamic(&log_descriptor.unfinished_files, (uchar*) &fc);
DBUG_PRINT("info", ("The first element inserted"));
goto end;
}
for (place= log_descriptor.unfinished_files.elements;
place >= 0;
place--)
{
fc_ptr= dynamic_element(&log_descriptor.unfinished_files,
place, struct st_file_counter *);
if (fc_ptr->file <= file)
break;
}
if (place >= 0 && fc_ptr->file == file)
{
fc_ptr->counter++;
DBUG_PRINT("info", ("counter increased"));
goto end;
}
if (place == (int)log_descriptor.unfinished_files.elements)
{
insert_dynamic(&log_descriptor.unfinished_files, (uchar*) &fc);
DBUG_PRINT("info", ("The last element inserted"));
goto end;
}
/* shift and assign new element */
insert_dynamic(&log_descriptor.unfinished_files,
(uchar*)
dynamic_element(&log_descriptor.unfinished_files,
log_descriptor.unfinished_files.elements- 1,
struct st_file_counter *));
for(i= log_descriptor.unfinished_files.elements - 1; i > place; i--)
{
/* we do not use set_dynamic() to avoid unneeded checks */
memcpy(dynamic_element(&log_descriptor.unfinished_files,
i, struct st_file_counter *),
dynamic_element(&log_descriptor.unfinished_files,
i + 1, struct st_file_counter *),
sizeof(struct st_file_counter));
}
memcpy(dynamic_element(&log_descriptor.unfinished_files,
place + 1, struct st_file_counter *),
&fc, sizeof(struct st_file_counter));
end:
pthread_mutex_unlock(&log_descriptor.unfinished_files_lock);
DBUG_VOID_RETURN;
}
/*
@brief remove file mark "in progress" (for multi-group records)
@param file log file number
*/
static void translog_mark_file_finished(ulong file)
{
int i;
struct st_file_counter *fc_ptr;
DBUG_ENTER("translog_mark_file_finished");
DBUG_PRINT("enter", ("file: %lu", file));
pthread_mutex_lock(&log_descriptor.unfinished_files_lock);
DBUG_ASSERT(log_descriptor.unfinished_files.elements > 0);
for (i= 0;
i < (int) log_descriptor.unfinished_files.elements;
i++)
{
fc_ptr= dynamic_element(&log_descriptor.unfinished_files,
i, struct st_file_counter *);
if (fc_ptr->file == file)
{
break;
}
}
DBUG_ASSERT(i < (int) log_descriptor.unfinished_files.elements);
if (! --fc_ptr->counter)
delete_dynamic_element(&log_descriptor.unfinished_files, i);
pthread_mutex_unlock(&log_descriptor.unfinished_files_lock);
DBUG_VOID_RETURN;
}
/*
@brief get max LSN of the record which parts stored in this file
@param file file number
@return requested LSN or LSN_IMPOSSIBLE/LSN_ERROR
@retval LSN_IMPOSSIBLE File is still not finished
@retval LSN_ERROR Error opening file
@retval # LSN of the record which parts stored in this file
*/
LSN translog_get_file_max_lsn_stored(ulong file)
{
ulong limit= FILENO_IMPOSSIBLE;
DBUG_ENTER("translog_get_file_max_lsn_stored");
DBUG_PRINT("enter", ("file: %lu", file));
pthread_mutex_lock(&log_descriptor.unfinished_files_lock);
/* find file with minimum file number "in progress" */
if (log_descriptor.unfinished_files.elements > 0)
{
struct st_file_counter *fc_ptr;
fc_ptr= dynamic_element(&log_descriptor.unfinished_files,
0, struct st_file_counter *);
limit= fc_ptr->file; /* minimal file number "in progress" */
}
pthread_mutex_unlock(&log_descriptor.unfinished_files_lock);
/*
if there is no "in progress file" then unfinished file is in progress
for sure
*/
if (limit == FILENO_IMPOSSIBLE)
{
TRANSLOG_ADDRESS horizon= translog_get_horizon();
limit= LSN_FILE_NO(horizon);
}
if (file >= limit)
{
DBUG_PRINT("info", ("The file in in progress"));
DBUG_RETURN(LSN_IMPOSSIBLE);
}
{
LOGHANDLER_FILE_INFO info;
File fd= open_logfile_by_number_no_cache(file);
if (fd < 0 ||
translog_read_file_header(&info, fd))
{
DBUG_PRINT("error", ("Can't read file header"));
DBUG_RETURN(LSN_ERROR);
}
DBUG_PRINT("error", ("Max lsn: (%lu,0x%lx)",
(ulong) LSN_FILE_NO(info.max_lsn),
(ulong) LSN_OFFSET(info.max_lsn)));
DBUG_RETURN(info.max_lsn);
}
}
/* /*
Initialize transaction log file buffer Initialize transaction log file buffer
...@@ -752,6 +1044,14 @@ static my_bool translog_create_new_file() ...@@ -752,6 +1044,14 @@ static my_bool translog_create_new_file()
uint32 file_no= LSN_FILE_NO(log_descriptor.horizon); uint32 file_no= LSN_FILE_NO(log_descriptor.horizon);
DBUG_ENTER("translog_create_new_file"); DBUG_ENTER("translog_create_new_file");
/*
Writes max_lsn to the file header before finishing it (it is no need to
lock file header buffer because it is still unfinished file)
*/
translog_max_lsn_to_header(log_descriptor.log_file_num[0],
log_descriptor.max_lsn);
log_descriptor.max_lsn= LSN_IMPOSSIBLE;
if (log_descriptor.log_file_num[OPENED_FILES_NUM - 1] != -1 && if (log_descriptor.log_file_num[OPENED_FILES_NUM - 1] != -1 &&
translog_close_log_file(log_descriptor.log_file_num[OPENED_FILES_NUM - translog_close_log_file(log_descriptor.log_file_num[OPENED_FILES_NUM -
1])) 1]))
...@@ -1255,6 +1555,7 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon, ...@@ -1255,6 +1555,7 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
#endif #endif
if (new_file) if (new_file)
{ {
/* move the horizon to the next file and its header page */ /* move the horizon to the next file and its header page */
(*horizon)+= LSN_ONE_FILE; (*horizon)+= LSN_ONE_FILE;
(*horizon)= LSN_REPLACE_OFFSET(*horizon, TRANSLOG_PAGE_SIZE); (*horizon)= LSN_REPLACE_OFFSET(*horizon, TRANSLOG_PAGE_SIZE);
...@@ -2234,7 +2535,14 @@ my_bool translog_init(const char *directory, ...@@ -2234,7 +2535,14 @@ my_bool translog_init(const char *directory,
loghandler_init(); /* Safe to do many times */ loghandler_init(); /* Safe to do many times */
if (pthread_mutex_init(&log_descriptor.sent_to_file_lock, if (pthread_mutex_init(&log_descriptor.sent_to_file_lock,
MY_MUTEX_INIT_FAST)) MY_MUTEX_INIT_FAST) ||
pthread_mutex_init(&log_descriptor.file_header_lock,
MY_MUTEX_INIT_FAST) ||
pthread_mutex_init(&log_descriptor.unfinished_files_lock,
MY_MUTEX_INIT_FAST) ||
init_dynamic_array(&log_descriptor.unfinished_files,
sizeof(struct st_file_counter),
10, 10 CALLER_INFO))
DBUG_RETURN(1); DBUG_RETURN(1);
/* Directory to store files */ /* Directory to store files */
...@@ -2476,7 +2784,7 @@ my_bool translog_init(const char *directory, ...@@ -2476,7 +2784,7 @@ my_bool translog_init(const char *directory,
if (!old_log_was_recovered && old_flags == flags) if (!old_log_was_recovered && old_flags == flags)
{ {
LOGHANDLER_FILE_INFO info; LOGHANDLER_FILE_INFO info;
if (translog_read_file_header(&info)) if (translog_read_file_header(&info, log_descriptor.log_file_num[0]))
DBUG_RETURN(1); DBUG_RETURN(1);
version_changed= (info.maria_version != TRANSLOG_VERSION_ID); version_changed= (info.maria_version != TRANSLOG_VERSION_ID);
} }
...@@ -2521,6 +2829,7 @@ my_bool translog_init(const char *directory, ...@@ -2521,6 +2829,7 @@ my_bool translog_init(const char *directory,
log_descriptor.sent_to_file= log_descriptor.sent_to_file=
log_descriptor.flushed= log_descriptor.horizon; log_descriptor.flushed= log_descriptor.horizon;
log_descriptor.in_buffers_only= log_descriptor.bc.buffer->offset; log_descriptor.in_buffers_only= log_descriptor.bc.buffer->offset;
log_descriptor.max_lsn= LSN_IMPOSSIBLE; /* set to 0 */
/* /*
horizon is (potentially) address of the next LSN we need decrease horizon is (potentially) address of the next LSN we need decrease
it to signal that all LSNs before it are flushed it to signal that all LSNs before it are flushed
...@@ -2589,7 +2898,7 @@ void translog_destroy() ...@@ -2589,7 +2898,7 @@ void translog_destroy()
{ {
uint i; uint i;
DBUG_ENTER("translog_destroy"); DBUG_ENTER("translog_destroy");
if (translog_inited) if (translog_inited)
{ {
if (log_descriptor.bc.buffer->file != -1) if (log_descriptor.bc.buffer->file != -1)
...@@ -2608,6 +2917,10 @@ void translog_destroy() ...@@ -2608,6 +2917,10 @@ void translog_destroy()
translog_close_log_file(log_descriptor.log_file_num[i]); translog_close_log_file(log_descriptor.log_file_num[i]);
} }
pthread_mutex_destroy(&log_descriptor.sent_to_file_lock); pthread_mutex_destroy(&log_descriptor.sent_to_file_lock);
pthread_mutex_destroy(&log_descriptor.file_header_lock);
pthread_mutex_destroy(&log_descriptor.unfinished_files_lock);
delete_dynamic(&log_descriptor.unfinished_files);
my_close(log_descriptor.directory_fd, MYF(MY_WME)); my_close(log_descriptor.directory_fd, MYF(MY_WME));
my_atomic_rwlock_destroy(&LOCK_id_to_share); my_atomic_rwlock_destroy(&LOCK_id_to_share);
my_free((uchar*)(id_to_share + 1), MYF(MY_ALLOW_ZERO_PTR)); my_free((uchar*)(id_to_share + 1), MYF(MY_ALLOW_ZERO_PTR));
...@@ -3263,9 +3576,11 @@ translog_write_variable_record_1group(LSN *lsn, ...@@ -3263,9 +3576,11 @@ translog_write_variable_record_1group(LSN *lsn,
DBUG_ENTER("translog_write_variable_record_1group"); DBUG_ENTER("translog_write_variable_record_1group");
*lsn= horizon= log_descriptor.horizon; *lsn= horizon= log_descriptor.horizon;
if (log_record_type_descriptor[type].inwrite_hook && if (translog_set_lsn_for_files(LSN_FILE_NO(*lsn), LSN_FILE_NO(*lsn),
(*log_record_type_descriptor[type].inwrite_hook)(type, trn, tbl_info, *lsn, TRUE) ||
lsn, parts)) (log_record_type_descriptor[type].inwrite_hook &&
(*log_record_type_descriptor[type].inwrite_hook)(type, trn, tbl_info,
lsn, parts)))
{ {
translog_unlock(); translog_unlock();
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -3417,9 +3732,11 @@ translog_write_variable_record_1chunk(LSN *lsn, ...@@ -3417,9 +3732,11 @@ translog_write_variable_record_1chunk(LSN *lsn,
header_length, chunk0_header); header_length, chunk0_header);
*lsn= log_descriptor.horizon; *lsn= log_descriptor.horizon;
if (log_record_type_descriptor[type].inwrite_hook && if (translog_set_lsn_for_files(LSN_FILE_NO(*lsn), LSN_FILE_NO(*lsn),
(*log_record_type_descriptor[type].inwrite_hook)(type, trn, tbl_info, *lsn, TRUE) ||
lsn, parts)) (log_record_type_descriptor[type].inwrite_hook &&
(*log_record_type_descriptor[type].inwrite_hook)(type, trn, tbl_info,
lsn, parts)))
{ {
translog_unlock(); translog_unlock();
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -3797,6 +4114,7 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -3797,6 +4114,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
uchar chunk2_header[1]; uchar chunk2_header[1];
uint header_fixed_part= header_length + 2; uint header_fixed_part= header_length + 2;
uint groups_per_page= (page_capacity - header_fixed_part) / (7 + 1); uint groups_per_page= (page_capacity - header_fixed_part) / (7 + 1);
uint file_of_the_first_group;
DBUG_ENTER("translog_write_variable_record_mgroup"); DBUG_ENTER("translog_write_variable_record_mgroup");
chunk2_header[0]= TRANSLOG_CHUNK_NOHDR; chunk2_header[0]= TRANSLOG_CHUNK_NOHDR;
...@@ -3820,6 +4138,8 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -3820,6 +4138,8 @@ translog_write_variable_record_mgroup(LSN *lsn,
DBUG_ASSERT(record_rest >= buffer_rest); DBUG_ASSERT(record_rest >= buffer_rest);
} }
file_of_the_first_group= LSN_FILE_NO(log_descriptor.horizon);
translog_mark_file_unfinished(file_of_the_first_group);
do do
{ {
group.addr= horizon= log_descriptor.horizon; group.addr= horizon= log_descriptor.horizon;
...@@ -4171,6 +4491,12 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -4171,6 +4491,12 @@ translog_write_variable_record_mgroup(LSN *lsn,
translog_buffer_decrease_writers(cursor.buffer); translog_buffer_decrease_writers(cursor.buffer);
rc|= translog_buffer_unlock(cursor.buffer); rc|= translog_buffer_unlock(cursor.buffer);
if (translog_set_lsn_for_files(file_of_the_first_group, LSN_FILE_NO(*lsn),
*lsn, FALSE))
goto err;
translog_mark_file_finished(file_of_the_first_group);
delete_dynamic(&groups); delete_dynamic(&groups);
DBUG_RETURN(rc); DBUG_RETURN(rc);
...@@ -4378,9 +4704,11 @@ static my_bool translog_write_fixed_record(LSN *lsn, ...@@ -4378,9 +4704,11 @@ static my_bool translog_write_fixed_record(LSN *lsn,
} }
*lsn= log_descriptor.horizon; *lsn= log_descriptor.horizon;
if (log_record_type_descriptor[type].inwrite_hook && if (translog_set_lsn_for_files(LSN_FILE_NO(*lsn), LSN_FILE_NO(*lsn),
(*log_record_type_descriptor[type].inwrite_hook) (type, trn, tbl_info, *lsn, TRUE) ||
lsn, parts)) (log_record_type_descriptor[type].inwrite_hook &&
(*log_record_type_descriptor[type].inwrite_hook) (type, trn, tbl_info,
lsn, parts)))
{ {
rc= 1; rc= 1;
goto err; goto err;
...@@ -4954,7 +5282,7 @@ translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner) ...@@ -4954,7 +5282,7 @@ translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner)
/** /**
@brief Get header of variable length record and call hook for it processing @brief Get header of variable length record and call hook for it processing
@param page Pointer to the buffer with page where LSN chunk is @param page Pointer to the buffer with page where LSN chunk is
placed placed
@param page_offset Offset of the first chunk in the page @param page_offset Offset of the first chunk in the page
...@@ -5180,10 +5508,10 @@ int translog_read_record_header_from_buffer(uchar *page, ...@@ -5180,10 +5508,10 @@ int translog_read_record_header_from_buffer(uchar *page,
/** /**
@brief Read record header and some fixed part of a record (the part depend @brief Read record header and some fixed part of a record (the part depend
on record type). on record type).
@param lsn log record serial number (address of the record) @param lsn log record serial number (address of the record)
@param buff log record header buffer @param buff log record header buffer
@note Some type of record can be read completely by this call @note Some type of record can be read completely by this call
@note "Decoded" header stored in TRANSLOG_HEADER_BUFFER::header (relative @note "Decoded" header stored in TRANSLOG_HEADER_BUFFER::header (relative
LSN can be translated to absolute one), some fields can be added (like LSN can be translated to absolute one), some fields can be added (like
...@@ -5222,11 +5550,11 @@ int translog_read_record_header(LSN lsn, TRANSLOG_HEADER_BUFFER *buff) ...@@ -5222,11 +5550,11 @@ int translog_read_record_header(LSN lsn, TRANSLOG_HEADER_BUFFER *buff)
/** /**
@brief Read record header and some fixed part of a record (the part depend @brief Read record header and some fixed part of a record (the part depend
on record type). on record type).
@param scan scanner position to read @param scan scanner position to read
@param buff log record header buffer @param buff log record header buffer
@param move_scanner request to move scanner to the header position @param move_scanner request to move scanner to the header position
@note Some type of record can be read completely by this call @note Some type of record can be read completely by this call
@note "Decoded" header stored in TRANSLOG_HEADER_BUFFER::header (relative @note "Decoded" header stored in TRANSLOG_HEADER_BUFFER::header (relative
LSN can be translated to absolute one), some fields can be added (like LSN can be translated to absolute one), some fields can be added (like
...@@ -6013,7 +6341,7 @@ LSN translog_first_lsn_in_log() ...@@ -6013,7 +6341,7 @@ LSN translog_first_lsn_in_log()
TRANSLOG_SCANNER_DATA scanner; TRANSLOG_SCANNER_DATA scanner;
DBUG_ENTER("translog_first_lsn_in_log"); DBUG_ENTER("translog_first_lsn_in_log");
DBUG_PRINT("info", ("Horizon: (%lu,0x%lx)", DBUG_PRINT("info", ("Horizon: (%lu,0x%lx)",
LSN_FILE_NO(addr), LSN_OFFSET(addr))); (ulong) LSN_FILE_NO(addr), (ulong) LSN_OFFSET(addr)));
if (addr == MAKE_LSN(1, TRANSLOG_PAGE_SIZE)) if (addr == MAKE_LSN(1, TRANSLOG_PAGE_SIZE))
{ {
...@@ -6081,7 +6409,7 @@ LSN translog_first_theoretical_lsn() ...@@ -6081,7 +6409,7 @@ LSN translog_first_theoretical_lsn()
TRANSLOG_VALIDATOR_DATA data; TRANSLOG_VALIDATOR_DATA data;
DBUG_ENTER("translog_first_theoretical_lsn"); DBUG_ENTER("translog_first_theoretical_lsn");
DBUG_PRINT("info", ("Horizon: (%lu,0x%lx)", DBUG_PRINT("info", ("Horizon: (%lu,0x%lx)",
LSN_FILE_NO(addr), LSN_OFFSET(addr))); (ulong) LSN_FILE_NO(addr), (ulong) LSN_OFFSET(addr)));
if (!translog_is_file(1)) if (!translog_is_file(1))
DBUG_RETURN(LSN_IMPOSSIBLE); DBUG_RETURN(LSN_IMPOSSIBLE);
......
...@@ -250,6 +250,7 @@ extern my_bool translog_init_scanner(LSN lsn, ...@@ -250,6 +250,7 @@ extern my_bool translog_init_scanner(LSN lsn,
extern int translog_read_next_record_header(TRANSLOG_SCANNER_DATA *scanner, extern int translog_read_next_record_header(TRANSLOG_SCANNER_DATA *scanner,
TRANSLOG_HEADER_BUFFER *buff); TRANSLOG_HEADER_BUFFER *buff);
extern LSN translog_get_file_max_lsn_stored(ulong file);
extern my_bool translog_lock(); extern my_bool translog_lock();
extern my_bool translog_unlock(); extern my_bool translog_unlock();
extern void translog_lock_assert_owner(); extern void translog_lock_assert_owner();
......
...@@ -44,7 +44,8 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \ ...@@ -44,7 +44,8 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \
ma_test_loghandler_pagecache-t \ ma_test_loghandler_pagecache-t \
ma_test_loghandler_long-t-big \ ma_test_loghandler_long-t-big \
ma_test_loghandler_noflush-t \ ma_test_loghandler_noflush-t \
ma_test_loghandler_first_lsn-t ma_test_loghandler_first_lsn-t \
ma_test_loghandler_max_lsn-t
ma_test_loghandler_t_SOURCES = ma_test_loghandler-t.c ma_maria_log_cleanup.c ma_test_loghandler_t_SOURCES = ma_test_loghandler-t.c ma_maria_log_cleanup.c
ma_test_loghandler_multigroup_t_SOURCES = ma_test_loghandler_multigroup-t.c ma_maria_log_cleanup.c ma_test_loghandler_multigroup_t_SOURCES = ma_test_loghandler_multigroup-t.c ma_maria_log_cleanup.c
...@@ -54,6 +55,7 @@ ma_test_loghandler_long_t_big_SOURCES = ma_test_loghandler-t.c ma_maria_log_clea ...@@ -54,6 +55,7 @@ ma_test_loghandler_long_t_big_SOURCES = ma_test_loghandler-t.c ma_maria_log_clea
ma_test_loghandler_long_t_big_CPPFLAGS = -DLONG_LOG_TEST ma_test_loghandler_long_t_big_CPPFLAGS = -DLONG_LOG_TEST
ma_test_loghandler_noflush_t_SOURCES = ma_test_loghandler_noflush-t.c ma_maria_log_cleanup.c ma_test_loghandler_noflush_t_SOURCES = ma_test_loghandler_noflush-t.c ma_maria_log_cleanup.c
ma_test_loghandler_first_lsn_t_SOURCES = ma_test_loghandler_first_lsn-t.c ma_maria_log_cleanup.c ma_test_loghandler_first_lsn_t_SOURCES = ma_test_loghandler_first_lsn-t.c ma_maria_log_cleanup.c
ma_test_loghandler_max_lsn_t_SOURCES = ma_test_loghandler_max_lsn-t.c ma_maria_log_cleanup.c
ma_pagecache_single_src = ma_pagecache_single.c test_file.c ma_pagecache_single_src = ma_pagecache_single.c test_file.c
ma_pagecache_consist_src = ma_pagecache_consist.c test_file.c ma_pagecache_consist_src = ma_pagecache_consist.c test_file.c
......
...@@ -89,7 +89,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -89,7 +89,7 @@ int main(int argc __attribute__((unused)), char *argv[])
first_lsn= translog_first_lsn_in_log(); first_lsn= translog_first_lsn_in_log();
if (first_lsn != LSN_IMPOSSIBLE) if (first_lsn != LSN_IMPOSSIBLE)
{ {
fprintf(stderr, "Incorrect first lsn responce (%lu,0x%lx).", fprintf(stderr, "Incorrect first lsn response (%lu,0x%lx).",
(ulong) LSN_FILE_NO(first_lsn), (ulong) LSN_FILE_NO(first_lsn),
(ulong) LSN_OFFSET(first_lsn)); (ulong) LSN_OFFSET(first_lsn));
translog_destroy(); translog_destroy();
...@@ -143,8 +143,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -143,8 +143,7 @@ int main(int argc __attribute__((unused)), char *argv[])
translog_destroy(); translog_destroy();
end_pagecache(&pagecache, 1); end_pagecache(&pagecache, 1);
ma_control_file_end(); ma_control_file_end();
my_delete(CONTROL_FILE_BASE_NAME, MYF(0)); if (maria_log_remove())
my_delete(first_translog_file, MYF(0)); exit(1);
exit(0); exit(0);
} }
#include "../maria_def.h"
#include <stdio.h>
#include <errno.h>
#include <tap.h>
#include "../trnman.h"
extern my_bool maria_log_remove();
#ifndef DBUG_OFF
static const char *default_dbug_option;
#endif
#define PCACHE_SIZE (1024*1024*10)
#define PCACHE_PAGE TRANSLOG_PAGE_SIZE
#define LOG_FILE_SIZE (4*1024L*1024L)
#define LOG_FLAGS 0
int main(int argc __attribute__((unused)), char *argv[])
{
ulong i;
uint pagen;
uchar long_tr_id[6];
PAGECACHE pagecache;
LSN lsn, max_lsn, last_lsn= LSN_IMPOSSIBLE;
MY_STAT st;
LEX_STRING parts[TRANSLOG_INTERNAL_PARTS + 1];
MY_INIT(argv[0]);
plan(2);
bzero(&pagecache, sizeof(pagecache));
maria_data_root= ".";
if (maria_log_remove())
exit(1);
bzero(long_tr_id, 6);
#ifndef DBUG_OFF
#if defined(__WIN__)
default_dbug_option= "d:t:i:O,\\ma_test_loghandler.trace";
#else
default_dbug_option= "d:t:i:o,/tmp/ma_test_loghandler.trace";
#endif
if (argc > 1)
{
DBUG_SET(default_dbug_option);
DBUG_SET_INITIAL(default_dbug_option);
}
#endif
if (ma_control_file_create_or_open(TRUE))
{
fprintf(stderr, "Can't init control file (%d)\n", errno);
exit(1);
}
if ((pagen= init_pagecache(&pagecache, PCACHE_SIZE, 0, 0,
PCACHE_PAGE)) == 0)
{
fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno);
exit(1);
}
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS))
{
fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1);
}
example_loghandler_init();
max_lsn= translog_get_file_max_lsn_stored(1);
if (max_lsn == 1)
{
fprintf(stderr, "Error reading the first log file.");
translog_destroy();
exit(1);
}
if (max_lsn != LSN_IMPOSSIBLE)
{
fprintf(stderr, "Incorrect first lsn response (%lu,0x%lx).",
(ulong) LSN_FILE_NO(max_lsn),
(ulong) LSN_OFFSET(max_lsn));
translog_destroy();
exit(1);
}
ok(1, "Empty log response");
/* write more then 1 file */
int4store(long_tr_id, 0);
parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_tr_id;
parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6;
for(i= 0; i < LOG_FILE_SIZE/6; i++)
{
if (translog_write_record(&lsn,
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
&dummy_transaction_object, NULL, 6,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
{
fprintf(stderr, "Can't write record #%lu\n", (ulong) 0);
translog_destroy();
exit(1);
}
if (LSN_FILE_NO(lsn) == 1)
last_lsn= lsn;
}
max_lsn= translog_get_file_max_lsn_stored(1);
if (max_lsn == 1)
{
fprintf(stderr, "Error reading the first log file\n");
translog_destroy();
exit(1);
}
if (max_lsn == LSN_IMPOSSIBLE)
{
fprintf(stderr, "Isn't first file still finished?!!\n");
translog_destroy();
exit(1);
}
if (max_lsn != last_lsn)
{
fprintf(stderr, "Incorrect max lsn: (%lu,0x%lx) "
" last lsn on first file: (%lu,0x%lx)\n",
(ulong) LSN_FILE_NO(max_lsn),
(ulong) LSN_OFFSET(max_lsn),
(ulong) LSN_FILE_NO(last_lsn),
(ulong) LSN_OFFSET(last_lsn));
translog_destroy();
exit(1);
}
ok(1, "First file max LSN");
translog_destroy();
end_pagecache(&pagecache, 1);
ma_control_file_end();
if (maria_log_remove())
exit(1);
exit(0);
}
...@@ -125,8 +125,8 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -125,8 +125,8 @@ int main(int argc __attribute__((unused)), char *argv[])
translog_destroy(); translog_destroy();
end_pagecache(&pagecache, 1); end_pagecache(&pagecache, 1);
ma_control_file_end(); ma_control_file_end();
my_delete(CONTROL_FILE_BASE_NAME, MYF(0)); if (maria_log_remove())
my_delete(first_translog_file, MYF(0)); exit(1);
exit(rc); exit(rc);
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment