Commit 329ba0d4 authored by unknown's avatar unknown

Merge bk-internal.mysql.com:/home/bk/mysql-maria

into  janus.mylan:/usr/home/serg/Abk/mysql-maria

parents 5ef7e037 4b1fe65b
...@@ -20,7 +20,7 @@ INCLUDES = @ZLIB_INCLUDES@ -I$(top_builddir)/include \ ...@@ -20,7 +20,7 @@ INCLUDES = @ZLIB_INCLUDES@ -I$(top_builddir)/include \
-I$(top_srcdir)/include -I$(srcdir) -I$(top_srcdir)/include -I$(srcdir)
pkglib_LIBRARIES = libmysys.a pkglib_LIBRARIES = libmysys.a
LDADD = libmysys.a $(top_builddir)/strings/libmystrings.a $(top_builddir)/dbug/libdbug.a LDADD = libmysys.a $(top_builddir)/strings/libmystrings.a $(top_builddir)/dbug/libdbug.a
noinst_HEADERS = mysys_priv.h my_static.h noinst_HEADERS = mysys_priv.h my_static.h my_safehash.h
libmysys_a_SOURCES = my_init.c my_getwd.c mf_getdate.c my_mmap.c \ libmysys_a_SOURCES = my_init.c my_getwd.c mf_getdate.c my_mmap.c \
mf_path.c mf_loadpath.c my_file.c \ mf_path.c mf_loadpath.c my_file.c \
my_open.c my_create.c my_dup.c my_seek.c my_read.c \ my_open.c my_create.c my_dup.c my_seek.c my_read.c \
......
...@@ -33,7 +33,7 @@ SUBDIRS = . unittest ...@@ -33,7 +33,7 @@ SUBDIRS = . unittest
EXTRA_DIST = ma_test_all.sh ma_test_all.res ma_ft_stem.c CMakeLists.txt plug.in EXTRA_DIST = ma_test_all.sh ma_test_all.res ma_ft_stem.c CMakeLists.txt plug.in
pkgdata_DATA = ma_test_all ma_test_all.res pkgdata_DATA = ma_test_all ma_test_all.res
pkglib_LIBRARIES = libmaria.a pkglib_LIBRARIES = libmaria.a
bin_PROGRAMS = maria_chk maria_pack maria_ftdump bin_PROGRAMS = maria_chk maria_pack maria_ftdump maria_read_log
maria_chk_DEPENDENCIES= $(LIBRARIES) maria_chk_DEPENDENCIES= $(LIBRARIES)
# Only reason to link with libmyisam.a here is that it's where some fulltext # Only reason to link with libmyisam.a here is that it's where some fulltext
# pieces are (but soon we'll remove fulltext dependencies from Maria). # pieces are (but soon we'll remove fulltext dependencies from Maria).
...@@ -49,6 +49,12 @@ maria_pack_LDADD= @CLIENT_EXTRA_LDFLAGS@ libmaria.a \ ...@@ -49,6 +49,12 @@ maria_pack_LDADD= @CLIENT_EXTRA_LDFLAGS@ libmaria.a \
$(top_builddir)/mysys/libmysys.a \ $(top_builddir)/mysys/libmysys.a \
$(top_builddir)/dbug/libdbug.a \ $(top_builddir)/dbug/libdbug.a \
$(top_builddir)/strings/libmystrings.a @ZLIB_LIBS@ $(top_builddir)/strings/libmystrings.a @ZLIB_LIBS@
maria_read_log_DEPENDENCIES=$(LIBRARIES)
maria_read_log_LDADD= @CLIENT_EXTRA_LDFLAGS@ libmaria.a \
$(top_builddir)/storage/myisam/libmyisam.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/strings/libmystrings.a @ZLIB_LIBS@
noinst_PROGRAMS = ma_test1 ma_test2 ma_test3 ma_rt_test ma_sp_test noinst_PROGRAMS = ma_test1 ma_test2 ma_test3 ma_rt_test ma_sp_test
noinst_HEADERS = maria_def.h ma_rt_index.h ma_rt_key.h ma_rt_mbr.h \ noinst_HEADERS = maria_def.h ma_rt_index.h ma_rt_key.h ma_rt_mbr.h \
ma_sp_defs.h ma_fulltext.h ma_ftdefs.h ma_ft_test1.h \ ma_sp_defs.h ma_fulltext.h ma_ftdefs.h ma_ft_test1.h \
......
...@@ -2241,7 +2241,7 @@ static int ha_maria_init(void *p) ...@@ -2241,7 +2241,7 @@ static int ha_maria_init(void *p)
maria_hton->flags= HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES; maria_hton->flags= HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES;
bzero(maria_log_pagecache, sizeof(*maria_log_pagecache)); bzero(maria_log_pagecache, sizeof(*maria_log_pagecache));
maria_data_root= mysql_real_data_home; maria_data_root= mysql_real_data_home;
res= maria_init() || ma_control_file_create_or_open() || res= maria_init() || ma_control_file_create_or_open(TRUE) ||
(init_pagecache(maria_log_pagecache, (init_pagecache(maria_log_pagecache,
TRANSLOG_PAGECACHE_SIZE, 0, 0, TRANSLOG_PAGECACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE) == 0) || TRANSLOG_PAGE_SIZE) == 0) ||
......
...@@ -557,7 +557,8 @@ static my_bool check_if_zero(byte *pos, uint length) ...@@ -557,7 +557,8 @@ static my_bool check_if_zero(byte *pos, uint length)
SYNOPSIS SYNOPSIS
_ma_unpin_all_pages() _ma_unpin_all_pages()
info Maria handler info Maria handler
undo_lsn LSN for undo pages. 0 if we shouldn't write undo (error) undo_lsn LSN for undo pages. LSN_IMPOSSIBLE if we shouldn't write undo
(error)
NOTE NOTE
We unpin pages in the reverse order as they where pinned; This may not We unpin pages in the reverse order as they where pinned; This may not
...@@ -580,14 +581,15 @@ void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn) ...@@ -580,14 +581,15 @@ void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn)
DBUG_PRINT("info", ("undo_lsn: %lu", (ulong) undo_lsn)); DBUG_PRINT("info", ("undo_lsn: %lu", (ulong) undo_lsn));
/* True if not disk error */ /* True if not disk error */
DBUG_ASSERT(undo_lsn != 0 || !info->s->base.transactional); DBUG_ASSERT((undo_lsn != LSN_IMPOSSIBLE) || !info->s->base.transactional);
if (!info->s->base.transactional) if (!info->s->base.transactional)
{ {
/* /*
If this is a transactional table but with transactionality temporarily If this is a transactional table but with transactionality temporarily
disabled (like in ALTER TABLE) we need to give a sensible LSN to pages disabled (like in ALTER TABLE) we need to give a sensible LSN to pages
and not 0. If this is not a transactional table it will reduce to 0. and not LSN_IMPOSSIBLE. If this is not a transactional table it will
reduce to LSN_IMPOSSIBLE.
*/ */
undo_lsn= info->s->state.create_rename_lsn; undo_lsn= info->s->state.create_rename_lsn;
} }
...@@ -1958,8 +1960,8 @@ static my_bool write_block_record(MARIA_HA *info, ...@@ -1958,8 +1960,8 @@ static my_bool write_block_record(MARIA_HA *info,
size_t data_length= (size_t) (data - row_pos->data); size_t data_length= (size_t) (data - row_pos->data);
/* Log REDO changes of head page */ /* Log REDO changes of head page */
page_store(log_data+ FILEID_STORE_SIZE, head_block->page); page_store(log_data + FILEID_STORE_SIZE, head_block->page);
dirpos_store(log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE, dirpos_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE,
row_pos->rownr); row_pos->rownr);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
...@@ -2183,12 +2185,22 @@ static my_bool write_block_record(MARIA_HA *info, ...@@ -2183,12 +2185,22 @@ static my_bool write_block_record(MARIA_HA *info,
disk_err: disk_err:
/** /**
@todo RECOVERY we are going to let dirty pages go to disk while we have @todo RECOVERY we are going to let dirty pages go to disk while we have
logged UNDO, this violates WAL. If we have not written any full pages, logged UNDO, this violates WAL. We must mark the table corrupted!
all dirty pages are pinned so we could just delete them from the
pagecache. Moreover, we have written some REDOs without a closing UNDO, @todo RECOVERY we have written some REDOs without a closing UNDO,
it's possible that a next operation by this transaction succeeds and then it's possible that a next operation by this transaction succeeds and then
Recovery would glue the "orphan REDOs" to the succeeded operation and Recovery would glue the "orphan REDOs" to the succeeded operation and
execute the failed REDOs. execute the failed REDOs. We need some mark "abort this group" in the
log, or mark the table corrupted (then user will repair it and thus REDOs
will be skipped).
@todo RECOVERY to not let write errors go unnoticed, pagecache_write()
should take a MARIA_HA* in argument, and it it
fails when flushing a page to disk it should call
(*the_maria_ha->write_error_func)(the_maria_ha)
and this hook will mark the table corrupted.
Maybe hook should be stored in the pagecache's block structure, or in a
hash "file->maria_ha*".
*/ */
/* Unpin all pinned pages to not cause problems for disk cache */ /* Unpin all pinned pages to not cause problems for disk cache */
_ma_unpin_all_pages(info, 0); _ma_unpin_all_pages(info, 0);
......
...@@ -5176,7 +5176,23 @@ int _ma_repair_write_log_record(const HA_CHECK *param, MARIA_HA *info) ...@@ -5176,7 +5176,23 @@ int _ma_repair_write_log_record(const HA_CHECK *param, MARIA_HA *info)
/* Only called from ha_maria.cc, not maria_check, so translog is inited */ /* Only called from ha_maria.cc, not maria_check, so translog is inited */
if (share->base.transactional && !share->temporary) if (share->base.transactional && !share->temporary)
{ {
/* For now this record is only informative */ /*
For now this record is only informative. It could serve when applying
logs to a backup, but that needs more thought. Assume table became
corrupted. It is repaired, then some writes happen to it.
Later we restore an old backup, and want to apply this REDO_REPAIR_TABLE
record. For it to give the same result as originally, the table should
be corrupted the same way, so applying previous REDOs should produce the
same corruption; that's really not guaranteed (different execution paths
in execution of REDOs vs runtime code so not same bugs hit, temporary
hardware issues not repeatable etc). Corruption may not be repeatable.
A reasonable solution is to execute the REDO_REPAIR_TABLE record and
check if the checksum of the resulting table matches what it was at the
end of the original repair (should be stored in log record); or execute
the REDO_REPAIR_TABLE if the checksum of the table-before-repair matches
was it was at the start of the original repair (should be stored in log
record).
*/
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[LSN_STORE_SIZE]; uchar log_data[LSN_STORE_SIZE];
compile_time_assert(LSN_STORE_SIZE >= (FILEID_STORE_SIZE + 4)); compile_time_assert(LSN_STORE_SIZE >= (FILEID_STORE_SIZE + 4));
...@@ -5193,19 +5209,17 @@ int _ma_repair_write_log_record(const HA_CHECK *param, MARIA_HA *info) ...@@ -5193,19 +5209,17 @@ int _ma_repair_write_log_record(const HA_CHECK *param, MARIA_HA *info)
log_array[TRANSLOG_INTERNAL_PARTS + log_array[TRANSLOG_INTERNAL_PARTS +
0].length, 0].length,
sizeof(log_array)/sizeof(log_array[0]), sizeof(log_array)/sizeof(log_array[0]),
log_array, log_data))) log_array, log_data) ||
translog_flush(share->state.create_rename_lsn)))
return 1; return 1;
/* /*
But this piece is really needed, to have the new table's content durable But this piece is really needed, to have the new table's content durable
and to not apply old REDOs to the new table. The table's existence was and to not apply old REDOs to the new table. The table's existence was
made durable earlier (MY_SYNC_DIR passed to maria_change_to_newfile()). made durable earlier (MY_SYNC_DIR passed to maria_change_to_newfile()).
*/ */
lsn_store(log_data, share->state.create_rename_lsn);
DBUG_ASSERT(info->dfile.file >= 0); DBUG_ASSERT(info->dfile.file >= 0);
DBUG_ASSERT(share->kfile.file >= 0); return _ma_update_create_rename_lsn_on_disk(share, FALSE) ||
return (my_pwrite(share->kfile.file, log_data, sizeof(log_data), _ma_sync_table_files(info);
sizeof(share->state.header) + 2, MYF(MY_NABP)) ||
_ma_sync_table_files(info));
} }
return 0; return 0;
} }
/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB /* Copyright (C) 2006,2007 MySQL AB
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -21,14 +21,61 @@ ...@@ -21,14 +21,61 @@
/* This is the interface of this module. */ /* This is the interface of this module. */
typedef enum enum_checkpoint_level { typedef enum enum_ma_checkpoint_level {
NONE=-1, CHECKPOINT_NONE= 0,
INDIRECT, /* just write dirty_pages, transactions table and sync files */ /* just write dirty_pages, transactions table and sync files */
MEDIUM, /* also flush all dirty pages which were already dirty at prev checkpoint*/ CHECKPOINT_INDIRECT,
FULL /* also flush all dirty pages */ /* also flush all dirty pages which were already dirty at prev checkpoint */
CHECKPOINT_MEDIUM,
/* also flush all dirty pages */
CHECKPOINT_FULL
} CHECKPOINT_LEVEL; } CHECKPOINT_LEVEL;
void request_asynchronous_checkpoint(CHECKPOINT_LEVEL level); C_MODE_START
my_bool execute_synchronous_checkpoint(CHECKPOINT_LEVEL level); int ma_checkpoint_init();
my_bool execute_asynchronous_checkpoint_if_any(); void ma_checkpoint_end();
/* that's all that's needed in the interface */ int ma_checkpoint_execute(CHECKPOINT_LEVEL level, my_bool no_wait);
C_MODE_END
/**
@brief reads some LSNs with special trickery
If a 64-bit variable transitions between both halves being zero to both
halves being non-zero, and back, this function can be used to do a read of
it (without mutex, without atomic load) which always produces a correct
(though maybe slightly old) value (even on 32-bit CPUs). The value is at
least as new as the latest mutex unlock done by the calling thread.
The assumption is that the system sets both 4-byte halves either at the
same time, or one after the other (in any order), but NOT some bytes of the
first half then some bytes of the second half then the rest of bytes of the
first half. With this assumption, the function can detect when it is
seeing an inconsistent value.
@param LSN pointer to the LSN variable to read
@return LSN part (most significant byte always 0)
*/
#if ( SIZEOF_CHARP >= 8 )
/* 64-bit CPU, 64-bit reads are atomic */
#define lsn_read_non_atomic LSN_WITH_FLAGS_TO_LSN
#else
static inline LSN lsn_read_non_atomic_32(const volatile LSN *x)
{
/*
32-bit CPU, 64-bit reads may give a mixed of old half and new half (old
low bits and new high bits, or the contrary).
*/
for (;;) /* loop until no atomicity problems */
{
/*
Remove most significant byte in case this is a LSN_WITH_FLAGS object.
Those flags in TRN::first_undo_lsn break the condition on transitions so
they must be removed below.
*/
LSN y= LSN_WITH_FLAGS_TO_LSN(*x);
if (likely((y == LSN_IMPOSSIBLE) || LSN_VALID(y)))
return y;
}
}
#define lsn_read_non_atomic(x) lsn_read_non_atomic_32(&x)
#endif
...@@ -85,6 +85,7 @@ int maria_close(register MARIA_HA *info) ...@@ -85,6 +85,7 @@ int maria_close(register MARIA_HA *info)
not change the crashed state. not change the crashed state.
We can NOT write the state in other cases as other threads We can NOT write the state in other cases as other threads
may be using the file at this point may be using the file at this point
IF using --external-locking, which does not apply to Maria.
*/ */
if (share->mode != O_RDONLY && maria_is_crashed(info)) if (share->mode != O_RDONLY && maria_is_crashed(info))
_ma_state_info_write(share->kfile.file, &share->state, 1); _ma_state_info_write(share->kfile.file, &share->state, 1);
......
...@@ -40,15 +40,9 @@ ...@@ -40,15 +40,9 @@
#define CONTROL_FILE_FILENO_SIZE 4 #define CONTROL_FILE_FILENO_SIZE 4
#define CONTROL_FILE_SIZE (CONTROL_FILE_FILENO_OFFSET + CONTROL_FILE_FILENO_SIZE) #define CONTROL_FILE_SIZE (CONTROL_FILE_FILENO_OFFSET + CONTROL_FILE_FILENO_SIZE)
/* /* This module owns these two vars. */
This module owns these two vars. LSN last_checkpoint_lsn= LSN_IMPOSSIBLE;
uint32 is always atomically updated, but LSN is 8 bytes, we will need uint32 last_logno= FILENO_IMPOSSIBLE;
provisions to ensure that it's updated atomically in
ma_control_file_write_and_force(). Probably the log mutex could be
used. TODO.
*/
LSN last_checkpoint_lsn;
uint32 last_logno;
/** /**
@brief If log's lock should be asserted when writing to control file. @brief If log's lock should be asserted when writing to control file.
...@@ -65,16 +59,16 @@ my_bool maria_multi_threaded= FALSE; ...@@ -65,16 +59,16 @@ my_bool maria_multi_threaded= FALSE;
static int control_file_fd= -1; static int control_file_fd= -1;
/* /*
Initialize control file subsystem @brief Initialize control file subsystem
SYNOPSIS
ma_control_file_create_or_open()
Looks for the control file. If absent, it's a fresh start, creates file. Looks for the control file. If none and creation is requested, creates file.
If present, reads it to find out last checkpoint's LSN and last log, updates If present, reads it to find out last checkpoint's LSN and last log, updates
the last_checkpoint_lsn and last_logno global variables. the last_checkpoint_lsn and last_logno global variables.
Called at engine's start. Called at engine's start.
@param create_if_missing
@note
The format of the control file is: The format of the control file is:
4 bytes: magic string 4 bytes: magic string
4 bytes: checksum of the following bytes 4 bytes: checksum of the following bytes
...@@ -82,11 +76,11 @@ static int control_file_fd= -1; ...@@ -82,11 +76,11 @@ static int control_file_fd= -1;
4 bytes: offset in log where last checkpoint is 4 bytes: offset in log where last checkpoint is
4 bytes: number of last log 4 bytes: number of last log
RETURN @return Operation status
0 - OK @retval 0 OK
1 - Error (in which case the file is left closed) @retval 1 Error (in which case the file is left closed)
*/ */
CONTROL_FILE_ERROR ma_control_file_create_or_open() CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool create_if_missing)
{ {
char buffer[CONTROL_FILE_SIZE]; char buffer[CONTROL_FILE_SIZE];
char name[FN_REFLEN]; char name[FN_REFLEN];
...@@ -115,6 +109,8 @@ CONTROL_FILE_ERROR ma_control_file_create_or_open() ...@@ -115,6 +109,8 @@ CONTROL_FILE_ERROR ma_control_file_create_or_open()
if (create_file) if (create_file)
{ {
if (!create_if_missing)
DBUG_RETURN(CONTROL_FILE_MISSING);
if ((control_file_fd= my_create(name, 0, if ((control_file_fd= my_create(name, 0,
open_flags, MYF(MY_SYNC_DIR))) < 0) open_flags, MYF(MY_SYNC_DIR))) < 0)
DBUG_RETURN(CONTROL_FILE_UNKNOWN_ERROR); DBUG_RETURN(CONTROL_FILE_UNKNOWN_ERROR);
...@@ -136,8 +132,8 @@ CONTROL_FILE_ERROR ma_control_file_create_or_open() ...@@ -136,8 +132,8 @@ CONTROL_FILE_ERROR ma_control_file_create_or_open()
*/ */
/* init the file with these "undefined" values */ /* init the file with these "undefined" values */
DBUG_RETURN(ma_control_file_write_and_force(CONTROL_FILE_IMPOSSIBLE_LSN, DBUG_RETURN(ma_control_file_write_and_force(LSN_IMPOSSIBLE,
CONTROL_FILE_IMPOSSIBLE_FILENO, FILENO_IMPOSSIBLE,
CONTROL_FILE_UPDATE_ALL)); CONTROL_FILE_UPDATE_ALL));
} }
...@@ -315,8 +311,8 @@ int ma_control_file_end() ...@@ -315,8 +311,8 @@ int ma_control_file_end()
As this module owns these variables, closing the module forbids access to As this module owns these variables, closing the module forbids access to
them (just a safety): them (just a safety):
*/ */
last_checkpoint_lsn= CONTROL_FILE_IMPOSSIBLE_LSN; last_checkpoint_lsn= LSN_IMPOSSIBLE;
last_logno= CONTROL_FILE_IMPOSSIBLE_FILENO; last_logno= FILENO_IMPOSSIBLE;
DBUG_RETURN(close_error); DBUG_RETURN(close_error);
} }
...@@ -19,27 +19,17 @@ ...@@ -19,27 +19,17 @@
*/ */
#define CONTROL_FILE_BASE_NAME "maria_control" #define CONTROL_FILE_BASE_NAME "maria_control"
/*
indicate absence of the log file number; first log is always number 1, 0 is
impossible.
*/
#define CONTROL_FILE_IMPOSSIBLE_FILENO 0
/* logs always have a header */
#define CONTROL_FILE_IMPOSSIBLE_LOG_OFFSET 0
/* indicate absence of LSN. */
#define CONTROL_FILE_IMPOSSIBLE_LSN ((LSN)0)
/* Here is the interface of this module */ /* Here is the interface of this module */
/* /*
LSN of the last checkoint LSN of the last checkoint
(if last_checkpoint_lsn == CONTROL_FILE_IMPOSSIBLE_LSN (if last_checkpoint_lsn == LSN_IMPOSSIBLE then there was never a checkpoint)
then there was never a checkpoint)
*/ */
extern LSN last_checkpoint_lsn; extern LSN last_checkpoint_lsn;
/* /*
Last log number (if last_logno == Last log number (if last_logno == FILENO_IMPOSSIBLE then there is no log
CONTROL_FILE_IMPOSSIBLE_FILENO then there is no log file yet) file yet)
*/ */
extern uint32 last_logno; extern uint32 last_logno;
...@@ -51,6 +41,7 @@ typedef enum enum_control_file_error { ...@@ -51,6 +41,7 @@ typedef enum enum_control_file_error {
CONTROL_FILE_TOO_BIG, CONTROL_FILE_TOO_BIG,
CONTROL_FILE_BAD_MAGIC_STRING, CONTROL_FILE_BAD_MAGIC_STRING,
CONTROL_FILE_BAD_CHECKSUM, CONTROL_FILE_BAD_CHECKSUM,
CONTROL_FILE_MISSING,
CONTROL_FILE_UNKNOWN_ERROR /* any other error */ CONTROL_FILE_UNKNOWN_ERROR /* any other error */
} CONTROL_FILE_ERROR; } CONTROL_FILE_ERROR;
...@@ -63,11 +54,11 @@ extern "C" { ...@@ -63,11 +54,11 @@ extern "C" {
#endif #endif
/* /*
Looks for the control file. If absent, it's a fresh start, create file. Looks for the control file. If none and creation was requested, creates file.
If present, read it to find out last checkpoint's LSN and last log. If present, reads it to find out last checkpoint's LSN and last log.
Called at engine's start. Called at engine's start.
*/ */
CONTROL_FILE_ERROR ma_control_file_create_or_open(); CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool);
/* /*
Write information durably to the control file. Write information durably to the control file.
Called when we have created a new log (after syncing this log's creation) Called when we have created a new log (after syncing this log's creation)
......
This diff is collapsed.
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
/* This clears the status information and truncates files */ /* This clears the status information and truncates files */
#include "maria_def.h" #include "maria_def.h"
#include "trnman_public.h" #include "trnman.h"
/** /**
@brief deletes all rows from a table @brief deletes all rows from a table
...@@ -52,6 +52,25 @@ int maria_delete_all_rows(MARIA_HA *info) ...@@ -52,6 +52,25 @@ int maria_delete_all_rows(MARIA_HA *info)
if (_ma_mark_file_changed(info)) if (_ma_mark_file_changed(info))
goto err; goto err;
if (log_record)
{
/*
This record will be used by Recovery to finish the deletion if it
crashed. We force it because it's a non-undoable operation.
*/
LSN lsn;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[FILEID_STORE_SIZE];
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (unlikely(translog_write_record(&lsn, LOGREC_REDO_DELETE_ALL,
info->trn, share, 0,
sizeof(log_array)/sizeof(log_array[0]),
log_array, log_data) ||
translog_flush(lsn)))
goto err;
}
info->state->records=info->state->del=state->split=0; info->state->records=info->state->del=state->split=0;
state->changed= 0; /* File is optimized */ state->changed= 0; /* File is optimized */
state->dellink = HA_OFFSET_ERROR; state->dellink = HA_OFFSET_ERROR;
...@@ -78,6 +97,12 @@ int maria_delete_all_rows(MARIA_HA *info) ...@@ -78,6 +97,12 @@ int maria_delete_all_rows(MARIA_HA *info)
if (_ma_initialize_data_file(info->dfile.file, share)) if (_ma_initialize_data_file(info->dfile.file, share))
goto err; goto err;
/*
The operations above on the index/data file will be forced to disk at
Checkpoint or maria_close() time. So we can reset:
*/
info->trn->rec_lsn= LSN_IMPOSSIBLE;
VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE)); VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
#ifdef HAVE_MMAP #ifdef HAVE_MMAP
/* Resize mmaped area */ /* Resize mmaped area */
...@@ -85,38 +110,6 @@ int maria_delete_all_rows(MARIA_HA *info) ...@@ -85,38 +110,6 @@ int maria_delete_all_rows(MARIA_HA *info)
_ma_remap_file(info, (my_off_t)0); _ma_remap_file(info, (my_off_t)0);
rw_unlock(&info->s->mmap_lock); rw_unlock(&info->s->mmap_lock);
#endif #endif
if (log_record)
{
/* For now this record is only informative */
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[LSN_STORE_SIZE];
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= FILEID_STORE_SIZE;
if (unlikely(translog_write_record(&share->state.create_rename_lsn,
LOGREC_REDO_DELETE_ALL,
info->trn, share, 0,
sizeof(log_array)/sizeof(log_array[0]),
log_array, log_data)))
goto err;
/*
store LSN into file. It is an optimization so that all old REDOs for
this table are ignored (scenario: checkpoint, INSERT1s, DELETE ALL;
INSERT2s, crash: then Recovery can skip INSERT1s). It also allows us to
ignore the present record at Recovery.
Note that storing the LSN could not be done by _ma_writeinfo() above as
the table is locked at this moment. So we need to do it by ourselves.
*/
lsn_store(log_data, share->state.create_rename_lsn);
if (my_pwrite(share->kfile.file, log_data, sizeof(log_data),
sizeof(share->state.header) + 2, MYF(MY_NABP)) ||
_ma_sync_table_files(info))
goto err;
/**
@todo RECOVERY Until we take into account the log record above
for log-low-water-mark calculation and use it in Recovery, we need
to sync above.
*/
}
allow_break(); /* Allow SIGHUP & SIGINT */ allow_break(); /* Allow SIGHUP & SIGINT */
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -125,9 +118,11 @@ int maria_delete_all_rows(MARIA_HA *info) ...@@ -125,9 +118,11 @@ int maria_delete_all_rows(MARIA_HA *info)
int save_errno=my_errno; int save_errno=my_errno;
VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE)); VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
info->update|=HA_STATE_WRITTEN; /* Buffer changed */ info->update|=HA_STATE_WRITTEN; /* Buffer changed */
/** @todo RECOVERY until we use the log record above we have to sync */ /**
if (log_record &&_ma_sync_table_files(info) && !save_errno) @todo RECOVERY if we come here, Recovery may later apply the REDO above,
save_errno= my_errno; which may be wrong. Not fixing it now, as anyway this way of deleting
rows will have to be re-examined when we have versioning.
*/
allow_break(); /* Allow SIGHUP & SIGINT */ allow_break(); /* Allow SIGHUP & SIGINT */
DBUG_RETURN(my_errno=save_errno); DBUG_RETURN(my_errno=save_errno);
} }
......
...@@ -78,9 +78,9 @@ int maria_delete_table(const char *name) ...@@ -78,9 +78,9 @@ int maria_delete_table(const char *name)
{ {
/* /*
For this log record to be of any use for Recovery, we need the upper For this log record to be of any use for Recovery, we need the upper
MySQL layer to be crash-safe in DDLs; when it is we should reconsider MySQL layer to be crash-safe in DDLs.
the moment of writing this log record, how to use it in Recovery, and For now this record can serve when we apply logs to a backup, so we sync
force the log. For now this record is only informative. it.
*/ */
LSN lsn; LSN lsn;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
...@@ -91,7 +91,8 @@ int maria_delete_table(const char *name) ...@@ -91,7 +91,8 @@ int maria_delete_table(const char *name)
log_array[TRANSLOG_INTERNAL_PARTS + log_array[TRANSLOG_INTERNAL_PARTS +
0].length, 0].length,
sizeof(log_array)/sizeof(log_array[0]), sizeof(log_array)/sizeof(log_array[0]),
log_array, NULL))) log_array, NULL) ||
translog_flush(lsn)))
DBUG_RETURN(1); DBUG_RETURN(1);
} }
......
This diff is collapsed.
/* Copyright (C) 2007 MySQL AB & Sanja Belkin
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef _ma_loghandler_h
#define _ma_loghandler_h
/* transaction log default cache size (TODO: make it global variable) */ /* transaction log default cache size (TODO: make it global variable) */
#define TRANSLOG_PAGECACHE_SIZE 1024*1024*2 #define TRANSLOG_PAGECACHE_SIZE 1024*1024*2
/* transaction log default file size (TODO: make it global variable) */ /* transaction log default file size (TODO: make it global variable) */
...@@ -20,6 +38,7 @@ ...@@ -20,6 +38,7 @@
#define TRANSLOG_PAGE_SIZE (8*1024) #define TRANSLOG_PAGE_SIZE (8*1024)
#include "ma_loghandler_lsn.h" #include "ma_loghandler_lsn.h"
#include "trnman_public.h"
/* short transaction ID type */ /* short transaction ID type */
typedef uint16 SHORT_TRANSACTION_ID; typedef uint16 SHORT_TRANSACTION_ID;
...@@ -41,6 +60,10 @@ struct st_maria_share; ...@@ -41,6 +60,10 @@ struct st_maria_share;
#define page_store(T,A) int5store(T,A) #define page_store(T,A) int5store(T,A)
#define dirpos_store(T,A) ((*(uchar*) (T)) = A) #define dirpos_store(T,A) ((*(uchar*) (T)) = A)
#define pagerange_store(T,A) int2store(T,A) #define pagerange_store(T,A) int2store(T,A)
#define fileid_korr(P) uint2korr(P)
#define page_korr(P) uint5korr(P)
#define dirpos_korr(P) (P[0])
#define pagerange_korr(P) uint2korr(P)
/* /*
Length of disk drive sector size (we assume that writing it Length of disk drive sector size (we assume that writing it
...@@ -228,10 +251,102 @@ extern translog_size_t translog_read_next_record_header(TRANSLOG_SCANNER_DATA ...@@ -228,10 +251,102 @@ extern translog_size_t translog_read_next_record_header(TRANSLOG_SCANNER_DATA
*scanner, *scanner,
TRANSLOG_HEADER_BUFFER TRANSLOG_HEADER_BUFFER
*buff); *buff);
extern my_bool translog_lock();
extern my_bool translog_unlock();
extern void translog_lock_assert_owner(); extern void translog_lock_assert_owner();
extern TRANSLOG_ADDRESS translog_get_horizon(); extern TRANSLOG_ADDRESS translog_get_horizon();
extern int translog_assign_id_to_share(struct st_maria_share *share, extern int translog_assign_id_to_share(struct st_maria_share *share,
struct st_transaction *trn); struct st_transaction *trn);
extern void translog_deassign_id_from_share(struct st_maria_share *share); extern void translog_deassign_id_from_share(struct st_maria_share *share);
extern my_bool translog_inited; extern my_bool translog_inited;
/*
all the rest added because of recovery; should we make
ma_loghandler_for_recovery.h ?
*/
#define SHARE_ID_MAX 65535 /* array's size */
extern LSN first_lsn_in_log();
/* record parts descriptor */
struct st_translog_parts
{
/* full record length */
translog_size_t record_length;
/* full record length with chunk headers */
translog_size_t total_record_length;
/* current part index */
uint current;
/* total number of elements in parts */
uint elements;
/* array of parts (LEX_STRING) */
LEX_STRING *parts;
};
typedef my_bool(*prewrite_rec_hook) (enum translog_record_type type,
TRN *trn, struct st_maria_share *share,
struct st_translog_parts *parts);
typedef my_bool(*inwrite_rec_hook) (enum translog_record_type type,
TRN *trn,
LSN *lsn,
struct st_translog_parts *parts);
typedef uint16(*read_rec_hook) (enum translog_record_type type,
uint16 read_length, uchar *read_buff,
byte *decoded_buff);
/* record classes */
enum record_class
{
LOGRECTYPE_NOT_ALLOWED,
LOGRECTYPE_VARIABLE_LENGTH,
LOGRECTYPE_PSEUDOFIXEDLENGTH,
LOGRECTYPE_FIXEDLENGTH
};
/* C++ can't bear that a variable's name is "class" */
#ifndef __cplusplus
/*
Descriptor of log record type
Note: Don't reorder because of constructs later...
*/
typedef struct st_log_record_type_descriptor
{
/* internal class of the record */
enum record_class class;
/*
length for fixed-size record, pseudo-fixed record
length with uncompressed LSNs
*/
uint16 fixed_length;
/* how much record body (belonged to headers too) read with headers */
uint16 read_header_len;
/* HOOK for writing the record called before lock */
prewrite_rec_hook prewrite_hook;
/* HOOK for writing the record called when LSN is known, inside lock */
inwrite_rec_hook inwrite_hook;
/* HOOK for reading headers */
read_rec_hook read_hook;
/*
For pseudo fixed records number of compressed LSNs followed by
system header
*/
int16 compressed_LSN;
/* the rest is for maria_read_log & Recovery */
/** @brief for debug error messages or "maria_read_log" command-line tool */
const char *name;
my_bool record_ends_group;
/* a function to execute when we see the record during the REDO phase */
int (*record_execute_in_redo_phase)(const TRANSLOG_HEADER_BUFFER *);
/* a function to execute when we see the record during the UNDO phase */
int (*record_execute_in_undo_phase)(const TRANSLOG_HEADER_BUFFER *);
} LOG_DESC;
extern LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES];
#endif
C_MODE_END C_MODE_END
#endif
/* Copyright (C) 2007 MySQL AB & Sanja Belkin
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef _ma_loghandler_lsn_h #ifndef _ma_loghandler_lsn_h
#define _ma_loghandler_lsn_h #define _ma_loghandler_lsn_h
...@@ -24,7 +39,7 @@ typedef TRANSLOG_ADDRESS LSN; ...@@ -24,7 +39,7 @@ typedef TRANSLOG_ADDRESS LSN;
#define LSN_FILE_NO(L) ((L) >> 32) #define LSN_FILE_NO(L) ((L) >> 32)
/* Gets raw file number part of a LSN/log address */ /* Gets raw file number part of a LSN/log address */
#define LSN_FINE_NO_PART(L) ((L) & ((int64)0xFFFFFF00000000LL)) #define LSN_FILE_NO_PART(L) ((L) & ((int64)0xFFFFFF00000000LL))
/* Gets record offset of a LSN/log address */ /* Gets record offset of a LSN/log address */
#define LSN_OFFSET(L) ((L) & 0xFFFFFFFFL) #define LSN_OFFSET(L) ((L) & 0xFFFFFFFFL)
...@@ -33,7 +48,9 @@ typedef TRANSLOG_ADDRESS LSN; ...@@ -33,7 +48,9 @@ typedef TRANSLOG_ADDRESS LSN;
#define MAKE_LSN(F,S) ((((uint64)(F)) << 32) | (S)) #define MAKE_LSN(F,S) ((((uint64)(F)) << 32) | (S))
/* checks LSN */ /* checks LSN */
#define LSN_VALID(L) DBUG_ASSERT((L) >= 0 && (L) < (uint64)0xFFFFFFFFFFFFFFLL) #define LSN_VALID(L) \
((LSN_FILE_NO_PART(L) != FILENO_IMPOSSIBLE) && \
(LSN_OFFSET(L) != LOG_OFFSET_IMPOSSIBLE))
/* size of stored LSN on a disk, don't change it! */ /* size of stored LSN on a disk, don't change it! */
#define LSN_STORE_SIZE 7 #define LSN_STORE_SIZE 7
...@@ -51,7 +68,7 @@ typedef TRANSLOG_ADDRESS LSN; ...@@ -51,7 +68,7 @@ typedef TRANSLOG_ADDRESS LSN;
/* what we need to add to LSN to increase it on one file */ /* what we need to add to LSN to increase it on one file */
#define LSN_ONE_FILE ((int64)0x100000000LL) #define LSN_ONE_FILE ((int64)0x100000000LL)
#define LSN_REPLACE_OFFSET(L, S) (LSN_FINE_NO_PART(L) | (S)) #define LSN_REPLACE_OFFSET(L, S) (LSN_FILE_NO_PART(L) | (S))
/* /*
an 8-byte type whose most significant byte is used for "flags"; 7 an 8-byte type whose most significant byte is used for "flags"; 7
...@@ -61,4 +78,7 @@ typedef LSN LSN_WITH_FLAGS; ...@@ -61,4 +78,7 @@ typedef LSN LSN_WITH_FLAGS;
#define LSN_WITH_FLAGS_TO_LSN(x) (x & ULL(0x00FFFFFFFFFFFFFF)) #define LSN_WITH_FLAGS_TO_LSN(x) (x & ULL(0x00FFFFFFFFFFFFFF))
#define LSN_WITH_FLAGS_TO_FLAGS(x) (x & ULL(0xFF00000000000000)) #define LSN_WITH_FLAGS_TO_FLAGS(x) (x & ULL(0xFF00000000000000))
#define FILENO_IMPOSSIBLE 0 /**< log file's numbering starts at 1 */
#define LOG_OFFSET_IMPOSSIBLE 0 /**< log always has a header */
#define LSN_IMPOSSIBLE 0
#endif #endif
...@@ -587,7 +587,19 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) ...@@ -587,7 +587,19 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
share->base.pack_bytes + share->base.pack_bytes +
test(share->options & HA_OPTION_CHECKSUM)); test(share->options & HA_OPTION_CHECKSUM));
if (share->base.transactional) if (share->base.transactional)
{
share->base_length+= TRANS_ROW_EXTRA_HEADER_SIZE; share->base_length+= TRANS_ROW_EXTRA_HEADER_SIZE;
if (unlikely((share->state.create_rename_lsn == (LSN)ULONGLONG_MAX) &&
(open_flags & HA_OPEN_FROM_SQL_LAYER)))
{
/*
This table was repaired with maria_chk. Past log records should be
ignored, future log records should not: we define the present.
*/
share->state.create_rename_lsn= translog_get_horizon();
_ma_update_create_rename_lsn_on_disk(share, TRUE);
}
}
share->base.default_rec_buff_size= max(share->base.pack_reclength, share->base.default_rec_buff_size= max(share->base.pack_reclength,
share->base.max_key_length); share->base.max_key_length);
share->page_type= (share->base.transactional ? PAGECACHE_LSN_PAGE : share->page_type= (share->base.transactional ? PAGECACHE_LSN_PAGE :
......
...@@ -587,11 +587,7 @@ static uint pagecache_fwrite(PAGECACHE *pagecache, ...@@ -587,11 +587,7 @@ static uint pagecache_fwrite(PAGECACHE *pagecache,
DBUG_PRINT("info", ("Log handler call")); DBUG_PRINT("info", ("Log handler call"));
/* TODO: integrate with page format */ /* TODO: integrate with page format */
lsn= lsn_korr(buffer + PAGE_LSN_OFFSET); lsn= lsn_korr(buffer + PAGE_LSN_OFFSET);
/* DBUG_ASSERT(LSN_VALID(lsn));
check CONTROL_FILE_IMPOSSIBLE_FILENO &
CONTROL_FILE_IMPOSSIBLE_LOG_OFFSET
*/
DBUG_ASSERT(lsn != 0);
translog_flush(lsn); translog_flush(lsn);
} }
DBUG_RETURN(my_pwrite(filedesc->file, buffer, pagecache->block_size, DBUG_RETURN(my_pwrite(filedesc->file, buffer, pagecache->block_size,
...@@ -2474,7 +2470,7 @@ static void check_and_set_lsn(LSN lsn, PAGECACHE_BLOCK_LINK *block) ...@@ -2474,7 +2470,7 @@ static void check_and_set_lsn(LSN lsn, PAGECACHE_BLOCK_LINK *block)
lock lock change lock lock change
pin pin page pin pin page
first_REDO_LSN_for_page do not set it if it is zero first_REDO_LSN_for_page do not set it if it is zero
lsn if it is not CONTROL_FILE_IMPOSSIBLE_LSN (0) and it lsn if it is not LSN_IMPOSSIBLE (0) and it
is bigger then LSN on the page it will be written on is bigger then LSN on the page it will be written on
the page the page
...@@ -2566,7 +2562,7 @@ void pagecache_unlock(PAGECACHE *pagecache, ...@@ -2566,7 +2562,7 @@ void pagecache_unlock(PAGECACHE *pagecache,
pagecache pointer to a page cache data structure pagecache pointer to a page cache data structure
file handler for the file for the block of data to be read file handler for the file for the block of data to be read
pageno number of the block of data in the file pageno number of the block of data in the file
lsn if it is not CONTROL_FILE_IMPOSSIBLE_LSN (0) and it lsn if it is not LSN_IMPOSSIBLE (0) and it
is bigger then LSN on the page it will be written on is bigger then LSN on the page it will be written on
the page the page
*/ */
...@@ -2635,10 +2631,9 @@ void pagecache_unpin(PAGECACHE *pagecache, ...@@ -2635,10 +2631,9 @@ void pagecache_unpin(PAGECACHE *pagecache,
link direct link to page (returned by read or write) link direct link to page (returned by read or write)
lock lock change lock lock change
pin pin page pin pin page
first_REDO_LSN_for_page do not set it if it is zero first_REDO_LSN_for_page do not set it if it is LSN_IMPOSSIBLE (0)
lsn if it is not CONTROL_FILE_IMPOSSIBLE_LSN (0) and it lsn if it is not LSN_IMPOSSIBLE and it is bigger then
is bigger then LSN on the page it will be written on LSN on the page it will be written on the page
the page
*/ */
void pagecache_unlock_by_link(PAGECACHE *pagecache, void pagecache_unlock_by_link(PAGECACHE *pagecache,
...@@ -2681,7 +2676,7 @@ void pagecache_unlock_by_link(PAGECACHE *pagecache, ...@@ -2681,7 +2676,7 @@ void pagecache_unlock_by_link(PAGECACHE *pagecache,
DBUG_ASSERT(pagecache->can_be_used); DBUG_ASSERT(pagecache->can_be_used);
inc_counter_for_resize_op(pagecache); inc_counter_for_resize_op(pagecache);
if (first_REDO_LSN_for_page) if (first_REDO_LSN_for_page != LSN_IMPOSSIBLE)
{ {
/* /*
LOCK_READ_UNLOCK is ok here as the page may have first locked LOCK_READ_UNLOCK is ok here as the page may have first locked
...@@ -2694,10 +2689,8 @@ void pagecache_unlock_by_link(PAGECACHE *pagecache, ...@@ -2694,10 +2689,8 @@ void pagecache_unlock_by_link(PAGECACHE *pagecache,
if (block->rec_lsn == 0) if (block->rec_lsn == 0)
block->rec_lsn= first_REDO_LSN_for_page; block->rec_lsn= first_REDO_LSN_for_page;
} }
if (lsn != 0) if (lsn != LSN_IMPOSSIBLE)
{
check_and_set_lsn(lsn, block); check_and_set_lsn(lsn, block);
}
if (make_lock_and_pin(pagecache, block, lock, pin)) if (make_lock_and_pin(pagecache, block, lock, pin))
DBUG_ASSERT(0); /* should not happend */ DBUG_ASSERT(0); /* should not happend */
...@@ -2726,7 +2719,7 @@ void pagecache_unlock_by_link(PAGECACHE *pagecache, ...@@ -2726,7 +2719,7 @@ void pagecache_unlock_by_link(PAGECACHE *pagecache,
pagecache_unpin_by_link() pagecache_unpin_by_link()
pagecache pointer to a page cache data structure pagecache pointer to a page cache data structure
link direct link to page (returned by read or write) link direct link to page (returned by read or write)
lsn if it is not CONTROL_FILE_IMPOSSIBLE_LSN (0) and it lsn if it is not LSN_IMPOSSIBLE (0) and it
is bigger then LSN on the page it will be written on is bigger then LSN on the page it will be written on
the page the page
*/ */
......
/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB /* Copyright (C) 2006,2007 MySQL AB
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
......
...@@ -60,13 +60,13 @@ int maria_rename(const char *old_name, const char *new_name) ...@@ -60,13 +60,13 @@ int maria_rename(const char *old_name, const char *new_name)
MY_SYNC_DIR : 0; MY_SYNC_DIR : 0;
if (sync_dir) if (sync_dir)
{ {
uchar log_data[LSN_STORE_SIZE]; uchar log_data[2 + 2];
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 3]; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
uint old_name_len= strlen(old_name), new_name_len= strlen(new_name); uint old_name_len= strlen(old_name), new_name_len= strlen(new_name);
int2store(log_data, old_name_len); int2store(log_data, old_name_len);
int2store(log_data + 2, new_name_len); int2store(log_data + 2, new_name_len);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data; log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= 2 + 2; log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= (char *)old_name; log_array[TRANSLOG_INTERNAL_PARTS + 1].str= (char *)old_name;
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= old_name_len; log_array[TRANSLOG_INTERNAL_PARTS + 1].length= old_name_len;
log_array[TRANSLOG_INTERNAL_PARTS + 2].str= (char *)new_name; log_array[TRANSLOG_INTERNAL_PARTS + 2].str= (char *)new_name;
...@@ -76,15 +76,16 @@ int maria_rename(const char *old_name, const char *new_name) ...@@ -76,15 +76,16 @@ int maria_rename(const char *old_name, const char *new_name)
MySQL layer to be crash-safe, which it is not now (that would require MySQL layer to be crash-safe, which it is not now (that would require
work using the ddl_log of sql/sql_table.cc); when it is, we should work using the ddl_log of sql/sql_table.cc); when it is, we should
reconsider the moment of writing this log record (before or after op, reconsider the moment of writing this log record (before or after op,
under THR_LOCK_maria or not...), how to use it in Recovery, and force under THR_LOCK_maria or not...), how to use it in Recovery.
the log. For now this record is just informative. For now it can serve to apply logs to a backup so we sync it.
*/ */
if (unlikely(translog_write_record(&share->state.create_rename_lsn, if (unlikely(translog_write_record(&share->state.create_rename_lsn,
LOGREC_REDO_RENAME_TABLE, LOGREC_REDO_RENAME_TABLE,
&dummy_transaction_object, NULL, &dummy_transaction_object, NULL,
2 + 2 + old_name_len + new_name_len, 2 + 2 + old_name_len + new_name_len,
sizeof(log_array)/sizeof(log_array[0]), sizeof(log_array)/sizeof(log_array[0]),
log_array, NULL))) log_array, NULL) ||
translog_flush(share->state.create_rename_lsn)))
{ {
maria_close(info); maria_close(info);
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -93,10 +94,7 @@ int maria_rename(const char *old_name, const char *new_name) ...@@ -93,10 +94,7 @@ int maria_rename(const char *old_name, const char *new_name)
store LSN into file, needed for Recovery to not be confused if a store LSN into file, needed for Recovery to not be confused if a
RENAME happened (applying REDOs to the wrong table). RENAME happened (applying REDOs to the wrong table).
*/ */
lsn_store(log_data, share->state.create_rename_lsn); if (_ma_update_create_rename_lsn_on_disk(share, TRUE))
if (my_pwrite(share->kfile.file, log_data, sizeof(log_data),
sizeof(share->state.header) + 2, MYF(MY_NABP)) ||
my_sync(share->kfile.file, MYF(MY_WME)))
{ {
maria_close(info); maria_close(info);
DBUG_RETURN(1); DBUG_RETURN(1);
......
...@@ -60,7 +60,7 @@ int main(int argc,char *argv[]) ...@@ -60,7 +60,7 @@ int main(int argc,char *argv[])
if (maria_init() || if (maria_init() ||
(init_pagecache(maria_pagecache, IO_SIZE*16, 0, 0, (init_pagecache(maria_pagecache, IO_SIZE*16, 0, 0,
maria_block_size) == 0) || maria_block_size) == 0) ||
ma_control_file_create_or_open() || ma_control_file_create_or_open(TRUE) ||
(init_pagecache(maria_log_pagecache, (init_pagecache(maria_log_pagecache,
TRANSLOG_PAGECACHE_SIZE, 0, 0, TRANSLOG_PAGECACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE) == 0) || TRANSLOG_PAGE_SIZE) == 0) ||
......
...@@ -224,7 +224,7 @@ int main(int argc, char *argv[]) ...@@ -224,7 +224,7 @@ int main(int argc, char *argv[])
/* Maria requires that we always have a page cache */ /* Maria requires that we always have a page cache */
if ((init_pagecache(maria_pagecache, pagecache_size, 0, 0, if ((init_pagecache(maria_pagecache, pagecache_size, 0, 0,
maria_block_size) == 0) || maria_block_size) == 0) ||
ma_control_file_create_or_open() || ma_control_file_create_or_open(TRUE) ||
(init_pagecache(maria_log_pagecache, (init_pagecache(maria_log_pagecache,
TRANSLOG_PAGECACHE_SIZE, 0, 0, TRANSLOG_PAGECACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE) == 0) || TRANSLOG_PAGE_SIZE) == 0) ||
......
...@@ -1026,6 +1026,13 @@ static int maria_chk(HA_CHECK *param, my_string filename) ...@@ -1026,6 +1026,13 @@ static int maria_chk(HA_CHECK *param, my_string filename)
} }
if (!error) if (!error)
{ {
/*
Tell the server's Recovery to ignore old REDOs on this table; we don't
know what the log's end LSN is now, so we just let the server know
that it will have to find and store it.
*/
if (share->base.transactional)
share->state.create_rename_lsn= (LSN)ULONGLONG_MAX;
if ((param->testflag & (T_REP_BY_SORT | T_REP_PARALLEL)) && if ((param->testflag & (T_REP_BY_SORT | T_REP_PARALLEL)) &&
(maria_is_any_key_active(share->state.key_map) || (maria_is_any_key_active(share->state.key_map) ||
(rep_quick && !param->keys_in_use && !recreate)) && (rep_quick && !param->keys_in_use && !recreate)) &&
......
...@@ -886,13 +886,13 @@ void _ma_remap_file(MARIA_HA *info, my_off_t size); ...@@ -886,13 +886,13 @@ void _ma_remap_file(MARIA_HA *info, my_off_t size);
MARIA_RECORD_POS _ma_write_init_default(MARIA_HA *info, const byte *record); MARIA_RECORD_POS _ma_write_init_default(MARIA_HA *info, const byte *record);
my_bool _ma_write_abort_default(MARIA_HA *info); my_bool _ma_write_abort_default(MARIA_HA *info);
/* Functions needed by _ma_check (are overrided in MySQL) */
C_MODE_START C_MODE_START
int _ma_repair_write_log_record(const HA_CHECK *param, MARIA_HA *info);
/* Functions needed by _ma_check (are overrided in MySQL) */
volatile int *_ma_killed_ptr(HA_CHECK *param); volatile int *_ma_killed_ptr(HA_CHECK *param);
void _ma_check_print_error _VARARGS((HA_CHECK *param, const char *fmt, ...)); void _ma_check_print_error _VARARGS((HA_CHECK *param, const char *fmt, ...));
void _ma_check_print_warning _VARARGS((HA_CHECK *param, const char *fmt, ...)); void _ma_check_print_warning _VARARGS((HA_CHECK *param, const char *fmt, ...));
void _ma_check_print_info _VARARGS((HA_CHECK *param, const char *fmt, ...)); void _ma_check_print_info _VARARGS((HA_CHECK *param, const char *fmt, ...));
int _ma_repair_write_log_record(const HA_CHECK *param, MARIA_HA *info);
C_MODE_END C_MODE_END
int _ma_flush_pending_blocks(MARIA_SORT_PARAM *param); int _ma_flush_pending_blocks(MARIA_SORT_PARAM *param);
...@@ -909,6 +909,7 @@ int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages, ...@@ -909,6 +909,7 @@ int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages,
ulong); ulong);
int _ma_sync_table_files(const MARIA_HA *info); int _ma_sync_table_files(const MARIA_HA *info);
int _ma_initialize_data_file(File dfile, MARIA_SHARE *share); int _ma_initialize_data_file(File dfile, MARIA_SHARE *share);
int _ma_update_create_rename_lsn_on_disk(MARIA_SHARE *share, my_bool do_sync);
void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn); void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn);
......
This diff is collapsed.
...@@ -20,6 +20,9 @@ ...@@ -20,6 +20,9 @@
to include my_atomic.h in C++ code. to include my_atomic.h in C++ code.
*/ */
#ifndef _trnman_public_h
#define _trnman_public_h
#include "ma_loghandler_lsn.h" #include "ma_loghandler_lsn.h"
C_MODE_START C_MODE_START
...@@ -52,3 +55,4 @@ my_bool trnman_has_locked_tables(TRN *trn); ...@@ -52,3 +55,4 @@ my_bool trnman_has_locked_tables(TRN *trn);
void trnman_reset_locked_tables(TRN *trn); void trnman_reset_locked_tables(TRN *trn);
C_MODE_END C_MODE_END
#endif
...@@ -121,8 +121,8 @@ static int delete_file(myf my_flags) ...@@ -121,8 +121,8 @@ static int delete_file(myf my_flags)
The error will however be printed on stderr. The error will however be printed on stderr.
*/ */
my_delete(file_name, my_flags); my_delete(file_name, my_flags);
expect_checkpoint_lsn= CONTROL_FILE_IMPOSSIBLE_LSN; expect_checkpoint_lsn= LSN_IMPOSSIBLE;
expect_logno= CONTROL_FILE_IMPOSSIBLE_FILENO; expect_logno= FILENO_IMPOSSIBLE;
return 0; return 0;
} }
...@@ -146,9 +146,9 @@ static int verify_module_values_match_expected() ...@@ -146,9 +146,9 @@ static int verify_module_values_match_expected()
*/ */
static int verify_module_values_are_impossible() static int verify_module_values_are_impossible()
{ {
RET_ERR_UNLESS(last_logno == CONTROL_FILE_IMPOSSIBLE_FILENO); RET_ERR_UNLESS(last_logno == FILENO_IMPOSSIBLE);
RET_ERR_UNLESS(last_checkpoint_lsn == RET_ERR_UNLESS(last_checkpoint_lsn ==
CONTROL_FILE_IMPOSSIBLE_LSN); LSN_IMPOSSIBLE);
return 0; return 0;
} }
...@@ -164,7 +164,7 @@ static int close_file() ...@@ -164,7 +164,7 @@ static int close_file()
static int create_or_open_file() static int create_or_open_file()
{ {
RET_ERR_UNLESS(ma_control_file_create_or_open() == CONTROL_FILE_OK); RET_ERR_UNLESS(ma_control_file_create_or_open(TRUE) == CONTROL_FILE_OK);
/* Check that the module reports expected information */ /* Check that the module reports expected information */
RET_ERR_UNLESS(verify_module_values_match_expected() == 0); RET_ERR_UNLESS(verify_module_values_match_expected() == 0);
return 0; return 0;
...@@ -188,7 +188,7 @@ static int test_one_log() ...@@ -188,7 +188,7 @@ static int test_one_log()
RET_ERR_UNLESS(create_or_open_file() == CONTROL_FILE_OK); RET_ERR_UNLESS(create_or_open_file() == CONTROL_FILE_OK);
objs_to_write= CONTROL_FILE_UPDATE_ONLY_LOGNO; objs_to_write= CONTROL_FILE_UPDATE_ONLY_LOGNO;
expect_logno= 123; expect_logno= 123;
RET_ERR_UNLESS(write_file(CONTROL_FILE_IMPOSSIBLE_LSN, RET_ERR_UNLESS(write_file(LSN_IMPOSSIBLE,
expect_logno, expect_logno,
objs_to_write) == 0); objs_to_write) == 0);
RET_ERR_UNLESS(close_file() == 0); RET_ERR_UNLESS(close_file() == 0);
...@@ -206,7 +206,7 @@ static int test_five_logs() ...@@ -206,7 +206,7 @@ static int test_five_logs()
for (i= 0; i<5; i++) for (i= 0; i<5; i++)
{ {
expect_logno*= 3; expect_logno*= 3;
RET_ERR_UNLESS(write_file(CONTROL_FILE_IMPOSSIBLE_LSN, expect_logno, RET_ERR_UNLESS(write_file(LSN_IMPOSSIBLE, expect_logno,
objs_to_write) == 0); objs_to_write) == 0);
} }
RET_ERR_UNLESS(close_file() == 0); RET_ERR_UNLESS(close_file() == 0);
...@@ -320,7 +320,7 @@ static int test_bad_magic_string() ...@@ -320,7 +320,7 @@ static int test_bad_magic_string()
RET_ERR_UNLESS(my_pwrite(fd, "papa", 4, 0, MYF(MY_FNABP | MY_WME)) == 0); RET_ERR_UNLESS(my_pwrite(fd, "papa", 4, 0, MYF(MY_FNABP | MY_WME)) == 0);
/* Check that control file module sees the problem */ /* Check that control file module sees the problem */
RET_ERR_UNLESS(ma_control_file_create_or_open() == RET_ERR_UNLESS(ma_control_file_create_or_open(TRUE) ==
CONTROL_FILE_BAD_MAGIC_STRING); CONTROL_FILE_BAD_MAGIC_STRING);
/* Restore magic string */ /* Restore magic string */
RET_ERR_UNLESS(my_pwrite(fd, buffer, 4, 0, MYF(MY_FNABP | MY_WME)) == 0); RET_ERR_UNLESS(my_pwrite(fd, buffer, 4, 0, MYF(MY_FNABP | MY_WME)) == 0);
...@@ -346,7 +346,7 @@ static int test_bad_checksum() ...@@ -346,7 +346,7 @@ static int test_bad_checksum()
buffer[0]+= 3; /* mangle checksum */ buffer[0]+= 3; /* mangle checksum */
RET_ERR_UNLESS(my_pwrite(fd, buffer, 1, 8, MYF(MY_FNABP | MY_WME)) == 0); RET_ERR_UNLESS(my_pwrite(fd, buffer, 1, 8, MYF(MY_FNABP | MY_WME)) == 0);
/* Check that control file module sees the problem */ /* Check that control file module sees the problem */
RET_ERR_UNLESS(ma_control_file_create_or_open() == RET_ERR_UNLESS(ma_control_file_create_or_open(TRUE) ==
CONTROL_FILE_BAD_CHECKSUM); CONTROL_FILE_BAD_CHECKSUM);
/* Restore checksum */ /* Restore checksum */
buffer[0]-= 3; buffer[0]-= 3;
...@@ -369,10 +369,11 @@ static int test_bad_size() ...@@ -369,10 +369,11 @@ static int test_bad_size()
MYF(MY_WME))) >= 0); MYF(MY_WME))) >= 0);
RET_ERR_UNLESS(my_write(fd, buffer, 10, MYF(MY_FNABP | MY_WME)) == 0); RET_ERR_UNLESS(my_write(fd, buffer, 10, MYF(MY_FNABP | MY_WME)) == 0);
/* Check that control file module sees the problem */ /* Check that control file module sees the problem */
RET_ERR_UNLESS(ma_control_file_create_or_open() == CONTROL_FILE_TOO_SMALL); RET_ERR_UNLESS(ma_control_file_create_or_open(TRUE) ==
CONTROL_FILE_TOO_SMALL);
RET_ERR_UNLESS(my_write(fd, buffer, 30, MYF(MY_FNABP | MY_WME)) == 0); RET_ERR_UNLESS(my_write(fd, buffer, 30, MYF(MY_FNABP | MY_WME)) == 0);
/* Check that control file module sees the problem */ /* Check that control file module sees the problem */
RET_ERR_UNLESS(ma_control_file_create_or_open() == CONTROL_FILE_TOO_BIG); RET_ERR_UNLESS(ma_control_file_create_or_open(TRUE) == CONTROL_FILE_TOO_BIG);
RET_ERR_UNLESS(my_close(fd, MYF(MY_WME)) == 0); RET_ERR_UNLESS(my_close(fd, MYF(MY_WME)) == 0);
/* Leave a correct control file */ /* Leave a correct control file */
......
...@@ -164,7 +164,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -164,7 +164,7 @@ int main(int argc __attribute__((unused)), char *argv[])
} }
#endif #endif
if (ma_control_file_create_or_open()) if (ma_control_file_create_or_open(TRUE))
{ {
fprintf(stderr, "Can't init control file (%d)\n", errno); fprintf(stderr, "Can't init control file (%d)\n", errno);
exit(1); exit(1);
...@@ -336,7 +336,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -336,7 +336,7 @@ int main(int argc __attribute__((unused)), char *argv[])
ma_control_file_end(); ma_control_file_end();
if (ma_control_file_create_or_open()) if (ma_control_file_create_or_open(TRUE))
{ {
fprintf(stderr, "pass2: Can't init control file (%d)\n", errno); fprintf(stderr, "pass2: Can't init control file (%d)\n", errno);
exit(1); exit(1);
...@@ -398,7 +398,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -398,7 +398,7 @@ int main(int argc __attribute__((unused)), char *argv[])
i, errno); i, errno);
goto err; goto err;
} }
if (rec.lsn == CONTROL_FILE_IMPOSSIBLE_LSN) if (rec.lsn == LSN_IMPOSSIBLE)
{ {
if (i != ITERATIONS) if (i != ITERATIONS)
{ {
...@@ -477,7 +477,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -477,7 +477,7 @@ int main(int argc __attribute__((unused)), char *argv[])
"failed (%d)\n", i, errno); "failed (%d)\n", i, errno);
goto err; goto err;
} }
if (rec.lsn == CONTROL_FILE_IMPOSSIBLE_LSN) if (rec.lsn == LSN_IMPOSSIBLE)
{ {
fprintf(stderr, "EOL met at the middle of iteration (first var) %u " fprintf(stderr, "EOL met at the middle of iteration (first var) %u "
"instead of beginning of %u\n", i, ITERATIONS); "instead of beginning of %u\n", i, ITERATIONS);
...@@ -572,7 +572,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -572,7 +572,7 @@ int main(int argc __attribute__((unused)), char *argv[])
i, errno); i, errno);
goto err; goto err;
} }
if (rec.lsn == CONTROL_FILE_IMPOSSIBLE_LSN) if (rec.lsn == LSN_IMPOSSIBLE)
{ {
fprintf(stderr, "EOL met at the middle of iteration %u " fprintf(stderr, "EOL met at the middle of iteration %u "
"instead of beginning of %u\n", i, ITERATIONS); "instead of beginning of %u\n", i, ITERATIONS);
......
...@@ -161,7 +161,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -161,7 +161,7 @@ int main(int argc __attribute__((unused)), char *argv[])
} }
#endif #endif
if (ma_control_file_create_or_open()) if (ma_control_file_create_or_open(TRUE))
{ {
fprintf(stderr, "Can't init control file (%d)\n", errno); fprintf(stderr, "Can't init control file (%d)\n", errno);
exit(1); exit(1);
...@@ -325,7 +325,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -325,7 +325,7 @@ int main(int argc __attribute__((unused)), char *argv[])
end_pagecache(&pagecache, 1); end_pagecache(&pagecache, 1);
ma_control_file_end(); ma_control_file_end();
if (ma_control_file_create_or_open()) if (ma_control_file_create_or_open(TRUE))
{ {
fprintf(stderr, "pass2: Can't init control file (%d)\n", errno); fprintf(stderr, "pass2: Can't init control file (%d)\n", errno);
exit(1); exit(1);
...@@ -390,7 +390,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -390,7 +390,7 @@ int main(int argc __attribute__((unused)), char *argv[])
translog_free_record_header(&rec); translog_free_record_header(&rec);
goto err; goto err;
} }
if (rec.lsn == CONTROL_FILE_IMPOSSIBLE_LSN) if (rec.lsn == LSN_IMPOSSIBLE)
{ {
if (i != ITERATIONS) if (i != ITERATIONS)
{ {
...@@ -470,7 +470,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -470,7 +470,7 @@ int main(int argc __attribute__((unused)), char *argv[])
"failed (%d)\n", i, errno); "failed (%d)\n", i, errno);
goto err; goto err;
} }
if (rec.lsn == CONTROL_FILE_IMPOSSIBLE_LSN) if (rec.lsn == LSN_IMPOSSIBLE)
{ {
fprintf(stderr, "EOL met at the middle of iteration (first var) %u " fprintf(stderr, "EOL met at the middle of iteration (first var) %u "
"instead of beginning of %u\n", i, ITERATIONS); "instead of beginning of %u\n", i, ITERATIONS);
...@@ -568,7 +568,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -568,7 +568,7 @@ int main(int argc __attribute__((unused)), char *argv[])
translog_free_record_header(&rec); translog_free_record_header(&rec);
goto err; goto err;
} }
if (rec.lsn == CONTROL_FILE_IMPOSSIBLE_LSN) if (rec.lsn == LSN_IMPOSSIBLE)
{ {
fprintf(stderr, "EOL met at the middle of iteration %u " fprintf(stderr, "EOL met at the middle of iteration %u "
"instead of beginning of %u\n", i, ITERATIONS); "instead of beginning of %u\n", i, ITERATIONS);
......
...@@ -270,7 +270,7 @@ int main(int argc __attribute__((unused)), ...@@ -270,7 +270,7 @@ int main(int argc __attribute__((unused)),
my_thread_global_init(); my_thread_global_init();
if (ma_control_file_create_or_open()) if (ma_control_file_create_or_open(TRUE))
{ {
fprintf(stderr, "Can't init control file (%d)\n", errno); fprintf(stderr, "Can't init control file (%d)\n", errno);
exit(1); exit(1);
...@@ -384,7 +384,7 @@ int main(int argc __attribute__((unused)), ...@@ -384,7 +384,7 @@ int main(int argc __attribute__((unused)),
translog_free_record_header(&rec); translog_free_record_header(&rec);
goto err; goto err;
} }
if (rec.lsn == CONTROL_FILE_IMPOSSIBLE_LSN) if (rec.lsn == LSN_IMPOSSIBLE)
{ {
if (i != WRITERS * ITERATIONS * 2) if (i != WRITERS * ITERATIONS * 2)
{ {
......
...@@ -56,7 +56,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -56,7 +56,7 @@ int main(int argc __attribute__((unused)), char *argv[])
} }
#endif #endif
if (ma_control_file_create_or_open()) if (ma_control_file_create_or_open(TRUE))
{ {
fprintf(stderr, "Can't init control file (%d)\n", errno); fprintf(stderr, "Can't init control file (%d)\n", errno);
exit(1); exit(1);
......
...@@ -75,6 +75,7 @@ int mi_close(register MI_INFO *info) ...@@ -75,6 +75,7 @@ int mi_close(register MI_INFO *info)
not change the crashed state. not change the crashed state.
We can NOT write the state in other cases as other threads We can NOT write the state in other cases as other threads
may be using the file at this point may be using the file at this point
IF using --external-locking.
*/ */
if (share->mode != O_RDONLY && mi_is_crashed(info)) if (share->mode != O_RDONLY && mi_is_crashed(info))
mi_state_info_write(share->kfile, &share->state, 1); mi_state_info_write(share->kfile, &share->state, 1);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment