Commit d225521c authored by unknown's avatar unknown

Transaction log behaviour in case of write

  error fixed (switching to the read only mode).
Added read only mode of transactions log handler.


storage/maria/ha_maria.cc:
  Transaction log initialization parameters change.
storage/maria/ma_check.c:
  New status variable of transactional log.
storage/maria/ma_create.c:
  New status variable of transactional log.
storage/maria/ma_loghandler.c:
  - New status variable added.
  - Checking the status variable in the loghandler
  interface functions added
  - All fails of loghandler functions revised.
  - UNRECOVERABLE_ERROR() removed.
  - Switching to read only mode added where it nead.
  - Checking of log state added before writes log content
    and changing status variables like sent_to_file, n_buffers_only, flushed.
  - Readonly loghandler initialization added.
  - Fixed problem with example table transactional log initialization.
storage/maria/ma_loghandler.h:
  Readonly loghandler initialization added.
  Fixed problem with example table transactional log initialization.
  New status variable added.
storage/maria/ma_open.c:
  New status variable of transactional log.
storage/maria/ma_test1.c:
  Transaction log initialization parameters change.
storage/maria/ma_test2.c:
  Transaction log initialization parameters change.
storage/maria/maria_read_log.c:
  Transaction loghandler initialization in read only mode in
  case of only dysplay parameter.
storage/maria/unittest/Makefile.am:
  Test of readonly mode added.
storage/maria/unittest/ma_test_loghandler-t.c:
  Transaction log initialization parameters change.
  Fixed problem with example table transactional log initialization.
  Fixed incorrect fprintf call parameters.
storage/maria/unittest/ma_test_loghandler_first_lsn-t.c:
  Transaction log initialization parameters change.
  Fixed problem with example table transactional log initialization.
storage/maria/unittest/ma_test_loghandler_max_lsn-t.c:
  Transaction log initialization parameters change.
  Fixed problem with example table transactional log initialization.
storage/maria/unittest/ma_test_loghandler_multigroup-t.c:
  Useing this test also as read only loghandler test.
  Transaction log initialization parameters change.
  Fixed problem with example table transactional log initialization.
storage/maria/unittest/ma_test_loghandler_multithread-t.c:
  Transaction log initialization parameters change.
  Fixed problem with example table transactional log initialization.
storage/maria/unittest/ma_test_loghandler_noflush-t.c:
  Transaction log initialization parameters change.
  Fixed problem with example table transactional log initialization.
storage/maria/unittest/ma_test_loghandler_pagecache-t.c:
  Transaction log initialization parameters change.
  Fixed problem with example table transactional log initialization.
storage/maria/unittest/ma_test_loghandler_purge-t.c:
  Transaction log initialization parameters change.
  Fixed problem with example table transactional log initialization.
parent ce2fbd9e
...@@ -2450,7 +2450,7 @@ static int ha_maria_init(void *p) ...@@ -2450,7 +2450,7 @@ static int ha_maria_init(void *p)
TRANSLOG_PAGE_SIZE) == 0) || TRANSLOG_PAGE_SIZE) == 0) ||
translog_init(maria_data_root, TRANSLOG_FILE_SIZE, translog_init(maria_data_root, TRANSLOG_FILE_SIZE,
MYSQL_VERSION_ID, server_id, maria_log_pagecache, MYSQL_VERSION_ID, server_id, maria_log_pagecache,
TRANSLOG_DEFAULT_FLAGS) || TRANSLOG_DEFAULT_FLAGS, 0) ||
maria_recover() || maria_recover() ||
ma_checkpoint_init(checkpoint_interval); ma_checkpoint_init(checkpoint_interval);
maria_multi_threaded= TRUE; maria_multi_threaded= TRUE;
......
...@@ -5583,7 +5583,7 @@ static int write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info) ...@@ -5583,7 +5583,7 @@ static int write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info)
{ {
MARIA_SHARE *share= info->s; MARIA_SHARE *share= info->s;
/* in case this is maria_chk or recovery... */ /* in case this is maria_chk or recovery... */
if (translog_inited && !maria_in_recovery) if (translog_status == TRANSLOG_OK && !maria_in_recovery)
{ {
/* /*
For now this record is only informative. It could serve when applying For now this record is only informative. It could serve when applying
......
...@@ -692,7 +692,8 @@ int maria_create(const char *name, enum data_file_type datafile_type, ...@@ -692,7 +692,8 @@ int maria_create(const char *name, enum data_file_type datafile_type,
/* max_data_file_length and max_key_file_length are recalculated on open */ /* max_data_file_length and max_key_file_length are recalculated on open */
if (tmp_table) if (tmp_table)
share.base.max_data_file_length= (my_off_t) ci->data_file_length; share.base.max_data_file_length= (my_off_t) ci->data_file_length;
else if (ci->transactional && translog_inited && !maria_in_recovery) else if (ci->transactional && translog_status == TRANSLOG_OK &&
!maria_in_recovery)
{ {
/* /*
we have checked translog_inited above, because maria_chk may call us we have checked translog_inited above, because maria_chk may call us
......
...@@ -48,15 +48,6 @@ ...@@ -48,15 +48,6 @@
#define TRANSLOG_PAGE_FLAGS 6 /* transaction log page flags offset */ #define TRANSLOG_PAGE_FLAGS 6 /* transaction log page flags offset */
/* QQ: For temporary debugging */
#define UNRECOVERABLE_ERROR(E) \
do { \
DBUG_PRINT("error", E); \
printf E; \
putchar('\n'); \
DBUG_ASSERT(0); \
} while(0);
/* Maximum length of compressed LSNs (the worst case of whole LSN storing) */ /* Maximum length of compressed LSNs (the worst case of whole LSN storing) */
#define COMPRESSED_LSN_MAX_STORE_SIZE (2 + LSN_STORE_SIZE) #define COMPRESSED_LSN_MAX_STORE_SIZE (2 + LSN_STORE_SIZE)
#define MAX_NUMBER_OF_LSNS_PER_RECORD 2 #define MAX_NUMBER_OF_LSNS_PER_RECORD 2
...@@ -147,6 +138,8 @@ struct st_translog_descriptor ...@@ -147,6 +138,8 @@ struct st_translog_descriptor
PAGECACHE *pagecache; PAGECACHE *pagecache;
/* Flags */ /* Flags */
uint flags; uint flags;
/* File open flags */
uint open_flags;
/* max size of one log size (for new logs creation) */ /* max size of one log size (for new logs creation) */
uint32 log_file_max_size; uint32 log_file_max_size;
/* server version */ /* server version */
...@@ -213,7 +206,7 @@ static struct st_translog_descriptor log_descriptor; ...@@ -213,7 +206,7 @@ static struct st_translog_descriptor log_descriptor;
static uchar end_of_log= 0; static uchar end_of_log= 0;
#define END_OF_LOG &end_of_log #define END_OF_LOG &end_of_log
my_bool translog_inited= 0; enum enum_translog_status translog_status= TRANSLOG_UNINITED;
/* chunk types */ /* chunk types */
#define TRANSLOG_CHUNK_LSN 0x00 /* 0 chunk refer as LSN (head or tail */ #define TRANSLOG_CHUNK_LSN 0x00 /* 0 chunk refer as LSN (head or tail */
...@@ -360,7 +353,7 @@ static LOG_DESC INIT_LOGREC_VARIABLE_RECORD_2LSN_EXAMPLE= ...@@ -360,7 +353,7 @@ static LOG_DESC INIT_LOGREC_VARIABLE_RECORD_2LSN_EXAMPLE=
"variable2example", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; "variable2example", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
void example_loghandler_init() void translog_example_table_init()
{ {
int i; int i;
log_record_type_descriptor[LOGREC_FIXED_RECORD_0LSN_EXAMPLE]= log_record_type_descriptor[LOGREC_FIXED_RECORD_0LSN_EXAMPLE]=
...@@ -543,7 +536,7 @@ static LOG_DESC INIT_LOGREC_INCOMPLETE_LOG= ...@@ -543,7 +536,7 @@ static LOG_DESC INIT_LOGREC_INCOMPLETE_LOG=
const myf log_write_flags= MY_WME | MY_NABP | MY_WAIT_IF_FULL; const myf log_write_flags= MY_WME | MY_NABP | MY_WAIT_IF_FULL;
static void loghandler_init() void translog_table_init()
{ {
int i; int i;
log_record_type_descriptor[LOGREC_RESERVED_FOR_CHUNKS23]= log_record_type_descriptor[LOGREC_RESERVED_FOR_CHUNKS23]=
...@@ -653,6 +646,18 @@ static void translog_check_cursor(struct st_buffer_cursor *cursor) ...@@ -653,6 +646,18 @@ static void translog_check_cursor(struct st_buffer_cursor *cursor)
} }
#endif #endif
/**
@brief switch the loghandler in read only mode in case of write error
*/
void translog_stop_writing()
{
translog_status= TRANSLOG_READONLY;
log_descriptor.open_flags= O_BINARY | O_RDONLY;
}
/* /*
Get file name of the log by log number Get file name of the log by log number
...@@ -703,10 +708,10 @@ static File open_logfile_by_number_no_cache(uint32 file_no) ...@@ -703,10 +708,10 @@ static File open_logfile_by_number_no_cache(uint32 file_no)
/* TODO: add O_DIRECT to open flags (when buffer is aligned) */ /* TODO: add O_DIRECT to open flags (when buffer is aligned) */
/* TODO: use my_create() */ /* TODO: use my_create() */
if ((file= my_open(translog_filename_by_fileno(file_no, path), if ((file= my_open(translog_filename_by_fileno(file_no, path),
O_CREAT | O_BINARY | O_RDWR, log_descriptor.open_flags,
MYF(MY_WME))) < 0) MYF(MY_WME))) < 0)
{ {
UNRECOVERABLE_ERROR(("Error %d during opening file '%s'", errno, path)); DBUG_PRINT("error", ("Error %d during opening file '%s'", errno, path));
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
DBUG_PRINT("info", ("File: '%s' handler: %d", path, file)); DBUG_PRINT("info", ("File: '%s' handler: %d", path, file));
...@@ -907,8 +912,11 @@ static my_bool translog_set_lsn_for_files(uint32 from_file, uint32 to_file, ...@@ -907,8 +912,11 @@ static my_bool translog_set_lsn_for_files(uint32 from_file, uint32 to_file,
translog_read_file_header(&info, fd) || translog_read_file_header(&info, fd) ||
(cmp_translog_addr(lsn, info.max_lsn) > 0 && (cmp_translog_addr(lsn, info.max_lsn) > 0 &&
translog_max_lsn_to_header(fd, lsn))) translog_max_lsn_to_header(fd, lsn)))
{
translog_stop_writing();
DBUG_RETURN(1); DBUG_RETURN(1);
} }
}
translog_mutex_unlock(&log_descriptor.file_header_lock); translog_mutex_unlock(&log_descriptor.file_header_lock);
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -1049,7 +1057,8 @@ LSN translog_get_file_max_lsn_stored(uint32 file) ...@@ -1049,7 +1057,8 @@ LSN translog_get_file_max_lsn_stored(uint32 file)
uint32 limit= FILENO_IMPOSSIBLE; uint32 limit= FILENO_IMPOSSIBLE;
DBUG_ENTER("translog_get_file_max_lsn_stored"); DBUG_ENTER("translog_get_file_max_lsn_stored");
DBUG_PRINT("enter", ("file: %lu", (ulong)file)); DBUG_PRINT("enter", ("file: %lu", (ulong)file));
DBUG_ASSERT(translog_inited == 1); DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
translog_mutex_lock(&log_descriptor.unfinished_files_lock); translog_mutex_lock(&log_descriptor.unfinished_files_lock);
...@@ -1179,25 +1188,44 @@ static my_bool translog_create_new_file() ...@@ -1179,25 +1188,44 @@ static my_bool translog_create_new_file()
Writes max_lsn to the file header before finishing it (it is no need to Writes max_lsn to the file header before finishing it (it is no need to
lock file header buffer because it is still unfinished file) lock file header buffer because it is still unfinished file)
*/ */
translog_max_lsn_to_header(log_descriptor.log_file_num[0], if (translog_max_lsn_to_header(log_descriptor.log_file_num[0],
log_descriptor.max_lsn); log_descriptor.max_lsn))
{
translog_stop_writing();
DBUG_RETURN(1);
}
log_descriptor.max_lsn= LSN_IMPOSSIBLE; log_descriptor.max_lsn= LSN_IMPOSSIBLE;
if (log_descriptor.log_file_num[OPENED_FILES_NUM - 1] != -1 && if (log_descriptor.log_file_num[OPENED_FILES_NUM - 1] != -1 &&
translog_close_log_file(log_descriptor.log_file_num[OPENED_FILES_NUM - translog_close_log_file(log_descriptor.log_file_num[OPENED_FILES_NUM -
1])) 1]))
{
translog_stop_writing();
/*
This should not be possible because we have not writing something
after last sync
*/
DBUG_ASSERT(0);
DBUG_RETURN(1); DBUG_RETURN(1);
}
for (i= OPENED_FILES_NUM - 1; i > 0; i--) for (i= OPENED_FILES_NUM - 1; i > 0; i--)
log_descriptor.log_file_num[i]= log_descriptor.log_file_num[i - 1]; log_descriptor.log_file_num[i]= log_descriptor.log_file_num[i - 1];
if ((log_descriptor.log_file_num[0]= if ((log_descriptor.log_file_num[0]=
open_logfile_by_number_no_cache(file_no)) == -1 || open_logfile_by_number_no_cache(file_no)) == -1 ||
translog_write_file_header()) translog_write_file_header())
{
translog_stop_writing();
DBUG_RETURN(1); DBUG_RETURN(1);
}
if (ma_control_file_write_and_force(LSN_IMPOSSIBLE, file_no, if (ma_control_file_write_and_force(LSN_IMPOSSIBLE, file_no,
CONTROL_FILE_UPDATE_ONLY_LOGNO)) CONTROL_FILE_UPDATE_ONLY_LOGNO))
{
translog_stop_writing();
DBUG_RETURN(1); DBUG_RETURN(1);
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1767,6 +1795,8 @@ static void translog_set_only_in_buffers(TRANSLOG_ADDRESS in_buffers) ...@@ -1767,6 +1795,8 @@ static void translog_set_only_in_buffers(TRANSLOG_ADDRESS in_buffers)
/* LSN_IMPOSSIBLE == 0 => it will work for very first time */ /* LSN_IMPOSSIBLE == 0 => it will work for very first time */
if (cmp_translog_addr(in_buffers, log_descriptor.in_buffers_only) > 0) if (cmp_translog_addr(in_buffers, log_descriptor.in_buffers_only) > 0)
{ {
if (translog_status != TRANSLOG_OK)
DBUG_VOID_RETURN;
log_descriptor.in_buffers_only= in_buffers; log_descriptor.in_buffers_only= in_buffers;
DBUG_PRINT("info", ("set new in_buffers_only")); DBUG_PRINT("info", ("set new in_buffers_only"));
} }
...@@ -2058,6 +2088,8 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2058,6 +2088,8 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
data.addr= &addr; data.addr= &addr;
DBUG_ASSERT(log_descriptor.pagecache->block_size == TRANSLOG_PAGE_SIZE); DBUG_ASSERT(log_descriptor.pagecache->block_size == TRANSLOG_PAGE_SIZE);
DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size); DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size);
if (translog_status != TRANSLOG_OK)
DBUG_RETURN(1);
if (pagecache_inject(log_descriptor.pagecache, if (pagecache_inject(log_descriptor.pagecache,
&file, &file,
(LSN_OFFSET(buffer->offset) + i) / TRANSLOG_PAGE_SIZE, (LSN_OFFSET(buffer->offset) + i) / TRANSLOG_PAGE_SIZE,
...@@ -2069,20 +2101,23 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2069,20 +2101,23 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
LSN_IMPOSSIBLE, LSN_IMPOSSIBLE,
&translog_page_validator, (uchar*) &data)) &translog_page_validator, (uchar*) &data))
{ {
UNRECOVERABLE_ERROR(("Can't write page (%lu,0x%lx) to pagecache", DBUG_PRINT("error", ("Can't write page (%lu,0x%lx) to pagecache",
(ulong) buffer->file, (ulong) buffer->file,
(ulong) (LSN_OFFSET(buffer->offset)+ i))); (ulong) (LSN_OFFSET(buffer->offset)+ i)));
translog_stop_writing();
DBUG_RETURN(1);
} }
} }
if (my_pwrite(buffer->file, (char*) buffer->buffer, if (my_pwrite(buffer->file, (char*) buffer->buffer,
buffer->size, LSN_OFFSET(buffer->offset), buffer->size, LSN_OFFSET(buffer->offset),
log_write_flags)) log_write_flags))
{ {
UNRECOVERABLE_ERROR(("Can't write buffer (%lu,0x%lx) size %lu " DBUG_PRINT("error", ("Can't write buffer (%lu,0x%lx) size %lu "
"to the disk (%d)", "to the disk (%d)",
(ulong) buffer->file, (ulong) buffer->file,
(ulong) LSN_OFFSET(buffer->offset), (ulong) LSN_OFFSET(buffer->offset),
(ulong) buffer->size, errno)); (ulong) buffer->size, errno));
translog_stop_writing();
DBUG_RETURN(1); DBUG_RETURN(1);
} }
...@@ -2128,7 +2163,7 @@ static my_bool translog_recover_page_up_to_sector(uchar *page, uint16 offset) ...@@ -2128,7 +2163,7 @@ static my_bool translog_recover_page_up_to_sector(uchar *page, uint16 offset)
if ((chunk_length= if ((chunk_length=
translog_get_total_chunk_length(page, chunk_offset)) == 0) translog_get_total_chunk_length(page, chunk_offset)) == 0)
{ {
UNRECOVERABLE_ERROR(("cant get chunk length (offset %u)", DBUG_PRINT("error", ("cant get chunk length (offset %u)",
(uint) chunk_offset)); (uint) chunk_offset));
DBUG_RETURN(1); DBUG_RETURN(1);
} }
...@@ -2136,7 +2171,7 @@ static my_bool translog_recover_page_up_to_sector(uchar *page, uint16 offset) ...@@ -2136,7 +2171,7 @@ static my_bool translog_recover_page_up_to_sector(uchar *page, uint16 offset)
(uint) chunk_offset, (uint) chunk_length)); (uint) chunk_offset, (uint) chunk_length));
if (((ulong) chunk_offset) + ((ulong) chunk_length) > TRANSLOG_PAGE_SIZE) if (((ulong) chunk_offset) + ((ulong) chunk_length) > TRANSLOG_PAGE_SIZE)
{ {
UNRECOVERABLE_ERROR(("damaged chunk (offset %u) in trusted area", DBUG_PRINT("error", ("damaged chunk (offset %u) in trusted area",
(uint) chunk_offset)); (uint) chunk_offset));
DBUG_RETURN(1); DBUG_RETURN(1);
} }
...@@ -2196,7 +2231,7 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr) ...@@ -2196,7 +2231,7 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr)
if (uint3korr(page) != LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE || if (uint3korr(page) != LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE ||
uint3korr(page + 3) != LSN_FILE_NO(addr)) uint3korr(page + 3) != LSN_FILE_NO(addr))
{ {
UNRECOVERABLE_ERROR(("Page (%lu,0x%lx): " DBUG_PRINT("error", ("Page (%lu,0x%lx): "
"page address written in the page is incorrect: " "page address written in the page is incorrect: "
"File %lu instead of %lu or page %lu instead of %lu", "File %lu instead of %lu or page %lu instead of %lu",
LSN_IN_PARTS(addr), LSN_IN_PARTS(addr),
...@@ -2210,7 +2245,7 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr) ...@@ -2210,7 +2245,7 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr)
if (flags & ~(TRANSLOG_PAGE_CRC | TRANSLOG_SECTOR_PROTECTION | if (flags & ~(TRANSLOG_PAGE_CRC | TRANSLOG_SECTOR_PROTECTION |
TRANSLOG_RECORD_CRC)) TRANSLOG_RECORD_CRC))
{ {
UNRECOVERABLE_ERROR(("Page (%lu,0x%lx): " DBUG_PRINT("error", ("Page (%lu,0x%lx): "
"Garbage in the page flags field detected : %x", "Garbage in the page flags field detected : %x",
LSN_IN_PARTS(addr), (uint) flags)); LSN_IN_PARTS(addr), (uint) flags));
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -2223,7 +2258,7 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr) ...@@ -2223,7 +2258,7 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr)
this_page_page_overhead); this_page_page_overhead);
if (crc != uint4korr(page_pos)) if (crc != uint4korr(page_pos))
{ {
UNRECOVERABLE_ERROR(("Page (%lu,0x%lx): " DBUG_PRINT("error", ("Page (%lu,0x%lx): "
"CRC mismatch: calculated: %lx on the page %lx", "CRC mismatch: calculated: %lx on the page %lx",
LSN_IN_PARTS(addr), LSN_IN_PARTS(addr),
(ulong) crc, (ulong) uint4korr(page_pos))); (ulong) crc, (ulong) uint4korr(page_pos)));
...@@ -2734,31 +2769,32 @@ static my_bool translog_truncate_log(TRANSLOG_ADDRESS addr) ...@@ -2734,31 +2769,32 @@ static my_bool translog_truncate_log(TRANSLOG_ADDRESS addr)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/* /**
Initialize transaction log @brief Initialize transaction log
SYNOPSIS @param directory Directory where log files are put
translog_init() @param log_file_max_size max size of one log size (for new logs creation)
directory Directory where log files are put @param server_version version of MySQL server (MYSQL_VERSION_ID)
log_file_max_size max size of one log size (for new logs creation) @param server_id server ID (replication & Co)
server_version version of MySQL server (MYSQL_VERSION_ID) @param pagecache Page cache for the log reads
server_id server ID (replication & Co) @param flags flags (TRANSLOG_PAGE_CRC, TRANSLOG_SECTOR_PROTECTION
pagecache Page cache for the log reads
flags flags (TRANSLOG_PAGE_CRC, TRANSLOG_SECTOR_PROTECTION
TRANSLOG_RECORD_CRC) TRANSLOG_RECORD_CRC)
@param read_only Put transaction log in read-only mode
@param init_table_func function to initialize record descriptors table
TODO @todo
Free used resources in case of error. Free used resources in case of error.
RETURN @retval 0 OK
0 OK @retval 1 Error
1 Error
*/ */
my_bool translog_init(const char *directory, my_bool translog_init_with_table(const char *directory,
uint32 log_file_max_size, uint32 log_file_max_size,
uint32 server_version, uint32 server_version,
uint32 server_id, PAGECACHE *pagecache, uint flags) uint32 server_id, PAGECACHE *pagecache,
uint flags, my_bool readonly,
void (*init_table_func)())
{ {
int i; int i;
int old_log_was_recovered= 0, logs_found= 0; int old_log_was_recovered= 0, logs_found= 0;
...@@ -2766,9 +2802,14 @@ my_bool translog_init(const char *directory, ...@@ -2766,9 +2802,14 @@ my_bool translog_init(const char *directory,
TRANSLOG_ADDRESS sure_page, last_page, last_valid_page; TRANSLOG_ADDRESS sure_page, last_page, last_valid_page;
my_bool version_changed= 0; my_bool version_changed= 0;
DBUG_ENTER("translog_init"); DBUG_ENTER("translog_init");
DBUG_ASSERT(translog_inited == 0); DBUG_ASSERT(translog_status == TRANSLOG_UNINITED);
loghandler_init(); /* Safe to do many times */ (*init_table_func)();
if (readonly)
log_descriptor.open_flags= O_BINARY | O_RDONLY;
else
log_descriptor.open_flags= O_CREAT | O_BINARY | O_RDWR;
if (pthread_mutex_init(&log_descriptor.sent_to_file_lock, if (pthread_mutex_init(&log_descriptor.sent_to_file_lock,
MY_MUTEX_INIT_FAST) || MY_MUTEX_INIT_FAST) ||
...@@ -2793,7 +2834,8 @@ my_bool translog_init(const char *directory, ...@@ -2793,7 +2834,8 @@ my_bool translog_init(const char *directory,
if ((log_descriptor.directory_fd= my_open(log_descriptor.directory, if ((log_descriptor.directory_fd= my_open(log_descriptor.directory,
O_RDONLY, MYF(MY_WME))) < 0) O_RDONLY, MYF(MY_WME))) < 0)
{ {
UNRECOVERABLE_ERROR(("Error %d during opening directory '%s'", my_errno= errno;
DBUG_PRINT("error", ("Error %d during opening directory '%s'",
errno, log_descriptor.directory)); errno, log_descriptor.directory));
DBUG_RETURN(1); DBUG_RETURN(1);
} }
...@@ -2900,6 +2942,13 @@ my_bool translog_init(const char *directory, ...@@ -2900,6 +2942,13 @@ my_bool translog_init(const char *directory,
} }
} }
} }
else if (readonly)
{
/* There is no logs and there is read-only mode => nothing to read */
DBUG_PRINT("error", ("No logs and read-only mode"));
DBUG_RETURN(1);
}
if (logs_found) if (logs_found)
{ {
TRANSLOG_ADDRESS current_page= sure_page; TRANSLOG_ADDRESS current_page= sure_page;
...@@ -3051,7 +3100,8 @@ my_bool translog_init(const char *directory, ...@@ -3051,7 +3100,8 @@ my_bool translog_init(const char *directory,
translog_start_buffer(log_descriptor.buffers, &log_descriptor.bc, 0); translog_start_buffer(log_descriptor.buffers, &log_descriptor.bc, 0);
translog_new_page_header(&log_descriptor.horizon, &log_descriptor.bc); translog_new_page_header(&log_descriptor.horizon, &log_descriptor.bc);
} }
else if (old_log_was_recovered || old_flags != flags || version_changed) else if ((old_log_was_recovered || old_flags != flags || version_changed) &&
!readonly)
{ {
/* leave the damaged file untouched */ /* leave the damaged file untouched */
log_descriptor.horizon+= LSN_ONE_FILE; log_descriptor.horizon+= LSN_ONE_FILE;
...@@ -3090,7 +3140,7 @@ my_bool translog_init(const char *directory, ...@@ -3090,7 +3140,7 @@ my_bool translog_init(const char *directory,
DBUG_RETURN(1); DBUG_RETURN(1);
id_to_share--; /* min id is 1 */ id_to_share--; /* min id is 1 */
translog_inited= 1; translog_status= (readonly ? TRANSLOG_READONLY : TRANSLOG_OK);
/* Check the last LSN record integrity */ /* Check the last LSN record integrity */
if (logs_found) if (logs_found)
{ {
...@@ -3198,7 +3248,9 @@ my_bool translog_init(const char *directory, ...@@ -3198,7 +3248,9 @@ my_bool translog_init(const char *directory,
DBUG_PRINT("error", ("unexpected end of log or record during " DBUG_PRINT("error", ("unexpected end of log or record during "
"reading record header: (%lu,0x%lx) len: %d", "reading record header: (%lu,0x%lx) len: %d",
LSN_IN_PARTS(last_lsn), len)); LSN_IN_PARTS(last_lsn), len));
if (translog_truncate_log(last_lsn)) if (readonly)
log_descriptor.horizon= last_lsn;
else if (translog_truncate_log(last_lsn))
DBUG_RETURN(1); DBUG_RETURN(1);
} }
else else
...@@ -3218,7 +3270,9 @@ my_bool translog_init(const char *directory, ...@@ -3218,7 +3270,9 @@ my_bool translog_init(const char *directory,
"reading record body: (%lu,0x%lx) len: %d", "reading record body: (%lu,0x%lx) len: %d",
LSN_IN_PARTS(rec.lsn), LSN_IN_PARTS(rec.lsn),
len)); len));
if (translog_truncate_log(last_lsn)) if (readonly)
log_descriptor.horizon= last_lsn;
else if (translog_truncate_log(last_lsn))
DBUG_RETURN(1); DBUG_RETURN(1);
} }
} }
...@@ -3279,7 +3333,7 @@ void translog_destroy() ...@@ -3279,7 +3333,7 @@ void translog_destroy()
uint i; uint i;
DBUG_ENTER("translog_destroy"); DBUG_ENTER("translog_destroy");
if (translog_inited) if (likely(translog_status != TRANSLOG_UNINITED))
{ {
if (log_descriptor.bc.buffer->file != -1) if (log_descriptor.bc.buffer->file != -1)
translog_finish_page(&log_descriptor.horizon, &log_descriptor.bc); translog_finish_page(&log_descriptor.horizon, &log_descriptor.bc);
...@@ -3306,15 +3360,12 @@ void translog_destroy() ...@@ -3306,15 +3360,12 @@ void translog_destroy()
my_close(log_descriptor.directory_fd, MYF(MY_WME)); my_close(log_descriptor.directory_fd, MYF(MY_WME));
my_atomic_rwlock_destroy(&LOCK_id_to_share); my_atomic_rwlock_destroy(&LOCK_id_to_share);
my_free((uchar*)(id_to_share + 1), MYF(MY_ALLOW_ZERO_PTR)); my_free((uchar*)(id_to_share + 1), MYF(MY_ALLOW_ZERO_PTR));
translog_inited= 0; translog_status= TRANSLOG_UNINITED;
} }
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/* /*
Start new page Start new page
...@@ -4486,7 +4537,7 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -4486,7 +4537,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
10, 10 CALLER_INFO)) 10, 10 CALLER_INFO))
{ {
translog_unlock(); translog_unlock();
UNRECOVERABLE_ERROR(("init array failed")); DBUG_PRINT("error", ("init array failed"));
DBUG_RETURN(1); DBUG_RETURN(1);
} }
...@@ -4522,7 +4573,7 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -4522,7 +4573,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
group.num= full_pages; group.num= full_pages;
if (insert_dynamic(&groups, (uchar*) &group)) if (insert_dynamic(&groups, (uchar*) &group))
{ {
UNRECOVERABLE_ERROR(("insert into array failed")); DBUG_PRINT("error", ("insert into array failed"));
goto err_unlock; goto err_unlock;
} }
...@@ -4552,7 +4603,7 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -4552,7 +4603,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
} }
if (rc) if (rc)
{ {
UNRECOVERABLE_ERROR(("flush of unlock buffer failed")); DBUG_PRINT("error", ("flush of unlock buffer failed"));
goto err; goto err;
} }
...@@ -4595,7 +4646,7 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -4595,7 +4646,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
} }
if (rc) if (rc)
{ {
UNRECOVERABLE_ERROR(("flush of unlock buffer failed")); DBUG_PRINT("error", ("flush of unlock buffer failed"));
goto err; goto err;
} }
rc= translog_buffer_lock(cursor.buffer); rc= translog_buffer_lock(cursor.buffer);
...@@ -4617,7 +4668,7 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -4617,7 +4668,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
group.num= 0; /* 0 because it does not matter */ group.num= 0; /* 0 because it does not matter */
if (insert_dynamic(&groups, (uchar*) &group)) if (insert_dynamic(&groups, (uchar*) &group))
{ {
UNRECOVERABLE_ERROR(("insert into array failed")); DBUG_PRINT("error", ("insert into array failed"));
goto err_unlock; goto err_unlock;
} }
record_rest= parts->record_length - done; record_rest= parts->record_length - done;
...@@ -4773,7 +4824,7 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -4773,7 +4824,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
} }
if (rc) if (rc)
{ {
UNRECOVERABLE_ERROR(("flush of unlock buffer failed")); DBUG_PRINT("error", ("flush of unlock buffer failed"));
goto err; goto err;
} }
} }
...@@ -5155,7 +5206,14 @@ my_bool translog_write_record(LSN *lsn, ...@@ -5155,7 +5206,14 @@ my_bool translog_write_record(LSN *lsn,
DBUG_ENTER("translog_write_record"); DBUG_ENTER("translog_write_record");
DBUG_PRINT("enter", ("type: %u ShortTrID: %u rec_len: %lu", DBUG_PRINT("enter", ("type: %u ShortTrID: %u rec_len: %lu",
(uint) type, (uint) short_trid, (ulong) rec_len)); (uint) type, (uint) short_trid, (ulong) rec_len));
DBUG_ASSERT(translog_inited == 1); DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
if (unlikely(translog_status != TRANSLOG_OK))
{
DBUG_PRINT("error", ("Transaction log is write protected"));
DBUG_RETURN(1);
}
if (tbl_info) if (tbl_info)
{ {
...@@ -5363,7 +5421,6 @@ static int translog_fixed_length_header(uchar *page, ...@@ -5363,7 +5421,6 @@ static int translog_fixed_length_header(uchar *page,
void translog_free_record_header(TRANSLOG_HEADER_BUFFER *buff) void translog_free_record_header(TRANSLOG_HEADER_BUFFER *buff)
{ {
DBUG_ENTER("translog_free_record_header"); DBUG_ENTER("translog_free_record_header");
DBUG_ASSERT(translog_inited == 1);
if (buff->groups_no != 0) if (buff->groups_no != 0)
{ {
my_free((uchar*) buff->groups, MYF(0)); my_free((uchar*) buff->groups, MYF(0));
...@@ -5377,12 +5434,15 @@ void translog_free_record_header(TRANSLOG_HEADER_BUFFER *buff) ...@@ -5377,12 +5434,15 @@ void translog_free_record_header(TRANSLOG_HEADER_BUFFER *buff)
@brief Returns the current horizon at the end of the current log @brief Returns the current horizon at the end of the current log
@return Horizon @return Horizon
@retval LSN_ERROR error
@retvar # Horizon
*/ */
TRANSLOG_ADDRESS translog_get_horizon() TRANSLOG_ADDRESS translog_get_horizon()
{ {
TRANSLOG_ADDRESS res; TRANSLOG_ADDRESS res;
DBUG_ASSERT(translog_inited == 1); DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
translog_lock(); translog_lock();
res= log_descriptor.horizon; res= log_descriptor.horizon;
translog_unlock(); translog_unlock();
...@@ -5395,11 +5455,14 @@ TRANSLOG_ADDRESS translog_get_horizon() ...@@ -5395,11 +5455,14 @@ TRANSLOG_ADDRESS translog_get_horizon()
assumed to already hold the lock assumed to already hold the lock
@return Horizon @return Horizon
@retval LSN_ERROR error
@retvar # Horizon
*/ */
TRANSLOG_ADDRESS translog_get_horizon_no_lock() TRANSLOG_ADDRESS translog_get_horizon_no_lock()
{ {
DBUG_ASSERT(translog_inited == 1); DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
translog_lock_assert_owner(); translog_lock_assert_owner();
return log_descriptor.horizon; return log_descriptor.horizon;
} }
...@@ -5487,7 +5550,8 @@ my_bool translog_init_scanner(LSN lsn, ...@@ -5487,7 +5550,8 @@ my_bool translog_init_scanner(LSN lsn,
DBUG_ENTER("translog_init_scanner"); DBUG_ENTER("translog_init_scanner");
DBUG_PRINT("enter", ("Scanner: 0x%lx LSN: (0x%lu,0x%lx)", DBUG_PRINT("enter", ("Scanner: 0x%lx LSN: (0x%lu,0x%lx)",
(ulong) scanner, LSN_IN_PARTS(lsn))); (ulong) scanner, LSN_IN_PARTS(lsn)));
DBUG_ASSERT(translog_inited == 1); DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
data.addr= &scanner->page_addr; data.addr= &scanner->page_addr;
data.was_recovered= 0; data.was_recovered= 0;
...@@ -5912,7 +5976,8 @@ int translog_read_record_header_from_buffer(uchar *page, ...@@ -5912,7 +5976,8 @@ int translog_read_record_header_from_buffer(uchar *page,
TRANSLOG_CHUNK_LSN || TRANSLOG_CHUNK_LSN ||
(page[page_offset] & TRANSLOG_CHUNK_TYPE) == (page[page_offset] & TRANSLOG_CHUNK_TYPE) ==
TRANSLOG_CHUNK_FIXED); TRANSLOG_CHUNK_FIXED);
DBUG_ASSERT(translog_inited == 1); DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
buff->type= (page[page_offset] & TRANSLOG_REC_TYPE); buff->type= (page[page_offset] & TRANSLOG_REC_TYPE);
buff->short_trid= uint2korr(page + page_offset + 1); buff->short_trid= uint2korr(page + page_offset + 1);
DBUG_PRINT("info", ("Type %u, Short TrID %u, LSN (%lu,0x%lx)", DBUG_PRINT("info", ("Type %u, Short TrID %u, LSN (%lu,0x%lx)",
...@@ -5965,7 +6030,8 @@ int translog_read_record_header(LSN lsn, TRANSLOG_HEADER_BUFFER *buff) ...@@ -5965,7 +6030,8 @@ int translog_read_record_header(LSN lsn, TRANSLOG_HEADER_BUFFER *buff)
DBUG_ENTER("translog_read_record_header"); DBUG_ENTER("translog_read_record_header");
DBUG_PRINT("enter", ("LSN: (0x%lu,0x%lx)", LSN_IN_PARTS(lsn))); DBUG_PRINT("enter", ("LSN: (0x%lu,0x%lx)", LSN_IN_PARTS(lsn)));
DBUG_ASSERT(LSN_OFFSET(lsn) % TRANSLOG_PAGE_SIZE != 0); DBUG_ASSERT(LSN_OFFSET(lsn) % TRANSLOG_PAGE_SIZE != 0);
DBUG_ASSERT(translog_inited == 1); DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
buff->lsn= lsn; buff->lsn= lsn;
buff->groups_no= 0; buff->groups_no= 0;
...@@ -6014,7 +6080,8 @@ int translog_read_record_header_scan(TRANSLOG_SCANNER_DATA *scanner, ...@@ -6014,7 +6080,8 @@ int translog_read_record_header_scan(TRANSLOG_SCANNER_DATA *scanner,
LSN_IN_PARTS(scanner->last_file_page), LSN_IN_PARTS(scanner->last_file_page),
(uint) scanner->page_offset, (uint) scanner->page_offset,
(uint) scanner->page_offset, scanner->fixed_horizon)); (uint) scanner->page_offset, scanner->fixed_horizon));
DBUG_ASSERT(translog_inited == 1); DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
buff->groups_no= 0; buff->groups_no= 0;
buff->lsn= scanner->page_addr; buff->lsn= scanner->page_addr;
buff->lsn+= scanner->page_offset; /* offset increasing */ buff->lsn+= scanner->page_offset; /* offset increasing */
...@@ -6061,7 +6128,8 @@ int translog_read_next_record_header(TRANSLOG_SCANNER_DATA *scanner, ...@@ -6061,7 +6128,8 @@ int translog_read_next_record_header(TRANSLOG_SCANNER_DATA *scanner,
LSN_IN_PARTS(scanner->last_file_page), LSN_IN_PARTS(scanner->last_file_page),
(uint) scanner->page_offset, (uint) scanner->page_offset,
(uint) scanner->page_offset, scanner->fixed_horizon)); (uint) scanner->page_offset, scanner->fixed_horizon));
DBUG_ASSERT(translog_inited == 1); DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
do do
{ {
...@@ -6270,7 +6338,8 @@ translog_size_t translog_read_record(LSN lsn, ...@@ -6270,7 +6338,8 @@ translog_size_t translog_read_record(LSN lsn,
translog_size_t end= offset + length; translog_size_t end= offset + length;
struct st_translog_reader_data internal_data; struct st_translog_reader_data internal_data;
DBUG_ENTER("translog_read_record"); DBUG_ENTER("translog_read_record");
DBUG_ASSERT(translog_inited == 1); DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
if (data == NULL) if (data == NULL)
{ {
...@@ -6540,7 +6609,8 @@ my_bool translog_flush(LSN lsn) ...@@ -6540,7 +6609,8 @@ my_bool translog_flush(LSN lsn)
my_bool full_circle= 0; my_bool full_circle= 0;
DBUG_ENTER("translog_flush"); DBUG_ENTER("translog_flush");
DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn))); DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
DBUG_ASSERT(translog_inited == 1); DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
translog_mutex_lock(&log_descriptor.log_flush_lock); translog_mutex_lock(&log_descriptor.log_flush_lock);
translog_lock(); translog_lock();
...@@ -6562,6 +6632,11 @@ my_bool translog_flush(LSN lsn) ...@@ -6562,6 +6632,11 @@ my_bool translog_flush(LSN lsn)
goto out; goto out;
} }
/* send to the file if it is not sent */ /* send to the file if it is not sent */
if (translog_status != TRANSLOG_OK)
{
rc= 1;
goto out;
}
sent_to_file= translog_get_sent_to_file(); sent_to_file= translog_get_sent_to_file();
if (cmp_translog_addr(sent_to_file, lsn) >= 0) if (cmp_translog_addr(sent_to_file, lsn) >= 0)
break; break;
...@@ -6613,18 +6688,27 @@ my_bool translog_flush(LSN lsn) ...@@ -6613,18 +6688,27 @@ my_bool translog_flush(LSN lsn)
if ((log_descriptor.log_file_num[cache_index]= if ((log_descriptor.log_file_num[cache_index]=
open_logfile_by_number_no_cache(i)) == -1) open_logfile_by_number_no_cache(i)) == -1)
{ {
translog_stop_writing();
rc= 1; rc= 1;
goto out; goto out;
} }
} }
file= log_descriptor.log_file_num[cache_index]; file= log_descriptor.log_file_num[cache_index];
rc|= my_sync(file, MYF(MY_WME)); if (my_sync(file, MYF(MY_WME)))
{
translog_stop_writing();
rc= 1;
goto out;
}
} }
/* We sync file when we are closing it => do nothing if file closed */ /* We sync file when we are closing it => do nothing if file closed */
} }
log_descriptor.flushed= sent_to_file; log_descriptor.flushed= sent_to_file;
/** @todo LOG decide if syncing of directory is needed */ /** @todo LOG decide if syncing of directory is needed */
rc|= my_sync(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD)); rc|= my_sync(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD));
if (rc)
translog_stop_writing();
out: out:
translog_mutex_unlock(&log_descriptor.log_flush_lock); translog_mutex_unlock(&log_descriptor.log_flush_lock);
DBUG_RETURN(rc); DBUG_RETURN(rc);
...@@ -6929,7 +7013,8 @@ LSN translog_first_lsn_in_log() ...@@ -6929,7 +7013,8 @@ LSN translog_first_lsn_in_log()
uchar *page; uchar *page;
DBUG_ENTER("translog_first_lsn_in_log"); DBUG_ENTER("translog_first_lsn_in_log");
DBUG_PRINT("info", ("Horizon: (%lu,0x%lx)", LSN_IN_PARTS(addr))); DBUG_PRINT("info", ("Horizon: (%lu,0x%lx)", LSN_IN_PARTS(addr)));
DBUG_ASSERT(translog_inited == 1); DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
if (!(file= translog_first_file(horizon, 0))) if (!(file= translog_first_file(horizon, 0)))
{ {
...@@ -6966,7 +7051,8 @@ LSN translog_first_theoretical_lsn() ...@@ -6966,7 +7051,8 @@ LSN translog_first_theoretical_lsn()
TRANSLOG_VALIDATOR_DATA data; TRANSLOG_VALIDATOR_DATA data;
DBUG_ENTER("translog_first_theoretical_lsn"); DBUG_ENTER("translog_first_theoretical_lsn");
DBUG_PRINT("info", ("Horizon: (%lu,0x%lx)", LSN_IN_PARTS(addr))); DBUG_PRINT("info", ("Horizon: (%lu,0x%lx)", LSN_IN_PARTS(addr)));
DBUG_ASSERT(translog_inited == 1); DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
if (!translog_is_file(1)) if (!translog_is_file(1))
DBUG_RETURN(LSN_IMPOSSIBLE); DBUG_RETURN(LSN_IMPOSSIBLE);
...@@ -7003,7 +7089,8 @@ my_bool translog_purge(TRANSLOG_ADDRESS low) ...@@ -7003,7 +7089,8 @@ my_bool translog_purge(TRANSLOG_ADDRESS low)
int rc= 0; int rc= 0;
DBUG_ENTER("translog_purge"); DBUG_ENTER("translog_purge");
DBUG_PRINT("enter", ("low: (%lu,0x%lx)", LSN_IN_PARTS(low))); DBUG_PRINT("enter", ("low: (%lu,0x%lx)", LSN_IN_PARTS(low)));
DBUG_ASSERT(translog_inited == 1); DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
translog_mutex_lock(&log_descriptor.purger_lock); translog_mutex_lock(&log_descriptor.purger_lock);
if (LSN_FILE_NO(log_descriptor.last_lsn_checked) < last_need_file) if (LSN_FILE_NO(log_descriptor.last_lsn_checked) < last_need_file)
......
...@@ -228,11 +228,18 @@ C_MODE_START ...@@ -228,11 +228,18 @@ C_MODE_START
#define LOGREC_FIXED_RECORD_2LSN_EXAMPLE 5 #define LOGREC_FIXED_RECORD_2LSN_EXAMPLE 5
#define LOGREC_VARIABLE_RECORD_2LSN_EXAMPLE 6 #define LOGREC_VARIABLE_RECORD_2LSN_EXAMPLE 6
extern void example_loghandler_init(); extern void translog_example_table_init();
extern void translog_table_init();
extern my_bool translog_init(const char *directory, uint32 log_file_max_size, #define translog_init(D,M,V,I,C,F,R) \
uint32 server_version, uint32 server_id, translog_init_with_table(D,M,V,I,C,F,R,&translog_table_init)
PAGECACHE *pagecache, uint flags); extern my_bool translog_init_with_table(const char *directory,
uint32 log_file_max_size,
uint32 server_version,
uint32 server_id,
PAGECACHE *pagecache,
uint flags,
my_bool readonly,
void (*init_table_func)());
extern my_bool extern my_bool
translog_write_record(LSN *lsn, enum translog_record_type type, translog_write_record(LSN *lsn, enum translog_record_type type,
...@@ -279,7 +286,13 @@ extern void translog_deassign_id_from_share(struct st_maria_share *share); ...@@ -279,7 +286,13 @@ extern void translog_deassign_id_from_share(struct st_maria_share *share);
extern void extern void
translog_assign_id_to_share_from_recovery(struct st_maria_share *share, translog_assign_id_to_share_from_recovery(struct st_maria_share *share,
uint16 id); uint16 id);
extern my_bool translog_inited; enum enum_translog_status
{
TRANSLOG_UNINITED, /* no initialization done or error during initialization */
TRANSLOG_OK, /* transaction log is functioning */
TRANSLOG_READONLY /* read only mode due to write errors */
};
extern enum enum_translog_status translog_status;
/* /*
all the rest added because of recovery; should we make all the rest added because of recovery; should we make
......
...@@ -1005,7 +1005,7 @@ uint _ma_state_info_write(MARIA_SHARE *share, uint pWrite) ...@@ -1005,7 +1005,7 @@ uint _ma_state_info_write(MARIA_SHARE *share, uint pWrite)
{ {
safe_mutex_assert_owner(&share->intern_lock); safe_mutex_assert_owner(&share->intern_lock);
} }
if (share->base.born_transactional && translog_inited && if (share->base.born_transactional && translog_status == TRANSLOG_OK &&
!maria_in_recovery) !maria_in_recovery)
{ {
/* /*
......
...@@ -82,7 +82,7 @@ int main(int argc,char *argv[]) ...@@ -82,7 +82,7 @@ int main(int argc,char *argv[])
TRANSLOG_PAGE_SIZE) == 0) || TRANSLOG_PAGE_SIZE) == 0) ||
translog_init(maria_data_root, TRANSLOG_FILE_SIZE, translog_init(maria_data_root, TRANSLOG_FILE_SIZE,
0, 0, maria_log_pagecache, 0, 0, maria_log_pagecache,
TRANSLOG_DEFAULT_FLAGS) || TRANSLOG_DEFAULT_FLAGS, 0) ||
(transactional && (trnman_init(0) || ma_checkpoint_init(0)))) (transactional && (trnman_init(0) || ma_checkpoint_init(0))))
{ {
fprintf(stderr, "Error in initialization"); fprintf(stderr, "Error in initialization");
......
...@@ -97,7 +97,7 @@ int main(int argc, char *argv[]) ...@@ -97,7 +97,7 @@ int main(int argc, char *argv[])
TRANSLOG_PAGE_SIZE) == 0) || TRANSLOG_PAGE_SIZE) == 0) ||
translog_init(maria_data_root, TRANSLOG_FILE_SIZE, translog_init(maria_data_root, TRANSLOG_FILE_SIZE,
0, 0, maria_log_pagecache, 0, 0, maria_log_pagecache,
TRANSLOG_DEFAULT_FLAGS) || TRANSLOG_DEFAULT_FLAGS, 0) ||
(transactional && (trnman_init(0) || ma_checkpoint_init(0)))) (transactional && (trnman_init(0) || ma_checkpoint_init(0))))
{ {
fprintf(stderr, "Error in initialization"); fprintf(stderr, "Error in initialization");
......
...@@ -80,7 +80,7 @@ int main(int argc, char **argv) ...@@ -80,7 +80,7 @@ int main(int argc, char **argv)
which is useless. TODO: start log handler in read-only mode. which is useless. TODO: start log handler in read-only mode.
*/ */
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, maria_pagecache, if (translog_init(".", LOG_FILE_SIZE, 50112, 0, maria_pagecache,
TRANSLOG_DEFAULT_FLAGS)) TRANSLOG_DEFAULT_FLAGS, opt_only_display))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
goto err; goto err;
......
...@@ -46,7 +46,8 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \ ...@@ -46,7 +46,8 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \
ma_test_loghandler_noflush-t \ ma_test_loghandler_noflush-t \
ma_test_loghandler_first_lsn-t \ ma_test_loghandler_first_lsn-t \
ma_test_loghandler_max_lsn-t \ ma_test_loghandler_max_lsn-t \
ma_test_loghandler_purge-t ma_test_loghandler_purge-t \
ma_test_loghandler_readonly-t
ma_test_loghandler_t_SOURCES = ma_test_loghandler-t.c ma_maria_log_cleanup.c ma_test_loghandler_t_SOURCES = ma_test_loghandler-t.c ma_maria_log_cleanup.c
ma_test_loghandler_multigroup_t_SOURCES = ma_test_loghandler_multigroup-t.c ma_maria_log_cleanup.c ma_test_loghandler_multigroup_t_SOURCES = ma_test_loghandler_multigroup-t.c ma_maria_log_cleanup.c
...@@ -58,6 +59,8 @@ ma_test_loghandler_noflush_t_SOURCES = ma_test_loghandler_noflush-t.c ma_maria_l ...@@ -58,6 +59,8 @@ ma_test_loghandler_noflush_t_SOURCES = ma_test_loghandler_noflush-t.c ma_maria_l
ma_test_loghandler_first_lsn_t_SOURCES = ma_test_loghandler_first_lsn-t.c ma_maria_log_cleanup.c ma_test_loghandler_first_lsn_t_SOURCES = ma_test_loghandler_first_lsn-t.c ma_maria_log_cleanup.c
ma_test_loghandler_max_lsn_t_SOURCES = ma_test_loghandler_max_lsn-t.c ma_maria_log_cleanup.c ma_test_loghandler_max_lsn_t_SOURCES = ma_test_loghandler_max_lsn-t.c ma_maria_log_cleanup.c
ma_test_loghandler_purge_t_SOURCES = ma_test_loghandler_purge-t.c ma_maria_log_cleanup.c ma_test_loghandler_purge_t_SOURCES = ma_test_loghandler_purge-t.c ma_maria_log_cleanup.c
ma_test_loghandler_readonly_t_SOURCES = ma_test_loghandler_multigroup-t.c ma_maria_log_cleanup.c
ma_test_loghandler_readonly_t_CPPFLAGS = -DREADONLY_TEST
ma_pagecache_single_src = ma_pagecache_single.c test_file.c test_file.h ma_pagecache_single_src = ma_pagecache_single.c test_file.c test_file.h
ma_pagecache_consist_src = ma_pagecache_consist.c test_file.c test_file.h ma_pagecache_consist_src = ma_pagecache_consist.c test_file.c test_file.h
......
...@@ -175,13 +175,13 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -175,13 +175,13 @@ int main(int argc __attribute__((unused)), char *argv[])
fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno); fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno);
exit(1); exit(1);
} }
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS)) if (translog_init_with_table(".", LOG_FILE_SIZE, 50112, 0, &pagecache,
LOG_FLAGS, 0, &translog_example_table_init))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy(); translog_destroy();
exit(1); exit(1);
} }
example_loghandler_init();
/* Suppressing of automatic record writing */ /* Suppressing of automatic record writing */
trn->first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID; trn->first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID;
...@@ -337,7 +337,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -337,7 +337,7 @@ int main(int argc __attribute__((unused)), char *argv[])
if (translog_flush(translog_get_horizon())) if (translog_flush(translog_get_horizon()))
{ {
fprintf(stderr, "Can't flush up to horizon\n", (ulong) i); fprintf(stderr, "Can't flush up to horizon\n");
translog_destroy(); translog_destroy();
ok(0, "flush"); ok(0, "flush");
exit(1); exit(1);
......
...@@ -65,13 +65,13 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -65,13 +65,13 @@ int main(int argc __attribute__((unused)), char *argv[])
fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno); fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno);
exit(1); exit(1);
} }
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS)) if (translog_init_with_table(".", LOG_FILE_SIZE, 50112, 0, &pagecache,
LOG_FLAGS, 0, &translog_example_table_init))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy(); translog_destroy();
exit(1); exit(1);
} }
example_loghandler_init();
/* Suppressing of automatic record writing */ /* Suppressing of automatic record writing */
dummy_transaction_object.first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID; dummy_transaction_object.first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID;
......
...@@ -59,13 +59,13 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -59,13 +59,13 @@ int main(int argc __attribute__((unused)), char *argv[])
fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno); fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno);
exit(1); exit(1);
} }
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS)) if (translog_init_with_table(".", LOG_FILE_SIZE, 50112, 0, &pagecache,
LOG_FLAGS, 0, &translog_example_table_init))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy(); translog_destroy();
exit(1); exit(1);
} }
example_loghandler_init();
/* Suppressing of automatic record writing */ /* Suppressing of automatic record writing */
dummy_transaction_object.first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID; dummy_transaction_object.first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID;
......
...@@ -11,15 +11,27 @@ static const char *default_dbug_option; ...@@ -11,15 +11,27 @@ static const char *default_dbug_option;
#endif #endif
static TRN *trn= &dummy_transaction_object; static TRN *trn= &dummy_transaction_object;
#define PCACHE_SIZE (1024*1024*10)
#define LONG_BUFFER_SIZE ((1024L*1024L*1024L) + (1024L*1024L*512)) #ifndef READONLY_TEST
#define PCACHE_SIZE (1024*1024*10)
#define LONG_BUFFER_SIZE ((1024L*1024L*1024L) + (1024L*1024L*512))
#define MIN_REC_LENGTH (1024L*1024L + 1024L*512L + 1) #define MIN_REC_LENGTH (1024L*1024L + 1024L*512L + 1)
#define LOG_FILE_SIZE (1024L*1024L*1024L + 1024L*1024L*512)
#define ITERATIONS 2
#define READONLY 0
#else
#define PCACHE_SIZE (1024*1024*10)
#define LONG_BUFFER_SIZE (1024L*1024L)
#define MIN_REC_LENGTH (1024L)
#define LOG_FILE_SIZE (1024L*1024L*1024L + 1024L*1024L*512) #define LOG_FILE_SIZE (1024L*1024L*1024L + 1024L*1024L*512)
#define ITERATIONS 2 #define ITERATIONS 2
/*#define ITERATIONS 63 */ #define READONLY 1
#endif /*READONLY_TEST*/
/* /*
#define LOG_FILE_SIZE 1024L*1024L*3L #define LOG_FILE_SIZE 1024L*1024L*3L
...@@ -172,13 +184,13 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -172,13 +184,13 @@ int main(int argc __attribute__((unused)), char *argv[])
fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno); fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno);
exit(1); exit(1);
} }
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, 0)) if (translog_init_with_table(".", LOG_FILE_SIZE, 50112, 0, &pagecache,
0, 0, &translog_example_table_init))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy(); translog_destroy();
exit(1); exit(1);
} }
example_loghandler_init();
/* Suppressing of automatic record writing */ /* Suppressing of automatic record writing */
trn->first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID; trn->first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID;
...@@ -337,13 +349,13 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -337,13 +349,13 @@ int main(int argc __attribute__((unused)), char *argv[])
fprintf(stderr, "pass2: Got error: init_pagecache() (errno: %d)\n", errno); fprintf(stderr, "pass2: Got error: init_pagecache() (errno: %d)\n", errno);
exit(1); exit(1);
} }
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, 0)) if (translog_init_with_table(".", LOG_FILE_SIZE, 50112, 0, &pagecache,
0, READONLY, &translog_example_table_init))
{ {
fprintf(stderr, "pass2: Can't init loghandler (%d)\n", errno); fprintf(stderr, "pass2: Can't init loghandler (%d)\n", errno);
translog_destroy(); translog_destroy();
exit(1); exit(1);
} }
example_loghandler_init();
srandom(122334817L); srandom(122334817L);
......
...@@ -282,13 +282,13 @@ int main(int argc __attribute__((unused)), ...@@ -282,13 +282,13 @@ int main(int argc __attribute__((unused)),
fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno); fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno);
exit(1); exit(1);
} }
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS)) if (translog_init_with_table(".", LOG_FILE_SIZE, 50112, 0, &pagecache,
LOG_FLAGS, 0, &translog_example_table_init))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy(); translog_destroy();
exit(1); exit(1);
} }
example_loghandler_init();
/* Suppressing of automatic record writing */ /* Suppressing of automatic record writing */
dummy_transaction_object.first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID; dummy_transaction_object.first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID;
......
...@@ -67,13 +67,13 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -67,13 +67,13 @@ int main(int argc __attribute__((unused)), char *argv[])
fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno); fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno);
exit(1); exit(1);
} }
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS)) if (translog_init_with_table(".", LOG_FILE_SIZE, 50112, 0, &pagecache,
LOG_FLAGS, 0, &translog_example_table_init))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy(); translog_destroy();
exit(1); exit(1);
} }
example_loghandler_init();
/* Suppressing of automatic record writing */ /* Suppressing of automatic record writing */
dummy_transaction_object.first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID; dummy_transaction_object.first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID;
......
...@@ -67,13 +67,13 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -67,13 +67,13 @@ int main(int argc __attribute__((unused)), char *argv[])
fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno); fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno);
exit(1); exit(1);
} }
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS)) if (translog_init_with_table(".", LOG_FILE_SIZE, 50112, 0, &pagecache,
LOG_FLAGS, 0, &translog_example_table_init))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy(); translog_destroy();
exit(1); exit(1);
} }
example_loghandler_init();
/* Suppressing of automatic record writing */ /* Suppressing of automatic record writing */
dummy_transaction_object.first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID; dummy_transaction_object.first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID;
......
...@@ -62,13 +62,13 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -62,13 +62,13 @@ int main(int argc __attribute__((unused)), char *argv[])
fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno); fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno);
exit(1); exit(1);
} }
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS)) if (translog_init_with_table(".", LOG_FILE_SIZE, 50112, 0, &pagecache,
LOG_FLAGS, 0, &translog_example_table_init))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy(); translog_destroy();
exit(1); exit(1);
} }
example_loghandler_init();
/* Suppressing of automatic record writing */ /* Suppressing of automatic record writing */
dummy_transaction_object.first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID; dummy_transaction_object.first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment